shaofengshi closed pull request #181: Kylin 3471
URL: https://github.com/apache/kylin/pull/181
 
 
   

This is a PR merged from a forked repository.
As GitHub hides the original diff on merge, it is displayed below for
the sake of provenance:

As this is a foreign pull request (from a fork), the diff is supplied
below (as it won't show otherwise due to GitHub magic):

diff --git 
a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java 
b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index f6ad0ed8ef..c805f8a69a 100644
--- 
a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ 
b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -54,6 +54,7 @@ private ExecutableConstants() {
     public static final String STEP_NAME_COPY_DICTIONARY = "Copy dictionary 
from Old Segment";
     public static final String STEP_NAME_MERGE_DICTIONARY = "Merge Cuboid 
Dictionary";
     public static final String STEP_NAME_MERGE_STATISTICS = "Merge Cuboid 
Statistics";
+    public static final String STEP_NAME_MERGE_UPDATE_DICTIONARY = "Update 
Dictionary Data";
     public static final String STEP_NAME_MERGE_STATISTICS_WITH_OLD = "Merge 
Cuboid Statistics with Old for Optimization";
     public static final String STEP_NAME_SAVE_STATISTICS = "Save Cuboid 
Statistics";
     public static final String STEP_NAME_MERGE_CUBOID = "Merge Cuboid Data";
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
index d443f523b4..d9027082b6 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchMergeJobBuilder2.java
@@ -20,7 +20,12 @@
 
 import java.util.List;
 
+import org.apache.kylin.common.util.StringUtil;
 import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.MapReduceExecutable;
+import org.apache.kylin.engine.mr.steps.MergeDictionaryJob;
+import org.apache.kylin.job.constant.ExecutableConstants;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -55,8 +60,8 @@ public CubingJob build() {
 
         // Phase 1: Merge Dictionary
         inputSide.addStepPhase1_MergeDictionary(result);
-        result.addTask(createMergeDictionaryStep(mergingSegmentIds));
-        result.addTask(createMergeStatisticsStep(cubeSegment, 
mergingSegmentIds, getStatisticsPath(jobId)));
+        result.addTask(createMergeDictionaryStep(cubeSegment, jobId, 
mergingSegmentIds));
+        result.addTask(createUpdateDictionaryStep(cubeSegment, jobId, 
mergingSegmentIds));
         outputSide.addStepPhase1_MergeDictionary(result);
 
         // Phase 2: Merge Cube Files
@@ -69,4 +74,25 @@ public CubingJob build() {
         return result;
     }
 
+    public MapReduceExecutable createMergeDictionaryStep(CubeSegment seg, 
String jobID, List<String> mergingSegmentIds) {
+        MapReduceExecutable mergeDictionaryStep = new MapReduceExecutable();
+        
mergeDictionaryStep.setName(ExecutableConstants.STEP_NAME_MERGE_DICTIONARY);
+        StringBuilder cmd = new StringBuilder();
+
+        appendMapReduceParameters(cmd);
+        appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, 
seg.getCubeInstance().getName());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, 
seg.getUuid());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_META_URL, 
getSegmentMetadataUrl(seg.getConfig(), jobID));
+        appendExecCmdParameters(cmd, BatchConstants.ARG_META_URL, 
getSegmentMetadataUrl(seg.getConfig(), jobID));
+        appendExecCmdParameters(cmd, 
MergeDictionaryJob.OPTION_MERGE_SEGMENT_IDS.getOpt(), 
StringUtil.join(mergingSegmentIds, ","));
+        appendExecCmdParameters(cmd, 
MergeDictionaryJob.OPTION_OUTPUT_PATH_DICT.getOpt(), getDictInfoPath(jobID));
+        appendExecCmdParameters(cmd, 
MergeDictionaryJob.OPTION_OUTPUT_PATH_STAT.getOpt(), getStatisticsPath(jobID));
+        appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME, 
"Kylin_Merge_Dictionary_" + seg.getCubeInstance().getName() + "_Step");
+
+        mergeDictionaryStep.setMapReduceParams(cmd.toString());
+        mergeDictionaryStep.setMapReduceJobClass(MergeDictionaryJob.class);
+
+        return mergeDictionaryStep;
+    }
+
 }
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index 42e0f42893..649b4c3507 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -19,10 +19,14 @@
 package org.apache.kylin.engine.mr;
 
 import java.io.IOException;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.StorageURL;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.CuboidModeEnum;
 import org.apache.kylin.cube.model.CubeDesc;
@@ -39,11 +43,12 @@
 import org.apache.kylin.engine.mr.steps.UHCDictionaryJob;
 import org.apache.kylin.engine.mr.steps.UpdateCubeInfoAfterBuildStep;
 import org.apache.kylin.engine.mr.steps.UpdateCubeInfoAfterMergeStep;
+import org.apache.kylin.engine.mr.steps.UpdateDictionaryStep;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.metadata.model.TblColRef;
 
 import com.google.common.base.Preconditions;
-import org.apache.kylin.metadata.model.TblColRef;
 
 /**
  * Hold reusable steps for builders.
@@ -98,6 +103,22 @@ public MergeStatisticsStep 
createMergeStatisticsStep(CubeSegment seg, List<Strin
         return result;
     }
 
+    public UpdateDictionaryStep createUpdateDictionaryStep(CubeSegment seg, 
String jobId, List<String> mergingSegmentIds) {
+        UpdateDictionaryStep result = new UpdateDictionaryStep();
+        result.setName(ExecutableConstants.STEP_NAME_MERGE_UPDATE_DICTIONARY);
+
+        CubingExecutableUtil.setCubeName(seg.getRealization().getName(), 
result.getParams());
+        CubingExecutableUtil.setSegmentId(seg.getUuid(), result.getParams());
+        CubingExecutableUtil.setMergingSegmentIds(mergingSegmentIds, 
result.getParams());
+
+        // merged dict info path
+        result.getParams().put(BatchConstants.ARG_DICT_PATH, 
getDictInfoPath(jobId));
+        // metadata url
+        result.getParams().put(BatchConstants.ARG_META_URL, 
getSegmentMetadataUrl(seg.getConfig(), jobId));
+
+        return result;
+    }
+
     public MapReduceExecutable createBuildUHCDictStep(String jobId) {
         MapReduceExecutable result = new MapReduceExecutable();
         result.setName(ExecutableConstants.STEP_NAME_BUILD_UHC_DICTIONARY);
@@ -192,7 +213,6 @@ public UpdateCubeInfoAfterMergeStep 
createUpdateCubeInfoAfterMergeStep(List<Stri
         return result;
     }
 
-
     public boolean isEnableUHCDictStep() {
         if (!config.getConfig().isBuildUHCDictWithMREnabled()) {
             return false;
@@ -220,7 +240,6 @@ public LookupMaterializeContext 
addMaterializeLookupTableSteps(final CubingJob r
         return lookupMaterializeContext;
     }
 
-
     public SaveStatisticsStep createSaveStatisticsStep(String jobId) {
         SaveStatisticsStep result = new SaveStatisticsStep();
         result.setName(ExecutableConstants.STEP_NAME_SAVE_STATISTICS);
@@ -276,6 +295,10 @@ public String getDictRootPath(String jobId) {
         return getRealizationRootPath(jobId) + "/dict";
     }
 
+    public String getDictInfoPath(String jobId) {
+        return getRealizationRootPath(jobId) + "/dict_info";
+    }
+
     public String getOptimizationRootPath(String jobId) {
         return getRealizationRootPath(jobId) + "/optimize";
     }
@@ -327,7 +350,6 @@ public String getDumpMetadataPath(String jobId) {
         return getRealizationRootPath(jobId) + "/metadata";
     }
 
-
     public static String extractJobIDFromPath(String path) {
         Matcher matcher = JOB_NAME_PATTERN.matcher(path);
         // check the first occurrence
@@ -337,4 +359,10 @@ public static String extractJobIDFromPath(String path) {
             throw new IllegalStateException("Can not extract job ID from file 
path : " + path);
         }
     }
+
+    public String getSegmentMetadataUrl(KylinConfig kylinConfig, String jobId) 
{
+        Map<String, String> param = new HashMap<>();
+        param.put("path", getDumpMetadataPath(jobId));
+        return new StorageURL(kylinConfig.getMetadataUrl().getIdentifier(), 
"hdfs", param).toString();
+    }
 }
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index 716eafe08f..180b56a485 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -27,6 +27,7 @@
 import static 
org.apache.kylin.engine.mr.common.JobRelatedMetaUtil.collectCubeMetadata;
 
 import java.io.File;
+import java.io.FileOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.util.HashMap;
@@ -60,7 +61,9 @@
 import org.apache.hadoop.util.ToolRunner;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.KylinConfig.SetAndUnsetThreadLocalConfig;
+import org.apache.kylin.common.KylinConfigExt;
 import org.apache.kylin.common.StorageURL;
+import org.apache.kylin.common.persistence.ResourceTool;
 import org.apache.kylin.common.util.CliCommandExecutor;
 import org.apache.kylin.common.util.HadoopUtil;
 import org.apache.kylin.common.util.OptionsHelper;
@@ -126,6 +129,9 @@
     protected static final Option OPTION_LOOKUP_SNAPSHOT_ID = 
OptionBuilder.withArgName(BatchConstants.ARG_LOOKUP_SNAPSHOT_ID).hasArg()
             .isRequired(true).withDescription("Lookup table snapshotID")
             .create(BatchConstants.ARG_LOOKUP_SNAPSHOT_ID);
+    protected static final Option OPTION_META_URL = 
OptionBuilder.withArgName(BatchConstants.ARG_META_URL)
+            .hasArg().isRequired(true).withDescription("HDFS metadata 
url").create(BatchConstants.ARG_META_URL);
+
 
     private static final String MAP_REDUCE_CLASSPATH = 
"mapreduce.application.classpath";
 
@@ -548,6 +554,41 @@ protected void 
attachSegmentsMetadataWithDict(List<CubeSegment> segments, Config
         dumpKylinPropsAndMetadata(cube.getProject(), dumpList, 
cube.getConfig(), conf);
     }
 
+    protected void attachSegmentsMetadataWithDict(List<CubeSegment> segments, 
String metaUrl) throws IOException {
+        Set<String> dumpList = new LinkedHashSet<>();
+        
dumpList.addAll(JobRelatedMetaUtil.collectCubeMetadata(segments.get(0).getCubeInstance()));
+        for (CubeSegment segment : segments) {
+            dumpList.addAll(segment.getDictionaryPaths());
+            dumpList.add(segment.getStatisticsResourcePath());
+        }
+        dumpAndUploadKylinPropsAndMetadata(dumpList, (KylinConfigExt) 
segments.get(0).getConfig(), metaUrl);
+    }
+
+    private void dumpAndUploadKylinPropsAndMetadata(Set<String> dumpList, 
KylinConfigExt kylinConfig, String metadataUrl)
+            throws IOException {
+        File tmp = File.createTempFile("kylin_job_meta", "");
+        FileUtils.forceDelete(tmp); // we need a directory, so delete the file 
first
+
+        File metaDir = new File(tmp, "meta");
+        metaDir.mkdirs();
+
+        // dump metadata
+        JobRelatedMetaUtil.dumpResources(kylinConfig, metaDir, dumpList);
+
+        // write kylin.properties
+        Properties props = kylinConfig.exportToProperties();
+        props.setProperty("kylin.metadata.url", metadataUrl);
+
+        File kylinPropsFile = new File(metaDir, "kylin.properties");
+        try (FileOutputStream os = new FileOutputStream(kylinPropsFile)) {
+            props.store(os, kylinPropsFile.getAbsolutePath());
+        }
+
+        KylinConfig dstConfig = KylinConfig.createKylinConfig(props);
+        //upload metadata
+        
ResourceTool.copy(KylinConfig.createInstanceFromUri(metaDir.getAbsolutePath()), 
dstConfig);
+    }
+
     protected void attachSegmentMetadataWithDict(CubeSegment segment, 
Configuration conf) throws IOException {
         attachSegmentMetadata(segment, conf, true, false);
     }
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
index d38f7a4168..a4a52ad0c4 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
@@ -103,6 +103,7 @@
     String ARG_TABLE_NAME = "tableName";
     String ARG_LOOKUP_SNAPSHOT_ID = "snapshotID";
     String ARG_EXT_LOOKUP_SNAPSHOTS_INFO = "extlookupsnapshots";
+    String ARG_META_URL = "metadataUrl";
 
     /**
      * logger and counter
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryJob.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryJob.java
new file mode 100644
index 0000000000..32e8e0c651
--- /dev/null
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryJob.java
@@ -0,0 +1,234 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.List;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.Writable;
+import org.apache.hadoop.mapreduce.InputFormat;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.JobContext;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.metadata.model.Segments;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+
+public class MergeDictionaryJob extends AbstractHadoopJob {
+    private static final Logger logger = 
LoggerFactory.getLogger(MergeDictionaryJob.class);
+
+    public static final Option OPTION_MERGE_SEGMENT_IDS = 
OptionBuilder.withArgName("segmentIds").hasArg()
+            .isRequired(true).withDescription("Merging Cube Segment 
Ids").create("segmentIds");
+    public static final Option OPTION_OUTPUT_PATH_DICT = 
OptionBuilder.withArgName("dictOutputPath").hasArg()
+            .isRequired(true).withDescription("merged dictionary resource 
path").create("dictOutputPath");
+    public static final Option OPTION_OUTPUT_PATH_STAT = 
OptionBuilder.withArgName("statOutputPath").hasArg()
+            .isRequired(true).withDescription("merged statistics resource 
path").create("statOutputPath");
+
+    @Override
+    public int run(String[] args) throws Exception {
+        try {
+            Options options = new Options();
+            options.addOption(OPTION_JOB_NAME);
+            options.addOption(OPTION_SEGMENT_ID);
+            options.addOption(OPTION_CUBE_NAME);
+            options.addOption(OPTION_META_URL);
+            options.addOption(OPTION_MERGE_SEGMENT_IDS);
+            options.addOption(OPTION_OUTPUT_PATH_DICT);
+            options.addOption(OPTION_OUTPUT_PATH_STAT);
+            parseOptions(options, args);
+
+            final String segmentId = getOptionValue(OPTION_SEGMENT_ID);
+            final String segmentIds = getOptionValue(OPTION_MERGE_SEGMENT_IDS);
+            final String cubeName = getOptionValue(OPTION_CUBE_NAME);
+            final String metaUrl = getOptionValue(OPTION_META_URL);
+            final String dictOutputPath = 
getOptionValue(OPTION_OUTPUT_PATH_DICT);
+            final String statOutputPath = 
getOptionValue(OPTION_OUTPUT_PATH_STAT);
+
+            CubeManager cubeMgr = 
CubeManager.getInstance(KylinConfig.getInstanceFromEnv());
+            CubeInstance cube = cubeMgr.getCube(cubeName);
+            CubeDesc cubeDesc = cube.getDescriptor();
+            CubeSegment segment = cube.getSegmentById(segmentId);
+            Segments<CubeSegment> mergingSeg = 
cube.getMergingSegments(segment);
+
+            job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
+            job.getConfiguration().set(BatchConstants.ARG_CUBE_NAME, cubeName);
+            job.getConfiguration().set(OPTION_META_URL.getOpt(), metaUrl);
+            job.getConfiguration().set(OPTION_SEGMENT_ID.getOpt(), segmentId);
+            job.getConfiguration().set(OPTION_MERGE_SEGMENT_IDS.getOpt(), 
segmentIds);
+            job.getConfiguration().set(OPTION_OUTPUT_PATH_STAT.getOpt(), 
statOutputPath);
+            job.getConfiguration().set("num.map.tasks", 
String.valueOf(cubeDesc.getAllColumnsNeedDictionaryBuilt().size() + 1));
+            job.setNumReduceTasks(1);
+
+            setJobClasspath(job, cube.getConfig());
+
+            // dump metadata to HDFS
+            attachSegmentsMetadataWithDict(mergingSeg, metaUrl);
+
+            // clean output dir
+            HadoopUtil.deletePath(job.getConfiguration(), new 
Path(dictOutputPath));
+
+            job.setMapperClass(MergeDictionaryMapper.class);
+            job.setReducerClass(MergeDictionaryReducer.class);
+
+            job.setMapOutputKeyClass(IntWritable.class);
+            job.setMapOutputValueClass(Text.class);
+
+            job.setOutputKeyClass(Text.class);
+            job.setOutputValueClass(Text.class);
+
+            job.setInputFormatClass(IndexArrInputFormat.class);
+            job.setOutputFormatClass(SequenceFileOutputFormat.class);
+
+            SequenceFileOutputFormat.setOutputCompressionType(job, 
SequenceFile.CompressionType.NONE);
+            SequenceFileOutputFormat.setOutputPath(job, new 
Path(dictOutputPath));
+
+            logger.info("Starting: " + job.getJobName());
+
+            return waitForCompletion(job);
+
+        } finally {
+            if (job != null)
+                cleanupTempConfFile(job.getConfiguration());
+        }
+    }
+
+    static class IndexArrInputFormat extends InputFormat<IntWritable, 
NullWritable> {
+
+        @Override
+        public List<InputSplit> getSplits(JobContext jobContext) throws 
IOException, InterruptedException {
+            int numMapTasks = 
jobContext.getConfiguration().getInt("num.map.tasks", 0);
+            List<InputSplit> inputSplits = 
Lists.newArrayListWithCapacity(numMapTasks);
+
+            for (int i = 0; i < numMapTasks; i++) {
+                inputSplits.add(new IntInputSplit(i));
+            }
+
+            return inputSplits;
+        }
+
+        @Override
+        public RecordReader<IntWritable, NullWritable> 
createRecordReader(InputSplit inputSplit,
+                TaskAttemptContext taskAttemptContext) throws IOException, 
InterruptedException {
+
+            return new RecordReader<IntWritable, NullWritable>() {
+                private int index;
+                private IntWritable key;
+                private NullWritable value;
+
+                @Override
+                public void initialize(InputSplit inputSplit, 
TaskAttemptContext taskAttemptContext)
+                        throws IOException, InterruptedException {
+                    IntInputSplit intInputSplit = (IntInputSplit) inputSplit;
+                    index = intInputSplit.getIndex();
+                }
+
+                @Override
+                public boolean nextKeyValue() throws IOException, 
InterruptedException {
+
+                    if (key == null) {
+                        key = new IntWritable(index);
+                        value = NullWritable.get();
+                        return true;
+                    }
+
+                    return false;
+                }
+
+                @Override
+                public IntWritable getCurrentKey() throws IOException, 
InterruptedException {
+                    return key;
+                }
+
+                @Override
+                public NullWritable getCurrentValue() throws IOException, 
InterruptedException {
+                    return value;
+                }
+
+                @Override
+                public float getProgress() throws IOException, 
InterruptedException {
+                    return 1;
+                }
+
+                @Override
+                public void close() throws IOException {
+
+                }
+            };
+        }
+    }
+
+    static class IntInputSplit extends InputSplit implements Writable {
+        private int index;
+
+        public IntInputSplit() {
+
+        }
+
+        public IntInputSplit(int index) {
+            this.index = index;
+        }
+
+        @Override
+        public void write(DataOutput dataOutput) throws IOException {
+            dataOutput.writeInt(index);
+        }
+
+        @Override
+        public void readFields(DataInput dataInput) throws IOException {
+            this.index = dataInput.readInt();
+        }
+
+        @Override
+        public long getLength() throws IOException, InterruptedException {
+            return 1L;
+        }
+
+        @Override
+        public String[] getLocations() throws IOException, 
InterruptedException {
+            return new String[0];
+        }
+
+        public int getIndex() {
+            return index;
+        }
+    }
+}
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryMapper.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryMapper.java
new file mode 100644
index 0000000000..522c06a491
--- /dev/null
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryMapper.java
@@ -0,0 +1,207 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.cube.CubeDescManager;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.dict.DictionaryInfo;
+import org.apache.kylin.dict.DictionaryManager;
+import org.apache.kylin.engine.mr.KylinMapper;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.CubeStatsWriter;
+import org.apache.kylin.engine.mr.common.SerializableConfiguration;
+import org.apache.kylin.measure.hllc.HLLCounter;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+public class MergeDictionaryMapper extends KylinMapper<IntWritable, 
NullWritable, IntWritable, Text> {
+    private static final Logger logger = 
LoggerFactory.getLogger(MergeDictionaryMapper.class);
+
+    List<CubeSegment> mergingSegments;
+    TblColRef[] tblColRefs;
+    DictionaryManager dictMgr;
+
+    @Override
+    protected void doSetup(Context context) throws IOException, 
InterruptedException {
+        super.doSetup(context);
+
+        final SerializableConfiguration sConf = new 
SerializableConfiguration(context.getConfiguration());
+        final String metaUrl = 
context.getConfiguration().get(BatchConstants.ARG_META_URL);
+        final String cubeName = 
context.getConfiguration().get(BatchConstants.ARG_CUBE_NAME);
+        final String segmentIds = 
context.getConfiguration().get(MergeDictionaryJob.OPTION_MERGE_SEGMENT_IDS.getOpt());
+
+        final KylinConfig kylinConfig = 
AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+        final CubeInstance cubeInstance = 
CubeManager.getInstance(kylinConfig).getCube(cubeName);
+        final CubeDesc cubeDesc = 
CubeDescManager.getInstance(kylinConfig).getCubeDesc(cubeInstance.getDescName());
+
+        mergingSegments = getMergingSegments(cubeInstance, 
segmentIds.split(","));
+        tblColRefs = cubeDesc.getAllColumnsNeedDictionaryBuilt().toArray(new 
TblColRef[0]);
+        dictMgr = DictionaryManager.getInstance(kylinConfig);
+    }
+
+    @Override
+    protected void doMap(IntWritable key, NullWritable value, Context context)
+            throws IOException, InterruptedException {
+
+        int index = key.get();
+
+        if (index < tblColRefs.length) {
+            // merge dictionary
+            TblColRef col = tblColRefs[index];
+            List<DictionaryInfo> dictInfos = Lists.newArrayList();
+            for (CubeSegment segment : mergingSegments) {
+                if (segment.getDictResPath(col) != null) {
+                    DictionaryInfo dictInfo = 
dictMgr.getDictionaryInfo(segment.getDictResPath(col));
+                    if (dictInfo != null && !dictInfos.contains(dictInfo)) {
+                        dictInfos.add(dictInfo);
+                    }
+                }
+            }
+
+            DictionaryInfo mergedDictInfo = dictMgr.mergeDictionary(dictInfos);
+            String tblCol = col.getTableAlias() + ":" + col.getName();
+            String dictInfoPath = mergedDictInfo == null ? "" : 
mergedDictInfo.getResourcePath();
+
+            context.write(new IntWritable(-1), new Text(tblCol + "=" + 
dictInfoPath));
+
+        } else {
+            // merge statistics
+            KylinConfig kylinConfig = 
AbstractHadoopJob.loadKylinConfigFromHdfs(new 
SerializableConfiguration(context.getConfiguration()), 
context.getConfiguration().get(BatchConstants.ARG_META_URL));
+
+            final String cubeName = 
context.getConfiguration().get(BatchConstants.ARG_CUBE_NAME);
+            final String segmentId = 
context.getConfiguration().get(BatchConstants.ARG_SEGMENT_ID);
+            final String statOutputPath = 
context.getConfiguration().get(MergeDictionaryJob.OPTION_OUTPUT_PATH_STAT.getOpt());
+            CubeInstance cubeInstance = 
CubeManager.getInstance(kylinConfig).getCube(cubeName);
+
+            logger.info("Statistics output path: {}", statOutputPath);
+
+            CubeSegment newSegment = cubeInstance.getSegmentById(segmentId);
+            ResourceStore rs = ResourceStore.getStore(kylinConfig);
+
+            Map<Long, HLLCounter> cuboidHLLMap = Maps.newHashMap();
+            Configuration conf = null;
+            int averageSamplingPercentage = 0;
+
+            for (CubeSegment cubeSegment : mergingSegments) {
+                String filePath = cubeSegment.getStatisticsResourcePath();
+                InputStream is = rs.getResource(filePath).inputStream;
+                File tempFile;
+                FileOutputStream tempFileStream = null;
+
+                try {
+                    tempFile = File.createTempFile(segmentId, ".seq");
+                    tempFileStream = new FileOutputStream(tempFile);
+                    org.apache.commons.io.IOUtils.copy(is, tempFileStream);
+                } finally {
+                    IOUtils.closeStream(is);
+                    IOUtils.closeStream(tempFileStream);
+                }
+
+                FileSystem fs = HadoopUtil.getFileSystem("file:///" + 
tempFile.getAbsolutePath());
+                SequenceFile.Reader reader = null;
+                try {
+                    conf = HadoopUtil.getCurrentConfiguration();
+                    //noinspection deprecation
+                    reader = new SequenceFile.Reader(fs, new 
Path(tempFile.getAbsolutePath()), conf);
+                    LongWritable keyW = (LongWritable) 
ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+                    BytesWritable valueW = (BytesWritable) 
ReflectionUtils.newInstance(reader.getValueClass(), conf);
+
+                    while (reader.next(keyW, valueW)) {
+                        if (keyW.get() == 0L) {
+                            // sampling percentage;
+                            averageSamplingPercentage += 
Bytes.toInt(valueW.getBytes());
+                        } else if (keyW.get() > 0) {
+                            HLLCounter hll = new 
HLLCounter(kylinConfig.getCubeStatsHLLPrecision());
+                            ByteArray byteArray = new 
ByteArray(valueW.getBytes());
+                            hll.readRegisters(byteArray.asBuffer());
+
+                            if (cuboidHLLMap.get(keyW.get()) != null) {
+                                cuboidHLLMap.get(keyW.get()).merge(hll);
+                            } else {
+                                cuboidHLLMap.put(keyW.get(), hll);
+                            }
+                        }
+                    }
+                } catch (Exception e) {
+                    e.printStackTrace();
+                    throw e;
+                } finally {
+                    IOUtils.closeStream(reader);
+                }
+            }
+
+            averageSamplingPercentage = averageSamplingPercentage / 
mergingSegments.size();
+            CubeStatsWriter.writeCuboidStatistics(conf, new 
Path(statOutputPath), cuboidHLLMap, averageSamplingPercentage);
+            Path statisticsFilePath = new Path(statOutputPath, 
BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
+
+            FileSystem fs = HadoopUtil.getFileSystem(statisticsFilePath, conf);
+            FSDataInputStream fis = fs.open(statisticsFilePath);
+
+            try {
+                // put the statistics to metadata store
+                String statisticsFileName = 
newSegment.getStatisticsResourcePath();
+                rs.putResource(statisticsFileName, fis, 
System.currentTimeMillis());
+            } finally {
+                IOUtils.closeStream(fis);
+            }
+
+            context.write(new IntWritable(-1), new Text(""));
+        }
+    }
+
+    private List<CubeSegment> getMergingSegments(CubeInstance cube, String[] 
segmentIds) {
+        List<CubeSegment> result = 
Lists.newArrayListWithCapacity(segmentIds.length);
+        for (String id : segmentIds) {
+            result.add(cube.getSegmentById(id));
+        }
+        return result;
+    }
+}
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryReducer.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryReducer.java
new file mode 100644
index 0000000000..1eb3c07f20
--- /dev/null
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/MergeDictionaryReducer.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.kylin.engine.mr.KylinReducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MergeDictionaryReducer extends KylinReducer<IntWritable, Text, 
Text, Text> {
+
+    private static final Logger logger = 
LoggerFactory.getLogger(MergeDictionaryReducer.class);
+
+    @Override
+    protected void doReduce(IntWritable key, Iterable<Text> values, Context 
context)
+            throws IOException, InterruptedException {
+        for (Text text : values) {
+            String value = text.toString();
+            String[] splited = value.split("=");
+            if (splited != null && splited.length == 2) {
+                logger.info("Dictionary for col {}, save at {}", splited[0], 
splited[1]);
+                context.write(new Text(splited[0]), new Text(splited[1]));
+            }
+        }
+    }
+}
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateDictionaryStep.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateDictionaryStep.java
new file mode 100644
index 0000000000..bcdb29f52c
--- /dev/null
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/UpdateDictionaryStep.java
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import com.google.common.collect.Lists;
+import org.apache.commons.lang.StringUtils;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PathFilter;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.CubeUpdate;
+import org.apache.kylin.dict.DictionaryInfo;
+import org.apache.kylin.dict.DictionaryManager;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.job.exception.ExecuteException;
+import org.apache.kylin.job.execution.AbstractExecutable;
+import org.apache.kylin.job.execution.ExecutableContext;
+import org.apache.kylin.job.execution.ExecuteResult;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class UpdateDictionaryStep extends AbstractExecutable {
+    private static final Logger logger = 
LoggerFactory.getLogger(UpdateDictionaryStep.class);
+
+    @Override
+    protected ExecuteResult doWork(ExecutableContext context) throws 
ExecuteException {
+        final CubeManager cubeMgr = 
CubeManager.getInstance(context.getConfig());
+        final DictionaryManager dictMgrHdfs;
+        final DictionaryManager dictMgrHbase;
+        final CubeInstance cube = 
cubeMgr.getCube(CubingExecutableUtil.getCubeName(this.getParams()));
+        final CubeSegment newSegment = 
cube.getSegmentById(CubingExecutableUtil.getSegmentId(this.getParams()));
+        final List<CubeSegment> mergingSegments = getMergingSegments(cube);
+        final String dictInfoPath = 
this.getParams().get(BatchConstants.ARG_DICT_PATH);
+        final String metadataUrl = 
this.getParams().get(BatchConstants.ARG_META_URL);
+
+        final KylinConfig kylinConfHbase = cube.getConfig();
+        final KylinConfig kylinConfHdfs = 
KylinConfig.createInstanceFromUri(metadataUrl);
+
+        Collections.sort(mergingSegments);
+
+        try {
+            Configuration conf = HadoopUtil.getCurrentConfiguration();
+            FileSystem fs = HadoopUtil.getWorkingFileSystem();
+            ResourceStore hbaseRS = ResourceStore.getStore(kylinConfHbase);
+            ResourceStore hdfsRS = ResourceStore.getStore(kylinConfHdfs);
+            dictMgrHdfs = DictionaryManager.getInstance(kylinConfHdfs);
+            dictMgrHbase = DictionaryManager.getInstance(kylinConfHbase);
+
+            // work on copy instead of cached objects
+            CubeInstance cubeCopy = cube.latestCopyForWrite();
+            CubeSegment newSegCopy = 
cubeCopy.getSegmentById(newSegment.getUuid());
+
+            // update cube segment dictionary
+
+            FileStatus[] fileStatuss = fs.listStatus(new Path(dictInfoPath), 
new PathFilter() {
+                @Override
+                public boolean accept(Path path) {
+                    return path.getName().startsWith("part");
+                }
+            });
+
+            for (FileStatus fileStatus : fileStatuss) {
+                Path filePath = fileStatus.getPath();
+
+                SequenceFile.Reader reader = new SequenceFile.Reader(fs, 
filePath, conf);
+                Text key = (Text) 
ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+                Text value = (Text) 
ReflectionUtils.newInstance(reader.getValueClass(), conf);
+
+                while (reader.next(key, value)) {
+                    String tblCol = key.toString();
+                    String dictInfoResource = value.toString();
+
+                    if (StringUtils.isNotEmpty(dictInfoResource)) {
+                        logger.info(dictInfoResource);
+                        // put dictionary file to metadata store
+                        DictionaryInfo dictInfoHdfs = 
dictMgrHdfs.getDictionaryInfo(dictInfoResource);
+                        DictionaryInfo dicInfoHbase = 
dictMgrHbase.trySaveNewDict(dictInfoHdfs.getDictionaryObject(), dictInfoHdfs);
+
+                        if (dicInfoHbase != null){
+                            TblColRef tblColRef = 
cube.getDescriptor().findColumnRef(tblCol.split(":")[0], tblCol.split(":")[1]);
+                            newSegCopy.putDictResPath(tblColRef, 
dicInfoHbase.getResourcePath());
+                        }
+                    }
+                }
+
+                IOUtils.closeStream(reader);
+            }
+
+            CubeSegment lastSeg = mergingSegments.get(mergingSegments.size() - 
1);
+            for (Map.Entry<String, String> entry : 
lastSeg.getSnapshots().entrySet()) {
+                newSegCopy.putSnapshotResPath(entry.getKey(), 
entry.getValue());
+            }
+
+            // update statistics
+            // put the statistics to metadata store
+            String statisticsFileName = newSegment.getStatisticsResourcePath();
+            hbaseRS.putResource(statisticsFileName, 
hdfsRS.getResource(newSegment.getStatisticsResourcePath()).inputStream, 
System.currentTimeMillis());
+
+            CubeUpdate update = new CubeUpdate(cubeCopy);
+            update.setToUpdateSegs(newSegCopy);
+            cubeMgr.updateCube(update);
+
+            return ExecuteResult.createSucceed();
+        } catch (IOException e) {
+            logger.error("fail to merge dictionary", e);
+            return ExecuteResult.createError(e);
+        }
+    }
+
+    private List<CubeSegment> getMergingSegments(CubeInstance cube) {
+        List<String> mergingSegmentIds = 
CubingExecutableUtil.getMergingSegmentIds(this.getParams());
+        List<CubeSegment> result = 
Lists.newArrayListWithCapacity(mergingSegmentIds.size());
+        for (String id : mergingSegmentIds) {
+            result.add(cube.getSegmentById(id));
+        }
+        return result;
+    }
+}
diff --git 
a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchMergeJobBuilder2.java
 
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchMergeJobBuilder2.java
index 97861a3623..4487610885 100644
--- 
a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchMergeJobBuilder2.java
+++ 
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchMergeJobBuilder2.java
@@ -18,12 +18,8 @@
 
 package org.apache.kylin.engine.spark;
 
-import java.util.HashMap;
 import java.util.List;
-import java.util.Map;
 
-import org.apache.kylin.common.KylinConfig;
-import org.apache.kylin.common.StorageURL;
 import org.apache.kylin.common.util.StringUtil;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.engine.mr.CubingJob;
@@ -63,8 +59,8 @@ public CubingJob build() {
 
         // Phase 1: Merge Dictionary
         inputSide.addStepPhase1_MergeDictionary(result);
-        result.addTask(createMergeDictionaryStep(mergingSegmentIds));
-        result.addTask(createMergeStatisticsStep(cubeSegment, 
mergingSegmentIds, getStatisticsPath(jobId)));
+        result.addTask(createMergeDictionaryStep(cubeSegment, jobId, 
mergingSegmentIds));
+        result.addTask(createUpdateDictionaryStep(cubeSegment, jobId, 
mergingSegmentIds));
         outputSide.addStepPhase1_MergeDictionary(result);
 
         // merge cube
@@ -80,6 +76,28 @@ public CubingJob build() {
         return result;
     }
 
+    public SparkExecutable createMergeDictionaryStep(CubeSegment seg, String 
jobID, List<String> mergingSegmentIds) {
+        final SparkExecutable sparkExecutable = new SparkExecutable();
+        sparkExecutable.setClassName(SparkMergingDictionary.class.getName());
+
+        
sparkExecutable.setParam(SparkMergingDictionary.OPTION_CUBE_NAME.getOpt(), 
seg.getRealization().getName());
+        
sparkExecutable.setParam(SparkMergingDictionary.OPTION_SEGMENT_ID.getOpt(), 
seg.getUuid());
+        
sparkExecutable.setParam(SparkMergingDictionary.OPTION_META_URL.getOpt(), 
getSegmentMetadataUrl(seg.getConfig(), jobID));
+        
sparkExecutable.setParam(SparkMergingDictionary.OPTION_MERGE_SEGMENT_IDS.getOpt(),
 StringUtil.join(mergingSegmentIds, ","));
+        
sparkExecutable.setParam(SparkMergingDictionary.OPTION_OUTPUT_PATH_DICT.getOpt(),
 getDictInfoPath(jobID));
+        
sparkExecutable.setParam(SparkMergingDictionary.OPTION_OUTPUT_PATH_STAT.getOpt(),
 getStatisticsPath(jobID));
+
+        sparkExecutable.setJobId(jobID);
+        
sparkExecutable.setName(ExecutableConstants.STEP_NAME_MERGE_DICTIONARY);
+
+        StringBuilder jars = new StringBuilder();
+
+        StringUtil.appendWithSeparator(jars, 
seg.getConfig().getSparkAdditionalJars());
+        sparkExecutable.setJars(jars.toString());
+
+        return sparkExecutable;
+    }
+
     public SparkExecutable createMergeCuboidDataStep(CubeSegment seg, 
List<CubeSegment> mergingSegments, String jobID) {
 
         final List<String> mergingCuboidPaths = Lists.newArrayList();
@@ -94,8 +112,7 @@ public SparkExecutable createMergeCuboidDataStep(CubeSegment 
seg, List<CubeSegme
         sparkExecutable.setParam(SparkCubingMerge.OPTION_CUBE_NAME.getOpt(), 
seg.getRealization().getName());
         sparkExecutable.setParam(SparkCubingMerge.OPTION_SEGMENT_ID.getOpt(), 
seg.getUuid());
         sparkExecutable.setParam(SparkCubingMerge.OPTION_INPUT_PATH.getOpt(), 
formattedPath);
-        sparkExecutable.setParam(SparkCubingMerge.OPTION_META_URL.getOpt(),
-                getSegmentMetadataUrl(seg.getConfig(), jobID));
+        sparkExecutable.setParam(SparkCubingMerge.OPTION_META_URL.getOpt(), 
getSegmentMetadataUrl(seg.getConfig(), jobID));
         sparkExecutable.setParam(SparkCubingMerge.OPTION_OUTPUT_PATH.getOpt(), 
outputPath);
 
         sparkExecutable.setJobId(jobID);
@@ -108,10 +125,4 @@ public SparkExecutable 
createMergeCuboidDataStep(CubeSegment seg, List<CubeSegme
 
         return sparkExecutable;
     }
-
-    public String getSegmentMetadataUrl(KylinConfig kylinConfig, String jobId) 
{
-        Map<String, String> param = new HashMap<>();
-        param.put("path", getDumpMetadataPath(jobId));
-        return new StorageURL(kylinConfig.getMetadataUrl().getIdentifier(), 
"hdfs", param).toString();
-    }
 }
diff --git 
a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java 
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
index 4635fad255..d8eba713b3 100644
--- 
a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
+++ 
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkExecutable.java
@@ -37,6 +37,7 @@
 import org.apache.hadoop.util.Shell;
 import org.apache.kylin.common.KylinConfig;
 import org.apache.kylin.common.KylinConfigExt;
+import org.apache.kylin.common.persistence.ResourceStore;
 import org.apache.kylin.common.persistence.ResourceTool;
 import org.apache.kylin.common.util.CliCommandExecutor;
 import org.apache.kylin.common.util.Pair;
@@ -386,9 +387,13 @@ private void attachSegmentMetadataWithDict(CubeSegment 
segment) throws IOExcepti
     private void attachSegmentsMetadataWithDict(List<CubeSegment> segments) 
throws IOException {
         Set<String> dumpList = new LinkedHashSet<>();
         
dumpList.addAll(JobRelatedMetaUtil.collectCubeMetadata(segments.get(0).getCubeInstance()));
+        ResourceStore rs = ResourceStore.getStore(segments.get(0).getConfig());
         for (CubeSegment segment : segments) {
             dumpList.addAll(segment.getDictionaryPaths());
-            dumpList.add(segment.getStatisticsResourcePath());
+            if (rs.exists(segment.getStatisticsResourcePath())) {
+                // cube statistics is not available for new segment
+                dumpList.add(segment.getStatisticsResourcePath());
+            }
         }
         dumpAndUploadKylinPropsAndMetadata(dumpList, (KylinConfigExt) 
segments.get(0).getConfig());
     }
diff --git 
a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkMergingDictionary.java
 
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkMergingDictionary.java
new file mode 100644
index 0000000000..deb7968812
--- /dev/null
+++ 
b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkMergingDictionary.java
@@ -0,0 +1,307 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.spark;
+
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.InputStream;
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.cli.Option;
+import org.apache.commons.cli.OptionBuilder;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.IOUtils;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.SequenceFile;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
+import org.apache.hadoop.util.ReflectionUtils;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.common.persistence.ResourceStore;
+import org.apache.kylin.common.util.AbstractApplication;
+import org.apache.kylin.common.util.ByteArray;
+import org.apache.kylin.common.util.Bytes;
+import org.apache.kylin.common.util.HadoopUtil;
+import org.apache.kylin.common.util.OptionsHelper;
+import org.apache.kylin.cube.CubeDescManager;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.dict.DictionaryInfo;
+import org.apache.kylin.dict.DictionaryManager;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.apache.kylin.engine.mr.common.CubeStatsWriter;
+import org.apache.kylin.engine.mr.common.SerializableConfiguration;
+import org.apache.kylin.measure.hllc.HLLCounter;
+import org.apache.kylin.metadata.model.TblColRef;
+import org.apache.spark.SparkConf;
+import org.apache.spark.api.java.JavaPairRDD;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.api.java.function.PairFunction;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Lists;
+import com.google.common.collect.Maps;
+
+import scala.Tuple2;
+
+/**
+    merge dictionary
+ */
+public class SparkMergingDictionary extends AbstractApplication implements 
Serializable {
+    protected static final Logger logger = 
LoggerFactory.getLogger(SparkMergingDictionary.class);
+
+    public static final Option OPTION_CUBE_NAME = 
OptionBuilder.withArgName(BatchConstants.ARG_CUBE_NAME).hasArg()
+            .isRequired(true).withDescription("Cube 
Name").create(BatchConstants.ARG_CUBE_NAME);
+    public static final Option OPTION_SEGMENT_ID = 
OptionBuilder.withArgName("segmentId").hasArg().isRequired(true)
+            .withDescription("Cube Segment Id").create("segmentId");
+    public static final Option OPTION_META_URL = 
OptionBuilder.withArgName("metaUrl").hasArg().isRequired(true)
+            .withDescription("HDFS metadata url").create("metaUrl");
+    public static final Option OPTION_MERGE_SEGMENT_IDS = 
OptionBuilder.withArgName("segmentIds").hasArg()
+            .isRequired(true).withDescription("Merging Cube Segment 
Ids").create("segmentIds");
+    public static final Option OPTION_OUTPUT_PATH_DICT = 
OptionBuilder.withArgName("dictOutputPath").hasArg()
+            .isRequired(true).withDescription("merged dictionary resource 
path").create("dictOutputPath");
+    public static final Option OPTION_OUTPUT_PATH_STAT = 
OptionBuilder.withArgName("statOutputPath").hasArg()
+            .isRequired(true).withDescription("merged statistics resource 
path").create("statOutputPath");
+
+    private Options options;
+
+    public SparkMergingDictionary() {
+        options = new Options();
+        options.addOption(OPTION_CUBE_NAME);
+        options.addOption(OPTION_SEGMENT_ID);
+        options.addOption(OPTION_META_URL);
+        options.addOption(OPTION_MERGE_SEGMENT_IDS);
+        options.addOption(OPTION_OUTPUT_PATH_DICT);
+        options.addOption(OPTION_OUTPUT_PATH_STAT);
+    }
+
+    @Override
+    protected Options getOptions() {
+        return options;
+    }
+
+    @Override
+    protected void execute(OptionsHelper optionsHelper) throws Exception {
+        final String cubeName = optionsHelper.getOptionValue(OPTION_CUBE_NAME);
+        final String segmentId = 
optionsHelper.getOptionValue(OPTION_SEGMENT_ID);
+        final String metaUrl = optionsHelper.getOptionValue(OPTION_META_URL);
+        final String segmentIds = 
optionsHelper.getOptionValue(OPTION_MERGE_SEGMENT_IDS);
+        final String dictOutputPath = 
optionsHelper.getOptionValue(OPTION_OUTPUT_PATH_DICT);
+        final String statOutputPath = 
optionsHelper.getOptionValue(OPTION_OUTPUT_PATH_STAT);
+
+        Class[] kryoClassArray = new Class[] { 
Class.forName("scala.reflect.ClassTag$$anon$1"),
+                Class.forName("scala.collection.mutable.WrappedArray$ofRef") };
+
+        SparkConf conf = new SparkConf().setAppName("Merge dictionary for 
cube:" + cubeName + ", segment " + segmentId);
+        //serialization conf
+        conf.set("spark.serializer", 
"org.apache.spark.serializer.KryoSerializer");
+        conf.set("spark.kryo.registrator", 
"org.apache.kylin.engine.spark.KylinKryoRegistrator");
+        conf.set("spark.kryo.registrationRequired", 
"true").registerKryoClasses(kryoClassArray);
+
+        JavaSparkContext sc = new JavaSparkContext(conf);
+        KylinSparkJobListener jobListener = new KylinSparkJobListener();
+        sc.sc().addSparkListener(jobListener);
+
+        HadoopUtil.deletePath(sc.hadoopConfiguration(), new 
Path(dictOutputPath));
+
+        final SerializableConfiguration sConf = new 
SerializableConfiguration(sc.hadoopConfiguration());
+        final KylinConfig envConfig = 
AbstractHadoopJob.loadKylinConfigFromHdfs(sConf, metaUrl);
+
+        final CubeInstance cubeInstance = 
CubeManager.getInstance(envConfig).getCube(cubeName);
+        final CubeDesc cubeDesc = 
CubeDescManager.getInstance(envConfig).getCubeDesc(cubeInstance.getDescName());
+
+        logger.info("Dictionary output path: {}", dictOutputPath);
+        logger.info("Statistics output path: {}", statOutputPath);
+
+        final TblColRef[] tblColRefs = 
cubeDesc.getAllColumnsNeedDictionaryBuilt().toArray(new TblColRef[0]);
+        final int columnLength = tblColRefs.length;
+
+        List<Integer> indexs = Lists.newArrayListWithCapacity(columnLength);
+
+        for (int i = 0; i <= columnLength; i++) {
+            indexs.add(i);
+        }
+
+        JavaRDD<Integer> indexRDD = sc.parallelize(indexs, columnLength + 1);
+
+        JavaPairRDD<Text, Text> colToDictPathRDD = indexRDD.mapToPair(new 
MergeDictAndStatsFunction(cubeName, metaUrl,
+                segmentId, segmentIds.split(","), statOutputPath, tblColRefs, 
sConf));
+
+        colToDictPathRDD.coalesce(1, 
false).saveAsNewAPIHadoopFile(dictOutputPath, Text.class, Text.class, 
SequenceFileOutputFormat.class);
+    }
+
+    static public class MergeDictAndStatsFunction implements 
PairFunction<Integer, Text, Text> {
+        private volatile transient boolean initialized = false;
+        private String cubeName;
+        private String metaUrl;
+        private String segmentId;
+        private String[] segmentIds;
+        private String statOutputPath;
+        private TblColRef[] tblColRefs;
+        private SerializableConfiguration conf;
+        private DictionaryManager dictMgr;
+        private KylinConfig kylinConfig;
+        private List<CubeSegment> mergingSegments;
+
+        public MergeDictAndStatsFunction(String cubeName, String metaUrl, 
String segmentId, String[] segmentIds,
+                String statOutputPath, TblColRef[] tblColRefs, 
SerializableConfiguration conf) {
+            this.cubeName = cubeName;
+            this.metaUrl = metaUrl;
+            this.segmentId = segmentId;
+            this.segmentIds = segmentIds;
+            this.statOutputPath = statOutputPath;
+            this.tblColRefs = tblColRefs;
+            this.conf = conf;
+        }
+
+        private void init() {
+            kylinConfig = AbstractHadoopJob.loadKylinConfigFromHdfs(conf, 
metaUrl);
+            KylinConfig.setAndUnsetThreadLocalConfig(kylinConfig);
+            CubeInstance cubeInstance = 
CubeManager.getInstance(kylinConfig).getCube(cubeName);
+            dictMgr = DictionaryManager.getInstance(kylinConfig);
+            mergingSegments = getMergingSegments(cubeInstance, segmentIds);
+        }
+
+        @Override
+        public Tuple2<Text, Text> call(Integer index) throws Exception {
+            if (initialized == false) {
+                synchronized (SparkMergingDictionary.class) {
+                    if (initialized == false) {
+                        init();
+                        initialized = true;
+                    }
+                }
+            }
+
+            if (index < tblColRefs.length) {
+                // merge dictionary
+                TblColRef col = tblColRefs[index];
+                List<DictionaryInfo> dictInfos = Lists.newArrayList();
+                for (CubeSegment segment : mergingSegments) {
+                    if (segment.getDictResPath(col) != null) {
+                        DictionaryInfo dictInfo = 
dictMgr.getDictionaryInfo(segment.getDictResPath(col));
+                        if (dictInfo != null && !dictInfos.contains(dictInfo)) 
{
+                            dictInfos.add(dictInfo);
+                        }
+                    }
+                }
+
+                DictionaryInfo mergedDictInfo = 
dictMgr.mergeDictionary(dictInfos);
+                String tblCol = col.getTableAlias() + ":" + col.getName();
+                String dictInfoPath = mergedDictInfo == null ? "" : 
mergedDictInfo.getResourcePath();
+
+                return new Tuple2<>(new Text(tblCol), new Text(dictInfoPath));
+            } else {
+                // merge statistics
+                CubeInstance cubeInstance = 
CubeManager.getInstance(kylinConfig).getCube(cubeName);
+                CubeSegment newSegment = 
cubeInstance.getSegmentById(segmentId);
+                ResourceStore rs = ResourceStore.getStore(kylinConfig);
+
+                Map<Long, HLLCounter> cuboidHLLMap = Maps.newHashMap();
+                Configuration conf = null;
+                int averageSamplingPercentage = 0;
+
+                for (CubeSegment cubeSegment : mergingSegments) {
+                    String filePath = cubeSegment.getStatisticsResourcePath();
+                    InputStream is = rs.getResource(filePath).inputStream;
+                    File tempFile;
+                    FileOutputStream tempFileStream = null;
+
+                    try {
+                        tempFile = File.createTempFile(segmentId, ".seq");
+                        tempFileStream = new FileOutputStream(tempFile);
+                        org.apache.commons.io.IOUtils.copy(is, tempFileStream);
+                    } finally {
+                        IOUtils.closeStream(is);
+                        IOUtils.closeStream(tempFileStream);
+                    }
+
+                    FileSystem fs = HadoopUtil.getFileSystem("file:///" + 
tempFile.getAbsolutePath());
+                    SequenceFile.Reader reader = null;
+
+                    try {
+                        conf = HadoopUtil.getCurrentConfiguration();
+                        //noinspection deprecation
+                        reader = new SequenceFile.Reader(fs, new 
Path(tempFile.getAbsolutePath()), conf);
+                        LongWritable key = (LongWritable) 
ReflectionUtils.newInstance(reader.getKeyClass(), conf);
+                        BytesWritable value = (BytesWritable) 
ReflectionUtils.newInstance(reader.getValueClass(), conf);
+
+                        while (reader.next(key, value)) {
+                            if (key.get() == 0L) {
+                                // sampling percentage;
+                                averageSamplingPercentage += 
Bytes.toInt(value.getBytes());
+                            } else if (key.get() > 0) {
+                                HLLCounter hll = new 
HLLCounter(kylinConfig.getCubeStatsHLLPrecision());
+                                ByteArray byteArray = new 
ByteArray(value.getBytes());
+                                hll.readRegisters(byteArray.asBuffer());
+
+                                if (cuboidHLLMap.get(key.get()) != null) {
+                                    cuboidHLLMap.get(key.get()).merge(hll);
+                                } else {
+                                    cuboidHLLMap.put(key.get(), hll);
+                                }
+                            }
+                        }
+                    } finally {
+                        IOUtils.closeStream(reader);
+                    }
+                }
+
+                averageSamplingPercentage = averageSamplingPercentage / 
mergingSegments.size();
+                CubeStatsWriter.writeCuboidStatistics(conf, new 
Path(statOutputPath), cuboidHLLMap, averageSamplingPercentage);
+                Path statisticsFilePath = new Path(statOutputPath, 
BatchConstants.CFG_STATISTICS_CUBOID_ESTIMATION_FILENAME);
+
+                FileSystem fs = HadoopUtil.getFileSystem(statisticsFilePath, 
conf);
+                FSDataInputStream fis = fs.open(statisticsFilePath);
+
+                try {
+                    // put the statistics to metadata store
+                    String statisticsFileName = 
newSegment.getStatisticsResourcePath();
+                    rs.putResource(statisticsFileName, fis, 
System.currentTimeMillis());
+                } finally {
+                    IOUtils.closeStream(fis);
+                }
+
+                return new Tuple2<>(new Text(""), new Text(""));
+            }
+
+        }
+
+        private List<CubeSegment> getMergingSegments(CubeInstance cube, 
String[] segmentIds) {
+            List<CubeSegment> result = 
Lists.newArrayListWithCapacity(segmentIds.length);
+            for (String id : segmentIds) {
+                result.add(cube.getSegmentById(id));
+            }
+            return result;
+        }
+    }
+
+}


 

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

Reply via email to