Repository: flink Updated Branches: refs/heads/master c6243b8b1 -> 9b0ba7ba3
[FLINK-7651] [flip-6] Delay RetryingRegistration in case of connection error. Similar to a registration error, we should also delay the retrying registration in case of a connection error, which could happen if the remote endpoint has not been started yet. This closes #4686. Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9b0ba7ba Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9b0ba7ba Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9b0ba7ba Branch: refs/heads/master Commit: 9b0ba7ba3fdcf63c8ba21545772fbb950383026e Parents: c6243b8 Author: Till Rohrmann <[email protected]> Authored: Wed Sep 20 12:04:58 2017 +0200 Committer: Till Rohrmann <[email protected]> Committed: Thu Sep 21 09:32:23 2017 +0200 ---------------------------------------------------------------------- .../registration/RetryingRegistration.java | 24 ++++++++++++++++---- .../registration/RetryingRegistrationTest.java | 23 ++++++++++++++++--- 2 files changed, 39 insertions(+), 8 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/9b0ba7ba/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java index ce4a798..378ac6a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java @@ -151,6 +151,7 @@ public abstract class RetryingRegistration<F extends Serializable, G extends Rpc * Cancels the registration procedure. 
*/ public void cancel() { + completionFuture.cancel(false); canceled = true; } @@ -175,6 +176,11 @@ public abstract class RetryingRegistration<F extends Serializable, G extends Rpc */ @SuppressWarnings("unchecked") public void startRegistration() { + if (canceled) { + // we already got canceled + return; + } + try { // trigger resolution of the resource manager address to a callable gateway final CompletableFuture<G> resourceManagerFuture; @@ -199,16 +205,17 @@ public abstract class RetryingRegistration<F extends Serializable, G extends Rpc // upon failure, retry, unless this is cancelled resourceManagerAcceptFuture.whenCompleteAsync( (Void v, Throwable failure) -> { - if (failure != null && !isCanceled()) { - log.warn("Could not resolve {} address {}, retrying...", targetName, targetAddress, failure); - startRegistration(); + if (failure != null && !canceled) { + log.warn("Could not resolve {} address {}, retrying in {} ms", targetName, targetAddress, delayOnError, failure); + + startRegistrationLater(delayOnError); } }, rpcService.getExecutor()); } catch (Throwable t) { - cancel(); completionFuture.completeExceptionally(t); + cancel(); } } @@ -280,8 +287,8 @@ public abstract class RetryingRegistration<F extends Serializable, G extends Rpc rpcService.getExecutor()); } catch (Throwable t) { - cancel(); completionFuture.completeExceptionally(t); + cancel(); } } @@ -293,4 +300,11 @@ public abstract class RetryingRegistration<F extends Serializable, G extends Rpc } }, delay, TimeUnit.MILLISECONDS); } + + private void startRegistrationLater(final long delay) { + rpcService.scheduleRunnable( + this::startRegistration, + delay, + TimeUnit.MILLISECONDS); + } } http://git-wip-us.apache.org/repos/asf/flink/blob/9b0ba7ba/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java ---------------------------------------------------------------------- diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java index ac0dbc5..7fc6897 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java @@ -22,16 +22,17 @@ import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.rpc.RpcService; import org.apache.flink.runtime.rpc.TestingRpcService; +import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.util.TestLogger; import org.junit.Test; +import org.mockito.invocation.InvocationOnMock; import org.slf4j.LoggerFactory; import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; @@ -115,7 +116,7 @@ public class RetryingRegistrationTest extends TestLogger { final String testId = "laissez les bon temps roulez"; final UUID leaderId = UUID.randomUUID(); - ExecutorService executor = Executors.newCachedThreadPool(); + ExecutorService executor = TestingUtils.defaultExecutor(); TestRegistrationGateway testGateway = new TestRegistrationGateway(new TestRegistrationSuccess(testId)); try { @@ -126,20 +127,36 @@ public class RetryingRegistrationTest extends TestLogger { CompletableFuture.completedFuture(testGateway) // second connection attempt succeeds ); when(rpc.getExecutor()).thenReturn(executor); + when(rpc.scheduleRunnable(any(Runnable.class), anyLong(), any(TimeUnit.class))).thenAnswer( + (InvocationOnMock invocation) -> { + final Runnable runnable = invocation.getArgumentAt(0, Runnable.class); + final long delay = 
invocation.getArgumentAt(1, Long.class); + final TimeUnit timeUnit = invocation.getArgumentAt(2, TimeUnit.class); + return TestingUtils.defaultScheduledExecutor().schedule(runnable, delay, timeUnit); + }); TestRetryingRegistration registration = new TestRetryingRegistration(rpc, "foobar address", leaderId); + + long start = System.currentTimeMillis(); + registration.startRegistration(); Tuple2<TestRegistrationGateway, TestRegistrationSuccess> success = registration.getFuture().get(10L, TimeUnit.SECONDS); + // measure the duration of the registration --> should be longer than the error delay + long duration = System.currentTimeMillis() - start; + + assertTrue( + "The registration should have failed the first time. Thus the duration should be longer than at least a single error delay.", + duration > TestRetryingRegistration.DELAY_ON_ERROR); + // validate correct invocation and result assertEquals(testId, success.f1.getCorrelationId()); assertEquals(leaderId, testGateway.getInvocations().take().leaderId()); } finally { testGateway.stop(); - executor.shutdown(); } }
