This is an automated email from the ASF dual-hosted git repository.

trohrmann pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 07773d0d9251d6ad8c1770de985d33be8e72b032
Author: Hwanju Kim <[email protected]>
AuthorDate: Thu May 9 17:03:37 2019 -0700

    [FLINK-12260] Slot allocation failure by taskmanager registration timeout 
and race
    
    TaskExecutor registration is an asynchronous process, which allows a
    retry issued after a timeout to be processed ahead of the earlier request.
    Such a delayed, timed-out request can accidentally unregister a valid task
    manager, whose slots are then never reported to the job manager. This
    patch tracks the ongoing task executor connection futures per resource ID
    and declines outdated registrations to prevent such a race.
---
 .../runtime/resourcemanager/ResourceManager.java   | 27 +++++++----
 .../ResourceManagerTaskExecutorTest.java           | 52 +++++++++++++++++++++-
 .../flink/runtime/rpc/TestingRpcService.java       | 27 ++++++++++-
 3 files changed, 95 insertions(+), 11 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
index 12860a8..03e1d87 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/ResourceManager.java
@@ -114,6 +114,9 @@ public abstract class ResourceManager<WorkerType extends 
ResourceIDRetrievable>
        /** All currently registered TaskExecutors with there framework 
specific worker information. */
        private final Map<ResourceID, WorkerRegistration<WorkerType>> 
taskExecutors;
 
+       /** Latest pending TaskExecutor gateway connection per resource ID; only this future may complete a registration. */
+       private final Map<ResourceID, CompletableFuture<TaskExecutorGateway>> 
taskExecutorGatewayFutures;
+
        /** High availability services for leader retrieval and election. */
        private final HighAvailabilityServices highAvailabilityServices;
 
@@ -186,6 +189,7 @@ public abstract class ResourceManager<WorkerType extends 
ResourceIDRetrievable>
                this.jobManagerRegistrations = new HashMap<>(4);
                this.jmResourceIdRegistrations = new HashMap<>(4);
                this.taskExecutors = new HashMap<>(8);
+               this.taskExecutorGatewayFutures = new HashMap<>(8);
        }
 
 
@@ -371,18 +375,25 @@ public abstract class ResourceManager<WorkerType extends 
ResourceIDRetrievable>
                        final Time timeout) {
 
                CompletableFuture<TaskExecutorGateway> 
taskExecutorGatewayFuture = getRpcService().connect(taskExecutorAddress, 
TaskExecutorGateway.class);
+               taskExecutorGatewayFutures.put(taskExecutorResourceId, 
taskExecutorGatewayFuture);
 
                return taskExecutorGatewayFuture.handleAsync(
                        (TaskExecutorGateway taskExecutorGateway, Throwable 
throwable) -> {
-                               if (throwable != null) {
-                                       return new 
RegistrationResponse.Decline(throwable.getMessage());
+                               if (taskExecutorGatewayFuture == 
taskExecutorGatewayFutures.get(taskExecutorResourceId)) {
+                                       
taskExecutorGatewayFutures.remove(taskExecutorResourceId);
+                                       if (throwable != null) {
+                                               return new 
RegistrationResponse.Decline(throwable.getMessage());
+                                       } else {
+                                               return 
registerTaskExecutorInternal(
+                                                       taskExecutorGateway,
+                                                       taskExecutorAddress,
+                                                       taskExecutorResourceId,
+                                                       dataPort,
+                                                       hardwareDescription);
+                                       }
                                } else {
-                                       return registerTaskExecutorInternal(
-                                               taskExecutorGateway,
-                                               taskExecutorAddress,
-                                               taskExecutorResourceId,
-                                               dataPort,
-                                               hardwareDescription);
+                                       log.info("Ignoring outdated 
TaskExecutorGateway connection.");
+                                       return new 
RegistrationResponse.Decline("Decline outdated task executor registration.");
                                }
                        },
                        getMainThreadExecutor());
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
index 95b3d08..fcb92af 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/ResourceManagerTaskExecutorTest.java
@@ -50,6 +50,7 @@ import org.apache.flink.util.ExceptionUtils;
 import org.apache.flink.util.FlinkException;
 import org.apache.flink.util.TestLogger;
 
+import akka.pattern.AskTimeoutException;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Before;
@@ -79,6 +80,8 @@ public class ResourceManagerTaskExecutorTest extends 
TestLogger {
 
        private static final Time TIMEOUT = Time.seconds(10L);
 
+       private static final long HEARTBEAT_TIMEOUT = 5000; // milliseconds; also bounds the artificial delay in testDelayedRegisterTaskExecutor
+
        private static TestingRpcService rpcService;
 
        private TestingTaskExecutorGateway taskExecutorGateway;
@@ -133,7 +136,7 @@ public class ResourceManagerTaskExecutorTest extends 
TestLogger {
 
        private StandaloneResourceManager 
createAndStartResourceManager(LeaderElectionService rmLeaderElectionService, 
FatalErrorHandler fatalErrorHandler) throws Exception {
                TestingHighAvailabilityServices highAvailabilityServices = new 
TestingHighAvailabilityServices();
-               HeartbeatServices heartbeatServices = new 
HeartbeatServices(1000L, 1000L);
+               HeartbeatServices heartbeatServices = new 
HeartbeatServices(1000L, HEARTBEAT_TIMEOUT);
                
highAvailabilityServices.setResourceManagerLeaderElectionService(rmLeaderElectionService);
 
                SlotManager slotManager = SlotManagerBuilder.newBuilder()
@@ -209,6 +212,53 @@ public class ResourceManagerTaskExecutorTest extends 
TestLogger {
        }
 
        /**
+        * Tests that a timed-out registration, whose connection from the resource manager is delayed,
+        * does not unregister a later, successful registration of the same task executor.
+        */
+       @Test
+       public void testDelayedRegisterTaskExecutor() throws Exception {
+               // additional connection delay beyond the RPC timeout;
+               // kept well below (< 1/2) the heartbeat timeout so that sleeping twice this delay below cannot trigger a heartbeat timeout
+               final long additionalDelayMillis = HEARTBEAT_TIMEOUT / 5;
+               try {
+                       // the first registration's connection is delayed beyond the RPC timeout, so this call is expected to time out
+                       
rpcService.setConnectionDelayMillis(TIMEOUT.toMilliseconds() + 
additionalDelayMillis);
+                       CompletableFuture<RegistrationResponse> firstFuture =
+                               
rmGateway.registerTaskExecutor(taskExecutorGateway.getAddress(), 
taskExecutorResourceID, dataPort, hardwareDescription, TIMEOUT);
+                       try {
+                               firstFuture.get();
+                               fail("Should have failed because connection to 
taskmanager is delayed beyond timeout");
+                       } catch (Exception e) {
+                               
assertTrue(ExceptionUtils.stripExecutionException(e) instanceof 
AskTimeoutException);
+                       }
+
+                       // the second registration (the retry) has no connection delay and is expected to succeed
+                       rpcService.setConnectionDelayMillis(0);
+                       CompletableFuture<RegistrationResponse> secondFuture =
+                               
rmGateway.registerTaskExecutor(taskExecutorGateway.getAddress(), 
taskExecutorResourceID, dataPort, hardwareDescription, TIMEOUT);
+                       RegistrationResponse response = secondFuture.get();
+                       assertTrue(response instanceof 
TaskExecutorRegistrationSuccess);
+
+                       // on success, send a slot report so that the resource manager knows the task executor's slot
+                       final SlotReport slotReport = new SlotReport(new 
SlotStatus(new SlotID(taskExecutorResourceID, 0), ResourceProfile.UNKNOWN));
+                       rmGateway.sendSlotReport(taskExecutorResourceID,
+                               ((TaskExecutorRegistrationSuccess) 
response).getRegistrationId(), slotReport, TIMEOUT).get();
+
+                       // wait long enough for the first registration's delayed connection to complete and its remaining handling to run
+                       Thread.sleep(additionalDelayMillis * 2);
+
+                       // verify that the latest registration is still valid, i.e. it was not unregistered by the delayed one
+                       final TaskManagerInfo taskManagerInfo = 
rmGateway.requestTaskManagerInfo(
+                               taskExecutorResourceID,
+                               TIMEOUT).get();
+                       assertThat(taskManagerInfo.getResourceId(), 
equalTo(taskExecutorResourceID));
+                       assertThat(taskManagerInfo.getNumberSlots(), 
equalTo(1));
+               } finally {
+                       rpcService.setConnectionDelayMillis(0L);
+               }
+       }
+
+       /**
         * Tests that a TaskExecutor can disconnect from the {@link 
ResourceManager}.
         */
        @Test
diff --git 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
index f42f09c..6d12266 100644
--- 
a/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
+++ 
b/flink-runtime/src/test/java/org/apache/flink/runtime/rpc/TestingRpcService.java
@@ -23,6 +23,7 @@ import org.apache.flink.runtime.akka.AkkaUtils;
 import org.apache.flink.runtime.concurrent.FutureUtils;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcService;
 import org.apache.flink.runtime.rpc.akka.AkkaRpcServiceConfiguration;
+import org.apache.flink.runtime.testingUtils.TestingUtils;
 
 import java.io.Serializable;
 import java.util.concurrent.CompletableFuture;
@@ -55,6 +56,9 @@ public class TestingRpcService extends AkkaRpcService {
        /** Map of pre-registered connections. */
        private final ConcurrentHashMap<String, RpcGateway> 
registeredConnections;
 
+       /** Artificial delay on connection; volatile since tests write it while connect() may run on other threads. */
+       private volatile long connectionDelayMillis;
+
        /**
         * Creates a new {@code TestingRpcService}.
         */
@@ -99,6 +103,21 @@ public class TestingRpcService extends AkkaRpcService {
                }
        }
 
+       private <C extends RpcGateway> CompletableFuture<C> getRpcGatewayFuture(C gateway) {
+               final long delayMillis = connectionDelayMillis; // read once so the check and the sleep use the same value
+               if (delayMillis <= 0) {
+                       return CompletableFuture.completedFuture(gateway);
+               }
+               return CompletableFuture.supplyAsync(() -> {
+                       try {
+                               Thread.sleep(delayMillis);
+                       } catch (InterruptedException e) {
+                               Thread.currentThread().interrupt(); // restore the interrupt flag instead of swallowing it
+                       }
+                       return gateway;
+               }, TestingUtils.defaultExecutor());
+       }
+
        @Override
        public <C extends RpcGateway> CompletableFuture<C> connect(String 
address, Class<C> clazz) {
                RpcGateway gateway = registeredConnections.get(address);
@@ -107,7 +126,7 @@ public class TestingRpcService extends AkkaRpcService {
                        if (clazz.isAssignableFrom(gateway.getClass())) {
                                @SuppressWarnings("unchecked")
                                C typedGateway = (C) gateway;
-                               return 
CompletableFuture.completedFuture(typedGateway);
+                               return getRpcGatewayFuture(typedGateway);
                        } else {
                                return FutureUtils.completedExceptionally(new 
Exception("Gateway registered under " + address + " is not of type " + clazz));
                        }
@@ -127,7 +146,7 @@ public class TestingRpcService extends AkkaRpcService {
                        if (clazz.isAssignableFrom(gateway.getClass())) {
                                @SuppressWarnings("unchecked")
                                C typedGateway = (C) gateway;
-                               return 
CompletableFuture.completedFuture(typedGateway);
+                               return getRpcGatewayFuture(typedGateway);
                        } else {
                                return FutureUtils.completedExceptionally(new 
Exception("Gateway registered under " + address + " is not of type " + clazz));
                        }
@@ -139,4 +158,8 @@ public class TestingRpcService extends AkkaRpcService {
        public void clearGateways() {
                registeredConnections.clear();
        }
+
+       public void setConnectionDelayMillis(long connectionDelayMillis) {
+               this.connectionDelayMillis = connectionDelayMillis; // delay for connect() to registered gateways; a value <= 0 disables it
+       }
 }

Reply via email to