Repository: kylin Updated Branches: refs/heads/master 1c0cd2bf7 -> e02f0bed2
minor, diagnosis tool for mr jobs Signed-off-by: lidongsjtu <lid...@apache.org> Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/e02f0bed Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/e02f0bed Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/e02f0bed Branch: refs/heads/master Commit: e02f0bed299b69dabed51809811134591f21248e Parents: 1c0cd2b Author: Lingyan Jiang <lynne...@hotmail.com> Authored: Tue May 31 19:03:06 2016 +0800 Committer: lidongsjtu <lid...@apache.org> Committed: Tue May 31 21:02:17 2016 +0800 ---------------------------------------------------------------------- tool/pom.xml | 5 - .../apache/kylin/tool/JobDiagnosisInfoCLI.java | 31 ++-- .../kylin/tool/JobTaskCounterExtractor.java | 149 +++++++++++++++++++ 3 files changed, 168 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/e02f0bed/tool/pom.xml ---------------------------------------------------------------------- diff --git a/tool/pom.xml b/tool/pom.xml index ee61fae..2657b12 100644 --- a/tool/pom.xml +++ b/tool/pom.xml @@ -43,11 +43,6 @@ <!--Env--> <dependency> <groupId>org.apache.hbase</groupId> - <artifactId>hbase-common</artifactId> - <scope>provided</scope> - </dependency> - <dependency> - <groupId>org.apache.hbase</groupId> <artifactId>hbase-client</artifactId> <scope>provided</scope> </dependency> http://git-wip-us.apache.org/repos/asf/kylin/blob/e02f0bed/tool/src/main/java/org/apache/kylin/tool/JobDiagnosisInfoCLI.java ---------------------------------------------------------------------- diff --git a/tool/src/main/java/org/apache/kylin/tool/JobDiagnosisInfoCLI.java b/tool/src/main/java/org/apache/kylin/tool/JobDiagnosisInfoCLI.java index 8ba3a33..4dab057 100644 --- a/tool/src/main/java/org/apache/kylin/tool/JobDiagnosisInfoCLI.java +++ b/tool/src/main/java/org/apache/kylin/tool/JobDiagnosisInfoCLI.java 
@@ -18,12 +18,8 @@ package org.apache.kylin.tool; -import java.io.File; -import java.io.IOException; -import java.util.Arrays; -import java.util.List; -import java.util.Map; - +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; import org.apache.commons.cli.Option; import org.apache.commons.cli.OptionBuilder; import org.apache.commons.io.FileUtils; @@ -37,8 +33,11 @@ import org.apache.kylin.tool.util.ResourceStoreUtil; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; +import java.io.File; +import java.io.IOException; +import java.util.Arrays; +import java.util.List; +import java.util.Map; public class JobDiagnosisInfoCLI extends AbstractInfoExtractor { private static final Logger logger = LoggerFactory.getLogger(JobDiagnosisInfoCLI.class); @@ -106,7 +105,6 @@ public class JobDiagnosisInfoCLI extends AbstractInfoExtractor { File metaDir = new File(exportDir, "cube"); FileUtils.forceMkdir(metaDir); String[] cubeMetaArgs = { "-cube", cubeName, "-destDir", new File(metaDir, cubeName).getAbsolutePath(), "-includeJobs", "false", "-compress", "false", "-submodule", "true" }; - logger.info("Start to extract related cube: " + StringUtils.join(cubeMetaArgs)); CubeMetaExtractor cubeMetaExtractor = new CubeMetaExtractor(); logger.info("CubeMetaExtractor args: " + Arrays.toString(cubeMetaArgs)); @@ -119,8 +117,9 @@ public class JobDiagnosisInfoCLI extends AbstractInfoExtractor { logger.info("Start to dump yarn job logs: " + jobId); File yarnLogDir = new File(exportDir, "yarn"); FileUtils.forceMkdir(yarnLogDir); - for (String taskId : yarnLogsResources) { - extractYarnLog(taskId, new File(yarnLogDir, jobId), true); + for (String stepId : yarnLogsResources) { + extractTaskCounter(stepId,new File(new File(yarnLogDir, stepId),"Counters")); + extractYarnLog(stepId, new File(yarnLogDir, stepId), true); } } @@ -147,7 +146,6 @@ public class JobDiagnosisInfoCLI extends 
AbstractInfoExtractor { try { KylinConfig srcConfig = KylinConfig.getInstanceFromEnv(); KylinConfig dstConfig = KylinConfig.createInstanceFromUri(destDir.getAbsolutePath()); - ResourceStoreUtil.copy(srcConfig, dstConfig, requiredResources); } catch (Exception e) { throw new RuntimeException("Failed to extract job resources. ", e); @@ -172,6 +170,15 @@ public class JobDiagnosisInfoCLI extends AbstractInfoExtractor { } } + private void extractTaskCounter(String taskId, File destDir) throws Exception { + final Map<String, String> jobInfo = executableDao.getJobOutput(taskId).getInfo(); + if (jobInfo.containsKey(ExecutableConstants.MR_JOB_ID)) { + String jobId = jobInfo.get(ExecutableConstants.MR_JOB_ID); + FileUtils.forceMkdir(destDir); + new JobTaskCounterExtractor(jobId).executeExtract(destDir); + } + } + private boolean isYarnAppSucc(String applicationId) throws IOException { final String yarnCmd = "yarn application -status " + applicationId; final String cmdOutput = kylinConfig.getCliCommandExecutor().execute(yarnCmd).getSecond(); http://git-wip-us.apache.org/repos/asf/kylin/blob/e02f0bed/tool/src/main/java/org/apache/kylin/tool/JobTaskCounterExtractor.java ---------------------------------------------------------------------- diff --git a/tool/src/main/java/org/apache/kylin/tool/JobTaskCounterExtractor.java b/tool/src/main/java/org/apache/kylin/tool/JobTaskCounterExtractor.java new file mode 100644 index 0000000..dc1686b --- /dev/null +++ b/tool/src/main/java/org/apache/kylin/tool/JobTaskCounterExtractor.java @@ -0,0 +1,149 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
*/

package org.apache.kylin.tool;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.httpclient.HttpClient;
import org.apache.commons.httpclient.HttpMethod;
import org.apache.commons.httpclient.methods.GetMethod;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.HAUtil;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.RMHAUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.util.OptionsHelper;
import org.apache.kylin.engine.mr.HadoopUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.IOException;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

/**
 * Diagnosis helper that, for a given MR job, locates the map task and the
 * reduce task with the longest elapsed time via the MR job history server
 * REST API and dumps their counters as JSON files into an export directory.
 */
public class JobTaskCounterExtractor extends AbstractInfoExtractor {
    private static final Logger logger = LoggerFactory.getLogger(JobTaskCounterExtractor.class);

    // Captures scheme ("http://") and host from a URL like "http://host:port/...".
    // Compiled once — Pattern compilation is expensive and the pattern is constant.
    private static final Pattern HOST_PATTERN = Pattern.compile("(http://)(.*):.*");

    // Default port of the MR job history server web/REST endpoint.
    private static final String JOB_HISTORY_PORT = "19888";

    // Cap on HTTP retries so an unreachable history server cannot spin this
    // loop forever (the original looped unboundedly until a response arrived).
    private static final int MAX_HTTP_RETRY = 3;

    private final String mrJobId;
    private final String yarnUrl;

    public JobTaskCounterExtractor(String mrJobId) {
        this.mrJobId = mrJobId;
        this.yarnUrl = getRestCheckUrl();
    }

    /**
     * Resolves the job history server base URL ("http://host:19888").
     * Prefers the configured kylin YARN status-check URL; otherwise derives the
     * host from the (possibly HA) resource manager web address in the Hadoop
     * configuration. Returns null when no host can be determined.
     */
    private String getRestCheckUrl() {
        KylinConfig config = KylinConfig.getInstanceFromEnv();
        final String yarnStatusCheckUrl = config.getYarnStatusCheckUrl();
        if (yarnStatusCheckUrl != null) {
            Matcher m = HOST_PATTERN.matcher(yarnStatusCheckUrl);
            // Original ignored matches() and called group() unconditionally,
            // which throws IllegalStateException on a malformed URL.
            if (m.matches()) {
                return m.group(1) + m.group(2) + ":" + JOB_HISTORY_PORT;
            }
            logger.warn("Cannot parse configured yarn status check url: {}", yarnStatusCheckUrl);
        } else {
            logger.info("kylin.job.yarn.app.rest.check.status.url is not set read from hadoop configuration");
        }
        Configuration conf = HadoopUtil.getCurrentConfiguration();
        String rmWebHost = HAUtil.getConfValueForRMInstance(YarnConfiguration.RM_WEBAPP_ADDRESS, YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS, conf);
        if (HAUtil.isHAEnabled(conf)) {
            // In an HA cluster only the active RM serves the web app; look it up.
            YarnConfiguration yarnConf = new YarnConfiguration(conf);
            String active = RMHAUtils.findActiveRMHAId(yarnConf);
            rmWebHost = HAUtil.getConfValueForRMInstance(HAUtil.addSuffix(YarnConfiguration.RM_WEBAPP_ADDRESS, active), YarnConfiguration.DEFAULT_RM_WEBAPP_ADDRESS, yarnConf);
        }
        if (StringUtils.isEmpty(rmWebHost)) {
            return null;
        }
        if (!rmWebHost.startsWith("http://") && !rmWebHost.startsWith("https://")) {
            rmWebHost = "http://" + rmWebHost;
        }
        Matcher m = HOST_PATTERN.matcher(rmWebHost);
        if (!m.matches()) {
            logger.warn("Cannot parse resource manager web address: {}", rmWebHost);
            return null;
        }
        return m.group(1) + m.group(2) + ":" + JOB_HISTORY_PORT;
    }

    /**
     * GETs the given URL with an "accept: application/json" header, retrying up
     * to MAX_HTTP_RETRY times. Returns the response body, or null if every
     * attempt failed (callers treat a null/unparseable body as a soft failure).
     */
    private String getHttpResponse(String url) {
        HttpClient client = new HttpClient();
        String response = null;
        int attempt = 0;
        while (response == null && attempt++ < MAX_HTTP_RETRY) {
            HttpMethod get = new GetMethod(url);
            try {
                get.addRequestHeader("accept", "application/json");
                client.executeMethod(get);
                response = get.getResponseBodyAsString();
            } catch (Exception e) {
                // Pass the throwable as the last argument so SLF4J logs the
                // stack trace (string-concatenating e loses it).
                logger.warn("Failed to fetch http response from {}", url, e);
            } finally {
                get.releaseConnection();
            }
        }
        return response;
    }

    /**
     * Finds the slowest map task and the slowest reduce task of {@code mrJobId}
     * and writes each task's counters to {@code exportDir} as <taskId>.json.
     * Best-effort: failures are logged, never thrown.
     */
    protected void executeExtract(File exportDir) {
        if (yarnUrl == null) {
            logger.warn("Job history server url is unavailable, skip extracting task counters for job {}", mrJobId);
            return;
        }
        try {
            String taskUrl = yarnUrl + "/ws/v1/history/mapreduce/jobs/" + mrJobId + "/tasks/";
            String tasksResponse = getHttpResponse(taskUrl);
            JsonNode tasks = new ObjectMapper().readTree(tasksResponse).path("tasks").path("task");

            String maxReduceId = null;
            String maxMapId = null;
            long maxMapElapsedTime = 0L;
            long maxReduceElapsedTime = 0L;

            for (JsonNode node : tasks) {
                String type = node.get("type").textValue();
                long elapsedTime = node.get("elapsedTime").longValue();
                if ("MAP".equals(type) && elapsedTime >= maxMapElapsedTime) {
                    maxMapElapsedTime = elapsedTime;
                    maxMapId = node.get("id").textValue();
                } else if ("REDUCE".equals(type) && elapsedTime >= maxReduceElapsedTime) {
                    maxReduceElapsedTime = elapsedTime;
                    maxReduceId = node.get("id").textValue();
                }
            }
            // Map-only jobs (or empty task lists) leave an id null; the
            // original would then request ".../null/counters".
            if (maxMapId != null) {
                extractTaskCounterFile(maxMapId, exportDir, taskUrl);
            }
            if (maxReduceId != null) {
                extractTaskCounterFile(maxReduceId, exportDir, taskUrl);
            }
        } catch (Exception e) {
            logger.warn("Failed to get mr tasks rest response", e);
        }
    }

    /**
     * Fetches the counters of one task and writes them to exportDir/<taskId>.json.
     */
    private void extractTaskCounterFile(String taskId, File exportDir, String taskUrl) throws IOException {
        try {
            String response = getHttpResponse(taskUrl + taskId + "/counters");
            FileUtils.writeStringToFile(new File(exportDir, taskId + ".json"), response);
        } catch (Exception e) {
            logger.warn("Failed to get task counters rest response", e);
        }
    }

    @Override
    protected void executeExtract(OptionsHelper optionsHelper, File exportDir) throws Exception {
        executeExtract(exportDir);
    }
}