This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git


The following commit(s) were added to refs/heads/master by this push:
     new c5424ce  KYLIN-3477 Save spark job counter to hdfs
c5424ce is described below

commit c5424ce065ccbeb39bb0b9aea1c214a32fb082e8
Author: chao long <[email protected]>
AuthorDate: Mon Aug 27 11:32:47 2018 +0800

    KYLIN-3477 Save spark job counter to hdfs
---
 .../org/apache/kylin/common/util/HadoopUtil.java   | 36 ++++++++++++++++++++++
 .../apache/kylin/engine/mr/JobBuilderSupport.java  |  6 +++-
 .../kylin/engine/mr/common/BatchConstants.java     |  2 ++
 .../engine/spark/SparkBatchCubingJobBuilder2.java  |  2 +-
 .../apache/kylin/engine/spark/SparkExecutable.java | 15 +++++++++
 .../kylin/engine/spark/SparkFactDistinct.java      | 15 ++++++++-
 .../kylin/storage/hbase/steps/HBaseSparkSteps.java |  2 +-
 .../kylin/storage/hbase/steps/SparkCubeHFile.java  | 16 ++++++++--
 8 files changed, 88 insertions(+), 6 deletions(-)

diff --git 
a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java 
b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
index 3cb43fc..3aef34a 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
@@ -23,6 +23,7 @@ import java.io.DataOutputStream;
 import java.io.IOException;
 import java.net.URI;
 import java.net.URISyntaxException;
+import java.util.Map;
 
 import org.apache.commons.lang.StringUtils;
 import org.apache.hadoop.conf.Configuration;
@@ -30,12 +31,17 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
 import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.util.ReflectionUtils;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.StorageURL;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.collect.Maps;
+
 public class HadoopUtil {
     @SuppressWarnings("unused")
     private static final Logger logger = 
LoggerFactory.getLogger(HadoopUtil.class);
@@ -193,4 +199,34 @@ public class HadoopUtil {
         HadoopUtil.getFileSystem(realHdfsPath).delete(new Path(realHdfsPath), 
true);
         logger.info("Delete metadata in HDFS for this job: " + realHdfsPath);
     }
+
+    @SuppressWarnings("deprecation")
+    public static void writeToSequenceFile(Configuration conf, String 
outputPath, Map<String, String> counterMap) throws IOException {
+        try (SequenceFile.Writer writer = 
SequenceFile.createWriter(getWorkingFileSystem(conf), conf, new 
Path(outputPath), Text.class, Text.class)) {
+            for (Map.Entry<String, String> counterEntry : 
counterMap.entrySet()) {
+                writer.append(new Text(counterEntry.getKey()), new 
Text(counterEntry.getValue()));
+            }
+        }
+    }
+
+    @SuppressWarnings("deprecation")
+    public static Map<String, String> readFromSequenceFile(Configuration conf, 
String inputPath) throws IOException {
+        try (SequenceFile.Reader reader = new 
SequenceFile.Reader(getWorkingFileSystem(conf), new Path(inputPath), conf)) {
+            Map<String, String> map = Maps.newHashMap();
+
+            Text key = (Text) 
ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+            Text value = (Text) 
ReflectionUtils.newInstance(reader.getValueClass(), conf);
+
+            while (reader.next(key, value)) {
+                map.put(key.toString(), value.toString());
+            }
+
+            return map;
+        }
+    }
+
+    public static Map<String, String> readFromSequenceFile(String inputPath) 
throws IOException {
+        return readFromSequenceFile(getCurrentConfiguration(), inputPath);
+    }
+
 }
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index 02e9fe5..5b1f38c 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -334,7 +334,11 @@ public class JobBuilderSupport {
     }
 
     public String getHBaseConfFilePath(String jobId) {
-       return getJobWorkingDir(jobId) + "/hbase-conf.xml";
+        return getJobWorkingDir(jobId) + "/hbase-conf.xml";
+    }
+
+    public String getCounterOuputPath(String jobId) {
+        return getRealizationRootPath(jobId) + "/counter";
     }
 
     // 
============================================================================
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
index 8c2ba7f..66da1b2 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
@@ -106,6 +106,8 @@ public interface BatchConstants {
     String ARG_META_URL = "metadataUrl";
     String ARG_HBASE_CONF_PATH = "hbaseConfPath";
     String ARG_SHRUNKEN_DICT_PATH = "shrunkenDictPath";
+    String ARG_COUNTER_OUPUT = "counterOutput";
+
     /**
      * logger and counter
      */
diff --git 
a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
 
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
index 5fd7213..3f3c14d 100644
--- 
a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
+++ 
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
@@ -103,7 +103,7 @@ public class SparkBatchCubingJobBuilder2 extends 
JobBuilderSupport {
 
         sparkExecutable.setJobId(jobId);
         
sparkExecutable.setName(ExecutableConstants.STEP_NAME_FACT_DISTINCT_COLUMNS);
-        sparkExecutable.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + 
CubingJob.SOURCE_SIZE_BYTES);
+        sparkExecutable.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + 
CubingJob.SOURCE_SIZE_BYTES, getCounterOuputPath(jobId));
 
         StringBuilder jars = new StringBuilder();
 
diff --git 
a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java 
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
index 6122397..d85337e 100644
--- 
a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
+++ 
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
@@ -35,11 +35,13 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.KylinConfigExt;
 import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.util.CliCommandExecutor;
+import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.Pair;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.CubingJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.engine.mr.common.JobRelatedMetaUtil;
 import org.apache.kylin.job.common.PatternedLogger;
 import org.apache.kylin.job.constant.ExecutableConstants;
@@ -80,6 +82,11 @@ public class SparkExecutable extends AbstractExecutable {
         this.setParam(COUNTER_SAVE_AS, value);
     }
 
+    public void setCounterSaveAs(String value, String counterOutputPath) {
+        this.setParam(COUNTER_SAVE_AS, value);
+        this.setParam(BatchConstants.ARG_COUNTER_OUPUT, counterOutputPath);
+    }
+
     public String getCounterSaveAs() {
         return getParam(COUNTER_SAVE_AS);
     }
@@ -286,6 +293,14 @@ public class SparkExecutable extends AbstractExecutable {
                 }
                 // done, update all properties
                 Map<String, String> joblogInfo = patternedLogger.getInfo();
+
+                // read counter from hdfs
+                String counterOutput = 
getParam(BatchConstants.ARG_COUNTER_OUPUT);
+                if (counterOutput != null){
+                    Map<String, String> counterMap = 
HadoopUtil.readFromSequenceFile(counterOutput);
+                    joblogInfo.putAll(counterMap);
+                }
+
                 readCounters(joblogInfo);
                 getManager().addJobInfo(getId(), joblogInfo);
 
diff --git 
a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java
 
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java
index 61e2e53..6188a56 100644
--- 
a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java
+++ 
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkFactDistinct.java
@@ -73,6 +73,7 @@ import 
org.apache.kylin.engine.mr.steps.FactDistinctColumnsMapper.DictColDeduper
 import org.apache.kylin.engine.mr.steps.FactDistinctColumnsReducerMapping;
 import org.apache.kylin.engine.mr.steps.SelfDefineSortableKey;
 import org.apache.kylin.job.JoinedFlatTable;
+import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.measure.BufferedMeasureCodec;
 import org.apache.kylin.measure.hllc.HLLCounter;
 import org.apache.kylin.measure.hllc.RegisterType;
@@ -119,6 +120,8 @@ public class SparkFactDistinct extends AbstractApplication 
implements Serializab
             .withDescription("Hive Intermediate Table").create("hiveTable");
     public static final Option OPTION_INPUT_PATH = 
OptionBuilder.withArgName(BatchConstants.ARG_INPUT).hasArg()
             .isRequired(true).withDescription("Hive Intermediate Table 
PATH").create(BatchConstants.ARG_INPUT);
+    public static final Option OPTION_COUNTER_PATH = 
OptionBuilder.withArgName(BatchConstants.ARG_COUNTER_OUPUT).hasArg()
+            .isRequired(true).withDescription("counter output 
path").create(BatchConstants.ARG_COUNTER_OUPUT);
 
     private Options options;
 
@@ -131,6 +134,7 @@ public class SparkFactDistinct extends AbstractApplication 
implements Serializab
         options.addOption(OPTION_INPUT_PATH);
         options.addOption(OPTION_SEGMENT_ID);
         options.addOption(OPTION_STATS_SAMPLING_PERCENT);
+        options.addOption(OPTION_COUNTER_PATH);
     }
 
     @Override
@@ -146,6 +150,7 @@ public class SparkFactDistinct extends AbstractApplication 
implements Serializab
         String hiveTable = optionsHelper.getOptionValue(OPTION_INPUT_TABLE);
         String inputPath = optionsHelper.getOptionValue(OPTION_INPUT_PATH);
         String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
+        String counterPath = optionsHelper.getOptionValue(OPTION_COUNTER_PATH);
         int samplingPercent = 
Integer.parseInt(optionsHelper.getOptionValue(OPTION_STATS_SAMPLING_PERCENT));
 
         Class[] kryoClassArray = new Class[] { 
Class.forName("scala.reflect.ClassTag$$anon$1"), 
Class.forName("org.apache.kylin.engine.mr.steps.SelfDefineSortableKey") };
@@ -173,6 +178,7 @@ public class SparkFactDistinct extends AbstractApplication 
implements Serializab
         logger.info("RDD Output path: {}", outputPath);
         logger.info("getTotalReducerNum: {}", 
reducerMapping.getTotalReducerNum());
         logger.info("getCuboidRowCounterReducerNum: {}", 
reducerMapping.getCuboidRowCounterReducerNum());
+        logger.info("counter path {}", counterPath);
 
         boolean isSequenceFile = 
JoinedFlatTable.SEQUENCEFILE.equalsIgnoreCase(envConfig.getFlatTableStorageFormat());
 
@@ -202,13 +208,20 @@ public class SparkFactDistinct extends 
AbstractApplication implements Serializab
 
         
multipleOutputsRDD.saveAsNewAPIHadoopDatasetWithMultipleOutputs(job.getConfiguration());
 
-        // only work for client mode, not work when 
spark.submit.deployMode=cluster
         logger.info("Map input records={}", recordRDD.count());
         logger.info("HDFS Read: {} HDFS Write", bytesWritten.value());
 
+        Map<String, String> counterMap = Maps.newHashMap();
+        counterMap.put(ExecutableConstants.SOURCE_RECORDS_COUNT, 
String.valueOf(recordRDD.count()));
+        counterMap.put(ExecutableConstants.SOURCE_RECORDS_SIZE, 
String.valueOf(bytesWritten.value()));
+
+        // save counter to hdfs
+        HadoopUtil.writeToSequenceFile(sc.hadoopConfiguration(), counterPath, 
counterMap);
+
         HadoopUtil.deleteHDFSMeta(metaUrl);
     }
 
+
     static class FlatOutputFucntion implements 
PairFlatMapFunction<Iterator<String[]>, SelfDefineSortableKey, Text> {
         private volatile transient boolean initialized = false;
         private String cubeName;
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java
index be230f0..ccab22f 100644
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseSparkSteps.java
@@ -71,7 +71,7 @@ public class HBaseSparkSteps extends HBaseJobSteps {
         sparkExecutable.setJars(jars.toString());
 
         
sparkExecutable.setName(ExecutableConstants.STEP_NAME_CONVERT_CUBOID_TO_HFILE);
-        sparkExecutable.setCounterSaveAs(",," + CubingJob.CUBE_SIZE_BYTES);
+        sparkExecutable.setCounterSaveAs(",," + CubingJob.CUBE_SIZE_BYTES, 
getCounterOuputPath(jobId));
 
         return sparkExecutable;
     }
diff --git 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
index c87a739..539f03b 100644
--- 
a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
+++ 
b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/SparkCubeHFile.java
@@ -23,8 +23,10 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
+import java.util.Map;
 import java.util.Objects;
 
+import com.google.common.collect.Maps;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
@@ -55,6 +57,7 @@ import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.engine.mr.common.SerializableConfiguration;
 import org.apache.kylin.engine.spark.KylinSparkJobListener;
 import org.apache.kylin.engine.spark.SparkUtil;
+import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.measure.MeasureCodec;
 import org.apache.spark.Partitioner;
 import org.apache.spark.SparkConf;
@@ -88,6 +91,8 @@ public class SparkCubeHFile extends AbstractApplication 
implements Serializable
             .isRequired(true).withDescription("Cuboid files 
PATH").create(BatchConstants.ARG_INPUT);
     public static final Option OPTION_PARTITION_FILE_PATH = 
OptionBuilder.withArgName(BatchConstants.ARG_PARTITION)
             .hasArg().isRequired(true).withDescription("Partition file 
path.").create(BatchConstants.ARG_PARTITION);
+    public static final Option OPTION_COUNTER_PATH = 
OptionBuilder.withArgName(BatchConstants.ARG_COUNTER_OUPUT).hasArg()
+            .isRequired(true).withDescription("counter output 
path").create(BatchConstants.ARG_COUNTER_OUPUT);
 
     private Options options;
 
@@ -100,6 +105,7 @@ public class SparkCubeHFile extends AbstractApplication 
implements Serializable
         options.addOption(OPTION_OUTPUT_PATH);
         options.addOption(OPTION_PARTITION_FILE_PATH);
         options.addOption(AbstractHadoopJob.OPTION_HBASE_CONF_PATH);
+        options.addOption(OPTION_COUNTER_PATH);
     }
 
     @Override
@@ -116,6 +122,7 @@ public class SparkCubeHFile extends AbstractApplication 
implements Serializable
         final String outputPath = 
optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
         final Path partitionFilePath = new 
Path(optionsHelper.getOptionValue(OPTION_PARTITION_FILE_PATH));
         final String hbaseConfFile = 
optionsHelper.getOptionValue(AbstractHadoopJob.OPTION_HBASE_CONF_PATH);
+        final String counterPath = 
optionsHelper.getOptionValue(OPTION_COUNTER_PATH);
 
         Class[] kryoClassArray = new Class[] { 
Class.forName("scala.reflect.ClassTag$$anon$1"), KeyValueCreator.class,
                 KeyValue.class, RowKeyWritable.class };
@@ -226,9 +233,14 @@ public class SparkCubeHFile extends AbstractApplication 
implements Serializable
                     }
                 }).saveAsNewAPIHadoopDataset(job.getConfiguration());
 
-        // output the data size to console, job engine will parse and save the 
metric
-        // please note: this mechanism won't work when 
spark.submit.deployMode=cluster
         logger.info("HDFS: Number of bytes written=" + 
jobListener.metrics.getBytesWritten());
+
+        Map<String, String> counterMap = Maps.newHashMap();
+        counterMap.put(ExecutableConstants.HDFS_BYTES_WRITTEN, 
String.valueOf(jobListener.metrics.getBytesWritten()));
+
+        // save counter to hdfs
+        HadoopUtil.writeToSequenceFile(sc.hadoopConfiguration(), counterPath, 
counterMap);
+
         //HadoopUtil.deleteHDFSMeta(metaUrl);
     }
 

Reply via email to