Repository: kylin
Updated Branches:
  refs/heads/master 1c0cd2bf7 -> e02f0bed2


minor, diagnosis tool for mr jobs

Signed-off-by: lidongsjtu <lid...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/e02f0bed
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/e02f0bed
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/e02f0bed

Branch: refs/heads/master
Commit: e02f0bed299b69dabed51809811134591f21248e
Parents: 1c0cd2b
Author: Lingyan Jiang <lynne...@hotmail.com>
Authored: Tue May 31 19:03:06 2016 +0800
Committer: lidongsjtu <lid...@apache.org>
Committed: Tue May 31 21:02:17 2016 +0800

----------------------------------------------------------------------
 tool/pom.xml                                    |   5 -
 .../apache/kylin/tool/JobDiagnosisInfoCLI.java  |  31 ++--
 .../kylin/tool/JobTaskCounterExtractor.java     | 149 +++++++++++++++++++
 3 files changed, 168 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/e02f0bed/tool/pom.xml
----------------------------------------------------------------------
diff --git a/tool/pom.xml b/tool/pom.xml
index ee61fae..2657b12 100644
--- a/tool/pom.xml
+++ b/tool/pom.xml
@@ -43,11 +43,6 @@
         <!--Env-->
         <dependency>
             <groupId>org.apache.hbase</groupId>
-            <artifactId>hbase-common</artifactId>
-            <scope>provided</scope>
-        </dependency>
-        <dependency>
-            <groupId>org.apache.hbase</groupId>
             <artifactId>hbase-client</artifactId>
             <scope>provided</scope>
         </dependency>

http://git-wip-us.apache.org/repos/asf/kylin/blob/e02f0bed/tool/src/main/java/org/apache/kylin/tool/JobDiagnosisInfoCLI.java
----------------------------------------------------------------------
diff --git a/tool/src/main/java/org/apache/kylin/tool/JobDiagnosisInfoCLI.java 
b/tool/src/main/java/org/apache/kylin/tool/JobDiagnosisInfoCLI.java
index 8ba3a33..4dab057 100644
--- a/tool/src/main/java/org/apache/kylin/tool/JobDiagnosisInfoCLI.java
+++ b/tool/src/main/java/org/apache/kylin/tool/JobDiagnosisInfoCLI.java
@@ -18,12 +18,8 @@
 
 package org.apache.kylin.tool;
 
-import java.io.File;
-import java.io.IOException;
-import java.util.Arrays;
-import java.util.List;
-import java.util.Map;
-
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.io.FileUtils;
@@ -37,8 +33,11 @@ import org.apache.kylin.tool.util.ResourceStoreUtil;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Lists;
-import com.google.common.collect.Maps;
+import java.io.File;
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
 
 public class JobDiagnosisInfoCLI extends AbstractInfoExtractor {
     private static final Logger logger = 
LoggerFactory.getLogger(JobDiagnosisInfoCLI.class);
@@ -106,7 +105,6 @@ public class JobDiagnosisInfoCLI extends 
AbstractInfoExtractor {
                 File metaDir = new File(exportDir, "cube");
                 FileUtils.forceMkdir(metaDir);
                 String[] cubeMetaArgs = { "-cube", cubeName, "-destDir", new 
File(metaDir, cubeName).getAbsolutePath(), "-includeJobs", "false", 
"-compress", "false", "-submodule", "true" };
-
                 logger.info("Start to extract related cube: " + 
StringUtils.join(cubeMetaArgs));
                 CubeMetaExtractor cubeMetaExtractor = new CubeMetaExtractor();
                 logger.info("CubeMetaExtractor args: " + 
Arrays.toString(cubeMetaArgs));
@@ -119,8 +117,9 @@ public class JobDiagnosisInfoCLI extends 
AbstractInfoExtractor {
             logger.info("Start to dump yarn job logs: " + jobId);
             File yarnLogDir = new File(exportDir, "yarn");
             FileUtils.forceMkdir(yarnLogDir);
-            for (String taskId : yarnLogsResources) {
-                extractYarnLog(taskId, new File(yarnLogDir, jobId), true);
+            for (String stepId : yarnLogsResources) {
+                extractTaskCounter(stepId,new File(new File(yarnLogDir, 
stepId),"Counters"));
+                extractYarnLog(stepId, new File(yarnLogDir, stepId), true);
             }
         }
 
@@ -147,7 +146,6 @@ public class JobDiagnosisInfoCLI extends 
AbstractInfoExtractor {
         try {
             KylinConfig srcConfig = KylinConfig.getInstanceFromEnv();
             KylinConfig dstConfig = 
KylinConfig.createInstanceFromUri(destDir.getAbsolutePath());
-
             ResourceStoreUtil.copy(srcConfig, dstConfig, requiredResources);
         } catch (Exception e) {
             throw new RuntimeException("Failed to extract job resources. ", e);
@@ -172,6 +170,15 @@ public class JobDiagnosisInfoCLI extends 
AbstractInfoExtractor {
         }
     }
 
+    private void extractTaskCounter(String taskId, File destDir) throws 
Exception {
+        final Map<String, String> jobInfo = 
executableDao.getJobOutput(taskId).getInfo();
+        if (jobInfo.containsKey(ExecutableConstants.MR_JOB_ID)) {
+            String jobId = jobInfo.get(ExecutableConstants.MR_JOB_ID);
+            FileUtils.forceMkdir(destDir);
+            new JobTaskCounterExtractor(jobId).executeExtract(destDir);
+        }
+    }
+
     private boolean isYarnAppSucc(String applicationId) throws IOException {
         final String yarnCmd = "yarn application -status " + applicationId;
         final String cmdOutput = 
kylinConfig.getCliCommandExecutor().execute(yarnCmd).getSecond();

http://git-wip-us.apache.org/repos/asf/kylin/blob/e02f0bed/tool/src/main/java/org/apache/kylin/tool/JobTaskCounterExtractor.java
----------------------------------------------------------------------
diff --git 
a/tool/src/main/java/org/apache/kylin/tool/JobTaskCounterExtractor.java 
b/tool/src/main/java/org/apache/kylin/tool/JobTaskCounterExtractor.java
new file mode 100644
index 0000000..dc1686b
--- /dev/null
+++ b/tool/src/main/java/org/apache/kylin/tool/JobTaskCounterExtractor.java
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.tool;
+
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.commons.httpclient.HttpClient;
+import org.apache.commons.httpclient.HttpMethod;
+import org.apache.commons.httpclient.methods.GetMethod;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.yarn.conf.HAUtil;
+import org.apache.hadoop.yarn.conf.YarnConfiguration;
+import org.apache.hadoop.yarn.util.RMHAUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.OptionsHelper;
+import org.apache.kylin.engine.mr.HadoopUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class JobTaskCounterExtractor extends AbstractInfoExtractor{
+    private String mrJobId;
+    private String yarnUrl;
+    private static final Logger logger = 
LoggerFactory.getLogger(JobTaskCounterExtractor.class);
+
+    public JobTaskCounterExtractor(String mrJobId) {
+        this.mrJobId = mrJobId;
+        this.yarnUrl = getRestCheckUrl();
+    }
+
+    private String getRestCheckUrl() {
+        KylinConfig config = KylinConfig.getInstanceFromEnv();
+        final String yarnStatusCheckUrl = config.getYarnStatusCheckUrl();
+        Pattern pattern = Pattern.compile("(http://)(.*):.*");
+        if (yarnStatusCheckUrl != null) {
+            Matcher m = pattern.matcher(yarnStatusCheckUrl);
+            m.matches();
+            yarnUrl = m.group(1) + m.group(2) + ":19888";
+            return yarnUrl;
+        } else {
+            logger.info("kylin.job.yarn.app.rest.check.status.url" + " is not 
set read from hadoop configuration");
+        }
+        Configuration conf = HadoopUtil.getCurrentConfiguration();
+        String rmWebHost = 
HAUtil.getConfValueForRMInstance(YarnConfiguration.RM_WEBAPP_ADDRESS, 
YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS, conf);
+        if (HAUtil.isHAEnabled(conf)) {
+            YarnConfiguration yarnConf = new YarnConfiguration(conf);
+            String active = RMHAUtils.findActiveRMHAId(yarnConf);
+            rmWebHost = 
HAUtil.getConfValueForRMInstance(HAUtil.addSuffix(YarnConfiguration.RM_WEBAPP_ADDRESS,
 active), YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS, yarnConf);
+        }
+        if (StringUtils.isEmpty(rmWebHost)) {
+            return null;
+        }
+        if (rmWebHost.startsWith("http://") || 
rmWebHost.startsWith("https://")) {
+            //do nothing
+        } else {
+            rmWebHost = "http://" + rmWebHost;
+        }
+        Matcher m = pattern.matcher(rmWebHost);
+        m.matches();
+        return m.group(1) + m.group(2) + ":19888";
+    }
+
+
+    private String getHttpResponse(String url) {
+        HttpClient client = new HttpClient();
+        String response = null;
+        while (response == null) {
+            HttpMethod get = new GetMethod(url);
+            try {
+                get.addRequestHeader("accept", "application/json");
+                client.executeMethod(get);
+                response = get.getResponseBodyAsString();
+            }catch (Exception e) {
+                logger.warn("Failed to fetch http response" + e);
+            }finally {
+                get.releaseConnection();
+            }
+        }
+        return response;
+    }
+
+    protected void executeExtract(File exportDir) {
+        try {
+            String taskUrl = yarnUrl + "/ws/v1/history/mapreduce/jobs/" + 
mrJobId + "/tasks/";
+            String tasksResponse = getHttpResponse(taskUrl);
+            JsonNode tasks = new 
ObjectMapper().readTree(tasksResponse).path("tasks").path("task");
+
+            String maxReduceId = null;
+            String maxMapId = null;
+            long maxMapElapsedTime = 0L;
+            long maxReduceElapsedTime = 0L;
+
+            for (JsonNode node : tasks) {
+                if(node.get("type").textValue().equals("MAP")) {
+                    if (node.get("elapsedTime").longValue() >= 
maxMapElapsedTime) {
+                        maxMapElapsedTime = 
node.get("elapsedTime").longValue();
+                        maxMapId = node.get("id").textValue();
+                    }
+                }
+                if(node.get("type").textValue().equals("REDUCE")) {
+                    if (node.get("elapsedTime").longValue() >= 
maxReduceElapsedTime) {
+                        maxReduceElapsedTime = 
node.get("elapsedTime").longValue();
+                        maxReduceId = node.get("id").textValue();
+                    }
+                }
+            }
+            extractTaskCounterFile(maxMapId, exportDir,taskUrl);
+            extractTaskCounterFile(maxReduceId, exportDir,taskUrl);
+        } catch (Exception e) {
+            logger.warn("Failed to get mr tasks rest response" + e);
+        }
+    }
+
+    private void extractTaskCounterFile(String taskId, File exportDir, String 
taskUrl) throws IOException {
+        try {
+            String response = getHttpResponse(taskUrl + taskId + "/counters");
+            FileUtils.writeStringToFile(new File(exportDir,taskId+".json"), 
response);
+        } catch (Exception e) {
+            logger.warn("Failed to get task counters rest response" + e);
+        }
+    }
+
+    @Override
+    protected void executeExtract(OptionsHelper optionsHelper, File exportDir) 
throws Exception {
+        executeExtract(exportDir);
+    }
+}

Reply via email to