[FLINK-7504] Fence the ResourceManager. This commit properly fences the ResourceManager by letting it extend the FencedRpcEndpoint. Moreover, this PR introduces a ResourceManagerId which replaces the UUID as leader id/fencing token. This will give us more type safety when defining RPC interfaces.
This closes #4582. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/e70de0eb Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/e70de0eb Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/e70de0eb Branch: refs/heads/master Commit: e70de0eb8f2f2a89cf74aa4bc6d1501e07e22d43 Parents: 84c2a92 Author: Till Rohrmann <trohrm...@apache.org> Authored: Thu Aug 24 15:11:00 2017 +0200 Committer: Till <till.rohrm...@gmail.com> Committed: Mon Sep 4 13:44:01 2017 +0200 ---------------------------------------------------------------------- .../MesosResourceManagerTest.java | 7 +- .../apache/flink/runtime/instance/SlotPool.java | 10 +- .../flink/runtime/instance/SlotPoolGateway.java | 4 +- .../flink/runtime/jobmaster/JobMaster.java | 49 +-- .../runtime/jobmaster/JobMasterGateway.java | 5 +- .../jobmaster/JobMasterRegistrationSuccess.java | 15 +- .../flink/runtime/minicluster/MiniCluster.java | 5 +- .../registration/RetryingRegistration.java | 12 +- .../resourcemanager/ResourceManager.java | 314 +++++++------------ .../resourcemanager/ResourceManagerGateway.java | 38 +-- .../resourcemanager/ResourceManagerId.java | 58 ++++ .../slotmanager/SlotManager.java | 22 +- .../runtime/taskexecutor/JobLeaderService.java | 7 +- .../runtime/taskexecutor/TaskExecutor.java | 35 +-- .../taskexecutor/TaskExecutorGateway.java | 8 +- ...TaskExecutorToResourceManagerConnection.java | 20 +- .../clusterframework/ResourceManagerTest.java | 7 +- .../flink/runtime/instance/SlotPoolTest.java | 14 +- .../flink/runtime/jobmaster/JobMasterTest.java | 9 +- .../runtime/metrics/MetricRegistryTest.java | 6 +- .../resourcemanager/ResourceManagerHATest.java | 37 ++- .../ResourceManagerJobMasterTest.java | 68 ++-- .../ResourceManagerTaskExecutorTest.java | 40 ++- .../slotmanager/SlotManagerTest.java | 115 ++++--- .../slotmanager/SlotProtocolTest.java | 14 +- .../taskexecutor/TaskExecutorITCase.java | 3 +- 
.../runtime/taskexecutor/TaskExecutorTest.java | 48 ++- 27 files changed, 462 insertions(+), 508 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/e70de0eb/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java index cf0c913..02b043e 100644 --- a/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java +++ b/flink-mesos/src/test/java/org/apache/flink/mesos/runtime/clusterframework/MesosResourceManagerTest.java @@ -54,6 +54,7 @@ import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.registration.RegistrationResponse; import org.apache.flink.runtime.resourcemanager.JobLeaderIdService; import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration; +import org.apache.flink.runtime.resourcemanager.ResourceManagerId; import org.apache.flink.runtime.resourcemanager.SlotRequest; import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceManagerActions; import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; @@ -326,7 +327,7 @@ public class MesosResourceManagerTest extends TestLogger { slotManagerStarted.complete(true); return null; } - }).when(slotManager).start(any(UUID.class), any(Executor.class), any(ResourceManagerActions.class)); + }).when(slotManager).start(any(ResourceManagerId.class), any(Executor.class), any(ResourceManagerActions.class)); when(slotManager.registerSlotRequest(any(SlotRequest.class))).thenReturn(true); } @@ -441,7 +442,7 @@ public class MesosResourceManagerTest extends TestLogger { */ public void 
registerJobMaster(MockJobMaster jobMaster) throws Exception { CompletableFuture<RegistrationResponse> registration = resourceManager.registerJobManager( - rmServices.rmLeaderSessionId, jobMaster.leaderSessionID, jobMaster.resourceID, jobMaster.address, jobMaster.jobID, timeout); + jobMaster.leaderSessionID, jobMaster.resourceID, jobMaster.address, jobMaster.jobID, timeout); assertTrue(registration.get() instanceof JobMasterRegistrationSuccess); } @@ -617,7 +618,7 @@ public class MesosResourceManagerTest extends TestLogger { // send registration message CompletableFuture<RegistrationResponse> successfulFuture = - resourceManager.registerTaskExecutor(rmServices.rmLeaderSessionId, task1Executor.address, task1Executor.resourceID, slotReport, timeout); + resourceManager.registerTaskExecutor(task1Executor.address, task1Executor.resourceID, slotReport, timeout); RegistrationResponse response = successfulFuture.get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); assertTrue(response instanceof TaskExecutorRegistrationSuccess); http://git-wip-us.apache.org/repos/asf/flink/blob/e70de0eb/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java index bf3de25..326e3a2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPool.java @@ -121,9 +121,6 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway { /** the leader id of job manager */ private UUID jobManagerLeaderId; - /** The leader id of resource manager */ - private UUID resourceManagerLeaderId; - /** The gateway to communicate with resource manager */ private ResourceManagerGateway resourceManagerGateway; @@ -199,7 +196,6 @@ public class 
SlotPool extends RpcEndpoint implements SlotPoolGateway { // do not accept any requests jobManagerLeaderId = null; - resourceManagerLeaderId = null; resourceManagerGateway = null; // Clear (but not release!) the available slots. The TaskManagers should re-register them @@ -240,8 +236,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway { // ------------------------------------------------------------------------ @Override - public void connectToResourceManager(UUID resourceManagerLeaderId, ResourceManagerGateway resourceManagerGateway) { - this.resourceManagerLeaderId = checkNotNull(resourceManagerLeaderId); + public void connectToResourceManager(ResourceManagerGateway resourceManagerGateway) { this.resourceManagerGateway = checkNotNull(resourceManagerGateway); // work on all slots waiting for this connection @@ -255,7 +250,6 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway { @Override public void disconnectResourceManager() { - this.resourceManagerLeaderId = null; this.resourceManagerGateway = null; } @@ -319,7 +313,7 @@ public class SlotPool extends RpcEndpoint implements SlotPoolGateway { pendingRequests.put(allocationID, new PendingRequest(allocationID, future, resources)); CompletableFuture<Acknowledge> rmResponse = resourceManagerGateway.requestSlot( - jobManagerLeaderId, resourceManagerLeaderId, + jobManagerLeaderId, new SlotRequest(jobId, allocationID, resources, jobManagerAddress), resourceManagerRequestsTimeout); http://git-wip-us.apache.org/repos/asf/flink/blob/e70de0eb/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java index 32a9af5..06c4b12 100644 --- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/instance/SlotPoolGateway.java @@ -33,7 +33,6 @@ import org.apache.flink.runtime.taskexecutor.slot.SlotOffer; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import java.util.Collection; -import java.util.UUID; import java.util.concurrent.CompletableFuture; /** @@ -55,10 +54,9 @@ public interface SlotPoolGateway extends RpcGateway { * Connects the SlotPool to the given ResourceManager. After this method is called, the * SlotPool will be able to request resources from the given ResourceManager. * - * @param resourceManagerLeaderId The leader session ID of the resource manager. * @param resourceManagerGateway The RPC gateway for the resource manager. */ - void connectToResourceManager(UUID resourceManagerLeaderId, ResourceManagerGateway resourceManagerGateway); + void connectToResourceManager(ResourceManagerGateway resourceManagerGateway); /** * Disconnects the slot pool from its current Resource Manager. 
After this call, the pool will not http://git-wip-us.apache.org/repos/asf/flink/blob/e70de0eb/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index 55bc8f8..a05242a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -81,6 +81,7 @@ import org.apache.flink.runtime.registration.RegisteredRpcConnection; import org.apache.flink.runtime.registration.RegistrationResponse; import org.apache.flink.runtime.registration.RetryingRegistration; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; +import org.apache.flink.runtime.resourcemanager.ResourceManagerId; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcService; @@ -756,17 +757,17 @@ public class JobMaster extends RpcEndpoint implements JobMasterGateway { @Override public void disconnectResourceManager( final UUID jobManagerLeaderId, - final UUID resourceManagerLeaderId, + final ResourceManagerId resourceManagerId, final Exception cause) { try { validateLeaderSessionId(jobManagerLeaderId); } catch (LeaderIdMismatchException e) { - log.warn("Cannot disconnect resource manager " + resourceManagerLeaderId + '.', e); + log.warn("Cannot disconnect resource manager " + resourceManagerId + '.', e); } if (resourceManagerConnection != null - && resourceManagerConnection.getTargetLeaderId().equals(resourceManagerLeaderId)) { + && resourceManagerConnection.getTargetLeaderId().equals(resourceManagerId)) { closeResourceManagerConnection(cause); } } @@ -944,11 +945,11 @@ public class JobMaster extends RpcEndpoint implements 
JobMasterGateway { } } - private void notifyOfNewResourceManagerLeader(final String resourceManagerAddress, final UUID resourceManagerLeaderId) { + private void notifyOfNewResourceManagerLeader(final String resourceManagerAddress, final ResourceManagerId resourceManagerId) { if (resourceManagerConnection != null) { if (resourceManagerAddress != null) { - if (resourceManagerAddress.equals(resourceManagerConnection.getTargetAddress()) - && resourceManagerLeaderId.equals(resourceManagerConnection.getTargetLeaderId())) { + if (Objects.equals(resourceManagerAddress, resourceManagerConnection.getTargetAddress()) + && Objects.equals(resourceManagerId, resourceManagerConnection.getTargetLeaderId())) { // both address and leader id are not changed, we can keep the old connection return; } @@ -974,7 +975,7 @@ public class JobMaster extends RpcEndpoint implements JobMasterGateway { getAddress(), leaderSessionID, resourceManagerAddress, - resourceManagerLeaderId, + resourceManagerId, executor); resourceManagerConnection.start(); @@ -982,17 +983,17 @@ public class JobMaster extends RpcEndpoint implements JobMasterGateway { } private void establishResourceManagerConnection(final JobMasterRegistrationSuccess success) { - final UUID resourceManagerLeaderId = success.getResourceManagerLeaderId(); + final ResourceManagerId resourceManagerId = success.getResourceManagerId(); // verify the response with current connection if (resourceManagerConnection != null - && resourceManagerConnection.getTargetLeaderId().equals(resourceManagerLeaderId)) { + && Objects.equals(resourceManagerConnection.getTargetLeaderId(), resourceManagerId)) { - log.info("JobManager successfully registered at ResourceManager, leader id: {}.", resourceManagerLeaderId); + log.info("JobManager successfully registered at ResourceManager, leader id: {}.", resourceManagerId); final ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway(); - 
slotPoolGateway.connectToResourceManager(resourceManagerLeaderId, resourceManagerGateway); + slotPoolGateway.connectToResourceManager(resourceManagerGateway); resourceManagerHeartbeatManager.monitorTarget(success.getResourceManagerResourceId(), new HeartbeatTarget<Void>() { @Override @@ -1005,6 +1006,9 @@ public class JobMaster extends RpcEndpoint implements JobMasterGateway { // request heartbeat will never be called on the job manager side } }); + } else { + log.debug("Ignoring resource manager connection to {} because its a duplicate or outdated.", resourceManagerId); + } } @@ -1038,12 +1042,10 @@ public class JobMaster extends RpcEndpoint implements JobMasterGateway { @Override public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) { - runAsync(new Runnable() { - @Override - public void run() { - notifyOfNewResourceManagerLeader(leaderAddress, leaderSessionID); - } - }); + runAsync( + () -> notifyOfNewResourceManagerLeader( + leaderAddress, + leaderSessionID != null ? 
new ResourceManagerId(leaderSessionID) : null)); } @Override @@ -1055,7 +1057,7 @@ public class JobMaster extends RpcEndpoint implements JobMasterGateway { //---------------------------------------------------------------------------------------------- private class ResourceManagerConnection - extends RegisteredRpcConnection<UUID, ResourceManagerGateway, JobMasterRegistrationSuccess> + extends RegisteredRpcConnection<ResourceManagerId, ResourceManagerGateway, JobMasterRegistrationSuccess> { private final JobID jobID; @@ -1074,10 +1076,10 @@ public class JobMaster extends RpcEndpoint implements JobMasterGateway { final String jobManagerRpcAddress, final UUID jobManagerLeaderID, final String resourceManagerAddress, - final UUID resourceManagerLeaderID, + final ResourceManagerId resourceManagerId, final Executor executor) { - super(log, resourceManagerAddress, resourceManagerLeaderID, executor); + super(log, resourceManagerAddress, resourceManagerId, executor); this.jobID = checkNotNull(jobID); this.jobManagerResourceID = checkNotNull(jobManagerResourceID); this.jobManagerRpcAddress = checkNotNull(jobManagerRpcAddress); @@ -1085,19 +1087,18 @@ public class JobMaster extends RpcEndpoint implements JobMasterGateway { } @Override - protected RetryingRegistration<UUID, ResourceManagerGateway, JobMasterRegistrationSuccess> generateRegistration() { - return new RetryingRegistration<UUID, ResourceManagerGateway, JobMasterRegistrationSuccess>( + protected RetryingRegistration<ResourceManagerId, ResourceManagerGateway, JobMasterRegistrationSuccess> generateRegistration() { + return new RetryingRegistration<ResourceManagerId, ResourceManagerGateway, JobMasterRegistrationSuccess>( log, getRpcService(), "ResourceManager", ResourceManagerGateway.class, getTargetAddress(), getTargetLeaderId()) { @Override protected CompletableFuture<RegistrationResponse> invokeRegistration( - ResourceManagerGateway gateway, UUID leaderId, long timeoutMillis) throws Exception + 
ResourceManagerGateway gateway, ResourceManagerId fencingToken, long timeoutMillis) throws Exception { Time timeout = Time.milliseconds(timeoutMillis); return gateway.registerJobManager( - leaderId, jobManagerLeaderID, jobManagerResourceID, jobManagerRpcAddress, http://git-wip-us.apache.org/repos/asf/flink/blob/e70de0eb/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java index bfa2930..b39f419 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterGateway.java @@ -36,6 +36,7 @@ import org.apache.flink.runtime.query.KvStateID; import org.apache.flink.runtime.query.KvStateLocation; import org.apache.flink.runtime.query.KvStateServerAddress; import org.apache.flink.runtime.registration.RegistrationResponse; +import org.apache.flink.runtime.resourcemanager.ResourceManagerId; import org.apache.flink.runtime.rpc.RpcTimeout; import org.apache.flink.runtime.state.internal.InternalKvState; import org.apache.flink.runtime.state.KeyGroupRange; @@ -123,12 +124,12 @@ public interface JobMasterGateway extends CheckpointCoordinatorGateway { * Disconnects the resource manager from the job manager because of the given cause. 
* * @param jobManagerLeaderId identifying the job manager leader id - * @param resourceManagerLeaderId identifying the resource manager leader id + * @param resourceManagerId identifying the resource manager leader id * @param cause of the disconnect */ void disconnectResourceManager( final UUID jobManagerLeaderId, - final UUID resourceManagerLeaderId, + final ResourceManagerId resourceManagerId, final Exception cause); /** http://git-wip-us.apache.org/repos/asf/flink/blob/e70de0eb/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRegistrationSuccess.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRegistrationSuccess.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRegistrationSuccess.java index a7a6224..94ecfd2 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRegistrationSuccess.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMasterRegistrationSuccess.java @@ -20,8 +20,7 @@ package org.apache.flink.runtime.jobmaster; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.registration.RegistrationResponse; - -import java.util.UUID; +import org.apache.flink.runtime.resourcemanager.ResourceManagerId; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -34,16 +33,16 @@ public class JobMasterRegistrationSuccess extends RegistrationResponse.Success { private final long heartbeatInterval; - private final UUID resourceManagerLeaderId; + private final ResourceManagerId resourceManagerId; private final ResourceID resourceManagerResourceId; public JobMasterRegistrationSuccess( final long heartbeatInterval, - final UUID resourceManagerLeaderId, + final ResourceManagerId resourceManagerId, final ResourceID resourceManagerResourceId) { this.heartbeatInterval = heartbeatInterval; - 
this.resourceManagerLeaderId = checkNotNull(resourceManagerLeaderId); + this.resourceManagerId = checkNotNull(resourceManagerId); this.resourceManagerResourceId = checkNotNull(resourceManagerResourceId); } @@ -56,8 +55,8 @@ public class JobMasterRegistrationSuccess extends RegistrationResponse.Success { return heartbeatInterval; } - public UUID getResourceManagerLeaderId() { - return resourceManagerLeaderId; + public ResourceManagerId getResourceManagerId() { + return resourceManagerId; } public ResourceID getResourceManagerResourceId() { @@ -68,7 +67,7 @@ public class JobMasterRegistrationSuccess extends RegistrationResponse.Success { public String toString() { return "JobMasterRegistrationSuccess{" + "heartbeatInterval=" + heartbeatInterval + - ", resourceManagerLeaderId=" + resourceManagerLeaderId + + ", resourceManagerLeaderId=" + resourceManagerId + ", resourceManagerResourceId=" + resourceManagerResourceId + '}'; } http://git-wip-us.apache.org/repos/asf/flink/blob/e70de0eb/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java index 9a4a76a..95f430c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/minicluster/MiniCluster.java @@ -38,6 +38,7 @@ import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService; import org.apache.flink.runtime.metrics.MetricRegistry; import org.apache.flink.runtime.metrics.MetricRegistryConfiguration; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; +import org.apache.flink.runtime.resourcemanager.ResourceManagerId; import org.apache.flink.runtime.resourcemanager.ResourceManagerRunner; import 
org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcService; @@ -419,14 +420,14 @@ public class MiniCluster { final LeaderAddressAndId addressAndId = addressAndIdFuture.get(); final ResourceManagerGateway resourceManager = - commonRpcService.connect(addressAndId.leaderAddress(), ResourceManagerGateway.class).get(); + commonRpcService.connect(addressAndId.leaderAddress(), new ResourceManagerId(addressAndId.leaderId()), ResourceManagerGateway.class).get(); final int numTaskManagersToWaitFor = taskManagers.length; // poll and wait until enough TaskManagers are available while (true) { int numTaskManagersAvailable = - resourceManager.getNumberOfRegisteredTaskManagers(addressAndId.leaderId()).get(); + resourceManager.getNumberOfRegisteredTaskManagers().get(); if (numTaskManagersAvailable >= numTaskManagersToWaitFor) { break; http://git-wip-us.apache.org/repos/asf/flink/blob/e70de0eb/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java index be30c68..ce4a798 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.registration; import org.apache.flink.api.java.tuple.Tuple2; +import org.apache.flink.runtime.rpc.FencedRpcGateway; import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.rpc.RpcService; @@ -176,7 +177,16 @@ public abstract class RetryingRegistration<F extends Serializable, G extends Rpc public void startRegistration() { try { // trigger resolution of the resource manager address to a callable gateway - 
CompletableFuture<G> resourceManagerFuture = rpcService.connect(targetAddress, targetType); + final CompletableFuture<G> resourceManagerFuture; + + if (FencedRpcGateway.class.isAssignableFrom(targetType)) { + resourceManagerFuture = (CompletableFuture<G>) rpcService.connect( + targetAddress, + fencingToken, + targetType.asSubclass(FencedRpcGateway.class)); + } else { + resourceManagerFuture = rpcService.connect(targetAddress, targetType); + } // upon success, start the registration attempts CompletableFuture<Void> resourceManagerAcceptFuture = resourceManagerFuture.thenAcceptAsync( http://git-wip-us.apache.org/repos/asf/flink/blob/e70de0eb/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index 6b2c898..659b3d4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -33,7 +33,6 @@ import org.apache.flink.runtime.heartbeat.HeartbeatManager; import org.apache.flink.runtime.heartbeat.HeartbeatServices; import org.apache.flink.runtime.heartbeat.HeartbeatTarget; import org.apache.flink.runtime.highavailability.HighAvailabilityServices; -import org.apache.flink.runtime.highavailability.LeaderIdMismatchException; import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.jobmaster.JobMaster; import org.apache.flink.runtime.jobmaster.JobMasterGateway; @@ -50,11 +49,10 @@ import org.apache.flink.runtime.resourcemanager.slotmanager.ResourceManagerActio import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManagerException; 
import org.apache.flink.runtime.rpc.FatalErrorHandler; -import org.apache.flink.runtime.rpc.RpcEndpoint; +import org.apache.flink.runtime.rpc.FencedRpcEndpoint; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.exceptions.LeaderSessionIDException; import org.apache.flink.runtime.taskexecutor.SlotReport; -import org.apache.flink.runtime.taskexecutor.TaskExecutor; import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess; import org.apache.flink.util.ExceptionUtils; @@ -77,12 +75,12 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * * <p>It offers the following methods as part of its rpc interface to interact with him remotely: * <ul> - * <li>{@link #registerJobManager(UUID, UUID, ResourceID, String, JobID, Time)} registers a {@link JobMaster} at the resource manager</li> - * <li>{@link #requestSlot(UUID, UUID, SlotRequest, Time)} requests a slot from the resource manager</li> + * <li>{@link #registerJobManager(UUID, ResourceID, String, JobID, Time)} registers a {@link JobMaster} at the resource manager</li> + * <li>{@link #requestSlot(UUID, SlotRequest, Time)} requests a slot from the resource manager</li> * </ul> */ public abstract class ResourceManager<WorkerType extends Serializable> - extends RpcEndpoint + extends FencedRpcEndpoint<ResourceManagerId> implements ResourceManagerGateway, LeaderContender { public static final String RESOURCE_MANAGER_NAME = "resourcemanager"; @@ -126,9 +124,6 @@ public abstract class ResourceManager<WorkerType extends Serializable> /** The service to elect a ResourceManager leader. */ private LeaderElectionService leaderElectionService; - /** ResourceManager's leader session id which is updated on leader election. */ - private volatile UUID leaderSessionId; - /** All registered listeners for status updates of the ResourceManager. 
*/ private ConcurrentMap<String, InfoMessageListenerRpcGateway> infoMessageListeners; @@ -144,7 +139,7 @@ public abstract class ResourceManager<WorkerType extends Serializable> JobLeaderIdService jobLeaderIdService, FatalErrorHandler fatalErrorHandler) { - super(rpcService, resourceManagerEndpointId); + super(rpcService, resourceManagerEndpointId, ResourceManagerId.generate()); this.resourceId = checkNotNull(resourceId); this.resourceManagerConfiguration = checkNotNull(resourceManagerConfiguration); @@ -169,7 +164,6 @@ public abstract class ResourceManager<WorkerType extends Serializable> this.jobManagerRegistrations = new HashMap<>(4); this.jmResourceIdRegistrations = new HashMap<>(4); this.taskExecutors = new HashMap<>(8); - this.leaderSessionId = null; infoMessageListeners = new ConcurrentHashMap<>(8); } @@ -246,147 +240,109 @@ public abstract class ResourceManager<WorkerType extends Serializable> @Override public CompletableFuture<RegistrationResponse> registerJobManager( - final UUID resourceManagerLeaderId, final UUID jobManagerLeaderId, final ResourceID jobManagerResourceId, final String jobManagerAddress, final JobID jobId, final Time timeout) { - checkNotNull(resourceManagerLeaderId); checkNotNull(jobManagerLeaderId); checkNotNull(jobManagerResourceId); checkNotNull(jobManagerAddress); checkNotNull(jobId); - if (isValid(resourceManagerLeaderId)) { - if (!jobLeaderIdService.containsJob(jobId)) { - try { - jobLeaderIdService.addJob(jobId); - } catch (Exception e) { - ResourceManagerException exception = new ResourceManagerException("Could not add the job " + - jobId + " to the job id leader service.", e); + if (!jobLeaderIdService.containsJob(jobId)) { + try { + jobLeaderIdService.addJob(jobId); + } catch (Exception e) { + ResourceManagerException exception = new ResourceManagerException("Could not add the job " + + jobId + " to the job id leader service.", e); onFatalError(exception); - log.error("Could not add job {} to job leader id service.", jobId, e); 
- return FutureUtils.completedExceptionally(exception); - } + log.error("Could not add job {} to job leader id service.", jobId, e); + return FutureUtils.completedExceptionally(exception); } + } - log.info("Registering job manager {}@{} for job {}.", jobManagerLeaderId, jobManagerAddress, jobId); + log.info("Registering job manager {}@{} for job {}.", jobManagerLeaderId, jobManagerAddress, jobId); - CompletableFuture<UUID> jobLeaderIdFuture; + CompletableFuture<UUID> jobLeaderIdFuture; - try { - jobLeaderIdFuture = jobLeaderIdService.getLeaderId(jobId); - } catch (Exception e) { - // we cannot check the job leader id so let's fail - // TODO: Maybe it's also ok to skip this check in case that we cannot check the leader id - ResourceManagerException exception = new ResourceManagerException("Cannot obtain the " + - "job leader id future to verify the correct job leader.", e); + try { + jobLeaderIdFuture = jobLeaderIdService.getLeaderId(jobId); + } catch (Exception e) { + // we cannot check the job leader id so let's fail + // TODO: Maybe it's also ok to skip this check in case that we cannot check the leader id + ResourceManagerException exception = new ResourceManagerException("Cannot obtain the " + + "job leader id future to verify the correct job leader.", e); onFatalError(exception); - log.debug("Could not obtain the job leader id future to verify the correct job leader."); - return FutureUtils.completedExceptionally(exception); - } - - CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = getRpcService().connect(jobManagerAddress, JobMasterGateway.class); - - CompletableFuture<RegistrationResponse> registrationResponseFuture = jobMasterGatewayFuture.thenCombineAsync( - jobLeaderIdFuture, - (JobMasterGateway jobMasterGateway, UUID jobLeaderId) -> { - if (isValid(resourceManagerLeaderId)) { - if (Objects.equals(jobLeaderId, jobManagerLeaderId)) { - return registerJobMasterInternal( - jobMasterGateway, - jobLeaderId, - jobId, - jobManagerAddress, - 
jobManagerResourceId); - } else { - log.debug("The job manager leader id {} did not match the job " + - "leader id {}.", jobManagerLeaderId, jobLeaderId); - return new RegistrationResponse.Decline("Job manager leader id did not match."); - } - } else { - log.debug("The resource manager leader id changed {}. Discarding job " + - "manager registration from {}.", getLeaderSessionId(), jobManagerAddress); - return new RegistrationResponse.Decline("Resource manager leader id changed."); - } - }, - getMainThreadExecutor()); + log.debug("Could not obtain the job leader id future to verify the correct job leader."); + return FutureUtils.completedExceptionally(exception); + } - // handle exceptions which might have occurred in one of the futures inputs of combine - return registrationResponseFuture.handleAsync( - (RegistrationResponse registrationResponse, Throwable throwable) -> { - if (throwable != null) { - if (log.isDebugEnabled()) { - log.debug("Registration of job manager {}@{} failed.", jobManagerLeaderId, jobManagerAddress, throwable); - } else { - log.info("Registration of job manager {}@{} failed.", jobManagerLeaderId, jobManagerAddress); - } + CompletableFuture<JobMasterGateway> jobMasterGatewayFuture = getRpcService().connect(jobManagerAddress, JobMasterGateway.class); - return new RegistrationResponse.Decline(throwable.getMessage()); + CompletableFuture<RegistrationResponse> registrationResponseFuture = jobMasterGatewayFuture.thenCombineAsync( + jobLeaderIdFuture, + (JobMasterGateway jobMasterGateway, UUID jobLeaderId) -> { + if (Objects.equals(jobLeaderId, jobManagerLeaderId)) { + return registerJobMasterInternal( + jobMasterGateway, + jobLeaderId, + jobId, + jobManagerAddress, + jobManagerResourceId); + } else { + log.debug("The job manager leader id {} did not match the job " + + "leader id {}.", jobManagerLeaderId, jobLeaderId); + return new RegistrationResponse.Decline("Job manager leader id did not match."); + } + }, + getMainThreadExecutor()); + + // 
handle exceptions which might have occurred in one of the futures inputs of combine + return registrationResponseFuture.handleAsync( + (RegistrationResponse registrationResponse, Throwable throwable) -> { + if (throwable != null) { + if (log.isDebugEnabled()) { + log.debug("Registration of job manager {}@{} failed.", jobManagerLeaderId, jobManagerAddress, throwable); } else { - return registrationResponse; + log.info("Registration of job manager {}@{} failed.", jobManagerLeaderId, jobManagerAddress); } - }, - getRpcService().getExecutor()); - } else { - log.debug("Discard register job manager message from {}, because the leader id " + - "{} did not match the expected leader id {}.", jobManagerAddress, - resourceManagerLeaderId, leaderSessionId); - return CompletableFuture.completedFuture( - new RegistrationResponse.Decline("Resource manager leader id did not match.")); - } + return new RegistrationResponse.Decline(throwable.getMessage()); + } else { + return registrationResponse; + } + }, + getRpcService().getExecutor()); } - /** - * Register a {@link TaskExecutor} at the resource manager. - * - * @param resourceManagerLeaderId The fencing token for the ResourceManager leader - * @param taskExecutorAddress The address of the TaskExecutor that registers - * @param taskExecutorResourceId The resource ID of the TaskExecutor that registers - * - * @return The response by the ResourceManager. 
- */ @Override public CompletableFuture<RegistrationResponse> registerTaskExecutor( - final UUID resourceManagerLeaderId, final String taskExecutorAddress, final ResourceID taskExecutorResourceId, final SlotReport slotReport, final Time timeout) { - if (Objects.equals(leaderSessionId, resourceManagerLeaderId)) { - CompletableFuture<TaskExecutorGateway> taskExecutorGatewayFuture = getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class); + CompletableFuture<TaskExecutorGateway> taskExecutorGatewayFuture = getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class); - return taskExecutorGatewayFuture.handleAsync( - (TaskExecutorGateway taskExecutorGateway, Throwable throwable) -> { - if (throwable != null) { - return new RegistrationResponse.Decline(throwable.getMessage()); - } else { - return registerTaskExecutorInternal( - taskExecutorGateway, - taskExecutorAddress, - taskExecutorResourceId, - slotReport); - } - }, - getMainThreadExecutor()); - } else { - log.warn("Discard registration from TaskExecutor {} at ({}) because the expected leader session ID {} did " + - "not equal the received leader session ID {}", - taskExecutorResourceId, taskExecutorAddress, leaderSessionId, resourceManagerLeaderId); - - return CompletableFuture.completedFuture( - new RegistrationResponse.Decline("Discard registration because the leader id " + - resourceManagerLeaderId + " does not match the expected leader id " + - leaderSessionId + '.')); - } + return taskExecutorGatewayFuture.handleAsync( + (TaskExecutorGateway taskExecutorGateway, Throwable throwable) -> { + if (throwable != null) { + return new RegistrationResponse.Decline(throwable.getMessage()); + } else { + return registerTaskExecutorInternal( + taskExecutorGateway, + taskExecutorAddress, + taskExecutorResourceId, + slotReport); + } + }, + getMainThreadExecutor()); } @Override @@ -409,23 +365,12 @@ public abstract class ResourceManager<WorkerType extends Serializable> 
closeJobManagerConnection(jobId, cause); } - /** - * Requests a slot from the resource manager. - * - * @param slotRequest Slot request - * @return Slot assignment - */ @Override public CompletableFuture<Acknowledge> requestSlot( UUID jobMasterLeaderID, - UUID resourceManagerLeaderID, SlotRequest slotRequest, final Time timeout) { - if (!Objects.equals(resourceManagerLeaderID, leaderSessionId)) { - return FutureUtils.completedExceptionally(new LeaderSessionIDException(resourceManagerLeaderID, leaderSessionId)); - } - JobID jobId = slotRequest.getJobId(); JobManagerRegistration jobManagerRegistration = jobManagerRegistrations.get(jobId); @@ -452,41 +397,27 @@ public abstract class ResourceManager<WorkerType extends Serializable> } } - /** - * Notification from a TaskExecutor that a slot has become available. - * - * @param resourceManagerLeaderId TaskExecutor's resource manager leader id - * @param instanceID TaskExecutor's instance id - * @param slotId The slot id of the available slot - */ @Override public void notifySlotAvailable( - final UUID resourceManagerLeaderId, final InstanceID instanceID, final SlotID slotId, final AllocationID allocationId) { - if (Objects.equals(resourceManagerLeaderId, leaderSessionId)) { - final ResourceID resourceId = slotId.getResourceID(); - WorkerRegistration<WorkerType> registration = taskExecutors.get(resourceId); + final ResourceID resourceId = slotId.getResourceID(); + WorkerRegistration<WorkerType> registration = taskExecutors.get(resourceId); - if (registration != null) { - InstanceID registrationId = registration.getInstanceID(); + if (registration != null) { + InstanceID registrationId = registration.getInstanceID(); - if (Objects.equals(registrationId, instanceID)) { - slotManager.freeSlot(slotId, allocationId); - } else { - log.debug("Invalid registration id for slot available message. 
This indicates an" + - " outdated request."); - } + if (Objects.equals(registrationId, instanceID)) { + slotManager.freeSlot(slotId, allocationId); } else { - log.debug("Could not find registration for resource id {}. Discarding the slot available" + - "message {}.", resourceId, slotId); + log.debug("Invalid registration id for slot available message. This indicates an" + + " outdated request."); } } else { - log.debug("Discarding notify slot available message for slot {}, because the " + - "leader id {} did not match the expected leader id {}.", slotId, - resourceManagerLeaderId, leaderSessionId); + log.debug("Could not find registration for resource id {}. Discarding the slot available" + + "message {}.", resourceId, slotId); } } @@ -545,27 +476,8 @@ public abstract class ResourceManager<WorkerType extends Serializable> } @Override - public CompletableFuture<Integer> getNumberOfRegisteredTaskManagers(UUID requestLeaderSessionId) { - if (Objects.equals(leaderSessionId, requestLeaderSessionId)) { - return CompletableFuture.completedFuture(taskExecutors.size()); - } - else { - return FutureUtils.completedExceptionally(new LeaderIdMismatchException(leaderSessionId, requestLeaderSessionId)); - } - } - - // ------------------------------------------------------------------------ - // Testing methods - // ------------------------------------------------------------------------ - - /** - * Gets the leader session id of current resourceManager. - * - * @return return the leaderSessionId of current resourceManager, this returns null until the current resourceManager is granted leadership. 
- */ - @VisibleForTesting - UUID getLeaderSessionId() { - return leaderSessionId; + public CompletableFuture<Integer> getNumberOfRegisteredTaskManagers() { + return CompletableFuture.completedFuture(taskExecutors.size()); } // ------------------------------------------------------------------------ @@ -635,7 +547,7 @@ public abstract class ResourceManager<WorkerType extends Serializable> return new JobMasterRegistrationSuccess( resourceManagerConfiguration.getHeartbeatInterval().toMilliseconds(), - getLeaderSessionId(), + getFencingToken(), resourceId); } @@ -706,8 +618,6 @@ public abstract class ResourceManager<WorkerType extends Serializable> } catch (Exception e) { onFatalError(new ResourceManagerException("Could not properly clear the job leader id service.", e)); } - - leaderSessionId = null; } /** @@ -735,7 +645,7 @@ public abstract class ResourceManager<WorkerType extends Serializable> jmResourceIdRegistrations.remove(jobManagerResourceId); // tell the job manager about the disconnect - jobMasterGateway.disconnectResourceManager(jobManagerLeaderId, getLeaderSessionId(), cause); + jobMasterGateway.disconnectResourceManager(jobManagerLeaderId, getFencingToken(), cause); } else { log.debug("There was no registered job manager for job {}.", jobId); } @@ -765,17 +675,6 @@ public abstract class ResourceManager<WorkerType extends Serializable> } } - /** - * Checks whether the given resource manager leader id is matching the current leader id and - * not null. 
- * - * @param resourceManagerLeaderId to check - * @return True if the given leader id matches the actual leader id and is not null; otherwise false - */ - protected boolean isValid(UUID resourceManagerLeaderId) { - return Objects.equals(resourceManagerLeaderId, leaderSessionId); - } - protected void removeJob(JobID jobId) { try { jobLeaderIdService.removeJob(jobId); @@ -848,29 +747,26 @@ public abstract class ResourceManager<WorkerType extends Serializable> */ @Override public void grantLeadership(final UUID newLeaderSessionID) { - runAsync(new Runnable() { - @Override - public void run() { - log.info("ResourceManager {} was granted leadership with leader session ID {}", getAddress(), newLeaderSessionID); + runAsyncWithoutFencing( + () -> { + final ResourceManagerId newResourceManagerId = new ResourceManagerId(newLeaderSessionID); + + log.info("ResourceManager {} was granted leadership with fencing token {}", getAddress(), newResourceManagerId); // clear the state if we've been the leader before - if (leaderSessionId != null) { + if (getFencingToken() != null) { clearState(); } - leaderSessionId = newLeaderSessionID; + setFencingToken(newResourceManagerId); - slotManager.start(leaderSessionId, getMainThreadExecutor(), new ResourceManagerActionsImpl()); + slotManager.start(getFencingToken(), getMainThreadExecutor(), new ResourceManagerActionsImpl()); - getRpcService().execute(new Runnable() { - @Override - public void run() { + getRpcService().execute( + () -> // confirming the leader session ID might be blocking, - leaderElectionService.confirmLeaderSessionID(newLeaderSessionID); - } - }); - } - }); + leaderElectionService.confirmLeaderSessionID(newLeaderSessionID)); + }); } /** @@ -878,16 +774,18 @@ public abstract class ResourceManager<WorkerType extends Serializable> */ @Override public void revokeLeadership() { - runAsync(new Runnable() { - @Override - public void run() { - log.info("ResourceManager {} was revoked leadership.", getAddress()); + 
runAsyncWithoutFencing( + () -> { + final ResourceManagerId newResourceManagerId = ResourceManagerId.generate(); + + log.info("ResourceManager {} was revoked leadership. Setting fencing token to {}.", getAddress(), newResourceManagerId); clearState(); + setFencingToken(newResourceManagerId); + slotManager.suspend(); - } - }); + }); } /** http://git-wip-us.apache.org/repos/asf/flink/blob/e70de0eb/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java index 1ba6893..ac81048 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerGateway.java @@ -26,11 +26,12 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.SlotID; import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.messages.Acknowledge; -import org.apache.flink.runtime.rpc.RpcGateway; +import org.apache.flink.runtime.rpc.FencedRpcGateway; import org.apache.flink.runtime.rpc.RpcTimeout; import org.apache.flink.runtime.jobmaster.JobMaster; import org.apache.flink.runtime.registration.RegistrationResponse; import org.apache.flink.runtime.taskexecutor.SlotReport; +import org.apache.flink.runtime.taskexecutor.TaskExecutor; import java.util.UUID; import java.util.concurrent.CompletableFuture; @@ -38,69 +39,61 @@ import java.util.concurrent.CompletableFuture; /** * The {@link ResourceManager}'s RPC gateway interface. 
*/ -public interface ResourceManagerGateway extends RpcGateway { +public interface ResourceManagerGateway extends FencedRpcGateway<ResourceManagerId> { /** * Register a {@link JobMaster} at the resource manager. * - * @param resourceManagerLeaderId The fencing token for the ResourceManager leader * @param jobMasterLeaderId The fencing token for the JobMaster leader - * @param jobMasterResourceId The resource ID of the JobMaster that registers - * @param jobMasterAddress The address of the JobMaster that registers - * @param jobID The Job ID of the JobMaster that registers - * @param timeout Timeout for the future to complete + * @param jobMasterResourceId The resource ID of the JobMaster that registers + * @param jobMasterAddress The address of the JobMaster that registers + * @param jobId The Job ID of the JobMaster that registers + * @param timeout Timeout for the future to complete * @return Future registration response */ CompletableFuture<RegistrationResponse> registerJobManager( - UUID resourceManagerLeaderId, UUID jobMasterLeaderId, ResourceID jobMasterResourceId, String jobMasterAddress, - JobID jobID, + JobID jobId, @RpcTimeout Time timeout); /** * Requests a slot from the resource manager. * - * @param resourceManagerLeaderID leader if of the ResourceMaster * @param jobMasterLeaderID leader if of the JobMaster * @param slotRequest The slot to request * @return The confirmation that the slot gets allocated */ CompletableFuture<Acknowledge> requestSlot( - UUID resourceManagerLeaderID, UUID jobMasterLeaderID, SlotRequest slotRequest, @RpcTimeout Time timeout); /** - * Register a {@link org.apache.flink.runtime.taskexecutor.TaskExecutor} at the resource manager. + * Register a {@link TaskExecutor} at the resource manager. 
* - * @param resourceManagerLeaderId The fencing token for the ResourceManager leader - * @param taskExecutorAddress The address of the TaskExecutor that registers - * @param resourceID The resource ID of the TaskExecutor that registers - * @param slotReport The slot report containing free and allocated task slots - * @param timeout The timeout for the response. + * @param taskExecutorAddress The address of the TaskExecutor that registers + * @param resourceId The resource ID of the TaskExecutor that registers + * @param slotReport The slot report containing free and allocated task slots + * @param timeout The timeout for the response. * * @return The future to the response by the ResourceManager. */ CompletableFuture<RegistrationResponse> registerTaskExecutor( - UUID resourceManagerLeaderId, String taskExecutorAddress, - ResourceID resourceID, + ResourceID resourceId, SlotReport slotReport, @RpcTimeout Time timeout); /** * Sent by the TaskExecutor to notify the ResourceManager that a slot has become available. * - * @param resourceManagerLeaderId The ResourceManager leader id * @param instanceId TaskExecutor's instance id * @param slotID The SlotID of the freed slot * @param oldAllocationId to which the slot has been allocated */ void notifySlotAvailable( - UUID resourceManagerLeaderId, InstanceID instanceId, SlotID slotID, AllocationID oldAllocationId); @@ -130,10 +123,9 @@ public interface ResourceManagerGateway extends RpcGateway { /** * Gets the currently registered number of TaskManagers. * - * @param leaderSessionId The leader session ID with which to address the ResourceManager. * @return The future to the number of registered TaskManagers. 
*/ - CompletableFuture<Integer> getNumberOfRegisteredTaskManagers(UUID leaderSessionId); + CompletableFuture<Integer> getNumberOfRegisteredTaskManagers(); /** * Sends the heartbeat to resource manager from task manager http://git-wip-us.apache.org/repos/asf/flink/blob/e70de0eb/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerId.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerId.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerId.java new file mode 100644 index 0000000..3594e88 --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManagerId.java @@ -0,0 +1,58 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.resourcemanager; + +import org.apache.flink.util.AbstractID; + +import java.util.UUID; + +/** + * Fencing token for the {@link ResourceManager}. 
+ */ +public class ResourceManagerId extends AbstractID { + + private static final long serialVersionUID = -6042820142662137374L; + + public ResourceManagerId(byte[] bytes) { + super(bytes); + } + + public ResourceManagerId(long lowerPart, long upperPart) { + super(lowerPart, upperPart); + } + + public ResourceManagerId(AbstractID id) { + super(id); + } + + public ResourceManagerId() { + } + + public ResourceManagerId(UUID uuid) { + this(uuid.getLeastSignificantBits(), uuid.getMostSignificantBits()); + } + + public UUID toUUID() { + return new UUID(getUpperPart(), getLowerPart()); + } + + public static ResourceManagerId generate() { + return new ResourceManagerId(); + } +} http://git-wip-us.apache.org/repos/asf/flink/blob/e70de0eb/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java index 5218286..d8eb47c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/slotmanager/SlotManager.java @@ -27,6 +27,7 @@ import org.apache.flink.runtime.clusterframework.types.SlotID; import org.apache.flink.runtime.concurrent.ScheduledExecutor; import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.resourcemanager.ResourceManagerId; import org.apache.flink.runtime.resourcemanager.SlotRequest; import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException; import org.apache.flink.runtime.resourcemanager.registration.TaskExecutorConnection; @@ -35,6 +36,7 @@ import org.apache.flink.runtime.taskexecutor.SlotStatus; import 
org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; import org.apache.flink.runtime.taskexecutor.exceptions.SlotAllocationException; import org.apache.flink.runtime.taskexecutor.exceptions.SlotOccupiedException; +import org.apache.flink.util.AbstractID; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -45,7 +47,6 @@ import java.util.Iterator; import java.util.LinkedHashMap; import java.util.Map; import java.util.Objects; -import java.util.UUID; import java.util.concurrent.CancellationException; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; @@ -64,7 +65,7 @@ import java.util.concurrent.TimeoutException; * slots are currently not used) and pending slot requests time out triggering their release and * failure, respectively. */ -public class SlotManager implements AutoCloseable { +public class SlotManager<T extends AbstractID> implements AutoCloseable { private static final Logger LOG = LoggerFactory.getLogger(SlotManager.class); /** Scheduled executor for timeouts */ @@ -94,8 +95,8 @@ public class SlotManager implements AutoCloseable { /** Map of pending/unfulfilled slot allocation requests */ private final HashMap<AllocationID, PendingSlotRequest> pendingSlotRequests; - /** Leader id of the containing component */ - private UUID leaderId; + /** ResourceManager's id */ + private ResourceManagerId resourceManagerId; /** Executor for future callbacks which have to be "synchronized" */ private Executor mainThreadExecutor; @@ -126,7 +127,7 @@ public class SlotManager implements AutoCloseable { fulfilledSlotRequests = new HashMap<>(16); pendingSlotRequests = new HashMap<>(16); - leaderId = null; + resourceManagerId = null; resourceManagerActions = null; mainThreadExecutor = null; taskManagerTimeoutCheck = null; @@ -142,13 +143,14 @@ public class SlotManager implements AutoCloseable { /** * Starts the slot manager with the given leader id and resource manager actions. 
* - * @param newLeaderId to use for communication with the task managers + * @param newResourceManagerId to use for communication with the task managers + * @param newMainThreadExecutor to use to run code in the ResourceManager's main thread * @param newResourceManagerActions to use for resource (de-)allocations */ - public void start(UUID newLeaderId, Executor newMainThreadExecutor, ResourceManagerActions newResourceManagerActions) { + public void start(ResourceManagerId newResourceManagerId, Executor newMainThreadExecutor, ResourceManagerActions newResourceManagerActions) { LOG.info("Starting the SlotManager."); - leaderId = Preconditions.checkNotNull(newLeaderId); + this.resourceManagerId = Preconditions.checkNotNull(newResourceManagerId); mainThreadExecutor = Preconditions.checkNotNull(newMainThreadExecutor); resourceManagerActions = Preconditions.checkNotNull(newResourceManagerActions); @@ -204,7 +206,7 @@ public class SlotManager implements AutoCloseable { unregisterTaskManager(registeredTaskManager); } - leaderId = null; + resourceManagerId = null; resourceManagerActions = null; started = false; } @@ -643,7 +645,7 @@ public class SlotManager implements AutoCloseable { pendingSlotRequest.getJobId(), allocationId, pendingSlotRequest.getTargetAddress(), - leaderId, + resourceManagerId, taskManagerRequestTimeout); requestFuture.whenComplete( http://git-wip-us.apache.org/repos/asf/flink/blob/e70de0eb/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java index 6d1f22c..f564df4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java @@ 
-37,6 +37,7 @@ import org.slf4j.LoggerFactory; import java.util.HashMap; import java.util.Map; +import java.util.Objects; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; @@ -325,7 +326,7 @@ public class JobLeaderService { @Override protected void onRegistrationSuccess(JMTMRegistrationSuccess success) { // filter out old registration attempts - if (getTargetLeaderId().equals(currentLeaderId)) { + if (Objects.equals(getTargetLeaderId(), currentLeaderId)) { log.info("Successful registration at job manager {} for job {}.", getTargetAddress(), jobId); jobLeaderListener.jobManagerGainedLeadership(jobId, getTargetGateway(), getTargetLeaderId(), success); @@ -337,8 +338,8 @@ public class JobLeaderService { @Override protected void onRegistrationFailure(Throwable failure) { // filter out old registration attempts - if (getTargetLeaderId().equals(currentLeaderId)) { - log.info("Failed to register at job manager {} for job {}.", getTargetAddress(), jobId); + if (Objects.equals(getTargetLeaderId(), currentLeaderId)) { + log.info("Failed to register at job manager {} for job {}.", getTargetAddress(), jobId); jobLeaderListener.handleError(failure); } else { log.debug("Obsolete JobManager registration failure from {} with leader session ID {}.", getTargetAddress(), getTargetLeaderId(), failure); http://git-wip-us.apache.org/repos/asf/flink/blob/e70de0eb/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index ef47ad4..3b1a1b4 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -59,6 +59,7 @@ import 
org.apache.flink.runtime.registration.RegistrationConnectionListener; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.runtime.metrics.groups.TaskManagerMetricGroup; import org.apache.flink.runtime.metrics.groups.TaskMetricGroup; +import org.apache.flink.runtime.resourcemanager.ResourceManagerId; import org.apache.flink.runtime.rpc.FatalErrorHandler; import org.apache.flink.runtime.rpc.RpcEndpoint; import org.apache.flink.runtime.rpc.RpcService; @@ -585,29 +586,18 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { // Slot allocation RPCs // ---------------------------------------------------------------------- - /** - * Requests a slot from the TaskManager - * - * @param slotId identifying the requested slot - * @param jobId identifying the job for which the request is issued - * @param allocationId id for the request - * @param targetAddress of the job manager requesting the slot - * @param rmLeaderId current leader id of the ResourceManager - * @throws SlotAllocationException if the slot allocation fails - * @return answer to the slot request - */ @Override public CompletableFuture<Acknowledge> requestSlot( final SlotID slotId, final JobID jobId, final AllocationID allocationId, final String targetAddress, - final UUID rmLeaderId, + final ResourceManagerId resourceManagerId, final Time timeout) { // TODO: Filter invalid requests from the resource manager by using the instance/registration Id log.info("Receive slot request {} for job {} from resource manager with leader id {}.", - allocationId, jobId, rmLeaderId); + allocationId, jobId, resourceManagerId); try { if (resourceManagerConnection == null) { @@ -616,8 +606,8 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { throw new SlotAllocationException(message); } - if (!resourceManagerConnection.getTargetLeaderId().equals(rmLeaderId)) { - final String message = "The leader id " + rmLeaderId + + if 
(!Objects.equals(resourceManagerConnection.getTargetLeaderId(), resourceManagerId)) { + final String message = "The leader id " + resourceManagerId + " does not match with the leader id of the connected resource manager " + resourceManagerConnection.getTargetLeaderId() + '.'; @@ -692,7 +682,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { // Internal resource manager connection methods // ------------------------------------------------------------------------ - private void notifyOfNewResourceManagerLeader(String newLeaderAddress, UUID newLeaderId) { + private void notifyOfNewResourceManagerLeader(String newLeaderAddress, ResourceManagerId newResourceManagerId) { if (resourceManagerConnection != null) { if (newLeaderAddress != null) { // the resource manager switched to a new leader @@ -723,7 +713,7 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { getResourceID(), taskSlotTable.createSlotReport(getResourceID()), newLeaderAddress, - newLeaderId, + newResourceManagerId, getMainThreadExecutor(), new ResourceManagerRegistrationListener()); resourceManagerConnection.start(); @@ -1079,7 +1069,6 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { ResourceManagerGateway resourceManagerGateway = resourceManagerConnection.getTargetGateway(); resourceManagerGateway.notifySlotAvailable( - resourceManagerConnection.getTargetLeaderId(), resourceManagerConnection.getRegistrationId(), new SlotID(getResourceID(), freedSlotIndex), allocationId); @@ -1169,12 +1158,10 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { @Override public void notifyLeaderAddress(final String leaderAddress, final UUID leaderSessionID) { - runAsync(new Runnable() { - @Override - public void run() { - notifyOfNewResourceManagerLeader(leaderAddress, leaderSessionID); - } - }); + runAsync( + () -> notifyOfNewResourceManagerLeader( + leaderAddress, + leaderSessionID != null ? 
new ResourceManagerId(leaderSessionID) : null)); } @Override http://git-wip-us.apache.org/repos/asf/flink/blob/e70de0eb/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java index 8084154..fd56255 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorGateway.java @@ -28,6 +28,7 @@ import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.PartitionInfo; import org.apache.flink.runtime.messages.Acknowledge; +import org.apache.flink.runtime.resourcemanager.ResourceManagerId; import org.apache.flink.runtime.rpc.RpcGateway; import org.apache.flink.runtime.rpc.RpcTimeout; import org.apache.flink.runtime.taskmanager.Task; @@ -44,8 +45,11 @@ public interface TaskExecutorGateway extends RpcGateway { * Requests a slot from the TaskManager * * @param slotId slot id for the request + * @param jobId for which to request a slot * @param allocationId id for the request - * @param resourceManagerLeaderId current leader id of the ResourceManager + * @param targetAddress to which to offer the requested slots + * @param resourceManagerId current leader id of the ResourceManager + * @param timeout for the operation * @return answer to the slot request */ CompletableFuture<Acknowledge> requestSlot( @@ -53,7 +57,7 @@ public interface TaskExecutorGateway extends RpcGateway { JobID jobId, AllocationID allocationId, String targetAddress, - UUID resourceManagerLeaderId, + ResourceManagerId resourceManagerId, @RpcTimeout Time timeout); 
/** http://git-wip-us.apache.org/repos/asf/flink/blob/e70de0eb/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java index 24eb540..c3d3532 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java @@ -23,6 +23,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.instance.InstanceID; import org.apache.flink.runtime.registration.RegisteredRpcConnection; import org.apache.flink.runtime.registration.RegistrationConnectionListener; +import org.apache.flink.runtime.resourcemanager.ResourceManagerId; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.registration.RegistrationResponse; import org.apache.flink.runtime.registration.RetryingRegistration; @@ -31,7 +32,6 @@ import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; import org.apache.flink.util.Preconditions; import org.slf4j.Logger; -import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; @@ -41,7 +41,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * The connection between a TaskExecutor and the ResourceManager. 
*/ public class TaskExecutorToResourceManagerConnection - extends RegisteredRpcConnection<UUID, ResourceManagerGateway, TaskExecutorRegistrationSuccess> { + extends RegisteredRpcConnection<ResourceManagerId, ResourceManagerGateway, TaskExecutorRegistrationSuccess> { private final RpcService rpcService; @@ -64,11 +64,11 @@ public class TaskExecutorToResourceManagerConnection ResourceID taskManagerResourceId, SlotReport slotReport, String resourceManagerAddress, - UUID resourceManagerLeaderId, + ResourceManagerId resourceManagerId, Executor executor, RegistrationConnectionListener<TaskExecutorRegistrationSuccess> registrationListener) { - super(log, resourceManagerAddress, resourceManagerLeaderId, executor); + super(log, resourceManagerAddress, resourceManagerId, executor); this.rpcService = Preconditions.checkNotNull(rpcService); this.taskManagerAddress = Preconditions.checkNotNull(taskManagerAddress); @@ -79,7 +79,7 @@ public class TaskExecutorToResourceManagerConnection @Override - protected RetryingRegistration<UUID, ResourceManagerGateway, TaskExecutorRegistrationSuccess> generateRegistration() { + protected RetryingRegistration<ResourceManagerId, ResourceManagerGateway, TaskExecutorRegistrationSuccess> generateRegistration() { return new TaskExecutorToResourceManagerConnection.ResourceManagerRegistration( log, rpcService, @@ -127,7 +127,7 @@ public class TaskExecutorToResourceManagerConnection // ------------------------------------------------------------------------ private static class ResourceManagerRegistration - extends RetryingRegistration<UUID, ResourceManagerGateway, TaskExecutorRegistrationSuccess> { + extends RetryingRegistration<ResourceManagerId, ResourceManagerGateway, TaskExecutorRegistrationSuccess> { private final String taskExecutorAddress; @@ -139,12 +139,12 @@ public class TaskExecutorToResourceManagerConnection Logger log, RpcService rpcService, String targetAddress, - UUID leaderId, + ResourceManagerId resourceManagerId, String 
taskExecutorAddress, ResourceID resourceID, SlotReport slotReport) { - super(log, rpcService, "ResourceManager", ResourceManagerGateway.class, targetAddress, leaderId); + super(log, rpcService, "ResourceManager", ResourceManagerGateway.class, targetAddress, resourceManagerId); this.taskExecutorAddress = checkNotNull(taskExecutorAddress); this.resourceID = checkNotNull(resourceID); this.slotReport = checkNotNull(slotReport); @@ -152,10 +152,10 @@ public class TaskExecutorToResourceManagerConnection @Override protected CompletableFuture<RegistrationResponse> invokeRegistration( - ResourceManagerGateway resourceManager, UUID leaderId, long timeoutMillis) throws Exception { + ResourceManagerGateway resourceManager, ResourceManagerId fencingToken, long timeoutMillis) throws Exception { Time timeout = Time.milliseconds(timeoutMillis); - return resourceManager.registerTaskExecutor(leaderId, taskExecutorAddress, resourceID, slotReport, timeout); + return resourceManager.registerTaskExecutor(taskExecutorAddress, resourceID, slotReport, timeout); } } } http://git-wip-us.apache.org/repos/asf/flink/blob/e70de0eb/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java index 737cede..55499f5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/clusterframework/ResourceManagerTest.java @@ -49,6 +49,7 @@ import org.apache.flink.runtime.registration.RegistrationResponse; import org.apache.flink.runtime.resourcemanager.JobLeaderIdService; import org.apache.flink.runtime.resourcemanager.ResourceManagerConfiguration; import 
org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; +import org.apache.flink.runtime.resourcemanager.ResourceManagerId; import org.apache.flink.runtime.resourcemanager.StandaloneResourceManager; import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager; import org.apache.flink.runtime.rpc.TestingRpcService; @@ -532,7 +533,6 @@ public class ResourceManagerTest extends TestLogger { final SlotReport slotReport = new SlotReport(); // test registration response successful and it will trigger monitor heartbeat target, schedule heartbeat request at interval time CompletableFuture<RegistrationResponse> successfulFuture = rmGateway.registerTaskExecutor( - rmLeaderSessionId, taskManagerAddress, taskManagerResourceID, slotReport, @@ -576,7 +576,7 @@ public class ResourceManagerTest extends TestLogger { final String jobMasterAddress = "jm"; final ResourceID jmResourceId = new ResourceID(jobMasterAddress); final ResourceID rmResourceId = ResourceID.generate(); - final UUID rmLeaderId = UUID.randomUUID(); + final ResourceManagerId rmLeaderId = ResourceManagerId.generate(); final UUID jmLeaderId = UUID.randomUUID(); final JobID jobId = new JobID(); @@ -629,11 +629,10 @@ public class ResourceManagerTest extends TestLogger { final ResourceManagerGateway rmGateway = resourceManager.getSelfGateway(ResourceManagerGateway.class); - rmLeaderElectionService.isLeader(rmLeaderId).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); + rmLeaderElectionService.isLeader(rmLeaderId.toUUID()).get(timeout.toMilliseconds(), TimeUnit.MILLISECONDS); // test registration response successful and it will trigger monitor heartbeat target, schedule heartbeat request at interval time CompletableFuture<RegistrationResponse> successfulFuture = rmGateway.registerJobManager( - rmLeaderId, jmLeaderId, jmResourceId, jobMasterAddress, http://git-wip-us.apache.org/repos/asf/flink/blob/e70de0eb/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java 
---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java index 68c43f8..ead453e 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/instance/SlotPoolTest.java @@ -88,7 +88,7 @@ public class SlotPoolTest extends TestLogger { assertFalse(future.isDone()); ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class); - verify(resourceManagerGateway, Mockito.timeout(timeout.toMilliseconds())).requestSlot(any(UUID.class), any(UUID.class), slotRequestArgumentCaptor.capture(), any(Time.class)); + verify(resourceManagerGateway, Mockito.timeout(timeout.toMilliseconds())).requestSlot(any(UUID.class), slotRequestArgumentCaptor.capture(), any(Time.class)); final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue(); @@ -125,7 +125,7 @@ public class SlotPoolTest extends TestLogger { ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class); verify(resourceManagerGateway, Mockito.timeout(timeout.toMilliseconds()).times(2)) - .requestSlot(any(UUID.class), any(UUID.class), slotRequestArgumentCaptor.capture(), any(Time.class)); + .requestSlot(any(UUID.class), slotRequestArgumentCaptor.capture(), any(Time.class)); final List<SlotRequest> slotRequests = slotRequestArgumentCaptor.getAllValues(); @@ -168,7 +168,7 @@ public class SlotPoolTest extends TestLogger { assertFalse(future1.isDone()); ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class); - verify(resourceManagerGateway, Mockito.timeout(timeout.toMilliseconds())).requestSlot(any(UUID.class), any(UUID.class), slotRequestArgumentCaptor.capture(), any(Time.class)); + verify(resourceManagerGateway, 
Mockito.timeout(timeout.toMilliseconds())).requestSlot(any(UUID.class), slotRequestArgumentCaptor.capture(), any(Time.class)); final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue(); @@ -211,7 +211,7 @@ public class SlotPoolTest extends TestLogger { assertFalse(future.isDone()); ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class); - verify(resourceManagerGateway, Mockito.timeout(timeout.toMilliseconds())).requestSlot(any(UUID.class), any(UUID.class), slotRequestArgumentCaptor.capture(), any(Time.class)); + verify(resourceManagerGateway, Mockito.timeout(timeout.toMilliseconds())).requestSlot(any(UUID.class), slotRequestArgumentCaptor.capture(), any(Time.class)); final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue(); @@ -266,7 +266,7 @@ public class SlotPoolTest extends TestLogger { CompletableFuture<SimpleSlot> future1 = slotPoolGateway.allocateSlot(mock(ScheduledUnit.class), DEFAULT_TESTING_PROFILE, null, timeout); ArgumentCaptor<SlotRequest> slotRequestArgumentCaptor = ArgumentCaptor.forClass(SlotRequest.class); - verify(resourceManagerGateway, Mockito.timeout(timeout.toMilliseconds())).requestSlot(any(UUID.class), any(UUID.class), slotRequestArgumentCaptor.capture(), any(Time.class)); + verify(resourceManagerGateway, Mockito.timeout(timeout.toMilliseconds())).requestSlot(any(UUID.class), slotRequestArgumentCaptor.capture(), any(Time.class)); final SlotRequest slotRequest = slotRequestArgumentCaptor.getValue(); @@ -297,7 +297,7 @@ public class SlotPoolTest extends TestLogger { private static ResourceManagerGateway createResourceManagerGatewayMock() { ResourceManagerGateway resourceManagerGateway = mock(ResourceManagerGateway.class); when(resourceManagerGateway - .requestSlot(any(UUID.class), any(UUID.class), any(SlotRequest.class), any(Time.class))) + .requestSlot(any(UUID.class), any(SlotRequest.class), any(Time.class))) .thenReturn(mock(CompletableFuture.class, RETURNS_MOCKS)); return 
resourceManagerGateway; @@ -310,7 +310,7 @@ public class SlotPoolTest extends TestLogger { slotPool.start(UUID.randomUUID(), jobManagerAddress); - slotPool.connectToResourceManager(UUID.randomUUID(), resourceManagerGateway); + slotPool.connectToResourceManager(resourceManagerGateway); return slotPool.getSelfGateway(SlotPoolGateway.class); } http://git-wip-us.apache.org/repos/asf/flink/blob/e70de0eb/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java index 76a0b93..6282ea0 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java @@ -37,6 +37,7 @@ import org.apache.flink.runtime.leaderelection.TestingLeaderRetrievalService; import org.apache.flink.runtime.messages.Acknowledge; import org.apache.flink.runtime.registration.RegistrationResponse; import org.apache.flink.runtime.resourcemanager.ResourceManagerGateway; +import org.apache.flink.runtime.resourcemanager.ResourceManagerId; import org.apache.flink.runtime.rpc.TestingRpcService; import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway; import org.apache.flink.runtime.taskmanager.TaskManagerLocation; @@ -167,7 +168,7 @@ public class JobMasterTest extends TestLogger { public void testHeartbeatTimeoutWithResourceManager() throws Exception { final String resourceManagerAddress = "rm"; final String jobManagerAddress = "jm"; - final UUID rmLeaderId = UUID.randomUUID(); + final ResourceManagerId resourceManagerId = ResourceManagerId.generate(); final UUID jmLeaderId = UUID.randomUUID(); final ResourceID rmResourceId = new ResourceID(resourceManagerAddress); final ResourceID jmResourceId = new 
ResourceID(jobManagerAddress); @@ -188,13 +189,12 @@ public class JobMasterTest extends TestLogger { final ResourceManagerGateway resourceManagerGateway = mock(ResourceManagerGateway.class); when(resourceManagerGateway.registerJobManager( any(UUID.class), - any(UUID.class), any(ResourceID.class), anyString(), any(JobID.class), any(Time.class) )).thenReturn(CompletableFuture.completedFuture(new JobMasterRegistrationSuccess( - heartbeatInterval, rmLeaderId, rmResourceId))); + heartbeatInterval, resourceManagerId, rmResourceId))); final TestingRpcService rpc = new TestingRpcService(); rpc.registerGateway(resourceManagerAddress, resourceManagerGateway); @@ -225,11 +225,10 @@ public class JobMasterTest extends TestLogger { startFuture.get(testingTimeout.toMilliseconds(), TimeUnit.MILLISECONDS); // define a leader and see that a registration happens - rmLeaderRetrievalService.notifyListener(resourceManagerAddress, rmLeaderId); + rmLeaderRetrievalService.notifyListener(resourceManagerAddress, resourceManagerId.toUUID()); // register job manager success will trigger monitor heartbeat target between jm and rm verify(resourceManagerGateway, timeout(testingTimeout.toMilliseconds())).registerJobManager( - eq(rmLeaderId), eq(jmLeaderId), eq(jmResourceId), anyString(), http://git-wip-us.apache.org/repos/asf/flink/blob/e70de0eb/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java index 5568467..ccbb4f4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/metrics/MetricRegistryTest.java @@ -205,6 +205,10 @@ public class MetricRegistryTest extends TestLogger { MetricRegistry registry = new 
MetricRegistry(MetricRegistryConfiguration.fromConfiguration(config)); long start = System.currentTimeMillis(); + + // only start counting from now on + TestReporter3.reportCount = 0; + for (int x = 0; x < 10; x++) { Thread.sleep(100); int reportCount = TestReporter3.reportCount; @@ -218,7 +222,7 @@ public class MetricRegistryTest extends TestLogger { * or after T=50. */ long maxAllowedReports = (curT - start) / 50 + 2; - Assert.assertTrue("Too many report were triggered.", maxAllowedReports >= reportCount); + Assert.assertTrue("Too many reports were triggered.", maxAllowedReports >= reportCount); } Assert.assertTrue("No report was triggered.", TestReporter3.reportCount > 0); http://git-wip-us.apache.org/repos/asf/flink/blob/e70de0eb/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java index c213752..2b8792b 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerHATest.java @@ -82,7 +82,7 @@ public class ResourceManagerHATest extends TestLogger { TestingFatalErrorHandler testingFatalErrorHandler = new TestingFatalErrorHandler(); - CompletableFuture<UUID> revokedLeaderIdFuture = new CompletableFuture<>(); + CompletableFuture<ResourceManagerId> revokedLeaderIdFuture = new CompletableFuture<>(); final ResourceManager resourceManager = new StandaloneResourceManager( @@ -100,23 +100,28 @@ public class ResourceManagerHATest extends TestLogger { @Override public void revokeLeadership() { super.revokeLeadership(); - runAsync( - () -> revokedLeaderIdFuture.complete(getLeaderSessionId())); + runAsyncWithoutFencing( + 
() -> revokedLeaderIdFuture.complete(getFencingToken())); } }; - resourceManager.start(); - // before grant leadership, resourceManager's leaderId is null - Assert.assertEquals(null, resourceManager.getLeaderSessionId()); - final UUID leaderId = UUID.randomUUID(); - leaderElectionService.isLeader(leaderId); - // after grant leadership, resourceManager's leaderId has value - Assert.assertEquals(leaderId, leaderSessionIdFuture.get()); - // then revoke leadership, resourceManager's leaderId is null again - leaderElectionService.notLeader(); - Assert.assertEquals(null, revokedLeaderIdFuture.get()); - - if (testingFatalErrorHandler.hasExceptionOccurred()) { - testingFatalErrorHandler.rethrowError(); + + try { + resourceManager.start(); + + Assert.assertNotNull(resourceManager.getFencingToken()); + final UUID leaderId = UUID.randomUUID(); + leaderElectionService.isLeader(leaderId); + // after grant leadership, resourceManager's leaderId has value + Assert.assertEquals(leaderId, leaderSessionIdFuture.get()); + // then revoke leadership, resourceManager's leaderId should be different + leaderElectionService.notLeader(); + Assert.assertNotEquals(leaderId, revokedLeaderIdFuture.get()); + + if (testingFatalErrorHandler.hasExceptionOccurred()) { + testingFatalErrorHandler.rethrowError(); + } + } finally { + rpcService.stopService(); } } }