This is an automated email from the ASF dual-hosted git repository.
shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
The following commit(s) were added to refs/heads/master by this push:
new c5424ce KYLIN-3477 Save spark job counter to hdfs
c5424ce is described below
commit c5424ce065ccbeb39bb0b9aea1c214a32fb082e8
Author: chao long <[email protected]>
AuthorDate: Mon Aug 27 11:32:47 2018 +0800
KYLIN-3477 Save spark job counter to hdfs
---
.../org/apache/kylin/common/util/HadoopUtil.java | 36 ++++++++++++++++++++++
.../apache/kylin/engine/mr/JobBuilderSupport.java | 6 +++-
.../kylin/engine/mr/common/BatchConstants.java | 2 ++
.../engine/spark/SparkBatchCubingJobBuilder2.java | 2 +-
.../apache/kylin/engine/spark/SparkExecutable.java | 15 +++++++++
.../kylin/engine/spark/SparkFactDistinct.java | 15 ++++++++-
.../kylin/storage/hbase/steps/HBaseSparkSteps.java | 2 +-
.../kylin/storage/hbase/steps/SparkCubeHFile.java | 16 ++++++++--
8 files changed, 88 insertions(+), 6 deletions(-)
diff --git a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
index 3cb43fc..3aef34a 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
@@ -23,6 +23,7 @@ import java.io.DataOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
+import java.util.Map;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.conf.Configuration;
@@ -30,12 +31,17 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.StorageURL;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.google.common.collect.Maps;
+
public class HadoopUtil {
@SuppressWarnings("unused")
private static final Logger logger = LoggerFactory.getLogger(HadoopUtil.class);
@@ -193,4 +199,34 @@ public class HadoopUtil {
HadoopUtil.getFileSystem(realHdfsPath).delete(new Path(realHdfsPath), true);
logger.info("Delete metadata in HDFS for this job: " + realHdfsPath);
}
+
+ @SuppressWarnings("deprecation")
+ public static void writeToSequenceFile(Configuration conf, String outputPath, Map<String, String> counterMap) throws IOException {
+ try (SequenceFile.Writer writer = SequenceFile.createWriter(getWorkingFileSystem(conf), conf, new Path(outputPath), Text.class, Text.class)) {
+ for (Map.Entry<String, String> counterEntry : counterMap.entrySet()) {
+ writer.append(new Text(counterEntry.getKey()), new Text(counterEntry.getValue()));
+ }
+ }
+ }
+
+ @SuppressWarnings("deprecation")
+ public static Map<String, String> readFromSequenceFile(Configuration conf, String inputPath) throws IOException {
+ try (SequenceFile.Reader reader = new SequenceFile.Reader(getWorkingFileSystem(conf), new Path(inputPath), conf)) {
+ Map<String, String> map = Maps.newHashMap();
+
+ Text key = (Text) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+ Text value = (Text) ReflectionUtils.newInstance(reader.getValueClass(), conf);
+
+ while (reader.next(key, value)) {
+ map.put(key.toString(), value.toString());
+ }
+
+ return map;
+ }
+ }
+
+ public static Map<String, String> readFromSequenceFile(String inputPath) throws IOException {
+ return readFromSequenceFile(getCurrentConfiguration(), inputPath);
+ }
+
}
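For context, here is a minimal round-trip sketch of the two new helpers above; the path and the counter key are illustrative, not part of this commit:

    Configuration conf = HadoopUtil.getCurrentConfiguration();
    Map<String, String> counters = Maps.newHashMap();
    counters.put("sourceRecordCount", "12345");   // illustrative key/value
    // write the counters as Text/Text pairs into a sequence file on the working file system
    HadoopUtil.writeToSequenceFile(conf, "/kylin/job-0001/counter", counters);
    // read them back into a map; the one-argument overload uses the current configuration
    Map<String, String> restored = HadoopUtil.readFromSequenceFile("/kylin/job-0001/counter");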
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index 02e9fe5..5b1f38c 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -334,7 +334,11 @@ public class JobBuilderSupport {
}
public String getHBaseConfFilePath(String jobId) {
- return getJobWorkingDir(jobId) + "/hbase-conf.xml";
+ return getJobWorkingDir(jobId) + "/hbase-conf.xml";
+ }
+
+ public String getCounterOuputPath(String jobId) {
+ return getRealizationRootPath(jobId) + "/counter";
}
// ============================================================================
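For orientation: the new getCounterOuputPath(jobId) places the counter file under the job's realization root path, so with the usual HDFS working-directory layout the resolved location is roughly ${kylin.env.hdfs-working-dir}/kylin-<job_id>/<realization_name>/counter (layout illustrative; only the trailing "/counter" segment is fixed by this commit).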
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
index 8c2ba7f..66da1b2 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
@@ -106,6 +106,8 @@ public interface BatchConstants {
String ARG_META_URL = "metadataUrl";
String ARG_HBASE_CONF_PATH = "hbaseConfPath";
String ARG_SHRUNKEN_DICT_PATH = "shrunkenDictPath";
+ String ARG_COUNTER_OUPUT = "counterOutput";
+
/**
* logger and counter
*/
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
index 5fd7213..3f3c14d 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
@@ -103,7 +103,7 @@ public class SparkBatchCubingJobBuilder2 extends JobBuilderSupport {
sparkExecutable.setJobId(jobId);
sparkExecutable.setName(ExecutableConstants.STEP_NAME_FACT_DISTINCT_COLUMNS);
- sparkExecutable.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES);
+ sparkExecutable.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES, getCounterOuputPath(jobId));
StringBuilder jars = new StringBuilder();
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
index 6122397..d85337e 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
@@ -35,11 +35,13 @@ import org.apache.kylin.common.KylinConfig;
import org.apache.kylin.common.KylinConfigExt;
import org.apache.kylin.common.persistence.ResourceStore;
import org.apache.kylin.common.util.CliCommandExecutor;
+import org.apache.kylin.common.util.HadoopUtil;
import org.apache.kylin.common.util.Pair;
import org.apache.kylin.cube.CubeInstance;
import org.apache.kylin.cube.CubeManager;
import org.apache.kylin.cube.CubeSegment;
import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.engine.mr.common.JobRelatedMetaUtil;
import org.apache.kylin.job.common.PatternedLogger;
import org.apache.kylin.job.constant.ExecutableConstants;
@@ -80,6 +82,11 @@ public class SparkExecutable extends AbstractExecutable {
this.setParam(COUNTER_SAVE_AS, value);
}
+ public void setCounterSaveAs(String value, String counterOutputPath) {
+ this.setParam(COUNTER_SAVE_AS, value);
+ this.setParam(BatchConstants.ARG_COUNTER_OUPUT, counterOutputPath);
+ }
+
public String getCounterSaveAs() {
return getParam(COUNTER_SAVE_AS);
}
@@ -286,6 +293,14 @@ public class SparkExecutable extends AbstractExecutable {
}
// done, update all properties
Map<String, String> joblogInfo = patternedLogger.getInfo();
+
+ // read counter from hdfs
+ String counterOutput = getParam(BatchConstants.ARG_COUNTER_OUPUT);
+ if (counterOutput != null){
+ Map<String, String> counterMap = HadoopUtil.readFromSequenceFile(counterOutput);
+ joblogInfo.putAll(counterMap);
+ }
+
readCounters(joblogInfo);
getManager().addJobInfo(getId(), joblogInfo);
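The hunk above is the consumer side of the new mechanism. Previously the job engine scraped counters from the driver's console output, which breaks when spark.submit.deployMode=cluster because the driver no longer runs where that output is captured. A condensed sketch of the end-to-end flow, using only names introduced or touched by this commit:

    // job builder: name the counters to save and where the Spark app will write them
    sparkExecutable.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + CubingJob.SOURCE_SIZE_BYTES, getCounterOuputPath(jobId));

    // SparkExecutable, after the Spark app finishes: merge the persisted counters into the job info
    Map<String, String> counterMap = HadoopUtil.readFromSequenceFile(getParam(BatchConstants.ARG_COUNTER_OUPUT));
    joblogInfo.putAll(counterMap);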
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java
index 61e2e53..6188a56 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java
@@ -73,6 +73,7 @@ import org.apache.kylin.engine.mr.steps.FactDistinctColumnsMapper.DictColDeduper
import org.apache.kylin.engine.mr.steps.FactDistinctColumnsReducerMapping;
import org.apache.kylin.engine.mr.steps.SelfDefineSortableKey;
import org.apache.kylin.job.JoinedFlatTable;
+import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.measure.BufferedMeasureCodec;
import org.apache.kylin.measure.hllc.HLLCounter;
import org.apache.kylin.measure.hllc.RegisterType;
@@ -119,6 +120,8 @@ public class SparkFactDistinct extends AbstractApplication implements Serializab
.withDescription("Hive Intermediate Table").create("hiveTable");
public static final Option OPTION_INPUT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg()
.isRequired(true).withDescription("Hive Intermediate Table PATH").create(BatchConstants.ARG_INPUT);
+ public static final Option OPTION_COUNTER_PATH = OptionBuilder.withArgName(BatchConstants.ARG_COUNTER_OUPUT).hasArg()
+ .isRequired(true).withDescription("counter output path").create(BatchConstants.ARG_COUNTER_OUPUT);
private Options options;
@@ -131,6 +134,7 @@ public class SparkFactDistinct extends AbstractApplication implements Serializab
options.addOption(OPTION_INPUT_PATH);
options.addOption(OPTION_SEGMENT_ID);
options.addOption(OPTION_STATS_SAMPLING_PERCENT);
+ options.addOption(OPTION_COUNTER_PATH);
}
@Override
@@ -146,6 +150,7 @@ public class SparkFactDistinct extends AbstractApplication implements Serializab
String hiveTable = optionsHelper.getOptionValue(OPTION_INPUT_TABLE);
String inputPath = optionsHelper.getOptionValue(OPTION_INPUT_PATH);
String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
+ String counterPath = optionsHelper.getOptionValue(OPTION_COUNTER_PATH);
int samplingPercent = Integer.parseInt(optionsHelper.getOptionValue(OPTION_STATS_SAMPLING_PERCENT));
Class[] kryoClassArray = new Class[] { Class.forName("scala.reflect.ClassTag$$anon$1"), Class.forName("org.apache.kylin.engine.mr.steps.SelfDefineSortableKey") };
@@ -173,6 +178,7 @@ public class SparkFactDistinct extends AbstractApplication implements Serializab
logger.info("RDD Output path: {}", outputPath);
logger.info("getTotalReducerNum: {}",
reducerMapping.getTotalReducerNum());
logger.info("getCuboidRowCounterReducerNum: {}",
reducerMapping.getCuboidRowCounterReducerNum());
+ logger.info("counter path {}", counterPath);
boolean isSequenceFile = JoinedFlatTable.SEQUENCEFILE.equalsIgnoreCase(envConfig.getFlatTableStorageFormat());
@@ -202,13 +208,20 @@ public class SparkFactDistinct extends AbstractApplication implements Serializab
multipleOutputsRDD.saveAsNewAPIHadoopDatasetWithMultipleOutputs(job.getConfiguration());
- // only work for client mode, not work when spark.submit.deployMode=cluster
logger.info("Map input records={}", recordRDD.count());
logger.info("HDFS Read: {} HDFS Write", bytesWritten.value());
+ Map<String, String> counterMap = Maps.newHashMap();
+ counterMap.put(ExecutableConstants.SOURCE_RECORDS_COUNT, String.valueOf(recordRDD.count()));
+ counterMap.put(ExecutableConstants.SOURCE_RECORDS_SIZE, String.valueOf(bytesWritten.value()));
+
+ // save counter to hdfs
+ HadoopUtil.writeToSequenceFile(sc.hadoopConfiguration(), counterPath, counterMap);
+
HadoopUtil.deleteHDFSMeta(metaUrl);
}
+
static class FlatOutputFucntion implements PairFlatMapFunction<Iterator<String[]>, SelfDefineSortableKey, Text> {
private volatile transient boolean initialized = false;
private String cubeName;
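One small caveat in the driver-side code above: recordRDD.count() is evaluated twice, once for the log line and once for the counter map, which recomputes the RDD unless it happens to be cached. A sketch of the single-evaluation variant, assuming nothing beyond what the class already uses:

    long recordCount = recordRDD.count();   // evaluate the RDD action once
    logger.info("Map input records={}", recordCount);
    counterMap.put(ExecutableConstants.SOURCE_RECORDS_COUNT, String.valueOf(recordCount));
    counterMap.put(ExecutableConstants.SOURCE_RECORDS_SIZE, String.valueOf(bytesWritten.value()));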
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java
index be230f0..ccab22f 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java
@@ -71,7 +71,7 @@ public class HBaseSparkSteps extends HBaseJobSteps {
sparkExecutable.setJars(jars.toString());
sparkExecutable.setName(ExecutableConstants.STEP_NAME_CONVERT_CUBOID_TO_HFILE);
- sparkExecutable.setCounterSaveAs(",," + CubingJob.CUBE_SIZE_BYTES);
+ sparkExecutable.setCounterSaveAs(",," + CubingJob.CUBE_SIZE_BYTES, getCounterOuputPath(jobId));
return sparkExecutable;
}
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
index c87a739..539f03b 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
@@ -23,8 +23,10 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
+import com.google.common.collect.Maps;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionBuilder;
import org.apache.commons.cli.Options;
@@ -55,6 +57,7 @@ import org.apache.kylin.engine.mr.common.BatchConstants;
import org.apache.kylin.engine.mr.common.SerializableConfiguration;
import org.apache.kylin.engine.spark.KylinSparkJobListener;
import org.apache.kylin.engine.spark.SparkUtil;
+import org.apache.kylin.job.constant.ExecutableConstants;
import org.apache.kylin.measure.MeasureCodec;
import org.apache.spark.Partitioner;
import org.apache.spark.SparkConf;
@@ -88,6 +91,8 @@ public class SparkCubeHFile extends AbstractApplication implements Serializable
.isRequired(true).withDescription("Cuboid files PATH").create(BatchConstants.ARG_INPUT);
public static final Option OPTION_PARTITION_FILE_PATH = OptionBuilder.withArgName(BatchConstants.ARG_PARTITION)
.hasArg().isRequired(true).withDescription("Partition file path.").create(BatchConstants.ARG_PARTITION);
+ public static final Option OPTION_COUNTER_PATH = OptionBuilder.withArgName(BatchConstants.ARG_COUNTER_OUPUT).hasArg()
+ .isRequired(true).withDescription("counter output path").create(BatchConstants.ARG_COUNTER_OUPUT);
private Options options;
@@ -100,6 +105,7 @@ public class SparkCubeHFile extends AbstractApplication implements Serializable
options.addOption(OPTION_OUTPUT_PATH);
options.addOption(OPTION_PARTITION_FILE_PATH);
options.addOption(AbstractHadoopJob.OPTION_HBASE_CONF_PATH);
+ options.addOption(OPTION_COUNTER_PATH);
}
@Override
@@ -116,6 +122,7 @@ public class SparkCubeHFile extends AbstractApplication implements Serializable
final String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
final Path partitionFilePath = new Path(optionsHelper.getOptionValue(OPTION_PARTITION_FILE_PATH));
final String hbaseConfFile = optionsHelper.getOptionValue(AbstractHadoopJob.OPTION_HBASE_CONF_PATH);
+ final String counterPath = optionsHelper.getOptionValue(OPTION_COUNTER_PATH);
Class[] kryoClassArray = new Class[] { Class.forName("scala.reflect.ClassTag$$anon$1"), KeyValueCreator.class, KeyValue.class, RowKeyWritable.class };
@@ -226,9 +233,14 @@ public class SparkCubeHFile extends AbstractApplication implements Serializable
}
}).saveAsNewAPIHadoopDataset(job.getConfiguration());
- // output the data size to console, job engine will parse and save the metric
- // please note: this mechanism won't work when spark.submit.deployMode=cluster
logger.info("HDFS: Number of bytes written=" + jobListener.metrics.getBytesWritten());
+
+ Map<String, String> counterMap = Maps.newHashMap();
+ counterMap.put(ExecutableConstants.HDFS_BYTES_WRITTEN, String.valueOf(jobListener.metrics.getBytesWritten()));
+
+ // save counter to hdfs
+ HadoopUtil.writeToSequenceFile(sc.hadoopConfiguration(), counterPath, counterMap);
+
//HadoopUtil.deleteHDFSMeta(metaUrl);
}
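Note that both Spark applications register OPTION_COUNTER_PATH with isRequired(true), so every invocation must now pass the extra argument. With the option name defined in BatchConstants, the added piece of the command line would look like this (path illustrative):

    -counterOutput hdfs:///kylin/kylin_metadata/kylin-<job_id>/<realization_name>/counter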