Repository: hadoop
Updated Branches:
  refs/heads/branch-2.6 281041a57 -> 16d714433


YARN-2583. Modified AggregatedLogDeletionService to be able to delete rolling 
aggregated logs. Contributed by Xuan Gong.

(cherry picked from commit cb81bac0029fce3a9726df3523f0b692cd3375b8)


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/16d71443
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/16d71443
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/16d71443

Branch: refs/heads/branch-2.6
Commit: 16d71443373e8eda8644bee54f3f9a6979d66a56
Parents: 281041a
Author: Zhijie Shen <zjs...@apache.org>
Authored: Fri Oct 10 00:10:39 2014 -0700
Committer: Zhijie Shen <zjs...@apache.org>
Committed: Fri Oct 10 00:18:24 2014 -0700

----------------------------------------------------------------------
 hadoop-yarn-project/CHANGES.txt                 |   3 +
 .../AggregatedLogDeletionService.java           |  93 ++++++++-
 .../TestAggregatedLogDeletionService.java       | 192 ++++++++++++++++---
 .../logaggregation/AppLogAggregatorImpl.java    | 151 ++++++++++++++-
 .../logaggregation/LogAggregationService.java   |   2 +-
 .../TestLogAggregationService.java              |  94 ++++++---
 6 files changed, 467 insertions(+), 68 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hadoop/blob/16d71443/hadoop-yarn-project/CHANGES.txt
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt
index 674909c..ec31f8f 100644
--- a/hadoop-yarn-project/CHANGES.txt
+++ b/hadoop-yarn-project/CHANGES.txt
@@ -270,6 +270,9 @@ Release 2.6.0 - UNRELEASED
     YARN-2629. Made the distributed shell use the domain-based timeline ACLs.
     (zjshen)
 
+    YARN-2583. Modified AggregatedLogDeletionService to be able to delete 
rolling
+    aggregated logs. (Xuan Gong via zjshen)
+
   OPTIMIZATIONS
 
   BUG FIXES

http://git-wip-us.apache.org/repos/asf/hadoop/blob/16d71443/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java
index 590cfe2..4c1d152 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/logaggregation/AggregatedLogDeletionService.java
@@ -24,38 +24,53 @@ import java.util.TimerTask;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
-import org.apache.hadoop.classification.InterfaceAudience.Private;
+import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.ipc.RPC;
 import org.apache.hadoop.security.AccessControlException;
 import org.apache.hadoop.service.AbstractService;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
+import org.apache.hadoop.yarn.client.ClientRMProxy;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.exceptions.ApplicationNotFoundException;
+import org.apache.hadoop.yarn.exceptions.YarnException;
+import org.apache.hadoop.yarn.util.ConverterUtils;
+
+import com.google.common.annotations.VisibleForTesting;
 
 /**
  * A service that periodically deletes aggregated logs.
  */
-@Private
+@InterfaceAudience.LimitedPrivate({"yarn", "mapreduce"})
 public class AggregatedLogDeletionService extends AbstractService {
   private static final Log LOG = 
LogFactory.getLog(AggregatedLogDeletionService.class);
   
   private Timer timer = null;
   private long checkIntervalMsecs;
+  private LogDeletionTask task;
   
   static class LogDeletionTask extends TimerTask {
     private Configuration conf;
     private long retentionMillis;
     private String suffix = null;
     private Path remoteRootLogDir = null;
+    private ApplicationClientProtocol rmClient = null;
     
-    public LogDeletionTask(Configuration conf, long retentionSecs) {
+    public LogDeletionTask(Configuration conf, long retentionSecs, 
ApplicationClientProtocol rmClient) {
       this.conf = conf;
       this.retentionMillis = retentionSecs * 1000;
       this.suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf);
       this.remoteRootLogDir =
         new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
             YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
+      this.rmClient = rmClient;
     }
     
     @Override
@@ -64,11 +79,10 @@ public class AggregatedLogDeletionService extends 
AbstractService {
       LOG.info("aggregated log deletion started.");
       try {
         FileSystem fs = remoteRootLogDir.getFileSystem(conf);
-
         for(FileStatus userDir : fs.listStatus(remoteRootLogDir)) {
           if(userDir.isDirectory()) {
             Path userDirPath = new Path(userDir.getPath(), suffix);
-            deleteOldLogDirsFrom(userDirPath, cutoffMillis, fs);
+            deleteOldLogDirsFrom(userDirPath, cutoffMillis, fs, rmClient);
           }
         }
       } catch (IOException e) {
@@ -79,18 +93,36 @@ public class AggregatedLogDeletionService extends 
AbstractService {
     }
     
     private static void deleteOldLogDirsFrom(Path dir, long cutoffMillis, 
-        FileSystem fs) {
+        FileSystem fs, ApplicationClientProtocol rmClient) {
       try {
         for(FileStatus appDir : fs.listStatus(dir)) {
           if(appDir.isDirectory() && 
               appDir.getModificationTime() < cutoffMillis) {
-            if(shouldDeleteLogDir(appDir, cutoffMillis, fs)) {
+            boolean appTerminated =
+                isApplicationTerminated(ConverterUtils.toApplicationId(appDir
+                  .getPath().getName()), rmClient);
+            if(appTerminated && shouldDeleteLogDir(appDir, cutoffMillis, fs)) {
               try {
                 LOG.info("Deleting aggregated logs in "+appDir.getPath());
                 fs.delete(appDir.getPath(), true);
               } catch (IOException e) {
                 logIOException("Could not delete "+appDir.getPath(), e);
               }
+            } else if (!appTerminated){
+              try {
+                for(FileStatus node: fs.listStatus(appDir.getPath())) {
+                  if(node.getModificationTime() < cutoffMillis) {
+                    try {
+                      fs.delete(node.getPath(), true);
+                    } catch (IOException ex) {
+                      logIOException("Could not delete "+appDir.getPath(), ex);
+                    }
+                  }
+                }
+              } catch(IOException e) {
+                logIOException(
+                  "Error reading the contents of " + appDir.getPath(), e);
+              }
             }
           }
         }
@@ -115,6 +147,29 @@ public class AggregatedLogDeletionService extends 
AbstractService {
       }
       return shouldDelete;
     }
+
+    private static boolean isApplicationTerminated(ApplicationId appId,
+        ApplicationClientProtocol rmClient) throws IOException {
+      ApplicationReport appReport = null;
+      try {
+        appReport =
+            rmClient.getApplicationReport(
+              GetApplicationReportRequest.newInstance(appId))
+              .getApplicationReport();
+      } catch (ApplicationNotFoundException e) {
+        return true;
+      } catch (YarnException e) {
+        throw new IOException(e);
+      }
+      YarnApplicationState currentState = appReport.getYarnApplicationState();
+      return currentState == YarnApplicationState.FAILED
+          || currentState == YarnApplicationState.KILLED
+          || currentState == YarnApplicationState.FINISHED;
+    }
+
+    public ApplicationClientProtocol getRMClient() {
+      return this.rmClient;
+    }
   }
   
   private static void logIOException(String comment, IOException e) {
@@ -140,6 +195,7 @@ public class AggregatedLogDeletionService extends 
AbstractService {
 
   @Override
   protected void serviceStop() throws Exception {
+    stopRMClient();
     stopTimer();
     super.serviceStop();
   }
@@ -156,10 +212,11 @@ public class AggregatedLogDeletionService extends 
AbstractService {
     }
   }
   
-  public void refreshLogRetentionSettings() {
+  public void refreshLogRetentionSettings() throws IOException {
     if (getServiceState() == STATE.STARTED) {
       Configuration conf = createConf();
       setConfig(conf);
+      stopRMClient();
       stopTimer();
       scheduleLogDeletionTask();
     } else {
@@ -167,7 +224,7 @@ public class AggregatedLogDeletionService extends 
AbstractService {
     }
   }
   
-  private void scheduleLogDeletionTask() {
+  private void scheduleLogDeletionTask() throws IOException {
     Configuration conf = getConfig();
     if (!conf.getBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED,
         YarnConfiguration.DEFAULT_LOG_AGGREGATION_ENABLED)) {
@@ -183,7 +240,7 @@ public class AggregatedLogDeletionService extends 
AbstractService {
       return;
     }
     setLogAggCheckIntervalMsecs(retentionSecs);
-    TimerTask task = new LogDeletionTask(conf, retentionSecs);
+    task = new LogDeletionTask(conf, retentionSecs, creatRMClient());
     timer = new Timer();
     timer.scheduleAtFixedRate(task, 0, checkIntervalMsecs);
   }
@@ -201,4 +258,20 @@ public class AggregatedLogDeletionService extends 
AbstractService {
   protected Configuration createConf() {
     return new Configuration();
   }
+
+  // Directly create and use ApplicationClientProtocol.
+  // We have already marked ApplicationClientProtocol.getApplicationReport
+  // as @Idempotent, it will automatically take care of RM restart/failover.
+  @VisibleForTesting
+  protected ApplicationClientProtocol creatRMClient() throws IOException {
+    return ClientRMProxy.createRMProxy(getConfig(),
+      ApplicationClientProtocol.class);
+  }
+
+  @VisibleForTesting
+  protected void stopRMClient() {
+    if (task != null && task.getRMClient() != null) {
+      RPC.stopProxy(task.getRMClient());
+    }
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/16d71443/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java
index 05c7e71..026996e 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/logaggregation/TestAggregatedLogDeletionService.java
@@ -20,6 +20,9 @@ package org.apache.hadoop.yarn.logaggregation;
 
 import java.io.IOException;
 import java.net.URI;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.List;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -27,6 +30,12 @@ import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FilterFileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.AccessControlException;
+import org.apache.hadoop.yarn.api.ApplicationClientProtocol;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportRequest;
+import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationReportResponse;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ApplicationReport;
+import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.junit.Before;
 import org.junit.Test;
@@ -51,7 +60,7 @@ public class TestAggregatedLogDeletionService {
     String root = "mockfs://foo/";
     String remoteRootLogDir = root+"tmp/logs";
     String suffix = "logs";
-    Configuration conf = new Configuration();
+    final Configuration conf = new Configuration();
     conf.setClass("fs.mockfs.impl", MockFileSystem.class, FileSystem.class);
     conf.set(YarnConfiguration.LOG_AGGREGATION_ENABLED, "true");
     conf.set(YarnConfiguration.LOG_AGGREGATION_RETAIN_SECONDS, "1800");
@@ -69,22 +78,37 @@ public class TestAggregatedLogDeletionService {
     
     when(mockFs.listStatus(remoteRootLogPath)).thenReturn(
         new FileStatus[]{userDirStatus});
-    
+
+    ApplicationId appId1 =
+        ApplicationId.newInstance(System.currentTimeMillis(), 1);
     Path userLogDir = new Path(userDir, suffix);
-    Path app1Dir = new Path(userLogDir, "application_1_1");
+    Path app1Dir = new Path(userLogDir, appId1.toString());
     FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, 
app1Dir);
     
-    Path app2Dir = new Path(userLogDir, "application_1_2");
+    ApplicationId appId2 =
+        ApplicationId.newInstance(System.currentTimeMillis(), 2);
+    Path app2Dir = new Path(userLogDir, appId2.toString());
     FileStatus app2DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, 
app2Dir);
     
-    Path app3Dir = new Path(userLogDir, "application_1_3");
+    ApplicationId appId3 =
+        ApplicationId.newInstance(System.currentTimeMillis(), 3);
+    Path app3Dir = new Path(userLogDir, appId3.toString());
     FileStatus app3DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, 
app3Dir);
     
-    Path app4Dir = new Path(userLogDir, "application_1_4");
+    ApplicationId appId4 =
+        ApplicationId.newInstance(System.currentTimeMillis(), 4);
+    Path app4Dir = new Path(userLogDir, appId4.toString());
     FileStatus app4DirStatus = new FileStatus(0, true, 0, 0, toDeleteTime, 
app4Dir);
     
+    ApplicationId appId5 =
+        ApplicationId.newInstance(System.currentTimeMillis(), 5);
+    Path app5Dir = new Path(userLogDir, appId5.toString());
+    FileStatus app5DirStatus =
+        new FileStatus(0, true, 0, 0, toDeleteTime, app5Dir);
+
     when(mockFs.listStatus(userLogDir)).thenReturn(
-        new FileStatus[]{app1DirStatus, app2DirStatus, app3DirStatus, 
app4DirStatus});
+      new FileStatus[] { app1DirStatus, app2DirStatus, app3DirStatus,
+          app4DirStatus, app5DirStatus });
     
     when(mockFs.listStatus(app1Dir)).thenReturn(
         new FileStatus[]{});
@@ -117,20 +141,55 @@ public class TestAggregatedLogDeletionService {
     
     when(mockFs.listStatus(app4Dir)).thenReturn(
         new FileStatus[]{app4Log1Status, app4Log2Status});
+
+    Path app5Log1 = new Path(app5Dir, "host1");
+    FileStatus app5Log1Status = new FileStatus(10, false, 1, 1, toDeleteTime, 
app5Log1);
     
-    AggregatedLogDeletionService.LogDeletionTask task = 
-      new AggregatedLogDeletionService.LogDeletionTask(conf, 1800);
-    
-    task.run();
-    
-    verify(mockFs).delete(app1Dir, true);
-    verify(mockFs, times(0)).delete(app2Dir, true);
-    verify(mockFs).delete(app3Dir, true);
-    verify(mockFs).delete(app4Dir, true);
+    Path app5Log2 = new Path(app5Dir, "host2");
+    FileStatus app5Log2Status = new FileStatus(10, false, 1, 1, toKeepTime, 
app5Log2);
+
+    when(mockFs.listStatus(app5Dir)).thenReturn(
+        new FileStatus[]{app5Log1Status, app5Log2Status});
+
+    final List<ApplicationId> finishedApplications =
+        Collections.unmodifiableList(Arrays.asList(appId1, appId2, appId3,
+          appId4));
+    final List<ApplicationId> runningApplications =
+        Collections.unmodifiableList(Arrays.asList(appId5));
+
+    AggregatedLogDeletionService deletionService =
+        new AggregatedLogDeletionService() {
+          @Override
+          protected ApplicationClientProtocol creatRMClient()
+              throws IOException {
+            try {
+              return createMockRMClient(finishedApplications,
+                runningApplications);
+            } catch (Exception e) {
+              throw new IOException(e);
+            }
+          }
+          @Override
+          protected void stopRMClient() {
+            // DO NOTHING
+          }
+        };
+    deletionService.init(conf);
+    deletionService.start();
+
+    verify(mockFs, timeout(2000)).delete(app1Dir, true);
+    verify(mockFs, timeout(2000).times(0)).delete(app2Dir, true);
+    verify(mockFs, timeout(2000)).delete(app3Dir, true);
+    verify(mockFs, timeout(2000)).delete(app4Dir, true);
+    verify(mockFs, timeout(2000).times(0)).delete(app5Dir, true);
+    verify(mockFs, timeout(2000)).delete(app5Log1, true);
+    verify(mockFs, timeout(2000).times(0)).delete(app5Log2, true);
+
+    deletionService.stop();
   }
 
   @Test
-  public void testRefreshLogRetentionSettings() throws IOException {
+  public void testRefreshLogRetentionSettings() throws Exception {
     long now = System.currentTimeMillis();
     //time before 2000 sec
     long before2000Secs = now - (2000 * 1000);
@@ -163,13 +222,17 @@ public class TestAggregatedLogDeletionService {
 
     Path userLogDir = new Path(userDir, suffix);
 
+    ApplicationId appId1 =
+        ApplicationId.newInstance(System.currentTimeMillis(), 1);
     //Set time last modified of app1Dir directory and its files to 
before2000Secs 
-    Path app1Dir = new Path(userLogDir, "application_1_1");
+    Path app1Dir = new Path(userLogDir, appId1.toString());
     FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, before2000Secs,
         app1Dir);
     
+    ApplicationId appId2 =
+        ApplicationId.newInstance(System.currentTimeMillis(), 2);
     //Set time last modified of app1Dir directory and its files to 
before50Secs 
-    Path app2Dir = new Path(userLogDir, "application_1_2");
+    Path app2Dir = new Path(userLogDir, appId2.toString());
     FileStatus app2DirStatus = new FileStatus(0, true, 0, 0, before50Secs,
         app2Dir);
 
@@ -190,11 +253,27 @@ public class TestAggregatedLogDeletionService {
     when(mockFs.listStatus(app2Dir)).thenReturn(
         new FileStatus[] { app2Log1Status });
 
+    final List<ApplicationId> finishedApplications =
+        Collections.unmodifiableList(Arrays.asList(appId1, appId2));
+
     AggregatedLogDeletionService deletionSvc = new 
AggregatedLogDeletionService() {
       @Override
       protected Configuration createConf() {
         return conf;
       }
+      @Override
+      protected ApplicationClientProtocol creatRMClient()
+          throws IOException {
+        try {
+          return createMockRMClient(finishedApplications, null);
+        } catch (Exception e) {
+          throw new IOException(e);
+        }
+      }
+      @Override
+      protected void stopRMClient() {
+        // DO NOTHING
+      }
     };
     
     deletionSvc.init(conf);
@@ -253,8 +332,10 @@ public class TestAggregatedLogDeletionService {
     when(mockFs.listStatus(remoteRootLogPath)).thenReturn(
         new FileStatus[]{userDirStatus});
 
+    ApplicationId appId1 =
+        ApplicationId.newInstance(System.currentTimeMillis(), 1);
     Path userLogDir = new Path(userDir, suffix);
-    Path app1Dir = new Path(userLogDir, "application_1_1");
+    Path app1Dir = new Path(userLogDir, appId1.toString());
     FileStatus app1DirStatus = new FileStatus(0, true, 0, 0, now, app1Dir);
 
     when(mockFs.listStatus(userLogDir)).thenReturn(
@@ -266,8 +347,25 @@ public class TestAggregatedLogDeletionService {
     when(mockFs.listStatus(app1Dir)).thenReturn(
         new FileStatus[]{app1Log1Status});
 
+    final List<ApplicationId> finishedApplications =
+        Collections.unmodifiableList(Arrays.asList(appId1));
+
     AggregatedLogDeletionService deletionSvc =
-        new AggregatedLogDeletionService();
+        new AggregatedLogDeletionService() {
+      @Override
+      protected ApplicationClientProtocol creatRMClient()
+          throws IOException {
+        try {
+          return createMockRMClient(finishedApplications, null);
+        } catch (Exception e) {
+          throw new IOException(e);
+        }
+      }
+      @Override
+      protected void stopRMClient() {
+        // DO NOTHING
+      }
+    };
     deletionSvc.init(conf);
     deletionSvc.start();
  
@@ -286,11 +384,61 @@ public class TestAggregatedLogDeletionService {
 
     deletionSvc.stop();
   }
-  
+
   static class MockFileSystem extends FilterFileSystem {
     MockFileSystem() {
       super(mock(FileSystem.class));
     }
     public void initialize(URI name, Configuration conf) throws IOException {}
   }
+
+  private static ApplicationClientProtocol createMockRMClient(
+      List<ApplicationId> finishedApplicaitons,
+      List<ApplicationId> runningApplications) throws Exception {
+    final ApplicationClientProtocol mockProtocol =
+        mock(ApplicationClientProtocol.class);
+    if (finishedApplicaitons != null && !finishedApplicaitons.isEmpty()) {
+      for (ApplicationId appId : finishedApplicaitons) {
+        GetApplicationReportRequest request =
+            GetApplicationReportRequest.newInstance(appId);
+        GetApplicationReportResponse response =
+            createApplicationReportWithFinishedApplication();
+        when(mockProtocol.getApplicationReport(request))
+          .thenReturn(response);
+      }
+    }
+    if (runningApplications != null && !runningApplications.isEmpty()) {
+      for (ApplicationId appId : runningApplications) {
+        GetApplicationReportRequest request =
+            GetApplicationReportRequest.newInstance(appId);
+        GetApplicationReportResponse response =
+            createApplicationReportWithRunningApplication();
+        when(mockProtocol.getApplicationReport(request))
+          .thenReturn(response);
+      }
+    }
+    return mockProtocol;
+  }
+
+  private static GetApplicationReportResponse
+      createApplicationReportWithRunningApplication() {
+    ApplicationReport report = mock(ApplicationReport.class);
+    when(report.getYarnApplicationState()).thenReturn(
+      YarnApplicationState.RUNNING);
+    GetApplicationReportResponse response =
+        mock(GetApplicationReportResponse.class);
+    when(response.getApplicationReport()).thenReturn(report);
+    return response;
+  }
+
+  private static GetApplicationReportResponse
+      createApplicationReportWithFinishedApplication() {
+    ApplicationReport report = mock(ApplicationReport.class);
+    when(report.getYarnApplicationState()).thenReturn(
+      YarnApplicationState.FINISHED);
+    GetApplicationReportResponse response =
+        mock(GetApplicationReportResponse.class);
+    when(response.getApplicationReport()).thenReturn(report);
+    return response;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/16d71443/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
index 318caf2..63f7c66 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/AppLogAggregatorImpl.java
@@ -20,6 +20,10 @@ package 
org.apache.hadoop.yarn.server.nodemanager.containermanager.logaggregatio
 
 import java.io.IOException;
 import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
@@ -33,6 +37,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.classification.InterfaceAudience.Private;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.security.UserGroupInformation;
@@ -41,6 +46,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationId;
 import org.apache.hadoop.yarn.api.records.ApplicationAccessType;
 import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.LogAggregationContext;
+import org.apache.hadoop.yarn.api.records.NodeId;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
 import org.apache.hadoop.yarn.event.Dispatcher;
 import org.apache.hadoop.yarn.logaggregation.ContainerLogsRetentionPolicy;
 import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
@@ -65,6 +72,23 @@ public class AppLogAggregatorImpl implements 
AppLogAggregator {
   private static final Log LOG = LogFactory
       .getLog(AppLogAggregatorImpl.class);
   private static final int THREAD_SLEEP_TIME = 1000;
+  // This is temporary solution. The configuration will be deleted once
+  // we find a more scalable method to only write a single log file per LRS.
+  private static final String NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP
+      = YarnConfiguration.NM_PREFIX + "log-aggregation.num-log-files-per-app";
+  private static final int
+      DEFAULT_NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP = 30;
+  
+  // This configuration is for debug and test purpose. By setting
+  // this configuration as true. We can break the lower bound of
+  // NM_LOG_AGGREGATION_ROLL_MONITORING_INTERVAL_SECONDS.
+  private static final String NM_LOG_AGGREGATION_DEBUG_ENABLED
+      = YarnConfiguration.NM_PREFIX + "log-aggregation.debug-enabled";
+  private static final boolean
+      DEFAULT_NM_LOG_AGGREGATION_DEBUG_ENABLED = false;
+
+  private static final long
+      NM_LOG_AGGREGATION_MIN_ROLL_MONITORING_INTERVAL_SECONDS = 3600;
 
   private final LocalDirsHandlerService dirsHandler;
   private final Dispatcher dispatcher;
@@ -85,13 +109,16 @@ public class AppLogAggregatorImpl implements 
AppLogAggregator {
   private final Map<ApplicationAccessType, String> appAcls;
   private final LogAggregationContext logAggregationContext;
   private final Context context;
+  private final int retentionSize;
+  private final long rollingMonitorInterval;
+  private final NodeId nodeId;
 
   private final Map<ContainerId, ContainerLogAggregator> 
containerLogAggregators =
       new HashMap<ContainerId, ContainerLogAggregator>();
 
   public AppLogAggregatorImpl(Dispatcher dispatcher,
       DeletionService deletionService, Configuration conf,
-      ApplicationId appId, UserGroupInformation userUgi,
+      ApplicationId appId, UserGroupInformation userUgi, NodeId nodeId,
       LocalDirsHandlerService dirsHandler, Path remoteNodeLogFileForApp,
       ContainerLogsRetentionPolicy retentionPolicy,
       Map<ApplicationAccessType, String> appAcls,
@@ -111,6 +138,51 @@ public class AppLogAggregatorImpl implements 
AppLogAggregator {
     this.appAcls = appAcls;
     this.logAggregationContext = logAggregationContext;
     this.context = context;
+    this.nodeId = nodeId;
+    int configuredRentionSize =
+        conf.getInt(NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP,
+            DEFAULT_NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP);
+    if (configuredRentionSize <= 0) {
+      this.retentionSize =
+          DEFAULT_NM_LOG_AGGREGATION_NUM_LOG_FILES_SIZE_PER_APP;
+    } else {
+      this.retentionSize = configuredRentionSize;
+    }
+    long configuredRollingMonitorInterval =
+        this.logAggregationContext == null ? -1 : this.logAggregationContext
+          .getRollingIntervalSeconds();
+    boolean debug_mode =
+        conf.getBoolean(NM_LOG_AGGREGATION_DEBUG_ENABLED,
+          DEFAULT_NM_LOG_AGGREGATION_DEBUG_ENABLED);
+    if (configuredRollingMonitorInterval > 0
+        && configuredRollingMonitorInterval <
+          NM_LOG_AGGREGATION_MIN_ROLL_MONITORING_INTERVAL_SECONDS) {
+      if (debug_mode) {
+        this.rollingMonitorInterval = configuredRollingMonitorInterval;
+      } else {
+        LOG.warn(
+            "rollingMonitorIntervall should be more than or equal to "
+            + NM_LOG_AGGREGATION_MIN_ROLL_MONITORING_INTERVAL_SECONDS
+            + " seconds. Using "
+            + NM_LOG_AGGREGATION_MIN_ROLL_MONITORING_INTERVAL_SECONDS
+            + " seconds instead.");
+        this.rollingMonitorInterval =
+            NM_LOG_AGGREGATION_MIN_ROLL_MONITORING_INTERVAL_SECONDS;
+      }
+    } else {
+      if (configuredRollingMonitorInterval <= 0) {
+        LOG.warn("rollingMonitorInterval is set as "
+            + configuredRollingMonitorInterval + ". "
+            + "The log rolling mornitoring interval is disabled. "
+            + "The logs will be aggregated after this application is 
finished.");
+      } else {
+        LOG.warn("rollingMonitorInterval is set as "
+            + configuredRollingMonitorInterval + ". "
+            + "The logs will be aggregated every "
+            + configuredRollingMonitorInterval + " seconds");
+      }
+      this.rollingMonitorInterval = configuredRollingMonitorInterval;
+    }
   }
 
   private void uploadLogsForContainers() {
@@ -181,12 +253,17 @@ public class AppLogAggregatorImpl implements 
AppLogAggregator {
         }
       }
 
+      // Before upload logs, make sure the number of existing logs
+      // is smaller than the configured NM log aggregation retention size.
+      if (uploadedLogsInThisCycle) {
+        cleanOldLogs();
+      }
+
       if (writer != null) {
         writer.close();
       }
 
-      final Path renamedPath = logAggregationContext == null ||
-          logAggregationContext.getRollingIntervalSeconds() <= 0
+      final Path renamedPath = this.rollingMonitorInterval <= 0
               ? remoteNodeLogFileForApp : new Path(
                 remoteNodeLogFileForApp.getParent(),
                 remoteNodeLogFileForApp.getName() + "_"
@@ -198,9 +275,12 @@ public class AppLogAggregatorImpl implements 
AppLogAggregator {
           @Override
           public Object run() throws Exception {
             FileSystem remoteFS = FileSystem.get(conf);
-            if (remoteFS.exists(remoteNodeTmpLogFileForApp)
-                && rename) {
-              remoteFS.rename(remoteNodeTmpLogFileForApp, renamedPath);
+            if (remoteFS.exists(remoteNodeTmpLogFileForApp)) {
+              if (rename) {
+                remoteFS.rename(remoteNodeTmpLogFileForApp, renamedPath);
+              } else {
+                remoteFS.delete(remoteNodeTmpLogFileForApp, false);
+              }
             }
             return null;
           }
@@ -218,6 +298,60 @@ public class AppLogAggregatorImpl implements 
AppLogAggregator {
     }
   }
 
+  private void cleanOldLogs() {
+    try {
+      final FileSystem remoteFS =
+          this.remoteNodeLogFileForApp.getFileSystem(conf);
+      Path appDir =
+          this.remoteNodeLogFileForApp.getParent().makeQualified(
+            remoteFS.getUri(), remoteFS.getWorkingDirectory());
+      Set<FileStatus> status =
+          new HashSet<FileStatus>(Arrays.asList(remoteFS.listStatus(appDir)));
+
+      Iterable<FileStatus> mask =
+          Iterables.filter(status, new Predicate<FileStatus>() {
+            @Override
+            public boolean apply(FileStatus next) {
+              return next.getPath().getName()
+                .contains(LogAggregationUtils.getNodeString(nodeId))
+                && !next.getPath().getName().endsWith(
+                    LogAggregationUtils.TMP_FILE_SUFFIX);
+            }
+          });
+      status = Sets.newHashSet(mask);
+      // Normally, we just need to delete one oldest log
+      // before we upload a new log.
+      // If we can not delete the older logs in this cycle,
+      // we will delete them in next cycle.
+      if (status.size() >= this.retentionSize) {
+        // sort by the lastModificationTime ascending
+        List<FileStatus> statusList = new ArrayList<FileStatus>(status);
+        Collections.sort(statusList, new Comparator<FileStatus>() {
+          public int compare(FileStatus s1, FileStatus s2) {
+            return s1.getModificationTime() < s2.getModificationTime() ? -1
+                : s1.getModificationTime() > s2.getModificationTime() ? 1 : 0;
+          }
+        });
+        for (int i = 0 ; i <= statusList.size() - this.retentionSize; i++) {
+          final FileStatus remove = statusList.get(i);
+          try {
+            userUgi.doAs(new PrivilegedExceptionAction<Object>() {
+              @Override
+              public Object run() throws Exception {
+                remoteFS.delete(remove.getPath(), false);
+                return null;
+              }
+            });
+          } catch (Exception e) {
+            LOG.error("Failed to delete " + remove.getPath(), e);
+          }
+        }
+      }
+    } catch (Exception e) {
+      LOG.error("Failed to clean old logs", e);
+    }
+  }
+
   @Override
   public void run() {
     try {
@@ -235,9 +369,8 @@ public class AppLogAggregatorImpl implements 
AppLogAggregator {
     while (!this.appFinishing.get() && !this.aborted.get()) {
       synchronized(this) {
         try {
-          if (this.logAggregationContext != null && this.logAggregationContext
-              .getRollingIntervalSeconds() > 0) {
-            wait(this.logAggregationContext.getRollingIntervalSeconds() * 
1000);
+          if (this.rollingMonitorInterval > 0) {
+            wait(this.rollingMonitorInterval * 1000);
             if (this.appFinishing.get() || this.aborted.get()) {
               break;
             }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/16d71443/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
index 772f3f1..1d6a9e1 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/LogAggregationService.java
@@ -342,7 +342,7 @@ public class LogAggregationService extends AbstractService 
implements
     // New application
     final AppLogAggregator appLogAggregator =
         new AppLogAggregatorImpl(this.dispatcher, this.deletionService,
-            getConfig(), appId, userUgi, dirsHandler,
+            getConfig(), appId, userUgi, this.nodeId, dirsHandler,
             getRemoteNodeLogFileForApp(appId, user), logRetentionPolicy,
             appAcls, logAggregationContext, this.context);
     if (this.appLogAggregators.putIfAbsent(appId, appLogAggregator) != null) {

http://git-wip-us.apache.org/repos/asf/hadoop/blob/16d71443/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
----------------------------------------------------------------------
diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
index 36c54dc..2c0f349 100644
--- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
+++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/containermanager/logaggregation/TestLogAggregationService.java
@@ -699,7 +699,7 @@ public class TestLogAggregationService extends 
BaseContainerManagerTest {
     }
   }
 
-  private void verifyContainerLogs(LogAggregationService logAggregationService,
+  private String verifyContainerLogs(LogAggregationService 
logAggregationService,
       ApplicationId appId, ContainerId[] expectedContainerIds,
       String[] logFiles, int numOfContainerLogs, boolean multiLogs)
       throws IOException {
@@ -811,6 +811,7 @@ public class TestLogAggregationService extends 
BaseContainerManagerTest {
         Assert.assertEquals(0, thisContainerMap.size());
       }
       Assert.assertEquals(0, logMap.size());
+      return targetNodeFile.getPath().getName();
     } finally {
       reader.close();
     }
@@ -1219,17 +1220,32 @@ public class TestLogAggregationService extends 
BaseContainerManagerTest {
     dispatcher.stop();
   }
 
-  @SuppressWarnings("unchecked")
   @Test (timeout = 50000)
   public void testLogAggregationServiceWithInterval() throws Exception {
-    final int maxAttempts = 50;
+    testLogAggregationService(false);
+  }
+
+  @Test (timeout = 50000)
+  public void testLogAggregationServiceWithRetention() throws Exception {
+    testLogAggregationService(true);
+  }
+
+  @SuppressWarnings("unchecked")
+  private void testLogAggregationService(boolean retentionSizeLimitation)
+      throws Exception {
     LogAggregationContext logAggregationContextWithInterval =
         Records.newRecord(LogAggregationContext.class);
     logAggregationContextWithInterval.setRollingIntervalSeconds(5000);
-
     this.conf.set(YarnConfiguration.NM_LOG_DIRS, 
localLogDir.getAbsolutePath());
     this.conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
       this.remoteRootLogDir.getAbsolutePath());
+    if (retentionSizeLimitation) {
+      // set the retention size as 1. The number of logs for one application
+      // in one NM should be 1.
+      this.conf.setInt(YarnConfiguration.NM_PREFIX
+          + "log-aggregation.num-log-files-per-app", 1);
+    }
+
     // by setting this configuration, the log files will not be deleted 
immediately after
     // they are aggregated to remote directory.
     // We could use it to test whether the previous aggregated log files will 
be aggregated
@@ -1280,23 +1296,29 @@ public class TestLogAggregationService extends 
BaseContainerManagerTest {
           .get(application);
     aggregator.doLogAggregationOutOfBand();
 
-    int count = 0;
-    while (numOfLogsAvailable(logAggregationService, application) != 1
-        && count <= maxAttempts) {
-      Thread.sleep(100);
-      count++;
+    if (retentionSizeLimitation) {
+      Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
+        50, 1, true, null));
+    } else {
+      Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
+        50, 1, false, null));
     }
+    String logFileInLastCycle = null;
     // Container logs should be uploaded
-    verifyContainerLogs(logAggregationService, application,
+    logFileInLastCycle = verifyContainerLogs(logAggregationService, 
application,
         new ContainerId[] { container }, logFiles1, 3, true);
 
+    Thread.sleep(2000);
+
     // There is no log generated at this time. Do the log aggregation again.
     aggregator.doLogAggregationOutOfBand();
 
     // Same logs will not be aggregated again.
     // Only one aggregated log file in Remote file directory.
-    Assert.assertEquals(numOfLogsAvailable(logAggregationService, application),
-      1);
+    Assert.assertEquals(numOfLogsAvailable(logAggregationService,
+        application, true, null), 1);
+
+    Thread.sleep(2000);
 
     // Do log aggregation
     String[] logFiles2 = new String[] { "stdout_1", "stderr_1", "syslog_1" };
@@ -1304,16 +1326,19 @@ public class TestLogAggregationService extends 
BaseContainerManagerTest {
 
     aggregator.doLogAggregationOutOfBand();
 
-    count = 0;
-    while (numOfLogsAvailable(logAggregationService, application) != 2
-        && count <= maxAttempts) {
-      Thread.sleep(100);
-      count ++;
+    if (retentionSizeLimitation) {
+      Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
+        50, 1, true, logFileInLastCycle));
+    } else {
+      Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
+        50, 2, false, null));
     }
     // Container logs should be uploaded
-    verifyContainerLogs(logAggregationService, application,
+    logFileInLastCycle = verifyContainerLogs(logAggregationService, 
application,
         new ContainerId[] { container }, logFiles2, 3, true);
 
+    Thread.sleep(2000);
+
     // create another logs
     String[] logFiles3 = new String[] { "stdout_2", "stderr_2", "syslog_2" };
     writeContainerLogs(appLogDir, container, logFiles3);
@@ -1323,13 +1348,13 @@ public class TestLogAggregationService extends 
BaseContainerManagerTest {
 
     dispatcher.await();
     logAggregationService.handle(new LogHandlerAppFinishedEvent(application));
-    count = 0;
-    while (numOfLogsAvailable(logAggregationService, application) != 3
-        && count <= maxAttempts) {
-      Thread.sleep(100);
-      count ++;
+    if (retentionSizeLimitation) {
+      Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
+        50, 1, true, logFileInLastCycle));
+    } else {
+      Assert.assertTrue(waitAndCheckLogNum(logAggregationService, application,
+        50, 3, false, null));
     }
-
     verifyContainerLogs(logAggregationService, application,
       new ContainerId[] { container }, logFiles3, 3, true);
     logAggregationService.stop();
@@ -1338,7 +1363,8 @@ public class TestLogAggregationService extends 
BaseContainerManagerTest {
   }
 
   private int numOfLogsAvailable(LogAggregationService logAggregationService,
-      ApplicationId appId) throws IOException {
+      ApplicationId appId, boolean sizeLimited, String lastLogFile)
+      throws IOException {
     Path appLogDir = logAggregationService.getRemoteAppLogDir(appId, 
this.user);
     RemoteIterator<FileStatus> nodeFiles = null;
     try {
@@ -1354,7 +1380,9 @@ public class TestLogAggregationService extends 
BaseContainerManagerTest {
     while (nodeFiles.hasNext()) {
       FileStatus status = nodeFiles.next();
       String filename = status.getPath().getName();
-      if (filename.contains(LogAggregationUtils.TMP_FILE_SUFFIX)) {
+      if (filename.contains(LogAggregationUtils.TMP_FILE_SUFFIX)
+          || (lastLogFile != null && filename.contains(lastLogFile)
+              && sizeLimited)) {
         return -1;
       }
       if (filename.contains(LogAggregationUtils
@@ -1364,4 +1392,18 @@ public class TestLogAggregationService extends 
BaseContainerManagerTest {
     }
     return count;
   }
+
+  private boolean waitAndCheckLogNum(
+      LogAggregationService logAggregationService, ApplicationId application,
+      int maxAttempts, int expectNum, boolean sizeLimited, String lastLogFile)
+      throws IOException, InterruptedException {
+    int count = 0;
+    while (numOfLogsAvailable(logAggregationService, application, sizeLimited,
+      lastLogFile) != expectNum && count <= maxAttempts) {
+      Thread.sleep(500);
+      count++;
+    }
+    return numOfLogsAvailable(logAggregationService, application, sizeLimited,
+      lastLogFile) == expectNum;
+  }
 }

Reply via email to