This is an automated email from the ASF dual-hosted git repository.

xxyu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/kylin.git
commit ad954b76a25fb70a14f4c5c034ae58fd78b538f2
Author: wangxiaojing <[email protected]>
AuthorDate: Wed May 6 14:42:19 2020 +0800

    KYLIN-4344 Build Global Dict by MR/Hive, Extract Fact Table Distinct Columns Step
---
 .../apache/kylin/source/hive/HiveInputBase.java  | 86 ++++++++--------
 .../apache/kylin/source/hive/MRHiveDictUtil.java | 74 ++++++++++++++-----
 2 files changed, 84 insertions(+), 76 deletions(-)

diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
index c3f6b6f..65b8dc6 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/HiveInputBase.java
@@ -42,6 +42,7 @@ import org.apache.kylin.cube.CubeManager;
 import org.apache.kylin.cube.model.CubeDesc;
 import org.apache.kylin.engine.mr.IInput;
 import org.apache.kylin.engine.mr.JobBuilderSupport;
+import org.apache.kylin.engine.mr.common.BatchConstants;
 import org.apache.kylin.engine.mr.steps.CubingExecutableUtil;
 import org.apache.kylin.engine.spark.SparkCreatingFlatTable;
 import org.apache.kylin.engine.spark.SparkExecutable;
@@ -97,8 +98,9 @@ public class HiveInputBase {
 
             // create global dict
             KylinConfig dictConfig = (flatDesc.getSegment()).getConfig();
-            String[] mrHiveDictColumns = dictConfig.getMrHiveDictColumns();
-            if (mrHiveDictColumns.length > 0) {
+            String[] mrHiveDictColumns = dictConfig.getMrHiveDictColumnsExcludeRefColumns();
+            if (Objects.nonNull(mrHiveDictColumns) && mrHiveDictColumns.length > 0
+                    && !"".equals(mrHiveDictColumns[0])) {
                 String globalDictDatabase = dictConfig.getMrHiveDictDB();
                 if (null == globalDictDatabase) {
                     throw new IllegalArgumentException("Mr-Hive Global dict database is null.");
@@ -128,81 +130,53 @@ public class HiveInputBase {
             String[] mrHiveDictColumns, String globalDictDatabase, String globalDictTable) {
         final String cubeName = CubingExecutableUtil.getCubeName(jobFlow.getParams());
         final String hiveInitStatements = JoinedFlatTable.generateHiveInitStatements(flatTableDatabase);
-        final String jobWorkingDir = getJobWorkingDir(jobFlow, hdfsWorkingDir);
 
-        jobFlow.addTask(createMrHiveGlobalDictExtractStep(flatDesc, hiveInitStatements, jobWorkingDir, cubeName,
-                mrHiveDictColumns, globalDictDatabase, globalDictTable));
-        jobFlow.addTask(createMrHIveGlobalDictBuildStep(flatDesc, hiveInitStatements, hdfsWorkingDir, cubeName,
-                mrHiveDictColumns, flatTableDatabase, globalDictDatabase, globalDictTable));
-        jobFlow.addTask(createMrHiveGlobalDictReplaceStep(flatDesc, hiveInitStatements, hdfsWorkingDir, cubeName,
-                mrHiveDictColumns, flatTableDatabase, globalDictDatabase, globalDictTable));
+        //Create tables for global dict and extract distinct values
+        jobFlow.addTask(createMrHiveGlobalDictExtractStep(flatDesc, hiveInitStatements, cubeName,
+                mrHiveDictColumns, globalDictDatabase, globalDictTable, jobFlow.getId()));
+
     }
 
     protected static AbstractExecutable createMrHiveGlobalDictExtractStep(IJoinedFlatTableDesc flatDesc,
-            String hiveInitStatements, String jobWorkingDir, String cubeName, String[] mrHiveDictColumns,
-            String globalDictDatabase, String globalDictTable) {
+            String hiveInitStatements, String cubeName, String[] mrHiveDictColumns,
+            String globalDictDatabase, String globalDictTable, String jobId) {
         // Firstly, determine whether the cube's global dict hive table exists.
         String createGlobalDictTableHql = "CREATE TABLE IF NOT EXISTS " + globalDictDatabase + "."
                 + globalDictTable + "\n" + "( dict_key STRING COMMENT '', \n" + "dict_val INT COMMENT '' \n" + ") \n"
-                + "COMMENT '' \n" + "PARTITIONED BY (dict_column string) \n" + "STORED AS TEXTFILE; \n";
+                + "COMMENT '' \n" + "PARTITIONED BY (dict_column string) \n" + " ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t' \n" + "STORED AS TEXTFILE; \n";
         final String dropDictIntermediateTableHql = MRHiveDictUtil.generateDropTableStatement(flatDesc);
         final String createDictIntermediateTableHql = MRHiveDictUtil.generateCreateTableStatement(flatDesc);
+        final String groupByTable = flatDesc.getTableName() + flatDesc.getSegment().getConfig().getMrHiveDictIntermediateTTableSuffix();
+        final String globalDictIntermediateTable = MRHiveDictUtil.getMRHiveFlatTableGlobalDictTableName(flatDesc);
+        final String dropGlobalDictIntermediateTableHql = MRHiveDictUtil.generateDropTableStatement(globalDictIntermediateTable);
+        final String createGlobalDictIntermediateTableHql = MRHiveDictUtil.generateCreateGlobalDicIntermediateTableStatement(globalDictIntermediateTable);
+
+        String maxAndDistinctCountSql = "INSERT OVERWRITE TABLE " + groupByTable + " PARTITION (DICT_COLUMN = '" + BatchConstants.CFG_GLOBAL_DICT_STATS_PARTITION_VALUE + "') "
+                + "\n" + "SELECT CONCAT_WS(',', tc.dict_column, cast(tc.total_distinct_val AS String), if(tm.max_dict_val is null, '0', cast(max_dict_val as string))) "
+                + "\n" + "FROM ("
+                + "\n" + "    SELECT dict_column,count(1) total_distinct_val FROM "
+                + "\n" + groupByTable + " where DICT_COLUMN != '" + BatchConstants.CFG_GLOBAL_DICT_STATS_PARTITION_VALUE + "' group by dict_column) tc "
+                + "\n" + "LEFT JOIN (\n"
+                + "\n" + "    SELECT dict_column,if(max(dict_val) is null, 0, max(dict_val)) as max_dict_val FROM "
+                + "\n" + globalDictDatabase + "." + globalDictTable + " group by dict_column) tm "
+                + "\n" + "ON tc.dict_column = tm.dict_column;";
 
         StringBuilder insertDataToDictIntermediateTableSql = new StringBuilder();
         for (String dictColumn : mrHiveDictColumns) {
             insertDataToDictIntermediateTableSql
-                    .append(MRHiveDictUtil.generateInsertDataStatement(flatDesc, dictColumn));
+                    .append(MRHiveDictUtil.generateInsertDataStatement(flatDesc, dictColumn, globalDictDatabase, globalDictTable));
         }
-
+        String set = "set hive.exec.compress.output=false;set hive.mapred.mode=unstrict;";
         CreateMrHiveDictStep step = new CreateMrHiveDictStep();
         step.setInitStatement(hiveInitStatements);
-        step.setCreateTableStatement(createGlobalDictTableHql + dropDictIntermediateTableHql
-                + createDictIntermediateTableHql + insertDataToDictIntermediateTableSql.toString());
+        step.setCreateTableStatement(set + createGlobalDictTableHql + dropDictIntermediateTableHql
+                + createDictIntermediateTableHql + dropGlobalDictIntermediateTableHql + createGlobalDictIntermediateTableHql + insertDataToDictIntermediateTableSql.toString() + maxAndDistinctCountSql);
         CubingExecutableUtil.setCubeName(cubeName, step.getParams());
         step.setName(ExecutableConstants.STEP_NAME_GLOBAL_DICT_MRHIVE_EXTRACT_DICTVAL);
-        return step;
-    }
-
-    protected static AbstractExecutable createMrHIveGlobalDictBuildStep(IJoinedFlatTableDesc flatDesc,
-            String hiveInitStatements, String hdfsWorkingDir, String cubeName, String[] mrHiveDictColumns,
-            String flatTableDatabase, String globalDictDatabase, String globalDictTable) {
-        String flatTable = flatTableDatabase + "."
-                + MRHiveDictUtil.getHiveTableName(flatDesc, MRHiveDictUtil.DictHiveType.GroupBy);
-        Map<String, String> maxDictValMap = new HashMap<>();
-        Map<String, String> dictHqlMap = new HashMap<>();
-
-        for (String dictColumn : mrHiveDictColumns) {
-            // get dict max value
-            String maxDictValHql = "SELECT if(max(dict_val) is null,0,max(dict_val)) as max_dict_val \n" + " FROM "
-                    + globalDictDatabase + "." + globalDictTable + " \n" + " WHERE dict_column = '" + dictColumn
-                    + "' \n";
-            maxDictValMap.put(dictColumn, maxDictValHql);
-            try {
-                String dictHql = "INSERT OVERWRITE TABLE " + globalDictDatabase + "." + globalDictTable + " \n"
-                        + "PARTITION (dict_column = '" + dictColumn + "') \n" + "SELECT dict_key, dict_val FROM "
-                        + globalDictDatabase + "." + globalDictTable + " \n" + "WHERE dict_column = '" + dictColumn
-                        + "' \n" + flatDesc.getDataModel().getConfig().getHiveUnionStyle()
-                        + "\nSELECT a.dict_key as dict_key, (row_number() over(order by a.dict_key asc)) + (___maxDictVal___) as dict_val \n"
-                        + "FROM \n" + "( \n" + " SELECT dict_key FROM " + flatTable + " WHERE dict_column = '"
-                        + dictColumn + "' AND dict_key is not null \n" + ") a \n" + "LEFT JOIN \n" + "( \n"
-                        + "SELECT dict_key, dict_val FROM " + globalDictDatabase + "." + globalDictTable
-                        + " WHERE dict_column = '" + dictColumn + "' \n" + ") b \n"
-                        + "ON a.dict_key = b.dict_key \n" + "WHERE b.dict_val is null; \n";
-                dictHqlMap.put(dictColumn, dictHql);
-            } catch (Exception e) {
-                logger.error("", e);
-            }
-        }
-        String hiveInitStatementForUnstrict = "set hive.mapred.mode=unstrict;";
-        CreateMrHiveDictStep step = new CreateMrHiveDictStep();
-        step.setInitStatement(hiveInitStatements + hiveInitStatementForUnstrict);
-        step.setCreateTableStatementMap(dictHqlMap);
-        step.setMaxDictStatementMap(maxDictValMap);
         step.setIsLock(true);
+        step.setIsUnLock(false);
         step.setLockPathName(cubeName);
-        CubingExecutableUtil.setCubeName(cubeName, step.getParams());
-        step.setName(ExecutableConstants.STEP_NAME_GLOBAL_DICT_MRHIVE_BUILD_DICTVAL);
 
         return step;
     }
diff --git a/source-hive/src/main/java/org/apache/kylin/source/hive/MRHiveDictUtil.java b/source-hive/src/main/java/org/apache/kylin/source/hive/MRHiveDictUtil.java
index 804a183..fd2d103 100644
--- a/source-hive/src/main/java/org/apache/kylin/source/hive/MRHiveDictUtil.java
+++ b/source-hive/src/main/java/org/apache/kylin/source/hive/MRHiveDictUtil.java
@@ -60,14 +60,22 @@ public class MRHiveDictUtil {
 
     public static String generateDropTableStatement(IJoinedFlatTableDesc flatDesc) {
         StringBuilder ddl = new StringBuilder();
-        String table = getHiveTableName(flatDesc, DictHiveType.GroupBy);
+        String table = flatDesc.getTableName()
+                + flatDesc.getSegment().getConfig().getMrHiveDictIntermediateTTableSuffix();
         ddl.append("DROP TABLE IF EXISTS " + table + ";").append(" \n");
         return ddl.toString();
     }
 
+    public static String generateDropTableStatement(String tableName) {
+        StringBuilder ddl = new StringBuilder();
+        ddl.append("DROP TABLE IF EXISTS " + tableName + ";").append(" \n");
+        return ddl.toString();
+    }
+
     public static String generateCreateTableStatement(IJoinedFlatTableDesc flatDesc) {
         StringBuilder ddl = new StringBuilder();
-        String table = getHiveTableName(flatDesc, DictHiveType.GroupBy);
+        String table = flatDesc.getTableName()
+                + flatDesc.getSegment().getConfig().getMrHiveDictIntermediateTTableSuffix();
         ddl.append("CREATE TABLE IF NOT EXISTS " + table + " \n");
         ddl.append("( \n ");
@@ -75,16 +83,32 @@ public class MRHiveDictUtil {
         ddl.append(") \n");
         ddl.append("COMMENT '' \n");
         ddl.append("PARTITIONED BY (dict_column string) \n");
-        ddl.append("STORED AS SEQUENCEFILE \n");
+        ddl.append("STORED AS TEXTFILE \n");
+        ddl.append(";").append("\n");
+        return ddl.toString();
+    }
+
+    public static String generateCreateGlobalDicIntermediateTableStatement(String globalTableName) {
+        StringBuilder ddl = new StringBuilder();
+
+        ddl.append("CREATE TABLE IF NOT EXISTS " + globalTableName + " \n");
+        ddl.append("( \n ");
+        ddl.append("dict_key" + " " + "STRING" + " COMMENT '' , \n");
+        ddl.append("dict_val" + " " + "STRING" + " COMMENT '' \n");
+        ddl.append(") \n");
+        ddl.append("COMMENT '' \n");
+        ddl.append("PARTITIONED BY (dict_column string) \n");
+        ddl.append("ROW FORMAT DELIMITED FIELDS TERMINATED BY '\\t' \n");
+        ddl.append("STORED AS TEXTFILE \n");
         ddl.append(";").append("\n");
         return ddl.toString();
     }
 
-    public static String generateInsertDataStatement(IJoinedFlatTableDesc flatDesc, String dictColumn) {
-        String table = getHiveTableName(flatDesc, DictHiveType.GroupBy);
+    public static String generateInsertDataStatement(IJoinedFlatTableDesc flatDesc, String dictColumn, String globalDictDatabase, String globalDictTable) {
+        String table = getMRHiveFlatTableGroupBytableName(flatDesc);
 
         StringBuilder sql = new StringBuilder();
-        sql.append("SELECT" + "\n");
+        sql.append("SELECT a.DICT_KEY FROM (" + "\n");
 
         int index = 0;
         for (TblColRef tblColRef : flatDesc.getAllColumns()) {
@@ -95,32 +119,34 @@ public class MRHiveDictUtil {
         }
 
         if (index == flatDesc.getAllColumns().size()) {
-            String msg = "Can not find correct column for " + dictColumn + ", please check 'kylin.dictionary.mr-hive.columns'";
+            String msg = "Can not find correct column for " + dictColumn
+                    + ", please check 'kylin.dictionary.mr-hive.columns'";
             logger.error(msg);
             throw new IllegalArgumentException(msg);
         }
-
+        sql.append(" SELECT " + "\n");
         TblColRef col = flatDesc.getAllColumns().get(index);
-        sql.append(JoinedFlatTable.colName(col) + " \n");
+        sql.append(JoinedFlatTable.colName(col) + " as DICT_KEY \n");
         MRHiveDictUtil.appendJoinStatement(flatDesc, sql);
 
         //group by
         sql.append("GROUP BY ");
-        sql.append(JoinedFlatTable.colName(col) + " \n");
-
-        return "INSERT OVERWRITE TABLE " + table + " \n"
-                + "PARTITION (dict_column = '" + dictColumn + "')" + " \n"
+        sql.append(JoinedFlatTable.colName(col) + ") a \n");
+
+        //join
+        sql.append(" LEFT JOIN \n");
+        sql.append("(SELECT DICT_KEY FROM ");
+        sql.append(globalDictDatabase).append(".").append(globalDictTable);
+        sql.append(" WHERE DICT_COLUMN = '" + dictColumn + "'");
+        sql.append(") b \n");
+        sql.append("ON a.DICT_KEY = b.DICT_KEY \n");
+        sql.append("WHERE b.DICT_KEY IS NULL \n");
+
+        return "INSERT OVERWRITE TABLE " + table + " \n" + "PARTITION (dict_column = '" + dictColumn + "')" + " \n"
                 + sql + ";\n";
     }
 
-    public static String getHiveTableName(IJoinedFlatTableDesc flatDesc, DictHiveType dictHiveType) {
-        StringBuffer table = new StringBuffer(flatDesc.getTableName());
-        table.append("__");
-        table.append(dictHiveType.getName());
-        return table.toString();
-    }
-
     public static void appendJoinStatement(IJoinedFlatTableDesc flatDesc, StringBuilder sql) {
         sql.append("FROM " + flatDesc.getTableName() + "\n");
     }
@@ -155,6 +181,14 @@ public class MRHiveDictUtil {
         executableManager.addJobInfo(jobId, info);
     }
 
+    public static String getMRHiveFlatTableGroupBytableName(IJoinedFlatTableDesc flatDesc) {
+        return flatDesc.getTableName() + flatDesc.getSegment().getConfig().getMrHiveDictIntermediateTTableSuffix();
+    }
+
+    public static String getMRHiveFlatTableGlobalDictTableName(IJoinedFlatTableDesc flatDesc) {
+        return flatDesc.getTableName() + flatDesc.getSegment().getConfig().getMrHiveDictTableSuffix();
+    }
+
     private static long getFileSize(String hdfsUrl) throws IOException {
         Configuration configuration = new Configuration();
         Path path = new Path(hdfsUrl);
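For readers tracing the generated HiveQL: after this change, the extract step emits one anti-join INSERT per dict column that keeps only the distinct values not yet present in the global dict table. Below is a minimal sketch of what generateInsertDataStatement now produces, assuming a hypothetical dict column SELLER_ID, a flat table KYLIN_INTERMEDIATE_SALES whose group-by companion carries the suffix returned by getMrHiveDictIntermediateTTableSuffix() (shown here as __group_by), and a global dict table GDICT_DB.GDICT_TABLE; all names are illustrative, not taken from the commit:

    INSERT OVERWRITE TABLE KYLIN_INTERMEDIATE_SALES__group_by
    PARTITION (dict_column = 'SELLER_ID')
    SELECT a.DICT_KEY FROM (
        SELECT SELLER_ID as DICT_KEY
        FROM KYLIN_INTERMEDIATE_SALES
        GROUP BY SELLER_ID) a
    LEFT JOIN
    (SELECT DICT_KEY FROM GDICT_DB.GDICT_TABLE WHERE DICT_COLUMN = 'SELLER_ID') b
    ON a.DICT_KEY = b.DICT_KEY
    WHERE b.DICT_KEY IS NULL;

The LEFT JOIN ... WHERE b.DICT_KEY IS NULL form means each build extracts only the values that still need encoding, rather than inserting every distinct value as the previous extract step did.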

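Similarly, the new maxAndDistinctCountSql appends one bookkeeping row per dict column into a reserved stats partition of the same group-by table. A sketch with the same illustrative names, writing <STATS_PARTITION> as a placeholder for the value of BatchConstants.CFG_GLOBAL_DICT_STATS_PARTITION_VALUE:

    INSERT OVERWRITE TABLE KYLIN_INTERMEDIATE_SALES__group_by
    PARTITION (DICT_COLUMN = '<STATS_PARTITION>')
    SELECT CONCAT_WS(',', tc.dict_column, cast(tc.total_distinct_val AS String),
                     if(tm.max_dict_val is null, '0', cast(max_dict_val as string)))
    FROM (
        SELECT dict_column, count(1) total_distinct_val
        FROM KYLIN_INTERMEDIATE_SALES__group_by
        WHERE DICT_COLUMN != '<STATS_PARTITION>'
        GROUP BY dict_column) tc
    LEFT JOIN (
        SELECT dict_column, if(max(dict_val) is null, 0, max(dict_val)) as max_dict_val
        FROM GDICT_DB.GDICT_TABLE
        GROUP BY dict_column) tm
    ON tc.dict_column = tm.dict_column;

Each stats row records, comma-separated, the column name, the count of newly extracted distinct values, and the current maximum dict value, presumably giving the subsequent build step the offset at which to start numbering new keys.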