This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 2284f777ecd3b62b412bd0fdb9dbcf492314c589 Author: Till Rohrmann <[email protected]> AuthorDate: Mon May 13 15:45:03 2019 +0200 [FLINK-12260][tests] Speed up ResourceManagerTaskExecutorTest#testDelayedRegisterTaskExecutor Use latches instead of timeouts/sleeps to test problematic thread interleaving. This closes #8415. --- .../ResourceManagerTaskExecutorTest.java | 38 +++++++++++++++------- .../flink/runtime/rpc/TestingRpcService.java | 29 +++++++---------- 2 files changed, 39 insertions(+), 28 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java index fcb92af..63d8245 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.resourcemanager; import org.apache.flink.api.common.time.Time; +import org.apache.flink.core.testutils.OneShotLatch; import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.clusterframework.types.ResourceProfile; import org.apache.flink.runtime.clusterframework.types.SlotID; @@ -45,6 +46,7 @@ import org.apache.flink.runtime.taskexecutor.TaskExecutor; import org.apache.flink.runtime.taskexecutor.TaskExecutorRegistrationSuccess; import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway; import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder; +import org.apache.flink.runtime.testingUtils.TestingUtils; import org.apache.flink.runtime.util.TestingFatalErrorHandler; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; @@ -217,23 +219,36 @@ public class ResourceManagerTaskExecutorTest extends TestLogger { */ @Test public void testDelayedRegisterTaskExecutor() throws Exception { - // additional delay over RPC timeout - // use a value much smaller (< 1/2) than heartbeat timeout not to hit the timeout on delay for race test below - final long additionalDelayMillis = HEARTBEAT_TIMEOUT / 5; + final Time fastTimeout = Time.milliseconds(1L); try { - // first registration is with connection delay longer than timeout expecting timeout and then retry - rpcService.setConnectionDelayMillis(TIMEOUT.toMilliseconds() + additionalDelayMillis); + final OneShotLatch startConnection = new OneShotLatch(); + final OneShotLatch finishConnection = new OneShotLatch(); + + // first registration is with blocking connection + rpcService.setRpcGatewayFutureFunction(rpcGateway -> + CompletableFuture.supplyAsync( + () -> { + startConnection.trigger(); + try { + finishConnection.await(); + } catch (InterruptedException ignored) {} + return rpcGateway; + }, + TestingUtils.defaultExecutor())); + CompletableFuture<RegistrationResponse> firstFuture = - rmGateway.registerTaskExecutor(taskExecutorGateway.getAddress(), taskExecutorResourceID, dataPort, hardwareDescription, TIMEOUT); + rmGateway.registerTaskExecutor(taskExecutorGateway.getAddress(), taskExecutorResourceID, dataPort, hardwareDescription, fastTimeout); try { firstFuture.get(); fail("Should have failed because connection to taskmanager is delayed beyond timeout"); } catch (Exception e) { - assertTrue(ExceptionUtils.stripExecutionException(e) instanceof AskTimeoutException); + assertThat(ExceptionUtils.stripExecutionException(e), instanceOf(AskTimeoutException.class)); } + startConnection.await(); + // second registration after timeout is with no delay, expecting it to be succeeded - rpcService.setConnectionDelayMillis(0); + rpcService.resetRpcGatewayFutureFunction(); CompletableFuture<RegistrationResponse> secondFuture = rmGateway.registerTaskExecutor(taskExecutorGateway.getAddress(), taskExecutorResourceID, dataPort, hardwareDescription, TIMEOUT); RegistrationResponse response = secondFuture.get(); @@ -244,8 +259,9 @@ public class ResourceManagerTaskExecutorTest extends TestLogger { rmGateway.sendSlotReport(taskExecutorResourceID, ((TaskExecutorRegistrationSuccess) response).getRegistrationId(), slotReport, TIMEOUT).get(); - // wait enough for the first registration's connection delay to be over letting its remaining part go through - Thread.sleep(additionalDelayMillis * 2); + // let the remaining part of the first registration proceed + finishConnection.trigger(); + Thread.sleep(1L); // verify that the latest registration is valid not being unregistered by the delayed one final TaskManagerInfo taskManagerInfo = rmGateway.requestTaskManagerInfo( @@ -254,7 +270,7 @@ public class ResourceManagerTaskExecutorTest extends TestLogger { assertThat(taskManagerInfo.getResourceId(), equalTo(taskExecutorResourceID)); assertThat(taskManagerInfo.getNumberSlots(), equalTo(1)); } finally { - rpcService.setConnectionDelayMillis(0L); + rpcService.resetRpcGatewayFutureFunction(); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java index 6d12266..f11269d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java @@ -23,11 +23,11 @@ import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.rpc.akka.AkkaRpcService; import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration; -import org.apache.flink.runtime.testingUtils.TestingUtils; import java.io.Serializable; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; import static org.apache.flink.util.Preconditions.checkNotNull; @@ -53,11 +53,12 @@ import static org.apache.flink.util.Preconditions.checkNotNull; */ public class TestingRpcService extends AkkaRpcService { + private static final Function<RpcGateway, CompletableFuture<RpcGateway>> DEFAULT_RPC_GATEWAY_FUTURE_FUNCTION = CompletableFuture::completedFuture; + /** Map of pre-registered connections. */ private final ConcurrentHashMap<String, RpcGateway> registeredConnections; - /** Artificial delay on connection */ - private long connectionDelayMillis; + private volatile Function<RpcGateway, CompletableFuture<RpcGateway>> rpcGatewayFutureFunction = DEFAULT_RPC_GATEWAY_FUTURE_FUNCTION; /** * Creates a new {@code TestingRpcService}. @@ -103,19 +104,9 @@ public class TestingRpcService extends AkkaRpcService { } } + @SuppressWarnings("unchecked") private <C extends RpcGateway> CompletableFuture<C> getRpcGatewayFuture(C gateway) { - if (connectionDelayMillis <= 0) { - return CompletableFuture.completedFuture(gateway); - } else { - return CompletableFuture.supplyAsync( - () -> { - try { - Thread.sleep(connectionDelayMillis); - } catch (InterruptedException ignored) {} - return gateway; - }, - TestingUtils.defaultExecutor()); - } + return (CompletableFuture<C>) rpcGatewayFutureFunction.apply(gateway); } @Override @@ -159,7 +150,11 @@ public class TestingRpcService extends AkkaRpcService { registeredConnections.clear(); } - public void setConnectionDelayMillis(long connectionDelayMillis) { - this.connectionDelayMillis = connectionDelayMillis; + public void resetRpcGatewayFutureFunction() { + rpcGatewayFutureFunction = DEFAULT_RPC_GATEWAY_FUTURE_FUNCTION; + } + + public void setRpcGatewayFutureFunction(Function<RpcGateway, CompletableFuture<RpcGateway>> rpcGatewayFutureFunction) { + this.rpcGatewayFutureFunction = rpcGatewayFutureFunction; } }
