HIVE-20241: Support partitioning spec in CTAS statements (Jesus Camacho Rodriguez, reviewed by Ashutosh Chauhan)
Project: http://git-wip-us.apache.org/repos/asf/hive/repo Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/e96728c5 Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/e96728c5 Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/e96728c5 Branch: refs/heads/branch-3 Commit: e96728c527965f624f03de1d5c3c42ca6f5eb1a5 Parents: d05361e Author: Jesus Camacho Rodriguez <[email protected]> Authored: Wed Jul 25 17:28:45 2018 -0700 Committer: Jesus Camacho Rodriguez <[email protected]> Committed: Tue Jul 31 09:19:08 2018 -0700 ---------------------------------------------------------------------- .../test/resources/testconfiguration.properties | 1 + .../org/apache/hadoop/hive/ql/exec/DDLTask.java | 36 +- .../apache/hadoop/hive/ql/exec/MoveTask.java | 1 + .../hive/ql/optimizer/GenMapRedUtils.java | 9 +- .../apache/hadoop/hive/ql/parse/HiveParser.g | 26 +- .../hadoop/hive/ql/parse/SemanticAnalyzer.java | 485 ++++++---- .../hadoop/hive/ql/parse/TaskCompiler.java | 41 +- .../hadoop/hive/ql/plan/CreateTableDesc.java | 48 +- .../hive/ql/plan/DynamicPartitionCtx.java | 34 +- .../hadoop/hive/ql/plan/LoadTableDesc.java | 10 + .../hive/ql/exec/TestFileSinkOperator.java | 2 +- .../queries/clientpositive/partition_ctas.q | 51 + .../clientpositive/llap/partition_ctas.q.out | 940 +++++++++++++++++++ 13 files changed, 1442 insertions(+), 242 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/hive/blob/e96728c5/itests/src/test/resources/testconfiguration.properties ---------------------------------------------------------------------- diff --git a/itests/src/test/resources/testconfiguration.properties b/itests/src/test/resources/testconfiguration.properties index 747525a..b91c88a 100644 --- a/itests/src/test/resources/testconfiguration.properties +++ b/itests/src/test/resources/testconfiguration.properties @@ -615,6 +615,7 @@ minillaplocal.query.files=\ orc_ppd_decimal.q,\ orc_ppd_timestamp.q,\ 
order_null.q,\ + partition_ctas.q,\ partition_multilevels.q,\ partition_shared_scan.q,\ partition_pruning.q,\ http://git-wip-us.apache.org/repos/asf/hive/blob/e96728c5/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java index acca490..b90b218 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/DDLTask.java @@ -48,16 +48,15 @@ import java.util.Set; import java.util.SortedSet; import java.util.TreeMap; import java.util.TreeSet; +import java.util.concurrent.ExecutionException; import java.util.regex.Matcher; import java.util.regex.Pattern; -import java.util.concurrent.ExecutionException; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableSet; import com.google.common.collect.Iterables; import com.google.common.collect.Lists; import com.google.common.util.concurrent.ListenableFuture; - import org.apache.commons.lang.StringUtils; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; @@ -293,39 +292,6 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.stringtemplate.v4.ST; -import java.io.BufferedWriter; -import java.io.DataOutputStream; -import java.io.FileNotFoundException; -import java.io.IOException; -import java.io.OutputStreamWriter; -import java.io.Serializable; -import java.io.Writer; -import java.net.URI; -import java.net.URISyntaxException; -import java.nio.charset.StandardCharsets; -import java.sql.SQLException; -import java.util.AbstractList; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.LinkedHashMap; -import java.util.List; 
-import java.util.Map; -import java.util.Map.Entry; -import java.util.Set; -import java.util.SortedSet; -import java.util.TreeMap; -import java.util.TreeSet; -import java.util.concurrent.ExecutionException; -import java.util.regex.Matcher; -import java.util.regex.Pattern; - -import static org.apache.commons.lang.StringUtils.join; -import static org.apache.hadoop.hive.metastore.api.hive_metastoreConstants.META_TABLE_STORAGE; /** * DDLTask implementation. http://git-wip-us.apache.org/repos/asf/hive/blob/e96728c5/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java index e2f88cf..e6ab88f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/MoveTask.java @@ -315,6 +315,7 @@ public class MoveTask extends Task<MoveWork> implements Serializable { } } } + // Multi-file load is for dynamic partitions when some partitions do not // need to merge and they can simply be moved to the target directory. // This is also used for MM table conversion. 
http://git-wip-us.apache.org/repos/asf/hive/blob/e96728c5/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java index 605bb09..f7eb711 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/optimizer/GenMapRedUtils.java @@ -1489,10 +1489,15 @@ public final class GenMapRedUtils { boolean truncate = false; if (mvWork.getLoadTableWork() != null) { statsWork = new BasicStatsWork(mvWork.getLoadTableWork()); - String tableName = mvWork.getLoadTableWork().getTable().getTableName(); truncate = mvWork.getLoadTableWork().getReplace(); + String tableName = mvWork.getLoadTableWork().getTable().getTableName(); try { - table = Hive.get().getTable(SessionState.get().getCurrentDatabase(), tableName); + // For partitioned CTAS, the table has not been created, but we can retrieve it + // from the loadTableWork. For rest of query types, we just retrieve it from + // metastore. + table = mvWork.getLoadTableWork().getMdTable() != null ? 
+ mvWork.getLoadTableWork().getMdTable() : + Hive.get().getTable(SessionState.get().getCurrentDatabase(), tableName); } catch (HiveException e) { throw new RuntimeException("unexpected; table should be present already..: " + tableName, e); } http://git-wip-us.apache.org/repos/asf/hive/blob/e96728c5/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g index d4a0ed3..b4c247a 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/HiveParser.g @@ -213,6 +213,7 @@ TOK_TABCOLLIST; TOK_TABCOL; TOK_TABLECOMMENT; TOK_TABLEPARTCOLS; +TOK_TABLEPARTCOLNAMES; TOK_TABLEROWFORMAT; TOK_TABLEROWFORMATFIELD; TOK_TABLEROWFORMATCOLLITEMS; @@ -1075,7 +1076,7 @@ createTableStatement tablePropertiesPrefixed? | (LPAREN columnNameTypeOrConstraintList RPAREN)? tableComment? - tablePartition? + createTablePartitionSpec? tableBuckets? tableSkewed? tableRowFormat? @@ -1088,7 +1089,7 @@ createTableStatement ^(TOK_LIKETABLE $likeName?) columnNameTypeOrConstraintList? tableComment? - tablePartition? + createTablePartitionSpec? tableBuckets? tableSkewed? tableRowFormat? @@ -1973,13 +1974,28 @@ tableComment KW_COMMENT comment=StringLiteral -> ^(TOK_TABLECOMMENT $comment) ; -tablePartition -@init { pushMsg("table partition specification", state); } +createTablePartitionSpec +@init { pushMsg("create table partition specification", state); } @after { popMsg(state); } - : KW_PARTITIONED KW_BY LPAREN columnNameTypeConstraint (COMMA columnNameTypeConstraint)* RPAREN + : KW_PARTITIONED KW_BY LPAREN (opt1 = createTablePartitionColumnTypeSpec | opt2 = createTablePartitionColumnSpec) RPAREN + -> {$opt1.tree != null}? 
$opt1 + -> $opt2 + ; + +createTablePartitionColumnTypeSpec +@init { pushMsg("create table partition specification", state); } +@after { popMsg(state); } + : columnNameTypeConstraint (COMMA columnNameTypeConstraint)* -> ^(TOK_TABLEPARTCOLS columnNameTypeConstraint+) ; +createTablePartitionColumnSpec +@init { pushMsg("create table partition specification", state); } +@after { popMsg(state); } + : columnName (COMMA columnName)* + -> ^(TOK_TABLEPARTCOLNAMES columnName+) + ; + tableBuckets @init { pushMsg("table buckets specification", state); } @after { popMsg(state); } http://git-wip-us.apache.org/repos/asf/hive/blob/e96728c5/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java index a804c15..355e76d 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/SemanticAnalyzer.java @@ -25,6 +25,7 @@ import com.google.common.collect.Iterables; import com.google.common.collect.Sets; import com.google.common.math.IntMath; import com.google.common.math.LongMath; + import org.antlr.runtime.ClassicToken; import org.antlr.runtime.CommonToken; import org.antlr.runtime.Token; @@ -36,7 +37,9 @@ import org.antlr.runtime.tree.TreeWizard; import org.antlr.runtime.tree.TreeWizard.ContextVisitor; import org.apache.calcite.rel.RelNode; import org.apache.calcite.util.ImmutableBitSet; +import org.apache.commons.collections.CollectionUtils; import org.apache.commons.lang.StringUtils; +import org.apache.commons.lang3.tuple.Pair; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; @@ -274,6 +277,8 @@ import java.util.Map.Entry; import java.util.Optional; import java.util.Queue; import java.util.Set; +import 
java.util.SortedMap; +import java.util.TreeMap; import java.util.TreeSet; import java.util.UUID; import java.util.function.Supplier; @@ -6938,7 +6943,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { } private void genPartnCols(String dest, Operator input, QB qb, - TableDesc table_desc, Table dest_tab, SortBucketRSCtx ctx) throws SemanticException { + TableDesc table_desc, Table dest_tab, SortBucketRSCtx ctx) throws SemanticException { boolean enforceBucketing = false; ArrayList<ExprNodeDesc> partnColsNoConvert = new ArrayList<ExprNodeDesc>(); @@ -6971,13 +6976,13 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { } @SuppressWarnings("unchecked") - private void setStatsForNonNativeTable(Table tab) throws SemanticException { - String tableName = DDLSemanticAnalyzer.getDotName(new String[] { tab.getDbName(), - tab.getTableName() }); + private void setStatsForNonNativeTable(String dbName, String tableName) throws SemanticException { + String qTableName = DDLSemanticAnalyzer.getDotName(new String[] { dbName, + tableName }); AlterTableDesc alterTblDesc = new AlterTableDesc(AlterTableTypes.DROPPROPS, null, false); HashMap<String, String> mapProp = new HashMap<>(); mapProp.put(StatsSetupConst.COLUMN_STATS_ACCURATE, null); - alterTblDesc.setOldName(tableName); + alterTblDesc.setOldName(qTableName); alterTblDesc.setProps(mapProp); alterTblDesc.setDropIfExists(true); this.rootTasks.add(TaskFactory.get(new DDLWork(getInputs(), getOutputs(), alterTblDesc))); @@ -7212,21 +7217,22 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { RowResolver inputRR = opParseCtx.get(input).getRowResolver(); QBMetaData qbm = qb.getMetaData(); - Integer dest_type = qbm.getDestTypeForAlias(dest); + Integer destType = qbm.getDestTypeForAlias(dest); - Table dest_tab = null; // destination table if any + Table destinationTable = null; // destination table if any boolean destTableIsTransactional; // true for full ACID table and MM table boolean 
destTableIsFullAcid; // should the destination table be written to using ACID boolean destTableIsTemporary = false; boolean destTableIsMaterialization = false; - Partition dest_part = null;// destination partition if any + Partition destinationPartition = null;// destination partition if any Path queryTmpdir = null; // the intermediate destination directory - Path dest_path = null; // the final destination directory - TableDesc table_desc = null; + Path destinationPath = null; // the final destination directory + TableDesc tableDescriptor = null; int currentTableId = 0; boolean isLocal = false; SortBucketRSCtx rsCtx = new SortBucketRSCtx(); DynamicPartitionCtx dpCtx = null; + Table partitionedCTASOrMVTable = null; // destination partitioned CTAS or MV table if any LoadTableDesc ltd = null; ListBucketingCtx lbCtx = null; Map<String, String> partSpec = null; @@ -7234,24 +7240,24 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { Long writeId = null; HiveTxnManager txnMgr = getTxnMgr(); - switch (dest_type.intValue()) { + switch (destType.intValue()) { case QBMetaData.DEST_TABLE: { - dest_tab = qbm.getDestTableForAlias(dest); - destTableIsTransactional = AcidUtils.isTransactionalTable(dest_tab); - destTableIsFullAcid = AcidUtils.isFullAcidTable(dest_tab); - destTableIsTemporary = dest_tab.isTemporary(); + destinationTable = qbm.getDestTableForAlias(dest); + destTableIsTransactional = AcidUtils.isTransactionalTable(destinationTable); + destTableIsFullAcid = AcidUtils.isFullAcidTable(destinationTable); + destTableIsTemporary = destinationTable.isTemporary(); // Is the user trying to insert into a external tables - checkExternalTable(dest_tab); + checkExternalTable(destinationTable); partSpec = qbm.getPartSpecForAlias(dest); - dest_path = dest_tab.getPath(); + destinationPath = destinationTable.getPath(); - checkImmutableTable(qb, dest_tab, dest_path, false); + checkImmutableTable(qb, destinationTable, destinationPath, false); // check for partition - 
List<FieldSchema> parts = dest_tab.getPartitionKeys(); + List<FieldSchema> parts = destinationTable.getPartitionKeys(); if (parts != null && parts.size() > 0) { // table is partitioned if (partSpec == null || partSpec.size() == 0) { // user did NOT specify partition throw new SemanticException(generateErrorMessage( @@ -7260,8 +7266,8 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { } dpCtx = qbm.getDPCtx(dest); if (dpCtx == null) { - dest_tab.validatePartColumnNames(partSpec, false); - dpCtx = new DynamicPartitionCtx(dest_tab, partSpec, + destinationTable.validatePartColumnNames(partSpec, false); + dpCtx = new DynamicPartitionCtx(partSpec, conf.getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME), conf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTSPERNODE)); qbm.setDPCtx(dest, dpCtx); @@ -7269,76 +7275,76 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { } // Check for dynamic partitions. - dpCtx = checkDynPart(qb, qbm, dest_tab, partSpec, dest); + dpCtx = checkDynPart(qb, qbm, destinationTable, partSpec, dest); if (dpCtx != null && dpCtx.getSPPath() != null) { - dest_path = new Path(dest_tab.getPath(), dpCtx.getSPPath()); + destinationPath = new Path(destinationTable.getPath(), dpCtx.getSPPath()); } - boolean isNonNativeTable = dest_tab.isNonNative(); - isMmTable = AcidUtils.isInsertOnlyTable(dest_tab.getParameters()); + boolean isNonNativeTable = destinationTable.isNonNative(); + isMmTable = AcidUtils.isInsertOnlyTable(destinationTable.getParameters()); if (isNonNativeTable || isMmTable) { - queryTmpdir = dest_path; + queryTmpdir = destinationPath; } else { - queryTmpdir = ctx.getTempDirForFinalJobPath(dest_path); + queryTmpdir = ctx.getTempDirForFinalJobPath(destinationPath); } if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { Utilities.FILE_OP_LOGGER.trace("create filesink w/DEST_TABLE specifying " + queryTmpdir - + " from " + dest_path); + + " from " + destinationPath); } if (dpCtx != null) { // set the root of the temporary path 
where dynamic partition columns will populate dpCtx.setRootPath(queryTmpdir); } // this table_desc does not contain the partitioning columns - table_desc = Utilities.getTableDesc(dest_tab); + tableDescriptor = Utilities.getTableDesc(destinationTable); // Add NOT NULL constraint check input = genConstraintsPlan(dest, qb, input); // Add sorting/bucketing if needed - input = genBucketingSortingDest(dest, input, qb, table_desc, dest_tab, rsCtx); + input = genBucketingSortingDest(dest, input, qb, tableDescriptor, destinationTable, rsCtx); - idToTableNameMap.put(String.valueOf(destTableId), dest_tab.getTableName()); + idToTableNameMap.put(String.valueOf(destTableId), destinationTable.getTableName()); currentTableId = destTableId; destTableId++; - lbCtx = constructListBucketingCtx(dest_tab.getSkewedColNames(), - dest_tab.getSkewedColValues(), dest_tab.getSkewedColValueLocationMaps(), - dest_tab.isStoredAsSubDirectories(), conf); + lbCtx = constructListBucketingCtx(destinationTable.getSkewedColNames(), + destinationTable.getSkewedColValues(), destinationTable.getSkewedColValueLocationMaps(), + destinationTable.isStoredAsSubDirectories(), conf); // Create the work for moving the table // NOTE: specify Dynamic partitions in dest_tab for WriteEntity if (!isNonNativeTable) { AcidUtils.Operation acidOp = AcidUtils.Operation.NOT_ACID; if (destTableIsFullAcid) { - acidOp = getAcidType(table_desc.getOutputFileFormatClass(), dest); + acidOp = getAcidType(tableDescriptor.getOutputFileFormatClass(), dest); //todo: should this be done for MM? 
is it ok to use CombineHiveInputFormat with MM - checkAcidConstraints(qb, table_desc, dest_tab); + checkAcidConstraints(qb, tableDescriptor, destinationTable); } try { if (ctx.getExplainConfig() != null) { writeId = null; // For explain plan, txn won't be opened and doesn't make sense to allocate write id } else { if (isMmTable) { - writeId = txnMgr.getTableWriteId(dest_tab.getDbName(), dest_tab.getTableName()); + writeId = txnMgr.getTableWriteId(destinationTable.getDbName(), destinationTable.getTableName()); } else { writeId = acidOp == Operation.NOT_ACID ? null : - txnMgr.getTableWriteId(dest_tab.getDbName(), dest_tab.getTableName()); + txnMgr.getTableWriteId(destinationTable.getDbName(), destinationTable.getTableName()); } } } catch (LockException ex) { throw new SemanticException("Failed to allocate write Id", ex); } boolean isReplace = !qb.getParseInfo().isInsertIntoTable( - dest_tab.getDbName(), dest_tab.getTableName()); - ltd = new LoadTableDesc(queryTmpdir, table_desc, dpCtx, acidOp, isReplace, writeId); + destinationTable.getDbName(), destinationTable.getTableName()); + ltd = new LoadTableDesc(queryTmpdir, tableDescriptor, dpCtx, acidOp, isReplace, writeId); if (writeId != null) { ltd.setStmtId(txnMgr.getCurrentStmtId()); } // For Acid table, Insert Overwrite shouldn't replace the table content. We keep the old // deltas and base and leave them up to the cleaner to clean up boolean isInsertInto = qb.getParseInfo().isInsertIntoTable( - dest_tab.getDbName(), dest_tab.getTableName()); + destinationTable.getDbName(), destinationTable.getTableName()); LoadFileType loadType = (!isInsertInto && !destTableIsTransactional) ? LoadFileType.REPLACE_ALL : LoadFileType.KEEP_EXISTING; ltd.setLoadFileType(loadType); @@ -7348,88 +7354,88 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { } else { // This is a non-native table. // We need to set stats as inaccurate. 
- setStatsForNonNativeTable(dest_tab); + setStatsForNonNativeTable(destinationTable.getDbName(), destinationTable.getTableName()); // true if it is insert overwrite. boolean overwrite = !qb.getParseInfo().isInsertIntoTable( - String.format("%s.%s", dest_tab.getDbName(), dest_tab.getTableName())); - createPreInsertDesc(dest_tab, overwrite); + String.format("%s.%s", destinationTable.getDbName(), destinationTable.getTableName())); + createPreInsertDesc(destinationTable, overwrite); - ltd = new LoadTableDesc(queryTmpdir, table_desc, partSpec == null ? ImmutableMap.of() : partSpec); + ltd = new LoadTableDesc(queryTmpdir, tableDescriptor, partSpec == null ? ImmutableMap.of() : partSpec); ltd.setInsertOverwrite(overwrite); ltd.setLoadFileType(overwrite ? LoadFileType.REPLACE_ALL : LoadFileType.KEEP_EXISTING); } - if (dest_tab.isMaterializedView()) { + if (destinationTable.isMaterializedView()) { materializedViewUpdateDesc = new MaterializedViewDesc( - dest_tab.getFullyQualifiedName(), false, false, true); + destinationTable.getFullyQualifiedName(), false, false, true); } WriteEntity output = generateTableWriteEntity( - dest, dest_tab, partSpec, ltd, dpCtx, isNonNativeTable); + dest, destinationTable, partSpec, ltd, dpCtx, isNonNativeTable); ctx.getLoadTableOutputMap().put(ltd, output); break; } case QBMetaData.DEST_PARTITION: { - dest_part = qbm.getDestPartitionForAlias(dest); - dest_tab = dest_part.getTable(); - destTableIsTransactional = AcidUtils.isTransactionalTable(dest_tab); - destTableIsFullAcid = AcidUtils.isFullAcidTable(dest_tab); + destinationPartition = qbm.getDestPartitionForAlias(dest); + destinationTable = destinationPartition.getTable(); + destTableIsTransactional = AcidUtils.isTransactionalTable(destinationTable); + destTableIsFullAcid = AcidUtils.isFullAcidTable(destinationTable); - checkExternalTable(dest_tab); + checkExternalTable(destinationTable); - Path tabPath = dest_tab.getPath(); - Path partPath = dest_part.getDataLocation(); + Path tabPath = 
destinationTable.getPath(); + Path partPath = destinationPartition.getDataLocation(); - checkImmutableTable(qb, dest_tab, partPath, true); + checkImmutableTable(qb, destinationTable, partPath, true); // if the table is in a different dfs than the partition, // replace the partition's dfs with the table's dfs. - dest_path = new Path(tabPath.toUri().getScheme(), tabPath.toUri() + destinationPath = new Path(tabPath.toUri().getScheme(), tabPath.toUri() .getAuthority(), partPath.toUri().getPath()); - isMmTable = AcidUtils.isInsertOnlyTable(dest_tab.getParameters()); - queryTmpdir = isMmTable ? dest_path : ctx.getTempDirForFinalJobPath(dest_path); + isMmTable = AcidUtils.isInsertOnlyTable(destinationTable.getParameters()); + queryTmpdir = isMmTable ? destinationPath : ctx.getTempDirForFinalJobPath(destinationPath); if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { Utilities.FILE_OP_LOGGER.trace("create filesink w/DEST_PARTITION specifying " - + queryTmpdir + " from " + dest_path); + + queryTmpdir + " from " + destinationPath); } - table_desc = Utilities.getTableDesc(dest_tab); + tableDescriptor = Utilities.getTableDesc(destinationTable); // Add NOT NULL constraint check input = genConstraintsPlan(dest, qb, input); // Add sorting/bucketing if needed - input = genBucketingSortingDest(dest, input, qb, table_desc, dest_tab, rsCtx); + input = genBucketingSortingDest(dest, input, qb, tableDescriptor, destinationTable, rsCtx); - idToTableNameMap.put(String.valueOf(destTableId), dest_tab.getTableName()); + idToTableNameMap.put(String.valueOf(destTableId), destinationTable.getTableName()); currentTableId = destTableId; destTableId++; - lbCtx = constructListBucketingCtx(dest_part.getSkewedColNames(), - dest_part.getSkewedColValues(), dest_part.getSkewedColValueLocationMaps(), - dest_part.isStoredAsSubDirectories(), conf); + lbCtx = constructListBucketingCtx(destinationPartition.getSkewedColNames(), + destinationPartition.getSkewedColValues(), 
destinationPartition.getSkewedColValueLocationMaps(), + destinationPartition.isStoredAsSubDirectories(), conf); AcidUtils.Operation acidOp = AcidUtils.Operation.NOT_ACID; if (destTableIsFullAcid) { - acidOp = getAcidType(table_desc.getOutputFileFormatClass(), dest); + acidOp = getAcidType(tableDescriptor.getOutputFileFormatClass(), dest); //todo: should this be done for MM? is it ok to use CombineHiveInputFormat with MM? - checkAcidConstraints(qb, table_desc, dest_tab); + checkAcidConstraints(qb, tableDescriptor, destinationTable); } try { if (ctx.getExplainConfig() != null) { writeId = 0L; // For explain plan, txn won't be opened and doesn't make sense to allocate write id } else { if (isMmTable) { - writeId = txnMgr.getTableWriteId(dest_tab.getDbName(), dest_tab.getTableName()); + writeId = txnMgr.getTableWriteId(destinationTable.getDbName(), destinationTable.getTableName()); } else { writeId = (acidOp == Operation.NOT_ACID) ? null : - txnMgr.getTableWriteId(dest_tab.getDbName(), dest_tab.getTableName()); + txnMgr.getTableWriteId(destinationTable.getDbName(), destinationTable.getTableName()); } } } catch (LockException ex) { throw new SemanticException("Failed to allocate write Id", ex); } - ltd = new LoadTableDesc(queryTmpdir, table_desc, dest_part.getSpec(), acidOp, writeId); + ltd = new LoadTableDesc(queryTmpdir, tableDescriptor, destinationPartition.getSpec(), acidOp, writeId); if (writeId != null) { ltd.setStmtId(txnMgr.getCurrentStmtId()); } @@ -7445,11 +7451,11 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { ltd.setLbCtx(lbCtx); loadTableWork.add(ltd); - if (!outputs.add(new WriteEntity(dest_part, - determineWriteType(ltd, dest_tab.isNonNative(), dest)))) { + if (!outputs.add(new WriteEntity(destinationPartition, + determineWriteType(ltd, destinationTable.isNonNative(), dest)))) { throw new SemanticException(ErrorMsg.OUTPUT_SPECIFIED_MULTIPLE_TIMES - .getMsg(dest_tab.getTableName() + "@" + dest_part.getName())); + 
.getMsg(destinationTable.getTableName() + "@" + destinationPartition.getName())); } break; } @@ -7457,18 +7463,21 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { isLocal = true; // fall through case QBMetaData.DEST_DFS_FILE: { - dest_path = new Path(qbm.getDestFileForAlias(dest)); - - ArrayList<ColumnInfo> colInfos = inputRR.getColumnInfos(); + destinationPath = new Path(qbm.getDestFileForAlias(dest)); // CTAS case: the file output format and serde are defined by the create // table command rather than taking the default value - List<FieldSchema> field_schemas = null; + List<FieldSchema> fieldSchemas = null; + List<FieldSchema> partitionColumns = null; + List<String> partitionColumnNames = null; + List<ColumnInfo> fileSinkColInfos = null; CreateTableDesc tblDesc = qb.getTableDesc(); CreateViewDesc viewDesc = qb.getViewDesc(); - boolean isCtas = false; if (tblDesc != null) { - field_schemas = new ArrayList<FieldSchema>(); + fieldSchemas = new ArrayList<>(); + partitionColumns = new ArrayList<>(); + partitionColumnNames = tblDesc.getPartColNames(); + fileSinkColInfos = new ArrayList<>(); destTableIsTemporary = tblDesc.isTemporary(); destTableIsMaterialization = tblDesc.isMaterialization(); if (AcidUtils.isInsertOnlyTable(tblDesc.getTblProps(), true)) { @@ -7485,7 +7494,10 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { tblDesc.setInitialMmWriteId(writeId); } } else if (viewDesc != null) { - field_schemas = new ArrayList<FieldSchema>(); + fieldSchemas = new ArrayList<>(); + partitionColumns = new ArrayList<>(); + partitionColumnNames = viewDesc.getPartColNames(); + fileSinkColInfos = new ArrayList<>(); destTableIsTemporary = false; } @@ -7498,55 +7510,73 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { // otherwise write to the file system implied by the directory // no copy is required. 
we may want to revisit this policy in future try { - Path qPath = FileUtils.makeQualified(dest_path, conf); + Path qPath = FileUtils.makeQualified(destinationPath, conf); queryTmpdir = isMmTable ? qPath : ctx.getTempDirForFinalJobPath(qPath); if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) { Utilities.FILE_OP_LOGGER.trace("Setting query directory " + queryTmpdir - + " from " + dest_path + " (" + isMmTable + ")"); + + " from " + destinationPath + " (" + isMmTable + ")"); } } catch (Exception e) { throw new SemanticException("Error creating temporary folder on: " - + dest_path, e); + + destinationPath, e); } } - ColsAndTypes ct = deriveFileSinkColTypes(inputRR, field_schemas); - String cols = ct.cols, colTypes = ct.colTypes; + // Check for dynamic partitions. + final String cols, colTypes; + final boolean isPartitioned; + if (dpCtx != null) { + throw new SemanticException("Dynamic partition context has already been created, this should not happen"); + } + if (!CollectionUtils.isEmpty(partitionColumnNames)) { + ColsAndTypes ct = deriveFileSinkColTypes( + inputRR, partitionColumnNames, fieldSchemas, partitionColumns, + fileSinkColInfos); + cols = ct.cols; + colTypes = ct.colTypes; + dpCtx = new DynamicPartitionCtx(partitionColumnNames, + conf.getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME), + conf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTSPERNODE)); + qbm.setDPCtx(dest, dpCtx); + // set the root of the temporary path where dynamic partition columns will populate + dpCtx.setRootPath(queryTmpdir); + isPartitioned = true; + } else { + ColsAndTypes ct = deriveFileSinkColTypes(inputRR, fieldSchemas); + cols = ct.cols; + colTypes = ct.colTypes; + isPartitioned = false; + } // update the create table descriptor with the resulting schema. 
if (tblDesc != null) { - tblDesc.setCols(new ArrayList<FieldSchema>(field_schemas)); + tblDesc.setCols(new ArrayList<>(fieldSchemas)); + tblDesc.setPartCols(new ArrayList<>(partitionColumns)); } else if (viewDesc != null) { - viewDesc.setSchema(new ArrayList<FieldSchema>(field_schemas)); + viewDesc.setSchema(new ArrayList<>(fieldSchemas)); + viewDesc.setPartCols(new ArrayList<>(partitionColumns)); } destTableIsTransactional = tblDesc != null && AcidUtils.isTransactionalTable(tblDesc); destTableIsFullAcid = tblDesc != null && AcidUtils.isFullAcidTable(tblDesc); boolean isDestTempFile = true; - if (!ctx.isMRTmpFileURI(dest_path.toUri().toString())) { - idToTableNameMap.put(String.valueOf(destTableId), dest_path.toUri().toString()); + if (!ctx.isMRTmpFileURI(destinationPath.toUri().toString())) { + idToTableNameMap.put(String.valueOf(destTableId), destinationPath.toUri().toString()); currentTableId = destTableId; destTableId++; isDestTempFile = false; } - boolean isDfsDir = (dest_type.intValue() == QBMetaData.DEST_DFS_FILE); - // Create LFD even for MM CTAS - it's a no-op move, but it still seems to be used for stats. 
- loadFileWork.add(new LoadFileDesc(tblDesc, viewDesc, queryTmpdir, dest_path, isDfsDir, cols, - colTypes, - destTableIsFullAcid ?//there is a change here - prev version had 'transadtional', one beofre' acid' - Operation.INSERT : Operation.NOT_ACID, - isMmCtas)); if (tblDesc == null) { if (viewDesc != null) { - table_desc = PlanUtils.getTableDesc(viewDesc, cols, colTypes); + tableDescriptor = PlanUtils.getTableDesc(viewDesc, cols, colTypes); } else if (qb.getIsQuery()) { String fileFormat; if (SessionState.get().getIsUsingThriftJDBCBinarySerDe()) { fileFormat = "SequenceFile"; HiveConf.setVar(conf, HiveConf.ConfVars.HIVEQUERYRESULTFILEFORMAT, fileFormat); - table_desc= + tableDescriptor = PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, fileFormat, ThriftJDBCBinarySerDe.class); // Set the fetch formatter to be a no-op for the ListSinkOperator, since we'll @@ -7563,29 +7593,97 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { serdeClass = LazyBinarySerDe2.class; } } - table_desc = + tableDescriptor = PlanUtils.getDefaultQueryOutputTableDesc(cols, colTypes, fileFormat, serdeClass); } } else { - table_desc = PlanUtils.getDefaultTableDesc(qb.getDirectoryDesc(), cols, colTypes); + tableDescriptor = PlanUtils.getDefaultTableDesc(qb.getDirectoryDesc(), cols, colTypes); } } else { - table_desc = PlanUtils.getTableDesc(tblDesc, cols, colTypes); + tableDescriptor = PlanUtils.getTableDesc(tblDesc, cols, colTypes); } - if (!outputs.add(new WriteEntity(dest_path, !isDfsDir, isDestTempFile))) { - throw new SemanticException(ErrorMsg.OUTPUT_SPECIFIED_MULTIPLE_TIMES - .getMsg(dest_path.toUri().toString())); + boolean isDfsDir = (destType.intValue() == QBMetaData.DEST_DFS_FILE); + + if (isPartitioned) { + // Create a SELECT that may reorder the columns if needed + RowResolver rowResolver = new RowResolver(); + List<ExprNodeDesc> columnExprs = new ArrayList<>(); + List<String> colNames = new ArrayList<>(); + Map<String, ExprNodeDesc> colExprMap = new 
HashMap<>(); + for (int i = 0; i < fileSinkColInfos.size(); i++) { + ColumnInfo ci = fileSinkColInfos.get(i); + ExprNodeDesc columnExpr = new ExprNodeColumnDesc(ci); + String name = getColumnInternalName(i); + rowResolver.put("", name, new ColumnInfo(name, columnExpr.getTypeInfo(), "", false)); + columnExprs.add(columnExpr); + colNames.add(name); + colExprMap.put(name, columnExpr); + } + input = putOpInsertMap(OperatorFactory.getAndMakeChild( + new SelectDesc(columnExprs, colNames), new RowSchema(rowResolver + .getColumnInfos()), input), rowResolver); + input.setColumnExprMap(colExprMap); + // If this is a partitioned CTAS or MV statement, we are going to create a LoadTableDesc + // object. Although the table does not exist in metastore, we will swap the CreateTableTask + // and MoveTask resulting from this LoadTable so in this specific case, first we create + // the metastore table, then we move and commit the partitions. At least for the time being, + // this order needs to be enforced because metastore expects a table to exist before we can + // add any partitions to it. + boolean isNonNativeTable = tableDescriptor.isNonNative(); + if (!isNonNativeTable) { + AcidUtils.Operation acidOp = AcidUtils.Operation.NOT_ACID; + if (destTableIsFullAcid) { + acidOp = getAcidType(tableDescriptor.getOutputFileFormatClass(), dest); + //todo: should this be done for MM? is it ok to use CombineHiveInputFormat with MM + checkAcidConstraints(qb, tableDescriptor, null); + } + // isReplace = false in case concurrent operation is executed + ltd = new LoadTableDesc(queryTmpdir, tableDescriptor, dpCtx, acidOp, false, writeId); + if (writeId != null) { + ltd.setStmtId(txnMgr.getCurrentStmtId()); + } + ltd.setLoadFileType(LoadFileType.KEEP_EXISTING); + ltd.setInsertOverwrite(false); + loadTableWork.add(ltd); + } else { + // This is a non-native table. + // We need to set stats as inaccurate. 
+ setStatsForNonNativeTable(tableDescriptor.getDbName(), tableDescriptor.getTableName()); + ltd = new LoadTableDesc(queryTmpdir, tableDescriptor, dpCtx.getPartSpec()); + ltd.setInsertOverwrite(false); + ltd.setLoadFileType(LoadFileType.KEEP_EXISTING); + } + try { + partitionedCTASOrMVTable = tblDesc != null ? tblDesc.toTable(conf) : viewDesc.toTable(conf); + ltd.setMdTable(partitionedCTASOrMVTable); + WriteEntity output = generateTableWriteEntity( + dest, partitionedCTASOrMVTable, dpCtx.getPartSpec(), ltd, dpCtx, isNonNativeTable); + ctx.getLoadTableOutputMap().put(ltd, output); + } catch (HiveException e) { + throw new SemanticException(e); + } + } else { + // Create LFD even for MM CTAS - it's a no-op move, but it still seems to be used for stats. + loadFileWork.add(new LoadFileDesc(tblDesc, viewDesc, queryTmpdir, destinationPath, isDfsDir, cols, + colTypes, + destTableIsFullAcid ?//there is a change here - prev version had 'transactional', one before 'acid' + Operation.INSERT : Operation.NOT_ACID, + isMmCtas)); + if (!outputs.add(new WriteEntity(destinationPath, !isDfsDir, isDestTempFile))) { + throw new SemanticException(ErrorMsg.OUTPUT_SPECIFIED_MULTIPLE_TIMES + .getMsg(destinationPath.toUri().toString())); + } } break; } default: - throw new SemanticException("Unknown destination type: " + dest_type); + throw new SemanticException("Unknown destination type: " + destType); } - if (!(dest_type.intValue() == QBMetaData.DEST_DFS_FILE && qb.getIsQuery())) { - input = genConversionSelectOperator(dest, qb, input, table_desc, dpCtx); + if (!(destType.intValue() == QBMetaData.DEST_DFS_FILE && qb.getIsQuery())) { + input = genConversionSelectOperator(dest, qb, input, tableDescriptor, dpCtx); } inputRR = opParseCtx.get(input).getRowResolver(); @@ -7597,7 +7695,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { "", true)); } else { try { - StructObjectInspector rowObjectInspector = (StructObjectInspector) table_desc + StructObjectInspector 
rowObjectInspector = (StructObjectInspector) tableDescriptor .getDeserializer(conf).getObjectInspector(); List<? extends StructField> fields = rowObjectInspector .getAllStructFieldRefs(); @@ -7616,22 +7714,22 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { // The output files of a FileSink can be merged if they are either not being written to a table // or are being written to a table which is not bucketed // and table the table is not sorted - boolean canBeMerged = (dest_tab == null || !((dest_tab.getNumBuckets() > 0) || - (dest_tab.getSortCols() != null && dest_tab.getSortCols().size() > 0))); + boolean canBeMerged = (destinationTable == null || !((destinationTable.getNumBuckets() > 0) || + (destinationTable.getSortCols() != null && destinationTable.getSortCols().size() > 0))); // If this table is working with ACID semantics, turn off merging canBeMerged &= !destTableIsFullAcid; // Generate the partition columns from the parent input - if (dest_type.intValue() == QBMetaData.DEST_TABLE - || dest_type.intValue() == QBMetaData.DEST_PARTITION) { - genPartnCols(dest, input, qb, table_desc, dest_tab, rsCtx); + if (destType.intValue() == QBMetaData.DEST_TABLE + || destType.intValue() == QBMetaData.DEST_PARTITION) { + genPartnCols(dest, input, qb, tableDescriptor, destinationTable, rsCtx); } - FileSinkDesc fileSinkDesc = createFileSinkDesc(dest, table_desc, dest_part, - dest_path, currentTableId, destTableIsFullAcid, destTableIsTemporary,//this was 1/4 acid + FileSinkDesc fileSinkDesc = createFileSinkDesc(dest, tableDescriptor, destinationPartition, + destinationPath, currentTableId, destTableIsFullAcid, destTableIsTemporary,//this was 1/4 acid destTableIsMaterialization, queryTmpdir, rsCtx, dpCtx, lbCtx, fsRS, - canBeMerged, dest_tab, writeId, isMmCtas, dest_type, qb); + canBeMerged, destinationTable, writeId, isMmCtas, destType, qb); if (isMmCtas) { // Add FSD so that the LoadTask compilation could fix up its path to avoid the move. 
tableDesc.setWriter(fileSinkDesc); @@ -7642,7 +7740,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { ltd.setInsertOverwrite(true); } } - if (null != table_desc && useBatchingSerializer(table_desc.getSerdeClassName())) { + if (null != tableDescriptor && useBatchingSerializer(tableDescriptor.getSerdeClassName())) { fileSinkDesc.setIsUsingBatchingSerDe(true); } else { fileSinkDesc.setIsUsingBatchingSerDe(false); @@ -7655,26 +7753,24 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { if (LOG.isDebugEnabled()) { LOG.debug("Created FileSink Plan for clause: " + dest + "dest_path: " - + dest_path + " row schema: " + inputRR.toString()); + + destinationPath + " row schema: " + inputRR.toString()); } FileSinkOperator fso = (FileSinkOperator) output; - fso.getConf().setTable(dest_tab); + fso.getConf().setTable(destinationTable); // the following code is used to collect column stats when // hive.stats.autogather=true // and it is an insert overwrite or insert into table - if (dest_tab != null - && !dest_tab.isNonNative() - && conf.getBoolVar(ConfVars.HIVESTATSAUTOGATHER) + if (conf.getBoolVar(ConfVars.HIVESTATSAUTOGATHER) && conf.getBoolVar(ConfVars.HIVESTATSCOLAUTOGATHER) && ColumnStatsAutoGatherContext.canRunAutogatherStats(fso)) { - if (dest_type.intValue() == QBMetaData.DEST_TABLE) { - genAutoColumnStatsGatheringPipeline(qb, table_desc, partSpec, input, qb.getParseInfo() - .isInsertIntoTable(dest_tab.getDbName(), dest_tab.getTableName())); - } else if (dest_type.intValue() == QBMetaData.DEST_PARTITION) { - genAutoColumnStatsGatheringPipeline(qb, table_desc, dest_part.getSpec(), input, qb - .getParseInfo().isInsertIntoTable(dest_tab.getDbName(), dest_tab.getTableName())); - + // TODO: Column stats autogather does not work for CTAS statements + if (destType.intValue() == QBMetaData.DEST_TABLE && !destinationTable.isNonNative()) { + genAutoColumnStatsGatheringPipeline(qb, destinationTable, partSpec, input, qb.getParseInfo() + 
.isInsertIntoTable(destinationTable.getDbName(), destinationTable.getTableName())); + } else if (destType.intValue() == QBMetaData.DEST_PARTITION && !destinationTable.isNonNative()) { + genAutoColumnStatsGatheringPipeline(qb, destinationTable, destinationPartition.getSpec(), input, qb + .getParseInfo().isInsertIntoTable(destinationTable.getDbName(), destinationTable.getTableName())); } } return output; @@ -7691,61 +7787,101 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { serdeClassName.equalsIgnoreCase(ArrowColumnarBatchSerDe.class.getName()); } + private ColsAndTypes deriveFileSinkColTypes(RowResolver inputRR, List<FieldSchema> field_schemas) + throws SemanticException { + return deriveFileSinkColTypes(inputRR, new ArrayList<>(), field_schemas, new ArrayList<>(), new ArrayList<>()); + } + private ColsAndTypes deriveFileSinkColTypes( - RowResolver inputRR, List<FieldSchema> field_schemas) throws SemanticException { + RowResolver inputRR, List<String> partitionColumnNames, + List<FieldSchema> columns, List<FieldSchema> partitionColumns, + List<ColumnInfo> fileSinkColInfos) throws SemanticException { ColsAndTypes result = new ColsAndTypes("", ""); - ArrayList<ColumnInfo> colInfos = inputRR.getColumnInfos(); + List<String> allColumns = new ArrayList<>(); + List<ColumnInfo> colInfos = inputRR.getColumnInfos(); + List<ColumnInfo> nonPartColInfos = new ArrayList<>(); + SortedMap<Integer, Pair<FieldSchema, ColumnInfo>> partColInfos = new TreeMap<>(); boolean first = true; - for (ColumnInfo colInfo : colInfos) { + int numNonPartitionedCols = colInfos.size() - partitionColumnNames.size(); + if (numNonPartitionedCols <= 0) { + throw new SemanticException("Too many partition columns declared"); + } + for (int i = 0; i < colInfos.size(); i++) { + ColumnInfo colInfo = colInfos.get(i); String[] nm = inputRR.reverseLookup(colInfo.getInternalName()); if (nm[1] != null) { // non-null column alias colInfo.setAlias(nm[1]); } + boolean isPartitionCol = false; String 
colName = colInfo.getInternalName(); //default column name - if (field_schemas != null) { + if (columns != null) { FieldSchema col = new FieldSchema(); if (!("".equals(nm[0])) && nm[1] != null) { colName = unescapeIdentifier(colInfo.getAlias()).toLowerCase(); // remove `` } colName = fixCtasColumnName(colName); col.setName(colName); + allColumns.add(colName); String typeName = colInfo.getType().getTypeName(); // CTAS should NOT create a VOID type if (typeName.equals(serdeConstants.VOID_TYPE_NAME)) { throw new SemanticException(ErrorMsg.CTAS_CREATES_VOID_TYPE.getMsg(colName)); } col.setType(typeName); - field_schemas.add(col); - } - - if (!first) { - result.cols = result.cols.concat(","); - result.colTypes = result.colTypes.concat(":"); - } - - first = false; - result.cols = result.cols.concat(colName); - - // Replace VOID type with string when the output is a temp table or - // local files. - // A VOID type can be generated under the query: - // - // select NULL from tt; - // or - // insert overwrite local directory "abc" select NULL from tt; - // - // where there is no column type to which the NULL value should be - // converted. - // - String tName = colInfo.getType().getTypeName(); - if (tName.equals(serdeConstants.VOID_TYPE_NAME)) { - result.colTypes = result.colTypes.concat(serdeConstants.STRING_TYPE_NAME); - } else { - result.colTypes = result.colTypes.concat(tName); + int idx = partitionColumnNames.indexOf(colName); + if (idx >= 0) { + partColInfos.put(idx, Pair.of(col, colInfo)); + isPartitionCol = true; + } else { + columns.add(col); + nonPartColInfos.add(colInfo); + } + } + + if (!isPartitionCol) { + if (!first) { + result.cols = result.cols.concat(","); + result.colTypes = result.colTypes.concat(":"); + } + + first = false; + result.cols = result.cols.concat(colName); + + // Replace VOID type with string when the output is a temp table or + // local files. 
+ // A VOID type can be generated under the query: + // + // select NULL from tt; + // or + // insert overwrite local directory "abc" select NULL from tt; + // + // where there is no column type to which the NULL value should be + // converted. + // + String tName = colInfo.getType().getTypeName(); + if (tName.equals(serdeConstants.VOID_TYPE_NAME)) { + result.colTypes = result.colTypes.concat(serdeConstants.STRING_TYPE_NAME); + } else { + result.colTypes = result.colTypes.concat(tName); + } } + + } + + if (partColInfos.size() != partitionColumnNames.size()) { + throw new SemanticException("Table declaration contains partition columns that are not present " + + "in query result schema. " + + "Query columns: " + allColumns + ". " + + "Partition columns: " + partitionColumnNames); } + + // FileSinkColInfos comprise nonPartCols followed by partCols + fileSinkColInfos.addAll(nonPartColInfos); + partitionColumns.addAll(partColInfos.values().stream().map(Pair::getLeft).collect(Collectors.toList())); + fileSinkColInfos.addAll(partColInfos.values().stream().map(Pair::getRight).collect(Collectors.toList())); + return result; } @@ -7949,7 +8085,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { DynamicPartitionCtx dpCtx = qbm.getDPCtx(dest); if (dpCtx == null) { dest_tab.validatePartColumnNames(partSpec, false); - dpCtx = new DynamicPartitionCtx(dest_tab, partSpec, + dpCtx = new DynamicPartitionCtx(partSpec, conf.getVar(HiveConf.ConfVars.DEFAULTPARTITIONNAME), conf.getIntVar(HiveConf.ConfVars.DYNAMICPARTITIONMAXPARTSPERNODE)); qbm.setDPCtx(dest, dpCtx); @@ -7973,16 +8109,9 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { } - private void genAutoColumnStatsGatheringPipeline(QB qb, TableDesc table_desc, + private void genAutoColumnStatsGatheringPipeline(QB qb, Table table, Map<String, String> partSpec, Operator curr, boolean isInsertInto) throws SemanticException { - String tableName = table_desc.getTableName(); - Table table = null; - try { - 
table = db.getTable(tableName); - } catch (HiveException e) { - throw new SemanticException(e.getMessage()); - } - LOG.info("Generate an operator pipeline to autogather column stats for table " + tableName + LOG.info("Generate an operator pipeline to autogather column stats for table " + table.getTableName() + " in query " + ctx.getCmd()); ColumnStatsAutoGatherContext columnStatsAutoGatherContext = null; columnStatsAutoGatherContext = new ColumnStatsAutoGatherContext(this, conf, curr, table, partSpec, isInsertInto, ctx); @@ -12981,6 +13110,7 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { String likeTableName = null; List<FieldSchema> cols = new ArrayList<FieldSchema>(); List<FieldSchema> partCols = new ArrayList<FieldSchema>(); + List<String> partColNames = new ArrayList<>(); List<String> bucketCols = new ArrayList<String>(); List<SQLPrimaryKey> primaryKeys = new ArrayList<SQLPrimaryKey>(); List<SQLForeignKey> foreignKeys = new ArrayList<SQLForeignKey>(); @@ -13096,6 +13226,9 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { "partition columns. 
")); } break; + case HiveParser.TOK_TABLEPARTCOLNAMES: + partColNames = getColumnNames(child); + break; case HiveParser.TOK_ALTERTABLE_BUCKETS: bucketCols = getColumnNames((ASTNode) child.getChild(0)); if (child.getChildCount() == 2) { @@ -13201,6 +13334,10 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { switch (command_type) { case CREATE_TABLE: // REGULAR CREATE TABLE DDL + if (!CollectionUtils.isEmpty(partColNames)) { + throw new SemanticException( + "Partition columns can only declared using their name and types in regular CREATE TABLE statements"); + } tblProps = addDefaultProperties( tblProps, isExt, storageFormat, dbDotTab, sortCols, isMaterialization, isTemporary); addDbAndTabToOutputs(qualifiedTabName, TableType.MANAGED_TABLE, isTemporary, tblProps); @@ -13304,12 +13441,16 @@ public class SemanticAnalyzer extends BaseSemanticAnalyzer { } } + if (!CollectionUtils.isEmpty(partCols)) { + throw new SemanticException( + "Partition columns can only declared using their names in CTAS statements"); + } tblProps = addDefaultProperties( tblProps, isExt, storageFormat, dbDotTab, sortCols, isMaterialization, isTemporary); addDbAndTabToOutputs(qualifiedTabName, TableType.MANAGED_TABLE, isTemporary, tblProps); tableDesc = new CreateTableDesc(qualifiedTabName[0], dbDotTab, isExt, isTemporary, cols, - partCols, bucketCols, sortCols, numBuckets, rowFormatParams.fieldDelim, + partColNames, bucketCols, sortCols, numBuckets, rowFormatParams.fieldDelim, rowFormatParams.fieldEscape, rowFormatParams.collItemDelim, rowFormatParams.mapKeyDelim, rowFormatParams.lineDelim, comment, storageFormat.getInputFormat(), storageFormat.getOutputFormat(), location, storageFormat.getSerde(), http://git-wip-us.apache.org/repos/asf/hive/blob/e96728c5/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java 
b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java index 49709e5..fcb8c55 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/TaskCompiler.java @@ -20,6 +20,8 @@ package org.apache.hadoop.hive.ql.parse; import com.google.common.collect.Interner; import com.google.common.collect.Interners; + +import org.apache.commons.collections.CollectionUtils; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.common.HiveStatsUtils; import org.apache.hadoop.hive.conf.HiveConf; @@ -31,6 +33,7 @@ import org.apache.hadoop.hive.ql.QueryState; import org.apache.hadoop.hive.ql.exec.DDLTask; import org.apache.hadoop.hive.ql.exec.FetchTask; import org.apache.hadoop.hive.ql.exec.MaterializedViewDesc; +import org.apache.hadoop.hive.ql.exec.MoveTask; import org.apache.hadoop.hive.ql.exec.StatsTask; import org.apache.hadoop.hive.ql.exec.TableScanOperator; import org.apache.hadoop.hive.ql.exec.Task; @@ -325,13 +328,13 @@ public abstract class TaskCompiler { crtTblDesc.validate(conf); Task<? extends Serializable> crtTblTask = TaskFactory.get(new DDLWork( inputs, outputs, crtTblDesc)); - patchUpAfterCTASorMaterializedView(rootTasks, outputs, crtTblTask); + patchUpAfterCTASorMaterializedView(rootTasks, outputs, crtTblTask, CollectionUtils.isEmpty(crtTblDesc.getPartColNames())); } else if (pCtx.getQueryProperties().isMaterializedView()) { // generate a DDL task and make it a dependent task of the leaf CreateViewDesc viewDesc = pCtx.getCreateViewDesc(); Task<? extends Serializable> crtViewTask = TaskFactory.get(new DDLWork( inputs, outputs, viewDesc)); - patchUpAfterCTASorMaterializedView(rootTasks, outputs, crtViewTask); + patchUpAfterCTASorMaterializedView(rootTasks, outputs, crtViewTask, CollectionUtils.isEmpty(viewDesc.getPartColNames())); } else if (pCtx.getMaterializedViewUpdateDesc() != null) { // If there is a materialized view update desc, we create introduce it at the end // of the tree. 
@@ -454,9 +457,10 @@ public abstract class TaskCompiler { } } - private void patchUpAfterCTASorMaterializedView(final List<Task<? extends Serializable>> rootTasks, + private void patchUpAfterCTASorMaterializedView(final List<Task<? extends Serializable>> rootTasks, final HashSet<WriteEntity> outputs, - Task<? extends Serializable> createTask) { + Task<? extends Serializable> createTask, + boolean createTaskAfterMoveTask) { // clear the mapredWork output file from outputs for CTAS // DDLWork at the tail of the chain will have the output Iterator<WriteEntity> outIter = outputs.iterator(); @@ -475,18 +479,32 @@ public abstract class TaskCompiler { HashSet<Task<? extends Serializable>> leaves = new LinkedHashSet<>(); getLeafTasks(rootTasks, leaves); assert (leaves.size() > 0); + // Target task is supposed to be the last task Task<? extends Serializable> targetTask = createTask; for (Task<? extends Serializable> task : leaves) { if (task instanceof StatsTask) { // StatsTask require table to already exist for (Task<? extends Serializable> parentOfStatsTask : task.getParentTasks()) { - parentOfStatsTask.addDependentTask(createTask); + if (parentOfStatsTask instanceof MoveTask && !createTaskAfterMoveTask) { + // For partitioned CTAS, we need to create the table before the move task + // as we need to create the partitions in metastore and for that we should + // have already registered the table + interleaveTask(parentOfStatsTask, createTask); + } else { + parentOfStatsTask.addDependentTask(createTask); + } } for (Task<? 
extends Serializable> parentOfCrtTblTask : createTask.getParentTasks()) { parentOfCrtTblTask.removeDependentTask(task); } createTask.addDependentTask(task); targetTask = task; + } else if (task instanceof MoveTask && !createTaskAfterMoveTask) { + // For partitioned CTAS, we need to create the table before the move task + // as we need to create the partitions in metastore and for that we should + // have already registered the table + interleaveTask(task, createTask); + targetTask = task; } else { task.addDependentTask(createTask); } @@ -519,6 +537,19 @@ public abstract class TaskCompiler { } /** + * Makes dependentTask dependent on task. + */ + private void interleaveTask(Task<? extends Serializable> dependentTask, Task<? extends Serializable> task) { + for (Task<? extends Serializable> parentOfStatsTask : dependentTask.getParentTasks()) { + parentOfStatsTask.addDependentTask(task); + } + for (Task<? extends Serializable> parentOfCrtTblTask : task.getParentTasks()) { + parentOfCrtTblTask.removeDependentTask(dependentTask); + } + task.addDependentTask(dependentTask); + } + + /** * A helper function to generate a column stats task on top of map-red task. The column stats * task fetches from the output of the map-red task, constructs the column stats object and * persists it to the metastore. 
http://git-wip-us.apache.org/repos/asf/hive/blob/e96728c5/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java index 871844b..0fadf1b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/CreateTableDesc.java @@ -72,6 +72,7 @@ public class CreateTableDesc extends DDLDesc implements Serializable { boolean isExternal; List<FieldSchema> cols; List<FieldSchema> partCols; + List<String> partColNames; List<String> bucketCols; List<Order> sortCols; int numBuckets; @@ -137,29 +138,28 @@ public class CreateTableDesc extends DDLDesc implements Serializable { } public CreateTableDesc(String databaseName, String tableName, boolean isExternal, boolean isTemporary, - List<FieldSchema> cols, List<FieldSchema> partCols, - List<String> bucketCols, List<Order> sortCols, int numBuckets, - String fieldDelim, String fieldEscape, String collItemDelim, - String mapKeyDelim, String lineDelim, String comment, String inputFormat, - String outputFormat, String location, String serName, - String storageHandler, - Map<String, String> serdeProps, - Map<String, String> tblProps, - boolean ifNotExists, List<String> skewedColNames, List<List<String>> skewedColValues, - boolean isCTAS, List<SQLPrimaryKey> primaryKeys, List<SQLForeignKey> foreignKeys, - List<SQLUniqueConstraint> uniqueConstraints, List<SQLNotNullConstraint> notNullConstraints, - List<SQLDefaultConstraint> defaultConstraints, List<SQLCheckConstraint> checkConstraints) { - this(databaseName, tableName, isExternal, isTemporary, cols, partCols, - bucketCols, sortCols, numBuckets, fieldDelim, fieldEscape, - collItemDelim, mapKeyDelim, lineDelim, comment, inputFormat, - outputFormat, location, serName, storageHandler, serdeProps, - tblProps, 
ifNotExists, skewedColNames, skewedColValues, - primaryKeys, foreignKeys, uniqueConstraints, notNullConstraints, defaultConstraints, checkConstraints); + List<FieldSchema> cols, List<String> partColNames, + List<String> bucketCols, List<Order> sortCols, int numBuckets, + String fieldDelim, String fieldEscape, String collItemDelim, + String mapKeyDelim, String lineDelim, String comment, String inputFormat, + String outputFormat, String location, String serName, + String storageHandler, + Map<String, String> serdeProps, + Map<String, String> tblProps, + boolean ifNotExists, List<String> skewedColNames, List<List<String>> skewedColValues, + boolean isCTAS, List<SQLPrimaryKey> primaryKeys, List<SQLForeignKey> foreignKeys, + List<SQLUniqueConstraint> uniqueConstraints, List<SQLNotNullConstraint> notNullConstraints, + List<SQLDefaultConstraint> defaultConstraints, List<SQLCheckConstraint> checkConstraints) { + this(databaseName, tableName, isExternal, isTemporary, cols, new ArrayList<>(), + bucketCols, sortCols, numBuckets, fieldDelim, fieldEscape, + collItemDelim, mapKeyDelim, lineDelim, comment, inputFormat, + outputFormat, location, serName, storageHandler, serdeProps, + tblProps, ifNotExists, skewedColNames, skewedColValues, + primaryKeys, foreignKeys, uniqueConstraints, notNullConstraints, defaultConstraints, checkConstraints); + this.partColNames = partColNames; this.isCTAS = isCTAS; - } - public CreateTableDesc(String tableName, boolean isExternal, boolean isTemporary, List<FieldSchema> cols, List<FieldSchema> partCols, List<String> bucketCols, List<Order> sortCols, int numBuckets, @@ -257,6 +257,14 @@ public class CreateTableDesc extends DDLDesc implements Serializable { this.partCols = partCols; } + public List<String> getPartColNames() { + return partColNames; + } + + public void setPartColNames(ArrayList<String> partColNames) { + this.partColNames = partColNames; + } + public List<SQLPrimaryKey> getPrimaryKeys() { return primaryKeys; } 
http://git-wip-us.apache.org/repos/asf/hive/blob/e96728c5/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java index 6af7833..c1aeb8f 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/DynamicPartitionCtx.java @@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.plan; import java.io.Serializable; import java.util.ArrayList; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.regex.Pattern; @@ -28,7 +29,6 @@ import org.apache.hadoop.hive.conf.HiveConf.ConfVars; import org.apache.hadoop.hive.metastore.Warehouse; import org.apache.hadoop.hive.ql.metadata.Hive; import org.apache.hadoop.hive.ql.metadata.HiveException; -import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.parse.SemanticException; public class DynamicPartitionCtx implements Serializable { @@ -54,7 +54,37 @@ public class DynamicPartitionCtx implements Serializable { public DynamicPartitionCtx() { } - public DynamicPartitionCtx(Table tbl, Map<String, String> partSpec, String defaultPartName, + /** + * This constructor is used for partitioned CTAS. Basically we pass the name of + * partitioned columns, which will all be dynamic partitions since the binding + * is done after executing the query in the CTAS. 
+ */ + public DynamicPartitionCtx(List<String> partColNames, String defaultPartName, + int maxParts) throws SemanticException { + this.partSpec = new LinkedHashMap<>(); + this.spNames = new ArrayList<>(); + this.dpNames = new ArrayList<>(); + for (String colName : partColNames) { + this.partSpec.put(colName, null); + this.dpNames.add(colName); + } + this.numBuckets = 0; + this.maxPartsPerNode = maxParts; + this.defaultPartName = defaultPartName; + + this.numDPCols = dpNames.size(); + this.numSPCols = spNames.size(); + this.spPath = null; + String confVal; + try { + confVal = Hive.get().getMetaConf(ConfVars.METASTORE_PARTITION_NAME_WHITELIST_PATTERN.varname); + } catch (HiveException e) { + throw new SemanticException(e); + } + this.whiteListPattern = confVal == null || confVal.isEmpty() ? null : Pattern.compile(confVal); + } + + public DynamicPartitionCtx(Map<String, String> partSpec, String defaultPartName, int maxParts) throws SemanticException { this.partSpec = partSpec; this.spNames = new ArrayList<String>(); http://git-wip-us.apache.org/repos/asf/hive/blob/e96728c5/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java ---------------------------------------------------------------------- diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java index f15b3c3..617e51b 100644 --- a/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java +++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/LoadTableDesc.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hive.ql.plan; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.ql.exec.Utilities; import org.apache.hadoop.hive.ql.io.AcidUtils; +import org.apache.hadoop.hive.ql.metadata.Table; import org.apache.hadoop.hive.ql.plan.Explain.Level; import java.io.Serializable; @@ -43,6 +44,7 @@ public class LoadTableDesc extends LoadDesc implements Serializable { private boolean isInsertOverwrite; // TODO: the below seem like 
they should just be combined into partitionDesc + private Table mdTable; private org.apache.hadoop.hive.ql.plan.TableDesc table; private Map<String, String> partitionSpec; // NOTE: this partitionSpec has to be ordered map @@ -244,4 +246,12 @@ public class LoadTableDesc extends LoadDesc implements Serializable { public void setStmtId(int stmtId) { this.stmtId = stmtId; } + + public Table getMdTable() { + return mdTable; + } + + public void setMdTable(Table mdTable) { + this.mdTable = mdTable; + } } http://git-wip-us.apache.org/repos/asf/hive/blob/e96728c5/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java ---------------------------------------------------------------------- diff --git a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java index 71127c2..b369c96 100644 --- a/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java +++ b/ql/src/test/org/apache/hadoop/hive/ql/exec/TestFileSinkOperator.java @@ -283,7 +283,7 @@ public class TestFileSinkOperator { partCols.add(new ExprNodeColumnDesc(TypeInfoFactory.stringTypeInfo, PARTCOL_NAME, "a", true)); Map<String, String> partColMap= new LinkedHashMap<String, String>(1); partColMap.put(PARTCOL_NAME, null); - DynamicPartitionCtx dpCtx = new DynamicPartitionCtx(null, partColMap, "Sunday", 100); + DynamicPartitionCtx dpCtx = new DynamicPartitionCtx(partColMap, "Sunday", 100); //todo: does this need the finalDestination? 
desc = new FileSinkDesc(basePath, tableDesc, false, 1, false, false, 1, 1, partCols, dpCtx, null, null, false, false); http://git-wip-us.apache.org/repos/asf/hive/blob/e96728c5/ql/src/test/queries/clientpositive/partition_ctas.q ---------------------------------------------------------------------- diff --git a/ql/src/test/queries/clientpositive/partition_ctas.q b/ql/src/test/queries/clientpositive/partition_ctas.q new file mode 100644 index 0000000..470b86e --- /dev/null +++ b/ql/src/test/queries/clientpositive/partition_ctas.q @@ -0,0 +1,51 @@ +--! qt:dataset:src + +EXPLAIN +CREATE TABLE partition_ctas_1 PARTITIONED BY (key) AS +SELECT value, key FROM src where key > 200 and key < 300; + +CREATE TABLE partition_ctas_1 PARTITIONED BY (key) AS +SELECT value, key FROM src where key > 200 and key < 300; + +DESCRIBE FORMATTED partition_ctas_1; + +EXPLAIN +SELECT * FROM partition_ctas_1 where key = 238; + +SELECT * FROM partition_ctas_1 where key = 238; + +CREATE TABLE partition_ctas_2 PARTITIONED BY (value) AS +SELECT key, value FROM src where key > 200 and key < 300; + +EXPLAIN +SELECT * FROM partition_ctas_2 where value = 'val_238'; + +SELECT * FROM partition_ctas_2 where value = 'val_238'; + +EXPLAIN +SELECT value FROM partition_ctas_2 where key = 238; + +SELECT value FROM partition_ctas_2 where key = 238; + +CREATE TABLE partition_ctas_diff_order PARTITIONED BY (value) AS +SELECT value, key FROM src where key > 200 and key < 300; + +EXPLAIN +SELECT * FROM partition_ctas_diff_order where value = 'val_238'; + +SELECT * FROM partition_ctas_diff_order where value = 'val_238'; + +CREATE TABLE partition_ctas_complex_order PARTITIONED BY (c0, c4, c1) AS +SELECT concat(value, '_0') as c0, + concat(value, '_1') as c1, + concat(value, '_2') as c2, + concat(value, '_3') as c3, + concat(value, '_5') as c5, + concat(value, '_4') as c4 +FROM src where key > 200 and key < 240; + +-- c2, c3, c5, c0, c4, c1 +EXPLAIN +SELECT * FROM partition_ctas_complex_order where c0 = 
'val_238_0'; + +SELECT * FROM partition_ctas_complex_order where c0 = 'val_238_0';
