[ https://issues.apache.org/jira/browse/FLINK-3544?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15206832#comment-15206832 ]
ASF GitHub Bot commented on FLINK-3544:
---------------------------------------
Github user mxm commented on a diff in the pull request:
https://github.com/apache/flink/pull/1741#discussion_r57030485
--- Diff:
flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java ---
@@ -0,0 +1,647 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.yarn;
+
+import akka.actor.ActorRef;
+import akka.actor.Props;
+
+import org.apache.flink.configuration.ConfigConstants;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.runtime.akka.AkkaUtils;
+import org.apache.flink.runtime.clusterframework.FlinkResourceManager;
+import org.apache.flink.runtime.clusterframework.ApplicationStatus;
+import org.apache.flink.runtime.clusterframework.ContaineredTaskManagerParameters;
+import org.apache.flink.runtime.clusterframework.messages.StopCluster;
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
+import org.apache.flink.runtime.instance.ActorGateway;
+import org.apache.flink.runtime.instance.AkkaActorGateway;
+import org.apache.flink.runtime.leaderretrieval.LeaderRetrievalService;
+import org.apache.flink.yarn.messages.ContainersAllocated;
+import org.apache.flink.yarn.messages.ContainersComplete;
+
+import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
+import org.apache.hadoop.yarn.api.records.Container;
+import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
+import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
+import org.apache.hadoop.yarn.api.records.Priority;
+import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.client.api.AMRMClient;
+import org.apache.hadoop.yarn.client.api.NMClient;
+import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+
+import org.slf4j.Logger;
+
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Specialized Flink Resource Manager implementation for YARN clusters. It is started as the
+ * YARN ApplicationMaster and implements the YARN-specific logic for container requests and failure
+ * monitoring.
+ */
+public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYarnWorkerNode> {
+
+ /** The heartbeat interval while the resource master is waiting for containers */
+ private static final int FAST_YARN_HEARTBEAT_INTERVAL_MS = 500;
+
+ /** The default heartbeat interval during regular operation */
+ private static final int DEFAULT_YARN_HEARTBEAT_INTERVAL_MS = 5000;
+
+ /** The containers where a TaskManager is starting and we are waiting for it to register */
+ private final Map<ResourceID, YarnContainerInLaunch> containersInLaunch;
+
+ /** Containers we have released, where we are waiting for an acknowledgement that
+ * they are released */
+ private final Map<ContainerId, Container> containersBeingReturned;
+
+ /** The YARN / Hadoop configuration object */
+ private final YarnConfiguration yarnConfig;
+
+ /** The TaskManager container parameters (like container memory size) */
+ private final ContaineredTaskManagerParameters taskManagerParameters;
+
+ /** Context information used to start a TaskManager Java process */
+ private final ContainerLaunchContext taskManagerLaunchContext;
+
+ /** Host name for the container running this process */
+ private final String applicationMasterHostName;
+
+ /** Web interface URL, may be null */
+ private final String webInterfaceURL;
+
+ /** Default heartbeat interval between this actor and the YARN resource manager */
+ private final int yarnHeartbeatIntervalMillis;
+
+ /** Number of failed TaskManager containers before stopping the application. -1 means infinite. */
+ private final int maxFailedContainers;
+
+ /** Callback handler for the asynchronous resourceManagerClient */
+ private YarnResourceManagerCallbackHandler resourceManagerCallbackHandler;
+
+ /** Client to communicate with the Resource Manager (YARN's master) */
+ private AMRMClientAsync<AMRMClient.ContainerRequest> resourceManagerClient;
+
+ /** Client to communicate with the Node manager and launch TaskManager processes */
+ private NMClient nodeManagerClient;
+
+ /** The number of containers requested, but not yet granted */
+ private int numPendingContainerRequests;
+
+ /** The number of failed containers since the master became active */
+ private int failedContainersSoFar;
+
+
+ public YarnFlinkResourceManager(
+ Configuration flinkConfig,
+ YarnConfiguration yarnConfig,
+ LeaderRetrievalService leaderRetrievalService,
+ String applicationMasterHostName,
+ String webInterfaceURL,
+ ContaineredTaskManagerParameters taskManagerParameters,
+ ContainerLaunchContext taskManagerLaunchContext,
+ int yarnHeartbeatIntervalMillis,
+ int maxFailedContainers,
+ int numInitialTaskManagers) {
+
+ super(numInitialTaskManagers, flinkConfig, leaderRetrievalService);
+
+ this.yarnConfig = requireNonNull(yarnConfig);
+ this.taskManagerParameters = requireNonNull(taskManagerParameters);
+ this.taskManagerLaunchContext = requireNonNull(taskManagerLaunchContext);
+ this.applicationMasterHostName = requireNonNull(applicationMasterHostName);
+ this.webInterfaceURL = webInterfaceURL;
+ this.yarnHeartbeatIntervalMillis = yarnHeartbeatIntervalMillis;
+ this.maxFailedContainers = maxFailedContainers;
+
+ this.containersInLaunch = new HashMap<>();
+ this.containersBeingReturned = new HashMap<>();
+ }
+
+ // ------------------------------------------------------------------------
+ // Actor messages
+ // ------------------------------------------------------------------------
+
+ @Override
+ protected void handleMessage(Object message) {
+
+ // check for YARN specific actor messages first
+
+ if (message instanceof ContainersAllocated) {
+ containersAllocated(((ContainersAllocated) message).containers());
+
+ } else if (message instanceof ContainersComplete) {
+ containersComplete(((ContainersComplete) message).containers());
+
+ } else {
+ // message handled by the generic resource master code
+ super.handleMessage(message);
+ }
+ }
+
+ // ------------------------------------------------------------------------
+ // YARN specific behavior
+ // ------------------------------------------------------------------------
+
+ @Override
+ protected void initialize() throws Exception {
+ LOG.info("Initializing YARN resource master");
+
+ // create the client to communicate with the resource manager
+ ActorGateway selfGateway = new AkkaActorGateway(self(), getLeaderSessionID());
+ resourceManagerCallbackHandler = new YarnResourceManagerCallbackHandler(selfGateway);
+
+ resourceManagerClient = AMRMClientAsync.createAMRMClientAsync(
+ yarnHeartbeatIntervalMillis, resourceManagerCallbackHandler);
+ resourceManagerClient.init(yarnConfig);
+ resourceManagerClient.start();
+
+ // create the client to communicate with the node managers
+ nodeManagerClient = NMClient.createNMClient();
+ nodeManagerClient.init(yarnConfig);
+ nodeManagerClient.start();
+ nodeManagerClient.cleanupRunningContainersOnStop(true);
+
+ // register with Resource Manager
+ LOG.info("Registering Application Master with tracking url {}", webInterfaceURL);
+
+ scala.Option<Object> portOption = AkkaUtils.getAddress(getContext().system()).port();
+ int actorSystemPort = portOption.isDefined() ? (int) portOption.get() : -1;
+
+ RegisterApplicationMasterResponse response = resourceManagerClient.registerApplicationMaster(
+ applicationMasterHostName, actorSystemPort, webInterfaceURL);
+
+ // if this application master starts as part of an ApplicationMaster/JobManager recovery,
+ // then some worker containers are most likely still alive and we can re-obtain them
+ List<Container> containersFromPreviousAttempts = getContainersFromPreviousAttempts(response);
+
+ if (!containersFromPreviousAttempts.isEmpty()) {
+ LOG.info("Retrieved {} TaskManagers from previous attempt", containersFromPreviousAttempts.size());
+
+ final long now = System.currentTimeMillis();
+ for (Container c : containersFromPreviousAttempts) {
+ containersInLaunch.put(new ResourceID(c.getId().toString()), new YarnContainerInLaunch(c, now));
+ }
+
+ // adjust the progress indicator
+ updateProgress();
+ }
+ }
+
+ @Override
+ protected void leaderUpdated() {
+ AkkaActorGateway newGateway = new AkkaActorGateway(self(), getLeaderSessionID());
+ resourceManagerCallbackHandler.setCurrentLeaderGateway(newGateway);
+ }
+
+ @Override
+ protected void shutdownApplication(ApplicationStatus finalStatus, String optionalDiagnostics) {
+ // first, de-register from YARN
+ FinalApplicationStatus yarnStatus = getYarnStatus(finalStatus);
+ LOG.info("Unregistering application from the YARN Resource Manager");
+ try {
+ resourceManagerClient.unregisterApplicationMaster(yarnStatus, optionalDiagnostics, "");
+ } catch (Throwable t) {
+ LOG.error("Could not unregister the application master.", t);
+ }
+
+ // now shut down all our components
+ try {
+ resourceManagerClient.stop();
+ } catch (Throwable t) {
+ LOG.error("Could not cleanly shut down the Asynchronous Resource Manager Client", t);
+ }
+ try {
+ nodeManagerClient.stop();
+ } catch (Throwable t) {
+ LOG.error("Could not cleanly shut down the Node Manager Client", t);
+ }
+ }
+
+ @Override
+ protected void fatalError(String message, Throwable error) {
+ // we do not unregister, but cause a hard fail of this process, to have it
+ // restarted by YARN
+ LOG.error("FATAL ERROR IN YARN APPLICATION MASTER: " + message, error);
+ LOG.error("Shutting down process");
+
+ // kill this process, this will make YARN restart the process
+ System.exit(EXIT_CODE_FATAL_ERROR);
+ }
+
+ @Override
+ protected void requestNewWorkers(int numWorkers) {
+ final long mem = taskManagerParameters.taskManagerTotalMemoryMB();
+ final int containerMemorySizeMB;
+
+ if (mem <= Integer.MAX_VALUE) {
+ containerMemorySizeMB = (int) mem;
+ } else {
+ containerMemorySizeMB = Integer.MAX_VALUE;
+ LOG.error("Decreasing container size from {} MB to {} MB (integer value overflow)",
+ mem, containerMemorySizeMB);
+ }
+
+ for (int i = 0; i < numWorkers; i++) {
+ numPendingContainerRequests++;
+ LOG.info("Requesting new TaskManager container with {} megabytes memory. Pending requests: {}",
+ containerMemorySizeMB, numPendingContainerRequests);
+
+ // Priority for worker containers - priorities are intra-application
+ Priority priority = Priority.newInstance(0);
+
+ // Resource requirements for worker containers
+ int taskManagerSlots = Integer.valueOf(System.getenv(FlinkYarnClientBase.ENV_SLOTS));
+ int vcores = config.getInteger(ConfigConstants.YARN_VCORES, Math.max(taskManagerSlots, 1));
+ Resource capability = Resource.newInstance(containerMemorySizeMB, vcores);
+
+ resourceManagerClient.addContainerRequest(
+ new AMRMClient.ContainerRequest(capability, null, null, priority));
+ }
+
+ // make sure we transmit the request fast and receive fast news of granted allocations
+ resourceManagerClient.setHeartbeatInterval(FAST_YARN_HEARTBEAT_INTERVAL_MS);
+ }
+
+ @Override
+ protected void releasePendingWorker(ResourceID id) {
+ YarnContainerInLaunch container = containersInLaunch.remove(id);
+ if (container != null) {
+ releaseYarnContainer(container.container());
+ } else {
+ LOG.error("Cannot find container {} to release.
Ignoring request.", id);
+ }
+ }
+
+ @Override
+ protected void releaseRegisteredWorker(RegisteredYarnWorkerNode worker) {
+ releaseYarnContainer(worker.yarnContainer());
+ }
+
+ private void releaseYarnContainer(Container container) {
+ LOG.info("Releasing YARN container {}", container.getId());
+
+ containersBeingReturned.put(container.getId(), container);
+
+ // release the container on the node manager
+ try {
+ nodeManagerClient.stopContainer(container.getId(), container.getNodeId());
+ } catch (Throwable t) {
+ // we only log this error. since the resource manager also gets the release
+ // notification, the container should be eventually cleaned up
+ LOG.error("Error while calling YARN Node Manager to release container", t);
+ }
+
+ // tell the master that the container is no longer needed
+ resourceManagerClient.releaseAssignedContainer(container.getId());
+ }
+
+ @Override
+ protected RegisteredYarnWorkerNode workerRegistered(ResourceID resourceID) throws Exception {
+ YarnContainerInLaunch inLaunch = containersInLaunch.remove(resourceID);
+ if (inLaunch == null) {
+ throw new Exception("Cannot register Worker - unknown resource id " + resourceID);
+ } else {
+ return new RegisteredYarnWorkerNode(resourceID, inLaunch.container());
+ }
+ }
+
+ @Override
+ protected Collection<RegisteredYarnWorkerNode> reacceptRegisteredWorkers(Collection<ResourceID> toConsolidate) {
+ // we check for each task manager if we recognize its container
+ List<RegisteredYarnWorkerNode> accepted = new ArrayList<>();
+ for (ResourceID resourceID : toConsolidate) {
+ YarnContainerInLaunch yci = containersInLaunch.remove(resourceID);
+
+ if (yci != null) {
+ LOG.info("YARN container consolidation recognizes Resource {} ", resourceID);
+
+ accepted.add(new RegisteredYarnWorkerNode(resourceID, yci.container()));
+ }
+ else {
+ LOG.info("YARN container consolidation does not recognize TaskManager {}",
+ resourceID);
+ }
+ }
+ return accepted;
+ }
+
+ @Override
+ protected int getNumWorkerRequestsPending() {
+ return numPendingContainerRequests;
+ }
+
+ @Override
+ protected int getNumWorkersPendingRegistration() {
+ return containersInLaunch.size();
+ }
+
+ // ------------------------------------------------------------------------
+ // Callbacks from the YARN Resource Manager
+ // ------------------------------------------------------------------------
+
+ private void containersAllocated(List<Container> containers) {
+ final int numRequired = getDesignatedWorkerPoolSize();
+ final int numRegistered = getNumberOfRegisteredTaskManagers();
+
+ for (Container container : containers) {
+ numPendingContainerRequests = Math.max(0, numPendingContainerRequests - 1);
+ LOG.info("Received new container: {} - Remaining pending container requests: {}",
+ container.getId(), numPendingContainerRequests);
+
+ // decide whether to return the container, or whether to start a TaskManager
+ if (numRegistered + containersInLaunch.size() < numRequired) {
+ // start a TaskManager
+ final ResourceID containerIdString = new ResourceID(container.getId().toString());
+ final long now = System.currentTimeMillis();
+ containersInLaunch.put(containerIdString, new YarnContainerInLaunch(container, now));
+
+ String message = "Launching TaskManager in container " + containerIdString
+ + " on host " + container.getNodeId().getHost();
+ LOG.info(message);
+ sendInfoMessage(message);
+
+ try {
+ nodeManagerClient.startContainer(container, taskManagerLaunchContext);
+ }
+ catch (Throwable t) {
+ // failed to launch the container
+ containersInLaunch.remove(containerIdString);
+
+ // return container, a new one will be requested eventually
+ LOG.error("Could not start TaskManager in container " + containerIdString, t);
+ containersBeingReturned.put(container.getId(), container);
+ resourceManagerClient.releaseAssignedContainer(container.getId());
+ }
+ } else {
+ // return excessive container
+ LOG.info("Returning excess container {}", container.getId());
+ containersBeingReturned.put(container.getId(), container);
+ resourceManagerClient.releaseAssignedContainer(container.getId());
+ }
+ }
+
+ updateProgress();
+
+ // if we are waiting for no further containers, we can go to the
+ // regular heartbeat interval
+ if (numPendingContainerRequests <= 0) {
+ resourceManagerClient.setHeartbeatInterval(yarnHeartbeatIntervalMillis);
+ }
+
+ // make sure we re-check the status of workers / containers one more time at least,
+ // in case some containers did not come up properly
+ triggerCheckWorkers();
+ }
+
+ /**
+ * Invoked when the resource manager informs of completed containers.
+ * Called via an actor message by the callback from the resource manager client.
+ *
+ * @param containers The containers that have completed.
+ */
+ private void containersComplete(List<ContainerStatus> containers) {
+ // the list contains both failed containers, as well as containers that
+ // were gracefully returned by this application master
+
+ for (ContainerStatus status : containers) {
+ final ResourceID id = new ResourceID(status.getContainerId().toString());
+
+ // check if this is a failed container or a completed container
+ if (containersBeingReturned.remove(status.getContainerId()) != null) {
+ // regular completed container that we released
+ LOG.info("Container {} completed successfully with diagnostics: {}",
+ id, status.getDiagnostics());
+ } else {
+ // failed container, either at startup, or running
+ final String exitStatus;
+ switch (status.getExitStatus()) {
+ case -103:
+ exitStatus = "Vmem limit exceeded (-103)";
+ break;
+ case -104:
+ exitStatus = "Pmem limit exceeded (-104)";
+ break;
+ default:
+ exitStatus = String.valueOf(status.getExitStatus());
+ }
+
+ final YarnContainerInLaunch launched = containersInLaunch.remove(id);
+ if (launched != null) {
+ LOG.info("Container {} failed, with a TaskManager in launch or registration. " +
+ "Exit status: {}", id, exitStatus);
+ // we will trigger re-acquiring new containers at the end
+ } else {
+ // failed registered worker
+ LOG.info("Container {} failed. Exit status: {}", id, exitStatus);
+
+ // notify the generic logic, which notifies the JobManager, etc.
+ notifyWorkerFailed(id, "Container " + id + " failed. Exit status: " + exitStatus);
+ }
+
+ // general failure logging
+ failedContainersSoFar++;
+
+ String diagMessage = String.format("Diagnostics
for container %s in state %s : " +
+ "exitStatus=%s diagnostics=%s",
+ id, status.getState(), exitStatus,
status.getDiagnostics());
+ sendInfoMessage(diagMessage);
+
+ LOG.info(diagMessage);
+ LOG.info("Total number of failed containers so
far: " + failedContainersSoFar);
+
+ // maxFailedContainers == -1 is infinite number
of retries.
+ if (maxFailedContainers >= 0 &&
failedContainersSoFar > maxFailedContainers) {
+ String msg = "Stopping YARN session
because the number of failed containers ("
+ + failedContainersSoFar + ")
exceeded the maximum failed containers ("
+ + maxFailedContainers + ").
This number is controlled by the '"
+ +
ConfigConstants.YARN_MAX_FAILED_CONTAINERS + "' configuration setting. "
+ + "By default its the number of
requested containers.";
+
+ LOG.error(msg);
+ self().tell(decorateMessage(new StopCluster(ApplicationStatus.FAILED, msg)),
+ ActorRef.noSender());
+
+ // no need to do anything else
+ return;
+ }
+ }
+ }
+
+ updateProgress();
+
+ // in case failed containers were among the finished containers, make
+ // sure we re-examine and request new ones
+ triggerCheckWorkers();
+ }
+
+ // ------------------------------------------------------------------------
+ // Utilities
+ // ------------------------------------------------------------------------
+
+ private void updateProgress() {
+ final int required = getDesignatedWorkerPoolSize();
+ final int available = getNumberOfRegisteredTaskManagers() + containersInLaunch.size();
+ final float progress = (required <= 0) ? 1.0f : available / (float) required;
+
+ if (resourceManagerCallbackHandler != null) {
+ resourceManagerCallbackHandler.setCurrentProgress(progress);
+ }
+ }
+
+ /**
+ * Converts a Flink application status enum to a YARN application status enum.
+ * @param status The Flink application status.
+ * @return The corresponding YARN application status.
+ */
+ private FinalApplicationStatus getYarnStatus(ApplicationStatus status) {
+ if (status == null) {
+ return FinalApplicationStatus.UNDEFINED;
+ }
+ else {
+ switch (status) {
+ case SUCCEEDED:
+ return FinalApplicationStatus.SUCCEEDED;
+ case FAILED:
+ return FinalApplicationStatus.FAILED;
+ case CANCELED:
+ return FinalApplicationStatus.KILLED;
+ default:
+ return FinalApplicationStatus.UNDEFINED;
+ }
+ }
+ }
+
+ /**
+ * Checks if a YARN application still has registered containers. If the application master
+ * registered at the resource manager for the first time, this list will be empty. If the
+ * application master registered a repeated time (after a failure and recovery), this list
+ * will contain the containers that were previously allocated.
+ *
+ * @param response The response object from the registration at the ResourceManager.
+ * @return A list with containers from the previous application attempt.
+ */
+ private List<Container> getContainersFromPreviousAttempts(RegisterApplicationMasterResponse response) {
+ try {
+ Method m = RegisterApplicationMasterResponse.class
+ .getMethod("getContainersFromPreviousAttempts");
--- End diff --
It changes because everything is in Java now. Previously, the
RegisterApplicationMasterResponseReflector lived in YarnJobManager.scala.
I tend to think this is a premature optimization, but I'll add it again in
Java because we already had it before.
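For context, the reflective lookup shown in the diff can live in a small helper again. A minimal sketch of what such a Java helper could look like (the class name matches the reflector mentioned above; the body is illustrative, not the PR's final implementation):

    import java.lang.reflect.Method;
    import java.util.Collections;
    import java.util.List;

    import org.apache.hadoop.yarn.api.protocolrecords.RegisterApplicationMasterResponse;
    import org.apache.hadoop.yarn.api.records.Container;

    // Illustrative sketch only: looks up getContainersFromPreviousAttempts
    // reflectively, because the method is not available in older Hadoop versions.
    class RegisterApplicationMasterResponseReflector {

        @SuppressWarnings("unchecked")
        List<Container> getContainersFromPreviousAttempts(RegisterApplicationMasterResponse response) {
            try {
                Method method = RegisterApplicationMasterResponse.class
                    .getMethod("getContainersFromPreviousAttempts");
                return (List<Container>) method.invoke(response);
            } catch (Exception e) {
                // Method missing or call failed: behave as if no containers were recovered.
                return Collections.emptyList();
            }
        }
    }

The resource manager's own getContainersFromPreviousAttempts() could then delegate to such a helper, keeping the Hadoop-version check in one place.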
> ResourceManager runtime components
> ----------------------------------
>
> Key: FLINK-3544
> URL: https://issues.apache.org/jira/browse/FLINK-3544
> Project: Flink
> Issue Type: Sub-task
> Components: ResourceManager
> Affects Versions: 1.1.0
> Reporter: Maximilian Michels
> Assignee: Maximilian Michels
> Fix For: 1.1.0
>
>