Repository: hadoop
Updated Branches:
  refs/heads/trunk af942585a -> e61d43127


YARN-4920. ATS/NM should support a link to download/get the logs in text format.
Contributed by Xuan Gong.


Project: http://git-wip-us.apache.org/repos/asf/hadoop/repo
Commit: http://git-wip-us.apache.org/repos/asf/hadoop/commit/e61d4312
Tree: http://git-wip-us.apache.org/repos/asf/hadoop/tree/e61d4312
Diff: http://git-wip-us.apache.org/repos/asf/hadoop/diff/e61d4312

Branch: refs/heads/trunk
Commit: e61d431275d7fe5641fe9da4903e285b10330fa0
Parents: af94258
Author: Junping Du <junping...@apache.org>
Authored: Wed May 4 09:40:13 2016 -0700
Committer: Junping Du <junping...@apache.org>
Committed: Wed May 4 10:35:49 2016 -0700

----------------------------------------------------------------------
 .../webapp/AHSWebServices.java                  | 270 ++++++++++++++++++-
 ...pplicationHistoryManagerOnTimelineStore.java |  29 +-
 .../webapp/TestAHSWebServices.java              | 203 +++++++++++++-
 .../yarn/server/webapp/dao/ContainerInfo.java   |   6 +
 .../nodemanager/webapp/NMWebServices.java       |  22 +-
 .../nodemanager/webapp/TestNMWebServices.java   |  12 +-
 6 files changed, 525 insertions(+), 17 deletions(-)
----------------------------------------------------------------------
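
For context, the new AHS REST endpoint added by this patch is
/ws/v1/applicationhistory/containerlogs/{containerid}/{filename}, with an
optional download=true query parameter; for a running application the AHS
answers with a 307 redirect to the owning NodeManager's
/ws/v1/node/containerlogs endpoint. The sketch below is not part of the
patch: the host, port, user name, and container id are made-up placeholders,
and the manual redirect handling mirrors the getRedirectURL() helper in the
test changes further down.

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.nio.charset.StandardCharsets;

    public class FetchAggregatedContainerLog {
      public static void main(String[] args) throws Exception {
        // Hypothetical AHS address and container id; substitute real values.
        String url = "http://ahs-host:8188/ws/v1/applicationhistory/containerlogs/"
            + "container_1462380000000_0001_01_000001/syslog?user.name=user1";
        HttpURLConnection conn =
            (HttpURLConnection) new URL(url).openConnection();
        // For a running application the AHS returns 307 with a Location header
        // pointing at the NodeManager's /ws/v1/node/containerlogs endpoint.
        conn.setInstanceFollowRedirects(false);
        if (conn.getResponseCode() == 307) {
          String nmUrl = conn.getHeaderField("Location");
          conn = (HttpURLConnection) new URL(nmUrl).openConnection();
        }
        try (BufferedReader in = new BufferedReader(new InputStreamReader(
            conn.getInputStream(), StandardCharsets.UTF_8))) {
          String line;
          while ((line = in.readLine()) != null) {
            System.out.println(line);
          }
        }
      }
    }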


http://git-wip-us.apache.org/repos/asf/hadoop/blob/e61d4312/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebServices.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebServices.java
index e7a22bd..75dce07 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/main/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/AHSWebServices.java
@@ -18,6 +18,11 @@
 
 package org.apache.hadoop.yarn.server.applicationhistoryservice.webapp;
 
+import java.io.DataInputStream;
+import java.io.EOFException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.charset.Charset;
 import java.util.Collections;
 import java.util.Set;
 
@@ -28,13 +33,30 @@ import javax.ws.rs.Path;
 import javax.ws.rs.PathParam;
 import javax.ws.rs.Produces;
 import javax.ws.rs.QueryParam;
+import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.Context;
 import javax.ws.rs.core.MediaType;
+import javax.ws.rs.core.Response;
+import javax.ws.rs.core.StreamingOutput;
+import javax.ws.rs.core.Response.ResponseBuilder;
+import javax.ws.rs.core.Response.Status;
 
+import org.apache.hadoop.classification.InterfaceAudience.Public;
+import org.apache.hadoop.classification.InterfaceStability.Unstable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileContext;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.RemoteIterator;
 import org.apache.hadoop.util.StringUtils;
+import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ContainerId;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.api.ApplicationBaseProtocol;
 import org.apache.hadoop.yarn.api.records.timeline.TimelineAbout;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
+import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
+import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat.LogKey;
 import org.apache.hadoop.yarn.server.webapp.WebServices;
 import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptInfo;
 import org.apache.hadoop.yarn.server.webapp.dao.AppAttemptsInfo;
@@ -42,9 +64,10 @@ import org.apache.hadoop.yarn.server.webapp.dao.AppInfo;
 import org.apache.hadoop.yarn.server.webapp.dao.AppsInfo;
 import org.apache.hadoop.yarn.server.webapp.dao.ContainerInfo;
 import org.apache.hadoop.yarn.server.webapp.dao.ContainersInfo;
+import org.apache.hadoop.yarn.util.Times;
 import org.apache.hadoop.yarn.util.timeline.TimelineUtils;
 import org.apache.hadoop.yarn.webapp.BadRequestException;
-
+import com.google.common.base.Joiner;
 import com.google.inject.Inject;
 import com.google.inject.Singleton;
 
@@ -52,9 +75,17 @@ import com.google.inject.Singleton;
 @Path("/ws/v1/applicationhistory")
 public class AHSWebServices extends WebServices {
 
+  private static final String NM_DOWNLOAD_URI_STR =
+      "/ws/v1/node/containerlogs";
+  private static final Joiner JOINER = Joiner.on("");
+  private static final Joiner DOT_JOINER = Joiner.on(". ");
+  private final Configuration conf;
+
   @Inject
-  public AHSWebServices(ApplicationBaseProtocol appBaseProt) {
+  public AHSWebServices(ApplicationBaseProtocol appBaseProt,
+      Configuration conf) {
     super(appBaseProt);
+    this.conf = conf;
   }
 
   @GET
@@ -173,4 +204,239 @@ public class AHSWebServices extends WebServices {
     }
   }
 
+  @GET
+  @Path("/containerlogs/{containerid}/{filename}")
+  @Produces({ MediaType.TEXT_PLAIN })
+  @Public
+  @Unstable
+  public Response getLogs(@Context HttpServletRequest req,
+      @Context HttpServletResponse res,
+      @PathParam("containerid") String containerIdStr,
+      @PathParam("filename") String filename,
+      @QueryParam("download") String download) {
+    init(res);
+    ContainerId containerId;
+    try {
+      containerId = ContainerId.fromString(containerIdStr);
+    } catch (IllegalArgumentException ex) {
+      return createBadResponse(Status.NOT_FOUND,
+          "Invalid ContainerId: " + containerIdStr);
+    }
+
+    boolean downloadFile = parseBooleanParam(download);
+
+    ApplicationId appId = containerId.getApplicationAttemptId()
+        .getApplicationId();
+    AppInfo appInfo;
+    try {
+      appInfo = super.getApp(req, res, appId.toString());
+    } catch (Exception ex) {
+      // directly find logs from HDFS.
+      return sendStreamOutputResponse(appId, null, null, containerIdStr,
+          filename, downloadFile);
+    }
+    String appOwner = appInfo.getUser();
+
+    ContainerInfo containerInfo;
+    try {
+      containerInfo = super.getContainer(
+          req, res, appId.toString(),
+          containerId.getApplicationAttemptId().toString(),
+          containerId.toString());
+    } catch (Exception ex) {
+      if (isFinishedState(appInfo.getAppState())) {
+        // directly find logs from HDFS.
+        return sendStreamOutputResponse(appId, appOwner, null, containerIdStr,
+            filename, downloadFile);
+      }
+      return createBadResponse(Status.INTERNAL_SERVER_ERROR,
+          "Can not get ContainerInfo for the container: " + containerId);
+    }
+    String nodeId = containerInfo.getNodeId();
+    if (isRunningState(appInfo.getAppState())) {
+      String nodeHttpAddress = containerInfo.getNodeHttpAddress();
+      String uri = "/" + containerId.toString() + "/" + filename;
+      String resURI = JOINER.join(nodeHttpAddress, NM_DOWNLOAD_URI_STR, uri);
+      String query = req.getQueryString();
+      if (query != null && !query.isEmpty()) {
+        resURI += "?" + query;
+      }
+      ResponseBuilder response = Response.status(
+          HttpServletResponse.SC_TEMPORARY_REDIRECT);
+      response.header("Location", resURI);
+      return response.build();
+    } else if (isFinishedState(appInfo.getAppState())) {
+      return sendStreamOutputResponse(appId, appOwner, nodeId,
+          containerIdStr, filename, downloadFile);
+    } else {
+      return createBadResponse(Status.NOT_FOUND,
+          "The application is not at Running or Finished State.");
+    }
+  }
+
+  // TODO: YARN-5029. RM would send the update event. We could get
+  // the consistent YarnApplicationState.
+  // Will remove YarnApplicationState.ACCEPTED.
+  private boolean isRunningState(YarnApplicationState appState) {
+    return appState == YarnApplicationState.ACCEPTED
+        || appState == YarnApplicationState.RUNNING;
+  }
+
+  private boolean isFinishedState(YarnApplicationState appState) {
+    return appState == YarnApplicationState.FINISHED
+        || appState == YarnApplicationState.FAILED
+        || appState == YarnApplicationState.KILLED;
+  }
+
+  private Response createBadResponse(Status status, String errMessage) {
+    Response response = Response.status(status)
+        .entity(DOT_JOINER.join(status.toString(), errMessage)).build();
+    return response;
+  }
+
+  private boolean parseBooleanParam(String param) {
+    return ("true").equalsIgnoreCase(param);
+  }
+
+  private Response sendStreamOutputResponse(ApplicationId appId,
+      String appOwner, String nodeId, String containerIdStr,
+      String fileName, boolean downloadFile) {
+    StreamingOutput stream = null;
+    try {
+      stream = getStreamingOutput(appId, appOwner, nodeId,
+          containerIdStr, fileName);
+    } catch (Exception ex) {
+      return createBadResponse(Status.INTERNAL_SERVER_ERROR,
+          ex.getMessage());
+    }
+    if (stream == null) {
+      return createBadResponse(Status.INTERNAL_SERVER_ERROR,
+          "Can not get log for container: " + containerIdStr);
+    }
+    ResponseBuilder response = Response.ok(stream);
+    if (downloadFile) {
+      response.header("Content-Type", "application/octet-stream");
+      response.header("Content-Disposition", "attachment; filename="
+          + fileName);
+    }
+    return response.build();
+  }
+
+  private StreamingOutput getStreamingOutput(ApplicationId appId,
+      String appOwner, final String nodeId, final String containerIdStr,
+      final String logFile) throws IOException{
+    String suffix = LogAggregationUtils.getRemoteNodeLogDirSuffix(conf);
+    org.apache.hadoop.fs.Path remoteRootLogDir = new org.apache.hadoop.fs.Path(
+        conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR,
+            YarnConfiguration.DEFAULT_NM_REMOTE_APP_LOG_DIR));
+    org.apache.hadoop.fs.Path qualifiedRemoteRootLogDir =
+        FileContext.getFileContext(conf).makeQualified(remoteRootLogDir);
+    FileContext fc = FileContext.getFileContext(
+        qualifiedRemoteRootLogDir.toUri(), conf);
+    org.apache.hadoop.fs.Path remoteAppDir = null;
+    if (appOwner == null) {
+      org.apache.hadoop.fs.Path toMatch = LogAggregationUtils
+          .getRemoteAppLogDir(remoteRootLogDir, appId, "*", suffix);
+      FileStatus[] matching  = fc.util().globStatus(toMatch);
+      if (matching == null || matching.length != 1) {
+        return null;
+      }
+      remoteAppDir = matching[0].getPath();
+    } else {
+      remoteAppDir = LogAggregationUtils
+          .getRemoteAppLogDir(remoteRootLogDir, appId, appOwner, suffix);
+    }
+    final RemoteIterator<FileStatus> nodeFiles;
+    nodeFiles = fc.listStatus(remoteAppDir);
+    if (!nodeFiles.hasNext()) {
+      return null;
+    }
+
+    StreamingOutput stream = new StreamingOutput() {
+
+      @Override
+      public void write(OutputStream os) throws IOException,
+          WebApplicationException {
+        byte[] buf = new byte[65535];
+        boolean findLogs = false;
+        while (nodeFiles.hasNext()) {
+          final FileStatus thisNodeFile = nodeFiles.next();
+          String nodeName = thisNodeFile.getPath().getName();
+          if ((nodeId == null || nodeName.contains(LogAggregationUtils
+              .getNodeString(nodeId))) && !nodeName.endsWith(
+              LogAggregationUtils.TMP_FILE_SUFFIX)) {
+            AggregatedLogFormat.LogReader reader =
+                new AggregatedLogFormat.LogReader(conf,
+                    thisNodeFile.getPath());
+            DataInputStream valueStream;
+            LogKey key = new LogKey();
+            valueStream = reader.next(key);
+            while (valueStream != null && !key.toString()
+                .equals(containerIdStr)) {
+              // Next container
+              key = new LogKey();
+              valueStream = reader.next(key);
+            }
+            if (valueStream == null) {
+              continue;
+            }
+            while (true) {
+              try {
+                String fileType = valueStream.readUTF();
+                String fileLengthStr = valueStream.readUTF();
+                long fileLength = Long.parseLong(fileLengthStr);
+                if (fileType.equalsIgnoreCase(logFile)) {
+                  StringBuilder sb = new StringBuilder();
+                  sb.append("LogType:");
+                  sb.append(fileType + "\n");
+                  sb.append("Log Upload Time:");
+                  sb.append(Times.format(System.currentTimeMillis()) + "\n");
+                  sb.append("LogLength:");
+                  sb.append(fileLengthStr + "\n");
+                  sb.append("Log Contents:\n");
+                  byte[] b = sb.toString().getBytes(Charset.forName("UTF-8"));
+                  os.write(b, 0, b.length);
+
+                  long curRead = 0;
+                  long pendingRead = fileLength - curRead;
+                  int toRead = pendingRead > buf.length ? buf.length
+                      : (int) pendingRead;
+                  int len = valueStream.read(buf, 0, toRead);
+                  while (len != -1 && curRead < fileLength) {
+                    os.write(buf, 0, len);
+                    curRead += len;
+
+                    pendingRead = fileLength - curRead;
+                    toRead = pendingRead > buf.length ? buf.length
+                        : (int) pendingRead;
+                    len = valueStream.read(buf, 0, toRead);
+                  }
+                  sb = new StringBuilder();
+                  sb.append("\nEnd of LogType:" + fileType + "\n");
+                  b = sb.toString().getBytes(Charset.forName("UTF-8"));
+                  os.write(b, 0, b.length);
+                  findLogs = true;
+                } else {
+                  long totalSkipped = 0;
+                  long currSkipped = 0;
+                  while (currSkipped != -1 && totalSkipped < fileLength) {
+                    currSkipped = valueStream.skip(fileLength - totalSkipped);
+                    totalSkipped += currSkipped;
+                  }
+                }
+              } catch (EOFException eof) {
+                break;
+              }
+            }
+          }
+        }
+        os.flush();
+        if (!findLogs) {
+          throw new IOException("Can not find logs for container:"
+              + containerIdStr);
+        }
+      }
+    };
+    return stream;
+  }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e61d4312/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryManagerOnTimelineStore.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryManagerOnTimelineStore.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryManagerOnTimelineStore.java
index dfc5b81..3c97584 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryManagerOnTimelineStore.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/TestApplicationHistoryManagerOnTimelineStore.java
@@ -80,9 +80,11 @@ public class TestApplicationHistoryManagerOnTimelineStore {
     store = createStore(SCALE);
     TimelineEntities entities = new TimelineEntities();
     entities.addEntity(createApplicationTimelineEntity(
-        ApplicationId.newInstance(0, SCALE + 1), true, true, false, false));
+        ApplicationId.newInstance(0, SCALE + 1), true, true, false, false,
+        YarnApplicationState.FINISHED));
     entities.addEntity(createApplicationTimelineEntity(
-        ApplicationId.newInstance(0, SCALE + 2), true, false, true, false));
+        ApplicationId.newInstance(0, SCALE + 2), true, false, true, false,
+        YarnApplicationState.FINISHED));
     store.put(entities);
   }
 
@@ -140,10 +142,10 @@ public class TestApplicationHistoryManagerOnTimelineStore {
       ApplicationId appId = ApplicationId.newInstance(0, i);
       if (i == 2) {
         entities.addEntity(createApplicationTimelineEntity(
-            appId, true, false, false, true));
+            appId, true, false, false, true, YarnApplicationState.FINISHED));
       } else {
         entities.addEntity(createApplicationTimelineEntity(
-            appId, false, false, false, false));
+            appId, false, false, false, false, YarnApplicationState.FINISHED));
       }
       store.put(entities);
       for (int j = 1; j <= scale; ++j) {
@@ -160,6 +162,16 @@ public class TestApplicationHistoryManagerOnTimelineStore {
         }
       }
     }
+    TimelineEntities entities = new TimelineEntities();
+    ApplicationId appId = ApplicationId.newInstance(1234, 1);
+    ApplicationAttemptId appAttemptId =
+        ApplicationAttemptId.newInstance(appId, 1);
+    ContainerId containerId = ContainerId.newContainerId(appAttemptId, 1);
+    entities.addEntity(createApplicationTimelineEntity(
+        appId, true, false, false, false, YarnApplicationState.RUNNING));
+    entities.addEntity(createAppAttemptTimelineEntity(appAttemptId));
+    entities.addEntity(createContainerEntity(containerId));
+    store.put(entities);
   }
 
   @Test
@@ -355,7 +367,7 @@ public class TestApplicationHistoryManagerOnTimelineStore {
         historyManager.getApplications(Long.MAX_VALUE, 0L, Long.MAX_VALUE)
           .values();
     Assert.assertNotNull(apps);
-    Assert.assertEquals(SCALE + 1, apps.size());
+    Assert.assertEquals(SCALE + 2, apps.size());
     ApplicationId ignoredAppId = ApplicationId.newInstance(0, SCALE + 2);
     for (ApplicationReport app : apps) {
       Assert.assertNotEquals(ignoredAppId, app.getApplicationId());
@@ -467,7 +479,8 @@ public class TestApplicationHistoryManagerOnTimelineStore {
 
   private static TimelineEntity createApplicationTimelineEntity(
       ApplicationId appId, boolean emptyACLs, boolean noAttemptId,
-      boolean wrongAppId, boolean enableUpdateEvent) {
+      boolean wrongAppId, boolean enableUpdateEvent,
+      YarnApplicationState state) {
     TimelineEntity entity = new TimelineEntity();
     entity.setEntityType(ApplicationMetricsConstants.ENTITY_TYPE);
     if (wrongAppId) {
@@ -517,7 +530,7 @@ public class TestApplicationHistoryManagerOnTimelineStore {
     eventInfo.put(ApplicationMetricsConstants.FINAL_STATUS_EVENT_INFO,
         FinalApplicationStatus.UNDEFINED.toString());
     eventInfo.put(ApplicationMetricsConstants.STATE_EVENT_INFO,
-        YarnApplicationState.FINISHED.toString());
+        state.toString());
     if (!noAttemptId) {
       eventInfo.put(ApplicationMetricsConstants.LATEST_APP_ATTEMPT_EVENT_INFO,
           ApplicationAttemptId.newInstance(appId, 1));
@@ -610,6 +623,8 @@ public class TestApplicationHistoryManagerOnTimelineStore {
     entityInfo.put(ContainerMetricsConstants.ALLOCATED_PORT_ENTITY_INFO, 100);
     entityInfo
         .put(ContainerMetricsConstants.ALLOCATED_PRIORITY_ENTITY_INFO, -1);
+    entityInfo.put(ContainerMetricsConstants
+        .ALLOCATED_HOST_HTTP_ADDRESS_ENTITY_INFO, "http://test:1234");
     entity.setOtherInfo(entityInfo);
     TimelineEvent tEvent = new TimelineEvent();
     tEvent.setEventType(ContainerMetricsConstants.CREATED_EVENT_TYPE);

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e61d4312/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestAHSWebServices.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestAHSWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestAHSWebServices.java
index 20dfe45..f985fe4 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestAHSWebServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-applicationhistoryservice/src/test/java/org/apache/hadoop/yarn/server/applicationhistoryservice/webapp/TestAHSWebServices.java
@@ -19,17 +19,30 @@
 package org.apache.hadoop.yarn.server.applicationhistoryservice.webapp;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.io.File;
+import java.io.FileWriter;
+import java.io.Writer;
+import java.net.HttpURLConnection;
+import java.net.URI;
+import java.net.URL;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.List;
 import java.util.Properties;
 
 import javax.servlet.FilterConfig;
 import javax.servlet.ServletException;
+import javax.servlet.http.HttpServletResponse;
 import javax.ws.rs.core.MediaType;
 
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.security.UserGroupInformation;
 import org.apache.hadoop.security.authentication.server.AuthenticationFilter;
 import org.apache.hadoop.security.authentication.server.PseudoAuthenticationHandler;
 import org.apache.hadoop.yarn.api.ApplicationBaseProtocol;
@@ -42,6 +55,8 @@ import org.apache.hadoop.yarn.api.records.NodeId;
 import org.apache.hadoop.yarn.api.records.YarnApplicationAttemptState;
 import org.apache.hadoop.yarn.api.records.YarnApplicationState;
 import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.logaggregation.AggregatedLogFormat;
+import org.apache.hadoop.yarn.logaggregation.LogAggregationUtils;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryClientService;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.ApplicationHistoryManagerOnTimelineStore;
 import org.apache.hadoop.yarn.server.applicationhistoryservice.TestApplicationHistoryManagerOnTimelineStore;
@@ -81,12 +96,17 @@ import com.sun.jersey.test.framework.WebAppDescriptor;
 public class TestAHSWebServices extends JerseyTestBase {
 
   private static ApplicationHistoryClientService historyClientService;
+  private static AHSWebServices ahsWebservice;
   private static final String[] USERS = new String[] { "foo" , "bar" };
   private static final int MAX_APPS = 5;
+  private static Configuration conf;
+  private static FileSystem fs;
+  private static final String remoteLogRootDir = "target/logs/";
+  private static final String rootLogDir = "target/LocalLogs";
 
   @BeforeClass
   public static void setupClass() throws Exception {
-    Configuration conf = new YarnConfiguration();
+    conf = new YarnConfiguration();
     TimelineStore store =
         TestApplicationHistoryManagerOnTimelineStore.createStore(MAX_APPS);
     TimelineACLsManager aclsManager = new TimelineACLsManager(conf);
@@ -95,6 +115,8 @@ public class TestAHSWebServices extends JerseyTestBase {
         new TimelineDataManager(store, aclsManager);
     conf.setBoolean(YarnConfiguration.YARN_ACL_ENABLE, true);
     conf.set(YarnConfiguration.YARN_ADMIN_ACL, "foo");
+    conf.setBoolean(YarnConfiguration.LOG_AGGREGATION_ENABLED, true);
+    conf.set(YarnConfiguration.NM_REMOTE_APP_LOG_DIR, remoteLogRootDir);
     dataManager.init(conf);
     ApplicationACLsManager appAclsManager = new ApplicationACLsManager(conf);
     ApplicationHistoryManagerOnTimelineStore historyManager =
@@ -108,6 +130,8 @@ public class TestAHSWebServices extends JerseyTestBase {
     };
     historyClientService.init(conf);
     historyClientService.start();
+    ahsWebservice = new AHSWebServices(historyClientService, conf);
+    fs = FileSystem.get(conf);
   }
 
   @AfterClass
@@ -115,6 +139,8 @@ public class TestAHSWebServices extends JerseyTestBase {
     if (historyClientService != null) {
       historyClientService.stop();
     }
+    fs.delete(new Path(remoteLogRootDir), true);
+    fs.delete(new Path(rootLogDir), true);
   }
 
   @Parameterized.Parameters
@@ -127,7 +153,7 @@ public class TestAHSWebServices extends JerseyTestBase {
     @Override
     protected void configureServlets() {
       bind(JAXBContextResolver.class);
-      bind(AHSWebServices.class);
+      bind(AHSWebServices.class).toInstance(ahsWebservice);
       bind(GenericExceptionHandler.class);
       bind(ApplicationBaseProtocol.class).toInstance(historyClientService);
       serve("/*").with(GuiceContainer.class);
@@ -471,4 +497,177 @@ public class TestAHSWebServices extends JerseyTestBase {
     assertEquals(ContainerState.COMPLETE.toString(),
       container.getString("containerState"));
   }
+
+  @Test(timeout = 10000)
+  public void testContainerLogsForFinishedApps() throws Exception {
+    String fileName = "syslog";
+    String user = "user1";
+    UserGroupInformation ugi = UserGroupInformation.createRemoteUser("user1");
+    NodeId nodeId = NodeId.newInstance("test host", 100);
+    NodeId nodeId2 = NodeId.newInstance("host2", 1234);
+    //prepare the logs for remote directory
+    ApplicationId appId = ApplicationId.newInstance(0, 1);
+    // create local logs
+    List<String> rootLogDirList = new ArrayList<String>();
+    rootLogDirList.add(rootLogDir);
+    Path rootLogDirPath = new Path(rootLogDir);
+    if (fs.exists(rootLogDirPath)) {
+      fs.delete(rootLogDirPath, true);
+    }
+    assertTrue(fs.mkdirs(rootLogDirPath));
+
+    Path appLogsDir = new Path(rootLogDirPath, appId.toString());
+    if (fs.exists(appLogsDir)) {
+      fs.delete(appLogsDir, true);
+    }
+    assertTrue(fs.mkdirs(appLogsDir));
+
+    // create container logs in local log file dir
+    // create two container log files. We can get containerInfo
+    // for container1 from AHS, but can not get such info for
+    // container100
+    ApplicationAttemptId appAttemptId =
+        ApplicationAttemptId.newInstance(appId, 1);
+    ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 1);
+    ContainerId containerId100 = ContainerId.newContainerId(appAttemptId, 100);
+    createContainerLogInLocalDir(appLogsDir, containerId1, fs, fileName,
+        ("Hello." + containerId1));
+    createContainerLogInLocalDir(appLogsDir, containerId100, fs, fileName,
+        ("Hello." + containerId100));
+
+    // upload container logs to remote log dir
+    Path path = new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR) +
+        user + "/logs/" + appId.toString());
+    if (fs.exists(path)) {
+      fs.delete(path, true);
+    }
+    assertTrue(fs.mkdirs(path));
+    uploadContainerLogIntoRemoteDir(ugi, conf, rootLogDirList, nodeId,
+        containerId1, path, fs);
+    uploadContainerLogIntoRemoteDir(ugi, conf, rootLogDirList, nodeId2,
+        containerId100, path, fs);
+
+    // test whether we can find container log from remote directory if
+    // the containerInfo for this container could be fetched from AHS.
+    WebResource r = resource();
+    ClientResponse response = r.path("ws").path("v1")
+        .path("applicationhistory").path("containerlogs")
+        .path(containerId1.toString()).path(fileName)
+        .queryParam("user.name", user)
+        .accept(MediaType.TEXT_PLAIN)
+        .get(ClientResponse.class);
+    String responseText = response.getEntity(String.class);
+    assertTrue(responseText.contains("Hello." + containerId1));
+
+    // test whether we can find container log from remote directory if
+    // the containerInfo for this container could not be fetched from AHS.
+    r = resource();
+    response = r.path("ws").path("v1")
+        .path("applicationhistory").path("containerlogs")
+        .path(containerId100.toString()).path(fileName)
+        .queryParam("user.name", user)
+        .accept(MediaType.TEXT_PLAIN)
+        .get(ClientResponse.class);
+    responseText = response.getEntity(String.class);
+    assertTrue(responseText.contains("Hello." + containerId100));
+
+    // create an application which can not be found from AHS
+    ApplicationId appId100 = ApplicationId.newInstance(0, 100);
+    appLogsDir = new Path(rootLogDirPath, appId100.toString());
+    if (fs.exists(appLogsDir)) {
+      fs.delete(appLogsDir, true);
+    }
+    assertTrue(fs.mkdirs(appLogsDir));
+    ApplicationAttemptId appAttemptId100 =
+        ApplicationAttemptId.newInstance(appId100, 1);
+    ContainerId containerId1ForApp100 = ContainerId
+        .newContainerId(appAttemptId100, 1);
+    createContainerLogInLocalDir(appLogsDir, containerId1ForApp100, fs,
+        fileName, ("Hello." + containerId1ForApp100));
+    path = new Path(conf.get(YarnConfiguration.NM_REMOTE_APP_LOG_DIR) +
+        user + "/logs/" + appId100.toString());
+    if (fs.exists(path)) {
+      fs.delete(path, true);
+    }
+    assertTrue(fs.mkdirs(path));
+    uploadContainerLogIntoRemoteDir(ugi, conf, rootLogDirList, nodeId2,
+        containerId1ForApp100, path, fs);
+    r = resource();
+    response = r.path("ws").path("v1")
+        .path("applicationhistory").path("containerlogs")
+        .path(containerId1ForApp100.toString()).path(fileName)
+        .queryParam("user.name", user)
+        .accept(MediaType.TEXT_PLAIN)
+        .get(ClientResponse.class);
+    responseText = response.getEntity(String.class);
+    assertTrue(responseText.contains("Hello." + containerId1ForApp100));
+  }
+
+  private static void createContainerLogInLocalDir(Path appLogsDir,
+      ContainerId containerId, FileSystem fs, String fileName, String content)
+      throws Exception {
+    Path containerLogsDir = new Path(appLogsDir, containerId.toString());
+    if (fs.exists(containerLogsDir)) {
+      fs.delete(containerLogsDir, true);
+    }
+    assertTrue(fs.mkdirs(containerLogsDir));
+    Writer writer =
+        new FileWriter(new File(containerLogsDir.toString(), fileName));
+    writer.write(content);
+    writer.close();
+  }
+
+  private static void uploadContainerLogIntoRemoteDir(UserGroupInformation ugi,
+      Configuration configuration, List<String> rootLogDirs, NodeId nodeId,
+      ContainerId containerId, Path appDir, FileSystem fs) throws Exception {
+    Path path =
+        new Path(appDir, LogAggregationUtils.getNodeString(nodeId));
+    AggregatedLogFormat.LogWriter writer =
+        new AggregatedLogFormat.LogWriter(configuration, path, ugi);
+    writer.writeApplicationOwner(ugi.getUserName());
+
+    writer.append(new AggregatedLogFormat.LogKey(containerId),
+        new AggregatedLogFormat.LogValue(rootLogDirs, containerId,
+        ugi.getShortUserName()));
+    writer.close();
+  }
+
+  @Test(timeout = 10000)
+  public void testContainerLogsForRunningApps() throws Exception {
+    String fileName = "syslog";
+    String user = "user1";
+    ApplicationId appId = ApplicationId.newInstance(
+        1234, 1);
+    ApplicationAttemptId appAttemptId =
+        ApplicationAttemptId.newInstance(appId, 1);
+    ContainerId containerId1 = ContainerId.newContainerId(appAttemptId, 1);
+    WebResource r = resource();
+    URI requestURI = r.path("ws").path("v1")
+        .path("applicationhistory").path("containerlogs")
+        .path(containerId1.toString()).path(fileName)
+        .queryParam("user.name", user).getURI();
+    String redirectURL = getRedirectURL(requestURI.toString());
+    assertTrue(redirectURL != null);
+    assertTrue(redirectURL.contains("test:1234"));
+    assertTrue(redirectURL.contains("ws/v1/node/containerlogs"));
+    assertTrue(redirectURL.contains(containerId1.toString()));
+    assertTrue(redirectURL.contains("user.name=" + user));
+  }
+
+  private static String getRedirectURL(String url) {
+    String redirectUrl = null;
+    try {
+      HttpURLConnection conn = (HttpURLConnection) new URL(url)
+          .openConnection();
+      // do not automatically follow the redirection
+      // otherwise we get too many redirections exception
+      conn.setInstanceFollowRedirects(false);
+      if(conn.getResponseCode() == HttpServletResponse.SC_TEMPORARY_REDIRECT) {
+        redirectUrl = conn.getHeaderField("Location");
+      }
+    } catch (Exception e) {
+      // throw new RuntimeException(e);
+    }
+    return redirectUrl;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e61d4312/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/dao/ContainerInfo.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/dao/ContainerInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/dao/ContainerInfo.java
index d0d4df6..f127f9c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/dao/ContainerInfo.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-common/src/main/java/org/apache/hadoop/yarn/server/webapp/dao/ContainerInfo.java
@@ -48,6 +48,7 @@ public class ContainerInfo {
   protected int containerExitStatus;
   protected ContainerState containerState;
   protected String nodeHttpAddress;
+  protected String nodeId;
 
   public ContainerInfo() {
     // JAXB needs this
@@ -71,6 +72,7 @@ public class ContainerInfo {
     containerExitStatus = container.getContainerExitStatus();
     containerState = container.getContainerState();
     nodeHttpAddress = container.getNodeHttpAddress();
+    nodeId = container.getAssignedNode().toString();
   }
 
   public String getContainerId() {
@@ -124,4 +126,8 @@ public class ContainerInfo {
   public String getNodeHttpAddress() {
     return nodeHttpAddress;
   }
+
+  public String getNodeId() {
+    return nodeId;
+  }
 }

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e61d4312/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMWebServices.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMWebServices.java
index fddeb04..57e729c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMWebServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/webapp/NMWebServices.java
@@ -33,6 +33,7 @@ import javax.ws.rs.QueryParam;
 import javax.ws.rs.WebApplicationException;
 import javax.ws.rs.core.MediaType;
 import javax.ws.rs.core.Response;
+import javax.ws.rs.core.Response.ResponseBuilder;
 import javax.ws.rs.core.Response.Status;
 import javax.ws.rs.core.StreamingOutput;
 import javax.ws.rs.core.UriInfo;
@@ -215,7 +216,8 @@ public class NMWebServices {
   @Public
   @Unstable
   public Response getLogs(@PathParam("containerid") String containerIdStr,
-      @PathParam("filename") String filename) {
+      @PathParam("filename") String filename,
+      @QueryParam("download") String download) {
     ContainerId containerId;
     try {
       containerId = ConverterUtils.toContainerId(containerIdStr);
@@ -232,7 +234,7 @@ public class NMWebServices {
     } catch (YarnException ex) {
       return Response.serverError().entity(ex.getMessage()).build();
     }
-    
+    boolean downloadFile = parseBooleanParam(download);
     try {
       final FileInputStream fis = ContainerLogsUtils.openLogFileForRead(
           containerIdStr, logFile, nmContext);
@@ -250,10 +252,22 @@ public class NMWebServices {
           os.flush();
         }
       };
-      
-      return Response.ok(stream).build();
+      ResponseBuilder resp = Response.ok(stream);
+      if (downloadFile) {
+        resp.header("Content-Type", "application/octet-stream");
+        resp.header("Content-Disposition", "attachment; filename="
+            + logFile.getName());
+      }
+      return resp.build();
     } catch (IOException ex) {
       return Response.serverError().entity(ex.getMessage()).build();
     }
   }
+
+  private boolean parseBooleanParam(String param) {
+    if (param != null) {
+      return ("true").equalsIgnoreCase(param);
+    }
+    return false;
+  }
 }
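
As a usage note (not part of the patch): with the new download query
parameter, the NM (and the AHS) mark the response as an attachment via
Content-Type: application/octet-stream and a Content-Disposition header, so a
client can save the stream straight to disk. A minimal sketch, with a
hypothetical NodeManager address and container id:

    import java.io.InputStream;
    import java.net.HttpURLConnection;
    import java.net.URL;
    import java.nio.file.Files;
    import java.nio.file.Paths;
    import java.nio.file.StandardCopyOption;

    public class DownloadNMContainerLog {
      public static void main(String[] args) throws Exception {
        // Hypothetical NM web address and container id; substitute real values.
        String url = "http://nm-host:8042/ws/v1/node/containerlogs/"
            + "container_1462380000000_0001_01_000001/syslog?download=true";
        HttpURLConnection conn =
            (HttpURLConnection) new URL(url).openConnection();
        // With download=true the response carries
        // Content-Type: application/octet-stream and
        // Content-Disposition: attachment; filename=syslog
        System.out.println(conn.getHeaderField("Content-Disposition"));
        try (InputStream in = conn.getInputStream()) {
          Files.copy(in, Paths.get("syslog"),
              StandardCopyOption.REPLACE_EXISTING);
        }
      }
    }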

http://git-wip-us.apache.org/repos/asf/hadoop/blob/e61d4312/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java
----------------------------------------------------------------------
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java
index 2ac0956..ce1b309 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/test/java/org/apache/hadoop/yarn/server/nodemanager/webapp/TestNMWebServices.java
@@ -50,7 +50,6 @@ import org.apache.hadoop.yarn.server.nodemanager.NodeHealthCheckerService;
 import org.apache.hadoop.yarn.server.nodemanager.NodeManager;
 import org.apache.hadoop.yarn.server.nodemanager.ResourceView;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.application.ApplicationImpl;
-import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.Container;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.container.ContainerState;
 import org.apache.hadoop.yarn.server.nodemanager.containermanager.launcher.ContainerLaunch;
 import org.apache.hadoop.yarn.server.nodemanager.webapp.WebServer.NMWebApp;
@@ -352,7 +351,16 @@ public class TestNMWebServices extends JerseyTestBase {
         .accept(MediaType.TEXT_PLAIN).get(ClientResponse.class);
     String responseText = response.getEntity(String.class);
     assertEquals(logMessage, responseText);
-    
+
+    // ask and download it
+    response = r.path("ws").path("v1").path("node").path("containerlogs")
+        .path(containerIdStr).path(filename).queryParam("download", "true")
+        .accept(MediaType.TEXT_PLAIN).get(ClientResponse.class);
+    responseText = response.getEntity(String.class);
+    assertEquals(logMessage, responseText);
+    assertEquals(200, response.getStatus());
+    assertEquals("application/octet-stream", response.getType().toString());
+
     // ask for file that doesn't exist
     response = r.path("ws").path("v1").path("node")
         .path("containerlogs").path(containerIdStr).path("uhhh")

