Repository: kylin
Updated Branches:
  refs/heads/master 094510cf3 -> 8de31563c


KYLIN-1546 Add tool to extract job related information


Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/8de31563
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/8de31563
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/8de31563

Branch: refs/heads/master
Commit: 8de31563cc11f7babdb8c171f64224b63785ab8d
Parents: 9e31a19
Author: lidongsjtu <lid...@apache.org>
Authored: Tue Mar 29 19:56:24 2016 +0800
Committer: lidongsjtu <lid...@apache.org>
Committed: Tue Mar 29 20:04:16 2016 +0800

----------------------------------------------------------------------
 .../apache/kylin/admin/JobInfoExtractor.java    | 160 +++++++++++++++++++
 .../apache/kylin/admin/YarnLogExtractor.java    | 134 ++++++++++++++++
 2 files changed, 294 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/8de31563/assembly/src/main/java/org/apache/kylin/admin/JobInfoExtractor.java
----------------------------------------------------------------------
diff --git 
a/assembly/src/main/java/org/apache/kylin/admin/JobInfoExtractor.java 
b/assembly/src/main/java/org/apache/kylin/admin/JobInfoExtractor.java
new file mode 100644
index 0000000..fadccae
--- /dev/null
+++ b/assembly/src/main/java/org/apache/kylin/admin/JobInfoExtractor.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.admin;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.persistence.ResourceTool;
+import org.apache.kylin.common.util.AbstractApplication;
+import org.apache.kylin.common.util.OptionsHelper;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
+import org.apache.kylin.job.dao.ExecutableDao;
+import org.apache.kylin.job.dao.ExecutablePO;
+import org.apache.kylin.job.manager.ExecutableManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Command line tool that copies the metadata of a single Kylin job into a destination directory.
+ *
+ * <p>The job record, the job output record and the same two records of every task of the job are
+ * copied from the resource store of the environment's {@link KylinConfig} into a resource store
+ * opened on {@code destDir}. Unless disabled with {@code -includeCube false} or
+ * {@code -includeYarnLogs false}, the tool then delegates to {@link CubeMetaExtractor} (output
+ * under {@code destDir/cube_<cubeName>/}) and to {@link YarnLogExtractor} (output under
+ * {@code destDir/yarn_<jobId>/}).
+ *
+ * Created by dongli on 3/29/16.
+ */
+public class JobInfoExtractor extends AbstractApplication {
+    private static final Logger logger = LoggerFactory.getLogger(JobInfoExtractor.class);
+
+    @SuppressWarnings("static-access")
+    private static final Option OPTION_JOB_ID = OptionBuilder.withArgName("jobId").hasArg().isRequired(true)
+            .withDescription("specify the Job ID to extract information. ").create("jobId");
+
+    @SuppressWarnings("static-access")
+    private static final Option OPTION_DEST = OptionBuilder.withArgName("destDir").hasArg().isRequired(true)
+            .withDescription("specify the dest dir to save the related information").create("destDir");
+
+    @SuppressWarnings("static-access")
+    private static final Option OPTION_INCLUDE_CUBE = OptionBuilder.withArgName("includeCube").hasArg().isRequired(false)
+            .withDescription("set this to true if want to extract related cube info too. Default true").create("includeCube");
+
+    @SuppressWarnings("static-access")
+    private static final Option OPTION_INCLUDE_YARN_LOGS = OptionBuilder.withArgName("includeYarnLogs").hasArg().isRequired(false)
+            .withDescription("set this to true if want to extract related yarn logs too. Default true").create("includeYarnLogs");
+
+    private final Options options;
+
+    private final KylinConfig kylinConfig;
+    private final CubeMetaExtractor cubeMetaExtractor;
+    private final YarnLogExtractor yarnLogExtractor;
+
+    private final ExecutableDao executableDao;
+    // Not read anywhere in this class; still obtained in the constructor exactly as before.
+    private final ExecutableManager executableManager;
+
+    // Resource store paths collected by execute() and copied by executeExtraction().
+    List<String> requiredResources = Lists.newArrayList();
+
+    /**
+     * Registers the command line options and obtains the config, job DAO and executable manager
+     * from the environment.
+     */
+    public JobInfoExtractor() {
+        cubeMetaExtractor = new CubeMetaExtractor();
+        yarnLogExtractor = new YarnLogExtractor();
+
+        options = new Options();
+        options.addOption(OPTION_JOB_ID);
+        options.addOption(OPTION_DEST);
+        options.addOption(OPTION_INCLUDE_CUBE);
+        options.addOption(OPTION_INCLUDE_YARN_LOGS);
+
+        kylinConfig = KylinConfig.getInstanceFromEnv();
+        executableDao = ExecutableDao.getInstance(kylinConfig);
+        executableManager = ExecutableManager.getInstance(kylinConfig);
+    }
+
+    @Override
+    protected Options getOptions() {
+        return options;
+    }
+
+    /**
+     * Extracts the job records and, when enabled, the related cube metadata and yarn logs.
+     *
+     * @param optionsHelper parsed command line; {@code jobId} and {@code destDir} are required,
+     *                      {@code includeCube} and {@code includeYarnLogs} default to true
+     * @throws RuntimeException if {@code destDir} is empty or no job is stored under {@code jobId}
+     */
+    @Override
+    protected void execute(OptionsHelper optionsHelper) throws Exception {
+        String jobId = optionsHelper.getOptionValue(OPTION_JOB_ID);
+        String dest = optionsHelper.getOptionValue(OPTION_DEST);
+        boolean includeCube = readFlag(optionsHelper, OPTION_INCLUDE_CUBE);
+        boolean includeYarnLogs = readFlag(optionsHelper, OPTION_INCLUDE_YARN_LOGS);
+
+        if (StringUtils.isEmpty(dest)) {
+            throw new RuntimeException("destDir is not set, exit directly without extracting");
+        }
+
+        // Sub-directories are appended by plain string concatenation below.
+        if (!dest.endsWith("/")) {
+            dest = dest + "/";
+        }
+
+        ExecutablePO executablePO = executableDao.getJob(jobId);
+        // NOTE(review): getJob presumably returns null for an unknown id; fail with a clear
+        // message instead of a NullPointerException further down.
+        if (executablePO == null) {
+            throw new RuntimeException("Job " + jobId + " is not found, exit directly without extracting");
+        }
+
+        addRequired(ExecutableDao.pathOfJob(jobId));
+        addRequired(ExecutableDao.pathOfJobOutput(jobId));
+        if (executablePO.getTasks() != null) {
+            for (ExecutablePO task : executablePO.getTasks()) {
+                addRequired(ExecutableDao.pathOfJob(task.getUuid()));
+                addRequired(ExecutableDao.pathOfJobOutput(task.getUuid()));
+            }
+        }
+        executeExtraction(dest);
+
+        if (includeCube) {
+            String cubeName = CubingExecutableUtil.getCubeName(executablePO.getParams());
+            String[] cubeMetaArgs = { "-cube", cubeName, "-destDir", dest + "cube_" + cubeName + "/", "-includeJobs", "false" };
+            // join(Object[]) adds no separator, so pass one explicitly to keep the log readable.
+            logger.info("Start to extract related cube: " + StringUtils.join(cubeMetaArgs, " "));
+            cubeMetaExtractor.execute(cubeMetaArgs);
+        }
+
+        if (includeYarnLogs) {
+            String[] yarnLogsArgs = { "-jobId", jobId, "-destDir", dest + "yarn_" + jobId + "/" };
+            logger.info("Start to extract related yarn job logs: " + StringUtils.join(yarnLogsArgs, " "));
+            yarnLogExtractor.execute(yarnLogsArgs);
+        }
+
+        logger.info("Extracted kylin jobs located at: " + new File(dest).getAbsolutePath());
+    }
+
+    /**
+     * Reads an optional boolean option: true when the option is absent, otherwise the parsed
+     * value of its argument.
+     */
+    private static boolean readFlag(OptionsHelper optionsHelper, Option option) {
+        return !optionsHelper.hasOption(option) || Boolean.parseBoolean(optionsHelper.getOptionValue(option));
+    }
+
+    /**
+     * Copies every collected resource path from the environment's resource store to a store
+     * opened on {@code dest}.
+     *
+     * @throws RuntimeException wrapping any {@link IOException} raised while opening a store or
+     *                          copying
+     */
+    private void executeExtraction(String dest) {
+        logger.info("The resource paths going to be extracted:");
+        for (String s : requiredResources) {
+            logger.info(s + "(required)");
+        }
+
+        try {
+            ResourceStore src = ResourceStore.getStore(kylinConfig);
+            ResourceStore dst = ResourceStore.getStore(KylinConfig.createInstanceFromUri(dest));
+
+            for (String path : requiredResources) {
+                ResourceTool.copyR(src, dst, path);
+            }
+
+        } catch (IOException e) {
+            throw new RuntimeException("IOException", e);
+        }
+    }
+
+    /** Records a resource store path to be copied by {@link #executeExtraction(String)}. */
+    private void addRequired(String record) {
+        logger.info("adding required resource {}", record);
+        requiredResources.add(record);
+    }
+
+    public static void main(String[] args) {
+        JobInfoExtractor extractor = new JobInfoExtractor();
+        extractor.execute(args);
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/8de31563/assembly/src/main/java/org/apache/kylin/admin/YarnLogExtractor.java
----------------------------------------------------------------------
diff --git 
a/assembly/src/main/java/org/apache/kylin/admin/YarnLogExtractor.java 
b/assembly/src/main/java/org/apache/kylin/admin/YarnLogExtractor.java
new file mode 100644
index 0000000..354a3f9
--- /dev/null
+++ b/assembly/src/main/java/org/apache/kylin/admin/YarnLogExtractor.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.admin;
+
+import java.io.File;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.commons.io.FileUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.AbstractApplication;
+import org.apache.kylin.common.util.OptionsHelper;
+import org.apache.kylin.job.common.ShellExecutable;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.dao.ExecutableDao;
+import org.apache.kylin.job.dao.ExecutablePO;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+/**
+ * Command line tool that saves the yarn logs belonging to the tasks of a Kylin job.
+ *
+ * <p>For every task of the job whose output info contains {@link ExecutableConstants#MR_JOB_ID},
+ * the command {@code yarn logs -applicationId <applicationId>} is run through the CLI command
+ * executor of the environment's {@link KylinConfig}, with its output redirected to
+ * {@code destDir/<applicationId>.log}. The application id is the recorded MR job id with
+ * {@code "job"} replaced by {@code "application"}.
+ *
+ * Created by dongli on 3/29/16.
+ */
+public class YarnLogExtractor extends AbstractApplication {
+    private static final Logger logger = LoggerFactory.getLogger(YarnLogExtractor.class);
+
+    @SuppressWarnings("static-access")
+    private static final Option OPTION_JOB_ID = OptionBuilder.withArgName("jobId").hasArg().isRequired(true)
+            .withDescription("specify the Job ID to extract information. ").create("jobId");
+
+    @SuppressWarnings("static-access")
+    private static final Option OPTION_DEST = OptionBuilder.withArgName("destDir").hasArg().isRequired(true)
+            .withDescription("specify the dest dir to save the related information").create("destDir");
+
+    private final Options options;
+
+    private final KylinConfig kylinConfig;
+    private final ExecutableDao executableDao;
+
+    // Task ids collected by execute(); each one is looked up for an MR job id later.
+    List<String> requiredResources = Lists.newArrayList();
+
+    /** Registers the command line options and obtains the config and job DAO from the environment. */
+    public YarnLogExtractor() {
+        options = new Options();
+        options.addOption(OPTION_JOB_ID);
+        options.addOption(OPTION_DEST);
+
+        kylinConfig = KylinConfig.getInstanceFromEnv();
+        executableDao = ExecutableDao.getInstance(kylinConfig);
+    }
+
+    @Override
+    protected Options getOptions() {
+        return options;
+    }
+
+    /**
+     * Collects the task ids of the job and downloads the yarn log of each task that has one.
+     *
+     * @param optionsHelper parsed command line; {@code jobId} and {@code destDir} are required
+     * @throws RuntimeException if {@code destDir} is empty or no job is stored under {@code jobId}
+     */
+    @Override
+    protected void execute(OptionsHelper optionsHelper) throws Exception {
+        String jobId = optionsHelper.getOptionValue(OPTION_JOB_ID);
+        String dest = optionsHelper.getOptionValue(OPTION_DEST);
+
+        if (StringUtils.isEmpty(dest)) {
+            throw new RuntimeException("destDir is not set, exit directly without extracting");
+        }
+
+        // Log file names are appended by plain string concatenation below.
+        if (!dest.endsWith("/")) {
+            dest = dest + "/";
+        }
+
+        ExecutablePO executablePO = executableDao.getJob(jobId);
+        // NOTE(review): getJob presumably returns null for an unknown id; fail with a clear
+        // message instead of a NullPointerException on the next access.
+        if (executablePO == null) {
+            throw new RuntimeException("Job " + jobId + " is not found, exit directly without extracting");
+        }
+        if (executablePO.getTasks() != null) {
+            for (ExecutablePO task : executablePO.getTasks()) {
+                addRequired(task.getUuid());
+            }
+        }
+        executeExtraction(dest);
+
+        logger.info("Extracted yarn logs located at: " + new File(dest).getAbsolutePath());
+    }
+
+    /**
+     * Runs {@code yarn logs} for the MR job recorded in the output info of the given task and
+     * redirects the result to {@code dest + applicationId + ".log"}. Tasks without a recorded MR
+     * job id are skipped.
+     */
+    private void extractYarnLog(String taskId, String dest) throws Exception {
+        final Map<String, String> jobInfo = executableDao.getJobOutput(taskId).getInfo();
+        String mrJobId = jobInfo == null ? null : jobInfo.get(ExecutableConstants.MR_JOB_ID);
+        if (mrJobId == null) {
+            logger.debug("no MR job id recorded for task {}, skip", taskId);
+            return;
+        }
+
+        String applicationId = mrJobId.replace("job", "application");
+        File destFile = new File(dest + applicationId + ".log");
+
+        // NOTE(review): the redirect target is not quoted, so a destDir containing whitespace or
+        // shell metacharacters will break this command - confirm whether that needs handling.
+        ShellExecutable yarnExec = new ShellExecutable();
+        yarnExec.setCmd("yarn logs -applicationId " + applicationId + " > " + destFile.getAbsolutePath());
+        yarnExec.setName(yarnExec.getCmd());
+
+        logger.info(yarnExec.getCmd());
+        kylinConfig.getCliCommandExecutor().execute(yarnExec.getCmd(), null);
+    }
+
+    /**
+     * Logs the collected task ids, creates the destination directory if needed and extracts the
+     * yarn log of each task in turn.
+     */
+    private void executeExtraction(String dest) throws Exception {
+        logger.info("The resource paths going to be extracted:");
+        for (String taskId : requiredResources) {
+            logger.info(taskId + "(required)");
+        }
+
+        logger.info("Start to download yarn logs.");
+        FileUtils.forceMkdir(new File(dest));
+        for (String taskId : requiredResources) {
+            extractYarnLog(taskId, dest);
+        }
+    }
+
+    /** Records a task id to be processed by {@link #executeExtraction(String)}. */
+    private void addRequired(String record) {
+        logger.info("adding required resource {}", record);
+        requiredResources.add(record);
+    }
+
+    public static void main(String[] args) {
+        YarnLogExtractor extractor = new YarnLogExtractor();
+        extractor.execute(args);
+    }
+}

Reply via email to