This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 2284f777ecd3b62b412bd0fdb9dbcf492314c589
Author: Till Rohrmann <[email protected]>
AuthorDate: Mon May 13 15:45:03 2019 +0200

    [FLINK-12260][tests] Speed up ResourceManagerTaskExecutorTest#testDelayedRegisterTaskExecutor
    
    Use latches instead of timeouts/sleeps to test problematic thread interleaving.
    
    This closes #8415.
---
 .../ResourceManagerTaskExecutorTest.java           | 38 +++++++++++++++-------
 .../flink/runtime/rpc/TestingRpcService.java       | 29 +++++++----------
 2 files changed, 39 insertions(+), 28 deletions(-)

diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
index fcb92af..63d8245 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.resourcemanager;
 
 import org.apache.flink.api.common.time.Time;
+import org.apache.flink.core.testutils.OneShotLatch;
 import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
 import org.apache.flink.runtime.clusterframework.types.SlotID;
@@ -45,6 +46,7 @@ import org.apache.flink.runtime.taskexecutor.TaskExecutor;
 import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess;
 import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway;
 import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 import org.apache.flink.runtime.util.TestingFatalErrorHandler;
 import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
@@ -217,23 +219,36 @@ public class ResourceManagerTaskExecutorTest extends 
TestLogger {
         */
        @Test
        public void testDelayedRegisterTaskExecutor() throws Exception {
-               // additional delay over RPC timeout
-               // use a value much smaller (< 1/2) than heartbeat timeout not 
to hit the timeout on delay for race test below
-               final long additionalDelayMillis = HEARTBEAT_TIMEOUT / 5;
+               final Time fastTimeout = Time.milliseconds(1L);
                try {
-                       // first registration is with connection delay longer 
than timeout expecting timeout and then retry
-                       
rpcService.setConnectionDelayMillis(TIMEOUT.toMilliseconds() + 
additionalDelayMillis);
+                       final OneShotLatch startConnection = new OneShotLatch();
+                       final OneShotLatch finishConnection = new 
OneShotLatch();
+
+                       // first registration is with blocking connection
+                       rpcService.setRpcGatewayFutureFunction(rpcGateway ->
+                               CompletableFuture.supplyAsync(
+                                       () -> {
+                                               startConnection.trigger();
+                                               try {
+                                                       
finishConnection.await();
+                                               } catch (InterruptedException 
ignored) {}
+                                               return rpcGateway;
+                                       },
+                                       TestingUtils.defaultExecutor()));
+
                        CompletableFuture<RegistrationResponse> firstFuture =
-                               
rmGateway.registerTaskExecutor(taskExecutorGateway.getAddress(), 
taskExecutorResourceID, dataPort, hardwareDescription, TIMEOUT);
+                               
rmGateway.registerTaskExecutor(taskExecutorGateway.getAddress(), 
taskExecutorResourceID, dataPort, hardwareDescription, fastTimeout);
                        try {
                                firstFuture.get();
                                fail("Should have failed because connection to 
taskmanager is delayed beyond timeout");
                        } catch (Exception e) {
-                               
assertTrue(ExceptionUtils.stripExecutionException(e) instanceof 
AskTimeoutException);
+                               
assertThat(ExceptionUtils.stripExecutionException(e), 
instanceOf(AskTimeoutException.class));
                        }
 
+                       startConnection.await();
+
                        // second registration after timeout is with no delay, 
expecting it to be succeeded
-                       rpcService.setConnectionDelayMillis(0);
+                       rpcService.resetRpcGatewayFutureFunction();
                        CompletableFuture<RegistrationResponse> secondFuture =
                                
rmGateway.registerTaskExecutor(taskExecutorGateway.getAddress(), 
taskExecutorResourceID, dataPort, hardwareDescription, TIMEOUT);
                        RegistrationResponse response = secondFuture.get();
@@ -244,8 +259,9 @@ public class ResourceManagerTaskExecutorTest extends 
TestLogger {
                        rmGateway.sendSlotReport(taskExecutorResourceID,
                                ((TaskExecutorRegistrationSuccess) 
response).getRegistrationId(), slotReport, TIMEOUT).get();
 
-                       // wait enough for the first registration's connection 
delay to be over letting its remaining part go through
-                       Thread.sleep(additionalDelayMillis * 2);
+                       // let the remaining part of the first registration 
proceed
+                       finishConnection.trigger();
+                       Thread.sleep(1L);
 
                        // verify that the latest registration is valid not 
being unregistered by the delayed one
                        final TaskManagerInfo taskManagerInfo = 
rmGateway.requestTaskManagerInfo(
@@ -254,7 +270,7 @@ public class ResourceManagerTaskExecutorTest extends 
TestLogger {
                        assertThat(taskManagerInfo.getResourceId(), 
equalTo(taskExecutorResourceID));
                        assertThat(taskManagerInfo.getNumberSlots(), 
equalTo(1));
                } finally {
-                       rpcService.setConnectionDelayMillis(0L);
+                       rpcService.resetRpcGatewayFutureFunction();
                }
        }
 
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
index 6d12266..f11269d 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
@@ -23,11 +23,11 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;
-import org.apache.flink.runtime.testingUtils.TestingUtils;
 
 import java.io.Serializable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Function;
 
 import static org.apache.flink.util.Preconditions.checkNotNull;
 
@@ -53,11 +53,12 @@ import static 
org.apache.flink.util.Preconditions.checkNotNull;
  */
 public class TestingRpcService extends AkkaRpcService {
 
+       private static final Function<RpcGateway, 
CompletableFuture<RpcGateway>> DEFAULT_RPC_GATEWAY_FUTURE_FUNCTION = 
CompletableFuture::completedFuture;
+
        /** Map of pre-registered connections. */
        private final ConcurrentHashMap<String, RpcGateway> 
registeredConnections;
 
-       /** Artificial delay on connection */
-       private long connectionDelayMillis;
+       private volatile Function<RpcGateway, CompletableFuture<RpcGateway>> 
rpcGatewayFutureFunction = DEFAULT_RPC_GATEWAY_FUTURE_FUNCTION;
 
        /**
         * Creates a new {@code TestingRpcService}.
@@ -103,19 +104,9 @@ public class TestingRpcService extends AkkaRpcService {
                }
        }
 
+       @SuppressWarnings("unchecked")
        private <C extends RpcGateway> CompletableFuture<C> 
getRpcGatewayFuture(C gateway) {
-               if (connectionDelayMillis <= 0) {
-                       return CompletableFuture.completedFuture(gateway);
-               } else {
-                       return CompletableFuture.supplyAsync(
-                               () -> {
-                                       try {
-                                               
Thread.sleep(connectionDelayMillis);
-                                       } catch (InterruptedException ignored) 
{}
-                                       return gateway;
-                               },
-                               TestingUtils.defaultExecutor());
-               }
+               return (CompletableFuture<C>) 
rpcGatewayFutureFunction.apply(gateway);
        }
 
        @Override
@@ -159,7 +150,11 @@ public class TestingRpcService extends AkkaRpcService {
                registeredConnections.clear();
        }
 
-       public void setConnectionDelayMillis(long connectionDelayMillis) {
-               this.connectionDelayMillis = connectionDelayMillis;
+       public void resetRpcGatewayFutureFunction() {
+               rpcGatewayFutureFunction = DEFAULT_RPC_GATEWAY_FUTURE_FUNCTION;
+       }
+
+       public void setRpcGatewayFutureFunction(Function<RpcGateway, 
CompletableFuture<RpcGateway>> rpcGatewayFutureFunction) {
+               this.rpcGatewayFutureFunction = rpcGatewayFutureFunction;
        }
 }

Reply via email to