minor, more verbose on job diag

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/5155994e
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/5155994e
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/5155994e

Branch: refs/heads/master-hbase0.98
Commit: 5155994e1a84e47d9ce8d90d3787a28910673b66
Parents: e4ed232
Author: lidongsjtu <[email protected]>
Authored: Wed Mar 15 14:44:17 2017 +0800
Committer: lidongsjtu <[email protected]>
Committed: Wed Mar 15 14:51:55 2017 +0800

----------------------------------------------------------------------
 build/bin/diag.sh                               |   4 +-
 build/conf/kylin-tools-log4j.properties         |   2 +-
 .../apache/kylin/tool/MrJobInfoExtractor.java   | 208 +++++++++++++------
 3 files changed, 149 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/5155994e/build/bin/diag.sh
----------------------------------------------------------------------
diff --git a/build/bin/diag.sh b/build/bin/diag.sh
index a995774..bb1bdc4 100644
--- a/build/bin/diag.sh
+++ b/build/bin/diag.sh
@@ -52,14 +52,14 @@ then
 
     if [ ${#patient} -eq 36 ]; then
         hbase ${KYLIN_EXTRA_START_OPTS} \
-        -Dlog4j.configuration=kylin-tool-log4j.properties \
+        -Dlog4j.configuration=file:${KYLIN_HOME}/conf/kylin-tools-log4j.properties \
         -Dcatalina.home=${tomcat_root} \
         org.apache.kylin.tool.JobDiagnosisInfoCLI \
         -jobId $patient \
         -destDir $destDir || exit 1
     else
         hbase ${KYLIN_EXTRA_START_OPTS} \
-        -Dlog4j.configuration=kylin-server-log4j.properties \
+        -Dlog4j.configuration=file:${KYLIN_HOME}/conf/kylin-tools-log4j.properties \
         -Dcatalina.home=${tomcat_root} \
         org.apache.kylin.tool.DiagnosisInfoCLI \
         -project -all \

http://git-wip-us.apache.org/repos/asf/kylin/blob/5155994e/build/conf/kylin-tools-log4j.properties
----------------------------------------------------------------------
diff --git a/build/conf/kylin-tools-log4j.properties b/build/conf/kylin-tools-log4j.properties
index 2ccd772..d47f9a2 100644
--- a/build/conf/kylin-tools-log4j.properties
+++ b/build/conf/kylin-tools-log4j.properties
@@ -35,4 +35,4 @@ log4j.appender.stderr.layout.ConversionPattern=%d{ISO8601} %-5p [%t %c{1}:%L]: %
 #log4j.logger.org.apache.hadoop=ERROR
 log4j.logger.org.apache.kylin=DEBUG
 log4j.logger.org.springframework=WARN
-log4j.logger.org.apache.commons.httpclient=WARN
+log4j.logger.org.apache.kylin.tool.shaded=INFO

http://git-wip-us.apache.org/repos/asf/kylin/blob/5155994e/tool/src/main/java/org/apache/kylin/tool/MrJobInfoExtractor.java
----------------------------------------------------------------------
diff --git a/tool/src/main/java/org/apache/kylin/tool/MrJobInfoExtractor.java b/tool/src/main/java/org/apache/kylin/tool/MrJobInfoExtractor.java
index 483694b..b9bf2de 100644
--- a/tool/src/main/java/org/apache/kylin/tool/MrJobInfoExtractor.java
+++ b/tool/src/main/java/org/apache/kylin/tool/MrJobInfoExtractor.java
@@ -21,6 +21,7 @@ package org.apache.kylin.tool;
 import java.io.File;
 import java.io.IOException;
 import java.nio.charset.Charset;
+import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
@@ -45,12 +46,13 @@ import org.slf4j.LoggerFactory;
 import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.google.common.base.Preconditions;
+import com.google.common.collect.Sets;
 
 public class MrJobInfoExtractor extends AbstractInfoExtractor {
     private static final Logger logger = LoggerFactory.getLogger(MrJobInfoExtractor.class);
 
     @SuppressWarnings("static-access")
-    private static final Option OPTION_INCLUDE_COUNTERS = OptionBuilder.withArgName("includeCounters").hasArg().isRequired(false).withDescription("Specify whether to include mr task counters to extract. Default true.").create("includeCounters");
+    private static final Option OPTION_INCLUDE_DETAILS = OptionBuilder.withArgName("includeTasks").hasArg().isRequired(false).withDescription("Specify whether to include mr task details to extract. Default true.").create("includeTasks");
 
     @SuppressWarnings("static-access")
     private static final Option OPTION_MR_JOB_ID = OptionBuilder.withArgName("mrJobId").hasArg().isRequired(false).withDescription("Specify MR Job Id").create("mrJobId");
@@ -60,7 +62,7 @@ public class MrJobInfoExtractor extends AbstractInfoExtractor {
     public MrJobInfoExtractor() {
         packageType = "MR";
 
-        options.addOption(OPTION_INCLUDE_COUNTERS);
+        options.addOption(OPTION_INCLUDE_DETAILS);
         options.addOption(OPTION_MR_JOB_ID);
     }
 
@@ -79,7 +81,7 @@ public class MrJobInfoExtractor extends AbstractInfoExtractor {
                 return m.group(1) + m.group(2) + ":19888";
             }
         }
-        logger.info("kylin.engine.mr.yarn-check-status-url" + " is not set read from hadoop configuration");
+        logger.info("kylin.engine.mr.yarn-check-status-url" + " is not set, read from hadoop configuration");
 
         Configuration conf = HadoopUtil.getCurrentConfiguration();
         String rmWebHost = 
HAUtil.getConfValueForRMInstance(YarnConfiguration.RM_WEBAPP_ADDRESS, 
YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS, conf);
@@ -120,69 +122,113 @@ public class MrJobInfoExtractor extends AbstractInfoExtractor {
         return msg;
     }
 
-    private void extractTaskCounter(String taskId, File exportDir, String 
taskUrl, String id) throws IOException {
+    private void extractTaskDetail(String taskId, String nodeId, String user, 
File exportDir, String taskUrl, String urlBase) throws IOException {
         try {
-            String response = getHttpResponse(taskUrl + taskId + "/counters");
-            FileUtils.writeStringToFile(new File(exportDir, id + "_" + taskId 
+ ".json"), response, Charset.defaultCharset());
+            if (StringUtils.isEmpty(taskId)) {
+                return;
+            }
+
+            String taskUrlBase = taskUrl + taskId;
+            File destDir = new File(exportDir, taskId);
+
+            // get task basic info
+            String taskInfo = saveHttpResponseQuietly(new File(destDir, 
"task.json"), taskUrlBase);
+            JsonNode taskAttempt = new 
ObjectMapper().readTree(taskInfo).path("task").path("successfulAttempt");
+            String succAttemptId = taskAttempt.textValue();
+
+            String attemptInfo = saveHttpResponseQuietly(new File(destDir, 
"task_attempts.json"), taskUrlBase + "/attempts/" + succAttemptId);
+            JsonNode attemptAttempt = new 
ObjectMapper().readTree(attemptInfo).path("taskAttempt").path("assignedContainerId");
+            String containerId = attemptAttempt.textValue();
+
+            // save task counters
+            saveHttpResponseQuietly(new File(destDir, "task_counters.json"), 
taskUrlBase + "/counters");
+
+            // save task logs
+            String logUrl = urlBase + "/jobhistory/logs/" + nodeId + "/" + 
containerId + "/" + succAttemptId + "/" + user + "/syslog/?start=0";
+            logger.debug("Fetch task log from url: " + logUrl);
+
+            saveHttpResponseQuietly(new File(destDir, "task_log.txt"), logUrl);
         } catch (Exception e) {
             logger.warn("Failed to get task counters rest response" + e);
         }
     }
 
-    private void extractJobConf(File exportDir, String jobUrlPrefix) throws 
IOException {
+    private String saveHttpResponseQuietly(File dest, String url) {
+        String response = null;
+
         try {
-            String confUrl = jobUrlPrefix + "/conf/";
-            String response = getHttpResponse(confUrl);
-            FileUtils.writeStringToFile(new File(exportDir, "job_conf.json"), 
response, Charset.defaultCharset());
+            response = getHttpResponse(url);
+            FileUtils.forceMkdir(dest.getParentFile());
+            FileUtils.writeStringToFile(dest, response, 
Charset.defaultCharset());
+            return response;
         } catch (Exception e) {
-            logger.warn("Failed to get job conf rest response.", e);
+            logger.warn("Failed to get http response from {}.", url, e);
         }
+
+        return response;
     }
 
     @Override
     protected void executeExtract(OptionsHelper optionsHelper, File exportDir) 
throws Exception {
         try {
-            boolean includeTaskCounter = 
optionsHelper.hasOption(OPTION_INCLUDE_COUNTERS) ? 
Boolean.valueOf(optionsHelper.getOptionValue(OPTION_INCLUDE_COUNTERS)) : true;
+            boolean includeTaskDetails = 
optionsHelper.hasOption(OPTION_INCLUDE_DETAILS) ? 
Boolean.valueOf(optionsHelper.getOptionValue(OPTION_INCLUDE_DETAILS)) : true;
             String mrJobId = optionsHelper.getOptionValue(OPTION_MR_JOB_ID);
-            String jobUrlPrefix = getRestCheckUrl() + 
"/ws/v1/history/mapreduce/jobs/" + mrJobId;
+            String jobUrlBase = getRestCheckUrl();
+            String jobUrlPrefix = jobUrlBase + 
"/ws/v1/history/mapreduce/jobs/" + mrJobId;
 
-            if (includeTaskCounter) {
-                extractTaskCounters(exportDir, jobUrlPrefix);
-            }
-            extractJobCounters(exportDir, jobUrlPrefix);
-            extractJobTasks(exportDir, jobUrlPrefix);
-            extractJobConf(exportDir, jobUrlPrefix);
-        } catch (Exception e) {
-            logger.warn("Failed to get mr tasks rest response.", e);
-        }
-    }
+            // save mr job stats
+            String jobResponse = saveHttpResponseQuietly(new File(exportDir, 
"job.json"), jobUrlPrefix);
+            String user = new 
ObjectMapper().readTree(jobResponse).path("job").path("user").textValue();
 
-    private void extractJobCounters(File exportDir, String jobUrlPrefix) {
-        String url = jobUrlPrefix + "/counters";
-        String response = getHttpResponse(url);
-        try {
-            FileUtils.writeStringToFile(new File(exportDir, 
"job_counters.json"), response, Charset.defaultCharset());
-        } catch (Exception e) {
-            logger.warn("Failed to get mr counters rest response.", e);
-        }
-    }
+            String jobAttemptResponse = saveHttpResponseQuietly(new 
File(exportDir, "job_attempts.json"), jobUrlPrefix + "/jobattempts");
+            String nodeId = new 
ObjectMapper().readTree(jobAttemptResponse).path("jobAttempts").path("jobAttempt").get(0).path("nodeId").textValue();
+
+            // save mr job conf
+            saveHttpResponseQuietly(new File(exportDir, "job_conf.json"), 
jobUrlPrefix + "/conf");
+
+            // save mr job counters
+            saveHttpResponseQuietly(new File(exportDir, "job_counters.json"), 
jobUrlPrefix + "/counters");
+
+            // save task details
+            if (includeTaskDetails) {
+                extractTaskDetails(exportDir, jobUrlPrefix, jobUrlBase, 
nodeId, user);
+            }
 
-    private void extractJobTasks(File exportDir, String jobUrlPrefix) {
-        String url = jobUrlPrefix + "/tasks";
-        String response = getHttpResponse(url);
-        try {
-            FileUtils.writeStringToFile(new File(exportDir, "job_tasks.json"), 
response, Charset.defaultCharset());
         } catch (Exception e) {
-            logger.warn("Failed to get mr counters rest response.", e);
+            logger.warn("Failed to get mr tasks rest response.", e);
         }
     }
 
-    private void extractTaskCounters(File exportDir, String jobUrlPrefix) {
+    private void extractTaskDetails(File exportDir, String jobUrlPrefix, 
String jobUrlBase, String nodeId, String user) {
         try {
             String tasksUrl = jobUrlPrefix + "/tasks/";
-            String tasksResponse = getHttpResponse(tasksUrl);
+            String tasksResponse = saveHttpResponseQuietly(new File(exportDir, 
"job_tasks.json"), tasksUrl);
             JsonNode tasks = new 
ObjectMapper().readTree(tasksResponse).path("tasks").path("task");
 
+            // find the first start map and reduce
+            String firstStartMapId = null;
+            String firstStartReduceId = null;
+            long firstStartMapTime = Long.MAX_VALUE;
+            long firstStartReduceTime = Long.MAX_VALUE;
+
+            // find the first end map and reduce
+            String firstEndMapId = null;
+            String firstEndReduceId = null;
+            long firstEndMapTime = Long.MAX_VALUE;
+            long firstEndReduceTime = Long.MAX_VALUE;
+
+            // find the last start map and reduce
+            String lastStartMapId = null;
+            String lastStartReduceId = null;
+            long lastStartMapTime = 0L;
+            long lastStartReduceTime = 0L;
+
+            // find the last end map and reduce
+            String lastEndMapId = null;
+            String lastEndReduceId = null;
+            long lastEndMapTime = 0L;
+            long lastEndReduceTime = 0L;
+
             // find the max map and reduce duation
             String maxReduceId = null;
             String maxMapId = null;
@@ -192,14 +238,10 @@ public class MrJobInfoExtractor extends AbstractInfoExtractor {
             // find the min map and reduce duration
             String minReduceId = null;
             String minMapId = null;
-            long minMapElapsedTime = Integer.MAX_VALUE;
-            long minReduceElapsedTime = Integer.MAX_VALUE;
-
-            // find a normal map and reduce duration (the first one)
-            String normReduceId = null;
-            String normMapId = null;
-            long normMapElapsedTime = 0;
-            long normReduceElapsedTime = 0;
+            long minMapElapsedTime = Long.MAX_VALUE;
+            long minReduceElapsedTime = Long.MAX_VALUE;
+
+            Set<String> selectedTaskIds = Sets.newHashSet();
             for (JsonNode node : tasks) {
                 if (node.get("type").textValue().equals("MAP")) {
                     if (node.get("elapsedTime").longValue() >= 
maxMapElapsedTime) {
@@ -212,11 +254,27 @@ public class MrJobInfoExtractor extends AbstractInfoExtractor {
                         minMapId = node.get("id").textValue();
                     }
 
-                    if (normMapElapsedTime == 0) {
-                        normMapElapsedTime = 
node.get("elapsedTime").longValue();
-                        normMapId = node.get("id").textValue();
+                    if (node.get("startTime").longValue() <= 
firstStartMapTime) {
+                        firstStartMapTime = node.get("startTime").longValue();
+                        firstStartMapId = node.get("id").textValue();
+                    }
+
+                    if (node.get("startTime").longValue() >= lastStartMapTime) 
{
+                        lastStartMapTime = node.get("startTime").longValue();
+                        lastStartMapId = node.get("id").textValue();
+                    }
+
+                    if (node.get("finishTime").longValue() <= firstEndMapTime) 
{
+                        firstEndMapTime = node.get("finishTime").longValue();
+                        firstEndMapId = node.get("id").textValue();
+                    }
+
+                    if (node.get("finishTime").longValue() >= lastEndMapTime) {
+                        lastEndMapTime = node.get("finishTime").longValue();
+                        lastEndMapId = node.get("id").textValue();
                     }
                 }
+
                 if (node.get("type").textValue().equals("REDUCE")) {
                     if (node.get("elapsedTime").longValue() >= 
maxReduceElapsedTime) {
                         maxReduceElapsedTime = 
node.get("elapsedTime").longValue();
@@ -228,20 +286,46 @@ public class MrJobInfoExtractor extends AbstractInfoExtractor {
                         minReduceId = node.get("id").textValue();
                     }
 
-                    if (normReduceElapsedTime == 0) {
-                        normReduceElapsedTime = 
node.get("elapsedTime").longValue();
-                        normReduceId = node.get("id").textValue();
+                    if (node.get("startTime").longValue() <= 
firstStartReduceTime) {
+                        firstStartReduceTime = 
node.get("startTime").longValue();
+                        firstStartReduceId = node.get("id").textValue();
+                    }
+
+                    if (node.get("startTime").longValue() >= 
lastStartReduceTime) {
+                        lastStartReduceTime = 
node.get("startTime").longValue();
+                        lastStartReduceId = node.get("id").textValue();
+                    }
+
+                    if (node.get("finishTime").longValue() <= 
firstEndReduceTime) {
+                        firstEndReduceTime = 
node.get("finishTime").longValue();
+                        firstEndReduceId = node.get("id").textValue();
+                    }
+
+                    if (node.get("finishTime").longValue() >= 
lastEndReduceTime) {
+                        lastEndReduceTime = node.get("finishTime").longValue();
+                        lastEndReduceId = node.get("id").textValue();
                     }
                 }
             }
-            File counterDir = new File(exportDir, "counters");
-            FileUtils.forceMkdir(counterDir);
-            extractTaskCounter(maxMapId, counterDir, tasksUrl, "max");
-            extractTaskCounter(maxReduceId, counterDir, tasksUrl, "max");
-            extractTaskCounter(minMapId, counterDir, tasksUrl, "min");
-            extractTaskCounter(minReduceId, counterDir, tasksUrl, "min");
-            extractTaskCounter(normMapId, counterDir, tasksUrl, "norm");
-            extractTaskCounter(normReduceId, counterDir, tasksUrl, "norm");
+
+            selectedTaskIds.add(maxMapId);
+            selectedTaskIds.add(maxReduceId);
+            selectedTaskIds.add(minMapId);
+            selectedTaskIds.add(minReduceId);
+            selectedTaskIds.add(firstStartMapId);
+            selectedTaskIds.add(firstStartReduceId);
+            selectedTaskIds.add(lastStartMapId);
+            selectedTaskIds.add(lastStartReduceId);
+            selectedTaskIds.add(firstEndMapId);
+            selectedTaskIds.add(firstEndReduceId);
+            selectedTaskIds.add(lastEndMapId);
+            selectedTaskIds.add(lastEndReduceId);
+
+            File tasksDir = new File(exportDir, "tasks");
+            FileUtils.forceMkdir(tasksDir);
+            for (String taskId : selectedTaskIds) {
+                extractTaskDetail(taskId, nodeId, user, tasksDir, tasksUrl, 
jobUrlBase);
+            }
         } catch (Exception e) {
             logger.warn("Failed to get mr tasks rest response.", e);
         }

Reply via email to