This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git

commit 428de356cb8b1c6520ae2b6c3113805371246e71
Author: wangxiaojing <[email protected]>
AuthorDate: Wed May 6 14:39:59 2020 +0800

    KYLIN-4346 Build Global Dict by MR/Hive, Parallel Total Build
     Step implementation
---
 .../kylin/job/constant/ExecutableConstants.java    |   1 +
 .../kylin/engine/mr/BatchCubingJobBuilder2.java    |   3 +-
 .../apache/kylin/engine/mr/JobBuilderSupport.java  |  45 ++++
 .../kylin/engine/mr/common/AbstractHadoopJob.java  |   4 +
 .../kylin/engine/mr/common/BatchConstants.java     |   4 +
 .../mr/steps/BuildGlobalHiveDicTotalBuildJob.java  | 129 ++++++++++++
 .../steps/BuildGlobalHiveDictTotalBuildMapper.java | 231 +++++++++++++++++++++
 7 files changed, 416 insertions(+), 1 deletion(-)

diff --git 
a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java 
b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index b2d087b..8d4de09 100644
--- 
a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ 
b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -87,6 +87,7 @@ public final class ExecutableConstants {
     // MR - Hive Dict
     public static final String STEP_NAME_GLOBAL_DICT_MRHIVE_EXTRACT_DICTVAL = 
"Build Global Dict - extract distinct value from data";
     public static final String STEP_NAME_GLOBAL_DICT_PART_BUILD_DICTVAL = 
"Build Global Dict - parallel part build";
+    public static final String STEP_NAME_GLOBAL_DICT_TOTAL_BUILD_DICTVAL = 
"Build Global Dict - parallel total build";
     public static final String STEP_NAME_GLOBAL_DICT_MRHIVE_BUILD_DICTVAL = 
"Build Global Dict - merge to dict table";
     public static final String STEP_NAME_GLOBAL_DICT_MRHIVE_REPLACE_DICTVAL = 
"Build Global Dict - replace intermediate table";
 
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
index 0aae61d..b62650a 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
@@ -69,7 +69,8 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport 
{
             //parallel part build
             result.addTask(createBuildGlobalHiveDictPartBuildJob(jobId));
 
-            //toDo parallel total build
+            //parallel total build
+            result.addTask(createBuildGlobalHiveDicTotalBuildJob(jobId));
         }
 
         //toDo merge global dic and replace flat table
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index dddb67d..a597279 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -33,9 +33,11 @@ import org.apache.kylin.common.StorageURL;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.CuboidModeEnum;
 import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.EngineFactory;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
 import org.apache.kylin.engine.mr.common.MapReduceExecutable;
+import org.apache.kylin.engine.mr.steps.BuildGlobalHiveDicTotalBuildJob;
 import org.apache.kylin.engine.mr.steps.BuildGlobalHiveDictPartBuildJob;
 import org.apache.kylin.engine.mr.steps.CalculateStatsFromBaseCuboidJob;
 import org.apache.kylin.engine.mr.steps.CreateDictionaryJob;
@@ -51,6 +53,7 @@ import 
org.apache.kylin.engine.mr.steps.UpdateCubeInfoAfterMergeStep;
 import org.apache.kylin.engine.mr.steps.UpdateDictionaryStep;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 
 import org.apache.kylin.shaded.com.google.common.base.Preconditions;
@@ -221,6 +224,24 @@ public class JobBuilderSupport {
         return result;
     }
 
+    public MapReduceExecutable createBuildGlobalHiveDicTotalBuildJob(String 
jobId) {
+        MapReduceExecutable result = new MapReduceExecutable();
+        
result.setName(ExecutableConstants.STEP_NAME_GLOBAL_DICT_TOTAL_BUILD_DICTVAL);
+        result.setMapReduceJobClass(BuildGlobalHiveDicTotalBuildJob.class);
+        StringBuilder cmd = new StringBuilder();
+        appendMapReduceParameters(cmd);
+        appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, 
seg.getRealization().getName());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, 
seg.getUuid());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME,
+                ExecutableConstants.STEP_NAME_GLOBAL_DICT_TOTAL_BUILD_DICTVAL 
+ seg.getRealization().getName() + "_Step");
+        appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, 
getBuildGlobalHiveDicTotalBuildJobInputPath(jobId));
+        appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, 
getBuildGlobalDictionaryTotalOutput(seg.getConfig()));
+        appendExecCmdParameters(cmd, 
BatchConstants.ARG_GLOBAL_DIC_PART_REDUCE_STATS, 
getBuildGlobalDictionaryPartReduceStatsPathV2(jobId));
+        appendExecCmdParameters(cmd, 
BatchConstants.ARG_GLOBAL_DIC_MAX_DISTINCT_COUNT, 
getBuildGlobalDictionaryMaxDistinctCountPath(jobId));
+        result.setMapReduceParams(cmd.toString());
+        return result;
+    }
+
     public UpdateCubeInfoAfterBuildStep 
createUpdateCubeInfoAfterBuildStep(String jobId, LookupMaterializeContext 
lookupMaterializeContext) {
         final UpdateCubeInfoAfterBuildStep result = new 
UpdateCubeInfoAfterBuildStep();
         result.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO);
@@ -362,6 +383,30 @@ public class JobBuilderSupport {
         return getRealizationRootPath(jobId) + "/global_dic";
     }
 
+    public String getBuildGlobalHiveDicTotalBuildJobInputPath(String jobId) {
+        return getBuildGlobalDictionaryBasePath(jobId)+"/part_sort";
+    }
+
+    public String getBuildGlobalDictionaryMaxDistinctCountPath(String jobId) {
+        KylinConfig conf = seg.getConfig();
+        String dbDir = conf.getHiveDatabaseDir();
+        IJoinedFlatTableDesc flatDesc = 
EngineFactory.getJoinedFlatTableDesc(seg);
+        String tableName = 
flatDesc.getTableName()+conf.getMrHiveDictIntermediateTTableSuffix();
+        String outPut = 
dbDir+"/"+tableName+"/dict_column="+BatchConstants.CFG_GLOBAL_DICT_STATS_PARTITION_VALUE;
+        return outPut;
+    }
+
+    public String getBuildGlobalDictionaryPartReduceStatsPathV2(String jobId) {
+        return getBuildGlobalDictionaryBasePath(jobId)+ "/reduce_stats";
+    }
+
+    public String getBuildGlobalDictionaryTotalOutput(KylinConfig config){
+        String dbDir = config.getHiveDatabaseDir();
+        String tableName = 
EngineFactory.getJoinedFlatTableDesc(seg).getTableName()+config.getMrHiveDictTableSuffix();
+        String path = dbDir+"/"+tableName;
+        return path;
+    }
+
     public String getDictRootPath(String jobId) {
         return getRealizationRootPath(jobId) + "/dict";
     }
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index 371b10b..c463014 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -107,6 +107,10 @@ public abstract class AbstractHadoopJob extends Configured 
implements Tool {
             .isRequired(true).withDescription("Output 
path").create(BatchConstants.ARG_OUTPUT);
     protected static final Option OPTION_DICT_PATH = 
OptionBuilder.withArgName(BatchConstants.ARG_DICT_PATH).hasArg()
             .isRequired(false).withDescription("Dict 
path").create(BatchConstants.ARG_DICT_PATH);
+    protected static final Option OPTION_GLOBAL_DIC_MAX_DISTINCT_COUNT = 
OptionBuilder.withArgName(BatchConstants.ARG_GLOBAL_DIC_MAX_DISTINCT_COUNT).hasArg()
+            .isRequired(false).withDescription("GLOBAL dic max distinct count 
path").create(BatchConstants.ARG_GLOBAL_DIC_MAX_DISTINCT_COUNT);
+    protected static final Option OPTION_GLOBAL_DIC_PART_REDUCE_STATS = 
OptionBuilder.withArgName(BatchConstants.ARG_GLOBAL_DIC_PART_REDUCE_STATS).hasArg()
+            .isRequired(false).withDescription("Global dic part reduce 
stats").create(BatchConstants.ARG_GLOBAL_DIC_PART_REDUCE_STATS);
     protected static final Option OPTION_NCUBOID_LEVEL = 
OptionBuilder.withArgName(BatchConstants.ARG_LEVEL).hasArg()
             .isRequired(true).withDescription("N-Cuboid build level, e.g. 1, 
2, 3...").create(BatchConstants.ARG_LEVEL);
     protected static final Option OPTION_PARTITION_FILE_PATH = 
OptionBuilder.withArgName(BatchConstants.ARG_PARTITION)
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
index 3fffad2..6031f3c 100644
--- 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
@@ -72,6 +72,8 @@ public interface BatchConstants {
     String CFG_MR_SPARK_JOB = "mr.spark.job";
     String CFG_SPARK_META_URL = "spark.meta.url";
     String CFG_GLOBAL_DICT_BASE_DIR = "global.dict.base.dir";
+    String CFG_GLOBAL_DICT_STATS_PARTITION_VALUE="KYLIN_MAX_DISTINCT_COUNT";
+
 
     String CFG_HLL_REDUCER_NUM = "cuboidHLLCounterReducerNum";
 
@@ -112,6 +114,8 @@ public interface BatchConstants {
     String ARG_BASE64_ENCODED_STEP_NAME = "base64StepName";
     String ARG_SQL_COUNT = "sqlCount";
     String ARG_BASE64_ENCODED_SQL = "base64EncodedSql";
+    String ARG_GLOBAL_DIC_PART_REDUCE_STATS = "global_dic_part_reduce_stats";
+    String ARG_GLOBAL_DIC_MAX_DISTINCT_COUNT = "globa_dicl_max_distinct_count";
 
     /**
      * logger and counter
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDicTotalBuildJob.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDicTotalBuildJob.java
new file mode 100644
index 0000000..acdbb07
--- /dev/null
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDicTotalBuildJob.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BuildGlobalHiveDicTotalBuildJob extends AbstractHadoopJob {
+    protected static final Logger logger = 
LoggerFactory.getLogger(BuildGlobalHiveDicTotalBuildJob.class);
+
+    @Override
+    public int run(String[] args) throws Exception {
+        Options options = new Options();
+        String[] dicColsArr = null;
+        try {
+            options.addOption(OPTION_JOB_NAME);
+            options.addOption(OPTION_INPUT_PATH);
+            options.addOption(OPTION_OUTPUT_PATH);
+            options.addOption(OPTION_CUBE_NAME);
+            options.addOption(OPTION_SEGMENT_ID);
+            options.addOption(OPTION_GLOBAL_DIC_MAX_DISTINCT_COUNT);
+            options.addOption(OPTION_GLOBAL_DIC_PART_REDUCE_STATS);
+            parseOptions(options, args);
+
+            KylinConfig config = KylinConfig.getInstanceFromEnv();
+            dicColsArr = config.getMrHiveDictColumnsExcludeRefColumns();
+            String cubeName = getOptionValue(OPTION_CUBE_NAME);
+            String segmentID = getOptionValue(OPTION_SEGMENT_ID);
+
+            job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
+            logger.info("Starting: " + job.getJobName());
+
+            // 
----------------------------------------------------------------------------
+            // add metadata to distributed cache
+            CubeManager cubeMgr = CubeManager.getInstance(config);
+            CubeInstance cube = cubeMgr.getCube(cubeName);
+            CubeSegment segment = cube.getSegmentById(segmentID);
+
+            job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
+            job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, 
segmentID);
+            job.getConfiguration().set("partition.statistics.path", 
getOptionValue(OPTION_GLOBAL_DIC_PART_REDUCE_STATS));
+            job.getConfiguration().set("last.max.dic.value.path", 
getOptionValue(OPTION_GLOBAL_DIC_MAX_DISTINCT_COUNT));
+            
job.getConfiguration().setBoolean("mapreduce.output.fileoutputformat.compress", 
false);
+
+            job.setJarByClass(BuildGlobalHiveDicTotalBuildJob.class);
+
+            setJobClasspath(job, cube.getConfig());
+
+            // Mapper
+            job.setMapperClass(BuildGlobalHiveDictTotalBuildMapper.class);
+
+            // Input Output
+            setInput(job, getOptionValue(OPTION_INPUT_PATH));
+            setOutput(job, dicColsArr, getOptionValue(OPTION_OUTPUT_PATH));
+
+            job.setNumReduceTasks(0);//no reduce
+
+            job.setInputFormatClass(KeyValueTextInputFormat.class);
+
+            // prevent to create zero-sized default output
+            LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
+
+            //delete output
+            Path baseOutputPath =new Path(getOptionValue(OPTION_OUTPUT_PATH));
+            deletePath(job.getConfiguration(), baseOutputPath);
+
+            attachSegmentMetadataWithDict(segment, job.getConfiguration());
+            return waitForCompletion(job);
+        } finally {
+            if (job != null)
+                cleanupTempConfFile(job.getConfiguration());
+        }
+    }
+
+    private void setOutput(Job job, String[] dicColsArry, String outputBase){
+        // make each reducer output to respective dir
+        
///user/prod_kylin/tmp/kylin2/globaldic_test/kylin-188c9f9d_dabb_944e_9f20_99dc95be66e6/bs_order_scene_day_new_cube_clone/dict_column=DM_ES_REPORT_ORDER_VIEW0420_DRIVER_ID/part_sort
+        for(int i=0;i<dicColsArry.length;i++){
+            MultipleOutputs.addNamedOutput(job, i + "", 
TextOutputFormat.class, Text.class, LongWritable.class);
+        }
+        Path outputPath = new Path(outputBase);
+        FileOutputFormat.setOutputPath(job, outputPath);
+    }
+
+    private void setInput(Job job, String input) throws IOException {
+        Path path = new Path(input);
+        FileSystem fs = path.getFileSystem(job.getConfiguration());
+        if(!fs.exists(path)){
+            fs.mkdirs(path);
+        }
+        FileInputFormat.setInputPaths(job, getOptionValue(OPTION_INPUT_PATH));
+    }
+
+}
diff --git 
a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictTotalBuildMapper.java
 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictTotalBuildMapper.java
new file mode 100644
index 0000000..6efac31
--- /dev/null
+++ 
b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictTotalBuildMapper.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.engine.mr.KylinMapper;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BuildGlobalHiveDictTotalBuildMapper<KEYIN, Object> extends 
KylinMapper<KEYIN, Text, Text, LongWritable> {
+    private static final Logger logger = 
LoggerFactory.getLogger(BuildGlobalHiveDictTotalBuildMapper.class);
+
+    private MultipleOutputs mos;
+    private Integer colIndex = null;
+    private String colName = null;
+    private Long start = 0L;//start index
+    private String[] cols = null;
+
+    @Override
+    protected void doSetup(Context context) throws IOException, 
InterruptedException {
+        Configuration conf = context.getConfiguration();
+        mos = new MultipleOutputs(context);
+
+        KylinConfig config;
+        try {
+            config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        cols = config.getMrHiveDictColumnsExcludeRefColumns();
+
+
+        String statPath = conf.get("partition.statistics.path");
+
+        //get the input file name ,the file name format by 
colIndex-part-partitionNum, eg: 1-part-000019
+        FileSplit fileSplit = (FileSplit) context.getInputSplit();
+        String[] arr = fileSplit.getPath().getName().split("-");
+        int partitionNum = Integer.parseInt(arr[2]);
+        colIndex = Integer.parseInt(arr[0]);
+        colName = cols[colIndex];
+        logger.info("Input 
fileName:{},colIndex:{},colName:{},partitionNum:{}", 
fileSplit.getPath().getName(), colIndex, colName, partitionNum);
+
+        //last max dic value per column
+        String lastMaxValuePath = conf.get("last.max.dic.value.path");
+        logger.info("last.max.dic.value.path:"+lastMaxValuePath);
+        long lastMaxDictValue = this.getLastMaxDicValue(conf, 
lastMaxValuePath);
+        
logger.info("last.max.dic.value.path:"+lastMaxValuePath+",value="+lastMaxDictValue);
+
+        //Calculate the starting position of this file, the starting position 
of this file = sum (count) of all previous numbers + last max dic value of the 
column
+        Map<Integer, TreeMap<Integer, Long>> allStats = 
getPartitionsCount(conf, statPath);//<colIndex,<reduceNum,count>>
+        TreeMap<Integer, Long> partitionStats =allStats.get(colIndex);
+        if(partitionNum!=0) {
+            SortedMap<Integer, Long> subStat = partitionStats.subMap(0, true, 
partitionNum, false);
+            subStat.forEach((k, v)->{
+                logger.info("Split num:{} and it's count:{}", k, v);
+                start += v;
+            });
+        }
+        start += lastMaxDictValue;
+        logger.info("global dic.{}.split.num.{} build dict start offset is 
{}", colName, partitionNum, start);
+    }
+
+    @Override
+    public void doMap(KEYIN key, Text record, Context context) throws 
IOException, InterruptedException {
+        long inkey = Long.parseLong(key.toString());
+        mos.write(colIndex+"", record, new LongWritable(start + inkey), 
"dict_column="+colName+"/"+colIndex);
+    }
+
+    @Override
+    protected void doCleanup(Context context) throws IOException, 
InterruptedException {
+        mos.close();
+    }
+
+    private Map<Integer, TreeMap<Integer, Long>> 
getPartitionsCount(Configuration conf, String partitionStatPath) throws 
IOException {
+        StringBuffer sb = new StringBuffer();
+        String temp=null;
+
+        String[] fileNameArr = null;
+        String[] statsArr = null;
+
+        //colStats key is last step reduce num,value is last step that reduce 
item count
+        TreeMap<Integer, Long> colStats = null;
+        //allStats key is colIndex,value is colStats(that col statistics info)
+        Map<Integer, TreeMap<Integer, Long>> allStats = new HashMap<>();
+
+        Path path = new Path(partitionStatPath);
+        FileSystem fs = path.getFileSystem(conf);
+        if (fs.exists(path) && fs.isDirectory(path)) {
+            for (FileStatus status : fs.listStatus(path)) {
+                //fileNameArr[0] is globaldict colIndex
+                fileNameArr=status.getPath().getName().split("-");
+                colStats = allStats.get(Integer.parseInt(fileNameArr[0]));
+                if(colStats==null){
+                    colStats = new TreeMap<>();
+                }
+                temp=cat(status.getPath(), fs);
+                logger.info("partitionStatPath:{},content:{}", 
partitionStatPath, temp);
+                if(temp!=null){
+                    statsArr = temp.split("\t");
+                    colStats.put(Integer.parseInt(statsArr[1]), 
Long.parseLong(statsArr[0]));
+                    allStats.put(Integer.parseInt(fileNameArr[0]), colStats);
+                }
+            }
+        }
+
+        allStats.forEach((k, v)->{
+            v.forEach((k1, v1)->{
+                logger.info("allStats.colIndex:{},this split num:{},this split 
num's count:{}", k, k1, v1);
+            });
+        });
+
+        return allStats;
+    }
+
+    private String cat(Path remotePath, FileSystem fs) throws IOException {
+        FSDataInputStream in = null;
+        BufferedReader buffer = null;
+        StringBuffer stat= new StringBuffer();
+        try {
+            in= fs.open(remotePath);
+            buffer= new BufferedReader(new InputStreamReader(in, "UTF-8")) ;
+            String line = null;
+            while ((line = buffer.readLine()) != null) {
+                stat.append(line);
+            }
+        } catch (IOException e) {
+            e.printStackTrace();
+        }finally {
+            buffer.close();
+            in.close();
+        }
+        return stat.toString();
+    }
+
+    /**
+     *
+     * @param conf
+     * @param lastMaxDicValuePath, eg: 
/user/kylin/warehouse/db/kylin_intermediate_kylin_sales_cube_mr_6222c210_ce2d_e8ce_dd0f_f12c38fa9115__group_by/dict_column=KYLIN_MAX_DISTINCT_COUNT/part-00000-450ee120-39ff-4806-afaf-ed482ceffc68-c000
+     *        remotePath content is dict colum stats info of per column: dic 
column name,extract distinct value count,last max dic value
+     * @return this colIndex's last max dic value
+     * @throws IOException
+     */
+    private long getLastMaxDicValue(Configuration conf, String 
lastMaxDicValuePath) throws IOException {
+        StringBuffer sb=new StringBuffer();
+        Map<Integer, Long> map = null;
+        Path path = new Path(lastMaxDicValuePath);
+        FileSystem fs = path.getFileSystem(conf);
+        if (fs.exists(path) && fs.isDirectory(path)) {
+            for (FileStatus status : fs.listStatus(path)) {
+                logger.info("start buildMaxCountMap :");
+                map = buildMaxCountMap(status.getPath(), fs);
+                logger.info("end buildMaxCountMap :");
+
+            }
+        }
+        return  map.get(colIndex)==null?0L:map.get(colIndex);
+    }
+
+    /**
+     *
+     * @param remotePath , eg: 
/user/kylin/warehouse/db/kylin_intermediate_kylin_sales_cube_mr_6222c210_ce2d_e8ce_dd0f_f12c38fa9115__group_by/dict_column=KYLIN_MAX_DISTINCT_COUNT/part-00000-450ee120-39ff-4806-afaf-ed482ceffc68-c000
+     *        remotePath content is dict colum stats info of per column: dic 
column name,extract distinct value count,last max dic value
+     * @param fs
+     * @return Map<>,key is colIndex, value is last max dict value
+     * @throws IOException
+     */
+    private  Map<Integer, Long> buildMaxCountMap(Path remotePath, FileSystem 
fs) throws IOException {
+        FSDataInputStream in = null;
+        BufferedReader buffer = null;
+        String[] arr=null;
+        Map<Integer, Long> map= new HashMap();
+        try {
+            in= fs.open(remotePath);
+            buffer= new BufferedReader(new InputStreamReader(in, "UTF-8")) ;
+            String line = null;
+            while ((line = buffer.readLine()) != null) {
+                arr = line.split(",");
+                logger.info("line="+line+",arr.length:"+arr.length);
+                if(arr.length==3) {
+                    for (int i = 0; i < cols.length; i++) {
+                        if(cols[i].equalsIgnoreCase(arr[0])) {
+                            map.put(i, Long.parseLong(arr[2]));
+                            logger.info("col.{}.maxValue={}", cols[i], 
Long.parseLong(arr[2]));
+                            break;
+                        }
+                    }
+                }
+            }
+        } catch (IOException e) {
+            e.printStackTrace();
+        }finally {
+            buffer.close();
+            in.close();
+        }
+        logger.info("BuildMaxCountMap map="+map);
+        return map;
+    }
+}

Reply via email to