This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 428de356cb8b1c6520ae2b6c3113805371246e71
Author: wangxiaojing <[email protected]>
AuthorDate: Wed May 6 14:39:59 2020 +0800

    KYLIN-4346 Build Global Dict by MR/Hive, Parallel Total Build Step implementation
---
 .../kylin/job/constant/ExecutableConstants.java    |   1 +
 .../kylin/engine/mr/BatchCubingJobBuilder2.java    |   3 +-
 .../apache/kylin/engine/mr/JobBuilderSupport.java  |  45 ++++
 .../kylin/engine/mr/common/AbstractHadoopJob.java  |   4 +
 .../kylin/engine/mr/common/BatchConstants.java     |   4 +
 .../mr/steps/BuildGlobalHiveDicTotalBuildJob.java  | 129 ++++++++++++
 .../steps/BuildGlobalHiveDictTotalBuildMapper.java | 231 +++++++++++++++++++++
 7 files changed, 416 insertions(+), 1 deletion(-)

diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
index b2d087b..8d4de09 100644
--- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
+++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java
@@ -87,6 +87,7 @@ public final class ExecutableConstants {
     // MR - Hive Dict
     public static final String STEP_NAME_GLOBAL_DICT_MRHIVE_EXTRACT_DICTVAL = "Build Global Dict - extract distinct value from data";
     public static final String STEP_NAME_GLOBAL_DICT_PART_BUILD_DICTVAL = "Build Global Dict - parallel part build";
+    public static final String STEP_NAME_GLOBAL_DICT_TOTAL_BUILD_DICTVAL = "Build Global Dict - parallel total build";
     public static final String STEP_NAME_GLOBAL_DICT_MRHIVE_BUILD_DICTVAL = "Build Global Dict - merge to dict table";
     public static final String STEP_NAME_GLOBAL_DICT_MRHIVE_REPLACE_DICTVAL = "Build Global Dict - replace intermediate table";

diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
index 0aae61d..b62650a 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java
@@ -69,7 +69,8 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport {
             //parallel part build
             result.addTask(createBuildGlobalHiveDictPartBuildJob(jobId));
-            //toDo parallel total build
+            //parallel total build
+            result.addTask(createBuildGlobalHiveDicTotalBuildJob(jobId));
         }
         //toDo merge global dic and replace flat table

diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
index dddb67d..a597279 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java
@@ -33,9 +33,11 @@ import org.apache.kylin.common.StorageURL;
 import org.apache.kylin.cube.CubeSegment;
 import org.apache.kylin.cube.cuboid.CuboidModeEnum;
 import org.apache.kylin.cube.model.CubeDesc;
+import org.apache.kylin.engine.EngineFactory;
 import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.engine.mr.common.HadoopShellExecutable;
 import org.apache.kylin.engine.mr.common.MapReduceExecutable;
+import org.apache.kylin.engine.mr.steps.BuildGlobalHiveDicTotalBuildJob;
 import org.apache.kylin.engine.mr.steps.BuildGlobalHiveDictPartBuildJob;
 import org.apache.kylin.engine.mr.steps.CalculateStatsFromBaseCuboidJob;
 import org.apache.kylin.engine.mr.steps.CreateDictionaryJob;
@@ -51,6 +53,7 @@ import org.apache.kylin.engine.mr.steps.UpdateCubeInfoAfterMergeStep;
 import org.apache.kylin.engine.mr.steps.UpdateDictionaryStep;
 import org.apache.kylin.job.constant.ExecutableConstants;
 import org.apache.kylin.job.engine.JobEngineConfig;
+import org.apache.kylin.metadata.model.IJoinedFlatTableDesc;
 import org.apache.kylin.metadata.model.TblColRef;
 import org.apache.kylin.shaded.com.google.common.base.Preconditions;

@@ -221,6 +224,24 @@ public class JobBuilderSupport {
         return result;
     }

+    public MapReduceExecutable createBuildGlobalHiveDicTotalBuildJob(String jobId) {
+        MapReduceExecutable result = new MapReduceExecutable();
+        result.setName(ExecutableConstants.STEP_NAME_GLOBAL_DICT_TOTAL_BUILD_DICTVAL);
+        result.setMapReduceJobClass(BuildGlobalHiveDicTotalBuildJob.class);
+        StringBuilder cmd = new StringBuilder();
+        appendMapReduceParameters(cmd);
+        appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_SEGMENT_ID, seg.getUuid());
+        appendExecCmdParameters(cmd, BatchConstants.ARG_JOB_NAME,
+                ExecutableConstants.STEP_NAME_GLOBAL_DICT_TOTAL_BUILD_DICTVAL + seg.getRealization().getName() + "_Step");
+        appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, getBuildGlobalHiveDicTotalBuildJobInputPath(jobId));
+        appendExecCmdParameters(cmd, BatchConstants.ARG_OUTPUT, getBuildGlobalDictionaryTotalOutput(seg.getConfig()));
+        appendExecCmdParameters(cmd, BatchConstants.ARG_GLOBAL_DIC_PART_REDUCE_STATS, getBuildGlobalDictionaryPartReduceStatsPathV2(jobId));
+        appendExecCmdParameters(cmd, BatchConstants.ARG_GLOBAL_DIC_MAX_DISTINCT_COUNT, getBuildGlobalDictionaryMaxDistinctCountPath(jobId));
+        result.setMapReduceParams(cmd.toString());
+        return result;
+    }
+
     public UpdateCubeInfoAfterBuildStep createUpdateCubeInfoAfterBuildStep(String jobId, LookupMaterializeContext lookupMaterializeContext) {
         final UpdateCubeInfoAfterBuildStep result = new UpdateCubeInfoAfterBuildStep();
         result.setName(ExecutableConstants.STEP_NAME_UPDATE_CUBE_INFO);
@@ -362,6 +383,30 @@ public class JobBuilderSupport {
         return getRealizationRootPath(jobId) + "/global_dic";
     }

+    public String getBuildGlobalHiveDicTotalBuildJobInputPath(String jobId) {
+        return getBuildGlobalDictionaryBasePath(jobId) + "/part_sort";
+    }
+
+    public String getBuildGlobalDictionaryMaxDistinctCountPath(String jobId) {
+        KylinConfig conf = seg.getConfig();
+        String dbDir = conf.getHiveDatabaseDir();
+        IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(seg);
+        String tableName = flatDesc.getTableName() + conf.getMrHiveDictIntermediateTTableSuffix();
+        String output = dbDir + "/" + tableName + "/dict_column=" + BatchConstants.CFG_GLOBAL_DICT_STATS_PARTITION_VALUE;
+        return output;
+    }
+
+    public String getBuildGlobalDictionaryPartReduceStatsPathV2(String jobId) {
+        return getBuildGlobalDictionaryBasePath(jobId) + "/reduce_stats";
+    }
+
+    public String getBuildGlobalDictionaryTotalOutput(KylinConfig config) {
+        String dbDir = config.getHiveDatabaseDir();
+        String tableName = EngineFactory.getJoinedFlatTableDesc(seg).getTableName() + config.getMrHiveDictTableSuffix();
+        String path = dbDir + "/" + tableName;
+        return path;
+    }
+
     public String getDictRootPath(String jobId) {
         return getRealizationRootPath(jobId) + "/dict";
     }
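For orientation, the four path helpers added to JobBuilderSupport above all hang off either the part build's output tree or the flat table's Hive directory. A minimal layout sketch follows; the shapes come from the helper bodies, while every segment in angle brackets is a placeholder of mine, not a value from this commit:

    // Illustrative layout only; angle-bracket segments are hypothetical.
    public class GlobalDictPathSketch {
        public static void main(String[] args) {
            String base = "/kylin/kylin_metadata/kylin-<jobId>/<cubeName>/global_dic"; // getBuildGlobalDictionaryBasePath
            String input = base + "/part_sort";          // part-build output, input of the total build
            String reduceStats = base + "/reduce_stats"; // per-partition value counts from the part build
            // stats partition of the intermediate (group-by) Hive table:
            String maxDistinct = "<hiveDbDir>/<flatTable><intermediateSuffix>/dict_column=KYLIN_MAX_DISTINCT_COUNT";
            // final output: the global dict Hive table directory
            String output = "<hiveDbDir>/<flatTable><dictTableSuffix>";
            System.out.println(input + "\n" + reduceStats + "\n" + maxDistinct + "\n" + output);
        }
    }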
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
index 371b10b..c463014 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/AbstractHadoopJob.java
@@ -107,6 +107,10 @@ public abstract class AbstractHadoopJob extends Configured implements Tool {
             .isRequired(true).withDescription("Output path").create(BatchConstants.ARG_OUTPUT);
     protected static final Option OPTION_DICT_PATH = OptionBuilder.withArgName(BatchConstants.ARG_DICT_PATH).hasArg()
             .isRequired(false).withDescription("Dict path").create(BatchConstants.ARG_DICT_PATH);
+    protected static final Option OPTION_GLOBAL_DIC_MAX_DISTINCT_COUNT = OptionBuilder.withArgName(BatchConstants.ARG_GLOBAL_DIC_MAX_DISTINCT_COUNT).hasArg()
+            .isRequired(false).withDescription("Global dic max distinct count path").create(BatchConstants.ARG_GLOBAL_DIC_MAX_DISTINCT_COUNT);
+    protected static final Option OPTION_GLOBAL_DIC_PART_REDUCE_STATS = OptionBuilder.withArgName(BatchConstants.ARG_GLOBAL_DIC_PART_REDUCE_STATS).hasArg()
+            .isRequired(false).withDescription("Global dic part reduce stats").create(BatchConstants.ARG_GLOBAL_DIC_PART_REDUCE_STATS);
     protected static final Option OPTION_NCUBOID_LEVEL = OptionBuilder.withArgName(BatchConstants.ARG_LEVEL).hasArg()
             .isRequired(true).withDescription("N-Cuboid build level, e.g. 1, 2, 3...").create(BatchConstants.ARG_LEVEL);
     protected static final Option OPTION_PARTITION_FILE_PATH = OptionBuilder.withArgName(BatchConstants.ARG_PARTITION)

diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
index 3fffad2..6031f3c 100644
--- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java
@@ -72,6 +72,8 @@ public interface BatchConstants {
     String CFG_MR_SPARK_JOB = "mr.spark.job";
     String CFG_SPARK_META_URL = "spark.meta.url";
     String CFG_GLOBAL_DICT_BASE_DIR = "global.dict.base.dir";
+    String CFG_GLOBAL_DICT_STATS_PARTITION_VALUE = "KYLIN_MAX_DISTINCT_COUNT";
+
     String CFG_HLL_REDUCER_NUM = "cuboidHLLCounterReducerNum";

@@ -112,6 +114,8 @@ public interface BatchConstants {
     String ARG_BASE64_ENCODED_STEP_NAME = "base64StepName";
     String ARG_SQL_COUNT = "sqlCount";
     String ARG_BASE64_ENCODED_SQL = "base64EncodedSql";
+    String ARG_GLOBAL_DIC_PART_REDUCE_STATS = "global_dic_part_reduce_stats";
+    String ARG_GLOBAL_DIC_MAX_DISTINCT_COUNT = "global_dic_max_distinct_count";

     /**
      * logger and counter
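The two Options added above follow the same commons-cli pattern as every other argument in AbstractHadoopJob: optional, single-valued, keyed by the ARG_* constant. A self-contained sketch of that pattern outside Kylin's OptionsHelper wrapper; the class name and the /tmp path are mine, not the commit's:

    import org.apache.commons.cli.CommandLine;
    import org.apache.commons.cli.GnuParser;
    import org.apache.commons.cli.OptionBuilder;
    import org.apache.commons.cli.Options;
    import org.apache.commons.cli.ParseException;

    public class OptionSketch {
        public static void main(String[] args) throws ParseException {
            Options options = new Options();
            // same builder chain AbstractHadoopJob uses for OPTION_GLOBAL_DIC_PART_REDUCE_STATS
            options.addOption(OptionBuilder.withArgName("global_dic_part_reduce_stats").hasArg()
                    .isRequired(false).withDescription("Global dic part reduce stats")
                    .create("global_dic_part_reduce_stats"));
            CommandLine cmd = new GnuParser().parse(options,
                    new String[] { "-global_dic_part_reduce_stats", "/tmp/reduce_stats" });
            System.out.println(cmd.getOptionValue("global_dic_part_reduce_stats")); // /tmp/reduce_stats
        }
    }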
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDicTotalBuildJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDicTotalBuildJob.java
new file mode 100644
index 0000000..acdbb07
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDicTotalBuildJob.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.IOException;
+import org.apache.commons.cli.Options;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
+import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.cube.CubeInstance;
+import org.apache.kylin.cube.CubeManager;
+import org.apache.kylin.cube.CubeSegment;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.apache.kylin.engine.mr.common.BatchConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BuildGlobalHiveDicTotalBuildJob extends AbstractHadoopJob {
+    protected static final Logger logger = LoggerFactory.getLogger(BuildGlobalHiveDicTotalBuildJob.class);
+
+    @Override
+    public int run(String[] args) throws Exception {
+        Options options = new Options();
+        String[] dicColsArr = null;
+        try {
+            options.addOption(OPTION_JOB_NAME);
+            options.addOption(OPTION_INPUT_PATH);
+            options.addOption(OPTION_OUTPUT_PATH);
+            options.addOption(OPTION_CUBE_NAME);
+            options.addOption(OPTION_SEGMENT_ID);
+            options.addOption(OPTION_GLOBAL_DIC_MAX_DISTINCT_COUNT);
+            options.addOption(OPTION_GLOBAL_DIC_PART_REDUCE_STATS);
+            parseOptions(options, args);
+
+            KylinConfig config = KylinConfig.getInstanceFromEnv();
+            dicColsArr = config.getMrHiveDictColumnsExcludeRefColumns();
+            String cubeName = getOptionValue(OPTION_CUBE_NAME);
+            String segmentID = getOptionValue(OPTION_SEGMENT_ID);
+
+            job = Job.getInstance(getConf(), getOptionValue(OPTION_JOB_NAME));
+            logger.info("Starting: " + job.getJobName());
+
+            // ----------------------------------------------------------------------------
+            // add metadata to distributed cache
+            CubeManager cubeMgr = CubeManager.getInstance(config);
+            CubeInstance cube = cubeMgr.getCube(cubeName);
+            CubeSegment segment = cube.getSegmentById(segmentID);
+
+            job.getConfiguration().set(BatchConstants.CFG_CUBE_NAME, cubeName);
+            job.getConfiguration().set(BatchConstants.CFG_CUBE_SEGMENT_ID, segmentID);
+            job.getConfiguration().set("partition.statistics.path", getOptionValue(OPTION_GLOBAL_DIC_PART_REDUCE_STATS));
+            job.getConfiguration().set("last.max.dic.value.path", getOptionValue(OPTION_GLOBAL_DIC_MAX_DISTINCT_COUNT));
+            job.getConfiguration().setBoolean("mapreduce.output.fileoutputformat.compress", false);
+
+            job.setJarByClass(BuildGlobalHiveDicTotalBuildJob.class);
+
+            setJobClasspath(job, cube.getConfig());
+
+            // Mapper
+            job.setMapperClass(BuildGlobalHiveDictTotalBuildMapper.class);
+
+            // Input Output
+            setInput(job, getOptionValue(OPTION_INPUT_PATH));
+            setOutput(job, dicColsArr, getOptionValue(OPTION_OUTPUT_PATH));
+
+            job.setNumReduceTasks(0); // map-only job, no reduce
+
+            job.setInputFormatClass(KeyValueTextInputFormat.class);
+
+            // prevent creating zero-sized default output files
+            LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
+
+            // delete output
+            Path baseOutputPath = new Path(getOptionValue(OPTION_OUTPUT_PATH));
+            deletePath(job.getConfiguration(), baseOutputPath);
+
+            attachSegmentMetadataWithDict(segment, job.getConfiguration());
+            return waitForCompletion(job);
+        } finally {
+            if (job != null)
+                cleanupTempConfFile(job.getConfiguration());
+        }
+    }
+
+    private void setOutput(Job job, String[] dicColsArr, String outputBase) {
+        // make each mapper output to its respective dir, e.g.
+        // /user/prod_kylin/tmp/kylin2/globaldic_test/kylin-188c9f9d_dabb_944e_9f20_99dc95be66e6/bs_order_scene_day_new_cube_clone/dict_column=DM_ES_REPORT_ORDER_VIEW0420_DRIVER_ID/part_sort
+        for (int i = 0; i < dicColsArr.length; i++) {
+            MultipleOutputs.addNamedOutput(job, i + "", TextOutputFormat.class, Text.class, LongWritable.class);
+        }
+        Path outputPath = new Path(outputBase);
+        FileOutputFormat.setOutputPath(job, outputPath);
+    }
+
+    private void setInput(Job job, String input) throws IOException {
+        Path path = new Path(input);
+        FileSystem fs = path.getFileSystem(job.getConfiguration());
+        if (!fs.exists(path)) {
+            fs.mkdirs(path);
+        }
+        FileInputFormat.setInputPaths(job, input);
+    }
+}
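BuildGlobalHiveDicTotalBuildJob is map-only and leans on two Hadoop facilities: MultipleOutputs named outputs (one per dict column, named by the column's index) and LazyOutputFormat (so no empty default part files land next to the per-column directories). A stripped-down sketch of just that wiring, assuming two hypothetical dict columns:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat;
    import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

    public class NamedOutputWiringSketch {
        public static void wire(Job job, String outputBase) {
            String[] dictCols = { "ORDER_ID", "SELLER_ID" }; // hypothetical columns
            for (int i = 0; i < dictCols.length; i++) {
                // named outputs "0", "1", ...; the mapper later calls
                // mos.write("1", value, id, "dict_column=SELLER_ID/1"), which lands the
                // record under <outputBase>/dict_column=SELLER_ID/, a Hive partition dir per column
                MultipleOutputs.addNamedOutput(job, String.valueOf(i), TextOutputFormat.class,
                        Text.class, LongWritable.class);
            }
            FileOutputFormat.setOutputPath(job, new Path(outputBase));
            job.setNumReduceTasks(0); // map-only: dict ids come from precomputed offsets
            LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);
        }
    }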
diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictTotalBuildMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictTotalBuildMapper.java
new file mode 100644
index 0000000..6efac31
--- /dev/null
+++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictTotalBuildMapper.java
@@ -0,0 +1,231 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.kylin.engine.mr.steps;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs;
+import org.apache.kylin.common.KylinConfig;
+import org.apache.kylin.engine.mr.KylinMapper;
+import org.apache.kylin.engine.mr.common.AbstractHadoopJob;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class BuildGlobalHiveDictTotalBuildMapper<KEYIN, Object> extends KylinMapper<KEYIN, Text, Text, LongWritable> {
+    private static final Logger logger = LoggerFactory.getLogger(BuildGlobalHiveDictTotalBuildMapper.class);
+
+    private MultipleOutputs mos;
+    private Integer colIndex = null;
+    private String colName = null;
+    private Long start = 0L; // start offset of this input split
+    private String[] cols = null;
+
+    @Override
+    protected void doSetup(Context context) throws IOException, InterruptedException {
+        Configuration conf = context.getConfiguration();
+        mos = new MultipleOutputs(context);
+
+        KylinConfig config;
+        try {
+            config = AbstractHadoopJob.loadKylinPropsAndMetadata();
+        } catch (IOException e) {
+            throw new RuntimeException(e);
+        }
+        cols = config.getMrHiveDictColumnsExcludeRefColumns();
+
+        String statPath = conf.get("partition.statistics.path");
+
+        // get the input file name; the name format is colIndex-part-partitionNum, e.g. 1-part-000019
+        FileSplit fileSplit = (FileSplit) context.getInputSplit();
+        String[] arr = fileSplit.getPath().getName().split("-");
+        int partitionNum = Integer.parseInt(arr[2]);
+        colIndex = Integer.parseInt(arr[0]);
+        colName = cols[colIndex];
+        logger.info("Input fileName:{}, colIndex:{}, colName:{}, partitionNum:{}", fileSplit.getPath().getName(), colIndex, colName, partitionNum);
+
+        // last max dict value per column
+        String lastMaxValuePath = conf.get("last.max.dic.value.path");
+        logger.info("last.max.dic.value.path: {}", lastMaxValuePath);
+        long lastMaxDictValue = this.getLastMaxDicValue(conf, lastMaxValuePath);
+        logger.info("last.max.dic.value.path: {}, value={}", lastMaxValuePath, lastMaxDictValue);
+
+        // The start offset of this file = sum of the counts of all previous partitions
+        // + the column's last max dict value
+        Map<Integer, TreeMap<Integer, Long>> allStats = getPartitionsCount(conf, statPath); // <colIndex, <reduceNum, count>>
+        TreeMap<Integer, Long> partitionStats = allStats.get(colIndex);
+        if (partitionNum != 0) {
+            SortedMap<Integer, Long> subStat = partitionStats.subMap(0, true, partitionNum, false);
+            subStat.forEach((k, v) -> {
+                logger.info("Split num:{} and its count:{}", k, v);
+                start += v;
+            });
+        }
+        start += lastMaxDictValue;
+        logger.info("global dic.{}.split.num.{} build dict start offset is {}", colName, partitionNum, start);
+    }
+
+    @Override
+    public void doMap(KEYIN key, Text record, Context context) throws IOException, InterruptedException {
+        long inkey = Long.parseLong(key.toString());
+        mos.write(colIndex + "", record, new LongWritable(start + inkey), "dict_column=" + colName + "/" + colIndex);
+    }
+
+    @Override
+    protected void doCleanup(Context context) throws IOException, InterruptedException {
+        mos.close();
+    }
+
+    private Map<Integer, TreeMap<Integer, Long>> getPartitionsCount(Configuration conf, String partitionStatPath) throws IOException {
+        String temp = null;
+        String[] fileNameArr = null;
+        String[] statsArr = null;
+
+        // colStats: key is the part-build reduce num, value is that reducer's item count
+        TreeMap<Integer, Long> colStats = null;
+        // allStats: key is colIndex, value is colStats (that column's statistics)
+        Map<Integer, TreeMap<Integer, Long>> allStats = new HashMap<>();
+
+        Path path = new Path(partitionStatPath);
+        FileSystem fs = path.getFileSystem(conf);
+        if (fs.exists(path) && fs.isDirectory(path)) {
+            for (FileStatus status : fs.listStatus(path)) {
+                // fileNameArr[0] is the global dict colIndex
+                fileNameArr = status.getPath().getName().split("-");
+                colStats = allStats.get(Integer.parseInt(fileNameArr[0]));
+                if (colStats == null) {
+                    colStats = new TreeMap<>();
+                }
+                temp = cat(status.getPath(), fs);
+                logger.info("partitionStatPath:{}, content:{}", partitionStatPath, temp);
+                if (temp != null) {
+                    statsArr = temp.split("\t");
+                    colStats.put(Integer.parseInt(statsArr[1]), Long.parseLong(statsArr[0]));
+                    allStats.put(Integer.parseInt(fileNameArr[0]), colStats);
+                }
+            }
+        }
+
+        allStats.forEach((k, v) -> {
+            v.forEach((k1, v1) -> {
+                logger.info("allStats.colIndex:{}, split num:{}, split count:{}", k, k1, v1);
+            });
+        });
+
+        return allStats;
+    }
+
+    private String cat(Path remotePath, FileSystem fs) throws IOException {
+        StringBuilder stat = new StringBuilder();
+        try (FSDataInputStream in = fs.open(remotePath);
+                BufferedReader buffer = new BufferedReader(new InputStreamReader(in, "UTF-8"))) {
+            String line;
+            while ((line = buffer.readLine()) != null) {
+                stat.append(line);
+            }
+        }
+        return stat.toString();
+    }
+
+    /**
+     * @param conf
+     * @param lastMaxDicValuePath eg: /user/kylin/warehouse/db/kylin_intermediate_kylin_sales_cube_mr_6222c210_ce2d_e8ce_dd0f_f12c38fa9115__group_by/dict_column=KYLIN_MAX_DISTINCT_COUNT/part-00000-450ee120-39ff-4806-afaf-ed482ceffc68-c000
+     *        the path content holds per-column dict stats: dict column name, extracted distinct value count, last max dict value
+     * @return this colIndex's last max dict value
+     * @throws IOException
+     */
+    private long getLastMaxDicValue(Configuration conf, String lastMaxDicValuePath) throws IOException {
+        Map<Integer, Long> map = null;
+        Path path = new Path(lastMaxDicValuePath);
+        FileSystem fs = path.getFileSystem(conf);
+        if (fs.exists(path) && fs.isDirectory(path)) {
+            for (FileStatus status : fs.listStatus(path)) {
+                logger.info("start buildMaxCountMap");
+                map = buildMaxCountMap(status.getPath(), fs);
+                logger.info("end buildMaxCountMap");
+            }
+        }
+        return map == null || map.get(colIndex) == null ? 0L : map.get(colIndex);
+    }
+
+    /**
+     * @param remotePath eg: /user/kylin/warehouse/db/kylin_intermediate_kylin_sales_cube_mr_6222c210_ce2d_e8ce_dd0f_f12c38fa9115__group_by/dict_column=KYLIN_MAX_DISTINCT_COUNT/part-00000-450ee120-39ff-4806-afaf-ed482ceffc68-c000
+     *        the path content holds per-column dict stats: dict column name, extracted distinct value count, last max dict value
+     * @param fs
+     * @return Map whose key is colIndex and value is the last max dict value
+     * @throws IOException
+     */
+    private Map<Integer, Long> buildMaxCountMap(Path remotePath, FileSystem fs) throws IOException {
+        String[] arr = null;
+        Map<Integer, Long> map = new HashMap<>();
+        try (FSDataInputStream in = fs.open(remotePath);
+                BufferedReader buffer = new BufferedReader(new InputStreamReader(in, "UTF-8"))) {
+            String line;
+            while ((line = buffer.readLine()) != null) {
+                arr = line.split(",");
+                logger.info("line={}, arr.length:{}", line, arr.length);
+                if (arr.length == 3) {
+                    for (int i = 0; i < cols.length; i++) {
+                        if (cols[i].equalsIgnoreCase(arr[0])) {
+                            map.put(i, Long.parseLong(arr[2]));
+                            logger.info("col.{}.maxValue={}", cols[i], Long.parseLong(arr[2]));
+                            break;
+                        }
+                    }
+                }
+            }
+        }
+        logger.info("buildMaxCountMap map={}", map);
+        return map;
+    }
+}
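The heart of the mapper is the offset arithmetic in doSetup: each input file <colIndex>-part-<partitionNum> starts numbering at the sum of the counts of all lower-numbered partitions for that column, plus the column's last max dict id from earlier builds, so mappers assign globally unique, dense ids with no coordination. The same arithmetic with made-up numbers (real counts come from the reduce_stats files, the real last max id from the KYLIN_MAX_DISTINCT_COUNT partition):

    import java.util.SortedMap;
    import java.util.TreeMap;

    public class OffsetSketch {
        public static void main(String[] args) {
            // hypothetical part-build counts for one column: <partitionNum, count>
            TreeMap<Integer, Long> partitionStats = new TreeMap<>();
            partitionStats.put(0, 100L);
            partitionStats.put(1, 250L);
            partitionStats.put(2, 80L);
            long lastMaxDictValue = 1000L; // highest id assigned by earlier segment builds

            int partitionNum = 2; // this mapper reads file "<colIndex>-part-000002"
            long start = lastMaxDictValue;
            SortedMap<Integer, Long> before = partitionStats.subMap(0, true, partitionNum, false);
            for (long count : before.values()) {
                start += count;
            }
            // doMap writes (start + localId); assuming the part build emits 1-based ranks
            // as keys, partition 2 assigns ids 1351..1430 while partitions 0 and 1 cover
            // 1001..1100 and 1101..1350, so nothing collides.
            System.out.println("start offset = " + start); // 1350
        }
    }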
