Repository: flink Updated Branches: refs/heads/master ff1660629 -> ba03b78c7
http://git-wip-us.apache.org/repos/asf/flink/blob/ba03b78c/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index 3b1a1b4..b6a0637 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -51,14 +51,15 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobgraph.tasks.InputSplitProvider; import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess; import org.apache.flink.runtime.jobmaster.JobMasterGateway; +import org.apache.flink.runtime.jobmaster.JobMasterId; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalListener; import org.apache.flink.runtime.memory.MemoryManager; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.metrics.MetricRegistry; -import org.apache.flink.runtime.registration.RegistrationConnectionListener; -import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; +import org.apache.flink.runtime.registration.RegistrationConnectionListener; +import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.resourcemanager.ResourceManagerId; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcEndpoint; @@ -302,7 +303,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { @Override public CompletableFuture<Acknowledge> submitTask( TaskDeploymentDescriptor tdd, - UUID jobManagerLeaderId, + JobMasterId jobMasterId, Time timeout) { try { @@ -327,10 +328,10 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { throw new TaskSubmissionException(message); } - if (!Objects.equals(jobManagerConnection.getLeaderId(), jobManagerLeaderId)) { + if (!Objects.equals(jobManagerConnection.getJobMasterId(), jobMasterId)) { final String message = "Rejecting the task submission because the job manager leader id " + - jobManagerLeaderId + " does not match the expected job manager leader id " + - jobManagerConnection.getLeaderId() + '.'; + jobMasterId + " does not match the expected job manager leader id " + + jobManagerConnection.getJobMasterId() + '.'; log.debug(message); throw new TaskSubmissionException(message); @@ -353,9 +354,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { tdd.getAttemptNumber()); InputSplitProvider inputSplitProvider = new RpcInputSplitProvider( - jobManagerConnection.getLeaderId(), jobManagerConnection.getJobManagerGateway(), - jobInformation.getJobId(), taskInformation.getJobVertexId(), tdd.getExecutionAttemptId(), taskManagerConfiguration.getTimeout()); @@ -766,7 +765,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { final JobMasterGateway jobMasterGateway = jobManagerConnection.getJobManagerGateway(); final Iterator<TaskSlot> reservedSlotsIterator = taskSlotTable.getAllocatedSlots(jobId); - final UUID leaderId = jobManagerConnection.getLeaderId(); + final JobMasterId jobMasterId = jobManagerConnection.getJobMasterId(); final Collection<SlotOffer> reservedSlots = new HashSet<>(2); @@ -777,13 +776,11 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { // the slot is either free or releasing at the moment final String message = "Could not mark slot " + jobId + " active."; log.debug(message); - jobMasterGateway.failSlot(getResourceID(), offer.getAllocationId(), - leaderId, new Exception(message)); + jobMasterGateway.failSlot(getResourceID(), offer.getAllocationId(), new Exception(message)); } } catch (SlotNotFoundException e) { final String message = "Could not mark slot " + jobId + " active."; - jobMasterGateway.failSlot(getResourceID(), offer.getAllocationId(), - leaderId, new Exception(message)); + jobMasterGateway.failSlot(getResourceID(), offer.getAllocationId(), new Exception(message)); continue; } reservedSlots.add(offer); @@ -792,7 +789,6 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { CompletableFuture<Collection<SlotOffer>> acceptedSlotsFuture = jobMasterGateway.offerSlots( getResourceID(), reservedSlots, - leaderId, taskManagerConfiguration.getTimeout()); acceptedSlotsFuture.whenCompleteAsync( @@ -813,7 +809,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { } } else { // check if the response is still valid - if (isJobManagerConnectionValid(jobId, leaderId)) { + if (isJobManagerConnectionValid(jobId, jobMasterId)) { // mark accepted slots active for (SlotOffer acceptedSlot : acceptedSlots) { reservedSlots.remove(acceptedSlot); @@ -839,14 +835,14 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { } } - private void establishJobManagerConnection(JobID jobId, final JobMasterGateway jobMasterGateway, UUID jobManagerLeaderId, JMTMRegistrationSuccess registrationSuccess) { + private void establishJobManagerConnection(JobID jobId, final JobMasterGateway jobMasterGateway, JMTMRegistrationSuccess registrationSuccess) { if (jobManagerTable.contains(jobId)) { JobManagerConnection oldJobManagerConnection = jobManagerTable.get(jobId); - if (Objects.equals(oldJobManagerConnection.getLeaderId(), jobManagerLeaderId)) { + if (Objects.equals(oldJobManagerConnection.getJobMasterId(), jobMasterGateway.getFencingToken())) { // we already are connected to the given job manager - log.debug("Ignore JobManager gained leadership message for {} because we are already connected to it.", jobManagerLeaderId); + log.debug("Ignore JobManager gained leadership message for {} because we are already connected to it.", jobMasterGateway.getFencingToken()); return; } else { closeJobManagerConnection(jobId, new Exception("Found new job leader for job id " + jobId + '.')); @@ -860,7 +856,6 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { jobId, jobManagerResourceID, jobMasterGateway, - jobManagerLeaderId, registrationSuccess.getBlobPort()); jobManagerConnections.put(jobManagerResourceID, newJobManagerConnection); jobManagerTable.put(jobId, newJobManagerConnection); @@ -927,15 +922,13 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { JobID jobID, ResourceID resourceID, JobMasterGateway jobMasterGateway, - UUID jobManagerLeaderId, int blobPort) { Preconditions.checkNotNull(jobID); Preconditions.checkNotNull(resourceID); - Preconditions.checkNotNull(jobManagerLeaderId); Preconditions.checkNotNull(jobMasterGateway); Preconditions.checkArgument(blobPort > 0 || blobPort < MAX_BLOB_PORT, "Blob server port is out of range."); - TaskManagerActions taskManagerActions = new TaskManagerActionsImpl(jobManagerLeaderId, jobMasterGateway); + TaskManagerActions taskManagerActions = new TaskManagerActionsImpl(jobMasterGateway); CheckpointResponder checkpointResponder = new RpcCheckpointResponder(jobMasterGateway); @@ -959,18 +952,16 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { } ResultPartitionConsumableNotifier resultPartitionConsumableNotifier = new RpcResultPartitionConsumableNotifier( - jobManagerLeaderId, jobMasterGateway, getRpcService().getExecutor(), taskManagerConfiguration.getTimeout()); - PartitionProducerStateChecker partitionStateChecker = new RpcPartitionStateChecker(jobManagerLeaderId, jobMasterGateway); + PartitionProducerStateChecker partitionStateChecker = new RpcPartitionStateChecker(jobMasterGateway); return new JobManagerConnection( jobID, resourceID, jobMasterGateway, - jobManagerLeaderId, taskManagerActions, checkpointResponder, blobCache, @@ -1006,13 +997,12 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { } private void updateTaskExecutionState( - final UUID jobMasterLeaderId, final JobMasterGateway jobMasterGateway, final TaskExecutionState taskExecutionState) { final ExecutionAttemptID executionAttemptID = taskExecutionState.getID(); - CompletableFuture<Acknowledge> futureAcknowledge = jobMasterGateway.updateTaskExecutionState(jobMasterLeaderId, taskExecutionState); + CompletableFuture<Acknowledge> futureAcknowledge = jobMasterGateway.updateTaskExecutionState(taskExecutionState); futureAcknowledge.whenCompleteAsync( (ack, throwable) -> { @@ -1024,7 +1014,6 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { } private void unregisterTaskAndNotifyFinalState( - final UUID jobMasterLeaderId, final JobMasterGateway jobMasterGateway, final ExecutionAttemptID executionAttemptID) { @@ -1044,7 +1033,6 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { AccumulatorSnapshot accumulatorSnapshot = task.getAccumulatorRegistry().getSnapshot(); updateTaskExecutionState( - jobMasterLeaderId, jobMasterGateway, new TaskExecutionState( task.getJobID(), @@ -1101,10 +1089,10 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { return (resourceManagerConnection != null && resourceManagerConnection.isConnected()); } - private boolean isJobManagerConnectionValid(JobID jobId, UUID leaderId) { + private boolean isJobManagerConnectionValid(JobID jobId, JobMasterId jobMasterId) { JobManagerConnection jmConnection = jobManagerTable.get(jobId); - return jmConnection != null && Objects.equals(jmConnection.getLeaderId(), leaderId); + return jmConnection != null && Objects.equals(jmConnection.getJobMasterId(), jobMasterId); } // ------------------------------------------------------------------------ @@ -1152,7 +1140,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { // ------------------------------------------------------------------------ /** - * The listener for leader changes of the resource manager + * The listener for leader changes of the resource manager. */ private final class ResourceManagerLeaderListener implements LeaderRetrievalListener { @@ -1176,23 +1164,18 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { public void jobManagerGainedLeadership( final JobID jobId, final JobMasterGateway jobManagerGateway, - final UUID jobLeaderId, final JMTMRegistrationSuccess registrationMessage) { - runAsync(new Runnable() { - @Override - public void run() { + runAsync( + () -> establishJobManagerConnection( jobId, jobManagerGateway, - jobLeaderId, - registrationMessage); - } - }); + registrationMessage)); } @Override - public void jobManagerLostLeadership(final JobID jobId, final UUID jobLeaderId) { - log.info("JobManager for job {} with leader id {} lost leadership.", jobId, jobLeaderId); + public void jobManagerLostLeadership(final JobID jobId, final JobMasterId jobMasterId) { + log.info("JobManager for job {} with leader id {} lost leadership.", jobId, jobMasterId); runAsync(new Runnable() { @Override @@ -1233,11 +1216,9 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { } private final class TaskManagerActionsImpl implements TaskManagerActions { - private final UUID jobMasterLeaderId; private final JobMasterGateway jobMasterGateway; - private TaskManagerActionsImpl(UUID jobMasterLeaderId, JobMasterGateway jobMasterGateway) { - this.jobMasterLeaderId = Preconditions.checkNotNull(jobMasterLeaderId); + private TaskManagerActionsImpl(JobMasterGateway jobMasterGateway) { this.jobMasterGateway = Preconditions.checkNotNull(jobMasterGateway); } @@ -1246,7 +1227,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { runAsync(new Runnable() { @Override public void run() { - unregisterTaskAndNotifyFinalState(jobMasterLeaderId, jobMasterGateway, executionAttemptID); + unregisterTaskAndNotifyFinalState(jobMasterGateway, executionAttemptID); } }); } @@ -1273,7 +1254,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { @Override public void updateTaskExecutionState(final TaskExecutionState taskExecutionState) { - TaskExecutor.this.updateTaskExecutionState(jobMasterLeaderId, jobMasterGateway, taskExecutionState); + TaskExecutor.this.updateTaskExecutionState(jobMasterGateway, taskExecutionState); } } http://git-wip-us.apache.org/repos/asf/flink/blob/ba03b78c/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java index fd56255..ee0f69d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java @@ -27,13 +27,13 @@ import org.apache.flink.runtime.clusterframework.types.SlotID; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.PartitionInfo; +import org.apache.flink.runtime.jobmaster.JobMasterId; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.resourcemanager.ResourceManagerId; import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.rpc.RpcTimeout; import org.apache.flink.runtime.taskmanager.Task; -import java.util.UUID; import java.util.concurrent.CompletableFuture; /** @@ -64,13 +64,13 @@ public interface TaskExecutorGateway extends RpcGateway { * Submit a {@link Task} to the {@link TaskExecutor}. * * @param tdd describing the task to submit - * @param leaderId of the job leader + * @param jobMasterId identifying the submitting JobMaster * @param timeout of the submit operation * @return Future acknowledge of the successful operation */ CompletableFuture<Acknowledge> submitTask( TaskDeploymentDescriptor tdd, - UUID leaderId, + JobMasterId jobMasterId, @RpcTimeout Time timeout); /** http://git-wip-us.apache.org/repos/asf/flink/blob/ba03b78c/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java index a919c78..baa403b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcInputSplitProvider.java @@ -18,7 +18,6 @@ package org.apache.flink.runtime.taskexecutor.rpc; -import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.core.io.InputSplit; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; @@ -30,27 +29,20 @@ import org.apache.flink.runtime.jobmaster.SerializedInputSplit; import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.Preconditions; -import java.util.UUID; import java.util.concurrent.CompletableFuture; public class RpcInputSplitProvider implements InputSplitProvider { - private final UUID jobMasterLeaderId; private final JobMasterGateway jobMasterGateway; - private final JobID jobID; private final JobVertexID jobVertexID; private final ExecutionAttemptID executionAttemptID; private final Time timeout; public RpcInputSplitProvider( - UUID jobMasterLeaderId, JobMasterGateway jobMasterGateway, - JobID jobID, JobVertexID jobVertexID, ExecutionAttemptID executionAttemptID, Time timeout) { - this.jobMasterLeaderId = Preconditions.checkNotNull(jobMasterLeaderId); this.jobMasterGateway = Preconditions.checkNotNull(jobMasterGateway); - this.jobID = Preconditions.checkNotNull(jobID); this.jobVertexID = Preconditions.checkNotNull(jobVertexID); this.executionAttemptID = Preconditions.checkNotNull(executionAttemptID); this.timeout = Preconditions.checkNotNull(timeout); @@ -62,7 +54,8 @@ public class RpcInputSplitProvider implements InputSplitProvider { Preconditions.checkNotNull(userCodeClassLoader); CompletableFuture<SerializedInputSplit> futureInputSplit = jobMasterGateway.requestNextInputSplit( - jobMasterLeaderId, jobVertexID, executionAttemptID); + jobVertexID, + executionAttemptID); try { SerializedInputSplit serializedInputSplit = futureInputSplit.get(timeout.getSize(), timeout.getUnit()); http://git-wip-us.apache.org/repos/asf/flink/blob/ba03b78c/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcPartitionStateChecker.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcPartitionStateChecker.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcPartitionStateChecker.java index 26e1b0e..f3eb717 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcPartitionStateChecker.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcPartitionStateChecker.java @@ -26,16 +26,13 @@ import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.jobmaster.JobMasterGateway; import org.apache.flink.util.Preconditions; -import java.util.UUID; import java.util.concurrent.CompletableFuture; public class RpcPartitionStateChecker implements PartitionProducerStateChecker { - private final UUID jobMasterLeaderId; private final JobMasterGateway jobMasterGateway; - public RpcPartitionStateChecker(UUID jobMasterLeaderId, JobMasterGateway jobMasterGateway) { - this.jobMasterLeaderId = Preconditions.checkNotNull(jobMasterLeaderId); + public RpcPartitionStateChecker(JobMasterGateway jobMasterGateway) { this.jobMasterGateway = Preconditions.checkNotNull(jobMasterGateway); } @@ -45,6 +42,6 @@ public class RpcPartitionStateChecker implements PartitionProducerStateChecker { IntermediateDataSetID resultId, ResultPartitionID partitionId) { - return jobMasterGateway.requestPartitionState(jobMasterLeaderId, resultId, partitionId); + return jobMasterGateway.requestPartitionState(resultId, partitionId); } } http://git-wip-us.apache.org/repos/asf/flink/blob/ba03b78c/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcResultPartitionConsumableNotifier.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcResultPartitionConsumableNotifier.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcResultPartitionConsumableNotifier.java index d898562..82a6fbc 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcResultPartitionConsumableNotifier.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/rpc/RpcResultPartitionConsumableNotifier.java @@ -29,7 +29,6 @@ import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; @@ -37,25 +36,21 @@ public class RpcResultPartitionConsumableNotifier implements ResultPartitionCons private static final Logger LOG = LoggerFactory.getLogger(RpcResultPartitionConsumableNotifier.class); - private final UUID jobMasterLeaderId; private final JobMasterGateway jobMasterGateway; private final Executor executor; private final Time timeout; public RpcResultPartitionConsumableNotifier( - UUID jobMasterLeaderId, JobMasterGateway jobMasterGateway, Executor executor, Time timeout) { - this.jobMasterLeaderId = Preconditions.checkNotNull(jobMasterLeaderId); this.jobMasterGateway = Preconditions.checkNotNull(jobMasterGateway); this.executor = Preconditions.checkNotNull(executor); this.timeout = Preconditions.checkNotNull(timeout); } @Override public void notifyPartitionConsumable(JobID jobId, ResultPartitionID partitionId, final TaskActions taskActions) { - CompletableFuture<Acknowledge> acknowledgeFuture = jobMasterGateway.scheduleOrUpdateConsumers( - jobMasterLeaderId, partitionId, timeout); + CompletableFuture<Acknowledge> acknowledgeFuture = jobMasterGateway.scheduleOrUpdateConsumers(partitionId, timeout); acknowledgeFuture.whenCompleteAsync( (Acknowledge ack, Throwable throwable) -> { http://git-wip-us.apache.org/repos/asf/flink/blob/ba03b78c/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java index 55499f5..6013e91 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java @@ -39,6 +39,7 @@ import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; import org.apache.flink.runtime.instance.ActorGateway; import org.apache.flink.runtime.jobmaster.JobMasterGateway; +import org.apache.flink.runtime.jobmaster.JobMasterId; import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess; import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; @@ -577,7 +578,7 @@ public class ResourceManagerTest extends TestLogger { final ResourceID jmResourceId = new ResourceID(jobMasterAddress); final ResourceID rmResourceId = ResourceID.generate(); final ResourceManagerId rmLeaderId = ResourceManagerId.generate(); - final UUID jmLeaderId = UUID.randomUUID(); + final JobMasterId jobMasterId = JobMasterId.generate(); final JobID jobId = new JobID(); final JobMasterGateway jobMasterGateway = mock(JobMasterGateway.class); @@ -590,7 +591,7 @@ public class ResourceManagerTest extends TestLogger { Time.seconds(5L)); final TestingLeaderElectionService rmLeaderElectionService = new TestingLeaderElectionService(); - final TestingLeaderRetrievalService jmLeaderRetrievalService = new TestingLeaderRetrievalService(jobMasterAddress, jmLeaderId); + final TestingLeaderRetrievalService jmLeaderRetrievalService = new TestingLeaderRetrievalService(jobMasterAddress, jobMasterId.toUUID()); final TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices(); highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService); highAvailabilityServices.setJobMasterLeaderRetriever(jobId, jmLeaderRetrievalService); @@ -633,7 +634,7 @@ public class ResourceManagerTest extends TestLogger { // test registration response successful and it will trigger monitor heartbeat target, schedule heartbeat request at interval time CompletableFuture<RegistrationResponse> successfulFuture = rmGateway.registerJobManager( - jmLeaderId, + jobMasterId, jmResourceId, jobMasterAddress, jobId, @@ -665,7 +666,7 @@ public class ResourceManagerTest extends TestLogger { // run the timeout runnable to simulate a heartbeat timeout timeoutRunnable.run(); - verify(jobMasterGateway, Mockito.timeout(timeout.toMilliseconds())).disconnectResourceManager(eq(jmLeaderId), eq(rmLeaderId), any(TimeoutException.class)); + verify(jobMasterGateway, Mockito.timeout(timeout.toMilliseconds())).disconnectResourceManager(eq(rmLeaderId), any(TimeoutException.class)); } finally { rpcService.stopService(); http://git-wip-us.apache.org/repos/asf/flink/blob/ba03b78c/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java index 8d613ac..9d742e2 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolRpcTest.java @@ -25,15 +25,16 @@ import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.jobmanager.scheduler.NoResourceAvailableException; import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; +import org.apache.flink.runtime.jobmaster.JobMasterId; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.akka.AkkaRpcService; import org.apache.flink.runtime.util.clock.SystemClock; +import org.apache.flink.util.TestLogger; import org.junit.AfterClass; import org.junit.BeforeClass; import org.junit.Test; -import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -47,7 +48,7 @@ import static org.mockito.Mockito.mock; /** * Tests for the SlotPool using a proper RPC setup. */ -public class SlotPoolRpcTest { +public class SlotPoolRpcTest extends TestLogger { private static RpcService rpcService; @@ -80,7 +81,7 @@ public class SlotPoolRpcTest { Time.days(1), Time.days(1), Time.milliseconds(100) // this is the timeout for the request tested here ); - pool.start(UUID.randomUUID(), "foobar"); + pool.start(JobMasterId.generate(), "foobar"); CompletableFuture<SimpleSlot> future = pool.allocateSlot(mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, Time.days(1)); http://git-wip-us.apache.org/repos/asf/flink/blob/ba03b78c/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java index ead453e..5993dcb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java @@ -26,6 +26,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.jobmanager.scheduler.ScheduledUnit; import org.apache.flink.runtime.jobmanager.slots.AllocatedSlot; import org.apache.flink.runtime.jobmanager.slots.TaskManagerGateway; +import org.apache.flink.runtime.jobmaster.JobMasterId; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.resourcemanager.SlotRequest; import org.apache.flink.runtime.rpc.RpcService; @@ -39,7 +40,6 @@ import org.mockito.ArgumentCaptor; import org.mockito.Mockito; import java.util.List; -import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; @@ -88,7 +88,7 @@ public class SlotPoolTest extends TestLogger { assertFalse(future.isDone()); ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class); - verify(resourceManagerGateway, Mockito.timeout(timeout.toMilliseconds())).requestSlot(any(UUID.class), slotRequestArgumentCaptor.capture(), any(Time.class)); + verify(resourceManagerGateway, Mockito.timeout(timeout.toMilliseconds())).requestSlot(any(JobMasterId.class), slotRequestArgumentCaptor.capture(), any(Time.class)); final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue(); @@ -125,7 +125,7 @@ public class SlotPoolTest extends TestLogger { ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class); verify(resourceManagerGateway, Mockito.timeout(timeout.toMilliseconds()).times(2)) - .requestSlot(any(UUID.class), slotRequestArgumentCaptor.capture(), any(Time.class)); + .requestSlot(any(JobMasterId.class), slotRequestArgumentCaptor.capture(), any(Time.class)); final List<SlotRequest> slotRequests = slotRequestArgumentCaptor.getAllValues(); @@ -168,7 +168,7 @@ public class SlotPoolTest extends TestLogger { assertFalse(future1.isDone()); ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class); - verify(resourceManagerGateway, Mockito.timeout(timeout.toMilliseconds())).requestSlot(any(UUID.class), slotRequestArgumentCaptor.capture(), any(Time.class)); + verify(resourceManagerGateway, Mockito.timeout(timeout.toMilliseconds())).requestSlot(any(JobMasterId.class), slotRequestArgumentCaptor.capture(), any(Time.class)); final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue(); @@ -211,7 +211,7 @@ public class SlotPoolTest extends TestLogger { assertFalse(future.isDone()); ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class); - verify(resourceManagerGateway, Mockito.timeout(timeout.toMilliseconds())).requestSlot(any(UUID.class), slotRequestArgumentCaptor.capture(), any(Time.class)); + verify(resourceManagerGateway, Mockito.timeout(timeout.toMilliseconds())).requestSlot(any(JobMasterId.class), slotRequestArgumentCaptor.capture(), any(Time.class)); final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue(); @@ -266,7 +266,7 @@ public class SlotPoolTest extends TestLogger { CompletableFuture<SimpleSlot> future1 = slotPoolGateway.allocateSlot(mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout); ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class); - verify(resourceManagerGateway, Mockito.timeout(timeout.toMilliseconds())).requestSlot(any(UUID.class), slotRequestArgumentCaptor.capture(), any(Time.class)); + verify(resourceManagerGateway, Mockito.timeout(timeout.toMilliseconds())).requestSlot(any(JobMasterId.class), slotRequestArgumentCaptor.capture(), any(Time.class)); final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue(); @@ -297,7 +297,7 @@ public class SlotPoolTest extends TestLogger { private static ResourceManagerGateway createResourceManagerGatewayMock() { ResourceManagerGateway resourceManagerGateway = mock(ResourceManagerGateway.class); when(resourceManagerGateway - .requestSlot(any(UUID.class), any(SlotRequest.class), any(Time.class))) + .requestSlot(any(JobMasterId.class), any(SlotRequest.class), any(Time.class))) .thenReturn(mock(CompletableFuture.class, RETURNS_MOCKS)); return resourceManagerGateway; @@ -308,7 +308,7 @@ public class SlotPoolTest extends TestLogger { ResourceManagerGateway resourceManagerGateway) throws Exception { final String jobManagerAddress = "foobar"; - slotPool.start(UUID.randomUUID(), jobManagerAddress); + slotPool.start(JobMasterId.generate(), jobManagerAddress); slotPool.connectToResourceManager(resourceManagerGateway); http://git-wip-us.apache.org/repos/asf/flink/blob/ba03b78c/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java index 0f38db2..b4f50fb 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobManagerRunnerMockTest.java @@ -133,7 +133,7 @@ public class JobManagerRunnerMockTest extends TestLogger { assertTrue(!jobCompletion.isJobFinished()); assertTrue(!jobCompletion.isJobFailed()); - verify(jobManager).start(any(UUID.class), any(Time.class)); + verify(jobManager).start(any(JobMasterId.class), any(Time.class)); runner.shutdown(); verify(leaderElectionService).stop(); @@ -165,9 +165,9 @@ public class JobManagerRunnerMockTest extends TestLogger { public void testJobFinished() throws Exception { runner.start(); - UUID leaderSessionID = UUID.randomUUID(); - runner.grantLeadership(leaderSessionID); - verify(jobManager).start(eq(leaderSessionID), any(Time.class)); + JobMasterId jobMasterId = JobMasterId.generate(); + runner.grantLeadership(jobMasterId.toUUID()); + verify(jobManager).start(eq(jobMasterId), any(Time.class)); assertTrue(!jobCompletion.isJobFinished()); // runner been told by JobManager that job is finished @@ -185,9 +185,9 @@ public class JobManagerRunnerMockTest extends TestLogger { public void testJobFailed() throws Exception { runner.start(); - UUID leaderSessionID = UUID.randomUUID(); - runner.grantLeadership(leaderSessionID); - verify(jobManager).start(eq(leaderSessionID), any(Time.class)); + JobMasterId jobMasterId = JobMasterId.generate(); + runner.grantLeadership(jobMasterId.toUUID()); + verify(jobManager).start(eq(jobMasterId), any(Time.class)); assertTrue(!jobCompletion.isJobFinished()); // runner been told by JobManager that job is failed @@ -204,9 +204,9 @@ public class JobManagerRunnerMockTest extends TestLogger { public void testLeadershipRevoked() throws Exception { runner.start(); - UUID leaderSessionID = UUID.randomUUID(); - runner.grantLeadership(leaderSessionID); - verify(jobManager).start(eq(leaderSessionID), any(Time.class)); + JobMasterId jobMasterId = JobMasterId.generate(); + runner.grantLeadership(jobMasterId.toUUID()); + verify(jobManager).start(eq(jobMasterId), any(Time.class)); assertTrue(!jobCompletion.isJobFinished()); runner.revokeLeadership(); @@ -219,18 +219,18 @@ public class JobManagerRunnerMockTest extends TestLogger { public void testRegainLeadership() throws Exception { runner.start(); - UUID leaderSessionID = UUID.randomUUID(); - runner.grantLeadership(leaderSessionID); - verify(jobManager).start(eq(leaderSessionID), any(Time.class)); + JobMasterId jobMasterId = JobMasterId.generate(); + runner.grantLeadership(jobMasterId.toUUID()); + verify(jobManager).start(eq(jobMasterId), any(Time.class)); assertTrue(!jobCompletion.isJobFinished()); runner.revokeLeadership(); verify(jobManager).suspend(any(Throwable.class), any(Time.class)); assertFalse(runner.isShutdown()); - UUID leaderSessionID2 = UUID.randomUUID(); - runner.grantLeadership(leaderSessionID2); - verify(jobManager).start(eq(leaderSessionID2), any(Time.class)); + JobMasterId jobMasterId2 = JobMasterId.generate(); + runner.grantLeadership(jobMasterId2.toUUID()); + verify(jobManager).start(eq(jobMasterId2), any(Time.class)); } private static class TestingOnCompletionActions implements OnCompletionActions, FatalErrorHandler { http://git-wip-us.apache.org/repos/asf/flink/blob/ba03b78c/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java index 6282ea0..64cc13b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java @@ -51,16 +51,12 @@ import org.powermock.modules.junit4.PowerMockRunner; import java.net.InetAddress; import java.net.URL; -import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; -import static org.mockito.Matchers.any; -import static org.mockito.Matchers.eq; import static org.mockito.Mockito.*; -import static org.mockito.Mockito.when; @RunWith(PowerMockRunner.class) @PrepareForTest(BlobLibraryCacheManager.class) @@ -79,7 +75,7 @@ public class JobMasterTest extends TestLogger { final TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler(); final String jobManagerAddress = "jm"; - final UUID jmLeaderId = UUID.randomUUID(); + final JobMasterId jobMasterId = JobMasterId.generate(); final ResourceID jmResourceId = new ResourceID(jobManagerAddress); final String taskManagerAddress = "tm"; @@ -118,7 +114,7 @@ public class JobMasterTest extends TestLogger { testingFatalErrorHandler, new FlinkUserCodeClassLoader(new URL[0])); - CompletableFuture<Acknowledge> startFuture = jobMaster.start(jmLeaderId, testingTimeout); + CompletableFuture<Acknowledge> startFuture = jobMaster.start(jobMasterId, testingTimeout); // wait for the start to complete startFuture.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS); @@ -127,7 +123,7 @@ public class JobMasterTest extends TestLogger { // register task manager will trigger monitor heartbeat target, schedule heartbeat request at interval time CompletableFuture<RegistrationResponse> registrationResponse = jobMasterGateway - .registerTaskManager(taskManagerAddress, taskManagerLocation, jmLeaderId, testingTimeout); + .registerTaskManager(taskManagerAddress, taskManagerLocation, testingTimeout); // wait for the completion of the registration registrationResponse.get(); @@ -169,7 +165,7 @@ public class JobMasterTest extends TestLogger { final String resourceManagerAddress = "rm"; final String jobManagerAddress = "jm"; final ResourceManagerId resourceManagerId = ResourceManagerId.generate(); - final UUID jmLeaderId = UUID.randomUUID(); + final JobMasterId jobMasterId = JobMasterId.generate(); final ResourceID rmResourceId = new ResourceID(resourceManagerAddress); final ResourceID jmResourceId = new ResourceID(jobManagerAddress); final JobGraph jobGraph = new JobGraph(); @@ -188,7 +184,7 @@ public class JobMasterTest extends TestLogger { final ResourceManagerGateway resourceManagerGateway = mock(ResourceManagerGateway.class); when(resourceManagerGateway.registerJobManager( - any(UUID.class), + any(JobMasterId.class), any(ResourceID.class), anyString(), any(JobID.class), @@ -219,7 +215,7 @@ public class JobMasterTest extends TestLogger { testingFatalErrorHandler, new FlinkUserCodeClassLoader(new URL[0])); - CompletableFuture<Acknowledge> startFuture = jobMaster.start(jmLeaderId, testingTimeout); + CompletableFuture<Acknowledge> startFuture = jobMaster.start(jobMasterId, testingTimeout); // wait for the start operation to complete startFuture.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS); @@ -229,7 +225,7 @@ public class JobMasterTest extends TestLogger { // register job manager success will trigger monitor heartbeat target between jm and rm verify(resourceManagerGateway, timeout(testingTimeout.toMilliseconds())).registerJobManager( - eq(jmLeaderId), + eq(jobMasterId), eq(jmResourceId), anyString(), eq(jobGraph.getJobID()), http://git-wip-us.apache.org/repos/asf/flink/blob/ba03b78c/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java index 7b8703e..fb5ee8b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/JobLeaderIdServiceTest.java @@ -22,6 +22,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.time.Time; import org.apache.flink.runtime.concurrent.ScheduledExecutor; import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; +import org.apache.flink.runtime.jobmaster.JobMasterId; import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; import org.apache.flink.util.TestLogger; import org.junit.Test; @@ -62,7 +63,7 @@ public class JobLeaderIdServiceTest extends TestLogger { public void testAddingJob() throws Exception { final JobID jobId = new JobID(); final String address = "foobar"; - final UUID leaderId = UUID.randomUUID(); + final JobMasterId leaderId = JobMasterId.generate(); TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices(); TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService( null, @@ -83,10 +84,10 @@ public class JobLeaderIdServiceTest extends TestLogger { jobLeaderIdService.addJob(jobId); - CompletableFuture<UUID> leaderIdFuture = jobLeaderIdService.getLeaderId(jobId); + CompletableFuture<JobMasterId> leaderIdFuture = jobLeaderIdService.getLeaderId(jobId); // notify the leader id service about the new leader - leaderRetrievalService.notifyListener(address, leaderId); + leaderRetrievalService.notifyListener(address, leaderId.toUUID()); assertEquals(leaderId, leaderIdFuture.get()); @@ -117,7 +118,7 @@ public class JobLeaderIdServiceTest extends TestLogger { jobLeaderIdService.addJob(jobId); - CompletableFuture<UUID> leaderIdFuture = jobLeaderIdService.getLeaderId(jobId); + CompletableFuture<JobMasterId> leaderIdFuture = jobLeaderIdService.getLeaderId(jobId); // remove the job before we could find a leader jobLeaderIdService.removeJob(jobId); @@ -183,7 +184,7 @@ public class JobLeaderIdServiceTest extends TestLogger { public void jobTimeoutAfterLostLeadership() throws Exception { final JobID jobId = new JobID(); final String address = "foobar"; - final UUID leaderId = UUID.randomUUID(); + final JobMasterId leaderId = JobMasterId.generate(); TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices(); TestingLeaderRetrievalService leaderRetrievalService = new TestingLeaderRetrievalService( null, @@ -228,10 +229,10 @@ public class JobLeaderIdServiceTest extends TestLogger { jobLeaderIdService.addJob(jobId); - CompletableFuture<UUID> leaderIdFuture = jobLeaderIdService.getLeaderId(jobId); + CompletableFuture<JobMasterId> leaderIdFuture = jobLeaderIdService.getLeaderId(jobId); // notify the leader id service about the new leader - leaderRetrievalService.notifyListener(address, leaderId); + leaderRetrievalService.notifyListener(address, leaderId.toUUID()); assertEquals(leaderId, leaderIdFuture.get()); http://git-wip-us.apache.org/repos/asf/flink/blob/ba03b78c/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java index 1de3284..156bc73 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerJobMasterTest.java @@ -26,6 +26,7 @@ import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; import org.apache.flink.runtime.highavailability.TestingHighAvailabilityServices; import org.apache.flink.runtime.jobmaster.JobMasterGateway; +import org.apache.flink.runtime.jobmaster.JobMasterId; import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess; import org.apache.flink.runtime.leaderelection.LeaderElectionService; import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; @@ -46,7 +47,6 @@ import org.junit.After; import org.junit.Before; import org.junit.Test; -import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.TimeUnit; @@ -78,16 +78,16 @@ public class ResourceManagerJobMasterTest extends TestLogger { public void testRegisterJobMaster() throws Exception { String jobMasterAddress = "/jobMasterAddress1"; JobID jobID = mockJobMaster(jobMasterAddress); - UUID jmLeaderID = UUID.randomUUID(); + JobMasterId jobMasterId = JobMasterId.generate(); final ResourceID jmResourceId = new ResourceID(jobMasterAddress); - TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService(jobMasterAddress, jmLeaderID); + TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService(jobMasterAddress, jobMasterId.toUUID()); TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler(); final ResourceManager<?> resourceManager = createAndStartResourceManager(mock(LeaderElectionService.class), jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler); final ResourceManagerGateway rmGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class); // test response successful CompletableFuture<RegistrationResponse> successfulFuture = rmGateway.registerJobManager( - jmLeaderID, + jobMasterId, jmResourceId, jobMasterAddress, jobID, @@ -107,9 +107,9 @@ public class ResourceManagerJobMasterTest extends TestLogger { public void testRegisterJobMasterWithUnmatchedLeaderSessionId1() throws Exception { String jobMasterAddress = "/jobMasterAddress1"; JobID jobID = mockJobMaster(jobMasterAddress); - UUID jmLeaderID = UUID.randomUUID(); + JobMasterId jobMasterId = JobMasterId.generate(); final ResourceID jmResourceId = new ResourceID(jobMasterAddress); - TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService(jobMasterAddress, jmLeaderID); + TestingLeaderRetrievalService jobMasterLeaderRetrievalService = new TestingLeaderRetrievalService(jobMasterAddress, jobMasterId.toUUID()); TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler(); final ResourceManager<?> resourceManager = createAndStartResourceManager(mock(LeaderElectionService.class), jobID, jobMasterLeaderRetrievalService, testingFatalErrorHandler); final ResourceManagerGateway wronglyFencedGateway = rpcService.connect(resourceManager.getAddress(), ResourceManagerId.generate(), ResourceManagerGateway.class) @@ -117,7 +117,7 @@ public class ResourceManagerJobMasterTest extends TestLogger { // test throw exception when receive a registration from job master which takes unmatched leaderSessionId CompletableFuture<RegistrationResponse> unMatchedLeaderFuture = wronglyFencedGateway.registerJobManager( - jmLeaderID, + jobMasterId, jmResourceId, jobMasterAddress, jobID, @@ -152,9 +152,9 @@ public class ResourceManagerJobMasterTest extends TestLogger { final ResourceID jmResourceId = new ResourceID(jobMasterAddress); // test throw exception when receive a registration from job master which takes unmatched leaderSessionId - UUID differentLeaderSessionID = UUID.randomUUID(); + JobMasterId differentJobMasterId = JobMasterId.generate(); CompletableFuture<RegistrationResponse> unMatchedLeaderFuture = rmGateway.registerJobManager( - differentLeaderSessionID, + differentJobMasterId, jmResourceId, jobMasterAddress, jobID, @@ -185,7 +185,7 @@ public class ResourceManagerJobMasterTest extends TestLogger { // test throw exception when receive a registration from job master which takes invalid address String invalidAddress = "/jobMasterAddress2"; CompletableFuture<RegistrationResponse> invalidAddressFuture = rmGateway.registerJobManager( - HighAvailabilityServices.DEFAULT_LEADER_ID, + new JobMasterId(HighAvailabilityServices.DEFAULT_LEADER_ID), jmResourceId, invalidAddress, jobID, @@ -221,7 +221,7 @@ public class ResourceManagerJobMasterTest extends TestLogger { // this should fail because we try to register a job leader listener for an unknown job id CompletableFuture<RegistrationResponse> registrationFuture = rmGateway.registerJobManager( - HighAvailabilityServices.DEFAULT_LEADER_ID, + new JobMasterId(HighAvailabilityServices.DEFAULT_LEADER_ID), jmResourceId, jobMasterAddress, unknownJobIDToHAServices, http://git-wip-us.apache.org/repos/asf/flink/blob/ba03b78c/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java index 945cbf3..348dce6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorITCase.java @@ -33,6 +33,7 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.network.NetworkEnvironment; import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess; import org.apache.flink.runtime.jobmaster.JobMasterGateway; +import org.apache.flink.runtime.jobmaster.JobMasterId; import org.apache.flink.runtime.jobmaster.JobMasterRegistrationSuccess; import org.apache.flink.runtime.leaderelection.TestingLeaderElectionService; import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; @@ -92,7 +93,7 @@ public class TaskExecutorITCase extends TestLogger { final TestingLeaderRetrievalService rmLeaderRetrievalService = new TestingLeaderRetrievalService(null, null); final String rmAddress = "rm"; final String jmAddress = "jm"; - final UUID jmLeaderId = UUID.randomUUID(); + final JobMasterId jobMasterId = JobMasterId.generate(); final ResourceID rmResourceId = new ResourceID(rmAddress); final ResourceID jmResourceId = new ResourceID(jmAddress); final JobID jobId = new JobID(); @@ -100,7 +101,7 @@ public class TaskExecutorITCase extends TestLogger { testingHAServices.setResourceManagerLeaderElectionService(rmLeaderElectionService); testingHAServices.setResourceManagerLeaderRetriever(rmLeaderRetrievalService); - testingHAServices.setJobMasterLeaderRetriever(jobId, new TestingLeaderRetrievalService(jmAddress, jmLeaderId)); + testingHAServices.setJobMasterLeaderRetriever(jobId, new TestingLeaderRetrievalService(jmAddress, jobMasterId.toUUID())); TestingRpcService rpcService = new TestingRpcService(); ResourceManagerConfiguration resourceManagerConfiguration = new ResourceManagerConfiguration( @@ -162,14 +163,14 @@ public class TaskExecutorITCase extends TestLogger { JobMasterGateway jmGateway = mock(JobMasterGateway.class); - when(jmGateway.registerTaskManager(any(String.class), any(TaskManagerLocation.class), eq(jmLeaderId), any(Time.class))) + when(jmGateway.registerTaskManager(any(String.class), any(TaskManagerLocation.class), any(Time.class))) .thenReturn(CompletableFuture.completedFuture(new JMTMRegistrationSuccess(taskManagerResourceId, 1234))); when(jmGateway.getHostname()).thenReturn(jmAddress); when(jmGateway.offerSlots( eq(taskManagerResourceId), any(Iterable.class), - eq(jmLeaderId), any(Time.class))).thenReturn(mock(CompletableFuture.class, RETURNS_MOCKS)); + when(jmGateway.getFencingToken()).thenReturn(jobMasterId); rpcService.registerGateway(rmAddress, resourceManager.getSelfGateway(ResourceManagerGateway.class)); @@ -193,7 +194,7 @@ public class TaskExecutorITCase extends TestLogger { rmLeaderRetrievalService.notifyListener(rmAddress, rmLeaderId); CompletableFuture<RegistrationResponse> registrationResponseFuture = rmGateway.registerJobManager( - jmLeaderId, + jobMasterId, jmResourceId, jmAddress, jobId, @@ -203,14 +204,14 @@ public class TaskExecutorITCase extends TestLogger { assertTrue(registrationResponse instanceof JobMasterRegistrationSuccess); - CompletableFuture<Acknowledge> slotAck = rmGateway.requestSlot(jmLeaderId, slotRequest, timeout); + CompletableFuture<Acknowledge> slotAck = rmGateway.requestSlot(jobMasterId, slotRequest, timeout); slotAck.get(); verify(jmGateway, Mockito.timeout(timeout.toMilliseconds())).offerSlots( eq(taskManagerResourceId), (Iterable<SlotOffer>)argThat(Matchers.contains(slotOffer)), - eq(jmLeaderId), any(Time.class)); + any(Time.class)); } finally { if (testingFatalErrorHandler.hasExceptionOccurred()) { testingFatalErrorHandler.rethrowError(); http://git-wip-us.apache.org/repos/asf/flink/blob/ba03b78c/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java index 2112c1b..7146445 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskExecutorTest.java @@ -55,6 +55,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.jobmaster.JMTMRegistrationSuccess; import org.apache.flink.runtime.jobmaster.JobMasterGateway; +import org.apache.flink.runtime.jobmaster.JobMasterId; import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.memory.MemoryManager; @@ -190,7 +191,6 @@ public class TaskExecutorTest extends TestLogger { when(jobMasterGateway.registerTaskManager( any(String.class), eq(taskManagerLocation), - eq(jmLeaderId), any(Time.class) )).thenReturn(CompletableFuture.completedFuture(new JMTMRegistrationSuccess(jmResourceId, blobPort))); when(jobMasterGateway.getAddress()).thenReturn(jobMasterAddress); @@ -228,7 +228,7 @@ public class TaskExecutorTest extends TestLogger { // register task manager success will trigger monitoring heartbeat target between tm and jm verify(jobMasterGateway, Mockito.timeout(timeout.toMilliseconds())).registerTaskManager( - eq(taskManager.getAddress()), eq(taskManagerLocation), eq(jmLeaderId), any(Time.class)); + eq(taskManager.getAddress()), eq(taskManagerLocation), any(Time.class)); // the timeout should trigger disconnecting from the JobManager verify(jobMasterGateway, timeout(heartbeatTimeout * 50L)).disconnectTaskManager(eq(taskManagerLocation.getResourceID()), any(TimeoutException.class)); @@ -657,7 +657,7 @@ public class TaskExecutorTest extends TestLogger { final TaskManagerConfiguration taskManagerConfiguration = TaskManagerConfiguration.fromConfiguration(configuration); final JobID jobId = new JobID(); final AllocationID allocationId = new AllocationID(); - final UUID jobManagerLeaderId = UUID.randomUUID(); + final JobMasterId jobMasterId = JobMasterId.generate(); final JobVertexID jobVertexId = new JobVertexID(); JobInformation jobInformation = new JobInformation( @@ -694,11 +694,13 @@ public class TaskExecutorTest extends TestLogger { final LibraryCacheManager libraryCacheManager = mock(LibraryCacheManager.class); when(libraryCacheManager.getClassLoader(any(JobID.class))).thenReturn(ClassLoader.getSystemClassLoader()); + final JobMasterGateway jobMasterGateway = mock(JobMasterGateway.class); + when(jobMasterGateway.getFencingToken()).thenReturn(jobMasterId); + final JobManagerConnection jobManagerConnection = new JobManagerConnection( jobId, ResourceID.generate(), - mock(JobMasterGateway.class), - jobManagerLeaderId, + jobMasterGateway, mock(TaskManagerActions.class), mock(CheckpointResponder.class), mock(BlobCache.class), @@ -755,7 +757,7 @@ public class TaskExecutorTest extends TestLogger { final TaskExecutorGateway tmGateway = taskManager.getSelfGateway(TaskExecutorGateway.class); - tmGateway.submitTask(tdd, jobManagerLeaderId, timeout); + tmGateway.submitTask(tdd, jobMasterId, timeout); CompletableFuture<Boolean> completionFuture = TestInvokable.completableFuture; @@ -833,14 +835,12 @@ public class TaskExecutorTest extends TestLogger { when(jobMasterGateway.registerTaskManager( any(String.class), eq(taskManagerLocation), - eq(jobManagerLeaderId), any(Time.class) )).thenReturn(CompletableFuture.completedFuture(new JMTMRegistrationSuccess(jmResourceId, blobPort))); when(jobMasterGateway.getHostname()).thenReturn(jobManagerAddress); when(jobMasterGateway.offerSlots( any(ResourceID.class), any(Iterable.class), - any(UUID.class), any(Time.class))).thenReturn(mock(CompletableFuture.class, RETURNS_MOCKS)); rpc.registerGateway(resourceManagerAddress, resourceManagerGateway); @@ -894,7 +894,6 @@ public class TaskExecutorTest extends TestLogger { verify(jobMasterGateway, Mockito.timeout(timeout.toMilliseconds())).offerSlots( any(ResourceID.class), (Iterable<SlotOffer>)Matchers.argThat(contains(slotOffer)), - eq(jobManagerLeaderId), any(Time.class)); // check if a concurrent error occurred @@ -958,13 +957,12 @@ public class TaskExecutorTest extends TestLogger { when(jobMasterGateway.registerTaskManager( any(String.class), eq(taskManagerLocation), - eq(jobManagerLeaderId), any(Time.class) )).thenReturn(CompletableFuture.completedFuture(new JMTMRegistrationSuccess(jmResourceId, blobPort))); when(jobMasterGateway.getHostname()).thenReturn(jobManagerAddress); when(jobMasterGateway.offerSlots( - any(ResourceID.class), any(Iterable.class), eq(jobManagerLeaderId), any(Time.class))) + any(ResourceID.class), any(Iterable.class), any(Time.class))) .thenReturn(CompletableFuture.completedFuture((Collection<SlotOffer>)Collections.singleton(offer1))); rpc.registerGateway(resourceManagerAddress, resourceManagerGateway); @@ -1163,10 +1161,10 @@ public class TaskExecutorTest extends TestLogger { final ResourceID resourceManagerResourceId = new ResourceID(resourceManagerAddress); final String jobManagerAddress = "jm"; - final UUID jobManagerLeaderId = UUID.randomUUID(); + final JobMasterId jobMasterId = JobMasterId.generate(); final LeaderRetrievalService resourceManagerLeaderRetrievalService = new TestingLeaderRetrievalService(resourceManagerAddress, resourceManagerLeaderId); - final LeaderRetrievalService jobManagerLeaderRetrievalService = new TestingLeaderRetrievalService(jobManagerAddress, jobManagerLeaderId); + final LeaderRetrievalService jobManagerLeaderRetrievalService = new TestingLeaderRetrievalService(jobManagerAddress, jobMasterId.toUUID()); haServices.setResourceManagerLeaderRetriever(resourceManagerLeaderRetrievalService); haServices.setJobMasterLeaderRetriever(jobId, jobManagerLeaderRetrievalService); @@ -1193,13 +1191,10 @@ public class TaskExecutorTest extends TestLogger { when(jobMasterGateway.registerTaskManager( any(String.class), eq(taskManagerLocation), - eq(jobManagerLeaderId), any(Time.class) )).thenReturn(CompletableFuture.completedFuture(new JMTMRegistrationSuccess(jmResourceId, blobPort))); when(jobMasterGateway.getHostname()).thenReturn(jobManagerAddress); - when(jobMasterGateway.updateTaskExecutionState( - any(UUID.class), - any(TaskExecutionState.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get())); + when(jobMasterGateway.updateTaskExecutionState(any(TaskExecutionState.class))).thenReturn(CompletableFuture.completedFuture(Acknowledge.get())); rpc.registerGateway(resourceManagerAddress, resourceManagerGateway); @@ -1212,7 +1207,6 @@ public class TaskExecutorTest extends TestLogger { jobId, jmResourceId, jobMasterGateway, - jobManagerLeaderId, mock(TaskManagerActions.class), mock(CheckpointResponder.class), mock(BlobCache.class), @@ -1297,7 +1291,6 @@ public class TaskExecutorTest extends TestLogger { jobMasterGateway.offerSlots( any(ResourceID.class), any(Iterable.class), - eq(jobManagerLeaderId), any(Time.class))) .thenReturn(offerResultFuture); @@ -1305,10 +1298,10 @@ public class TaskExecutorTest extends TestLogger { // been properly started. This will also offer the slots to the job master jobLeaderService.addJob(jobId, jobManagerAddress); - verify(jobMasterGateway, Mockito.timeout(timeout.toMilliseconds())).offerSlots(any(ResourceID.class), any(Iterable.class), eq(jobManagerLeaderId), any(Time.class)); + verify(jobMasterGateway, Mockito.timeout(timeout.toMilliseconds())).offerSlots(any(ResourceID.class), any(Iterable.class), any(Time.class)); // submit the task without having acknowledge the offered slots - tmGateway.submitTask(tdd, jobManagerLeaderId, timeout); + tmGateway.submitTask(tdd, jobMasterId, timeout); // acknowledge the offered slots offerResultFuture.complete(Collections.singleton(offer1)); @@ -1351,7 +1344,6 @@ public class TaskExecutorTest extends TestLogger { final JobID jobId = new JobID(); final JobMasterGateway jobMasterGateway = mock(JobMasterGateway.class); when(jobMasterGateway.getHostname()).thenReturn("localhost"); - final UUID jobLeaderId = UUID.randomUUID(); final JMTMRegistrationSuccess registrationMessage = new JMTMRegistrationSuccess(ResourceID.generate(), 1); final JobManagerTable jobManagerTableMock = spy(new JobManagerTable()); @@ -1382,10 +1374,10 @@ public class TaskExecutorTest extends TestLogger { JobLeaderListener taskExecutorListener = jobLeaderListenerArgumentCaptor.getValue(); - taskExecutorListener.jobManagerGainedLeadership(jobId, jobMasterGateway, jobLeaderId, registrationMessage); + taskExecutorListener.jobManagerGainedLeadership(jobId, jobMasterGateway, registrationMessage); // duplicate job manager gained leadership message - taskExecutorListener.jobManagerGainedLeadership(jobId, jobMasterGateway, jobLeaderId, registrationMessage); + taskExecutorListener.jobManagerGainedLeadership(jobId, jobMasterGateway, registrationMessage); ArgumentCaptor<JobManagerConnection> jobManagerConnectionArgumentCaptor = ArgumentCaptor.forClass(JobManagerConnection.class);