tillrohrmann commented on a change in pull request #13004:
URL: https://github.com/apache/flink/pull/13004#discussion_r463060550



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/jobmanager/JobManagerFlinkMemory.java
##########
@@ -50,7 +53,8 @@
        private final MemorySize jvmHeap;
        private final MemorySize offHeapMemory;
 
-       JobManagerFlinkMemory(MemorySize jvmHeap, MemorySize offHeapMemory) {
+       @VisibleForTesting

Review comment:
       I believe Xintong added this annotation because he changed the 
visibility of this class for some tests.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ResourceManagerDriver.java
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.active;
+
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
+import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
+import 
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A {@link ResourceManagerDriver} is responsible for requesting and releasing 
resources from/to a particular external
+ * resource manager.
+ */
+public interface ResourceManagerDriver<WorkerType extends 
ResourceIDRetrievable> {
+
+       /**
+        * Initialize the deployment specific components.
+        *
+        * @param resourceEventHandler Handler that handles resource events.
+        */
+       void initialize(ResourceEventHandler<WorkerType> resourceEventHandler) 
throws Throwable;
+
+       /**
+        * Terminate the deployment specific components.
+        *
+        * @return A future that will be completed successfully when the driver 
is terminated, or exceptionally if cannot be
+        * terminated.
+        */
+       CompletableFuture<Void> terminate();
+
+       /**
+        * The deployment specific code to deregister the application. This 
should report the application's final status and
+        * shut down the resource manager driver cleanly.
+        *
+        * <p>This method also needs to make sure all pending containers that 
are not registered yet are returned.
+        *
+        * @param finalStatus The application status to report.
+        * @param optionalDiagnostics A diagnostics message or {@code null}.
+        * @throws ResourceManagerException if the application could not be 
shut down.
+        */
+       void deregisterApplication(ApplicationStatus finalStatus, @Nullable 
String optionalDiagnostics) throws Throwable;
+
+       /**
+        * Request resource from the external resource manager.
+        *
+        * <p>This method request a new resource from the external resource 
manager, and tries to launch a task manager
+        * inside the allocated resource, with respect to the provided 
taskExecutorProcessSpec The returned future will be

Review comment:
       ```suggestion
         * inside the allocated resource, with respect to the provided taskExecutorProcessSpec. The returned future will be
       ```

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java
##########
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.active;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.instance.HardwareDescription;
+import 
org.apache.flink.runtime.io.network.partition.NoOpResourceManagerPartitionTracker;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.resourcemanager.TaskExecutorRegistration;
+import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import 
org.apache.flink.runtime.resourcemanager.slotmanager.TestingSlotManagerBuilder;
+import 
org.apache.flink.runtime.resourcemanager.utils.MockResourceManagerRuntimeServices;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.RunnableWithException;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThan;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link ActiveResourceManager}.
+ */
+public class ActiveResourceManagerTest extends TestLogger {
+
+       private static final long TIMEOUT_SEC = 5L;
+       private static final Time TIMEOUT_TIME = Time.seconds(TIMEOUT_SEC);
+
+       private static final WorkerResourceSpec WORKER_RESOURCE_SPEC = 
WorkerResourceSpec.ZERO;
+
+       /**
+        * Tests worker successfully requested, started and registered.
+        */
+       @Test
+       public void testStartNewWorker() throws Exception {
+               new Context() {{
+                       final ResourceID tmResourceId = ResourceID.generate();
+                       final CompletableFuture<TaskExecutorProcessSpec> 
requestWorkerFromDriverFuture = new CompletableFuture<>();
+
+                       
driverBuilder.setRequestResourceFunction(taskExecutorProcessSpec -> {
+                               
requestWorkerFromDriverFuture.complete(taskExecutorProcessSpec);
+                               return 
CompletableFuture.completedFuture(tmResourceId);
+                       });
+
+                       runTest(() -> {
+                               // received worker request, verify requesting 
from driver
+                               CompletableFuture<Boolean> startNewWorkerFuture 
= runInMainThread(() ->
+                                               
getResourceManager().startNewWorker(WORKER_RESOURCE_SPEC));
+                               TaskExecutorProcessSpec taskExecutorProcessSpec 
= requestWorkerFromDriverFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+                               
assertThat(startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), is(true));
+                               assertThat(taskExecutorProcessSpec,
+                                               
is(TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, 
WORKER_RESOURCE_SPEC)));
+
+                               // worker registered, verify registration 
succeed
+                               CompletableFuture<RegistrationResponse> 
registerTaskExecutorFuture = registerTaskExecutor(tmResourceId);
+                               
assertThat(registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), 
instanceOf(RegistrationResponse.Success.class));
+                       });
+               }};
+       }
+
+       /**
+        * Tests worker failed while requesting.
+        */
+       @Test
+       public void testStartNewWorkerFailedRequesting() throws Exception {
+               new Context() {{
+                       final ResourceID tmResourceId = ResourceID.generate();
+                       final AtomicInteger requestCount = new AtomicInteger(0);
+
+                       final List<CompletableFuture<ResourceID>> 
resourceIdFutures = new ArrayList<>();
+                       resourceIdFutures.add(new CompletableFuture<>());
+                       resourceIdFutures.add(new CompletableFuture<>());
+
+                       final List<CompletableFuture<TaskExecutorProcessSpec>> 
requestWorkerFromDriverFutures = new ArrayList<>();
+                       requestWorkerFromDriverFutures.add(new 
CompletableFuture<>());
+                       requestWorkerFromDriverFutures.add(new 
CompletableFuture<>());
+
+                       
driverBuilder.setRequestResourceFunction(taskExecutorProcessSpec -> {
+                               int idx = requestCount.getAndIncrement();
+                               assertThat(idx, lessThan(2));
+
+                               
requestWorkerFromDriverFutures.get(idx).complete(taskExecutorProcessSpec);
+                               return resourceIdFutures.get(idx);
+                       });
+
+                       slotManagerBuilder.setGetRequiredResourcesSupplier(() 
-> Collections.singletonMap(WORKER_RESOURCE_SPEC, 1));
+
+                       runTest(() -> {
+                               // received worker request, verify requesting 
from driver
+                               CompletableFuture<Boolean> startNewWorkerFuture 
= runInMainThread(() ->
+                                               
getResourceManager().startNewWorker(WORKER_RESOURCE_SPEC));
+                               TaskExecutorProcessSpec 
taskExecutorProcessSpec1 = 
requestWorkerFromDriverFutures.get(0).get(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+                               
assertThat(startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), is(true));
+                               assertThat(taskExecutorProcessSpec1,
+                                               
is(TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, 
WORKER_RESOURCE_SPEC)));
+
+                               // first request failed, verify requesting 
another worker from driver
+                               runInMainThread(() -> 
resourceIdFutures.get(0).completeExceptionally(new Throwable("testing error")));
+                               TaskExecutorProcessSpec 
taskExecutorProcessSpec2 =
+                                               
requestWorkerFromDriverFutures.get(1).get(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+                               assertThat(taskExecutorProcessSpec2, 
is(taskExecutorProcessSpec1));
+
+                               // second request allocated, verify 
registration succeed
+                               runInMainThread(() -> 
resourceIdFutures.get(1).complete(tmResourceId));
+                               CompletableFuture<RegistrationResponse> 
registerTaskExecutorFuture = registerTaskExecutor(tmResourceId);
+                               
assertThat(registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), 
instanceOf(RegistrationResponse.Success.class));
+                       });
+               }};
+       }
+
+       /**
+        * Tests worker terminated after requested before registered.
+        */
+       @Test
+       public void testWorkerTerminatedBeforeRegister() throws Exception {
+               new Context() {{
+                       final AtomicInteger requestCount = new AtomicInteger(0);
+
+                       final List<ResourceID> tmResourceIds = new 
ArrayList<>();
+                       tmResourceIds.add(ResourceID.generate());
+                       tmResourceIds.add(ResourceID.generate());
+
+                       final List<CompletableFuture<TaskExecutorProcessSpec>> 
requestWorkerFromDriverFutures = new ArrayList<>();
+                       requestWorkerFromDriverFutures.add(new 
CompletableFuture<>());
+                       requestWorkerFromDriverFutures.add(new 
CompletableFuture<>());
+
+                       
driverBuilder.setRequestResourceFunction(taskExecutorProcessSpec -> {
+                               int idx = requestCount.getAndIncrement();
+                               assertThat(idx, lessThan(2));
+
+                               
requestWorkerFromDriverFutures.get(idx).complete(taskExecutorProcessSpec);
+                               return 
CompletableFuture.completedFuture(tmResourceIds.get(idx));
+                       });
+
+                       slotManagerBuilder.setGetRequiredResourcesSupplier(() 
-> Collections.singletonMap(WORKER_RESOURCE_SPEC, 1));
+
+                       runTest(() -> {
+                               // received worker request, verify requesting 
from driver
+                               CompletableFuture<Boolean> startNewWorkerFuture 
= runInMainThread(() ->
+                                               
getResourceManager().startNewWorker(WORKER_RESOURCE_SPEC));
+                               TaskExecutorProcessSpec 
taskExecutorProcessSpec1 = 
requestWorkerFromDriverFutures.get(0).get(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+                               
assertThat(startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), is(true));
+                               assertThat(taskExecutorProcessSpec1,
+                                               
is(TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, 
WORKER_RESOURCE_SPEC)));
+
+                               // first worker failed before register, verify 
requesting another worker from driver
+                               runInMainThread(() -> 
getResourceManager().onWorkerTerminated(tmResourceIds.get(0)));
+                               TaskExecutorProcessSpec 
taskExecutorProcessSpec2 =
+                                               
requestWorkerFromDriverFutures.get(1).get(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+                               assertThat(taskExecutorProcessSpec2, 
is(taskExecutorProcessSpec1));
+
+                               // second worker registered, verify 
registration succeed
+                               CompletableFuture<RegistrationResponse> 
registerTaskExecutorFuture = registerTaskExecutor(tmResourceIds.get(1));
+                               
assertThat(registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), 
instanceOf(RegistrationResponse.Success.class));
+                       });
+               }};
+       }
+
+       /**
+        * Tests worker terminated after registered.
+        */
+       @Test
+       public void testWorkerTerminatedAfterRegister() throws Exception {
+               new Context() {{
+                       final AtomicInteger requestCount = new AtomicInteger(0);
+
+                       final List<ResourceID> tmResourceIds = new 
ArrayList<>();
+                       tmResourceIds.add(ResourceID.generate());
+                       tmResourceIds.add(ResourceID.generate());
+
+                       final List<CompletableFuture<TaskExecutorProcessSpec>> 
requestWorkerFromDriverFutures = new ArrayList<>();
+                       requestWorkerFromDriverFutures.add(new 
CompletableFuture<>());
+                       requestWorkerFromDriverFutures.add(new 
CompletableFuture<>());
+
+                       
driverBuilder.setRequestResourceFunction(taskExecutorProcessSpec -> {
+                               int idx = requestCount.getAndIncrement();
+                               assertThat(idx, lessThan(2));
+
+                               
requestWorkerFromDriverFutures.get(idx).complete(taskExecutorProcessSpec);
+                               return 
CompletableFuture.completedFuture(tmResourceIds.get(idx));
+                       });
+
+                       slotManagerBuilder.setGetRequiredResourcesSupplier(() 
-> Collections.singletonMap(WORKER_RESOURCE_SPEC, 1));
+
+                       runTest(() -> {
+                               // received worker request, verify requesting 
from driver
+                               CompletableFuture<Boolean> startNewWorkerFuture 
= runInMainThread(() ->
+                                               
getResourceManager().startNewWorker(WORKER_RESOURCE_SPEC));
+                               TaskExecutorProcessSpec 
taskExecutorProcessSpec1 = 
requestWorkerFromDriverFutures.get(0).get(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+                               
assertThat(startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), is(true));
+                               assertThat(taskExecutorProcessSpec1,
+                                               
is(TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, 
WORKER_RESOURCE_SPEC)));
+
+                               // first worker registered, verify registration 
succeed
+                               CompletableFuture<RegistrationResponse> 
registerTaskExecutorFuture1 = registerTaskExecutor(tmResourceIds.get(0));
+                               
assertThat(registerTaskExecutorFuture1.get(TIMEOUT_SEC, TimeUnit.SECONDS), 
instanceOf(RegistrationResponse.Success.class));
+
+                               // first worker terminated, verify requesting 
another worker from driver
+                               runInMainThread(() -> 
getResourceManager().onWorkerTerminated(tmResourceIds.get(0)));
+                               TaskExecutorProcessSpec 
taskExecutorProcessSpec2 =
+                                               
requestWorkerFromDriverFutures.get(1).get(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+                               assertThat(taskExecutorProcessSpec2, 
is(taskExecutorProcessSpec1));
+
+                               // second worker registered, verify 
registration succeed
+                               CompletableFuture<RegistrationResponse> 
registerTaskExecutorFuture2 = registerTaskExecutor(tmResourceIds.get(1));
+                               
assertThat(registerTaskExecutorFuture2.get(TIMEOUT_SEC, TimeUnit.SECONDS), 
instanceOf(RegistrationResponse.Success.class));
+                       });
+               }};
+       }
+
+       /**
+        * Tests worker terminated and is no longer required.
+        */
+       @Test
+       public void testWorkerTerminatedNoLongerRequired() throws Exception {
+               new Context() {{
+                       final ResourceID tmResourceId = ResourceID.generate();
+                       final AtomicInteger requestCount = new AtomicInteger(0);
+
+                       final List<CompletableFuture<TaskExecutorProcessSpec>> 
requestWorkerFromDriverFutures = new ArrayList<>();
+                       requestWorkerFromDriverFutures.add(new 
CompletableFuture<>());
+                       requestWorkerFromDriverFutures.add(new 
CompletableFuture<>());
+
+                       
driverBuilder.setRequestResourceFunction(taskExecutorProcessSpec -> {
+                               int idx = requestCount.getAndIncrement();
+                               assertThat(idx, lessThan(2));
+
+                               
requestWorkerFromDriverFutures.get(idx).complete(taskExecutorProcessSpec);
+                               return 
CompletableFuture.completedFuture(tmResourceId);
+                       });
+
+                       runTest(() -> {
+                               // received worker request, verify requesting 
from driver
+                               CompletableFuture<Boolean> startNewWorkerFuture 
= runInMainThread(() ->
+                                               
getResourceManager().startNewWorker(WORKER_RESOURCE_SPEC));
+                               TaskExecutorProcessSpec taskExecutorProcessSpec 
= requestWorkerFromDriverFutures.get(0).get(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+                               
assertThat(startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), is(true));
+                               assertThat(taskExecutorProcessSpec,
+                                               
is(TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, 
WORKER_RESOURCE_SPEC)));
+
+                               // worker registered, verify registration 
succeed
+                               CompletableFuture<RegistrationResponse> 
registerTaskExecutorFuture = registerTaskExecutor(tmResourceId);
+                               
assertThat(registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), 
instanceOf(RegistrationResponse.Success.class));
+
+                               // worker terminated, verify not requesting new 
worker
+                               runInMainThread(() -> {
+                                       
getResourceManager().onWorkerTerminated(tmResourceId);
+                                       // needs to return something, so that 
we can use `get()` to make sure the main thread processing
+                                       // finishes before the assertions
+                                       return null;
+                               }).get(TIMEOUT_SEC, TimeUnit.SECONDS);
+                               
assertFalse(requestWorkerFromDriverFutures.get(1).isDone());
+                       });
+               }};
+       }
+
+       /**
+        * Tests workers from previous attempt successfully recovered and 
registered.
+        */
+       @Test
+       public void testRecoverWorkerFromPreviousAttempt() throws Exception {
+               new Context() {{
+                       final ResourceID tmResourceId = ResourceID.generate();
+
+                       runTest(() -> {
+                               runInMainThread(() -> 
getResourceManager().onPreviousAttemptWorkersRecovered(Collections.singleton(tmResourceId)));
+                               CompletableFuture<RegistrationResponse> 
registerTaskExecutorFuture = registerTaskExecutor(tmResourceId);
+                               
assertThat(registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), 
instanceOf(RegistrationResponse.Success.class));
+                       });
+               }};
+       }
+
+       /**
+        * Tests decline unknown worker registration.
+        */
+       @Test
+       public void testRegisterUnknownWorker() throws Exception {
+               new Context() {{
+                       runTest(() -> {
+                               CompletableFuture<RegistrationResponse> 
registerTaskExecutorFuture = registerTaskExecutor(ResourceID.generate());
+                               
assertThat(registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), 
instanceOf(RegistrationResponse.Decline.class));
+                       });
+               }};
+       }
+
+       @Test
+       public void testOnError() throws Exception {
+               new Context() {{
+                       final Throwable fatalError = new Throwable("Testing 
fatal error");
+                       runTest(() -> {
+                               runInMainThread(() -> 
getResourceManager().onError(fatalError));
+                               final Throwable reportedError = 
getFatalErrorHandler().getErrorFuture().get(TIMEOUT_SEC, TimeUnit.SECONDS);
+                               assertThat(reportedError, is(fatalError));
+                       });
+               }};
+       }
+
+       class Context {
+
+               final Configuration flinkConfig = new Configuration();
+               final TestingResourceManagerDriver.Builder driverBuilder = new 
TestingResourceManagerDriver.Builder();
+               final TestingSlotManagerBuilder slotManagerBuilder = new 
TestingSlotManagerBuilder();
+
+               private ActiveResourceManager<ResourceID> resourceManager;
+               private TestingFatalErrorHandler fatalErrorHandler;
+
+               ActiveResourceManager<ResourceID> getResourceManager() {
+                       return resourceManager;
+               }
+
+               TestingFatalErrorHandler getFatalErrorHandler() {
+                       return fatalErrorHandler;
+               }
+
+               void runTest(RunnableWithException testMethod) throws Exception 
{
+                       fatalErrorHandler = new TestingFatalErrorHandler();
+                       resourceManager = createAndStartResourceManager(
+                                       flinkConfig,
+                                       driverBuilder.build(),
+                                       slotManagerBuilder.createSlotManager());
+
+                       try {
+                               testMethod.run();
+                       } finally {
+                               resourceManager.close();
+                       }
+               }
+
+               private ActiveResourceManager<ResourceID> 
createAndStartResourceManager(
+                               Configuration configuration,
+                               ResourceManagerDriver<ResourceID> driver,
+                               SlotManager slotManager) throws Exception {
+                       final TestingRpcService rpcService = new 
TestingRpcService(configuration);

Review comment:
       nit: I think it would be a bit cheaper to reuse the `TestingRpcService` 
via `@ClassRule static final TestingRpcServiceResource 
testingRpcServiceResource = ....`.
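
To illustrate why sharing might be cheaper, here is a minimal, dependency-free sketch. It does not use JUnit or Flink: `FakeRpcService` is a hypothetical stand-in for `TestingRpcService`, and the two helper methods only count how often the service gets started under each approach. In the actual test, the shared variant would presumably be wired up through a JUnit 4 `@ClassRule` field, which JUnit requires to be `public static`.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class SharedRpcServiceSketch {

    /** Hypothetical stand-in for a service that is expensive to start, e.g. TestingRpcService. */
    static final class FakeRpcService implements AutoCloseable {
        static final AtomicInteger STARTED = new AtomicInteger();

        FakeRpcService() {
            // Pretend this is the costly start-up step.
            STARTED.incrementAndGet();
        }

        void ping() {
            // Placeholder for whatever a test case would do with the service.
        }

        @Override
        public void close() {
            // Pretend to shut the service down.
        }
    }

    /** Per-test creation: every test case starts and stops its own service. */
    static int startsWithPerTestService(int testCases) {
        FakeRpcService.STARTED.set(0);
        for (int i = 0; i < testCases; i++) {
            try (FakeRpcService service = new FakeRpcService()) {
                service.ping();
            }
        }
        return FakeRpcService.STARTED.get();
    }

    /** Class-level sharing: one service is started before all test cases and closed afterwards. */
    static int startsWithSharedService(int testCases) {
        FakeRpcService.STARTED.set(0);
        try (FakeRpcService shared = new FakeRpcService()) {
            for (int i = 0; i < testCases; i++) {
                shared.ping();
            }
        }
        return FakeRpcService.STARTED.get();
    }

    public static void main(String[] args) {
        System.out.println("per-test starts: " + startsWithPerTestService(8));
        System.out.println("shared starts: " + startsWithSharedService(8));
    }
}
```

The trade-off is that a shared service carries state across test cases, so each test would need to clean up anything it registers on it.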

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java
##########
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.active;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.instance.HardwareDescription;
+import 
org.apache.flink.runtime.io.network.partition.NoOpResourceManagerPartitionTracker;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.resourcemanager.TaskExecutorRegistration;
+import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import 
org.apache.flink.runtime.resourcemanager.slotmanager.TestingSlotManagerBuilder;
+import 
org.apache.flink.runtime.resourcemanager.utils.MockResourceManagerRuntimeServices;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.RunnableWithException;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThan;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link ActiveResourceManager}.
+ */
+public class ActiveResourceManagerTest extends TestLogger {
+
+       private static final long TIMEOUT_SEC = 5L;
+       private static final Time TIMEOUT_TIME = Time.seconds(TIMEOUT_SEC);
+
+       private static final WorkerResourceSpec WORKER_RESOURCE_SPEC = 
WorkerResourceSpec.ZERO;
+
+       /**
+        * Tests worker successfully requested, started and registered.
+        */
+       @Test
+       public void testStartNewWorker() throws Exception {
+               new Context() {{
+                       final ResourceID tmResourceId = ResourceID.generate();
+                       final CompletableFuture<TaskExecutorProcessSpec> 
requestWorkerFromDriverFuture = new CompletableFuture<>();
+
+                       
driverBuilder.setRequestResourceFunction(taskExecutorProcessSpec -> {
+                               
requestWorkerFromDriverFuture.complete(taskExecutorProcessSpec);
+                               return 
CompletableFuture.completedFuture(tmResourceId);
+                       });
+
+                       runTest(() -> {
+                               // received worker request, verify requesting 
from driver
+                               CompletableFuture<Boolean> startNewWorkerFuture 
= runInMainThread(() ->
+                                               
getResourceManager().startNewWorker(WORKER_RESOURCE_SPEC));
+                               TaskExecutorProcessSpec taskExecutorProcessSpec 
= requestWorkerFromDriverFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+                               
assertThat(startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), is(true));
+                               assertThat(taskExecutorProcessSpec,
+                                               
is(TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, 
WORKER_RESOURCE_SPEC)));
+
+                               // worker registered, verify registration 
succeed
+                               CompletableFuture<RegistrationResponse> 
registerTaskExecutorFuture = registerTaskExecutor(tmResourceId);
+                               
assertThat(registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), 
instanceOf(RegistrationResponse.Success.class));
+                       });
+               }};
+       }
+
+       /**
+        * Tests worker failed while requesting.
+        */
+       @Test
+       public void testStartNewWorkerFailedRequesting() throws Exception {
+               new Context() {{
+                       final ResourceID tmResourceId = ResourceID.generate();
+                       final AtomicInteger requestCount = new AtomicInteger(0);
+
+                       final List<CompletableFuture<ResourceID>> 
resourceIdFutures = new ArrayList<>();
+                       resourceIdFutures.add(new CompletableFuture<>());
+                       resourceIdFutures.add(new CompletableFuture<>());
+
+                       final List<CompletableFuture<TaskExecutorProcessSpec>> 
requestWorkerFromDriverFutures = new ArrayList<>();
+                       requestWorkerFromDriverFutures.add(new 
CompletableFuture<>());
+                       requestWorkerFromDriverFutures.add(new 
CompletableFuture<>());
+
+                       
driverBuilder.setRequestResourceFunction(taskExecutorProcessSpec -> {
+                               int idx = requestCount.getAndIncrement();
+                               assertThat(idx, lessThan(2));
+
+                               
requestWorkerFromDriverFutures.get(idx).complete(taskExecutorProcessSpec);
+                               return resourceIdFutures.get(idx);
+                       });
+
+                       slotManagerBuilder.setGetRequiredResourcesSupplier(() 
-> Collections.singletonMap(WORKER_RESOURCE_SPEC, 1));
+
+                       runTest(() -> {
+                               // received worker request, verify requesting 
from driver
+                               CompletableFuture<Boolean> startNewWorkerFuture 
= runInMainThread(() ->
+                                               
getResourceManager().startNewWorker(WORKER_RESOURCE_SPEC));
+                               TaskExecutorProcessSpec 
taskExecutorProcessSpec1 = 
requestWorkerFromDriverFutures.get(0).get(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+                               
assertThat(startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), is(true));
+                               assertThat(taskExecutorProcessSpec1,
+                                               
is(TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, 
WORKER_RESOURCE_SPEC)));
+
+                               // first request failed, verify requesting 
another worker from driver
+                               runInMainThread(() -> 
resourceIdFutures.get(0).completeExceptionally(new Throwable("testing error")));
+                               TaskExecutorProcessSpec 
taskExecutorProcessSpec2 =
+                                               
requestWorkerFromDriverFutures.get(1).get(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+                               assertThat(taskExecutorProcessSpec2, 
is(taskExecutorProcessSpec1));
+
+                               // second request allocated, verify 
registration succeed
+                               runInMainThread(() -> 
resourceIdFutures.get(1).complete(tmResourceId));
+                               CompletableFuture<RegistrationResponse> 
registerTaskExecutorFuture = registerTaskExecutor(tmResourceId);
+                               
assertThat(registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), 
instanceOf(RegistrationResponse.Success.class));
+                       });
+               }};
+       }
+
+       /**
+        * Tests worker terminated after requested before registered.
+        */
+       @Test
+       public void testWorkerTerminatedBeforeRegister() throws Exception {
+               new Context() {{
+                       final AtomicInteger requestCount = new AtomicInteger(0);
+
+                       final List<ResourceID> tmResourceIds = new 
ArrayList<>();
+                       tmResourceIds.add(ResourceID.generate());
+                       tmResourceIds.add(ResourceID.generate());
+
+                       final List<CompletableFuture<TaskExecutorProcessSpec>> 
requestWorkerFromDriverFutures = new ArrayList<>();
+                       requestWorkerFromDriverFutures.add(new 
CompletableFuture<>());
+                       requestWorkerFromDriverFutures.add(new 
CompletableFuture<>());
+
+                       
driverBuilder.setRequestResourceFunction(taskExecutorProcessSpec -> {
+                               int idx = requestCount.getAndIncrement();
+                               assertThat(idx, lessThan(2));
+
+                               
requestWorkerFromDriverFutures.get(idx).complete(taskExecutorProcessSpec);
+                               return 
CompletableFuture.completedFuture(tmResourceIds.get(idx));
+                       });
+
+                       slotManagerBuilder.setGetRequiredResourcesSupplier(() 
-> Collections.singletonMap(WORKER_RESOURCE_SPEC, 1));
+
+                       runTest(() -> {
+                               // received worker request, verify requesting 
from driver
+                               CompletableFuture<Boolean> startNewWorkerFuture 
= runInMainThread(() ->
+                                               
getResourceManager().startNewWorker(WORKER_RESOURCE_SPEC));
+                               TaskExecutorProcessSpec 
taskExecutorProcessSpec1 = 
requestWorkerFromDriverFutures.get(0).get(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+                               
assertThat(startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), is(true));
+                               assertThat(taskExecutorProcessSpec1,
+                                               
is(TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, 
WORKER_RESOURCE_SPEC)));
+
+                               // first worker failed before register, verify 
requesting another worker from driver
+                               runInMainThread(() -> 
getResourceManager().onWorkerTerminated(tmResourceIds.get(0)));
+                               TaskExecutorProcessSpec 
taskExecutorProcessSpec2 =
+                                               
requestWorkerFromDriverFutures.get(1).get(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+                               assertThat(taskExecutorProcessSpec2, 
is(taskExecutorProcessSpec1));
+
+                               // second worker registered, verify 
registration succeed
+                               CompletableFuture<RegistrationResponse> 
registerTaskExecutorFuture = registerTaskExecutor(tmResourceIds.get(1));
+                               
assertThat(registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), 
instanceOf(RegistrationResponse.Success.class));
+                       });
+               }};
+       }
+
+       /**
+        * Tests worker terminated after registered.
+        */
+       @Test
+       public void testWorkerTerminatedAfterRegister() throws Exception {
+               new Context() {{
+                       final AtomicInteger requestCount = new AtomicInteger(0);
+
+                       final List<ResourceID> tmResourceIds = new 
ArrayList<>();
+                       tmResourceIds.add(ResourceID.generate());
+                       tmResourceIds.add(ResourceID.generate());
+
+                       final List<CompletableFuture<TaskExecutorProcessSpec>> 
requestWorkerFromDriverFutures = new ArrayList<>();
+                       requestWorkerFromDriverFutures.add(new 
CompletableFuture<>());
+                       requestWorkerFromDriverFutures.add(new 
CompletableFuture<>());
+
+                       
driverBuilder.setRequestResourceFunction(taskExecutorProcessSpec -> {
+                               int idx = requestCount.getAndIncrement();
+                               assertThat(idx, lessThan(2));
+
+                               
requestWorkerFromDriverFutures.get(idx).complete(taskExecutorProcessSpec);
+                               return 
CompletableFuture.completedFuture(tmResourceIds.get(idx));
+                       });
+
+                       slotManagerBuilder.setGetRequiredResourcesSupplier(() 
-> Collections.singletonMap(WORKER_RESOURCE_SPEC, 1));
+
+                       runTest(() -> {
+                               // received worker request, verify requesting 
from driver
+                               CompletableFuture<Boolean> startNewWorkerFuture 
= runInMainThread(() ->
+                                               
getResourceManager().startNewWorker(WORKER_RESOURCE_SPEC));
+                               TaskExecutorProcessSpec 
taskExecutorProcessSpec1 = 
requestWorkerFromDriverFutures.get(0).get(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+                               
assertThat(startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), is(true));
+                               assertThat(taskExecutorProcessSpec1,
+                                               
is(TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, 
WORKER_RESOURCE_SPEC)));
+
+                               // first worker registered, verify registration 
succeed
+                               CompletableFuture<RegistrationResponse> 
registerTaskExecutorFuture1 = registerTaskExecutor(tmResourceIds.get(0));
+                               
assertThat(registerTaskExecutorFuture1.get(TIMEOUT_SEC, TimeUnit.SECONDS), 
instanceOf(RegistrationResponse.Success.class));
+
+                               // first worker terminated, verify requesting 
another worker from driver
+                               runInMainThread(() -> 
getResourceManager().onWorkerTerminated(tmResourceIds.get(0)));
+                               TaskExecutorProcessSpec 
taskExecutorProcessSpec2 =
+                                               
requestWorkerFromDriverFutures.get(1).get(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+                               assertThat(taskExecutorProcessSpec2, 
is(taskExecutorProcessSpec1));
+
+                               // second worker registered, verify 
registration succeed
+                               CompletableFuture<RegistrationResponse> 
registerTaskExecutorFuture2 = registerTaskExecutor(tmResourceIds.get(1));
+                               
assertThat(registerTaskExecutorFuture2.get(TIMEOUT_SEC, TimeUnit.SECONDS), 
instanceOf(RegistrationResponse.Success.class));
+                       });
+               }};
+       }
+
+       /**
+        * Tests worker terminated and is no longer required.
+        */
+       @Test
+       public void testWorkerTerminatedNoLongerRequired() throws Exception {
+               new Context() {{
+                       final ResourceID tmResourceId = ResourceID.generate();
+                       final AtomicInteger requestCount = new AtomicInteger(0);
+
+                       final List<CompletableFuture<TaskExecutorProcessSpec>> 
requestWorkerFromDriverFutures = new ArrayList<>();
+                       requestWorkerFromDriverFutures.add(new 
CompletableFuture<>());
+                       requestWorkerFromDriverFutures.add(new 
CompletableFuture<>());
+
+                       
driverBuilder.setRequestResourceFunction(taskExecutorProcessSpec -> {
+                               int idx = requestCount.getAndIncrement();
+                               assertThat(idx, lessThan(2));
+
+                               
requestWorkerFromDriverFutures.get(idx).complete(taskExecutorProcessSpec);
+                               return 
CompletableFuture.completedFuture(tmResourceId);
+                       });
+
+                       runTest(() -> {
+                               // received worker request, verify requesting 
from driver
+                               CompletableFuture<Boolean> startNewWorkerFuture 
= runInMainThread(() ->
+                                               
getResourceManager().startNewWorker(WORKER_RESOURCE_SPEC));
+                               TaskExecutorProcessSpec taskExecutorProcessSpec 
= requestWorkerFromDriverFutures.get(0).get(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+                               
assertThat(startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), is(true));
+                               assertThat(taskExecutorProcessSpec,
+                                               
is(TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, 
WORKER_RESOURCE_SPEC)));
+
+                               // worker registered, verify registration 
succeed
+                               CompletableFuture<RegistrationResponse> 
registerTaskExecutorFuture = registerTaskExecutor(tmResourceId);
+                               
assertThat(registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), 
instanceOf(RegistrationResponse.Success.class));
+
+                               // worker terminated, verify not requesting new 
worker
+                               runInMainThread(() -> {
+                                       
getResourceManager().onWorkerTerminated(tmResourceId);
+                                       // needs to return something, so that 
we can use `get()` to make sure the main thread processing
+                                       // finishes before the assertions
+                                       return null;
+                               }).get(TIMEOUT_SEC, TimeUnit.SECONDS);
+                               
assertFalse(requestWorkerFromDriverFutures.get(1).isDone());
+                       });
+               }};
+       }
+
+       /**
+        * Tests workers from previous attempt successfully recovered and 
registered.
+        */
+       @Test
+       public void testRecoverWorkerFromPreviousAttempt() throws Exception {
+               new Context() {{
+                       final ResourceID tmResourceId = ResourceID.generate();
+
+                       runTest(() -> {
+                               runInMainThread(() -> 
getResourceManager().onPreviousAttemptWorkersRecovered(Collections.singleton(tmResourceId)));
+                               CompletableFuture<RegistrationResponse> 
registerTaskExecutorFuture = registerTaskExecutor(tmResourceId);
+                               
assertThat(registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), 
instanceOf(RegistrationResponse.Success.class));
+                       });
+               }};
+       }
+
+       /**
+        * Tests decline unknown worker registration.
+        */
+       @Test
+       public void testRegisterUnknownWorker() throws Exception {
+               new Context() {{
+                       runTest(() -> {
+                               CompletableFuture<RegistrationResponse> 
registerTaskExecutorFuture = registerTaskExecutor(ResourceID.generate());
+                               
assertThat(registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), 
instanceOf(RegistrationResponse.Decline.class));
+                       });
+               }};
+       }
+
+       @Test
+       public void testOnError() throws Exception {
+               new Context() {{
+                       final Throwable fatalError = new Throwable("Testing 
fatal error");
+                       runTest(() -> {
+                               runInMainThread(() -> 
getResourceManager().onError(fatalError));
+                               final Throwable reportedError = 
getFatalErrorHandler().getErrorFuture().get(TIMEOUT_SEC, TimeUnit.SECONDS);
+                               assertThat(reportedError, is(fatalError));
+                       });
+               }};
+       }
+
+       class Context {
+
+               final Configuration flinkConfig = new Configuration();
+               final TestingResourceManagerDriver.Builder driverBuilder = new 
TestingResourceManagerDriver.Builder();
+               final TestingSlotManagerBuilder slotManagerBuilder = new 
TestingSlotManagerBuilder();
+
+               private ActiveResourceManager<ResourceID> resourceManager;
+               private TestingFatalErrorHandler fatalErrorHandler;
+
+               ActiveResourceManager<ResourceID> getResourceManager() {
+                       return resourceManager;
+               }
+
+               TestingFatalErrorHandler getFatalErrorHandler() {
+                       return fatalErrorHandler;
+               }
+
+               void runTest(RunnableWithException testMethod) throws Exception 
{
+                       fatalErrorHandler = new TestingFatalErrorHandler();
+                       resourceManager = createAndStartResourceManager(
+                                       flinkConfig,
+                                       driverBuilder.build(),
+                                       slotManagerBuilder.createSlotManager());
+
+                       try {
+                               testMethod.run();
+                       } finally {
+                               resourceManager.close();
+                       }
+               }
+
+               private ActiveResourceManager<ResourceID> 
createAndStartResourceManager(
+                               Configuration configuration,
+                               ResourceManagerDriver<ResourceID> driver,
+                               SlotManager slotManager) throws Exception {
+                       final TestingRpcService rpcService = new 
TestingRpcService(configuration);
+                       final MockResourceManagerRuntimeServices rmServices = 
new MockResourceManagerRuntimeServices(rpcService, TIMEOUT_TIME, slotManager);
+
+                       final ActiveResourceManager<ResourceID> 
activeResourceManager = new ActiveResourceManager<>(
+                                       driver,
+                                       configuration,
+                                       rpcService,
+                                       ResourceID.generate(),
+                                       rmServices.highAvailabilityServices,
+                                       rmServices.heartbeatServices,
+                                       rmServices.slotManager,
+                                       
NoOpResourceManagerPartitionTracker::get,
+                                       rmServices.jobLeaderIdService,
+                                       new ClusterInformation("localhost", 
1234),
+                                       fatalErrorHandler,
+                                       
UnregisteredMetricGroups.createUnregisteredResourceManagerMetricGroup());
+
+                       activeResourceManager.start();
+                       rmServices.grantLeadership();
+
+                       return activeResourceManager;
+               }
+
+               public void runInMainThread(Runnable runnable) {
+                       resourceManager.handleInMainThread(runnable);
+               }
+
+               public <T> CompletableFuture<T> runInMainThread(Callable<T> 
callable) {
+                       return resourceManager.runInMainThread(callable, 
TIMEOUT_TIME);
+               }
+
+               CompletableFuture<RegistrationResponse> 
registerTaskExecutor(ResourceID resourceID) throws Exception {
+                       final TaskExecutorGateway taskExecutorGateway = new 
TestingTaskExecutorGatewayBuilder()
+                                       .createTestingTaskExecutorGateway();
+                       ((TestingRpcService) 
resourceManager.getRpcService()).registerGateway(resourceID.toString(), 
taskExecutorGateway);

Review comment:
       Having a class-wide `TestingRpcService` would avoid this cast here.
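
       A minimal sketch of the idea, not the actual test code: all `Sketch*`
types below are hypothetical, simplified stand-ins for `RpcService`,
`TestingRpcService`, `ActiveResourceManager` and the test `Context`. The point
is only that the test keeps its own reference with the concrete testing type,
so gateways can be registered without going through `getRpcService()` and
casting.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the general RpcService interface.
interface SketchRpcService {
    String getAddress();
}

// Hypothetical stand-in for TestingRpcService: adds test-only gateway registration.
class SketchTestingRpcService implements SketchRpcService {
    private final Map<String, Object> gateways = new HashMap<>();

    @Override
    public String getAddress() {
        return "localhost";
    }

    void registerGateway(String address, Object gateway) {
        gateways.put(address, gateway);
    }

    Object getGateway(String address) {
        return gateways.get(address);
    }
}

// Hypothetical stand-in for the resource manager, which only exposes the general interface.
class SketchResourceManager {
    private final SketchRpcService rpcService;

    SketchResourceManager(SketchRpcService rpcService) {
        this.rpcService = rpcService;
    }

    SketchRpcService getRpcService() {
        return rpcService;
    }
}

// Hypothetical stand-in for the test Context.
class SketchTestContext {
    // Held with its concrete testing type, so no cast is needed later.
    final SketchTestingRpcService rpcService = new SketchTestingRpcService();
    final SketchResourceManager resourceManager = new SketchResourceManager(rpcService);

    void registerTaskExecutor(String resourceId, Object gateway) {
        // Uses the field directly instead of casting resourceManager.getRpcService().
        rpcService.registerGateway(resourceId, gateway);
    }
}
```

       In the real test, the field would presumably live in the test class or
in `Context` and be passed to `createAndStartResourceManager`.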

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ResourceEventHandler.java
##########
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.active;
+
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
+
+import java.util.Collection;
+
+/**
+ * Callback interfaces for handling resource events from external resource 
managers.
+ */
+public interface ResourceEventHandler<WorkerType extends 
ResourceIDRetrievable> {
+
+       /**
+        * Notifies that workers of previous attempt have been recovered from 
the external resource manager.
+        *
+        * @param recoveredWorkers Collection of worker nodes, in the 
deployment specific type.
+        */
+       void onPreviousAttemptWorkersRecovered(Collection<WorkerType> 
recoveredWorkers);
+
+       /**
+        * Notifies that the worker has been terminated.
+        *
+        * <p>See also {@link ResourceManagerDriver#requestResource}.
+        *
+        * @param resourceId Identifier of the terminated worker.
+        */
+       void onWorkerTerminated(ResourceID resourceId);
+
+       /**
+        * Notifies that an error has occurred that the process cannot proceed.
+        *
+        * @param exception Exception that describes the error.
+        */
+       void onError(Throwable exception);
+
+       /**
+        * Execute given runnable in the rpc main thread.
+        *
+        * @param runnable Runnable to be executed.
+        */
+       void handleInMainThread(Runnable runnable);

Review comment:
       Would it make sense to pass a `MainThreadExecutor` into 
`ResourceManagerDriver.initialize()` instead of offering this method? The 
advantage would be that one could directly run future callbacks in the main 
thread and would not have to call this method from the callback. Moreover, it 
would separate the concerns a bit better because the event handler does not 
need to know about the main thread, if I'm not mistaken.
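
       A rough sketch of what I have in mind, under simplifying assumptions:
`SketchEventHandler` and `SketchDriver` are hypothetical stand-ins for the
interfaces in this PR, and a plain single-threaded `ExecutorService` stands in
for the rpc main thread executor. The driver receives the executor in
`initialize` and schedules the callback on it via
`CompletableFuture#thenAcceptAsync`, so the handler needs no
`handleInMainThread` method.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical, simplified event handler: no main-thread method is needed.
interface SketchEventHandler {
    void onWorkerTerminated(String resourceId);
}

// Hypothetical, simplified driver that is handed the main thread executor.
class SketchDriver {
    private SketchEventHandler handler;
    private Executor mainThreadExecutor;

    void initialize(SketchEventHandler handler, Executor mainThreadExecutor) {
        this.handler = handler;
        this.mainThreadExecutor = mainThreadExecutor;
    }

    // Reacts to an asynchronous event from the external resource manager.
    // The callback is run by the main thread executor, not by the completing thread.
    CompletableFuture<Void> watchTermination(CompletableFuture<String> externalEvent) {
        return externalEvent.thenAcceptAsync(handler::onWorkerTerminated, mainThreadExecutor);
    }
}

class MainThreadExecutorSketch {
    // Returns the name of the thread on which the handler callback was executed.
    static String run() {
        ExecutorService mainThread =
                Executors.newSingleThreadExecutor(r -> new Thread(r, "main-thread-stand-in"));
        try {
            CompletableFuture<String> callbackThread = new CompletableFuture<>();
            SketchDriver driver = new SketchDriver();
            driver.initialize(
                    id -> callbackThread.complete(Thread.currentThread().getName()),
                    mainThread);

            CompletableFuture<String> externalEvent = new CompletableFuture<>();
            CompletableFuture<Void> done = driver.watchTermination(externalEvent);

            // The external event is completed from the calling thread.
            externalEvent.complete("worker-1");
            done.join();
            return callbackThread.join();
        } finally {
            mainThread.shutdownNow();
        }
    }
}
```

       In this sketch the callback runs on the stand-in main thread even
though the external future is completed from a different thread.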

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ResourceManagerDriver.java
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.active;
+
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
+import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
+import 
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A {@link ResourceManagerDriver} is responsible for requesting and releasing 
resources from/to a particular external
+ * resource manager.
+ */
+public interface ResourceManagerDriver<WorkerType extends 
ResourceIDRetrievable> {
+
+       /**
+        * Initialize the deployment specific components.
+        *
+        * @param resourceEventHandler Handler that handles resource events.
+        */
+       void initialize(ResourceEventHandler<WorkerType> resourceEventHandler) 
throws Throwable;
+
+       /**
+        * Terminate the deployment specific components.
+        *
+        * @return A future that will be completed successfully when the driver 
is terminated, or exceptionally if cannot be
+        * terminated.
+        */
+       CompletableFuture<Void> terminate();
+
+       /**
+        * The deployment specific code to deregister the application. This 
should report the application's final status and
+        * shut down the resource manager driver cleanly.
+        *
+        * <p>This method also needs to make sure all pending containers that 
are not registered yet are returned.
+        *
+        * @param finalStatus The application status to report.
+        * @param optionalDiagnostics A diagnostics message or {@code null}.
+        * @throws ResourceManagerException if the application could not be 
shut down.
+        */
+       void deregisterApplication(ApplicationStatus finalStatus, @Nullable 
String optionalDiagnostics) throws Throwable;
+
+       /**
+        * Request resource from the external resource manager.
+        *
+        * <p>This method request a new resource from the external resource 
manager, and tries to launch a task manager
+        * inside the allocated resource, with respect to the provided 
taskExecutorProcessSpec The returned future will be
+        * completed with a worker node in the deployment specific type, or 
exceptionally if the allocation has failed.
+        *
+        * <p>Note: Success completion of the returned future does not 
necessarily mean the success of resource allocation
+        * and task manager launching. Allocation and launching failures can 
still happen after the future completion. In
+        * such cases, {@link ResourceEventHandler#onWorkerTerminated} will be 
called.
+        *
+        * <p>The future is guaranteed to be completed in the rpc main thread, 
before trying to launch the task manager,
+        * thus before the task manager registration. It is also guaranteed that
+        * {@link ResourceEventHandler#onWorkerTerminated} will not be called 
on the requested worker, until the returned
+        * future is completed successfully.

Review comment:
       Nice description of the contract.

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ResourceManagerDriver.java
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.active;
+
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
+import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
+import 
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A {@link ResourceManagerDriver} is responsible for requesting and releasing 
resources from/to a particular external
+ * resource manager.
+ */
+public interface ResourceManagerDriver<WorkerType extends 
ResourceIDRetrievable> {
+
+       /**
+        * Initialize the deployment specific components.
+        *
+        * @param resourceEventHandler Handler that handles resource events.
+        */
+       void initialize(ResourceEventHandler<WorkerType> resourceEventHandler) 
throws Throwable;
+
+       /**
+        * Terminate the deployment specific components.
+        *
+        * @return A future that will be completed successfully when the driver 
is terminated, or exceptionally if cannot be
+        * terminated.
+        */
+       CompletableFuture<Void> terminate();
+
+       /**
+        * The deployment specific code to deregister the application. This 
should report the application's final status and
+        * shut down the resource manager driver cleanly.
+        *
+        * <p>This method also needs to make sure all pending containers that 
are not registered yet are returned.
+        *
+        * @param finalStatus The application status to report.
+        * @param optionalDiagnostics A diagnostics message or {@code null}.
+        * @throws ResourceManagerException if the application could not be 
shut down.
+        */
+       void deregisterApplication(ApplicationStatus finalStatus, @Nullable 
String optionalDiagnostics) throws Throwable;
+
+       /**
+        * Request resource from the external resource manager.
+        *
+        * <p>This method request a new resource from the external resource 
manager, and tries to launch a task manager
+        * inside the allocated resource, with respect to the provided 
taskExecutorProcessSpec The returned future will be
+        * completed with a worker node in the deployment specific type, or 
exceptionally if the allocation has failed.
+        *
+        * <p>Note: Success completion of the returned future does not 
necessarily mean the success of resource allocation

Review comment:
       ```suggestion
         * <p>Note: Completion of the returned future does not necessarily mean the success of resource allocation
       ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ResourceManagerDriver.java
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.active;
+
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
+import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
+import 
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * A {@link ResourceManagerDriver} is responsible for requesting and releasing 
resources from/to a particular external
+ * resource manager.
+ */
+public interface ResourceManagerDriver<WorkerType extends 
ResourceIDRetrievable> {
+
+       /**
+        * Initialize the deployment specific components.
+        *
+        * @param resourceEventHandler Handler that handles resource events.
+        */
+       void initialize(ResourceEventHandler<WorkerType> resourceEventHandler) 
throws Throwable;
+
+       /**
+        * Terminate the deployment specific components.
+        *
+        * @return A future that will be completed successfully when the driver 
is terminated, or exceptionally if cannot be
+        * terminated.
+        */
+       CompletableFuture<Void> terminate();
+
+       /**
+        * The deployment specific code to deregister the application. This 
should report the application's final status and
+        * shut down the resource manager driver cleanly.
+        *
+        * <p>This method also needs to make sure all pending containers that 
are not registered yet are returned.
+        *
+        * @param finalStatus The application status to report.
+        * @param optionalDiagnostics A diagnostics message or {@code null}.
+        * @throws ResourceManagerException if the application could not be 
shut down.
+        */
+       void deregisterApplication(ApplicationStatus finalStatus, @Nullable 
String optionalDiagnostics) throws Throwable;

Review comment:
       What is the relationship between these two methods? From the JavaDocs, it 
reads as if `deregisterApplication` would also shut down the driver, similar to 
`terminate`. Can they be unified?

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java
##########
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.active;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import 
org.apache.flink.runtime.io.network.partition.ResourceManagerPartitionTrackerFactory;
+import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
+import 
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * An active implementation of {@link ResourceManager}.
+ *
+ * <p>This resource manager actively requests and releases resources from/to 
the external resource management frameworks.
+ * With different {@link ResourceManagerDriver} provided, this resource 
manager can work with various frameworks.
+ */
+public class ActiveResourceManager<WorkerType extends ResourceIDRetrievable>
+               extends ResourceManager<WorkerType> implements 
ResourceEventHandler<WorkerType> {
+
+       protected final Configuration flinkConfig;
+
+       private final ResourceManagerDriver<WorkerType> resourceManagerDriver;
+
+       /** All workers maintained by {@link ActiveResourceManager}. */
+       private final Map<ResourceID, WorkerType> workerNodeMap;
+
+       /** Number of requested and not registered workers per worker resource 
spec. */
+       private final PendingWorkerCounter pendingWorkerCounter;
+
+       /** Identifiers and worker resource spec of requested not registered 
workers. */
+       private final Map<ResourceID, WorkerResourceSpec> 
currentAttemptUnregisteredWorkers;
+
+       public ActiveResourceManager(
+                       ResourceManagerDriver<WorkerType> resourceManagerDriver,
+                       Configuration flinkConfig,
+                       RpcService rpcService,
+                       ResourceID resourceId,
+                       HighAvailabilityServices highAvailabilityServices,
+                       HeartbeatServices heartbeatServices,
+                       SlotManager slotManager,
+                       ResourceManagerPartitionTrackerFactory 
clusterPartitionTrackerFactory,
+                       JobLeaderIdService jobLeaderIdService,
+                       ClusterInformation clusterInformation,
+                       FatalErrorHandler fatalErrorHandler,
+                       ResourceManagerMetricGroup resourceManagerMetricGroup) {
+               super(
+                               rpcService,
+                               resourceId,
+                               highAvailabilityServices,
+                               heartbeatServices,
+                               slotManager,
+                               clusterPartitionTrackerFactory,
+                               jobLeaderIdService,
+                               clusterInformation,
+                               fatalErrorHandler,
+                               resourceManagerMetricGroup,
+                               
AkkaUtils.getTimeoutAsTime(Preconditions.checkNotNull(flinkConfig)));
+
+               this.flinkConfig = flinkConfig;
+               this.resourceManagerDriver = 
Preconditions.checkNotNull(resourceManagerDriver);
+               this.workerNodeMap = new HashMap<>();
+               this.pendingWorkerCounter = new PendingWorkerCounter();
+               this.currentAttemptUnregisteredWorkers = new HashMap<>();
+       }
+
+       // 
------------------------------------------------------------------------
+       //  ResourceManager
+       // 
------------------------------------------------------------------------
+
+       @Override
+       protected void initialize() throws ResourceManagerException {
+               try {
+                       resourceManagerDriver.initialize(this);
+               } catch (Throwable t) {
+                       throw new ResourceManagerException("Cannot initialize 
resource provider.", t);
+               }
+       }
+
+       @Override
+       protected void terminate() throws ResourceManagerException {
+               try {
+                       resourceManagerDriver.terminate().get();
+               } catch (Throwable t) {
+                       throw new ResourceManagerException("Cannot terminate 
resource provider.", t);
+               }
+       }
+
+       @Override
+       protected void internalDeregisterApplication(ApplicationStatus 
finalStatus, @Nullable String optionalDiagnostics)
+                       throws ResourceManagerException {
+               try {
+                       
resourceManagerDriver.deregisterApplication(finalStatus, optionalDiagnostics);
+               } catch (Throwable t) {
+                       throw new ResourceManagerException("Cannot deregister 
application.", t);
+               }
+       }
+
+       @Override
+       public boolean startNewWorker(WorkerResourceSpec workerResourceSpec) {
+               requestNewWorker(workerResourceSpec);
+               return true;
+       }
+
+       @Override
+       protected WorkerType workerStarted(ResourceID resourceID) {
+               return workerNodeMap.get(resourceID);
+       }
+
+       @Override
+       public boolean stopWorker(WorkerType worker) {
+               final ResourceID resourceId = worker.getResourceID();
+               resourceManagerDriver.releaseResource(worker);
+
+               log.info("Stopping worker {}.", resourceId);
+
+               clearStateForWorker(resourceId);
+
+               return true;
+       }
+
+       @Override
+       protected void onWorkerRegistered(WorkerType worker) {
+               final ResourceID resourceId = worker.getResourceID();
+               log.info("Worker {} is registered.", resourceId);
+
+               final WorkerResourceSpec workerResourceSpec = 
currentAttemptUnregisteredWorkers.remove(resourceId);
+               if (workerResourceSpec != null) {
+                       final int count = 
pendingWorkerCounter.decreaseAndGet(workerResourceSpec);
+                       log.info("Worker {} with resource spec {} was requested 
in current attempt." +
+                                                       " Current pending count 
after registering: {}.",
+                                       resourceId,
+                                       workerResourceSpec,
+                                       count);
+               }
+       }
+
+       // 
------------------------------------------------------------------------
+       //  ResourceEventListener
+       // 
------------------------------------------------------------------------
+
+       @Override
+       public void onPreviousAttemptWorkersRecovered(Collection<WorkerType> 
recoveredWorkers) {
+               log.info("Recovered {} workers from previous attempt.", 
recoveredWorkers.size());
+               for (WorkerType worker : recoveredWorkers) {
+                       final ResourceID resourceId = worker.getResourceID();
+                       workerNodeMap.put(resourceId, worker);
+                       log.info("Worker {} recovered from previous attempt.", 
resourceId);
+               }
+       }
+
+       @Override
+       public void onWorkerTerminated(ResourceID resourceId) {
+               log.info("Worker {} is terminated.", resourceId);
+               if (clearStateForWorker(resourceId)) {
+                       requestWorkerIfRequired();
+               }
+       }
+
+       @Override
+       public void onError(Throwable exception) {
+               onFatalError(exception);
+       }
+
+       @Override
+       public void handleInMainThread(Runnable runnable) {
+               runAsync(runnable);
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Internal
+       // 
------------------------------------------------------------------------
+
+       private void requestNewWorker(WorkerResourceSpec workerResourceSpec) {
+               final TaskExecutorProcessSpec taskExecutorProcessSpec =
+                               
TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, 
workerResourceSpec);
+               final int pendingCount = 
pendingWorkerCounter.increaseAndGet(workerResourceSpec);
+
+               log.info("Requesting new worker with resource spec {}, current 
pending count: {}.",
+                               workerResourceSpec,
+                               pendingCount);
+
+               resourceManagerDriver.requestResource(taskExecutorProcessSpec)
+                               .whenComplete((worker, exception) -> {
+                                       if (exception != null) {
+                                               final int count = 
pendingWorkerCounter.decreaseAndGet(workerResourceSpec);
+                                               log.warn("Failed requesting 
worker with resource spec {}, current pending count: {}, exception: {}",
+                                                               
workerResourceSpec,
+                                                               count,
+                                                               exception);
+                                               requestWorkerIfRequired();
+                                       } else {
+                                               final ResourceID resourceId = 
worker.getResourceID();
+                                               workerNodeMap.put(resourceId, 
worker);
+                                               
currentAttemptUnregisteredWorkers.put(resourceId, workerResourceSpec);
+                                               log.info("Requested worker {} 
with resource spec {}.",
+                                                               resourceId,
+                                                               
workerResourceSpec);
+                                       }
+                               });

Review comment:
       It would be good to guard against unwanted exceptions via 
`FutureUtils.assertNoException(...)`. In this case, `whenComplete` would have to 
become `handle`, with the callback returning `null`.
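
       A minimal, self-contained sketch of the pattern, using only 
`java.util.concurrent`. The class `GuardedCallbackSketch`, the method 
`requestAndGuard`, and the local `assertNoException` helper with its 
`fatalErrorSink` parameter are hypothetical stand-ins for illustration. The 
helper is not Flink's `FutureUtils` implementation; it only mimics the idea of 
reporting an exception that escapes the callback instead of silently dropping it.

```java
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

final class GuardedCallbackSketch {

    // Hypothetical stand-in for FutureUtils.assertNoException(...): forwards any
    // failure of the given future to a fatal-error sink.
    static void assertNoException(CompletableFuture<?> future, Consumer<Throwable> fatalErrorSink) {
        future.whenComplete((ignored, throwable) -> {
            if (throwable != null) {
                fatalErrorSink.accept(throwable);
            }
        });
    }

    // Attaches the callback with handle(...), which has to return a value (null
    // here), and guards the resulting future so that a bug inside the callback
    // reaches the sink.
    static void requestAndGuard(
            CompletableFuture<String> requestFuture,
            Consumer<String> onWorker,
            Consumer<Throwable> onRequestFailure,
            Consumer<Throwable> fatalErrorSink) {
        CompletableFuture<Void> guarded = requestFuture.handle((worker, exception) -> {
            if (exception != null) {
                // Expected failure path: the request itself failed.
                onRequestFailure.accept(exception);
            } else {
                // Success path: if this throws, the future returned by handle(...)
                // completes exceptionally (typically with a CompletionException
                // wrapping the thrown exception).
                onWorker.accept(worker);
            }
            return null;
        });
        assertNoException(guarded, fatalErrorSink);
    }
}
```

       With this shape, a failed request is still handled inside the callback and 
does not reach the sink. Only an exception thrown by the callback itself is 
reported.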

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/util/config/memory/CommonProcessMemorySpec.java
##########
@@ -94,4 +96,16 @@ public MemorySize getTotalFlinkMemorySize() {
        public MemorySize getTotalProcessMemorySize() {
                return 
flinkMemory.getTotalFlinkMemorySize().add(getJvmMetaspaceSize()).add(getJvmOverheadSize());
        }
+
+       @Override
+       public boolean equals(Object obj) {

Review comment:
       What do we need the `equals` method on `CommonProcessMemorySpec` for?

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/TestingResourceManagerDriver.java
##########
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.active;
+
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.util.function.BiConsumerWithException;
+import org.apache.flink.util.function.ConsumerWithException;
+
+import javax.annotation.Nullable;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Consumer;
+import java.util.function.Function;
+import java.util.function.Supplier;
+
+/**
+ * Testing implementation of {@link ResourceManagerDriver}.
+ */
+public class TestingResourceManagerDriver implements 
ResourceManagerDriver<ResourceID> {
+
+       private final ConsumerWithException<ResourceEventHandler<ResourceID>, 
Throwable> initializeConsumer;
+       private final Supplier<CompletableFuture<Void>> terminateSupplier;
+       private final BiConsumerWithException<ApplicationStatus, String, 
Throwable> deregisterApplicationConsumer;
+       private final Function<TaskExecutorProcessSpec, 
CompletableFuture<ResourceID>> requestResourceFunction;
+       private final Consumer<ResourceID> releaseResourceConsumer;
+
+       private TestingResourceManagerDriver(
+                       final 
ConsumerWithException<ResourceEventHandler<ResourceID>, Throwable> 
initializeConsumer,
+                       final Supplier<CompletableFuture<Void>> 
terminateSupplier,
+                       final BiConsumerWithException<ApplicationStatus, 
String, Throwable> deregisterApplicationConsumer,
+                       final Function<TaskExecutorProcessSpec, 
CompletableFuture<ResourceID>> requestResourceFunction,
+                       final Consumer<ResourceID> releaseResourceConsumer) {
+               this.initializeConsumer = 
Preconditions.checkNotNull(initializeConsumer);
+               this.terminateSupplier = 
Preconditions.checkNotNull(terminateSupplier);
+               this.deregisterApplicationConsumer = 
Preconditions.checkNotNull(deregisterApplicationConsumer);
+               this.requestResourceFunction = 
Preconditions.checkNotNull(requestResourceFunction);
+               this.releaseResourceConsumer = 
Preconditions.checkNotNull(releaseResourceConsumer);
+       }
+
+       @Override
+       public void initialize(ResourceEventHandler<ResourceID> 
resourceEventHandler) throws Throwable {
+               initializeConsumer.accept(resourceEventHandler);
+       }
+
+       @Override
+       public CompletableFuture<Void> terminate() {
+               return terminateSupplier.get();
+       }
+
+       @Override
+       public void deregisterApplication(ApplicationStatus finalStatus, 
@Nullable String optionalDiagnostics) throws Throwable {
+               deregisterApplicationConsumer.accept(finalStatus, 
optionalDiagnostics);
+       }
+
+       @Override
+       public CompletableFuture<ResourceID> 
requestResource(TaskExecutorProcessSpec taskExecutorProcessSpec) {
+               return requestResourceFunction.apply(taskExecutorProcessSpec);
+       }
+
+       @Override
+       public void releaseResource(ResourceID worker) {
+               releaseResourceConsumer.accept(worker);
+       }
+
+       public static class Builder {
+               private ConsumerWithException<ResourceEventHandler<ResourceID>, 
Throwable> initializeConsumer =
+                               (ignore) -> {};
+
+               private Supplier<CompletableFuture<Void>> terminateSupplier =
+                               () -> CompletableFuture.completedFuture(null);
+
+               private BiConsumerWithException<ApplicationStatus, String, 
Throwable> deregisterApplicationConsumer =
+                               (ignore1, ignore20) -> {};

Review comment:
       ```suggestion
                                (ignore1, ignore2) -> {};
   ```

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java
##########
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.active;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.instance.HardwareDescription;
+import 
org.apache.flink.runtime.io.network.partition.NoOpResourceManagerPartitionTracker;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.resourcemanager.TaskExecutorRegistration;
+import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import 
org.apache.flink.runtime.resourcemanager.slotmanager.TestingSlotManagerBuilder;
+import 
org.apache.flink.runtime.resourcemanager.utils.MockResourceManagerRuntimeServices;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.RunnableWithException;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThan;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link ActiveResourceManager}.
+ */
+public class ActiveResourceManagerTest extends TestLogger {
+
+       private static final long TIMEOUT_SEC = 5L;
+       private static final Time TIMEOUT_TIME = Time.seconds(TIMEOUT_SEC);
+
+       private static final WorkerResourceSpec WORKER_RESOURCE_SPEC = 
WorkerResourceSpec.ZERO;
+
+       /**
+        * Tests worker successfully requested, started and registered.
+        */
+       @Test
+       public void testStartNewWorker() throws Exception {
+               new Context() {{
+                       final ResourceID tmResourceId = ResourceID.generate();
+                       final CompletableFuture<TaskExecutorProcessSpec> 
requestWorkerFromDriverFuture = new CompletableFuture<>();
+
+                       
driverBuilder.setRequestResourceFunction(taskExecutorProcessSpec -> {
+                               
requestWorkerFromDriverFuture.complete(taskExecutorProcessSpec);
+                               return 
CompletableFuture.completedFuture(tmResourceId);
+                       });
+
+                       runTest(() -> {
+                               // received worker request, verify requesting 
from driver
+                               CompletableFuture<Boolean> startNewWorkerFuture 
= runInMainThread(() ->
+                                               
getResourceManager().startNewWorker(WORKER_RESOURCE_SPEC));
+                               TaskExecutorProcessSpec taskExecutorProcessSpec 
= requestWorkerFromDriverFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+                               
assertThat(startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), is(true));
+                               assertThat(taskExecutorProcessSpec,
+                                               
is(TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, 
WORKER_RESOURCE_SPEC)));
+
+                               // worker registered, verify registration 
succeed

Review comment:
       ```suggestion
                                // worker registered, verify registration 
succeeded
   ```

##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManager.java
##########
@@ -0,0 +1,287 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.active;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceIDRetrievable;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.heartbeat.HeartbeatServices;
+import org.apache.flink.runtime.highavailability.HighAvailabilityServices;
+import 
org.apache.flink.runtime.io.network.partition.ResourceManagerPartitionTrackerFactory;
+import org.apache.flink.runtime.metrics.groups.ResourceManagerMetricGroup;
+import org.apache.flink.runtime.resourcemanager.JobLeaderIdService;
+import org.apache.flink.runtime.resourcemanager.ResourceManager;
+import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
+import 
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import org.apache.flink.runtime.rpc.FatalErrorHandler;
+import org.apache.flink.runtime.rpc.RpcService;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
+
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+
+/**
+ * An active implementation of {@link ResourceManager}.
+ *
+ * <p>This resource manager actively requests and releases resources from/to 
the external resource management frameworks.
+ * With different {@link ResourceManagerDriver} provided, this resource 
manager can work with various frameworks.
+ */
+public class ActiveResourceManager<WorkerType extends ResourceIDRetrievable>
+               extends ResourceManager<WorkerType> implements 
ResourceEventHandler<WorkerType> {
+
+       protected final Configuration flinkConfig;
+
+       private final ResourceManagerDriver<WorkerType> resourceManagerDriver;
+
+       /** All workers maintained by {@link ActiveResourceManager}. */
+       private final Map<ResourceID, WorkerType> workerNodeMap;
+
+       /** Number of requested and not registered workers per worker resource 
spec. */
+       private final PendingWorkerCounter pendingWorkerCounter;
+
+       /** Identifiers and worker resource spec of requested not registered 
workers. */
+       private final Map<ResourceID, WorkerResourceSpec> 
currentAttemptUnregisteredWorkers;
+
+       public ActiveResourceManager(
+                       ResourceManagerDriver<WorkerType> resourceManagerDriver,
+                       Configuration flinkConfig,
+                       RpcService rpcService,
+                       ResourceID resourceId,
+                       HighAvailabilityServices highAvailabilityServices,
+                       HeartbeatServices heartbeatServices,
+                       SlotManager slotManager,
+                       ResourceManagerPartitionTrackerFactory 
clusterPartitionTrackerFactory,
+                       JobLeaderIdService jobLeaderIdService,
+                       ClusterInformation clusterInformation,
+                       FatalErrorHandler fatalErrorHandler,
+                       ResourceManagerMetricGroup resourceManagerMetricGroup) {
+               super(
+                               rpcService,
+                               resourceId,
+                               highAvailabilityServices,
+                               heartbeatServices,
+                               slotManager,
+                               clusterPartitionTrackerFactory,
+                               jobLeaderIdService,
+                               clusterInformation,
+                               fatalErrorHandler,
+                               resourceManagerMetricGroup,
+                               
AkkaUtils.getTimeoutAsTime(Preconditions.checkNotNull(flinkConfig)));
+
+               this.flinkConfig = flinkConfig;
+               this.resourceManagerDriver = 
Preconditions.checkNotNull(resourceManagerDriver);

Review comment:
       The null checks seem a bit inconsistent here.

##########
File path: 
flink-runtime/src/test/java/org/apache/flink/runtime/resourcemanager/active/ActiveResourceManagerTest.java
##########
@@ -0,0 +1,428 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.resourcemanager.active;
+
+import org.apache.flink.api.common.time.Time;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessUtils;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.clusterframework.types.ResourceProfile;
+import org.apache.flink.runtime.entrypoint.ClusterInformation;
+import org.apache.flink.runtime.instance.HardwareDescription;
+import 
org.apache.flink.runtime.io.network.partition.NoOpResourceManagerPartitionTracker;
+import org.apache.flink.runtime.metrics.groups.UnregisteredMetricGroups;
+import org.apache.flink.runtime.registration.RegistrationResponse;
+import org.apache.flink.runtime.resourcemanager.TaskExecutorRegistration;
+import org.apache.flink.runtime.resourcemanager.WorkerResourceSpec;
+import org.apache.flink.runtime.resourcemanager.slotmanager.SlotManager;
+import 
org.apache.flink.runtime.resourcemanager.slotmanager.TestingSlotManagerBuilder;
+import 
org.apache.flink.runtime.resourcemanager.utils.MockResourceManagerRuntimeServices;
+import org.apache.flink.runtime.rpc.TestingRpcService;
+import org.apache.flink.runtime.taskexecutor.TaskExecutorGateway;
+import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder;
+import org.apache.flink.runtime.util.TestingFatalErrorHandler;
+import org.apache.flink.util.TestLogger;
+import org.apache.flink.util.function.RunnableWithException;
+
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.concurrent.Callable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.hamcrest.Matchers.instanceOf;
+import static org.hamcrest.Matchers.is;
+import static org.hamcrest.Matchers.lessThan;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link ActiveResourceManager}.
+ */
+public class ActiveResourceManagerTest extends TestLogger {
+
+       private static final long TIMEOUT_SEC = 5L;
+       private static final Time TIMEOUT_TIME = Time.seconds(TIMEOUT_SEC);
+
+       private static final WorkerResourceSpec WORKER_RESOURCE_SPEC = 
WorkerResourceSpec.ZERO;
+
+       /**
+        * Tests worker successfully requested, started and registered.
+        */
+       @Test
+       public void testStartNewWorker() throws Exception {
+               new Context() {{
+                       final ResourceID tmResourceId = ResourceID.generate();
+                       final CompletableFuture<TaskExecutorProcessSpec> 
requestWorkerFromDriverFuture = new CompletableFuture<>();
+
+                       
driverBuilder.setRequestResourceFunction(taskExecutorProcessSpec -> {
+                               
requestWorkerFromDriverFuture.complete(taskExecutorProcessSpec);
+                               return 
CompletableFuture.completedFuture(tmResourceId);
+                       });
+
+                       runTest(() -> {
+                               // received worker request, verify requesting 
from driver
+                               CompletableFuture<Boolean> startNewWorkerFuture 
= runInMainThread(() ->
+                                               
getResourceManager().startNewWorker(WORKER_RESOURCE_SPEC));
+                               TaskExecutorProcessSpec taskExecutorProcessSpec 
= requestWorkerFromDriverFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+                               
assertThat(startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), is(true));
+                               assertThat(taskExecutorProcessSpec,
+                                               
is(TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, 
WORKER_RESOURCE_SPEC)));
+
+                               // worker registered, verify registration 
succeed
+                               CompletableFuture<RegistrationResponse> 
registerTaskExecutorFuture = registerTaskExecutor(tmResourceId);
+                               
assertThat(registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), 
instanceOf(RegistrationResponse.Success.class));
+                       });
+               }};
+       }
+
+       /**
+        * Tests worker failed while requesting.
+        */
+       @Test
+       public void testStartNewWorkerFailedRequesting() throws Exception {
+               new Context() {{
+                       final ResourceID tmResourceId = ResourceID.generate();
+                       final AtomicInteger requestCount = new AtomicInteger(0);
+
+                       final List<CompletableFuture<ResourceID>> 
resourceIdFutures = new ArrayList<>();
+                       resourceIdFutures.add(new CompletableFuture<>());
+                       resourceIdFutures.add(new CompletableFuture<>());
+
+                       final List<CompletableFuture<TaskExecutorProcessSpec>> 
requestWorkerFromDriverFutures = new ArrayList<>();
+                       requestWorkerFromDriverFutures.add(new 
CompletableFuture<>());
+                       requestWorkerFromDriverFutures.add(new 
CompletableFuture<>());
+
+                       
driverBuilder.setRequestResourceFunction(taskExecutorProcessSpec -> {
+                               int idx = requestCount.getAndIncrement();
+                               assertThat(idx, lessThan(2));
+
+                               
requestWorkerFromDriverFutures.get(idx).complete(taskExecutorProcessSpec);
+                               return resourceIdFutures.get(idx);
+                       });
+
+                       slotManagerBuilder.setGetRequiredResourcesSupplier(() 
-> Collections.singletonMap(WORKER_RESOURCE_SPEC, 1));
+
+                       runTest(() -> {
+                               // received worker request, verify requesting 
from driver
+                               CompletableFuture<Boolean> startNewWorkerFuture 
= runInMainThread(() ->
+                                               
getResourceManager().startNewWorker(WORKER_RESOURCE_SPEC));
+                               TaskExecutorProcessSpec 
taskExecutorProcessSpec1 = 
requestWorkerFromDriverFutures.get(0).get(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+                               
assertThat(startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), is(true));
+                               assertThat(taskExecutorProcessSpec1,
+                                               
is(TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, 
WORKER_RESOURCE_SPEC)));
+
+                               // first request failed, verify requesting 
another worker from driver
+                               runInMainThread(() -> 
resourceIdFutures.get(0).completeExceptionally(new Throwable("testing error")));
+                               TaskExecutorProcessSpec 
taskExecutorProcessSpec2 =
+                                               
requestWorkerFromDriverFutures.get(1).get(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+                               assertThat(taskExecutorProcessSpec2, 
is(taskExecutorProcessSpec1));
+
+                               // second request allocated, verify 
registration succeed
+                               runInMainThread(() -> 
resourceIdFutures.get(1).complete(tmResourceId));
+                               CompletableFuture<RegistrationResponse> 
registerTaskExecutorFuture = registerTaskExecutor(tmResourceId);
+                               
assertThat(registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), 
instanceOf(RegistrationResponse.Success.class));
+                       });
+               }};
+       }
+
+       /**
+        * Tests worker terminated after requested before registered.
+        */
+       @Test
+       public void testWorkerTerminatedBeforeRegister() throws Exception {
+               new Context() {{
+                       final AtomicInteger requestCount = new AtomicInteger(0);
+
+                       final List<ResourceID> tmResourceIds = new 
ArrayList<>();
+                       tmResourceIds.add(ResourceID.generate());
+                       tmResourceIds.add(ResourceID.generate());
+
+                       final List<CompletableFuture<TaskExecutorProcessSpec>> 
requestWorkerFromDriverFutures = new ArrayList<>();
+                       requestWorkerFromDriverFutures.add(new 
CompletableFuture<>());
+                       requestWorkerFromDriverFutures.add(new 
CompletableFuture<>());
+
+                       
driverBuilder.setRequestResourceFunction(taskExecutorProcessSpec -> {
+                               int idx = requestCount.getAndIncrement();
+                               assertThat(idx, lessThan(2));
+
+                               
requestWorkerFromDriverFutures.get(idx).complete(taskExecutorProcessSpec);
+                               return 
CompletableFuture.completedFuture(tmResourceIds.get(idx));
+                       });
+
+                       slotManagerBuilder.setGetRequiredResourcesSupplier(() 
-> Collections.singletonMap(WORKER_RESOURCE_SPEC, 1));
+
+                       runTest(() -> {
+                               // received worker request, verify requesting 
from driver
+                               CompletableFuture<Boolean> startNewWorkerFuture 
= runInMainThread(() ->
+                                               
getResourceManager().startNewWorker(WORKER_RESOURCE_SPEC));
+                               TaskExecutorProcessSpec 
taskExecutorProcessSpec1 = 
requestWorkerFromDriverFutures.get(0).get(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+                               
assertThat(startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), is(true));
+                               assertThat(taskExecutorProcessSpec1,
+                                               
is(TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, 
WORKER_RESOURCE_SPEC)));
+
+                               // first worker failed before register, verify 
requesting another worker from driver
+                               runInMainThread(() -> 
getResourceManager().onWorkerTerminated(tmResourceIds.get(0)));
+                               TaskExecutorProcessSpec 
taskExecutorProcessSpec2 =
+                                               
requestWorkerFromDriverFutures.get(1).get(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+                               assertThat(taskExecutorProcessSpec2, 
is(taskExecutorProcessSpec1));
+
+                               // second worker registered, verify 
registration succeed
+                               CompletableFuture<RegistrationResponse> 
registerTaskExecutorFuture = registerTaskExecutor(tmResourceIds.get(1));
+                               
assertThat(registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), 
instanceOf(RegistrationResponse.Success.class));
+                       });
+               }};
+       }
+
+       /**
+        * Tests worker terminated after registered.
+        */
+       @Test
+       public void testWorkerTerminatedAfterRegister() throws Exception {
+               new Context() {{
+                       final AtomicInteger requestCount = new AtomicInteger(0);
+
+                       final List<ResourceID> tmResourceIds = new 
ArrayList<>();
+                       tmResourceIds.add(ResourceID.generate());
+                       tmResourceIds.add(ResourceID.generate());
+
+                       final List<CompletableFuture<TaskExecutorProcessSpec>> 
requestWorkerFromDriverFutures = new ArrayList<>();
+                       requestWorkerFromDriverFutures.add(new 
CompletableFuture<>());
+                       requestWorkerFromDriverFutures.add(new 
CompletableFuture<>());
+
+                       
driverBuilder.setRequestResourceFunction(taskExecutorProcessSpec -> {
+                               int idx = requestCount.getAndIncrement();
+                               assertThat(idx, lessThan(2));
+
+                               
requestWorkerFromDriverFutures.get(idx).complete(taskExecutorProcessSpec);
+                               return 
CompletableFuture.completedFuture(tmResourceIds.get(idx));
+                       });
+
+                       slotManagerBuilder.setGetRequiredResourcesSupplier(() 
-> Collections.singletonMap(WORKER_RESOURCE_SPEC, 1));
+
+                       runTest(() -> {
+                               // received worker request, verify requesting 
from driver
+                               CompletableFuture<Boolean> startNewWorkerFuture 
= runInMainThread(() ->
+                                               
getResourceManager().startNewWorker(WORKER_RESOURCE_SPEC));
+                               TaskExecutorProcessSpec 
taskExecutorProcessSpec1 = 
requestWorkerFromDriverFutures.get(0).get(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+                               
assertThat(startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), is(true));
+                               assertThat(taskExecutorProcessSpec1,
+                                               
is(TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, 
WORKER_RESOURCE_SPEC)));
+
+                               // first worker registered, verify registration 
succeed
+                               CompletableFuture<RegistrationResponse> 
registerTaskExecutorFuture1 = registerTaskExecutor(tmResourceIds.get(0));
+                               
assertThat(registerTaskExecutorFuture1.get(TIMEOUT_SEC, TimeUnit.SECONDS), 
instanceOf(RegistrationResponse.Success.class));
+
+                               // first worker terminated, verify requesting 
another worker from driver
+                               runInMainThread(() -> 
getResourceManager().onWorkerTerminated(tmResourceIds.get(0)));
+                               TaskExecutorProcessSpec 
taskExecutorProcessSpec2 =
+                                               
requestWorkerFromDriverFutures.get(1).get(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+                               assertThat(taskExecutorProcessSpec2, 
is(taskExecutorProcessSpec1));
+
+                               // second worker registered, verify 
registration succeed
+                               CompletableFuture<RegistrationResponse> 
registerTaskExecutorFuture2 = registerTaskExecutor(tmResourceIds.get(1));
+                               
assertThat(registerTaskExecutorFuture2.get(TIMEOUT_SEC, TimeUnit.SECONDS), 
instanceOf(RegistrationResponse.Success.class));
+                       });
+               }};
+       }
+
+       /**
+        * Tests worker terminated and is no longer required.
+        */
+       @Test
+       public void testWorkerTerminatedNoLongerRequired() throws Exception {
+               new Context() {{
+                       final ResourceID tmResourceId = ResourceID.generate();
+                       final AtomicInteger requestCount = new AtomicInteger(0);
+
+                       final List<CompletableFuture<TaskExecutorProcessSpec>> 
requestWorkerFromDriverFutures = new ArrayList<>();
+                       requestWorkerFromDriverFutures.add(new 
CompletableFuture<>());
+                       requestWorkerFromDriverFutures.add(new 
CompletableFuture<>());
+
+                       
driverBuilder.setRequestResourceFunction(taskExecutorProcessSpec -> {
+                               int idx = requestCount.getAndIncrement();
+                               assertThat(idx, lessThan(2));
+
+                               
requestWorkerFromDriverFutures.get(idx).complete(taskExecutorProcessSpec);
+                               return 
CompletableFuture.completedFuture(tmResourceId);
+                       });
+
+                       runTest(() -> {
+                               // received worker request, verify requesting 
from driver
+                               CompletableFuture<Boolean> startNewWorkerFuture 
= runInMainThread(() ->
+                                               
getResourceManager().startNewWorker(WORKER_RESOURCE_SPEC));
+                               TaskExecutorProcessSpec taskExecutorProcessSpec 
= requestWorkerFromDriverFutures.get(0).get(TIMEOUT_SEC, TimeUnit.SECONDS);
+
+                               
assertThat(startNewWorkerFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), is(true));
+                               assertThat(taskExecutorProcessSpec,
+                                               
is(TaskExecutorProcessUtils.processSpecFromWorkerResourceSpec(flinkConfig, 
WORKER_RESOURCE_SPEC)));
+
+                               // worker registered, verify registration 
succeed
+                               CompletableFuture<RegistrationResponse> 
registerTaskExecutorFuture = registerTaskExecutor(tmResourceId);
+                               
assertThat(registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), 
instanceOf(RegistrationResponse.Success.class));
+
+                               // worker terminated, verify not requesting new 
worker
+                               runInMainThread(() -> {
+                                       
getResourceManager().onWorkerTerminated(tmResourceId);
+                                       // needs to return something, so that 
we can use `get()` to make sure the main thread processing
+                                       // finishes before the assertions
+                                       return null;
+                               }).get(TIMEOUT_SEC, TimeUnit.SECONDS);
+                               
assertFalse(requestWorkerFromDriverFutures.get(1).isDone());
+                       });
+               }};
+       }
+
+       /**
+        * Tests workers from previous attempt successfully recovered and 
registered.
+        */
+       @Test
+       public void testRecoverWorkerFromPreviousAttempt() throws Exception {
+               new Context() {{
+                       final ResourceID tmResourceId = ResourceID.generate();
+
+                       runTest(() -> {
+                               runInMainThread(() -> 
getResourceManager().onPreviousAttemptWorkersRecovered(Collections.singleton(tmResourceId)));
+                               CompletableFuture<RegistrationResponse> 
registerTaskExecutorFuture = registerTaskExecutor(tmResourceId);
+                               
assertThat(registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), 
instanceOf(RegistrationResponse.Success.class));
+                       });
+               }};
+       }
+
+       /**
+        * Tests decline unknown worker registration.
+        */
+       @Test
+       public void testRegisterUnknownWorker() throws Exception {
+               new Context() {{
+                       runTest(() -> {
+                               CompletableFuture<RegistrationResponse> 
registerTaskExecutorFuture = registerTaskExecutor(ResourceID.generate());
+                               
assertThat(registerTaskExecutorFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS), 
instanceOf(RegistrationResponse.Decline.class));
+                       });
+               }};
+       }
+
+       @Test
+       public void testOnError() throws Exception {
+               new Context() {{
+                       final Throwable fatalError = new Throwable("Testing 
fatal error");
+                       runTest(() -> {
+                               runInMainThread(() -> 
getResourceManager().onError(fatalError));
+                               final Throwable reportedError = 
getFatalErrorHandler().getErrorFuture().get(TIMEOUT_SEC, TimeUnit.SECONDS);
+                               assertThat(reportedError, is(fatalError));
+                       });
+               }};
+       }
+
+       class Context {
+
+               final Configuration flinkConfig = new Configuration();
+               final TestingResourceManagerDriver.Builder driverBuilder = new 
TestingResourceManagerDriver.Builder();
+               final TestingSlotManagerBuilder slotManagerBuilder = new 
TestingSlotManagerBuilder();
+
+               private ActiveResourceManager<ResourceID> resourceManager;
+               private TestingFatalErrorHandler fatalErrorHandler;
+
+               ActiveResourceManager<ResourceID> getResourceManager() {
+                       return resourceManager;
+               }
+
+               TestingFatalErrorHandler getFatalErrorHandler() {
+                       return fatalErrorHandler;
+               }
+
+               void runTest(RunnableWithException testMethod) throws Exception 
{
+                       fatalErrorHandler = new TestingFatalErrorHandler();
+                       resourceManager = createAndStartResourceManager(
+                                       flinkConfig,
+                                       driverBuilder.build(),
+                                       slotManagerBuilder.createSlotManager());
+
+                       try {
+                               testMethod.run();
+                       } finally {
+                               resourceManager.close();
+                       }
+               }
+
+               private ActiveResourceManager<ResourceID> 
createAndStartResourceManager(
+                               Configuration configuration,
+                               ResourceManagerDriver<ResourceID> driver,
+                               SlotManager slotManager) throws Exception {
+                       final TestingRpcService rpcService = new 
TestingRpcService(configuration);
+                       final MockResourceManagerRuntimeServices rmServices = 
new MockResourceManagerRuntimeServices(rpcService, TIMEOUT_TIME, slotManager);
+
+                       final ActiveResourceManager<ResourceID> 
activeResourceManager = new ActiveResourceManager<>(
+                                       driver,
+                                       configuration,
+                                       rpcService,
+                                       ResourceID.generate(),
+                                       rmServices.highAvailabilityServices,
+                                       rmServices.heartbeatServices,
+                                       rmServices.slotManager,
+                                       
NoOpResourceManagerPartitionTracker::get,
+                                       rmServices.jobLeaderIdService,
+                                       new ClusterInformation("localhost", 
1234),
+                                       fatalErrorHandler,
+                                       
UnregisteredMetricGroups.createUnregisteredResourceManagerMetricGroup());
+
+                       activeResourceManager.start();
+                       rmServices.grantLeadership();
+
+                       return activeResourceManager;
+               }
+
+               public void runInMainThread(Runnable runnable) {
+                       resourceManager.handleInMainThread(runnable);
+               }
+
+               public <T> CompletableFuture<T> runInMainThread(Callable<T> 
callable) {
+                       return resourceManager.runInMainThread(callable, 
TIMEOUT_TIME);
+               }
+
+               CompletableFuture<RegistrationResponse> 
registerTaskExecutor(ResourceID resourceID) throws Exception {
+                       final TaskExecutorGateway taskExecutorGateway = new 
TestingTaskExecutorGatewayBuilder()
+                                       .createTestingTaskExecutorGateway();
+                       ((TestingRpcService) 
resourceManager.getRpcService()).registerGateway(resourceID.toString(), 
taskExecutorGateway);
+
+                       final TaskExecutorRegistration taskExecutorRegistration 
= new TaskExecutorRegistration(
+                                       resourceID.toString(),
+                                       resourceID,
+                                       1234,
+                                       new HardwareDescription(1, 2L, 3L, 4L),
+                                       ResourceProfile.ZERO,
+                                       ResourceProfile.ZERO);
+
+                       return runInMainThread(() -> 
resourceManager.registerTaskExecutor(taskExecutorRegistration, TIMEOUT_TIME))
+                                       .get(TIMEOUT_SEC, TimeUnit.SECONDS);

Review comment:
       We could avoid explicitly running this in the main thread by calling 
`ResourceManagerGateway.registerTaskExecutor` here.
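
       The idea behind this suggestion can be sketched with a toy model: a 
gateway hands the call over to the endpoint's main thread itself, so the caller 
only deals with the returned future. Everything in the block is hypothetical 
(`GatewaySketch`, `Gateway`, `Endpoint`, `register`, and the `getSelfGateway` 
method defined here); it does not use Flink's RPC classes and only mirrors the 
pattern.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Toy model of an RPC endpoint and its gateway; all names are hypothetical.
final class GatewaySketch {

    // Callers on any thread invoke the gateway and receive a future.
    interface Gateway {
        CompletableFuture<String> register(String workerId);
    }

    // Endpoint whose internal logic must only run on its single main thread.
    static final class Endpoint implements AutoCloseable {
        private final ExecutorService mainThread = Executors.newSingleThreadExecutor();

        // Assumed to be main-thread-only logic.
        private String registerInternal(String workerId) {
            return "registered:" + workerId;
        }

        // The gateway does the hand-over to the main thread for the caller.
        Gateway getSelfGateway() {
            return workerId ->
                CompletableFuture.supplyAsync(() -> registerInternal(workerId), mainThread);
        }

        @Override
        public void close() {
            mainThread.shutdownNow();
        }
    }

    // Test-style helper: no explicit "run in main thread" wrapper is needed.
    static String registerViaGateway(String workerId) {
        try (Endpoint endpoint = new Endpoint()) {
            return endpoint.getSelfGateway().register(workerId).join();
        }
    }

    public static void main(String[] args) {
        System.out.println(registerViaGateway("tm-1"));
    }
}
```

       In the test under review, the helper would then return the gateway's 
future directly instead of wrapping the call in `runInMainThread`, assuming the 
gateway method has the same parameters as the call in the diff.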



