xintongsong commented on a change in pull request #13311:
URL: https://github.com/apache/flink/pull/13311#discussion_r484622514



##########
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/TaskExecutorProcessSpecContainerResourceAdapter.java
##########
@@ -86,21 +81,21 @@
                }
        }
 
-       Set<WorkerResourceSpec> getWorkerSpecs(final Resource containerResource, final MatchingStrategy matchingStrategy) {
+       Set<TaskExecutorProcessSpec> getTaskExecutorProcessSpec(final Resource containerResource, final TaskExecutorProcessSpecContainerResourceAdapter.MatchingStrategy matchingStrategy) {

Review comment:
       ```suggestion
        Set<TaskExecutorProcessSpec> getTaskExecutorProcessSpec(final Resource containerResource, final MatchingStrategy matchingStrategy) {
       ```

##########
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/TaskExecutorProcessSpecContainerResourceAdapter.java
##########
@@ -86,21 +81,21 @@
                }
        }
 
-       Set<WorkerResourceSpec> getWorkerSpecs(final Resource containerResource, final MatchingStrategy matchingStrategy) {
+       Set<TaskExecutorProcessSpec> getTaskExecutorProcessSpec(final Resource containerResource, final TaskExecutorProcessSpecContainerResourceAdapter.MatchingStrategy matchingStrategy) {
                final InternalContainerResource internalContainerResource = new InternalContainerResource(containerResource);
                return getEquivalentInternalContainerResource(internalContainerResource, matchingStrategy).stream()
-                       .flatMap(resource -> containerResourceToWorkerSpecs.getOrDefault(resource, Collections.emptySet()).stream())
+                       .flatMap(resource -> containerResourceToTaskExecutorProcessSpec.getOrDefault(resource, Collections.emptySet()).stream())
                        .collect(Collectors.toSet());
        }
 
-       Set<Resource> getEquivalentContainerResource(final Resource containerResource, final MatchingStrategy matchingStrategy) {
+       Set<Resource> getEquivalentContainerResource(final Resource containerResource, final TaskExecutorProcessSpecContainerResourceAdapter.MatchingStrategy matchingStrategy) {

Review comment:
       ```suggestion
        Set<Resource> getEquivalentContainerResource(final Resource containerResource, final MatchingStrategy matchingStrategy) {
       ```

##########
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnResourceManagerConfiguration.java
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.configuration;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.yarn.YarnConfigKeys;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Configuration specific to {@link org.apache.flink.yarn.YarnResourceManager}.
+ */
+public class YarnResourceManagerConfiguration {
+       private final Map<String, String> yarnConfig;
+       private final String webInterfaceUrl;
+       private final String rpcAddress;
+
+       public YarnResourceManagerConfiguration(
+               Map<String, String> env,
+               @Nullable String webInterfaceUrl,
+               String rpcAddress) {
+               this.yarnConfig = getYarnConfFromEnv(Preconditions.checkNotNull(env));

Review comment:
       Instead of passing in the `env` and extracting the needed configurations in the constructor, I would suggest making the constructor private and having it take the extracted configs directly, and introducing a public static factory method that does the extraction.
   
   ```
   private YarnResourceManagerConfiguration(conf1, conf2, conf3, ..., webInterfaceUrl, rpcAddress) {
       ...
   }
   
   public static YarnResourceManagerConfiguration fromEnv(env, webInterfaceUrl, rpcAddress) {
       ...
       foobar = getYarnConfigFromEnv(env);
       ...
       return new YarnResourceManagerConfiguration(...);
   }
   ```
   
   The idea is to keep as little as possible in the constructor.
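   
   For illustration, a minimal sketch of that pattern; the single `yarnFiles` field, its getter, and the one extracted key are placeholders standing in for whatever configs the resource manager actually needs, not taken from the PR:
   
   ```java
   import org.apache.flink.util.Preconditions;
   import org.apache.flink.yarn.YarnConfigKeys;
   
   import javax.annotation.Nullable;
   
   import java.util.Map;
   
   public class YarnResourceManagerConfiguration {
   
       // placeholder field; the real class would hold one field per extracted config
       private final String yarnFiles;
       private final String webInterfaceUrl;
       private final String rpcAddress;
   
       // the constructor only assigns values that were already extracted
       private YarnResourceManagerConfiguration(String yarnFiles, @Nullable String webInterfaceUrl, String rpcAddress) {
           this.yarnFiles = yarnFiles;
           this.webInterfaceUrl = webInterfaceUrl;
           this.rpcAddress = Preconditions.checkNotNull(rpcAddress);
       }
   
       // all environment parsing lives in the factory method
       public static YarnResourceManagerConfiguration fromEnv(Map<String, String> env, @Nullable String webInterfaceUrl, String rpcAddress) {
           Preconditions.checkNotNull(env);
           final String yarnFiles = env.get(YarnConfigKeys.FLINK_YARN_FILES);
           return new YarnResourceManagerConfiguration(yarnFiles, webInterfaceUrl, rpcAddress);
       }
   
       public String getYarnFiles() {
           return yarnFiles;
       }
   }
   ```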

##########
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnResourceManagerConfiguration.java
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.configuration;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.yarn.YarnConfigKeys;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Configuration specific to {@link org.apache.flink.yarn.YarnResourceManager}.
+ */
+public class YarnResourceManagerConfiguration {
+       private final Map<String, String> yarnConfig;
+       private final String webInterfaceUrl;
+       private final String rpcAddress;
+
+       public YarnResourceManagerConfiguration(
+               Map<String, String> env,
+               @Nullable String webInterfaceUrl,
+               String rpcAddress) {
+               this.yarnConfig = getYarnConfFromEnv(Preconditions.checkNotNull(env));
+               this.rpcAddress = Preconditions.checkNotNull(rpcAddress);
+               this.webInterfaceUrl = webInterfaceUrl;
+       }
+
+       public String getRpcAddress() {
+               return rpcAddress;
+       }
+
+       public String getWebInterfaceUrl() {
+               return webInterfaceUrl;
+       }
+
+       private static Map<String, String> getYarnConfFromEnv(Map<String, String> env) {
+               Map<String, String> configs = new HashMap<>();
+               configs.put(YarnConfigKeys.FLINK_YARN_FILES, env.get(YarnConfigKeys.FLINK_YARN_FILES));
+               configs.put(YarnConfigKeys.ENV_FLINK_CLASSPATH, env.get(YarnConfigKeys.ENV_FLINK_CLASSPATH));
+               configs.put(YarnConfigKeys.ENV_APP_ID, env.get(YarnConfigKeys.ENV_APP_ID));
+               configs.put(YarnConfigKeys.ENV_CLIENT_HOME_DIR, env.get(YarnConfigKeys.ENV_CLIENT_HOME_DIR));
+               configs.put(YarnConfigKeys.ENV_CLIENT_SHIP_FILES, env.get(YarnConfigKeys.ENV_CLIENT_SHIP_FILES));
+               configs.put(YarnConfigKeys.FLINK_DIST_JAR, env.get(YarnConfigKeys.FLINK_DIST_JAR));
+               configs.put(YarnConfigKeys.REMOTE_KEYTAB_PATH, env.get(YarnConfigKeys.REMOTE_KEYTAB_PATH));
+               configs.put(YarnConfigKeys.LOCAL_KEYTAB_PATH, env.get(YarnConfigKeys.LOCAL_KEYTAB_PATH));
+               configs.put(YarnConfigKeys.KEYTAB_PRINCIPAL, env.get(YarnConfigKeys.KEYTAB_PRINCIPAL));
+               configs.put(YarnConfigKeys.ENV_HADOOP_USER_NAME, env.get(YarnConfigKeys.ENV_HADOOP_USER_NAME));
+               configs.put(YarnConfigKeys.ENV_ZOOKEEPER_NAMESPACE, env.get(YarnConfigKeys.ENV_ZOOKEEPER_NAMESPACE));

Review comment:
       This one is used in the cluster entry point, but not in the resource 
manager. It can also be excluded.

##########
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java
##########
@@ -0,0 +1,617 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.configuration.TaskManagerOptionsInternal;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import 
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.externalresource.ExternalResourceUtils;
+import 
org.apache.flink.runtime.resourcemanager.active.AbstractResourceManagerDriver;
+import org.apache.flink.runtime.resourcemanager.active.ResourceManagerDriver;
+import 
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import org.apache.flink.runtime.webmonitor.history.HistoryServerUtils;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
+import org.apache.flink.yarn.configuration.YarnConfigOptionsInternal;
+import org.apache.flink.yarn.configuration.YarnResourceManagerConfiguration;
+
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import 
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.net.URL;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * Implementation of {@link ResourceManagerDriver} for Kubernetes deployment.
+ */
+public class YarnResourceManagerDriver extends 
AbstractResourceManagerDriver<YarnWorkerNode> {
+
+       private static final Priority RM_REQUEST_PRIORITY = 
Priority.newInstance(1);
+
+       /** Environment variable name of the hostname given by the YARN.
+        * In task executor we use the hostnames given by YARN consistently 
throughout akka */
+       private static final String ENV_FLINK_NODE_ID = "_FLINK_NODE_ID";
+
+       static final String ERROR_MASSAGE_ON_SHUTDOWN_REQUEST = "Received 
shutdown request from YARN ResourceManager.";
+
+       private final YarnConfiguration yarnConfig;
+
+       /** The process environment variables. */
+       private final YarnResourceManagerConfiguration configuration;
+
+       /** Default heartbeat interval between this resource manager and the 
YARN ResourceManager. */
+       private final int yarnHeartbeatIntervalMillis;
+
+       /** Client to communicate with the Resource Manager (YARN's master). */
+       private AMRMClientAsync<AMRMClient.ContainerRequest> 
resourceManagerClient;
+
+       /** The heartbeat interval while the resource master is waiting for 
containers. */
+       private final int containerRequestHeartbeatIntervalMillis;
+
+       /** Client to communicate with the Node manager and launch TaskExecutor 
processes. */
+       private NMClientAsync nodeManagerClient;
+
+       /** Request resource futures, keyed by container ids. */
+       private final Map<TaskExecutorProcessSpec, 
Queue<CompletableFuture<YarnWorkerNode>>> requestResourceFutures;
+
+       private final TaskExecutorProcessSpecContainerResourceAdapter 
taskExecutorProcessSpecContainerResourceAdapter;
+
+       private final RegisterApplicationMasterResponseReflector 
registerApplicationMasterResponseReflector;
+
+       private 
TaskExecutorProcessSpecContainerResourceAdapter.MatchingStrategy 
matchingStrategy;
+
+       private final YarnResourceManagerClientFactory 
yarnResourceManagerClientFactory;
+
+       private final YarnNodeManagerClientFactory yarnNodeManagerClientFactory;
+
+       public YarnResourceManagerDriver(
+               Configuration flinkConfig,
+               YarnResourceManagerConfiguration configuration,
+               YarnResourceManagerClientFactory 
yarnResourceManagerClientFactory,
+               YarnNodeManagerClientFactory yarnNodeManagerClientFactory) {
+               super(flinkConfig, GlobalConfiguration.loadConfiguration());
+
+               this.yarnConfig = new YarnConfiguration();
+               this.requestResourceFutures = new HashMap<>();
+               this.configuration = configuration;
+
+               final int yarnHeartbeatIntervalMS = flinkConfig.getInteger(
+                       YarnConfigOptions.HEARTBEAT_DELAY_SECONDS) * 1000;
+
+               final long yarnExpiryIntervalMS = yarnConfig.getLong(
+                       YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS,
+                       YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS);
+
+               if (yarnHeartbeatIntervalMS >= yarnExpiryIntervalMS) {
+                       log.warn("The heartbeat interval of the Flink 
Application master ({}) is greater " +
+                                       "than YARN's expiry interval ({}). The 
application is likely to be killed by YARN.",
+                               yarnHeartbeatIntervalMS, yarnExpiryIntervalMS);
+               }
+               yarnHeartbeatIntervalMillis = yarnHeartbeatIntervalMS;
+               containerRequestHeartbeatIntervalMillis = 
flinkConfig.getInteger(YarnConfigOptions.CONTAINER_REQUEST_HEARTBEAT_INTERVAL_MILLISECONDS);
+
+               this.taskExecutorProcessSpecContainerResourceAdapter = new 
TaskExecutorProcessSpecContainerResourceAdapter(
+                       yarnConfig.getInt(
+                               
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+                               
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB),
+                       yarnConfig.getInt(
+                               
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
+                               
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES),
+                       yarnConfig.getInt(
+                               
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
+                               
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB),
+                       yarnConfig.getInt(
+                               
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
+                               
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES),
+                       ExternalResourceUtils.getExternalResources(flinkConfig, 
YarnConfigOptions.EXTERNAL_RESOURCE_YARN_CONFIG_KEY_SUFFIX));
+               this.registerApplicationMasterResponseReflector = new 
RegisterApplicationMasterResponseReflector(log);
+
+               this.matchingStrategy = 
flinkConfig.getBoolean(YarnConfigOptionsInternal.MATCH_CONTAINER_VCORES) ?
+                       
TaskExecutorProcessSpecContainerResourceAdapter.MatchingStrategy.MATCH_VCORE :
+                       
TaskExecutorProcessSpecContainerResourceAdapter.MatchingStrategy.IGNORE_VCORE;
+
+               this.yarnResourceManagerClientFactory = 
yarnResourceManagerClientFactory;
+               this.yarnNodeManagerClientFactory = 
yarnNodeManagerClientFactory;
+       }
+
+       // 
------------------------------------------------------------------------
+       //  ResourceManagerDriver
+       // 
------------------------------------------------------------------------
+
+       @Override
+       protected void initializeInternal() throws Exception {
+               final YarnContainerEventHandler yarnContainerEventHandler = new 
YarnContainerEventHandler();
+               try {
+                       resourceManagerClient = 
yarnResourceManagerClientFactory.createAndStartResourceManagerClient(
+                               yarnConfig,
+                               yarnHeartbeatIntervalMillis,
+                               yarnContainerEventHandler);
+
+                       final RegisterApplicationMasterResponse 
registerApplicationMasterResponse = registerApplicationMaster();
+                       
getContainersFromPreviousAttempts(registerApplicationMasterResponse);
+                       
updateMatchingStrategy(registerApplicationMasterResponse);
+               } catch (Exception e) {
+                       throw new ResourceManagerException("Could not start 
resource manager client.", e);
+               }
+
+               nodeManagerClient = 
yarnNodeManagerClientFactory.createAndStartNodeManagerClient(yarnConfig, 
yarnContainerEventHandler);
+       }
+
+       @Override
+       public CompletableFuture<Void> terminate() {
+               // shut down all components
+               Exception exception = null;
+
+               if (resourceManagerClient != null) {
+                       try {
+                               resourceManagerClient.stop();
+                       } catch (Exception e) {
+                               exception = e;
+                       }
+               }
+
+               if (nodeManagerClient != null) {
+                       try {
+                               nodeManagerClient.stop();
+                       } catch (Exception e) {
+                               exception = ExceptionUtils.firstOrSuppressed(e, 
exception);
+                       }
+               }
+
+               return exception == null ?
+                       FutureUtils.completedVoidFuture() :
+                       FutureUtils.completedExceptionally(exception);
+       }
+
+       @Override
+       public void deregisterApplication(ApplicationStatus finalStatus, 
@Nullable String optionalDiagnostics) {
+               // first, de-register from YARN
+               final FinalApplicationStatus yarnStatus = 
getYarnStatus(finalStatus);
+               log.info("Unregister application from the YARN Resource Manager 
with final status {}.", yarnStatus);
+
+               final Optional<URL> historyServerURL = 
HistoryServerUtils.getHistoryServerURL(flinkConfig);
+
+               final String appTrackingUrl = 
historyServerURL.map(URL::toString).orElse("");
+
+               try {
+                       
resourceManagerClient.unregisterApplicationMaster(yarnStatus, 
optionalDiagnostics, appTrackingUrl);
+               } catch (Throwable t) {
+                       log.error("Could not unregister the application 
master.", t);
+               }
+
+               Utils.deleteApplicationFiles(configuration.getYarnFiles());
+       }
+
+       @Override
+       public CompletableFuture<YarnWorkerNode> 
requestResource(TaskExecutorProcessSpec taskExecutorProcessSpec) {
+               final Optional<Resource> containerResourceOptional = 
getContainerResource(taskExecutorProcessSpec);
+               final CompletableFuture<YarnWorkerNode> requestResourceFuture = 
new CompletableFuture<>();
+
+               if (containerResourceOptional.isPresent()) {
+                       
resourceManagerClient.addContainerRequest(getContainerRequest(containerResourceOptional.get()));
+
+                       // make sure we transmit the request fast and receive 
fast news of granted allocations
+                       
resourceManagerClient.setHeartbeatInterval(containerRequestHeartbeatIntervalMillis);
+
+                       
requestResourceFutures.computeIfAbsent(taskExecutorProcessSpec, ignore -> new 
LinkedList<>()).add(requestResourceFuture);
+
+                       log.info("Requesting new TaskExecutor container with 
resource {}.", taskExecutorProcessSpec);
+               } else {
+                       requestResourceFuture.completeExceptionally(
+                               new 
ResourceManagerException(String.format("Could not compute the container 
Resource from the given TaskExecutorProcessSpec %s.", 
taskExecutorProcessSpec)));
+               }
+
+               return requestResourceFuture;
+       }
+
+       @Override
+       public void releaseResource(YarnWorkerNode workerNode) {
+               final Container container = workerNode.getContainer();
+               log.info("Stopping container {}.", 
workerNode.getResourceID().getStringWithMetadata());
+               nodeManagerClient.stopContainerAsync(container.getId(), 
container.getNodeId());
+               
resourceManagerClient.releaseAssignedContainer(container.getId());
+       }
+
+       // 
------------------------------------------------------------------------
+       //  Internal
+       // 
------------------------------------------------------------------------
+
+       private void onContainersOfResourceAllocated(Resource resource, 
List<Container> containers) {
+               final List<TaskExecutorProcessSpec> 
pendingTaskExecutorProcessSpecs =
+                       
taskExecutorProcessSpecContainerResourceAdapter.getTaskExecutorProcessSpec(resource,
 matchingStrategy).stream()
+                               .flatMap(spec -> 
Collections.nCopies(getNumRequestedNotAllocatedWorkersFor(spec), spec).stream())
+                               .collect(Collectors.toList());
+
+               int numPending = pendingTaskExecutorProcessSpecs.size();
+               log.info("Received {} containers with resource {}, {} pending 
container requests.",
+                       containers.size(),
+                       resource,
+                       numPending);
+
+               final Iterator<Container> containerIterator = 
containers.iterator();
+               final Iterator<TaskExecutorProcessSpec> 
pendingTaskExecutorProcessSpecIterator = 
pendingTaskExecutorProcessSpecs.iterator();
+               final Iterator<AMRMClient.ContainerRequest> 
pendingRequestsIterator =
+                       getPendingRequestsAndCheckConsistency(resource, 
pendingTaskExecutorProcessSpecs.size()).iterator();
+
+               int numAccepted = 0;
+               while (containerIterator.hasNext() && 
pendingTaskExecutorProcessSpecIterator.hasNext()) {
+                       final TaskExecutorProcessSpec taskExecutorProcessSpec = 
pendingTaskExecutorProcessSpecIterator.next();
+                       final Container container = containerIterator.next();
+                       final AMRMClient.ContainerRequest pendingRequest = 
pendingRequestsIterator.next();
+                       final ResourceID resourceId = 
getContainerResourceId(container);
+                       final CompletableFuture<YarnWorkerNode> requestResourceFuture =
+                               Preconditions.checkNotNull(requestResourceFutures.get(taskExecutorProcessSpec).poll(), "The requestResourceFuture queue for TasExecutorProcessSpec %s should not be empty.", taskExecutorProcessSpec);

Review comment:
       This only checks that `requestResourceFutures.get(taskExecutorProcessSpec).poll()` is not null, while `requestResourceFutures.get(taskExecutorProcessSpec)` itself could also be null, in which case `poll()` would be called on a null reference and throw a NullPointerException.
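   
   A sketch of one way to guard against that, reusing the fields and imports already present in the quoted code (falling back to an empty queue via `getOrDefault` is just one option; the message wording is illustrative):
   
   ```java
   // An absent map entry now fails with the descriptive checkNotNull message instead of an NPE,
   // because polling an empty fallback queue returns null and checkNotNull reports it.
   final Queue<CompletableFuture<YarnWorkerNode>> pendingFutures =
           requestResourceFutures.getOrDefault(taskExecutorProcessSpec, new LinkedList<>());
   final CompletableFuture<YarnWorkerNode> requestResourceFuture = Preconditions.checkNotNull(
           pendingFutures.poll(),
           "The requestResourceFuture queue for TaskExecutorProcessSpec %s should not be empty.",
           taskExecutorProcessSpec);
   ```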

##########
File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerDriverTest.java
##########
@@ -0,0 +1,518 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.api.common.resources.CPUResource;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.resourcemanager.active.ResourceManagerDriver;
+import 
org.apache.flink.runtime.resourcemanager.active.ResourceManagerDriverTestBase;
+import 
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.yarn.configuration.YarnResourceManagerConfiguration;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.junit.Assume;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_APP_ID;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_CLIENT_HOME_DIR;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_CLIENT_SHIP_FILES;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_HADOOP_USER_NAME;
+import static org.apache.flink.yarn.YarnConfigKeys.FLINK_DIST_JAR;
+import static org.apache.flink.yarn.YarnConfigKeys.FLINK_YARN_FILES;
+import static 
org.apache.flink.yarn.YarnResourceManagerDriver.ERROR_MASSAGE_ON_SHUTDOWN_REQUEST;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link YarnResourceManagerDriver}.
+ */
+public class YarnResourceManagerDriverTest extends 
ResourceManagerDriverTestBase<YarnWorkerNode> {
+       private static final Resource testingResource = 
Resource.newInstance(YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
 YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
+       private static final Container testingContainer = 
createTestingContainerWithResource(testingResource, 0);
+       private static final TaskExecutorProcessSpec 
testingTaskExecutorProcessSpec =
+               new TaskExecutorProcessSpec(
+                       new CPUResource(1),
+                       MemorySize.ZERO,
+                       MemorySize.ZERO,
+                       MemorySize.ofMebiBytes(256),
+                       MemorySize.ofMebiBytes(256),
+                       MemorySize.ofMebiBytes(256),
+                       MemorySize.ofMebiBytes(256),
+                       MemorySize.ZERO,
+                       MemorySize.ZERO);
+
+       @Rule
+       public TemporaryFolder folder = new TemporaryFolder();
+
+       @Override
+       protected Context createContext() {
+               return new Context();
+       }
+
+       @Test
+       public void testShutdownRequestCausesFatalError() throws Exception {
+               new Context() {{
+                       final CompletableFuture<Throwable> 
throwableCompletableFuture = new CompletableFuture<>();
+                       
resourceEventHandlerBuilder.setOnErrorConsumer(throwableCompletableFuture::complete);
+                       runTest(() -> {
+                               
testingYarnResourceManagerClientFactory.getCallbackHandler().onShutdownRequest();
+
+                               Throwable throwable = 
throwableCompletableFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS);
+                               
assertThat(ExceptionUtils.findThrowable(throwable, 
ResourceManagerException.class).isPresent(), is(true));
+                               
assertThat(ExceptionUtils.findThrowableWithMessage(throwable, 
ERROR_MASSAGE_ON_SHUTDOWN_REQUEST).isPresent(), is(true));
+                       });
+               }};
+       }
+
+       /**
+        * Tests that application files are deleted when the YARN application 
master is de-registered.
+        */
+       @Test
+       public void testDeleteApplicationFiles() throws Exception {
+               new Context() {{
+                       final File applicationDir = folder.newFolder(".flink");
+                       env.put(FLINK_YARN_FILES, 
applicationDir.getCanonicalPath());
+
+                       runTest(() -> {
+                               
getDriver().deregisterApplication(ApplicationStatus.SUCCEEDED, null);
+                               assertFalse("YARN application directory was not 
removed", Files.exists(applicationDir.toPath()));
+                       });
+               }};
+       }
+
+       @Test
+       public void testOnContainerCompleted() throws Exception {
+               new Context() {{
+                       addContainerRequestFutures.add(new 
CompletableFuture<>());
+                       addContainerRequestFutures.add(new 
CompletableFuture<>());
+
+                       
testingYarnAMRMClientAsyncBuilder.setAddContainerRequestConsumer((ignored1, 
ignored2) ->
+                               
addContainerRequestFutures.get(addContainerRequestFuturesNumCompleted.getAndIncrement()).complete(null));
+                       
resourceEventHandlerBuilder.setOnWorkerTerminatedConsumer((ignore1, ignore2) -> 
getDriver().requestResource(testingTaskExecutorProcessSpec));
+
+                       runTest(() -> {
+                               runInMainThread(() -> 
getDriver().requestResource(testingTaskExecutorProcessSpec));
+                               
testingYarnResourceManagerClientFactory.getCallbackHandler().onContainersAllocated(ImmutableList.of(testingContainer));
+
+                               
verifyFutureCompleted(addContainerRequestFutures.get(0));
+                               
verifyFutureCompleted(removeContainerRequestFuture);
+                               
verifyFutureCompleted(startContainerAsyncFuture);
+
+                               ContainerStatus testingContainerStatus = 
createTestingContainerCompletedStatus(testingContainer.getId());
+                               
testingYarnResourceManagerClientFactory.getCallbackHandler().onContainersCompleted(ImmutableList.of(testingContainerStatus));
+
+                               
verifyFutureCompleted(addContainerRequestFutures.get(1));
+                       });
+               }};
+       }
+
+       @Test
+       public void testOnStartContainerError() throws Exception {
+               new Context() {{
+                       addContainerRequestFutures.add(new 
CompletableFuture<>());
+                       addContainerRequestFutures.add(new 
CompletableFuture<>());
+
+                       
testingYarnAMRMClientAsyncBuilder.setAddContainerRequestConsumer((ignored1, 
ignored2) ->
+                               
addContainerRequestFutures.get(addContainerRequestFuturesNumCompleted.getAndIncrement()).complete(null));
+                       
resourceEventHandlerBuilder.setOnWorkerTerminatedConsumer((ignore1, ignore2) -> 
getDriver().requestResource(testingTaskExecutorProcessSpec));
+
+                       runTest(() -> {
+                               runInMainThread(() -> 
getDriver().requestResource(testingTaskExecutorProcessSpec));
+                               
testingYarnResourceManagerClientFactory.getCallbackHandler().onContainersAllocated(ImmutableList.of(testingContainer));
+
+                               
verifyFutureCompleted(addContainerRequestFutures.get(0));
+                               
verifyFutureCompleted(removeContainerRequestFuture);
+                               
verifyFutureCompleted(startContainerAsyncFuture);
+
+                               
testingYarnNodeManagerClientFactory.getCallbackHandler().onStartContainerError(testingContainer.getId(),
 new Exception("start error"));
+                               
verifyFutureCompleted(releaseAssignedContainerFuture);
+                               
verifyFutureCompleted(addContainerRequestFutures.get(1));
+                       });
+               }};
+       }
+
+       @Test
+       public void 
testStartTaskExecutorProcessVariousSpec_SameContainerResource() throws 
Exception{
+               final TaskExecutorProcessSpec taskExecutorProcessSpec1 =
+                       new TaskExecutorProcessSpec(
+                               new CPUResource(1),
+                               MemorySize.ZERO,
+                               MemorySize.ZERO,
+                               MemorySize.ofMebiBytes(100),
+                               MemorySize.ZERO,
+                               MemorySize.ofMebiBytes(100),
+                               MemorySize.ofMebiBytes(100),
+                               MemorySize.ZERO,
+                               MemorySize.ZERO
+                       );
+               final TaskExecutorProcessSpec taskExecutorProcessSpec2 =
+                       new TaskExecutorProcessSpec(
+                               new CPUResource(1),
+                               MemorySize.ZERO,
+                               MemorySize.ZERO,
+                               MemorySize.ofMebiBytes(99),
+                               MemorySize.ZERO,
+                               MemorySize.ofMebiBytes(100),
+                               MemorySize.ofMebiBytes(100),
+                               MemorySize.ZERO,
+                               MemorySize.ZERO
+                       );
+
+               new Context() {{
+                       addContainerRequestFutures.add(new 
CompletableFuture<>());
+                       addContainerRequestFutures.add(new 
CompletableFuture<>());
+                       addContainerRequestFutures.add(new 
CompletableFuture<>());
+                       addContainerRequestFutures.add(new 
CompletableFuture<>());
+
+                       final String startCommand1 = 
TaskManagerOptions.TASK_HEAP_MEMORY.key() + "=" + (100L << 20);
+                       final String startCommand2 = 
TaskManagerOptions.TASK_HEAP_MEMORY.key() + "=" + (99L << 20);
+                       final CompletableFuture<Void> 
startContainerAsyncCommandFuture1 = new CompletableFuture<>();
+                       final CompletableFuture<Void> 
startContainerAsyncCommandFuture2 = new CompletableFuture<>();
+
+                       
testingYarnAMRMClientAsyncBuilder.setGetMatchingRequestsFunction(ignored ->
+                               Collections.singletonList(ImmutableList.of(
+                                       
YarnResourceManagerDriver.getContainerRequest(((YarnResourceManagerDriver) 
getDriver()).getContainerResource(taskExecutorProcessSpec1).get()),
+                                       
YarnResourceManagerDriver.getContainerRequest(((YarnResourceManagerDriver) 
getDriver()).getContainerResource(taskExecutorProcessSpec2).get()))));
+                       
testingYarnAMRMClientAsyncBuilder.setAddContainerRequestConsumer((ignored1, 
ignored2) ->
+                               
addContainerRequestFutures.get(addContainerRequestFuturesNumCompleted.getAndIncrement()).complete(null));
+                       
testingYarnNMClientAsyncBuilder.setStartContainerAsyncConsumer((ignored1, 
context, ignored2) -> {
+                               if (containsStartCommand(context, 
startCommand1)) {
+                                       
startContainerAsyncCommandFuture1.complete(null);
+                               } else if (containsStartCommand(context, 
startCommand2)) {
+                                       
startContainerAsyncCommandFuture2.complete(null);
+                               }
+                       });
+                       
resourceEventHandlerBuilder.setOnWorkerTerminatedConsumer((ignore1, ignore2) -> 
getDriver().requestResource(taskExecutorProcessSpec1));
+
+                       runTest(() -> {
+                               final Resource containerResource = 
((YarnResourceManagerDriver) 
getDriver()).getContainerResource(taskExecutorProcessSpec1).get();
+                               // Make sure two worker resource spec will be 
normalized to the same container resource
+                               assertEquals(containerResource, 
((YarnResourceManagerDriver) 
getDriver()).getContainerResource(taskExecutorProcessSpec2).get());
+
+                               runInMainThread(() -> 
getDriver().requestResource(taskExecutorProcessSpec1));
+                               runInMainThread(() -> 
getDriver().requestResource(taskExecutorProcessSpec2));
+
+                               // Verify both containers requested
+                               
verifyFutureCompleted(addContainerRequestFutures.get(0));
+                               
verifyFutureCompleted(addContainerRequestFutures.get(1));
+
+                               // Mock that both containers are allocated
+                               Container container1 = 
createTestingContainerWithResource(containerResource);
+                               Container container2 = 
createTestingContainerWithResource(containerResource);
+                               
testingYarnResourceManagerClientFactory.getCallbackHandler().onContainersAllocated(ImmutableList.of(container1,
 container2));
+
+                               // Verify workers with both spec are started.
+                               
verifyFutureCompleted(startContainerAsyncCommandFuture1);
+                               
verifyFutureCompleted(startContainerAsyncCommandFuture2);
+
+                               // Mock that one container is completed, while 
the worker is still pending
+                               ContainerStatus testingContainerStatus = 
createTestingContainerCompletedStatus(container1.getId());
+                               
testingYarnResourceManagerClientFactory.getCallbackHandler().onContainersCompleted(Collections.singletonList(testingContainerStatus));
+
+                               // Verify that only one more container is 
requested.
+                               
verifyFutureCompleted(addContainerRequestFutures.get(2));
+                               
assertFalse(addContainerRequestFutures.get(3).isDone());
+                       });
+               }};
+       }
+
+       @Test
+       public void testStartWorkerVariousSpec_DifferentContainerResource() 
throws Exception{
+               final TaskExecutorProcessSpec taskExecutorProcessSpec1 =
+                       new TaskExecutorProcessSpec(
+                               new CPUResource(1),
+                               MemorySize.ZERO,
+                               MemorySize.ZERO,
+                               MemorySize.ofMebiBytes(50),
+                               MemorySize.ofMebiBytes(50),
+                               MemorySize.ofMebiBytes(50),
+                               MemorySize.ofMebiBytes(50),
+                               MemorySize.ZERO,
+                               MemorySize.ZERO
+                       );
+               final TaskExecutorProcessSpec taskExecutorProcessSpec2 =
+                       new TaskExecutorProcessSpec(
+                               new CPUResource(2),
+                               MemorySize.ZERO,
+                               MemorySize.ZERO,
+                               MemorySize.ofMebiBytes(500),
+                               MemorySize.ofMebiBytes(500),
+                               MemorySize.ofMebiBytes(500),
+                               MemorySize.ofMebiBytes(500),
+                               MemorySize.ZERO,
+                               MemorySize.ZERO
+                       );
+
+               new Context() {{
+                       addContainerRequestFutures.add(new 
CompletableFuture<>());
+                       addContainerRequestFutures.add(new 
CompletableFuture<>());
+                       addContainerRequestFutures.add(new 
CompletableFuture<>());
+                       addContainerRequestFutures.add(new 
CompletableFuture<>());
+
+                       final String startCommand1 = 
TaskManagerOptions.TASK_HEAP_MEMORY.key() + "=" + (50L << 20);
+                       final String startCommand2 = 
TaskManagerOptions.TASK_HEAP_MEMORY.key() + "=" + (100L << 20);
+                       final CompletableFuture<Void> 
startContainerAsyncCommandFuture1 = new CompletableFuture<>();
+                       final CompletableFuture<Void> 
startContainerAsyncCommandFuture2 = new CompletableFuture<>();
+
+                       
testingYarnAMRMClientAsyncBuilder.setGetMatchingRequestsFunction(tuple -> {
+                               if (tuple.f2.getVirtualCores() == 1) {
+                                       return Collections.singletonList(
+                                               
Collections.singletonList(YarnResourceManagerDriver.getContainerRequest(((YarnResourceManagerDriver)
 getDriver()).getContainerResource(taskExecutorProcessSpec1).get())));
+                               } else if (tuple.f2.getVirtualCores() == 2) {
+                                       return Collections.singletonList(
+                                               
Collections.singletonList(YarnResourceManagerDriver.getContainerRequest(((YarnResourceManagerDriver)
 getDriver()).getContainerResource(taskExecutorProcessSpec2).get())));
+                               }
+                               return null;
+                       });
+                       
testingYarnAMRMClientAsyncBuilder.setAddContainerRequestConsumer((request, 
ignored) ->
+                               
addContainerRequestFutures.get(addContainerRequestFuturesNumCompleted.getAndIncrement()).complete(request.getCapability()));
+                       
testingYarnNMClientAsyncBuilder.setStartContainerAsyncConsumer((ignored1, 
context, ignored3) -> {
+                               if (containsStartCommand(context, 
startCommand1)) {
+                                       
startContainerAsyncCommandFuture1.complete(null);
+                               } else if (containsStartCommand(context, 
startCommand2)) {
+                                       
startContainerAsyncCommandFuture2.complete(null);
+                               }
+                       });
+                       
resourceEventHandlerBuilder.setOnWorkerTerminatedConsumer((ignore1, ignore2) -> 
getDriver().requestResource(taskExecutorProcessSpec1));
+
+                       runTest(() -> {
+                               final Resource containerResource1 = 
((YarnResourceManagerDriver) 
getDriver()).getContainerResource(taskExecutorProcessSpec1).get();
+                               final Resource containerResource2 = 
((YarnResourceManagerDriver) 
getDriver()).getContainerResource(taskExecutorProcessSpec2).get();
+                               // Make sure two worker resource spec will be 
normalized to different container resources
+                               assertNotEquals(containerResource1, 
containerResource2);
+
+                               runInMainThread(() -> 
getDriver().requestResource(taskExecutorProcessSpec1));
+                               runInMainThread(() -> 
getDriver().requestResource(taskExecutorProcessSpec2));
+
+                               // Verify both containers requested
+                               
verifyFutureCompleted(addContainerRequestFutures.get(0));
+                               
verifyFutureCompleted(addContainerRequestFutures.get(1));
+
+                               // Mock that container 1 is allocated
+                               Container container1 = 
createTestingContainerWithResource(containerResource1);
+                               
testingYarnResourceManagerClientFactory.getCallbackHandler().onContainersAllocated(Collections.singletonList(container1));
+
+                               // Verify that only worker with spec1 is 
started.
+                               
verifyFutureCompleted(startContainerAsyncCommandFuture1);
+                               
assertFalse(startContainerAsyncCommandFuture2.isDone());
+
+                               // Mock that container 1 is completed, while 
the worker is still pending
+                               ContainerStatus testingContainerStatus = 
createTestingContainerCompletedStatus(container1.getId());
+                               
testingYarnResourceManagerClientFactory.getCallbackHandler().onContainersCompleted(Collections.singletonList(testingContainerStatus));
+
+                               // Verify that only container 1 is requested 
again
+                               
verifyFutureCompleted(addContainerRequestFutures.get(2));
+                               
assertThat(addContainerRequestFutures.get(2).get(), is(containerResource1));
+                               
assertFalse(addContainerRequestFutures.get(3).isDone());
+                       });
+               }};
+       }
+
+       private boolean containsStartCommand(ContainerLaunchContext 
containerLaunchContext, String command) {
+               return 
containerLaunchContext.getCommands().stream().anyMatch(str -> 
str.contains(command));
+       }
+
+       private static Container createTestingContainerWithResource(Resource 
resource, int containerIdx) {
+               final ContainerId containerId = ContainerId.newInstance(
+                       ApplicationAttemptId.newInstance(
+                               
ApplicationId.newInstance(System.currentTimeMillis(), 1),
+                               1),
+                       containerIdx);
+               final NodeId nodeId = NodeId.newInstance("container", 1234);
+               return new TestingContainer(containerId, nodeId, resource, 
Priority.UNDEFINED);
+       }
+
+       private class Context extends 
ResourceManagerDriverTestBase<YarnWorkerNode>.Context {
+               private final CompletableFuture<Void> 
stopAndCleanupClusterFuture =  new CompletableFuture<>();
+               private final CompletableFuture<Resource> 
createTaskManagerContainerFuture = new CompletableFuture<>();
+               private final CompletableFuture<Void> stopContainerAsyncFuture 
= new CompletableFuture<>();
+               final List<CompletableFuture<Resource>> 
addContainerRequestFutures = new ArrayList<>();
+               final AtomicInteger addContainerRequestFuturesNumCompleted = 
new AtomicInteger(0);
+               final CompletableFuture<Void> removeContainerRequestFuture = 
new CompletableFuture<>();
+               final CompletableFuture<Void> releaseAssignedContainerFuture = 
new CompletableFuture<>();
+               final CompletableFuture<Void> startContainerAsyncFuture = new 
CompletableFuture<>();
+               final TestingYarnResourceManagerClientFactory 
testingYarnResourceManagerClientFactory = new 
TestingYarnResourceManagerClientFactory();
+               final TestingYarnNodeManagerClientFactory 
testingYarnNodeManagerClientFactory = new TestingYarnNodeManagerClientFactory();
+
+               final TestingYarnNMClientAsync.Builder 
testingYarnNMClientAsyncBuilder = 
testingYarnNodeManagerClientFactory.getTestingYarnNMClientAsyncBuilder()
+                       .setStartContainerAsyncConsumer((ignored1, ignored2, 
ignored3) -> startContainerAsyncFuture.complete(null))
+                       .setStopContainerAsyncConsumer((ignored1, ignored2, 
ignored3) -> stopContainerAsyncFuture.complete(null));
+               final TestingYarnAMRMClientAsync.Builder 
testingYarnAMRMClientAsyncBuilder = 
testingYarnResourceManagerClientFactory.getTestingYarnAMRMClientAsyncBuilder()
+                       .setAddContainerRequestConsumer((request, handler) -> {
+                               
createTaskManagerContainerFuture.complete(request.getCapability());
+                               
testingYarnResourceManagerClientFactory.getCallbackHandler()
+                                       
.onContainersAllocated(Collections.singletonList(testingContainer));
+                       })
+                       .setGetMatchingRequestsFunction(ignored ->
+                               
Collections.singletonList(Collections.singletonList(YarnResourceManagerDriver.getContainerRequest(testingResource))))
+                       .setRemoveContainerRequestConsumer((request, handler) 
-> removeContainerRequestFuture.complete(null))
+                       .setReleaseAssignedContainerConsumer((ignored1, 
ignored2) -> releaseAssignedContainerFuture.complete(null))
+                       .setUnregisterApplicationMasterConsumer((ignore1, 
ignore2, ignore3) -> stopAndCleanupClusterFuture.complete(null));
+
+               final Map<String, String> env = new HashMap<>();
+
+               private int containerIdx = 0;
+
+               @Override
+               protected void prepareRunTest() {
+                       File root = folder.getRoot();
+                       File home = new File(root, "home");
+                       env.put(ENV_APP_ID, "foo");
+                       env.put(ENV_CLIENT_HOME_DIR, home.getAbsolutePath());
+                       env.put(ENV_CLIENT_SHIP_FILES, "");
+                       env.put(ENV_FLINK_CLASSPATH, "");
+                       env.put(ENV_HADOOP_USER_NAME, "foo");
+                       env.put(FLINK_DIST_JAR, new YarnLocalResourceDescriptor(
+                               "flink.jar",
+                               new Path("/tmp/flink.jar"),
+                               0,
+                               System.currentTimeMillis(),
+                               LocalResourceVisibility.APPLICATION,
+                               LocalResourceType.FILE).toString());
+                       env.put(ApplicationConstants.Environment.PWD.key(), 
home.getAbsolutePath());
+               }
+
+               @Override
+               protected void preparePreviousAttemptWorkers() {
+                       
testingYarnAMRMClientAsyncBuilder.setRegisterApplicationMasterFunction(
+                               (ignored1, ignored2, ignored3) -> new 
TestingRegisterApplicationMasterResponse(() -> 
Collections.singletonList(testingContainer)));
+               }
+
+               @Override
+               protected void prepareReleaseResource() {

Review comment:
       The preparations here are not related to the validations. It seems this method can be removed.

##########
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/TaskExecutorProcessSpecContainerResourceAdapter.java
##########
@@ -86,21 +81,21 @@
                }
        }
 
-       Set<WorkerResourceSpec> getWorkerSpecs(final Resource containerResource, final MatchingStrategy matchingStrategy) {
+       Set<TaskExecutorProcessSpec> getTaskExecutorProcessSpec(final Resource containerResource, final TaskExecutorProcessSpecContainerResourceAdapter.MatchingStrategy matchingStrategy) {
                final InternalContainerResource internalContainerResource = new InternalContainerResource(containerResource);
                return getEquivalentInternalContainerResource(internalContainerResource, matchingStrategy).stream()
-                       .flatMap(resource -> containerResourceToWorkerSpecs.getOrDefault(resource, Collections.emptySet()).stream())
+                       .flatMap(resource -> containerResourceToTaskExecutorProcessSpec.getOrDefault(resource, Collections.emptySet()).stream())
                        .collect(Collectors.toSet());
        }
 
-       Set<Resource> getEquivalentContainerResource(final Resource containerResource, final MatchingStrategy matchingStrategy) {
+       Set<Resource> getEquivalentContainerResource(final Resource containerResource, final TaskExecutorProcessSpecContainerResourceAdapter.MatchingStrategy matchingStrategy) {
                final InternalContainerResource internalContainerResource = new InternalContainerResource(containerResource);
                return getEquivalentInternalContainerResource(internalContainerResource, matchingStrategy).stream()
                        .map(InternalContainerResource::toResource)
                        .collect(Collectors.toSet());
        }
 
-       private Set<InternalContainerResource> getEquivalentInternalContainerResource(final InternalContainerResource internalContainerResource, final MatchingStrategy matchingStrategy) {
+       private Set<InternalContainerResource> getEquivalentInternalContainerResource(final InternalContainerResource internalContainerResource, final TaskExecutorProcessSpecContainerResourceAdapter.MatchingStrategy matchingStrategy) {

Review comment:
       ```suggestion
        private Set<InternalContainerResource> getEquivalentInternalContainerResource(final InternalContainerResource internalContainerResource, final MatchingStrategy matchingStrategy) {
       ```

##########
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnResourceManagerConfiguration.java
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.configuration;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.yarn.YarnConfigKeys;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Configuration specific to {@link org.apache.flink.yarn.YarnResourceManager}.
+ */
+public class YarnResourceManagerConfiguration {
+       private final Map<String, String> yarnConfig;
+       private final String webInterfaceUrl;
+       private final String rpcAddress;
+
+       public YarnResourceManagerConfiguration(
+               Map<String, String> env,
+               @Nullable String webInterfaceUrl,
+               String rpcAddress) {
+               this.yarnConfig = 
getYarnConfFromEnv(Preconditions.checkNotNull(env));
+               this.rpcAddress = Preconditions.checkNotNull(rpcAddress);
+               this.webInterfaceUrl = webInterfaceUrl;
+       }
+
+       public String getRpcAddress() {
+               return rpcAddress;
+       }
+
+       public String getWebInterfaceUrl() {
+               return webInterfaceUrl;
+       }
+
+       private static Map<String, String> getYarnConfFromEnv(Map<String, 
String> env) {
+               Map<String, String> configs = new HashMap<>();
+               configs.put(YarnConfigKeys.FLINK_YARN_FILES, 
env.get(YarnConfigKeys.FLINK_YARN_FILES));
+               configs.put(YarnConfigKeys.ENV_FLINK_CLASSPATH, 
env.get(YarnConfigKeys.ENV_FLINK_CLASSPATH));
+               configs.put(YarnConfigKeys.ENV_APP_ID, 
env.get(YarnConfigKeys.ENV_APP_ID));
+               configs.put(YarnConfigKeys.ENV_CLIENT_HOME_DIR, 
env.get(YarnConfigKeys.ENV_CLIENT_HOME_DIR));

Review comment:
       These two are read and checked, but never really used by the resource 
manager. I think we can exclude them.

##########
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnResourceManagerConfiguration.java
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.configuration;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.yarn.YarnConfigKeys;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Configuration specific to {@link org.apache.flink.yarn.YarnResourceManager}.
+ */
+public class YarnResourceManagerConfiguration {
+       private final Map<String, String> yarnConfig;
+       private final String webInterfaceUrl;
+       private final String rpcAddress;
+
+       public YarnResourceManagerConfiguration(
+               Map<String, String> env,
+               @Nullable String webInterfaceUrl,
+               String rpcAddress) {
+               this.yarnConfig = 
getYarnConfFromEnv(Preconditions.checkNotNull(env));
+               this.rpcAddress = Preconditions.checkNotNull(rpcAddress);
+               this.webInterfaceUrl = webInterfaceUrl;
+       }
+
+       public String getRpcAddress() {
+               return rpcAddress;
+       }
+
+       public String getWebInterfaceUrl() {

Review comment:
       This should be annotated `Nullable`.
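   
   For illustration, the annotation would just mirror the constructor 
parameter (sketch only, nothing new beyond the `javax.annotation.Nullable` 
already imported here):
   ```java
	@Nullable
	public String getWebInterfaceUrl() {
		return webInterfaceUrl;
	}
   ```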

##########
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnResourceManagerConfiguration.java
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.configuration;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.yarn.YarnConfigKeys;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Configuration specific to {@link org.apache.flink.yarn.YarnResourceManager}.
+ */
+public class YarnResourceManagerConfiguration {
+       private final Map<String, String> yarnConfig;

Review comment:
       Can we replace this map with plain individual class fields?
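   
   For illustration, a trimmed sketch of what individual fields could look 
like (only two of the keys are shown, the others would follow the same 
pattern; the `webInterfaceUrl` / `rpcAddress` constructor parameters are 
omitted here):
   ```java
import org.apache.flink.yarn.YarnConfigKeys;

import javax.annotation.Nullable;

import java.util.Map;

public class YarnResourceManagerConfiguration {
	@Nullable private final String yarnFiles;
	@Nullable private final String flinkClasspath;

	public YarnResourceManagerConfiguration(Map<String, String> env) {
		// read each value from the env map once, instead of keeping the whole map around
		this.yarnFiles = env.get(YarnConfigKeys.FLINK_YARN_FILES);
		this.flinkClasspath = env.get(YarnConfigKeys.ENV_FLINK_CLASSPATH);
	}

	@Nullable
	public String getYarnFiles() {
		return yarnFiles;
	}

	@Nullable
	public String getFlinkClasspath() {
		return flinkClasspath;
	}
}
   ```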

##########
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnResourceManagerConfiguration.java
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.configuration;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.yarn.YarnConfigKeys;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Configuration specific to {@link org.apache.flink.yarn.YarnResourceManager}.
+ */
+public class YarnResourceManagerConfiguration {
+       private final Map<String, String> yarnConfig;
+       private final String webInterfaceUrl;
+       private final String rpcAddress;
+
+       public YarnResourceManagerConfiguration(
+               Map<String, String> env,
+               @Nullable String webInterfaceUrl,
+               String rpcAddress) {
+               this.yarnConfig = 
getYarnConfFromEnv(Preconditions.checkNotNull(env));
+               this.rpcAddress = Preconditions.checkNotNull(rpcAddress);
+               this.webInterfaceUrl = webInterfaceUrl;
+       }
+
+       public String getRpcAddress() {
+               return rpcAddress;
+       }
+
+       public String getWebInterfaceUrl() {
+               return webInterfaceUrl;
+       }
+
+       private static Map<String, String> getYarnConfFromEnv(Map<String, 
String> env) {
+               Map<String, String> configs = new HashMap<>();
+               configs.put(YarnConfigKeys.FLINK_YARN_FILES, 
env.get(YarnConfigKeys.FLINK_YARN_FILES));
+               configs.put(YarnConfigKeys.ENV_FLINK_CLASSPATH, 
env.get(YarnConfigKeys.ENV_FLINK_CLASSPATH));
+               configs.put(YarnConfigKeys.ENV_APP_ID, 
env.get(YarnConfigKeys.ENV_APP_ID));
+               configs.put(YarnConfigKeys.ENV_CLIENT_HOME_DIR, 
env.get(YarnConfigKeys.ENV_CLIENT_HOME_DIR));
+               configs.put(YarnConfigKeys.ENV_CLIENT_SHIP_FILES, 
env.get(YarnConfigKeys.ENV_CLIENT_SHIP_FILES));
+               configs.put(YarnConfigKeys.FLINK_DIST_JAR, 
env.get(YarnConfigKeys.FLINK_DIST_JAR));
+               configs.put(YarnConfigKeys.REMOTE_KEYTAB_PATH, 
env.get(YarnConfigKeys.REMOTE_KEYTAB_PATH));
+               configs.put(YarnConfigKeys.LOCAL_KEYTAB_PATH, 
env.get(YarnConfigKeys.LOCAL_KEYTAB_PATH));
+               configs.put(YarnConfigKeys.KEYTAB_PRINCIPAL, 
env.get(YarnConfigKeys.KEYTAB_PRINCIPAL));
+               configs.put(YarnConfigKeys.ENV_HADOOP_USER_NAME, 
env.get(YarnConfigKeys.ENV_HADOOP_USER_NAME));
+               configs.put(YarnConfigKeys.ENV_ZOOKEEPER_NAMESPACE, 
env.get(YarnConfigKeys.ENV_ZOOKEEPER_NAMESPACE));
+               configs.put(YarnConfigKeys.ENV_KRB5_PATH, 
env.get(YarnConfigKeys.ENV_KRB5_PATH));
+               configs.put(YarnConfigKeys.ENV_YARN_SITE_XML_PATH, 
env.get(YarnConfigKeys.ENV_YARN_SITE_XML_PATH));
+               return configs;
+       }
+
+       public String getYarnFiles() {
+               return yarnConfig.get(YarnConfigKeys.FLINK_YARN_FILES);
+       }
+
+       public String getFlinkClasspath() {
+               return yarnConfig.get(YarnConfigKeys.ENV_FLINK_CLASSPATH);
+       }
+
+       public String getAppId() {
+               return yarnConfig.get(YarnConfigKeys.ENV_APP_ID);
+       }
+
+       public String getClientHomeDir() {
+               return yarnConfig.get(YarnConfigKeys.ENV_CLIENT_HOME_DIR);
+       }
+
+       public String getClientShipFiles() {
+               return yarnConfig.get(YarnConfigKeys.ENV_CLIENT_SHIP_FILES);
+       }
+
+       public String getFlinkDistJar() {
+               return yarnConfig.get(YarnConfigKeys.FLINK_DIST_JAR);
+       }
+
+       public String getRemoteKeytabPath() {
+               return yarnConfig.get(YarnConfigKeys.REMOTE_KEYTAB_PATH);
+       }
+
+       public String getLocalKeytabPath() {
+               return yarnConfig.get(YarnConfigKeys.LOCAL_KEYTAB_PATH);
+       }
+
+       public String getKeytabPrinciple() {
+               return yarnConfig.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
+       }

Review comment:
       These should be `@Nullable`

##########
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
##########
@@ -135,7 +135,7 @@ public YarnResourceManager(
                        JobLeaderIdService jobLeaderIdService,
                        ClusterInformation clusterInformation,
                        FatalErrorHandler fatalErrorHandler,
-                       @Nullable String webInterfaceUrl,
+                       YarnResourceManagerConfiguration 
yarnResourceManagerConfiguration,

Review comment:
       There are 3 usages of `env` in `YarnResourceManager`, which could be 
replaced by `YarnResourceManagerConfiguration`. After that, we can get rid of 
`env` from `LegacyActiveResourceManager` and the related constructors / factory 
methods.
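   
   For illustration only (which getter matches each of the 3 usages depends on 
the key being read there, so the names below are just examples):
   ```java
// before: YarnResourceManager reads straight from the raw env map
final String zkNamespace = env.get(YarnConfigKeys.ENV_ZOOKEEPER_NAMESPACE);

// after: it goes through the configuration object instead, so `env` no
// longer needs to be threaded through the constructors / factory methods
final String zkNamespace = yarnResourceManagerConfiguration.getZookeeperNamespace();
   ```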

##########
File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/TestingRegisterApplicationMasterResponse.java
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import 
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import 
org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterResponsePBImpl;
+import org.apache.hadoop.yarn.api.records.Container;
+
+import java.util.List;
+import java.util.function.Supplier;
+
+/**
+ * A Yarn {@link RegisterApplicationMasterResponse} implementation for testing.
+ */
+public class TestingRegisterApplicationMasterResponse extends 
RegisterApplicationMasterResponsePBImpl {

Review comment:
       I think we should also add `getSchedulerResourceTypes` in this testing 
implementation, without the `@Override` annotation.
   The problem is that we tried to use this interface in the production code 
via reflection. If this interface is available in the given hadoop version, the 
`RegisterApplicationMasterResponsePBImpl` implementation will be invoked and 
its behavior is unpredictable.
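   
   For illustration, roughly the following (assuming the usual hadoop 
signature, i.e. `java.util.EnumSet` plus `SchedulerResourceTypes` from the 
yarn protos; the concrete values only need to be deterministic for the tests):
   ```java
	// intentionally no @Override, so this still compiles against hadoop
	// versions whose RegisterApplicationMasterResponse does not declare it
	public EnumSet<SchedulerResourceTypes> getSchedulerResourceTypes() {
		return EnumSet.of(SchedulerResourceTypes.MEMORY, SchedulerResourceTypes.CPU);
	}
   ```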

##########
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerClientFactory.java
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.annotation.VisibleForTesting;
+
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+/**
+ * Factory for {@link AMRMClientAsync}.
+ */
+public class YarnResourceManagerClientFactory {
+
+       private static final YarnResourceManagerClientFactory INSTANCE = new 
YarnResourceManagerClientFactory();
+
+       @VisibleForTesting
+       YarnResourceManagerClientFactory() {}
+
+       public static YarnResourceManagerClientFactory getInstance() {
+               return INSTANCE;
+       }
+
+       protected AMRMClientAsync<AMRMClient.ContainerRequest> 
createAndStartResourceManagerClient(
+               YarnConfiguration yarnConfiguration,
+               int yarnHeartbeatIntervalMillis,
+               AMRMClientAsync.CallbackHandler callbackHandler) {
+               AMRMClientAsync<AMRMClient.ContainerRequest> 
resourceManagerClient = AMRMClientAsync.createAMRMClientAsync(
+                       yarnHeartbeatIntervalMillis,
+                       callbackHandler);
+               resourceManagerClient.init(yarnConfiguration);
+               resourceManagerClient.start();

Review comment:
       I think a factory should be responsible for creating the instance, but 
I'm not sure about init and start.
   
   This reduces flexibility. What if the caller wants to do something between 
creating the client and initializing/starting it?
   
   This is also not good for testability. We cannot verify whether the RM 
properly inits/starts the clients, because they are inited/started by the 
factory, which is replaced by the testing implementation in tests.
   
   Same for the `YarnNodeManagerClientFactory`.
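   
   For illustration, a minimal sketch of that split (the method name is 
illustrative):
   ```java
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;

public class YarnResourceManagerClientFactory {

	// only creates the client; the caller decides when to init/start it
	public AMRMClientAsync<AMRMClient.ContainerRequest> createResourceManagerClient(
			int yarnHeartbeatIntervalMillis,
			AMRMClientAsync.CallbackHandler callbackHandler) {
		return AMRMClientAsync.createAMRMClientAsync(
			yarnHeartbeatIntervalMillis, callbackHandler);
	}
}
   ```
   The RM would then call `init(yarnConfiguration)` and `start()` on the 
returned client itself, which is also something a test can verify.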

##########
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnResourceManagerConfiguration.java
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.configuration;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.yarn.YarnConfigKeys;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Configuration specific to {@link org.apache.flink.yarn.YarnResourceManager}.
+ */
+public class YarnResourceManagerConfiguration {
+       private final Map<String, String> yarnConfig;
+       private final String webInterfaceUrl;
+       private final String rpcAddress;
+
+       public YarnResourceManagerConfiguration(
+               Map<String, String> env,
+               @Nullable String webInterfaceUrl,
+               String rpcAddress) {
+               this.yarnConfig = 
getYarnConfFromEnv(Preconditions.checkNotNull(env));
+               this.rpcAddress = Preconditions.checkNotNull(rpcAddress);
+               this.webInterfaceUrl = webInterfaceUrl;
+       }
+
+       public String getRpcAddress() {
+               return rpcAddress;
+       }
+
+       public String getWebInterfaceUrl() {
+               return webInterfaceUrl;
+       }
+
+       private static Map<String, String> getYarnConfFromEnv(Map<String, 
String> env) {
+               Map<String, String> configs = new HashMap<>();
+               configs.put(YarnConfigKeys.FLINK_YARN_FILES, 
env.get(YarnConfigKeys.FLINK_YARN_FILES));
+               configs.put(YarnConfigKeys.ENV_FLINK_CLASSPATH, 
env.get(YarnConfigKeys.ENV_FLINK_CLASSPATH));
+               configs.put(YarnConfigKeys.ENV_APP_ID, 
env.get(YarnConfigKeys.ENV_APP_ID));
+               configs.put(YarnConfigKeys.ENV_CLIENT_HOME_DIR, 
env.get(YarnConfigKeys.ENV_CLIENT_HOME_DIR));
+               configs.put(YarnConfigKeys.ENV_CLIENT_SHIP_FILES, 
env.get(YarnConfigKeys.ENV_CLIENT_SHIP_FILES));
+               configs.put(YarnConfigKeys.FLINK_DIST_JAR, 
env.get(YarnConfigKeys.FLINK_DIST_JAR));
+               configs.put(YarnConfigKeys.REMOTE_KEYTAB_PATH, 
env.get(YarnConfigKeys.REMOTE_KEYTAB_PATH));
+               configs.put(YarnConfigKeys.LOCAL_KEYTAB_PATH, 
env.get(YarnConfigKeys.LOCAL_KEYTAB_PATH));
+               configs.put(YarnConfigKeys.KEYTAB_PRINCIPAL, 
env.get(YarnConfigKeys.KEYTAB_PRINCIPAL));
+               configs.put(YarnConfigKeys.ENV_HADOOP_USER_NAME, 
env.get(YarnConfigKeys.ENV_HADOOP_USER_NAME));
+               configs.put(YarnConfigKeys.ENV_ZOOKEEPER_NAMESPACE, 
env.get(YarnConfigKeys.ENV_ZOOKEEPER_NAMESPACE));
+               configs.put(YarnConfigKeys.ENV_KRB5_PATH, 
env.get(YarnConfigKeys.ENV_KRB5_PATH));
+               configs.put(YarnConfigKeys.ENV_YARN_SITE_XML_PATH, 
env.get(YarnConfigKeys.ENV_YARN_SITE_XML_PATH));
+               return configs;
+       }
+
+       public String getYarnFiles() {
+               return yarnConfig.get(YarnConfigKeys.FLINK_YARN_FILES);
+       }
+
+       public String getFlinkClasspath() {
+               return yarnConfig.get(YarnConfigKeys.ENV_FLINK_CLASSPATH);
+       }
+
+       public String getAppId() {
+               return yarnConfig.get(YarnConfigKeys.ENV_APP_ID);
+       }
+
+       public String getClientHomeDir() {
+               return yarnConfig.get(YarnConfigKeys.ENV_CLIENT_HOME_DIR);
+       }
+
+       public String getClientShipFiles() {
+               return yarnConfig.get(YarnConfigKeys.ENV_CLIENT_SHIP_FILES);
+       }
+
+       public String getFlinkDistJar() {
+               return yarnConfig.get(YarnConfigKeys.FLINK_DIST_JAR);
+       }
+
+       public String getRemoteKeytabPath() {
+               return yarnConfig.get(YarnConfigKeys.REMOTE_KEYTAB_PATH);
+       }
+
+       public String getLocalKeytabPath() {
+               return yarnConfig.get(YarnConfigKeys.LOCAL_KEYTAB_PATH);
+       }
+
+       public String getKeytabPrinciple() {
+               return yarnConfig.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
+       }
+
+       public String getHadoopUserName() {
+               return yarnConfig.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
+       }
+
+       public String getZookeeperNamespace() {
+               return yarnConfig.get(YarnConfigKeys.ENV_ZOOKEEPER_NAMESPACE);
+       }
+
+       public String getKrb5Path() {
+               return yarnConfig.get(YarnConfigKeys.ENV_KRB5_PATH);
+       }
+
+       public String getYarnSiteXMLPath() {
+               return yarnConfig.get(YarnConfigKeys.ENV_YARN_SITE_XML_PATH);
+       }

Review comment:
       These should be `@Nullable`

##########
File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/TestingRegisterApplicationMasterResponse.java
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import 
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import 
org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterResponsePBImpl;
+import org.apache.hadoop.yarn.api.records.Container;
+
+import java.util.List;
+import java.util.function.Supplier;
+
+/**
+ * A Yarn {@link RegisterApplicationMasterResponse} implementation for testing.
+ */
+public class TestingRegisterApplicationMasterResponse extends 
RegisterApplicationMasterResponsePBImpl {
+       private final Supplier<List<Container>> 
getContainersFromPreviousAttemptsSupplier;
+
+       TestingRegisterApplicationMasterResponse(Supplier<List<Container>> 
getContainersFromPreviousAttemptsSupplier) {
+               this.getContainersFromPreviousAttemptsSupplier = 
getContainersFromPreviousAttemptsSupplier;
+       }
+
+       @Override

Review comment:
       Let's remove this annotation for now, to align with the production 
code's assumption that this interface may not be available for the given hadoop 
version.
   
   Ideally, I think we can already get rid of the reflection on this 
interface, since all the supported hadoop versions should have this interface 
now. But this could be a follow-up issue. No need to further increase the scope 
of this PR.

##########
File path: 
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerClientFactory.java
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.annotation.VisibleForTesting;
+
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+/**
+ * Factory for {@link AMRMClientAsync}.
+ */
+public class YarnResourceManagerClientFactory {

Review comment:
       I think we can make `YarnResourceManagerClientFactory` an interface, and 
provide two independent implementations of the interface, for production and 
testing respectively.
   I don't see a good reason to make the testing implementation extend from 
the production implementation. There's nothing reused between them.
   
   Same for the `YarnNodeManagerClientFactory`.
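   
   For illustration, the interface itself could be as small as (names are 
illustrative):
   ```java
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;

public interface YarnResourceManagerClientFactory {

	// production implementation: delegate to AMRMClientAsync#createAMRMClientAsync
	// testing implementation: hand out the TestingYarnAMRMClientAsync built by the test
	AMRMClientAsync<AMRMClient.ContainerRequest> createResourceManagerClient(
		int yarnHeartbeatIntervalMillis,
		AMRMClientAsync.CallbackHandler callbackHandler);
}
   ```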

##########
File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerDriverTest.java
##########
@@ -0,0 +1,518 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.api.common.resources.CPUResource;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.resourcemanager.active.ResourceManagerDriver;
+import 
org.apache.flink.runtime.resourcemanager.active.ResourceManagerDriverTestBase;
+import 
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.util.ExceptionUtils;
+import 
org.apache.flink.yarn.configuration.YarnResourceManagerDriverConfiguration;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.junit.Assume;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_APP_ID;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_CLIENT_HOME_DIR;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_CLIENT_SHIP_FILES;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_HADOOP_USER_NAME;
+import static org.apache.flink.yarn.YarnConfigKeys.FLINK_DIST_JAR;
+import static org.apache.flink.yarn.YarnConfigKeys.FLINK_YARN_FILES;
+import static 
org.apache.flink.yarn.YarnResourceManagerDriver.ERROR_MASSAGE_ON_SHUTDOWN_REQUEST;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link YarnResourceManagerDriver}.
+ */
+public class YarnResourceManagerDriverTest extends 
ResourceManagerDriverTestBase<YarnWorkerNode> {
+       private static final Resource testingResource = 
Resource.newInstance(YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
 YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
+       private static final Container testingContainer = 
createTestingContainerWithResource(testingResource, 0);
+       private static final TaskExecutorProcessSpec 
testingTaskExecutorProcessSpec =
+               new TaskExecutorProcessSpec(
+                       new CPUResource(1),
+                       MemorySize.ZERO,
+                       MemorySize.ZERO,
+                       MemorySize.ofMebiBytes(256),
+                       MemorySize.ofMebiBytes(256),
+                       MemorySize.ofMebiBytes(256),
+                       MemorySize.ofMebiBytes(256),
+                       MemorySize.ZERO,
+                       MemorySize.ZERO);
+
+       @Rule
+       public TemporaryFolder folder = new TemporaryFolder();
+
+       @Override
+       protected Context createContext() {
+               return new Context();
+       }
+
+       @Test
+       public void testShutdownRequestCausesFatalError() throws Exception {
+               new Context() {{
+                       final CompletableFuture<Throwable> 
throwableCompletableFuture = new CompletableFuture<>();
+                       
resourceEventHandlerBuilder.setOnErrorConsumer(throwableCompletableFuture::complete);
+                       runTest(() -> {
+                               
testingYarnResourceManagerClientFactory.getCallbackHandler().onShutdownRequest();
+
+                               Throwable throwable = 
throwableCompletableFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS);
+                               
assertThat(ExceptionUtils.findThrowable(throwable, 
ResourceManagerException.class).isPresent(), is(true));
+                               
assertThat(ExceptionUtils.findThrowableWithMessage(throwable, 
ERROR_MASSAGE_ON_SHUTDOWN_REQUEST).isPresent(), is(true));
+                       });
+               }};
+       }
+
+       /**
+        * Tests that application files are deleted when the YARN application 
master is de-registered.
+        */
+       @Test
+       public void testDeleteApplicationFiles() throws Exception {
+               new Context() {{
+                       final File applicationDir = folder.newFolder(".flink");
+                       env.put(FLINK_YARN_FILES, 
applicationDir.getCanonicalPath());
+
+                       runTest(() -> {
+                               
getDriver().deregisterApplication(ApplicationStatus.SUCCEEDED, null);
+                               assertFalse("YARN application directory was not 
removed", Files.exists(applicationDir.toPath()));
+                       });
+               }};
+       }
+
+       @Test
+       public void testOnContainerCompleted() throws Exception {
+               new Context() {{
+                       addContainerRequestFutures.add(new 
CompletableFuture<>());
+                       addContainerRequestFutures.add(new 
CompletableFuture<>());
+
+                       
testingYarnAMRMClientAsyncBuilder.setAddContainerRequestConsumer((ignored1, 
ignored2) ->
+                               
addContainerRequestFutures.get(addContainerRequestFuturesNumCompleted.getAndIncrement()).complete(null));
+                       
resourceEventHandlerBuilder.setOnWorkerTerminatedConsumer((ignore1, ignore2) -> 
getDriver().requestResource(testingTaskExecutorProcessSpec));
+
+                       runTest(() -> {
+                               runInMainThread(() -> 
getDriver().requestResource(testingTaskExecutorProcessSpec));
+                               
testingYarnResourceManagerClientFactory.getCallbackHandler().onContainersAllocated(ImmutableList.of(testingContainer));
+
+                               
verifyFutureCompleted(addContainerRequestFutures.get(0));
+                               
verifyFutureCompleted(removeContainerRequestFuture);
+                               
verifyFutureCompleted(startContainerAsyncFuture);
+
+                               ContainerStatus testingContainerStatus = 
createTestingContainerCompletedStatus(testingContainer.getId());
+                               
testingYarnResourceManagerClientFactory.getCallbackHandler().onContainersCompleted(ImmutableList.of(testingContainerStatus));
+
+                               
verifyFutureCompleted(addContainerRequestFutures.get(1));
+                       });
+               }};
+       }
+
+       @Test
+       public void testOnStartContainerError() throws Exception {
+               new Context() {{
+                       addContainerRequestFutures.add(new 
CompletableFuture<>());
+                       addContainerRequestFutures.add(new 
CompletableFuture<>());
+
+                       
testingYarnAMRMClientAsyncBuilder.setAddContainerRequestConsumer((ignored1, 
ignored2) ->
+                               
addContainerRequestFutures.get(addContainerRequestFuturesNumCompleted.getAndIncrement()).complete(null));
+                       
resourceEventHandlerBuilder.setOnWorkerTerminatedConsumer((ignore1, ignore2) -> 
getDriver().requestResource(testingTaskExecutorProcessSpec));
+
+                       runTest(() -> {
+                               runInMainThread(() -> 
getDriver().requestResource(testingTaskExecutorProcessSpec));
+                               
testingYarnResourceManagerClientFactory.getCallbackHandler().onContainersAllocated(ImmutableList.of(testingContainer));
+
+                               
verifyFutureCompleted(addContainerRequestFutures.get(0));
+                               
verifyFutureCompleted(removeContainerRequestFuture);
+                               
verifyFutureCompleted(startContainerAsyncFuture);
+
+                               
testingYarnNodeManagerClientFactory.getCallbackHandler().onStartContainerError(testingContainer.getId(),
 new Exception("start error"));
+                               
verifyFutureCompleted(releaseAssignedContainerFuture);
+                               
verifyFutureCompleted(addContainerRequestFutures.get(1));
+                       });
+               }};
+       }
+
+       @Test
+       public void 
testStartTaskExecutorProcessVariousSpec_SameContainerResource() throws 
Exception{
+               final TaskExecutorProcessSpec taskExecutorProcessSpec1 =
+                       new TaskExecutorProcessSpec(
+                               new CPUResource(1),
+                               MemorySize.ZERO,
+                               MemorySize.ZERO,
+                               MemorySize.ofMebiBytes(100),
+                               MemorySize.ZERO,
+                               MemorySize.ofMebiBytes(100),
+                               MemorySize.ofMebiBytes(100),
+                               MemorySize.ZERO,
+                               MemorySize.ZERO
+                       );
+               final TaskExecutorProcessSpec taskExecutorProcessSpec2 =
+                       new TaskExecutorProcessSpec(
+                               new CPUResource(1),
+                               MemorySize.ZERO,
+                               MemorySize.ZERO,
+                               MemorySize.ofMebiBytes(99),
+                               MemorySize.ZERO,
+                               MemorySize.ofMebiBytes(100),
+                               MemorySize.ofMebiBytes(100),
+                               MemorySize.ZERO,
+                               MemorySize.ZERO
+                       );
+
+               new Context() {{
+                       addContainerRequestFutures.add(new 
CompletableFuture<>());
+                       addContainerRequestFutures.add(new 
CompletableFuture<>());
+                       addContainerRequestFutures.add(new 
CompletableFuture<>());
+                       addContainerRequestFutures.add(new 
CompletableFuture<>());
+
+                       final String startCommand1 = 
TaskManagerOptions.TASK_HEAP_MEMORY.key() + "=" + (100L << 20);
+                       final String startCommand2 = 
TaskManagerOptions.TASK_HEAP_MEMORY.key() + "=" + (99L << 20);
+                       final CompletableFuture<Void> 
startContainerAsyncCommandFuture1 = new CompletableFuture<>();
+                       final CompletableFuture<Void> 
startContainerAsyncCommandFuture2 = new CompletableFuture<>();
+
+                       
testingYarnAMRMClientAsyncBuilder.setGetMatchingRequestsFunction(ignored ->
+                               Collections.singletonList(ImmutableList.of(
+                                       
YarnResourceManagerDriver.getContainerRequest(((YarnResourceManagerDriver) 
getDriver()).getContainerResource(taskExecutorProcessSpec1).get()),
+                                       
YarnResourceManagerDriver.getContainerRequest(((YarnResourceManagerDriver) 
getDriver()).getContainerResource(taskExecutorProcessSpec2).get()))));
+                       
testingYarnAMRMClientAsyncBuilder.setAddContainerRequestConsumer((ignored1, 
ignored2) ->
+                               
addContainerRequestFutures.get(addContainerRequestFuturesNumCompleted.getAndIncrement()).complete(null));
+                       
testingYarnNMClientAsyncBuilder.setStartContainerAsyncConsumer((ignored1, 
context, ignored2) -> {
+                               if (containsStartCommand(context, 
startCommand1)) {
+                                       
startContainerAsyncCommandFuture1.complete(null);
+                               } else if (containsStartCommand(context, 
startCommand2)) {
+                                       
startContainerAsyncCommandFuture2.complete(null);
+                               }
+                       });
+                       
resourceEventHandlerBuilder.setOnWorkerTerminatedConsumer((ignore1, ignore2) -> 
getDriver().requestResource(taskExecutorProcessSpec1));
+
+                       runTest(() -> {
+                               final Resource containerResource = 
((YarnResourceManagerDriver) 
getDriver()).getContainerResource(taskExecutorProcessSpec1).get();
+                               // Make sure two worker resource spec will be 
normalized to the same container resource
+                               assertEquals(containerResource, 
((YarnResourceManagerDriver) 
getDriver()).getContainerResource(taskExecutorProcessSpec2).get());
+
+                               runInMainThread(() -> 
getDriver().requestResource(taskExecutorProcessSpec1));
+                               runInMainThread(() -> 
getDriver().requestResource(taskExecutorProcessSpec2));
+
+                               // Verify both containers requested
+                               
verifyFutureCompleted(addContainerRequestFutures.get(0));
+                               
verifyFutureCompleted(addContainerRequestFutures.get(1));
+
+                               // Mock that both containers are allocated
+                               Container container1 = 
createTestingContainerWithResource(containerResource);
+                               Container container2 = 
createTestingContainerWithResource(containerResource);
+                               
testingYarnResourceManagerClientFactory.getCallbackHandler().onContainersAllocated(ImmutableList.of(container1,
 container2));
+
+                               // Verify workers with both spec are started.
+                               
verifyFutureCompleted(startContainerAsyncCommandFuture1);
+                               
verifyFutureCompleted(startContainerAsyncCommandFuture2);
+
+                               // Mock that one container is completed, while 
the worker is still pending
+                               ContainerStatus testingContainerStatus = 
createTestingContainerCompletedStatus(container1.getId());
+                               
testingYarnResourceManagerClientFactory.getCallbackHandler().onContainersCompleted(Collections.singletonList(testingContainerStatus));
+
+                               // Verify that only one more container is 
requested.
+                               
verifyFutureCompleted(addContainerRequestFutures.get(2));
+                               
assertFalse(addContainerRequestFutures.get(3).isDone());
+                       });
+               }};
+       }
+
+       @Test
+       public void testStartWorkerVariousSpec_DifferentContainerResource() 
throws Exception{
+               final TaskExecutorProcessSpec taskExecutorProcessSpec1 =
+                       new TaskExecutorProcessSpec(
+                               new CPUResource(1),
+                               MemorySize.ZERO,
+                               MemorySize.ZERO,
+                               MemorySize.ofMebiBytes(50),
+                               MemorySize.ofMebiBytes(50),
+                               MemorySize.ofMebiBytes(50),
+                               MemorySize.ofMebiBytes(50),
+                               MemorySize.ZERO,
+                               MemorySize.ZERO
+                       );
+               final TaskExecutorProcessSpec taskExecutorProcessSpec2 =
+                       new TaskExecutorProcessSpec(
+                               new CPUResource(2),
+                               MemorySize.ZERO,
+                               MemorySize.ZERO,
+                               MemorySize.ofMebiBytes(500),
+                               MemorySize.ofMebiBytes(500),
+                               MemorySize.ofMebiBytes(500),
+                               MemorySize.ofMebiBytes(500),
+                               MemorySize.ZERO,
+                               MemorySize.ZERO
+                       );
+
+               new Context() {{
+                       addContainerRequestFutures.add(new 
CompletableFuture<>());
+                       addContainerRequestFutures.add(new 
CompletableFuture<>());
+                       addContainerRequestFutures.add(new 
CompletableFuture<>());
+                       addContainerRequestFutures.add(new 
CompletableFuture<>());
+
+                       final String startCommand1 = 
TaskManagerOptions.TASK_HEAP_MEMORY.key() + "=" + (50L << 20);
+                       final String startCommand2 = 
TaskManagerOptions.TASK_HEAP_MEMORY.key() + "=" + (100L << 20);
+                       final CompletableFuture<Void> 
startContainerAsyncCommandFuture1 = new CompletableFuture<>();
+                       final CompletableFuture<Void> 
startContainerAsyncCommandFuture2 = new CompletableFuture<>();
+
+                       
testingYarnAMRMClientAsyncBuilder.setGetMatchingRequestsFunction(tuple -> {
+                               if (tuple.f2.getVirtualCores() == 1) {
+                                       return Collections.singletonList(
+                                               
Collections.singletonList(YarnResourceManagerDriver.getContainerRequest(((YarnResourceManagerDriver)
 getDriver()).getContainerResource(taskExecutorProcessSpec1).get())));
+                               } else if (tuple.f2.getVirtualCores() == 2) {
+                                       return Collections.singletonList(
+                                               
Collections.singletonList(YarnResourceManagerDriver.getContainerRequest(((YarnResourceManagerDriver)
 getDriver()).getContainerResource(taskExecutorProcessSpec2).get())));
+                               }
+                               return null;
+                       });
+                       
testingYarnAMRMClientAsyncBuilder.setAddContainerRequestConsumer((request, 
ignored) ->
+                               
addContainerRequestFutures.get(addContainerRequestFuturesNumCompleted.getAndIncrement()).complete(request.getCapability()));
+                       
testingYarnNMClientAsyncBuilder.setStartContainerAsyncConsumer((ignored1, 
context, ignored3) -> {
+                               if (containsStartCommand(context, 
startCommand1)) {
+                                       
startContainerAsyncCommandFuture1.complete(null);
+                               } else if (containsStartCommand(context, 
startCommand2)) {
+                                       
startContainerAsyncCommandFuture2.complete(null);
+                               }
+                       });
+                       
resourceEventHandlerBuilder.setOnWorkerTerminatedConsumer((ignore1, ignore2) -> 
getDriver().requestResource(taskExecutorProcessSpec1));
+
+                       runTest(() -> {
+                               final Resource containerResource1 = 
((YarnResourceManagerDriver) 
getDriver()).getContainerResource(taskExecutorProcessSpec1).get();
+                               final Resource containerResource2 = 
((YarnResourceManagerDriver) 
getDriver()).getContainerResource(taskExecutorProcessSpec2).get();
+                               // Make sure two worker resource spec will be 
normalized to different container resources
+                               assertNotEquals(containerResource1, 
containerResource2);
+
+                               runInMainThread(() -> 
getDriver().requestResource(taskExecutorProcessSpec1));
+                               runInMainThread(() -> 
getDriver().requestResource(taskExecutorProcessSpec2));
+
+                               // Verify both containers requested
+                               
verifyFutureCompleted(addContainerRequestFutures.get(0));
+                               
verifyFutureCompleted(addContainerRequestFutures.get(1));
+
+                               // Mock that container 1 is allocated
+                               Container container1 = 
createTestingContainerWithResource(containerResource1);
+                               
testingYarnResourceManagerClientFactory.getCallbackHandler().onContainersAllocated(Collections.singletonList(container1));
+
+                               // Verify that only worker with spec1 is 
started.
+                               
verifyFutureCompleted(startContainerAsyncCommandFuture1);
+                               
assertFalse(startContainerAsyncCommandFuture2.isDone());
+
+                               // Mock that container 1 is completed, while 
the worker is still pending
+                               ContainerStatus testingContainerStatus = 
createTestingContainerCompletedStatus(container1.getId());
+                               
testingYarnResourceManagerClientFactory.getCallbackHandler().onContainersCompleted(Collections.singletonList(testingContainerStatus));
+
+                               // Verify that only container 1 is requested 
again
+                               
verifyFutureCompleted(addContainerRequestFutures.get(2));
+                               
assertThat(addContainerRequestFutures.get(2).get(), is(containerResource1));
+                               
assertFalse(addContainerRequestFutures.get(3).isDone());
+                       });
+               }};
+       }
+
+       private boolean containsStartCommand(ContainerLaunchContext 
containerLaunchContext, String command) {
+               return 
containerLaunchContext.getCommands().stream().anyMatch(str -> 
str.contains(command));
+       }
+
+       private static Container createTestingContainerWithResource(Resource 
resource, int containerIdx) {
+               final ContainerId containerId = ContainerId.newInstance(
+                       ApplicationAttemptId.newInstance(
+                               
ApplicationId.newInstance(System.currentTimeMillis(), 1),
+                               1),
+                       containerIdx);
+               final NodeId nodeId = NodeId.newInstance("container", 1234);
+               return new TestingContainer(containerId, nodeId, resource, 
Priority.UNDEFINED);
+       }
+
+       private class Context extends 
ResourceManagerDriverTestBase<YarnWorkerNode>.Context {
+               private final CompletableFuture<Void> 
stopAndCleanupClusterFuture =  new CompletableFuture<>();
+               private final CompletableFuture<Resource> 
createTaskManagerContainerFuture = new CompletableFuture<>();
+               private final CompletableFuture<Void> stopContainerAsyncFuture 
= new CompletableFuture<>();
+               final List<CompletableFuture<Resource>> 
addContainerRequestFutures = new ArrayList<>();
+               final AtomicInteger addContainerRequestFuturesNumCompleted = 
new AtomicInteger(0);
+               final CompletableFuture<Void> removeContainerRequestFuture = 
new CompletableFuture<>();
+               final CompletableFuture<Void> releaseAssignedContainerFuture = 
new CompletableFuture<>();
+               final CompletableFuture<Void> startContainerAsyncFuture = new 
CompletableFuture<>();
+               final TestingYarnResourceManagerClientFactory 
testingYarnResourceManagerClientFactory = new 
TestingYarnResourceManagerClientFactory();
+               final TestingYarnNodeManagerClientFactory 
testingYarnNodeManagerClientFactory = new TestingYarnNodeManagerClientFactory();
+
+               final TestingYarnNMClientAsync.Builder 
testingYarnNMClientAsyncBuilder = 
testingYarnNodeManagerClientFactory.getTestingYarnNMClientAsyncBuilder()
+                       .setStartContainerAsyncConsumer((ignored1, ignored2, 
ignored3) -> startContainerAsyncFuture.complete(null))
+                       .setStopContainerAsyncConsumer((ignored1, ignored2, 
ignored3) -> stopContainerAsyncFuture.complete(null));
+               final TestingYarnAMRMClientAsync.Builder 
testingYarnAMRMClientAsyncBuilder = 
testingYarnResourceManagerClientFactory.getTestingYarnAMRMClientAsyncBuilder()
+                       .setAddContainerRequestConsumer((request, handler) -> {
+                               
createTaskManagerContainerFuture.complete(request.getCapability());
+                               
testingYarnResourceManagerClientFactory.getCallbackHandler()
+                                       
.onContainersAllocated(Collections.singletonList(testingContainer));
+                       })
+                       .setGetMatchingRequestsFunction(ignored ->
+                               
Collections.singletonList(Collections.singletonList(YarnResourceManagerDriver.getContainerRequest(testingResource))))
+                       .setRemoveContainerRequestConsumer((request, handler) 
-> removeContainerRequestFuture.complete(null))
+                       .setReleaseAssignedContainerConsumer((ignored1, 
ignored2) -> releaseAssignedContainerFuture.complete(null))
+                       .setUnregisterApplicationMasterConsumer((ignore1, 
ignore2, ignore3) -> stopAndCleanupClusterFuture.complete(null));
+
+               final Map<String, String> env = new HashMap<>();
+
+               private int containerIdx = 0;
+
+               @Override
+               protected void prepareRunTest() {
+                       File root = folder.getRoot();
+                       File home = new File(root, "home");
+                       env.put(ENV_APP_ID, "foo");
+                       env.put(ENV_CLIENT_HOME_DIR, home.getAbsolutePath());
+                       env.put(ENV_CLIENT_SHIP_FILES, "");
+                       env.put(ENV_FLINK_CLASSPATH, "");
+                       env.put(ENV_HADOOP_USER_NAME, "foo");
+                       env.put(FLINK_DIST_JAR, new YarnLocalResourceDescriptor(
+                               "flink.jar",
+                               new Path("/tmp/flink.jar"),
+                               0,
+                               System.currentTimeMillis(),
+                               LocalResourceVisibility.APPLICATION,
+                               LocalResourceType.FILE).toString());
+                       env.put(ApplicationConstants.Environment.PWD.key(), 
home.getAbsolutePath());
+               }
+
+               @Override
+               protected void preparePreviousAttemptWorkers() {
+                       
testingYarnAMRMClientAsyncBuilder.setRegisterApplicationMasterFunction(
+                               (ignored1, ignored2, ignored3) -> new 
TestingRegisterApplicationMasterResponse(() -> 
Collections.singletonList(testingContainer)));
+               }
+
+               @Override
+               protected void prepareReleaseResource() {
+                       addContainerRequestFutures.add(new 
CompletableFuture<>());
+                       addContainerRequestFutures.add(new 
CompletableFuture<>());
+                       
testingYarnAMRMClientAsyncBuilder.setAddContainerRequestConsumer((ignored1, 
ignored2) -> {
+                               
addContainerRequestFutures.get(addContainerRequestFuturesNumCompleted.getAndIncrement()).complete(null);
+                               
testingYarnResourceManagerClientFactory.getCallbackHandler()
+                                       
.onContainersAllocated(Collections.singletonList(testingContainer));
+                       });
+                       
resourceEventHandlerBuilder.setOnWorkerTerminatedConsumer((ignore1, ignore2) -> 
getDriver().requestResource(testingTaskExecutorProcessSpec));
+               }
+
+               @Override
+               protected ResourceManagerDriver<YarnWorkerNode> 
createResourceManagerDriver() {
+                       return new YarnResourceManagerDriver(
+                               flinkConfig,
+                               new YarnResourceManagerDriverConfiguration(env, 
null, "localhost:9000"),
+                               testingYarnResourceManagerClientFactory,
+                               testingYarnNodeManagerClientFactory);
+               }
+
+               @Override
+               protected void validateInitialization() {
+                       
assertNotNull(testingYarnResourceManagerClientFactory.getTestingYarnAMRMClientAsync());
+                       
assertNotNull(testingYarnNodeManagerClientFactory.getTestingYarnNMClientAsync());
+                       
assertThat(testingYarnNodeManagerClientFactory.getTestingYarnNMClientAsync().getServiceState(),
 is(Service.STATE.STARTED));
+                       
assertThat(testingYarnResourceManagerClientFactory.getTestingYarnAMRMClientAsync().getServiceState(),
 is(Service.STATE.STARTED));

Review comment:
       I think it would be better to validate whether `init` and `start` are 
called on these clients. Currently, we are relying on the state being updated 
correctly. The problem is that the testing clients override behaviors of the 
real clients, so correct state changes on the testing clients cannot guarantee 
correct state changes on the real clients.
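   
   For illustration, one option (the hooks below are made up, just to show the 
idea): the testing clients could record the lifecycle calls, and 
`validateInitialization()` would then assert on those records instead of on 
the hadoop service state.
   ```java
	// inside a testing client (sketch): record init/start invocations so the
	// test can assert on them directly
	private final CompletableFuture<Void> initFuture = new CompletableFuture<>();
	private final CompletableFuture<Void> startFuture = new CompletableFuture<>();

	@Override
	protected void serviceInit(Configuration conf) throws Exception {
		initFuture.complete(null);
	}

	@Override
	protected void serviceStart() throws Exception {
		startFuture.complete(null);
	}
   ```
   `validateInitialization()` could then check that both futures are completed 
(e.g. via getters on the testing clients) instead of checking 
`getServiceState()`.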

##########
File path: 
flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerDriverTest.java
##########
@@ -0,0 +1,518 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.api.common.resources.CPUResource;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.resourcemanager.active.ResourceManagerDriver;
+import org.apache.flink.runtime.resourcemanager.active.ResourceManagerDriverTestBase;
+import org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.yarn.configuration.YarnResourceManagerConfiguration;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.junit.Assume;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_APP_ID;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_CLIENT_HOME_DIR;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_CLIENT_SHIP_FILES;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_HADOOP_USER_NAME;
+import static org.apache.flink.yarn.YarnConfigKeys.FLINK_DIST_JAR;
+import static org.apache.flink.yarn.YarnConfigKeys.FLINK_YARN_FILES;
+import static org.apache.flink.yarn.YarnResourceManagerDriver.ERROR_MASSAGE_ON_SHUTDOWN_REQUEST;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link YarnResourceManagerDriver}.
+ */
+public class YarnResourceManagerDriverTest extends ResourceManagerDriverTestBase<YarnWorkerNode> {
+       private static final Resource testingResource = Resource.newInstance(YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB, YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
+       private static final Container testingContainer = createTestingContainerWithResource(testingResource, 0);
+       private static final TaskExecutorProcessSpec testingTaskExecutorProcessSpec =
+               new TaskExecutorProcessSpec(
+                       new CPUResource(1),
+                       MemorySize.ZERO,
+                       MemorySize.ZERO,
+                       MemorySize.ofMebiBytes(256),
+                       MemorySize.ofMebiBytes(256),
+                       MemorySize.ofMebiBytes(256),
+                       MemorySize.ofMebiBytes(256),
+                       MemorySize.ZERO,
+                       MemorySize.ZERO);
+
+       @Rule
+       public TemporaryFolder folder = new TemporaryFolder();
+
+       @Override
+       protected Context createContext() {
+               return new Context();
+       }
+
+       @Test
+       public void testShutdownRequestCausesFatalError() throws Exception {
+               new Context() {{
+                       final CompletableFuture<Throwable> throwableCompletableFuture = new CompletableFuture<>();
+                       resourceEventHandlerBuilder.setOnErrorConsumer(throwableCompletableFuture::complete);
+                       runTest(() -> {
+                               testingYarnResourceManagerClientFactory.getCallbackHandler().onShutdownRequest();
+
+                               Throwable throwable = throwableCompletableFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS);
+                               assertThat(ExceptionUtils.findThrowable(throwable, ResourceManagerException.class).isPresent(), is(true));
+                               assertThat(ExceptionUtils.findThrowableWithMessage(throwable, ERROR_MASSAGE_ON_SHUTDOWN_REQUEST).isPresent(), is(true));
+                       });
+               }};
+       }
+
+       /**
+        * Tests that application files are deleted when the YARN application master is de-registered.
+        */
+       @Test
+       public void testDeleteApplicationFiles() throws Exception {
+               new Context() {{
+                       final File applicationDir = folder.newFolder(".flink");
+                       env.put(FLINK_YARN_FILES, applicationDir.getCanonicalPath());
+
+                       runTest(() -> {
+                               getDriver().deregisterApplication(ApplicationStatus.SUCCEEDED, null);
+                               assertFalse("YARN application directory was not removed", Files.exists(applicationDir.toPath()));
+                       });
+               }};
+       }
+
+       @Test
+       public void testOnContainerCompleted() throws Exception {
+               new Context() {{
+                       addContainerRequestFutures.add(new CompletableFuture<>());
+                       addContainerRequestFutures.add(new CompletableFuture<>());
+
+                       testingYarnAMRMClientAsyncBuilder.setAddContainerRequestConsumer((ignored1, ignored2) ->
+                               addContainerRequestFutures.get(addContainerRequestFuturesNumCompleted.getAndIncrement()).complete(null));
+                       resourceEventHandlerBuilder.setOnWorkerTerminatedConsumer((ignore1, ignore2) -> getDriver().requestResource(testingTaskExecutorProcessSpec));
+
+                       runTest(() -> {
+                               runInMainThread(() -> getDriver().requestResource(testingTaskExecutorProcessSpec));
+                               testingYarnResourceManagerClientFactory.getCallbackHandler().onContainersAllocated(ImmutableList.of(testingContainer));
+
+                               verifyFutureCompleted(addContainerRequestFutures.get(0));
+                               verifyFutureCompleted(removeContainerRequestFuture);
+                               verifyFutureCompleted(startContainerAsyncFuture);
+
+                               ContainerStatus testingContainerStatus = createTestingContainerCompletedStatus(testingContainer.getId());
+                               testingYarnResourceManagerClientFactory.getCallbackHandler().onContainersCompleted(ImmutableList.of(testingContainerStatus));
+
+                               verifyFutureCompleted(addContainerRequestFutures.get(1));
+                       });
+               }};
+       }
+
+       @Test
+       public void testOnStartContainerError() throws Exception {
+               new Context() {{
+                       addContainerRequestFutures.add(new CompletableFuture<>());
+                       addContainerRequestFutures.add(new CompletableFuture<>());
+
+                       testingYarnAMRMClientAsyncBuilder.setAddContainerRequestConsumer((ignored1, ignored2) ->
+                               addContainerRequestFutures.get(addContainerRequestFuturesNumCompleted.getAndIncrement()).complete(null));
+                       resourceEventHandlerBuilder.setOnWorkerTerminatedConsumer((ignore1, ignore2) -> getDriver().requestResource(testingTaskExecutorProcessSpec));
+
+                       runTest(() -> {
+                               runInMainThread(() -> getDriver().requestResource(testingTaskExecutorProcessSpec));
+                               testingYarnResourceManagerClientFactory.getCallbackHandler().onContainersAllocated(ImmutableList.of(testingContainer));
+
+                               verifyFutureCompleted(addContainerRequestFutures.get(0));
+                               verifyFutureCompleted(removeContainerRequestFuture);
+                               verifyFutureCompleted(startContainerAsyncFuture);
+
+                               testingYarnNodeManagerClientFactory.getCallbackHandler().onStartContainerError(testingContainer.getId(), new Exception("start error"));
+                               verifyFutureCompleted(releaseAssignedContainerFuture);
+                               verifyFutureCompleted(addContainerRequestFutures.get(1));
+                       });
+               }};
+       }
+
+       @Test
+       public void testStartTaskExecutorProcessVariousSpec_SameContainerResource() throws Exception {
+               final TaskExecutorProcessSpec taskExecutorProcessSpec1 =
+                       new TaskExecutorProcessSpec(
+                               new CPUResource(1),
+                               MemorySize.ZERO,
+                               MemorySize.ZERO,
+                               MemorySize.ofMebiBytes(100),
+                               MemorySize.ZERO,
+                               MemorySize.ofMebiBytes(100),
+                               MemorySize.ofMebiBytes(100),
+                               MemorySize.ZERO,
+                               MemorySize.ZERO
+                       );
+               final TaskExecutorProcessSpec taskExecutorProcessSpec2 =
+                       new TaskExecutorProcessSpec(
+                               new CPUResource(1),
+                               MemorySize.ZERO,
+                               MemorySize.ZERO,
+                               MemorySize.ofMebiBytes(99),
+                               MemorySize.ZERO,
+                               MemorySize.ofMebiBytes(100),
+                               MemorySize.ofMebiBytes(100),
+                               MemorySize.ZERO,
+                               MemorySize.ZERO
+                       );
+
+               new Context() {{
+                       addContainerRequestFutures.add(new CompletableFuture<>());
+                       addContainerRequestFutures.add(new CompletableFuture<>());
+                       addContainerRequestFutures.add(new CompletableFuture<>());
+                       addContainerRequestFutures.add(new CompletableFuture<>());
+
+                       final String startCommand1 = TaskManagerOptions.TASK_HEAP_MEMORY.key() + "=" + (100L << 20);
+                       final String startCommand2 = TaskManagerOptions.TASK_HEAP_MEMORY.key() + "=" + (99L << 20);
+                       final CompletableFuture<Void> startContainerAsyncCommandFuture1 = new CompletableFuture<>();
+                       final CompletableFuture<Void> startContainerAsyncCommandFuture2 = new CompletableFuture<>();
+
+                       testingYarnAMRMClientAsyncBuilder.setGetMatchingRequestsFunction(ignored ->
+                               Collections.singletonList(ImmutableList.of(
+                                       YarnResourceManagerDriver.getContainerRequest(((YarnResourceManagerDriver) getDriver()).getContainerResource(taskExecutorProcessSpec1).get()),
+                                       YarnResourceManagerDriver.getContainerRequest(((YarnResourceManagerDriver) getDriver()).getContainerResource(taskExecutorProcessSpec2).get()))));
+                       testingYarnAMRMClientAsyncBuilder.setAddContainerRequestConsumer((ignored1, ignored2) ->
+                               addContainerRequestFutures.get(addContainerRequestFuturesNumCompleted.getAndIncrement()).complete(null));
+                       testingYarnNMClientAsyncBuilder.setStartContainerAsyncConsumer((ignored1, context, ignored2) -> {
+                               if (containsStartCommand(context, startCommand1)) {
+                                       startContainerAsyncCommandFuture1.complete(null);
+                               } else if (containsStartCommand(context, startCommand2)) {
+                                       startContainerAsyncCommandFuture2.complete(null);
+                               }
+                       });
+                       resourceEventHandlerBuilder.setOnWorkerTerminatedConsumer((ignore1, ignore2) -> getDriver().requestResource(taskExecutorProcessSpec1));
+
+                       runTest(() -> {
+                               final Resource containerResource = ((YarnResourceManagerDriver) getDriver()).getContainerResource(taskExecutorProcessSpec1).get();
+                               // Make sure the two worker resource specs will be normalized to the same container resource
+                               assertEquals(containerResource, ((YarnResourceManagerDriver) getDriver()).getContainerResource(taskExecutorProcessSpec2).get());
+
+                               runInMainThread(() -> getDriver().requestResource(taskExecutorProcessSpec1));
+                               runInMainThread(() -> getDriver().requestResource(taskExecutorProcessSpec2));
+
+                               // Verify both containers requested
+                               verifyFutureCompleted(addContainerRequestFutures.get(0));
+                               verifyFutureCompleted(addContainerRequestFutures.get(1));
+
+                               // Mock that both containers are allocated
+                               Container container1 = createTestingContainerWithResource(containerResource);
+                               Container container2 = createTestingContainerWithResource(containerResource);
+                               testingYarnResourceManagerClientFactory.getCallbackHandler().onContainersAllocated(ImmutableList.of(container1, container2));
+
+                               // Verify workers with both specs are started.
+                               verifyFutureCompleted(startContainerAsyncCommandFuture1);
+                               verifyFutureCompleted(startContainerAsyncCommandFuture2);
+
+                               // Mock that one container is completed, while the worker is still pending
+                               ContainerStatus testingContainerStatus = createTestingContainerCompletedStatus(container1.getId());
+                               testingYarnResourceManagerClientFactory.getCallbackHandler().onContainersCompleted(Collections.singletonList(testingContainerStatus));
+
+                               // Verify that only one more container is requested.
+                               verifyFutureCompleted(addContainerRequestFutures.get(2));
+                               assertFalse(addContainerRequestFutures.get(3).isDone());
+                       });
+               }};
+       }
+
+       @Test
+       public void testStartWorkerVariousSpec_DifferentContainerResource() throws Exception {
+               final TaskExecutorProcessSpec taskExecutorProcessSpec1 =
+                       new TaskExecutorProcessSpec(
+                               new CPUResource(1),
+                               MemorySize.ZERO,
+                               MemorySize.ZERO,
+                               MemorySize.ofMebiBytes(50),
+                               MemorySize.ofMebiBytes(50),
+                               MemorySize.ofMebiBytes(50),
+                               MemorySize.ofMebiBytes(50),
+                               MemorySize.ZERO,
+                               MemorySize.ZERO
+                       );
+               final TaskExecutorProcessSpec taskExecutorProcessSpec2 =
+                       new TaskExecutorProcessSpec(
+                               new CPUResource(2),
+                               MemorySize.ZERO,
+                               MemorySize.ZERO,
+                               MemorySize.ofMebiBytes(500),
+                               MemorySize.ofMebiBytes(500),
+                               MemorySize.ofMebiBytes(500),
+                               MemorySize.ofMebiBytes(500),
+                               MemorySize.ZERO,
+                               MemorySize.ZERO
+                       );
+
+               new Context() {{
+                       addContainerRequestFutures.add(new CompletableFuture<>());
+                       addContainerRequestFutures.add(new CompletableFuture<>());
+                       addContainerRequestFutures.add(new CompletableFuture<>());
+                       addContainerRequestFutures.add(new CompletableFuture<>());
+
+                       final String startCommand1 = TaskManagerOptions.TASK_HEAP_MEMORY.key() + "=" + (50L << 20);
+                       final String startCommand2 = TaskManagerOptions.TASK_HEAP_MEMORY.key() + "=" + (100L << 20);
+                       final CompletableFuture<Void> startContainerAsyncCommandFuture1 = new CompletableFuture<>();
+                       final CompletableFuture<Void> startContainerAsyncCommandFuture2 = new CompletableFuture<>();
+
+                       testingYarnAMRMClientAsyncBuilder.setGetMatchingRequestsFunction(tuple -> {
+                               if (tuple.f2.getVirtualCores() == 1) {
+                                       return Collections.singletonList(
+                                               Collections.singletonList(YarnResourceManagerDriver.getContainerRequest(((YarnResourceManagerDriver) getDriver()).getContainerResource(taskExecutorProcessSpec1).get())));
+                               } else if (tuple.f2.getVirtualCores() == 2) {
+                                       return Collections.singletonList(
+                                               Collections.singletonList(YarnResourceManagerDriver.getContainerRequest(((YarnResourceManagerDriver) getDriver()).getContainerResource(taskExecutorProcessSpec2).get())));
+                               }
+                               return null;
+                       });
+                       testingYarnAMRMClientAsyncBuilder.setAddContainerRequestConsumer((request, ignored) ->
+                               addContainerRequestFutures.get(addContainerRequestFuturesNumCompleted.getAndIncrement()).complete(request.getCapability()));
+                       testingYarnNMClientAsyncBuilder.setStartContainerAsyncConsumer((ignored1, context, ignored3) -> {
+                               if (containsStartCommand(context, startCommand1)) {
+                                       startContainerAsyncCommandFuture1.complete(null);
+                               } else if (containsStartCommand(context, startCommand2)) {
+                                       startContainerAsyncCommandFuture2.complete(null);
+                               }
+                       });
+                       resourceEventHandlerBuilder.setOnWorkerTerminatedConsumer((ignore1, ignore2) -> getDriver().requestResource(taskExecutorProcessSpec1));
+
+                       runTest(() -> {
+                               final Resource containerResource1 = ((YarnResourceManagerDriver) getDriver()).getContainerResource(taskExecutorProcessSpec1).get();
+                               final Resource containerResource2 = ((YarnResourceManagerDriver) getDriver()).getContainerResource(taskExecutorProcessSpec2).get();
+                               // Make sure the two worker resource specs will be normalized to different container resources
+                               assertNotEquals(containerResource1, containerResource2);
+
+                               runInMainThread(() -> getDriver().requestResource(taskExecutorProcessSpec1));
+                               runInMainThread(() -> getDriver().requestResource(taskExecutorProcessSpec2));
+
+                               // Verify both containers requested
+                               verifyFutureCompleted(addContainerRequestFutures.get(0));
+                               verifyFutureCompleted(addContainerRequestFutures.get(1));
+
+                               // Mock that container 1 is allocated
+                               Container container1 = createTestingContainerWithResource(containerResource1);
+                               testingYarnResourceManagerClientFactory.getCallbackHandler().onContainersAllocated(Collections.singletonList(container1));
+
+                               // Verify that only the worker with spec1 is started.
+                               verifyFutureCompleted(startContainerAsyncCommandFuture1);
+                               assertFalse(startContainerAsyncCommandFuture2.isDone());
+
+                               // Mock that container 1 is completed, while the worker is still pending
+                               ContainerStatus testingContainerStatus = createTestingContainerCompletedStatus(container1.getId());
+                               testingYarnResourceManagerClientFactory.getCallbackHandler().onContainersCompleted(Collections.singletonList(testingContainerStatus));
+
+                               // Verify that only container 1 is requested again
+                               verifyFutureCompleted(addContainerRequestFutures.get(2));
+                               assertThat(addContainerRequestFutures.get(2).get(), is(containerResource1));
+                               assertFalse(addContainerRequestFutures.get(3).isDone());
+                       });
+               }};
+       }
+
+       private boolean containsStartCommand(ContainerLaunchContext containerLaunchContext, String command) {
+               return containerLaunchContext.getCommands().stream().anyMatch(str -> str.contains(command));
+       }
+
+       private static Container createTestingContainerWithResource(Resource resource, int containerIdx) {
+               final ContainerId containerId = ContainerId.newInstance(
+                       ApplicationAttemptId.newInstance(
+                               ApplicationId.newInstance(System.currentTimeMillis(), 1),
+                               1),
+                       containerIdx);
+               final NodeId nodeId = NodeId.newInstance("container", 1234);
+               return new TestingContainer(containerId, nodeId, resource, Priority.UNDEFINED);
+       }
+
+       private class Context extends ResourceManagerDriverTestBase<YarnWorkerNode>.Context {
+               private final CompletableFuture<Void> stopAndCleanupClusterFuture = new CompletableFuture<>();
+               private final CompletableFuture<Resource> createTaskManagerContainerFuture = new CompletableFuture<>();
+               private final CompletableFuture<Void> stopContainerAsyncFuture = new CompletableFuture<>();
+               final List<CompletableFuture<Resource>> addContainerRequestFutures = new ArrayList<>();
+               final AtomicInteger addContainerRequestFuturesNumCompleted = new AtomicInteger(0);
+               final CompletableFuture<Void> removeContainerRequestFuture = new CompletableFuture<>();
+               final CompletableFuture<Void> releaseAssignedContainerFuture = new CompletableFuture<>();
+               final CompletableFuture<Void> startContainerAsyncFuture = new CompletableFuture<>();
+               final TestingYarnResourceManagerClientFactory testingYarnResourceManagerClientFactory = new TestingYarnResourceManagerClientFactory();
+               final TestingYarnNodeManagerClientFactory testingYarnNodeManagerClientFactory = new TestingYarnNodeManagerClientFactory();
+
+               final TestingYarnNMClientAsync.Builder testingYarnNMClientAsyncBuilder = testingYarnNodeManagerClientFactory.getTestingYarnNMClientAsyncBuilder()
+                       .setStartContainerAsyncConsumer((ignored1, ignored2, ignored3) -> startContainerAsyncFuture.complete(null))
+                       .setStopContainerAsyncConsumer((ignored1, ignored2, ignored3) -> stopContainerAsyncFuture.complete(null));
+               final TestingYarnAMRMClientAsync.Builder testingYarnAMRMClientAsyncBuilder = testingYarnResourceManagerClientFactory.getTestingYarnAMRMClientAsyncBuilder()
+                       .setAddContainerRequestConsumer((request, handler) -> {
+                               createTaskManagerContainerFuture.complete(request.getCapability());
+                               testingYarnResourceManagerClientFactory.getCallbackHandler()
+                                       .onContainersAllocated(Collections.singletonList(testingContainer));
+                       })
+                       .setGetMatchingRequestsFunction(ignored ->
+                               Collections.singletonList(Collections.singletonList(YarnResourceManagerDriver.getContainerRequest(testingResource))))
+                       .setRemoveContainerRequestConsumer((request, handler) -> removeContainerRequestFuture.complete(null))
+                       .setReleaseAssignedContainerConsumer((ignored1, ignored2) -> releaseAssignedContainerFuture.complete(null))
+                       .setUnregisterApplicationMasterConsumer((ignore1, ignore2, ignore3) -> stopAndCleanupClusterFuture.complete(null));
+
+               final Map<String, String> env = new HashMap<>();
+
+               private int containerIdx = 0;
+
+               @Override
+               protected void prepareRunTest() {
+                       File root = folder.getRoot();
+                       File home = new File(root, "home");
+                       env.put(ENV_APP_ID, "foo");
+                       env.put(ENV_CLIENT_HOME_DIR, home.getAbsolutePath());
+                       env.put(ENV_CLIENT_SHIP_FILES, "");
+                       env.put(ENV_FLINK_CLASSPATH, "");
+                       env.put(ENV_HADOOP_USER_NAME, "foo");
+                       env.put(FLINK_DIST_JAR, new YarnLocalResourceDescriptor(
+                               "flink.jar",
+                               new Path("/tmp/flink.jar"),
+                               0,
+                               System.currentTimeMillis(),
+                               LocalResourceVisibility.APPLICATION,
+                               LocalResourceType.FILE).toString());
+                       env.put(ApplicationConstants.Environment.PWD.key(), home.getAbsolutePath());
+               }
+
+               @Override
+               protected void preparePreviousAttemptWorkers() {
+                       testingYarnAMRMClientAsyncBuilder.setRegisterApplicationMasterFunction(
+                               (ignored1, ignored2, ignored3) -> new TestingRegisterApplicationMasterResponse(() -> Collections.singletonList(testingContainer)));
+               }
+
+               @Override
+               protected void prepareReleaseResource() {
+                       addContainerRequestFutures.add(new CompletableFuture<>());
+                       addContainerRequestFutures.add(new CompletableFuture<>());
+                       testingYarnAMRMClientAsyncBuilder.setAddContainerRequestConsumer((ignored1, ignored2) -> {
+                               addContainerRequestFutures.get(addContainerRequestFuturesNumCompleted.getAndIncrement()).complete(null);
+                               testingYarnResourceManagerClientFactory.getCallbackHandler()
+                                       .onContainersAllocated(Collections.singletonList(testingContainer));
+                       });
+                       resourceEventHandlerBuilder.setOnWorkerTerminatedConsumer((ignore1, ignore2) -> getDriver().requestResource(testingTaskExecutorProcessSpec));
+               }
+
+               @Override
+               protected ResourceManagerDriver<YarnWorkerNode> createResourceManagerDriver() {
+                       return new YarnResourceManagerDriver(
+                               flinkConfig,
+                               new YarnResourceManagerConfiguration(env, null, "localhost:9000"),
+                               testingYarnResourceManagerClientFactory,
+                               testingYarnNodeManagerClientFactory);
+               }
+
+               @Override
+               protected void validateInitialization() {
+                       assertNotNull(testingYarnResourceManagerClientFactory.getTestingYarnAMRMClientAsync());
+                       assertNotNull(testingYarnNodeManagerClientFactory.getTestingYarnNMClientAsync());
+                       assertThat(testingYarnNodeManagerClientFactory.getTestingYarnNMClientAsync().getServiceState(), is(Service.STATE.STARTED));
+                       assertThat(testingYarnResourceManagerClientFactory.getTestingYarnAMRMClientAsync().getServiceState(), is(Service.STATE.STARTED));
+               }
+
+               @Override
+               protected void validateWorkersRecoveredFromPreviousAttempt(Collection<YarnWorkerNode> workers) {
+                       Assume.assumeTrue(HadoopUtils.isMinHadoopVersion(2, 2));
+                       assertThat(workers.size(), is(1));
+
+                       final ResourceID resourceId = workers.iterator().next().getResourceID();
+                       assertThat(resourceId.toString(), is(testingContainer.getId().toString()));
+               }
+
+               @Override
+               protected void validateTermination() {
+                       assertThat(testingYarnNodeManagerClientFactory.getTestingYarnNMClientAsync().getServiceState(), is(Service.STATE.STOPPED));
+                       assertThat(testingYarnResourceManagerClientFactory.getTestingYarnAMRMClientAsync().getServiceState(), is(Service.STATE.STOPPED));

Review comment:
       Same here.
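       The same idea could cover termination, e.g. a hypothetical extension of the `LifecycleTrackingService` sketch from the earlier comment that also tracks `stop()` (again only assuming the testing clients extend Hadoop's `AbstractService`):
   ```java
   import java.util.concurrent.CompletableFuture;

   /** Hypothetical extension of the LifecycleTrackingService sketch above: also tracks stop(). */
   class StopTrackingService extends LifecycleTrackingService {
   	final CompletableFuture<Void> stopFuture = new CompletableFuture<>();

   	@Override
   	protected void serviceStop() throws Exception {
   		stopFuture.complete(null);
   		super.serviceStop();
   	}
   }
   ```
       `validateTermination()` could then assert that `stopFuture` is completed rather than checking for `Service.STATE.STOPPED`.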




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]

