xintongsong commented on a change in pull request #13311:
URL: https://github.com/apache/flink/pull/13311#discussion_r484622514
##########
File path:
flink-yarn/src/main/java/org/apache/flink/yarn/TaskExecutorProcessSpecContainerResourceAdapter.java
##########
@@ -86,21 +81,21 @@
}
}
- Set<WorkerResourceSpec> getWorkerSpecs(final Resource
containerResource, final MatchingStrategy matchingStrategy) {
+ Set<TaskExecutorProcessSpec> getTaskExecutorProcessSpec(final Resource
containerResource, final
TaskExecutorProcessSpecContainerResourceAdapter.MatchingStrategy
matchingStrategy) {
Review comment:
```suggestion
Set<TaskExecutorProcessSpec> getTaskExecutorProcessSpec(final Resource
containerResource, final MatchingStrategy matchingStrategy) {
```
##########
File path:
flink-yarn/src/main/java/org/apache/flink/yarn/TaskExecutorProcessSpecContainerResourceAdapter.java
##########
@@ -86,21 +81,21 @@
}
}
- Set<WorkerResourceSpec> getWorkerSpecs(final Resource
containerResource, final MatchingStrategy matchingStrategy) {
+ Set<TaskExecutorProcessSpec> getTaskExecutorProcessSpec(final Resource
containerResource, final
TaskExecutorProcessSpecContainerResourceAdapter.MatchingStrategy
matchingStrategy) {
final InternalContainerResource internalContainerResource = new
InternalContainerResource(containerResource);
return
getEquivalentInternalContainerResource(internalContainerResource,
matchingStrategy).stream()
- .flatMap(resource ->
containerResourceToWorkerSpecs.getOrDefault(resource,
Collections.emptySet()).stream())
+ .flatMap(resource ->
containerResourceToTaskExecutorProcessSpec.getOrDefault(resource,
Collections.emptySet()).stream())
.collect(Collectors.toSet());
}
- Set<Resource> getEquivalentContainerResource(final Resource
containerResource, final MatchingStrategy matchingStrategy) {
+ Set<Resource> getEquivalentContainerResource(final Resource
containerResource, final
TaskExecutorProcessSpecContainerResourceAdapter.MatchingStrategy
matchingStrategy) {
Review comment:
```suggestion
Set<Resource> getEquivalentContainerResource(final Resource
containerResource, final MatchingStrategy matchingStrategy) {
```
##########
File path:
flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnResourceManagerConfiguration.java
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.configuration;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.yarn.YarnConfigKeys;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Configuration specific to {@link org.apache.flink.yarn.YarnResourceManager}.
+ */
+public class YarnResourceManagerConfiguration {
+ private final Map<String, String> yarnConfig;
+ private final String webInterfaceUrl;
+ private final String rpcAddress;
+
+ public YarnResourceManagerConfiguration(
+ Map<String, String> env,
+ @Nullable String webInterfaceUrl,
+ String rpcAddress) {
+ this.yarnConfig =
getYarnConfFromEnv(Preconditions.checkNotNull(env));
Review comment:
Instead of passing in the `env` and extracting the needed configurations in the
constructor, I would suggest making the constructor private so that it takes the
extracted configs directly, and introducing a public static factory method to do
the extraction.
```
private YarnResourceManagerConfiguration(conf1, conf2, conf3, ...,
webInterfaceUrl, rpcAddress) {
...
}
public static YarnResourceManagerConfiguration fromEnv(env, webInterfaceUrl,
rpcAddress) {
...
foobar = getYarnConfFromEnv(env);
...
return new YarnResourceManagerConfiguration(...);
}
```
The idea is to keep as little work as possible in the constructor.
##########
File path:
flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnResourceManagerConfiguration.java
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.configuration;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.yarn.YarnConfigKeys;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Configuration specific to {@link org.apache.flink.yarn.YarnResourceManager}.
+ */
+public class YarnResourceManagerConfiguration {
+ private final Map<String, String> yarnConfig;
+ private final String webInterfaceUrl;
+ private final String rpcAddress;
+
+ public YarnResourceManagerConfiguration(
+ Map<String, String> env,
+ @Nullable String webInterfaceUrl,
+ String rpcAddress) {
+ this.yarnConfig =
getYarnConfFromEnv(Preconditions.checkNotNull(env));
+ this.rpcAddress = Preconditions.checkNotNull(rpcAddress);
+ this.webInterfaceUrl = webInterfaceUrl;
+ }
+
+ public String getRpcAddress() {
+ return rpcAddress;
+ }
+
+ public String getWebInterfaceUrl() {
+ return webInterfaceUrl;
+ }
+
+ private static Map<String, String> getYarnConfFromEnv(Map<String,
String> env) {
+ Map<String, String> configs = new HashMap<>();
+ configs.put(YarnConfigKeys.FLINK_YARN_FILES,
env.get(YarnConfigKeys.FLINK_YARN_FILES));
+ configs.put(YarnConfigKeys.ENV_FLINK_CLASSPATH,
env.get(YarnConfigKeys.ENV_FLINK_CLASSPATH));
+ configs.put(YarnConfigKeys.ENV_APP_ID,
env.get(YarnConfigKeys.ENV_APP_ID));
+ configs.put(YarnConfigKeys.ENV_CLIENT_HOME_DIR,
env.get(YarnConfigKeys.ENV_CLIENT_HOME_DIR));
+ configs.put(YarnConfigKeys.ENV_CLIENT_SHIP_FILES,
env.get(YarnConfigKeys.ENV_CLIENT_SHIP_FILES));
+ configs.put(YarnConfigKeys.FLINK_DIST_JAR,
env.get(YarnConfigKeys.FLINK_DIST_JAR));
+ configs.put(YarnConfigKeys.REMOTE_KEYTAB_PATH,
env.get(YarnConfigKeys.REMOTE_KEYTAB_PATH));
+ configs.put(YarnConfigKeys.LOCAL_KEYTAB_PATH,
env.get(YarnConfigKeys.LOCAL_KEYTAB_PATH));
+ configs.put(YarnConfigKeys.KEYTAB_PRINCIPAL,
env.get(YarnConfigKeys.KEYTAB_PRINCIPAL));
+ configs.put(YarnConfigKeys.ENV_HADOOP_USER_NAME,
env.get(YarnConfigKeys.ENV_HADOOP_USER_NAME));
+ configs.put(YarnConfigKeys.ENV_ZOOKEEPER_NAMESPACE,
env.get(YarnConfigKeys.ENV_ZOOKEEPER_NAMESPACE));
Review comment:
This one is used in the cluster entry point, but not in the resource
manager. It can also be excluded.
##########
File path:
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerDriver.java
##########
@@ -0,0 +1,617 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.GlobalConfiguration;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.configuration.TaskManagerOptionsInternal;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.BootstrapTools;
+import
org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.concurrent.FutureUtils;
+import org.apache.flink.runtime.externalresource.ExternalResourceUtils;
+import
org.apache.flink.runtime.resourcemanager.active.AbstractResourceManagerDriver;
+import org.apache.flink.runtime.resourcemanager.active.ResourceManagerDriver;
+import
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import org.apache.flink.runtime.webmonitor.history.HistoryServerUtils;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.yarn.configuration.YarnConfigOptions;
+import org.apache.flink.yarn.configuration.YarnConfigOptionsInternal;
+import org.apache.flink.yarn.configuration.YarnResourceManagerConfiguration;
+
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.NodeReport;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.ResourceRequest;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.client.api.async.NMClientAsync;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.net.URL;
+import java.nio.ByteBuffer;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Queue;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.stream.Collectors;
+
+/**
+ * Implementation of {@link ResourceManagerDriver} for Kubernetes deployment.
+ */
+public class YarnResourceManagerDriver extends
AbstractResourceManagerDriver<YarnWorkerNode> {
+
+ private static final Priority RM_REQUEST_PRIORITY =
Priority.newInstance(1);
+
+ /** Environment variable name of the hostname given by the YARN.
+ * In task executor we use the hostnames given by YARN consistently
throughout akka */
+ private static final String ENV_FLINK_NODE_ID = "_FLINK_NODE_ID";
+
+ static final String ERROR_MASSAGE_ON_SHUTDOWN_REQUEST = "Received
shutdown request from YARN ResourceManager.";
+
+ private final YarnConfiguration yarnConfig;
+
+ /** The process environment variables. */
+ private final YarnResourceManagerConfiguration configuration;
+
+ /** Default heartbeat interval between this resource manager and the
YARN ResourceManager. */
+ private final int yarnHeartbeatIntervalMillis;
+
+ /** Client to communicate with the Resource Manager (YARN's master). */
+ private AMRMClientAsync<AMRMClient.ContainerRequest>
resourceManagerClient;
+
+ /** The heartbeat interval while the resource master is waiting for
containers. */
+ private final int containerRequestHeartbeatIntervalMillis;
+
+ /** Client to communicate with the Node manager and launch TaskExecutor
processes. */
+ private NMClientAsync nodeManagerClient;
+
+ /** Request resource futures, keyed by container ids. */
+ private final Map<TaskExecutorProcessSpec,
Queue<CompletableFuture<YarnWorkerNode>>> requestResourceFutures;
+
+ private final TaskExecutorProcessSpecContainerResourceAdapter
taskExecutorProcessSpecContainerResourceAdapter;
+
+ private final RegisterApplicationMasterResponseReflector
registerApplicationMasterResponseReflector;
+
+ private
TaskExecutorProcessSpecContainerResourceAdapter.MatchingStrategy
matchingStrategy;
+
+ private final YarnResourceManagerClientFactory
yarnResourceManagerClientFactory;
+
+ private final YarnNodeManagerClientFactory yarnNodeManagerClientFactory;
+
+ public YarnResourceManagerDriver(
+ Configuration flinkConfig,
+ YarnResourceManagerConfiguration configuration,
+ YarnResourceManagerClientFactory
yarnResourceManagerClientFactory,
+ YarnNodeManagerClientFactory yarnNodeManagerClientFactory) {
+ super(flinkConfig, GlobalConfiguration.loadConfiguration());
+
+ this.yarnConfig = new YarnConfiguration();
+ this.requestResourceFutures = new HashMap<>();
+ this.configuration = configuration;
+
+ final int yarnHeartbeatIntervalMS = flinkConfig.getInteger(
+ YarnConfigOptions.HEARTBEAT_DELAY_SECONDS) * 1000;
+
+ final long yarnExpiryIntervalMS = yarnConfig.getLong(
+ YarnConfiguration.RM_AM_EXPIRY_INTERVAL_MS,
+ YarnConfiguration.DEFAULT_RM_AM_EXPIRY_INTERVAL_MS);
+
+ if (yarnHeartbeatIntervalMS >= yarnExpiryIntervalMS) {
+ log.warn("The heartbeat interval of the Flink
Application master ({}) is greater " +
+ "than YARN's expiry interval ({}). The
application is likely to be killed by YARN.",
+ yarnHeartbeatIntervalMS, yarnExpiryIntervalMS);
+ }
+ yarnHeartbeatIntervalMillis = yarnHeartbeatIntervalMS;
+ containerRequestHeartbeatIntervalMillis =
flinkConfig.getInteger(YarnConfigOptions.CONTAINER_REQUEST_HEARTBEAT_INTERVAL_MILLISECONDS);
+
+ this.taskExecutorProcessSpecContainerResourceAdapter = new
TaskExecutorProcessSpecContainerResourceAdapter(
+ yarnConfig.getInt(
+
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
+
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB),
+ yarnConfig.getInt(
+
YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES,
+
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES),
+ yarnConfig.getInt(
+
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
+
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB),
+ yarnConfig.getInt(
+
YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
+
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES),
+ ExternalResourceUtils.getExternalResources(flinkConfig,
YarnConfigOptions.EXTERNAL_RESOURCE_YARN_CONFIG_KEY_SUFFIX));
+ this.registerApplicationMasterResponseReflector = new
RegisterApplicationMasterResponseReflector(log);
+
+ this.matchingStrategy =
flinkConfig.getBoolean(YarnConfigOptionsInternal.MATCH_CONTAINER_VCORES) ?
+
TaskExecutorProcessSpecContainerResourceAdapter.MatchingStrategy.MATCH_VCORE :
+
TaskExecutorProcessSpecContainerResourceAdapter.MatchingStrategy.IGNORE_VCORE;
+
+ this.yarnResourceManagerClientFactory =
yarnResourceManagerClientFactory;
+ this.yarnNodeManagerClientFactory =
yarnNodeManagerClientFactory;
+ }
+
+ //
------------------------------------------------------------------------
+ // ResourceManagerDriver
+ //
------------------------------------------------------------------------
+
+ @Override
+ protected void initializeInternal() throws Exception {
+ final YarnContainerEventHandler yarnContainerEventHandler = new
YarnContainerEventHandler();
+ try {
+ resourceManagerClient =
yarnResourceManagerClientFactory.createAndStartResourceManagerClient(
+ yarnConfig,
+ yarnHeartbeatIntervalMillis,
+ yarnContainerEventHandler);
+
+ final RegisterApplicationMasterResponse
registerApplicationMasterResponse = registerApplicationMaster();
+
getContainersFromPreviousAttempts(registerApplicationMasterResponse);
+
updateMatchingStrategy(registerApplicationMasterResponse);
+ } catch (Exception e) {
+ throw new ResourceManagerException("Could not start
resource manager client.", e);
+ }
+
+ nodeManagerClient =
yarnNodeManagerClientFactory.createAndStartNodeManagerClient(yarnConfig,
yarnContainerEventHandler);
+ }
+
+ @Override
+ public CompletableFuture<Void> terminate() {
+ // shut down all components
+ Exception exception = null;
+
+ if (resourceManagerClient != null) {
+ try {
+ resourceManagerClient.stop();
+ } catch (Exception e) {
+ exception = e;
+ }
+ }
+
+ if (nodeManagerClient != null) {
+ try {
+ nodeManagerClient.stop();
+ } catch (Exception e) {
+ exception = ExceptionUtils.firstOrSuppressed(e,
exception);
+ }
+ }
+
+ return exception == null ?
+ FutureUtils.completedVoidFuture() :
+ FutureUtils.completedExceptionally(exception);
+ }
+
+ @Override
+ public void deregisterApplication(ApplicationStatus finalStatus,
@Nullable String optionalDiagnostics) {
+ // first, de-register from YARN
+ final FinalApplicationStatus yarnStatus =
getYarnStatus(finalStatus);
+ log.info("Unregister application from the YARN Resource Manager
with final status {}.", yarnStatus);
+
+ final Optional<URL> historyServerURL =
HistoryServerUtils.getHistoryServerURL(flinkConfig);
+
+ final String appTrackingUrl =
historyServerURL.map(URL::toString).orElse("");
+
+ try {
+
resourceManagerClient.unregisterApplicationMaster(yarnStatus,
optionalDiagnostics, appTrackingUrl);
+ } catch (Throwable t) {
+ log.error("Could not unregister the application
master.", t);
+ }
+
+ Utils.deleteApplicationFiles(configuration.getYarnFiles());
+ }
+
+ @Override
+ public CompletableFuture<YarnWorkerNode>
requestResource(TaskExecutorProcessSpec taskExecutorProcessSpec) {
+ final Optional<Resource> containerResourceOptional =
getContainerResource(taskExecutorProcessSpec);
+ final CompletableFuture<YarnWorkerNode> requestResourceFuture =
new CompletableFuture<>();
+
+ if (containerResourceOptional.isPresent()) {
+
resourceManagerClient.addContainerRequest(getContainerRequest(containerResourceOptional.get()));
+
+ // make sure we transmit the request fast and receive
fast news of granted allocations
+
resourceManagerClient.setHeartbeatInterval(containerRequestHeartbeatIntervalMillis);
+
+
requestResourceFutures.computeIfAbsent(taskExecutorProcessSpec, ignore -> new
LinkedList<>()).add(requestResourceFuture);
+
+ log.info("Requesting new TaskExecutor container with
resource {}.", taskExecutorProcessSpec);
+ } else {
+ requestResourceFuture.completeExceptionally(
+ new
ResourceManagerException(String.format("Could not compute the container
Resource from the given TaskExecutorProcessSpec %s.",
taskExecutorProcessSpec)));
+ }
+
+ return requestResourceFuture;
+ }
+
+ @Override
+ public void releaseResource(YarnWorkerNode workerNode) {
+ final Container container = workerNode.getContainer();
+ log.info("Stopping container {}.",
workerNode.getResourceID().getStringWithMetadata());
+ nodeManagerClient.stopContainerAsync(container.getId(),
container.getNodeId());
+
resourceManagerClient.releaseAssignedContainer(container.getId());
+ }
+
+ //
------------------------------------------------------------------------
+ // Internal
+ //
------------------------------------------------------------------------
+
+ private void onContainersOfResourceAllocated(Resource resource,
List<Container> containers) {
+ final List<TaskExecutorProcessSpec>
pendingTaskExecutorProcessSpecs =
+
taskExecutorProcessSpecContainerResourceAdapter.getTaskExecutorProcessSpec(resource,
matchingStrategy).stream()
+ .flatMap(spec ->
Collections.nCopies(getNumRequestedNotAllocatedWorkersFor(spec), spec).stream())
+ .collect(Collectors.toList());
+
+ int numPending = pendingTaskExecutorProcessSpecs.size();
+ log.info("Received {} containers with resource {}, {} pending
container requests.",
+ containers.size(),
+ resource,
+ numPending);
+
+ final Iterator<Container> containerIterator =
containers.iterator();
+ final Iterator<TaskExecutorProcessSpec>
pendingTaskExecutorProcessSpecIterator =
pendingTaskExecutorProcessSpecs.iterator();
+ final Iterator<AMRMClient.ContainerRequest>
pendingRequestsIterator =
+ getPendingRequestsAndCheckConsistency(resource,
pendingTaskExecutorProcessSpecs.size()).iterator();
+
+ int numAccepted = 0;
+ while (containerIterator.hasNext() &&
pendingTaskExecutorProcessSpecIterator.hasNext()) {
+ final TaskExecutorProcessSpec taskExecutorProcessSpec =
pendingTaskExecutorProcessSpecIterator.next();
+ final Container container = containerIterator.next();
+ final AMRMClient.ContainerRequest pendingRequest =
pendingRequestsIterator.next();
+ final ResourceID resourceId =
getContainerResourceId(container);
+ final CompletableFuture<YarnWorkerNode>
requestResourceFuture =
+
Preconditions.checkNotNull(requestResourceFutures.get(taskExecutorProcessSpec).poll(),
"The requestResourceFuture queue for TasExecutorProcessSpec %s should not be
empty.", taskExecutorProcessSpec);
Review comment:
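This only checks that
`requestResourceFutures.get(taskExecutorProcessSpec).poll()` is not null, while
`requestResourceFutures.get(taskExecutorProcessSpec)` itself could also be null.
In that case we would call `poll()` on a null value and fail with a
`NullPointerException` before the `checkNotNull` precondition is ever evaluated.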
##########
File path:
flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerDriverTest.java
##########
@@ -0,0 +1,518 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.api.common.resources.CPUResource;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.resourcemanager.active.ResourceManagerDriver;
+import
org.apache.flink.runtime.resourcemanager.active.ResourceManagerDriverTestBase;
+import
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.yarn.configuration.YarnResourceManagerConfiguration;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.junit.Assume;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_APP_ID;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_CLIENT_HOME_DIR;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_CLIENT_SHIP_FILES;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_HADOOP_USER_NAME;
+import static org.apache.flink.yarn.YarnConfigKeys.FLINK_DIST_JAR;
+import static org.apache.flink.yarn.YarnConfigKeys.FLINK_YARN_FILES;
+import static
org.apache.flink.yarn.YarnResourceManagerDriver.ERROR_MASSAGE_ON_SHUTDOWN_REQUEST;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link YarnResourceManagerDriver}.
+ */
+public class YarnResourceManagerDriverTest extends
ResourceManagerDriverTestBase<YarnWorkerNode> {
+ private static final Resource testingResource =
Resource.newInstance(YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
+ private static final Container testingContainer =
createTestingContainerWithResource(testingResource, 0);
+ private static final TaskExecutorProcessSpec
testingTaskExecutorProcessSpec =
+ new TaskExecutorProcessSpec(
+ new CPUResource(1),
+ MemorySize.ZERO,
+ MemorySize.ZERO,
+ MemorySize.ofMebiBytes(256),
+ MemorySize.ofMebiBytes(256),
+ MemorySize.ofMebiBytes(256),
+ MemorySize.ofMebiBytes(256),
+ MemorySize.ZERO,
+ MemorySize.ZERO);
+
+ @Rule
+ public TemporaryFolder folder = new TemporaryFolder();
+
+ @Override
+ protected Context createContext() {
+ return new Context();
+ }
+
+ @Test
+ public void testShutdownRequestCausesFatalError() throws Exception {
+ new Context() {{
+ final CompletableFuture<Throwable>
throwableCompletableFuture = new CompletableFuture<>();
+
resourceEventHandlerBuilder.setOnErrorConsumer(throwableCompletableFuture::complete);
+ runTest(() -> {
+
testingYarnResourceManagerClientFactory.getCallbackHandler().onShutdownRequest();
+
+ Throwable throwable =
throwableCompletableFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS);
+
assertThat(ExceptionUtils.findThrowable(throwable,
ResourceManagerException.class).isPresent(), is(true));
+
assertThat(ExceptionUtils.findThrowableWithMessage(throwable,
ERROR_MASSAGE_ON_SHUTDOWN_REQUEST).isPresent(), is(true));
+ });
+ }};
+ }
+
+ /**
+ * Tests that application files are deleted when the YARN application
master is de-registered.
+ */
+ @Test
+ public void testDeleteApplicationFiles() throws Exception {
+ new Context() {{
+ final File applicationDir = folder.newFolder(".flink");
+ env.put(FLINK_YARN_FILES,
applicationDir.getCanonicalPath());
+
+ runTest(() -> {
+
getDriver().deregisterApplication(ApplicationStatus.SUCCEEDED, null);
+ assertFalse("YARN application directory was not
removed", Files.exists(applicationDir.toPath()));
+ });
+ }};
+ }
+
+ @Test
+ public void testOnContainerCompleted() throws Exception {
+ new Context() {{
+ addContainerRequestFutures.add(new
CompletableFuture<>());
+ addContainerRequestFutures.add(new
CompletableFuture<>());
+
+
testingYarnAMRMClientAsyncBuilder.setAddContainerRequestConsumer((ignored1,
ignored2) ->
+
addContainerRequestFutures.get(addContainerRequestFuturesNumCompleted.getAndIncrement()).complete(null));
+
resourceEventHandlerBuilder.setOnWorkerTerminatedConsumer((ignore1, ignore2) ->
getDriver().requestResource(testingTaskExecutorProcessSpec));
+
+ runTest(() -> {
+ runInMainThread(() ->
getDriver().requestResource(testingTaskExecutorProcessSpec));
+
testingYarnResourceManagerClientFactory.getCallbackHandler().onContainersAllocated(ImmutableList.of(testingContainer));
+
+
verifyFutureCompleted(addContainerRequestFutures.get(0));
+
verifyFutureCompleted(removeContainerRequestFuture);
+
verifyFutureCompleted(startContainerAsyncFuture);
+
+ ContainerStatus testingContainerStatus =
createTestingContainerCompletedStatus(testingContainer.getId());
+
testingYarnResourceManagerClientFactory.getCallbackHandler().onContainersCompleted(ImmutableList.of(testingContainerStatus));
+
+
verifyFutureCompleted(addContainerRequestFutures.get(1));
+ });
+ }};
+ }
+
+ @Test
+ public void testOnStartContainerError() throws Exception {
+ new Context() {{
+ addContainerRequestFutures.add(new
CompletableFuture<>());
+ addContainerRequestFutures.add(new
CompletableFuture<>());
+
+
testingYarnAMRMClientAsyncBuilder.setAddContainerRequestConsumer((ignored1,
ignored2) ->
+
addContainerRequestFutures.get(addContainerRequestFuturesNumCompleted.getAndIncrement()).complete(null));
+
resourceEventHandlerBuilder.setOnWorkerTerminatedConsumer((ignore1, ignore2) ->
getDriver().requestResource(testingTaskExecutorProcessSpec));
+
+ runTest(() -> {
+ runInMainThread(() ->
getDriver().requestResource(testingTaskExecutorProcessSpec));
+
testingYarnResourceManagerClientFactory.getCallbackHandler().onContainersAllocated(ImmutableList.of(testingContainer));
+
+
verifyFutureCompleted(addContainerRequestFutures.get(0));
+
verifyFutureCompleted(removeContainerRequestFuture);
+
verifyFutureCompleted(startContainerAsyncFuture);
+
+
testingYarnNodeManagerClientFactory.getCallbackHandler().onStartContainerError(testingContainer.getId(),
new Exception("start error"));
+
verifyFutureCompleted(releaseAssignedContainerFuture);
+
verifyFutureCompleted(addContainerRequestFutures.get(1));
+ });
+ }};
+ }
+
+ @Test
+ public void
testStartTaskExecutorProcessVariousSpec_SameContainerResource() throws
Exception{
+ final TaskExecutorProcessSpec taskExecutorProcessSpec1 =
+ new TaskExecutorProcessSpec(
+ new CPUResource(1),
+ MemorySize.ZERO,
+ MemorySize.ZERO,
+ MemorySize.ofMebiBytes(100),
+ MemorySize.ZERO,
+ MemorySize.ofMebiBytes(100),
+ MemorySize.ofMebiBytes(100),
+ MemorySize.ZERO,
+ MemorySize.ZERO
+ );
+ final TaskExecutorProcessSpec taskExecutorProcessSpec2 =
+ new TaskExecutorProcessSpec(
+ new CPUResource(1),
+ MemorySize.ZERO,
+ MemorySize.ZERO,
+ MemorySize.ofMebiBytes(99),
+ MemorySize.ZERO,
+ MemorySize.ofMebiBytes(100),
+ MemorySize.ofMebiBytes(100),
+ MemorySize.ZERO,
+ MemorySize.ZERO
+ );
+
+ new Context() {{
+ addContainerRequestFutures.add(new
CompletableFuture<>());
+ addContainerRequestFutures.add(new
CompletableFuture<>());
+ addContainerRequestFutures.add(new
CompletableFuture<>());
+ addContainerRequestFutures.add(new
CompletableFuture<>());
+
+ final String startCommand1 =
TaskManagerOptions.TASK_HEAP_MEMORY.key() + "=" + (100L << 20);
+ final String startCommand2 =
TaskManagerOptions.TASK_HEAP_MEMORY.key() + "=" + (99L << 20);
+ final CompletableFuture<Void>
startContainerAsyncCommandFuture1 = new CompletableFuture<>();
+ final CompletableFuture<Void>
startContainerAsyncCommandFuture2 = new CompletableFuture<>();
+
+
testingYarnAMRMClientAsyncBuilder.setGetMatchingRequestsFunction(ignored ->
+ Collections.singletonList(ImmutableList.of(
+
YarnResourceManagerDriver.getContainerRequest(((YarnResourceManagerDriver)
getDriver()).getContainerResource(taskExecutorProcessSpec1).get()),
+
YarnResourceManagerDriver.getContainerRequest(((YarnResourceManagerDriver)
getDriver()).getContainerResource(taskExecutorProcessSpec2).get()))));
+
testingYarnAMRMClientAsyncBuilder.setAddContainerRequestConsumer((ignored1,
ignored2) ->
+
addContainerRequestFutures.get(addContainerRequestFuturesNumCompleted.getAndIncrement()).complete(null));
+
testingYarnNMClientAsyncBuilder.setStartContainerAsyncConsumer((ignored1,
context, ignored2) -> {
+ if (containsStartCommand(context,
startCommand1)) {
+
startContainerAsyncCommandFuture1.complete(null);
+ } else if (containsStartCommand(context,
startCommand2)) {
+
startContainerAsyncCommandFuture2.complete(null);
+ }
+ });
+
resourceEventHandlerBuilder.setOnWorkerTerminatedConsumer((ignore1, ignore2) ->
getDriver().requestResource(taskExecutorProcessSpec1));
+
+ runTest(() -> {
+ final Resource containerResource =
((YarnResourceManagerDriver)
getDriver()).getContainerResource(taskExecutorProcessSpec1).get();
+ // Make sure two worker resource spec will be
normalized to the same container resource
+ assertEquals(containerResource,
((YarnResourceManagerDriver)
getDriver()).getContainerResource(taskExecutorProcessSpec2).get());
+
+ runInMainThread(() ->
getDriver().requestResource(taskExecutorProcessSpec1));
+ runInMainThread(() ->
getDriver().requestResource(taskExecutorProcessSpec2));
+
+ // Verify both containers requested
+
verifyFutureCompleted(addContainerRequestFutures.get(0));
+
verifyFutureCompleted(addContainerRequestFutures.get(1));
+
+ // Mock that both containers are allocated
+ Container container1 =
createTestingContainerWithResource(containerResource);
+ Container container2 =
createTestingContainerWithResource(containerResource);
+
testingYarnResourceManagerClientFactory.getCallbackHandler().onContainersAllocated(ImmutableList.of(container1,
container2));
+
+ // Verify workers with both spec are started.
+
verifyFutureCompleted(startContainerAsyncCommandFuture1);
+
verifyFutureCompleted(startContainerAsyncCommandFuture2);
+
+ // Mock that one container is completed, while
the worker is still pending
+ ContainerStatus testingContainerStatus =
createTestingContainerCompletedStatus(container1.getId());
+
testingYarnResourceManagerClientFactory.getCallbackHandler().onContainersCompleted(Collections.singletonList(testingContainerStatus));
+
+ // Verify that only one more container is
requested.
+
verifyFutureCompleted(addContainerRequestFutures.get(2));
+
assertFalse(addContainerRequestFutures.get(3).isDone());
+ });
+ }};
+ }
+
+ @Test
+ public void testStartWorkerVariousSpec_DifferentContainerResource()
throws Exception{
+ final TaskExecutorProcessSpec taskExecutorProcessSpec1 =
+ new TaskExecutorProcessSpec(
+ new CPUResource(1),
+ MemorySize.ZERO,
+ MemorySize.ZERO,
+ MemorySize.ofMebiBytes(50),
+ MemorySize.ofMebiBytes(50),
+ MemorySize.ofMebiBytes(50),
+ MemorySize.ofMebiBytes(50),
+ MemorySize.ZERO,
+ MemorySize.ZERO
+ );
+ final TaskExecutorProcessSpec taskExecutorProcessSpec2 =
+ new TaskExecutorProcessSpec(
+ new CPUResource(2),
+ MemorySize.ZERO,
+ MemorySize.ZERO,
+ MemorySize.ofMebiBytes(500),
+ MemorySize.ofMebiBytes(500),
+ MemorySize.ofMebiBytes(500),
+ MemorySize.ofMebiBytes(500),
+ MemorySize.ZERO,
+ MemorySize.ZERO
+ );
+
+ new Context() {{
+ addContainerRequestFutures.add(new
CompletableFuture<>());
+ addContainerRequestFutures.add(new
CompletableFuture<>());
+ addContainerRequestFutures.add(new
CompletableFuture<>());
+ addContainerRequestFutures.add(new
CompletableFuture<>());
+
+ final String startCommand1 =
TaskManagerOptions.TASK_HEAP_MEMORY.key() + "=" + (50L << 20);
+ final String startCommand2 =
TaskManagerOptions.TASK_HEAP_MEMORY.key() + "=" + (100L << 20);
+ final CompletableFuture<Void>
startContainerAsyncCommandFuture1 = new CompletableFuture<>();
+ final CompletableFuture<Void>
startContainerAsyncCommandFuture2 = new CompletableFuture<>();
+
+
testingYarnAMRMClientAsyncBuilder.setGetMatchingRequestsFunction(tuple -> {
+ if (tuple.f2.getVirtualCores() == 1) {
+ return Collections.singletonList(
+
Collections.singletonList(YarnResourceManagerDriver.getContainerRequest(((YarnResourceManagerDriver)
getDriver()).getContainerResource(taskExecutorProcessSpec1).get())));
+ } else if (tuple.f2.getVirtualCores() == 2) {
+ return Collections.singletonList(
+
Collections.singletonList(YarnResourceManagerDriver.getContainerRequest(((YarnResourceManagerDriver)
getDriver()).getContainerResource(taskExecutorProcessSpec2).get())));
+ }
+ return null;
+ });
+
testingYarnAMRMClientAsyncBuilder.setAddContainerRequestConsumer((request,
ignored) ->
+
addContainerRequestFutures.get(addContainerRequestFuturesNumCompleted.getAndIncrement()).complete(request.getCapability()));
+
testingYarnNMClientAsyncBuilder.setStartContainerAsyncConsumer((ignored1,
context, ignored3) -> {
+ if (containsStartCommand(context,
startCommand1)) {
+
startContainerAsyncCommandFuture1.complete(null);
+ } else if (containsStartCommand(context,
startCommand2)) {
+
startContainerAsyncCommandFuture2.complete(null);
+ }
+ });
+
resourceEventHandlerBuilder.setOnWorkerTerminatedConsumer((ignore1, ignore2) ->
getDriver().requestResource(taskExecutorProcessSpec1));
+
+ runTest(() -> {
+ final Resource containerResource1 =
((YarnResourceManagerDriver)
getDriver()).getContainerResource(taskExecutorProcessSpec1).get();
+ final Resource containerResource2 =
((YarnResourceManagerDriver)
getDriver()).getContainerResource(taskExecutorProcessSpec2).get();
+ // Make sure two worker resource spec will be
normalized to different container resources
+ assertNotEquals(containerResource1,
containerResource2);
+
+ runInMainThread(() ->
getDriver().requestResource(taskExecutorProcessSpec1));
+ runInMainThread(() ->
getDriver().requestResource(taskExecutorProcessSpec2));
+
+ // Verify both containers requested
+
verifyFutureCompleted(addContainerRequestFutures.get(0));
+
verifyFutureCompleted(addContainerRequestFutures.get(1));
+
+ // Mock that container 1 is allocated
+ Container container1 =
createTestingContainerWithResource(containerResource1);
+
testingYarnResourceManagerClientFactory.getCallbackHandler().onContainersAllocated(Collections.singletonList(container1));
+
+ // Verify that only worker with spec1 is
started.
+
verifyFutureCompleted(startContainerAsyncCommandFuture1);
+
assertFalse(startContainerAsyncCommandFuture2.isDone());
+
+ // Mock that container 1 is completed, while
the worker is still pending
+ ContainerStatus testingContainerStatus =
createTestingContainerCompletedStatus(container1.getId());
+
testingYarnResourceManagerClientFactory.getCallbackHandler().onContainersCompleted(Collections.singletonList(testingContainerStatus));
+
+ // Verify that only container 1 is requested
again
+
verifyFutureCompleted(addContainerRequestFutures.get(2));
+
assertThat(addContainerRequestFutures.get(2).get(), is(containerResource1));
+
assertFalse(addContainerRequestFutures.get(3).isDone());
+ });
+ }};
+ }
+
+ private boolean containsStartCommand(ContainerLaunchContext
containerLaunchContext, String command) {
+ return
containerLaunchContext.getCommands().stream().anyMatch(str ->
str.contains(command));
+ }
+
+ private static Container createTestingContainerWithResource(Resource
resource, int containerIdx) {
+ final ContainerId containerId = ContainerId.newInstance(
+ ApplicationAttemptId.newInstance(
+
ApplicationId.newInstance(System.currentTimeMillis(), 1),
+ 1),
+ containerIdx);
+ final NodeId nodeId = NodeId.newInstance("container", 1234);
+ return new TestingContainer(containerId, nodeId, resource,
Priority.UNDEFINED);
+ }
+
+ private class Context extends
ResourceManagerDriverTestBase<YarnWorkerNode>.Context {
+ private final CompletableFuture<Void>
stopAndCleanupClusterFuture = new CompletableFuture<>();
+ private final CompletableFuture<Resource>
createTaskManagerContainerFuture = new CompletableFuture<>();
+ private final CompletableFuture<Void> stopContainerAsyncFuture
= new CompletableFuture<>();
+ final List<CompletableFuture<Resource>>
addContainerRequestFutures = new ArrayList<>();
+ final AtomicInteger addContainerRequestFuturesNumCompleted =
new AtomicInteger(0);
+ final CompletableFuture<Void> removeContainerRequestFuture =
new CompletableFuture<>();
+ final CompletableFuture<Void> releaseAssignedContainerFuture =
new CompletableFuture<>();
+ final CompletableFuture<Void> startContainerAsyncFuture = new
CompletableFuture<>();
+ final TestingYarnResourceManagerClientFactory
testingYarnResourceManagerClientFactory = new
TestingYarnResourceManagerClientFactory();
+ final TestingYarnNodeManagerClientFactory
testingYarnNodeManagerClientFactory = new TestingYarnNodeManagerClientFactory();
+
+ final TestingYarnNMClientAsync.Builder
testingYarnNMClientAsyncBuilder =
testingYarnNodeManagerClientFactory.getTestingYarnNMClientAsyncBuilder()
+ .setStartContainerAsyncConsumer((ignored1, ignored2,
ignored3) -> startContainerAsyncFuture.complete(null))
+ .setStopContainerAsyncConsumer((ignored1, ignored2,
ignored3) -> stopContainerAsyncFuture.complete(null));
+ final TestingYarnAMRMClientAsync.Builder
testingYarnAMRMClientAsyncBuilder =
testingYarnResourceManagerClientFactory.getTestingYarnAMRMClientAsyncBuilder()
+ .setAddContainerRequestConsumer((request, handler) -> {
+
createTaskManagerContainerFuture.complete(request.getCapability());
+
testingYarnResourceManagerClientFactory.getCallbackHandler()
+
.onContainersAllocated(Collections.singletonList(testingContainer));
+ })
+ .setGetMatchingRequestsFunction(ignored ->
+
Collections.singletonList(Collections.singletonList(YarnResourceManagerDriver.getContainerRequest(testingResource))))
+ .setRemoveContainerRequestConsumer((request, handler)
-> removeContainerRequestFuture.complete(null))
+ .setReleaseAssignedContainerConsumer((ignored1,
ignored2) -> releaseAssignedContainerFuture.complete(null))
+ .setUnregisterApplicationMasterConsumer((ignore1,
ignore2, ignore3) -> stopAndCleanupClusterFuture.complete(null));
+
+ final Map<String, String> env = new HashMap<>();
+
+ private int containerIdx = 0;
+
+ @Override
+ protected void prepareRunTest() {
+ File root = folder.getRoot();
+ File home = new File(root, "home");
+ env.put(ENV_APP_ID, "foo");
+ env.put(ENV_CLIENT_HOME_DIR, home.getAbsolutePath());
+ env.put(ENV_CLIENT_SHIP_FILES, "");
+ env.put(ENV_FLINK_CLASSPATH, "");
+ env.put(ENV_HADOOP_USER_NAME, "foo");
+ env.put(FLINK_DIST_JAR, new YarnLocalResourceDescriptor(
+ "flink.jar",
+ new Path("/tmp/flink.jar"),
+ 0,
+ System.currentTimeMillis(),
+ LocalResourceVisibility.APPLICATION,
+ LocalResourceType.FILE).toString());
+ env.put(ApplicationConstants.Environment.PWD.key(),
home.getAbsolutePath());
+ }
+
+ @Override
+ protected void preparePreviousAttemptWorkers() {
+
testingYarnAMRMClientAsyncBuilder.setRegisterApplicationMasterFunction(
+ (ignored1, ignored2, ignored3) -> new
TestingRegisterApplicationMasterResponse(() ->
Collections.singletonList(testingContainer)));
+ }
+
+ @Override
+ protected void prepareReleaseResource() {
Review comment:
The preparations here are not related to the validations. It seems this
interface can be removed.
##########
File path:
flink-yarn/src/main/java/org/apache/flink/yarn/TaskExecutorProcessSpecContainerResourceAdapter.java
##########
@@ -86,21 +81,21 @@
}
}
- Set<WorkerResourceSpec> getWorkerSpecs(final Resource
containerResource, final MatchingStrategy matchingStrategy) {
+ Set<TaskExecutorProcessSpec> getTaskExecutorProcessSpec(final Resource
containerResource, final
TaskExecutorProcessSpecContainerResourceAdapter.MatchingStrategy
matchingStrategy) {
final InternalContainerResource internalContainerResource = new
InternalContainerResource(containerResource);
return
getEquivalentInternalContainerResource(internalContainerResource,
matchingStrategy).stream()
- .flatMap(resource ->
containerResourceToWorkerSpecs.getOrDefault(resource,
Collections.emptySet()).stream())
+ .flatMap(resource ->
containerResourceToTaskExecutorProcessSpec.getOrDefault(resource,
Collections.emptySet()).stream())
.collect(Collectors.toSet());
}
- Set<Resource> getEquivalentContainerResource(final Resource
containerResource, final MatchingStrategy matchingStrategy) {
+ Set<Resource> getEquivalentContainerResource(final Resource
containerResource, final
TaskExecutorProcessSpecContainerResourceAdapter.MatchingStrategy
matchingStrategy) {
final InternalContainerResource internalContainerResource = new
InternalContainerResource(containerResource);
return
getEquivalentInternalContainerResource(internalContainerResource,
matchingStrategy).stream()
.map(InternalContainerResource::toResource)
.collect(Collectors.toSet());
}
- private Set<InternalContainerResource>
getEquivalentInternalContainerResource(final InternalContainerResource
internalContainerResource, final MatchingStrategy matchingStrategy) {
+ private Set<InternalContainerResource>
getEquivalentInternalContainerResource(final InternalContainerResource
internalContainerResource, final
TaskExecutorProcessSpecContainerResourceAdapter.MatchingStrategy
matchingStrategy) {
Review comment:
```suggestion
private Set<InternalContainerResource>
getEquivalentInternalContainerResource(final InternalContainerResource
internalContainerResource, final MatchingStrategy matchingStrategy) {
```
##########
File path:
flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnResourceManagerConfiguration.java
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.configuration;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.yarn.YarnConfigKeys;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Configuration specific to {@link org.apache.flink.yarn.YarnResourceManager}.
+ */
+public class YarnResourceManagerConfiguration {
+ private final Map<String, String> yarnConfig;
+ private final String webInterfaceUrl;
+ private final String rpcAddress;
+
+ public YarnResourceManagerConfiguration(
+ Map<String, String> env,
+ @Nullable String webInterfaceUrl,
+ String rpcAddress) {
+ this.yarnConfig =
getYarnConfFromEnv(Preconditions.checkNotNull(env));
+ this.rpcAddress = Preconditions.checkNotNull(rpcAddress);
+ this.webInterfaceUrl = webInterfaceUrl;
+ }
+
+ public String getRpcAddress() {
+ return rpcAddress;
+ }
+
+ public String getWebInterfaceUrl() {
+ return webInterfaceUrl;
+ }
+
+ private static Map<String, String> getYarnConfFromEnv(Map<String,
String> env) {
+ Map<String, String> configs = new HashMap<>();
+ configs.put(YarnConfigKeys.FLINK_YARN_FILES,
env.get(YarnConfigKeys.FLINK_YARN_FILES));
+ configs.put(YarnConfigKeys.ENV_FLINK_CLASSPATH,
env.get(YarnConfigKeys.ENV_FLINK_CLASSPATH));
+ configs.put(YarnConfigKeys.ENV_APP_ID,
env.get(YarnConfigKeys.ENV_APP_ID));
+ configs.put(YarnConfigKeys.ENV_CLIENT_HOME_DIR,
env.get(YarnConfigKeys.ENV_CLIENT_HOME_DIR));
Review comment:
These two are read and checked, but never really used by the resource
manager. I think we can exclude them.
##########
File path:
flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnResourceManagerConfiguration.java
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.configuration;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.yarn.YarnConfigKeys;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Configuration specific to {@link org.apache.flink.yarn.YarnResourceManager}.
+ */
+public class YarnResourceManagerConfiguration {
+ private final Map<String, String> yarnConfig;
+ private final String webInterfaceUrl;
+ private final String rpcAddress;
+
+ public YarnResourceManagerConfiguration(
+ Map<String, String> env,
+ @Nullable String webInterfaceUrl,
+ String rpcAddress) {
+ this.yarnConfig =
getYarnConfFromEnv(Preconditions.checkNotNull(env));
+ this.rpcAddress = Preconditions.checkNotNull(rpcAddress);
+ this.webInterfaceUrl = webInterfaceUrl;
+ }
+
+ public String getRpcAddress() {
+ return rpcAddress;
+ }
+
+ public String getWebInterfaceUrl() {
Review comment:
This should be annotated `@Nullable`.
##########
File path:
flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnResourceManagerConfiguration.java
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.configuration;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.yarn.YarnConfigKeys;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Configuration specific to {@link org.apache.flink.yarn.YarnResourceManager}.
+ */
+public class YarnResourceManagerConfiguration {
+ private final Map<String, String> yarnConfig;
Review comment:
Can we replace this map with plain individual class fields?
##########
File path:
flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnResourceManagerConfiguration.java
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.configuration;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.yarn.YarnConfigKeys;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Configuration specific to {@link org.apache.flink.yarn.YarnResourceManager}.
+ */
+public class YarnResourceManagerConfiguration {
+ private final Map<String, String> yarnConfig;
+ private final String webInterfaceUrl;
+ private final String rpcAddress;
+
+ public YarnResourceManagerConfiguration(
+ Map<String, String> env,
+ @Nullable String webInterfaceUrl,
+ String rpcAddress) {
+ this.yarnConfig =
getYarnConfFromEnv(Preconditions.checkNotNull(env));
+ this.rpcAddress = Preconditions.checkNotNull(rpcAddress);
+ this.webInterfaceUrl = webInterfaceUrl;
+ }
+
+ public String getRpcAddress() {
+ return rpcAddress;
+ }
+
+ public String getWebInterfaceUrl() {
+ return webInterfaceUrl;
+ }
+
+ private static Map<String, String> getYarnConfFromEnv(Map<String,
String> env) {
+ Map<String, String> configs = new HashMap<>();
+ configs.put(YarnConfigKeys.FLINK_YARN_FILES,
env.get(YarnConfigKeys.FLINK_YARN_FILES));
+ configs.put(YarnConfigKeys.ENV_FLINK_CLASSPATH,
env.get(YarnConfigKeys.ENV_FLINK_CLASSPATH));
+ configs.put(YarnConfigKeys.ENV_APP_ID,
env.get(YarnConfigKeys.ENV_APP_ID));
+ configs.put(YarnConfigKeys.ENV_CLIENT_HOME_DIR,
env.get(YarnConfigKeys.ENV_CLIENT_HOME_DIR));
+ configs.put(YarnConfigKeys.ENV_CLIENT_SHIP_FILES,
env.get(YarnConfigKeys.ENV_CLIENT_SHIP_FILES));
+ configs.put(YarnConfigKeys.FLINK_DIST_JAR,
env.get(YarnConfigKeys.FLINK_DIST_JAR));
+ configs.put(YarnConfigKeys.REMOTE_KEYTAB_PATH,
env.get(YarnConfigKeys.REMOTE_KEYTAB_PATH));
+ configs.put(YarnConfigKeys.LOCAL_KEYTAB_PATH,
env.get(YarnConfigKeys.LOCAL_KEYTAB_PATH));
+ configs.put(YarnConfigKeys.KEYTAB_PRINCIPAL,
env.get(YarnConfigKeys.KEYTAB_PRINCIPAL));
+ configs.put(YarnConfigKeys.ENV_HADOOP_USER_NAME,
env.get(YarnConfigKeys.ENV_HADOOP_USER_NAME));
+ configs.put(YarnConfigKeys.ENV_ZOOKEEPER_NAMESPACE,
env.get(YarnConfigKeys.ENV_ZOOKEEPER_NAMESPACE));
+ configs.put(YarnConfigKeys.ENV_KRB5_PATH,
env.get(YarnConfigKeys.ENV_KRB5_PATH));
+ configs.put(YarnConfigKeys.ENV_YARN_SITE_XML_PATH,
env.get(YarnConfigKeys.ENV_YARN_SITE_XML_PATH));
+ return configs;
+ }
+
+ public String getYarnFiles() {
+ return yarnConfig.get(YarnConfigKeys.FLINK_YARN_FILES);
+ }
+
+ public String getFlinkClasspath() {
+ return yarnConfig.get(YarnConfigKeys.ENV_FLINK_CLASSPATH);
+ }
+
+ public String getAppId() {
+ return yarnConfig.get(YarnConfigKeys.ENV_APP_ID);
+ }
+
+ public String getClientHomeDir() {
+ return yarnConfig.get(YarnConfigKeys.ENV_CLIENT_HOME_DIR);
+ }
+
+ public String getClientShipFiles() {
+ return yarnConfig.get(YarnConfigKeys.ENV_CLIENT_SHIP_FILES);
+ }
+
+ public String getFlinkDistJar() {
+ return yarnConfig.get(YarnConfigKeys.FLINK_DIST_JAR);
+ }
+
+ public String getRemoteKeytabPath() {
+ return yarnConfig.get(YarnConfigKeys.REMOTE_KEYTAB_PATH);
+ }
+
+ public String getLocalKeytabPath() {
+ return yarnConfig.get(YarnConfigKeys.LOCAL_KEYTAB_PATH);
+ }
+
+ public String getKeytabPrinciple() {
+ return yarnConfig.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
+ }
Review comment:
These should be `@Nullable`
##########
File path:
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManager.java
##########
@@ -135,7 +135,7 @@ public YarnResourceManager(
JobLeaderIdService jobLeaderIdService,
ClusterInformation clusterInformation,
FatalErrorHandler fatalErrorHandler,
- @Nullable String webInterfaceUrl,
+ YarnResourceManagerConfiguration
yarnResourceManagerConfiguration,
Review comment:
There are 3 usages of `env` in `YarnResourceManager`, which could be
replaced by `YarnResourceManagerConfiguration`. After that, we can get rid of
`env` from `LegacyActiveResourceManager` and the related constructors / factory
methods.
##########
File path:
flink-yarn/src/test/java/org/apache/flink/yarn/TestingRegisterApplicationMasterResponse.java
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import
org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterResponsePBImpl;
+import org.apache.hadoop.yarn.api.records.Container;
+
+import java.util.List;
+import java.util.function.Supplier;
+
+/**
+ * A Yarn {@link RegisterApplicationMasterResponse} implementation for testing.
+ */
+public class TestingRegisterApplicationMasterResponse extends
RegisterApplicationMasterResponsePBImpl {
Review comment:
I think we should also add `getSchedulerResourceTypes` to this testing
implementation, without the `@Override` annotation.
The problem is that the production code uses this interface via
reflection. If this interface is available in the given Hadoop version, the
implementation in `RegisterApplicationMasterResponsePBImpl` will be invoked and
the behavior is unpredictable.
##########
File path:
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerClientFactory.java
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.annotation.VisibleForTesting;
+
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+/**
+ * Factory for {@link AMRMClientAsync}.
+ */
+public class YarnResourceManagerClientFactory {
+
+ private static final YarnResourceManagerClientFactory INSTANCE = new
YarnResourceManagerClientFactory();
+
+ @VisibleForTesting
+ YarnResourceManagerClientFactory() {}
+
+ public static YarnResourceManagerClientFactory getInstance() {
+ return INSTANCE;
+ }
+
+ protected AMRMClientAsync<AMRMClient.ContainerRequest>
createAndStartResourceManagerClient(
+ YarnConfiguration yarnConfiguration,
+ int yarnHeartbeatIntervalMillis,
+ AMRMClientAsync.CallbackHandler callbackHandler) {
+ AMRMClientAsync<AMRMClient.ContainerRequest>
resourceManagerClient = AMRMClientAsync.createAMRMClientAsync(
+ yarnHeartbeatIntervalMillis,
+ callbackHandler);
+ resourceManagerClient.init(yarnConfiguration);
+ resourceManagerClient.start();
Review comment:
I think a factory should be responsible for creating the instance, but I am
not sure it should also init and start it.
This reduces flexibility. What if the caller wants to do something
between creating the client and initializing/starting it?
This is also not good for testability. Now we cannot verify whether the RM
properly inits/starts the clients, because they are inited/started by the
factory, which is replaced by the testing implementation in tests.
Same for the `YarnNodeManagerClientFactory`.
##########
File path:
flink-yarn/src/main/java/org/apache/flink/yarn/configuration/YarnResourceManagerConfiguration.java
##########
@@ -0,0 +1,123 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn.configuration;
+
+import org.apache.flink.util.Preconditions;
+import org.apache.flink.yarn.YarnConfigKeys;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Configuration specific to {@link org.apache.flink.yarn.YarnResourceManager}.
+ */
+public class YarnResourceManagerConfiguration {
+ private final Map<String, String> yarnConfig;
+ private final String webInterfaceUrl;
+ private final String rpcAddress;
+
+ public YarnResourceManagerConfiguration(
+ Map<String, String> env,
+ @Nullable String webInterfaceUrl,
+ String rpcAddress) {
+ this.yarnConfig =
getYarnConfFromEnv(Preconditions.checkNotNull(env));
+ this.rpcAddress = Preconditions.checkNotNull(rpcAddress);
+ this.webInterfaceUrl = webInterfaceUrl;
+ }
+
+ public String getRpcAddress() {
+ return rpcAddress;
+ }
+
+ public String getWebInterfaceUrl() {
+ return webInterfaceUrl;
+ }
+
+ private static Map<String, String> getYarnConfFromEnv(Map<String,
String> env) {
+ Map<String, String> configs = new HashMap<>();
+ configs.put(YarnConfigKeys.FLINK_YARN_FILES,
env.get(YarnConfigKeys.FLINK_YARN_FILES));
+ configs.put(YarnConfigKeys.ENV_FLINK_CLASSPATH,
env.get(YarnConfigKeys.ENV_FLINK_CLASSPATH));
+ configs.put(YarnConfigKeys.ENV_APP_ID,
env.get(YarnConfigKeys.ENV_APP_ID));
+ configs.put(YarnConfigKeys.ENV_CLIENT_HOME_DIR,
env.get(YarnConfigKeys.ENV_CLIENT_HOME_DIR));
+ configs.put(YarnConfigKeys.ENV_CLIENT_SHIP_FILES,
env.get(YarnConfigKeys.ENV_CLIENT_SHIP_FILES));
+ configs.put(YarnConfigKeys.FLINK_DIST_JAR,
env.get(YarnConfigKeys.FLINK_DIST_JAR));
+ configs.put(YarnConfigKeys.REMOTE_KEYTAB_PATH,
env.get(YarnConfigKeys.REMOTE_KEYTAB_PATH));
+ configs.put(YarnConfigKeys.LOCAL_KEYTAB_PATH,
env.get(YarnConfigKeys.LOCAL_KEYTAB_PATH));
+ configs.put(YarnConfigKeys.KEYTAB_PRINCIPAL,
env.get(YarnConfigKeys.KEYTAB_PRINCIPAL));
+ configs.put(YarnConfigKeys.ENV_HADOOP_USER_NAME,
env.get(YarnConfigKeys.ENV_HADOOP_USER_NAME));
+ configs.put(YarnConfigKeys.ENV_ZOOKEEPER_NAMESPACE,
env.get(YarnConfigKeys.ENV_ZOOKEEPER_NAMESPACE));
+ configs.put(YarnConfigKeys.ENV_KRB5_PATH,
env.get(YarnConfigKeys.ENV_KRB5_PATH));
+ configs.put(YarnConfigKeys.ENV_YARN_SITE_XML_PATH,
env.get(YarnConfigKeys.ENV_YARN_SITE_XML_PATH));
+ return configs;
+ }
+
+ public String getYarnFiles() {
+ return yarnConfig.get(YarnConfigKeys.FLINK_YARN_FILES);
+ }
+
+ public String getFlinkClasspath() {
+ return yarnConfig.get(YarnConfigKeys.ENV_FLINK_CLASSPATH);
+ }
+
+ public String getAppId() {
+ return yarnConfig.get(YarnConfigKeys.ENV_APP_ID);
+ }
+
+ public String getClientHomeDir() {
+ return yarnConfig.get(YarnConfigKeys.ENV_CLIENT_HOME_DIR);
+ }
+
+ public String getClientShipFiles() {
+ return yarnConfig.get(YarnConfigKeys.ENV_CLIENT_SHIP_FILES);
+ }
+
+ public String getFlinkDistJar() {
+ return yarnConfig.get(YarnConfigKeys.FLINK_DIST_JAR);
+ }
+
+ public String getRemoteKeytabPath() {
+ return yarnConfig.get(YarnConfigKeys.REMOTE_KEYTAB_PATH);
+ }
+
+ public String getLocalKeytabPath() {
+ return yarnConfig.get(YarnConfigKeys.LOCAL_KEYTAB_PATH);
+ }
+
+ public String getKeytabPrinciple() {
+ return yarnConfig.get(YarnConfigKeys.KEYTAB_PRINCIPAL);
+ }
+
+ public String getHadoopUserName() {
+ return yarnConfig.get(YarnConfigKeys.ENV_HADOOP_USER_NAME);
+ }
+
+ public String getZookeeperNamespace() {
+ return yarnConfig.get(YarnConfigKeys.ENV_ZOOKEEPER_NAMESPACE);
+ }
+
+ public String getKrb5Path() {
+ return yarnConfig.get(YarnConfigKeys.ENV_KRB5_PATH);
+ }
+
+ public String getYarnSiteXMLPath() {
+ return yarnConfig.get(YarnConfigKeys.ENV_YARN_SITE_XML_PATH);
+ }
Review comment:
These should be `@Nullable`
##########
File path:
flink-yarn/src/test/java/org/apache/flink/yarn/TestingRegisterApplicationMasterResponse.java
##########
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import
org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import
org.apache.hadoop.yarn.api.protocolrecords.impl.pb.RegisterApplicationMasterResponsePBImpl;
+import org.apache.hadoop.yarn.api.records.Container;
+
+import java.util.List;
+import java.util.function.Supplier;
+
+/**
+ * A Yarn {@link RegisterApplicationMasterResponse} implementation for testing.
+ */
+public class TestingRegisterApplicationMasterResponse extends
RegisterApplicationMasterResponsePBImpl {
+ private final Supplier<List<Container>>
getContainersFromPreviousAttemptsSupplier;
+
+ TestingRegisterApplicationMasterResponse(Supplier<List<Container>>
getContainersFromPreviousAttemptsSupplier) {
+ this.getContainersFromPreviousAttemptsSupplier =
getContainersFromPreviousAttemptsSupplier;
+ }
+
+ @Override
Review comment:
Let's remove this annotation for now, to align with the production
code's assumption that this interface may not be available for the given Hadoop
version.
Ideally, I think we can already get rid of the reflection on this
interface, since all the supported Hadoop versions should have this interface
now. But this could be a follow-up issue. No need to further increase the scope
of this PR.
##########
File path:
flink-yarn/src/main/java/org/apache/flink/yarn/YarnResourceManagerClientFactory.java
##########
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.annotation.VisibleForTesting;
+
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+/**
+ * Factory for {@link AMRMClientAsync}.
+ */
+public class YarnResourceManagerClientFactory {
Review comment:
I think we can make `YarnResourceManagerClientFactory` an interface, and
provide two independent implementations of the interface, for production and
testing respectively.
I don't see a good reason to make the testing implementation extend
the production implementation. There's nothing reused between them.
Same for the `YarnNodeManagerClientFactory`.
##########
File path:
flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerDriverTest.java
##########
@@ -0,0 +1,518 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.api.common.resources.CPUResource;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.resourcemanager.active.ResourceManagerDriver;
+import
org.apache.flink.runtime.resourcemanager.active.ResourceManagerDriverTestBase;
+import
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.util.ExceptionUtils;
+import
org.apache.flink.yarn.configuration.YarnResourceManagerDriverConfiguration;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.junit.Assume;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_APP_ID;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_CLIENT_HOME_DIR;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_CLIENT_SHIP_FILES;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_HADOOP_USER_NAME;
+import static org.apache.flink.yarn.YarnConfigKeys.FLINK_DIST_JAR;
+import static org.apache.flink.yarn.YarnConfigKeys.FLINK_YARN_FILES;
+import static
org.apache.flink.yarn.YarnResourceManagerDriver.ERROR_MASSAGE_ON_SHUTDOWN_REQUEST;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link YarnResourceManagerDriver}.
+ */
+public class YarnResourceManagerDriverTest extends
ResourceManagerDriverTestBase<YarnWorkerNode> {
+ private static final Resource testingResource =
Resource.newInstance(YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
+ private static final Container testingContainer =
createTestingContainerWithResource(testingResource, 0);
+ private static final TaskExecutorProcessSpec
testingTaskExecutorProcessSpec =
+ new TaskExecutorProcessSpec(
+ new CPUResource(1),
+ MemorySize.ZERO,
+ MemorySize.ZERO,
+ MemorySize.ofMebiBytes(256),
+ MemorySize.ofMebiBytes(256),
+ MemorySize.ofMebiBytes(256),
+ MemorySize.ofMebiBytes(256),
+ MemorySize.ZERO,
+ MemorySize.ZERO);
+
+ @Rule
+ public TemporaryFolder folder = new TemporaryFolder();
+
+ @Override
+ protected Context createContext() {
+ return new Context();
+ }
+
+ @Test
+ public void testShutdownRequestCausesFatalError() throws Exception {
+ new Context() {{
+ final CompletableFuture<Throwable>
throwableCompletableFuture = new CompletableFuture<>();
+
resourceEventHandlerBuilder.setOnErrorConsumer(throwableCompletableFuture::complete);
+ runTest(() -> {
+
testingYarnResourceManagerClientFactory.getCallbackHandler().onShutdownRequest();
+
+ Throwable throwable =
throwableCompletableFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS);
+
assertThat(ExceptionUtils.findThrowable(throwable,
ResourceManagerException.class).isPresent(), is(true));
+
assertThat(ExceptionUtils.findThrowableWithMessage(throwable,
ERROR_MASSAGE_ON_SHUTDOWN_REQUEST).isPresent(), is(true));
+ });
+ }};
+ }
+
+ /**
+ * Tests that application files are deleted when the YARN application
master is de-registered.
+ */
+ @Test
+ public void testDeleteApplicationFiles() throws Exception {
+ new Context() {{
+ final File applicationDir = folder.newFolder(".flink");
+ env.put(FLINK_YARN_FILES,
applicationDir.getCanonicalPath());
+
+ runTest(() -> {
+
getDriver().deregisterApplication(ApplicationStatus.SUCCEEDED, null);
+ assertFalse("YARN application directory was not
removed", Files.exists(applicationDir.toPath()));
+ });
+ }};
+ }
+
+ @Test
+ public void testOnContainerCompleted() throws Exception {
+ new Context() {{
+ addContainerRequestFutures.add(new
CompletableFuture<>());
+ addContainerRequestFutures.add(new
CompletableFuture<>());
+
+
testingYarnAMRMClientAsyncBuilder.setAddContainerRequestConsumer((ignored1,
ignored2) ->
+
addContainerRequestFutures.get(addContainerRequestFuturesNumCompleted.getAndIncrement()).complete(null));
+
resourceEventHandlerBuilder.setOnWorkerTerminatedConsumer((ignore1, ignore2) ->
getDriver().requestResource(testingTaskExecutorProcessSpec));
+
+ runTest(() -> {
+ runInMainThread(() ->
getDriver().requestResource(testingTaskExecutorProcessSpec));
+
testingYarnResourceManagerClientFactory.getCallbackHandler().onContainersAllocated(ImmutableList.of(testingContainer));
+
+
verifyFutureCompleted(addContainerRequestFutures.get(0));
+
verifyFutureCompleted(removeContainerRequestFuture);
+
verifyFutureCompleted(startContainerAsyncFuture);
+
+ ContainerStatus testingContainerStatus =
createTestingContainerCompletedStatus(testingContainer.getId());
+
testingYarnResourceManagerClientFactory.getCallbackHandler().onContainersCompleted(ImmutableList.of(testingContainerStatus));
+
+
verifyFutureCompleted(addContainerRequestFutures.get(1));
+ });
+ }};
+ }
+
+ @Test
+ public void testOnStartContainerError() throws Exception {
+ new Context() {{
+ addContainerRequestFutures.add(new
CompletableFuture<>());
+ addContainerRequestFutures.add(new
CompletableFuture<>());
+
+
testingYarnAMRMClientAsyncBuilder.setAddContainerRequestConsumer((ignored1,
ignored2) ->
+
addContainerRequestFutures.get(addContainerRequestFuturesNumCompleted.getAndIncrement()).complete(null));
+
resourceEventHandlerBuilder.setOnWorkerTerminatedConsumer((ignore1, ignore2) ->
getDriver().requestResource(testingTaskExecutorProcessSpec));
+
+ runTest(() -> {
+ runInMainThread(() ->
getDriver().requestResource(testingTaskExecutorProcessSpec));
+
testingYarnResourceManagerClientFactory.getCallbackHandler().onContainersAllocated(ImmutableList.of(testingContainer));
+
+
verifyFutureCompleted(addContainerRequestFutures.get(0));
+
verifyFutureCompleted(removeContainerRequestFuture);
+
verifyFutureCompleted(startContainerAsyncFuture);
+
+
testingYarnNodeManagerClientFactory.getCallbackHandler().onStartContainerError(testingContainer.getId(),
new Exception("start error"));
+
verifyFutureCompleted(releaseAssignedContainerFuture);
+
verifyFutureCompleted(addContainerRequestFutures.get(1));
+ });
+ }};
+ }
+
+ @Test
+ public void
testStartTaskExecutorProcessVariousSpec_SameContainerResource() throws
Exception{
+ final TaskExecutorProcessSpec taskExecutorProcessSpec1 =
+ new TaskExecutorProcessSpec(
+ new CPUResource(1),
+ MemorySize.ZERO,
+ MemorySize.ZERO,
+ MemorySize.ofMebiBytes(100),
+ MemorySize.ZERO,
+ MemorySize.ofMebiBytes(100),
+ MemorySize.ofMebiBytes(100),
+ MemorySize.ZERO,
+ MemorySize.ZERO
+ );
+ final TaskExecutorProcessSpec taskExecutorProcessSpec2 =
+ new TaskExecutorProcessSpec(
+ new CPUResource(1),
+ MemorySize.ZERO,
+ MemorySize.ZERO,
+ MemorySize.ofMebiBytes(99),
+ MemorySize.ZERO,
+ MemorySize.ofMebiBytes(100),
+ MemorySize.ofMebiBytes(100),
+ MemorySize.ZERO,
+ MemorySize.ZERO
+ );
+
+ new Context() {{
+ addContainerRequestFutures.add(new
CompletableFuture<>());
+ addContainerRequestFutures.add(new
CompletableFuture<>());
+ addContainerRequestFutures.add(new
CompletableFuture<>());
+ addContainerRequestFutures.add(new
CompletableFuture<>());
+
+ final String startCommand1 =
TaskManagerOptions.TASK_HEAP_MEMORY.key() + "=" + (100L << 20);
+ final String startCommand2 =
TaskManagerOptions.TASK_HEAP_MEMORY.key() + "=" + (99L << 20);
+ final CompletableFuture<Void>
startContainerAsyncCommandFuture1 = new CompletableFuture<>();
+ final CompletableFuture<Void>
startContainerAsyncCommandFuture2 = new CompletableFuture<>();
+
+
testingYarnAMRMClientAsyncBuilder.setGetMatchingRequestsFunction(ignored ->
+ Collections.singletonList(ImmutableList.of(
+
YarnResourceManagerDriver.getContainerRequest(((YarnResourceManagerDriver)
getDriver()).getContainerResource(taskExecutorProcessSpec1).get()),
+
YarnResourceManagerDriver.getContainerRequest(((YarnResourceManagerDriver)
getDriver()).getContainerResource(taskExecutorProcessSpec2).get()))));
+
testingYarnAMRMClientAsyncBuilder.setAddContainerRequestConsumer((ignored1,
ignored2) ->
+
addContainerRequestFutures.get(addContainerRequestFuturesNumCompleted.getAndIncrement()).complete(null));
+
testingYarnNMClientAsyncBuilder.setStartContainerAsyncConsumer((ignored1,
context, ignored2) -> {
+ if (containsStartCommand(context,
startCommand1)) {
+
startContainerAsyncCommandFuture1.complete(null);
+ } else if (containsStartCommand(context,
startCommand2)) {
+
startContainerAsyncCommandFuture2.complete(null);
+ }
+ });
+
resourceEventHandlerBuilder.setOnWorkerTerminatedConsumer((ignore1, ignore2) ->
getDriver().requestResource(taskExecutorProcessSpec1));
+
+ runTest(() -> {
+ final Resource containerResource =
((YarnResourceManagerDriver)
getDriver()).getContainerResource(taskExecutorProcessSpec1).get();
+ // Make sure two worker resource spec will be
normalized to the same container resource
+ assertEquals(containerResource,
((YarnResourceManagerDriver)
getDriver()).getContainerResource(taskExecutorProcessSpec2).get());
+
+ runInMainThread(() ->
getDriver().requestResource(taskExecutorProcessSpec1));
+ runInMainThread(() ->
getDriver().requestResource(taskExecutorProcessSpec2));
+
+ // Verify both containers requested
+
verifyFutureCompleted(addContainerRequestFutures.get(0));
+
verifyFutureCompleted(addContainerRequestFutures.get(1));
+
+ // Mock that both containers are allocated
+ Container container1 =
createTestingContainerWithResource(containerResource);
+ Container container2 =
createTestingContainerWithResource(containerResource);
+
testingYarnResourceManagerClientFactory.getCallbackHandler().onContainersAllocated(ImmutableList.of(container1,
container2));
+
+ // Verify workers with both spec are started.
+
verifyFutureCompleted(startContainerAsyncCommandFuture1);
+
verifyFutureCompleted(startContainerAsyncCommandFuture2);
+
+ // Mock that one container is completed, while
the worker is still pending
+ ContainerStatus testingContainerStatus =
createTestingContainerCompletedStatus(container1.getId());
+
testingYarnResourceManagerClientFactory.getCallbackHandler().onContainersCompleted(Collections.singletonList(testingContainerStatus));
+
+ // Verify that only one more container is
requested.
+
verifyFutureCompleted(addContainerRequestFutures.get(2));
+
assertFalse(addContainerRequestFutures.get(3).isDone());
+ });
+ }};
+ }
+
+ @Test
+ public void testStartWorkerVariousSpec_DifferentContainerResource()
throws Exception{
+ final TaskExecutorProcessSpec taskExecutorProcessSpec1 =
+ new TaskExecutorProcessSpec(
+ new CPUResource(1),
+ MemorySize.ZERO,
+ MemorySize.ZERO,
+ MemorySize.ofMebiBytes(50),
+ MemorySize.ofMebiBytes(50),
+ MemorySize.ofMebiBytes(50),
+ MemorySize.ofMebiBytes(50),
+ MemorySize.ZERO,
+ MemorySize.ZERO
+ );
+ final TaskExecutorProcessSpec taskExecutorProcessSpec2 =
+ new TaskExecutorProcessSpec(
+ new CPUResource(2),
+ MemorySize.ZERO,
+ MemorySize.ZERO,
+ MemorySize.ofMebiBytes(500),
+ MemorySize.ofMebiBytes(500),
+ MemorySize.ofMebiBytes(500),
+ MemorySize.ofMebiBytes(500),
+ MemorySize.ZERO,
+ MemorySize.ZERO
+ );
+
+ new Context() {{
+ addContainerRequestFutures.add(new
CompletableFuture<>());
+ addContainerRequestFutures.add(new
CompletableFuture<>());
+ addContainerRequestFutures.add(new
CompletableFuture<>());
+ addContainerRequestFutures.add(new
CompletableFuture<>());
+
+ final String startCommand1 =
TaskManagerOptions.TASK_HEAP_MEMORY.key() + "=" + (50L << 20);
+ final String startCommand2 =
TaskManagerOptions.TASK_HEAP_MEMORY.key() + "=" + (100L << 20);
+ final CompletableFuture<Void>
startContainerAsyncCommandFuture1 = new CompletableFuture<>();
+ final CompletableFuture<Void>
startContainerAsyncCommandFuture2 = new CompletableFuture<>();
+
+
testingYarnAMRMClientAsyncBuilder.setGetMatchingRequestsFunction(tuple -> {
+ if (tuple.f2.getVirtualCores() == 1) {
+ return Collections.singletonList(
+
Collections.singletonList(YarnResourceManagerDriver.getContainerRequest(((YarnResourceManagerDriver)
getDriver()).getContainerResource(taskExecutorProcessSpec1).get())));
+ } else if (tuple.f2.getVirtualCores() == 2) {
+ return Collections.singletonList(
+
Collections.singletonList(YarnResourceManagerDriver.getContainerRequest(((YarnResourceManagerDriver)
getDriver()).getContainerResource(taskExecutorProcessSpec2).get())));
+ }
+ return null;
+ });
+
testingYarnAMRMClientAsyncBuilder.setAddContainerRequestConsumer((request,
ignored) ->
+
addContainerRequestFutures.get(addContainerRequestFuturesNumCompleted.getAndIncrement()).complete(request.getCapability()));
+
testingYarnNMClientAsyncBuilder.setStartContainerAsyncConsumer((ignored1,
context, ignored3) -> {
+ if (containsStartCommand(context,
startCommand1)) {
+
startContainerAsyncCommandFuture1.complete(null);
+ } else if (containsStartCommand(context,
startCommand2)) {
+
startContainerAsyncCommandFuture2.complete(null);
+ }
+ });
+
resourceEventHandlerBuilder.setOnWorkerTerminatedConsumer((ignore1, ignore2) ->
getDriver().requestResource(taskExecutorProcessSpec1));
+
+ runTest(() -> {
+ final Resource containerResource1 =
((YarnResourceManagerDriver)
getDriver()).getContainerResource(taskExecutorProcessSpec1).get();
+ final Resource containerResource2 =
((YarnResourceManagerDriver)
getDriver()).getContainerResource(taskExecutorProcessSpec2).get();
+ // Make sure two worker resource spec will be
normalized to different container resources
+ assertNotEquals(containerResource1,
containerResource2);
+
+ runInMainThread(() ->
getDriver().requestResource(taskExecutorProcessSpec1));
+ runInMainThread(() ->
getDriver().requestResource(taskExecutorProcessSpec2));
+
+ // Verify both containers requested
+
verifyFutureCompleted(addContainerRequestFutures.get(0));
+
verifyFutureCompleted(addContainerRequestFutures.get(1));
+
+ // Mock that container 1 is allocated
+ Container container1 =
createTestingContainerWithResource(containerResource1);
+
testingYarnResourceManagerClientFactory.getCallbackHandler().onContainersAllocated(Collections.singletonList(container1));
+
+ // Verify that only worker with spec1 is
started.
+
verifyFutureCompleted(startContainerAsyncCommandFuture1);
+
assertFalse(startContainerAsyncCommandFuture2.isDone());
+
+ // Mock that container 1 is completed, while
the worker is still pending
+ ContainerStatus testingContainerStatus =
createTestingContainerCompletedStatus(container1.getId());
+
testingYarnResourceManagerClientFactory.getCallbackHandler().onContainersCompleted(Collections.singletonList(testingContainerStatus));
+
+ // Verify that only container 1 is requested
again
+
verifyFutureCompleted(addContainerRequestFutures.get(2));
+
assertThat(addContainerRequestFutures.get(2).get(), is(containerResource1));
+
assertFalse(addContainerRequestFutures.get(3).isDone());
+ });
+ }};
+ }
+
+ private boolean containsStartCommand(ContainerLaunchContext
containerLaunchContext, String command) {
+ return
containerLaunchContext.getCommands().stream().anyMatch(str ->
str.contains(command));
+ }
+
+ private static Container createTestingContainerWithResource(Resource
resource, int containerIdx) {
+ final ContainerId containerId = ContainerId.newInstance(
+ ApplicationAttemptId.newInstance(
+
ApplicationId.newInstance(System.currentTimeMillis(), 1),
+ 1),
+ containerIdx);
+ final NodeId nodeId = NodeId.newInstance("container", 1234);
+ return new TestingContainer(containerId, nodeId, resource,
Priority.UNDEFINED);
+ }
+
+ private class Context extends
ResourceManagerDriverTestBase<YarnWorkerNode>.Context {
+ private final CompletableFuture<Void>
stopAndCleanupClusterFuture = new CompletableFuture<>();
+ private final CompletableFuture<Resource>
createTaskManagerContainerFuture = new CompletableFuture<>();
+ private final CompletableFuture<Void> stopContainerAsyncFuture
= new CompletableFuture<>();
+ final List<CompletableFuture<Resource>>
addContainerRequestFutures = new ArrayList<>();
+ final AtomicInteger addContainerRequestFuturesNumCompleted =
new AtomicInteger(0);
+ final CompletableFuture<Void> removeContainerRequestFuture =
new CompletableFuture<>();
+ final CompletableFuture<Void> releaseAssignedContainerFuture =
new CompletableFuture<>();
+ final CompletableFuture<Void> startContainerAsyncFuture = new
CompletableFuture<>();
+ final TestingYarnResourceManagerClientFactory
testingYarnResourceManagerClientFactory = new
TestingYarnResourceManagerClientFactory();
+ final TestingYarnNodeManagerClientFactory
testingYarnNodeManagerClientFactory = new TestingYarnNodeManagerClientFactory();
+
+ final TestingYarnNMClientAsync.Builder
testingYarnNMClientAsyncBuilder =
testingYarnNodeManagerClientFactory.getTestingYarnNMClientAsyncBuilder()
+ .setStartContainerAsyncConsumer((ignored1, ignored2,
ignored3) -> startContainerAsyncFuture.complete(null))
+ .setStopContainerAsyncConsumer((ignored1, ignored2,
ignored3) -> stopContainerAsyncFuture.complete(null));
+ final TestingYarnAMRMClientAsync.Builder
testingYarnAMRMClientAsyncBuilder =
testingYarnResourceManagerClientFactory.getTestingYarnAMRMClientAsyncBuilder()
+ .setAddContainerRequestConsumer((request, handler) -> {
+
createTaskManagerContainerFuture.complete(request.getCapability());
+
testingYarnResourceManagerClientFactory.getCallbackHandler()
+
.onContainersAllocated(Collections.singletonList(testingContainer));
+ })
+ .setGetMatchingRequestsFunction(ignored ->
+
Collections.singletonList(Collections.singletonList(YarnResourceManagerDriver.getContainerRequest(testingResource))))
+ .setRemoveContainerRequestConsumer((request, handler)
-> removeContainerRequestFuture.complete(null))
+ .setReleaseAssignedContainerConsumer((ignored1,
ignored2) -> releaseAssignedContainerFuture.complete(null))
+ .setUnregisterApplicationMasterConsumer((ignore1,
ignore2, ignore3) -> stopAndCleanupClusterFuture.complete(null));
+
+ final Map<String, String> env = new HashMap<>();
+
+ private int containerIdx = 0;
+
+ @Override
+ protected void prepareRunTest() {
+ File root = folder.getRoot();
+ File home = new File(root, "home");
+ env.put(ENV_APP_ID, "foo");
+ env.put(ENV_CLIENT_HOME_DIR, home.getAbsolutePath());
+ env.put(ENV_CLIENT_SHIP_FILES, "");
+ env.put(ENV_FLINK_CLASSPATH, "");
+ env.put(ENV_HADOOP_USER_NAME, "foo");
+ env.put(FLINK_DIST_JAR, new YarnLocalResourceDescriptor(
+ "flink.jar",
+ new Path("/tmp/flink.jar"),
+ 0,
+ System.currentTimeMillis(),
+ LocalResourceVisibility.APPLICATION,
+ LocalResourceType.FILE).toString());
+ env.put(ApplicationConstants.Environment.PWD.key(),
home.getAbsolutePath());
+ }
+
+ @Override
+ protected void preparePreviousAttemptWorkers() {
+
testingYarnAMRMClientAsyncBuilder.setRegisterApplicationMasterFunction(
+ (ignored1, ignored2, ignored3) -> new
TestingRegisterApplicationMasterResponse(() ->
Collections.singletonList(testingContainer)));
+ }
+
+ @Override
+ protected void prepareReleaseResource() {
+ addContainerRequestFutures.add(new
CompletableFuture<>());
+ addContainerRequestFutures.add(new
CompletableFuture<>());
+
testingYarnAMRMClientAsyncBuilder.setAddContainerRequestConsumer((ignored1,
ignored2) -> {
+
addContainerRequestFutures.get(addContainerRequestFuturesNumCompleted.getAndIncrement()).complete(null);
+
testingYarnResourceManagerClientFactory.getCallbackHandler()
+
.onContainersAllocated(Collections.singletonList(testingContainer));
+ });
+
resourceEventHandlerBuilder.setOnWorkerTerminatedConsumer((ignore1, ignore2) ->
getDriver().requestResource(testingTaskExecutorProcessSpec));
+ }
+
+ @Override
+ protected ResourceManagerDriver<YarnWorkerNode>
createResourceManagerDriver() {
+ return new YarnResourceManagerDriver(
+ flinkConfig,
+ new YarnResourceManagerDriverConfiguration(env,
null, "localhost:9000"),
+ testingYarnResourceManagerClientFactory,
+ testingYarnNodeManagerClientFactory);
+ }
+
+ @Override
+ protected void validateInitialization() {
+
assertNotNull(testingYarnResourceManagerClientFactory.getTestingYarnAMRMClientAsync());
+
assertNotNull(testingYarnNodeManagerClientFactory.getTestingYarnNMClientAsync());
+
assertThat(testingYarnNodeManagerClientFactory.getTestingYarnNMClientAsync().getServiceState(),
is(Service.STATE.STARTED));
+
assertThat(testingYarnResourceManagerClientFactory.getTestingYarnAMRMClientAsync().getServiceState(),
is(Service.STATE.STARTED));
Review comment:
I think it would be better to validate whether `init` and `start` are
called on these clients. Currently, we are relying on the state being updated
correctly. The problem is that the testing clients override the behavior of the
real clients, so correct state changes on the testing clients cannot guarantee
correct state changes on the real clients.
##########
File path:
flink-yarn/src/test/java/org/apache/flink/yarn/YarnResourceManagerDriverTest.java
##########
@@ -0,0 +1,518 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import org.apache.flink.api.common.resources.CPUResource;
+import org.apache.flink.configuration.MemorySize;
+import org.apache.flink.configuration.TaskManagerOptions;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.TaskExecutorProcessSpec;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.resourcemanager.active.ResourceManagerDriver;
+import
org.apache.flink.runtime.resourcemanager.active.ResourceManagerDriverTestBase;
+import
org.apache.flink.runtime.resourcemanager.exceptions.ResourceManagerException;
+import org.apache.flink.runtime.util.HadoopUtils;
+import org.apache.flink.util.ExceptionUtils;
+import org.apache.flink.yarn.configuration.YarnResourceManagerConfiguration;
+
+import org.apache.flink.shaded.guava18.com.google.common.collect.ImmutableList;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.service.Service;
+import org.apache.hadoop.yarn.api.ApplicationConstants;
+import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerState;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.junit.Assume;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.nio.file.Files;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_APP_ID;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_CLIENT_HOME_DIR;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_CLIENT_SHIP_FILES;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_FLINK_CLASSPATH;
+import static org.apache.flink.yarn.YarnConfigKeys.ENV_HADOOP_USER_NAME;
+import static org.apache.flink.yarn.YarnConfigKeys.FLINK_DIST_JAR;
+import static org.apache.flink.yarn.YarnConfigKeys.FLINK_YARN_FILES;
+import static
org.apache.flink.yarn.YarnResourceManagerDriver.ERROR_MASSAGE_ON_SHUTDOWN_REQUEST;
+import static org.hamcrest.Matchers.is;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotEquals;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertThat;
+
+/**
+ * Tests for {@link YarnResourceManagerDriver}.
+ */
+public class YarnResourceManagerDriverTest extends
ResourceManagerDriverTestBase<YarnWorkerNode> {
+ private static final Resource testingResource =
Resource.newInstance(YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES);
+ private static final Container testingContainer =
createTestingContainerWithResource(testingResource, 0);
+ private static final TaskExecutorProcessSpec
testingTaskExecutorProcessSpec =
+ new TaskExecutorProcessSpec(
+ new CPUResource(1),
+ MemorySize.ZERO,
+ MemorySize.ZERO,
+ MemorySize.ofMebiBytes(256),
+ MemorySize.ofMebiBytes(256),
+ MemorySize.ofMebiBytes(256),
+ MemorySize.ofMebiBytes(256),
+ MemorySize.ZERO,
+ MemorySize.ZERO);
+
+ @Rule
+ public TemporaryFolder folder = new TemporaryFolder();
+
+ @Override
+ protected Context createContext() {
+ return new Context();
+ }
+
+ @Test
+ public void testShutdownRequestCausesFatalError() throws Exception {
+ new Context() {{
+ final CompletableFuture<Throwable>
throwableCompletableFuture = new CompletableFuture<>();
+
resourceEventHandlerBuilder.setOnErrorConsumer(throwableCompletableFuture::complete);
+ runTest(() -> {
+
testingYarnResourceManagerClientFactory.getCallbackHandler().onShutdownRequest();
+
+ Throwable throwable =
throwableCompletableFuture.get(TIMEOUT_SEC, TimeUnit.SECONDS);
+
assertThat(ExceptionUtils.findThrowable(throwable,
ResourceManagerException.class).isPresent(), is(true));
+
assertThat(ExceptionUtils.findThrowableWithMessage(throwable,
ERROR_MASSAGE_ON_SHUTDOWN_REQUEST).isPresent(), is(true));
+ });
+ }};
+ }
+
+ /**
+ * Tests that application files are deleted when the YARN application
master is de-registered.
+ */
+ @Test
+ public void testDeleteApplicationFiles() throws Exception {
+ new Context() {{
+ final File applicationDir = folder.newFolder(".flink");
+ env.put(FLINK_YARN_FILES,
applicationDir.getCanonicalPath());
+
+ runTest(() -> {
+
getDriver().deregisterApplication(ApplicationStatus.SUCCEEDED, null);
+ assertFalse("YARN application directory was not
removed", Files.exists(applicationDir.toPath()));
+ });
+ }};
+ }
+
+ @Test
+ public void testOnContainerCompleted() throws Exception {
+ new Context() {{
+ addContainerRequestFutures.add(new
CompletableFuture<>());
+ addContainerRequestFutures.add(new
CompletableFuture<>());
+
+
testingYarnAMRMClientAsyncBuilder.setAddContainerRequestConsumer((ignored1,
ignored2) ->
+
addContainerRequestFutures.get(addContainerRequestFuturesNumCompleted.getAndIncrement()).complete(null));
+
resourceEventHandlerBuilder.setOnWorkerTerminatedConsumer((ignore1, ignore2) ->
getDriver().requestResource(testingTaskExecutorProcessSpec));
+
+ runTest(() -> {
+ runInMainThread(() ->
getDriver().requestResource(testingTaskExecutorProcessSpec));
+
testingYarnResourceManagerClientFactory.getCallbackHandler().onContainersAllocated(ImmutableList.of(testingContainer));
+
+
verifyFutureCompleted(addContainerRequestFutures.get(0));
+
verifyFutureCompleted(removeContainerRequestFuture);
+
verifyFutureCompleted(startContainerAsyncFuture);
+
+ ContainerStatus testingContainerStatus =
createTestingContainerCompletedStatus(testingContainer.getId());
+
testingYarnResourceManagerClientFactory.getCallbackHandler().onContainersCompleted(ImmutableList.of(testingContainerStatus));
+
+
verifyFutureCompleted(addContainerRequestFutures.get(1));
+ });
+ }};
+ }
+
+ @Test
+ public void testOnStartContainerError() throws Exception {
+ new Context() {{
+ addContainerRequestFutures.add(new
CompletableFuture<>());
+ addContainerRequestFutures.add(new
CompletableFuture<>());
+
+
testingYarnAMRMClientAsyncBuilder.setAddContainerRequestConsumer((ignored1,
ignored2) ->
+
addContainerRequestFutures.get(addContainerRequestFuturesNumCompleted.getAndIncrement()).complete(null));
+
resourceEventHandlerBuilder.setOnWorkerTerminatedConsumer((ignore1, ignore2) ->
getDriver().requestResource(testingTaskExecutorProcessSpec));
+
+ runTest(() -> {
+ runInMainThread(() ->
getDriver().requestResource(testingTaskExecutorProcessSpec));
+
testingYarnResourceManagerClientFactory.getCallbackHandler().onContainersAllocated(ImmutableList.of(testingContainer));
+
+
verifyFutureCompleted(addContainerRequestFutures.get(0));
+
verifyFutureCompleted(removeContainerRequestFuture);
+
verifyFutureCompleted(startContainerAsyncFuture);
+
+
testingYarnNodeManagerClientFactory.getCallbackHandler().onStartContainerError(testingContainer.getId(),
new Exception("start error"));
+
verifyFutureCompleted(releaseAssignedContainerFuture);
+
verifyFutureCompleted(addContainerRequestFutures.get(1));
+ });
+ }};
+ }
+
+ @Test
+ public void
testStartTaskExecutorProcessVariousSpec_SameContainerResource() throws
Exception{
+ final TaskExecutorProcessSpec taskExecutorProcessSpec1 =
+ new TaskExecutorProcessSpec(
+ new CPUResource(1),
+ MemorySize.ZERO,
+ MemorySize.ZERO,
+ MemorySize.ofMebiBytes(100),
+ MemorySize.ZERO,
+ MemorySize.ofMebiBytes(100),
+ MemorySize.ofMebiBytes(100),
+ MemorySize.ZERO,
+ MemorySize.ZERO
+ );
+ final TaskExecutorProcessSpec taskExecutorProcessSpec2 =
+ new TaskExecutorProcessSpec(
+ new CPUResource(1),
+ MemorySize.ZERO,
+ MemorySize.ZERO,
+ MemorySize.ofMebiBytes(99),
+ MemorySize.ZERO,
+ MemorySize.ofMebiBytes(100),
+ MemorySize.ofMebiBytes(100),
+ MemorySize.ZERO,
+ MemorySize.ZERO
+ );
+
+ new Context() {{
+ addContainerRequestFutures.add(new
CompletableFuture<>());
+ addContainerRequestFutures.add(new
CompletableFuture<>());
+ addContainerRequestFutures.add(new
CompletableFuture<>());
+ addContainerRequestFutures.add(new
CompletableFuture<>());
+
+ final String startCommand1 =
TaskManagerOptions.TASK_HEAP_MEMORY.key() + "=" + (100L << 20);
+ final String startCommand2 =
TaskManagerOptions.TASK_HEAP_MEMORY.key() + "=" + (99L << 20);
+ final CompletableFuture<Void>
startContainerAsyncCommandFuture1 = new CompletableFuture<>();
+ final CompletableFuture<Void>
startContainerAsyncCommandFuture2 = new CompletableFuture<>();
+
+
testingYarnAMRMClientAsyncBuilder.setGetMatchingRequestsFunction(ignored ->
+ Collections.singletonList(ImmutableList.of(
+
YarnResourceManagerDriver.getContainerRequest(((YarnResourceManagerDriver)
getDriver()).getContainerResource(taskExecutorProcessSpec1).get()),
+
YarnResourceManagerDriver.getContainerRequest(((YarnResourceManagerDriver)
getDriver()).getContainerResource(taskExecutorProcessSpec2).get()))));
+
testingYarnAMRMClientAsyncBuilder.setAddContainerRequestConsumer((ignored1,
ignored2) ->
+
addContainerRequestFutures.get(addContainerRequestFuturesNumCompleted.getAndIncrement()).complete(null));
+
testingYarnNMClientAsyncBuilder.setStartContainerAsyncConsumer((ignored1,
context, ignored2) -> {
+ if (containsStartCommand(context,
startCommand1)) {
+
startContainerAsyncCommandFuture1.complete(null);
+ } else if (containsStartCommand(context,
startCommand2)) {
+
startContainerAsyncCommandFuture2.complete(null);
+ }
+ });
+
resourceEventHandlerBuilder.setOnWorkerTerminatedConsumer((ignore1, ignore2) ->
getDriver().requestResource(taskExecutorProcessSpec1));
+
+ runTest(() -> {
+ final Resource containerResource =
((YarnResourceManagerDriver)
getDriver()).getContainerResource(taskExecutorProcessSpec1).get();
+ // Make sure two worker resource spec will be
normalized to the same container resource
+ assertEquals(containerResource,
((YarnResourceManagerDriver)
getDriver()).getContainerResource(taskExecutorProcessSpec2).get());
+
+ runInMainThread(() ->
getDriver().requestResource(taskExecutorProcessSpec1));
+ runInMainThread(() ->
getDriver().requestResource(taskExecutorProcessSpec2));
+
+ // Verify both containers requested
+
verifyFutureCompleted(addContainerRequestFutures.get(0));
+
verifyFutureCompleted(addContainerRequestFutures.get(1));
+
+ // Mock that both containers are allocated
+ Container container1 =
createTestingContainerWithResource(containerResource);
+ Container container2 =
createTestingContainerWithResource(containerResource);
+
testingYarnResourceManagerClientFactory.getCallbackHandler().onContainersAllocated(ImmutableList.of(container1,
container2));
+
+ // Verify workers with both spec are started.
+
verifyFutureCompleted(startContainerAsyncCommandFuture1);
+
verifyFutureCompleted(startContainerAsyncCommandFuture2);
+
+ // Mock that one container is completed, while
the worker is still pending
+ ContainerStatus testingContainerStatus =
createTestingContainerCompletedStatus(container1.getId());
+
testingYarnResourceManagerClientFactory.getCallbackHandler().onContainersCompleted(Collections.singletonList(testingContainerStatus));
+
+ // Verify that only one more container is
requested.
+
verifyFutureCompleted(addContainerRequestFutures.get(2));
+
assertFalse(addContainerRequestFutures.get(3).isDone());
+ });
+ }};
+ }
+
+ @Test
+ public void testStartWorkerVariousSpec_DifferentContainerResource()
throws Exception{
+ final TaskExecutorProcessSpec taskExecutorProcessSpec1 =
+ new TaskExecutorProcessSpec(
+ new CPUResource(1),
+ MemorySize.ZERO,
+ MemorySize.ZERO,
+ MemorySize.ofMebiBytes(50),
+ MemorySize.ofMebiBytes(50),
+ MemorySize.ofMebiBytes(50),
+ MemorySize.ofMebiBytes(50),
+ MemorySize.ZERO,
+ MemorySize.ZERO
+ );
+ final TaskExecutorProcessSpec taskExecutorProcessSpec2 =
+ new TaskExecutorProcessSpec(
+ new CPUResource(2),
+ MemorySize.ZERO,
+ MemorySize.ZERO,
+ MemorySize.ofMebiBytes(500),
+ MemorySize.ofMebiBytes(500),
+ MemorySize.ofMebiBytes(500),
+ MemorySize.ofMebiBytes(500),
+ MemorySize.ZERO,
+ MemorySize.ZERO
+ );
+
+ new Context() {{
+ addContainerRequestFutures.add(new
CompletableFuture<>());
+ addContainerRequestFutures.add(new
CompletableFuture<>());
+ addContainerRequestFutures.add(new
CompletableFuture<>());
+ addContainerRequestFutures.add(new
CompletableFuture<>());
+
+ final String startCommand1 =
TaskManagerOptions.TASK_HEAP_MEMORY.key() + "=" + (50L << 20);
+ final String startCommand2 =
TaskManagerOptions.TASK_HEAP_MEMORY.key() + "=" + (100L << 20);
+ final CompletableFuture<Void>
startContainerAsyncCommandFuture1 = new CompletableFuture<>();
+ final CompletableFuture<Void>
startContainerAsyncCommandFuture2 = new CompletableFuture<>();
+
+
testingYarnAMRMClientAsyncBuilder.setGetMatchingRequestsFunction(tuple -> {
+ if (tuple.f2.getVirtualCores() == 1) {
+ return Collections.singletonList(
+
Collections.singletonList(YarnResourceManagerDriver.getContainerRequest(((YarnResourceManagerDriver)
getDriver()).getContainerResource(taskExecutorProcessSpec1).get())));
+ } else if (tuple.f2.getVirtualCores() == 2) {
+ return Collections.singletonList(
+
Collections.singletonList(YarnResourceManagerDriver.getContainerRequest(((YarnResourceManagerDriver)
getDriver()).getContainerResource(taskExecutorProcessSpec2).get())));
+ }
+ return null;
+ });
+
testingYarnAMRMClientAsyncBuilder.setAddContainerRequestConsumer((request,
ignored) ->
+
addContainerRequestFutures.get(addContainerRequestFuturesNumCompleted.getAndIncrement()).complete(request.getCapability()));
+
testingYarnNMClientAsyncBuilder.setStartContainerAsyncConsumer((ignored1,
context, ignored3) -> {
+ if (containsStartCommand(context,
startCommand1)) {
+
startContainerAsyncCommandFuture1.complete(null);
+ } else if (containsStartCommand(context,
startCommand2)) {
+
startContainerAsyncCommandFuture2.complete(null);
+ }
+ });
+
resourceEventHandlerBuilder.setOnWorkerTerminatedConsumer((ignore1, ignore2) ->
getDriver().requestResource(taskExecutorProcessSpec1));
+
+ runTest(() -> {
+ final Resource containerResource1 =
((YarnResourceManagerDriver)
getDriver()).getContainerResource(taskExecutorProcessSpec1).get();
+ final Resource containerResource2 =
((YarnResourceManagerDriver)
getDriver()).getContainerResource(taskExecutorProcessSpec2).get();
+ // Make sure two worker resource spec will be
normalized to different container resources
+ assertNotEquals(containerResource1,
containerResource2);
+
+ runInMainThread(() ->
getDriver().requestResource(taskExecutorProcessSpec1));
+ runInMainThread(() ->
getDriver().requestResource(taskExecutorProcessSpec2));
+
+ // Verify both containers requested
+
verifyFutureCompleted(addContainerRequestFutures.get(0));
+
verifyFutureCompleted(addContainerRequestFutures.get(1));
+
+ // Mock that container 1 is allocated
+ Container container1 =
createTestingContainerWithResource(containerResource1);
+
testingYarnResourceManagerClientFactory.getCallbackHandler().onContainersAllocated(Collections.singletonList(container1));
+
+ // Verify that only worker with spec1 is
started.
+
verifyFutureCompleted(startContainerAsyncCommandFuture1);
+
assertFalse(startContainerAsyncCommandFuture2.isDone());
+
+ // Mock that container 1 is completed, while
the worker is still pending
+ ContainerStatus testingContainerStatus =
createTestingContainerCompletedStatus(container1.getId());
+
testingYarnResourceManagerClientFactory.getCallbackHandler().onContainersCompleted(Collections.singletonList(testingContainerStatus));
+
+ // Verify that only container 1 is requested
again
+
verifyFutureCompleted(addContainerRequestFutures.get(2));
+
assertThat(addContainerRequestFutures.get(2).get(), is(containerResource1));
+
assertFalse(addContainerRequestFutures.get(3).isDone());
+ });
+ }};
+ }
+
+ private boolean containsStartCommand(ContainerLaunchContext
containerLaunchContext, String command) {
+ return
containerLaunchContext.getCommands().stream().anyMatch(str ->
str.contains(command));
+ }
+
+ private static Container createTestingContainerWithResource(Resource
resource, int containerIdx) {
+ final ContainerId containerId = ContainerId.newInstance(
+ ApplicationAttemptId.newInstance(
+
ApplicationId.newInstance(System.currentTimeMillis(), 1),
+ 1),
+ containerIdx);
+ final NodeId nodeId = NodeId.newInstance("container", 1234);
+ return new TestingContainer(containerId, nodeId, resource,
Priority.UNDEFINED);
+ }
+
+ private class Context extends
ResourceManagerDriverTestBase<YarnWorkerNode>.Context {
+ private final CompletableFuture<Void>
stopAndCleanupClusterFuture = new CompletableFuture<>();
+ private final CompletableFuture<Resource>
createTaskManagerContainerFuture = new CompletableFuture<>();
+ private final CompletableFuture<Void> stopContainerAsyncFuture
= new CompletableFuture<>();
+ final List<CompletableFuture<Resource>>
addContainerRequestFutures = new ArrayList<>();
+ final AtomicInteger addContainerRequestFuturesNumCompleted =
new AtomicInteger(0);
+ final CompletableFuture<Void> removeContainerRequestFuture =
new CompletableFuture<>();
+ final CompletableFuture<Void> releaseAssignedContainerFuture =
new CompletableFuture<>();
+ final CompletableFuture<Void> startContainerAsyncFuture = new
CompletableFuture<>();
+ final TestingYarnResourceManagerClientFactory
testingYarnResourceManagerClientFactory = new
TestingYarnResourceManagerClientFactory();
+ final TestingYarnNodeManagerClientFactory
testingYarnNodeManagerClientFactory = new TestingYarnNodeManagerClientFactory();
+
+ final TestingYarnNMClientAsync.Builder
testingYarnNMClientAsyncBuilder =
testingYarnNodeManagerClientFactory.getTestingYarnNMClientAsyncBuilder()
+ .setStartContainerAsyncConsumer((ignored1, ignored2,
ignored3) -> startContainerAsyncFuture.complete(null))
+ .setStopContainerAsyncConsumer((ignored1, ignored2,
ignored3) -> stopContainerAsyncFuture.complete(null));
+ final TestingYarnAMRMClientAsync.Builder
testingYarnAMRMClientAsyncBuilder =
testingYarnResourceManagerClientFactory.getTestingYarnAMRMClientAsyncBuilder()
+ .setAddContainerRequestConsumer((request, handler) -> {
+
createTaskManagerContainerFuture.complete(request.getCapability());
+
testingYarnResourceManagerClientFactory.getCallbackHandler()
+
.onContainersAllocated(Collections.singletonList(testingContainer));
+ })
+ .setGetMatchingRequestsFunction(ignored ->
+
Collections.singletonList(Collections.singletonList(YarnResourceManagerDriver.getContainerRequest(testingResource))))
+ .setRemoveContainerRequestConsumer((request, handler)
-> removeContainerRequestFuture.complete(null))
+ .setReleaseAssignedContainerConsumer((ignored1,
ignored2) -> releaseAssignedContainerFuture.complete(null))
+ .setUnregisterApplicationMasterConsumer((ignore1,
ignore2, ignore3) -> stopAndCleanupClusterFuture.complete(null));
+
+ final Map<String, String> env = new HashMap<>();
+
+ private int containerIdx = 0;
+
+ @Override
+ protected void prepareRunTest() {
+ File root = folder.getRoot();
+ File home = new File(root, "home");
+ env.put(ENV_APP_ID, "foo");
+ env.put(ENV_CLIENT_HOME_DIR, home.getAbsolutePath());
+ env.put(ENV_CLIENT_SHIP_FILES, "");
+ env.put(ENV_FLINK_CLASSPATH, "");
+ env.put(ENV_HADOOP_USER_NAME, "foo");
+ env.put(FLINK_DIST_JAR, new YarnLocalResourceDescriptor(
+ "flink.jar",
+ new Path("/tmp/flink.jar"),
+ 0,
+ System.currentTimeMillis(),
+ LocalResourceVisibility.APPLICATION,
+ LocalResourceType.FILE).toString());
+ env.put(ApplicationConstants.Environment.PWD.key(),
home.getAbsolutePath());
+ }
+
+ @Override
+ protected void preparePreviousAttemptWorkers() {
+
testingYarnAMRMClientAsyncBuilder.setRegisterApplicationMasterFunction(
+ (ignored1, ignored2, ignored3) -> new
TestingRegisterApplicationMasterResponse(() ->
Collections.singletonList(testingContainer)));
+ }
+
+ @Override
+ protected void prepareReleaseResource() {
+ addContainerRequestFutures.add(new
CompletableFuture<>());
+ addContainerRequestFutures.add(new
CompletableFuture<>());
+
testingYarnAMRMClientAsyncBuilder.setAddContainerRequestConsumer((ignored1,
ignored2) -> {
+
addContainerRequestFutures.get(addContainerRequestFuturesNumCompleted.getAndIncrement()).complete(null);
+
testingYarnResourceManagerClientFactory.getCallbackHandler()
+
.onContainersAllocated(Collections.singletonList(testingContainer));
+ });
+
resourceEventHandlerBuilder.setOnWorkerTerminatedConsumer((ignore1, ignore2) ->
getDriver().requestResource(testingTaskExecutorProcessSpec));
+ }
+
+ @Override
+ protected ResourceManagerDriver<YarnWorkerNode>
createResourceManagerDriver() {
+ return new YarnResourceManagerDriver(
+ flinkConfig,
+ new YarnResourceManagerConfiguration(env, null,
"localhost:9000"),
+ testingYarnResourceManagerClientFactory,
+ testingYarnNodeManagerClientFactory);
+ }
+
+ @Override
+ protected void validateInitialization() {
+
assertNotNull(testingYarnResourceManagerClientFactory.getTestingYarnAMRMClientAsync());
+
assertNotNull(testingYarnNodeManagerClientFactory.getTestingYarnNMClientAsync());
+
assertThat(testingYarnNodeManagerClientFactory.getTestingYarnNMClientAsync().getServiceState(),
is(Service.STATE.STARTED));
+
assertThat(testingYarnResourceManagerClientFactory.getTestingYarnAMRMClientAsync().getServiceState(),
is(Service.STATE.STARTED));
+ }
+
+ @Override
+ protected void
validateWorkersRecoveredFromPreviousAttempt(Collection<YarnWorkerNode> workers)
{
+ Assume.assumeTrue(HadoopUtils.isMinHadoopVersion(2, 2));
+ assertThat(workers.size(), is(1));
+
+ final ResourceID resourceId =
workers.iterator().next().getResourceID();
+ assertThat(resourceId.toString(),
is(testingContainer.getId().toString()));
+ }
+
+ @Override
+ protected void validateTermination() {
+
assertThat(testingYarnNodeManagerClientFactory.getTestingYarnNMClientAsync().getServiceState(),
is(Service.STATE.STOPPED));
+
assertThat(testingYarnResourceManagerClientFactory.getTestingYarnAMRMClientAsync().getServiceState(),
is(Service.STATE.STOPPED));
Review comment:
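Same here: it would be better to validate that `stop` is called on these
clients, rather than relying on the service state reported by the testing clients.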
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]