This is an automated email from the ASF dual-hosted git repository. trohrmann pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 07773d0d9251d6ad8c1770de985d33be8e72b032 Author: Hwanju Kim <[email protected]> AuthorDate: Thu May 9 17:03:37 2019 -0700 [FLINK-12260] Slot allocation failure by taskmanager registration timeout and race TaskExecutor registration has asynchronous process, which allows a next retry after timeout to be processed first ahead of earlier request. Such delayed timed-out request can accidentally unregister a valid task manager, whose slots are permanently not reported to job manager. This patch introduces ongoing task executor futures to prevent such race. --- .../runtime/resourcemanager/ResourceManager.java | 27 +++++++---- .../ResourceManagerTaskExecutorTest.java | 52 +++++++++++++++++++++- .../flink/runtime/rpc/TestingRpcService.java | 27 ++++++++++- 3 files changed, 95 insertions(+), 11 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java index 12860a8..03e1d87 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java @@ -114,6 +114,9 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable> /** All currently registered TaskExecutors with there framework specific worker information. */ private final Map<ResourceID, WorkerRegistration<WorkerType>> taskExecutors; + /** Ongoing registration of TaskExecutors per resource ID. */ + private final Map<ResourceID, CompletableFuture<TaskExecutorGateway>> taskExecutorGatewayFutures; + /** High availability services for leader retrieval and election. 
*/ private final HighAvailabilityServices highAvailabilityServices; @@ -186,6 +189,7 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable> this.jobManagerRegistrations = new HashMap<>(4); this.jmResourceIdRegistrations = new HashMap<>(4); this.taskExecutors = new HashMap<>(8); + this.taskExecutorGatewayFutures = new HashMap<>(8); } @@ -371,18 +375,25 @@ public abstract class ResourceManager<WorkerType extends ResourceIDRetrievable> final Time timeout) { CompletableFuture<TaskExecutorGateway> taskExecutorGatewayFuture = getRpcService().connect(taskExecutorAddress, TaskExecutorGateway.class); + taskExecutorGatewayFutures.put(taskExecutorResourceId, taskExecutorGatewayFuture); return taskExecutorGatewayFuture.handleAsync( (TaskExecutorGateway taskExecutorGateway, Throwable throwable) -> { - if (throwable != null) { - return new RegistrationResponse.Decline(throwable.getMessage()); + if (taskExecutorGatewayFuture == taskExecutorGatewayFutures.get(taskExecutorResourceId)) { + taskExecutorGatewayFutures.remove(taskExecutorResourceId); + if (throwable != null) { + return new RegistrationResponse.Decline(throwable.getMessage()); + } else { + return registerTaskExecutorInternal( + taskExecutorGateway, + taskExecutorAddress, + taskExecutorResourceId, + dataPort, + hardwareDescription); + } } else { - return registerTaskExecutorInternal( - taskExecutorGateway, - taskExecutorAddress, - taskExecutorResourceId, - dataPort, - hardwareDescription); + log.info("Ignoring outdated TaskExecutorGateway connection."); + return new RegistrationResponse.Decline("Decline outdated task executor registration."); } }, getMainThreadExecutor()); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java index 95b3d08..fcb92af 100644 --- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java @@ -50,6 +50,7 @@ import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.TestLogger; +import akka.pattern.AskTimeoutException; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -79,6 +80,8 @@ public class ResourceManagerTaskExecutorTest extends TestLogger { private static final Time TIMEOUT = Time.seconds(10L); + private static final long HEARTBEAT_TIMEOUT = 5000; + private static TestingRpcService rpcService; private TestingTaskExecutorGateway taskExecutorGateway; @@ -133,7 +136,7 @@ public class ResourceManagerTaskExecutorTest extends TestLogger { private StandaloneResourceManager createAndStartResourceManager(LeaderElectionService rmLeaderElectionService, FatalErrorHandler fatalErrorHandler) throws Exception { TestingHighAvailabilityServices highAvailabilityServices = new TestingHighAvailabilityServices(); - HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, 1000L); + HeartbeatServices heartbeatServices = new HeartbeatServices(1000L, HEARTBEAT_TIMEOUT); highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService); SlotManager slotManager = SlotManagerBuilder.newBuilder() @@ -209,6 +212,53 @@ public class ResourceManagerTaskExecutorTest extends TestLogger { } /** + * Test delayed registration of task executor where the delay is introduced during connection from resource manager + * to the registering task executor. 
+ */ + @Test + public void testDelayedRegisterTaskExecutor() throws Exception { + // additional delay over RPC timeout + // use a value much smaller (< 1/2) than heartbeat timeout not to hit the timeout on delay for race test below + final long additionalDelayMillis = HEARTBEAT_TIMEOUT / 5; + try { + // first registration is with connection delay longer than timeout expecting timeout and then retry + rpcService.setConnectionDelayMillis(TIMEOUT.toMilliseconds() + additionalDelayMillis); + CompletableFuture<RegistrationResponse> firstFuture = + rmGateway.registerTaskExecutor(taskExecutorGateway.getAddress(), taskExecutorResourceID, dataPort, hardwareDescription, TIMEOUT); + try { + firstFuture.get(); + fail("Should have failed because connection to taskmanager is delayed beyond timeout"); + } catch (Exception e) { + assertTrue(ExceptionUtils.stripExecutionException(e) instanceof AskTimeoutException); + } + + // second registration after timeout is with no delay, expecting it to be succeeded + rpcService.setConnectionDelayMillis(0); + CompletableFuture<RegistrationResponse> secondFuture = + rmGateway.registerTaskExecutor(taskExecutorGateway.getAddress(), taskExecutorResourceID, dataPort, hardwareDescription, TIMEOUT); + RegistrationResponse response = secondFuture.get(); + assertTrue(response instanceof TaskExecutorRegistrationSuccess); + + // on success, send slot report for taskmanager registration + final SlotReport slotReport = new SlotReport(new SlotStatus(new SlotID(taskExecutorResourceID, 0), ResourceProfile.UNKNOWN)); + rmGateway.sendSlotReport(taskExecutorResourceID, + ((TaskExecutorRegistrationSuccess) response).getRegistrationId(), slotReport, TIMEOUT).get(); + + // wait enough for the first registration's connection delay to be over letting its remaining part go through + Thread.sleep(additionalDelayMillis * 2); + + // verify that the latest registration is valid not being unregistered by the delayed one + final TaskManagerInfo taskManagerInfo = 
rmGateway.requestTaskManagerInfo( + taskExecutorResourceID, + TIMEOUT).get(); + assertThat(taskManagerInfo.getResourceId(), equalTo(taskExecutorResourceID)); + assertThat(taskManagerInfo.getNumberSlots(), equalTo(1)); + } finally { + rpcService.setConnectionDelayMillis(0L); + } + } + + /** * Tests that a TaskExecutor can disconnect from the {@link ResourceManager}. */ @Test diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java index f42f09c..6d12266 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java @@ -23,6 +23,7 @@ import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.runtime.concurrent.FutureUtils; import org.apache.flink.runtime.rpc.akka.AkkaRpcService; import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration; +import org.apache.flink.runtime.testingUtils.TestingUtils; import java.io.Serializable; import java.util.concurrent.CompletableFuture; @@ -55,6 +56,9 @@ public class TestingRpcService extends AkkaRpcService { /** Map of pre-registered connections. */ private final ConcurrentHashMap<String, RpcGateway> registeredConnections; + /** Artificial delay on connection */ + private long connectionDelayMillis; + /** * Creates a new {@code TestingRpcService}. 
*/ @@ -99,6 +103,21 @@ public class TestingRpcService extends AkkaRpcService { } } + private <C extends RpcGateway> CompletableFuture<C> getRpcGatewayFuture(C gateway) { + if (connectionDelayMillis <= 0) { + return CompletableFuture.completedFuture(gateway); + } else { + return CompletableFuture.supplyAsync( + () -> { + try { + Thread.sleep(connectionDelayMillis); + } catch (InterruptedException ignored) {} + return gateway; + }, + TestingUtils.defaultExecutor()); + } + } + @Override public <C extends RpcGateway> CompletableFuture<C> connect(String address, Class<C> clazz) { RpcGateway gateway = registeredConnections.get(address); @@ -107,7 +126,7 @@ public class TestingRpcService extends AkkaRpcService { if (clazz.isAssignableFrom(gateway.getClass())) { @SuppressWarnings("unchecked") C typedGateway = (C) gateway; - return CompletableFuture.completedFuture(typedGateway); + return getRpcGatewayFuture(typedGateway); } else { return FutureUtils.completedExceptionally(new Exception("Gateway registered under " + address + " is not of type " + clazz)); } @@ -127,7 +146,7 @@ public class TestingRpcService extends AkkaRpcService { if (clazz.isAssignableFrom(gateway.getClass())) { @SuppressWarnings("unchecked") C typedGateway = (C) gateway; - return CompletableFuture.completedFuture(typedGateway); + return getRpcGatewayFuture(typedGateway); } else { return FutureUtils.completedExceptionally(new Exception("Gateway registered under " + address + " is not of type " + clazz)); } @@ -139,4 +158,8 @@ public class TestingRpcService extends AkkaRpcService { public void clearGateways() { registeredConnections.clear(); } + + public void setConnectionDelayMillis(long connectionDelayMillis) { + this.connectionDelayMillis = connectionDelayMillis; + } }
