YARN-1644. RM-NM protocol changes and NodeStatusUpdater implementation to 
support container resizing. Contributed by Meng Ding


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/c75bb5ac
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/c75bb5ac
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/c75bb5ac

Branch: refs/heads/YARN-1197
Commit: c75bb5ac9bb2a6ddfc281b3bc82be238a178d432
Parents: b24514e
Author: Jian He <jia...@apache.org>
Authored: Thu Aug 20 21:04:14 2015 -0700
Committer: Wangda Tan <wan...@apache.org>
Committed: Tue Aug 25 10:09:59 2015 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../yarn/client/TestResourceTrackerOnHA.java    |   2 +-
 .../protocolrecords/NodeHeartbeatResponse.java  |   4 +
 .../impl/pb/NodeHeartbeatResponsePBImpl.java    |  76 +++++-
 .../yarn/server/api/records/NodeStatus.java     |  15 +-
 .../api/records/impl/pb/NodeStatusPBImpl.java   |  75 +++++-
 .../main/proto/yarn_server_common_protos.proto  |   3 +-
 .../yarn_server_common_service_protos.proto     |   1 +
 .../hadoop/yarn/TestYarnServerApiClasses.java   |  39 ++-
 .../hadoop/yarn/server/nodemanager/Context.java |   3 +
 .../yarn/server/nodemanager/NodeManager.java    |  10 +
 .../nodemanager/NodeStatusUpdaterImpl.java      |  59 ++++-
 .../containermanager/ContainerManagerImpl.java  | 116 +++++----
 .../nodemanager/TestNodeManagerResync.java      | 258 +++++++++++++++++++
 .../containermanager/TestContainerManager.java  |   2 +-
 15 files changed, 600 insertions(+), 66 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/c75bb5ac/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 758e1a8..35da67f 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -199,6 +199,9 @@ Release 2.8.0 - UNRELEASED
     YARN-1643. Make ContainersMonitor support changing monitoring size of an
     allocated container. (Meng Ding and Wangda Tan)
 
+    YARN-1644. RM-NM protocol changes and NodeStatusUpdater implementation to
+    support container resizing. (Meng Ding via jianhe)
+
   IMPROVEMENTS
 
     YARN-644. Basic null check is not performed on passed in arguments before

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c75bb5ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java
index 6cdf87f..338198b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/test/java/org/apache/hadoop/yarn/client/TestResourceTrackerOnHA.java
@@ -68,7 +68,7 @@ public class TestResourceTrackerOnHA extends ProtocolHATestBase{
     failoverThread = createAndStartFailoverThread();
     NodeStatus status =
         NodeStatus.newInstance(NodeId.newInstance("localhost", 0), 0, null,
-            null, null, null, null);
+            null, null, null, null, null);
     NodeHeartbeatRequest request2 =
         NodeHeartbeatRequest.newInstance(status, null, null,null);
     resourceTracker.nodeHeartbeat(request2);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c75bb5ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
index 1498a0c..38fbc82 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/NodeHeartbeatResponse.java
@@ -24,6 +24,7 @@ import java.util.Map;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.server.api.records.MasterKey;
 import org.apache.hadoop.yarn.server.api.records.NodeAction;
 
@@ -70,4 +71,7 @@ public interface NodeHeartbeatResponse {
   
   boolean getAreNodeLabelsAcceptedByRM();
   void setAreNodeLabelsAcceptedByRM(boolean areNodeLabelsAcceptedByRM);
+
+  List<Container> getContainersToDecrease();
+  void addAllContainersToDecrease(List<Container> containersToDecrease);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c75bb5ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
index e27d8ca..12c5230 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/NodeHeartbeatResponsePBImpl.java
@@ -27,12 +27,15 @@ import java.util.Map;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ContainerId;
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ContainerIdPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ProtoBase;
 import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils;
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ContainerIdProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.MasterKeyProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeActionProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeHeartbeatResponseProto;
@@ -58,7 +61,9 @@ public class NodeHeartbeatResponsePBImpl extends
 
   private MasterKey containerTokenMasterKey = null;
   private MasterKey nmTokenMasterKey = null;
-  
+
+  private List<Container> containersToDecrease = null;
+
   public NodeHeartbeatResponsePBImpl() {
     builder = NodeHeartbeatResponseProto.newBuilder();
   }
@@ -96,6 +101,9 @@ public class NodeHeartbeatResponsePBImpl extends
     if (this.systemCredentials != null) {
       addSystemCredentialsToProto();
     }
+    if (this.containersToDecrease != null) {
+      addContainersToDecreaseToProto();
+    }
   }
 
   private void addSystemCredentialsToProto() {
@@ -408,6 +416,64 @@ public class NodeHeartbeatResponsePBImpl extends
     builder.addAllApplicationsToCleanup(iterable);
   }
 
+  private void initContainersToDecrease() {
+    if (this.containersToDecrease != null) {
+      return;
+    }
+    NodeHeartbeatResponseProtoOrBuilder p = viaProto ? proto : builder;
+    List<ContainerProto> list = p.getContainersToDecreaseList();
+    this.containersToDecrease = new ArrayList<>();
+
+    for (ContainerProto c : list) {
+      this.containersToDecrease.add(convertFromProtoFormat(c));
+    }
+  }
+
+  @Override
+  public List<Container> getContainersToDecrease() {
+    initContainersToDecrease();
+    return this.containersToDecrease;
+  }
+
+  @Override
+  public void addAllContainersToDecrease(
+      final List<Container> containersToDecrease) {
+    if (containersToDecrease == null) {
+      return;
+    }
+    initContainersToDecrease();
+    this.containersToDecrease.addAll(containersToDecrease);
+  }
+
+  private void addContainersToDecreaseToProto() {
+    maybeInitBuilder();
+    builder.clearContainersToDecrease();
+    if (this.containersToDecrease == null) {
+      return;
+    }
+    Iterable<ContainerProto> iterable = new
+        Iterable<ContainerProto>() {
+      @Override
+      public Iterator<ContainerProto> iterator() {
+        return new Iterator<ContainerProto>() {
+          private Iterator<Container> iter = containersToDecrease.iterator();
+          @Override
+          public boolean hasNext() {
+            return iter.hasNext();
+          }
+          @Override
+          public ContainerProto next() {
+            return convertToProtoFormat(iter.next());
+          }
+          @Override
+          public void remove() {
+            throw new UnsupportedOperationException();
+          }
+        };
+      }
+    };
+    builder.addAllContainersToDecrease(iterable);
+  }
 
   @Override
   public Map<ApplicationId, ByteBuffer> getSystemCredentialsForApps() {
@@ -484,6 +550,14 @@ public class NodeHeartbeatResponsePBImpl extends
     return ((MasterKeyPBImpl) t).getProto();
   }
 
+  private ContainerPBImpl convertFromProtoFormat(ContainerProto p) {
+    return new ContainerPBImpl(p);
+  }
+
+  private ContainerProto convertToProtoFormat(Container t) {
+    return ((ContainerPBImpl) t).getProto();
+  }
+
   @Override
   public boolean getAreNodeLabelsAcceptedByRM() {
     NodeHeartbeatResponseProtoOrBuilder p =

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c75bb5ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java
index 24391bf..a37c399 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/NodeStatus.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Public;
 import org.apache.hadoop.classification.InterfaceStability.Unstable;
 import org.apache.hadoop.classification.InterfaceStability.Stable;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.util.Records;
@@ -48,6 +49,7 @@ public abstract class NodeStatus {
    * @param nodeHealthStatus Health status of the node.
    * @param containersUtilizations Utilization of the containers in this node.
    * @param nodeUtilization Utilization of the node.
+   * @param increasedContainers Containers whose resource has been increased.
    * @return New {@code NodeStatus} with the provided information.
    */
   public static NodeStatus newInstance(NodeId nodeId, int responseId,
@@ -55,7 +57,8 @@ public abstract class NodeStatus {
       List<ApplicationId> keepAliveApplications,
       NodeHealthStatus nodeHealthStatus,
       ResourceUtilization containersUtilization,
-      ResourceUtilization nodeUtilization) {
+      ResourceUtilization nodeUtilization,
+      List<Container> increasedContainers) {
     NodeStatus nodeStatus = Records.newRecord(NodeStatus.class);
     nodeStatus.setResponseId(responseId);
     nodeStatus.setNodeId(nodeId);
@@ -64,6 +67,7 @@ public abstract class NodeStatus {
     nodeStatus.setNodeHealthStatus(nodeHealthStatus);
     nodeStatus.setContainersUtilization(containersUtilization);
     nodeStatus.setNodeUtilization(nodeUtilization);
+    nodeStatus.setIncreasedContainers(increasedContainers);
     return nodeStatus;
   }
 
@@ -108,4 +112,13 @@ public abstract class NodeStatus {
   @Unstable
   public abstract void setNodeUtilization(
       ResourceUtilization nodeUtilization);
+
+  @Public
+  @Unstable
+  public abstract List<Container> getIncreasedContainers();
+
+  @Private
+  @Unstable
+  public abstract void setIncreasedContainers(
+      List<Container> increasedContainers);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c75bb5ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java
index 2d139fe..c94febe 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/records/impl/pb/NodeStatusPBImpl.java
@@ -24,13 +24,16 @@ import java.util.Iterator;
 import java.util.List;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.impl.pb.ApplicationIdPBImpl;
+import org.apache.hadoop.yarn.api.records.impl.pb.ContainerPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.ContainerStatusPBImpl;
 import org.apache.hadoop.yarn.api.records.impl.pb.NodeIdPBImpl;
 import org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ContainerStatusProto;
+import org.apache.hadoop.yarn.proto.YarnProtos.ContainerProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeHealthStatusProto;
 import org.apache.hadoop.yarn.proto.YarnServerCommonProtos.NodeStatusProto;
@@ -49,7 +52,8 @@ public class NodeStatusPBImpl extends NodeStatus {
   private List<ContainerStatus> containers = null;
   private NodeHealthStatus nodeHealthStatus = null;
   private List<ApplicationId> keepAliveApplications = null;
-  
+  private List<Container> increasedContainers = null;
+
   public NodeStatusPBImpl() {
     builder = NodeStatusProto.newBuilder();
   }
@@ -79,6 +83,9 @@ public class NodeStatusPBImpl extends NodeStatus {
     if (this.keepAliveApplications != null) {
       addKeepAliveApplicationsToProto();
     }
+    if (this.increasedContainers != null) {
+      addIncreasedContainersToProto();
+    }
   }
 
   private synchronized void mergeLocalToProto() {
@@ -165,6 +172,37 @@ public class NodeStatusPBImpl extends NodeStatus {
     builder.addAllKeepAliveApplications(iterable);
   }
 
+  private synchronized void addIncreasedContainersToProto() {
+    maybeInitBuilder();
+    builder.clearIncreasedContainers();
+    if (increasedContainers == null) {
+      return;
+    }
+    Iterable<ContainerProto> iterable = new
+        Iterable<ContainerProto>() {
+      @Override
+      public Iterator<ContainerProto> iterator() {
+        return new Iterator<ContainerProto>() {
+          private Iterator<Container> iter =
+                  increasedContainers.iterator();
+          @Override
+          public boolean hasNext() {
+            return iter.hasNext();
+          }
+          @Override
+          public ContainerProto next() {
+            return convertToProtoFormat(iter.next());
+          }
+          @Override
+          public void remove() {
+            throw new UnsupportedOperationException();
+          }
+        };
+      }
+    };
+    builder.addAllIncreasedContainers(iterable);
+  }
+
   @Override
   public int hashCode() {
     return getProto().hashCode();
@@ -336,6 +374,31 @@ public class NodeStatusPBImpl extends NodeStatus {
         .setNodeUtilization(convertToProtoFormat(nodeUtilization));
   }
 
+  @Override
+  public synchronized List<Container> getIncreasedContainers() {
+    if (increasedContainers != null) {
+      return increasedContainers;
+    }
+    NodeStatusProtoOrBuilder p = viaProto ? proto : builder;
+    List<ContainerProto> list = p.getIncreasedContainersList();
+    this.increasedContainers = new ArrayList<>();
+    for (ContainerProto c : list) {
+      this.increasedContainers.add(convertFromProtoFormat(c));
+    }
+    return this.increasedContainers;
+  }
+
+  @Override
+  public synchronized void setIncreasedContainers(
+      List<Container> increasedContainers) {
+    maybeInitBuilder();
+    if (increasedContainers == null) {
+      builder.clearIncreasedContainers();
+      return;
+    }
+    this.increasedContainers = increasedContainers;
+  }
+
   private NodeIdProto convertToProtoFormat(NodeId nodeId) {
     return ((NodeIdPBImpl)nodeId).getProto();
   }
@@ -377,4 +440,14 @@ public class NodeStatusPBImpl extends NodeStatus {
       ResourceUtilizationProto p) {
     return new ResourceUtilizationPBImpl(p);
   }
+
+  private ContainerPBImpl convertFromProtoFormat(
+      ContainerProto c) {
+    return new ContainerPBImpl(c);
+  }
+
+  private ContainerProto convertToProtoFormat(
+      Container c) {
+    return ((ContainerPBImpl)c).getProto();
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c75bb5ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
index 901051f..b161f5b 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_protos.proto
@@ -38,6 +38,7 @@ message NodeStatusProto {
   repeated ApplicationIdProto keep_alive_applications = 5;
   optional ResourceUtilizationProto containers_utilization = 6;
   optional ResourceUtilizationProto node_utilization = 7;
+  repeated ContainerProto increased_containers = 8;
 }
 
 message MasterKeyProto {
@@ -60,4 +61,4 @@ message ResourceUtilizationProto {
   optional int32 pmem = 1;
   optional int32 vmem = 2;
   optional float cpu = 3;
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c75bb5ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
index c122b2a..2db8919 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
@@ -82,6 +82,7 @@ message NodeHeartbeatResponseProto {
   repeated ContainerIdProto containers_to_be_removed_from_nm = 9;
   repeated SystemCredentialsForAppsProto system_credentials_for_apps = 10;
   optional bool areNodeLabelsAcceptedByRM = 11 [default = false];
+  repeated ContainerProto containers_to_decrease = 12;
 }
 
 message SystemCredentialsForAppsProto {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c75bb5ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java
index d9eeb9d..c9427dd 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/test/java/org/apache/hadoop/yarn/TestYarnServerApiClasses.java
@@ -29,6 +29,7 @@ import java.util.HashSet;
 
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.Container;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
 import org.apache.hadoop.yarn.api.records.NodeId;
@@ -168,6 +169,20 @@ public class TestYarnServerApiClasses {
     assertTrue(copy.getAreNodeLabelsAcceptedByRM());
   }
 
+  @Test
+  public void testNodeHeartbeatResponsePBImplWithDecreasedContainers() {
+    NodeHeartbeatResponsePBImpl original = new NodeHeartbeatResponsePBImpl();
+    original.addAllContainersToDecrease(
+        Arrays.asList(getDecreasedContainer(1, 2, 2048, 2),
+            getDecreasedContainer(2, 3, 1024, 1)));
+    NodeHeartbeatResponsePBImpl copy =
+        new NodeHeartbeatResponsePBImpl(original.getProto());
+    assertEquals(1, copy.getContainersToDecrease().get(0)
+        .getId().getContainerId());
+    assertEquals(1024, copy.getContainersToDecrease().get(1)
+        .getResource().getMemory());
+  }
+
   /**
    * Test RegisterNodeManagerRequestPBImpl.
    */
@@ -244,6 +259,9 @@ public class TestYarnServerApiClasses {
     original.setNodeHealthStatus(getNodeHealthStatus());
     original.setNodeId(getNodeId());
     original.setResponseId(1);
+    original.setIncreasedContainers(
+        Arrays.asList(getIncreasedContainer(1, 2, 2048, 2),
+            getIncreasedContainer(2, 3, 4096, 3)));
 
     NodeStatusPBImpl copy = new NodeStatusPBImpl(original.getProto());
     assertEquals(3L, copy.getContainersStatuses().get(1).getContainerId()
@@ -252,7 +270,10 @@ public class TestYarnServerApiClasses {
     assertEquals(1000, copy.getNodeHealthStatus().getLastHealthReportTime());
     assertEquals(9090, copy.getNodeId().getPort());
     assertEquals(1, copy.getResponseId());
-
+    assertEquals(1, copy.getIncreasedContainers().get(0)
+        .getId().getContainerId());
+    assertEquals(4096, copy.getIncreasedContainers().get(1)
+        .getResource().getMemory());
   }
 
   @Test
@@ -347,6 +368,22 @@ public class TestYarnServerApiClasses {
     return new ApplicationIdPBImpl(appId.getProto());
   }
 
+  private Container getDecreasedContainer(int containerID,
+      int appAttemptId, int memory, int vCores) {
+    ContainerId containerId = getContainerId(containerID, appAttemptId);
+    Resource capability = Resource.newInstance(memory, vCores);
+    return Container.newInstance(
+        containerId, null, null, capability, null, null);
+  }
+
+  private Container getIncreasedContainer(int containerID,
+      int appAttemptId, int memory, int vCores) {
+    ContainerId containerId = getContainerId(containerID, appAttemptId);
+    Resource capability = Resource.newInstance(memory, vCores);
+    return Container.newInstance(
+        containerId, null, null, capability, null, null);
+  }
+
   private NodeStatus getNodeStatus() {
     NodeStatus status = recordFactory.newRecordInstance(NodeStatus.class);
     status.setContainersStatuses(new ArrayList<ContainerStatus>());

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c75bb5ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
index 52d937b..9c2d1fb 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
@@ -62,6 +62,9 @@ public interface Context {
 
   ConcurrentMap<ContainerId, Container> getContainers();
 
+  ConcurrentMap<ContainerId, org.apache.hadoop.yarn.api.records.Container>
+      getIncreasedContainers();
+
   NMContainerTokenSecretManager getContainerTokenSecretManager();
   
   NMTokenSecretManagerInNM getNMTokenSecretManager();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c75bb5ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
index 68820a7..8062f41 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
@@ -439,6 +439,10 @@ public class NodeManager extends CompositeService
     protected final ConcurrentMap<ContainerId, Container> containers =
         new ConcurrentSkipListMap<ContainerId, Container>();
 
+    protected final ConcurrentMap<ContainerId,
+        org.apache.hadoop.yarn.api.records.Container> increasedContainers =
+            new ConcurrentHashMap<>();
+
     private final NMContainerTokenSecretManager containerTokenSecretManager;
     private final NMTokenSecretManagerInNM nmTokenSecretManager;
     private ContainerManagementProtocol containerManager;
@@ -493,6 +497,12 @@ public class NodeManager extends CompositeService
     }
 
     @Override
+    public ConcurrentMap<ContainerId, org.apache.hadoop.yarn.api.records.Container>
+        getIncreasedContainers() {
+      return this.increasedContainers;
+    }
+
+    @Override
     public NMContainerTokenSecretManager getContainerTokenSecretManager() {
       return this.containerTokenSecretManager;
     }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c75bb5ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index 05efc69..4e49040 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -310,18 +310,28 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
   @VisibleForTesting
   protected void registerWithRM()
       throws YarnException, IOException {
-    List<NMContainerStatus> containerReports = getNMContainerStatuses();
+    RegisterNodeManagerResponse regNMResponse;
     Set<NodeLabel> nodeLabels = nodeLabelsHandler.getNodeLabelsForRegistration();
-    RegisterNodeManagerRequest request =
-        RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource,
-            nodeManagerVersionId, containerReports, getRunningApplications(),
-            nodeLabels);
-    if (containerReports != null) {
-      LOG.info("Registering with RM using containers :" + containerReports);
-    }
-    RegisterNodeManagerResponse regNMResponse =
-        resourceTracker.registerNodeManager(request);
-    this.rmIdentifier = regNMResponse.getRMIdentifier();
+ 
+    // Synchronize NM-RM registration with
+    // ContainerManagerImpl#increaseContainersResource and
+    // ContainerManagerImpl#startContainers to avoid race condition
+    // during RM recovery
+    synchronized (this.context) {
+      List<NMContainerStatus> containerReports = getNMContainerStatuses();
+      RegisterNodeManagerRequest request =
+          RegisterNodeManagerRequest.newInstance(nodeId, httpPort, totalResource,
+              nodeManagerVersionId, containerReports, getRunningApplications(),
+              nodeLabels);
+      if (containerReports != null) {
+        LOG.info("Registering with RM using containers :" + containerReports);
+      }
+      regNMResponse =
+          resourceTracker.registerNodeManager(request);
+      // Make sure rmIdentifier is set before we release the lock
+      this.rmIdentifier = regNMResponse.getRMIdentifier();
+    }
+
     // if the Resource Manager instructs NM to shutdown.
     if (NodeAction.SHUTDOWN.equals(regNMResponse.getNodeAction())) {
       String message =
@@ -418,10 +428,12 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
     List<ContainerStatus> containersStatuses = getContainerStatuses();
     ResourceUtilization containersUtilization = getContainersUtilization();
     ResourceUtilization nodeUtilization = getNodeUtilization();
+    List<org.apache.hadoop.yarn.api.records.Container> increasedContainers
+        = getIncreasedContainers();
     NodeStatus nodeStatus =
         NodeStatus.newInstance(nodeId, responseId, containersStatuses,
           createKeepAliveApplicationList(), nodeHealthStatus,
-          containersUtilization, nodeUtilization);
+          containersUtilization, nodeUtilization, increasedContainers);
 
     return nodeStatus;
   }
@@ -448,6 +460,21 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
     return nodeResourceMonitor.getUtilization();
   }
 
+  /* Get the containers whose resource has been increased since last
+   * NM-RM heartbeat.
+   */
+  private List<org.apache.hadoop.yarn.api.records.Container>
+      getIncreasedContainers() {
+    List<org.apache.hadoop.yarn.api.records.Container>
+        increasedContainers = new ArrayList<>(
+            this.context.getIncreasedContainers().values());
+    for (org.apache.hadoop.yarn.api.records.Container
+        container : increasedContainers) {
+      this.context.getIncreasedContainers().remove(container.getId());
+    }
+    return increasedContainers;
+  }
+
   // Iterate through the NMContext and clone and get all the containers'
   // statuses. If it's a completed container, add into the
   // recentlyStoppedContainers collections.
@@ -765,6 +792,14 @@ public class NodeStatusUpdaterImpl extends AbstractService implements
               ((NMContext) context)
                 .setSystemCrendentialsForApps(parseCredentials(systemCredentials));
             }
+
+            List<org.apache.hadoop.yarn.api.records.Container>
+                containersToDecrease = response.getContainersToDecrease();
+            if (!containersToDecrease.isEmpty()) {
+              dispatcher.getEventHandler().handle(
+                  new CMgrDecreaseContainersResourceEvent(containersToDecrease)
+              );
+            }
           } catch (ConnectException e) {
             //catch and throw the exception if tried MAX wait time to connect RM
             dispatcher.getEventHandler().handle(

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c75bb5ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index d651ec4..01e75ba 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -755,31 +755,36 @@ public class ContainerManagerImpl extends CompositeService implements
     List<ContainerId> succeededContainers = new ArrayList<ContainerId>();
     Map<ContainerId, SerializedException> failedContainers =
         new HashMap<ContainerId, SerializedException>();
-    for (StartContainerRequest request : requests.getStartContainerRequests()) {
-      ContainerId containerId = null;
-      try {
-        if (request.getContainerToken() == null ||
-            request.getContainerToken().getIdentifier() == null) {
-          throw new IOException(INVALID_CONTAINERTOKEN_MSG);
+    // Synchronize with NodeStatusUpdaterImpl#registerWithRM
+    // to avoid race condition during NM-RM resync (due to RM restart) while a
+    // container is being started, in particular when the container has not yet
+    // been added to the containers map in NMContext.
+    synchronized (this.context) {
+      for (StartContainerRequest request : requests.getStartContainerRequests()) {
+        ContainerId containerId = null;
+        try {
+          if (request.getContainerToken() == null ||
+              request.getContainerToken().getIdentifier() == null) {
+            throw new IOException(INVALID_CONTAINERTOKEN_MSG);
+          }
+          ContainerTokenIdentifier containerTokenIdentifier =
+              BuilderUtils.newContainerTokenIdentifier(request.getContainerToken());
+          verifyAndGetContainerTokenIdentifier(request.getContainerToken(),
+              containerTokenIdentifier);
+          containerId = containerTokenIdentifier.getContainerID();
+          startContainerInternal(nmTokenIdentifier, containerTokenIdentifier,
+              request);
+          succeededContainers.add(containerId);
+        } catch (YarnException e) {
+          failedContainers.put(containerId, SerializedException.newInstance(e));
+        } catch (InvalidToken ie) {
+          failedContainers.put(containerId, SerializedException.newInstance(ie));
+          throw ie;
+        } catch (IOException e) {
+          throw RPCUtil.getRemoteException(e);
         }
-        ContainerTokenIdentifier containerTokenIdentifier =
-            BuilderUtils.newContainerTokenIdentifier(request.getContainerToken());
-        verifyAndGetContainerTokenIdentifier(request.getContainerToken(),
-          containerTokenIdentifier);
-        containerId = containerTokenIdentifier.getContainerID();
-        startContainerInternal(nmTokenIdentifier, containerTokenIdentifier,
-          request);
-        succeededContainers.add(containerId);
-      } catch (YarnException e) {
-        failedContainers.put(containerId, SerializedException.newInstance(e));
-      } catch (InvalidToken ie) {
-        failedContainers.put(containerId, SerializedException.newInstance(ie));
-        throw ie;
-      } catch (IOException e) {
-        throw RPCUtil.getRemoteException(e);
       }
     }
-
     return StartContainersResponse.newInstance(getAuxServiceMetaData(),
       succeededContainers, failedContainers);
   }
@@ -961,32 +966,39 @@ public class ContainerManagerImpl extends CompositeService implements
         = new ArrayList<ContainerId>();
     Map<ContainerId, SerializedException> failedContainers =
         new HashMap<ContainerId, SerializedException>();
-    // Process container resource increase requests
-    for (org.apache.hadoop.yarn.api.records.Token token :
-        requests.getContainersToIncrease()) {
-      ContainerId containerId = null;
-      try {
-        if (token.getIdentifier() == null) {
-          throw new IOException(INVALID_CONTAINERTOKEN_MSG);
+    // Synchronize with NodeStatusUpdaterImpl#registerWithRM
+    // to avoid race condition during NM-RM resync (due to RM restart) while a
+    // container resource is being increased in NM, in particular when the
+    // increased container has not yet been added to the increasedContainers
+    // map in NMContext.
+    synchronized (this.context) {
+      // Process container resource increase requests
+      for (org.apache.hadoop.yarn.api.records.Token token :
+          requests.getContainersToIncrease()) {
+        ContainerId containerId = null;
+        try {
+          if (token.getIdentifier() == null) {
+            throw new IOException(INVALID_CONTAINERTOKEN_MSG);
+          }
+          ContainerTokenIdentifier containerTokenIdentifier =
+              BuilderUtils.newContainerTokenIdentifier(token);
+          verifyAndGetContainerTokenIdentifier(token,
+              containerTokenIdentifier);
+          authorizeStartAndResourceIncreaseRequest(
+              nmTokenIdentifier, containerTokenIdentifier, false);
+          containerId = containerTokenIdentifier.getContainerID();
+          // Reuse the startContainer logic to update NMToken,
+          // as container resource increase request will have come with
+          // an updated NMToken.
+          updateNMTokenIdentifier(nmTokenIdentifier);
+          Resource resource = containerTokenIdentifier.getResource();
+          changeContainerResourceInternal(containerId, resource, true);
+          successfullyIncreasedContainers.add(containerId);
+        } catch (YarnException | InvalidToken e) {
+          failedContainers.put(containerId, SerializedException.newInstance(e));
+        } catch (IOException e) {
+          throw RPCUtil.getRemoteException(e);
         }
-        ContainerTokenIdentifier containerTokenIdentifier =
-            BuilderUtils.newContainerTokenIdentifier(token);
-        verifyAndGetContainerTokenIdentifier(token,
-            containerTokenIdentifier);
-        authorizeStartAndResourceIncreaseRequest(
-            nmTokenIdentifier, containerTokenIdentifier, false);
-        containerId = containerTokenIdentifier.getContainerID();
-        // Reuse the startContainer logic to update NMToken,
-        // as container resource increase request will have come with
-        // an updated NMToken.
-        updateNMTokenIdentifier(nmTokenIdentifier);
-        Resource resource = containerTokenIdentifier.getResource();
-        changeContainerResourceInternal(containerId, resource, true);
-        successfullyIncreasedContainers.add(containerId);
-      } catch (YarnException | InvalidToken e) {
-        failedContainers.put(containerId, SerializedException.newInstance(e));
-      } catch (IOException e) {
-        throw RPCUtil.getRemoteException(e);
       }
     }
     return IncreaseContainersResourceResponse.newInstance(
@@ -1047,6 +1059,16 @@ public class ContainerManagerImpl extends CompositeService implements
           + " is not smaller than the current resource "
           + currentResource.toString());
     }
+    if (increase) {
+      org.apache.hadoop.yarn.api.records.Container increasedContainer =
+          org.apache.hadoop.yarn.api.records.Container.newInstance(
+              containerId, null, null, targetResource, null, null);
+      if (context.getIncreasedContainers().putIfAbsent(containerId,
+          increasedContainer) != null){
+        throw RPCUtil.getRemoteException("Container " + containerId.toString()
+            + " resource is being increased.");
+      }
+    }
     this.readLock.lock();
     try {
       if (!serviceStopped) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c75bb5ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java
index 611e671..be20445 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/TestNodeManagerResync.java
@@ -18,21 +18,35 @@
 
 package org.apache.hadoop.yarn.server.nodemanager;
 
+import static org.junit.Assert.assertEquals;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 
 import java.io.File;
 import java.io.IOException;
+import java.io.PrintWriter;
+import java.nio.ByteBuffer;
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.concurrent.BrokenBarrierException;
 import java.util.concurrent.ConcurrentMap;
 import java.util.concurrent.CyclicBarrier;
 import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.fs.FileContext;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.UnsupportedFileSystemException;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hadoop.security.token.SecretManager;
+import org.apache.hadoop.util.Shell;
+import org.apache.hadoop.yarn.api.protocolrecords.GetContainerStatusesRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.IncreaseContainersResourceResponse;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainerRequest;
 import org.apache.hadoop.yarn.api.protocolrecords.StartContainersRequest;
 import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@@ -41,8 +55,13 @@ import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
 import org.apache.hadoop.yarn.api.records.ContainerState;
 import org.apache.hadoop.yarn.api.records.ContainerStatus;
+import org.apache.hadoop.yarn.api.records.LocalResource;
+import org.apache.hadoop.yarn.api.records.LocalResourceType;
+import org.apache.hadoop.yarn.api.records.LocalResourceVisibility;
 import org.apache.hadoop.yarn.api.records.Priority;
 import org.apache.hadoop.yarn.api.records.Resource;
+import org.apache.hadoop.yarn.api.records.Token;
+import org.apache.hadoop.yarn.api.records.URL;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.exceptions.NMNotYetReadyException;
@@ -50,6 +69,8 @@ import org.apache.hadoop.yarn.exceptions.YarnException;
 import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
 import org.apache.hadoop.yarn.factories.RecordFactory;
 import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
+import org.apache.hadoop.yarn.security.ContainerTokenIdentifier;
+import org.apache.hadoop.yarn.security.NMTokenIdentifier;
 import org.apache.hadoop.yarn.server.api.ResourceTracker;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatRequest;
@@ -57,12 +78,15 @@ import org.apache.hadoop.yarn.server.api.protocolrecords.NodeHeartbeatResponse;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
 import org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerResponse;
 import org.apache.hadoop.yarn.server.api.records.NodeAction;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.BaseContainerManagerTest;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManagerImpl;
+import org.apache.hadoop.yarn.server.nodemanager.containermanager.TestContainerManager;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
 import org.apache.hadoop.yarn.server.security.ApplicationACLsManager;
 import org.apache.hadoop.yarn.server.utils.YarnServerBuilderUtils;
+import org.apache.hadoop.yarn.util.ConverterUtils;
 import org.junit.After;
 import org.junit.Assert;
 import org.junit.Before;
@@ -87,7 +111,10 @@ public class TestNodeManagerResync {
   private AtomicBoolean isNMShutdownCalled = new AtomicBoolean(false);
   private final NodeManagerEvent resyncEvent =
       new NodeManagerEvent(NodeManagerEventType.RESYNC);
+  private final long DUMMY_RM_IDENTIFIER = 1234;
 
+  protected static Log LOG = LogFactory
+      .getLog(TestNodeManagerResync.class);
 
   @Before
   public void setup() throws UnsupportedFileSystemException {
@@ -209,6 +236,32 @@ public class TestNodeManagerResync {
     nm.stop();
   }
 
+  @SuppressWarnings("unchecked")
+  @Test(timeout=60000)
+  public void testContainerResourceIncreaseIsSynchronizedWithRMResync()
+      throws IOException, InterruptedException, YarnException {
+    NodeManager nm = new TestNodeManager4();
+    YarnConfiguration conf = createNMConfig();
+    conf.setBoolean(
+        YarnConfiguration.RM_WORK_PRESERVING_RECOVERY_ENABLED, true);
+    nm.init(conf);
+    nm.start();
+    // Start a container and make sure it is in RUNNING state
+    ((TestNodeManager4)nm).startContainer();
+    // Simulate a container resource increase in a separate thread
+    ((TestNodeManager4)nm).increaseContainersResource();
+    // Simulate RM restart by sending a RESYNC event
+    LOG.info("Sending out RESYNC event");
+    nm.getNMDispatcher().getEventHandler().handle(
+        new NodeManagerEvent(NodeManagerEventType.RESYNC));
+    try {
+      syncBarrier.await();
+    } catch (BrokenBarrierException e) {
+      e.printStackTrace();
+    }
+    Assert.assertFalse(assertionFailedInThread.get());
+    nm.stop();
+  }
 
   // This is to test when NM gets the resync response from last heart beat, it
   // should be able to send the already-sent-via-last-heart-beat container
@@ -588,6 +641,211 @@ public class TestNodeManagerResync {
       }
     }}
 
+  class TestNodeManager4 extends NodeManager {
+
+    private Thread increaseContainerResourceThread = null;
+
+    @Override
+    protected NodeStatusUpdater createNodeStatusUpdater(Context context,
+        Dispatcher dispatcher, NodeHealthCheckerService healthChecker) {
+      return new TestNodeStatusUpdaterImpl4(context, dispatcher,
+          healthChecker, metrics);
+    }
+
+    @Override
+    protected ContainerManagerImpl createContainerManager(Context context,
+        ContainerExecutor exec, DeletionService del,
+        NodeStatusUpdater nodeStatusUpdater,
+        ApplicationACLsManager aclsManager,
+        LocalDirsHandlerService dirsHandler) {
+      return new ContainerManagerImpl(context, exec, del, nodeStatusUpdater,
+          metrics, aclsManager, dirsHandler){
+        @Override
+        public void
+        setBlockNewContainerRequests(boolean blockNewContainerRequests) {
+          // do nothing
+        }
+
+        @Override
+        protected void authorizeGetAndStopContainerRequest(
+            ContainerId containerId, Container container,
+            boolean stopRequest, NMTokenIdentifier identifier)
+            throws YarnException {
+          // do nothing
+        }
+        @Override
+        protected void authorizeUser(UserGroupInformation remoteUgi,
+            NMTokenIdentifier nmTokenIdentifier) {
+          // do nothing
+        }
+        @Override
+        protected void authorizeStartAndResourceIncreaseRequest(
+            NMTokenIdentifier nmTokenIdentifier,
+            ContainerTokenIdentifier containerTokenIdentifier,
+            boolean startRequest) throws YarnException {
+          try {
+            // Sleep 2 seconds to simulate a prolonged increase action.
+            // If during this time a RESYNC event is sent by RM, the
+            // resync action should block until the increase action is
+            // completed.
+            // See testContainerResourceIncreaseIsSynchronizedWithRMResync()
+            Thread.sleep(2000);
+          } catch (InterruptedException e) {
+            e.printStackTrace();
+          }
+        }
+        @Override
+        protected void updateNMTokenIdentifier(
+            NMTokenIdentifier nmTokenIdentifier)
+                throws SecretManager.InvalidToken {
+          // Do nothing
+        }
+        @Override
+        public Map<String, ByteBuffer> getAuxServiceMetaData() {
+          return new HashMap<>();
+        }
+        @Override
+        protected NMTokenIdentifier selectNMTokenIdentifier(
+            UserGroupInformation remoteUgi) {
+          return new NMTokenIdentifier();
+        }
+      };
+    }
+
+    // Start a container in NM
+    public void startContainer()
+        throws IOException, InterruptedException, YarnException {
+      LOG.info("Start a container and wait until it is in RUNNING state");
+      File scriptFile = Shell.appendScriptExtension(tmpDir, "scriptFile");
+      PrintWriter fileWriter = new PrintWriter(scriptFile);
+      if (Shell.WINDOWS) {
+        fileWriter.println("@ping -n 100 127.0.0.1 >nul");
+      } else {
+        fileWriter.write("\numask 0");
+        fileWriter.write("\nexec sleep 100");
+      }
+      fileWriter.close();
+      ContainerLaunchContext containerLaunchContext =
+          recordFactory.newRecordInstance(ContainerLaunchContext.class);
+      URL resource_alpha =
+          ConverterUtils.getYarnUrlFromPath(localFS
+              .makeQualified(new Path(scriptFile.getAbsolutePath())));
+      LocalResource rsrc_alpha =
+          recordFactory.newRecordInstance(LocalResource.class);
+      rsrc_alpha.setResource(resource_alpha);
+      rsrc_alpha.setSize(-1);
+      rsrc_alpha.setVisibility(LocalResourceVisibility.APPLICATION);
+      rsrc_alpha.setType(LocalResourceType.FILE);
+      rsrc_alpha.setTimestamp(scriptFile.lastModified());
+      String destinationFile = "dest_file";
+      Map<String, LocalResource> localResources =
+          new HashMap<String, LocalResource>();
+      localResources.put(destinationFile, rsrc_alpha);
+      containerLaunchContext.setLocalResources(localResources);
+      List<String> commands =
+          Arrays.asList(Shell.getRunScriptCommand(scriptFile));
+      containerLaunchContext.setCommands(commands);
+      Resource resource = Resource.newInstance(1024, 1);
+      StartContainerRequest scRequest =
+          StartContainerRequest.newInstance(
+              containerLaunchContext,
+              getContainerToken(resource));
+      List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
+      list.add(scRequest);
+      StartContainersRequest allRequests =
+          StartContainersRequest.newInstance(list);
+      getContainerManager().startContainers(allRequests);
+      // Make sure the container reaches RUNNING state
+      ContainerId cId = TestContainerManager.createContainerId(0);
+      BaseContainerManagerTest.waitForNMContainerState(
+          getContainerManager(), cId,
+          org.apache.hadoop.yarn.server.nodemanager.
+              containermanager.container.ContainerState.RUNNING);
+    }
+
+    // Increase container resource in a thread
+    public void increaseContainersResource()
+        throws InterruptedException {
+      LOG.info("Increase a container resource in a separate thread");
+      increaseContainerResourceThread = new IncreaseContainersResourceThread();
+      increaseContainerResourceThread.start();
+    }
+
+    class TestNodeStatusUpdaterImpl4 extends MockNodeStatusUpdater {
+
+      public TestNodeStatusUpdaterImpl4(Context context, Dispatcher dispatcher,
+          NodeHealthCheckerService healthChecker, NodeManagerMetrics metrics) {
+        super(context, dispatcher, healthChecker, metrics);
+      }
+
+      @Override
+      protected void rebootNodeStatusUpdaterAndRegisterWithRM() {
+        try {
+          try {
+            // Check status before registerWithRM
+            List<ContainerId> containerIds = new ArrayList<>();
+            ContainerId cId = TestContainerManager.createContainerId(0);
+            containerIds.add(cId);
+            GetContainerStatusesRequest gcsRequest =
+                GetContainerStatusesRequest.newInstance(containerIds);
+            ContainerStatus containerStatus = getContainerManager()
+                .getContainerStatuses(gcsRequest).getContainerStatuses().get(0);
+            assertEquals(Resource.newInstance(1024, 1),
+                containerStatus.getCapability());
+            // Call the actual rebootNodeStatusUpdaterAndRegisterWithRM().
+            // This function should be synchronized with
+            // increaseContainersResource().
+            super.rebootNodeStatusUpdaterAndRegisterWithRM();
+            // Check status after registerWithRM
+            containerStatus = getContainerManager()
+                .getContainerStatuses(gcsRequest).getContainerStatuses().get(0);
+            assertEquals(Resource.newInstance(4096, 2),
+                containerStatus.getCapability());
+          } catch (AssertionError ae) {
+            ae.printStackTrace();
+            assertionFailedInThread.set(true);
+          }   finally {
+            syncBarrier.await();
+          }
+        } catch (Exception e) {
+          e.printStackTrace();
+        }
+      }
+    }
+
+    class IncreaseContainersResourceThread extends Thread {
+      @Override
+      public void run() {
+        // Construct container resource increase request
+        List<Token> increaseTokens = new ArrayList<Token>();
+        // Add increase request.
+        Resource targetResource = Resource.newInstance(4096, 2);
+        try {
+          increaseTokens.add(getContainerToken(targetResource));
+          IncreaseContainersResourceRequest increaseRequest =
+              IncreaseContainersResourceRequest.newInstance(increaseTokens);
+          IncreaseContainersResourceResponse increaseResponse =
+              getContainerManager()
+                  .increaseContainersResource(increaseRequest);
+          Assert.assertEquals(
+              1, increaseResponse.getSuccessfullyIncreasedContainers()
+                  .size());
+          Assert.assertTrue(increaseResponse.getFailedRequests().isEmpty());
+        } catch (Exception e) {
+          e.printStackTrace();
+        }
+      }
+    }
+
+    private Token getContainerToken(Resource resource) throws IOException {
+      ContainerId cId = TestContainerManager.createContainerId(0);
+      return TestContainerManager.createContainerToken(
+          cId, DUMMY_RM_IDENTIFIER,
+          getNMContext().getNodeId(), user, resource,
+          getNMContext().getContainerTokenSecretManager(), null);
+    }
+  }
+
   public static NMContainerStatus createNMContainerStatus(int id,
       ContainerState containerState) {
     ApplicationId applicationId = ApplicationId.newInstance(0, 1);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/c75bb5ac/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
index 6a605c5..0856562 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/TestContainerManager.java
@@ -109,7 +109,7 @@ public class TestContainerManager extends BaseContainerManagerTest {
     super.setup();
   }
 
-  private ContainerId createContainerId(int id) {
+  public static ContainerId createContainerId(int id) {
     ApplicationId appId = ApplicationId.newInstance(0, 0);
     ApplicationAttemptId appAttemptId =
         ApplicationAttemptId.newInstance(appId, 1);

Reply via email to