Repository: flink Updated Branches: refs/heads/master 1fc4a6097 -> 0f3de89af
[FLINK-7501] Generalize RegisteredRpcConnection to support generic leader ids The RegisteredRpcConnection now supports generic leader ids/fencing tokens. This will allow to introduce component specific leader ids/fencing tokens. This closes #4580. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/0f3de89a Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/0f3de89a Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/0f3de89a Branch: refs/heads/master Commit: 0f3de89af4ef4f570e125b0d50110bfa5d0ce80b Parents: 1fc4a60 Author: Till Rohrmann <trohrm...@apache.org> Authored: Thu Aug 24 15:29:47 2017 +0200 Committer: Till Rohrmann <trohrm...@apache.org> Committed: Sun Sep 3 23:08:25 2017 +0200 ---------------------------------------------------------------------- .../flink/runtime/jobmaster/JobMaster.java | 6 +-- .../registration/RegisteredRpcConnection.java | 37 +++++++++-------- .../registration/RetryingRegistration.java | 43 ++++++++++---------- .../runtime/taskexecutor/JobLeaderService.java | 8 ++-- ...TaskExecutorToResourceManagerConnection.java | 6 +-- .../RegisteredRpcConnectionTest.java | 4 +- .../registration/RetryingRegistrationTest.java | 2 +- 7 files changed, 54 insertions(+), 52 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/0f3de89a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java index c30749c..7e48da1 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/jobmaster/JobMaster.java @@ -1058,7 +1058,7 @@ public class JobMaster extends 
RpcEndpoint implements JobMasterGateway { //---------------------------------------------------------------------------------------------- private class ResourceManagerConnection - extends RegisteredRpcConnection<ResourceManagerGateway, JobMasterRegistrationSuccess> + extends RegisteredRpcConnection<UUID, ResourceManagerGateway, JobMasterRegistrationSuccess> { private final JobID jobID; @@ -1088,8 +1088,8 @@ public class JobMaster extends RpcEndpoint implements JobMasterGateway { } @Override - protected RetryingRegistration<ResourceManagerGateway, JobMasterRegistrationSuccess> generateRegistration() { - return new RetryingRegistration<ResourceManagerGateway, JobMasterRegistrationSuccess>( + protected RetryingRegistration<UUID, ResourceManagerGateway, JobMasterRegistrationSuccess> generateRegistration() { + return new RetryingRegistration<UUID, ResourceManagerGateway, JobMasterRegistrationSuccess>( log, getRpcService(), "ResourceManager", ResourceManagerGateway.class, getTargetAddress(), getTargetLeaderId()) { http://git-wip-us.apache.org/repos/asf/flink/blob/0f3de89a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java index da46e1c..a585f0d 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RegisteredRpcConnection.java @@ -23,7 +23,7 @@ import org.apache.flink.runtime.rpc.RpcGateway; import org.slf4j.Logger; -import java.util.UUID; +import java.io.Serializable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executor; @@ -39,16 +39,17 @@ import static org.apache.flink.util.Preconditions.checkState; 
* The RPC connection can be closed, for example when the target where it tries to register * at looses leader status. * - * @param <Gateway> The type of the gateway to connect to. - * @param <Success> The type of the successful registration responses. + * @param <F> The type of the fencing token. + * @param <G> The type of the gateway to connect to. + * @param <S> The type of the successful registration responses. */ -public abstract class RegisteredRpcConnection<Gateway extends RpcGateway, Success extends RegistrationResponse.Success> { +public abstract class RegisteredRpcConnection<F extends Serializable, G extends RpcGateway, S extends RegistrationResponse.Success> { /** The logger for all log messages of this class. */ protected final Logger log; - /** The target component leaderID, for example the ResourceManager leaderID. */ - private final UUID targetLeaderId; + /** The fencing token of the remote component. */ + private final F fencingToken; /** The target component Address, for example the ResourceManager Address. */ private final String targetAddress; @@ -57,20 +58,20 @@ public abstract class RegisteredRpcConnection<Gateway extends RpcGateway, Succes private final Executor executor; /** The Registration of this RPC connection. */ - private RetryingRegistration<Gateway, Success> pendingRegistration; + private RetryingRegistration<F, G, S> pendingRegistration; /** The gateway to register, it's null until the registration is completed. */ - private volatile Gateway targetGateway; + private volatile G targetGateway; /** Flag indicating that the RPC connection is closed.
*/ private volatile boolean closed; // ------------------------------------------------------------------------ - public RegisteredRpcConnection(Logger log, String targetAddress, UUID targetLeaderId, Executor executor) { + public RegisteredRpcConnection(Logger log, String targetAddress, F fencingToken, Executor executor) { this.log = checkNotNull(log); this.targetAddress = checkNotNull(targetAddress); - this.targetLeaderId = checkNotNull(targetLeaderId); + this.fencingToken = checkNotNull(fencingToken); this.executor = checkNotNull(executor); } @@ -86,10 +87,10 @@ public abstract class RegisteredRpcConnection<Gateway extends RpcGateway, Succes pendingRegistration = checkNotNull(generateRegistration()); pendingRegistration.startRegistration(); - CompletableFuture<Tuple2<Gateway, Success>> future = pendingRegistration.getFuture(); + CompletableFuture<Tuple2<G, S>> future = pendingRegistration.getFuture(); future.whenCompleteAsync( - (Tuple2<Gateway, Success> result, Throwable failure) -> { + (Tuple2<G, S> result, Throwable failure) -> { // this future should only ever fail if there is a bug, not if the registration is declined if (failure != null) { onRegistrationFailure(failure); @@ -103,12 +104,12 @@ public abstract class RegisteredRpcConnection<Gateway extends RpcGateway, Succes /** * This method generate a specific Registration, for example TaskExecutor Registration at the ResourceManager. */ - protected abstract RetryingRegistration<Gateway, Success> generateRegistration(); + protected abstract RetryingRegistration<F, G, S> generateRegistration(); /** * This method handle the Registration Response. */ - protected abstract void onRegistrationSuccess(Success success); + protected abstract void onRegistrationSuccess(S success); /** * This method handle the Registration failure. 
@@ -135,8 +136,8 @@ public abstract class RegisteredRpcConnection<Gateway extends RpcGateway, Succes // Properties // ------------------------------------------------------------------------ - public UUID getTargetLeaderId() { - return targetLeaderId; + public F getTargetLeaderId() { + return fencingToken; } public String getTargetAddress() { @@ -146,7 +147,7 @@ public abstract class RegisteredRpcConnection<Gateway extends RpcGateway, Succes /** * Gets the RegisteredGateway. This returns null until the registration is completed. */ - public Gateway getTargetGateway() { + public G getTargetGateway() { return targetGateway; } @@ -158,7 +159,7 @@ public abstract class RegisteredRpcConnection<Gateway extends RpcGateway, Succes @Override public String toString() { - String connectionInfo = "(ADDRESS: " + targetAddress + " LEADERID: " + targetLeaderId + ")"; + String connectionInfo = "(ADDRESS: " + targetAddress + " FENCINGTOKEN: " + fencingToken + ")"; if (isConnected()) { connectionInfo = "RPC connection to " + targetGateway.getClass().getSimpleName() + " " + connectionInfo; http://git-wip-us.apache.org/repos/asf/flink/blob/0f3de89a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java index 6a18ffd..be30c68 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java @@ -24,7 +24,7 @@ import org.apache.flink.runtime.rpc.RpcService; import org.slf4j.Logger; -import java.util.UUID; +import java.io.Serializable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.TimeUnit; import 
java.util.concurrent.TimeoutException; @@ -43,10 +43,11 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * The registration can be canceled, for example when the target where it tries to register * at looses leader status. * - * @param <Gateway> The type of the gateway to connect to. - * @param <Success> The type of the successful registration responses. + * @param <F> The type of the fencing token + * @param <G> The type of the gateway to connect to. + * @param <S> The type of the successful registration responses. */ -public abstract class RetryingRegistration<Gateway extends RpcGateway, Success extends RegistrationResponse.Success> { +public abstract class RetryingRegistration<F extends Serializable, G extends RpcGateway, S extends RegistrationResponse.Success> { // ------------------------------------------------------------------------ // default configuration values @@ -74,13 +75,13 @@ public abstract class RetryingRegistration<Gateway extends RpcGateway, Success e private final String targetName; - private final Class<Gateway> targetType; + private final Class<G> targetType; private final String targetAddress; - private final UUID leaderId; + private final F fencingToken; - private final CompletableFuture<Tuple2<Gateway, Success>> completionFuture; + private final CompletableFuture<Tuple2<G, S>> completionFuture; private final long initialRegistrationTimeout; @@ -98,10 +99,10 @@ public abstract class RetryingRegistration<Gateway extends RpcGateway, Success e Logger log, RpcService rpcService, String targetName, - Class<Gateway> targetType, + Class<G> targetType, String targetAddress, - UUID leaderId) { - this(log, rpcService, targetName, targetType, targetAddress, leaderId, + F fencingToken) { + this(log, rpcService, targetName, targetType, targetAddress, fencingToken, INITIAL_REGISTRATION_TIMEOUT_MILLIS, MAX_REGISTRATION_TIMEOUT_MILLIS, ERROR_REGISTRATION_DELAY_MILLIS, REFUSED_REGISTRATION_DELAY_MILLIS); } @@ -110,9 +111,9 @@ public 
abstract class RetryingRegistration<Gateway extends RpcGateway, Success e Logger log, RpcService rpcService, String targetName, - Class<Gateway> targetType, + Class<G> targetType, String targetAddress, - UUID leaderId, + F fencingToken, long initialRegistrationTimeout, long maxRegistrationTimeout, long delayOnError, @@ -128,7 +129,7 @@ public abstract class RetryingRegistration<Gateway extends RpcGateway, Success e this.targetName = checkNotNull(targetName); this.targetType = checkNotNull(targetType); this.targetAddress = checkNotNull(targetAddress); - this.leaderId = checkNotNull(leaderId); + this.fencingToken = checkNotNull(fencingToken); this.initialRegistrationTimeout = initialRegistrationTimeout; this.maxRegistrationTimeout = maxRegistrationTimeout; this.delayOnError = delayOnError; @@ -141,7 +142,7 @@ public abstract class RetryingRegistration<Gateway extends RpcGateway, Success e // completion and cancellation // ------------------------------------------------------------------------ - public CompletableFuture<Tuple2<Gateway, Success>> getFuture() { + public CompletableFuture<Tuple2<G, S>> getFuture() { return completionFuture; } @@ -165,7 +166,7 @@ public abstract class RetryingRegistration<Gateway extends RpcGateway, Success e // ------------------------------------------------------------------------ protected abstract CompletableFuture<RegistrationResponse> invokeRegistration( - Gateway gateway, UUID leaderId, long timeoutMillis) throws Exception; + G gateway, F fencingToken, long timeoutMillis) throws Exception; /** * This method resolves the target address to a callable gateway and starts the @@ -175,11 +176,11 @@ public abstract class RetryingRegistration<Gateway extends RpcGateway, Success e public void startRegistration() { try { // trigger resolution of the resource manager address to a callable gateway - CompletableFuture<Gateway> resourceManagerFuture = rpcService.connect(targetAddress, targetType); + CompletableFuture<G> resourceManagerFuture = 
rpcService.connect(targetAddress, targetType); // upon success, start the registration attempts CompletableFuture<Void> resourceManagerAcceptFuture = resourceManagerFuture.thenAcceptAsync( - (Gateway result) -> { + (G result) -> { log.info("Resolved {} address, beginning registration", targetName); register(result, 1, initialRegistrationTimeout); }, @@ -206,7 +207,7 @@ public abstract class RetryingRegistration<Gateway extends RpcGateway, Success e * depending on the result. */ @SuppressWarnings("unchecked") - private void register(final Gateway gateway, final int attempt, final long timeoutMillis) { + private void register(final G gateway, final int attempt, final long timeoutMillis) { // eager check for canceling to avoid some unnecessary work if (canceled) { return; @@ -214,7 +215,7 @@ public abstract class RetryingRegistration<Gateway extends RpcGateway, Success e try { log.info("Registration at {} attempt {} (timeout={}ms)", targetName, attempt, timeoutMillis); - CompletableFuture<RegistrationResponse> registrationFuture = invokeRegistration(gateway, leaderId, timeoutMillis); + CompletableFuture<RegistrationResponse> registrationFuture = invokeRegistration(gateway, fencingToken, timeoutMillis); // if the registration was successful, let the TaskExecutor know CompletableFuture<Void> registrationAcceptFuture = registrationFuture.thenAcceptAsync( @@ -222,7 +223,7 @@ public abstract class RetryingRegistration<Gateway extends RpcGateway, Success e if (!isCanceled()) { if (result instanceof RegistrationResponse.Success) { // registration successful! 
- Success success = (Success) result; + S success = (S) result; completionFuture.complete(Tuple2.of(gateway, success)); } else { @@ -274,7 +275,7 @@ public abstract class RetryingRegistration<Gateway extends RpcGateway, Success e } } - private void registerLater(final Gateway gateway, final int attempt, final long timeoutMillis, long delay) { + private void registerLater(final G gateway, final int attempt, final long timeoutMillis, long delay) { rpcService.scheduleRunnable(new Runnable() { @Override public void run() { http://git-wip-us.apache.org/repos/asf/flink/blob/0f3de89a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java index 2ebf3c1..6d1f22c 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/JobLeaderService.java @@ -210,7 +210,7 @@ public class JobLeaderService { private final JobID jobId; /** Rpc connection to the job leader */ - private RegisteredRpcConnection<JobMasterGateway, JMTMRegistrationSuccess> rpcConnection; + private RegisteredRpcConnection<UUID, JobMasterGateway, JMTMRegistrationSuccess> rpcConnection; /** State of the listener */ private volatile boolean stopped; @@ -299,7 +299,7 @@ public class JobLeaderService { /** * Rpc connection for the job manager <--> task manager connection. 
*/ - private final class JobManagerRegisteredRpcConnection extends RegisteredRpcConnection<JobMasterGateway, JMTMRegistrationSuccess> { + private final class JobManagerRegisteredRpcConnection extends RegisteredRpcConnection<UUID, JobMasterGateway, JMTMRegistrationSuccess> { JobManagerRegisteredRpcConnection( Logger log, @@ -310,7 +310,7 @@ public class JobLeaderService { } @Override - protected RetryingRegistration<JobMasterGateway, JMTMRegistrationSuccess> generateRegistration() { + protected RetryingRegistration<UUID, JobMasterGateway, JMTMRegistrationSuccess> generateRegistration() { return new JobLeaderService.JobManagerRetryingRegistration( LOG, rpcService, @@ -351,7 +351,7 @@ public class JobLeaderService { * Retrying registration for the job manager <--> task manager connection. */ private static final class JobManagerRetryingRegistration - extends RetryingRegistration<JobMasterGateway, JMTMRegistrationSuccess> + extends RetryingRegistration<UUID, JobMasterGateway, JMTMRegistrationSuccess> { private final String taskManagerRpcAddress; http://git-wip-us.apache.org/repos/asf/flink/blob/0f3de89a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java index 4084d67..24eb540 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutorToResourceManagerConnection.java @@ -41,7 +41,7 @@ import static org.apache.flink.util.Preconditions.checkNotNull; * The connection between a TaskExecutor and the ResourceManager. 
*/ public class TaskExecutorToResourceManagerConnection - extends RegisteredRpcConnection<ResourceManagerGateway, TaskExecutorRegistrationSuccess> { + extends RegisteredRpcConnection<UUID, ResourceManagerGateway, TaskExecutorRegistrationSuccess> { private final RpcService rpcService; @@ -79,7 +79,7 @@ public class TaskExecutorToResourceManagerConnection @Override - protected RetryingRegistration<ResourceManagerGateway, TaskExecutorRegistrationSuccess> generateRegistration() { + protected RetryingRegistration<UUID, ResourceManagerGateway, TaskExecutorRegistrationSuccess> generateRegistration() { return new TaskExecutorToResourceManagerConnection.ResourceManagerRegistration( log, rpcService, @@ -127,7 +127,7 @@ public class TaskExecutorToResourceManagerConnection // ------------------------------------------------------------------------ private static class ResourceManagerRegistration - extends RetryingRegistration<ResourceManagerGateway, TaskExecutorRegistrationSuccess> { + extends RetryingRegistration<UUID, ResourceManagerGateway, TaskExecutorRegistrationSuccess> { private final String taskExecutorAddress; http://git-wip-us.apache.org/repos/asf/flink/blob/0f3de89a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java index a454867..19a5756 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RegisteredRpcConnectionTest.java @@ -141,7 +141,7 @@ public class RegisteredRpcConnectionTest extends TestLogger { // test RegisteredRpcConnection // ------------------------------------------------------------------------ - 
private static class TestRpcConnection extends RegisteredRpcConnection<TestRegistrationGateway, TestRegistrationSuccess> { + private static class TestRpcConnection extends RegisteredRpcConnection<UUID, TestRegistrationGateway, TestRegistrationSuccess> { private final RpcService rpcService; @@ -155,7 +155,7 @@ public class RegisteredRpcConnectionTest extends TestLogger { } @Override - protected RetryingRegistration<TestRegistrationGateway, RetryingRegistrationTest.TestRegistrationSuccess> generateRegistration() { + protected RetryingRegistration<UUID, TestRegistrationGateway, RetryingRegistrationTest.TestRegistrationSuccess> generateRegistration() { return new RetryingRegistrationTest.TestRetryingRegistration(rpcService, getTargetAddress(), getTargetLeaderId()); } http://git-wip-us.apache.org/repos/asf/flink/blob/0f3de89a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java index da992bb..ac0dbc5 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java @@ -320,7 +320,7 @@ public class RetryingRegistrationTest extends TestLogger { } } - static class TestRetryingRegistration extends RetryingRegistration<TestRegistrationGateway, TestRegistrationSuccess> { + static class TestRetryingRegistration extends RetryingRegistration<UUID, TestRegistrationGateway, TestRegistrationSuccess> { // we use shorter timeouts here to speed up the tests static final long INITIAL_TIMEOUT = 20;