KYLIN-2421 Add spark engine to Integration Test

Project: http://git-wip-us.apache.org/repos/asf/kylin/repo
Commit: http://git-wip-us.apache.org/repos/asf/kylin/commit/fac9f358
Tree: http://git-wip-us.apache.org/repos/asf/kylin/tree/fac9f358
Diff: http://git-wip-us.apache.org/repos/asf/kylin/diff/fac9f358

Branch: refs/heads/spark-it
Commit: fac9f358c374d6ce92a2cf04c72ccda9049c27cb
Parents: 546f88f
Author: shaofengshi <shaofeng...@apache.org>
Authored: Fri Jan 20 11:28:57 2017 +0800
Committer: shaofengshi <shaofeng...@apache.org>
Committed: Sat Feb 4 14:13:44 2017 +0800

----------------------------------------------------------------------
 .../apache/kylin/common/KylinConfigBase.java    |  10 ++
 .../measure/bitmap/RoaringBitmapCounter.java    |   3 +-
 .../bitmap/RoaringBitmapCounterFactory.java     |   3 +-
 .../measure/percentile/PercentileCounter.java   |  22 ++-
 .../percentile/PercentileSerializer.java        |   6 +-
 .../kylin/measure/topn/TopNAggregator.java      |   5 +-
 .../percentile/PercentileCounterTest.java       |  47 ++++++
 .../kylin/engine/mr/BatchCubingJobBuilder2.java |   8 +-
 .../engine/spark/KylinKryoRegistrator.java      | 161 +++++++++++++++++++
 .../spark/SparkBatchCubingJobBuilder2.java      |  12 +-
 .../apache/kylin/engine/spark/SparkCubing.java  | 123 +-------------
 .../kylin/engine/spark/SparkCubingByLayer.java  |  65 ++++----
 .../localmeta/cube_desc/ci_inner_join_cube.json |  14 +-
 examples/test_case_data/sandbox/core-site.xml   |   2 +
 .../test_case_data/sandbox/kylin.properties     |  29 ++--
 kylin-it/pom.xml                                |  21 +++
 .../kylin/provision/BuildCubeWithEngine.java    |  25 +++
 17 files changed, 355 insertions(+), 201 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kylin/blob/fac9f358/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
----------------------------------------------------------------------
diff --git 
a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java 
b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 6a88fc4..fe15b1e 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -68,6 +68,12 @@ abstract public class KylinConfigBase implements 
Serializable {
             return sparkHome;
         }
 
+        sparkHome = System.getProperty("SPARK_HOME");
+        if (StringUtils.isNotEmpty(sparkHome)) {
+            logger.info("SPARK_HOME was set to " + sparkHome);
+            return sparkHome;
+        }
+
         return getKylinHome() + File.separator + "spark";
     }
 
@@ -760,6 +766,10 @@ abstract public class KylinConfigBase implements 
Serializable {
         return getOptional("kylin.engine.spark.env.hadoop-conf-dir", "");
     }
 
+    public void setHadoopConfDir(String hadoopConfDir) {
+        setProperty("kylin.engine.spark.env.hadoop-conf-dir", hadoopConfDir);
+    }
+
     public String getSparkAdditionalJars() {
         return getOptional("kylin.engine.spark.additional-jars", "");
     }

http://git-wip-us.apache.org/repos/asf/kylin/blob/fac9f358/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounter.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounter.java
 
b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounter.java
index fb9dcfc..eec45f2 100644
--- 
a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounter.java
+++ 
b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounter.java
@@ -24,6 +24,7 @@ import org.roaringbitmap.buffer.MutableRoaringBitmap;
 
 import java.io.DataOutputStream;
 import java.io.IOException;
+import java.io.Serializable;
 import java.nio.BufferOverflowException;
 import java.nio.ByteBuffer;
 import java.util.Iterator;
@@ -31,7 +32,7 @@ import java.util.Iterator;
 /**
  * A {@link BitmapCounter} based on roaring bitmap.
  */
-public class RoaringBitmapCounter implements BitmapCounter {
+public class RoaringBitmapCounter implements BitmapCounter, Serializable {
 
     private ImmutableRoaringBitmap bitmap;
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/fac9f358/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounterFactory.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounterFactory.java
 
b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounterFactory.java
index a71df95..822afa2 100644
--- 
a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounterFactory.java
+++ 
b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/RoaringBitmapCounterFactory.java
@@ -21,9 +21,10 @@ package org.apache.kylin.measure.bitmap;
 import org.roaringbitmap.buffer.MutableRoaringBitmap;
 
 import java.io.IOException;
+import java.io.Serializable;
 import java.nio.ByteBuffer;
 
-public class RoaringBitmapCounterFactory implements BitmapCounterFactory {
+public class RoaringBitmapCounterFactory implements BitmapCounterFactory, 
Serializable {
     public static final BitmapCounterFactory INSTANCE = new 
RoaringBitmapCounterFactory();
 
     private RoaringBitmapCounterFactory() {}

http://git-wip-us.apache.org/repos/asf/kylin/blob/fac9f358/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileCounter.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileCounter.java
 
b/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileCounter.java
index bf505cf..f86a796 100644
--- 
a/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileCounter.java
+++ 
b/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileCounter.java
@@ -18,6 +18,9 @@
 
 package org.apache.kylin.measure.percentile;
 
+import java.io.IOException;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.io.Serializable;
 import java.nio.ByteBuffer;
 
@@ -30,7 +33,7 @@ public class PercentileCounter implements Serializable {
     double compression;
     double quantileRatio;
 
-    TDigest registers;
+    transient TDigest registers;
 
     public PercentileCounter(double compression) {
         this(compression, INVALID_QUANTILE_RATIO);
@@ -94,4 +97,21 @@ public class PercentileCounter implements Serializable {
     public void clear() {
         reInitRegisters();
     }
+
+    private void writeObject(ObjectOutputStream out) throws IOException {
+        registers.compress();
+        int bound = registers.byteSize();
+        ByteBuffer buf = ByteBuffer.allocate(bound);
+        registers.asSmallBytes(buf);
+        out.defaultWriteObject();
+        out.writeInt(bound);
+        out.write(buf.array(), 0, bound);
+    }
+    private void readObject(ObjectInputStream in) throws IOException, 
ClassNotFoundException {
+        in.defaultReadObject();
+        int bound = in.readInt();
+        ByteBuffer buf = ByteBuffer.allocate(bound);
+        in.read(buf.array(), 0, bound);
+        registers = AVLTreeDigest.fromBytes(buf);
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/fac9f358/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileSerializer.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileSerializer.java
 
b/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileSerializer.java
index a0a2a77..d7e4204 100644
--- 
a/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileSerializer.java
+++ 
b/core-metadata/src/main/java/org/apache/kylin/measure/percentile/PercentileSerializer.java
@@ -25,7 +25,7 @@ import org.apache.kylin.metadata.datatype.DataTypeSerializer;
 
 public class PercentileSerializer extends 
DataTypeSerializer<PercentileCounter> {
     // be thread-safe and avoid repeated obj creation
-    private ThreadLocal<PercentileCounter> current = new ThreadLocal<>();
+    private transient ThreadLocal<PercentileCounter> current = null;
 
     private double compression;
 
@@ -49,6 +49,10 @@ public class PercentileSerializer extends 
DataTypeSerializer<PercentileCounter>
     }
 
     private PercentileCounter current() {
+        if (current == null) {
+            current = new ThreadLocal<>();
+        }
+
         PercentileCounter counter = current.get();
         if (counter == null) {
             counter = new PercentileCounter(compression);

http://git-wip-us.apache.org/repos/asf/kylin/blob/fac9f358/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNAggregator.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNAggregator.java 
b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNAggregator.java
index b5e316f..bc2bc36 100644
--- 
a/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNAggregator.java
+++ 
b/core-metadata/src/main/java/org/apache/kylin/measure/topn/TopNAggregator.java
@@ -46,10 +46,11 @@ public class TopNAggregator extends 
MeasureAggregator<TopNCounter<ByteArray>> {
 
     @Override
     public TopNCounter<ByteArray> aggregate(TopNCounter<ByteArray> value1, 
TopNCounter<ByteArray> value2) {
-        TopNCounter<ByteArray> aggregated = new TopNCounter<>(capacity * 2);
+        int thisCapacity = value1.getCapacity();
+        TopNCounter<ByteArray> aggregated = new TopNCounter<>(thisCapacity * 
2);
         aggregated.merge(value1);
         aggregated.merge(value2);
-        aggregated.retain(capacity);
+        aggregated.retain(thisCapacity);
         return aggregated;
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/fac9f358/core-metadata/src/test/java/org/apache/kylin/measure/percentile/PercentileCounterTest.java
----------------------------------------------------------------------
diff --git 
a/core-metadata/src/test/java/org/apache/kylin/measure/percentile/PercentileCounterTest.java
 
b/core-metadata/src/test/java/org/apache/kylin/measure/percentile/PercentileCounterTest.java
index abaa409..94a1233 100644
--- 
a/core-metadata/src/test/java/org/apache/kylin/measure/percentile/PercentileCounterTest.java
+++ 
b/core-metadata/src/test/java/org/apache/kylin/measure/percentile/PercentileCounterTest.java
@@ -20,11 +20,19 @@ package org.apache.kylin.measure.percentile;
 
 import static org.junit.Assert.assertEquals;
 
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.ObjectInputStream;
+import java.io.ObjectOutputStream;
 import java.util.Collections;
 import java.util.List;
 import java.util.Random;
 
+import org.apache.commons.io.IOUtils;
 import org.apache.kylin.common.util.MathUtil;
+import org.junit.Assert;
 import org.junit.Test;
 
 import com.google.common.collect.Lists;
@@ -76,4 +84,43 @@ public class PercentileCounterTest {
 
         assertEquals(expectedResult, actualResult, 0);
     }
+
+    @Test
+    public void testSerialization() {
+        double compression = 100;
+        double quantile = 0.5;
+        ByteArrayOutputStream os = new ByteArrayOutputStream(1024);
+        ObjectOutputStream out = null;
+        PercentileCounter origin_counter = null;
+        try {
+            out = new ObjectOutputStream(os);
+
+            origin_counter = new PercentileCounter(compression, quantile);
+            out.writeObject(origin_counter);
+
+        } catch (IOException e) {
+            e.printStackTrace();
+        } finally {
+            IOUtils.closeQuietly(out);
+        }
+
+        InputStream is = new ByteArrayInputStream(os.toByteArray());
+        PercentileCounter serialized_counter = null;
+        ObjectInputStream in = null;
+        try {
+            in = new ObjectInputStream(is);
+            serialized_counter = (PercentileCounter)in.readObject();
+
+            Assert.assertNotNull(serialized_counter);
+            Assert.assertNotNull(serialized_counter.registers);
+        } catch (IOException e) {
+            e.printStackTrace();
+        } catch (ClassNotFoundException e) {
+            e.printStackTrace();
+        } finally {
+            IOUtils.closeQuietly(os);
+            IOUtils.closeQuietly(is);
+        }
+
+    }
 }

http://git-wip-us.apache.org/repos/asf/kylin/blob/fac9f358/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
----------------------------------------------------------------------
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
index 0f604e2..106077c 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
@@ -31,7 +31,6 @@ import org.apache.kylin.engine.mr.steps.NDCuboidJob;
 import org.apache.kylin.engine.mr.steps.SaveStatisticsStep;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.execution.AbstractExecutable;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -65,7 +64,7 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport 
{
 
         // Phase 3: Build Cube
         addLayerCubingSteps(result, jobId, cuboidRootPath); // layer cubing, 
only selected algorithm will execute
-        result.addTask(createInMemCubingStep(jobId, cuboidRootPath)); // inmem 
cubing, only selected algorithm will execute
+        addInMemCubingSteps(result, jobId, cuboidRootPath); // inmem cubing, 
only selected algorithm will execute
         outputSide.addStepPhase3_BuildCube(result);
 
         // Phase 4: Update Metadata & Cleanup
@@ -96,7 +95,7 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport 
{
         return result;
     }
 
-    protected AbstractExecutable createInMemCubingStep(String jobId, String 
cuboidRootPath) {
+    protected void addInMemCubingSteps(final CubingJob result, String jobId, 
String cuboidRootPath) {
         // base cuboid job
         MapReduceExecutable cubeStep = new MapReduceExecutable();
 
@@ -113,8 +112,7 @@ public class BatchCubingJobBuilder2 extends 
JobBuilderSupport {
 
         cubeStep.setMapReduceParams(cmd.toString());
         cubeStep.setMapReduceJobClass(getInMemCuboidJob());
-//        cubeStep.setCounterSaveAs(CubingJob.SOURCE_RECORD_COUNT + "," + 
CubingJob.SOURCE_SIZE_BYTES + "," + CubingJob.CUBE_SIZE_BYTES);
-        return cubeStep;
+        result.addTask(cubeStep);
     }
 
     protected Class<? extends AbstractHadoopJob> getInMemCuboidJob() {

http://git-wip-us.apache.org/repos/asf/kylin/blob/fac9f358/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java
----------------------------------------------------------------------
diff --git 
a/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java
 
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java
new file mode 100644
index 0000000..3d33aa8
--- /dev/null
+++ 
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/KylinKryoRegistrator.java
@@ -0,0 +1,161 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+
+package org.apache.kylin.engine.spark;
+
+import com.esotericsoftware.kryo.Kryo;
+import com.google.common.collect.Sets;
+import com.google.common.hash.Hashing;
+import org.apache.kylin.measure.MeasureIngester;
+import org.apache.spark.serializer.KryoRegistrator;
+import org.reflections.Reflections;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.util.Set;
+
+/**
+ * Registrar for registering classes and serializers to Kryo
+ */
+public class KylinKryoRegistrator implements KryoRegistrator {
+    protected static final Logger logger = 
LoggerFactory.getLogger(KylinKryoRegistrator.class);
+
+    @Override
+    public void registerClasses(Kryo kryo) {
+
+        Set<Class> kyroClasses = Sets.newLinkedHashSet();
+        kyroClasses.add(byte[].class);
+        kyroClasses.add(int[].class);
+        kyroClasses.add(byte[][].class);
+        kyroClasses.add(String[].class);
+        kyroClasses.add(String[][].class);
+        kyroClasses.add(Object[].class);
+        kyroClasses.add(java.math.BigDecimal.class);
+        kyroClasses.add(java.util.ArrayList.class);
+        kyroClasses.add(java.util.LinkedList.class);
+        kyroClasses.add(java.util.HashSet.class);
+        kyroClasses.add(java.util.LinkedHashSet.class);
+        kyroClasses.add(java.util.LinkedHashMap.class);
+        kyroClasses.add(java.util.HashMap.class);
+        kyroClasses.add(java.util.TreeMap.class);
+        kyroClasses.add(java.util.Properties.class);
+        kyroClasses.addAll(new 
Reflections("org.apache.kylin").getSubTypesOf(Serializable.class));
+        kyroClasses.addAll(new 
Reflections("org.apache.kylin.dimension").getSubTypesOf(Serializable.class));
+        kyroClasses.addAll(new 
Reflections("org.apache.kylin.cube").getSubTypesOf(Serializable.class));
+        kyroClasses.addAll(new 
Reflections("org.apache.kylin.cube.model").getSubTypesOf(Object.class));
+        kyroClasses.addAll(new 
Reflections("org.apache.kylin.metadata").getSubTypesOf(Object.class));
+        kyroClasses.addAll(new 
Reflections("org.apache.kylin.metadata.model").getSubTypesOf(Object.class));
+        kyroClasses.addAll(new 
Reflections("org.apache.kylin.metadata.measure").getSubTypesOf(Object.class));
+        kyroClasses.addAll(new 
Reflections("org.apache.kylin.metadata.datatype").getSubTypesOf(org.apache.kylin.common.util.BytesSerializer.class));
+        kyroClasses.addAll(new 
Reflections("org.apache.kylin.measure").getSubTypesOf(MeasureIngester.class));
+
+        kyroClasses.add(org.apache.spark.sql.Row[].class);
+        kyroClasses.add(org.apache.spark.sql.Row.class);
+        
kyroClasses.add(org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema.class);
+        kyroClasses.add(org.apache.spark.sql.types.StructType.class);
+        kyroClasses.add(org.apache.spark.sql.types.StructField[].class);
+        kyroClasses.add(org.apache.spark.sql.types.StructField.class);
+        kyroClasses.add(org.apache.spark.sql.types.DateType$.class);
+        kyroClasses.add(org.apache.spark.sql.types.Metadata.class);
+        kyroClasses.add(org.apache.spark.sql.types.StringType$.class);
+        kyroClasses.add(Hashing.murmur3_128().getClass());
+        
kyroClasses.add(org.apache.spark.sql.execution.columnar.CachedBatch.class);
+        kyroClasses.add(org.apache.spark.sql.types.Decimal.class);
+        kyroClasses.add(scala.math.BigDecimal.class);
+        kyroClasses.add(java.math.MathContext.class);
+        kyroClasses.add(java.math.RoundingMode.class);
+        kyroClasses.add(java.util.concurrent.ConcurrentHashMap.class);
+        kyroClasses.add(java.util.Random.class);
+        kyroClasses.add(java.util.concurrent.atomic.AtomicLong.class);
+
+        kyroClasses.add(org.apache.kylin.metadata.model.ColumnDesc[].class);
+        kyroClasses.add(org.apache.kylin.metadata.model.JoinTableDesc[].class);
+        kyroClasses.add(org.apache.kylin.metadata.model.TblColRef[].class);
+        
kyroClasses.add(org.apache.kylin.metadata.model.DataModelDesc.RealizationCapacity.class);
+        
kyroClasses.add(org.apache.kylin.metadata.model.DataModelDesc.TableKind.class);
+        
kyroClasses.add(org.apache.kylin.metadata.model.PartitionDesc.DefaultPartitionConditionBuilder.class);
+        
kyroClasses.add(org.apache.kylin.metadata.model.PartitionDesc.PartitionType.class);
+        kyroClasses.add(org.apache.kylin.cube.model.CubeDesc.DeriveInfo.class);
+        kyroClasses.add(org.apache.kylin.cube.model.CubeDesc.DeriveType.class);
+        
kyroClasses.add(org.apache.kylin.cube.model.HBaseColumnFamilyDesc[].class);
+        kyroClasses.add(org.apache.kylin.cube.model.HBaseColumnDesc[].class);
+        kyroClasses.add(org.apache.kylin.metadata.model.MeasureDesc[].class);
+        kyroClasses.add(org.apache.kylin.cube.model.RowKeyColDesc[].class);
+        kyroClasses.add(org.apache.kylin.common.util.Array.class);
+        kyroClasses.add(org.apache.kylin.metadata.model.Segments.class);
+        
kyroClasses.add(org.apache.kylin.metadata.realization.RealizationStatusEnum.class);
+        
kyroClasses.add(org.apache.kylin.metadata.model.SegmentStatusEnum.class);
+        kyroClasses.add(org.apache.kylin.measure.BufferedMeasureCodec.class);
+        kyroClasses.add(org.apache.kylin.cube.kv.RowKeyColumnIO.class);
+        kyroClasses.add(org.apache.kylin.measure.MeasureCodec.class);
+        kyroClasses.add(org.apache.kylin.measure.MeasureAggregator[].class);
+        
kyroClasses.add(org.apache.kylin.metadata.datatype.DataTypeSerializer[].class);
+        kyroClasses.add(org.apache.kylin.cube.kv.CubeDimEncMap.class);
+        kyroClasses.add(org.apache.kylin.measure.basic.BasicMeasureType.class);
+        kyroClasses.add(org.apache.kylin.common.util.SplittedBytes[].class);
+        kyroClasses.add(org.apache.kylin.common.util.SplittedBytes.class);
+        kyroClasses.add(org.apache.kylin.cube.kv.RowKeyEncoderProvider.class);
+        kyroClasses.add(org.apache.kylin.cube.kv.RowKeyEncoder.class);
+        
kyroClasses.add(org.apache.kylin.measure.basic.BigDecimalIngester.class);
+        kyroClasses.add(org.apache.kylin.dimension.DictionaryDimEnc.class);
+        kyroClasses.add(org.apache.kylin.dimension.IntDimEnc.class);
+        kyroClasses.add(org.apache.kylin.dimension.BooleanDimEnc.class);
+        kyroClasses.add(org.apache.kylin.dimension.DateDimEnc.class);
+        kyroClasses.add(org.apache.kylin.dimension.FixedLenDimEnc.class);
+        kyroClasses.add(org.apache.kylin.dimension.FixedLenHexDimEnc.class);
+        kyroClasses.add(org.apache.kylin.dimension.IntegerDimEnc.class);
+        
kyroClasses.add(org.apache.kylin.dimension.OneMoreByteVLongDimEnc.class);
+        kyroClasses.add(org.apache.kylin.dimension.TimeDimEnc.class);
+        
kyroClasses.add(org.apache.kylin.cube.model.AggregationGroup.HierarchyMask.class);
+        
kyroClasses.add(org.apache.kylin.measure.topn.DoubleDeltaSerializer.class);
+        
kyroClasses.add(org.apache.kylin.measure.bitmap.RoaringBitmapCounter.class);
+        kyroClasses.add(org.roaringbitmap.buffer.MutableRoaringArray.class);
+        kyroClasses.add(org.roaringbitmap.buffer.MappeableContainer[].class);
+        kyroClasses.add(org.roaringbitmap.buffer.MutableRoaringBitmap.class);
+        
kyroClasses.add(org.roaringbitmap.buffer.MappeableArrayContainer.class);
+        
kyroClasses.add(org.apache.kylin.measure.bitmap.RoaringBitmapCounterFactory.class);
+        kyroClasses.add(org.apache.kylin.measure.topn.Counter.class);
+        kyroClasses.add(org.apache.kylin.measure.topn.TopNCounter.class);
+        
kyroClasses.add(org.apache.kylin.measure.percentile.PercentileSerializer.class);
+        kyroClasses.add(com.tdunning.math.stats.AVLTreeDigest.class);
+        kyroClasses.add(com.tdunning.math.stats.Centroid.class);
+
+        addClassQuitely(kyroClasses, 
"com.google.common.collect.EmptyImmutableList");
+        addClassQuitely(kyroClasses, "java.nio.HeapShortBuffer");
+        addClassQuitely(kyroClasses, 
"scala.collection.immutable.Map$EmptyMap$");
+        addClassQuitely(kyroClasses, 
"org.apache.spark.sql.catalyst.expressions.GenericInternalRow");
+        addClassQuitely(kyroClasses, 
"org.apache.spark.unsafe.types.UTF8String");
+        addClassQuitely(kyroClasses, "com.tdunning.math.stats.AVLGroupTree");
+
+        for (Class kyroClass : kyroClasses) {
+            kryo.register(kyroClass);
+        }
+
+        // TODO: should use JavaSerializer for PercentileCounter after the Kryo 
bug is fixed: https://github.com/EsotericSoftware/kryo/issues/489
+        //        kryo.register(PercentileCounter.class, new JavaSerializer());
+    }
+
+    private static void addClassQuitely(Set<Class> kyroClasses, String 
className) {
+        try {
+            kyroClasses.add(Class.forName(className));
+        } catch (ClassNotFoundException e) {
+            logger.error("failed to load class", e);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/kylin/blob/fac9f358/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
----------------------------------------------------------------------
diff --git 
a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
 
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
index 208a0c9..76b73b6 100644
--- 
a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
+++ 
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
@@ -26,7 +26,6 @@ import org.apache.kylin.engine.EngineFactory;
 import org.apache.kylin.engine.mr.BatchCubingJobBuilder2;
 import org.apache.kylin.engine.mr.CubingJob;
 import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -43,11 +42,6 @@ public class SparkBatchCubingJobBuilder2 extends 
BatchCubingJobBuilder2 {
 
     @Override
     protected void addLayerCubingSteps(final CubingJob result, final String 
jobId, final String cuboidRootPath) {
-
-    }
-
-    @Override
-    protected AbstractExecutable createInMemCubingStep(String jobId, String 
cuboidRootPath) {
         IJoinedFlatTableDesc flatTableDesc = 
EngineFactory.getJoinedFlatTableDesc(seg);
         final SparkExecutable sparkExecutable = new SparkExecutable();
         sparkExecutable.setClassName(SparkCubingByLayer.class.getName());
@@ -71,7 +65,11 @@ public class SparkBatchCubingJobBuilder2 extends 
BatchCubingJobBuilder2 {
         sparkExecutable.setJars(jars.toString());
 
         
sparkExecutable.setName(ExecutableConstants.STEP_NAME_BUILD_SPARK_CUBE);
-        return sparkExecutable;
+        result.addTask(sparkExecutable);
+    }
+
+    @Override
+    protected void addInMemCubingSteps(final CubingJob result, String jobId, 
String cuboidRootPath) {
 
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/fac9f358/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
----------------------------------------------------------------------
diff --git 
a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java 
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
index 0437a80..2a0981a 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubing.java
@@ -20,10 +20,8 @@ package org.apache.kylin.engine.spark;
 import java.io.File;
 import java.io.FileFilter;
 import java.io.IOException;
-import java.io.Serializable;
 import java.nio.ByteBuffer;
 import java.util.ArrayList;
-import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
@@ -31,17 +29,13 @@ import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.NoSuchElementException;
-import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 
-import javax.annotation.Nullable;
-
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.FsShell;
@@ -84,7 +78,6 @@ import org.apache.kylin.engine.spark.util.IteratorUtils;
 import org.apache.kylin.measure.BufferedMeasureCodec;
 import org.apache.kylin.measure.MeasureAggregators;
 import org.apache.kylin.measure.hllc.HLLCounter;
-import org.apache.kylin.measure.MeasureIngester;
 import org.apache.kylin.metadata.model.FunctionDesc;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.MeasureDesc;
@@ -108,16 +101,12 @@ import org.apache.spark.api.java.function.PairFunction;
 import org.apache.spark.sql.DataFrame;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.hive.HiveContext;
-import org.reflections.Reflections;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import com.google.common.base.Preconditions;
-import com.google.common.base.Predicate;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import com.google.common.collect.Maps;
-import com.google.common.collect.Sets;
 import com.google.common.hash.HashFunction;
 import com.google.common.hash.Hasher;
 import com.google.common.hash.Hashing;
@@ -545,109 +534,6 @@ public class SparkCubing extends AbstractApplication {
         }
     }
 
-    public static Collection<String> getKyroClasses() {
-        Set<Class> kyroClasses = Sets.newHashSet();
-        kyroClasses.addAll(new 
Reflections("org.apache.kylin").getSubTypesOf(Serializable.class));
-        kyroClasses.addAll(new 
Reflections("org.apache.kylin.dimension").getSubTypesOf(Serializable.class));
-        kyroClasses.addAll(new 
Reflections("org.apache.kylin.cube").getSubTypesOf(Serializable.class));
-        kyroClasses.addAll(new 
Reflections("org.apache.kylin.cube.model").getSubTypesOf(Object.class));
-        kyroClasses.addAll(new 
Reflections("org.apache.kylin.metadata").getSubTypesOf(Object.class));
-        kyroClasses.addAll(new 
Reflections("org.apache.kylin.metadata.model").getSubTypesOf(Object.class));
-        kyroClasses.addAll(new 
Reflections("org.apache.kylin.metadata.measure").getSubTypesOf(Object.class));
-        kyroClasses.addAll(new 
Reflections("org.apache.kylin.metadata.datatype").getSubTypesOf(org.apache.kylin.common.util.BytesSerializer.class));
-        kyroClasses.addAll(new 
Reflections("org.apache.kylin.measure").getSubTypesOf(MeasureIngester.class));
-
-        kyroClasses.add(HashMap.class);
-        kyroClasses.add(org.apache.spark.sql.Row[].class);
-        kyroClasses.add(org.apache.spark.sql.Row.class);
-        
kyroClasses.add(org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema.class);
-        kyroClasses.add(org.apache.spark.sql.types.StructType.class);
-        kyroClasses.add(org.apache.spark.sql.types.StructField[].class);
-        kyroClasses.add(org.apache.spark.sql.types.StructField.class);
-        kyroClasses.add(org.apache.spark.sql.types.DateType$.class);
-        kyroClasses.add(org.apache.spark.sql.types.Metadata.class);
-        kyroClasses.add(org.apache.spark.sql.types.StringType$.class);
-        kyroClasses.add(Hashing.murmur3_128().getClass());
-        
kyroClasses.add(org.apache.spark.sql.execution.columnar.CachedBatch.class);
-        kyroClasses.add(Object[].class);
-        kyroClasses.add(int[].class);
-        kyroClasses.add(byte[].class);
-        kyroClasses.add(byte[][].class);
-        kyroClasses.add(String[].class);
-        kyroClasses.add(String[][].class);
-        kyroClasses.add(org.apache.spark.sql.types.Decimal.class);
-        kyroClasses.add(scala.math.BigDecimal.class);
-        kyroClasses.add(java.math.BigDecimal.class);
-        kyroClasses.add(java.math.MathContext.class);
-        kyroClasses.add(java.math.RoundingMode.class);
-        kyroClasses.add(java.util.ArrayList.class);
-        kyroClasses.add(java.util.LinkedList.class);
-        kyroClasses.add(java.util.HashSet.class);
-        kyroClasses.add(java.util.LinkedHashSet.class);
-        kyroClasses.add(java.util.LinkedHashMap.class);
-        kyroClasses.add(java.util.TreeMap.class);
-        kyroClasses.add(java.util.concurrent.ConcurrentHashMap.class);
-
-        kyroClasses.add(java.util.HashMap.class);
-        kyroClasses.add(java.util.Properties.class);
-        kyroClasses.add(org.apache.kylin.metadata.model.ColumnDesc[].class);
-        kyroClasses.add(org.apache.kylin.metadata.model.JoinTableDesc[].class);
-        kyroClasses.add(org.apache.kylin.metadata.model.TblColRef[].class);
-        
kyroClasses.add(org.apache.kylin.metadata.model.DataModelDesc.RealizationCapacity.class);
-        
kyroClasses.add(org.apache.kylin.metadata.model.DataModelDesc.TableKind.class);
-        
kyroClasses.add(org.apache.kylin.metadata.model.PartitionDesc.DefaultPartitionConditionBuilder.class);
-        
kyroClasses.add(org.apache.kylin.metadata.model.PartitionDesc.PartitionType.class);
-        kyroClasses.add(org.apache.kylin.cube.model.CubeDesc.DeriveInfo.class);
-        kyroClasses.add(org.apache.kylin.cube.model.CubeDesc.DeriveType.class);
-        
kyroClasses.add(org.apache.kylin.cube.model.HBaseColumnFamilyDesc[].class);
-        kyroClasses.add(org.apache.kylin.cube.model.HBaseColumnDesc[].class);
-        kyroClasses.add(org.apache.kylin.metadata.model.MeasureDesc[].class);
-        kyroClasses.add(org.apache.kylin.cube.model.RowKeyColDesc[].class);
-        kyroClasses.add(org.apache.kylin.common.util.Array.class);
-        kyroClasses.add(org.apache.kylin.metadata.model.Segments.class);
-        
kyroClasses.add(org.apache.kylin.metadata.realization.RealizationStatusEnum.class);
-        
kyroClasses.add(org.apache.kylin.metadata.model.SegmentStatusEnum.class);
-        kyroClasses.add(org.apache.kylin.measure.BufferedMeasureCodec.class);
-        kyroClasses.add(org.apache.kylin.cube.kv.RowKeyColumnIO.class);
-        kyroClasses.add(org.apache.kylin.measure.MeasureCodec.class);
-        kyroClasses.add(org.apache.kylin.measure.MeasureAggregator[].class);
-        
kyroClasses.add(org.apache.kylin.metadata.datatype.DataTypeSerializer[].class);
-        kyroClasses.add(org.apache.kylin.cube.kv.CubeDimEncMap.class);
-        kyroClasses.add(org.apache.kylin.measure.basic.BasicMeasureType.class);
-        kyroClasses.add(org.apache.kylin.common.util.SplittedBytes[].class);
-        kyroClasses.add(org.apache.kylin.common.util.SplittedBytes.class);
-        kyroClasses.add(org.apache.kylin.cube.kv.RowKeyEncoderProvider.class);
-        kyroClasses.add(org.apache.kylin.cube.kv.RowKeyEncoder.class);
-        
kyroClasses.add(org.apache.kylin.measure.basic.BigDecimalIngester.class);
-        kyroClasses.add(org.apache.kylin.dimension.DictionaryDimEnc.class);
-        kyroClasses.add(org.apache.kylin.dimension.IntDimEnc.class);
-        kyroClasses.add(org.apache.kylin.dimension.BooleanDimEnc.class);
-        kyroClasses.add(org.apache.kylin.dimension.DateDimEnc.class);
-        kyroClasses.add(org.apache.kylin.dimension.FixedLenDimEnc.class);
-        kyroClasses.add(org.apache.kylin.dimension.FixedLenHexDimEnc.class);
-        kyroClasses.add(org.apache.kylin.dimension.IntegerDimEnc.class);
-        
kyroClasses.add(org.apache.kylin.dimension.OneMoreByteVLongDimEnc.class);
-        kyroClasses.add(org.apache.kylin.dimension.TimeDimEnc.class);
-        
kyroClasses.add(org.apache.kylin.cube.model.AggregationGroup.HierarchyMask.class);
-        
kyroClasses.add(org.apache.kylin.measure.topn.DoubleDeltaSerializer.class);
-        kyroClasses.add(org.apache.kylin.measure.topn.Counter.class);
-
-        try {
-            
kyroClasses.add(Class.forName("com.google.common.collect.EmptyImmutableList"));
-        } catch (ClassNotFoundException e) {
-            logger.error("failed to load class", e);
-        }
-
-        ArrayList<String> result = Lists.newArrayList();
-        for (Class kyroClass : kyroClasses) {
-            result.add(kyroClass.getName());
-        }
-        result.add("scala.collection.immutable.Map$EmptyMap$");
-        
result.add("org.apache.spark.sql.catalyst.expressions.GenericInternalRow");
-        result.add("org.apache.spark.unsafe.types.UTF8String");
-        return result;
-    }
-
     @Override
     protected void execute(OptionsHelper optionsHelper) throws Exception {
         final String hiveTable = 
optionsHelper.getOptionValue(OPTION_INPUT_PATH);
@@ -658,15 +544,8 @@ public class SparkCubing extends AbstractApplication {
 
         //serialization conf
         conf.set("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer");
+        conf.set("spark.kryo.registrator", 
"org.apache.kylin.engine.spark.KylinKryoRegistrator");
         conf.set("spark.kryo.registrationRequired", "true");
-        final Iterable<String> allClasses = 
Iterables.filter(Iterables.concat(Lists.newArrayList(conf.get("spark.kryo.classesToRegister",
 "").split(",")), getKyroClasses()), new Predicate<String>() {
-            @Override
-            public boolean apply(@Nullable String input) {
-                return input != null && input.trim().length() > 0;
-            }
-        });
-        System.out.println("kyro classes:" + allClasses.toString());
-        conf.set("spark.kryo.classesToRegister", StringUtils.join(allClasses, 
","));
 
         JavaSparkContext sc = new JavaSparkContext(conf);
         HiveContext sqlContext = new HiveContext(sc.sc());

http://git-wip-us.apache.org/repos/asf/kylin/blob/fac9f358/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
----------------------------------------------------------------------
diff --git 
a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
 
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
index d6790aa..8892a73 100644
--- 
a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
+++ 
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkCubingByLayer.java
@@ -17,13 +17,10 @@
 */
 package org.apache.kylin.engine.spark;
 
-import com.google.common.base.Predicate;
-import com.google.common.collect.Iterables;
 import com.google.common.collect.Lists;
 import org.apache.commons.cli.Option;
 import org.apache.commons.cli.OptionBuilder;
 import org.apache.commons.cli.Options;
-import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
@@ -71,7 +68,6 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import scala.Tuple2;
 
-import javax.annotation.Nullable;
 import java.io.File;
 import java.io.FileFilter;
 import java.io.Serializable;
@@ -79,7 +75,6 @@ import java.nio.ByteBuffer;
 import java.util.Collection;
 import java.util.List;
 
-import static org.apache.kylin.engine.spark.SparkCubing.getKyroClasses;
 
 /**
  */
@@ -129,11 +124,12 @@ public class SparkCubingByLayer extends 
AbstractApplication implements Serializa
     }
 
     private static final void prepare() {
-        final File file = new File(SparkFiles.get("kylin.properties"));
-        final String confPath = file.getParentFile().getAbsolutePath();
+        File file = new File(SparkFiles.get("kylin.properties"));
+        String confPath = file.getParentFile().getAbsolutePath();
         logger.info("conf directory:" + confPath);
         System.setProperty(KylinConfig.KYLIN_CONF, confPath);
         ClassUtil.addClasspath(confPath);
+
     }
 
     @Override
@@ -144,17 +140,11 @@ public class SparkCubingByLayer extends 
AbstractApplication implements Serializa
         final String confPath = optionsHelper.getOptionValue(OPTION_CONF_PATH);
         final String outputPath = 
optionsHelper.getOptionValue(OPTION_OUTPUT_PATH);
 
-        SparkConf conf = new SparkConf().setAppName("Cubing for:" + cubeName + 
", segment " + segmentId);
+        SparkConf conf = new SparkConf().setAppName("Cubing for:" + cubeName + 
" segment " + segmentId);
         //serialization conf
         conf.set("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer");
+        conf.set("spark.kryo.registrator", 
"org.apache.kylin.engine.spark.KylinKryoRegistrator");
         conf.set("spark.kryo.registrationRequired", "true");
-        final Iterable<String> allClasses = 
Iterables.filter(Iterables.concat(Lists.newArrayList(conf.get("spark.kryo.classesToRegister",
 "").split(",")), getKyroClasses()), new Predicate<String>() {
-            @Override
-            public boolean apply(@Nullable String input) {
-                return input != null && input.trim().length() > 0;
-            }
-        });
-        conf.set("spark.kryo.classesToRegister", StringUtils.join(allClasses, 
","));
 
         JavaSparkContext sc = new JavaSparkContext(conf);
         setupClasspath(sc, confPath);
@@ -176,11 +166,7 @@ public class SparkCubingByLayer extends 
AbstractApplication implements Serializa
         final NDCuboidBuilder ndCuboidBuilder = new 
NDCuboidBuilder(vCubeSegment.getValue(), new 
RowKeyEncoderProvider(vCubeSegment.getValue()));
 
         final Broadcast<CuboidScheduler> vCuboidScheduler = sc.broadcast(new 
CuboidScheduler(vCubeDesc.getValue()));
-
-        final long baseCuboidId = Cuboid.getBaseCuboidId(cubeDesc);
-        final Cuboid baseCuboid = Cuboid.findById(cubeDesc, baseCuboidId);
         final int measureNum = cubeDesc.getMeasures().size();
-        final BaseCuboidBuilder baseCuboidBuilder = new 
BaseCuboidBuilder(kylinConfig, vCubeDesc.getValue(), vCubeSegment.getValue(), 
intermediateTableDesc, AbstractRowKeyEncoder.createInstance(cubeSegment, 
baseCuboid), MeasureIngester.create(cubeDesc.getMeasures()), 
cubeSegment.buildDictionaryMap());
 
         int countMeasureIndex = 0;
         for (MeasureDesc measureDesc : cubeDesc.getMeasures()) {
@@ -204,12 +190,20 @@ public class SparkCubingByLayer extends 
AbstractApplication implements Serializa
         // encode with dimension encoding, transform to <ByteArray, Object[]> 
RDD
         final JavaPairRDD<ByteArray, Object[]> encodedBaseRDD = 
intermediateTable.javaRDD().mapToPair(new PairFunction<Row, ByteArray, 
Object[]>() {
             transient boolean initialized = false;
+            BaseCuboidBuilder baseCuboidBuilder = null;
 
             @Override
             public Tuple2<ByteArray, Object[]> call(Row row) throws Exception {
                 if (initialized == false) {
-                    prepare();
-                    initialized = true;
+                    synchronized (SparkCubingByLayer.class) {
+                        if (initialized == false) {
+                            prepare();
+                            long baseCuboidId = 
Cuboid.getBaseCuboidId(cubeDesc);
+                            Cuboid baseCuboid = Cuboid.findById(cubeDesc, 
baseCuboidId);
+                            baseCuboidBuilder = new 
BaseCuboidBuilder(kylinConfig, cubeDesc, cubeSegment, intermediateTableDesc, 
AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid), 
MeasureIngester.create(cubeDesc.getMeasures()), 
cubeSegment.buildDictionaryMap());
+                            initialized = true;
+                        }
+                    }
                 }
 
                 String[] rowArray = rowToArray(row);
@@ -235,7 +229,7 @@ public class SparkCubingByLayer extends AbstractApplication 
implements Serializa
         });
 
         logger.info("encodedBaseRDD partition number: " + 
encodedBaseRDD.getNumPartitions());
-                Long totalCount = 0L;
+        Long totalCount = 0L;
         if (kylinConfig.isSparkSanityCheckEnabled()) {
             totalCount = encodedBaseRDD.count();
             logger.info("encodedBaseRDD row count: " + encodedBaseRDD.count());
@@ -267,8 +261,8 @@ public class SparkCubingByLayer extends AbstractApplication 
implements Serializa
             partition = estimateRDDPartitionNum(level, cubeStatsReader, 
kylinConfig);
             logger.info("Level " + level + " partition number: " + partition);
             allRDDs[level] = allRDDs[level - 
1].flatMapToPair(flatMapFunction).reduceByKey(reducerFunction2, 
partition).persist(storageLevel);
-             if (kylinConfig.isSparkSanityCheckEnabled() == true) {
-                 sanityCheck(allRDDs[level], totalCount, level, 
cubeStatsReader, countMeasureIndex);
+            if (kylinConfig.isSparkSanityCheckEnabled() == true) {
+                sanityCheck(allRDDs[level], totalCount, level, 
cubeStatsReader, countMeasureIndex);
             }
             saveToHDFS(allRDDs[level], vCubeDesc.getValue(), outputPath, 
level, confOverwrite);
             allRDDs[level - 1].unpersist();
@@ -288,17 +282,18 @@ public class SparkCubingByLayer extends 
AbstractApplication implements Serializa
     }
 
     private static void saveToHDFS(final JavaPairRDD<ByteArray, Object[]> rdd, 
final CubeDesc cubeDesc, final String hdfsBaseLocation, int level, 
Configuration conf) {
-       final String cuboidOutputPath = 
BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(hdfsBaseLocation, level);
-                rdd.mapToPair(new PairFunction<Tuple2<ByteArray, Object[]>, 
org.apache.hadoop.io.Text, org.apache.hadoop.io.Text>() {
-                    BufferedMeasureCodec codec = new 
BufferedMeasureCodec(cubeDesc.getMeasures());
-                    @Override
-                    public Tuple2<org.apache.hadoop.io.Text, 
org.apache.hadoop.io.Text> call(Tuple2<ByteArray, Object[]> tuple2) throws 
Exception {
-                        ByteBuffer valueBuf = codec.encode(tuple2._2());
-                        byte[] encodedBytes = new byte[valueBuf.position()];
-                        System.arraycopy(valueBuf.array(), 0, encodedBytes, 0, 
valueBuf.position());
-                        return new Tuple2<>(new 
org.apache.hadoop.io.Text(tuple2._1().array()), new 
org.apache.hadoop.io.Text(encodedBytes));
-                    }
-                }).saveAsNewAPIHadoopFile(cuboidOutputPath, 
org.apache.hadoop.io.Text.class, org.apache.hadoop.io.Text.class, 
SequenceFileOutputFormat.class, conf);
+        final String cuboidOutputPath = 
BatchCubingJobBuilder2.getCuboidOutputPathsByLevel(hdfsBaseLocation, level);
+        rdd.mapToPair(new PairFunction<Tuple2<ByteArray, Object[]>, 
org.apache.hadoop.io.Text, org.apache.hadoop.io.Text>() {
+            BufferedMeasureCodec codec = new 
BufferedMeasureCodec(cubeDesc.getMeasures());
+
+            @Override
+            public Tuple2<org.apache.hadoop.io.Text, 
org.apache.hadoop.io.Text> call(Tuple2<ByteArray, Object[]> tuple2) throws 
Exception {
+                ByteBuffer valueBuf = codec.encode(tuple2._2());
+                byte[] encodedBytes = new byte[valueBuf.position()];
+                System.arraycopy(valueBuf.array(), 0, encodedBytes, 0, 
valueBuf.position());
+                return new Tuple2<>(new 
org.apache.hadoop.io.Text(tuple2._1().array()), new 
org.apache.hadoop.io.Text(encodedBytes));
+            }
+        }).saveAsNewAPIHadoopFile(cuboidOutputPath, 
org.apache.hadoop.io.Text.class, org.apache.hadoop.io.Text.class, 
SequenceFileOutputFormat.class, conf);
         logger.info("Persisting RDD for level " + level + " into " + 
cuboidOutputPath);
     }
 

http://git-wip-us.apache.org/repos/asf/kylin/blob/fac9f358/examples/test_case_data/localmeta/cube_desc/ci_inner_join_cube.json
----------------------------------------------------------------------
diff --git 
a/examples/test_case_data/localmeta/cube_desc/ci_inner_join_cube.json 
b/examples/test_case_data/localmeta/cube_desc/ci_inner_join_cube.json
index 0fda3b3..99013ce 100644
--- a/examples/test_case_data/localmeta/cube_desc/ci_inner_join_cube.json
+++ b/examples/test_case_data/localmeta/cube_desc/ci_inner_join_cube.json
@@ -267,16 +267,6 @@
       },
       "returntype" : "raw"
     }
-  }, {
-    "name" : "GVM_PERCENTILE",
-    "function" : {
-      "expression" : "PERCENTILE",
-      "parameter" : {
-        "type" : "column",
-        "value" : "TEST_KYLIN_FACT.PRICE"
-      },
-      "returntype" : "percentile(100)"
-    }
   } ],
   "dictionaries": [ {
     "column": "TEST_KYLIN_FACT.TEST_COUNT_DISTINCT_BITMAP",
@@ -368,7 +358,7 @@
       "name" : "f3",
       "columns" : [ {
         "qualifier" : "m",
-        "measure_refs" : [ "TEST_EXTENDED_COLUMN", "TRANS_ID_RAW", 
"PRICE_RAW", "CAL_DT_RAW", "BUYER_CONTACT", "SELLER_CONTACT", "GVM_PERCENTILE" ]
+        "measure_refs" : [ "TEST_EXTENDED_COLUMN", "TRANS_ID_RAW", 
"PRICE_RAW", "CAL_DT_RAW", "BUYER_CONTACT", "SELLER_CONTACT" ]
       } ]
     } ]
   },
@@ -448,7 +438,7 @@
   "status_need_notify" : [ ],
   "auto_merge_time_ranges" : null,
   "retention_range" : 0,
-  "engine_type" : 2,
+  "engine_type" : 4,
   "storage_type" : 2,
   "override_kylin_properties": {
     "kylin.cube.algorithm": "LAYER"

http://git-wip-us.apache.org/repos/asf/kylin/blob/fac9f358/examples/test_case_data/sandbox/core-site.xml
----------------------------------------------------------------------
diff --git a/examples/test_case_data/sandbox/core-site.xml 
b/examples/test_case_data/sandbox/core-site.xml
index 7660a7e..a4ad5c6 100644
--- a/examples/test_case_data/sandbox/core-site.xml
+++ b/examples/test_case_data/sandbox/core-site.xml
@@ -178,9 +178,11 @@
         <value>false</value>
     </property>
 
+    <!--
     <property>
         <name>net.topology.script.file.name</name>
         <value>/etc/hadoop/conf/topology_script.py</value>
     </property>
 
+    -->
 </configuration>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/kylin/blob/fac9f358/examples/test_case_data/sandbox/kylin.properties
----------------------------------------------------------------------
diff --git a/examples/test_case_data/sandbox/kylin.properties 
b/examples/test_case_data/sandbox/kylin.properties
index 6cb5148..91566ae 100644
--- a/examples/test_case_data/sandbox/kylin.properties
+++ b/examples/test_case_data/sandbox/kylin.properties
@@ -162,23 +162,24 @@ kylin.server.query-metrics-percentiles-intervals=60, 360, 
3600
 kylin.env=DEV
 kylin.source.hive.keep-flat-table=false
 
-### Spark as Engine ###
-kylin.engine.spark.env.hadoop-conf-dir=../examples/test_case_data/sandbox
-kylin.engine.spark.sanity-check-enabled=false
+
+# Estimate the RDD partition numbers; the test cubes have a couple of 
memory-hungry measures, so the estimation is imprecise
+kylin.engine.spark.rdd-partition-cut-mb=100
 
 ### Spark conf overwrite for cube engine
+kylin.engine.spark-conf.spark.yarn.submit.file.replication=1
 kylin.engine.spark-conf.spark.master=yarn
-kylin.engine.spark-conf.spark.submit.deployMode=client
-kylin.engine.spark-conf.spark.yarn.executor.memoryOverhead=512
-kylin.engine.spark-conf.spark.yarn.driver.memoryOverhead=384
-kylin.engine.spark-conf.spark.executor.memory=1G
+kylin.engine.spark-conf.spark.submit.deployMode=cluster
+kylin.engine.spark-conf.spark.yarn.executor.memoryOverhead=384
+kylin.engine.spark-conf.spark.yarn.driver.memoryOverhead=256
+kylin.engine.spark-conf.spark.executor.memory=768M
 kylin.engine.spark-conf.spark.executor.cores=1
 kylin.engine.spark-conf.spark.executor.instances=1
 kylin.engine.spark-conf.spark.storage.memoryFraction=0.3
-kylin.engine.spark-conf.spark.history.fs.logDirectory=hdfs\:///kylin/spark-history
-kylin.engine.spark-conf.spark.eventLog.dir=hdfs\:///kylin/spark-history
-#kylin.engine.spark-conf.spark.yarn.queue=default
-#kylin.engine.spark-conf.spark.yarn.jar=hdfs://sandbox.hortonworks.com:8020/kylin/spark/spark-assembly-1.6.3-hadoop2.6.0.jar
-#kylin.engine.spark-conf.spark.io.compression.codec=org.apache.spark.io.SnappyCompressionCodec
-
-
+kylin.engine.spark-conf.spark.eventLog.enabled=true
+kylin.engine.spark-conf.spark.history.fs.logDirectory=hdfs\:///spark-history
+kylin.engine.spark-conf.spark.eventLog.dir=hdfs\:///spark-history
+kylin.engine.spark-conf.spark.yarn.jar=hdfs://sandbox.hortonworks.com:8020/kylin/spark/spark-assembly-1.6.3-hadoop2.6.0.jar
+kylin.engine.spark-conf.spark.driver.extraJavaOptions=-Dhdp.version=current
+kylin.engine.spark-conf.spark.yarn.am.extraJavaOptions=-Dhdp.version=current
+kylin.engine.spark-conf.spark.executor.extraJavaOptions=-Dhdp.version=current

http://git-wip-us.apache.org/repos/asf/kylin/blob/fac9f358/kylin-it/pom.xml
----------------------------------------------------------------------
diff --git a/kylin-it/pom.xml b/kylin-it/pom.xml
index 9662806..91104ba 100644
--- a/kylin-it/pom.xml
+++ b/kylin-it/pom.xml
@@ -36,6 +36,7 @@
     <properties>
         <hdp.version/>
         <fastBuildMode/>
+        <engineType/>
     </properties>
 
     <!-- Dependencies. -->
@@ -238,6 +239,25 @@
             <artifactId>kafka_2.10</artifactId>
             <scope>provided</scope>
         </dependency>
+
+        <!-- Spark dependency -->
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-core_2.10</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-sql_2.10</artifactId>
+            <scope>provided</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.spark</groupId>
+            <artifactId>spark-hive_2.10</artifactId>
+            <scope>provided</scope>
+        </dependency>
     </dependencies>
 
 
@@ -296,6 +316,7 @@
                                     <arguments>
                                         
<argument>-Dhdp.version=${hdp.version}</argument>
                                         
<argument>-DfastBuildMode=${fastBuildMode}</argument>
+                                        
<argument>-DengineType=${engineType}</argument>
                                         
<argument>-Dlog4j.configuration=file:${project.basedir}/..//build/conf/kylin-tools-log4j.properties</argument>
                                         <argument>-classpath</argument>
                                         <classpath/>

http://git-wip-us.apache.org/repos/asf/kylin/blob/fac9f358/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
----------------------------------------------------------------------
diff --git 
a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java 
b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
index 08cc6b9..726d72f 100644
--- a/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
+++ b/kylin-it/src/test/java/org/apache/kylin/provision/BuildCubeWithEngine.java
@@ -40,10 +40,12 @@ import org.apache.kylin.common.util.ClassUtil;
 import org.apache.kylin.common.util.HBaseMetadataTestCase;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.Pair;
+import org.apache.kylin.cube.CubeDescManager;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.CubeUpdate;
+import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.engine.EngineFactory;
 import org.apache.kylin.engine.mr.CubingJob;
 import org.apache.kylin.job.DeployUtil;
@@ -68,9 +70,11 @@ import com.google.common.collect.Lists;
 public class BuildCubeWithEngine {
 
     private CubeManager cubeManager;
+    private CubeDescManager cubeDescManager;
     private DefaultScheduler scheduler;
     protected ExecutableManager jobService;
     private static boolean fastBuildMode = false;
+    private static int engineType;
 
     private static final Logger logger = 
LoggerFactory.getLogger(BuildCubeWithEngine.class);
 
@@ -110,7 +114,15 @@ public class BuildCubeWithEngine {
             logger.info("Will not use fast build mode");
         }
 
+        String specifiedEngineType = System.getProperty("engineType");
+        if (StringUtils.isNotEmpty(specifiedEngineType)) {
+            engineType = Integer.parseInt(specifiedEngineType);
+        } else {
+            engineType = 2;
+        }
+
         System.setProperty(KylinConfig.KYLIN_CONF, 
HBaseMetadataTestCase.SANDBOX_TEST_DATA);
+        System.setProperty("SPARK_HOME", "/usr/local/spark"); // need to 
manually create this folder on Jenkins and put Spark into it
         if (StringUtils.isEmpty(System.getProperty("hdp.version"))) {
             throw new RuntimeException("No hdp.version set; Please set 
hdp.version in your jvm option, for example: -Dhdp.version=2.4.0.0-169");
         }
@@ -154,6 +166,7 @@ public class BuildCubeWithEngine {
             }
         }
 
+        cubeDescManager = CubeDescManager.getInstance(kylinConfig);
     }
 
     public void after() {
@@ -251,6 +264,9 @@ public class BuildCubeWithEngine {
         String cubeName = "ci_left_join_cube";
         clearSegment(cubeName);
 
+        // ci_left_join_cube has a percentile measure, which isn't supported 
by the Spark engine yet
+        // updateCubeEngineType(cubeName);
+
         SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd");
         f.setTimeZone(TimeZone.getTimeZone("GMT"));
         long date1 = 0;
@@ -278,6 +294,7 @@ public class BuildCubeWithEngine {
 
         String cubeName = "ci_inner_join_cube";
         clearSegment(cubeName);
+        //updateCubeEngineType(cubeName);
 
         SimpleDateFormat f = new SimpleDateFormat("yyyy-MM-dd");
         f.setTimeZone(TimeZone.getTimeZone("GMT"));
@@ -295,6 +312,14 @@ public class BuildCubeWithEngine {
         return false;
     }
 
+    private void updateCubeEngineType(String cubeName) throws IOException {
+        CubeDesc cubeDesc = cubeDescManager.getCubeDesc(cubeName);
+        if (cubeDesc.getEngineType() != engineType) {
+            cubeDesc.setEngineType(engineType);
+            cubeDescManager.updateCubeDesc(cubeDesc);
+        }
+    }
+
     private void clearSegment(String cubeName) throws Exception {
         CubeInstance cube = cubeManager.getCube(cubeName);
         // remove all existing segments

Reply via email to