[ https://issues.apache.org/jira/browse/KYLIN-3710?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16716513#comment-16716513 ]

ASF GitHub Bot commented on KYLIN-3710:
---------------------------------------

shaofengshi closed pull request #377: Kylin-3680, KYLIN-3710
URL: https://github.com/apache/kylin/pull/377

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

diff --git 
a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java 
b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
index 936427930d..c3e4f0163f 100644
--- a/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
+++ b/core-common/src/main/java/org/apache/kylin/common/util/HadoopUtil.java
@@ -236,4 +236,28 @@ public static void writeToSequenceFile(Configuration conf, 
String outputPath, Ma
         return readFromSequenceFile(getCurrentConfiguration(), inputPath);
     }
 
+    public static boolean isSequenceFile(Configuration conf, Path filePath) {
+        try (SequenceFile.Reader reader = new SequenceFile.Reader(getWorkingFileSystem(conf), filePath, conf)) {
+            return true;
+        } catch (Exception e) {
+            logger.warn("Read sequence file {} failed.", filePath.getName(), e);
+            return false;
+        }
+    }
+
+    public static boolean isSequenceDir(Configuration conf, Path fileDir) throws IOException {
+        FileSystem fs = getWorkingFileSystem(conf);
+        FileStatus[] fileStatuses = fs.listStatus(fileDir, new PathFilter() {
+            @Override
+            public boolean accept(Path path) {
+                return !"_SUCCESS".equals(path.getName());
+            }
+        });
+
+        if (fileStatuses != null && fileStatuses.length > 0) {
+            return isSequenceFile(conf, fileStatuses[0].getPath());
+        }
+
+        return false;
+    }
 }
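For context, the two helpers added above let callers probe whether an HDFS directory was actually written as Hadoop sequence files before choosing a read path. A minimal usage sketch (not part of the patch; the directory path below is hypothetical):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.kylin.common.util.HadoopUtil;

    public class SequenceDirProbe {
        public static void main(String[] args) throws Exception {
            Configuration conf = HadoopUtil.getCurrentConfiguration();
            Path flatTableDir = new Path("/kylin/example/intermediate_flat_table"); // hypothetical dir
            // isSequenceDir lists the directory (skipping _SUCCESS) and tries to open the
            // first file with SequenceFile.Reader; any failure means "not a sequence dir".
            if (HadoopUtil.isSequenceDir(conf, flatTableDir)) {
                System.out.println("read as sequence files");
            } else {
                System.out.println("fall back to another input format");
            }
        }
    }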
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IInput.java 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IInput.java
new file mode 100644
index 0000000000..758b081be0
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IInput.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr;
+
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.metadata.model.ISegment;
+
+public interface IInput {
+
+    /** Return a helper to participate in batch cubing job flow. */
+    public IBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc);
+
+    /** Return a helper to participate in batch cubing merge job flow. */
+    public IBatchMergeInputSide getBatchMergeInputSide(ISegment seg);
+
+    public interface IBatchCubingInputSide {
+        /** Add step that creates an intermediate flat table as defined by CubeJoinedFlatTableDesc */
+        public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow);
+
+        /** Add step that does necessary clean up, like delete the intermediate flat table */
+        public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
+    }
+
+    public interface IBatchMergeInputSide {
+
+        /** Add step that executes before merge dictionary and before merge cube. */
+        public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow);
+
+    }
+}
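IInput is the new engine-neutral contract that IMRInput and ISparkInput both extend below, so Hive and JDBC sources can share one input-side implementation. A bare-bones sketch of an implementor (illustration only, not part of the patch; it adds no real steps):

    import org.apache.kylin.engine.mr.IInput;
    import org.apache.kylin.job.execution.DefaultChainedExecutable;
    import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
    import org.apache.kylin.metadata.model.ISegment;

    public class NoOpInput implements IInput {

        @Override
        public IInput.IBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
            return new IInput.IBatchCubingInputSide() {
                @Override
                public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow) {
                    // a real source would add the steps that materialize the flat table here
                }

                @Override
                public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
                    // and add cleanup steps (drop intermediate tables, delete paths) here
                }
            };
        }

        @Override
        public IInput.IBatchMergeInputSide getBatchMergeInputSide(ISegment seg) {
            return new IInput.IBatchMergeInputSide() {
                @Override
                public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
                    // nothing to do before merging dictionaries in this sketch
                }
            };
        }
    }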
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
index c259c4eadd..74153e0e08 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
@@ -22,25 +22,16 @@
 
 import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
-import org.apache.kylin.metadata.model.ISegment;
 import org.apache.kylin.metadata.model.TableDesc;
 
 /**
  * Any ISource that wishes to serve as input of MapReduce build engine must 
adapt to this interface.
  */
-public interface IMRInput {
-
-    /** Return a helper to participate in batch cubing job flow. */
-    public IMRBatchCubingInputSide 
getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc);
+public interface IMRInput extends IInput {
 
     /** Return an InputFormat that reads from specified table. */
     public IMRTableInputFormat getTableInputFormat(TableDesc table, String 
uuid);
 
-    /** Return a helper to participate in batch cubing merge job flow. */
-    public IMRBatchMergeInputSide getBatchMergeInputSide(ISegment seg);
-
     /**
      * Utility that configures mapper to read from a table.
      */
@@ -65,22 +56,13 @@
      * - Phase 3: Build Cube (with FlatTableInputFormat)
      * - Phase 4: Update Metadata & Cleanup
      */
-    public interface IMRBatchCubingInputSide {
+    public interface IMRBatchCubingInputSide extends IBatchCubingInputSide {
 
         /** Return an InputFormat that reads from the intermediate flat table 
*/
         public IMRTableInputFormat getFlatTableInputFormat();
-
-        /** Add step that creates an intermediate flat table as defined by 
CubeJoinedFlatTableDesc */
-        public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable 
jobFlow);
-
-        /** Add step that does necessary clean up, like delete the 
intermediate flat table */
-        public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
     }
 
-    public interface IMRBatchMergeInputSide {
-
-        /** Add step that executes before merge dictionary and before merge 
cube. */
-        public void addStepPhase1_MergeDictionary(DefaultChainedExecutable 
jobFlow);
+    public interface IMRBatchMergeInputSide extends IBatchMergeInputSide {
 
     }
 }
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
index 3a0fb84b0f..60d0445aae 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/MRUtil.java
@@ -26,6 +26,7 @@
 import org.apache.kylin.dict.lookup.LookupProviderFactory;
 import org.apache.kylin.engine.EngineFactory;
 import org.apache.kylin.engine.mr.IMRInput.IMRBatchCubingInputSide;
+import org.apache.kylin.engine.mr.IMRInput.IMRBatchMergeInputSide;
 import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
 import org.apache.kylin.engine.mr.IMROutput2.IMRBatchCubingOutputSide2;
 import org.apache.kylin.engine.mr.IMROutput2.IMRBatchMergeOutputSide2;
@@ -39,7 +40,7 @@
 
     public static IMRBatchCubingInputSide getBatchCubingInputSide(CubeSegment 
seg) {
         IJoinedFlatTableDesc flatDesc = 
EngineFactory.getJoinedFlatTableDesc(seg);
-        return SourceManager.createEngineAdapter(seg, 
IMRInput.class).getBatchCubingInputSide(flatDesc);
+        return (IMRBatchCubingInputSide)SourceManager.createEngineAdapter(seg, 
IMRInput.class).getBatchCubingInputSide(flatDesc);
     }
 
     public static IMRTableInputFormat getTableInputFormat(String tableName, 
String prj, String uuid) {
@@ -63,8 +64,8 @@ public static IMRBatchMergeOutputSide2 
getBatchMergeOutputSide2(CubeSegment seg)
         return StorageFactory.createEngineAdapter(seg, 
IMROutput2.class).getBatchMergeOutputSide(seg);
     }
 
-    public static IMRInput.IMRBatchMergeInputSide 
getBatchMergeInputSide(CubeSegment seg) {
-        return SourceManager.createEngineAdapter(seg, 
IMRInput.class).getBatchMergeInputSide(seg);
+    public static IMRBatchMergeInputSide getBatchMergeInputSide(CubeSegment 
seg) {
+        return (IMRBatchMergeInputSide)SourceManager.createEngineAdapter(seg, 
IMRInput.class).getBatchMergeInputSide(seg);
     }
 
     public static IMROutput2.IMRBatchOptimizeOutputSide2 
getBatchOptimizeOutputSide2(CubeSegment seg) {
diff --git 
a/engine-spark/src/main/java/org/apache/kylin/engine/spark/ISparkInput.java 
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/ISparkInput.java
index 5459c70a0a..4af616ce98 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/ISparkInput.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/ISparkInput.java
@@ -18,20 +18,12 @@
 
 package org.apache.kylin.engine.spark;
 
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
-import org.apache.kylin.metadata.model.ISegment;
+import org.apache.kylin.engine.mr.IInput;
 
 /**
  * Any ISource that wishes to serve as input of MapReduce build engine must 
adapt to this interface.
  */
-public interface ISparkInput {
-
-    /** Return a helper to participate in batch cubing job flow. */
-    public ISparkBatchCubingInputSide 
getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc);
-
-    /** Return a helper to participate in batch cubing merge job flow. */
-    public ISparkBatchMergeInputSide getBatchMergeInputSide(ISegment seg);
+public interface ISparkInput extends IInput {
 
     /**
      * Participate the batch cubing flow as the input side. Responsible for 
creating
@@ -42,19 +34,11 @@
      * - Phase 3: Build Cube (with FlatTableInputFormat)
      * - Phase 4: Update Metadata & Cleanup
      */
-    public interface ISparkBatchCubingInputSide {
+    public interface ISparkBatchCubingInputSide extends IBatchCubingInputSide {
 
-        /** Add step that creates an intermediate flat table as defined by 
CubeJoinedFlatTableDesc */
-        public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable 
jobFlow);
-
-        /** Add step that does necessary clean up, like delete the 
intermediate flat table */
-        public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
     }
 
-    public interface ISparkBatchMergeInputSide {
-
-        /** Add step that executes before merge dictionary and before merge 
cube. */
-        public void addStepPhase1_MergeDictionary(DefaultChainedExecutable 
jobFlow);
+    public interface ISparkBatchMergeInputSide extends IBatchMergeInputSide {
 
     }
 }
diff --git 
a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java 
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
index 82a1a9be06..151103ace1 100644
--- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
+++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkUtil.java
@@ -30,6 +30,7 @@
 import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.StringUtil;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.EngineFactory;
@@ -43,6 +44,8 @@
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.kylin.engine.spark.ISparkInput.ISparkBatchCubingInputSide;
+import org.apache.kylin.engine.spark.ISparkInput.ISparkBatchMergeInputSide;
 
 import com.google.common.collect.Lists;
 import org.apache.spark.api.java.function.Function;
@@ -52,9 +55,9 @@
 
 public class SparkUtil {
 
-    public static ISparkInput.ISparkBatchCubingInputSide 
getBatchCubingInputSide(CubeSegment seg) {
+    public static ISparkBatchCubingInputSide 
getBatchCubingInputSide(CubeSegment seg) {
         IJoinedFlatTableDesc flatDesc = 
EngineFactory.getJoinedFlatTableDesc(seg);
-        return SourceManager.createEngineAdapter(seg, 
ISparkInput.class).getBatchCubingInputSide(flatDesc);
+        return 
(ISparkBatchCubingInputSide)SourceManager.createEngineAdapter(seg, 
ISparkInput.class).getBatchCubingInputSide(flatDesc);
     }
 
     public static ISparkOutput.ISparkBatchCubingOutputSide 
getBatchCubingOutputSide(CubeSegment seg) {
@@ -65,8 +68,8 @@
         return StorageFactory.createEngineAdapter(seg, 
ISparkOutput.class).getBatchMergeOutputSide(seg);
     }
 
-    public static ISparkInput.ISparkBatchMergeInputSide 
getBatchMergeInputSide(CubeSegment seg) {
-        return SourceManager.createEngineAdapter(seg, 
ISparkInput.class).getBatchMergeInputSide(seg);
+    public static ISparkBatchMergeInputSide getBatchMergeInputSide(CubeSegment 
seg) {
+        return 
(ISparkBatchMergeInputSide)SourceManager.createEngineAdapter(seg, 
ISparkInput.class).getBatchMergeInputSide(seg);
     }
 
     public static IMROutput2.IMRBatchOptimizeOutputSide2 
getBatchOptimizeOutputSide2(CubeSegment seg) {
@@ -140,39 +143,47 @@ public static void 
modifySparkHadoopConfiguration(SparkContext sc) throws Except
         
sc.hadoopConfiguration().set("mapreduce.output.fileoutputformat.compress.codec",
 "org.apache.hadoop.io.compress.DefaultCodec"); // or 
org.apache.hadoop.io.compress.SnappyCodec
     }
 
-    public static JavaRDD<String[]> hiveRecordInputRDD(boolean isSequenceFile, 
JavaSparkContext sc, String inputPath, String hiveTable) {
+    public static JavaRDD<String[]> hiveRecordInputRDD(boolean isSequenceFile, 
JavaSparkContext sc, String inputPath, String hiveTable) throws IOException {
         JavaRDD<String[]> recordRDD;
 
-        if (isSequenceFile) {
-            recordRDD = sc.sequenceFile(inputPath, BytesWritable.class, 
Text.class).values()
-                    .map(new Function<Text, String[]>() {
-                        @Override
-                        public String[] call(Text text) throws Exception {
-                            String s = Bytes.toString(text.getBytes(), 0, 
text.getLength());
-                            return 
s.split(BatchConstants.SEQUENCE_FILE_DEFAULT_DELIMITER);
-                        }
-                    });
+        if (isSequenceFile && 
HadoopUtil.isSequenceDir(sc.hadoopConfiguration(), new Path(inputPath))) {
+            recordRDD = getSequenceFormatHiveInput(sc, inputPath);
         } else {
-            SparkSession sparkSession = 
SparkSession.builder().config(sc.getConf()).enableHiveSupport().getOrCreate();
-            final Dataset intermediateTable = sparkSession.table(hiveTable);
-            recordRDD = intermediateTable.javaRDD().map(new Function<Row, 
String[]>() {
-                @Override
-                public String[] call(Row row) throws Exception {
-                    String[] result = new String[row.size()];
-                    for (int i = 0; i < row.size(); i++) {
-                        final Object o = row.get(i);
-                        if (o != null) {
-                            result[i] = o.toString();
-                        } else {
-                            result[i] = null;
-                        }
-                    }
-                    return result;
-                }
-            });
+            recordRDD = getOtherFormatHiveInput(sc, hiveTable);
         }
 
         return recordRDD;
     }
 
+    private static JavaRDD<String[]> getSequenceFormatHiveInput(JavaSparkContext sc, String inputPath) {
+        return sc.sequenceFile(inputPath, BytesWritable.class, Text.class).values()
+                .map(new Function<Text, String[]>() {
+                    @Override
+                    public String[] call(Text text) throws Exception {
+                        String s = Bytes.toString(text.getBytes(), 0, text.getLength());
+                        return s.split(BatchConstants.SEQUENCE_FILE_DEFAULT_DELIMITER);
+                    }
+                });
+    }
+
+    private static JavaRDD<String[]> getOtherFormatHiveInput(JavaSparkContext sc, String hiveTable) {
+        SparkSession sparkSession = SparkSession.builder().config(sc.getConf()).enableHiveSupport().getOrCreate();
+        final Dataset intermediateTable = sparkSession.table(hiveTable);
+        return intermediateTable.javaRDD().map(new Function<Row, String[]>() {
+            @Override
+            public String[] call(Row row) throws Exception {
+                String[] result = new String[row.size()];
+                for (int i = 0; i < row.size(); i++) {
+                    final Object o = row.get(i);
+                    if (o != null) {
+                        result[i] = o.toString();
+                    } else {
+                        result[i] = null;
+                    }
+                }
+                return result;
+            }
+        });
+    }
+
 }
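With the change above, hiveRecordInputRDD only takes the sequence-file path when the directory really contains sequence files (per HadoopUtil.isSequenceDir) and otherwise falls back to reading the Hive table through SparkSession. An illustrative call site (not part of the patch; the path and table name are hypothetical):

    import java.io.IOException;

    import org.apache.kylin.engine.spark.SparkUtil;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;

    public class FlatTableRead {
        public static JavaRDD<String[]> read(JavaSparkContext sc) throws IOException {
            // isSequenceFile=true only states the expectation; the directory is still probed,
            // and a TEXTFILE flat table (e.g. one produced by sqoop) falls back to the
            // SparkSession/Hive read path instead of failing.
            return SparkUtil.hiveRecordInputRDD(true, sc,
                    "/kylin/example/intermediate_flat_table",      // hypothetical HDFS path
                    "default.kylin_intermediate_example_table");   // hypothetical Hive table
        }
    }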
diff --git 
a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java 
b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
index c55015b542..2f25e50016 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
@@ -19,16 +19,22 @@
 package org.apache.kylin.source.hive;
 
 import java.io.IOException;
+import java.util.Collections;
 import java.util.List;
 import java.util.Locale;
 import java.util.Set;
 
+import com.google.common.collect.Lists;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.HiveCmdBuilder;
+import org.apache.kylin.common.util.StringUtil;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.IInput;
 import org.apache.kylin.engine.mr.JobBuilderSupport;
 import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
 import org.apache.kylin.job.JoinedFlatTable;
@@ -48,9 +54,82 @@
 
 public class HiveInputBase {
 
-    @SuppressWarnings("unused")
     private static final Logger logger = 
LoggerFactory.getLogger(HiveInputBase.class);
 
+    public static class BaseBatchCubingInputSide implements 
IInput.IBatchCubingInputSide {
+
+        final protected IJoinedFlatTableDesc flatDesc;
+        final protected String flatTableDatabase;
+        final protected String hdfsWorkingDir;
+
+        List<String> hiveViewIntermediateTables = Lists.newArrayList();
+
+        public BaseBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
+            KylinConfig config = KylinConfig.getInstanceFromEnv();
+            this.flatDesc = flatDesc;
+            this.flatTableDatabase = 
config.getHiveDatabaseForIntermediateTable();
+            this.hdfsWorkingDir = config.getHdfsWorkingDirectory();
+        }
+
+        @Override
+        public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable 
jobFlow) {
+            final String cubeName = 
CubingExecutableUtil.getCubeName(jobFlow.getParams());
+            CubeInstance cubeInstance = 
CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
+            final KylinConfig cubeConfig = cubeInstance.getConfig();
+
+            final String hiveInitStatements = 
JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
+
+            // create flat table first
+            addStepPhase1_DoCreateFlatTable(jobFlow);
+
+            // then count and redistribute
+            if (cubeConfig.isHiveRedistributeEnabled()) {
+                
jobFlow.addTask(createRedistributeFlatHiveTableStep(hiveInitStatements, 
cubeName, flatDesc,
+                        cubeInstance.getDescriptor()));
+            }
+
+            // special for hive
+            addStepPhase1_DoMaterializeLookupTable(jobFlow);
+        }
+
+        protected void 
addStepPhase1_DoCreateFlatTable(DefaultChainedExecutable jobFlow) {
+            final String cubeName = 
CubingExecutableUtil.getCubeName(jobFlow.getParams());
+            final String hiveInitStatements = 
JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
+            final String jobWorkingDir = getJobWorkingDir(jobFlow, 
hdfsWorkingDir);
+
+            jobFlow.addTask(createFlatHiveTableStep(hiveInitStatements, 
jobWorkingDir, cubeName, flatDesc));
+        }
+
+        protected void 
addStepPhase1_DoMaterializeLookupTable(DefaultChainedExecutable jobFlow) {
+            final String hiveInitStatements = 
JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
+            final String jobWorkingDir = getJobWorkingDir(jobFlow, 
hdfsWorkingDir);
+
+            AbstractExecutable task = 
createLookupHiveViewMaterializationStep(hiveInitStatements, jobWorkingDir,
+                    flatDesc, hiveViewIntermediateTables, jobFlow.getId());
+            if (task != null) {
+                jobFlow.addTask(task);
+            }
+        }
+
+        @Override
+        public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
+            final String jobWorkingDir = getJobWorkingDir(jobFlow, 
hdfsWorkingDir);
+
+            org.apache.kylin.source.hive.GarbageCollectionStep step = new 
org.apache.kylin.source.hive.GarbageCollectionStep();
+            step.setName(ExecutableConstants.STEP_NAME_HIVE_CLEANUP);
+            
step.setIntermediateTables(Collections.singletonList(getIntermediateTableIdentity()));
+            
step.setExternalDataPaths(Collections.singletonList(JoinedFlatTable.getTableDir(flatDesc,
 jobWorkingDir)));
+            
step.setHiveViewIntermediateTableIdentities(StringUtil.join(hiveViewIntermediateTables,
 ","));
+            jobFlow.addTask(step);
+        }
+
+        protected String getIntermediateTableIdentity() {
+            return flatTableDatabase + "." + flatDesc.getTableName();
+        }
+    }
+
+    // ===== static methods ======
+
     protected static String getTableNameForHCat(TableDesc table, String uuid) {
         String tableName = (table.isView()) ? table.getMaterializedName(uuid) 
: table.getName();
         String database = (table.isView()) ? 
KylinConfig.getInstanceFromEnv().getHiveDatabaseForIntermediateTable()
@@ -58,15 +137,6 @@ protected static String getTableNameForHCat(TableDesc 
table, String uuid) {
         return String.format(Locale.ROOT, "%s.%s", database, 
tableName).toUpperCase(Locale.ROOT);
     }
 
-    protected void addStepPhase1_DoCreateFlatTable(DefaultChainedExecutable 
jobFlow, String hdfsWorkingDir,
-            IJoinedFlatTableDesc flatTableDesc, String flatTableDatabase) {
-        final String cubeName = 
CubingExecutableUtil.getCubeName(jobFlow.getParams());
-        final String hiveInitStatements = 
JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
-        final String jobWorkingDir = getJobWorkingDir(jobFlow, hdfsWorkingDir);
-
-        jobFlow.addTask(createFlatHiveTableStep(hiveInitStatements, 
jobWorkingDir, cubeName, flatTableDesc));
-    }
-
     protected static AbstractExecutable createFlatHiveTableStep(String 
hiveInitStatements, String jobWorkingDir,
             String cubeName, IJoinedFlatTableDesc flatDesc) {
         //from hive to hive
diff --git 
a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java 
b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index d6b85eddcd..df20b2cbff 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -28,42 +28,23 @@
 import org.apache.hive.hcatalog.data.HCatRecord;
 import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
 import org.apache.hive.hcatalog.mapreduce.HCatSplit;
-import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.HadoopUtil;
-import org.apache.kylin.common.util.StringUtil;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.engine.mr.IMRInput;
-import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
-import org.apache.kylin.job.JoinedFlatTable;
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.apache.kylin.job.execution.ExecutableManager;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.ISegment;
 import org.apache.kylin.metadata.model.TableDesc;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
 
 public class HiveMRInput extends HiveInputBase implements IMRInput {
 
-    private static final Logger logger = 
LoggerFactory.getLogger(HiveMRInput.class);
-
     @Override
-    public IMRBatchCubingInputSide 
getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
-        return new BatchCubingInputSide(flatDesc);
+    public IBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc 
flatDesc) {
+        return new HiveMRBatchCubingInputSide(flatDesc);
     }
 
     @Override
-    public IMRTableInputFormat getTableInputFormat(TableDesc table, String 
uuid) {
-        return new HiveTableInputFormat(getTableNameForHCat(table, uuid));
-    }
-
-    @Override
-    public IMRBatchMergeInputSide getBatchMergeInputSide(ISegment seg) {
+    public IBatchMergeInputSide getBatchMergeInputSide(ISegment seg) {
         return new IMRBatchMergeInputSide() {
             @Override
             public void addStepPhase1_MergeDictionary(DefaultChainedExecutable 
jobFlow) {
@@ -72,6 +53,11 @@ public void 
addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
         };
     }
 
+    @Override
+    public IMRTableInputFormat getTableInputFormat(TableDesc table, String 
uuid) {
+        return new HiveTableInputFormat(getTableNameForHCat(table, uuid));
+    }
+
     public static class HiveTableInputFormat implements IMRTableInputFormat {
         final String dbName;
         final String tableName;
@@ -111,80 +97,15 @@ public String getInputSplitSignature(InputSplit 
inputSplit) {
         }
     }
 
-    public static class BatchCubingInputSide implements 
IMRBatchCubingInputSide {
-
-        final protected IJoinedFlatTableDesc flatDesc;
-        final protected String flatTableDatabase;
-        final protected String hdfsWorkingDir;
-
-        List<String> hiveViewIntermediateTables = Lists.newArrayList();
-
-        public BatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
-            KylinConfig config = KylinConfig.getInstanceFromEnv();
-            this.flatDesc = flatDesc;
-            this.flatTableDatabase = 
config.getHiveDatabaseForIntermediateTable();
-            this.hdfsWorkingDir = config.getHdfsWorkingDirectory();
-        }
-
-        @Override
-        public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable 
jobFlow) {
-            final String cubeName = 
CubingExecutableUtil.getCubeName(jobFlow.getParams());
-            CubeInstance cubeInstance = 
CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
-            final KylinConfig cubeConfig = cubeInstance.getConfig();
-
-            final String hiveInitStatements = 
JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
-
-            // create flat table first
-            addStepPhase1_DoCreateFlatTable(jobFlow);
-
-            // then count and redistribute
-            if (cubeConfig.isHiveRedistributeEnabled()) {
-                
jobFlow.addTask(createRedistributeFlatHiveTableStep(hiveInitStatements, 
cubeName, flatDesc,
-                        cubeInstance.getDescriptor()));
-            }
+    public static class HiveMRBatchCubingInputSide extends 
BaseBatchCubingInputSide implements IMRBatchCubingInputSide {
 
-            // special for hive
-            addStepPhase1_DoMaterializeLookupTable(jobFlow);
-        }
-
-        protected void 
addStepPhase1_DoCreateFlatTable(DefaultChainedExecutable jobFlow) {
-            final String cubeName = 
CubingExecutableUtil.getCubeName(jobFlow.getParams());
-            final String hiveInitStatements = 
JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
-            final String jobWorkingDir = getJobWorkingDir(jobFlow, 
hdfsWorkingDir);
-
-            jobFlow.addTask(createFlatHiveTableStep(hiveInitStatements, 
jobWorkingDir, cubeName, flatDesc));
-        }
-
-        protected void 
addStepPhase1_DoMaterializeLookupTable(DefaultChainedExecutable jobFlow) {
-            final String hiveInitStatements = 
JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
-            final String jobWorkingDir = getJobWorkingDir(jobFlow, 
hdfsWorkingDir);
-
-            AbstractExecutable task = 
createLookupHiveViewMaterializationStep(hiveInitStatements, jobWorkingDir,
-                    flatDesc, hiveViewIntermediateTables, jobFlow.getId());
-            if (task != null) {
-                jobFlow.addTask(task);
-            }
-        }
-
-        @Override
-        public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
-            final String jobWorkingDir = getJobWorkingDir(jobFlow, 
hdfsWorkingDir);
-
-            org.apache.kylin.source.hive.GarbageCollectionStep step = new 
org.apache.kylin.source.hive.GarbageCollectionStep();
-            step.setName(ExecutableConstants.STEP_NAME_HIVE_CLEANUP);
-            
step.setIntermediateTables(Collections.singletonList(getIntermediateTableIdentity()));
-            
step.setExternalDataPaths(Collections.singletonList(JoinedFlatTable.getTableDir(flatDesc,
 jobWorkingDir)));
-            
step.setHiveViewIntermediateTableIdentities(StringUtil.join(hiveViewIntermediateTables,
 ","));
-            jobFlow.addTask(step);
+        public HiveMRBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
+            super(flatDesc);
         }
 
         @Override
         public IMRTableInputFormat getFlatTableInputFormat() {
-            return new HiveTableInputFormat(getIntermediateTableIdentity());
-        }
-
-        private String getIntermediateTableIdentity() {
-            return flatTableDatabase + "." + flatDesc.getTableName();
+            return new 
HiveMRInput.HiveTableInputFormat(getIntermediateTableIdentity());
         }
     }
 
diff --git 
a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSparkInput.java 
b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSparkInput.java
index d710db7fdc..0660a66f6b 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSparkInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveSparkInput.java
@@ -18,39 +18,26 @@
 
 package org.apache.kylin.source.hive;
 
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.StringUtil;
-import org.apache.kylin.cube.CubeInstance;
-import org.apache.kylin.cube.CubeManager;
-import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
 import org.apache.kylin.engine.spark.ISparkInput;
-import org.apache.kylin.job.JoinedFlatTable;
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.execution.AbstractExecutable;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.ISegment;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Lists;
-
 public class HiveSparkInput extends HiveInputBase implements ISparkInput {
 
     @SuppressWarnings("unused")
     private static final Logger logger = 
LoggerFactory.getLogger(HiveSparkInput.class);
 
     @Override
-    public ISparkInput.ISparkBatchCubingInputSide 
getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
-        return new BatchCubingInputSide(flatDesc);
+    public IBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc 
flatDesc) {
+        return new SparkBatchCubingInputSide(flatDesc);
     }
 
     @Override
-    public ISparkInput.ISparkBatchMergeInputSide 
getBatchMergeInputSide(ISegment seg) {
-        return new ISparkInput.ISparkBatchMergeInputSide() {
+    public IBatchMergeInputSide getBatchMergeInputSide(ISegment seg) {
+        return new ISparkBatchMergeInputSide() {
             @Override
             public void addStepPhase1_MergeDictionary(DefaultChainedExecutable 
jobFlow) {
                 // doing nothing
@@ -58,67 +45,10 @@ public void 
addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow) {
         };
     }
 
-    public class BatchCubingInputSide implements ISparkBatchCubingInputSide {
-
-        final protected IJoinedFlatTableDesc flatDesc;
-        final protected String flatTableDatabase;
-        final protected String hdfsWorkingDir;
-
-        List<String> hiveViewIntermediateTables = Lists.newArrayList();
-
-        public BatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
-            KylinConfig config = KylinConfig.getInstanceFromEnv();
-            this.flatDesc = flatDesc;
-            this.flatTableDatabase = 
config.getHiveDatabaseForIntermediateTable();
-            this.hdfsWorkingDir = config.getHdfsWorkingDirectory();
-        }
-
-        @Override
-        public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable 
jobFlow) {
-            final String cubeName = 
CubingExecutableUtil.getCubeName(jobFlow.getParams());
-            CubeInstance cubeInstance = 
CubeManager.getInstance(KylinConfig.getInstanceFromEnv()).getCube(cubeName);
-            final KylinConfig cubeConfig = cubeInstance.getConfig();
-            final String hiveInitStatements = 
JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
-
-            // create flat table first
-            addStepPhase1_DoCreateFlatTable(jobFlow, hdfsWorkingDir, flatDesc, 
flatTableDatabase);
-
-            // then count and redistribute
-            if (cubeConfig.isHiveRedistributeEnabled()) {
-                
jobFlow.addTask(createRedistributeFlatHiveTableStep(hiveInitStatements, 
cubeName, flatDesc,
-                        cubeInstance.getDescriptor()));
-            }
-
-            // special for hive
-            addStepPhase1_DoMaterializeLookupTable(jobFlow);
-        }
-
-        protected void 
addStepPhase1_DoMaterializeLookupTable(DefaultChainedExecutable jobFlow) {
-            final String hiveInitStatements = 
JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
-            final String jobWorkingDir = getJobWorkingDir(jobFlow, 
hdfsWorkingDir);
-
-            AbstractExecutable task = 
createLookupHiveViewMaterializationStep(hiveInitStatements, jobWorkingDir,
-                    flatDesc, hiveViewIntermediateTables, jobFlow.getId());
-            if (task != null) {
-                jobFlow.addTask(task);
-            }
-        }
-
-        @Override
-        public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
-            final String jobWorkingDir = getJobWorkingDir(jobFlow, 
hdfsWorkingDir);
+    public static class SparkBatchCubingInputSide extends 
BaseBatchCubingInputSide implements ISparkBatchCubingInputSide {
 
-            GarbageCollectionStep step = new GarbageCollectionStep();
-            step.setName(ExecutableConstants.STEP_NAME_HIVE_CLEANUP);
-            
step.setIntermediateTables(Collections.singletonList(getIntermediateTableIdentity()));
-            
step.setExternalDataPaths(Collections.singletonList(JoinedFlatTable.getTableDir(flatDesc,
 jobWorkingDir)));
-            
step.setHiveViewIntermediateTableIdentities(StringUtil.join(hiveViewIntermediateTables,
 ","));
-            jobFlow.addTask(step);
-        }
-
-        private String getIntermediateTableIdentity() {
-            return flatTableDatabase + "." + flatDesc.getTableName();
+        public SparkBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
+            super(flatDesc);
         }
     }
-
 }
diff --git 
a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveInputBase.java 
b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveInputBase.java
new file mode 100644
index 0000000000..3769473e60
--- /dev/null
+++ 
b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveInputBase.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kylin.source.jdbc;
+
+import com.google.common.collect.Maps;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.SourceConfigurationUtil;
+import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
+import org.apache.kylin.job.JoinedFlatTable;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.job.util.FlatTableSqlQuoteUtils;
+import org.apache.kylin.metadata.TableMetadataManager;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.metadata.model.PartitionDesc;
+import org.apache.kylin.metadata.model.SegmentRange;
+import org.apache.kylin.metadata.model.TableExtDesc;
+import org.apache.kylin.metadata.model.TableRef;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.source.hive.HiveInputBase;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+import java.util.Locale;
+import java.util.Map;
+
+public class JdbcHiveInputBase extends HiveInputBase {
+    private static final Logger logger = 
LoggerFactory.getLogger(JdbcHiveInputBase.class);
+    private static final String MR_OVERRIDE_QUEUE_KEY = 
"mapreduce.job.queuename";
+    private static final String DEFAULT_QUEUE = "default";
+
+    public static class JdbcBaseBatchCubingInputSide extends 
BaseBatchCubingInputSide {
+
+        public JdbcBaseBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
+            super(flatDesc);
+        }
+
+        protected KylinConfig getConfig() {
+            return flatDesc.getDataModel().getConfig();
+        }
+
+        @Override
+        protected void 
addStepPhase1_DoCreateFlatTable(DefaultChainedExecutable jobFlow) {
+            final String cubeName = 
CubingExecutableUtil.getCubeName(jobFlow.getParams());
+            final String hiveInitStatements = 
JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
+            final String jobWorkingDir = getJobWorkingDir(jobFlow, 
hdfsWorkingDir);
+
+            jobFlow.addTask(createSqoopToFlatHiveStep(jobWorkingDir, 
cubeName));
+            jobFlow.addTask(createFlatHiveTableFromFiles(hiveInitStatements, 
jobWorkingDir));
+        }
+
+        @Override
+        protected void 
addStepPhase1_DoMaterializeLookupTable(DefaultChainedExecutable jobFlow) {
+            // skip
+        }
+
+        private AbstractExecutable createFlatHiveTableFromFiles(String 
hiveInitStatements, String jobWorkingDir) {
+            final String dropTableHql = 
JoinedFlatTable.generateDropTableStatement(flatDesc);
+            String filedDelimiter = getConfig().getJdbcSourceFieldDelimiter();
+            // Sqoop does not support exporting SEQUENSEFILE to Hive now 
SQOOP-869
+            final String createTableHql = 
JoinedFlatTable.generateCreateTableStatement(flatDesc, jobWorkingDir,
+                    "TEXTFILE", filedDelimiter);
+
+            HiveCmdStep step = new HiveCmdStep();
+            step.setCmd(hiveInitStatements + dropTableHql + createTableHql);
+            step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE);
+            return step;
+        }
+
+        /**
+         * Choose a better split-by column for sqoop. The strategy is:
+         * 1. Prefer ClusteredBy column
+         * 2. Prefer DistributedBy column
+         * 3. Prefer Partition date column
+         * 4. Prefer Higher cardinality column
+         * 5. Prefer numeric column
+         * 6. Pick a column at first glance
+         * @return A column reference <code>TblColRef</code>for sqoop split-by
+         */
+        protected TblColRef determineSplitColumn() {
+            if (null != flatDesc.getClusterBy()) {
+                return flatDesc.getClusterBy();
+            }
+            if (null != flatDesc.getDistributedBy()) {
+                return flatDesc.getDistributedBy();
+            }
+            PartitionDesc partitionDesc = 
flatDesc.getDataModel().getPartitionDesc();
+            if (partitionDesc.isPartitioned()) {
+                return partitionDesc.getPartitionDateColumnRef();
+            }
+            TblColRef splitColumn = null;
+            TableMetadataManager tblManager = 
TableMetadataManager.getInstance(getConfig());
+            long maxCardinality = 0;
+            for (TableRef tableRef : flatDesc.getDataModel().getAllTables()) {
+                TableExtDesc tableExtDesc = 
tblManager.getTableExt(tableRef.getTableDesc());
+                List<TableExtDesc.ColumnStats> columnStatses = 
tableExtDesc.getColumnStats();
+                if (!columnStatses.isEmpty()) {
+                    for (TblColRef colRef : tableRef.getColumns()) {
+                        long cardinality = 
columnStatses.get(colRef.getColumnDesc().getZeroBasedIndex())
+                                .getCardinality();
+                        splitColumn = cardinality > maxCardinality ? colRef : 
splitColumn;
+                    }
+                }
+            }
+            if (null == splitColumn) {
+                for (TblColRef colRef : flatDesc.getAllColumns()) {
+                    if (colRef.getType().isIntegerFamily()) {
+                        return colRef;
+                    }
+                }
+                splitColumn = flatDesc.getAllColumns().get(0);
+            }
+
+            return splitColumn;
+        }
+
+        private String getSqoopJobQueueName(KylinConfig config) {
+            Map<String, String> mrConfigOverride = 
config.getMRConfigOverride();
+            if (mrConfigOverride.containsKey(MR_OVERRIDE_QUEUE_KEY)) {
+                return mrConfigOverride.get(MR_OVERRIDE_QUEUE_KEY);
+            }
+            return DEFAULT_QUEUE;
+        }
+
+        protected AbstractExecutable createSqoopToFlatHiveStep(String 
jobWorkingDir, String cubeName) {
+            KylinConfig config = getConfig();
+            PartitionDesc partitionDesc = 
flatDesc.getDataModel().getPartitionDesc();
+            String partCol = null;
+
+            if (partitionDesc.isPartitioned()) {
+                partCol = 
partitionDesc.getPartitionDateColumn();//tablename.colname
+            }
+
+            String splitTable;
+            String splitTableAlias;
+            String splitColumn;
+            String splitDatabase;
+            TblColRef splitColRef = determineSplitColumn();
+            splitTable = splitColRef.getTableRef().getTableName();
+            splitTableAlias = splitColRef.getTableAlias();
+            splitColumn = 
JoinedFlatTable.getQuotedColExpressionInSourceDB(flatDesc, splitColRef);
+            splitDatabase = 
splitColRef.getColumnDesc().getTable().getDatabase();
+
+            //using sqoop to extract data from jdbc source and dump them to 
hive
+            String selectSql = 
JoinedFlatTable.generateSelectDataStatement(flatDesc, true, new String[] { 
partCol });
+            selectSql = escapeQuotationInSql(selectSql);
+
+
+
+            String hiveTable = flatDesc.getTableName();
+            String connectionUrl = config.getJdbcSourceConnectionUrl();
+            String driverClass = config.getJdbcSourceDriver();
+            String jdbcUser = config.getJdbcSourceUser();
+            String jdbcPass = config.getJdbcSourcePass();
+            String sqoopHome = config.getSqoopHome();
+            String filedDelimiter = config.getJdbcSourceFieldDelimiter();
+            int mapperNum = config.getSqoopMapperNum();
+
+            String bquery = String.format(Locale.ROOT, "SELECT min(%s), 
max(%s) FROM %s.%s as %s", splitColumn,
+                    splitColumn, splitDatabase, splitTable, splitTableAlias);
+            if (partitionDesc.isPartitioned()) {
+                SegmentRange segRange = flatDesc.getSegRange();
+                if (segRange != null && !segRange.isInfinite()) {
+                    if 
(partitionDesc.getPartitionDateColumnRef().getTableAlias().equals(splitTableAlias)
+                            && (partitionDesc.getPartitionTimeColumnRef() == 
null || partitionDesc
+                            
.getPartitionTimeColumnRef().getTableAlias().equals(splitTableAlias))) {
+                        String quotedPartCond = 
FlatTableSqlQuoteUtils.quoteIdentifierInSqlExpr(flatDesc,
+                                
partitionDesc.getPartitionConditionBuilder().buildDateRangeCondition(partitionDesc,
+                                        flatDesc.getSegment(), segRange),
+                                "`");
+                        bquery += " WHERE " + quotedPartCond;
+                    }
+                }
+            }
+            bquery = escapeQuotationInSql(bquery);
+
+            // escape ` in cmd
+            splitColumn = escapeQuotationInSql(splitColumn);
+
+            String cmd = String.format(Locale.ROOT,
+                    "%s/bin/sqoop import" + generateSqoopConfigArgString()
+                            + "--connect \"%s\" --driver %s --username %s 
--password %s --query \"%s AND \\$CONDITIONS\" "
+                            + "--target-dir %s/%s --split-by %s 
--boundary-query \"%s\" --null-string '' "
+                            + "--fields-terminated-by '%s' --num-mappers %d",
+                    sqoopHome, connectionUrl, driverClass, jdbcUser, jdbcPass, 
selectSql, jobWorkingDir, hiveTable,
+                    splitColumn, bquery, filedDelimiter, mapperNum);
+            logger.debug(String.format(Locale.ROOT, "sqoop cmd:%s", cmd));
+            CmdStep step = new CmdStep();
+            step.setCmd(cmd);
+            
step.setName(ExecutableConstants.STEP_NAME_SQOOP_TO_FLAT_HIVE_TABLE);
+            return step;
+        }
+
+        protected String generateSqoopConfigArgString() {
+            KylinConfig kylinConfig = getConfig();
+            Map<String, String> config = Maps.newHashMap();
+            config.put("mapreduce.job.queuename", 
getSqoopJobQueueName(kylinConfig)); // override job queue from mapreduce config
+            config.putAll(SourceConfigurationUtil.loadSqoopConfiguration());
+            config.putAll(kylinConfig.getSqoopConfigOverride());
+
+            StringBuilder args = new StringBuilder(" 
-Dorg.apache.sqoop.splitter.allow_text_splitter=true ");
+            for (Map.Entry<String, String> entry : config.entrySet()) {
+                args.append(" -D" + entry.getKey() + "=" + entry.getValue() + 
" ");
+            }
+            return args.toString();
+        }
+    }
+
+    protected static String escapeQuotationInSql(String sqlExpr) {
+        sqlExpr = sqlExpr.replaceAll("\"", "\\\\\"");
+        sqlExpr = sqlExpr.replaceAll("`", "\\\\`");
+        return sqlExpr;
+    }
+}
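The sqoop command assembled in createSqoopToFlatHiveStep embeds SQL inside a double-quoted shell argument, so escapeQuotationInSql backslash-escapes any double quotes and backticks first. A standalone sketch of that transformation (the original method is protected, so this simply restates its two replaceAll calls; the input SQL is hypothetical):

    public class EscapeQuotationExample {
        // Same transformation as JdbcHiveInputBase.escapeQuotationInSql.
        static String escape(String sqlExpr) {
            sqlExpr = sqlExpr.replaceAll("\"", "\\\\\"");
            sqlExpr = sqlExpr.replaceAll("`", "\\\\`");
            return sqlExpr;
        }

        public static void main(String[] args) {
            String sql = "SELECT `FACT_TABLE`.`PRICE` FROM `FACT_TABLE`";
            // prints: SELECT \`FACT_TABLE\`.\`PRICE\` FROM \`FACT_TABLE\`
            System.out.println(escape(sql));
        }
    }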
diff --git 
a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java 
b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java
index 3460dd2bcf..19f354c922 100644
--- 
a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java
+++ 
b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveMRInput.java
@@ -17,222 +17,45 @@
 */
 package org.apache.kylin.source.jdbc;
 
-import java.util.List;
-import java.util.Locale;
-import java.util.Map;
-
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.util.SourceConfigurationUtil;
-import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
-import org.apache.kylin.job.JoinedFlatTable;
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.engine.mr.IMRInput;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.job.util.FlatTableSqlQuoteUtils;
-import org.apache.kylin.metadata.TableMetadataManager;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
-import org.apache.kylin.metadata.model.PartitionDesc;
-import org.apache.kylin.metadata.model.SegmentRange;
-import org.apache.kylin.metadata.model.TableExtDesc;
-import org.apache.kylin.metadata.model.TableExtDesc.ColumnStats;
-import org.apache.kylin.metadata.model.TableRef;
-import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.model.ISegment;
+import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.source.hive.HiveMRInput;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Maps;
-
-public class JdbcHiveMRInput extends HiveMRInput {
 
-    private static final Logger logger = 
LoggerFactory.getLogger(JdbcHiveMRInput.class);
-    private static final String MR_OVERRIDE_QUEUE_KEY = 
"mapreduce.job.queuename";
-    private static final String DEFAULT_QUEUE = "default";
+public class JdbcHiveMRInput extends JdbcHiveInputBase implements IMRInput {
 
-    public IMRBatchCubingInputSide 
getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
-        return new BatchCubingInputSide(flatDesc);
+    @Override
+    public IBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc 
flatDesc) {
+        return new JdbcMRBatchCubingInputSide(flatDesc);
     }
 
-    public static class BatchCubingInputSide extends 
HiveMRInput.BatchCubingInputSide {
-
-        public BatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
-            super(flatDesc);
-        }
-
-        private KylinConfig getConfig() {
-            return flatDesc.getDataModel().getConfig();
-        }
-
-        @Override
-        protected void 
addStepPhase1_DoCreateFlatTable(DefaultChainedExecutable jobFlow) {
-            final String cubeName = 
CubingExecutableUtil.getCubeName(jobFlow.getParams());
-            final String hiveInitStatements = 
JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
-            final String jobWorkingDir = getJobWorkingDir(jobFlow, 
hdfsWorkingDir);
-
-            jobFlow.addTask(createSqoopToFlatHiveStep(jobWorkingDir, 
cubeName));
-            jobFlow.addTask(createFlatHiveTableFromFiles(hiveInitStatements, 
jobWorkingDir));
-        }
-
-        private AbstractExecutable createFlatHiveTableFromFiles(String 
hiveInitStatements, String jobWorkingDir) {
-            final String dropTableHql = 
JoinedFlatTable.generateDropTableStatement(flatDesc);
-            String filedDelimiter = getConfig().getJdbcSourceFieldDelimiter();
-            // Sqoop does not support exporting SEQUENSEFILE to Hive now 
SQOOP-869
-            final String createTableHql = 
JoinedFlatTable.generateCreateTableStatement(flatDesc, jobWorkingDir,
-                    "TEXTFILE", filedDelimiter);
-
-            HiveCmdStep step = new HiveCmdStep();
-            step.setCmd(hiveInitStatements + dropTableHql + createTableHql);
-            step.setName(ExecutableConstants.STEP_NAME_CREATE_FLAT_HIVE_TABLE);
-            return step;
-        }
-
-        /**
-         * Choose a better split-by column for sqoop. The strategy is:
-         * 1. Prefer ClusteredBy column
-         * 2. Prefer DistributedBy column
-         * 3. Prefer Partition date column
-         * 4. Prefer Higher cardinality column
-         * 5. Prefer numeric column
-         * 6. Pick a column at first glance
-         * @return A column reference <code>TblColRef</code>for sqoop split-by
-         */
-        protected TblColRef determineSplitColumn() {
-            if (null != flatDesc.getClusterBy()) {
-                return flatDesc.getClusterBy();
-            }
-            if (null != flatDesc.getDistributedBy()) {
-                return flatDesc.getDistributedBy();
-            }
-            PartitionDesc partitionDesc = 
flatDesc.getDataModel().getPartitionDesc();
-            if (partitionDesc.isPartitioned()) {
-                return partitionDesc.getPartitionDateColumnRef();
-            }
-            TblColRef splitColumn = null;
-            TableMetadataManager tblManager = 
TableMetadataManager.getInstance(getConfig());
-            long maxCardinality = 0;
-            for (TableRef tableRef : flatDesc.getDataModel().getAllTables()) {
-                TableExtDesc tableExtDesc = 
tblManager.getTableExt(tableRef.getTableDesc());
-                List<ColumnStats> columnStatses = 
tableExtDesc.getColumnStats();
-                if (!columnStatses.isEmpty()) {
-                    for (TblColRef colRef : tableRef.getColumns()) {
-                        long cardinality = 
columnStatses.get(colRef.getColumnDesc().getZeroBasedIndex())
-                                .getCardinality();
-                        splitColumn = cardinality > maxCardinality ? colRef : 
splitColumn;
-                    }
-                }
-            }
-            if (null == splitColumn) {
-                for (TblColRef colRef : flatDesc.getAllColumns()) {
-                    if (colRef.getType().isIntegerFamily()) {
-                        return colRef;
-                    }
-                }
-                splitColumn = flatDesc.getAllColumns().get(0);
-            }
-
-            return splitColumn;
-        }
-
-        private String getSqoopJobQueueName(KylinConfig config) {
-            Map<String, String> mrConfigOverride = 
config.getMRConfigOverride();
-            if (mrConfigOverride.containsKey(MR_OVERRIDE_QUEUE_KEY)) {
-                return mrConfigOverride.get(MR_OVERRIDE_QUEUE_KEY);
+    @Override
+    public IBatchMergeInputSide getBatchMergeInputSide(ISegment seg) {
+        return new IMRBatchMergeInputSide() {
+            @Override
+            public void addStepPhase1_MergeDictionary(DefaultChainedExecutable 
jobFlow) {
+                // doing nothing
             }
-            return DEFAULT_QUEUE;
-        }
-
-        protected AbstractExecutable createSqoopToFlatHiveStep(String 
jobWorkingDir, String cubeName) {
-            KylinConfig config = getConfig();
-            PartitionDesc partitionDesc = 
flatDesc.getDataModel().getPartitionDesc();
-            String partCol = null;
-
-            if (partitionDesc.isPartitioned()) {
-                partCol = 
partitionDesc.getPartitionDateColumn();//tablename.colname
-            }
-
-            String splitTable;
-            String splitTableAlias;
-            String splitColumn;
-            String splitDatabase;
-            TblColRef splitColRef = determineSplitColumn();
-            splitTable = splitColRef.getTableRef().getTableName();
-            splitTableAlias = splitColRef.getTableAlias();
-            splitColumn = 
JoinedFlatTable.getQuotedColExpressionInSourceDB(flatDesc, splitColRef);
-            splitDatabase = 
splitColRef.getColumnDesc().getTable().getDatabase();
-
-            //using sqoop to extract data from jdbc source and dump them to 
hive
-            String selectSql = 
JoinedFlatTable.generateSelectDataStatement(flatDesc, true, new String[] { 
partCol });
-            selectSql = escapeQuotationInSql(selectSql);
-
-
-
-            String hiveTable = flatDesc.getTableName();
-            String connectionUrl = config.getJdbcSourceConnectionUrl();
-            String driverClass = config.getJdbcSourceDriver();
-            String jdbcUser = config.getJdbcSourceUser();
-            String jdbcPass = config.getJdbcSourcePass();
-            String sqoopHome = config.getSqoopHome();
-            String filedDelimiter = config.getJdbcSourceFieldDelimiter();
-            int mapperNum = config.getSqoopMapperNum();
+        };
+    }
 
-            String bquery = String.format(Locale.ROOT, "SELECT min(%s), 
max(%s) FROM \"%s\".%s as %s", splitColumn,
-                    splitColumn, splitDatabase, splitTable, splitTableAlias);
-            if (partitionDesc.isPartitioned()) {
-                SegmentRange segRange = flatDesc.getSegRange();
-                if (segRange != null && !segRange.isInfinite()) {
-                    if 
(partitionDesc.getPartitionDateColumnRef().getTableAlias().equals(splitTableAlias)
-                            && (partitionDesc.getPartitionTimeColumnRef() == 
null || partitionDesc
-                                    
.getPartitionTimeColumnRef().getTableAlias().equals(splitTableAlias))) {
-                        String quotedPartCond = 
FlatTableSqlQuoteUtils.quoteIdentifierInSqlExpr(flatDesc,
-                                
partitionDesc.getPartitionConditionBuilder().buildDateRangeCondition(partitionDesc,
-                                        flatDesc.getSegment(), segRange),
-                                "`");
-                        bquery += " WHERE " + quotedPartCond;
-                    }
-                }
-            }
-            bquery = escapeQuotationInSql(bquery);
+    @Override
+    public IMRTableInputFormat getTableInputFormat(TableDesc table, String 
uuid) {
+        return new HiveMRInput.HiveTableInputFormat(getTableNameForHCat(table, 
uuid));
+    }
 
-            // escape ` in cmd
-            splitColumn = escapeQuotationInSql(splitColumn);
+    public static class JdbcMRBatchCubingInputSide extends 
JdbcBaseBatchCubingInputSide implements IMRBatchCubingInputSide {
 
-            String cmd = String.format(Locale.ROOT,
-                    "%s/bin/sqoop import" + generateSqoopConfigArgString()
-                            + "--connect \"%s\" --driver %s --username %s 
--password %s --query \"%s AND \\$CONDITIONS\" "
-                            + "--target-dir %s/%s --split-by %s 
--boundary-query \"%s\" --null-string '' "
-                            + "--fields-terminated-by '%s' --num-mappers %d",
-                    sqoopHome, connectionUrl, driverClass, jdbcUser, jdbcPass, 
selectSql, jobWorkingDir, hiveTable,
-                    splitColumn, bquery, filedDelimiter, mapperNum);
-            logger.debug(String.format(Locale.ROOT, "sqoop cmd:%s", cmd));
-            CmdStep step = new CmdStep();
-            step.setCmd(cmd);
-            
step.setName(ExecutableConstants.STEP_NAME_SQOOP_TO_FLAT_HIVE_TABLE);
-            return step;
+        public JdbcMRBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
+            super(flatDesc);
         }
 
         @Override
-        protected void 
addStepPhase1_DoMaterializeLookupTable(DefaultChainedExecutable jobFlow) {
-            // skip
-        }
-
-        protected String generateSqoopConfigArgString() {
-            KylinConfig kylinConfig = getConfig();
-            Map<String, String> config = Maps.newHashMap();
-            config.put("mapreduce.job.queuename", 
getSqoopJobQueueName(kylinConfig)); // override job queue from mapreduce config
-            config.putAll(SourceConfigurationUtil.loadSqoopConfiguration());
-            config.putAll(kylinConfig.getSqoopConfigOverride());
-
-            StringBuilder args = new StringBuilder(" 
-Dorg.apache.sqoop.splitter.allow_text_splitter=true ");
-            for (Map.Entry<String, String> entry : config.entrySet()) {
-                args.append(" -D" + entry.getKey() + "=" + entry.getValue() + 
" ");
-            }
-            return args.toString();
+        public IMRTableInputFormat getFlatTableInputFormat() {
+            return new 
HiveMRInput.HiveTableInputFormat(getIntermediateTableIdentity());
         }
     }
 
-    protected static String escapeQuotationInSql(String sqlExpr) {
-        sqlExpr = sqlExpr.replaceAll("\"", "\\\\\"");
-        sqlExpr = sqlExpr.replaceAll("`", "\\\\`");
-        return sqlExpr;
-    }
 }
diff --git 
a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveSparkInput.java
 
b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveSparkInput.java
new file mode 100644
index 0000000000..8a8471ab53
--- /dev/null
+++ 
b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcHiveSparkInput.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.source.jdbc;
+
+import org.apache.kylin.engine.spark.ISparkInput;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.metadata.model.ISegment;
+
+public class JdbcHiveSparkInput extends JdbcHiveInputBase implements 
ISparkInput {
+
+    @Override
+    public IBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc 
flatDesc) {
+        return new JdbcSparkBatchCubingInputSide(flatDesc);
+    }
+
+    @Override
+    public IBatchMergeInputSide getBatchMergeInputSide(ISegment seg) {
+        return new ISparkBatchMergeInputSide() {
+            @Override
+            public void addStepPhase1_MergeDictionary(DefaultChainedExecutable 
jobFlow) {
+                // doing nothing
+            }
+        };
+    }
+
+    public static class JdbcSparkBatchCubingInputSide extends 
JdbcBaseBatchCubingInputSide implements ISparkBatchCubingInputSide {
+
+        public JdbcSparkBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
+            super(flatDesc);
+        }
+    }
+}
diff --git 
a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java 
b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java
index 3bf7498158..1bda6c2a88 100644
--- a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java
+++ b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/JdbcSource.java
@@ -21,6 +21,7 @@
 
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.engine.mr.IMRInput;
+import org.apache.kylin.engine.spark.ISparkInput;
 import org.apache.kylin.metadata.model.IBuildable;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.source.IReadableTable;
@@ -44,6 +45,8 @@ public ISourceMetadataExplorer getSourceMetadataExplorer() {
     public <I> I adaptToBuildEngine(Class<I> engineInterface) {
         if (engineInterface == IMRInput.class) {
             return (I) new JdbcHiveMRInput();
+        } else if (engineInterface == ISparkInput.class) {
+            return (I) new JdbcHiveSparkInput();
         } else {
             throw new RuntimeException("Cannot adapt to " + engineInterface);
         }
diff --git 
a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveInputBase.java
 
b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveInputBase.java
new file mode 100644
index 0000000000..10eb31ee21
--- /dev/null
+++ 
b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveInputBase.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.source.jdbc.extensible;
+
+import org.apache.hadoop.util.StringUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.job.JoinedFlatTable;
+import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.util.FlatTableSqlQuoteUtils;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.metadata.model.PartitionDesc;
+import org.apache.kylin.metadata.model.SegmentRange;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.sdk.datasource.framework.JdbcConnector;
+import org.apache.kylin.source.jdbc.sqoop.SqoopCmdStep;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Locale;
+
+public class JdbcHiveInputBase extends 
org.apache.kylin.source.jdbc.JdbcHiveInputBase {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(JdbcHiveInputBase.class);
+
+    public static class JDBCBaseBatchCubingInputSide extends 
JdbcBaseBatchCubingInputSide {
+        private final JdbcConnector dataSource;
+
+        public JDBCBaseBatchCubingInputSide(IJoinedFlatTableDesc flatDesc, 
JdbcConnector dataSource) {
+            super(flatDesc);
+            this.dataSource = dataSource;
+        }
+
+        protected JdbcConnector getDataSource() {
+            return dataSource;
+        }
+
+        @Override
+        protected AbstractExecutable createSqoopToFlatHiveStep(String 
jobWorkingDir, String cubeName) {
+            KylinConfig config = flatDesc.getDataModel().getConfig();
+            PartitionDesc partitionDesc = 
flatDesc.getDataModel().getPartitionDesc();
+            String partCol = null;
+
+            if (partitionDesc.isPartitioned()) {
+                partCol = partitionDesc.getPartitionDateColumn(); 
//tablename.colname
+            }
+
+            String splitTable;
+            String splitTableAlias;
+            String splitColumn;
+            String splitDatabase;
+            TblColRef splitColRef = determineSplitColumn();
+            splitTable = splitColRef.getTableRef().getTableName();
+            splitTable = splitColRef.getTableRef().getTableDesc().getName();
+            splitTableAlias = splitColRef.getTableAlias();
+            //to solve case sensitive if necessary
+            splitColumn = 
JoinedFlatTable.getQuotedColExpressionInSourceDB(flatDesc, splitColRef);
+            splitDatabase = 
splitColRef.getColumnDesc().getTable().getDatabase().toLowerCase(Locale.ROOT);
+
+            //using sqoop to extract data from jdbc source and dump them to 
hive
+            String selectSql = 
JoinedFlatTable.generateSelectDataStatement(flatDesc, true, new String[] { 
partCol });
+            selectSql = escapeQuotationInSql(dataSource.convertSql(selectSql));
+
+            String hiveTable = flatDesc.getTableName();
+            String sqoopHome = config.getSqoopHome();
+            String filedDelimiter = config.getJdbcSourceFieldDelimiter();
+            int mapperNum = config.getSqoopMapperNum();
+
+            String bquery = String.format(Locale.ROOT, "SELECT min(%s), 
max(%s) FROM `%s`.%s as `%s`", splitColumn, splitColumn,
+                    splitDatabase, splitTable, splitTableAlias);
+            bquery = dataSource.convertSql(bquery);
+            if (partitionDesc.isPartitioned()) {
+                SegmentRange segRange = flatDesc.getSegRange();
+                if (segRange != null && !segRange.isInfinite()) {
+                    if 
(partitionDesc.getPartitionDateColumnRef().getTableAlias().equals(splitTableAlias)
+                            && (partitionDesc.getPartitionTimeColumnRef() == 
null || partitionDesc
+                            
.getPartitionTimeColumnRef().getTableAlias().equals(splitTableAlias))) {
+                        String quotedPartCond = 
FlatTableSqlQuoteUtils.quoteIdentifierInSqlExpr(flatDesc,
+                                
partitionDesc.getPartitionConditionBuilder().buildDateRangeCondition(partitionDesc,
+                                        flatDesc.getSegment(), segRange),
+                                "`");
+                        bquery += " WHERE " + quotedPartCond;
+                    }
+                }
+            }
+            bquery = escapeQuotationInSql(bquery);
+
+            splitColumn = 
escapeQuotationInSql(dataSource.convertColumn(splitColumn, 
FlatTableSqlQuoteUtils.QUOTE));
+
+            String cmd = StringUtils.format(
+                    "--connect \"%s\" --driver %s --username %s --password %s 
--query \"%s AND \\$CONDITIONS\" "
+                            + "--target-dir %s/%s --split-by %s 
--boundary-query \"%s\" --null-string '' "
+                            + "--fields-terminated-by '%s' --num-mappers %d",
+                    dataSource.getJdbcUrl(), dataSource.getJdbcDriver(), 
dataSource.getJdbcUser(),
+                    dataSource.getJdbcPassword(), selectSql, jobWorkingDir, 
hiveTable, splitColumn, bquery,
+                    filedDelimiter, mapperNum);
+            logger.debug("sqoop cmd: {}", cmd);
+
+            SqoopCmdStep step = new SqoopCmdStep();
+            step.setCmd(cmd);
+            
step.setName(ExecutableConstants.STEP_NAME_SQOOP_TO_FLAT_HIVE_TABLE);
+            return step;
+        }
+    }
+}
diff --git 
a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveMRInput.java
 
b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveMRInput.java
index 2e57a446c8..7df4ab57c2 100644
--- 
a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveMRInput.java
+++ 
b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveMRInput.java
@@ -17,25 +17,15 @@
  */
 package org.apache.kylin.source.jdbc.extensible;
 
-import org.apache.hadoop.util.StringUtils;
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.job.JoinedFlatTable;
-import org.apache.kylin.job.constant.ExecutableConstants;
-import org.apache.kylin.job.execution.AbstractExecutable;
-import org.apache.kylin.job.util.FlatTableSqlQuoteUtils;
+import org.apache.kylin.engine.mr.IMRInput;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
-import org.apache.kylin.metadata.model.PartitionDesc;
-import org.apache.kylin.metadata.model.SegmentRange;
-import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.kylin.metadata.model.ISegment;
+import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.sdk.datasource.framework.JdbcConnector;
-import org.apache.kylin.source.jdbc.sqoop.SqoopCmdStep;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.kylin.source.hive.HiveMRInput;
 
-import java.util.Locale;
-
-public class JdbcHiveMRInput extends 
org.apache.kylin.source.jdbc.JdbcHiveMRInput {
-    private static final Logger logger = 
LoggerFactory.getLogger(JdbcHiveMRInput.class);
+public class JdbcHiveMRInput extends JdbcHiveInputBase implements IMRInput {
 
     private final JdbcConnector dataSource;
 
@@ -44,86 +34,34 @@
     }
 
     public IMRBatchCubingInputSide 
getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
-        return new BatchCubingInputSide(flatDesc, dataSource);
+        return new JdbcMRBatchCubingInputSide(flatDesc, dataSource);
     }
 
-    public static class BatchCubingInputSide extends 
org.apache.kylin.source.jdbc.JdbcHiveMRInput.BatchCubingInputSide {
-        private final JdbcConnector dataSource;
-
-        public BatchCubingInputSide(IJoinedFlatTableDesc flatDesc, 
JdbcConnector dataSource) {
-            super(flatDesc);
-            this.dataSource = dataSource;
-        }
-
-        protected JdbcConnector getDataSource() {
-            return dataSource;
-        }
-
-        @Override
-        protected AbstractExecutable createSqoopToFlatHiveStep(String 
jobWorkingDir, String cubeName) {
-            KylinConfig config = flatDesc.getDataModel().getConfig();
-            PartitionDesc partitionDesc = 
flatDesc.getDataModel().getPartitionDesc();
-            String partCol = null;
-
-            if (partitionDesc.isPartitioned()) {
-                partCol = partitionDesc.getPartitionDateColumn(); 
//tablename.colname
+    @Override
+    public IBatchMergeInputSide getBatchMergeInputSide(ISegment seg) {
+        return new IMRBatchMergeInputSide() {
+            @Override
+            public void addStepPhase1_MergeDictionary(DefaultChainedExecutable 
jobFlow) {
+                // doing nothing
             }
+        };
+    }
 
-            String splitTable;
-            String splitTableAlias;
-            String splitColumn;
-            String splitDatabase;
-            TblColRef splitColRef = determineSplitColumn();
-            splitTable = splitColRef.getTableRef().getTableName();
-            splitTable = splitColRef.getTableRef().getTableDesc().getName();
-            splitTableAlias = splitColRef.getTableAlias();
-            //to solve case sensitive if necessary
-            splitColumn = 
JoinedFlatTable.getQuotedColExpressionInSourceDB(flatDesc, splitColRef);
-            splitDatabase = 
splitColRef.getColumnDesc().getTable().getDatabase().toLowerCase(Locale.ROOT);
-
-            //using sqoop to extract data from jdbc source and dump them to 
hive
-            String selectSql = 
JoinedFlatTable.generateSelectDataStatement(flatDesc, true, new String[] { 
partCol });
-            selectSql = escapeQuotationInSql(dataSource.convertSql(selectSql));
-
-            String hiveTable = flatDesc.getTableName();
-            String sqoopHome = config.getSqoopHome();
-            String filedDelimiter = config.getJdbcSourceFieldDelimiter();
-            int mapperNum = config.getSqoopMapperNum();
-
-            String bquery = String.format(Locale.ROOT, "SELECT min(%s), 
max(%s) FROM `%s`.%s as `%s`", splitColumn, splitColumn,
-                    splitDatabase, splitTable, splitTableAlias);
-            bquery = dataSource.convertSql(bquery);
-            if (partitionDesc.isPartitioned()) {
-                SegmentRange segRange = flatDesc.getSegRange();
-                if (segRange != null && !segRange.isInfinite()) {
-                    if 
(partitionDesc.getPartitionDateColumnRef().getTableAlias().equals(splitTableAlias)
-                            && (partitionDesc.getPartitionTimeColumnRef() == 
null || partitionDesc
-                            
.getPartitionTimeColumnRef().getTableAlias().equals(splitTableAlias))) {
-                        String quotedPartCond = 
FlatTableSqlQuoteUtils.quoteIdentifierInSqlExpr(flatDesc,
-                                
partitionDesc.getPartitionConditionBuilder().buildDateRangeCondition(partitionDesc,
-                                        flatDesc.getSegment(), segRange),
-                                "`");
-                        bquery += " WHERE " + quotedPartCond;
-                    }
-                }
-            }
-            bquery = escapeQuotationInSql(bquery);
+    @Override
+    public IMRTableInputFormat getTableInputFormat(TableDesc table, String 
uuid) {
+        return new HiveMRInput.HiveTableInputFormat(getTableNameForHCat(table, 
uuid));
+    }
 
-            splitColumn = 
escapeQuotationInSql(dataSource.convertColumn(splitColumn, 
FlatTableSqlQuoteUtils.QUOTE));
+    public static class JdbcMRBatchCubingInputSide extends 
JDBCBaseBatchCubingInputSide implements IMRInput.IMRBatchCubingInputSide {
 
-            String cmd = StringUtils.format(
-                    "--connect \"%s\" --driver %s --username %s --password %s 
--query \"%s AND \\$CONDITIONS\" "
-                            + "--target-dir %s/%s --split-by %s 
--boundary-query \"%s\" --null-string '' "
-                            + "--fields-terminated-by '%s' --num-mappers %d",
-                    dataSource.getJdbcUrl(), dataSource.getJdbcDriver(), 
dataSource.getJdbcUser(),
-                    dataSource.getJdbcPassword(), selectSql, jobWorkingDir, 
hiveTable, splitColumn, bquery,
-                    filedDelimiter, mapperNum);
-            logger.debug("sqoop cmd: {}", cmd);
+        public JdbcMRBatchCubingInputSide(IJoinedFlatTableDesc flatDesc, 
JdbcConnector dataSource) {
+            super(flatDesc, dataSource);
+        }
 
-            SqoopCmdStep step = new SqoopCmdStep();
-            step.setCmd(cmd);
-            
step.setName(ExecutableConstants.STEP_NAME_SQOOP_TO_FLAT_HIVE_TABLE);
-            return step;
+        @Override
+        public IMRInput.IMRTableInputFormat getFlatTableInputFormat() {
+            return new 
HiveMRInput.HiveTableInputFormat(getIntermediateTableIdentity());
         }
     }
+
 }
diff --git 
a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveSparkInput.java
 
b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveSparkInput.java
new file mode 100644
index 0000000000..a5701ad285
--- /dev/null
+++ 
b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveSparkInput.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+*/
+package org.apache.kylin.source.jdbc.extensible;
+
+import org.apache.kylin.engine.spark.ISparkInput;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
+import org.apache.kylin.metadata.model.ISegment;
+import org.apache.kylin.sdk.datasource.framework.JdbcConnector;
+
+public class JdbcHiveSparkInput extends JdbcHiveInputBase implements 
ISparkInput {
+
+    private final JdbcConnector dataSource;
+
+    JdbcHiveSparkInput(JdbcConnector dataSource) {
+        this.dataSource = dataSource;
+    }
+
+    @Override
+    public IBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc 
flatDesc) {
+        return new JdbcSparkBatchCubingInputSide(flatDesc, dataSource);
+    }
+
+    @Override
+    public IBatchMergeInputSide getBatchMergeInputSide(ISegment seg) {
+        return new ISparkBatchMergeInputSide() {
+            @Override
+            public void addStepPhase1_MergeDictionary(DefaultChainedExecutable 
jobFlow) {
+                // doing nothing
+            }
+        };
+    }
+
+    public static class JdbcSparkBatchCubingInputSide extends 
JDBCBaseBatchCubingInputSide implements ISparkBatchCubingInputSide {
+
+        public JdbcSparkBatchCubingInputSide(IJoinedFlatTableDesc flatDesc, 
JdbcConnector dataSource) {
+            super(flatDesc, dataSource);
+        }
+    }
+}
diff --git 
a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcSource.java
 
b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcSource.java
index 3e8f0fd34e..da055e1e24 100644
--- 
a/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcSource.java
+++ 
b/source-jdbc/src/main/java/org/apache/kylin/source/jdbc/extensible/JdbcSource.java
@@ -22,6 +22,7 @@
 
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.engine.mr.IMRInput;
+import org.apache.kylin.engine.spark.ISparkInput;
 import org.apache.kylin.metadata.model.IBuildable;
 import org.apache.kylin.metadata.model.TableDesc;
 import org.apache.kylin.sdk.datasource.framework.JdbcConnector;
@@ -60,6 +61,8 @@ public ISourceMetadataExplorer getSourceMetadataExplorer() {
     public <I> I adaptToBuildEngine(Class<I> engineInterface) {
         if (engineInterface == IMRInput.class) {
             return (I) new JdbcHiveMRInput(dataSource);
+        } else if (engineInterface == ISparkInput.class) {
+            return (I) new JdbcHiveSparkInput(dataSource);
         } else {
             throw new RuntimeException("Cannot adapt to " + engineInterface);
         }
diff --git 
a/source-jdbc/src/test/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveMRInputTest.java
 
b/source-jdbc/src/test/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveMRInputTest.java
index 956f86c397..20c37efc0a 100644
--- 
a/source-jdbc/src/test/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveMRInputTest.java
+++ 
b/source-jdbc/src/test/java/org/apache/kylin/source/jdbc/extensible/JdbcHiveMRInputTest.java
@@ -60,7 +60,7 @@ public void testGenSqoopCmd_Partition() throws IOException {
         CubeSegment seg = 
cubeManager.appendSegment(cubeManager.getCube(cubeDesc.getName()),
                 new SegmentRange.TSRange(System.currentTimeMillis() - 100L, 
System.currentTimeMillis() + 100L));
         CubeJoinedFlatTableDesc flatDesc = new CubeJoinedFlatTableDesc(seg);
-        JdbcHiveMRInput.BatchCubingInputSide inputSide = 
(JdbcHiveMRInput.BatchCubingInputSide) input
+        JdbcHiveMRInput.JdbcMRBatchCubingInputSide inputSide = 
(JdbcHiveMRInput.JdbcMRBatchCubingInputSide) input
                 .getBatchCubingInputSide(flatDesc);
 
         AbstractExecutable executable = new MockInputSide(flatDesc, 
inputSide).createSqoopToFlatHiveStep("/tmp",
@@ -86,7 +86,7 @@ public void testGenSqoopCmd_NoPartition() throws IOException {
         CubeSegment seg = 
cubeManager.appendSegment(cubeManager.getCube(cubeDesc.getName()),
                 new SegmentRange.TSRange(0L, Long.MAX_VALUE));
         CubeJoinedFlatTableDesc flatDesc = new CubeJoinedFlatTableDesc(seg);
-        JdbcHiveMRInput.BatchCubingInputSide inputSide = 
(JdbcHiveMRInput.BatchCubingInputSide) input
+        JdbcHiveMRInput.JdbcMRBatchCubingInputSide inputSide = 
(JdbcHiveMRInput.JdbcMRBatchCubingInputSide) input
                 .getBatchCubingInputSide(flatDesc);
 
         AbstractExecutable executable = new MockInputSide(flatDesc, 
inputSide).createSqoopToFlatHiveStep("/tmp",
@@ -111,7 +111,7 @@ public void testGenSqoopCmd_WithLookupShardBy() throws 
IOException {
         CubeSegment seg = 
cubeManager.appendSegment(cubeManager.getCube(cubeDesc.getName()),
                 new SegmentRange.TSRange(System.currentTimeMillis() - 100L, 
System.currentTimeMillis() + 100L));
         CubeJoinedFlatTableDesc flatDesc = new CubeJoinedFlatTableDesc(seg);
-        JdbcHiveMRInput.BatchCubingInputSide inputSide = 
(JdbcHiveMRInput.BatchCubingInputSide) input
+        JdbcHiveMRInput.JdbcMRBatchCubingInputSide inputSide = 
(JdbcHiveMRInput.JdbcMRBatchCubingInputSide) input
                 .getBatchCubingInputSide(flatDesc);
 
         AbstractExecutable executable = new MockInputSide(flatDesc, 
inputSide).createSqoopToFlatHiveStep("/tmp",
@@ -127,10 +127,10 @@ public void testGenSqoopCmd_WithLookupShardBy() throws 
IOException {
         source.close();
     }
 
-    private static class MockInputSide extends 
JdbcHiveMRInput.BatchCubingInputSide {
-        JdbcHiveMRInput.BatchCubingInputSide input;
+    private static class MockInputSide extends 
JdbcHiveMRInput.JdbcMRBatchCubingInputSide {
+        JdbcHiveMRInput.JdbcMRBatchCubingInputSide input;
 
-        public MockInputSide(IJoinedFlatTableDesc flatDesc, 
JdbcHiveMRInput.BatchCubingInputSide input) {
+        public MockInputSide(IJoinedFlatTableDesc flatDesc, 
JdbcHiveMRInput.JdbcMRBatchCubingInputSide input) {
             super(flatDesc, input.getDataSource());
             this.input = input;
         }
diff --git 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaInputBase.java 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaInputBase.java
index cb2e14ce5c..7620ab3152 100644
--- 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaInputBase.java
+++ 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaInputBase.java
@@ -20,18 +20,23 @@
 
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Locale;
 import java.util.Set;
 
-import com.google.common.collect.Sets;
+import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.IInput;
 import org.apache.kylin.engine.mr.JobBuilderSupport;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.engine.mr.common.MapReduceExecutable;
 import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
 import org.apache.kylin.job.JoinedFlatTable;
 import org.apache.kylin.job.constant.ExecutableConstants;
+import org.apache.kylin.job.engine.JobEngineConfig;
 import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.DefaultChainedExecutable;
+import org.apache.kylin.metadata.MetadataConstants;
 import org.apache.kylin.metadata.model.DataModelDesc;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.ISegment;
@@ -45,8 +50,78 @@
 import org.apache.kylin.source.kafka.hadoop.KafkaFlatTableJob;
 import org.apache.kylin.source.kafka.job.MergeOffsetStep;
 
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
 public class KafkaInputBase {
 
+    public static class BaseBatchCubingInputSide implements 
IInput.IBatchCubingInputSide {
+
+        final JobEngineConfig conf;
+        final CubeSegment seg;
+        private CubeDesc cubeDesc;
+        private KylinConfig config;
+        protected IJoinedFlatTableDesc flatDesc;
+        protected String hiveTableDatabase;
+        final private List<String> intermediateTables = Lists.newArrayList();
+        final private List<String> intermediatePaths = Lists.newArrayList();
+        private String cubeName;
+
+        public BaseBatchCubingInputSide(CubeSegment seg, IJoinedFlatTableDesc 
flatDesc) {
+            this.conf = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
+            this.config = seg.getConfig();
+            this.flatDesc = flatDesc;
+            this.hiveTableDatabase = 
config.getHiveDatabaseForIntermediateTable();
+            this.seg = seg;
+            this.cubeDesc = seg.getCubeDesc();
+            this.cubeName = seg.getCubeInstance().getName();
+        }
+
+        @Override
+        public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable 
jobFlow) {
+
+            boolean onlyOneTable = 
cubeDesc.getModel().getLookupTables().size() == 0;
+            final String baseLocation = getJobWorkingDir(jobFlow);
+            if (onlyOneTable) {
+                // directly use flat table location
+                final String intermediateFactTable = flatDesc.getTableName();
+                final String tableLocation = baseLocation + "/" + 
intermediateFactTable;
+                jobFlow.addTask(createSaveKafkaDataStep(jobFlow.getId(), 
tableLocation, seg));
+                intermediatePaths.add(tableLocation);
+            } else {
+                final String mockFactTableName = 
MetadataConstants.KYLIN_INTERMEDIATE_PREFIX
+                        + cubeName.toLowerCase(Locale.ROOT) + "_" + 
seg.getUuid().replaceAll("-", "_") + "_fact";
+                jobFlow.addTask(createSaveKafkaDataStep(jobFlow.getId(), 
baseLocation + "/" + mockFactTableName, seg));
+                jobFlow.addTask(createFlatTable(hiveTableDatabase, 
mockFactTableName, baseLocation, cubeName, cubeDesc,
+                        flatDesc, intermediateTables, intermediatePaths));
+            }
+        }
+
+        protected String getJobWorkingDir(DefaultChainedExecutable jobFlow) {
+            return 
JobBuilderSupport.getJobWorkingDir(config.getHdfsWorkingDirectory(), 
jobFlow.getId());
+        }
+
+        @Override
+        public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
+            jobFlow.addTask(createGCStep(intermediateTables, 
intermediatePaths));
+
+        }
+    }
+
+    public static  class BaseBatchMergeInputSide implements 
IInput.IBatchMergeInputSide {
+
+        private CubeSegment cubeSegment;
+
+        BaseBatchMergeInputSide(CubeSegment cubeSegment) {
+            this.cubeSegment = cubeSegment;
+        }
+
+        @Override
+        public void addStepPhase1_MergeDictionary(DefaultChainedExecutable 
jobFlow) {
+            jobFlow.addTask(createMergeOffsetStep(jobFlow.getId(), 
cubeSegment));
+        }
+    }
+
     protected static AbstractExecutable createMergeOffsetStep(String jobId, 
CubeSegment cubeSegment) {
 
         final MergeOffsetStep result = new MergeOffsetStep();
diff --git 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
index 1c94f9c7c7..d7095725d6 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
@@ -20,8 +20,6 @@
 import java.io.IOException;
 import java.util.Collection;
 import java.util.Collections;
-import java.util.List;
-import java.util.Locale;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
@@ -30,46 +28,36 @@
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
-import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.cube.model.CubeJoinedFlatTableDesc;
 import org.apache.kylin.engine.mr.IMRInput;
 import org.apache.kylin.engine.mr.JobBuilderSupport;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.job.JoinedFlatTable;
 import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.metadata.MetadataConstants;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.ISegment;
 import org.apache.kylin.metadata.model.TableDesc;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
 
 public class KafkaMRInput extends KafkaInputBase implements IMRInput {
 
-    private static final Logger logger = 
LoggerFactory.getLogger(KafkaMRInput.class);
     private CubeSegment cubeSegment;
 
     @Override
-    public IMRBatchCubingInputSide 
getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
+    public IBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc 
flatDesc) {
         this.cubeSegment = (CubeSegment) flatDesc.getSegment();
-        return new BatchCubingInputSide(cubeSegment, flatDesc);
+        return new KafkaMRBatchCubingInputSide(cubeSegment, flatDesc);
     }
 
     @Override
-    public IMRTableInputFormat getTableInputFormat(TableDesc table, String 
uuid) {
-
-        return new KafkaTableInputFormat(cubeSegment, null);
+    public IBatchMergeInputSide getBatchMergeInputSide(ISegment seg) {
+        return new KafkaMRBatchMergeInputSide((CubeSegment)seg);
     }
 
     @Override
-    public IMRBatchMergeInputSide getBatchMergeInputSide(ISegment seg) {
-        return new KafkaMRBatchMergeInputSide((CubeSegment) seg);
+    public IMRTableInputFormat getTableInputFormat(TableDesc table, String 
uuid) {
+        return new KafkaTableInputFormat(cubeSegment, null);
     }
 
     public static class KafkaTableInputFormat implements IMRTableInputFormat {
@@ -110,56 +98,10 @@ public String getInputSplitSignature(InputSplit 
inputSplit) {
         }
     }
 
-    public static class BatchCubingInputSide implements 
IMRBatchCubingInputSide {
-
-        final JobEngineConfig conf;
-        final CubeSegment seg;
-        private CubeDesc cubeDesc;
-        private KylinConfig config;
-        protected IJoinedFlatTableDesc flatDesc;
-        protected String hiveTableDatabase;
-        private List<String> intermediateTables = Lists.newArrayList();
-        private List<String> intermediatePaths = Lists.newArrayList();
-        private String cubeName;
-
-        public BatchCubingInputSide(CubeSegment seg, IJoinedFlatTableDesc 
flatDesc) {
-            this.conf = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
-            this.config = seg.getConfig();
-            this.flatDesc = flatDesc;
-            this.hiveTableDatabase = 
config.getHiveDatabaseForIntermediateTable();
-            this.seg = seg;
-            this.cubeDesc = seg.getCubeDesc();
-            this.cubeName = seg.getCubeInstance().getName();
-        }
-
-        @Override
-        public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable 
jobFlow) {
-
-            boolean onlyOneTable = 
cubeDesc.getModel().getLookupTables().size() == 0;
-            final String baseLocation = getJobWorkingDir(jobFlow);
-            if (onlyOneTable) {
-                // directly use flat table location
-                final String intermediateFactTable = flatDesc.getTableName();
-                final String tableLocation = baseLocation + "/" + 
intermediateFactTable;
-                jobFlow.addTask(createSaveKafkaDataStep(jobFlow.getId(), 
tableLocation, seg));
-                intermediatePaths.add(tableLocation);
-            } else {
-                final String mockFactTableName = 
MetadataConstants.KYLIN_INTERMEDIATE_PREFIX
-                        + cubeName.toLowerCase(Locale.ROOT) + "_" + 
seg.getUuid().replaceAll("-", "_") + "_fact";
-                jobFlow.addTask(createSaveKafkaDataStep(jobFlow.getId(), 
baseLocation + "/" + mockFactTableName, seg));
-                jobFlow.addTask(createFlatTable(hiveTableDatabase, 
mockFactTableName, baseLocation, cubeName, cubeDesc,
-                        flatDesc, intermediateTables, intermediatePaths));
-            }
-        }
-
-        protected String getJobWorkingDir(DefaultChainedExecutable jobFlow) {
-            return 
JobBuilderSupport.getJobWorkingDir(config.getHdfsWorkingDirectory(), 
jobFlow.getId());
-        }
-
-        @Override
-        public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
-            jobFlow.addTask(createGCStep(intermediateTables, 
intermediatePaths));
+    public static class KafkaMRBatchCubingInputSide extends 
BaseBatchCubingInputSide implements IMRBatchCubingInputSide {
 
+        public KafkaMRBatchCubingInputSide(CubeSegment seg, 
IJoinedFlatTableDesc flatDesc) {
+            super(seg, flatDesc);
         }
 
         @Override
@@ -168,18 +110,10 @@ public IMRTableInputFormat getFlatTableInputFormat() {
         }
     }
 
-    class KafkaMRBatchMergeInputSide implements IMRBatchMergeInputSide {
-
-        private CubeSegment cubeSegment;
+    public static class KafkaMRBatchMergeInputSide extends 
BaseBatchMergeInputSide implements IMRBatchMergeInputSide {
 
         KafkaMRBatchMergeInputSide(CubeSegment cubeSegment) {
-            this.cubeSegment = cubeSegment;
-        }
-
-        @Override
-        public void addStepPhase1_MergeDictionary(DefaultChainedExecutable 
jobFlow) {
-            jobFlow.addTask(createMergeOffsetStep(jobFlow.getId(), 
cubeSegment));
+            super(cubeSegment);
         }
     }
-
 }
diff --git 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSparkInput.java 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSparkInput.java
index 7db6c32661..edbc0025f6 100644
--- 
a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSparkInput.java
+++ 
b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaSparkInput.java
@@ -17,105 +17,37 @@
 */
 package org.apache.kylin.source.kafka;
 
-import java.util.List;
-import java.util.Locale;
-
-import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.cube.model.CubeDesc;
-import org.apache.kylin.engine.mr.JobBuilderSupport;
 import org.apache.kylin.engine.spark.ISparkInput;
-import org.apache.kylin.job.engine.JobEngineConfig;
-import org.apache.kylin.job.execution.DefaultChainedExecutable;
-import org.apache.kylin.metadata.MetadataConstants;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.ISegment;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Lists;
 
 public class KafkaSparkInput extends KafkaInputBase implements ISparkInput {
 
-    private static final Logger logger = 
LoggerFactory.getLogger(KafkaSparkInput.class);
     private CubeSegment cubeSegment;
 
     @Override
-    public ISparkInput.ISparkBatchCubingInputSide 
getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc) {
+    public IBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc 
flatDesc) {
         this.cubeSegment = (CubeSegment) flatDesc.getSegment();
-        return new BatchCubingInputSide(cubeSegment, flatDesc);
+        return new KafkaSparkBatchCubingInputSide(cubeSegment, flatDesc);
     }
 
     @Override
-    public ISparkBatchMergeInputSide getBatchMergeInputSide(ISegment seg) {
-        return new KafkaSparkBatchMergeInputSide((CubeSegment) seg);
+    public IBatchMergeInputSide getBatchMergeInputSide(ISegment seg) {
+        return new KafkaSparkBatchMergeInputSide((CubeSegment)seg);
     }
 
-    public static class BatchCubingInputSide implements 
ISparkBatchCubingInputSide {
-
-        final JobEngineConfig conf;
-        final CubeSegment seg;
-        private CubeDesc cubeDesc;
-        private KylinConfig config;
-        protected IJoinedFlatTableDesc flatDesc;
-        protected String hiveTableDatabase;
-        final private List<String> intermediateTables = Lists.newArrayList();
-        final private List<String> intermediatePaths = Lists.newArrayList();
-        private String cubeName;
-
-        public BatchCubingInputSide(CubeSegment seg, IJoinedFlatTableDesc 
flatDesc) {
-            this.conf = new JobEngineConfig(KylinConfig.getInstanceFromEnv());
-            this.config = seg.getConfig();
-            this.flatDesc = flatDesc;
-            this.hiveTableDatabase = 
config.getHiveDatabaseForIntermediateTable();
-            this.seg = seg;
-            this.cubeDesc = seg.getCubeDesc();
-            this.cubeName = seg.getCubeInstance().getName();
-        }
-
-        @Override
-        public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable 
jobFlow) {
-
-            boolean onlyOneTable = 
cubeDesc.getModel().getLookupTables().size() == 0;
-            final String baseLocation = getJobWorkingDir(jobFlow);
-            if (onlyOneTable) {
-                // directly use flat table location
-                final String intermediateFactTable = flatDesc.getTableName();
-                final String tableLocation = baseLocation + "/" + 
intermediateFactTable;
-                jobFlow.addTask(createSaveKafkaDataStep(jobFlow.getId(), 
tableLocation, seg));
-                intermediatePaths.add(tableLocation);
-            } else {
-                final String mockFactTableName = 
MetadataConstants.KYLIN_INTERMEDIATE_PREFIX
-                        + cubeName.toLowerCase(Locale.ROOT) + "_" + 
seg.getUuid().replaceAll("-", "_") + "_fact";
-                jobFlow.addTask(createSaveKafkaDataStep(jobFlow.getId(), 
baseLocation + "/" + mockFactTableName, seg));
-                jobFlow.addTask(createFlatTable(hiveTableDatabase, 
mockFactTableName, baseLocation, cubeName, cubeDesc,
-                        flatDesc, intermediateTables, intermediatePaths));
-            }
-        }
-
-        protected String getJobWorkingDir(DefaultChainedExecutable jobFlow) {
-            return 
JobBuilderSupport.getJobWorkingDir(config.getHdfsWorkingDirectory(), 
jobFlow.getId());
-        }
-
-        @Override
-        public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
-            jobFlow.addTask(createGCStep(intermediateTables, 
intermediatePaths));
+    public static class KafkaSparkBatchCubingInputSide extends 
BaseBatchCubingInputSide implements ISparkBatchCubingInputSide {
 
+        public KafkaSparkBatchCubingInputSide(CubeSegment seg, 
IJoinedFlatTableDesc flatDesc) {
+            super(seg, flatDesc);
         }
     }
 
-    class KafkaSparkBatchMergeInputSide implements ISparkBatchMergeInputSide {
-
-        private CubeSegment cubeSegment;
+    public static class KafkaSparkBatchMergeInputSide extends 
BaseBatchMergeInputSide implements ISparkBatchMergeInputSide {
 
         KafkaSparkBatchMergeInputSide(CubeSegment cubeSegment) {
-            this.cubeSegment = cubeSegment;
-        }
-
-        @Override
-        public void addStepPhase1_MergeDictionary(DefaultChainedExecutable 
jobFlow) {
-            jobFlow.addTask(createMergeOffsetStep(jobFlow.getId(), 
cubeSegment));
+            super(cubeSegment);
         }
     }
-
 }
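
For readers skimming the diff above, the recurring change is a refactoring pattern: the engine-agnostic build steps are hoisted into one base class (KafkaInputBase.BaseBatchCubingInputSide, JdbcBaseBatchCubingInputSide), and the MapReduce and Spark variants become thin subclasses that only add the marker interface their build engine expects. The following is a minimal, self-contained sketch of that pattern only; all class and method names are illustrative placeholders, not the actual Kylin types.

    // Sketch of the shared-base-class pattern used for the JDBC and Kafka input sides.
    // The two interfaces stand in for Kylin's MR/Spark batch-cubing input-side contracts.
    public class SharedInputSideSketch {

        interface IMRBatchCubingInputSide { }      // stand-in for the MR contract
        interface ISparkBatchCubingInputSide { }   // stand-in for the Spark contract

        // Engine-agnostic step logic lives once, in the base class.
        static class BaseBatchCubingInputSide {
            void addStepPhase1_CreateFlatTable() {
                System.out.println("create flat table (same steps for MR and Spark)");
            }
            void addStepPhase4_Cleanup() {
                System.out.println("clean up intermediate tables and paths");
            }
        }

        // Thin engine-specific subclasses carry no duplicated step logic.
        static class MRBatchCubingInputSide extends BaseBatchCubingInputSide
                implements IMRBatchCubingInputSide { }

        static class SparkBatchCubingInputSide extends BaseBatchCubingInputSide
                implements ISparkBatchCubingInputSide { }

        public static void main(String[] args) {
            new SparkBatchCubingInputSide().addStepPhase1_CreateFlatTable();
        }
    }

The payoff of this layout, visible in KafkaMRInput.java and KafkaSparkInput.java above, is that adding Spark support does not copy the flat-table and cleanup steps a second time.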


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


> JDBC data source does not support Spark cubing
> ----------------------------------------------
>
>                 Key: KYLIN-3710
>                 URL: https://issues.apache.org/jira/browse/KYLIN-3710
>             Project: Kylin
>          Issue Type: Bug
>          Components: RDBMS Source
>            Reporter: Chao Long
>            Assignee: Chao Long
>            Priority: Major
>             Fix For: v2.6.0
>
>
> Caused by: java.lang.RuntimeException: Cannot adapt to interface 
> org.apache.kylin.engine.spark.ISparkInput
>  at 
> org.apache.kylin.source.jdbc.JdbcSource.adaptToBuildEngine(JdbcSource.java:49)
>  at 
> org.apache.kylin.source.SourceManager.createEngineAdapter(SourceManager.java:148)
>  at 
> org.apache.kylin.engine.spark.SparkUtil.getBatchCubingInputSide(SparkUtil.java:57)
>  at 
> org.apache.kylin.engine.spark.SparkBatchCubingJobBuilder2.<init>(SparkBatchCubingJobBuilder2.java:49)
>  at 
> org.apache.kylin.engine.spark.SparkBatchCubingEngine2.createBatchCubingJob(SparkBatchCubingEngine2.java:44)
>  at 
> org.apache.kylin.engine.EngineFactory.createBatchCubingJob(EngineFactory.java:56)
>  at 
> org.apache.kylin.rest.service.JobService.submitJobInternal(JobService.java:245)
>  at org.apache.kylin.rest.service.JobService.submitJob(JobService.java:213)
>  at 
> org.apache.kylin.rest.controller.CubeController.buildInternal(CubeController.java:398)
>  ... 77 more
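
The stack trace above shows adaptToBuildEngine() falling through to its final else branch when the Spark engine asks the JDBC source for an ISparkInput; the patch adds an ISparkInput branch (backed by the new JdbcHiveSparkInput classes). Below is a minimal, self-contained sketch of that dispatch, not the actual Kylin code: the two marker interfaces stand in for org.apache.kylin.engine.mr.IMRInput and org.apache.kylin.engine.spark.ISparkInput, and the input classes are reduced to empty placeholders.

    // Sketch of the post-patch JdbcSource.adaptToBuildEngine dispatch shape.
    public class AdaptToBuildEngineSketch {

        // Local stand-ins for the real Kylin contracts; empty here for illustration only.
        interface IMRInput { }
        interface ISparkInput { }

        static class JdbcHiveMRInput implements IMRInput { }
        static class JdbcHiveSparkInput implements ISparkInput { }

        // One branch per supported build engine; unknown engines still fail fast.
        @SuppressWarnings("unchecked")
        static <I> I adaptToBuildEngine(Class<I> engineInterface) {
            if (engineInterface == IMRInput.class) {
                return (I) new JdbcHiveMRInput();
            } else if (engineInterface == ISparkInput.class) {
                // New branch from KYLIN-3710; without it the call below hits the
                // RuntimeException, matching the stack trace quoted above.
                return (I) new JdbcHiveSparkInput();
            } else {
                throw new RuntimeException("Cannot adapt to " + engineInterface);
            }
        }

        public static void main(String[] args) {
            ISparkInput sparkInput = adaptToBuildEngine(ISparkInput.class);
            System.out.println(sparkInput.getClass().getSimpleName()); // prints JdbcHiveSparkInput
        }
    }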



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
