This is an automated email from the ASF dual-hosted git repository. xxyu pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/kylin.git
commit 5731f43fcf350247e76fd7e36b0980d5cf9fc912 Author: XiaoxiangYu <[email protected]> AuthorDate: Thu May 28 23:26:20 2020 +0800 KYLIN-4342 Improve code smell --- .../org/apache/kylin/common/KylinConfigBase.java | 55 +++--- .../kylin/job/constant/ExecutableConstants.java | 12 +- .../bitmap/BitmapIntersectValueAggFunc.java | 10 +- .../kylin/measure/bitmap/BitmapMeasureType.java | 6 +- .../apache/kylin/metadata/model/FunctionDesc.java | 4 + .../kylin/engine/mr/BatchCubingJobBuilder2.java | 53 +++--- .../java/org/apache/kylin/engine/mr/IInput.java | 24 ++- .../apache/kylin/engine/mr/JobBuilderSupport.java | 30 +-- .../kylin/engine/mr/common/BaseCuboidBuilder.java | 8 +- .../kylin/engine/mr/common/BatchConstants.java | 8 +- .../mr/steps/BuildGlobalHiveDictPartBuildJob.java | 45 +++-- .../steps/BuildGlobalHiveDictPartBuildMapper.java | 13 +- ...va => BuildGlobalHiveDictPartBuildReducer.java} | 20 +- ...ava => BuildGlobalHiveDictPartPartitioner.java} | 17 +- ....java => BuildGlobalHiveDictTotalBuildJob.java} | 17 +- .../steps/BuildGlobalHiveDictTotalBuildMapper.java | 93 +++++---- .../engine/spark/SparkBatchCubingJobBuilder2.java | 49 +++-- .../localmeta/cube_desc/ci_inner_join_cube.json | 3 +- kubernetes/README.md | 12 +- .../kylin/source/hive/CreateMrHiveDictStep.java | 60 ++---- .../apache/kylin/source/hive/HiveInputBase.java | 210 +++++++++++---------- .../apache/kylin/source/hive/MRHiveDictUtil.java | 140 +++++++++----- 22 files changed, 476 insertions(+), 413 deletions(-) diff --git a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java index f7f73ac..3429963 100644 --- a/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java +++ b/core-common/src/main/java/org/apache/kylin/common/KylinConfigBase.java @@ -14,7 +14,7 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. 
-*/ + */ package org.apache.kylin.common; @@ -162,7 +162,6 @@ public abstract class KylinConfigBase implements Serializable { } /** - * * @param propertyKeys the collection of the properties; if null will return all properties * @return properties which contained in propertyKeys */ @@ -581,21 +580,28 @@ public abstract class KylinConfigBase implements Serializable { // ============================================================================ - // mr-hive dict + // Hive Global dictionary by mr/hive // ============================================================================ public String[] getMrHiveDictColumns() { String columnStr = getMrHiveDictColumnsStr(); - if (!columnStr.equals("")) { + if (!StringUtils.isEmpty(columnStr)) { return columnStr.split(","); } return new String[0]; } + /** + * @return the hdfs path for Hive Global dictionary table + */ + public String getHiveDatabaseDir() { + return this.getOptional("kylin.source.hive.databasedir", ""); + } + public String[] getMrHiveDictColumnsExcludeRefColumns() { String[] excludeRefCols = null; String[] hiveDictColumns = getMrHiveDictColumns(); Map<String, String> refCols = getMrHiveDictRefColumns(); - if(Objects.nonNull(hiveDictColumns) && hiveDictColumns.length>0) { + if (Objects.nonNull(hiveDictColumns) && hiveDictColumns.length > 0) { excludeRefCols = Arrays.stream(hiveDictColumns).filter(x -> !refCols.containsKey(x)).toArray(String[]::new); } return excludeRefCols; @@ -603,40 +609,40 @@ public abstract class KylinConfigBase implements Serializable { /** * set kylin.dictionary.mr-hive.columns in Cube level config , value are the columns which want to use MR/Hive to build global dict , - * Format, tableAliasName_ColumnName, multiple columns separated by commas,eg KYLIN_SALES_BUYER_ID,KYLIN_SALES_SELLER_ID - * @return if mr-hive dict not enabled, return ""; - * else return {TABLE_NAME}_{COLUMN_NAME1},{TABLE_NAME}_{COLUMN_NAME2}" + * Format, tableAliasName_ColumnName, multiple columns separated by comma,eg KYLIN_SALES_BUYER_ID,KYLIN_SALES_SELLER_ID + * + * @return if mr-hive dict not enabled, return ""; + * else return {TABLE_NAME}_{COLUMN_NAME1},{TABLE_NAME}_{COLUMN_NAME2}" */ private String getMrHiveDictColumnsStr() { return getOptional("kylin.dictionary.mr-hive.columns", ""); } /** - * @return The global dic reduce num per column. Default 2 per column. + * @return The global dic reduce num per column. Default 2 per column. 
*/ public Integer[] getMrHiveDictColumnsReduceNumExcludeRefCols() { String[] excludeRefCols = getMrHiveDictColumnsExcludeRefColumns(); - if(Objects.nonNull(excludeRefCols) && excludeRefCols.length>0) { + if (Objects.nonNull(excludeRefCols) && excludeRefCols.length > 0) { String[] arr = null; Map<String, Integer> colNum = new HashMap<>(); Integer[] reduceNumArr = new Integer[excludeRefCols.length]; String[] columnReduceNum = getMrHiveDictColumnsReduceNumStr().split(","); - //change set columnReduceNum to map struct try { - for(int i=0;i<columnReduceNum.length;i++){ - if(!StringUtils.isBlank(columnReduceNum[i])) { + for (int i = 0; i < columnReduceNum.length; i++) { + if (!StringUtils.isBlank(columnReduceNum[i])) { arr = columnReduceNum[i].split(":"); colNum.put(arr[0], Integer.parseInt(arr[1])); } } - }catch (Exception e){ + } catch (Exception e) { logger.error("set kylin.dictionary.mr-hive.columns.reduce.num error {}, the value should look like colAliasName:reduceNum,colAliasName:reduceNum", getMrHiveDictColumnsReduceNumStr()); } for (int i = 0; i < excludeRefCols.length; i++) { - reduceNumArr[i] = colNum.containsKey(excludeRefCols[i])?colNum.get(excludeRefCols[i]): DEFAULT_MR_HIVE_GLOBAL_DICT_REDUCE_NUM_PER_COLUMN; + reduceNumArr[i] = colNum.containsKey(excludeRefCols[i]) ? colNum.get(excludeRefCols[i]) : DEFAULT_MR_HIVE_GLOBAL_DICT_REDUCE_NUM_PER_COLUMN; } Arrays.asList(reduceNumArr).stream().forEach(x -> { @@ -646,7 +652,7 @@ public abstract class KylinConfigBase implements Serializable { }); return reduceNumArr; - }else { + } else { return null; } } @@ -654,15 +660,13 @@ public abstract class KylinConfigBase implements Serializable { /** * Set kylin.dictionary.mr-hive.columns.reduce.num in Cube level config , value are the reduce number for global dict columns which are set in kylin.dictionary.mr-hive.columns.
* Format, tableAliasName_ColumnName:number, multiple columns separated by commas,eg KYLIN_SALES_BUYER_ID:5,KYLIN_SALES_SELLER_ID:3 - * @return */ private String getMrHiveDictColumnsReduceNumStr() { return getOptional("kylin.dictionary.mr-hive.columns.reduce.num", ""); } /** - * MR/Hive global domain dic (reuse dict from other cube's MR/Hive global dic column) - * @return + * MR/Hive global domain dictionary (reuse dict from other cube's MR/Hive global dic column) */ public Map<String, String> getMrHiveDictRefColumns() { Map<String, String> result = new HashMap<>(); @@ -670,7 +674,7 @@ public abstract class KylinConfigBase implements Serializable { if (!StringUtils.isEmpty(columnStr)) { String[] pairs = columnStr.split(","); for (String pair : pairs) { - String [] infos = pair.split(":"); + String[] infos = pair.split(":"); result.put(infos[0], infos[1]); } } @@ -685,8 +689,8 @@ public abstract class KylinConfigBase implements Serializable { return getOptional("kylin.dictionary.mr-hive.table.suffix", "_global_dict"); } - public String getMrHiveDictIntermediateTTableSuffix() { - return getOptional("kylin.dictionary.mr-hive.intermediate.table.suffix", "__group_by"); + public String getMrHiveDistinctValueTableSuffix() { + return getOptional("kylin.dictionary.mr-hive.intermediate.table.suffix", "__distinct_value"); } // ============================================================================ @@ -1100,9 +1104,6 @@ public abstract class KylinConfigBase implements Serializable { return this.getOptional("kylin.source.hive.database-for-flat-table", DEFAULT); } - public String getHiveDatabaseDir() { - return this.getOptional("kylin.source.hive.databasedir", ""); - } public String getFlatTableStorageFormat() { return this.getOptional("kylin.source.hive.flat-table-storage-format", "SEQUENCEFILE"); @@ -2326,7 +2327,7 @@ public abstract class KylinConfigBase implements Serializable { return getPropertiesByPrefix("kylin.metrics."); } - public int printSampleEventRatio(){ + public int printSampleEventRatio() { String val = getOptional("kylin.metrics.kafka-sample-ratio", "10000"); return Integer.parseInt(val); } @@ -2558,7 +2559,7 @@ public abstract class KylinConfigBase implements Serializable { return (getOptional("kylin.stream.event.timezone", "")); } - public boolean isAutoResubmitDiscardJob(){ + public boolean isAutoResubmitDiscardJob() { return Boolean.parseBoolean(getOptional("kylin.stream.auto-resubmit-after-discard-enabled", "true")); } diff --git a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java index 8d4de09..576f4bf 100644 --- a/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java +++ b/core-job/src/main/java/org/apache/kylin/job/constant/ExecutableConstants.java @@ -84,12 +84,12 @@ public final class ExecutableConstants { public static final String STEP_NAME_STREAMING_BUILD_BASE_CUBOID = "Build Base Cuboid Data For Streaming Job"; public static final String STEP_NAME_STREAMING_SAVE_DICTS = "Save Cube Dictionaries"; - // MR - Hive Dict - public static final String STEP_NAME_GLOBAL_DICT_MRHIVE_EXTRACT_DICTVAL = "Build Global Dict - extract distinct value from data"; - public static final String STEP_NAME_GLOBAL_DICT_PART_BUILD_DICTVAL = "Build Global Dict - parallel part build"; - public static final String STEP_NAME_GLOBAL_DICT_TOTAL_BUILD_DICTVAL = "Build Global Dict - parallel total build"; - public static final String 
STEP_NAME_GLOBAL_DICT_MRHIVE_BUILD_DICTVAL = "Build Global Dict - merge to dict table"; - public static final String STEP_NAME_GLOBAL_DICT_MRHIVE_REPLACE_DICTVAL = "Build Global Dict - replace intermediate table"; + // Hive Global Dictionary built by MR + public static final String STEP_NAME_GLOBAL_DICT_MRHIVE_EXTRACT_DICTVAL = "Build Hive Global Dict - extract distinct value"; + public static final String STEP_NAME_GLOBAL_DICT_PART_BUILD_DICTVAL = "Build Hive Global Dict - parallel part build"; + public static final String STEP_NAME_GLOBAL_DICT_TOTAL_BUILD_DICTVAL = "Build Hive Global Dict - parallel total build"; + public static final String STEP_NAME_GLOBAL_DICT_MRHIVE_BUILD_DICTVAL = "Build Hive Global Dict - merge to dict table"; + public static final String STEP_NAME_GLOBAL_DICT_MRHIVE_REPLACE_DICTVAL = "Build Hive Global Dict - replace intermediate table"; public static final String FLINK_SPECIFIC_CONFIG_NAME_MERGE_DICTIONARY = "mergedict"; } diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectValueAggFunc.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectValueAggFunc.java index 7ec21b5..2ab4313 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectValueAggFunc.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapIntersectValueAggFunc.java @@ -14,7 +14,7 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. -*/ + */ package org.apache.kylin.measure.bitmap; import java.util.List; @@ -22,10 +22,7 @@ import java.util.List; import org.apache.kylin.measure.ParamAsMeasureCount; /** - * BitmapIntersectDistinctCountAggFunc is an UDAF used for calculating the intersection of two or more bitmaps - * Usage: intersect_count(columnToCount, columnToFilter, filterList) - * Example: intersect_count(uuid, event, array['A', 'B', 'C']), meaning find the count of uuid in all A/B/C 3 bitmaps - * requires an bitmap count distinct measure of uuid, and an dimension of event + * */ public class BitmapIntersectValueAggFunc implements ParamAsMeasureCount { @@ -50,5 +47,4 @@ public class BitmapIntersectValueAggFunc implements ParamAsMeasureCount { public static String result(RetentionPartialResult result) { return result.valueResult(); } -} - +} \ No newline at end of file diff --git a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java index 70a64ea..9d95584 100644 --- a/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java +++ b/core-metadata/src/main/java/org/apache/kylin/measure/bitmap/BitmapMeasureType.java @@ -112,8 +112,8 @@ public class BitmapMeasureType extends MeasureType<BitmapCounter> { int id; TblColRef literalCol = measureDesc.getFunction().getParameter().getColRefs().get(0); if (needDictionaryColumn(measureDesc.getFunction()) && dictionaryMap.containsKey(literalCol)) { - Dictionary<String> dictionary = dictionaryMap.get(literalCol); - id = dictionary.getIdFromValue(values[0]); + Dictionary<String> dictionary = dictionaryMap.get(literalCol); + id = dictionary.getIdFromValue(values[0]); } else { id = Integer.parseInt(values[0]); } @@ -153,6 +153,8 @@ public class BitmapMeasureType extends MeasureType<BitmapCounter> { private boolean needDictionaryColumn(FunctionDesc functionDesc) { DataType dataType = 
functionDesc.getParameter().getColRefs().get(0).getType(); if (functionDesc.isMrDict()) { + // If isMrDict set to true, it means related column has been + // encoded in previous step by Hive Global Dictionary return false; } if (dataType.isIntegerFamily() && !dataType.isBigInt()) { diff --git a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java index 93b4064..c4d002b 100644 --- a/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java +++ b/core-metadata/src/main/java/org/apache/kylin/metadata/model/FunctionDesc.java @@ -88,6 +88,10 @@ public class FunctionDesc implements Serializable { private DataType returnDataType; private MeasureType<?> measureType; private boolean isDimensionAsMetric = false; + + /** + * The flag of Hive Global Dictionary for COUNT_DISTINCT + */ private boolean isMrDict = false; public boolean isMrDict() { diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java index 8ec7d36..47f709d 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/BatchCubingJobBuilder2.java @@ -6,20 +6,21 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. 
-*/ + */ package org.apache.kylin.engine.mr; import java.util.List; import java.util.Objects; + import org.apache.kylin.common.KylinConfig; import org.apache.kylin.cube.CubeSegment; import org.apache.kylin.cube.cuboid.CuboidUtil; @@ -59,24 +60,8 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport { // Phase 1: Create Flat Table & Materialize Hive View in Lookup Tables inputSide.addStepPhase1_CreateFlatTable(result); - // build global dict - KylinConfig dictConfig = seg.getConfig(); - String[] mrHiveDictColumns = dictConfig.getMrHiveDictColumnsExcludeRefColumns(); - - if (Objects.nonNull(mrHiveDictColumns) && mrHiveDictColumns.length > 0 - && !"".equals(mrHiveDictColumns[0])) { - - //parallel part build - result.addTask(createBuildGlobalHiveDictPartBuildJob(jobId)); - - //parallel total build - result.addTask(createBuildGlobalHiveDicTotalBuildJob(jobId)); - } - - //merge global dic and replace flat table - if(Objects.nonNull(dictConfig.getMrHiveDictColumns()) && dictConfig.getMrHiveDictColumns().length > 0 && !"".equals(dictConfig.getMrHiveDictColumns()[0])){ - inputSide.addStepPhase_ReplaceFlatTableGlobalColumnValue(result); - } + // Build global dictionary in distributed way + buildHiveGlobalDictionaryByMR(result, jobId); // Phase 2: Build Dictionary result.addTask(createFactDistinctColumnsStep(jobId)); @@ -106,7 +91,7 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport { result.addTask(createUpdateCubeInfoAfterBuildStep(jobId, lookupMaterializeContext)); inputSide.addStepPhase4_Cleanup(result); outputSide.addStepPhase4_Cleanup(result); - + // Set the task priority if specified result.setPriorityBasedOnPriorityOffset(priorityOffset); result.getTasks().forEach(task -> task.setPriorityBasedOnPriorityOffset(priorityOffset)); @@ -216,6 +201,30 @@ public class BatchCubingJobBuilder2 extends JobBuilderSupport { return ndCuboidStep; } + /** + * Build hive global dictionary by MR and encode corresponding column into integer for flat table + */ + protected void buildHiveGlobalDictionaryByMR(final CubingJob result, String jobId) { + KylinConfig dictConfig = seg.getConfig(); + String[] mrHiveDictColumnExcludeRef = dictConfig.getMrHiveDictColumnsExcludeRefColumns(); + String[] mrHiveDictColumns = dictConfig.getMrHiveDictColumns(); + + if (Objects.nonNull(mrHiveDictColumnExcludeRef) && mrHiveDictColumnExcludeRef.length > 0 + && !"".equals(mrHiveDictColumnExcludeRef[0])) { + + // 1. parallel part build + result.addTask(createBuildGlobalHiveDictPartBuildJob(jobId)); + + // 2. parallel total build + result.addTask(createBuildGlobalHiveDictTotalBuildJob(jobId)); + } + + // Merge new dictionary entry into global dictionary and replace/encode flat table + if (Objects.nonNull(mrHiveDictColumns) && mrHiveDictColumns.length > 0 && !"".equals(mrHiveDictColumns[0])) { + inputSide.addStepPhase_ReplaceFlatTableGlobalColumnValue(result); + } + } + protected Class<? extends AbstractHadoopJob> getNDCuboidJob() { return NDCuboidJob.class; } diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IInput.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IInput.java index 9fdb300..2775cb7 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/IInput.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/IInput.java @@ -24,26 +24,38 @@ import org.apache.kylin.metadata.model.ISegment; public interface IInput { - /** Return a helper to participate in batch cubing job flow. */ + /** + * Return a helper to participate in batch cubing job flow. 
+ */ public IBatchCubingInputSide getBatchCubingInputSide(IJoinedFlatTableDesc flatDesc); - /** Return a helper to participate in batch cubing merge job flow. */ + /** + * Return a helper to participate in batch cubing merge job flow. + */ public IBatchMergeInputSide getBatchMergeInputSide(ISegment seg); public interface IBatchCubingInputSide { - /** Add step that creates an intermediate flat table as defined by CubeJoinedFlatTableDesc */ + /** + * Add step that creates an intermediate flat table as defined by CubeJoinedFlatTableDesc + */ public void addStepPhase1_CreateFlatTable(DefaultChainedExecutable jobFlow); - /** Add step that replace flat table global column value by global dic*/ + /** + * An optional step that replace/encode flat table with Hive Global Dictionary + */ public void addStepPhase_ReplaceFlatTableGlobalColumnValue(DefaultChainedExecutable jobFlow); - /** Add step that does necessary clean up, like delete the intermediate flat table */ + /** + * Add step that does necessary clean up, like delete the intermediate flat table + */ public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow); } public interface IBatchMergeInputSide { - /** Add step that executes before merge dictionary and before merge cube. */ + /** + * Add step that executes before merge dictionary and before merge cube. + */ public void addStepPhase1_MergeDictionary(DefaultChainedExecutable jobFlow); } diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java index a597279..479db86 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/JobBuilderSupport.java @@ -6,15 +6,15 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. 
-*/ + */ package org.apache.kylin.engine.mr; @@ -37,7 +37,7 @@ import org.apache.kylin.engine.EngineFactory; import org.apache.kylin.engine.mr.common.BatchConstants; import org.apache.kylin.engine.mr.common.HadoopShellExecutable; import org.apache.kylin.engine.mr.common.MapReduceExecutable; -import org.apache.kylin.engine.mr.steps.BuildGlobalHiveDicTotalBuildJob; +import org.apache.kylin.engine.mr.steps.BuildGlobalHiveDictTotalBuildJob; import org.apache.kylin.engine.mr.steps.BuildGlobalHiveDictPartBuildJob; import org.apache.kylin.engine.mr.steps.CalculateStatsFromBaseCuboidJob; import org.apache.kylin.engine.mr.steps.CreateDictionaryJob; @@ -155,7 +155,7 @@ public class JobBuilderSupport { } public MapReduceExecutable createCalculateStatsFromBaseCuboid(String inputPath, String outputPath, - CuboidModeEnum cuboidMode) { + CuboidModeEnum cuboidMode) { MapReduceExecutable result = new MapReduceExecutable(); result.setName(ExecutableConstants.STEP_NAME_CALCULATE_STATS_FROM_BASE_CUBOID); result.setMapReduceJobClass(CalculateStatsFromBaseCuboidJob.class); @@ -224,10 +224,10 @@ public class JobBuilderSupport { return result; } - public MapReduceExecutable createBuildGlobalHiveDicTotalBuildJob(String jobId) { + public MapReduceExecutable createBuildGlobalHiveDictTotalBuildJob(String jobId) { MapReduceExecutable result = new MapReduceExecutable(); result.setName(ExecutableConstants.STEP_NAME_GLOBAL_DICT_TOTAL_BUILD_DICTVAL); - result.setMapReduceJobClass(BuildGlobalHiveDicTotalBuildJob.class); + result.setMapReduceJobClass(BuildGlobalHiveDictTotalBuildJob.class); StringBuilder cmd = new StringBuilder(); appendMapReduceParameters(cmd); appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName()); @@ -384,26 +384,26 @@ public class JobBuilderSupport { } public String getBuildGlobalHiveDicTotalBuildJobInputPath(String jobId) { - return getBuildGlobalDictionaryBasePath(jobId)+"/part_sort"; + return getBuildGlobalDictionaryBasePath(jobId) + "/part_sort"; } public String getBuildGlobalDictionaryMaxDistinctCountPath(String jobId) { KylinConfig conf = seg.getConfig(); String dbDir = conf.getHiveDatabaseDir(); IJoinedFlatTableDesc flatDesc = EngineFactory.getJoinedFlatTableDesc(seg); - String tableName = flatDesc.getTableName()+conf.getMrHiveDictIntermediateTTableSuffix(); - String outPut = dbDir+"/"+tableName+"/dict_column="+BatchConstants.CFG_GLOBAL_DICT_STATS_PARTITION_VALUE; + String tableName = flatDesc.getTableName() + conf.getMrHiveDistinctValueTableSuffix(); + String outPut = dbDir + "/" + tableName + "/dict_column=" + BatchConstants.CFG_GLOBAL_DICT_STATS_PARTITION_VALUE; return outPut; } public String getBuildGlobalDictionaryPartReduceStatsPathV2(String jobId) { - return getBuildGlobalDictionaryBasePath(jobId)+ "/reduce_stats"; + return getBuildGlobalDictionaryBasePath(jobId) + "/reduce_stats"; } - public String getBuildGlobalDictionaryTotalOutput(KylinConfig config){ + public String getBuildGlobalDictionaryTotalOutput(KylinConfig config) { String dbDir = config.getHiveDatabaseDir(); - String tableName = EngineFactory.getJoinedFlatTableDesc(seg).getTableName()+config.getMrHiveDictTableSuffix(); - String path = dbDir+"/"+tableName; + String tableName = EngineFactory.getJoinedFlatTableDesc(seg).getTableName() + config.getMrHiveDictTableSuffix(); + String path = dbDir + "/" + tableName; return path; } @@ -509,7 +509,7 @@ public class JobBuilderSupport { List<FileStatus> outputs = Lists.newArrayList(); scanFiles(input, fs, outputs); long size = 0L; - for (FileStatus 
stat: outputs) { + for (FileStatus stat : outputs) { size += stat.getLen(); } return size; diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java index aa377ed..baf415f 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BaseCuboidBuilder.java @@ -73,7 +73,7 @@ public class BaseCuboidBuilder implements java.io.Serializable { measureCodec = new BufferedMeasureCodec(measureDescList); kvBuilder = new KeyValueBuilder(intermediateTableDesc); - checkMrDictClolumn(); + checkHiveGlobalDictionaryColumn(); } public BaseCuboidBuilder(KylinConfig kylinConfig, CubeDesc cubeDesc, CubeSegment cubeSegment, @@ -92,7 +92,7 @@ public class BaseCuboidBuilder implements java.io.Serializable { measureCodec = new BufferedMeasureCodec(measureDescList); kvBuilder = new KeyValueBuilder(intermediateTableDesc); - checkMrDictClolumn(); + checkHiveGlobalDictionaryColumn(); } public byte[] buildKey(String[] flatRow) { @@ -121,7 +121,7 @@ public class BaseCuboidBuilder implements java.io.Serializable { } } - private void checkMrDictClolumn(){ + private void checkHiveGlobalDictionaryColumn(){ Set<String> mrDictColumnSet = new HashSet<>(); if (kylinConfig.getMrHiveDictColumns() != null) { Collections.addAll(mrDictColumnSet, kylinConfig.getMrHiveDictColumns()); @@ -133,7 +133,7 @@ public class BaseCuboidBuilder implements java.io.Serializable { TblColRef colRef = functionDesc.getParameter().getColRefs().get(0); if (mrDictColumnSet.contains(JoinedFlatTable.colName(colRef, true))) { functionDesc.setMrDict(true); - logger.info("setMrDict for {}", colRef); + logger.info("Enable hive global dictionary for {}", colRef); measure.setFunction(functionDesc); } } diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java index 6031f3c..f8ab007 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/common/BatchConstants.java @@ -6,15 +6,15 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. 
-*/ + */ package org.apache.kylin.engine.mr.common; @@ -72,7 +72,7 @@ public interface BatchConstants { String CFG_MR_SPARK_JOB = "mr.spark.job"; String CFG_SPARK_META_URL = "spark.meta.url"; String CFG_GLOBAL_DICT_BASE_DIR = "global.dict.base.dir"; - String CFG_GLOBAL_DICT_STATS_PARTITION_VALUE="KYLIN_MAX_DISTINCT_COUNT"; + String CFG_GLOBAL_DICT_STATS_PARTITION_VALUE = "KYLIN_MAX_DISTINCT_COUNT"; String CFG_HLL_REDUCER_NUM = "cuboidHLLCounterReducerNum"; diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictPartBuildJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictPartBuildJob.java index 07b0824..c51cd11 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictPartBuildJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictPartBuildJob.java @@ -19,6 +19,7 @@ package org.apache.kylin.engine.mr.steps; import java.io.IOException; + import org.apache.commons.cli.Options; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.LongWritable; @@ -47,7 +48,7 @@ public class BuildGlobalHiveDictPartBuildJob extends AbstractHadoopJob { @Override public int run(String[] args) throws Exception { Options options = new Options(); - String[] dicColsArr=null; + String[] dicColsArr = null; try { options.addOption(OPTION_JOB_NAME); @@ -78,7 +79,7 @@ public class BuildGlobalHiveDictPartBuildJob extends AbstractHadoopJob { setJobClasspath(job, cube.getConfig()); //FileInputFormat.setInputPaths(job, input); - setInputput(job, dicColsArr, getInputPath(config, segment)); + setInput(job, dicColsArr, getInputPath(config, segment)); // make each reducer output to respective dir setOutput(job, dicColsArr, getOptionValue(OPTION_OUTPUT_PATH)); @@ -94,15 +95,14 @@ public class BuildGlobalHiveDictPartBuildJob extends AbstractHadoopJob { job.setOutputValueClass(Text.class); job.setMapperClass(BuildGlobalHiveDictPartBuildMapper.class); - - job.setPartitionerClass(BuildGlobalHiveDicPartPartitioner.class); - job.setReducerClass(BuildGlobalHiveDicPartBuildReducer.class); + job.setPartitionerClass(BuildGlobalHiveDictPartPartitioner.class); + job.setReducerClass(BuildGlobalHiveDictPartBuildReducer.class); // prevent to create zero-sized default output LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class); - //delete output - Path baseOutputPath =new Path(getOptionValue(OPTION_OUTPUT_PATH)); + // delete output + Path baseOutputPath = new Path(getOptionValue(OPTION_OUTPUT_PATH)); deletePath(job.getConfiguration(), baseOutputPath); attachSegmentMetadataWithDict(segment, job.getConfiguration()); @@ -113,44 +113,41 @@ public class BuildGlobalHiveDictPartBuildJob extends AbstractHadoopJob { } } - private void setOutput(Job job, String[] dicColsArry, String outputBase){ + private void setOutput(Job job, String[] dicColsArr, String outputBase) { // make each reducer output to respective dir - //eg: /user/kylin/tmp/kylin/globaldic_test/kylin-188c9f9d_dabb_944e_9f20_99dc95be66e6/kylin_sales_cube_mr/dict_column=KYLIN_SALES_SELLER_ID/part_sort - for(int i=0;i<dicColsArry.length;i++){ + // eg: /user/kylin/tmp/kylin/globaldic_test/kylin-188c9f9d_dabb_944e_9f20_99dc95be66e6/kylin_sales_cube_mr/dict_column=KYLIN_SALES_SELLER_ID/part_sort + for (int i = 0; i < dicColsArr.length; i++) { MultipleOutputs.addNamedOutput(job, i + "", TextOutputFormat.class, LongWritable.class, Text.class); } - Path outputPath=new Path(outputBase); + Path outputPath = new Path(outputBase); 
FileOutputFormat.setOutputPath(job, outputPath); } - private void setInputput(Job job, String[] dicColsArray, String inputBase) throws IOException { - StringBuffer paths=new StringBuffer(); + private void setInput(Job job, String[] dicColsArray, String inputBase) throws IOException { + StringBuffer paths = new StringBuffer(); // make each reducer output to respective dir - for(String col:dicColsArray){ + for (String col : dicColsArray) { paths.append(inputBase).append("/dict_column=").append(col).append(","); } - paths.delete(paths.length() - 1, paths.length()); FileInputFormat.setInputPaths(job, paths.toString()); - } - private void setReduceNum(Job job, KylinConfig config){ + private void setReduceNum(Job job, KylinConfig config) { Integer[] reduceNumArr = config.getMrHiveDictColumnsReduceNumExcludeRefCols(); int totalReduceNum = 0; - for(Integer num:reduceNumArr){ - totalReduceNum +=num; + for (Integer num : reduceNumArr) { + totalReduceNum += num; } logger.info("BuildGlobalHiveDictPartBuildJob total reduce num is {}", totalReduceNum); job.setNumReduceTasks(totalReduceNum); } - private String getInputPath(KylinConfig config, CubeSegment segment){ + private String getInputPath(KylinConfig config, CubeSegment segment) { String dbDir = config.getHiveDatabaseDir(); - String tableName = EngineFactory.getJoinedFlatTableDesc(segment).getTableName()+config.getMrHiveDictIntermediateTTableSuffix(); - String input = dbDir+"/"+tableName; - logger.info("part build base input path:"+input); + String tableName = EngineFactory.getJoinedFlatTableDesc(segment).getTableName() + config.getMrHiveDistinctValueTableSuffix(); + String input = dbDir + "/" + tableName; + logger.info("part build base input path:" + input); return input; } - } diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictPartBuildMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictPartBuildMapper.java index 54708f3..76c73f9 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictPartBuildMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictPartBuildMapper.java @@ -20,6 +20,7 @@ package org.apache.kylin.engine.mr.steps; import java.io.IOException; import java.nio.ByteBuffer; + import org.apache.hadoop.io.NullWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.lib.input.FileSplit; @@ -62,12 +63,12 @@ public class BuildGlobalHiveDictPartBuildMapper<KEYIN, Object> extends KylinMapp String colName = name.split("=")[1]; logger.info("this map build col name :{}", colName); - for(int i=0;i<dicCols.length;i++){ - if(dicCols[i].equalsIgnoreCase(colName)){ - colIndex=i; + for (int i = 0; i < dicCols.length; i++) { + if (dicCols[i].equalsIgnoreCase(colName)) { + colIndex = i; } } - if(colIndex<0 || colIndex>127){ + if (colIndex < 0 || colIndex > 127) { logger.error("kylin.dictionary.mr-hive.columns colIndex :{} error ", colIndex); logger.error("kylin.dictionary.mr-hive.columns set error,mr-hive columns's count should less than 128"); } @@ -77,7 +78,7 @@ public class BuildGlobalHiveDictPartBuildMapper<KEYIN, Object> extends KylinMapp @Override public void doMap(KEYIN key, Object record, Context context) throws IOException, InterruptedException { - count ++; + count++; writeFieldValue(context, key.toString()); } @@ -94,7 +95,7 @@ public class BuildGlobalHiveDictPartBuildMapper<KEYIN, Object> extends KylinMapp tmpbuf.put(valueBytes); outputKey.set(tmpbuf.array(), 0, 
tmpbuf.position()); context.write(outputKey, NullWritable.get()); - if(count<10){ + if (count < 10) { logger.info("colIndex:{},input key:{}", colIndex, value); } } diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDicPartBuildReducer.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictPartBuildReducer.java similarity index 86% rename from engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDicPartBuildReducer.java rename to engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictPartBuildReducer.java index 8cdd8f1..54cd4b9 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDicPartBuildReducer.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictPartBuildReducer.java @@ -14,11 +14,12 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. -*/ + */ package org.apache.kylin.engine.mr.steps; import java.io.IOException; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Text; @@ -31,11 +32,11 @@ import org.apache.kylin.engine.mr.common.AbstractHadoopJob; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class BuildGlobalHiveDicPartBuildReducer extends KylinReducer<Text, LongWritable, LongWritable, Text> { +public class BuildGlobalHiveDictPartBuildReducer extends KylinReducer<Text, LongWritable, LongWritable, Text> { - private static final Logger logger = LoggerFactory.getLogger(BuildGlobalHiveDicPartBuildReducer.class); + private static final Logger logger = LoggerFactory.getLogger(BuildGlobalHiveDictPartBuildReducer.class); - private Long count=0L; + private Long count = 0L; private MultipleOutputs mos; private String[] dicCols; private String colName; @@ -58,16 +59,16 @@ public class BuildGlobalHiveDicPartBuildReducer extends KylinReducer<Text, LongW throws IOException, InterruptedException { count++; byte[] keyBytes = Bytes.copy(key.getBytes(), 1, key.getLength() - 1); - if(count==1){ - colIndex = key.getBytes()[0];//col index + if (count == 1) { + colIndex = key.getBytes()[0]; colName = dicCols[colIndex]; } - if(count<10){ + if (count < 10) { logger.info("key:{}, temp dict num :{}, colIndex:{}, colName:{}", key.toString(), count, colIndex, colName); } - mos.write(colIndex+"", new LongWritable(count), new Text(keyBytes), "part_sort/"+colIndex); + mos.write(colIndex + "", new LongWritable(count), new Text(keyBytes), "part_sort/" + colIndex); } @Override @@ -77,7 +78,6 @@ public class BuildGlobalHiveDicPartBuildReducer extends KylinReducer<Text, LongW String partition = conf.get(MRJobConfig.TASK_PARTITION); mos.write(colIndex + "", new LongWritable(count), new Text(partition), "reduce_stats/" + colIndex); mos.close(); - logger.info("Reduce partition num {} finish, this reduce done item count is {}" , partition, count); + logger.info("Reduce partition num {} finish, this reduce done item count is {}", partition, count); } - } diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDicPartPartitioner.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictPartPartitioner.java similarity index 79% rename from engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDicPartPartitioner.java rename to 
engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictPartPartitioner.java index 97ad4f4..8858e20 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDicPartPartitioner.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictPartPartitioner.java @@ -19,6 +19,7 @@ package org.apache.kylin.engine.mr.steps; import java.io.IOException; + import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.io.NullWritable; @@ -28,7 +29,7 @@ import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.util.Bytes; import org.apache.kylin.engine.mr.common.AbstractHadoopJob; -public class BuildGlobalHiveDicPartPartitioner extends Partitioner<Text, NullWritable> implements Configurable { +public class BuildGlobalHiveDictPartPartitioner extends Partitioner<Text, NullWritable> implements Configurable { private Configuration conf; private Integer[] reduceNumArr; @@ -49,21 +50,19 @@ public class BuildGlobalHiveDicPartPartitioner extends Partitioner<Text, NullWri @Override public int getPartition(Text key, NullWritable value, int numReduceTasks) { - //get first byte, the first byte value is the dic col index ,start from 0 + // get first byte, the first byte value is the dic col index ,start from 0 int colIndex = key.getBytes()[0]; int colReduceNum = reduceNumArr[colIndex]; int colReduceNumOffset = 0; - for (int i=0;i<colIndex;i++){ - colReduceNumOffset += reduceNumArr[i] ; + for (int i = 0; i < colIndex; i++) { + colReduceNumOffset += reduceNumArr[i]; } - //Calculate reduce number , reduce num = (value.hash % colReduceNum) + colReduceNumOffset + // Calculate reduce number , reduce num = (value.hash % colReduceNum) + colReduceNumOffset byte[] keyBytes = Bytes.copy(key.getBytes(), 1, key.getLength() - 1); - int hashCode = new Text(keyBytes).hashCode() & 0x7FFFFFFF ; - int reduceNo = hashCode % colReduceNum + colReduceNumOffset; - - return reduceNo; + int hashCode = new Text(keyBytes).hashCode() & 0x7FFFFFFF; + return hashCode % colReduceNum + colReduceNumOffset; } @Override diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDicTotalBuildJob.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictTotalBuildJob.java similarity index 92% rename from engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDicTotalBuildJob.java rename to engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictTotalBuildJob.java index acdbb07..20bdfc7 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDicTotalBuildJob.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictTotalBuildJob.java @@ -19,6 +19,7 @@ package org.apache.kylin.engine.mr.steps; import java.io.IOException; + import org.apache.commons.cli.Options; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -40,8 +41,8 @@ import org.apache.kylin.engine.mr.common.BatchConstants; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class BuildGlobalHiveDicTotalBuildJob extends AbstractHadoopJob { - protected static final Logger logger = LoggerFactory.getLogger(BuildGlobalHiveDicTotalBuildJob.class); +public class BuildGlobalHiveDictTotalBuildJob extends AbstractHadoopJob { + protected static final Logger logger = LoggerFactory.getLogger(BuildGlobalHiveDictTotalBuildJob.class); @Override public int run(String[] args) throws 
Exception { @@ -77,7 +78,7 @@ public class BuildGlobalHiveDicTotalBuildJob extends AbstractHadoopJob { job.getConfiguration().set("last.max.dic.value.path", getOptionValue(OPTION_GLOBAL_DIC_MAX_DISTINCT_COUNT)); job.getConfiguration().setBoolean("mapreduce.output.fileoutputformat.compress", false); - job.setJarByClass(BuildGlobalHiveDicTotalBuildJob.class); + job.setJarByClass(BuildGlobalHiveDictTotalBuildJob.class); setJobClasspath(job, cube.getConfig()); @@ -95,8 +96,8 @@ public class BuildGlobalHiveDicTotalBuildJob extends AbstractHadoopJob { // prevent to create zero-sized default output LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class); - //delete output - Path baseOutputPath =new Path(getOptionValue(OPTION_OUTPUT_PATH)); + // delete output + Path baseOutputPath = new Path(getOptionValue(OPTION_OUTPUT_PATH)); deletePath(job.getConfiguration(), baseOutputPath); attachSegmentMetadataWithDict(segment, job.getConfiguration()); @@ -107,10 +108,10 @@ public class BuildGlobalHiveDicTotalBuildJob extends AbstractHadoopJob { } } - private void setOutput(Job job, String[] dicColsArry, String outputBase){ + private void setOutput(Job job, String[] dicColsArr, String outputBase) { // make each reducer output to respective dir ///user/prod_kylin/tmp/kylin2/globaldic_test/kylin-188c9f9d_dabb_944e_9f20_99dc95be66e6/bs_order_scene_day_new_cube_clone/dict_column=DM_ES_REPORT_ORDER_VIEW0420_DRIVER_ID/part_sort - for(int i=0;i<dicColsArry.length;i++){ + for (int i = 0; i < dicColsArr.length; i++) { MultipleOutputs.addNamedOutput(job, i + "", TextOutputFormat.class, Text.class, LongWritable.class); } Path outputPath = new Path(outputBase); @@ -120,7 +121,7 @@ public class BuildGlobalHiveDicTotalBuildJob extends AbstractHadoopJob { private void setInput(Job job, String input) throws IOException { Path path = new Path(input); FileSystem fs = path.getFileSystem(job.getConfiguration()); - if(!fs.exists(path)){ + if (!fs.exists(path)) { fs.mkdirs(path); } FileInputFormat.setInputPaths(job, getOptionValue(OPTION_INPUT_PATH)); diff --git a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictTotalBuildMapper.java b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictTotalBuildMapper.java index b2252c0..8af341c 100644 --- a/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictTotalBuildMapper.java +++ b/engine-mr/src/main/java/org/apache/kylin/engine/mr/steps/BuildGlobalHiveDictTotalBuildMapper.java @@ -21,10 +21,12 @@ package org.apache.kylin.engine.mr.steps; import java.io.BufferedReader; import java.io.IOException; import java.io.InputStreamReader; +import java.nio.charset.StandardCharsets; import java.util.HashMap; import java.util.Map; import java.util.SortedMap; import java.util.TreeMap; + import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileStatus; @@ -40,7 +42,7 @@ import org.apache.kylin.engine.mr.common.AbstractHadoopJob; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class BuildGlobalHiveDictTotalBuildMapper<KEYIN, Object> extends KylinMapper<KEYIN, Text, Text, LongWritable> { +public class BuildGlobalHiveDictTotalBuildMapper<KEYIN, KEYOUT> extends KylinMapper<KEYIN, Text, Text, LongWritable> { private static final Logger logger = LoggerFactory.getLogger(BuildGlobalHiveDictTotalBuildMapper.class); private MultipleOutputs mos; @@ -65,26 +67,26 @@ public class BuildGlobalHiveDictTotalBuildMapper<KEYIN, Object> extends KylinMap 
String statPath = conf.get("partition.statistics.path"); - //get the input file name ,the file name format by colIndex-part-partitionNum, eg: 1-part-000019 + // get the input file name ,the file name format by colIndex-part-partitionNum, eg: 1-part-000019 FileSplit fileSplit = (FileSplit) context.getInputSplit(); String[] arr = fileSplit.getPath().getName().split("-"); int partitionNum = Integer.parseInt(arr[2]); colIndex = Integer.parseInt(arr[0]); colName = cols[colIndex]; - logger.info("Input fileName:{},colIndex:{},colName:{},partitionNum:{}", fileSplit.getPath().getName(), colIndex, colName, partitionNum); + logger.info("Input fileName:{}, colIndex:{}, colName:{}, partitionNum:{}", fileSplit.getPath().getName(), colIndex, colName, partitionNum); //last max dic value per column String lastMaxValuePath = conf.get("last.max.dic.value.path"); - logger.info("last.max.dic.value.path:"+lastMaxValuePath); + logger.info("last.max.dic.value.path:" + lastMaxValuePath); long lastMaxDictValue = this.getLastMaxDicValue(conf, lastMaxValuePath); - logger.info("last.max.dic.value.path:"+lastMaxValuePath+",value="+lastMaxDictValue); + logger.info("last.max.dic.value.path:" + lastMaxValuePath + ",value=" + lastMaxDictValue); - //Calculate the starting position of this file, the starting position of this file = sum (count) of all previous numbers + last max dic value of the column - Map<Integer, TreeMap<Integer, Long>> allStats = getPartitionsCount(conf, statPath);//<colIndex,<reduceNum,count>> - TreeMap<Integer, Long> partitionStats =allStats.get(colIndex); - if(partitionNum!=0) { + // Calculate the starting position of this file, the starting position of this file = sum (count) of all previous numbers + last max dic value of the column + Map<Integer, TreeMap<Integer, Long>> allStats = getPartitionsCount(conf, statPath); //<colIndex,<reduceNum,count>> + TreeMap<Integer, Long> partitionStats = allStats.get(colIndex); + if (partitionNum != 0) { SortedMap<Integer, Long> subStat = partitionStats.subMap(0, true, partitionNum, false); - subStat.forEach((k, v)->{ + subStat.forEach((k, v) -> { logger.info("Split num:{} and it's count:{}", k, v); start += v; }); @@ -96,7 +98,7 @@ public class BuildGlobalHiveDictTotalBuildMapper<KEYIN, Object> extends KylinMap @Override public void doMap(KEYIN key, Text record, Context context) throws IOException, InterruptedException { long inkey = Long.parseLong(key.toString()); - mos.write(colIndex+"", record, new LongWritable(start + inkey), "dict_column="+colName+"/"+colIndex); + mos.write(colIndex + "", record, new LongWritable(start + inkey), "dict_column=" + colName + "/" + colIndex); } @Override @@ -106,7 +108,7 @@ public class BuildGlobalHiveDictTotalBuildMapper<KEYIN, Object> extends KylinMap private Map<Integer, TreeMap<Integer, Long>> getPartitionsCount(Configuration conf, String partitionStatPath) throws IOException { StringBuffer sb = new StringBuffer(); - String temp=null; + String temp = null; String[] fileNameArr = null; String[] statsArr = null; @@ -121,14 +123,14 @@ public class BuildGlobalHiveDictTotalBuildMapper<KEYIN, Object> extends KylinMap if (fs.exists(path) && fs.isDirectory(path)) { for (FileStatus status : fs.listStatus(path)) { //fileNameArr[0] is globaldict colIndex - fileNameArr=status.getPath().getName().split("-"); + fileNameArr = status.getPath().getName().split("-"); colStats = allStats.get(Integer.parseInt(fileNameArr[0])); - if(colStats==null){ + if (colStats == null) { colStats = new TreeMap<>(); } - temp=cat(status.getPath(), fs); + temp = 
cat(status.getPath(), fs); logger.info("partitionStatPath:{},content:{}", partitionStatPath, temp); - if(temp!=null){ + if (temp != null) { statsArr = temp.split("\t"); colStats.put(Integer.parseInt(statsArr[1]), Long.parseLong(statsArr[0])); allStats.put(Integer.parseInt(fileNameArr[0]), colStats); @@ -136,8 +138,8 @@ public class BuildGlobalHiveDictTotalBuildMapper<KEYIN, Object> extends KylinMap } } - allStats.forEach((k, v)->{ - v.forEach((k1, v1)->{ + allStats.forEach((k, v) -> { + v.forEach((k1, v1) -> { logger.info("allStats.colIndex:{},this split num:{},this split num's count:{}", k, k1, v1); }); }); @@ -148,21 +150,21 @@ public class BuildGlobalHiveDictTotalBuildMapper<KEYIN, Object> extends KylinMap private String cat(Path remotePath, FileSystem fs) throws IOException { FSDataInputStream in = null; BufferedReader buffer = null; - StringBuffer stat= new StringBuffer(); + StringBuffer stat = new StringBuffer(); try { - in= fs.open(remotePath); - buffer= new BufferedReader(new InputStreamReader(in, "UTF-8")) ; + in = fs.open(remotePath); + buffer = new BufferedReader(new InputStreamReader(in, "UTF-8")); String line = null; while ((line = buffer.readLine()) != null) { stat.append(line); } } catch (IOException e) { e.printStackTrace(); - }finally { - if(buffer!=null) { + } finally { + if (buffer != null) { buffer.close(); } - if(in!=null) { + if (in != null) { in.close(); } } @@ -170,15 +172,12 @@ public class BuildGlobalHiveDictTotalBuildMapper<KEYIN, Object> extends KylinMap } /** - * - * @param conf * @param lastMaxDicValuePath eg: /user/kylin/warehouse/db/kylin_intermediate_kylin_sales_cube_mr_6222c210_ce2d_e8ce_dd0f_f12c38fa9115__group_by/dict_column=KYLIN_MAX_DISTINCT_COUNT/part-00000-450ee120-39ff-4806-afaf-ed482ceffc68-c000 - * remotePath content is dict colum stats info of per column: dic column name,extract distinct value count,last max dic value + * remotePath content is dict column stats info of per column: dic column name,extract distinct value count,last max dic value * @return this colIndex's last max dic value - * @throws IOException */ private long getLastMaxDicValue(Configuration conf, String lastMaxDicValuePath) throws IOException { - StringBuffer sb=new StringBuffer(); + StringBuffer sb = new StringBuffer(); Map<Integer, Long> map = null; Path path = new Path(lastMaxDicValuePath); FileSystem fs = path.getFileSystem(conf); @@ -187,39 +186,35 @@ public class BuildGlobalHiveDictTotalBuildMapper<KEYIN, Object> extends KylinMap logger.info("start buildMaxCountMap :"); map = buildMaxCountMap(status.getPath(), fs); logger.info("end buildMaxCountMap :"); - } } - if(map == null){ + if (map == null) { return 0L; - }else{ - return map.get(colIndex)==null?0L:map.get(colIndex); + } else { + return map.get(colIndex) == null ? 
0L : map.get(colIndex); } } /** - * * @param remotePath , eg: /user/kylin/warehouse/db/kylin_intermediate_kylin_sales_cube_mr_6222c210_ce2d_e8ce_dd0f_f12c38fa9115__group_by/dict_column=KYLIN_MAX_DISTINCT_COUNT/part-00000-450ee120-39ff-4806-afaf-ed482ceffc68-c000 - * remotePath content is dict colum stats info of per column: dic column name,extract distinct value count,last max dic value - * @param fs + * remotePath content is dict column stats info of per column: dic column name,extract distinct value count,last max dic value * @return Map<>,key is colIndex, value is last max dict value - * @throws IOException */ - private Map<Integer, Long> buildMaxCountMap(Path remotePath, FileSystem fs) throws IOException { + private Map<Integer, Long> buildMaxCountMap(Path remotePath, FileSystem fs) throws IOException { FSDataInputStream in = null; BufferedReader buffer = null; - String[] arr=null; - Map<Integer, Long> map= new HashMap(); + String[] arr = null; + Map<Integer, Long> map = new HashMap<>(); try { - in= fs.open(remotePath); - buffer= new BufferedReader(new InputStreamReader(in, "UTF-8")) ; + in = fs.open(remotePath); + buffer = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8)); String line = null; while ((line = buffer.readLine()) != null) { arr = line.split(","); - logger.info("line="+line+",arr.length:"+arr.length); - if(arr.length==3) { + logger.info("line=" + line + ",arr.length:" + arr.length); + if (arr.length == 3) { for (int i = 0; i < cols.length; i++) { - if(cols[i].equalsIgnoreCase(arr[0])) { + if (cols[i].equalsIgnoreCase(arr[0])) { map.put(i, Long.parseLong(arr[2])); logger.info("col.{}.maxValue={}", cols[i], Long.parseLong(arr[2])); break; @@ -229,15 +224,15 @@ public class BuildGlobalHiveDictTotalBuildMapper<KEYIN, Object> extends KylinMap } } catch (IOException e) { e.printStackTrace(); - }finally { - if(buffer!=null) { + } finally { + if (buffer != null) { buffer.close(); } - if(in!=null) { + if (in != null) { in.close(); } } - logger.info("BuildMaxCountMap map="+map); + logger.info("BuildMaxCountMap map=" + map); return map; } } diff --git a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java index 9309a3d..7d6a367 100644 --- a/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java +++ b/engine-spark/src/main/java/org/apache/kylin/engine/spark/SparkBatchCubingJobBuilder2.java @@ -14,7 +14,7 @@ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. 
-*/ + */ package org.apache.kylin.engine.spark; @@ -37,6 +37,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; /** + * */ public class SparkBatchCubingJobBuilder2 extends JobBuilderSupport { @@ -48,7 +49,7 @@ public class SparkBatchCubingJobBuilder2 extends JobBuilderSupport { public SparkBatchCubingJobBuilder2(CubeSegment newSegment, String submitter) { this(newSegment, submitter, 0); } - + public SparkBatchCubingJobBuilder2(CubeSegment newSegment, String submitter, Integer priorityOffset) { super(newSegment, submitter, priorityOffset); this.inputSide = SparkUtil.getBatchCubingInputSide(seg); @@ -65,22 +66,8 @@ public class SparkBatchCubingJobBuilder2 extends JobBuilderSupport { // Phase 1: Create Flat Table & Materialize Hive View in Lookup Tables inputSide.addStepPhase1_CreateFlatTable(result); - // build global dict - KylinConfig dictConfig = seg.getConfig(); - String[] mrHiveDictColumns = dictConfig.getMrHiveDictColumnsExcludeRefColumns(); - - if (Objects.nonNull(mrHiveDictColumns) && mrHiveDictColumns.length > 0 - && !"".equals(mrHiveDictColumns[0])) { - //parallel part build - result.addTask(createBuildGlobalHiveDictPartBuildJob(jobId)); - //parallel total build - result.addTask(createBuildGlobalHiveDicTotalBuildJob(jobId)); - } - - // merge global dic and replace flat table - if(Objects.nonNull(dictConfig.getMrHiveDictColumns()) && dictConfig.getMrHiveDictColumns().length > 0 && !"".equals(dictConfig.getMrHiveDictColumns()[0])){ - inputSide.addStepPhase_ReplaceFlatTableGlobalColumnValue(result); - } + // Build global dictionary in distributed way + buildHiveGlobalDictionaryByMR(result, jobId); // Phase 2: Build Dictionary if (seg.getConfig().isSparkFactDistinctEnable()) { @@ -202,7 +189,7 @@ public class SparkBatchCubingJobBuilder2 extends JobBuilderSupport { public void configureSparkJob(final CubeSegment seg, final SparkExecutable sparkExecutable, - final String jobId, final String cuboidRootPath) { + final String jobId, final String cuboidRootPath) { final IJoinedFlatTableDesc flatTableDesc = EngineFactory.getJoinedFlatTableDesc(seg); final String tablePath = JoinedFlatTable.getTableDir(flatTableDesc, getJobWorkingDir(jobId)); sparkExecutable.setParam(SparkCubingByLayer.OPTION_CUBE_NAME.getOpt(), seg.getRealization().getName()); @@ -228,4 +215,28 @@ public class SparkBatchCubingJobBuilder2 extends JobBuilderSupport { param.put("path", getDumpMetadataPath(jobId)); return new StorageURL(kylinConfig.getMetadataUrl().getIdentifier(), "hdfs", param).toString(); } + + /** + * Build hive global dictionary by MR and encode corresponding column into integer for flat table + */ + protected void buildHiveGlobalDictionaryByMR(final CubingJob result, String jobId) { + KylinConfig dictConfig = seg.getConfig(); + String[] mrHiveDictColumnExcludeRef = dictConfig.getMrHiveDictColumnsExcludeRefColumns(); + String[] mrHiveDictColumns = dictConfig.getMrHiveDictColumns(); + + if (Objects.nonNull(mrHiveDictColumnExcludeRef) && mrHiveDictColumnExcludeRef.length > 0 + && !"".equals(mrHiveDictColumnExcludeRef[0])) { + + // 1. parallel part build + result.addTask(createBuildGlobalHiveDictPartBuildJob(jobId)); + + // 2. 
parallel total build + result.addTask(createBuildGlobalHiveDictTotalBuildJob(jobId)); + } + + // merge global dict and replace flat table + if (Objects.nonNull(mrHiveDictColumns) && mrHiveDictColumns.length > 0 && !"".equals(mrHiveDictColumns[0])) { + inputSide.addStepPhase_ReplaceFlatTableGlobalColumnValue(result); + } + } } diff --git a/examples/test_case_data/localmeta/cube_desc/ci_inner_join_cube.json b/examples/test_case_data/localmeta/cube_desc/ci_inner_join_cube.json index fce237e..763cf90 100644 --- a/examples/test_case_data/localmeta/cube_desc/ci_inner_join_cube.json +++ b/examples/test_case_data/localmeta/cube_desc/ci_inner_join_cube.json @@ -595,7 +595,8 @@ "override_kylin_properties": { "kylin.cube.algorithm": "LAYER", "kylin.dictionary.shrunken-from-global-enabled": "false", - "kylin.dictionary.mr-hive.columns": "TEST_KYLIN_FACT_TEST_COUNT_DISTINCT_BITMAP" + "kylin.dictionary.mr-hive.columns": "TEST_KYLIN_FACT_TEST_COUNT_DISTINCT_BITMAP", + "kylin.source.hive.databasedir" : "/apps/hive/warehouse" }, "partition_date_start": 0 } diff --git a/kubernetes/README.md b/kubernetes/README.md index 964c612..2d0ecc6 100644 --- a/kubernetes/README.md +++ b/kubernetes/README.md @@ -1,4 +1,4 @@ -## Backgroud +## Background Kubernetes is a portable, extensible, open-source platform for managing containerized workloads and services that facilitates both declarative configuration and automation. It has a large, rapidly growing ecosystem. Kubernetes services, support, and tools are widely available. @@ -11,7 +11,7 @@ cluster, will reduce cost of maintenance and extension. Please update your configuration file here. - **template** This directory provides two deployment templates, one for **quick-start** purposes, another for **production/distributed** deployment. - 1. Quick-start template is for one node deployment with an **ALL** kylin instance. + 1. Quick-start template is for one-node deployment with an **ALL** kylin instance, for test or PoC purposes. 2. Production template is for multi-node deployment with a few **job**/**query** kylin instances; other services like **memcached** and **filebeat** help to satisfy log collection/query cache/session sharing demands. - **docker** @@ -21,16 +21,16 @@ cluster, will reduce cost of maintenance and extension. This is a complete example of applying the production template in a CDH 5.7 hadoop env, with a step-by-step guide. ### Note -1. CuratorScheduler is used as default JobScheduler because it is more flexible. +1. **CuratorScheduler** is used as the default JobScheduler because it is more flexible. 2. Spark building requires `cluster` as deployMode. If you forget it, your spark application will never be submitted successfully because the Hadoop cluster cannot resolve the hostname of the Pod (Spark Driver). 3. To modify `/etc/hosts` in a Pod, please check: https://kubernetes.io/docs/concepts/services-networking/add-entries-to-pod-etc-hosts-with-host-aliases/ . -4. To build you own kylin-client docker image, please don't forget to download and put following jars into KYLIN_HOME/tomcat/lib to enable tomcat session sharing. +4. To build your own kylin-client docker image, please don't forget to download and put the following jars into `KYLIN_HOME/tomcat/lib` to enable tomcat session sharing. - https://repo1.maven.org/maven2/de/javakaffee/msm/memcached-session-manager-tc7/2.1.1/ - https://repo1.maven.org/maven2/de/javakaffee/msm/memcached-session-manager/2.1.1/
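   For reference, session sharing with memcached-session-manager is typically switched on through a `Manager` element in `KYLIN_HOME/tomcat/conf/context.xml`; the following is only an illustrative sketch, and the node id/address `n1:memcached-service:11211` is an assumption that must be adapted to your own memcached deployment: `<Manager className="de.javakaffee.web.msm.MemcachedBackupSessionManager" memcachedNodes="n1:memcached-service:11211" sticky="false"/>`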
5. If you have difficulty in configuring filebeat, please check https://www.elastic.co/guide/en/beats/filebeat/current/index.html . 6. External query cache is enabled by default; if you are interested in the details, you may check http://kylin.apache.org/blog/2019/07/30/detailed-analysis-of-refine-query-cache/ . -7. All configuration files is separated from Docker image, please use configMap or secret. Compared to configMap, secrets is more recommended for security reason. -8. Some verified kylin-client image will be published to DockerHub, here is the link https://hub.docker.com/r/apachekylin/kylin-client . You may consider contributed your Dockerfile to kylin's repo if you are interested. +7. All configuration files are separated from the Docker image, please use **configMap** or **secret**. Compared to **configMap**, **secret** is recommended for security reasons. +8. Some verified kylin-client images will be published to DockerHub, here is the link: https://hub.docker.com/r/apachekylin/kylin-client . You may consider contributing your `Dockerfile` to kylin's repo if you are interested. ### Reference - JIRA ticket: https://issues.apache.org/jira/browse/KYLIN-4447 diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateMrHiveDictStep.java b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateMrHiveDictStep.java index 305cdae..8538622 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/CreateMrHiveDictStep.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/CreateMrHiveDictStep.java @@ -58,8 +58,9 @@ public class CreateMrHiveDictStep extends AbstractExecutable { private static final String GET_SQL = "\" Get Max Dict Value Sql : \""; protected void createMrHiveDict(KylinConfig config, DistributedLock lock) throws Exception { - logger.info("start to run createMrHiveDict {}", getId()); + logger.info("Start to run createMrHiveDict {}", getId()); try { + // Step 1: Apply for lock if required if (getIsLock()) { getLock(lock); } @@ -72,12 +73,13 @@ public class CreateMrHiveDictStep extends AbstractExecutable { if (sql != null && sql.length() > 0) { hiveCmdBuilder.addStatement(sql); } - Map<String, String> maxDictValMap = deserilizeForMap(getMaxDictStatementMap()); - Map<String, String> dictSqlMap = deserilizeForMap(getCreateTableStatementMap()); + Map<String, String> maxDictValMap = deserializeForMap(getMaxDictStatementMap()); + Map<String, String> dictSqlMap = deserializeForMap(getCreateTableStatementMap()); - if (dictSqlMap != null && dictSqlMap.size() > 0) { + // Step 2: Execute HQL + if (!dictSqlMap.isEmpty()) { IHiveClient hiveClient = HiveClientFactory.getHiveClient(); - if (maxDictValMap != null && maxDictValMap.size() > 0) { + if (!maxDictValMap.isEmpty()) { if (maxDictValMap.size() == dictSqlMap.size()) { maxDictValMap.forEach((columnName, maxDictValSql) -> { int max = 0; @@ -111,7 +113,7 @@ public class CreateMrHiveDictStep extends AbstractExecutable { final String cmd = hiveCmdBuilder.toString(); - stepLogger.log("MR/Hive dict, cmd: " + cmd); + stepLogger.log("Build Hive Global Dictionary by: " + cmd); CubeManager manager = CubeManager.getInstance(KylinConfig.getInstanceFromEnv()); CubeInstance cube = manager.getCube(getCubeName()); @@ -123,9 +125,9 @@ public class CreateMrHiveDictStep extends AbstractExecutable { if (response.getFirst() != 0) { throw new RuntimeException("Failed to create MR/Hive dict, error code " + response.getFirst()); } - getManager().addJobInfo(getId(), stepLogger.getInfo()); } + // Step 3: Release lock if required
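+ // (the lock is a cube-scoped distributed lock: a step created with setIsLock(true) acquires it in + // Step 1, and the last related step, created with setIsUnLock(true), releases it below, so that + // concurrent building jobs cannot update the same cube's Hive Global Dictionary at the same time)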
if (getIsUnlock()) { unLock(lock); } @@ -153,20 +155,10 @@ public class CreateMrHiveDictStep extends AbstractExecutable { lock = KylinConfig.getInstanceFromEnv().getDistributedLockFactory().lockForCurrentThread(); } - String preHdfsShell = getPreHdfsShell(); - if (Objects.nonNull(preHdfsShell) && !"".equalsIgnoreCase(preHdfsShell)) { - doRetry(preHdfsShell, config); - } - createMrHiveDict(config, lock); - String postfixHdfsCmd = getPostfixHdfsShell(); - if (Objects.nonNull(postfixHdfsCmd) && !"".equalsIgnoreCase(postfixHdfsCmd)) { - doRetry(postfixHdfsCmd, config); - } - if (isDiscarded()) { - if (getIsLock()) { + if (getIsLock() && lock != null) { unLock(lock); } return new ExecuteResult(ExecuteResult.State.DISCARDED, stepLogger.getBufferedLog()); @@ -224,44 +216,28 @@ public class CreateMrHiveDictStep extends AbstractExecutable { } public void setCreateTableStatementMap(Map<String, String> dictSqlMap) { - setParam("HiveRedistributeDataMap", serilizeToMap(dictSqlMap)); + setParam("DictSqlMap", serializeMap(dictSqlMap)); } public String getCreateTableStatementMap() { - return getParam("HiveRedistributeDataMap"); + return getParam("DictSqlMap"); } public void setMaxDictStatementMap(Map<String, String> maxDictValMap) { - setParam("DictMaxMap", serilizeToMap(maxDictValMap)); + setParam("DictMaxMap", serializeMap(maxDictValMap)); } public String getMaxDictStatementMap() { return getParam("DictMaxMap"); } - public String getPreHdfsShell() { - return getParam("preHdfsCmd"); - } - - public void setPrefixHdfsShell(String cmd) { - setParam("preHdfsCmd", cmd); - } - - public String getPostfixHdfsShell() { - return getParam("postfixHdfsCmd"); - } - - public void setPostfixHdfsShell(String cmd) { - setParam("postfixHdfsCmd", cmd); - } - public void setIsLock(Boolean isLock) { setParam("isLock", String.valueOf(isLock)); } public boolean getIsLock() { String isLock = getParam("isLock"); - return Strings.isNullOrEmpty(isLock) ? false : Boolean.parseBoolean(isLock); + return !Strings.isNullOrEmpty(isLock) && Boolean.parseBoolean(isLock); } public void setJobFlowJobId(String jobId) { @@ -278,7 +254,7 @@ public class CreateMrHiveDictStep extends AbstractExecutable { public boolean getIsUnlock() { String isUnLock = getParam("isUnLock"); - return Strings.isNullOrEmpty(isUnLock) ? 
false : Boolean.parseBoolean(isUnLock); + return !Strings.isNullOrEmpty(isUnLock) && Boolean.parseBoolean(isUnLock); } public void setLockPathName(String pathName) { @@ -368,7 +344,7 @@ public class CreateMrHiveDictStep extends AbstractExecutable { } } } - isLocked = true;//get lock fail,will try again + isLocked = true; // failed to get the lock, will try again } } // wait 1 min and try again @@ -402,7 +378,7 @@ public class CreateMrHiveDictStep extends AbstractExecutable { } } - private static String serilizeToMap(Map<String, String> map) { + private static String serializeMap(Map<String, String> map) { JSONArray result = new JSONArray(); if (map != null && map.size() > 0) { map.forEach((key, value) -> { @@ -418,7 +394,7 @@ public class CreateMrHiveDictStep extends AbstractExecutable { return result.toString(); } - private static Map<String, String> deserilizeForMap(String mapStr) { + private static Map<String, String> deserializeForMap(String mapStr) { Map<String, String> result = new HashMap<>(); if (mapStr != null) { try { diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java index 49e3f8d..c60a2ce 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java @@ -6,15 +6,15 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * + * * http://www.apache.org/licenses/LICENSE-2.0 - * + * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License.
-*/ + */ package org.apache.kylin.source.hive; @@ -43,7 +43,6 @@ import org.apache.kylin.cube.CubeManager; import org.apache.kylin.cube.model.CubeDesc; import org.apache.kylin.engine.mr.IInput; import org.apache.kylin.engine.mr.JobBuilderSupport; -import org.apache.kylin.engine.mr.common.BatchConstants; import org.apache.kylin.engine.mr.steps.CubingExecutableUtil; import org.apache.kylin.engine.spark.SparkCreatingFlatTable; import org.apache.kylin.engine.spark.SparkExecutable; @@ -97,17 +96,12 @@ public class HiveInputBase { // create flat table first addStepPhase1_DoCreateFlatTable(jobFlow); - // create global dict - KylinConfig dictConfig = (flatDesc.getSegment()).getConfig(); + // create hive global dictionary + KylinConfig dictConfig = flatDesc.getSegment().getConfig(); String[] mrHiveDictColumns = dictConfig.getMrHiveDictColumnsExcludeRefColumns(); if (Objects.nonNull(mrHiveDictColumns) && mrHiveDictColumns.length > 0 && !"".equals(mrHiveDictColumns[0])) { - String globalDictDatabase = dictConfig.getMrHiveDictDB(); - if (null == globalDictDatabase) { - throw new IllegalArgumentException("Mr-Hive Global dict database is null."); - } - String globalDictTable = cubeName + dictConfig.getMrHiveDictTableSuffix(); - addStepPhase1_DoCreateMrHiveGlobalDict(jobFlow, mrHiveDictColumns, globalDictDatabase, globalDictTable); + addStepPhase1_DoCreateMrHiveGlobalDict(jobFlow, mrHiveDictColumns); } // then count and redistribute @@ -129,80 +123,75 @@ public class HiveInputBase { @Override public void addStepPhase_ReplaceFlatTableGlobalColumnValue(DefaultChainedExecutable jobFlow) { - KylinConfig dictConfig = (flatDesc.getSegment()).getConfig(); + KylinConfig dictConfig = flatDesc.getSegment().getConfig(); final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams()); + String globalDictTable = MRHiveDictUtil.globalDictTableName(flatDesc, cubeName); + String globalDictDatabase = dictConfig.getMrHiveDictDB(); + String[] mrHiveDictColumnsExcludeRefCols = dictConfig.getMrHiveDictColumnsExcludeRefColumns(); Map<String, String> dictRef = dictConfig.getMrHiveDictRefColumns(); final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase); - String globalDictDatabase = dictConfig.getMrHiveDictDB(); - if (null == globalDictDatabase) { - throw new IllegalArgumentException("Mr-Hive Global dict database is null."); - } - String globalDictTable = cubeName + dictConfig.getMrHiveDictTableSuffix(); - if(Objects.nonNull(mrHiveDictColumnsExcludeRefCols) && mrHiveDictColumnsExcludeRefCols.length > 0) { - //merge to dict table step + if (Objects.nonNull(mrHiveDictColumnsExcludeRefCols) && mrHiveDictColumnsExcludeRefCols.length > 0) { jobFlow.addTask(createHiveGlobalDictMergeGlobalDict(flatDesc, hiveInitStatements, cubeName, mrHiveDictColumnsExcludeRefCols, globalDictDatabase, globalDictTable)); - for (String item : mrHiveDictColumnsExcludeRefCols) { dictRef.put(item, ""); } } - //replace step - if(dictRef.size()>0) { + // replace step + if (!dictRef.isEmpty()) { jobFlow.addTask(createMrHiveGlobalDictReplaceStep(flatDesc, hiveInitStatements, cubeName, dictRef, flatTableDatabase, globalDictDatabase, globalDictTable, dictConfig.getMrHiveDictTableSuffix(), jobFlow.getId())); } - } - protected void addStepPhase1_DoCreateMrHiveGlobalDict(DefaultChainedExecutable jobFlow, - String[] mrHiveDictColumns, String globalDictDatabase, String globalDictTable) { + /** + * 1. Create three related tables + * 2. Insert distinct value into distinct value table + * 3. 
Calculate statistics for dictionary + */ + protected void addStepPhase1_DoCreateMrHiveGlobalDict(DefaultChainedExecutable jobFlow, String[] mrHiveDictColumns) { final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams()); final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase); - //Crete tables for global dict and extract distinct value jobFlow.addTask(createMrHiveGlobalDictExtractStep(flatDesc, hiveInitStatements, cubeName, - mrHiveDictColumns, globalDictDatabase, globalDictTable, jobFlow.getId())); + mrHiveDictColumns, jobFlow.getId())); } - protected static AbstractExecutable createMrHiveGlobalDictExtractStep(IJoinedFlatTableDesc flatDesc, - String hiveInitStatements, String cubeName, String[] mrHiveDictColumns, - String globalDictDatabase, String globalDictTable, String jobId) { - // Firstly, determine if the global dict hive table of cube is exists. - String createGlobalDictTableHql = "CREATE TABLE IF NOT EXISTS " + globalDictDatabase + "." + globalDictTable - + "\n" + "( dict_key STRING COMMENT '', \n" + "dict_val INT COMMENT '' \n" + ") \n" - + "COMMENT '' \n" + "PARTITIONED BY (dict_column string) \n" + " ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t' \n" + "STORED AS TEXTFILE; \n"; - - final String dropDictIntermediateTableHql = MRHiveDictUtil.generateDropTableStatement(flatDesc); - final String createDictIntermediateTableHql = MRHiveDictUtil.generateCreateTableStatement(flatDesc); - final String groupByTable = flatDesc.getTableName() + flatDesc.getSegment().getConfig().getMrHiveDictIntermediateTTableSuffix(); - final String globalDictIntermediateTable = MRHiveDictUtil.getMRHiveFlatTableGlobalDictTableName(flatDesc); - final String dropGlobalDictIntermediateTableHql = MRHiveDictUtil.generateDropTableStatement(globalDictIntermediateTable); - final String createGlobalDictIntermediateTableHql = MRHiveDictUtil.generateCreateGlobalDicIntermediateTableStatement(globalDictIntermediateTable); - - String maxAndDistinctCountSql = "INSERT OVERWRITE TABLE " + groupByTable + " PARTITION (DICT_COLUMN = '" + BatchConstants.CFG_GLOBAL_DICT_STATS_PARTITION_VALUE + "') " - + "\n" + "SELECT CONCAT_WS(',', tc.dict_column, cast(tc.total_distinct_val AS String), if(tm.max_dict_val is null, '0', cast(max_dict_val as string))) " - + "\n" + "FROM (" - + "\n" + " SELECT dict_column,count(1) total_distinct_val FROM " - + "\n" + groupByTable + " where DICT_COLUMN != '" + BatchConstants.CFG_GLOBAL_DICT_STATS_PARTITION_VALUE + "' group by dict_column) tc " - + "\n" + "LEFT JOIN (\n" - + "\n" + " SELECT dict_column,if(max(dict_val) is null, 0, max(dict_val)) as max_dict_val FROM " - + "\n" + globalDictDatabase + "." 
+ globalDictTable + " group by dict_column) tm " - + "\n" + "ON tc.dict_column = tm.dict_column;"; + protected static AbstractExecutable createMrHiveGlobalDictExtractStep(IJoinedFlatTableDesc flatDesc, String hiveInitStatements, + String cubeName, String[] mrHiveDictColumns, String jobId) { + KylinConfig cfg = flatDesc.getSegment().getConfig(); + String globalDictTable = MRHiveDictUtil.globalDictTableName(flatDesc, cubeName); + String globalDictDatabase = cfg.getMrHiveDictDB(); + final String distinctValueTable = MRHiveDictUtil.distinctValueTable(flatDesc); + final String segmentLevelDictTableName = MRHiveDictUtil.segmentLevelDictTableName(flatDesc); + + final String createGlobalDictTableHql = MRHiveDictUtil.generateDictionaryDdl(globalDictDatabase, globalDictTable); + final String dropDistinctValueTableHql = MRHiveDictUtil.generateDropTableStatement(distinctValueTable); + final String createDistinctValueTableHql = MRHiveDictUtil.generateDistinctValueTableStatement(flatDesc); + final String dropSegmentLevelDictTableHql = MRHiveDictUtil.generateDropTableStatement(segmentLevelDictTableName); + final String createSegmentLevelDictTableHql = MRHiveDictUtil.generateDictTableStatement(segmentLevelDictTableName); + + String maxAndDistinctCountSql = MRHiveDictUtil.generateDictStatisticsSql(distinctValueTable, globalDictTable, globalDictDatabase); StringBuilder insertDataToDictIntermediateTableSql = new StringBuilder(); for (String dictColumn : mrHiveDictColumns) { insertDataToDictIntermediateTableSql .append(MRHiveDictUtil.generateInsertDataStatement(flatDesc, dictColumn, globalDictDatabase, globalDictTable)); } - String set = "set hive.exec.compress.output=false;set hive.mapred.mode=unstrict;"; + String setParametersHql = "set hive.exec.compress.output=false;set hive.mapred.mode=unstrict;"; CreateMrHiveDictStep step = new CreateMrHiveDictStep(); step.setInitStatement(hiveInitStatements); - step.setCreateTableStatement(set + createGlobalDictTableHql + dropDictIntermediateTableHql - + createDictIntermediateTableHql + dropGlobalDictIntermediateTableHql + createGlobalDictIntermediateTableHql + insertDataToDictIntermediateTableSql.toString() + maxAndDistinctCountSql); + step.setCreateTableStatement(setParametersHql + + createGlobalDictTableHql + + dropDistinctValueTableHql + + createDistinctValueTableHql + + dropSegmentLevelDictTableHql + + createSegmentLevelDictTableHql + + insertDataToDictIntermediateTableSql.toString() + + maxAndDistinctCountSql); CubingExecutableUtil.setCubeName(cubeName, step.getParams()); step.setName(ExecutableConstants.STEP_NAME_GLOBAL_DICT_MRHIVE_EXTRACT_DICTVAL); step.setIsLock(true); @@ -212,20 +201,27 @@ public class HiveInputBase { return step; } + /** + * In the previous step, data of hive global dictionary is prepared by MR, + * so now it is time for create partition for Segment Dictionary Table + * and merge into Hive Global Dictionary Table. 
+ */ protected static AbstractExecutable createHiveGlobalDictMergeGlobalDict(IJoinedFlatTableDesc flatDesc, String hiveInitStatements, String cubeName, String[] mrHiveDictColumns, String globalDictDatabase, String globalDictTable) { - String globalDictItermediateTable = MRHiveDictUtil.getMRHiveFlatTableGlobalDictTableName(flatDesc); + String globalDictIntermediateTable = MRHiveDictUtil.segmentLevelDictTableName(flatDesc); + StringBuilder addPartitionHql = new StringBuilder(); - StringBuffer addPartition = new StringBuffer(); - Map<String, String> maxDictValMap = new HashMap<>(); Map<String, String> dictHqlMap = new HashMap<>(); for (String dictColumn : mrHiveDictColumns) { try { - addPartition.append("alter table ").append(globalDictItermediateTable) - .append(" add IF NOT EXISTS partition (dict_column='").append(dictColumn) - .append("');").append(" \n"); + addPartitionHql.append("ALTER TABLE ") + .append(globalDictIntermediateTable) + .append(" ADD IF NOT EXISTS PARTITION (dict_column='") + .append(dictColumn) + .append("');") + .append(" \n"); String dictHql = "INSERT OVERWRITE TABLE " + globalDictDatabase + "." + globalDictTable + " \n" + "PARTITION (dict_column = '" + dictColumn + "') \n" @@ -233,7 +229,7 @@ public class HiveInputBase { + globalDictDatabase + "." + globalDictTable + " \n" + "WHERE dict_column = '" + dictColumn + "' \n" + flatDesc.getDataModel().getConfig().getHiveUnionStyle() + " \n" + "SELECT dict_key, dict_val FROM " - + globalDictItermediateTable + " \n" + " WHERE dict_column = '" + dictColumn + "' ;\n"; + + globalDictIntermediateTable + " \n" + " WHERE dict_column = '" + dictColumn + "' ;\n"; dictHqlMap.put(dictColumn, dictHql); } catch (Exception e) { logger.error("", e); @@ -241,9 +237,8 @@ public class HiveInputBase { } String hiveInitStatementForUnstrict = "set hive.mapred.mode=unstrict;"; CreateMrHiveDictStep step = new CreateMrHiveDictStep(); - step.setInitStatement(hiveInitStatements + hiveInitStatementForUnstrict + addPartition); + step.setInitStatement(hiveInitStatements + hiveInitStatementForUnstrict + addPartitionHql); step.setCreateTableStatementMap(dictHqlMap); - step.setMaxDictStatementMap(maxDictValMap); step.setIsLock(false); step.setIsUnLock(false); step.setLockPathName(cubeName); @@ -252,58 +247,75 @@ public class HiveInputBase { return step; } + /** + * Use the Hive Global Dictionary to replace/encode columns of the flat table. + * + * @param mrHiveDictColumns a Map whose key is a dict column name and whose value is either an empty + * string (encode with the dictionary built by this cube) or a reference "{cubeName}.{columnName}" + * to reuse another cube's Hive Global Dictionary.
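+ * For instance, {"TEST_KYLIN_FACT_TEST_COUNT_DISTINCT_BITMAP" -> ""} (the column configured in + * ci_inner_join_cube.json above) encodes with this cube's own dictionary, while a hypothetical + * value such as "OTHER_CUBE.BUYER_ID" would join against OTHER_CUBE's dictionary table instead.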
+ */ protected static AbstractExecutable createMrHiveGlobalDictReplaceStep(IJoinedFlatTableDesc flatDesc, String hiveInitStatements, String cubeName, Map<String, String> mrHiveDictColumns, String flatTableDatabase, String globalDictDatabase, String globalDictTable, String dictSuffix, String jobId) { Map<String, String> dictHqlMap = new HashMap<>(); - StringBuilder addPartition = new StringBuilder(); for (String dictColumn : mrHiveDictColumns.keySet()) { - StringBuilder dictHql = new StringBuilder(); + StringBuilder insertOverwriteHql = new StringBuilder(); TblColRef dictColumnRef = null; String flatTable = flatTableDatabase + "." + flatDesc.getTableName(); - // replace the flat table's dict column value - dictHql.append("INSERT OVERWRITE TABLE " + flatTable + " \n"); + insertOverwriteHql.append("INSERT OVERWRITE TABLE ").append(flatTable).append(" \n"); try { - dictHql.append("SELECT \n"); - Integer flatTableColumnSize = flatDesc.getAllColumns().size(); + insertOverwriteHql.append("SELECT \n"); + int flatTableColumnSize = flatDesc.getAllColumns().size(); for (int i = 0; i < flatTableColumnSize; i++) { TblColRef tblColRef = flatDesc.getAllColumns().get(i); + String colName = JoinedFlatTable.colName(tblColRef, flatDesc.useAlias()); + if (i > 0) { - dictHql.append(","); + insertOverwriteHql.append(","); } - if (JoinedFlatTable.colName(tblColRef, flatDesc.useAlias()).equalsIgnoreCase(dictColumn)) { - dictHql.append("b.dict_val \n"); + + if (colName.equalsIgnoreCase(dictColumn)) { + // Note: replace the original value with its encoded integer + insertOverwriteHql.append("b.dict_val \n"); dictColumnRef = tblColRef; } else { - dictHql.append("a." + JoinedFlatTable.colName(tblColRef) + " \n"); + // Note: keep its original value + insertOverwriteHql.append("a.") + .append(JoinedFlatTable.colName(tblColRef)) + .append(" \n"); } } if (!Strings.isNullOrEmpty(mrHiveDictColumns.get(dictColumn))) { - String[] cubePartion = mrHiveDictColumns.get(dictColumn).split("\\."); - - String refGlobalDictTable = cubePartion[0] + dictSuffix; - String refDictColumn = cubePartion[1]; - - dictHql.append("FROM " + flatTable + " a \n" + "LEFT OUTER JOIN \n" + "( \n" + "SELECT dict_key, dict_val FROM " + globalDictDatabase + "." + refGlobalDictTable + " WHERE dict_column = '" + refDictColumn + "' \n" + ") b \n" + " ON a." + JoinedFlatTable.colName(dictColumnRef) + " = b.dict_key;"); - dictHqlMap.put(dictColumn, dictHql.toString()); - }else { - dictHql.append("FROM " + flatTable + " a \n" + "LEFT OUTER JOIN \n" + "( \n" + "SELECT dict_key, dict_val FROM " + globalDictDatabase + "." + globalDictTable + " WHERE dict_column = '" + dictColumn + "' \n" + ") b \n" + " ON a."
- + JoinedFlatTable.colName(dictColumnRef) + " = b.dict_key;"); + // Note: reuse previous hive global dictionary + String[] tableColumn = mrHiveDictColumns.get(dictColumn).split("\\."); + + String refGlobalDictTable = tableColumn[0] + dictSuffix; + String refDictColumn = tableColumn[1]; + + insertOverwriteHql + .append("FROM ").append(flatTable).append(" a \nLEFT OUTER JOIN \n (") + .append("SELECT dict_key, dict_val FROM ") + .append(globalDictDatabase).append(".").append(refGlobalDictTable) + .append(" WHERE dict_column = '").append(refDictColumn).append("') b \n") + .append("ON a.").append(JoinedFlatTable.colName(dictColumnRef)).append(" = b.dict_key;"); + dictHqlMap.put(dictColumn, insertOverwriteHql.toString()); + } else { + // Note: use hive global dictionary built by current cube + insertOverwriteHql + .append("FROM ").append(flatTable).append(" a \nLEFT OUTER JOIN \n (") + .append("SELECT dict_key, dict_val FROM ") + .append(globalDictDatabase).append(".").append(globalDictTable) + .append(" WHERE dict_column = '").append(dictColumn).append("') b \n") + .append("ON a.").append(JoinedFlatTable.colName(dictColumnRef)).append(" = b.dict_key;"); } - dictHqlMap.put(dictColumn, dictHql.toString()); + dictHqlMap.put(dictColumn, insertOverwriteHql.toString()); } catch (Exception e) { logger.error("", e); } } - String set = "set hive.exec.compress.output=false; set hive.mapred.mode=unstrict;"; + String setParametersHql = "set hive.exec.compress.output=false; set hive.mapred.mode=unstrict;"; CreateMrHiveDictStep step = new CreateMrHiveDictStep(); - step.setInitStatement(hiveInitStatements + set + addPartition); + step.setInitStatement(hiveInitStatements + setParametersHql); step.setCreateTableStatementMap(dictHqlMap); + step.setIsUnLock(true); step.setLockPathName(cubeName); step.setJobFlowJobId(jobId); @@ -389,7 +401,7 @@ public class HiveInputBase { } protected static AbstractExecutable createFlatHiveTableStep(String hiveInitStatements, String jobWorkingDir, - String cubeName, IJoinedFlatTableDesc flatDesc) { + String cubeName, IJoinedFlatTableDesc flatDesc) { //from hive to hive final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatDesc); final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatDesc, jobWorkingDir); @@ -404,7 +416,7 @@ public class HiveInputBase { } protected static AbstractExecutable createFlatHiveTableByLivyStep(String hiveInitStatements, String jobWorkingDir, - String cubeName, IJoinedFlatTableDesc flatDesc) { + String cubeName, IJoinedFlatTableDesc flatDesc) { //from hive to hive final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatDesc); final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatDesc, jobWorkingDir); @@ -419,7 +431,7 @@ public class HiveInputBase { } protected static AbstractExecutable createFlatHiveTableBySparkSql(String hiveInitStatements, - String jobWorkingDir, String cubeName, IJoinedFlatTableDesc flatDesc) { + String jobWorkingDir, String cubeName, IJoinedFlatTableDesc flatDesc) { final String dropTableHql = JoinedFlatTable.generateDropTableStatement(flatDesc); final String createTableHql = JoinedFlatTable.generateCreateTableStatement(flatDesc, jobWorkingDir); @@ -472,7 +484,7 @@ public class HiveInputBase { } protected static AbstractExecutable createRedistributeFlatHiveTableStep(String hiveInitStatements, String cubeName, - IJoinedFlatTableDesc flatDesc, CubeDesc cubeDesc) { + IJoinedFlatTableDesc flatDesc, CubeDesc cubeDesc) { RedistributeFlatHiveTableStep
step = new RedistributeFlatHiveTableStep(); step.setInitStatement(hiveInitStatements); step.setIntermediateTable(flatDesc.getTableName()); @@ -483,7 +495,7 @@ public class HiveInputBase { } protected static AbstractExecutable createRedistributeFlatHiveTableByLivyStep(String hiveInitStatements, - String cubeName, IJoinedFlatTableDesc flatDesc, CubeDesc cubeDesc) { + String cubeName, IJoinedFlatTableDesc flatDesc, CubeDesc cubeDesc) { RedistributeFlatHiveTableByLivyStep step = new RedistributeFlatHiveTableByLivyStep(); step.setInitStatement(hiveInitStatements); step.setIntermediateTable(flatDesc.getTableName()); @@ -493,8 +505,8 @@ public class HiveInputBase { return step; } - protected static ShellExecutable createLookupHiveViewMaterializationStep(String hiveInitStatements, - String jobWorkingDir, IJoinedFlatTableDesc flatDesc, List<String> intermediateTables, String uuid) { + protected static ShellExecutable createLookupHiveViewMaterializationStep(String hiveInitStatements, String jobWorkingDir, IJoinedFlatTableDesc flatDesc, + List<String> intermediateTables, String uuid) { ShellExecutable step = new ShellExecutable(); step.setName(ExecutableConstants.STEP_NAME_MATERIALIZE_HIVE_VIEW_IN_LOOKUP); diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/MRHiveDictUtil.java b/source-hive/src/main/java/org/apache/kylin/source/hive/MRHiveDictUtil.java index 85cd855..573ecd3 100644 --- a/source-hive/src/main/java/org/apache/kylin/source/hive/MRHiveDictUtil.java +++ b/source-hive/src/main/java/org/apache/kylin/source/hive/MRHiveDictUtil.java @@ -27,6 +27,7 @@ import org.apache.kylin.common.KylinConfig; import org.apache.kylin.common.livy.LivyRestBuilder; import org.apache.kylin.common.livy.LivyRestExecutor; import org.apache.kylin.common.livy.LivyTypeEnum; +import org.apache.kylin.engine.mr.common.BatchConstants; import org.apache.kylin.job.JoinedFlatTable; import org.apache.kylin.job.common.PatternedLogger; import org.apache.kylin.job.constant.ExecutableConstants; @@ -41,6 +42,30 @@ import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; +/** + * <pre> + * Holds constants, enums and statements for the Hive Global Dictionary. + * + * There are two different temporary tables which help to build the Hive Global Dictionary. + * They should be deleted at the final step of the building job. + * 1. Distinct Value Table (Temporary table) + * TableName: ${FlatTable}_${DistinctValueSuffix} + * Schema: One normal column, for original column value; with another partition column. + * @see #distinctValueTable + * + * 2. Segment Level Dictionary Table (Temporary table) + * TableName: ${FlatTable}_${DictTableSuffix} + * Schema: Two normal columns, first for original column value, second for its encoded integer; + * also with another partition column + * @see #segmentLevelDictTableName + * + * After that, the Hive Global Dictionary itself is stored in a third hive table. + * 3.
Hive Global Dictionary Table + * TableName: ${CubeName}_${DictTableSuffix} + * Schema: Two columns, first for original column value, second is its encoded integer; also with another partition column + * @see #globalDictTableName + * </pre> + */ public class MRHiveDictUtil { private static final Logger logger = LoggerFactory.getLogger(MRHiveDictUtil.class); protected static final Pattern HDFS_LOCATION = Pattern.compile("LOCATION \'(.*)\';"); @@ -59,12 +84,27 @@ public class MRHiveDictUtil { } } - public static String generateDropTableStatement(IJoinedFlatTableDesc flatDesc) { - StringBuilder ddl = new StringBuilder(); - String table = flatDesc.getTableName() - + flatDesc.getSegment().getConfig().getMrHiveDictIntermediateTTableSuffix(); - ddl.append("DROP TABLE IF EXISTS " + table + ";").append(" \n"); - return ddl.toString(); + public static String distinctValueTable(IJoinedFlatTableDesc flatDesc) { + return flatDesc.getTableName() + flatDesc.getSegment().getConfig().getMrHiveDistinctValueTableSuffix(); + } + + public static String segmentLevelDictTableName(IJoinedFlatTableDesc flatDesc) { + return flatDesc.getTableName() + flatDesc.getSegment().getConfig().getMrHiveDictTableSuffix(); + } + + public static String globalDictTableName(IJoinedFlatTableDesc flatDesc, String cubeName) { + return cubeName + flatDesc.getSegment().getConfig().getMrHiveDictTableSuffix(); + } + + public static String generateDictionaryDdl(String db, String tbl) { + return "CREATE TABLE IF NOT EXISTS " + db + "." + tbl + "\n" + + " ( dict_key STRING COMMENT '', \n" + + " dict_val INT COMMENT '' \n" + + ") \n" + + "COMMENT 'Hive Global Dictionary' \n" + + "PARTITIONED BY (dict_column string) \n" + + "ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t' \n" + + "STORED AS TEXTFILE; \n"; } public static String generateDropTableStatement(String tableName) { @@ -73,14 +113,14 @@ public class MRHiveDictUtil { return ddl.toString(); } - public static String generateCreateTableStatement(IJoinedFlatTableDesc flatDesc) { + public static String generateDistinctValueTableStatement(IJoinedFlatTableDesc flatDesc) { StringBuilder ddl = new StringBuilder(); String table = flatDesc.getTableName() - + flatDesc.getSegment().getConfig().getMrHiveDictIntermediateTTableSuffix(); + + flatDesc.getSegment().getConfig().getMrHiveDistinctValueTableSuffix(); ddl.append("CREATE TABLE IF NOT EXISTS " + table + " \n"); ddl.append("( \n "); - ddl.append("dict_key" + " " + "STRING" + " COMMENT '' \n"); + ddl.append(" dict_key" + " " + "STRING" + " COMMENT '' \n"); ddl.append(") \n"); ddl.append("COMMENT '' \n"); ddl.append("PARTITIONED BY (dict_column string) \n"); @@ -89,13 +129,13 @@ public class MRHiveDictUtil { return ddl.toString(); } - public static String generateCreateGlobalDicIntermediateTableStatement(String globalTableName) { + public static String generateDictTableStatement(String globalTableName) { StringBuilder ddl = new StringBuilder(); ddl.append("CREATE TABLE IF NOT EXISTS " + globalTableName + " \n"); ddl.append("( \n "); - ddl.append("dict_key" + " " + "STRING" + " COMMENT '' , \n"); - ddl.append("dict_val" + " " + "STRING" + " COMMENT '' \n"); + ddl.append(" dict_key" + " " + "STRING" + " COMMENT '' , \n"); + ddl.append(" dict_val" + " " + "STRING" + " COMMENT '' \n"); ddl.append(") \n"); ddl.append("COMMENT '' \n"); ddl.append("PARTITIONED BY (dict_column string) \n"); @@ -105,12 +145,12 @@ public class MRHiveDictUtil { return ddl.toString(); } + /** + * Fetch distinct value from flat table and insert into distinctValueTable. 
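+ * For a dict column C, the generated statement is shaped roughly like this (names illustrative): + * INSERT OVERWRITE TABLE distinct_value_table PARTITION (dict_column = 'C') + * SELECT a.DICT_KEY FROM + * (SELECT C AS DICT_KEY FROM flat_table GROUP BY C) a + * LEFT JOIN + * (SELECT DICT_KEY FROM global_dict_db.global_dict WHERE DICT_COLUMN = 'C') b + * ON a.DICT_KEY = b.DICT_KEY + * WHERE b.DICT_KEY IS NULL; + * i.e. only values not yet present in the Hive Global Dictionary are appended.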
+ * + * @see #distinctValueTable + */ public static String generateInsertDataStatement(IJoinedFlatTableDesc flatDesc, String dictColumn, String globalDictDatabase, String globalDictTable) { - String table = getMRHiveFlatTableGroupBytableName(flatDesc); - - StringBuilder sql = new StringBuilder(); - sql.append("SELECT a.DICT_KEY FROM (" + "\n"); - int index = 0; for (TblColRef tblColRef : flatDesc.getAllColumns()) { if (JoinedFlatTable.colName(tblColRef, flatDesc.useAlias()).equalsIgnoreCase(dictColumn)) { @@ -118,42 +158,56 @@ public class MRHiveDictUtil { } index++; } - if (index == flatDesc.getAllColumns().size()) { String msg = "Can not find correct column for " + dictColumn + ", please check 'kylin.dictionary.mr-hive.columns'"; logger.error(msg); throw new IllegalArgumentException(msg); } - sql.append(" SELECT " + "\n"); - TblColRef col = flatDesc.getAllColumns().get(index); - sql.append(JoinedFlatTable.colName(col) + " as DICT_KEY \n"); - - MRHiveDictUtil.appendJoinStatement(flatDesc, sql); - //group by - sql.append("GROUP BY "); - sql.append(JoinedFlatTable.colName(col) + ") a \n"); + String table = distinctValueTable(flatDesc); + StringBuilder sql = new StringBuilder(); + TblColRef col = flatDesc.getAllColumns().get(index); - //join - sql.append(" LEFT JOIN \n"); - sql.append("(SELECT DICT_KEY FROM "); - sql.append(globalDictDatabase).append(".").append(globalDictTable); - sql.append(" WHERE DICT_COLUMN = '" + dictColumn + "'"); - sql.append(") b \n"); + sql.append("SELECT a.DICT_KEY FROM (\n"); + sql.append(" SELECT " + "\n"); + sql.append(JoinedFlatTable.colName(col)).append(" as DICT_KEY \n"); + sql.append(" FROM ").append(flatDesc.getTableName()).append("\n"); + sql.append(" GROUP BY "); + sql.append(JoinedFlatTable.colName(col)).append(") a \n"); + sql.append(" LEFT JOIN \n"); + sql.append(" (SELECT DICT_KEY FROM ").append(globalDictDatabase).append(".").append(globalDictTable); + sql.append(" WHERE DICT_COLUMN = '").append(dictColumn); + sql.append("' ) b \n"); sql.append("ON a.DICT_KEY = b.DICT_KEY \n"); - sql.append("WHERE b.DICT_KEY IS NULL \n"); + sql.append("WHERE b.DICT_KEY IS NULL \n"); - return "INSERT OVERWRITE TABLE " + table + " \n" + "PARTITION (dict_column = '" + dictColumn + "')" + " \n" - + sql + ";\n"; + return "INSERT OVERWRITE TABLE " + table + " \n" + + "PARTITION (dict_column = '" + dictColumn + "')" + " \n" + + sql.toString() + + ";\n"; } - public static void appendJoinStatement(IJoinedFlatTableDesc flatDesc, StringBuilder sql) { - sql.append("FROM " + flatDesc.getTableName() + "\n"); + /** + * Calculate and store "columnName,segmentDistinctCount,previousMaxDictId" into specific partition + */ + public static String generateDictStatisticsSql(String distinctValueTable, String globalDictTable, String globalDictDatabase) { + return "INSERT OVERWRITE TABLE " + distinctValueTable + " PARTITION (DICT_COLUMN = '" + BatchConstants.CFG_GLOBAL_DICT_STATS_PARTITION_VALUE + "') " + + "\n" + "SELECT CONCAT_WS(',', tc.dict_column, cast(tc.total_distinct_val AS String), if(tm.max_dict_val is null, '0', cast(max_dict_val as string))) " + + "\n" + "FROM (" + + "\n" + " SELECT dict_column, count(1) total_distinct_val" + + "\n" + " FROM " + globalDictDatabase + "." 
+ distinctValueTable + + "\n" + " WHERE DICT_COLUMN != '" + BatchConstants.CFG_GLOBAL_DICT_STATS_PARTITION_VALUE + "'" + + "\n" + " GROUP BY dict_column) tc " + + "\n" + "LEFT JOIN (\n" + + "\n" + " SELECT dict_column, if(max(dict_val) is null, 0, max(dict_val)) as max_dict_val " + + "\n" + " FROM " + globalDictDatabase + "." + globalDictTable + + "\n" + " GROUP BY dict_column) tm " + + "\n" + "ON tc.dict_column = tm.dict_column;"; } public static void runLivySqlJob(PatternedLogger stepLogger, KylinConfig config, ImmutableList<String> sqls, - ExecutableManager executableManager, String jobId) throws IOException { + ExecutableManager executableManager, String jobId) throws IOException { final LivyRestBuilder livyRestBuilder = new LivyRestBuilder(); livyRestBuilder.overwriteHiveProps(config.getHiveConfigOverride()); StringBuilder stringBuilder = new StringBuilder(); @@ -171,7 +225,7 @@ public class MRHiveDictUtil { executor.execute(livyRestBuilder, stepLogger); Map<String, String> info = stepLogger.getInfo(); - //get the flat Hive table size + // get the flat Hive table size Matcher matcher = HDFS_LOCATION.matcher(args); if (matcher.find()) { String hiveFlatTableHdfsUrl = matcher.group(1); @@ -194,14 +248,6 @@ public class MRHiveDictUtil { return DictHiveType.MrEphemeralDictLockPath.getName() + cubeName; } - public static String getMRHiveFlatTableGroupBytableName(IJoinedFlatTableDesc flatDesc) { - return flatDesc.getTableName() + flatDesc.getSegment().getConfig().getMrHiveDictIntermediateTTableSuffix(); - } - - public static String getMRHiveFlatTableGlobalDictTableName(IJoinedFlatTableDesc flatDesc) { - return flatDesc.getTableName() + flatDesc.getSegment().getConfig().getMrHiveDictTableSuffix(); - } - private static long getFileSize(String hdfsUrl) throws IOException { Configuration configuration = new Configuration(); Path path = new Path(hdfsUrl);
