Repository: flink
Updated Branches:
  refs/heads/master c6243b8b1 -> 9b0ba7ba3


[FLINK-7651] [flip-6] Delay RetryingRegistration in case of connection error

Similar to a registration error, we should also delay retrying the registration
in case of a connection error, which can happen if the remote endpoint has not
been started yet.

This closes #4686.


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/9b0ba7ba
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/9b0ba7ba
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/9b0ba7ba

Branch: refs/heads/master
Commit: 9b0ba7ba3fdcf63c8ba21545772fbb950383026e
Parents: c6243b8
Author: Till Rohrmann <[email protected]>
Authored: Wed Sep 20 12:04:58 2017 +0200
Committer: Till Rohrmann <[email protected]>
Committed: Thu Sep 21 09:32:23 2017 +0200

----------------------------------------------------------------------
 .../registration/RetryingRegistration.java      | 24 ++++++++++++++++----
 .../registration/RetryingRegistrationTest.java  | 23 ++++++++++++++++---
 2 files changed, 39 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/9b0ba7ba/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
index ce4a798..378ac6a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/registration/RetryingRegistration.java
@@ -151,6 +151,7 @@ public abstract class RetryingRegistration<F extends Serializable, G extends Rpc
         * Cancels the registration procedure.
         */
        public void cancel() {
+               completionFuture.cancel(false);
                canceled = true;
        }
 
@@ -175,6 +176,11 @@ public abstract class RetryingRegistration<F extends Serializable, G extends Rpc
         */
        @SuppressWarnings("unchecked")
        public void startRegistration() {
+               if (canceled) {
+                       // we already got canceled
+                       return;
+               }
+
                try {
                        // trigger resolution of the resource manager address 
to a callable gateway
                        final CompletableFuture<G> resourceManagerFuture;
@@ -199,16 +205,17 @@ public abstract class RetryingRegistration<F extends Serializable, G extends Rpc
                        // upon failure, retry, unless this is cancelled
                        resourceManagerAcceptFuture.whenCompleteAsync(
                                (Void v, Throwable failure) -> {
-                                       if (failure != null && !isCanceled()) {
-                                               log.warn("Could not resolve {} 
address {}, retrying...", targetName, targetAddress, failure);
-                                               startRegistration();
+                                       if (failure != null && !canceled) {
+                                               log.warn("Could not resolve {} 
address {}, retrying in {} ms", targetName, targetAddress, delayOnError, 
failure);
+
+                                               
startRegistrationLater(delayOnError);
                                        }
                                },
                                rpcService.getExecutor());
                }
                catch (Throwable t) {
-                       cancel();
                        completionFuture.completeExceptionally(t);
+                       cancel();
                }
        }
 
@@ -280,8 +287,8 @@ public abstract class RetryingRegistration<F extends Serializable, G extends Rpc
                                rpcService.getExecutor());
                }
                catch (Throwable t) {
-                       cancel();
                        completionFuture.completeExceptionally(t);
+                       cancel();
                }
        }
 
@@ -293,4 +300,11 @@ public abstract class RetryingRegistration<F extends Serializable, G extends Rpc
                        }
                }, delay, TimeUnit.MILLISECONDS);
        }
+
+       private void startRegistrationLater(final long delay) {
+               rpcService.scheduleRunnable(
+                       this::startRegistration,
+                       delay,
+                       TimeUnit.MILLISECONDS);
+       }
 }

http://git-wip-us.apache.org/repos/asf/flink/blob/9b0ba7ba/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
index ac0dbc5..7fc6897 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/registration/RetryingRegistrationTest.java
@@ -22,16 +22,17 @@ import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.rpc.RpcService;
 import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.util.TestLogger;
 
 import org.junit.Test;
+import org.mockito.invocation.InvocationOnMock;
 import org.slf4j.LoggerFactory;
 
 import java.util.UUID;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.TimeoutException;
 
@@ -115,7 +116,7 @@ public class RetryingRegistrationTest extends TestLogger {
                final String testId = "laissez les bon temps roulez";
                final UUID leaderId = UUID.randomUUID();
 
-               ExecutorService executor = Executors.newCachedThreadPool();
+               ExecutorService executor = TestingUtils.defaultExecutor();
                TestRegistrationGateway testGateway = new 
TestRegistrationGateway(new TestRegistrationSuccess(testId));
 
                try {
@@ -126,20 +127,36 @@ public class RetryingRegistrationTest extends TestLogger {
                                        
CompletableFuture.completedFuture(testGateway)                         // 
second connection attempt succeeds
                        );
                        when(rpc.getExecutor()).thenReturn(executor);
+                       when(rpc.scheduleRunnable(any(Runnable.class), 
anyLong(), any(TimeUnit.class))).thenAnswer(
+                               (InvocationOnMock invocation) -> {
+                                       final Runnable runnable = 
invocation.getArgumentAt(0, Runnable.class);
+                                       final long delay = 
invocation.getArgumentAt(1, Long.class);
+                                       final TimeUnit timeUnit = 
invocation.getArgumentAt(2, TimeUnit.class);
+                                       return 
TestingUtils.defaultScheduledExecutor().schedule(runnable, delay, timeUnit);
+                               });
 
                        TestRetryingRegistration registration = new 
TestRetryingRegistration(rpc, "foobar address", leaderId);
+
+                       long start = System.currentTimeMillis();
+
                        registration.startRegistration();
 
                        Tuple2<TestRegistrationGateway, 
TestRegistrationSuccess> success =
                                registration.getFuture().get(10L, 
TimeUnit.SECONDS);
 
+                       // measure the duration of the registration --> should 
be longer than the error delay
+                       long duration = System.currentTimeMillis() - start;
+
+                       assertTrue(
+                               "The registration should have failed the first 
time. Thus the duration should be longer than at least a single error delay.",
+                               duration > 
TestRetryingRegistration.DELAY_ON_ERROR);
+
                        // validate correct invocation and result
                        assertEquals(testId, success.f1.getCorrelationId());
                        assertEquals(leaderId, 
testGateway.getInvocations().take().leaderId());
                }
                finally {
                        testGateway.stop();
-                       executor.shutdown();
                }
        }
 

Reply via email to