KYLIN-2503 KYLIN-2502 Hive/Spark steps should show YARN app link
Project: http://git-wip-us.apache.org/repos/asf/kylin/repo Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/47b7c800 Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/47b7c800 Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/47b7c800 Branch: refs/heads/master-hbase0.98 Commit: 47b7c800ecdfe14d10426d64eae6b47e11f2a328 Parents: f0eca24 Author: shaofengshi <shaofeng...@apache.org> Authored: Sun Mar 12 14:13:04 2017 +0800 Committer: shaofengshi <shaofeng...@apache.org> Committed: Sun Mar 12 14:13:04 2017 +0800 ---------------------------------------------------------------------- .../kylin/common/util/BufferedLogger.java | 6 +- .../kylin/job/common/PatternedLogger.java | 130 +++++++++++++++++++ .../kylin/job/common/ShellExecutable.java | 106 +-------------- .../kylin/engine/mr/common/CubeStatsReader.java | 2 +- .../kylin/engine/spark/SparkExecutable.java | 19 +-- .../source/hive/CreateFlatHiveTableStep.java | 5 +- .../apache/kylin/source/hive/HiveMRInput.java | 6 +- 7 files changed, 153 insertions(+), 121 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kylin/blob/47b7c800/core-common/src/main/java/org/apache/kylin/common/util/BufferedLogger.java ---------------------------------------------------------------------- diff --git a/core-common/src/main/java/org/apache/kylin/common/util/BufferedLogger.java b/core-common/src/main/java/org/apache/kylin/common/util/BufferedLogger.java index cef598d..02a1fa3 100644 --- a/core-common/src/main/java/org/apache/kylin/common/util/BufferedLogger.java +++ b/core-common/src/main/java/org/apache/kylin/common/util/BufferedLogger.java @@ -27,6 +27,8 @@ public class BufferedLogger implements Logger { private final org.slf4j.Logger wrappedLogger; private final StringBuilder buffer = new StringBuilder(); + private static int MAX_BUFFER_SIZE = 10 * 1024 * 1024; + public BufferedLogger(org.slf4j.Logger wrappedLogger) { this.wrappedLogger = 
wrappedLogger; } @@ -34,7 +36,9 @@ public class BufferedLogger implements Logger { @Override public void log(String message) { wrappedLogger.info(message); - buffer.append(message).append("\n"); + if (buffer.length() < MAX_BUFFER_SIZE) { + buffer.append(message).append("\n"); + } } public String getBufferedLog() { http://git-wip-us.apache.org/repos/asf/kylin/blob/47b7c800/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java ---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java b/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java new file mode 100644 index 0000000..8399b44 --- /dev/null +++ b/core-job/src/main/java/org/apache/kylin/job/common/PatternedLogger.java @@ -0,0 +1,130 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+*/ + +package org.apache.kylin.job.common; + +import java.util.Map; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.kylin.common.util.BufferedLogger; +import org.apache.kylin.job.constant.ExecutableConstants; +import org.slf4j.Logger; + +import com.google.common.collect.Maps; + +/** + * A logger which parses certain patterns from log + */ +public class PatternedLogger extends BufferedLogger { + private final Map<String, String> info = Maps.newHashMap(); + + private static final Pattern PATTERN_APP_ID = Pattern.compile("Submitted application (.*?) to ResourceManager"); + private static final Pattern PATTERN_APP_URL = Pattern.compile("The url to track the job: (.*)"); + private static final Pattern PATTERN_JOB_ID = Pattern.compile("Running job: (.*)"); + private static final Pattern PATTERN_HDFS_BYTES_WRITTEN = Pattern.compile("(?:HD|MAPR)FS: Number of bytes written=(\\d+)"); + private static final Pattern PATTERN_SOURCE_RECORDS_COUNT = Pattern.compile("Map input records=(\\d+)"); + private static final Pattern PATTERN_SOURCE_RECORDS_SIZE = Pattern.compile("(?:HD|MAPR)FS Read: (\\d+) HDFS Write"); + + // hive + private static final Pattern PATTERN_HIVE_APP_ID_URL = Pattern.compile("Starting Job = (.*?), Tracking URL = (.*)"); + private static final Pattern PATTERN_HIVE_BYTES_WRITTEN = Pattern.compile("(?:HD|MAPR)FS Read: (\\d+) HDFS Write: (\\d+) SUCCESS"); + + // spark + private static final Pattern PATTERN_SPARK_APP_ID = Pattern.compile("Submitted application (.*?)"); + private static final Pattern PATTERN_SPARK_APP_URL = Pattern.compile("tracking URL: (.*)"); + + + public PatternedLogger(Logger wrappedLogger) { + super(wrappedLogger); + } + + @Override + public void log(String message) { + super.log(message); + Matcher matcher = PATTERN_APP_ID.matcher(message); + if (matcher.find()) { + String appId = matcher.group(1); + info.put(ExecutableConstants.YARN_APP_ID, appId); + } + + matcher = PATTERN_APP_URL.matcher(message); + 
if (matcher.find()) { + String appTrackingUrl = matcher.group(1); + info.put(ExecutableConstants.YARN_APP_URL, appTrackingUrl); + } + + matcher = PATTERN_JOB_ID.matcher(message); + if (matcher.find()) { + String mrJobID = matcher.group(1); + info.put(ExecutableConstants.MR_JOB_ID, mrJobID); + } + + matcher = PATTERN_HDFS_BYTES_WRITTEN.matcher(message); + if (matcher.find()) { + String hdfsWritten = matcher.group(1); + info.put(ExecutableConstants.HDFS_BYTES_WRITTEN, hdfsWritten); + } + + matcher = PATTERN_SOURCE_RECORDS_COUNT.matcher(message); + if (matcher.find()) { + String sourceCount = matcher.group(1); + info.put(ExecutableConstants.SOURCE_RECORDS_COUNT, sourceCount); + } + + matcher = PATTERN_SOURCE_RECORDS_SIZE.matcher(message); + if (matcher.find()) { + String sourceSize = matcher.group(1); + info.put(ExecutableConstants.SOURCE_RECORDS_SIZE, sourceSize); + } + + // hive + matcher = PATTERN_HIVE_APP_ID_URL.matcher(message); + if (matcher.find()) { + String jobId = matcher.group(1); + String trackingUrl = matcher.group(2); + info.put(ExecutableConstants.MR_JOB_ID, jobId); + info.put(ExecutableConstants.YARN_APP_URL, trackingUrl); + } + + matcher = PATTERN_HIVE_BYTES_WRITTEN.matcher(message); + if (matcher.find()) { + // String hdfsRead = matcher.group(1); + String hdfsWritten = matcher.group(2); + info.put(ExecutableConstants.HDFS_BYTES_WRITTEN, hdfsWritten); + } + + // spark + matcher = PATTERN_SPARK_APP_ID.matcher(message); + if (matcher.find()) { + String app_id = matcher.group(1); + info.put(ExecutableConstants.YARN_APP_ID, app_id); + } + + matcher = PATTERN_SPARK_APP_URL.matcher(message); + if (matcher.find()) { + String trackingUrl = matcher.group(1); + info.put(ExecutableConstants.YARN_APP_URL, trackingUrl); + } + } + + public Map<String, String> getInfo() { + return info; + } + +} http://git-wip-us.apache.org/repos/asf/kylin/blob/47b7c800/core-job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java 
---------------------------------------------------------------------- diff --git a/core-job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java b/core-job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java index 5e0d0db..9f431b0 100644 --- a/core-job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java +++ b/core-job/src/main/java/org/apache/kylin/job/common/ShellExecutable.java @@ -19,21 +19,13 @@ package org.apache.kylin.job.common; import java.io.IOException; -import java.util.Map; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import org.apache.kylin.common.util.Logger; import org.apache.kylin.common.util.Pair; -import org.apache.kylin.job.constant.ExecutableConstants; import org.apache.kylin.job.exception.ExecuteException; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.ExecutableContext; import org.apache.kylin.job.execution.ExecuteResult; import org.slf4j.LoggerFactory; -import com.google.common.collect.Maps; - /** */ public class ShellExecutable extends AbstractExecutable { @@ -50,9 +42,9 @@ public class ShellExecutable extends AbstractExecutable { protected ExecuteResult doWork(ExecutableContext context) throws ExecuteException { try { logger.info("executing:" + getCmd()); - final ShellExecutableLogger logger = new ShellExecutableLogger(); - final Pair<Integer, String> result = context.getConfig().getCliCommandExecutor().execute(getCmd(), logger); - getManager().addJobInfo(getId(), logger.getInfo()); + final PatternedLogger patternedLogger = new PatternedLogger(logger); + final Pair<Integer, String> result = context.getConfig().getCliCommandExecutor().execute(getCmd(), patternedLogger); + getManager().addJobInfo(getId(), patternedLogger.getInfo()); return new ExecuteResult(result.getFirst() == 0 ? 
ExecuteResult.State.SUCCEED : ExecuteResult.State.FAILED, result.getSecond()); } catch (IOException e) { logger.error("job:" + getId() + " execute finished with exception", e); @@ -68,96 +60,4 @@ public class ShellExecutable extends AbstractExecutable { return getParam(CMD); } - private static class ShellExecutableLogger implements Logger { - - private final Map<String, String> info = Maps.newHashMap(); - - private static final Pattern PATTERN_APP_ID = Pattern.compile("Submitted application (.*?) to ResourceManager"); - private static final Pattern PATTERN_APP_URL = Pattern.compile("The url to track the job: (.*)"); - private static final Pattern PATTERN_JOB_ID = Pattern.compile("Running job: (.*)"); - private static final Pattern PATTERN_HDFS_BYTES_WRITTEN = Pattern.compile("(?:HD|MAPR)FS: Number of bytes written=(\\d+)"); - private static final Pattern PATTERN_SOURCE_RECORDS_COUNT = Pattern.compile("Map input records=(\\d+)"); - private static final Pattern PATTERN_SOURCE_RECORDS_SIZE = Pattern.compile("(?:HD|MAPR)FS Read: (\\d+) HDFS Write"); - - // hive - private static final Pattern PATTERN_HIVE_APP_ID_URL = Pattern.compile("Starting Job = (.*?), Tracking URL = (.*)"); - private static final Pattern PATTERN_HIVE_BYTES_WRITTEN = Pattern.compile("(?:HD|MAPR)FS Read: (\\d+) HDFS Write: (\\d+) SUCCESS"); - - // spark - private static final Pattern PATTERN_SPARK_APP_ID = Pattern.compile("Submitted application (.*?)"); - private static final Pattern PATTERN_SPARK_APP_URL = Pattern.compile("tracking URL: (.*)"); - - @Override - public void log(String message) { - Matcher matcher = PATTERN_APP_ID.matcher(message); - if (matcher.find()) { - String appId = matcher.group(1); - info.put(ExecutableConstants.YARN_APP_ID, appId); - } - - matcher = PATTERN_APP_URL.matcher(message); - if (matcher.find()) { - String appTrackingUrl = matcher.group(1); - info.put(ExecutableConstants.YARN_APP_URL, appTrackingUrl); - } - - matcher = PATTERN_JOB_ID.matcher(message); - if 
(matcher.find()) { - String mrJobID = matcher.group(1); - info.put(ExecutableConstants.MR_JOB_ID, mrJobID); - } - - matcher = PATTERN_HDFS_BYTES_WRITTEN.matcher(message); - if (matcher.find()) { - String hdfsWritten = matcher.group(1); - info.put(ExecutableConstants.HDFS_BYTES_WRITTEN, hdfsWritten); - } - - matcher = PATTERN_SOURCE_RECORDS_COUNT.matcher(message); - if (matcher.find()) { - String sourceCount = matcher.group(1); - info.put(ExecutableConstants.SOURCE_RECORDS_COUNT, sourceCount); - } - - matcher = PATTERN_SOURCE_RECORDS_SIZE.matcher(message); - if (matcher.find()) { - String sourceSize = matcher.group(1); - info.put(ExecutableConstants.SOURCE_RECORDS_SIZE, sourceSize); - } - - // hive - matcher = PATTERN_HIVE_APP_ID_URL.matcher(message); - if (matcher.find()) { - String jobId = matcher.group(1); - String trackingUrl = matcher.group(2); - info.put(ExecutableConstants.MR_JOB_ID, jobId); - info.put(ExecutableConstants.YARN_APP_URL, trackingUrl); - } - - matcher = PATTERN_HIVE_BYTES_WRITTEN.matcher(message); - if (matcher.find()) { - // String hdfsRead = matcher.group(1); - String hdfsWritten = matcher.group(2); - info.put(ExecutableConstants.HDFS_BYTES_WRITTEN, hdfsWritten); - } - - // spark - matcher = PATTERN_SPARK_APP_ID.matcher(message); - if (matcher.find()) { - String app_id = matcher.group(1); - info.put(ExecutableConstants.YARN_APP_ID, app_id); - } - - matcher = PATTERN_SPARK_APP_URL.matcher(message); - if (matcher.find()) { - String trackingUrl = matcher.group(1); - info.put(ExecutableConstants.YARN_APP_URL, trackingUrl); - } - } - - Map<String, String> getInfo() { - return info; - } - } - } http://git-wip-us.apache.org/repos/asf/kylin/blob/47b7c800/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java ---------------------------------------------------------------------- diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java index 9cb60f8..b54f401 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/CubeStatsReader.java @@ -224,7 +224,7 @@ public class CubeStatsReader { double cuboidSizeRatio = kylinConf.getJobCuboidSizeRatio(); double cuboidSizeMemHungryRatio = kylinConf.getJobCuboidSizeCountDistinctRatio(); double ret = (1.0 * normalSpace * rowCount * cuboidSizeRatio + 1.0 * countDistinctSpace * rowCount * cuboidSizeMemHungryRatio) / (1024L * 1024L); - logger.info("Cuboid " + cuboidId + " has " + rowCount + " rows, each row size is " + (normalSpace + countDistinctSpace) + " bytes." + " Total size is " + ret + "M."); + logger.debug("Cuboid " + cuboidId + " has " + rowCount + " rows, each row size is " + (normalSpace + countDistinctSpace) + " bytes." + " Total size is " + ret + "M."); return ret; } http://git-wip-us.apache.org/repos/asf/kylin/blob/47b7c800/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java ---------------------------------------------------------------------- diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java index 5ad21cf..1ed2235 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java @@ -23,7 +23,7 @@ import java.util.Map; import org.apache.commons.lang.StringUtils; import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.CliCommandExecutor; -import org.apache.kylin.common.util.Logger; +import org.apache.kylin.job.common.PatternedLogger; import org.apache.kylin.job.exception.ExecuteException; import org.apache.kylin.job.execution.AbstractExecutable; import 
org.apache.kylin.job.execution.ExecutableContext; @@ -118,22 +118,17 @@ public class SparkExecutable extends AbstractExecutable { stringBuilder.append("--files %s --jars %s %s %s"); try { String cmd = String.format(stringBuilder.toString(), hadoopConf, KylinConfig.getSparkHome(), hbaseConfFile.getAbsolutePath(), jars, jobJar, formatArgs()); - logger.info("cmd:" + cmd); - final StringBuilder output = new StringBuilder(); + logger.info("cmd: " + cmd); CliCommandExecutor exec = new CliCommandExecutor(); - exec.execute(cmd, new Logger() { - @Override - public void log(String message) { - output.append(message); - output.append("\n"); - logger.info(message); - } - }); - return new ExecuteResult(ExecuteResult.State.SUCCEED, output.toString()); + PatternedLogger patternedLogger = new PatternedLogger(logger); + exec.execute(cmd, patternedLogger); + getManager().addJobInfo(getId(), patternedLogger.getInfo()); + return new ExecuteResult(ExecuteResult.State.SUCCEED, patternedLogger.getBufferedLog()); } catch (Exception e) { logger.error("error run spark job:", e); return new ExecuteResult(ExecuteResult.State.ERROR, e.getLocalizedMessage()); } } + } http://git-wip-us.apache.org/repos/asf/kylin/blob/47b7c800/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java index b197f0e..8b241d2 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateFlatHiveTableStep.java @@ -20,12 +20,12 @@ package org.apache.kylin.source.hive; import java.io.IOException; import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.BufferedLogger; import org.apache.kylin.common.util.HiveCmdBuilder; 
import org.apache.kylin.common.util.Pair; import org.apache.kylin.cube.CubeInstance; import org.apache.kylin.cube.CubeManager; import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; +import org.apache.kylin.job.common.PatternedLogger; import org.apache.kylin.job.exception.ExecuteException; import org.apache.kylin.job.execution.AbstractExecutable; import org.apache.kylin.job.execution.ExecutableContext; @@ -38,7 +38,7 @@ import org.slf4j.LoggerFactory; public class CreateFlatHiveTableStep extends AbstractExecutable { private static final Logger logger = LoggerFactory.getLogger(CreateFlatHiveTableStep.class); - private final BufferedLogger stepLogger = new BufferedLogger(logger); + private final PatternedLogger stepLogger = new PatternedLogger(logger); protected void createFlatHiveTable(KylinConfig config) throws IOException { final HiveCmdBuilder hiveCmdBuilder = new HiveCmdBuilder(); @@ -50,6 +50,7 @@ public class CreateFlatHiveTableStep extends AbstractExecutable { stepLogger.log(cmd); Pair<Integer, String> response = config.getCliCommandExecutor().execute(cmd, stepLogger); + getManager().addJobInfo(getId(), stepLogger.getInfo()); if (response.getFirst() != 0) { throw new RuntimeException("Failed to create flat hive table, error code " + response.getFirst()); } http://git-wip-us.apache.org/repos/asf/kylin/blob/47b7c800/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java ---------------------------------------------------------------------- diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java index 39a0273..bbf3c60 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java @@ -29,7 +29,6 @@ import org.apache.hadoop.mapreduce.Job; import org.apache.hive.hcatalog.data.HCatRecord; import 
org.apache.hive.hcatalog.mapreduce.HCatInputFormat; import org.apache.kylin.common.KylinConfig; -import org.apache.kylin.common.util.BufferedLogger; import org.apache.kylin.common.util.HadoopUtil; import org.apache.kylin.common.util.HiveCmdBuilder; import org.apache.kylin.common.util.Pair; @@ -40,6 +39,7 @@ import org.apache.kylin.engine.mr.IMRInput; import org.apache.kylin.engine.mr.JobBuilderSupport; import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; import org.apache.kylin.job.JoinedFlatTable; +import org.apache.kylin.job.common.PatternedLogger; import org.apache.kylin.job.common.ShellExecutable; import org.apache.kylin.job.constant.ExecutableConstants; import org.apache.kylin.job.engine.JobEngineConfig; @@ -246,7 +246,7 @@ public class HiveMRInput implements IMRInput { } public static class RedistributeFlatHiveTableStep extends AbstractExecutable { - private final BufferedLogger stepLogger = new BufferedLogger(logger); + private final PatternedLogger stepLogger = new PatternedLogger(logger); private long computeRowCount(String database, String table) throws Exception { IHiveClient hiveClient = HiveClientFactory.getHiveClient(); @@ -265,6 +265,8 @@ public class HiveMRInput implements IMRInput { stepLogger.log(cmd); Pair<Integer, String> response = config.getCliCommandExecutor().execute(cmd, stepLogger); + getManager().addJobInfo(getId(), stepLogger.getInfo()); + if (response.getFirst() != 0) { throw new RuntimeException("Failed to redistribute flat hive table"); }