This is an automated email from the ASF dual-hosted git repository.

shaofengshi pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 637f45d8444c7b52713780c1701d33d6656fffc0
Author: Zhong <[email protected]>
AuthorDate: Wed Aug 15 17:40:54 2018 +0800

    KYLIN-3491 add a shrunken global dictionary step to improve the encoding process
    
    Signed-off-by: shaofengshi <[email protected]>
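    
    Note: the new step is disabled by default. As a minimal, illustrative example
    (based only on the switch added to KylinConfigBase in this patch), it can be
    turned on via kylin.properties:
    
        kylin.dictionary.shrunken-from-global-enabled=true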
---
 .../org/apache/kylin/common/KylinConfigBase.java   |   4 +
 .../java/org/apache/kylin/cube/model/CubeDesc.java |   4 +
 .../org/apache/kylin/dict/ShrunkenDictionary.java  | 159 +++++++++++++++++++++
 .../kylin/dict/ShrunkenDictionaryBuilder.java      |  49 +++++++
 .../apache/kylin/dict/ShrunkenDictionaryTest.java  |  77 ++++++++++
 .../kylin/job/constant/ExecutableConstants.java    |   1 +
 .../kylin/engine/mr/BatchCubingJobBuilder2.java    |  10 ++
 .../java/org/apache/kylin/engine/mr/IMRInput.java  |   4 +
 .../apache/kylin/engine/mr/JobBuilderSupport.java  |  22 +++
 .../kylin/engine/mr/common/AbstractHadoopJob.java  |   3 +
 .../kylin/engine/mr/common/BaseCuboidBuilder.java  |   6 +-
 .../kylin/engine/mr/common/BatchConstants.java     |   1 +
 .../engine/mr/common/DictionaryGetterUtil.java     |  76 ++++++++++
 .../engine/mr/steps/BaseCuboidMapperBase.java      |  10 +-
 .../apache/kylin/engine/mr/steps/CuboidJob.java    |   5 +
 ...ob.java => ExtractDictionaryFromGlobalJob.java} | 108 ++++++--------
 .../steps/ExtractDictionaryFromGlobalMapper.java   | 141 ++++++++++++++++++
 .../kylin/engine/mr/steps/InMemCuboidJob.java      |   5 +
 .../engine/mr/steps/InMemCuboidMapperBase.java     |  15 +-
 .../org/apache/kylin/source/hive/HiveMRInput.java  |   9 ++
 .../apache/kylin/source/kafka/KafkaMRInput.java    |   7 +
 .../kylin/storage/hbase/steps/HBaseJobSteps.java   |   1 +
 22 files changed, 633 insertions(+), 84 deletions(-)

diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
index 925488f..58d9caa 100644
--- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
+++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java
@@ -430,6 +430,10 @@ abstract public class KylinConfigBase implements Serializable {
         return Double.parseDouble(getOptional("kylin.snapshot.ext.local.cache.max-size-gb", "200"));
     }
 
+    public boolean isShrunkenDictFromGlobalEnabled() {
+        return Boolean.parseBoolean(this.getOptional("kylin.dictionary.shrunken-from-global-enabled", "false"));
+    }
+
     // ============================================================================
     // CUBE
     // ============================================================================
diff --git a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
index 95c8b40..15d67ea 100644
--- a/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
+++ b/core-cube/src/main/java/org/apache/kylin/cube/model/CubeDesc.java
@@ -1480,6 +1480,10 @@ public class CubeDesc extends RootPersistentEntity implements IEngineAware {
         }
         return globalDictCols;
     }
+
+    public boolean isShrunkenDictFromGlobalEnabled() {
+        return config.isShrunkenDictFromGlobalEnabled() && !getAllGlobalDictColumns().isEmpty();
+    }
     
     // UHC (ultra high cardinality column): contain the ShardByColumns and the GlobalDictionaryColumns
     public List<TblColRef> getAllUHCColumns() {
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/ShrunkenDictionary.java b/core-dictionary/src/main/java/org/apache/kylin/dict/ShrunkenDictionary.java
new file mode 100644
index 0000000..35c995e
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/ShrunkenDictionary.java
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.dict;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.io.PrintStream;
+import java.util.Map;
+
+import org.apache.kylin.common.util.Dictionary;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableMap;
+
+public class ShrunkenDictionary<T> extends Dictionary<T> {
+
+    private ImmutableMap<T, Integer> valueToIdMap;
+    private ImmutableMap<Integer, T> idToValueMap;
+
+    private int minId;
+    private int maxId;
+    private int sizeOfId;
+    private int sizeOfValue;
+
+    private ValueSerializer<T> valueSerializer;
+
+    public ShrunkenDictionary(ValueSerializer<T> valueSerializer) { // default constructor for Writable interface
+        this.valueSerializer = valueSerializer;
+    }
+
+    public ShrunkenDictionary(ValueSerializer<T> valueSerializer, int minId, int maxId, int sizeOfId, int sizeOfValue,
+            Map<T, Integer> valueToIdMap) {
+        this.valueSerializer = valueSerializer;
+
+        this.minId = minId;
+        this.maxId = maxId;
+        this.sizeOfId = sizeOfId;
+        this.sizeOfValue = sizeOfValue;
+
+        Preconditions.checkNotNull(valueToIdMap);
+        this.valueToIdMap = ImmutableMap.<T, Integer> builder().putAll(valueToIdMap).build();
+    }
+
+    @Override
+    public int getMinId() {
+        return minId;
+    }
+
+    @Override
+    public int getMaxId() {
+        return maxId;
+    }
+
+    @Override
+    public int getSizeOfId() {
+        return sizeOfId;
+    }
+
+    @Override
+    public int getSizeOfValue() {
+        return sizeOfValue;
+    }
+
+    @Override
+    public boolean contains(Dictionary<?> another) {
+        return false;
+    }
+
+    protected int getIdFromValueImpl(T value, int roundingFlag) {
+        Integer id = valueToIdMap.get(value);
+        if (id == null) {
+            return -1;
+        }
+        return id;
+    }
+
+    protected T getValueFromIdImpl(int id) {
+        if (idToValueMap == null) {
+            idToValueMap = buildIdToValueMap();
+        }
+        return idToValueMap.get(id);
+    }
+
+    private ImmutableMap<Integer, T> buildIdToValueMap() {
+        ImmutableMap.Builder<Integer, T> idToValueMapBuilder = ImmutableMap.builder();
+        for (T value : valueToIdMap.keySet()) {
+            idToValueMapBuilder.put(valueToIdMap.get(value), value);
+        }
+        return idToValueMapBuilder.build();
+    }
+
+    public void dump(PrintStream out) {
+        out.println(String.format("Total %d values for ShrunkenDictionary", valueToIdMap.size()));
+    }
+
+    public void write(DataOutput out) throws IOException {
+        out.writeInt(minId);
+        out.writeInt(maxId);
+        out.writeInt(sizeOfId);
+        out.writeInt(sizeOfValue);
+
+        out.writeInt(valueToIdMap.size());
+        for (T value : valueToIdMap.keySet()) {
+            valueSerializer.writeValue(out, value);
+            out.writeInt(valueToIdMap.get(value));
+        }
+    }
+
+    public void readFields(DataInput in) throws IOException {
+        this.minId = in.readInt();
+        this.maxId = in.readInt();
+        this.sizeOfId = in.readInt();
+        this.sizeOfValue = in.readInt();
+
+        int sizeValueMap = in.readInt();
+        ImmutableMap.Builder<T, Integer> valueToIdMapBuilder = ImmutableMap.builder();
+        for (int i = 0; i < sizeValueMap; i++) {
+            T value = valueSerializer.readValue(in);
+            int id = in.readInt();
+            valueToIdMapBuilder.put(value, id);
+        }
+        this.valueToIdMap = valueToIdMapBuilder.build();
+    }
+
+    public interface ValueSerializer<T> {
+        void writeValue(DataOutput out, T value) throws IOException;
+
+        T readValue(DataInput in) throws IOException;
+    }
+
+    public static class StringValueSerializer implements ValueSerializer<String> {
+        @Override
+        public void writeValue(DataOutput out, String value) throws IOException {
+            out.writeUTF(value);
+        }
+
+        @Override
+        public String readValue(DataInput in) throws IOException {
+            return in.readUTF();
+        }
+    }
+}
diff --git a/core-dictionary/src/main/java/org/apache/kylin/dict/ShrunkenDictionaryBuilder.java b/core-dictionary/src/main/java/org/apache/kylin/dict/ShrunkenDictionaryBuilder.java
new file mode 100644
index 0000000..ab3df5e
--- /dev/null
+++ b/core-dictionary/src/main/java/org/apache/kylin/dict/ShrunkenDictionaryBuilder.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.dict;
+
+import java.util.Map;
+
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.dict.ShrunkenDictionary.ValueSerializer;
+
+import com.google.common.collect.Maps;
+
+public class ShrunkenDictionaryBuilder<T> {
+
+    private Map<T, Integer> valueToIdMap;
+
+    private Dictionary<T> fullDict;
+
+    public ShrunkenDictionaryBuilder(Dictionary<T> fullDict) {
+        this.fullDict = fullDict;
+
+        this.valueToIdMap = Maps.newHashMap();
+    }
+
+    public void addValue(T value) {
+        int id = fullDict.getIdFromValue(value);
+        valueToIdMap.put(value, id);
+    }
+
+    public ShrunkenDictionary<T> build(ValueSerializer<T> valueSerializer) {
+        return new ShrunkenDictionary<>(valueSerializer, fullDict.getMinId(), fullDict.getMaxId(),
+                fullDict.getSizeOfId(), fullDict.getSizeOfValue(), valueToIdMap);
+    }
+}
diff --git a/core-dictionary/src/test/java/org/apache/kylin/dict/ShrunkenDictionaryTest.java b/core-dictionary/src/test/java/org/apache/kylin/dict/ShrunkenDictionaryTest.java
new file mode 100644
index 0000000..7a86e5f
--- /dev/null
+++ b/core-dictionary/src/test/java/org/apache/kylin/dict/ShrunkenDictionaryTest.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.dict;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+
+import org.apache.kylin.common.util.Dictionary;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class ShrunkenDictionaryTest {
+
+    @Test
+    public void testStringDictionary() {
+        ArrayList<String> strList = new ArrayList<String>();
+        strList.add("");
+        strList.add("part");
+        strList.add("par");
+        strList.add("partition");
+        strList.add("party");
+        strList.add("parties");
+        strList.add("paint");
+
+        TrieDictionaryBuilder<String> dictBuilder = new TrieDictionaryBuilder<>(new StringBytesConverter());
+        for (String str : strList) {
+            dictBuilder.addValue(str);
+        }
+        Dictionary<String> dict = dictBuilder.build(0);
+
+        ShrunkenDictionary.StringValueSerializer valueSerializer = new ShrunkenDictionary.StringValueSerializer();
+        ShrunkenDictionaryBuilder<String> shrunkenDictBuilder = new ShrunkenDictionaryBuilder<>(dict);
+        for (int i = 0; i < strList.size(); i += 2) {
+            shrunkenDictBuilder.addValue(strList.get(i));
+        }
+        Dictionary<String> shrunkenDict = shrunkenDictBuilder.build(valueSerializer);
+
+        try {
+            ByteArrayOutputStream bos = new ByteArrayOutputStream();
+            DataOutputStream dos = new DataOutputStream(bos);
+
+            shrunkenDict.write(dos);
+
+            ByteArrayInputStream bis = new ByteArrayInputStream(bos.toByteArray());
+            DataInputStream dis = new DataInputStream(bis);
+
+            Dictionary<String> dShrunkenDict = new ShrunkenDictionary<>(valueSerializer);
+            dShrunkenDict.readFields(dis);
+
+            for (int i = 0; i < strList.size(); i += 2) {
+                String value = strList.get(i);
+                Assert.assertEquals(dict.getIdFromValue(value), dShrunkenDict.getIdFromValue(value));
+            }
+        } catch (IOException e) {
+        }
+    }
+}
diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index c805f8a..560293c 100644
--- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -34,6 +34,7 @@ public final class ExecutableConstants {
     public static final String SOURCE_RECORDS_COUNT = "source_records_count";
     public static final String SOURCE_RECORDS_SIZE = "source_records_size";
 
+    public static final String STEP_NAME_EXTRACT_DICTIONARY_FROM_GLOBAL = "Extract Dictionary from Global Dictionary";
     public static final String STEP_NAME_BUILD_DICTIONARY = "Build Dimension Dictionary";
     public static final String STEP_NAME_BUILD_UHC_DICTIONARY = "Build UHC Dictionary";
     public static final String STEP_NAME_CREATE_FLAT_HIVE_TABLE = "Create Intermediate Flat Hive Table";
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
index 5498365..1695a22 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
@@ -73,6 +73,10 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
 
         outputSide.addStepPhase2_BuildDictionary(result);
 
+        if (seg.getCubeDesc().isShrunkenDictFromGlobalEnabled()) {
+            result.addTask(createExtractDictionaryFromGlobalJob(jobId));
+        }
+
         // Phase 3: Build Cube
         addLayerCubingSteps(result, jobId, cuboidRootPath); // layer cubing, only selected algorithm will execute
         addInMemCubingSteps(result, jobId, cuboidRootPath); // inmem cubing, only selected algorithm will execute
@@ -124,6 +128,9 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
         appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, cuboidRootPath);
         appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Cube_Builder_" + seg.getRealization().getName());
         appendExecCmdParameters(cmd, BatchConstants.ARG_CUBING_JOB_ID, jobId);
+        if (seg.getCubeDesc().isShrunkenDictFromGlobalEnabled()) {
+            appendExecCmdParameters(cmd, BatchConstants.ARG_SHRUNKEN_DICT_PATH, getShrunkenDictionaryPath(jobId));
+        }
 
         cubeStep.setMapReduceParams(cmd.toString());
         cubeStep.setMapReduceJobClass(getInMemCuboidJob());
@@ -150,6 +157,9 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
         appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, "Kylin_Base_Cuboid_Builder_" + seg.getRealization().getName());
         appendExecCmdParameters(cmd, BatchConstants.ARG_LEVEL, "0");
         appendExecCmdParameters(cmd, BatchConstants.ARG_CUBING_JOB_ID, jobId);
+        if (seg.getCubeDesc().isShrunkenDictFromGlobalEnabled()) {
+            appendExecCmdParameters(cmd, BatchConstants.ARG_SHRUNKEN_DICT_PATH, getShrunkenDictionaryPath(jobId));
+        }
 
         baseCuboidStep.setMapReduceParams(cmd.toString());
         baseCuboidStep.setMapReduceJobClass(getBaseCuboidJob());
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
index aca9853..f650321 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IMRInput.java
@@ -20,6 +20,7 @@ package org.apache.kylin.engine.mr;
 
 import java.util.Collection;
 
+import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.kylin.job.execution.DefaultChainedExecutable;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
@@ -50,6 +51,9 @@ public interface IMRInput {
 
         /** Parse a mapper input object into column values. */
         public Collection<String[]> parseMapperInput(Object mapperInput);
+
+        /** Get the signature for the input split*/
+        public String getInputSplitSignature(InputSplit inputSplit);
     }
 
     /**
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index c6abf16..02e9fe5 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -36,6 +36,7 @@ import org.apache.kylin.engine.mr.common.MapReduceExecutable;
 import org.apache.kylin.engine.mr.steps.CalculateStatsFromBaseCuboidJob;
 import org.apache.kylin.engine.mr.steps.CreateDictionaryJob;
 import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
+import org.apache.kylin.engine.mr.steps.ExtractDictionaryFromGlobalJob;
 import org.apache.kylin.engine.mr.steps.FactDistinctColumnsJob;
 import org.apache.kylin.engine.mr.steps.MergeDictionaryStep;
 import org.apache.kylin.engine.mr.steps.MergeStatisticsStep;
@@ -175,6 +176,23 @@ public class JobBuilderSupport {
         return buildDictionaryStep;
     }
 
+    public MapReduceExecutable createExtractDictionaryFromGlobalJob(String jobId) {
+        MapReduceExecutable result = new MapReduceExecutable();
+        result.setName(ExecutableConstants.STEP_NAME_EXTRACT_DICTIONARY_FROM_GLOBAL);
+        result.setMapReduceJobClass(ExtractDictionaryFromGlobalJob.class);
+        StringBuilder cmd = new StringBuilder();
+        appendMapReduceParameters(cmd);
+        appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME,
+                "Kylin_Extract_Dictionary_from_Global_" + seg.getRealization().getName() + "_Step");
+        appendExecCmdParameters(cmd, BatchConstants.ARG_CUBING_JOB_ID, jobId);
+        appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, getShrunkenDictionaryPath(jobId));
+
+        result.setMapReduceParams(cmd.toString());
+        return result;
+    }
+
     public UpdateCubeInfoAfterBuildStep createUpdateCubeInfoAfterBuildStep(String jobId, LookupMaterializeContext lookupMaterializeContext) {
         final UpdateCubeInfoAfterBuildStep result = new UpdateCubeInfoAfterBuildStep();
         result.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO);
@@ -291,6 +309,10 @@ public class JobBuilderSupport {
         return getRealizationRootPath(jobId) + "/fact_distinct_columns/" + BatchConstants.CFG_OUTPUT_STATISTICS;
     }
 
+    public String getShrunkenDictionaryPath(String jobId) {
+        return getRealizationRootPath(jobId) + "/dictionary_shrunken";
+    }
+
     public String getDictRootPath(String jobId) {
         return getRealizationRootPath(jobId) + "/dict";
     }
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index 2976080..8873f30 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -112,6 +112,9 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
             .hasArg().isRequired(true).withDescription("Partition file path.").create(BatchConstants.ARG_PARTITION);
     protected static final Option OPTION_HTABLE_NAME = OptionBuilder.withArgName(BatchConstants.ARG_HTABLE_NAME)
             .hasArg().isRequired(true).withDescription("HTable name").create(BatchConstants.ARG_HTABLE_NAME);
+    protected static final Option OPTION_DICTIONARY_SHRUNKEN_PATH = OptionBuilder
+            .withArgName(BatchConstants.ARG_SHRUNKEN_DICT_PATH).hasArg().isRequired(false)
+            .withDescription("Dictionary shrunken path").create(BatchConstants.ARG_SHRUNKEN_DICT_PATH);
 
     protected static final Option OPTION_STATISTICS_OUTPUT = OptionBuilder.withArgName(BatchConstants.ARG_STATS_OUTPUT)
             .hasArg().isRequired(false).withDescription("Statistics output").create(BatchConstants.ARG_STATS_OUTPUT);
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java
index 5dd55b2..13bc688 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java
@@ -74,18 +74,18 @@ public class BaseCuboidBuilder implements java.io.Serializable {
         measureCodec = new BufferedMeasureCodec(cubeDesc.getMeasures());
     }
 
-    public BaseCuboidBuilder(KylinConfig kylinConfig, CubeDesc cubeDesc, CubeSegment cubeSegment, CubeJoinedFlatTableEnrich intermediateTableDesc) {
+    public BaseCuboidBuilder(KylinConfig kylinConfig, CubeDesc cubeDesc, CubeSegment cubeSegment,
+            CubeJoinedFlatTableEnrich intermediateTableDesc, Map<TblColRef, Dictionary<String>> dictionaryMap) {
         this.kylinConfig = kylinConfig;
         this.cubeDesc = cubeDesc;
         this.cubeSegment = cubeSegment;
         this.intermediateTableDesc = intermediateTableDesc;
+        this.dictionaryMap = dictionaryMap;
 
         init();
         rowKeyEncoder = AbstractRowKeyEncoder.createInstance(cubeSegment, baseCuboid);
         measureCodec = new BufferedMeasureCodec(cubeDesc.getMeasures());
         aggrIngesters = MeasureIngester.create(cubeDesc.getMeasures());
-        dictionaryMap = cubeSegment.buildDictionaryMap();
-
     }
 
     private void init() {
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
index 6fe55e2..8c2ba7f 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
@@ -105,6 +105,7 @@ public interface BatchConstants {
     String ARG_EXT_LOOKUP_SNAPSHOTS_INFO = "extlookupsnapshots";
     String ARG_META_URL = "metadataUrl";
     String ARG_HBASE_CONF_PATH = "hbaseConfPath";
+    String ARG_SHRUNKEN_DICT_PATH = "shrunkenDictPath";
     /**
      * logger and counter
      */
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/DictionaryGetterUtil.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/DictionaryGetterUtil.java
new file mode 100644
index 0000000..0895244
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/DictionaryGetterUtil.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr.common;
+
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.dict.ShrunkenDictionary;
+import org.apache.kylin.engine.mr.MRUtil;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DictionaryGetterUtil {
+
+    private static final Logger logger = LoggerFactory.getLogger(DictionaryGetterUtil.class);
+
+    public static String getInputSplitSignature(CubeSegment cubeSegment, InputSplit inputSplit) {
+        return MRUtil.getBatchCubingInputSide(cubeSegment).getFlatTableInputFormat().getInputSplitSignature(inputSplit);
+    }
+
+    public static Map<TblColRef, Dictionary<String>> getDictionaryMap(CubeSegment cubeSegment, InputSplit inputSplit,
+                                                                      Configuration configuration) throws IOException {
+        Map<TblColRef, Dictionary<String>> dictionaryMap = cubeSegment.buildDictionaryMap();
+
+        String shrunkenDictPath = configuration.get(BatchConstants.ARG_SHRUNKEN_DICT_PATH);
+        if (shrunkenDictPath == null) {
+            return dictionaryMap;
+        }
+
+        // replace global dictionary with shrunken dictionary if possible
+        String inputSplitSignature = getInputSplitSignature(cubeSegment, inputSplit);
+        FileSystem fs = FileSystem.get(configuration);
+        ShrunkenDictionary.StringValueSerializer valueSerializer = new ShrunkenDictionary.StringValueSerializer();
+        for (TblColRef colRef : cubeSegment.getCubeDesc().getAllGlobalDictColumns()) {
+            Path colShrunkenDictDir = new Path(shrunkenDictPath, colRef.getIdentity());
+            Path colShrunkenDictPath = new Path(colShrunkenDictDir, inputSplitSignature);
+            if (!fs.exists(colShrunkenDictPath)) {
+                logger.warn("Shrunken dictionary for column " + colRef.getIdentity() + " in split "
+                        + inputSplitSignature + " does not exist!!!");
+                continue;
+            }
+            try (DataInputStream dis = fs.open(colShrunkenDictPath)) {
+                Dictionary<String> shrunkenDict = new ShrunkenDictionary(valueSerializer);
+                shrunkenDict.readFields(dis);
+
+                dictionaryMap.put(colRef, shrunkenDict);
+            }
+        }
+
+        return dictionaryMap;
+    }
+}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
index 091f9a2..b5dc961 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BaseCuboidMapperBase.java
@@ -21,10 +21,12 @@ package org.apache.kylin.engine.mr.steps;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.util.Arrays;
+import java.util.Map;
 
 import org.apache.hadoop.io.Text;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.Dictionary;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
@@ -35,6 +37,8 @@ import org.apache.kylin.engine.mr.KylinMapper;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BaseCuboidBuilder;
 import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.DictionaryGetterUtil;
+import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -66,8 +70,12 @@ abstract public class BaseCuboidMapperBase<KEYIN, VALUEIN> extends KylinMapper<K
         cubeDesc = cube.getDescriptor();
         cubeSegment = cube.getSegmentById(segmentID);
         CubeJoinedFlatTableEnrich intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc);
-        baseCuboidBuilder = new BaseCuboidBuilder(kylinConfig, cubeDesc, cubeSegment, intermediateTableDesc);
 
+        Map<TblColRef, Dictionary<String>> dictionaryMap = DictionaryGetterUtil.getDictionaryMap(cubeSegment,
+                context.getInputSplit(), context.getConfiguration());
+
+        baseCuboidBuilder = new BaseCuboidBuilder(kylinConfig, cubeDesc, cubeSegment, intermediateTableDesc,
+                dictionaryMap);
     }
 
 
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
index b49b639..d7da2c2 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/CuboidJob.java
@@ -93,6 +93,7 @@ public class CuboidJob extends AbstractHadoopJob {
             options.addOption(OPTION_NCUBOID_LEVEL);
             options.addOption(OPTION_CUBING_JOB_ID);
             options.addOption(OPTION_CUBOID_MODE);
+            options.addOption(OPTION_DICTIONARY_SHRUNKEN_PATH);
             parseOptions(options, args);
 
             String output = getOptionValue(OPTION_OUTPUT_PATH);
@@ -118,6 +119,10 @@ public class CuboidJob extends AbstractHadoopJob {
 
             job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
             job.getConfiguration().set(BatchConstants.ARG_CUBING_JOB_ID, cubingJobId);
+            String shrunkenDictPath = getOptionValue(OPTION_DICTIONARY_SHRUNKEN_PATH);
+            if (shrunkenDictPath != null) {
+                job.getConfiguration().set(BatchConstants.ARG_SHRUNKEN_DICT_PATH, shrunkenDictPath);
+            }
             logger.info("Starting: " + job.getJobName());
 
             setJobClasspath(job, cube.getConfig());
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ExtractDictionaryFromGlobalJob.java
similarity index 54%
copy from engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
copy to engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ExtractDictionaryFromGlobalJob.java
index b0ea7b7..df61ca9 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ExtractDictionaryFromGlobalJob.java
@@ -6,59 +6,44 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *     http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
  * See the License for the specific language governing permissions and
  * limitations under the License.
-*/
+ */
 
 package org.apache.kylin.engine.mr.steps;
 
 import org.apache.commons.cli.Options;
+import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
 import org.apache.hadoop.mapreduce.Job;
-import org.apache.hadoop.util.ToolRunner;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.cube.CubeInstance;
 import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.CubeSegment;
-import org.apache.kylin.engine.mr.ByteArrayWritable;
-import org.apache.kylin.engine.mr.CubingJob;
-import org.apache.kylin.engine.mr.IMRInput.IMRTableInputFormat;
-import org.apache.kylin.engine.mr.IMROutput2;
+import org.apache.kylin.engine.mr.IMRInput;
 import org.apache.kylin.engine.mr.MRUtil;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
-import org.apache.kylin.job.execution.ExecutableManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 /**
+ * Directly using the global dictionary to encode values brings lots of memory swapping of its slices, which makes
+ * the encoding process very slow. This job changes the encoding process for the raw column values as follows:
+ * 1. For each data block, a mapper collects the distinct values, sorts them, and extracts a shrunken dictionary from the global one
+ * 2. For each data block, the raw values are scanned again and encoded with the shrunken dictionary rather than the global one
  */
-public class InMemCuboidJob extends AbstractHadoopJob {
-
-    protected static final Logger logger = LoggerFactory.getLogger(InMemCuboidJob.class);
-
-    private boolean skipped = false;
-
-    @Override
-    public boolean isSkipped() {
-        return skipped;
-    }
-
-    private boolean checkSkip(String cubingJobId) {
-        if (cubingJobId == null)
-            return false;
-
-        ExecutableManager execMgr = ExecutableManager.getInstance(KylinConfig.getInstanceFromEnv());
-        CubingJob cubingJob = (CubingJob) execMgr.getJob(cubingJobId);
-        skipped = cubingJob.isInMemCubing() == false;
-        return skipped;
-    }
+public class ExtractDictionaryFromGlobalJob extends AbstractHadoopJob {
+    protected static final Logger logger = LoggerFactory.getLogger(ExtractDictionaryFromGlobalJob.class);
 
     @Override
     public int run(String[] args) throws Exception {
@@ -66,68 +51,57 @@ public class InMemCuboidJob extends AbstractHadoopJob {
 
         try {
             options.addOption(OPTION_JOB_NAME);
+            options.addOption(OPTION_CUBING_JOB_ID);
+            options.addOption(OPTION_OUTPUT_PATH);
             options.addOption(OPTION_CUBE_NAME);
             options.addOption(OPTION_SEGMENT_ID);
-            options.addOption(OPTION_OUTPUT_PATH);
-            options.addOption(OPTION_CUBING_JOB_ID);
             parseOptions(options, args);
 
-            String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
+            job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
+            String job_id = getOptionValue(OPTION_CUBING_JOB_ID);
+            job.getConfiguration().set(BatchConstants.ARG_CUBING_JOB_ID, job_id);
+
+            String cubeName = getOptionValue(OPTION_CUBE_NAME);
             String segmentID = getOptionValue(OPTION_SEGMENT_ID);
-            String output = getOptionValue(OPTION_OUTPUT_PATH);
 
+            // ----------------------------------------------------------------------------
+            // add metadata to distributed cache
             CubeManager cubeMgr = CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
             CubeInstance cube = cubeMgr.getCube(cubeName);
             CubeSegment segment = cube.getSegmentById(segmentID);
-            String cubingJobId = getOptionValue(OPTION_CUBING_JOB_ID);
 
-            if (checkSkip(cubingJobId)) {
-                logger.info("Skip job " + getOptionValue(OPTION_JOB_NAME) + " for " + segment);
-                return 0;
-            }
+            job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
+            job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID);
 
-            job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
-            job.getConfiguration().set(BatchConstants.ARG_CUBING_JOB_ID, cubingJobId);
             logger.info("Starting: " + job.getJobName());
 
+            job.getConfiguration().set("mapreduce.map.speculative", "false");
             setJobClasspath(job, cube.getConfig());
 
-            // add metadata to distributed cache
-            attachSegmentMetadataWithAll(segment, job.getConfiguration());
-
-            // set job configuration
-            job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
-            job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID);
+            // Mapper
+            job.setMapperClass(ExtractDictionaryFromGlobalMapper.class);
 
-            // set mapper
-            job.setMapperClass(InMemCuboidMapper.class);
-            job.setMapOutputKeyClass(ByteArrayWritable.class);
-            job.setMapOutputValueClass(ByteArrayWritable.class);
+            // Reducer
+            job.setNumReduceTasks(0);
 
-            // set reducer
-            // the cuboid file and KV class must be compatible with 0.7 version for smooth upgrade
-            job.setReducerClass(InMemCuboidReducer.class);
+            // Input
+            IMRInput.IMRTableInputFormat flatTableInputFormat = MRUtil.getBatchCubingInputSide(segment)
+                    .getFlatTableInputFormat();
+            flatTableInputFormat.configureJob(job);
+            // Output
+            //// prevent to create zero-sized default output
+            LazyOutputFormat.setOutputFormatClass(job, SequenceFileOutputFormat.class);
+            Path output = new Path(getOptionValue(OPTION_OUTPUT_PATH));
+            FileOutputFormat.setOutputPath(job, output);
             job.setOutputKeyClass(Text.class);
             job.setOutputValueClass(Text.class);
+            deletePath(job.getConfiguration(), output);
 
-            // set input
-            IMRTableInputFormat flatTableInputFormat = MRUtil.getBatchCubingInputSide(segment).getFlatTableInputFormat();
-            flatTableInputFormat.configureJob(job);
-
-            // set output
-            IMROutput2.IMROutputFormat outputFormat = MRUtil.getBatchCubingOutputSide2(segment).getOuputFormat();
-            outputFormat.configureJobOutput(job, output, segment, segment.getCuboidScheduler(), 0);
-
+            attachSegmentMetadataWithDict(segment, job.getConfiguration());
             return waitForCompletion(job);
         } finally {
             if (job != null)
                 cleanupTempConfFile(job.getConfiguration());
         }
     }
-
-    public static void main(String[] args) throws Exception {
-        InMemCuboidJob job = new InMemCuboidJob();
-        int exitCode = ToolRunner.run(job, args);
-        System.exit(exitCode);
-    }
 }
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ExtractDictionaryFromGlobalMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ExtractDictionaryFromGlobalMapper.java
new file mode 100644
index 0000000..34a5ec7
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/ExtractDictionaryFromGlobalMapper.java
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.util.Collection;
+import java.util.Collections;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.Dictionary;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.cube.model.CubeJoinedFlatTableEnrich;
+import org.apache.kylin.dict.ShrunkenDictionary;
+import org.apache.kylin.dict.ShrunkenDictionaryBuilder;
+import org.apache.kylin.engine.EngineFactory;
+import org.apache.kylin.engine.mr.IMRInput;
+import org.apache.kylin.engine.mr.KylinMapper;
+import org.apache.kylin.engine.mr.MRUtil;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.DictionaryGetterUtil;
+import org.apache.kylin.metadata.model.TblColRef;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Sets;
+
+public class ExtractDictionaryFromGlobalMapper<KEYIN, Object> extends KylinMapper<KEYIN, Object, Text, Text> {
+    private String cubeName;
+    private CubeDesc cubeDesc;
+    private CubeInstance cube;
+    private CubeSegment cubeSeg;
+
+    private IMRInput.IMRTableInputFormat flatTableInputFormat;
+    private CubeJoinedFlatTableEnrich intermediateTableDesc;
+
+    private List<TblColRef> globalColumns;
+    private int[] globalColumnIndex;
+    private List<Set<String>> globalColumnValues;
+    private List<Dictionary<String>> globalDicts;
+
+    private String splitKey;
+
+    @Override
+    protected void doSetup(Context context) throws IOException {
+        Configuration conf = context.getConfiguration();
+        bindCurrentConfiguration(conf);
+        KylinConfig config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+
+        cubeName = conf.get(BatchConstants.CFG_CUBE_NAME);
+        cube = CubeManager.getInstance(config).getCube(cubeName);
+        cubeDesc = cube.getDescriptor();
+        cubeSeg = cube.getSegmentById(conf.get(BatchConstants.CFG_CUBE_SEGMENT_ID));
+        flatTableInputFormat = MRUtil.getBatchCubingInputSide(cubeSeg).getFlatTableInputFormat();
+
+        intermediateTableDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSeg), cubeDesc);
+
+        globalColumns = cubeDesc.getAllGlobalDictColumns();
+        globalColumnIndex = new int[globalColumns.size()];
+        globalColumnValues = Lists.newArrayListWithExpectedSize(globalColumns.size());
+        globalDicts = Lists.newArrayListWithExpectedSize(globalColumns.size());
+        for (int i = 0; i < globalColumns.size(); i++) {
+            TblColRef colRef = globalColumns.get(i);
+            int columnIndexOnFlatTbl = intermediateTableDesc.getColumnIndex(colRef);
+            globalColumnIndex[i] = columnIndexOnFlatTbl;
+
+            globalColumnValues.add(Sets.<String> newHashSet());
+            globalDicts.add(cubeSeg.getDictionary(colRef));
+        }
+
+        splitKey = DictionaryGetterUtil.getInputSplitSignature(cubeSeg, context.getInputSplit());
+    }
+
+    @Override
+    public void doMap(KEYIN key, Object record, Context context) throws IOException, InterruptedException {
+        Collection<String[]> rowCollection = flatTableInputFormat.parseMapperInput(record);
+
+        for (String[] row : rowCollection) {
+            for (int i = 0; i < globalColumnIndex.length; i++) {
+                String fieldValue = row[globalColumnIndex[i]];
+                if (fieldValue == null)
+                    continue;
+
+                globalColumnValues.get(i).add(fieldValue);
+            }
+        }
+    }
+
+    @Override
+    protected void doCleanup(Context context) throws IOException, InterruptedException {
+        FileSystem fs = FileSystem.get(context.getConfiguration());
+        Path outputDirBase = new Path(context.getConfiguration().get(FileOutputFormat.OUTDIR));
+
+        ShrunkenDictionary.StringValueSerializer strValueSerializer = new ShrunkenDictionary.StringValueSerializer();
+        for (int i = 0; i < globalColumns.size(); i++) {
+            List<String> colDistinctValues = Lists.newArrayList(globalColumnValues.get(i));
+            // sort values to accelerate the encoding process by reducing the swapping of global dictionary slices
+            Collections.sort(colDistinctValues);
+
+            ShrunkenDictionaryBuilder<String> dictBuilder = new ShrunkenDictionaryBuilder<>(globalDicts.get(i));
+            for (String colValue : colDistinctValues) {
+                dictBuilder.addValue(colValue);
+            }
+            Dictionary<String> shrunkenDict = dictBuilder.build(strValueSerializer);
+
+            Path colDictDir = new Path(outputDirBase, globalColumns.get(i).getIdentity());
+            if (!fs.exists(colDictDir)) {
+                fs.mkdirs(colDictDir);
+            }
+            try (DataOutputStream dos = fs.create(new Path(colDictDir, splitKey))) {
+                shrunkenDict.write(dos);
+            }
+        }
+    }
+}
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
index b0ea7b7..f8874fe 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidJob.java
@@ -70,6 +70,7 @@ public class InMemCuboidJob extends AbstractHadoopJob {
             options.addOption(OPTION_SEGMENT_ID);
             options.addOption(OPTION_OUTPUT_PATH);
             options.addOption(OPTION_CUBING_JOB_ID);
+            options.addOption(OPTION_DICTIONARY_SHRUNKEN_PATH);
             parseOptions(options, args);
 
             String cubeName = getOptionValue(OPTION_CUBE_NAME).toUpperCase();
@@ -88,6 +89,10 @@ public class InMemCuboidJob extends AbstractHadoopJob {
 
             job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
             job.getConfiguration().set(BatchConstants.ARG_CUBING_JOB_ID, cubingJobId);
+            String shrunkenDictPath = getOptionValue(OPTION_DICTIONARY_SHRUNKEN_PATH);
+            if (shrunkenDictPath != null) {
+                job.getConfiguration().set(BatchConstants.ARG_SHRUNKEN_DICT_PATH, shrunkenDictPath);
+            }
             logger.info("Starting: " + job.getJobName());
 
             setJobClasspath(job, cube.getConfig());
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperBase.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperBase.java
index 73af138..e95ce8a 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperBase.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/InMemCuboidMapperBase.java
@@ -42,13 +42,12 @@ import org.apache.kylin.engine.mr.KylinMapper;
 import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.engine.mr.common.CuboidSchedulerUtil;
+import org.apache.kylin.engine.mr.common.DictionaryGetterUtil;
 import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import com.google.common.collect.Maps;
-
 /**
  */
 public abstract class InMemCuboidMapperBase<KEYIN, VALUEIN, KEYOUT, VALUEOUT, T> extends KylinMapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
@@ -94,17 +93,7 @@ public abstract class InMemCuboidMapperBase<KEYIN, VALUEIN, KEYOUT, VALUEOUT, T>
         cubeSegment = cube.getSegmentById(segmentID);
         flatDesc = new CubeJoinedFlatTableEnrich(EngineFactory.getJoinedFlatTableDesc(cubeSegment), cubeDesc);
 
-        dictionaryMap = Maps.newHashMap();
-
-        // dictionary
-        for (TblColRef col : cubeDesc.getAllColumnsHaveDictionary()) {
-            Dictionary<?> dict = cubeSegment.getDictionary(col);
-            if (dict == null) {
-                logger.warn("Dictionary for " + col + " was not found.");
-            }
-
-            dictionaryMap.put(col, cubeSegment.getDictionary(col));
-        }
+        dictionaryMap = DictionaryGetterUtil.getDictionaryMap(cubeSegment, context.getInputSplit(), conf);
 
         // check memory more often if a single row is big
         if (cubeDesc.hasMemoryHungryMeasures()) {
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
index 2e39285..33b1059 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveMRInput.java
@@ -22,9 +22,12 @@ import java.io.IOException;
 import java.util.Collections;
 import java.util.List;
 
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hive.hcatalog.data.HCatRecord;
 import org.apache.hive.hcatalog.mapreduce.HCatInputFormat;
+import org.apache.hive.hcatalog.mapreduce.HCatSplit;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.StringUtil;
@@ -100,6 +103,12 @@ public class HiveMRInput extends HiveInputBase implements IMRInput {
             return Collections.singletonList(HiveTableReader.getRowAsStringArray((HCatRecord) mapperInput));
         }
 
+        @Override
+        public String getInputSplitSignature(InputSplit inputSplit) {
+            FileSplit baseSplit = (FileSplit) ((HCatSplit) inputSplit).getBaseSplit();
+            //file name(for intermediate table) + start pos + length
+            return baseSplit.getPath().getName() + "_" + baseSplit.getStart() + "_" + baseSplit.getLength();
+        }
     }
 
     public static class BatchCubingInputSide implements IMRBatchCubingInputSide {
diff --git a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
index a45cc63..2c95c1c 100644
--- a/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
+++ b/source-kafka/src/main/java/org/apache/kylin/source/kafka/KafkaMRInput.java
@@ -24,6 +24,8 @@ import java.util.List;
 
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapreduce.InputSplit;
 import org.apache.hadoop.mapreduce.Job;
 import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
@@ -100,6 +102,11 @@ public class KafkaMRInput extends KafkaInputBase implements IMRInput {
             return Collections.singletonList(columns);
         }
 
+        @Override
+        public String getInputSplitSignature(InputSplit inputSplit) {
+            FileSplit baseSplit = (FileSplit) inputSplit;
+            return baseSplit.getPath().getName() + "_" + baseSplit.getStart() + "_" + baseSplit.getLength();
+        }
     }
 
     public static class BatchCubingInputSide implements IMRBatchCubingInputSide {
diff --git a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseJobSteps.java b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseJobSteps.java
index e48090d..4d61d9b 100644
--- a/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseJobSteps.java
+++ b/storage-hbase/src/main/java/org/apache/kylin/storage/hbase/steps/HBaseJobSteps.java
@@ -250,6 +250,7 @@ public abstract class HBaseJobSteps extends JobBuilderSupport {
         List<String> toDeletePaths = new ArrayList<>();
         toDeletePaths.add(getFactDistinctColumnsPath(jobId));
         toDeletePaths.add(getHFilePath(jobId));
+        toDeletePaths.add(getShrunkenDictionaryPath(jobId));
 
         HDFSPathGarbageCollectionStep step = new HDFSPathGarbageCollectionStep();
         step.setName(ExecutableConstants.STEP_NAME_GARBAGE_COLLECTION_HBASE);
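
For reference, here is a minimal sketch (outside the patch) of how the classes introduced
above fit together; it mirrors ShrunkenDictionaryTest and only uses APIs that appear in
this commit. The class name ShrunkenDictionarySketch is purely illustrative.

    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;

    import org.apache.kylin.common.util.Dictionary;
    import org.apache.kylin.dict.ShrunkenDictionary;
    import org.apache.kylin.dict.ShrunkenDictionaryBuilder;
    import org.apache.kylin.dict.StringBytesConverter;
    import org.apache.kylin.dict.TrieDictionaryBuilder;

    public class ShrunkenDictionarySketch {
        public static void main(String[] args) throws Exception {
            // 1. Build a full dictionary (stand-in for the global dictionary) over all values.
            TrieDictionaryBuilder<String> fullBuilder = new TrieDictionaryBuilder<>(new StringBytesConverter());
            for (String v : new String[] { "part", "party", "partition" }) {
                fullBuilder.addValue(v);
            }
            Dictionary<String> fullDict = fullBuilder.build(0);

            // 2. Shrink it to the values seen in one data block; ids are copied from the full dictionary.
            ShrunkenDictionaryBuilder<String> shrunkenBuilder = new ShrunkenDictionaryBuilder<>(fullDict);
            shrunkenBuilder.addValue("part");
            shrunkenBuilder.addValue("party");
            ShrunkenDictionary.StringValueSerializer serializer = new ShrunkenDictionary.StringValueSerializer();
            Dictionary<String> shrunkenDict = shrunkenBuilder.build(serializer);

            // 3. Round-trip through write/readFields, as the mapper and cubing steps do via HDFS files.
            ByteArrayOutputStream bos = new ByteArrayOutputStream();
            shrunkenDict.write(new DataOutputStream(bos));
            Dictionary<String> restored = new ShrunkenDictionary<>(serializer);
            restored.readFields(new DataInputStream(new ByteArrayInputStream(bos.toByteArray())));

            // The shrunken dictionary returns the same id the global dictionary would assign.
            System.out.println(fullDict.getIdFromValue("party") == restored.getIdFromValue("party"));
        }
    }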
