KYLIN-2811, refine spark cubing

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/2d939a59
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/2d939a59
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/2d939a59

Branch: refs/heads/master
Commit: 2d939a59fdf385bb20b4724e8a6f87879e26bdd7
Parents: ecf4819
Author: Cheng Wang <cheng.w...@kyligence.io>
Authored: Tue Sep 5 19:10:47 2017 +0800
Committer: Roger Shi <rogershijich...@gmail.com>
Committed: Tue Sep 5 21:14:02 2017 +0800

----------------------------------------------------------------------
 .../org/apache/kylin/common/KylinConfig.java    | 23 ++---
 .../engine/mr/common/AbstractHadoopJob.java     | 26 ++++++
 .../kylin/engine/mr/common/BatchConstants.java  |  1 +
 .../spark/SparkBatchCubingJobBuilder2.java      | 10 +--
 .../kylin/engine/spark/SparkCubingByLayer.java  | 95 +++++++++-----------
 .../kylin/engine/spark/SparkExecutable.java     | 30 +++++--
 6 files changed, 108 insertions(+), 77 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/2d939a59/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
----------------------------------------------------------------------
diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
index a003638..c7c7f60 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfig.java
@@ -18,15 +18,6 @@
 
 package org.apache.kylin.common;
 
-import com.google.common.base.Preconditions;
-import org.apache.commons.io.IOUtils;
-import org.apache.commons.lang.StringUtils;
-import org.apache.kylin.common.restclient.RestClient;
-import org.apache.kylin.common.util.ClassUtil;
-import org.apache.kylin.common.util.OrderedProperties;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import java.io.BufferedReader;
 import java.io.File;
 import java.io.FileInputStream;
@@ -41,6 +32,16 @@ import java.nio.charset.Charset;
 import java.util.Map;
 import java.util.Properties;
 
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.lang.StringUtils;
+import org.apache.kylin.common.restclient.RestClient;
+import org.apache.kylin.common.util.ClassUtil;
+import org.apache.kylin.common.util.OrderedProperties;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
 /**
  */
 public class KylinConfig extends KylinConfigBase {
@@ -58,7 +59,7 @@ public class KylinConfig extends KylinConfigBase {
 
     // thread-local instances, will override SYS_ENV_INSTANCE
     private static transient ThreadLocal<KylinConfig> THREAD_ENV_INSTANCE = new ThreadLocal<>();
-    
+
     static {
         /*
          * Make Calcite to work with Unicode.
@@ -226,7 +227,7 @@ public class KylinConfig extends KylinConfigBase {
         }
     }
 
-    private static Properties streamToProps(InputStream is) throws IOException {
+    public static Properties streamToProps(InputStream is) throws IOException {
         Properties prop = new Properties();
         prop.load(is);
         IOUtils.closeQuietly(is);
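
With streamToProps() now public, callers outside KylinConfig (such as the new AbstractHadoopJob helper below) can turn any kylin.properties stream into a KylinConfig. A minimal sketch, assuming a locally readable properties file (the path is hypothetical):

    import java.io.FileInputStream;
    import java.io.InputStream;
    import java.util.Properties;

    InputStream is = new FileInputStream("/tmp/kylin.properties"); // hypothetical local copy
    Properties props = KylinConfig.streamToProps(is);              // loads the stream, then closes it quietly
    KylinConfig config = KylinConfig.createKylinConfig(props);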

http://git-wip-us.apache.org/repos/asf/kylin/blob/2d939a59/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index 081ac93..292c57d 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -27,10 +27,12 @@ import static org.apache.hadoop.util.StringUtils.formatTime;
 
 import java.io.File;
 import java.io.IOException;
+import java.io.InputStream;
 import java.util.HashMap;
 import java.util.LinkedHashSet;
 import java.util.Map;
 import java.util.Map.Entry;
+import java.util.Properties;
 import java.util.Set;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
@@ -464,6 +466,30 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
         }
     }
 
+    public static KylinConfig loadKylinConfigFromHdfs(String uri) {
+        if (uri == null)
+            throw new IllegalArgumentException("meta url should not be null");
+
+        if (!uri.contains("@hdfs"))
+            throw new IllegalArgumentException("meta url should like @hdfs schema");
+
+        logger.info("Ready to load KylinConfig from uri: {}", uri);
+        KylinConfig config;
+        FileSystem fs;
+        int cut = uri.indexOf('@');
+        String realHdfsPath = uri.substring(0, cut) + "/" + KylinConfig.KYLIN_CONF_PROPERTIES_FILE;
+        try {
+            fs = HadoopUtil.getFileSystem(realHdfsPath);
+            InputStream is = fs.open(new Path(realHdfsPath));
+            Properties prop = KylinConfig.streamToProps(is);
+            config = KylinConfig.createKylinConfig(prop);
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        KylinConfig.setKylinConfigThreadLocal(config);
+        return config;
+    }
+
     protected void attachTableMetadata(TableDesc table, Configuration conf) throws IOException {
         Set<String> dumpList = new LinkedHashSet<>();
         dumpList.add(table.getResourcePath());
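
The new loadKylinConfigFromHdfs() assumes the part of the metadata URL before '@' is an HDFS directory that already holds kylin.properties, and rejects anything without an "@hdfs" marker. A hedged executor-side usage sketch (the URL below is hypothetical):

    // Build the config straight from HDFS instead of shipping kylin.properties via SparkFiles.
    String metaUrl = "hdfs://nameservice1/kylin/metadata/kylin-job-0001@hdfs"; // hypothetical
    KylinConfig config = AbstractHadoopJob.loadKylinConfigFromHdfs(metaUrl);   // also set as thread-local
    CubeManager cubeManager = CubeManager.getInstance(config);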

http://git-wip-us.apache.org/repos/asf/kylin/blob/2d939a59/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
----------------------------------------------------------------------
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
index 84ca006..bbf38e5 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
@@ -65,6 +65,7 @@ public interface BatchConstants {
     String CFG_OUTPUT_STATISTICS = "statistics";
     String CFG_OUTPUT_PARTITION = "partition";
     String CFG_MR_SPARK_JOB = "mr.spark.job";
+    String CFG_SPARK_META_URL = "spark.meta.url";
 
     /**
      * command line ARGuments

http://git-wip-us.apache.org/repos/asf/kylin/blob/2d939a59/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
index 779f340..2773f97 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
@@ -47,9 +47,10 @@ public class SparkBatchCubingJobBuilder2 extends BatchCubingJobBuilder2 {
         sparkExecutable.setClassName(SparkCubingByLayer.class.getName());
         sparkExecutable.setParam(SparkCubingByLayer.OPTION_CUBE_NAME.getOpt(), seg.getRealization().getName());
         sparkExecutable.setParam(SparkCubingByLayer.OPTION_SEGMENT_ID.getOpt(), seg.getUuid());
-        sparkExecutable.setParam(SparkCubingByLayer.OPTION_INPUT_TABLE.getOpt(), seg.getConfig().getHiveDatabaseForIntermediateTable() + "." + flatTableDesc.getTableName());
-        sparkExecutable.setParam(SparkCubingByLayer.OPTION_META_URL.getOpt(), getSegmentMetadataUrl(seg.getConfig(), seg.getUuid()));
-        sparkExecutable.setParam(SparkCubingByLayer.OPTION_CONF_PATH.getOpt(), KylinConfig.getKylinConfDir().getAbsolutePath());
+        sparkExecutable.setParam(SparkCubingByLayer.OPTION_INPUT_TABLE.getOpt(),
+                seg.getConfig().getHiveDatabaseForIntermediateTable() + "." + flatTableDesc.getTableName());
+        sparkExecutable.setParam(SparkCubingByLayer.OPTION_META_URL.getOpt(),
+                getSegmentMetadataUrl(seg.getConfig(), seg.getUuid()));
         sparkExecutable.setParam(SparkCubingByLayer.OPTION_OUTPUT_PATH.getOpt(), cuboidRootPath);
 
         StringBuilder jars = new StringBuilder();
@@ -57,9 +58,6 @@ public class SparkBatchCubingJobBuilder2 extends BatchCubingJobBuilder2 {
         StringUtil.appendWithSeparator(jars, findJar("org.htrace.HTraceConfiguration", null)); // htrace-core.jar
         StringUtil.appendWithSeparator(jars, findJar("org.apache.htrace.Trace", null)); // htrace-core.jar
         StringUtil.appendWithSeparator(jars, findJar("org.cloudera.htrace.HTraceConfiguration", null)); // htrace-core.jar
-        StringUtil.appendWithSeparator(jars, findJar("org.apache.hadoop.hbase.client.HConnection", null)); // hbase-client.jar
-        StringUtil.appendWithSeparator(jars, findJar("org.apache.hadoop.hbase.HBaseConfiguration", null)); // hbase-common.jar
-        StringUtil.appendWithSeparator(jars, findJar("org.apache.hadoop.hbase.util.ByteStringer", null)); // hbase-protocol.jar
         StringUtil.appendWithSeparator(jars, findJar("com.yammer.metrics.core.Gauge", null)); // metrics-core.jar
         StringUtil.appendWithSeparator(jars, findJar("com.google.common.collect.Maps", "guava")); //guava.jar
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/2d939a59/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index dab5fb7..94435f5 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -17,7 +17,6 @@
 */
 package org.apache.kylin.engine.spark;
 
-import java.io.File;
 import java.io.IOException;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
@@ -54,6 +53,7 @@ import org.apache.kylin.engine.EngineFactory;
 import org.apache.kylin.engine.mr.BatchCubingJobBuilder2;
 import org.apache.kylin.engine.mr.IMROutput2;
 import org.apache.kylin.engine.mr.MRUtil;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BaseCuboidBuilder;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.engine.mr.common.CubeStatsReader;
@@ -63,7 +63,6 @@ import org.apache.kylin.measure.MeasureAggregators;
 import org.apache.kylin.measure.MeasureIngester;
 import org.apache.kylin.metadata.model.MeasureDesc;
 import org.apache.spark.SparkConf;
-import org.apache.spark.SparkFiles;
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.Function;
@@ -74,7 +73,6 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.hive.HiveContext;
 import org.apache.spark.storage.StorageLevel;
-import org.apache.spark.util.SizeEstimator;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -97,8 +95,6 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
             .isRequired(true).withDescription("Cube output path").create(BatchConstants.ARG_OUTPUT);
     public static final Option OPTION_INPUT_TABLE = OptionBuilder.withArgName("hiveTable").hasArg().isRequired(true)
             .withDescription("Hive Intermediate Table").create("hiveTable");
-    public static final Option OPTION_CONF_PATH = OptionBuilder.withArgName("confPath").hasArg().isRequired(true)
-            .withDescription("Configuration Path").create("confPath");
 
     private Options options;
 
@@ -109,7 +105,6 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         options.addOption(OPTION_SEGMENT_ID);
         options.addOption(OPTION_META_URL);
         options.addOption(OPTION_OUTPUT_PATH);
-        options.addOption(OPTION_CONF_PATH);
     }
 
     @Override
@@ -117,22 +112,12 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         return options;
     }
 
-    public static KylinConfig getKylinConfigForExecutor(String metaUrl) {
-        File file = new File(SparkFiles.get(KylinConfig.KYLIN_CONF_PROPERTIES_FILE));
-        String confPath = file.getParentFile().getAbsolutePath();
-        System.setProperty(KylinConfig.KYLIN_CONF, confPath);
-        final KylinConfig config = KylinConfig.getInstanceFromEnv();
-        config.setMetadataUrl(metaUrl);
-        return config;
-    }
-
     @Override
     protected void execute(OptionsHelper optionsHelper) throws Exception {
         String metaUrl = optionsHelper.getOptionValue(OPTION_META_URL);
         String hiveTable = optionsHelper.getOptionValue(OPTION_INPUT_TABLE);
         String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
         String segmentId = optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
-        String confPath = optionsHelper.getOptionValue(OPTION_CONF_PATH);
         String outputPath = optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
 
         Class[] kryoClassArray = new Class[] { org.apache.hadoop.io.Text.class,
@@ -147,14 +132,19 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         JavaSparkContext sc = new JavaSparkContext(conf);
         HadoopUtil.deletePath(sc.hadoopConfiguration(), new Path(outputPath));
 
-        sc.addFile(confPath + File.separator + KylinConfig.KYLIN_CONF_PROPERTIES_FILE);
-        System.setProperty(KylinConfig.KYLIN_CONF, confPath);
-        KylinConfig envConfig = KylinConfig.getInstanceFromEnv();
+        KylinConfig envConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(metaUrl);
 
         final CubeInstance cubeInstance = CubeManager.getInstance(envConfig).getCube(cubeName);
         final CubeDesc cubeDesc = cubeInstance.getDescriptor();
         final CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
 
+        Configuration confOverwrite = new Configuration(sc.hadoopConfiguration());
+        confOverwrite.set("dfs.replication", "2"); // cuboid intermediate files, replication=2
+        final Job job = Job.getInstance(confOverwrite);
+
+        logger.info("RDD Output path: {}", outputPath);
+        setHadoopConf(job);
+
         int countMeasureIndex = 0;
         for (MeasureDesc measureDesc : cubeDesc.getMeasures()) {
             if (measureDesc.getFunction().isCount() == true) {
@@ -194,26 +184,22 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         final int totalLevels = cubeDesc.getBuildLevel();
         JavaPairRDD<ByteArray, Object[]>[] allRDDs = new JavaPairRDD[totalLevels + 1];
         int level = 0;
-        long baseRDDSize = SizeEstimator.estimate(encodedBaseRDD) / (1024 * 1024);
-        int partition = estimateRDDPartitionNum(level, cubeStatsReader, envConfig, (int) baseRDDSize);
+        int partition = estimateRDDPartitionNum(level, cubeStatsReader, envConfig);
 
         // aggregate to calculate base cuboid
         allRDDs[0] = encodedBaseRDD.reduceByKey(baseCuboidReducerFunction, partition).persist(storageLevel);
-        Configuration confOverwrite = new Configuration(sc.hadoopConfiguration());
-        confOverwrite.set("dfs.replication", "2"); // cuboid intermediate files, replication=2
 
-        saveToHDFS(allRDDs[0], cubeName, metaUrl, cubeSegment, outputPath, 0, confOverwrite);
+        saveToHDFS(allRDDs[0], metaUrl, cubeName, cubeSegment, outputPath, 0, job);
 
         // aggregate to ND cuboids
         for (level = 1; level <= totalLevels; level++) {
-            long levelRddSize = SizeEstimator.estimate(allRDDs[level - 1]) / (1024 * 1024);
-            partition = estimateRDDPartitionNum(level, cubeStatsReader, envConfig, (int) levelRddSize);
+            partition = estimateRDDPartitionNum(level, cubeStatsReader, envConfig);
             allRDDs[level] = allRDDs[level - 1].flatMapToPair(new CuboidFlatMap(cubeName, segmentId, metaUrl))
                     .reduceByKey(reducerFunction2, partition).persist(storageLevel);
             if (envConfig.isSparkSanityCheckEnabled() == true) {
                 sanityCheck(allRDDs[level], totalCount, level, cubeStatsReader, countMeasureIndex);
             }
-            saveToHDFS(allRDDs[level], cubeName, metaUrl, cubeSegment, outputPath, level, confOverwrite);
+            saveToHDFS(allRDDs[level], metaUrl, cubeName, cubeSegment, outputPath, level, job);
             allRDDs[level - 1].unpersist();
         }
         allRDDs[totalLevels].unpersist();
@@ -221,9 +207,13 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         deleteHDFSMeta(metaUrl);
     }
 
-    private static int estimateRDDPartitionNum(int level, CubeStatsReader statsReader, KylinConfig kylinConfig,
-            int rddSize) {
-        int baseCuboidSize = (int) Math.min(rddSize, statsReader.estimateLayerSize(level));
+    protected void setHadoopConf(Job job) throws Exception {
+        job.setOutputKeyClass(Text.class);
+        job.setOutputValueClass(Text.class);
+    }
+
+    protected int estimateRDDPartitionNum(int level, CubeStatsReader statsReader, KylinConfig kylinConfig) {
+        double baseCuboidSize = statsReader.estimateLayerSize(level);
         float rddCut = kylinConfig.getSparkRDDPartitionCutMB();
         int partition = (int) (baseCuboidSize / rddCut);
         partition = Math.max(kylinConfig.getSparkMinPartition(), partition);
@@ -231,20 +221,13 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         return partition;
     }
 
-    private void saveToHDFS(final JavaPairRDD<ByteArray, Object[]> rdd, final String cubeName, final String metaUrl,
-            final CubeSegment cubeSeg, final String hdfsBaseLocation, int level, Configuration conf) throws Exception {
+    protected void saveToHDFS(final JavaPairRDD<ByteArray, Object[]> rdd, final String metaUrl, final String cubeName,
+            final CubeSegment cubeSeg, final String hdfsBaseLocation, int level, Job job) throws Exception {
         final String cuboidOutputPath = BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(hdfsBaseLocation, level);
 
-        Job job = Job.getInstance(conf);
         IMROutput2.IMROutputFormat outputFormat = MRUtil.getBatchCubingOutputSide2(cubeSeg).getOuputFormat();
         outputFormat.configureJobOutput(job, cuboidOutputPath, cubeSeg, level);
 
-        job.setOutputKeyClass(Text.class);
-        job.setOutputValueClass(Text.class);
-        job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeSeg.getCubeInstance().getName());
-        job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, cubeSeg.getUuid());
-        job.getConfiguration().set(BatchConstants.CFG_MR_SPARK_JOB, "spark");
-
         rdd.mapToPair(
                 new PairFunction<Tuple2<ByteArray, Object[]>, org.apache.hadoop.io.Text, org.apache.hadoop.io.Text>() {
                     private volatile transient boolean initialized = false;
@@ -253,10 +236,11 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
                     @Override
                     public Tuple2<org.apache.hadoop.io.Text, org.apache.hadoop.io.Text> call(
                             Tuple2<ByteArray, Object[]> tuple2) throws Exception {
-                        if (!initialized) {
+
+                        if (initialized == false) {
                             synchronized (SparkCubingByLayer.class) {
-                                if (!initialized) {
-                                    KylinConfig kylinConfig = getKylinConfigForExecutor(metaUrl);
+                                if (initialized == false) {
+                                    KylinConfig kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(metaUrl);
                                     CubeDesc desc = CubeDescManager.getInstance(kylinConfig).getCubeDesc(cubeName);
                                     codec = new BufferedMeasureCodec(desc.getMeasures());
                                     initialized = true;
@@ -269,21 +253,22 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
                         return new Tuple2<>(new org.apache.hadoop.io.Text(tuple2._1().array()),
                                 new org.apache.hadoop.io.Text(encodedBytes));
                     }
-                }).sortByKey().saveAsNewAPIHadoopDataset(job.getConfiguration());
+
+                }).saveAsNewAPIHadoopDataset(job.getConfiguration());
         logger.info("Persisting RDD for level " + level + " into " + cuboidOutputPath);
     }
 
-    static class EncodeBaseCuboid implements PairFunction<Row, ByteArray, Object[]> {
+    static public class EncodeBaseCuboid implements PairFunction<Row, ByteArray, Object[]> {
         private volatile transient boolean initialized = false;
         private BaseCuboidBuilder baseCuboidBuilder = null;
         private String cubeName;
         private String segmentId;
-        private String metaurl;
+        private String metaUrl;
 
         public EncodeBaseCuboid(String cubeName, String segmentId, String metaurl) {
             this.cubeName = cubeName;
             this.segmentId = segmentId;
-            this.metaurl = metaurl;
+            this.metaUrl = metaurl;
         }
 
         @Override
@@ -291,7 +276,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
             if (initialized == false) {
                 synchronized (SparkCubingByLayer.class) {
                     if (initialized == false) {
-                        KylinConfig kConfig = getKylinConfigForExecutor(metaurl);
+                        KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(metaUrl);
                         CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
                         CubeDesc cubeDesc = cubeInstance.getDescriptor();
                         CubeSegment cubeSegment = cubeInstance.getSegmentById(segmentId);
@@ -327,7 +312,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         }
     }
 
-    static class BaseCuboidReducerFunction2 implements Function2<Object[], Object[], Object[]> {
+    static public class BaseCuboidReducerFunction2 implements Function2<Object[], Object[], Object[]> {
         protected String cubeName;
         protected String metaUrl;
         protected CubeDesc cubeDesc;
@@ -341,7 +326,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         }
 
         public void init() {
-            KylinConfig kConfig = getKylinConfigForExecutor(metaUrl);
+            KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(metaUrl);
             CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
             cubeDesc = cubeInstance.getDescriptor();
             aggregators = new MeasureAggregators(cubeDesc.getMeasures());
@@ -364,7 +349,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         }
     }
 
-    static class CuboidReducerFunction2 extends BaseCuboidReducerFunction2 {
+    static public class CuboidReducerFunction2 extends BaseCuboidReducerFunction2 {
         private boolean[] needAggr;
 
         public CuboidReducerFunction2(String cubeName, String metaUrl, boolean[] needAggr) {
@@ -390,7 +375,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
 
     private static final java.lang.Iterable<Tuple2<ByteArray, Object[]>> EMTPY_ITERATOR = new ArrayList(0);
 
-    static class CuboidFlatMap implements PairFlatMapFunction<Tuple2<ByteArray, Object[]>, ByteArray, Object[]> {
+    static public class CuboidFlatMap implements PairFlatMapFunction<Tuple2<ByteArray, Object[]>, ByteArray, Object[]> {
 
         private String cubeName;
         private String segmentId;
@@ -409,11 +394,12 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         }
 
         public void init() {
-            KylinConfig kConfig = getKylinConfigForExecutor(metaUrl);
+            KylinConfig kConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(metaUrl);
             CubeInstance cubeInstance = CubeManager.getInstance(kConfig).getCube(cubeName);
             this.cubeSegment = cubeInstance.getSegmentById(segmentId);
             this.cubeDesc = cubeInstance.getDescriptor();
             this.cuboidScheduler = cubeDesc.getCuboidScheduler();
+
             this.ndCuboidBuilder = new NDCuboidBuilder(cubeSegment, new RowKeyEncoderProvider(cubeSegment));
             this.rowKeySplitter = new RowKeySplitter(cubeSegment, 65, 256);
         }
@@ -456,8 +442,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         }
     }
 
-    //sanity check
-    private void sanityCheck(JavaPairRDD<ByteArray, Object[]> rdd, Long totalCount, int thisLevel,
+    protected void sanityCheck(JavaPairRDD<ByteArray, Object[]> rdd, Long totalCount, int thisLevel,
             CubeStatsReader cubeStatsReader, final int countMeasureIndex) {
         int thisCuboidNum = cubeStatsReader.getCuboidsByLayer(thisLevel).size();
         Long count2 = getRDDCountSum(rdd, countMeasureIndex);
@@ -487,7 +472,7 @@ public class SparkCubingByLayer extends AbstractApplication implements Serializa
         return count;
     }
 
-    private void deleteHDFSMeta(String metaUrl) throws IOException {
+    protected void deleteHDFSMeta(String metaUrl) throws IOException {
         int cut = metaUrl.indexOf('@');
         String path = metaUrl.substring(0, cut);
         HadoopUtil.getFileSystem(path).delete(new Path(path), true);
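
The reworked estimateRDDPartitionNum() above drops the SizeEstimator call and sizes each layer purely from the cube statistics. Roughly, with made-up numbers standing in for the getters referenced in the diff:

    // Illustrative arithmetic only; the layer size and config values are hypothetical.
    double layerSizeMB = 2400d;                      // cubeStatsReader.estimateLayerSize(level)
    float rddCutMB = 10f;                            // kylinConfig.getSparkRDDPartitionCutMB()
    int partition = (int) (layerSizeMB / rddCutMB);  // 240 reduce partitions for this layer
    partition = Math.max(1, partition);              // floored by kylinConfig.getSparkMinPartition()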

http://git-wip-us.apache.org/repos/asf/kylin/blob/2d939a59/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
----------------------------------------------------------------------
diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
index d369e3d..7f4b377 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
@@ -34,11 +34,13 @@ import org.apache.kylin.common.util.CliCommandExecutor;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.CubingJob;
 import org.apache.kylin.engine.mr.common.JobRelatedMetaUtil;
 import org.apache.kylin.job.common.PatternedLogger;
 import org.apache.kylin.job.exception.ExecuteException;
 import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecutableManager;
 import org.apache.kylin.job.execution.ExecuteResult;
 import org.slf4j.LoggerFactory;
 
@@ -50,11 +52,16 @@ public class SparkExecutable extends AbstractExecutable {
 
     private static final String CLASS_NAME = "className";
     private static final String JARS = "jars";
+    private static final String JOB_ID = "jobId";
 
     public void setClassName(String className) {
         this.setParam(CLASS_NAME, className);
     }
 
+    public void setJobId(String jobId) {
+        this.setParam(JOB_ID, jobId);
+    }
+
     public void setJars(String jars) {
         this.setParam(JARS, jars);
     }
@@ -66,7 +73,7 @@ public class SparkExecutable extends AbstractExecutable {
             tmp.append("-").append(entry.getKey()).append(" ").append(entry.getValue()).append(" ");
             if (entry.getKey().equals(CLASS_NAME)) {
                 stringBuilder.insert(0, tmp);
-            } else if (entry.getKey().equals(JARS)) {
+            } else if (entry.getKey().equals(JARS) || entry.getKey().equals(JOB_ID)) {
                 // JARS is for spark-submit, not for app
                 continue;
             } else {
@@ -86,6 +93,8 @@ public class SparkExecutable extends AbstractExecutable {
         CubeInstance cube = CubeManager.getInstance(context.getConfig()).getCube(cubeName);
         final KylinConfig config = cube.getConfig();
 
+        setAlgorithmLayer();
+
         if (KylinConfig.getSparkHome() == null) {
             throw new NullPointerException();
         }
@@ -99,7 +108,8 @@ public class SparkExecutable extends AbstractExecutable {
         hadoopConf = System.getProperty("kylin.hadoop.conf.dir");
 
         if (StringUtils.isEmpty(hadoopConf)) {
-            throw new RuntimeException("kylin_hadoop_conf_dir is empty, check if there's error in the output of 'kylin.sh start'");
+            throw new RuntimeException(
+                    "kylin_hadoop_conf_dir is empty, check if there's error in the output of 'kylin.sh start'");
         }
 
         File hiveConfFile = new File(hadoopConf, "hive-site.xml");
@@ -124,7 +134,8 @@ public class SparkExecutable extends AbstractExecutable {
         }
 
         StringBuilder stringBuilder = new StringBuilder();
-        stringBuilder.append("export HADOOP_CONF_DIR=%s && %s/bin/spark-submit --class org.apache.kylin.common.util.SparkEntry ");
+        stringBuilder.append(
+                "export HADOOP_CONF_DIR=%s && %s/bin/spark-submit --class org.apache.kylin.common.util.SparkEntry ");
 
         Map<String, String> sparkConfs = config.getSparkConfigOverride();
         for (Map.Entry<String, String> entry : sparkConfs.entrySet()) {
@@ -133,7 +144,8 @@ public class SparkExecutable extends AbstractExecutable {
 
         stringBuilder.append("--jars %s %s %s");
         try {
-            String cmd = String.format(stringBuilder.toString(), hadoopConf, KylinConfig.getSparkHome(), jars, jobJar, formatArgs());
+            String cmd = String.format(stringBuilder.toString(), hadoopConf, KylinConfig.getSparkHome(), jars, jobJar,
+                    formatArgs());
             logger.info("cmd: " + cmd);
             CliCommandExecutor exec = new CliCommandExecutor();
             PatternedLogger patternedLogger = new PatternedLogger(logger);
@@ -146,6 +158,13 @@ public class SparkExecutable extends AbstractExecutable {
         }
     }
 
+    // Spark Cubing can only work in layer algorithm
+    private void setAlgorithmLayer() {
+        ExecutableManager execMgr = ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv());
+        CubingJob cubingJob = (CubingJob) execMgr.getJob(this.getParam(JOB_ID));
+        cubingJob.setAlgorithm(CubingJob.AlgorithmEnum.LAYER);
+    }
+
     private void attachSegmentMetadataWithDict(CubeSegment segment) throws IOException {
         Set<String> dumpList = new LinkedHashSet<>();
         dumpList.addAll(JobRelatedMetaUtil.collectCubeMetadata(segment.getCubeInstance()));
@@ -154,7 +173,8 @@ public class SparkExecutable extends AbstractExecutable {
         dumpAndUploadKylinPropsAndMetadata(dumpList, (KylinConfigExt) segment.getConfig());
     }
 
-    private void dumpAndUploadKylinPropsAndMetadata(Set<String> dumpList, KylinConfigExt kylinConfig) throws IOException {
+    private void dumpAndUploadKylinPropsAndMetadata(Set<String> dumpList, KylinConfigExt kylinConfig)
+            throws IOException {
         File tmp = File.createTempFile("kylin_job_meta", "");
         FileUtils.forceDelete(tmp); // we need a directory, so delete the file first
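
setAlgorithmLayer() resolves the cubing job through the new JOB_ID param, so whichever builder creates the SparkExecutable is expected to pass the job id along. A sketch of that wiring, assuming the builder-side call (it is not part of this diff):

    SparkExecutable sparkExecutable = new SparkExecutable();
    sparkExecutable.setClassName(SparkCubingByLayer.class.getName());
    sparkExecutable.setJobId(cubingJob.getId()); // lets setAlgorithmLayer() mark the CubingJob as LAYER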
 
