This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 5731f43fcf350247e76fd7e36b0980d5cf9fc912
Author: XiaoxiangYu <[email protected]>
AuthorDate: Thu May 28 23:26:20 2020 +0800

    KYLIN-4342 Improve code smell
---
 .../org/apache/kylin/common/KylinConfigBase.java   |  55 +++---
 .../kylin/job/constant/ExecutableConstants.java    |  12 +-
 .../bitmap/BitmapIntersectValueAggFunc.java        |  10 +-
 .../kylin/measure/bitmap/BitmapMeasureType.java    |   6 +-
 .../apache/kylin/metadata/model/FunctionDesc.java  |   4 +
 .../kylin/engine/mr/BatchCubingJobBuilder2.java    |  53 +++---
 .../java/org/apache/kylin/engine/mr/IInput.java    |  24 ++-
 .../apache/kylin/engine/mr/JobBuilderSupport.java  |  30 +--
 .../kylin/engine/mr/common/BaseCuboidBuilder.java  |   8 +-
 .../kylin/engine/mr/common/BatchConstants.java     |   8 +-
 .../mr/steps/BuildGlobalHiveDictPartBuildJob.java  |  45 +++--
 .../steps/BuildGlobalHiveDictPartBuildMapper.java  |  13 +-
 ...va => BuildGlobalHiveDictPartBuildReducer.java} |  20 +-
 ...ava => BuildGlobalHiveDictPartPartitioner.java} |  17 +-
 ....java => BuildGlobalHiveDictTotalBuildJob.java} |  17 +-
 .../steps/BuildGlobalHiveDictTotalBuildMapper.java |  93 +++++----
 .../engine/spark/SparkBatchCubingJobBuilder2.java  |  49 +++--
 .../localmeta/cube_desc/ci_inner_join_cube.json    |   3 +-
 kubernetes/README.md                               |  12 +-
 .../kylin/source/hive/CreateMrHiveDictStep.java    |  60 ++----
 .../apache/kylin/source/hive/HiveInputBase.java    | 210 +++++++++++----------
 .../apache/kylin/source/hive/MRHiveDictUtil.java   | 140 +++++++++-----
 22 files changed, 476 insertions(+), 413 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index f7f73ac..3429963 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -14,7 +14,7 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
 package org.apache.kylin.common;
 
@@ -162,7 +162,6 @@ public abstract class KylinConfigBase implements Serializable {
     }
 
     /**
-     *
      * @param propertyKeys the collection of the properties; if null will return all properties
      * @return properties which contained in propertyKeys
      */
@@ -581,21 +580,28 @@ public abstract class KylinConfigBase implements Serializable {
 
 
     // ============================================================================
-    // mr-hive dict
+    // Hive Global Dictionary, built by MR/Hive
     // ============================================================================
     public String[] getMrHiveDictColumns() {
         String columnStr = getMrHiveDictColumnsStr();
-        if (!columnStr.equals("")) {
+        if (!StringUtils.isEmpty(columnStr)) {
             return columnStr.split(",");
         }
         return new String[0];
     }
 
+    /**
+     * @return the hdfs path for Hive Global dictionary table
+     */
+    public String getHiveDatabaseDir() {
+        return this.getOptional("kylin.source.hive.databasedir", "");
+    }
+
     public String[] getMrHiveDictColumnsExcludeRefColumns() {
         String[] excludeRefCols = null;
         String[] hiveDictColumns = getMrHiveDictColumns();
         Map<String, String> refCols = getMrHiveDictRefColumns();
-        if(Objects.nonNull(hiveDictColumns) && hiveDictColumns.length>0) {
+        if (Objects.nonNull(hiveDictColumns) && hiveDictColumns.length > 0) {
             excludeRefCols = Arrays.stream(hiveDictColumns).filter(x -> !refCols.containsKey(x)).toArray(String[]::new);
         }
         return excludeRefCols;
@@ -603,40 +609,40 @@ public abstract class KylinConfigBase implements Serializable {
 
     /**
      * set kylin.dictionary.mr-hive.columns in Cube level config , value are the columns which want to use MR/Hive to build global dict ,
-     * Format, tableAliasName_ColumnName, multiple columns separated by commas,eg KYLIN_SALES_BUYER_ID,KYLIN_SALES_SELLER_ID
-     * @return  if mr-hive dict not enabled, return "";
-     *          else return {TABLE_NAME}_{COLUMN_NAME1},{TABLE_NAME}_{COLUMN_NAME2}"
+     * Format, tableAliasName_ColumnName, multiple columns separated by comma, e.g. KYLIN_SALES_BUYER_ID,KYLIN_SALES_SELLER_ID
+     *
+     * @return if mr-hive dict not enabled, return "";
+     * else return {TABLE_NAME}_{COLUMN_NAME1},{TABLE_NAME}_{COLUMN_NAME2}"
      */
     private String getMrHiveDictColumnsStr() {
         return getOptional("kylin.dictionary.mr-hive.columns", "");
     }
 
     /**
-     * @return  The global dic reduce num per column. Default 2  per column.
+     * @return The global dict reduce num per column. Default is 2 per column.
      */
     public Integer[] getMrHiveDictColumnsReduceNumExcludeRefCols() {
         String[] excludeRefCols = getMrHiveDictColumnsExcludeRefColumns();
 
-        if(Objects.nonNull(excludeRefCols) && excludeRefCols.length>0) {
+        if (Objects.nonNull(excludeRefCols) && excludeRefCols.length > 0) {
             String[] arr = null;
             Map<String, Integer> colNum = new HashMap<>();
             Integer[] reduceNumArr = new Integer[excludeRefCols.length];
             String[] columnReduceNum = getMrHiveDictColumnsReduceNumStr().split(",");
 
-            //change set columnReduceNum to map struct
             try {
-                for(int i=0;i<columnReduceNum.length;i++){
-                    if(!StringUtils.isBlank(columnReduceNum[i])) {
+                for (int i = 0; i < columnReduceNum.length; i++) {
+                    if (!StringUtils.isBlank(columnReduceNum[i])) {
                         arr = columnReduceNum[i].split(":");
                         colNum.put(arr[0], Integer.parseInt(arr[1]));
                     }
                 }
-            }catch (Exception e){
+            } catch (Exception e) {
                 logger.error("set kylin.dictionary.mr-hive.columns.reduce.num 
error {} , the value should like 
colAilasName:reduceNum,colAilasName:reduceNum", 
getMrHiveDictColumnsReduceNumStr());
             }
 
             for (int i = 0; i < excludeRefCols.length; i++) {
-                reduceNumArr[i] = colNum.containsKey(excludeRefCols[i])?colNum.get(excludeRefCols[i]): DEFAULT_MR_HIVE_GLOBAL_DICT_REDUCE_NUM_PER_COLUMN;
+                reduceNumArr[i] = colNum.containsKey(excludeRefCols[i]) ? colNum.get(excludeRefCols[i]) : DEFAULT_MR_HIVE_GLOBAL_DICT_REDUCE_NUM_PER_COLUMN;
             }
 
             Arrays.asList(reduceNumArr).stream().forEach(x -> {
@@ -646,7 +652,7 @@ public abstract class KylinConfigBase implements Serializable {
             });
 
             return reduceNumArr;
-        }else {
+        } else {
             return null;
         }
     }
@@ -654,15 +660,13 @@ public abstract class KylinConfigBase implements Serializable {
     /**
      * Set kylin.dictionary.mr-hive.columns.reduce.num in Cube level config , value are the reduce number for global dict columns which are set in kylin.dictionary.mr-hive.columns.
      * Format, tableAliasName_ColumnName:number, multiple columns separated by commas,eg KYLIN_SALES_BUYER_ID:5,KYLIN_SALES_SELLER_ID:3
-     * @return
      */
     private String getMrHiveDictColumnsReduceNumStr() {
         return getOptional("kylin.dictionary.mr-hive.columns.reduce.num", "");
     }
 
     /**
-     * MR/Hive global domain dic (reuse dict from other cube's MR/Hive global dic column)
-     * @return
+     * MR/Hive global domain dictionary (reuse dict from other cube's MR/Hive global dic column)
      */
     public Map<String, String> getMrHiveDictRefColumns() {
         Map<String, String> result = new HashMap<>();
@@ -670,7 +674,7 @@ public abstract class KylinConfigBase implements Serializable {
         if (!StringUtils.isEmpty(columnStr)) {
             String[] pairs = columnStr.split(",");
             for (String pair : pairs) {
-                String [] infos = pair.split(":");
+                String[] infos = pair.split(":");
                 result.put(infos[0], infos[1]);
             }
         }
@@ -685,8 +689,8 @@ public abstract class KylinConfigBase implements Serializable {
         return getOptional("kylin.dictionary.mr-hive.table.suffix", "_global_dict");
     }
 
-    public String getMrHiveDictIntermediateTTableSuffix() {
-        return getOptional("kylin.dictionary.mr-hive.intermediate.table.suffix", "__group_by");
+    public String getMrHiveDistinctValueTableSuffix() {
+        return getOptional("kylin.dictionary.mr-hive.intermediate.table.suffix", "__distinct_value");
     }
 
     // ============================================================================
@@ -1100,9 +1104,6 @@ public abstract class KylinConfigBase implements Serializable {
         return this.getOptional("kylin.source.hive.database-for-flat-table", DEFAULT);
     }
 
-    public String getHiveDatabaseDir() {
-        return this.getOptional("kylin.source.hive.databasedir", "");
-    }
 
     public String getFlatTableStorageFormat() {
         return this.getOptional("kylin.source.hive.flat-table-storage-format", 
"SEQUENCEFILE");
@@ -2326,7 +2327,7 @@ public abstract class KylinConfigBase implements 
Serializable {
         return getPropertiesByPrefix("kylin.metrics.");
     }
 
-    public int printSampleEventRatio(){
+    public int printSampleEventRatio() {
         String val = getOptional("kylin.metrics.kafka-sample-ratio", "10000");
         return Integer.parseInt(val);
     }
@@ -2558,7 +2559,7 @@ public abstract class KylinConfigBase implements Serializable {
         return (getOptional("kylin.stream.event.timezone", ""));
     }
 
-    public boolean isAutoResubmitDiscardJob(){
+    public boolean isAutoResubmitDiscardJob() {
         return Boolean.parseBoolean(getOptional("kylin.stream.auto-resubmit-after-discard-enabled", "true"));
     }
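
For context on the configuration formats documented above, here is a minimal, hypothetical sketch (not part of this patch; the class and method names are invented for illustration) of how a value such as "KYLIN_SALES_BUYER_ID:5,KYLIN_SALES_SELLER_ID:3" for kylin.dictionary.mr-hive.columns.reduce.num can be parsed, falling back to the documented default of 2 reducers for any column left unlisted:

    import java.util.HashMap;
    import java.util.Map;

    public class ReduceNumParseSketch {
        // Documented default: 2 reducers per global dictionary column.
        private static final int DEFAULT_REDUCE_NUM_PER_COLUMN = 2;

        // Parse "COL:NUM,COL:NUM" into a column -> reducer-count map.
        static Map<String, Integer> parse(String conf) {
            Map<String, Integer> colNum = new HashMap<>();
            for (String pair : conf.split(",")) {
                if (!pair.trim().isEmpty()) {
                    String[] arr = pair.split(":");
                    colNum.put(arr[0], Integer.parseInt(arr[1]));
                }
            }
            return colNum;
        }

        // Look up a column's reducer count, falling back to the default.
        static int reduceNumFor(Map<String, Integer> colNum, String col) {
            return colNum.getOrDefault(col, DEFAULT_REDUCE_NUM_PER_COLUMN);
        }
    }

A malformed entry (for example a missing ":NUM" part) would throw here; the patched getMrHiveDictColumnsReduceNumExcludeRefCols() wraps the same parsing in a try/catch and logs the expected format instead.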
 
diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index 8d4de09..576f4bf 100644
--- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -84,12 +84,12 @@ public final class ExecutableConstants {
     public static final String STEP_NAME_STREAMING_BUILD_BASE_CUBOID = "Build Base Cuboid Data For Streaming Job";
     public static final String STEP_NAME_STREAMING_SAVE_DICTS = "Save Cube Dictionaries";
 
-    // MR - Hive Dict
-    public static final String STEP_NAME_GLOBAL_DICT_MRHIVE_EXTRACT_DICTVAL = "Build Global Dict - extract distinct value from data";
-    public static final String STEP_NAME_GLOBAL_DICT_PART_BUILD_DICTVAL = "Build Global Dict - parallel part build";
-    public static final String STEP_NAME_GLOBAL_DICT_TOTAL_BUILD_DICTVAL = "Build Global Dict - parallel total build";
-    public static final String STEP_NAME_GLOBAL_DICT_MRHIVE_BUILD_DICTVAL = "Build Global Dict - merge to dict table";
-    public static final String STEP_NAME_GLOBAL_DICT_MRHIVE_REPLACE_DICTVAL = "Build Global Dict - replace intermediate table";
+    // Hive Global Dictionary built by MR
+    public static final String STEP_NAME_GLOBAL_DICT_MRHIVE_EXTRACT_DICTVAL = "Build Hive Global Dict - extract distinct value";
+    public static final String STEP_NAME_GLOBAL_DICT_PART_BUILD_DICTVAL = "Build Hive Global Dict - parallel part build";
+    public static final String STEP_NAME_GLOBAL_DICT_TOTAL_BUILD_DICTVAL = "Build Hive Global Dict - parallel total build";
+    public static final String STEP_NAME_GLOBAL_DICT_MRHIVE_BUILD_DICTVAL = "Build Hive Global Dict - merge to dict table";
+    public static final String STEP_NAME_GLOBAL_DICT_MRHIVE_REPLACE_DICTVAL = "Build Hive Global Dict - replace intermediate table";
 
     public static final String FLINK_SPECIFIC_CONFIG_NAME_MERGE_DICTIONARY = "mergedict";
 }
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectValueAggFunc.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectValueAggFunc.java
index 7ec21b5..2ab4313 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectValueAggFunc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectValueAggFunc.java
@@ -14,7 +14,7 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 package org.apache.kylin.measure.bitmap;
 
 import java.util.List;
@@ -22,10 +22,7 @@ import java.util.List;
 import org.apache.kylin.measure.ParamAsMeasureCount;
 
 /**
- * BitmapIntersectDistinctCountAggFunc is an UDAF used for calculating the intersection of two or more bitmaps
- * Usage:   intersect_count(columnToCount, columnToFilter, filterList)
- * Example: intersect_count(uuid, event, array['A', 'B', 'C']), meaning find the count of uuid in all A/B/C 3 bitmaps
- *          requires an bitmap count distinct measure of uuid, and an dimension of event
+ *
  */
 public class BitmapIntersectValueAggFunc implements ParamAsMeasureCount {
 
@@ -50,5 +47,4 @@ public class BitmapIntersectValueAggFunc implements ParamAsMeasureCount {
     public static String result(RetentionPartialResult result) {
         return result.valueResult();
     }
-}
-
+}
\ No newline at end of file
diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java
index 70a64ea..9d95584 100644
--- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java
+++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java
@@ -112,8 +112,8 @@ public class BitmapMeasureType extends MeasureType<BitmapCounter> {
                 int id;
                 TblColRef literalCol = measureDesc.getFunction().getParameter().getColRefs().get(0);
                 if (needDictionaryColumn(measureDesc.getFunction()) && dictionaryMap.containsKey(literalCol)) {
-                        Dictionary<String> dictionary = dictionaryMap.get(literalCol);
-                        id = dictionary.getIdFromValue(values[0]);
+                    Dictionary<String> dictionary = dictionaryMap.get(literalCol);
+                    id = dictionary.getIdFromValue(values[0]);
                 } else {
                     id = Integer.parseInt(values[0]);
                 }
@@ -153,6 +153,8 @@ public class BitmapMeasureType extends MeasureType<BitmapCounter> {
     private boolean needDictionaryColumn(FunctionDesc functionDesc) {
         DataType dataType = functionDesc.getParameter().getColRefs().get(0).getType();
         if (functionDesc.isMrDict()) {
+            // If isMrDict is true, the related column has already been
+            // encoded by the Hive Global Dictionary in a previous step
             return false;
         }
         if (dataType.isIntegerFamily() && !dataType.isBigInt()) {
diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
index 93b4064..c4d002b 100644
--- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
+++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java
@@ -88,6 +88,10 @@ public class FunctionDesc implements Serializable {
     private DataType returnDataType;
     private MeasureType<?> measureType;
     private boolean isDimensionAsMetric = false;
+
+    /**
+     * Whether this COUNT_DISTINCT measure uses the Hive Global Dictionary
+     */
     private boolean isMrDict = false;
 
     public boolean isMrDict() {
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
index 8ec7d36..47f709d 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
@@ -6,20 +6,21 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
 package org.apache.kylin.engine.mr;
 
 import java.util.List;
 import java.util.Objects;
+
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.CuboidUtil;
@@ -59,24 +60,8 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
         // Phase 1: Create Flat Table & Materialize Hive View in Lookup Tables
         inputSide.addStepPhase1_CreateFlatTable(result);
 
-        // build global dict
-        KylinConfig dictConfig = seg.getConfig();
-        String[] mrHiveDictColumns = dictConfig.getMrHiveDictColumnsExcludeRefColumns();
-
-        if (Objects.nonNull(mrHiveDictColumns) && mrHiveDictColumns.length > 0
-                && !"".equals(mrHiveDictColumns[0])) {
-
-            //parallel part build
-            result.addTask(createBuildGlobalHiveDictPartBuildJob(jobId));
-
-            //parallel total build
-            result.addTask(createBuildGlobalHiveDicTotalBuildJob(jobId));
-        }
-
-        //merge global dic and replace flat table
-        if(Objects.nonNull(dictConfig.getMrHiveDictColumns()) && dictConfig.getMrHiveDictColumns().length > 0 && !"".equals(dictConfig.getMrHiveDictColumns()[0])){
-            inputSide.addStepPhase_ReplaceFlatTableGlobalColumnValue(result);
-        }
+        // Build the global dictionary in a distributed way
+        buildHiveGlobalDictionaryByMR(result, jobId);
 
         // Phase 2: Build Dictionary
         result.addTask(createFactDistinctColumnsStep(jobId));
@@ -106,7 +91,7 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
         result.addTask(createUpdateCubeInfoAfterBuildStep(jobId, lookupMaterializeContext));
         inputSide.addStepPhase4_Cleanup(result);
         outputSide.addStepPhase4_Cleanup(result);
-        
+
         // Set the task priority if specified
         result.setPriorityBasedOnPriorityOffset(priorityOffset);
         result.getTasks().forEach(task -> task.setPriorityBasedOnPriorityOffset(priorityOffset));
@@ -216,6 +201,30 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
         return ndCuboidStep;
     }
 
+    /**
+     * Build the Hive global dictionary by MR and encode the corresponding flat table columns into integers
+     */
+    protected void buildHiveGlobalDictionaryByMR(final CubingJob result, String jobId) {
+        KylinConfig dictConfig = seg.getConfig();
+        String[] mrHiveDictColumnExcludeRef = dictConfig.getMrHiveDictColumnsExcludeRefColumns();
+        String[] mrHiveDictColumns = dictConfig.getMrHiveDictColumns();
+
+        if (Objects.nonNull(mrHiveDictColumnExcludeRef) && mrHiveDictColumnExcludeRef.length > 0
+                && !"".equals(mrHiveDictColumnExcludeRef[0])) {
+
+            // 1. parallel part build
+            result.addTask(createBuildGlobalHiveDictPartBuildJob(jobId));
+
+            // 2. parallel total build
+            result.addTask(createBuildGlobalHiveDictTotalBuildJob(jobId));
+        }
+
+        // Merge new dictionary entries into the global dictionary and replace/encode the flat table
+        if (Objects.nonNull(mrHiveDictColumns) && mrHiveDictColumns.length > 0 && !"".equals(mrHiveDictColumns[0])) {
+            inputSide.addStepPhase_ReplaceFlatTableGlobalColumnValue(result);
+        }
+    }
+
     protected Class<? extends AbstractHadoopJob> getNDCuboidJob() {
         return NDCuboidJob.class;
     }
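
To recap what the extracted buildHiveGlobalDictionaryByMR() above does end to end: when a cube-level override such as (value illustrative)

    kylin.dictionary.mr-hive.columns=KYLIN_SALES_BUYER_ID,KYLIN_SALES_SELLER_ID

is present, the two MR steps (parallel part build, then parallel total build) are added for every configured column that is not a reused "ref" column, and the replace/encode step from the input side runs for all configured columns. Ref columns reuse another cube's dictionary, which is why getMrHiveDictColumnsExcludeRefColumns() filters them out of the two build steps.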
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IInput.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IInput.java
index 9fdb300..2775cb7 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IInput.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IInput.java
@@ -24,26 +24,38 @@ import org.apache.kylin.metadata.model.ISegment;
 
 public interface IInput {
 
-    /** Return a helper to participate in batch cubing job flow. */
+    /**
+     * Return a helper to participate in batch cubing job flow.
+     */
     public IBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc);
 
-    /** Return a helper to participate in batch cubing merge job flow. */
+    /**
+     * Return a helper to participate in batch cubing merge job flow.
+     */
     public IBatchMergeInputSide getBatchMergeInputSide(ISegment seg);
 
     public interface IBatchCubingInputSide {
-        /** Add step that creates an intermediate flat table as defined by CubeJoinedFlatTableDesc */
+        /**
+         * Add step that creates an intermediate flat table as defined by CubeJoinedFlatTableDesc
+         */
         public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow);
 
-        /** Add step that replace flat table global column value by global dic*/
+        /**
+         * An optional step that replaces/encodes flat table columns with the Hive Global Dictionary
+         */
         public void addStepPhase_ReplaceFlatTableGlobalColumnValue(DefaultChainedExecutable jobFlow);
 
-        /** Add step that does necessary clean up, like delete the intermediate flat table */
+        /**
+         * Add step that does necessary clean up, like delete the intermediate flat table
+         */
         public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow);
     }
 
     public interface IBatchMergeInputSide {
 
-        /** Add step that executes before merge dictionary and before merge cube. */
+        /**
+         * Add step that executes before merge dictionary and before merge cube.
+         */
         public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow);
 
     }
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index a597279..479db86 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -6,15 +6,15 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
 package org.apache.kylin.engine.mr;
 
@@ -37,7 +37,7 @@ import org.apache.kylin.engine.EngineFactory;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
 import org.apache.kylin.engine.mr.common.MapReduceExecutable;
-import org.apache.kylin.engine.mr.steps.BuildGlobalHiveDicTotalBuildJob;
+import org.apache.kylin.engine.mr.steps.BuildGlobalHiveDictTotalBuildJob;
 import org.apache.kylin.engine.mr.steps.BuildGlobalHiveDictPartBuildJob;
 import org.apache.kylin.engine.mr.steps.CalculateStatsFromBaseCuboidJob;
 import org.apache.kylin.engine.mr.steps.CreateDictionaryJob;
@@ -155,7 +155,7 @@ public class JobBuilderSupport {
     }
 
     public MapReduceExecutable createCalculateStatsFromBaseCuboid(String inputPath, String outputPath,
-            CuboidModeEnum cuboidMode) {
+                                                                  CuboidModeEnum cuboidMode) {
         MapReduceExecutable result = new MapReduceExecutable();
         result.setName(ExecutableConstants.STEP_NAME_CALCULATE_STATS_FROM_BASE_CUBOID);
         result.setMapReduceJobClass(CalculateStatsFromBaseCuboidJob.class);
@@ -224,10 +224,10 @@ public class JobBuilderSupport {
         return result;
     }
 
-    public MapReduceExecutable createBuildGlobalHiveDicTotalBuildJob(String jobId) {
+    public MapReduceExecutable createBuildGlobalHiveDictTotalBuildJob(String jobId) {
         MapReduceExecutable result = new MapReduceExecutable();
         result.setName(ExecutableConstants.STEP_NAME_GLOBAL_DICT_TOTAL_BUILD_DICTVAL);
-        result.setMapReduceJobClass(BuildGlobalHiveDicTotalBuildJob.class);
+        result.setMapReduceJobClass(BuildGlobalHiveDictTotalBuildJob.class);
         StringBuilder cmd = new StringBuilder();
         appendMapReduceParameters(cmd);
         appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
@@ -384,26 +384,26 @@ public class JobBuilderSupport {
     }
 
     public String getBuildGlobalHiveDicTotalBuildJobInputPath(String jobId) {
-        return getBuildGlobalDictionaryBasePath(jobId)+"/part_sort";
+        return getBuildGlobalDictionaryBasePath(jobId) + "/part_sort";
     }
 
     public String getBuildGlobalDictionaryMaxDistinctCountPath(String jobId) {
         KylinConfig conf = seg.getConfig();
         String dbDir = conf.getHiveDatabaseDir();
         IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(seg);
-        String tableName = flatDesc.getTableName()+conf.getMrHiveDictIntermediateTTableSuffix();
-        String outPut = dbDir+"/"+tableName+"/dict_column="+BatchConstants.CFG_GLOBAL_DICT_STATS_PARTITION_VALUE;
+        String tableName = flatDesc.getTableName() + conf.getMrHiveDistinctValueTableSuffix();
+        String outPut = dbDir + "/" + tableName + "/dict_column=" + BatchConstants.CFG_GLOBAL_DICT_STATS_PARTITION_VALUE;
         return outPut;
     }
 
     public String getBuildGlobalDictionaryPartReduceStatsPathV2(String jobId) {
-        return getBuildGlobalDictionaryBasePath(jobId)+ "/reduce_stats";
+        return getBuildGlobalDictionaryBasePath(jobId) + "/reduce_stats";
     }
 
-    public String getBuildGlobalDictionaryTotalOutput(KylinConfig config){
+    public String getBuildGlobalDictionaryTotalOutput(KylinConfig config) {
         String dbDir = config.getHiveDatabaseDir();
-        String tableName = EngineFactory.getJoinedFlatTableDesc(seg).getTableName()+config.getMrHiveDictTableSuffix();
-        String path = dbDir+"/"+tableName;
+        String tableName = EngineFactory.getJoinedFlatTableDesc(seg).getTableName() + config.getMrHiveDictTableSuffix();
+        String path = dbDir + "/" + tableName;
         return path;
     }
 
@@ -509,7 +509,7 @@ public class JobBuilderSupport {
         List<FileStatus> outputs = Lists.newArrayList();
         scanFiles(input, fs, outputs);
         long size = 0L;
-        for (FileStatus stat: outputs) {
+        for (FileStatus stat : outputs) {
             size += stat.getLen();
         }
         return size;
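
A worked example of the path helpers above, under assumed values (both the database dir and the flat table name are illustrative): with kylin.source.hive.databasedir set to /user/hive/warehouse/kylin.db and a flat table named kylin_intermediate_kylin_sales_cube_mr, getBuildGlobalDictionaryTotalOutput() resolves to

    /user/hive/warehouse/kylin.db/kylin_intermediate_kylin_sales_cube_mr_global_dict

using the default "_global_dict" table suffix, while getBuildGlobalDictionaryMaxDistinctCountPath() uses the renamed distinct-value suffix ("__distinct_value" by default) and appends /dict_column=KYLIN_MAX_DISTINCT_COUNT, the stats partition named by CFG_GLOBAL_DICT_STATS_PARTITION_VALUE.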
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java
index aa377ed..baf415f 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java
@@ -73,7 +73,7 @@ public class BaseCuboidBuilder implements java.io.Serializable {
         measureCodec = new BufferedMeasureCodec(measureDescList);
 
         kvBuilder = new KeyValueBuilder(intermediateTableDesc);
-        checkMrDictClolumn();
+        checkHiveGlobalDictionaryColumn();
     }
 
     public BaseCuboidBuilder(KylinConfig kylinConfig, CubeDesc cubeDesc, CubeSegment cubeSegment,
@@ -92,7 +92,7 @@ public class BaseCuboidBuilder implements java.io.Serializable {
         measureCodec = new BufferedMeasureCodec(measureDescList);
 
         kvBuilder = new KeyValueBuilder(intermediateTableDesc);
-        checkMrDictClolumn();
+        checkHiveGlobalDictionaryColumn();
     }
 
     public byte[] buildKey(String[] flatRow) {
@@ -121,7 +121,7 @@ public class BaseCuboidBuilder implements java.io.Serializable {
         }
     }
 
-    private void checkMrDictClolumn(){
+    private void checkHiveGlobalDictionaryColumn(){
         Set<String> mrDictColumnSet = new HashSet<>();
         if (kylinConfig.getMrHiveDictColumns() != null) {
             Collections.addAll(mrDictColumnSet, kylinConfig.getMrHiveDictColumns());
@@ -133,7 +133,7 @@ public class BaseCuboidBuilder implements java.io.Serializable {
                 TblColRef colRef = functionDesc.getParameter().getColRefs().get(0);
                 if (mrDictColumnSet.contains(JoinedFlatTable.colName(colRef, true))) {
                     functionDesc.setMrDict(true);
-                    logger.info("setMrDict for {}", colRef);
+                    logger.info("Enable hive global dictionary for {}", 
colRef);
                     measure.setFunction(functionDesc);
                 }
             }
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
index 6031f3c..f8ab007 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
@@ -6,15 +6,15 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
 package org.apache.kylin.engine.mr.common;
 
@@ -72,7 +72,7 @@ public interface BatchConstants {
     String CFG_MR_SPARK_JOB = "mr.spark.job";
     String CFG_SPARK_META_URL = "spark.meta.url";
     String CFG_GLOBAL_DICT_BASE_DIR = "global.dict.base.dir";
-    String CFG_GLOBAL_DICT_STATS_PARTITION_VALUE="KYLIN_MAX_DISTINCT_COUNT";
+    String CFG_GLOBAL_DICT_STATS_PARTITION_VALUE = "KYLIN_MAX_DISTINCT_COUNT";
 
 
     String CFG_HLL_REDUCER_NUM = "cuboidHLLCounterReducerNum";
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictPartBuildJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictPartBuildJob.java
index 07b0824..c51cd11 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictPartBuildJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictPartBuildJob.java
@@ -19,6 +19,7 @@
 package org.apache.kylin.engine.mr.steps;
 
 import java.io.IOException;
+
 import org.apache.commons.cli.Options;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
@@ -47,7 +48,7 @@ public class BuildGlobalHiveDictPartBuildJob extends AbstractHadoopJob {
     @Override
     public int run(String[] args) throws Exception {
         Options options = new Options();
-        String[] dicColsArr=null;
+        String[] dicColsArr = null;
 
         try {
             options.addOption(OPTION_JOB_NAME);
@@ -78,7 +79,7 @@ public class BuildGlobalHiveDictPartBuildJob extends AbstractHadoopJob {
             setJobClasspath(job, cube.getConfig());
 
             //FileInputFormat.setInputPaths(job, input);
-            setInputput(job, dicColsArr, getInputPath(config, segment));
+            setInput(job, dicColsArr, getInputPath(config, segment));
 
             // make each reducer output to respective dir
             setOutput(job, dicColsArr, getOptionValue(OPTION_OUTPUT_PATH));
@@ -94,15 +95,14 @@ public class BuildGlobalHiveDictPartBuildJob extends AbstractHadoopJob {
             job.setOutputValueClass(Text.class);
 
             job.setMapperClass(BuildGlobalHiveDictPartBuildMapper.class);
-
-            job.setPartitionerClass(BuildGlobalHiveDicPartPartitioner.class);
-            job.setReducerClass(BuildGlobalHiveDicPartBuildReducer.class);
+            job.setPartitionerClass(BuildGlobalHiveDictPartPartitioner.class);
+            job.setReducerClass(BuildGlobalHiveDictPartBuildReducer.class);
 
             // prevent to create zero-sized default output
             LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
 
-            //delete output
-            Path baseOutputPath =new Path(getOptionValue(OPTION_OUTPUT_PATH));
+            // delete output
+            Path baseOutputPath = new Path(getOptionValue(OPTION_OUTPUT_PATH));
             deletePath(job.getConfiguration(), baseOutputPath);
 
             attachSegmentMetadataWithDict(segment, job.getConfiguration());
@@ -113,44 +113,41 @@ public class BuildGlobalHiveDictPartBuildJob extends AbstractHadoopJob {
         }
     }
 
-    private void setOutput(Job job, String[] dicColsArry, String outputBase){
+    private void setOutput(Job job, String[] dicColsArr, String outputBase) {
         // make each reducer output to respective dir
-        //eg: /user/kylin/tmp/kylin/globaldic_test/kylin-188c9f9d_dabb_944e_9f20_99dc95be66e6/kylin_sales_cube_mr/dict_column=KYLIN_SALES_SELLER_ID/part_sort
-        for(int i=0;i<dicColsArry.length;i++){
+        // eg: /user/kylin/tmp/kylin/globaldic_test/kylin-188c9f9d_dabb_944e_9f20_99dc95be66e6/kylin_sales_cube_mr/dict_column=KYLIN_SALES_SELLER_ID/part_sort
+        for (int i = 0; i < dicColsArr.length; i++) {
             MultipleOutputs.addNamedOutput(job, i + "", TextOutputFormat.class, LongWritable.class, Text.class);
         }
-        Path outputPath=new Path(outputBase);
+        Path outputPath = new Path(outputBase);
         FileOutputFormat.setOutputPath(job, outputPath);
     }
 
-    private void setInputput(Job job, String[] dicColsArray, String inputBase) throws IOException {
-        StringBuffer paths=new StringBuffer();
+    private void setInput(Job job, String[] dicColsArray, String inputBase) throws IOException {
+        StringBuffer paths = new StringBuffer();
         // make each reducer output to respective dir
-        for(String col:dicColsArray){
+        for (String col : dicColsArray) {
             paths.append(inputBase).append("/dict_column=").append(col).append(",");
         }
-
         paths.delete(paths.length() - 1, paths.length());
         FileInputFormat.setInputPaths(job, paths.toString());
-
     }
 
-    private void setReduceNum(Job job, KylinConfig config){
+    private void setReduceNum(Job job, KylinConfig config) {
         Integer[] reduceNumArr = config.getMrHiveDictColumnsReduceNumExcludeRefCols();
         int totalReduceNum = 0;
-        for(Integer num:reduceNumArr){
-            totalReduceNum +=num;
+        for (Integer num : reduceNumArr) {
+            totalReduceNum += num;
         }
         logger.info("BuildGlobalHiveDictPartBuildJob total reduce num is {}", totalReduceNum);
         job.setNumReduceTasks(totalReduceNum);
     }
 
-    private String getInputPath(KylinConfig config, CubeSegment segment){
+    private String getInputPath(KylinConfig config, CubeSegment segment) {
         String dbDir = config.getHiveDatabaseDir();
-        String tableName = EngineFactory.getJoinedFlatTableDesc(segment).getTableName()+config.getMrHiveDictIntermediateTTableSuffix();
-        String input = dbDir+"/"+tableName;
-        logger.info("part build base input path:"+input);
+        String tableName = EngineFactory.getJoinedFlatTableDesc(segment).getTableName() + config.getMrHiveDistinctValueTableSuffix();
+        String input = dbDir + "/" + tableName;
+        logger.info("part build base input path:" + input);
         return input;
     }
-
 }
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictPartBuildMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictPartBuildMapper.java
index 54708f3..76c73f9 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictPartBuildMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictPartBuildMapper.java
@@ -20,6 +20,7 @@ package org.apache.kylin.engine.mr.steps;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
+
 import org.apache.hadoop.io.NullWritable;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
@@ -62,12 +63,12 @@ public class BuildGlobalHiveDictPartBuildMapper<KEYIN, Object> extends KylinMapp
         String colName = name.split("=")[1];
         logger.info("this map build col name :{}", colName);
 
-        for(int i=0;i<dicCols.length;i++){
-            if(dicCols[i].equalsIgnoreCase(colName)){
-                colIndex=i;
+        for (int i = 0; i < dicCols.length; i++) {
+            if (dicCols[i].equalsIgnoreCase(colName)) {
+                colIndex = i;
             }
         }
-        if(colIndex<0 || colIndex>127){
+        if (colIndex < 0 || colIndex > 127) {
             logger.error("kylin.dictionary.mr-hive.columns colIndex :{} error 
", colIndex);
             logger.error("kylin.dictionary.mr-hive.columns set error,mr-hive 
columns's count should less than 128");
         }
@@ -77,7 +78,7 @@ public class BuildGlobalHiveDictPartBuildMapper<KEYIN, Object> extends KylinMapp
 
     @Override
     public void doMap(KEYIN key, Object record, Context context) throws 
IOException, InterruptedException {
-        count ++;
+        count++;
         writeFieldValue(context, key.toString());
     }
 
@@ -94,7 +95,7 @@ public class BuildGlobalHiveDictPartBuildMapper<KEYIN, Object> extends KylinMapp
         tmpbuf.put(valueBytes);
         outputKey.set(tmpbuf.array(), 0, tmpbuf.position());
         context.write(outputKey, NullWritable.get());
-        if(count<10){
+        if (count < 10) {
             logger.info("colIndex:{},input key:{}", colIndex, value);
         }
     }
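
The colIndex range check in setup() above exists because the mapper packs the dictionary column index into the first byte of every key it emits. A minimal sketch of that key layout (the class name is invented for illustration; the real mapper reuses a preallocated ByteBuffer):

    import java.nio.ByteBuffer;
    import java.nio.charset.StandardCharsets;

    final class DictKeyLayoutSketch {
        // First byte: dictionary column index (0..127, a signed byte);
        // remaining bytes: the raw distinct value.
        static byte[] encode(int colIndex, String value) {
            byte[] valueBytes = value.getBytes(StandardCharsets.UTF_8);
            ByteBuffer buf = ByteBuffer.allocate(1 + valueBytes.length);
            buf.put((byte) colIndex); // must stay within 0..127, hence the setup() check
            buf.put(valueBytes);
            return buf.array();
        }
    }

The partitioner and reducer below both read that same first byte (key.getBytes()[0]) to recover the column a value belongs to.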
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDicPartBuildReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictPartBuildReducer.java
similarity index 86%
rename from engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDicPartBuildReducer.java
rename to engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictPartBuildReducer.java
index 8cdd8f1..54cd4b9 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDicPartBuildReducer.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictPartBuildReducer.java
@@ -14,11 +14,12 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
 package org.apache.kylin.engine.mr.steps;
 
 import java.io.IOException;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
@@ -31,11 +32,11 @@ import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class BuildGlobalHiveDicPartBuildReducer extends KylinReducer<Text, LongWritable, LongWritable, Text> {
+public class BuildGlobalHiveDictPartBuildReducer extends KylinReducer<Text, LongWritable, LongWritable, Text> {
 
-    private static final Logger logger = LoggerFactory.getLogger(BuildGlobalHiveDicPartBuildReducer.class);
+    private static final Logger logger = LoggerFactory.getLogger(BuildGlobalHiveDictPartBuildReducer.class);
 
-    private Long count=0L;
+    private Long count = 0L;
     private MultipleOutputs mos;
     private String[] dicCols;
     private String colName;
@@ -58,16 +59,16 @@ public class BuildGlobalHiveDicPartBuildReducer extends KylinReducer<Text, LongW
             throws IOException, InterruptedException {
         count++;
         byte[] keyBytes = Bytes.copy(key.getBytes(), 1, key.getLength() - 1);
-        if(count==1){
-            colIndex = key.getBytes()[0];//col index
+        if (count == 1) {
+            colIndex = key.getBytes()[0];
             colName = dicCols[colIndex];
         }
 
-        if(count<10){
+        if (count < 10) {
             logger.info("key:{}, temp dict num :{}, colIndex:{}, colName:{}", 
key.toString(), count, colIndex, colName);
         }
 
-        mos.write(colIndex+"", new LongWritable(count), new Text(keyBytes), "part_sort/"+colIndex);
+        mos.write(colIndex + "", new LongWritable(count), new Text(keyBytes), "part_sort/" + colIndex);
     }
 
     @Override
@@ -77,7 +78,6 @@ public class BuildGlobalHiveDicPartBuildReducer extends KylinReducer<Text, LongW
         String partition = conf.get(MRJobConfig.TASK_PARTITION);
         mos.write(colIndex + "", new LongWritable(count), new Text(partition), "reduce_stats/" + colIndex);
         mos.close();
-        logger.info("Reduce partition num {} finish, this reduce done item 
count is {}" , partition, count);
+        logger.info("Reduce partition num {} finish, this reduce done item 
count is {}", partition, count);
     }
-
 }
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDicPartPartitioner.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictPartPartitioner.java
similarity index 79%
rename from engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDicPartPartitioner.java
rename to engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictPartPartitioner.java
index 97ad4f4..8858e20 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDicPartPartitioner.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictPartPartitioner.java
@@ -19,6 +19,7 @@
 package org.apache.kylin.engine.mr.steps;
 
 import java.io.IOException;
+
 import org.apache.hadoop.conf.Configurable;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.io.NullWritable;
@@ -28,7 +29,7 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Bytes;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 
-public class BuildGlobalHiveDicPartPartitioner extends Partitioner<Text, NullWritable> implements Configurable {
+public class BuildGlobalHiveDictPartPartitioner extends Partitioner<Text, NullWritable> implements Configurable {
     private Configuration conf;
 
     private Integer[] reduceNumArr;
@@ -49,21 +50,19 @@ public class BuildGlobalHiveDicPartPartitioner extends Partitioner<Text, NullWri
 
     @Override
     public int getPartition(Text key, NullWritable value, int numReduceTasks) {
-        //get first byte, the first byte value is the dic col index ,start from 0
+        // get first byte, the first byte value is the dic col index ,start from 0
         int colIndex = key.getBytes()[0];
         int colReduceNum = reduceNumArr[colIndex];
 
         int colReduceNumOffset = 0;
-        for (int i=0;i<colIndex;i++){
-            colReduceNumOffset += reduceNumArr[i] ;
+        for (int i = 0; i < colIndex; i++) {
+            colReduceNumOffset += reduceNumArr[i];
         }
 
-        //Calculate reduce number , reduce num = (value.hash % colReduceNum) + colReduceNumOffset
+        // Calculate reduce number , reduce num = (value.hash % colReduceNum) + colReduceNumOffset
         byte[] keyBytes = Bytes.copy(key.getBytes(), 1, key.getLength() - 1);
-        int hashCode = new Text(keyBytes).hashCode() &  0x7FFFFFFF ;
-        int reduceNo = hashCode % colReduceNum + colReduceNumOffset;
-
-        return reduceNo;
+        int hashCode = new Text(keyBytes).hashCode() & 0x7FFFFFFF;
+        return hashCode % colReduceNum + colReduceNumOffset;
     }
 
     @Override
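
A worked example of the partition arithmetic above, with assumed reduce counts: given reduceNumArr = {2, 3} (two reducers for column 0, three for column 1), a key whose first byte is 1 gets colReduceNumOffset = 2 and lands on reducer (hash & 0x7FFFFFFF) % 3 + 2, i.e. one of reducers 2 through 4. Each dictionary column therefore owns a contiguous, non-overlapping block of reducers, which lets the part-build reducers number values per column independently.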
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDicTotalBuildJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictTotalBuildJob.java
similarity index 92%
rename from engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDicTotalBuildJob.java
rename to engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictTotalBuildJob.java
index acdbb07..20bdfc7 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDicTotalBuildJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictTotalBuildJob.java
@@ -19,6 +19,7 @@
 package org.apache.kylin.engine.mr.steps;
 
 import java.io.IOException;
+
 import org.apache.commons.cli.Options;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
@@ -40,8 +41,8 @@ import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class BuildGlobalHiveDicTotalBuildJob extends AbstractHadoopJob {
-    protected static final Logger logger = LoggerFactory.getLogger(BuildGlobalHiveDicTotalBuildJob.class);
+public class BuildGlobalHiveDictTotalBuildJob extends AbstractHadoopJob {
+    protected static final Logger logger = LoggerFactory.getLogger(BuildGlobalHiveDictTotalBuildJob.class);
 
     @Override
     public int run(String[] args) throws Exception {
@@ -77,7 +78,7 @@ public class BuildGlobalHiveDicTotalBuildJob extends AbstractHadoopJob {
             job.getConfiguration().set("last.max.dic.value.path", getOptionValue(OPTION_GLOBAL_DIC_MAX_DISTINCT_COUNT));
             job.getConfiguration().setBoolean("mapreduce.output.fileoutputformat.compress", false);
 
-            job.setJarByClass(BuildGlobalHiveDicTotalBuildJob.class);
+            job.setJarByClass(BuildGlobalHiveDictTotalBuildJob.class);
 
             setJobClasspath(job, cube.getConfig());
 
@@ -95,8 +96,8 @@ public class BuildGlobalHiveDicTotalBuildJob extends AbstractHadoopJob {
             // prevent to create zero-sized default output
             LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
 
-            //delete output
-            Path baseOutputPath =new Path(getOptionValue(OPTION_OUTPUT_PATH));
+            // delete output
+            Path baseOutputPath = new Path(getOptionValue(OPTION_OUTPUT_PATH));
             deletePath(job.getConfiguration(), baseOutputPath);
 
             attachSegmentMetadataWithDict(segment, job.getConfiguration());
@@ -107,10 +108,10 @@ public class BuildGlobalHiveDicTotalBuildJob extends AbstractHadoopJob {
         }
     }
 
-    private void setOutput(Job job, String[] dicColsArry, String outputBase){
+    private void setOutput(Job job, String[] dicColsArr, String outputBase) {
         // make each reducer output to respective dir
         ///user/prod_kylin/tmp/kylin2/globaldic_test/kylin-188c9f9d_dabb_944e_9f20_99dc95be66e6/bs_order_scene_day_new_cube_clone/dict_column=DM_ES_REPORT_ORDER_VIEW0420_DRIVER_ID/part_sort
-        for(int i=0;i<dicColsArry.length;i++){
+        for (int i = 0; i < dicColsArr.length; i++) {
             MultipleOutputs.addNamedOutput(job, i + "", TextOutputFormat.class, Text.class, LongWritable.class);
         }
         Path outputPath = new Path(outputBase);
@@ -120,7 +121,7 @@ public class BuildGlobalHiveDicTotalBuildJob extends AbstractHadoopJob {
     private void setInput(Job job, String input) throws IOException {
         Path path = new Path(input);
         FileSystem fs = path.getFileSystem(job.getConfiguration());
-        if(!fs.exists(path)){
+        if (!fs.exists(path)) {
             fs.mkdirs(path);
         }
         FileInputFormat.setInputPaths(job, getOptionValue(OPTION_INPUT_PATH));
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictTotalBuildMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictTotalBuildMapper.java
index b2252c0..8af341c 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictTotalBuildMapper.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictTotalBuildMapper.java
@@ -21,10 +21,12 @@ package org.apache.kylin.engine.mr.steps;
 import java.io.BufferedReader;
 import java.io.IOException;
 import java.io.InputStreamReader;
+import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.SortedMap;
 import java.util.TreeMap;
+
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileStatus;
@@ -40,7 +42,7 @@ import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class BuildGlobalHiveDictTotalBuildMapper<KEYIN, Object> extends KylinMapper<KEYIN, Text, Text, LongWritable> {
+public class BuildGlobalHiveDictTotalBuildMapper<KEYIN, KEYOUT> extends KylinMapper<KEYIN, Text, Text, LongWritable> {
     private static final Logger logger = LoggerFactory.getLogger(BuildGlobalHiveDictTotalBuildMapper.class);
 
     private MultipleOutputs mos;
@@ -65,26 +67,26 @@ public class BuildGlobalHiveDictTotalBuildMapper<KEYIN, Object> extends KylinMap
 
         String statPath = conf.get("partition.statistics.path");
 
-        //get the input file name ,the file name format by colIndex-part-partitionNum, eg: 1-part-000019
+        // get the input file name ,the file name format by colIndex-part-partitionNum, eg: 1-part-000019
         FileSplit fileSplit = (FileSplit) context.getInputSplit();
         String[] arr = fileSplit.getPath().getName().split("-");
         int partitionNum = Integer.parseInt(arr[2]);
         colIndex = Integer.parseInt(arr[0]);
         colName = cols[colIndex];
-        logger.info("Input 
fileName:{},colIndex:{},colName:{},partitionNum:{}", 
fileSplit.getPath().getName(), colIndex, colName, partitionNum);
+        logger.info("Input fileName:{}, colIndex:{}, colName:{}, 
partitionNum:{}", fileSplit.getPath().getName(), colIndex, colName, 
partitionNum);
 
         //last max dic value per column
         String lastMaxValuePath = conf.get("last.max.dic.value.path");
-        logger.info("last.max.dic.value.path:"+lastMaxValuePath);
+        logger.info("last.max.dic.value.path:" + lastMaxValuePath);
         long lastMaxDictValue = this.getLastMaxDicValue(conf, lastMaxValuePath);
-        logger.info("last.max.dic.value.path:"+lastMaxValuePath+",value="+lastMaxDictValue);
+        logger.info("last.max.dic.value.path:" + lastMaxValuePath + ",value=" + lastMaxDictValue);
 
-        //Calculate the starting position of this file, the starting position of this file = sum (count) of all previous numbers + last max dic value of the column
-        Map<Integer, TreeMap<Integer, Long>> allStats = getPartitionsCount(conf, statPath);//<colIndex,<reduceNum,count>>
-        TreeMap<Integer, Long> partitionStats =allStats.get(colIndex);
-        if(partitionNum!=0) {
+        // Calculate the starting position of this file, the starting position of this file = sum (count) of all previous numbers + last max dic value of the column
+        Map<Integer, TreeMap<Integer, Long>> allStats = getPartitionsCount(conf, statPath); //<colIndex,<reduceNum,count>>
+        TreeMap<Integer, Long> partitionStats = allStats.get(colIndex);
+        if (partitionNum != 0) {
             SortedMap<Integer, Long> subStat = partitionStats.subMap(0, true, partitionNum, false);
-            subStat.forEach((k, v)->{
+            subStat.forEach((k, v) -> {
-            subStat.forEach((k, v)->{
+            subStat.forEach((k, v) -> {
                 logger.info("Split num:{} and it's count:{}", k, v);
                 start += v;
             });
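
A concrete illustration of the offset computation above (all counts are assumed for the example): if column 1's part files carry partition counts {0: 100, 1: 250, 2: 80} and the column's last max dict value from a previous build is 500, then the mapper processing file 1-part-000002 computes start = 500 + 100 + 250 = 850, and doMap() below assigns id start + inkey to each value, where inkey is the per-partition rank written by the part-build reducer.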
@@ -96,7 +98,7 @@ public class BuildGlobalHiveDictTotalBuildMapper<KEYIN, Object> extends KylinMap
     @Override
     public void doMap(KEYIN key, Text record, Context context) throws IOException, InterruptedException {
         long inkey = Long.parseLong(key.toString());
-        mos.write(colIndex+"", record, new LongWritable(start + inkey), "dict_column="+colName+"/"+colIndex);
+        mos.write(colIndex + "", record, new LongWritable(start + inkey), "dict_column=" + colName + "/" + colIndex);
     }
 
     @Override
@@ -106,7 +108,7 @@ public class BuildGlobalHiveDictTotalBuildMapper<KEYIN, Object> extends KylinMap
 
    private Map<Integer, TreeMap<Integer, Long>> getPartitionsCount(Configuration conf, String partitionStatPath) throws IOException {
         StringBuffer sb = new StringBuffer();
-        String temp=null;
+        String temp = null;
 
         String[] fileNameArr = null;
         String[] statsArr = null;
@@ -121,14 +123,14 @@ public class BuildGlobalHiveDictTotalBuildMapper<KEYIN, Object> extends KylinMap
         if (fs.exists(path) && fs.isDirectory(path)) {
             for (FileStatus status : fs.listStatus(path)) {
                 //fileNameArr[0] is globaldict colIndex
-                fileNameArr=status.getPath().getName().split("-");
+                fileNameArr = status.getPath().getName().split("-");
                 colStats = allStats.get(Integer.parseInt(fileNameArr[0]));
-                if(colStats==null){
+                if (colStats == null) {
                     colStats = new TreeMap<>();
                 }
-                temp=cat(status.getPath(), fs);
+                temp = cat(status.getPath(), fs);
                 logger.info("partitionStatPath:{},content:{}", 
partitionStatPath, temp);
-                if(temp!=null){
+                if (temp != null) {
                     statsArr = temp.split("\t");
                     colStats.put(Integer.parseInt(statsArr[1]), 
Long.parseLong(statsArr[0]));
                     allStats.put(Integer.parseInt(fileNameArr[0]), colStats);
@@ -136,8 +138,8 @@ public class BuildGlobalHiveDictTotalBuildMapper<KEYIN, Object> extends KylinMap
             }
         }
 
-        allStats.forEach((k, v)->{
-            v.forEach((k1, v1)->{
+        allStats.forEach((k, v) -> {
+            v.forEach((k1, v1) -> {
                 logger.info("allStats.colIndex:{},this split num:{},this split 
num's count:{}", k, k1, v1);
             });
         });
@@ -148,21 +150,21 @@ public class BuildGlobalHiveDictTotalBuildMapper<KEYIN, Object> extends KylinMap
     private String cat(Path remotePath, FileSystem fs) throws IOException {
         FSDataInputStream in = null;
         BufferedReader buffer = null;
-        StringBuffer stat= new StringBuffer();
+        StringBuffer stat = new StringBuffer();
         try {
-            in= fs.open(remotePath);
-            buffer= new BufferedReader(new InputStreamReader(in, "UTF-8")) ;
+            in = fs.open(remotePath);
+            buffer = new BufferedReader(new InputStreamReader(in, "UTF-8"));
             String line = null;
             while ((line = buffer.readLine()) != null) {
                 stat.append(line);
             }
         } catch (IOException e) {
             e.printStackTrace();
-        }finally {
-            if(buffer!=null) {
+        } finally {
+            if (buffer != null) {
                 buffer.close();
             }
-            if(in!=null) {
+            if (in != null) {
                 in.close();
             }
         }
@@ -170,15 +172,12 @@ public class BuildGlobalHiveDictTotalBuildMapper<KEYIN, Object> extends KylinMap
     }
 
     /**
-     *
-     * @param conf
      * @param lastMaxDicValuePath eg: 
/user/kylin/warehouse/db/kylin_intermediate_kylin_sales_cube_mr_6222c210_ce2d_e8ce_dd0f_f12c38fa9115__group_by/dict_column=KYLIN_MAX_DISTINCT_COUNT/part-00000-450ee120-39ff-4806-afaf-ed482ceffc68-c000
-     *        remotePath content is dict colum stats info of per column: dic 
column name,extract distinct value count,last max dic value
+     *                            remotePath content is per-column dict stats info: dict column name, extracted distinct value count, last max dict value
      * @return this colIndex's last max dic value
-     * @throws IOException
      */
     private long getLastMaxDicValue(Configuration conf, String 
lastMaxDicValuePath) throws IOException {
-        StringBuffer sb=new StringBuffer();
+        StringBuffer sb = new StringBuffer();
         Map<Integer, Long> map = null;
         Path path = new Path(lastMaxDicValuePath);
         FileSystem fs = path.getFileSystem(conf);
@@ -187,39 +186,35 @@ public class BuildGlobalHiveDictTotalBuildMapper<KEYIN, 
Object> extends KylinMap
                 logger.info("start buildMaxCountMap :");
                 map = buildMaxCountMap(status.getPath(), fs);
                 logger.info("end buildMaxCountMap :");
-
             }
         }
-        if(map == null){
+        if (map == null) {
             return 0L;
-        }else{
-            return  map.get(colIndex)==null?0L:map.get(colIndex);
+        } else {
+            return map.get(colIndex) == null ? 0L : map.get(colIndex);
         }
     }
 
     /**
-     *
      * @param remotePath , eg: 
/user/kylin/warehouse/db/kylin_intermediate_kylin_sales_cube_mr_6222c210_ce2d_e8ce_dd0f_f12c38fa9115__group_by/dict_column=KYLIN_MAX_DISTINCT_COUNT/part-00000-450ee120-39ff-4806-afaf-ed482ceffc68-c000
-     *        remotePath content is dict colum stats info of per column: dic 
column name,extract distinct value count,last max dic value
-     * @param fs
+     *                   remotePath content is per-column dict stats info: dict column name, extracted distinct value count, last max dict value
      * @return Map<>,key is colIndex, value is last max dict value
-     * @throws IOException
      */
-    private  Map<Integer, Long> buildMaxCountMap(Path remotePath, FileSystem 
fs) throws IOException {
+    private Map<Integer, Long> buildMaxCountMap(Path remotePath, FileSystem 
fs) throws IOException {
         FSDataInputStream in = null;
         BufferedReader buffer = null;
-        String[] arr=null;
-        Map<Integer, Long> map= new HashMap();
+        String[] arr = null;
+        Map<Integer, Long> map = new HashMap<>();
         try {
-            in= fs.open(remotePath);
-            buffer= new BufferedReader(new InputStreamReader(in, "UTF-8")) ;
+            in = fs.open(remotePath);
+            buffer = new BufferedReader(new InputStreamReader(in, 
StandardCharsets.UTF_8));
             String line = null;
             while ((line = buffer.readLine()) != null) {
                 arr = line.split(",");
-                logger.info("line="+line+",arr.length:"+arr.length);
-                if(arr.length==3) {
+                logger.info("line=" + line + ",arr.length:" + arr.length);
+                if (arr.length == 3) {
                     for (int i = 0; i < cols.length; i++) {
-                        if(cols[i].equalsIgnoreCase(arr[0])) {
+                        if (cols[i].equalsIgnoreCase(arr[0])) {
                             map.put(i, Long.parseLong(arr[2]));
                             logger.info("col.{}.maxValue={}", cols[i], 
Long.parseLong(arr[2]));
                             break;
@@ -229,15 +224,15 @@ public class BuildGlobalHiveDictTotalBuildMapper<KEYIN, 
Object> extends KylinMap
             }
         } catch (IOException e) {
             e.printStackTrace();
-        }finally {
-            if(buffer!=null) {
+        } finally {
+            if (buffer != null) {
                 buffer.close();
             }
-            if(in!=null) {
+            if (in != null) {
                 in.close();
             }
         }
-        logger.info("BuildMaxCountMap map="+map);
+        logger.info("BuildMaxCountMap map=" + map);
         return map;
     }
 }
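
A note on the total-build step above: as the surrounding hunks suggest, each mapper
derives a global start offset for its split from the previous segment's last max
dict value plus the distinct-value counts of all earlier splits of the same column,
and then writes "start + inkey" per record. A minimal, self-contained Java sketch
of that arithmetic (class and variable names are illustrative, not from this patch):

    import java.util.Map;
    import java.util.TreeMap;

    public class DictOffsetSketch {
        // colStats: split index -> distinct-value count of that split, for one column
        static long startOffset(TreeMap<Integer, Long> colStats, int splitIndex, long lastMaxDictValue) {
            long start = lastMaxDictValue;
            // headMap(splitIndex) holds all splits strictly before this one
            for (Map.Entry<Integer, Long> e : colStats.headMap(splitIndex).entrySet()) {
                start += e.getValue();
            }
            return start;
        }

        public static void main(String[] args) {
            TreeMap<Integer, Long> colStats = new TreeMap<>();
            colStats.put(0, 100L); // split 0 holds 100 distinct values
            colStats.put(1, 80L);  // split 1 holds 80
            long inkey = 5L;       // rank of one value inside split 1
            // the previous segment's max dict id was 1000, so this value becomes 1105
            System.out.println(startOffset(colStats, 1, 1000L) + inkey);
        }
    }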
diff --git 
a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
 
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
index 9309a3d..7d6a367 100644
--- 
a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
+++ 
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java
@@ -14,7 +14,7 @@
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
 package org.apache.kylin.engine.spark;
 
@@ -37,6 +37,7 @@ import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
+ *
  */
 public class SparkBatchCubingJobBuilder2 extends JobBuilderSupport {
 
@@ -48,7 +49,7 @@ public class SparkBatchCubingJobBuilder2 extends 
JobBuilderSupport {
     public SparkBatchCubingJobBuilder2(CubeSegment newSegment, String 
submitter) {
         this(newSegment, submitter, 0);
     }
-    
+
     public SparkBatchCubingJobBuilder2(CubeSegment newSegment, String 
submitter, Integer priorityOffset) {
         super(newSegment, submitter, priorityOffset);
         this.inputSide = SparkUtil.getBatchCubingInputSide(seg);
@@ -65,22 +66,8 @@ public class SparkBatchCubingJobBuilder2 extends 
JobBuilderSupport {
         // Phase 1: Create Flat Table & Materialize Hive View in Lookup Tables
         inputSide.addStepPhase1_CreateFlatTable(result);
 
-        // build global dict
-        KylinConfig dictConfig = seg.getConfig();
-        String[] mrHiveDictColumns = 
dictConfig.getMrHiveDictColumnsExcludeRefColumns();
-
-        if (Objects.nonNull(mrHiveDictColumns) && mrHiveDictColumns.length > 0
-                && !"".equals(mrHiveDictColumns[0])) {
-            //parallel part build
-            result.addTask(createBuildGlobalHiveDictPartBuildJob(jobId));
-            //parallel total build
-            result.addTask(createBuildGlobalHiveDicTotalBuildJob(jobId));
-        }
-
-        // merge global dic and replace flat table
-        if(Objects.nonNull(dictConfig.getMrHiveDictColumns()) && 
dictConfig.getMrHiveDictColumns().length > 0 && 
!"".equals(dictConfig.getMrHiveDictColumns()[0])){
-            inputSide.addStepPhase_ReplaceFlatTableGlobalColumnValue(result);
-        }
+        // Build global dictionary in distributed way
+        buildHiveGlobalDictionaryByMR(result, jobId);
 
         // Phase 2: Build Dictionary
         if (seg.getConfig().isSparkFactDistinctEnable()) {
@@ -202,7 +189,7 @@ public class SparkBatchCubingJobBuilder2 extends 
JobBuilderSupport {
 
 
     public void configureSparkJob(final CubeSegment seg, final SparkExecutable 
sparkExecutable,
-            final String jobId, final String cuboidRootPath) {
+                                  final String jobId, final String 
cuboidRootPath) {
         final IJoinedFlatTableDesc flatTableDesc = 
EngineFactory.getJoinedFlatTableDesc(seg);
         final String tablePath = JoinedFlatTable.getTableDir(flatTableDesc, 
getJobWorkingDir(jobId));
         sparkExecutable.setParam(SparkCubingByLayer.OPTION_CUBE_NAME.getOpt(), 
seg.getRealization().getName());
@@ -228,4 +215,28 @@ public class SparkBatchCubingJobBuilder2 extends 
JobBuilderSupport {
         param.put("path", getDumpMetadataPath(jobId));
         return new StorageURL(kylinConfig.getMetadataUrl().getIdentifier(), 
"hdfs", param).toString();
     }
+
+    /**
+     * Build Hive global dictionary by MR and encode the corresponding flat table columns into integers
+     */
+    protected void buildHiveGlobalDictionaryByMR(final CubingJob result, 
String jobId) {
+        KylinConfig dictConfig = seg.getConfig();
+        String[] mrHiveDictColumnExcludeRef = 
dictConfig.getMrHiveDictColumnsExcludeRefColumns();
+        String[] mrHiveDictColumns = dictConfig.getMrHiveDictColumns();
+
+        if (Objects.nonNull(mrHiveDictColumnExcludeRef) && 
mrHiveDictColumnExcludeRef.length > 0
+                && !"".equals(mrHiveDictColumnExcludeRef[0])) {
+
+            // 1. parallel part build
+            result.addTask(createBuildGlobalHiveDictPartBuildJob(jobId));
+
+            // 2. parallel total build
+            result.addTask(createBuildGlobalHiveDictTotalBuildJob(jobId));
+        }
+
+        // merge global dict and replace flat table
+        if (Objects.nonNull(mrHiveDictColumns) && mrHiveDictColumns.length > 0 
&& !"".equals(mrHiveDictColumns[0])) {
+            inputSide.addStepPhase_ReplaceFlatTableGlobalColumnValue(result);
+        }
+    }
 }
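
A note on the gating in buildHiveGlobalDictionaryByMR above: the part/total build
tasks run only when the column list minus referenced columns is non-empty, and the
merge/replace step only when the full column list is non-empty. A minimal
kylin.properties sketch that would enable the whole path (values are illustrative
and mirror the test config in the JSON hunk below):

    kylin.dictionary.mr-hive.columns=TEST_KYLIN_FACT_TEST_COUNT_DISTINCT_BITMAP
    kylin.source.hive.databasedir=/apps/hive/warehouse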
diff --git 
a/examples/test_case_data/localmeta/cube_desc/ci_inner_join_cube.json 
b/examples/test_case_data/localmeta/cube_desc/ci_inner_join_cube.json
index fce237e..763cf90 100644
--- a/examples/test_case_data/localmeta/cube_desc/ci_inner_join_cube.json
+++ b/examples/test_case_data/localmeta/cube_desc/ci_inner_join_cube.json
@@ -595,7 +595,8 @@
   "override_kylin_properties": {
     "kylin.cube.algorithm": "LAYER",
     "kylin.dictionary.shrunken-from-global-enabled": "false",
-    "kylin.dictionary.mr-hive.columns": 
"TEST_KYLIN_FACT_TEST_COUNT_DISTINCT_BITMAP"
+    "kylin.dictionary.mr-hive.columns": 
"TEST_KYLIN_FACT_TEST_COUNT_DISTINCT_BITMAP",
+    "kylin.source.hive.databasedir" : "/apps/hive/warehouse"
   },
   "partition_date_start": 0
 }
diff --git a/kubernetes/README.md b/kubernetes/README.md
index 964c612..2d0ecc6 100644
--- a/kubernetes/README.md
+++ b/kubernetes/README.md
@@ -1,4 +1,4 @@
-## Backgroud
+## Background
 Kubernetes is a portable, extensible, open-source platform for managing 
containerized workloads and services, that facilitates 
 both declarative configuration and automation. It has a large, rapidly growing 
ecosystem. Kubernetes services, support, 
 and tools are widely available.
@@ -11,7 +11,7 @@ cluster, will reduce cost of maintenance and extension.
   Please update your configuration file here. 
 - **template**
  This directory provides two deployment templates, one for **quick-start** purposes and another for **production/distributed** deployment.
-  1. Quick-start template is for one node deployment with an **ALL** kylin 
instance.
+  1. Quick-start template is for one-node deployment with an **ALL** kylin instance, for test or PoC purposes.
   2. Production template is for multi-nodes deployment with a few of 
**job**/**query** kylin instances; and some other service 
   like **memcached** and **filebeat** will help to satisfy log 
collection/query cache/session sharing demand.
 - **docker**
@@ -21,16 +21,16 @@ cluster, will reduce cost of maintenance and extension.
  This is a complete example of applying the production template in a CDH 5.7 hadoop env, with a step-by-step guide.  
  
 ### Note 
-1. CuratorScheduler is used as default JobScheduler because it is more 
flexible.
+1. **CuratorScheduler** is used as the default JobScheduler because it is more flexible.
2. Spark building requires `cluster` as deployMode. If you forget it, your spark application will never be submitted successfully, because the Hadoop cluster cannot resolve the hostname of the Pod (Spark Driver).
3. To modify `/etc/hosts` in a Pod, please check: https://kubernetes.io/docs/concepts/services-networking/add-entries-to-pod-etc-hosts-with-host-aliases/ .
-4. To build you own kylin-client docker image, please don't forget to download 
and put following jars into KYLIN_HOME/tomcat/lib to enable tomcat session 
sharing.
+4. To build your own kylin-client docker image, don't forget to download the following jars and put them into `KYLIN_HOME/tomcat/lib` to enable tomcat session sharing.
     - 
https://repo1.maven.org/maven2/de/javakaffee/msm/memcached-session-manager-tc7/2.1.1/
     - 
https://repo1.maven.org/maven2/de/javakaffee/msm/memcached-session-manager/2.1.1/
5. If you have difficulty configuring filebeat, please check https://www.elastic.co/guide/en/beats/filebeat/current/index.html .
6. External query cache is enabled by default; if you are interested in the details, you may check http://kylin.apache.org/blog/2019/07/30/detailed-analysis-of-refine-query-cache/ .
-7. All configuration files is separated from Docker image, please use 
configMap or secret. Compared to configMap, secrets is more recommended for 
security reason.
-8. Some verified kylin-client image will be published to DockerHub, here is 
the link https://hub.docker.com/r/apachekylin/kylin-client . You may consider 
contributed your Dockerfile to kylin's repo if you are interested.
+7. All configuration files are separated from the Docker image; please use **configMap** or **secret** (see the example after this list). Compared to **configMap**, **secret** is recommended for security reasons.
+8. Verified kylin-client images will be published to DockerHub: https://hub.docker.com/r/apachekylin/kylin-client . You may consider contributing your `Dockerfile` to kylin's repo if you are interested.
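
As mentioned in note 7, configuration can be shipped via a configMap or secret; a
hedged example (the resource and file names are made up, not from this repo):

    kubectl create configmap kylin-config --from-file=kylin.properties
    # or, preferred for sensitive values:
    kubectl create secret generic kylin-config --from-file=kylin.properties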
  
 ### Reference 
 - JIRA ticket: https://issues.apache.org/jira/browse/KYLIN-4447
diff --git 
a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateMrHiveDictStep.java
 
b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateMrHiveDictStep.java
index 305cdae..8538622 100644
--- 
a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateMrHiveDictStep.java
+++ 
b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateMrHiveDictStep.java
@@ -58,8 +58,9 @@ public class CreateMrHiveDictStep extends AbstractExecutable {
     private static final String GET_SQL = "\" Get Max Dict Value Sql : \"";
 
     protected void createMrHiveDict(KylinConfig config, DistributedLock lock) 
throws Exception {
-        logger.info("start to run createMrHiveDict {}", getId());
+        logger.info("Start to run createMrHiveDict {}", getId());
         try {
+            // Step 1: Acquire the lock if required
             if (getIsLock()) {
                 getLock(lock);
             }
@@ -72,12 +73,13 @@ public class CreateMrHiveDictStep extends 
AbstractExecutable {
             if (sql != null && sql.length() > 0) {
                 hiveCmdBuilder.addStatement(sql);
             }
-            Map<String, String> maxDictValMap = 
deserilizeForMap(getMaxDictStatementMap());
-            Map<String, String> dictSqlMap = 
deserilizeForMap(getCreateTableStatementMap());
+            Map<String, String> maxDictValMap = 
deserializeForMap(getMaxDictStatementMap());
+            Map<String, String> dictSqlMap = 
deserializeForMap(getCreateTableStatementMap());
 
-            if (dictSqlMap != null && dictSqlMap.size() > 0) {
+            // Step 2: Execute HQL
+            if (!dictSqlMap.isEmpty()) {
                 IHiveClient hiveClient = HiveClientFactory.getHiveClient();
-                if (maxDictValMap != null && maxDictValMap.size() > 0) {
+                if (!maxDictValMap.isEmpty()) {
                     if (maxDictValMap.size() == dictSqlMap.size()) {
                         maxDictValMap.forEach((columnName, maxDictValSql) -> {
                             int max = 0;
@@ -111,7 +113,7 @@ public class CreateMrHiveDictStep extends 
AbstractExecutable {
 
             final String cmd = hiveCmdBuilder.toString();
 
-            stepLogger.log("MR/Hive dict, cmd: " + cmd);
+            stepLogger.log("Build Hive Global Dictionary by: " + cmd);
 
             CubeManager manager = 
CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
             CubeInstance cube = manager.getCube(getCubeName());
@@ -123,9 +125,9 @@ public class CreateMrHiveDictStep extends 
AbstractExecutable {
                 if (response.getFirst() != 0) {
                     throw new RuntimeException("Failed to create MR/Hive dict, 
error code " + response.getFirst());
                 }
-                getManager().addJobInfo(getId(), stepLogger.getInfo());
             }
 
+            // Step 3: Release lock if required
             if (getIsUnlock()) {
                 unLock(lock);
             }
@@ -153,20 +155,10 @@ public class CreateMrHiveDictStep extends 
AbstractExecutable {
                 lock = 
KylinConfig.getInstanceFromEnv().getDistributedLockFactory().lockForCurrentThread();
             }
 
-            String preHdfsShell = getPreHdfsShell();
-            if (Objects.nonNull(preHdfsShell) && 
!"".equalsIgnoreCase(preHdfsShell)) {
-                doRetry(preHdfsShell, config);
-            }
-
             createMrHiveDict(config, lock);
 
-            String postfixHdfsCmd = getPostfixHdfsShell();
-            if (Objects.nonNull(postfixHdfsCmd) && 
!"".equalsIgnoreCase(postfixHdfsCmd)) {
-                doRetry(postfixHdfsCmd, config);
-            }
-
             if (isDiscarded()) {
-                if (getIsLock()) {
+                if (getIsLock() && lock != null) {
                     unLock(lock);
                 }
                 return new ExecuteResult(ExecuteResult.State.DISCARDED, 
stepLogger.getBufferedLog());
@@ -224,44 +216,28 @@ public class CreateMrHiveDictStep extends 
AbstractExecutable {
     }
 
     public void setCreateTableStatementMap(Map<String, String> dictSqlMap) {
-        setParam("HiveRedistributeDataMap", serilizeToMap(dictSqlMap));
+        setParam("DictSqlMap", serializeMap(dictSqlMap));
     }
 
     public String getCreateTableStatementMap() {
-        return getParam("HiveRedistributeDataMap");
+        return getParam("DictSqlMap");
     }
 
     public void setMaxDictStatementMap(Map<String, String> maxDictValMap) {
-        setParam("DictMaxMap", serilizeToMap(maxDictValMap));
+        setParam("DictMaxMap", serializeMap(maxDictValMap));
     }
 
     public String getMaxDictStatementMap() {
         return getParam("DictMaxMap");
     }
 
-    public String getPreHdfsShell() {
-        return getParam("preHdfsCmd");
-    }
-
-    public void setPrefixHdfsShell(String cmd) {
-        setParam("preHdfsCmd", cmd);
-    }
-
-    public String getPostfixHdfsShell() {
-        return getParam("postfixHdfsCmd");
-    }
-
-    public void setPostfixHdfsShell(String cmd) {
-        setParam("postfixHdfsCmd", cmd);
-    }
-
     public void setIsLock(Boolean isLock) {
         setParam("isLock", String.valueOf(isLock));
     }
 
     public boolean getIsLock() {
         String isLock = getParam("isLock");
-        return Strings.isNullOrEmpty(isLock) ? false : 
Boolean.parseBoolean(isLock);
+        return !Strings.isNullOrEmpty(isLock) && Boolean.parseBoolean(isLock);
     }
 
     public void setJobFlowJobId(String jobId) {
@@ -278,7 +254,7 @@ public class CreateMrHiveDictStep extends 
AbstractExecutable {
 
     public boolean getIsUnlock() {
         String isUnLock = getParam("isUnLock");
-        return Strings.isNullOrEmpty(isUnLock) ? false : 
Boolean.parseBoolean(isUnLock);
+        return !Strings.isNullOrEmpty(isUnLock) && 
Boolean.parseBoolean(isUnLock);
     }
 
     public void setLockPathName(String pathName) {
@@ -368,7 +344,7 @@ public class CreateMrHiveDictStep extends 
AbstractExecutable {
                                 }
                             }
                         }
-                        isLocked = true;//get lock fail,will try again
+                        isLocked = true; // failed to get the lock, will try again
                     }
                 }
                 // wait 1 min and try again
@@ -402,7 +378,7 @@ public class CreateMrHiveDictStep extends 
AbstractExecutable {
         }
     }
 
-    private static String serilizeToMap(Map<String, String> map) {
+    private static String serializeMap(Map<String, String> map) {
         JSONArray result = new JSONArray();
         if (map != null && map.size() > 0) {
             map.forEach((key, value) -> {
@@ -418,7 +394,7 @@ public class CreateMrHiveDictStep extends 
AbstractExecutable {
         return result.toString();
     }
 
-    private static Map<String, String> deserilizeForMap(String mapStr) {
+    private static Map<String, String> deserializeForMap(String mapStr) {
         Map<String, String> result = new HashMap<>();
         if (mapStr != null) {
             try {
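
Since the hunks above show only the renames, here is a minimal sketch of the round
trip that serializeMap/deserializeForMap implement, assuming an org.json-style
JSONArray of single-entry JSONObjects; the exact on-disk layout is not visible in
this patch:

    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;
    import org.json.JSONArray;
    import org.json.JSONObject;

    public class MapJsonSketch {
        // Map -> JSON string, one {key: value} object per entry
        static String serializeMap(Map<String, String> map) {
            JSONArray result = new JSONArray();
            if (map != null) {
                map.forEach((key, value) -> {
                    try {
                        JSONObject obj = new JSONObject();
                        obj.put(key, value);
                        result.put(obj);
                    } catch (Exception e) { // JSONException is checked in some org.json versions
                        throw new IllegalStateException(e);
                    }
                });
            }
            return result.toString();
        }

        // JSON string -> Map, tolerating null input
        static Map<String, String> deserializeForMap(String mapStr) {
            Map<String, String> result = new HashMap<>();
            if (mapStr == null) {
                return result;
            }
            try {
                JSONArray array = new JSONArray(mapStr);
                for (int i = 0; i < array.length(); i++) {
                    JSONObject obj = array.getJSONObject(i);
                    Iterator<?> keys = obj.keys();
                    while (keys.hasNext()) {
                        String key = (String) keys.next();
                        result.put(key, obj.getString(key));
                    }
                }
            } catch (Exception e) {
                throw new IllegalStateException(e);
            }
            return result;
        }
    }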
diff --git 
a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java 
b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
index 49e3f8d..c60a2ce 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
@@ -6,15 +6,15 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
 package org.apache.kylin.source.hive;
 
@@ -43,7 +43,6 @@ import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.engine.mr.IInput;
 import org.apache.kylin.engine.mr.JobBuilderSupport;
-import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
 import org.apache.kylin.engine.spark.SparkCreatingFlatTable;
 import org.apache.kylin.engine.spark.SparkExecutable;
@@ -97,17 +96,12 @@ public class HiveInputBase {
             // create flat table first
             addStepPhase1_DoCreateFlatTable(jobFlow);
 
-            // create global dict
-            KylinConfig dictConfig = (flatDesc.getSegment()).getConfig();
+            // create hive global dictionary
+            KylinConfig dictConfig = flatDesc.getSegment().getConfig();
             String[] mrHiveDictColumns = 
dictConfig.getMrHiveDictColumnsExcludeRefColumns();
             if (Objects.nonNull(mrHiveDictColumns) && mrHiveDictColumns.length 
> 0
                     && !"".equals(mrHiveDictColumns[0])) {
-                String globalDictDatabase = dictConfig.getMrHiveDictDB();
-                if (null == globalDictDatabase) {
-                    throw new IllegalArgumentException("Mr-Hive Global dict 
database is null.");
-                }
-                String globalDictTable = cubeName + 
dictConfig.getMrHiveDictTableSuffix();
-                addStepPhase1_DoCreateMrHiveGlobalDict(jobFlow, 
mrHiveDictColumns, globalDictDatabase, globalDictTable);
+                addStepPhase1_DoCreateMrHiveGlobalDict(jobFlow, 
mrHiveDictColumns);
             }
 
             // then count and redistribute
@@ -129,80 +123,75 @@ public class HiveInputBase {
 
         @Override
         public void 
addStepPhase_ReplaceFlatTableGlobalColumnValue(DefaultChainedExecutable 
jobFlow) {
-            KylinConfig dictConfig = (flatDesc.getSegment()).getConfig();
+            KylinConfig dictConfig = flatDesc.getSegment().getConfig();
             final String cubeName = 
CubingExecutableUtil.getCubeName(jobFlow.getParams());
+            String globalDictTable = 
MRHiveDictUtil.globalDictTableName(flatDesc, cubeName);
+            String globalDictDatabase = dictConfig.getMrHiveDictDB();
+
             String[] mrHiveDictColumnsExcludeRefCols = 
dictConfig.getMrHiveDictColumnsExcludeRefColumns();
             Map<String, String> dictRef = dictConfig.getMrHiveDictRefColumns();
             final String hiveInitStatements = 
JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
 
-            String globalDictDatabase = dictConfig.getMrHiveDictDB();
-            if (null == globalDictDatabase) {
-                throw new IllegalArgumentException("Mr-Hive Global dict 
database is null.");
-            }
-            String globalDictTable = cubeName + 
dictConfig.getMrHiveDictTableSuffix();
-            if(Objects.nonNull(mrHiveDictColumnsExcludeRefCols) && 
mrHiveDictColumnsExcludeRefCols.length > 0) {
-                //merge to dict table step
+            if (Objects.nonNull(mrHiveDictColumnsExcludeRefCols) && 
mrHiveDictColumnsExcludeRefCols.length > 0) {
                 jobFlow.addTask(createHiveGlobalDictMergeGlobalDict(flatDesc, 
hiveInitStatements, cubeName, mrHiveDictColumnsExcludeRefCols, 
globalDictDatabase, globalDictTable));
-
                 for (String item : mrHiveDictColumnsExcludeRefCols) {
                     dictRef.put(item, "");
                 }
             }
 
-            //replace step
-            if(dictRef.size()>0) {
+            // replace step
+            if (!dictRef.isEmpty()) {
                 jobFlow.addTask(createMrHiveGlobalDictReplaceStep(flatDesc, 
hiveInitStatements, cubeName,
                         dictRef, flatTableDatabase, globalDictDatabase, 
globalDictTable, dictConfig.getMrHiveDictTableSuffix(), jobFlow.getId()));
             }
-
         }
 
-        protected void 
addStepPhase1_DoCreateMrHiveGlobalDict(DefaultChainedExecutable jobFlow,
-                String[] mrHiveDictColumns, String globalDictDatabase, String 
globalDictTable) {
+        /**
+         * 1. Create three related tables
+         * 2. Insert distinct values into the distinct value table
+         * 3. Calculate statistics for the dictionary
+         */
+        protected void 
addStepPhase1_DoCreateMrHiveGlobalDict(DefaultChainedExecutable jobFlow, 
String[] mrHiveDictColumns) {
             final String cubeName = 
CubingExecutableUtil.getCubeName(jobFlow.getParams());
             final String hiveInitStatements = 
JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
 
-            //Crete tables for global dict and extract distinct value
             jobFlow.addTask(createMrHiveGlobalDictExtractStep(flatDesc, 
hiveInitStatements, cubeName,
-                    mrHiveDictColumns, globalDictDatabase, globalDictTable, 
jobFlow.getId()));
+                    mrHiveDictColumns, jobFlow.getId()));
 
         }
 
-        protected static AbstractExecutable 
createMrHiveGlobalDictExtractStep(IJoinedFlatTableDesc flatDesc,
-                String hiveInitStatements, String cubeName, String[] 
mrHiveDictColumns,
-                String globalDictDatabase, String globalDictTable, String 
jobId) {
-            // Firstly, determine if the global dict hive table of cube is 
exists.
-            String createGlobalDictTableHql = "CREATE TABLE IF NOT EXISTS " + 
globalDictDatabase + "." + globalDictTable
-                    + "\n" + "( dict_key STRING COMMENT '', \n" + "dict_val 
INT COMMENT '' \n" + ") \n"
-                    + "COMMENT '' \n" + "PARTITIONED BY (dict_column string) 
\n" + " ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t' \n" + "STORED AS 
TEXTFILE; \n";
-
-            final String dropDictIntermediateTableHql = 
MRHiveDictUtil.generateDropTableStatement(flatDesc);
-            final String createDictIntermediateTableHql = 
MRHiveDictUtil.generateCreateTableStatement(flatDesc);
-            final String groupByTable = flatDesc.getTableName() + 
flatDesc.getSegment().getConfig().getMrHiveDictIntermediateTTableSuffix();
-            final String globalDictIntermediateTable = 
MRHiveDictUtil.getMRHiveFlatTableGlobalDictTableName(flatDesc);
-            final String dropGlobalDictIntermediateTableHql = 
MRHiveDictUtil.generateDropTableStatement(globalDictIntermediateTable);
-            final String createGlobalDictIntermediateTableHql = 
MRHiveDictUtil.generateCreateGlobalDicIntermediateTableStatement(globalDictIntermediateTable);
-
-            String maxAndDistinctCountSql = "INSERT OVERWRITE TABLE  " + 
groupByTable + " PARTITION (DICT_COLUMN = '" + 
BatchConstants.CFG_GLOBAL_DICT_STATS_PARTITION_VALUE + "') "
-                    + "\n" + "SELECT  CONCAT_WS(',', tc.dict_column, 
cast(tc.total_distinct_val AS String), if(tm.max_dict_val is null, '0', 
cast(max_dict_val as string))) "
-                    + "\n" + "FROM ("
-                    + "\n" + "    SELECT  dict_column,count(1) 
total_distinct_val FROM "
-                    + "\n" + groupByTable + " where DICT_COLUMN != '" + 
BatchConstants.CFG_GLOBAL_DICT_STATS_PARTITION_VALUE + "' group by dict_column) 
tc "
-                    + "\n" + "LEFT JOIN (\n"
-                    + "\n" + "    SELECT  dict_column,if(max(dict_val) is 
null, 0, max(dict_val)) as max_dict_val FROM "
-                    + "\n" + globalDictDatabase + "." + globalDictTable + " 
group by dict_column) tm "
-                    + "\n" + "ON  tc.dict_column = tm.dict_column;";
+        protected static AbstractExecutable 
createMrHiveGlobalDictExtractStep(IJoinedFlatTableDesc flatDesc, String 
hiveInitStatements,
+                                                                              
String cubeName, String[] mrHiveDictColumns, String jobId) {
+            KylinConfig cfg = flatDesc.getSegment().getConfig();
+            String globalDictTable = 
MRHiveDictUtil.globalDictTableName(flatDesc, cubeName);
+            String globalDictDatabase = cfg.getMrHiveDictDB();
+            final String distinctValueTable = 
MRHiveDictUtil.distinctValueTable(flatDesc);
+            final String segmentLevelDictTableName = 
MRHiveDictUtil.segmentLevelDictTableName(flatDesc);
+
+            final String createGlobalDictTableHql = 
MRHiveDictUtil.generateDictionaryDdl(globalDictDatabase, globalDictTable);
+            final String dropDistinctValueTableHql = 
MRHiveDictUtil.generateDropTableStatement(distinctValueTable);
+            final String createDistinctValueTableHql = 
MRHiveDictUtil.generateDistinctValueTableStatement(flatDesc);
+            final String dropSegmentLevelDictTableHql = 
MRHiveDictUtil.generateDropTableStatement(segmentLevelDictTableName);
+            final String createSegmentLevelDictTableHql = 
MRHiveDictUtil.generateDictTableStatement(segmentLevelDictTableName);
+
+            String maxAndDistinctCountSql = 
MRHiveDictUtil.generateDictStatisticsSql(distinctValueTable, globalDictTable, 
globalDictDatabase);
 
             StringBuilder insertDataToDictIntermediateTableSql = new 
StringBuilder();
             for (String dictColumn : mrHiveDictColumns) {
                 insertDataToDictIntermediateTableSql
                         
.append(MRHiveDictUtil.generateInsertDataStatement(flatDesc, dictColumn, 
globalDictDatabase, globalDictTable));
             }
-            String set = "set hive.exec.compress.output=false;set 
hive.mapred.mode=unstrict;";
+            String setParametersHql = "set hive.exec.compress.output=false;set 
hive.mapred.mode=unstrict;";
             CreateMrHiveDictStep step = new CreateMrHiveDictStep();
             step.setInitStatement(hiveInitStatements);
-            step.setCreateTableStatement(set + createGlobalDictTableHql + 
dropDictIntermediateTableHql
-                    + createDictIntermediateTableHql + 
dropGlobalDictIntermediateTableHql + createGlobalDictIntermediateTableHql + 
insertDataToDictIntermediateTableSql.toString() + maxAndDistinctCountSql);
+            step.setCreateTableStatement(setParametersHql
+                    + createGlobalDictTableHql
+                    + dropDistinctValueTableHql
+                    + createDistinctValueTableHql
+                    + dropSegmentLevelDictTableHql
+                    + createSegmentLevelDictTableHql
+                    + insertDataToDictIntermediateTableSql.toString()
+                    + maxAndDistinctCountSql);
             CubingExecutableUtil.setCubeName(cubeName, step.getParams());
             
step.setName(ExecutableConstants.STEP_NAME_GLOBAL_DICT_MRHIVE_EXTRACT_DICTVAL);
             step.setIsLock(true);
@@ -212,20 +201,27 @@ public class HiveInputBase {
             return step;
         }
 
+        /**
+         * In the previous step, the data of the hive global dictionary was prepared by MR,
+         * so now it is time to create partitions for the Segment Dictionary Table
+         * and merge them into the Hive Global Dictionary Table.
+         */
         protected static AbstractExecutable 
createHiveGlobalDictMergeGlobalDict(IJoinedFlatTableDesc flatDesc,
                                                                                
 String hiveInitStatements, String cubeName, String[] mrHiveDictColumns,
                                                                                
 String globalDictDatabase, String globalDictTable) {
 
-            String globalDictItermediateTable = 
MRHiveDictUtil.getMRHiveFlatTableGlobalDictTableName(flatDesc);
+            String globalDictIntermediateTable = 
MRHiveDictUtil.segmentLevelDictTableName(flatDesc);
+            StringBuilder addPartitionHql = new StringBuilder();
 
-            StringBuffer addPartition = new StringBuffer();
-            Map<String, String> maxDictValMap = new HashMap<>();
             Map<String, String> dictHqlMap = new HashMap<>();
             for (String dictColumn : mrHiveDictColumns) {
                 try {
-                    addPartition.append("alter table 
").append(globalDictItermediateTable)
-                            .append(" add  IF NOT EXISTS partition 
(dict_column='").append(dictColumn)
-                            .append("');").append(" \n");
+                    addPartitionHql.append("ALTER TABLE ")
+                            .append(globalDictIntermediateTable)
+                            .append(" ADD IF NOT EXISTS PARTITION 
(dict_column='")
+                            .append(dictColumn)
+                            .append("');")
+                            .append(" \n");
 
                     String dictHql = "INSERT OVERWRITE TABLE " + 
globalDictDatabase + "." + globalDictTable + " \n"
                             + "PARTITION (dict_column = '" + dictColumn + "') 
\n"
@@ -233,7 +229,7 @@ public class HiveInputBase {
                             + globalDictDatabase + "." + globalDictTable + " 
\n" + "WHERE dict_column = '" + dictColumn
                             + "' \n" + 
flatDesc.getDataModel().getConfig().getHiveUnionStyle() + " \n"
                             + "SELECT dict_key, dict_val FROM "
-                            + globalDictItermediateTable + " \n" + " WHERE 
dict_column = '" + dictColumn + "' ;\n";
+                            + globalDictIntermediateTable + " \n" + " WHERE 
dict_column = '" + dictColumn + "' ;\n";
                     dictHqlMap.put(dictColumn, dictHql);
                 } catch (Exception e) {
                     logger.error("", e);
@@ -241,9 +237,8 @@ public class HiveInputBase {
             }
             String hiveInitStatementForUnstrict = "set 
hive.mapred.mode=unstrict;";
             CreateMrHiveDictStep step = new CreateMrHiveDictStep();
-            step.setInitStatement(hiveInitStatements + 
hiveInitStatementForUnstrict + addPartition);
+            step.setInitStatement(hiveInitStatements + 
hiveInitStatementForUnstrict + addPartitionHql);
             step.setCreateTableStatementMap(dictHqlMap);
-            step.setMaxDictStatementMap(maxDictValMap);
             step.setIsLock(false);
             step.setIsUnLock(false);
             step.setLockPathName(cubeName);
@@ -252,58 +247,75 @@ public class HiveInputBase {
             return step;
         }
 
+        /**
+         * Use the Hive Global Dictionary to replace/encode flat table columns
+         *
+         * @param mrHiveDictColumns a Map whose key is the dict column name and whose value is the referenced "cube.column" to reuse (empty when the dictionary is built by the current cube)
+         */
         protected static AbstractExecutable 
createMrHiveGlobalDictReplaceStep(IJoinedFlatTableDesc flatDesc, String 
hiveInitStatements, String cubeName, Map<String, String> mrHiveDictColumns, 
String flatTableDatabase, String globalDictDatabase, String globalDictTable, 
String dictSuffix, String jobId) {
             Map<String, String> dictHqlMap = new HashMap<>();
-            StringBuilder addPartition = new StringBuilder();
             for (String dictColumn : mrHiveDictColumns.keySet()) {
-                StringBuilder dictHql = new StringBuilder();
+                StringBuilder insertOverwriteHql = new StringBuilder();
                 TblColRef dictColumnRef = null;
 
                 String flatTable = flatTableDatabase + "." + 
flatDesc.getTableName();
-                // replace the flat table's dict column value
-                dictHql.append("INSERT OVERWRITE TABLE " + flatTable + " \n");
+                insertOverwriteHql.append("INSERT OVERWRITE TABLE 
").append(flatTable).append(" \n");
                 try {
-                    dictHql.append("SELECT \n");
-                    Integer flatTableColumnSize = 
flatDesc.getAllColumns().size();
+                    insertOverwriteHql.append("SELECT \n");
+                    int flatTableColumnSize = flatDesc.getAllColumns().size();
                     for (int i = 0; i < flatTableColumnSize; i++) {
                         TblColRef tblColRef = flatDesc.getAllColumns().get(i);
+                        String colName = JoinedFlatTable.colName(tblColRef, 
flatDesc.useAlias());
+
                         if (i > 0) {
-                            dictHql.append(",");
+                            insertOverwriteHql.append(",");
                         }
-                        if (JoinedFlatTable.colName(tblColRef, 
flatDesc.useAlias()).equalsIgnoreCase(dictColumn)) {
-                            dictHql.append("b.dict_val \n");
+
+                        if (colName.equalsIgnoreCase(dictColumn)) {
+                            // Note: replace the original value with its encoded integer
+                            insertOverwriteHql.append("b.dict_val \n");
                             dictColumnRef = tblColRef;
                         } else {
-                            dictHql.append("a." + 
JoinedFlatTable.colName(tblColRef) + " \n");
+                            // Note: keep its original value
+                            insertOverwriteHql.append("a.")
+                                    .append(JoinedFlatTable.colName(tblColRef))
+                                    .append(" \n");
                         }
                     }
 
                     if 
(!Strings.isNullOrEmpty(mrHiveDictColumns.get(dictColumn))) {
-                        String[] cubePartion = 
mrHiveDictColumns.get(dictColumn).split("\\.");
-
-                        String refGlobalDictTable = cubePartion[0] + 
dictSuffix;
-                        String refDictColumn = cubePartion[1];
-
-                        dictHql.append("FROM " + flatTable + " a \n" + "LEFT 
OUTER JOIN \n" + "( \n"
-                                + "SELECT dict_key, dict_val FROM " + 
globalDictDatabase + "." + refGlobalDictTable
-                                + " WHERE dict_column = '" + refDictColumn + 
"' \n" + ") b \n" + " ON a."
-                                + JoinedFlatTable.colName(dictColumnRef) + " = 
b.dict_key;");
-                        dictHqlMap.put(dictColumn, dictHql.toString());
-                    }else {
-                        dictHql.append("FROM " + flatTable + " a \n" + "LEFT 
OUTER JOIN \n" + "( \n"
-                                + "SELECT dict_key, dict_val FROM " + 
globalDictDatabase + "." + globalDictTable
-                                + " WHERE dict_column = '" + dictColumn + "' 
\n" + ") b \n" + " ON a."
-                                + JoinedFlatTable.colName(dictColumnRef) + " = 
b.dict_key;");
+                        // Note: reuse previous hive global dictionary
+                        String[] tableColumn = 
mrHiveDictColumns.get(dictColumn).split("\\.");
+
+                        String refGlobalDictTable = tableColumn[0] + 
dictSuffix;
+                        String refDictColumn = tableColumn[1];
+
+                        insertOverwriteHql
+                                .append("FROM ").append(flatTable).append(" a 
\nLEFT OUTER JOIN \n (")
+                                .append("SELECT dict_key, dict_val FROM ")
+                                
.append(globalDictDatabase).append(".").append(refGlobalDictTable)
+                                .append(" WHERE dict_column = 
'").append(refDictColumn).append("') b \n")
+                                .append("ON 
a.").append(JoinedFlatTable.colName(dictColumnRef)).append(" = b.dict_key;");
+                        dictHqlMap.put(dictColumn, 
insertOverwriteHql.toString());
+                    } else {
+                        // Note: use hive global dictionary built by current 
cube
+                        insertOverwriteHql
+                                .append("FROM ").append(flatTable).append(" a 
\nLEFT OUTER JOIN \n (")
+                                .append("SELECT dict_key, dict_val FROM ")
+                                
.append(globalDictDatabase).append(".").append(globalDictTable)
+                                .append(" WHERE dict_column = 
'").append(dictColumn).append("') b \n")
+                                .append("ON 
a.").append(JoinedFlatTable.colName(dictColumnRef)).append(" = b.dict_key;");
                     }
-                    dictHqlMap.put(dictColumn, dictHql.toString());
+                    dictHqlMap.put(dictColumn, insertOverwriteHql.toString());
                 } catch (Exception e) {
                     logger.error("", e);
                 }
             }
-            String set = "set hive.exec.compress.output=false; set 
hive.mapred.mode=unstrict;";
+            String setParametersHql = "set hive.exec.compress.output=false; set hive.mapred.mode=unstrict;";
             CreateMrHiveDictStep step = new CreateMrHiveDictStep();
-            step.setInitStatement(hiveInitStatements + set + addPartition);
+            step.setInitStatement(hiveInitStatements + setParametersHql);
             step.setCreateTableStatementMap(dictHqlMap);
+
             step.setIsUnLock(true);
             step.setLockPathName(cubeName);
             step.setJobFlowJobId(jobId);
@@ -389,7 +401,7 @@ public class HiveInputBase {
     }
 
     protected static AbstractExecutable createFlatHiveTableStep(String 
hiveInitStatements, String jobWorkingDir,
-            String cubeName, IJoinedFlatTableDesc flatDesc) {
+                                                                String 
cubeName, IJoinedFlatTableDesc flatDesc) {
         //from hive to hive
         final String dropTableHql = 
JoinedFlatTable.generateDropTableStatement(flatDesc);
         final String createTableHql = 
JoinedFlatTable.generateCreateTableStatement(flatDesc, jobWorkingDir);
@@ -404,7 +416,7 @@ public class HiveInputBase {
     }
 
     protected static AbstractExecutable createFlatHiveTableByLivyStep(String 
hiveInitStatements, String jobWorkingDir,
-            String cubeName, IJoinedFlatTableDesc flatDesc) {
+                                                                      String 
cubeName, IJoinedFlatTableDesc flatDesc) {
         //from hive to hive
         final String dropTableHql = 
JoinedFlatTable.generateDropTableStatement(flatDesc);
         final String createTableHql = 
JoinedFlatTable.generateCreateTableStatement(flatDesc, jobWorkingDir);
@@ -419,7 +431,7 @@ public class HiveInputBase {
     }
 
     protected static AbstractExecutable createFlatHiveTableBySparkSql(String 
hiveInitStatements,
-            String jobWorkingDir, String cubeName, IJoinedFlatTableDesc 
flatDesc) {
+                                                                      String 
jobWorkingDir, String cubeName, IJoinedFlatTableDesc flatDesc) {
         final String dropTableHql = 
JoinedFlatTable.generateDropTableStatement(flatDesc);
         final String createTableHql = 
JoinedFlatTable.generateCreateTableStatement(flatDesc,
                 jobWorkingDir);
@@ -472,7 +484,7 @@ public class HiveInputBase {
     }
 
     protected static AbstractExecutable 
createRedistributeFlatHiveTableStep(String hiveInitStatements, String cubeName,
-            IJoinedFlatTableDesc flatDesc, CubeDesc cubeDesc) {
+                                                                            
IJoinedFlatTableDesc flatDesc, CubeDesc cubeDesc) {
         RedistributeFlatHiveTableStep step = new 
RedistributeFlatHiveTableStep();
         step.setInitStatement(hiveInitStatements);
         step.setIntermediateTable(flatDesc.getTableName());
@@ -483,7 +495,7 @@ public class HiveInputBase {
     }
 
     protected static AbstractExecutable 
createRedistributeFlatHiveTableByLivyStep(String hiveInitStatements,
-            String cubeName, IJoinedFlatTableDesc flatDesc, CubeDesc cubeDesc) 
{
+                                                                               
   String cubeName, IJoinedFlatTableDesc flatDesc, CubeDesc cubeDesc) {
         RedistributeFlatHiveTableByLivyStep step = new 
RedistributeFlatHiveTableByLivyStep();
         step.setInitStatement(hiveInitStatements);
         step.setIntermediateTable(flatDesc.getTableName());
@@ -493,8 +505,8 @@ public class HiveInputBase {
         return step;
     }
 
-    protected static ShellExecutable 
createLookupHiveViewMaterializationStep(String hiveInitStatements,
-            String jobWorkingDir, IJoinedFlatTableDesc flatDesc, List<String> 
intermediateTables, String uuid) {
+    protected static ShellExecutable 
createLookupHiveViewMaterializationStep(String hiveInitStatements, String 
jobWorkingDir, IJoinedFlatTableDesc flatDesc,
+                                                                             
List<String> intermediateTables, String uuid) {
         ShellExecutable step = new ShellExecutable();
         
step.setName(ExecutableConstants.STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP);
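
To make the refactored replace step concrete, a hedged sketch of the HQL that
createMrHiveGlobalDictReplaceStep generates for one dict column; the database,
table, and column names here are invented:

    INSERT OVERWRITE TABLE mydb.kylin_intermediate_my_cube
    SELECT
      a.TRANS_ID,
      b.dict_val,  -- SELLER_ID replaced by its dictionary-encoded integer
      a.PART_DT
    FROM mydb.kylin_intermediate_my_cube a
    LEFT OUTER JOIN (
      SELECT dict_key, dict_val FROM dict_db.my_cube_global_dict
      WHERE dict_column = 'SELLER_ID') b
    ON a.SELLER_ID = b.dict_key;

The LEFT OUTER JOIN keeps every flat-table row; the preceding extract and merge
steps are what guarantee each distinct value already has a dict_val.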
 
diff --git 
a/source-hive/src/main/java/org/apache/kylin/source/hive/MRHiveDictUtil.java 
b/source-hive/src/main/java/org/apache/kylin/source/hive/MRHiveDictUtil.java
index 85cd855..573ecd3 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/MRHiveDictUtil.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/MRHiveDictUtil.java
@@ -27,6 +27,7 @@ import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.livy.LivyRestBuilder;
 import org.apache.kylin.common.livy.LivyRestExecutor;
 import org.apache.kylin.common.livy.LivyTypeEnum;
+import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.job.JoinedFlatTable;
 import org.apache.kylin.job.common.PatternedLogger;
 import org.apache.kylin.job.constant.ExecutableConstants;
@@ -41,6 +42,30 @@ import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+/**
+ * <pre>
+ * Holds constants/enums/statements for the Hive Global Dictionary.
+ *
+ * Two different temporary tables help to build the Hive Global Dictionary.
+ * They should be deleted at the final step of the building job.
+ *   1. Distinct Value Table (temporary table)
+ *      TableName: ${FlatTable}_${DistinctValueSuffix}
+ *      Schema: one normal column, for the original column value; with another partition column.
+ *   @see #distinctValueTable
+ *
+ *   2. Segment Level Dictionary Table (temporary table)
+ *      TableName: ${FlatTable}_${DictTableSuffix}
+ *      Schema: two normal columns, first for the original column value, second for its encoded integer;
+ *          also with another partition column
+ *   @see #segmentLevelDictTableName
+ *
+ * After that, the Hive Global Dictionary itself is stored in a third hive table.
+ *   3. Hive Global Dictionary Table
+ *      TableName: ${CubeName}_${DictTableSuffix}
+ *      Schema: two columns, first for the original column value, second for its encoded integer; also with another partition column
+ *   @see #globalDictTableName
+ * </pre>
+ */
 public class MRHiveDictUtil {
     private static final Logger logger = 
LoggerFactory.getLogger(MRHiveDictUtil.class);
     protected static final Pattern HDFS_LOCATION = Pattern.compile("LOCATION 
\'(.*)\';");
@@ -59,12 +84,27 @@ public class MRHiveDictUtil {
         }
     }
 
-    public static String generateDropTableStatement(IJoinedFlatTableDesc 
flatDesc) {
-        StringBuilder ddl = new StringBuilder();
-        String table = flatDesc.getTableName()
-                + 
flatDesc.getSegment().getConfig().getMrHiveDictIntermediateTTableSuffix();
-        ddl.append("DROP TABLE IF EXISTS " + table + ";").append(" \n");
-        return ddl.toString();
+    public static String distinctValueTable(IJoinedFlatTableDesc flatDesc) {
+        return flatDesc.getTableName() + 
flatDesc.getSegment().getConfig().getMrHiveDistinctValueTableSuffix();
+    }
+
+    public static String segmentLevelDictTableName(IJoinedFlatTableDesc 
flatDesc) {
+        return flatDesc.getTableName() + 
flatDesc.getSegment().getConfig().getMrHiveDictTableSuffix();
+    }
+
+    public static String globalDictTableName(IJoinedFlatTableDesc flatDesc, 
String cubeName) {
+        return cubeName + 
flatDesc.getSegment().getConfig().getMrHiveDictTableSuffix();
+    }
+
+    public static String generateDictionaryDdl(String db, String tbl) {
+        return "CREATE TABLE IF NOT EXISTS " + db + "." + tbl + "\n"
+                + " ( dict_key STRING COMMENT '', \n"
+                + "   dict_val INT COMMENT '' \n"
+                + ") \n"
+                + "COMMENT 'Hive Global Dictionary' \n"
+                + "PARTITIONED BY (dict_column string) \n"
+                + "ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t' \n"
+                + "STORED AS TEXTFILE; \n";
     }
 
     public static String generateDropTableStatement(String tableName) {
@@ -73,14 +113,14 @@ public class MRHiveDictUtil {
         return ddl.toString();
     }
 
-    public static String generateCreateTableStatement(IJoinedFlatTableDesc 
flatDesc) {
+    public static String 
generateDistinctValueTableStatement(IJoinedFlatTableDesc flatDesc) {
         StringBuilder ddl = new StringBuilder();
         String table = flatDesc.getTableName()
-                + 
flatDesc.getSegment().getConfig().getMrHiveDictIntermediateTTableSuffix();
+                + 
flatDesc.getSegment().getConfig().getMrHiveDistinctValueTableSuffix();
 
         ddl.append("CREATE TABLE IF NOT EXISTS " + table + " \n");
         ddl.append("( \n ");
-        ddl.append("dict_key" + " " + "STRING" + " COMMENT '' \n");
+        ddl.append("  dict_key" + " " + "STRING" + " COMMENT '' \n");
         ddl.append(") \n");
         ddl.append("COMMENT '' \n");
         ddl.append("PARTITIONED BY (dict_column string) \n");
@@ -89,13 +129,13 @@ public class MRHiveDictUtil {
         return ddl.toString();
     }
 
-    public static String 
generateCreateGlobalDicIntermediateTableStatement(String globalTableName) {
+    public static String generateDictTableStatement(String globalTableName) {
         StringBuilder ddl = new StringBuilder();
 
         ddl.append("CREATE TABLE IF NOT EXISTS " + globalTableName + " \n");
         ddl.append("( \n ");
-        ddl.append("dict_key" + " " + "STRING" + " COMMENT '' , \n");
-        ddl.append("dict_val" + " " + "STRING" + " COMMENT '' \n");
+        ddl.append("  dict_key" + " " + "STRING" + " COMMENT '' , \n");
+        ddl.append("  dict_val" + " " + "STRING" + " COMMENT '' \n");
         ddl.append(") \n");
         ddl.append("COMMENT '' \n");
         ddl.append("PARTITIONED BY (dict_column string) \n");
@@ -105,12 +145,12 @@ public class MRHiveDictUtil {
         return ddl.toString();
     }
 
+    /**
+     * Fetch distinct values from the flat table and insert them into the distinctValueTable.
+     *
+     * @see #distinctValueTable
+     */
     public static String generateInsertDataStatement(IJoinedFlatTableDesc 
flatDesc, String dictColumn, String globalDictDatabase, String globalDictTable) 
{
-        String table = getMRHiveFlatTableGroupBytableName(flatDesc);
-
-        StringBuilder sql = new StringBuilder();
-        sql.append("SELECT a.DICT_KEY FROM (" + "\n");
-
         int index = 0;
         for (TblColRef tblColRef : flatDesc.getAllColumns()) {
             if (JoinedFlatTable.colName(tblColRef, 
flatDesc.useAlias()).equalsIgnoreCase(dictColumn)) {
@@ -118,42 +158,56 @@ public class MRHiveDictUtil {
             }
             index++;
         }
-
         if (index == flatDesc.getAllColumns().size()) {
             String msg = "Can not find correct column for " + dictColumn
                     + ", please check 'kylin.dictionary.mr-hive.columns'";
             logger.error(msg);
             throw new IllegalArgumentException(msg);
         }
-        sql.append(" SELECT " + "\n");
-        TblColRef col = flatDesc.getAllColumns().get(index);
-        sql.append(JoinedFlatTable.colName(col) + "  as DICT_KEY \n");
-
-        MRHiveDictUtil.appendJoinStatement(flatDesc, sql);
 
-        //group by
-        sql.append("GROUP BY ");
-        sql.append(JoinedFlatTable.colName(col) + ") a \n");
+        String table = distinctValueTable(flatDesc);
+        StringBuilder sql = new StringBuilder();
+        TblColRef col = flatDesc.getAllColumns().get(index);
 
-        //join
-        sql.append(" LEFT JOIN \n");
-        sql.append("(SELECT  DICT_KEY FROM ");
-        sql.append(globalDictDatabase).append(".").append(globalDictTable);
-        sql.append(" WHERE DICT_COLUMN = '" + dictColumn + "'");
-        sql.append(") b \n");
+        sql.append("SELECT a.DICT_KEY FROM (\n");
+        sql.append("  SELECT " + "\n");
+        sql.append(JoinedFlatTable.colName(col)).append(" as DICT_KEY \n");
+        sql.append("  FROM ").append(flatDesc.getTableName()).append("\n");
+        sql.append("  GROUP BY ");
+        sql.append(JoinedFlatTable.colName(col)).append(") a \n");
+        sql.append("    LEFT JOIN \n");
+        sql.append("  (SELECT DICT_KEY FROM 
").append(globalDictDatabase).append(".").append(globalDictTable);
+        sql.append("    WHERE DICT_COLUMN = '").append(dictColumn);
+        sql.append("' ) b \n");
         sql.append("ON a.DICT_KEY = b.DICT_KEY \n");
-        sql.append("WHERE   b.DICT_KEY IS NULL \n");
+        sql.append("WHERE b.DICT_KEY IS NULL \n");
 
-        return "INSERT OVERWRITE TABLE " + table + " \n" + "PARTITION 
(dict_column = '" + dictColumn + "')" + " \n"
-                + sql + ";\n";
+        return "INSERT OVERWRITE TABLE " + table + " \n"
+                + "PARTITION (dict_column = '" + dictColumn + "')" + " \n"
+                + sql.toString()
+                + ";\n";
     }
 
-    public static void appendJoinStatement(IJoinedFlatTableDesc flatDesc, 
StringBuilder sql) {
-        sql.append("FROM " + flatDesc.getTableName() + "\n");
+    /**
+     * Calculate and store "columnName,segmentDistinctCount,previousMaxDictId" into the stats partition
+     */
+    public static String generateDictStatisticsSql(String distinctValueTable, 
String globalDictTable, String globalDictDatabase) {
+        return "INSERT OVERWRITE TABLE  " + distinctValueTable + " PARTITION 
(DICT_COLUMN = '" + BatchConstants.CFG_GLOBAL_DICT_STATS_PARTITION_VALUE + "') "
+                + "\n" + "SELECT CONCAT_WS(',', tc.dict_column, 
cast(tc.total_distinct_val AS String), if(tm.max_dict_val is null, '0', 
cast(max_dict_val as string))) "
+                + "\n" + "FROM ("
+                + "\n" + "    SELECT dict_column, count(1) total_distinct_val"
+                + "\n" + "    FROM " + globalDictDatabase + "." + 
distinctValueTable
+                + "\n" + "    WHERE DICT_COLUMN != '" + 
BatchConstants.CFG_GLOBAL_DICT_STATS_PARTITION_VALUE + "'"
+                + "\n" + "    GROUP BY dict_column) tc "
+                + "\n" + "LEFT JOIN (\n"
+                + "\n" + "    SELECT dict_column, if(max(dict_val) is null, 0, 
max(dict_val)) as max_dict_val "
+                + "\n" + "    FROM " + globalDictDatabase + "." + 
globalDictTable
+                + "\n" + "    GROUP BY dict_column) tm "
+                + "\n" + "ON tc.dict_column = tm.dict_column;";
     }
 
     public static void runLivySqlJob(PatternedLogger stepLogger, KylinConfig 
config, ImmutableList<String> sqls,
-            ExecutableManager executableManager, String jobId) throws 
IOException {
+                                     ExecutableManager executableManager, 
String jobId) throws IOException {
         final LivyRestBuilder livyRestBuilder = new LivyRestBuilder();
         livyRestBuilder.overwriteHiveProps(config.getHiveConfigOverride());
         StringBuilder stringBuilder = new StringBuilder();
@@ -171,7 +225,7 @@ public class MRHiveDictUtil {
         executor.execute(livyRestBuilder, stepLogger);
 
         Map<String, String> info = stepLogger.getInfo();
-        //get the flat Hive table size
+        // get the flat Hive table size
         Matcher matcher = HDFS_LOCATION.matcher(args);
         if (matcher.find()) {
             String hiveFlatTableHdfsUrl = matcher.group(1);
@@ -194,14 +248,6 @@ public class MRHiveDictUtil {
         return DictHiveType.MrEphemeralDictLockPath.getName() + cubeName;
     }
 
-    public static String 
getMRHiveFlatTableGroupBytableName(IJoinedFlatTableDesc flatDesc) {
-        return flatDesc.getTableName() + 
flatDesc.getSegment().getConfig().getMrHiveDictIntermediateTTableSuffix();
-    }
-
-    public static String 
getMRHiveFlatTableGlobalDictTableName(IJoinedFlatTableDesc flatDesc) {
-        return flatDesc.getTableName() + 
flatDesc.getSegment().getConfig().getMrHiveDictTableSuffix();
-    }
-
     private static long getFileSize(String hdfsUrl) throws IOException {
         Configuration configuration = new Configuration();
         Path path = new Path(hdfsUrl);
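
To make the three-table layout documented in the MRHiveDictUtil javadoc concrete,
a worked example with invented data for dict column SELLER_ID:
  - Distinct Value Table, partition dict_column='SELLER_ID': the values this
    segment adds that are not yet in the global dictionary, e.g. 10000231 and 10000453.
  - The same table's stats partition dict_column='KYLIN_MAX_DISTINCT_COUNT'
    (BatchConstants.CFG_GLOBAL_DICT_STATS_PARTITION_VALUE): one line per column in
    the shape produced by generateDictStatisticsSql, e.g. "SELLER_ID,2,4500",
    meaning 2 new distinct values and a previous max dict id of 4500.
  - Hive Global Dictionary Table after the merge: rows (10000231, 4501) and
    (10000453, 4502) under dict_column='SELLER_ID'.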
