Repository: hadoop
Updated Branches:
  refs/heads/trunk 88fba00ca -> 037d78348


YARN-7891. LogAggregationIndexedFileController should support read from HAR 
file. (Xuan Gong via wangda)

Change-Id: Ie16e34039d57df50128c73b37516ad0bc7c9590e


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/4d53ef7e
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/4d53ef7e
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/4d53ef7e

Branch: refs/heads/trunk
Commit: 4d53ef7eefb14661d824924e503a910de1ae997f
Parents: 88fba00
Author: Wangda Tan <wan...@apache.org>
Authored: Wed Mar 7 10:40:31 2018 -0800
Committer: Wangda Tan <wan...@apache.org>
Committed: Wed Mar 7 11:30:06 2018 -0800

----------------------------------------------------------------------
 .../hadoop-yarn/hadoop-yarn-common/pom.xml      |   4 +
 .../LogAggregationIndexedFileController.java    |  60 +++--
 .../TestLogAggregationIndexFileController.java  |  54 ++++
 .../application_123456_0001.har/_SUCCESS        |   0
 .../application_123456_0001.har/_index          |   3 +
 .../application_123456_0001.har/_masterindex    |   2 +
 .../application_123456_0001.har/part-0          | Bin 0 -> 4123 bytes
 .../RegisterNodeManagerRequest.java             |   5 +
 .../pb/RegisterNodeManagerRequestPBImpl.java    |  79 ++++++
 .../yarn_server_common_service_protos.proto     |   1 +
 .../hadoop/yarn/server/nodemanager/Context.java |   4 +-
 .../yarn/server/nodemanager/NodeManager.java    |  12 +
 .../nodemanager/NodeStatusUpdaterImpl.java      |  14 ++
 .../containermanager/ContainerManagerImpl.java  |  15 ++
 .../logaggregation/AppLogAggregatorImpl.java    |  11 +-
 .../tracker/NMLogAggregationStatusTracker.java  | 244 +++++++++++++++++++
 .../amrmproxy/BaseAMRMProxyTest.java            |   6 +
 .../TestNMLogAggregationStatusTracker.java      | 124 ++++++++++
 .../resourcemanager/ResourceTrackerService.java |  17 +-
 .../resourcemanager/rmnode/RMNodeImpl.java      |   6 +
 .../rmnode/RMNodeStartedEvent.java              |  11 +
 21 files changed, 646 insertions(+), 26 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d53ef7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml
index a235478..5378072 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/pom.xml
@@ -249,6 +249,10 @@
             
<exclude>src/test/resources/application_1440536969523_0001.har/part-0</exclude>
             
<exclude>src/test/resources/application_1440536969523_0001.har/_masterindex</exclude>
             
<exclude>src/test/resources/application_1440536969523_0001.har/_SUCCESS</exclude>
+            
<exclude>src/test/resources/application_123456_0001.har/_index</exclude>
+            
<exclude>src/test/resources/application_123456_0001.har/part-0</exclude>
+            
<exclude>src/test/resources/application_123456_0001.har/_masterindex</exclude>
+            
<exclude>src/test/resources/application_123456_0001.har/_SUCCESS</exclude>
           </excludes>
         </configuration>
       </plugin>

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d53ef7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java
index 56bae26..5bba2e0 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/LogAggregationIndexedFileController.java
@@ -495,16 +495,21 @@ public class LogAggregationIndexedFileController
     boolean getAllContainers = (containerIdStr == null
         || containerIdStr.isEmpty());
     long size = logRequest.getBytes();
-    List<FileStatus> nodeFiles = LogAggregationUtils
-        .getRemoteNodeFileList(conf, appId, logRequest.getAppOwner(),
+    RemoteIterator<FileStatus> nodeFiles = LogAggregationUtils
+        .getRemoteNodeFileDir(conf, appId, logRequest.getAppOwner(),
         this.remoteRootLogDir, this.remoteRootLogDirSuffix);
-    if (nodeFiles.isEmpty()) {
+    if (!nodeFiles.hasNext()) {
       throw new IOException("There is no available log fils for "
           + "application:" + appId);
     }
-    Map<String, Long> checkSumFiles = parseCheckSumFiles(nodeFiles);
+    List<FileStatus> allFiles = getAllNodeFiles(nodeFiles, appId);
+    if (allFiles.isEmpty()) {
+      throw new IOException("There is no available log fils for "
+          + "application:" + appId);
+    }
+    Map<String, Long> checkSumFiles = parseCheckSumFiles(allFiles);
     List<FileStatus> fileToRead = getNodeLogFileToRead(
-        nodeFiles, nodeIdStr, appId);
+        allFiles, nodeIdStr, appId);
     byte[] buf = new byte[65535];
     for (FileStatus thisNodeFile : fileToRead) {
       String nodeName = thisNodeFile.getPath().getName();
@@ -609,16 +614,21 @@ public class LogAggregationIndexedFileController
         containerIdStr.isEmpty());
     String nodeIdStr = (nodeId == null || nodeId.isEmpty()) ? null
         : LogAggregationUtils.getNodeString(nodeId);
-    List<FileStatus> nodeFiles = LogAggregationUtils
-        .getRemoteNodeFileList(conf, appId, appOwner, this.remoteRootLogDir,
+    RemoteIterator<FileStatus> nodeFiles = LogAggregationUtils
+        .getRemoteNodeFileDir(conf, appId, appOwner, this.remoteRootLogDir,
         this.remoteRootLogDirSuffix);
-    if (nodeFiles.isEmpty()) {
+    if (!nodeFiles.hasNext()) {
       throw new IOException("There is no available log fils for "
           + "application:" + appId);
     }
-    Map<String, Long> checkSumFiles = parseCheckSumFiles(nodeFiles);
+    List<FileStatus> allFiles = getAllNodeFiles(nodeFiles, appId);
+    if (allFiles.isEmpty()) {
+      throw new IOException("There is no available log fils for "
+          + "application:" + appId);
+    }
+    Map<String, Long> checkSumFiles = parseCheckSumFiles(allFiles);
     List<FileStatus> fileToRead = getNodeLogFileToRead(
-        nodeFiles, nodeIdStr, appId);
+        allFiles, nodeIdStr, appId);
     for(FileStatus thisNodeFile : fileToRead) {
       try {
         Long checkSumIndex = checkSumFiles.get(
@@ -727,21 +737,33 @@ public class LogAggregationIndexedFileController
       List<FileStatus> nodeFiles, String nodeId, ApplicationId appId)
       throws IOException {
     List<FileStatus> listOfFiles = new ArrayList<>();
-    List<FileStatus> files = new ArrayList<>(nodeFiles);
-    for (FileStatus file : files) {
-      String nodeName = file.getPath().getName();
+    for (FileStatus thisNodeFile : nodeFiles) {
+      String nodeName = thisNodeFile.getPath().getName();
       if ((nodeId == null || nodeId.isEmpty()
           || nodeName.contains(LogAggregationUtils
           .getNodeString(nodeId))) && !nodeName.endsWith(
               LogAggregationUtils.TMP_FILE_SUFFIX) &&
           !nodeName.endsWith(CHECK_SUM_FILE_SUFFIX)) {
-        if (nodeName.equals(appId + ".har")) {
-          Path p = new Path("har:///" + file.getPath().toUri().getRawPath());
-          files = Arrays.asList(HarFs.get(p.toUri(), conf).listStatus(p));
-          continue;
-        }
-        listOfFiles.add(file);
+        listOfFiles.add(thisNodeFile);
+      }
+    }
+    return listOfFiles;
+  }
+
+  private List<FileStatus> getAllNodeFiles(
+      RemoteIterator<FileStatus> nodeFiles, ApplicationId appId)
+      throws IOException {
+    List<FileStatus> listOfFiles = new ArrayList<>();
+    while (nodeFiles != null && nodeFiles.hasNext()) {
+      FileStatus thisNodeFile = nodeFiles.next();
+      String nodeName = thisNodeFile.getPath().getName();
+      if (nodeName.equals(appId + ".har")) {
+        Path p = new Path("har:///"
+            + thisNodeFile.getPath().toUri().getRawPath());
+        nodeFiles = HarFs.get(p.toUri(), conf).listStatusIterator(p);
+        continue;
       }
+      listOfFiles.add(thisNodeFile);
     }
     return listOfFiles;
   }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d53ef7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/TestLogAggregationIndexFileController.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/TestLogAggregationIndexFileController.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/TestLogAggregationIndexFileController.java
index 9c02c1b..7922679 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/TestLogAggregationIndexFileController.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/filecontroller/ifile/TestLogAggregationIndexFileController.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.yarn.logaggregation.filecontroller.ifile;
 
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertNotNull;
 import static org.mockito.Mockito.mock;
 import static org.mockito.Mockito.when;
 import java.io.ByteArrayOutputStream;
@@ -27,6 +28,7 @@ import java.io.FileWriter;
 import java.io.IOException;
 import java.io.PrintStream;
 import java.io.Writer;
+import java.net.URL;
 import java.nio.charset.Charset;
 import java.util.ArrayList;
 import java.util.HashMap;
@@ -364,6 +366,58 @@ public class TestLogAggregationIndexFileController {
     sysOutStream.reset();
   }
 
+  @Test(timeout = 15000)
+  public void testFetchApplictionLogsHar() throws Exception {
+    List<String> newLogTypes = new ArrayList<>();
+    newLogTypes.add("syslog");
+    newLogTypes.add("stdout");
+    newLogTypes.add("stderr");
+    newLogTypes.add("test1");
+    newLogTypes.add("test2");
+    URL harUrl = ClassLoader.getSystemClassLoader()
+        .getResource("application_123456_0001.har");
+    assertNotNull(harUrl);
+
+    Path path = new Path(remoteLogDir + "/" + USER_UGI.getShortUserName()
+        + "/logs/application_123456_0001");
+    if (fs.exists(path)) {
+      fs.delete(path, true);
+    }
+    assertTrue(fs.mkdirs(path));
+    Path harPath = new Path(path, "application_123456_0001.har");
+    fs.copyFromLocalFile(false, new Path(harUrl.toURI()), harPath);
+    assertTrue(fs.exists(harPath));
+    LogAggregationIndexedFileController fileFormat
+        = new LogAggregationIndexedFileController();
+    fileFormat.initialize(conf, "Indexed");
+    ContainerLogsRequest logRequest = new ContainerLogsRequest();
+    logRequest.setAppId(appId);
+    logRequest.setNodeId(nodeId.toString());
+    logRequest.setAppOwner(USER_UGI.getShortUserName());
+    logRequest.setContainerId(containerId.toString());
+    logRequest.setBytes(Long.MAX_VALUE);
+    List<ContainerLogMeta> meta = fileFormat.readAggregatedLogsMeta(
+        logRequest);
+    Assert.assertEquals(meta.size(), 3);
+    List<String> fileNames = new ArrayList<>();
+    for (ContainerLogMeta log : meta) {
+      Assert.assertTrue(log.getContainerId().equals(containerId.toString()));
+      Assert.assertTrue(log.getNodeId().equals(nodeId.toString()));
+      for (ContainerLogFileInfo file : log.getContainerLogMeta()) {
+        fileNames.add(file.getFileName());
+      }
+    }
+    fileNames.removeAll(newLogTypes);
+    Assert.assertTrue(fileNames.isEmpty());
+    boolean foundLogs = fileFormat.readAggregatedLogs(logRequest, System.out);
+    Assert.assertTrue(foundLogs);
+    for (String logType : newLogTypes) {
+      Assert.assertTrue(sysOutStream.toString().contains(logMessage(
+          containerId, logType)));
+    }
+    sysOutStream.reset();
+  }
+
   private File createAndWriteLocalLogFile(ContainerId containerId,
       Path localLogDir, String logType) throws IOException {
     File file = new File(localLogDir.toString(), logType);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d53ef7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/application_123456_0001.har/_SUCCESS
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/application_123456_0001.har/_SUCCESS
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/application_123456_0001.har/_SUCCESS
new file mode 100644
index 0000000..e69de29

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d53ef7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/application_123456_0001.har/_index
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/application_123456_0001.har/_index
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/application_123456_0001.har/_index
new file mode 100644
index 0000000..b042846
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/application_123456_0001.har/_index
@@ -0,0 +1,3 @@
+%2F dir 1517728311922+493+xuan+supergroup 0 0 localhost_9999_1517727665265 
localhost_9999_1517727668513 
+%2Flocalhost_9999_1517727665265 file part-0 0 2895 
1517728301581+420+xuan+supergroup 
+%2Flocalhost_9999_1517727668513 file part-0 2895 1228 
1517728311919+420+xuan+supergroup 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d53ef7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/application_123456_0001.har/_masterindex
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/application_123456_0001.har/_masterindex
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/application_123456_0001.har/_masterindex
new file mode 100644
index 0000000..cda8cbd
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/application_123456_0001.har/_masterindex
@@ -0,0 +1,2 @@
+3 
+0 1897968749 0 280 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d53ef7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/application_123456_0001.har/part-0
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/application_123456_0001.har/part-0
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/application_123456_0001.har/part-0
new file mode 100644
index 0000000..aea95fa
Binary files /dev/null and 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/resources/application_123456_0001.har/part-0
 differ

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d53ef7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java
index fc30a80..ff50330 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/RegisterNodeManagerRequest.java
@@ -112,4 +112,9 @@ public abstract class RegisterNodeManagerRequest {
    * @param physicalResource Physical resources in the node.
    */
   public abstract void setPhysicalResource(Resource physicalResource);
+
+  public abstract List<LogAggregationReport> getLogAggregationReportsForApps();
+
+  public abstract void setLogAggregationReportsForApps(
+      List<LogAggregationReport> logAggregationReportsForApps);
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d53ef7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java
index eda06d0..f1d7339 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/api/protocolrecords/impl/pb/RegisterNodeManagerRequestPBImpl.java
@@ -38,11 +38,13 @@ import 
org.apache.hadoop.yarn.proto.YarnProtos.ApplicationIdProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.NodeIdProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.NodeLabelProto;
 import org.apache.hadoop.yarn.proto.YarnProtos.ResourceProto;
+import 
org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.LogAggregationReportProto;
 import 
org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NMContainerStatusProto;
 import 
org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto;
 import 
org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.NodeLabelsProto.Builder;
 import 
org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProto;
 import 
org.apache.hadoop.yarn.proto.YarnServerCommonServiceProtos.RegisterNodeManagerRequestProtoOrBuilder;
+import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 import 
org.apache.hadoop.yarn.server.api.protocolrecords.RegisterNodeManagerRequest;
     
@@ -57,6 +59,8 @@ public class RegisterNodeManagerRequestPBImpl extends 
RegisterNodeManagerRequest
   private List<ApplicationId> runningApplications = null;
   private Set<NodeLabel> labels = null;
 
+  private List<LogAggregationReport> logAggregationReportsForApps = null;
+
   /** Physical resources in the node. */
   private Resource physicalResource = null;
 
@@ -100,6 +104,48 @@ public class RegisterNodeManagerRequestPBImpl extends 
RegisterNodeManagerRequest
     if (this.physicalResource != null) {
       builder.setPhysicalResource(convertToProtoFormat(this.physicalResource));
     }
+    if (this.logAggregationReportsForApps != null) {
+        addLogAggregationStatusForAppsToProto();
+    }
+  }
+
+  private void addLogAggregationStatusForAppsToProto() {
+    maybeInitBuilder();
+    builder.clearLogAggregationReportsForApps();
+    if (this.logAggregationReportsForApps == null) {
+      return;
+    }
+    Iterable<LogAggregationReportProto> it =
+        new Iterable<LogAggregationReportProto>() {
+          @Override
+          public Iterator<LogAggregationReportProto> iterator() {
+            return new Iterator<LogAggregationReportProto>() {
+              private Iterator<LogAggregationReport> iter =
+                  logAggregationReportsForApps.iterator();
+
+              @Override
+              public boolean hasNext() {
+                return iter.hasNext();
+              }
+
+              @Override
+              public LogAggregationReportProto next() {
+                return convertToProtoFormat(iter.next());
+              }
+
+              @Override
+              public void remove() {
+                throw new UnsupportedOperationException();
+              }
+            };
+          }
+        };
+    builder.addAllLogAggregationReportsForApps(it);
+  }
+
+  private LogAggregationReportProto convertToProtoFormat(
+      LogAggregationReport value) {
+    return ((LogAggregationReportPBImpl) value).getProto();
   }
 
   private synchronized void addNMContainerStatusesToProto() {
@@ -400,4 +446,37 @@ public class RegisterNodeManagerRequestPBImpl extends 
RegisterNodeManagerRequest
       NMContainerStatus c) {
     return ((NMContainerStatusPBImpl)c).getProto();
   }
+
+  @Override
+  public List<LogAggregationReport> getLogAggregationReportsForApps() {
+    if (this.logAggregationReportsForApps != null) {
+      return this.logAggregationReportsForApps;
+    }
+    initLogAggregationReportsForApps();
+    return logAggregationReportsForApps;
+  }
+
+  private void initLogAggregationReportsForApps() {
+    RegisterNodeManagerRequestProtoOrBuilder p = viaProto ? proto : builder;
+    List<LogAggregationReportProto> list =
+        p.getLogAggregationReportsForAppsList();
+    this.logAggregationReportsForApps = new ArrayList<LogAggregationReport>();
+    for (LogAggregationReportProto c : list) {
+      this.logAggregationReportsForApps.add(convertFromProtoFormat(c));
+    }
+  }
+
+  private LogAggregationReport convertFromProtoFormat(
+      LogAggregationReportProto logAggregationReport) {
+    return new LogAggregationReportPBImpl(logAggregationReport);
+  }
+
+  @Override
+  public void setLogAggregationReportsForApps(
+      List<LogAggregationReport> logAggregationStatusForApps) {
+    if(logAggregationStatusForApps == null) {
+      builder.clearLogAggregationReportsForApps();
+    }
+    this.logAggregationReportsForApps = logAggregationStatusForApps;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d53ef7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
index e782cc2..1b090bf 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/proto/yarn_server_common_service_protos.proto
@@ -66,6 +66,7 @@ message RegisterNodeManagerRequestProto {
   repeated ApplicationIdProto runningApplications = 7;
   optional NodeLabelsProto nodeLabels = 8;
   optional ResourceProto physicalResource = 9;
+  repeated LogAggregationReportProto log_aggregation_reports_for_apps = 10;
 }
 
 message RegisterNodeManagerResponseProto {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d53ef7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
index d7e3b52..d3dd2b9 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/Context.java
@@ -33,7 +33,7 @@ import 
org.apache.hadoop.yarn.server.api.records.NodeHealthStatus;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
-
+import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.tracker.NMLogAggregationStatusTracker;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePluginManager;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
 import org.apache.hadoop.yarn.server.nodemanager.recovery.NMStateStoreService;
@@ -121,6 +121,8 @@ public interface Context {
 
   NMTimelinePublisher getNMTimelinePublisher();
 
+  NMLogAggregationStatusTracker getNMLogAggregationStatusTracker();
+
   ContainerExecutor getContainerExecutor();
 
   ContainerStateTransitionListener getContainerStateTransitionListener();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d53ef7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
index 42b7b5f..d5b8fd3 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java
@@ -58,6 +58,7 @@ import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Cont
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerEvent;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerImpl;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
+import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.tracker.NMLogAggregationStatusTracker;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePluginManager;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
 import 
org.apache.hadoop.yarn.server.nodemanager.nodelabels.ConfigurationNodeLabelsProvider;
@@ -621,6 +622,8 @@ public class NodeManager extends CompositeService
 
     private ResourcePluginManager resourcePluginManager;
 
+    private NMLogAggregationStatusTracker nmLogAggregationStatusTracker;
+
     public NMContext(NMContainerTokenSecretManager containerTokenSecretManager,
         NMTokenSecretManagerInNM nmTokenSecretManager,
         LocalDirsHandlerService dirsHandler, ApplicationACLsManager 
aclsManager,
@@ -862,6 +865,15 @@ public class NodeManager extends CompositeService
     public void setDeletionService(DeletionService deletionService) {
       this.deletionService = deletionService;
     }
+
+    public void setNMLogAggregationStatusTracker(
+        NMLogAggregationStatusTracker nmLogAggregationStatusTracker) {
+      this.nmLogAggregationStatusTracker = nmLogAggregationStatusTracker;
+    }
+    @Override
+    public NMLogAggregationStatusTracker getNMLogAggregationStatusTracker() {
+      return nmLogAggregationStatusTracker;
+    }
   }
 
   /**

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d53ef7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
index 3d3f573..8154723 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeStatusUpdaterImpl.java
@@ -381,6 +381,20 @@ public class NodeStatusUpdaterImpl extends AbstractService 
implements
       if (containerReports != null) {
         LOG.info("Registering with RM using containers :" + containerReports);
       }
+      if (logAggregationEnabled) {
+        // pull log aggregation status for application running in this NM
+        List<LogAggregationReport> logAggregationReports =
+            context.getNMLogAggregationStatusTracker()
+                .pullCachedLogAggregationReports();
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("The cache log aggregation status size:"
+              + logAggregationReports.size());
+        }
+        if (logAggregationReports != null
+            && !logAggregationReports.isEmpty()) {
+          request.setLogAggregationReportsForApps(logAggregationReports);
+        }
+      }
       regNMResponse =
           resourceTracker.registerNodeManager(request);
       // Make sure rmIdentifier is set before we release the lock

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d53ef7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
index 6b4d517..0b2fca1 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/ContainerManagerImpl.java
@@ -109,6 +109,7 @@ import 
org.apache.hadoop.yarn.server.nodemanager.DeletionService;
 import org.apache.hadoop.yarn.server.nodemanager.LocalDirsHandlerService;
 import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger;
 import org.apache.hadoop.yarn.server.nodemanager.NMAuditLogger.AuditConstants;
+import org.apache.hadoop.yarn.server.nodemanager.NodeManager.NMContext;
 import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
 import org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
 import org.apache.hadoop.yarn.server.nodemanager.amrmproxy.AMRMProxyService;
@@ -138,6 +139,7 @@ import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.even
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadEventType;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.localizer.sharedcache.SharedCacheUploadService;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.LogAggregationService;
+import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.tracker.NMLogAggregationStatusTracker;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.LogHandler;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.NonAggregatingLogHandler;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.loghandler.event.LogHandlerEventType;
@@ -226,6 +228,8 @@ public class ContainerManagerImpl extends CompositeService 
implements
   // NM metrics publisher is set only if the timeline service v.2 is enabled
   private NMTimelinePublisher nmMetricsPublisher;
 
+  private NMLogAggregationStatusTracker nmLogAggregationStatusTracker;
+
   public ContainerManagerImpl(Context context, ContainerExecutor exec,
       DeletionService deletionContext, NodeStatusUpdater nodeStatusUpdater,
       NodeManagerMetrics metrics, LocalDirsHandlerService dirsHandler) {
@@ -283,6 +287,10 @@ public class ContainerManagerImpl extends CompositeService 
implements
 
     addService(dispatcher);
 
+    this.nmLogAggregationStatusTracker = createNMLogAggregationStatusTracker(
+        context);
+    ((NMContext)context).setNMLogAggregationStatusTracker(
+        this.nmLogAggregationStatusTracker);
     ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
     this.readLock = lock.readLock();
     this.writeLock = lock.writeLock();
@@ -558,6 +566,11 @@ public class ContainerManagerImpl extends CompositeService 
implements
     return nmTimelinePublisherLocal;
   }
 
+  protected NMLogAggregationStatusTracker createNMLogAggregationStatusTracker(
+      Context ctxt) {
+    return new NMLogAggregationStatusTracker(ctxt);
+  }
+
   protected ContainersLauncher createContainersLauncher(Context context,
       ContainerExecutor exec) {
     return new ContainersLauncher(context, this.dispatcher, exec, dirsHandler, 
this);
@@ -653,6 +666,7 @@ public class ContainerManagerImpl extends CompositeService 
implements
       }
     }
 
+    this.nmLogAggregationStatusTracker.start();
     LOG.info("ContainerManager started at " + connectAddress);
     LOG.info("ContainerManager bound to " + initialAddress);
   }
@@ -691,6 +705,7 @@ public class ContainerManagerImpl extends CompositeService 
implements
       server.stop();
     }
     super.serviceStop();
+    this.nmLogAggregationStatusTracker.stop();
   }
 
   public void cleanUpApplicationsOnNMShutDown() {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d53ef7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
index 4ac150a..c7e06ff 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
@@ -385,7 +385,8 @@ public class AppLogAggregatorImpl implements 
AppLogAggregator {
         logAggregationSucceedInThisCycle
             ? LogAggregationStatus.RUNNING
             : LogAggregationStatus.RUNNING_WITH_FAILURE;
-    sendLogAggregationReportInternal(logAggregationStatus, diagnosticMessage);
+    sendLogAggregationReportInternal(logAggregationStatus, diagnosticMessage,
+        false);
     if (appFinished) {
       // If the app is finished, one extra final report with log aggregation
       // status SUCCEEDED/FAILED will be sent to RM to inform the RM
@@ -394,18 +395,22 @@ public class AppLogAggregatorImpl implements 
AppLogAggregator {
           renameTemporaryLogFileFailed || !logAggregationSucceedInThisCycle
               ? LogAggregationStatus.FAILED
               : LogAggregationStatus.SUCCEEDED;
-      sendLogAggregationReportInternal(finalLogAggregationStatus, "");
+      sendLogAggregationReportInternal(finalLogAggregationStatus, "", true);
     }
   }
 
   private void sendLogAggregationReportInternal(
-      LogAggregationStatus logAggregationStatus, String diagnosticMessage) {
+      LogAggregationStatus logAggregationStatus, String diagnosticMessage,
+      boolean finalized) {
     LogAggregationReport report =
         Records.newRecord(LogAggregationReport.class);
     report.setApplicationId(appId);
     report.setDiagnosticMessage(diagnosticMessage);
     report.setLogAggregationStatus(logAggregationStatus);
     this.context.getLogAggregationStatusForApps().add(report);
+    this.context.getNMLogAggregationStatusTracker().updateLogAggregationStatus(
+        appId, logAggregationStatus, System.currentTimeMillis(),
+        diagnosticMessage, finalized);
   }
 
   @SuppressWarnings("unchecked")

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d53ef7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/tracker/NMLogAggregationStatusTracker.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/tracker/NMLogAggregationStatusTracker.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/tracker/NMLogAggregationStatusTracker.java
new file mode 100644
index 0000000..6d785d9
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/tracker/NMLogAggregationStatusTracker.java
@@ -0,0 +1,244 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.tracker;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Timer;
+import java.util.TimerTask;
+import java.util.Map.Entry;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock;
+import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock;
+import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class NMLogAggregationStatusTracker {
+
+  private static final Logger LOG =
+       LoggerFactory.getLogger(NMLogAggregationStatusTracker.class);
+
+  private final ReadLock updateLocker;
+  private final WriteLock pullLocker;
+  private final Context nmContext;
+  private final long rollingInterval;
+  private final Timer timer;
+  private final Map<ApplicationId, LogAggregationTrakcer> trackers;
+  private boolean disabled = false;
+
+  public NMLogAggregationStatusTracker(Context context) {
+    this.nmContext = context;
+    Configuration conf = context.getConf();
+    if (!conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
+        YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)) {
+      disabled = true;
+    }
+    this.trackers = new HashMap<>();
+    ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    this.updateLocker = lock.readLock();
+    this.pullLocker = lock.writeLock();
+    this.timer = new Timer();
+    this.rollingInterval = conf.getLong(
+        YarnConfiguration.LOG_AGGREGATION_STATUS_TIME_OUT_MS,
+        YarnConfiguration.DEFAULT_LOG_AGGREGATION_STATUS_TIME_OUT_MS);
+    LOG.info("the rolling interval seconds for the NodeManager Cached Log "
+        + "aggregation status is " + (rollingInterval/1000));
+  }
+
+  public void start() {
+    if (disabled) {
+      LOG.warn("Log Aggregation is disabled."
+          + "So is the LogAggregationStatusTracker.");
+    } else {
+      this.timer.scheduleAtFixedRate(new LogAggregationStatusRoller(),
+          rollingInterval, rollingInterval);
+    }
+  }
+
+  public void stop() {
+    this.timer.cancel();
+  }
+
+  public void updateLogAggregationStatus(ApplicationId appId,
+      LogAggregationStatus logAggregationStatus, long updateTime,
+      String diagnosis, boolean finalized) {
+    if (disabled) {
+      LOG.warn("The log aggregation is diabled. No need to update "
+          + "the log aggregation status");
+    }
+    this.updateLocker.lock();
+    try {
+      LogAggregationTrakcer tracker = trackers.get(appId);
+      if (tracker == null) {
+        Application application = this.nmContext.getApplications().get(appId);
+        if (application == null) {
+          // the application has already finished or
+          // this application is unknown application.
+          // Check the log aggregation status update time, if the update time 
is
+          // still in the period of timeout, we add it to the trackers map.
+          // Otherwise, we ignore it.
+          long currentTime = System.currentTimeMillis();
+          if (currentTime - updateTime > rollingInterval) {
+            LOG.warn("Ignore the log aggregation status update request "
+                + "for the application:" + appId + ". The log aggregation 
status"
+                + " update time is " + updateTime + " while the request 
process "
+                + "time is " + currentTime + ".");
+            return;
+          }
+        }
+        LogAggregationTrakcer newTracker = new LogAggregationTrakcer(
+            logAggregationStatus, diagnosis);
+        newTracker.setLastModifiedTime(updateTime);
+        newTracker.setFinalized(finalized);
+        trackers.put(appId, newTracker);
+      } else {
+        if (tracker.isFinalized()) {
+          LOG.warn("Ignore the log aggregation status update request "
+              + "for the application:" + appId + ". The cached log aggregation 
"
+              + "status is " + tracker.getLogAggregationStatus() + ".");
+        } else {
+          if (tracker.getLastModifiedTime() > updateTime) {
+            LOG.warn("Ignore the log aggregation status update request "
+                + "for the application:" + appId + ". The request log "
+                + "aggregation status update is older than the cached "
+                + "log aggregation status.");
+          } else {
+            tracker.setLogAggregationStatus(logAggregationStatus);
+            tracker.setDiagnosis(diagnosis);
+            tracker.setLastModifiedTime(updateTime);
+            tracker.setFinalized(finalized);
+            trackers.put(appId, tracker);
+          }
+        }
+      }
+    } finally {
+      this.updateLocker.unlock();
+    }
+  }
+
+  public List<LogAggregationReport> pullCachedLogAggregationReports() {
+    List<LogAggregationReport> reports = new ArrayList<>();
+    if (disabled) {
+      LOG.warn("The log aggregation is diabled."
+          + "There is no cached log aggregation status.");
+      return reports;
+    }
+    this.pullLocker.lock();
+    try {
+      for(Entry<ApplicationId, LogAggregationTrakcer> tracker :
+          trackers.entrySet()) {
+        LogAggregationTrakcer current = tracker.getValue();
+        LogAggregationReport report = LogAggregationReport.newInstance(
+            tracker.getKey(), current.getLogAggregationStatus(),
+            current.getDiagnosis());
+        reports.add(report);
+      }
+      return reports;
+    } finally {
+      this.pullLocker.unlock();
+    }
+  }
+
+  private class LogAggregationStatusRoller extends TimerTask {
+    @Override
+    public void run() {
+      rollLogAggregationStatus();
+    }
+  }
+
+  @Private
+  void rollLogAggregationStatus() {
+    this.pullLocker.lock();
+    try {
+      long currentTimeStamp = System.currentTimeMillis();
+      LOG.info("Rolling over the cached log aggregation status.");
+      Iterator<Entry<ApplicationId, LogAggregationTrakcer>> it = trackers
+          .entrySet().iterator();
+      while (it.hasNext()) {
+        Entry<ApplicationId, LogAggregationTrakcer> tracker = it.next(); 
+        // the application has finished.
+        if (nmContext.getApplications().get(tracker.getKey()) == null) {
+          if (currentTimeStamp - tracker.getValue().getLastModifiedTime()
+              > rollingInterval) {
+            it.remove();
+          }
+        }
+      }
+    } finally {
+      this.pullLocker.unlock();
+    }
+  }
+
+  private static class LogAggregationTrakcer {
+    private LogAggregationStatus logAggregationStatus;
+    private long lastModifiedTime;
+    private boolean finalized;
+    private String diagnosis;
+
+    public LogAggregationTrakcer(
+        LogAggregationStatus logAggregationStatus, String diagnosis) {
+      this.setLogAggregationStatus(logAggregationStatus);
+      this.setDiagnosis(diagnosis);
+    }
+
+    public LogAggregationStatus getLogAggregationStatus() {
+      return logAggregationStatus;
+    }
+
+    public void setLogAggregationStatus(
+        LogAggregationStatus logAggregationStatus) {
+      this.logAggregationStatus = logAggregationStatus;
+    }
+
+    public long getLastModifiedTime() {
+      return lastModifiedTime;
+    }
+
+    public void setLastModifiedTime(long lastModifiedTime) {
+      this.lastModifiedTime = lastModifiedTime;
+    }
+
+    public boolean isFinalized() {
+      return finalized;
+    }
+
+    public void setFinalized(boolean finalized) {
+      this.finalized = finalized;
+    }
+
+    public String getDiagnosis() {
+      return diagnosis;
+    }
+
+    public void setDiagnosis(String diagnosis) {
+      this.diagnosis = diagnosis;
+    }
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d53ef7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
index 9602142..4ac268b 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/amrmproxy/BaseAMRMProxyTest.java
@@ -55,6 +55,7 @@ import 
org.apache.hadoop.yarn.server.nodemanager.NodeStatusUpdater;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.ContainerManager;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
+import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.tracker.NMLogAggregationStatusTracker;
 import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.resourceplugin.ResourcePluginManager;
 import org.apache.hadoop.yarn.server.nodemanager.metrics.NodeManagerMetrics;
 import 
org.apache.hadoop.yarn.server.nodemanager.recovery.NMMemoryStateStoreService;
@@ -814,5 +815,10 @@ public abstract class BaseAMRMProxyTest {
     public DeletionService getDeletionService() {
       return null;
     }
+
+    @Override
+    public NMLogAggregationStatusTracker getNMLogAggregationStatusTracker() {
+      return null;
+    }
   }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d53ef7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/tracker/TestNMLogAggregationStatusTracker.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/tracker/TestNMLogAggregationStatusTracker.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/tracker/TestNMLogAggregationStatusTracker.java
new file mode 100644
index 0000000..e51bac1
--- /dev/null
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/tracker/TestNMLogAggregationStatusTracker.java
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package 
org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregation.tracker;
+
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
+
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ConcurrentMap;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.LogAggregationStatus;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
+import org.apache.hadoop.yarn.server.nodemanager.Context;
+import 
org.apache.hadoop.yarn.server.nodemanager.containermanager.application.Application;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestNMLogAggregationStatusTracker {
+
+  @Test
+  public void testNMLogAggregationStatusUpdate() {
+    Context mockContext = mock(Context.class);
+    ConcurrentMap<ApplicationId, Application> apps = new ConcurrentHashMap<>();
+    when(mockContext.getApplications()).thenReturn(apps);
+    // the log aggregation is disabled.
+    Configuration conf = new YarnConfiguration();
+    conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, false);
+    when(mockContext.getConf()).thenReturn(conf);
+    NMLogAggregationStatusTracker tracker = new NMLogAggregationStatusTracker(
+        mockContext);
+    ApplicationId appId0 = ApplicationId.newInstance(0, 0);
+    tracker.updateLogAggregationStatus(appId0,
+        LogAggregationStatus.RUNNING, System.currentTimeMillis(), "", false);
+    List<LogAggregationReport> reports = tracker
+        .pullCachedLogAggregationReports();
+    // we can not get any cached log aggregation status because
+    // the log aggregation is disabled.
+    Assert.assertTrue(reports.isEmpty());
+
+    // enable the log aggregation.
+    conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
+    when(mockContext.getConf()).thenReturn(conf);
+    tracker = new NMLogAggregationStatusTracker(mockContext);
+    // update the log aggregation status for an un-existed application
+    // the update time is not in the period of timeout.
+    // So, we should not cache the log application status for this
+    // application.
+    appId0 = ApplicationId.newInstance(0, 0);
+    tracker.updateLogAggregationStatus(appId0,
+        LogAggregationStatus.RUNNING,
+        System.currentTimeMillis() - 15 * 60 * 1000, "", false);
+    reports = tracker
+        .pullCachedLogAggregationReports();
+    Assert.assertTrue(reports.isEmpty());
+
+    tracker.updateLogAggregationStatus(appId0,
+        LogAggregationStatus.RUNNING,
+        System.currentTimeMillis() - 60 * 1000, "", false);
+    reports = tracker
+        .pullCachedLogAggregationReports();
+    Assert.assertTrue(reports.size() == 1);
+    Assert.assertTrue(reports.get(0).getLogAggregationStatus()
+        == LogAggregationStatus.RUNNING);
+
+    tracker.updateLogAggregationStatus(appId0,
+        LogAggregationStatus.SUCCEEDED,
+        System.currentTimeMillis() - 1 * 60 * 1000, "", true);
+    reports = tracker
+        .pullCachedLogAggregationReports();
+    Assert.assertTrue(reports.size() == 1);
+    Assert.assertTrue(reports.get(0).getLogAggregationStatus()
+        == LogAggregationStatus.SUCCEEDED);
+
+    // the log aggregation status is finalized. So, we would
+    // ingore the following update
+    tracker.updateLogAggregationStatus(appId0,
+        LogAggregationStatus.FAILED,
+        System.currentTimeMillis() - 1 * 60 * 1000, "", true);
+    reports = tracker
+        .pullCachedLogAggregationReports();
+    Assert.assertTrue(reports.size() == 1);
+    Assert.assertTrue(reports.get(0).getLogAggregationStatus()
+        == LogAggregationStatus.SUCCEEDED);
+  }
+
+  public void testLogAggregationStatusRoller() throws InterruptedException {
+    Context mockContext = mock(Context.class);
+    Configuration conf = new YarnConfiguration();
+    conf.setLong(YarnConfiguration.LOG_AGGREGATION_STATUS_TIME_OUT_MS,
+        10 * 1000);
+    when(mockContext.getConf()).thenReturn(conf);
+    NMLogAggregationStatusTracker tracker = new NMLogAggregationStatusTracker(
+        mockContext);
+    ApplicationId appId0 = ApplicationId.newInstance(0, 0);
+    tracker.updateLogAggregationStatus(appId0,
+        LogAggregationStatus.RUNNING,
+        System.currentTimeMillis(), "", false);
+    // sleep 10s
+    Thread.sleep(10*1000);
+    // the cache log aggregation status should be deleted.
+    List<LogAggregationReport> reports = tracker
+        .pullCachedLogAggregationReports();
+    Assert.assertTrue(reports.size() == 0);
+  }
+}

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d53ef7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
index 9d95f63..e997192 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceTrackerService.java
@@ -399,9 +399,21 @@ public class ResourceTrackerService extends 
AbstractService implements
 
     RMNode oldNode = this.rmContext.getRMNodes().putIfAbsent(nodeId, rmNode);
     if (oldNode == null) {
+      RMNodeStartedEvent startEvent = new RMNodeStartedEvent(nodeId,
+          request.getNMContainerStatuses(),
+          request.getRunningApplications());
+      if (request.getLogAggregationReportsForApps() != null
+          && !request.getLogAggregationReportsForApps().isEmpty()) {
+        if (LOG.isDebugEnabled()) {
+          LOG.debug("Found the number of previous cached log aggregation "
+              + "status from nodemanager:" + nodeId + " is :"
+              + request.getLogAggregationReportsForApps().size());
+        }
+        startEvent.setLogAggregationReportsForApps(request
+            .getLogAggregationReportsForApps());
+      }
       this.rmContext.getDispatcher().getEventHandler().handle(
-              new RMNodeStartedEvent(nodeId, request.getNMContainerStatuses(),
-                  request.getRunningApplications()));
+          startEvent);
     } else {
       LOG.info("Reconnect from the node at: " + host);
       this.nmLivelinessMonitor.unregister(nodeId);
@@ -426,7 +438,6 @@ public class ResourceTrackerService extends AbstractService 
implements
         this.rmContext.getRMNodes().put(nodeId, rmNode);
         this.rmContext.getDispatcher().getEventHandler()
             .handle(new RMNodeStartedEvent(nodeId, null, null));
-
       } else {
         // Reset heartbeat ID since node just restarted.
         oldNode.resetLastNodeHeartBeatResponse();

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d53ef7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
index 3cbde01..14bc0da 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeImpl.java
@@ -866,6 +866,12 @@ public class RMNodeImpl implements RMNode, 
EventHandler<RMNodeEvent> {
       rmNode.context.getDispatcher().getEventHandler().handle(
         new NodesListManagerEvent(
             NodesListManagerEventType.NODE_USABLE, rmNode));
+      List<LogAggregationReport> logAggregationReportsForApps =
+        startEvent.getLogAggregationReportsForApps();
+      if (logAggregationReportsForApps != null
+          && !logAggregationReportsForApps.isEmpty()) {
+        rmNode.handleLogAggregationStatus(logAggregationReportsForApps);
+      }
     }
   }
 

http://git-wip-us.apache.org/repos/asf/hadoop/blob/4d53ef7e/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartedEvent.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartedEvent.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartedEvent.java
index 4fc983a..3976994 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartedEvent.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmnode/RMNodeStartedEvent.java
@@ -22,12 +22,14 @@ import java.util.List;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.server.api.protocolrecords.LogAggregationReport;
 import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
 
 public class RMNodeStartedEvent extends RMNodeEvent {
 
   private List<NMContainerStatus> containerStatuses;
   private List<ApplicationId> runningApplications;
+  private List<LogAggregationReport> logAggregationReportsForApps;
 
   public RMNodeStartedEvent(NodeId nodeId,
       List<NMContainerStatus> containerReports,
@@ -44,4 +46,13 @@ public class RMNodeStartedEvent extends RMNodeEvent {
   public List<ApplicationId> getRunningApplications() {
     return runningApplications;
   }
+
+  public List<LogAggregationReport> getLogAggregationReportsForApps() {
+    return this.logAggregationReportsForApps;
+  }
+
+  public void setLogAggregationReportsForApps(
+      List<LogAggregationReport> logAggregationReportsForApps) {
+    this.logAggregationReportsForApps = logAggregationReportsForApps;
+  }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: common-commits-unsubscr...@hadoop.apache.org
For additional commands, e-mail: common-commits-h...@hadoop.apache.org

Reply via email to