Repository: flink Updated Branches: refs/heads/master 707606ac4 -> 5fdf39b1f
[FLINK-3927][yarn] make container id consistent across Hadoop versions - introduce a unique container id independent of the Hadoop version - improve printing of exceptions during registration - minor improvements to the Yarn ResourceManager code This closes #2013 Project: http://git-wip-us.apache.org/repos/asf/flink/repo Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/017106e1 Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/017106e1 Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/017106e1 Branch: refs/heads/master Commit: 017106e140f3c17ebaaa0507e1dcbbc445c8f0ac Parents: 707606a Author: Maximilian Michels <[email protected]> Authored: Fri May 20 14:29:12 2016 +0200 Committer: Maximilian Michels <[email protected]> Committed: Mon May 23 12:36:25 2016 +0200 ---------------------------------------------------------------------- .../clusterframework/FlinkResourceManager.java | 4 ++-- .../clusterframework/types/ResourceID.java | 14 +++++++---- .../flink/runtime/jobmanager/JobManager.scala | 4 ++-- .../flink/yarn/RegisteredYarnWorkerNode.java | 13 +++++----- .../flink/yarn/YarnContainerInLaunch.java | 16 +++++++++---- .../flink/yarn/YarnFlinkResourceManager.java | 25 +++++++++++++------- .../flink/yarn/YarnTaskManagerRunner.java | 5 ++-- 7 files changed, 51 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/flink/blob/017106e1/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java index a5c354c..8766e15 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java @@ -353,7 +353,7 @@ public abstract class FlinkResourceManager<WorkerType extends ResourceID> extend ResourceID resourceID = msg.resourceId(); try { Preconditions.checkNotNull(resourceID); - WorkerType newWorker = workerRegistered(msg.resourceId()); + WorkerType newWorker = workerRegistered(resourceID); WorkerType oldWorker = registeredWorkers.put(resourceID, newWorker); if (oldWorker != null) { LOG.warn("Worker {} had been registered before.", resourceID); @@ -363,7 +363,7 @@ public abstract class FlinkResourceManager<WorkerType extends ResourceID> extend self()); } catch (Exception e) { // This may happen on duplicate task manager registration message to the job manager - LOG.warn("TaskManager resource registration failed for {}", resourceID); + LOG.warn("TaskManager resource registration failed for {}", resourceID, e); // tell the JobManager about the failure String eStr = ExceptionUtils.stringifyException(e); http://git-wip-us.apache.org/repos/asf/flink/blob/017106e1/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceID.java ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceID.java index 8e48244..e599456 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceID.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceID.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.clusterframework.types; import org.apache.flink.util.AbstractID; +import org.apache.flink.util.Preconditions; import java.io.Serializable; @@ -32,10 +33,15 @@ public class ResourceID implements Serializable { private final String resourceId; public ResourceID(String resourceId) { + Preconditions.checkNotNull(resourceId, "ResourceID must not be null"); this.resourceId = resourceId; } - public String getResourceId() { + /** + * Gets the Resource Id as string + * @return Stringified version of the ResourceID + */ + public final String getResourceIdString() { return resourceId; } @@ -48,10 +54,10 @@ public class ResourceID implements Serializable { } @Override - public boolean equals(Object o) { + public final boolean equals(Object o) { if (this == o) { return true; - } else if (o == null || getClass() != o.getClass()) { + } else if (o == null || !(o instanceof ResourceID)) { return false; } else { return resourceId.equals(((ResourceID) o).resourceId); @@ -59,7 +65,7 @@ public class ResourceID implements Serializable { } @Override - public int hashCode() { + public final int hashCode() { return resourceId.hashCode(); } http://git-wip-us.apache.org/repos/asf/flink/blob/017106e1/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala ---------------------------------------------------------------------- diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index 3c633f3..540957d 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -440,8 +440,8 @@ class JobManager( val taskManager = msg.getTaskManager val resourceId = msg.getResourceID - log.warn(s"TaskManager's resource id $resourceId is not registered with ResourceManager. " + - s"Refusing registration.") + log.warn(s"TaskManager's resource id $resourceId failed to register at ResourceManager. " + + s"Refusing registration because of\n${msg.getMessage}.") taskManager ! decorateMessage( RefuseRegistration(new IllegalStateException( http://git-wip-us.apache.org/repos/asf/flink/blob/017106e1/flink-yarn/src/main/java/org/apache/flink/yarn/RegisteredYarnWorkerNode.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/RegisteredYarnWorkerNode.java b/flink-yarn/src/main/java/org/apache/flink/yarn/RegisteredYarnWorkerNode.java index 1fdb32c..a6c094d 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/RegisteredYarnWorkerNode.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/RegisteredYarnWorkerNode.java @@ -24,22 +24,23 @@ import org.apache.hadoop.yarn.api.records.Container; import static java.util.Objects.requireNonNull; +/** + * A representation of a registered Yarn container managed by the {@link YarnFlinkResourceManager}. + */ public class RegisteredYarnWorkerNode extends ResourceID { - + /** The container on which the worker runs */ private final Container yarnContainer; - public RegisteredYarnWorkerNode( - ResourceID resourceId, Container yarnContainer) - { - super(resourceId.getResourceId()); + public RegisteredYarnWorkerNode(Container yarnContainer) { + super(yarnContainer.getId().toString()); this.yarnContainer = requireNonNull(yarnContainer); } public Container yarnContainer() { return yarnContainer; } - + // ------------------------------------------------------------------------ @Override http://git-wip-us.apache.org/repos/asf/flink/blob/017106e1/flink-yarn/src/main/java/org/apache/flink/yarn/YarnContainerInLaunch.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnContainerInLaunch.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnContainerInLaunch.java index 87020db..03c5b3a 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnContainerInLaunch.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnContainerInLaunch.java @@ -18,6 +18,7 @@ package org.apache.flink.yarn; +import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.hadoop.yarn.api.records.Container; import static java.util.Objects.requireNonNull; @@ -26,17 +27,22 @@ import static java.util.Objects.requireNonNull; * This class describes a container in which a TaskManager is being launched (or * has been launched) but where the TaskManager has not properly registered, yet. */ -public class YarnContainerInLaunch { - +public class YarnContainerInLaunch extends ResourceID { + private final Container container; - + private final long timestamp; - + + public YarnContainerInLaunch(Container container) { + this(container, System.currentTimeMillis()); + } + public YarnContainerInLaunch(Container container, long timestamp) { + super(container.getId().toString()); this.container = requireNonNull(container); this.timestamp = timestamp; } - + // ------------------------------------------------------------------------ public Container container() { http://git-wip-us.apache.org/repos/asf/flink/blob/017106e1/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java index 4a21d5c..71fc371 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java @@ -73,6 +73,10 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar /** The default heartbeat interval during regular operation */ private static final int DEFAULT_YARN_HEARTBEAT_INTERVAL_MS = 5000; + /** Environment variable name of the final container id used by the Flink ResourceManager. + * Container ID generation may vary across Hadoop versions. */ + final static String ENV_FLINK_CONTAINER_ID = "_FLINK_CONTAINER_ID"; + /** The containers where a TaskManager is starting and we are waiting for it to register */ private final Map<ResourceID, YarnContainerInLaunch> containersInLaunch; @@ -210,7 +214,8 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar final long now = System.currentTimeMillis(); for (Container c : containersFromPreviousAttempts) { - containersInLaunch.put(new ResourceID(c.getId().toString()), new YarnContainerInLaunch(c, now)); + YarnContainerInLaunch containerInLaunch = new YarnContainerInLaunch(c, now); + containersInLaunch.put(containerInLaunch, containerInLaunch); } // adjust the progress indicator @@ -332,7 +337,7 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar if (inLaunch == null) { throw new Exception("Cannot register Worker - unknown resource id " + resourceID); } else { - return new RegisteredYarnWorkerNode(resourceID, inLaunch.container()); + return new RegisteredYarnWorkerNode(inLaunch.container()); } } @@ -346,7 +351,7 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar if (yci != null) { LOG.info("YARN container consolidation recognizes Resource {} ", resourceID); - accepted.add(new RegisteredYarnWorkerNode(resourceID, yci.container())); + accepted.add(new RegisteredYarnWorkerNode(yci.container())); } else { LOG.info("YARN container consolidation does not recognize TaskManager {}", @@ -382,24 +387,26 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar // decide whether to return the container, or whether to start a TaskManager if (numRegistered + containersInLaunch.size() < numRequired) { // start a TaskManager - final ResourceID containerIdString = new ResourceID(container.getId().toString()); - final long now = System.currentTimeMillis(); - containersInLaunch.put(containerIdString, new YarnContainerInLaunch(container, now)); + final YarnContainerInLaunch containerInLaunch = new YarnContainerInLaunch(container); + containersInLaunch.put(containerInLaunch, containerInLaunch); - String message = "Launching TaskManager in container " + containerIdString + String message = "Launching TaskManager in container " + containerInLaunch + " on host " + container.getNodeId().getHost(); LOG.info(message); sendInfoMessage(message); try { + // set a special environment variable to uniquely identify this container + taskManagerLaunchContext.getEnvironment() + .put(ENV_FLINK_CONTAINER_ID, containerInLaunch.getResourceIdString()); nodeManagerClient.startContainer(container, taskManagerLaunchContext); } catch (Throwable t) { // failed to launch the container - containersInLaunch.remove(containerIdString); + containersInLaunch.remove(containerInLaunch); // return container, a new one will be requested eventually - LOG.error("Could not start TaskManager in container " + containerIdString, t); + LOG.error("Could not start TaskManager in container " + containerInLaunch, t); containersBeingReturned.put(container.getId(), container); resourceManagerClient.releaseAssignedContainer(container.getId()); } http://git-wip-us.apache.org/repos/asf/flink/blob/017106e1/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java ---------------------------------------------------------------------- diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java index dba81de..6839bb5 100644 --- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java +++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java @@ -21,7 +21,6 @@ package org.apache.flink.yarn; import java.io.IOException; import java.security.PrivilegedAction; import java.util.Map; -import java.util.Objects; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.configuration.Configuration; @@ -29,6 +28,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID; import org.apache.flink.runtime.taskmanager.TaskManager; import org.apache.flink.runtime.util.EnvironmentInformation; +import org.apache.flink.util.Preconditions; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.TokenIdentifier; @@ -87,8 +87,9 @@ public class YarnTaskManagerRunner { } // Infer the resource identifier from the environment variable - String containerID = Objects.requireNonNull(System.getenv(Environment.CONTAINER_ID.key())); + String containerID = Preconditions.checkNotNull(envs.get(YarnFlinkResourceManager.ENV_FLINK_CONTAINER_ID)); final ResourceID resourceId = new ResourceID(containerID); + LOG.info("ResourceID assigned for this container: {}", resourceId); ugi.doAs(new PrivilegedAction<Object>() { @Override
