Repository: flink
Updated Branches:
  refs/heads/master 707606ac4 -> 5fdf39b1f


[FLINK-3927][yarn] make container id consistent across Hadoop versions

- introduce a unique container id independent of the Hadoop version
- improve printing of exceptions during registration
- minor improvements to the Yarn ResourceManager code

This closes #2013


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/017106e1
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/017106e1
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/017106e1

Branch: refs/heads/master
Commit: 017106e140f3c17ebaaa0507e1dcbbc445c8f0ac
Parents: 707606a
Author: Maximilian Michels <[email protected]>
Authored: Fri May 20 14:29:12 2016 +0200
Committer: Maximilian Michels <[email protected]>
Committed: Mon May 23 12:36:25 2016 +0200

----------------------------------------------------------------------
 .../clusterframework/FlinkResourceManager.java  |  4 ++--
 .../clusterframework/types/ResourceID.java      | 14 +++++++----
 .../flink/runtime/jobmanager/JobManager.scala   |  4 ++--
 .../flink/yarn/RegisteredYarnWorkerNode.java    | 13 +++++-----
 .../flink/yarn/YarnContainerInLaunch.java       | 16 +++++++++----
 .../flink/yarn/YarnFlinkResourceManager.java    | 25 +++++++++++++-------
 .../flink/yarn/YarnTaskManagerRunner.java       |  5 ++--
 7 files changed, 51 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/017106e1/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
index a5c354c..8766e15 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/FlinkResourceManager.java
@@ -353,7 +353,7 @@ public abstract class FlinkResourceManager<WorkerType extends ResourceID> extend
                ResourceID resourceID = msg.resourceId();
                try {
                        Preconditions.checkNotNull(resourceID);
-                       WorkerType newWorker = 
workerRegistered(msg.resourceId());
+                       WorkerType newWorker = workerRegistered(resourceID);
                        WorkerType oldWorker = 
registeredWorkers.put(resourceID, newWorker);
                        if (oldWorker != null) {
                                LOG.warn("Worker {} had been registered 
before.", resourceID);
@@ -363,7 +363,7 @@ public abstract class FlinkResourceManager<WorkerType extends ResourceID> extend
                                self());
                } catch (Exception e) {
                        // This may happen on duplicate task manager 
registration message to the job manager
-                       LOG.warn("TaskManager resource registration failed for 
{}", resourceID);
+                       LOG.warn("TaskManager resource registration failed for 
{}", resourceID, e);
 
                        // tell the JobManager about the failure
                        String eStr = ExceptionUtils.stringifyException(e);

http://git-wip-us.apache.org/repos/asf/flink/blob/017106e1/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceID.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceID.java b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceID.java
index 8e48244..e599456 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceID.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/clusterframework/types/ResourceID.java
@@ -19,6 +19,7 @@
 package org.apache.flink.runtime.clusterframework.types;
 
 import org.apache.flink.util.AbstractID;
+import org.apache.flink.util.Preconditions;
 
 import java.io.Serializable;
 
@@ -32,10 +33,15 @@ public class ResourceID implements Serializable {
        private final String resourceId;
 
        public ResourceID(String resourceId) {
+               Preconditions.checkNotNull(resourceId, "ResourceID must not be 
null");
                this.resourceId = resourceId;
        }
 
-       public String getResourceId() {
+       /**
+        * Gets the Resource Id as string
+        * @return Stringified version of the ResourceID
+        */
+       public final String getResourceIdString() {
                return resourceId;
        }
 
@@ -48,10 +54,10 @@ public class ResourceID implements Serializable {
        }
 
        @Override
-       public boolean equals(Object o) {
+       public final boolean equals(Object o) {
                if (this == o) {
                        return true;
-               } else if (o == null || getClass() != o.getClass()) {
+               } else if (o == null || !(o instanceof ResourceID)) {
                        return false;
                } else {
                        return resourceId.equals(((ResourceID) o).resourceId);
@@ -59,7 +65,7 @@ public class ResourceID implements Serializable {
        }
 
        @Override
-       public int hashCode() {
+       public final int hashCode() {
                return resourceId.hashCode();
        }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/017106e1/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
index 3c633f3..540957d 100644
--- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
+++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala
@@ -440,8 +440,8 @@ class JobManager(
 
       val taskManager = msg.getTaskManager
       val resourceId = msg.getResourceID
-      log.warn(s"TaskManager's resource id $resourceId is not registered with 
ResourceManager. " +
-        s"Refusing registration.")
+      log.warn(s"TaskManager's resource id $resourceId failed to register at 
ResourceManager. " +
+        s"Refusing registration because of\n${msg.getMessage}.")
 
       taskManager ! decorateMessage(
         RefuseRegistration(new IllegalStateException(

http://git-wip-us.apache.org/repos/asf/flink/blob/017106e1/flink-yarn/src/main/java/org/apache/flink/yarn/RegisteredYarnWorkerNode.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/RegisteredYarnWorkerNode.java b/flink-yarn/src/main/java/org/apache/flink/yarn/RegisteredYarnWorkerNode.java
index 1fdb32c..a6c094d 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/RegisteredYarnWorkerNode.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/RegisteredYarnWorkerNode.java
@@ -24,22 +24,23 @@ import org.apache.hadoop.yarn.api.records.Container;
 
 import static java.util.Objects.requireNonNull;
 
+/**
+ * A representation of a registered Yarn container managed by the {@link 
YarnFlinkResourceManager}.
+ */
 public class RegisteredYarnWorkerNode extends ResourceID {
-       
+
        /** The container on which the worker runs */
        private final Container yarnContainer;
 
-       public RegisteredYarnWorkerNode(
-               ResourceID resourceId, Container yarnContainer)
-       {
-               super(resourceId.getResourceId());
+       public RegisteredYarnWorkerNode(Container yarnContainer) {
+               super(yarnContainer.getId().toString());
                this.yarnContainer = requireNonNull(yarnContainer);
        }
 
        public Container yarnContainer() {
                return yarnContainer;
        }
-       
+
        // 
------------------------------------------------------------------------
 
        @Override

http://git-wip-us.apache.org/repos/asf/flink/blob/017106e1/flink-yarn/src/main/java/org/apache/flink/yarn/YarnContainerInLaunch.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnContainerInLaunch.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnContainerInLaunch.java
index 87020db..03c5b3a 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnContainerInLaunch.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnContainerInLaunch.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.yarn;
 
+import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.hadoop.yarn.api.records.Container;
 
 import static java.util.Objects.requireNonNull;
@@ -26,17 +27,22 @@ import static java.util.Objects.requireNonNull;
  * This class describes a container in which a TaskManager is being launched 
(or
  * has been launched) but where the TaskManager has not properly registered, 
yet.
  */
-public class YarnContainerInLaunch {
-       
+public class YarnContainerInLaunch extends ResourceID {
+
        private final Container container;
-       
+
        private final long timestamp;
-       
+
+       public YarnContainerInLaunch(Container container) {
+               this(container, System.currentTimeMillis());
+       }
+
        public YarnContainerInLaunch(Container container, long timestamp) {
+               super(container.getId().toString());
                this.container = requireNonNull(container);
                this.timestamp = timestamp;
        }
-       
+
        // 
------------------------------------------------------------------------
 
        public Container container() {

http://git-wip-us.apache.org/repos/asf/flink/blob/017106e1/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
index 4a21d5c..71fc371 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnFlinkResourceManager.java
@@ -73,6 +73,10 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar
        /** The default heartbeat interval during regular operation */
        private static final int DEFAULT_YARN_HEARTBEAT_INTERVAL_MS = 5000;
 
+       /** Environment variable name of the final container id used by the 
Flink ResourceManager.
+        * Container ID generation may vary across Hadoop versions. */
+       final static String ENV_FLINK_CONTAINER_ID = "_FLINK_CONTAINER_ID";
+
        /** The containers where a TaskManager is starting and we are waiting 
for it to register */
        private final Map<ResourceID, YarnContainerInLaunch> containersInLaunch;
 
@@ -210,7 +214,8 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar
 
                        final long now = System.currentTimeMillis();
                        for (Container c : containersFromPreviousAttempts) {
-                               containersInLaunch.put(new 
ResourceID(c.getId().toString()), new YarnContainerInLaunch(c, now));
+                               YarnContainerInLaunch containerInLaunch = new 
YarnContainerInLaunch(c, now);
+                               containersInLaunch.put(containerInLaunch, 
containerInLaunch);
                        }
 
                        // adjust the progress indicator
@@ -332,7 +337,7 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar
                if (inLaunch == null) {
                        throw new Exception("Cannot register Worker - unknown 
resource id " + resourceID);
                } else {
-                       return new RegisteredYarnWorkerNode(resourceID, 
inLaunch.container());
+                       return new 
RegisteredYarnWorkerNode(inLaunch.container());
                }
        }
 
@@ -346,7 +351,7 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar
                        if (yci != null) {
                                LOG.info("YARN container consolidation 
recognizes Resource {} ", resourceID);
 
-                               accepted.add(new 
RegisteredYarnWorkerNode(resourceID, yci.container()));
+                               accepted.add(new 
RegisteredYarnWorkerNode(yci.container()));
                        }
                        else {
                                LOG.info("YARN container consolidation does not 
recognize TaskManager {}",
@@ -382,24 +387,26 @@ public class YarnFlinkResourceManager extends FlinkResourceManager<RegisteredYar
                        // decide whether to return the container, or whether 
to start a TaskManager
                        if (numRegistered + containersInLaunch.size() < 
numRequired) {
                                // start a TaskManager
-                               final ResourceID containerIdString = new 
ResourceID(container.getId().toString());
-                               final long now = System.currentTimeMillis();
-                               containersInLaunch.put(containerIdString, new 
YarnContainerInLaunch(container, now));
+                               final YarnContainerInLaunch containerInLaunch = 
new YarnContainerInLaunch(container);
+                               containersInLaunch.put(containerInLaunch, 
containerInLaunch);
 
-                               String message = "Launching TaskManager in 
container " + containerIdString
+                               String message = "Launching TaskManager in 
container " + containerInLaunch
                                        + " on host " + 
container.getNodeId().getHost();
                                LOG.info(message);
                                sendInfoMessage(message);
 
                                try {
+                                       // set a special environment variable 
to uniquely identify this container
+                                       
taskManagerLaunchContext.getEnvironment()
+                                               .put(ENV_FLINK_CONTAINER_ID, 
containerInLaunch.getResourceIdString());
                                        
nodeManagerClient.startContainer(container, taskManagerLaunchContext);
                                }
                                catch (Throwable t) {
                                        // failed to launch the container
-                                       
containersInLaunch.remove(containerIdString);
+                                       
containersInLaunch.remove(containerInLaunch);
 
                                        // return container, a new one will be 
requested eventually
-                                       LOG.error("Could not start TaskManager 
in container " + containerIdString, t);
+                                       LOG.error("Could not start TaskManager 
in container " + containerInLaunch, t);
                                        
containersBeingReturned.put(container.getId(), container);
                                        
resourceManagerClient.releaseAssignedContainer(container.getId());
                                }

http://git-wip-us.apache.org/repos/asf/flink/blob/017106e1/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
----------------------------------------------------------------------
diff --git a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
index dba81de..6839bb5 100644
--- a/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
+++ b/flink-yarn/src/main/java/org/apache/flink/yarn/YarnTaskManagerRunner.java
@@ -21,7 +21,6 @@ package org.apache.flink.yarn;
 import java.io.IOException;
 import java.security.PrivilegedAction;
 import java.util.Map;
-import java.util.Objects;
 
 import org.apache.flink.configuration.ConfigConstants;
 import org.apache.flink.configuration.Configuration;
@@ -29,6 +28,7 @@ import org.apache.flink.runtime.clusterframework.types.ResourceID;
 import org.apache.flink.runtime.taskmanager.TaskManager;
 import org.apache.flink.runtime.util.EnvironmentInformation;
 
+import org.apache.flink.util.Preconditions;
 import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
@@ -87,8 +87,9 @@ public class YarnTaskManagerRunner {
                }
 
                // Infer the resource identifier from the environment variable
-               String containerID = 
Objects.requireNonNull(System.getenv(Environment.CONTAINER_ID.key()));
+               String containerID = 
Preconditions.checkNotNull(envs.get(YarnFlinkResourceManager.ENV_FLINK_CONTAINER_ID));
                final ResourceID resourceId = new ResourceID(containerID);
+               LOG.info("ResourceID assigned for this container: {}", 
resourceId);
 
                ugi.doAs(new PrivilegedAction<Object>() {
                        @Override

Reply via email to