This is an automated email from the ASF dual-hosted git repository.

jcamacho pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git

commit c04fa7b3353d20f5cab4c9ed034b822dd11b2adb
Author: Miklos Gergely <[email protected]>
AuthorDate: Mon Jul 15 10:01:04 2019 -0700

    HIVE-21984: Clean up TruncateTable operation and desc (Miklos Gergely, reviewed by Jesus Camacho Rodriguez)

    Close apache/hive#719
---
 .../hive/ql/ddl/table/misc/TruncateTableDesc.java  |  40 ++-
 .../ql/ddl/table/misc/TruncateTableOperation.java  |   9 +-
 .../hadoop/hive/ql/parse/DDLSemanticAnalyzer.java  | 277 +++++++++++----------
 .../test/results/clientpositive/explain_ddl.q.out  |   2 +-
 .../clientpositive/temp_table_truncate.q.out       |   4 +-
 .../results/clientpositive/truncate_table.q.out    |  12 +-
 6 files changed, 179 insertions(+), 165 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/misc/TruncateTableDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/misc/TruncateTableDesc.java
index 5f970e5..cf271fc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/misc/TruncateTableDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/misc/TruncateTableDesc.java
@@ -44,11 +44,10 @@ public class TruncateTableDesc implements DDLDescWithWriteId, Serializable {
   private final Map<String, String> partSpec;
   private final ReplicationSpec replicationSpec;
   private final boolean isTransactional;
-
-  private List<Integer> columnIndexes;
-  private Path inputDir;
-  private Path outputDir;
-  private ListBucketingCtx lbCtx;
+  private final List<Integer> columnIndexes;
+  private final Path inputDir;
+  private final Path outputDir;
+  private final ListBucketingCtx lbCtx;

   private long writeId = 0;

@@ -58,14 +57,23 @@ public class TruncateTableDesc implements DDLDescWithWriteId, Serializable {

   public TruncateTableDesc(String tableName, Map<String, String> partSpec, ReplicationSpec replicationSpec,
       Table table) {
+    this(tableName, partSpec, replicationSpec, table, null, null, null, null);
+  }
+
+  public TruncateTableDesc(String tableName, Map<String, String> partSpec, ReplicationSpec replicationSpec,
+      Table table, List<Integer> columnIndexes, Path inputDir, Path outputDir, ListBucketingCtx lbCtx) {
     this.tableName = tableName;
     this.fullTableName = table == null ? tableName : TableName.getDbTable(table.getDbName(), table.getTableName());
     this.partSpec = partSpec;
     this.replicationSpec = replicationSpec;
     this.isTransactional = AcidUtils.isTransactionalTable(table);
+    this.columnIndexes = columnIndexes;
+    this.inputDir = inputDir;
+    this.outputDir = outputDir;
+    this.lbCtx = lbCtx;
   }

-  @Explain(displayName = "TableName", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
+  @Explain(displayName = "table name", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
   public String getTableName() {
     return tableName;
   }
@@ -75,7 +83,7 @@ public class TruncateTableDesc implements DDLDescWithWriteId, Serializable {
     return fullTableName;
   }

-  @Explain(displayName = "Partition Spec", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
+  @Explain(displayName = "partition spec", explainLevels = { Level.USER, Level.DEFAULT, Level.EXTENDED })
   public Map<String, String> getPartSpec() {
     return partSpec;
   }
@@ -88,39 +96,23 @@ public class TruncateTableDesc implements DDLDescWithWriteId, Serializable {
     return replicationSpec;
   }

-  @Explain(displayName = "Column Indexes")
+  @Explain(displayName = "column indexes")
   public List<Integer> getColumnIndexes() {
     return columnIndexes;
   }

-  public void setColumnIndexes(List<Integer> columnIndexes) {
-    this.columnIndexes = columnIndexes;
-  }
-
   public Path getInputDir() {
     return inputDir;
   }

-  public void setInputDir(Path inputDir) {
-    this.inputDir = inputDir;
-  }
-
   public Path getOutputDir() {
     return outputDir;
   }

-  public void setOutputDir(Path outputDir) {
-    this.outputDir = outputDir;
-  }
-
   public ListBucketingCtx getLbCtx() {
     return lbCtx;
   }

-  public void setLbCtx(ListBucketingCtx lbCtx) {
-    this.lbCtx = lbCtx;
-  }
-
   @Override
   public void setWriteId(long writeId) {
     this.writeId = writeId;
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/misc/TruncateTableOperation.java b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/misc/TruncateTableOperation.java
index d5f3885..1244380 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/misc/TruncateTableOperation.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/misc/TruncateTableOperation.java
@@ -18,14 +18,12 @@

 package org.apache.hadoop.hive.ql.ddl.table.misc;

-import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Map;

 import org.apache.hadoop.hive.common.FileUtils;
 import org.apache.hadoop.hive.ql.ddl.DDLOperationContext;
 import org.apache.hadoop.hive.ql.ddl.DDLUtils;
-import org.apache.hadoop.hive.ql.exec.Task;
 import org.apache.hadoop.hive.ql.DriverContext;
 import org.apache.hadoop.hive.ql.ErrorMsg;
 import org.apache.hadoop.hive.ql.ddl.DDLOperation;
@@ -49,15 +47,16 @@ public class TruncateTableOperation extends DDLOperation<TruncateTableDesc> {
         desc.getOutputDir());
     truncateWork.setListBucketingCtx(desc.getLbCtx());
     truncateWork.setMapperCannotSpanPartns(true);
+
     DriverContext driverCxt = new DriverContext();
     ColumnTruncateTask taskExec = new ColumnTruncateTask();
     taskExec.initialize(context.getQueryState(), null, driverCxt, null);
     taskExec.setWork(truncateWork);
     taskExec.setQueryPlan(context.getQueryPlan());
-    Task<? extends Serializable> subtask = taskExec;
+
     int ret = taskExec.execute(driverCxt);
-    if (subtask.getException() != null) {
-      context.getTask().setException(subtask.getException());
+    if (taskExec.getException() != null) {
+      context.getTask().setException(taskExec.getException());
     }
     return ret;
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
index ff7f9a8..c013a1a 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/parse/DDLSemanticAnalyzer.java
@@ -1488,11 +1488,29 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer {
     String tableName = getUnescapedName((ASTNode) root.getChild(0));
     Table table = getTable(tableName, true);

+    checkTruncateEligibility(ast, root, tableName, table);
+
+    Map<String, String> partSpec = getPartSpec((ASTNode) root.getChild(1));
+    addTruncateTableOutputs(root, table, partSpec);
+
+    Task<?> truncateTask = null;
+
+    // Is this a truncate column command
+    ASTNode colNamesNode = (ASTNode) ast.getFirstChildWithType(HiveParser.TOK_TABCOLNAME);
+    if (colNamesNode == null) {
+      truncateTask = getTruncateTaskWithoutColumnNames(tableName, partSpec, table);
+    } else {
+      truncateTask = getTruncateTaskWithColumnNames(root, tableName, table, partSpec, colNamesNode);
+    }
+
+    rootTasks.add(truncateTask);
+  }
+
+  private void checkTruncateEligibility(ASTNode ast, ASTNode root, String tableName, Table table)
+      throws SemanticException {
     boolean isForce = ast.getFirstChildWithType(HiveParser.TOK_FORCE) != null;
-    if (!isForce) {
-      if (table.getTableType() != TableType.MANAGED_TABLE) {
-        throw new SemanticException(ErrorMsg.TRUNCATE_FOR_NON_MANAGED_TABLE.format(tableName));
-      }
+    if (!isForce && table.getTableType() != TableType.MANAGED_TABLE) {
+      throw new SemanticException(ErrorMsg.TRUNCATE_FOR_NON_MANAGED_TABLE.format(tableName));
     }
     if (table.isNonNative()) {
       throw new SemanticException(ErrorMsg.TRUNCATE_FOR_NON_NATIVE_TABLE.format(tableName)); //TODO
@@ -1500,7 +1518,10 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer {
     if (!table.isPartitioned() && root.getChildCount() > 1) {
       throw new SemanticException(ErrorMsg.PARTSPEC_FOR_NON_PARTITIONED_TABLE.format(tableName));
     }
-    Map<String, String> partSpec = getPartSpec((ASTNode) root.getChild(1));
+  }
+
+  private void addTruncateTableOutputs(ASTNode root, Table table, Map<String, String> partSpec)
+      throws SemanticException {
     if (partSpec == null) {
       if (!table.isPartitioned()) {
         outputs.add(new WriteEntity(table, WriteEntity.WriteType.DDL_EXCLUSIVE));
@@ -1521,152 +1542,154 @@ public class DDLSemanticAnalyzer extends BaseSemanticAnalyzer {
         }
       }
     }
+  }

+  private Task<?> getTruncateTaskWithoutColumnNames(String tableName, Map<String, String> partSpec, Table table) {
     TruncateTableDesc truncateTblDesc = new TruncateTableDesc(tableName, partSpec, null, table);
     if (truncateTblDesc.mayNeedWriteId()) {
       setAcidDdlDesc(truncateTblDesc);
     }
     DDLWork ddlWork = new DDLWork(getInputs(), getOutputs(), truncateTblDesc);
-    Task<?> truncateTask = TaskFactory.get(ddlWork);
-
-    // Is this a truncate column command
-    List<String> columnNames = null;
-    ASTNode colNamesNode = (ASTNode) ast.getFirstChildWithType(HiveParser.TOK_TABCOLNAME);
-    if (colNamesNode != null) {
-      try {
-        columnNames = getColumnNames(colNamesNode);
-
-        // It would be possible to support this, but this is such a pointless command.
-        if (AcidUtils.isInsertOnlyTable(table.getParameters())) {
-          throw new SemanticException("Truncating MM table columns not presently supported");
-        }
-
-        List<String> bucketCols = null;
-        Class<? extends InputFormat> inputFormatClass = null;
-        boolean isArchived = false;
-        Path newTblPartLoc = null;
-        Path oldTblPartLoc = null;
-        List<FieldSchema> cols = null;
-        ListBucketingCtx lbCtx = null;
-        boolean isListBucketed = false;
-        List<String> listBucketColNames = null;
+    return TaskFactory.get(ddlWork);
+  }

-        if (table.isPartitioned()) {
-          Partition part = db.getPartition(table, partSpec, false);
+  private Task<?> getTruncateTaskWithColumnNames(ASTNode root, String tableName, Table table,
+      Map<String, String> partSpec, ASTNode colNamesNode) throws SemanticException {
+    try {
+      List<String> columnNames = getColumnNames(colNamesNode);

-          Path tabPath = table.getPath();
-          Path partPath = part.getDataLocation();
+      // It would be possible to support this, but this is such a pointless command.
+      if (AcidUtils.isInsertOnlyTable(table.getParameters())) {
+        throw new SemanticException("Truncating MM table columns not presently supported");
+      }

-          // if the table is in a different dfs than the partition,
-          // replace the partition's dfs with the table's dfs.
-          newTblPartLoc = new Path(tabPath.toUri().getScheme(), tabPath.toUri()
-              .getAuthority(), partPath.toUri().getPath());
+      List<String> bucketCols = null;
+      Class<? extends InputFormat> inputFormatClass = null;
+      boolean isArchived = false;
+      Path newTblPartLoc = null;
+      Path oldTblPartLoc = null;
+      List<FieldSchema> cols = null;
+      ListBucketingCtx lbCtx = null;
+      boolean isListBucketed = false;
+      List<String> listBucketColNames = null;
+
+      if (table.isPartitioned()) {
+        Partition part = db.getPartition(table, partSpec, false);
+
+        Path tabPath = table.getPath();
+        Path partPath = part.getDataLocation();
+
+        // if the table is in a different dfs than the partition,
+        // replace the partition's dfs with the table's dfs.
+        newTblPartLoc = new Path(tabPath.toUri().getScheme(), tabPath.toUri()
+            .getAuthority(), partPath.toUri().getPath());
+
+        oldTblPartLoc = partPath;
+
+        cols = part.getCols();
+        bucketCols = part.getBucketCols();
+        inputFormatClass = part.getInputFormatClass();
+        isArchived = ArchiveUtils.isArchived(part);
+        lbCtx = constructListBucketingCtx(part.getSkewedColNames(), part.getSkewedColValues(),
+            part.getSkewedColValueLocationMaps(), part.isStoredAsSubDirectories());
+        isListBucketed = part.isStoredAsSubDirectories();
+        listBucketColNames = part.getSkewedColNames();
+      } else {
+        // input and output are the same
+        oldTblPartLoc = table.getPath();
+        newTblPartLoc = table.getPath();
+        cols = table.getCols();
+        bucketCols = table.getBucketCols();
+        inputFormatClass = table.getInputFormatClass();
+        lbCtx = constructListBucketingCtx(table.getSkewedColNames(), table.getSkewedColValues(),
+            table.getSkewedColValueLocationMaps(), table.isStoredAsSubDirectories());
+        isListBucketed = table.isStoredAsSubDirectories();
+        listBucketColNames = table.getSkewedColNames();
+      }

-          oldTblPartLoc = partPath;
+      // throw a HiveException for non-rcfile.
+      if (!inputFormatClass.equals(RCFileInputFormat.class)) {
+        throw new SemanticException(ErrorMsg.TRUNCATE_COLUMN_NOT_RC.getMsg());
+      }

-          cols = part.getCols();
-          bucketCols = part.getBucketCols();
-          inputFormatClass = part.getInputFormatClass();
-          isArchived = ArchiveUtils.isArchived(part);
-          lbCtx = constructListBucketingCtx(part.getSkewedColNames(), part.getSkewedColValues(),
-              part.getSkewedColValueLocationMaps(), part.isStoredAsSubDirectories());
-          isListBucketed = part.isStoredAsSubDirectories();
-          listBucketColNames = part.getSkewedColNames();
-        } else {
-          // input and output are the same
-          oldTblPartLoc = table.getPath();
-          newTblPartLoc = table.getPath();
-          cols = table.getCols();
-          bucketCols = table.getBucketCols();
-          inputFormatClass = table.getInputFormatClass();
-          lbCtx = constructListBucketingCtx(table.getSkewedColNames(), table.getSkewedColValues(),
-              table.getSkewedColValueLocationMaps(), table.isStoredAsSubDirectories());
-          isListBucketed = table.isStoredAsSubDirectories();
-          listBucketColNames = table.getSkewedColNames();
-        }
+      // throw a HiveException if the table/partition is archived
+      if (isArchived) {
+        throw new SemanticException(ErrorMsg.TRUNCATE_COLUMN_ARCHIVED.getMsg());
+      }

-        // throw a HiveException for non-rcfile.
-        if (!inputFormatClass.equals(RCFileInputFormat.class)) {
-          throw new SemanticException(ErrorMsg.TRUNCATE_COLUMN_NOT_RC.getMsg());
+      Set<Integer> columnIndexes = new HashSet<Integer>();
+      for (String columnName : columnNames) {
+        boolean found = false;
+        for (int columnIndex = 0; columnIndex < cols.size(); columnIndex++) {
+          if (columnName.equalsIgnoreCase(cols.get(columnIndex).getName())) {
+            columnIndexes.add(columnIndex);
+            found = true;
+            break;
+          }
         }
-
-        // throw a HiveException if the table/partition is archived
-        if (isArchived) {
-          throw new SemanticException(ErrorMsg.TRUNCATE_COLUMN_ARCHIVED.getMsg());
+        // Throw an exception if the user is trying to truncate a column which doesn't exist
+        if (!found) {
+          throw new SemanticException(ErrorMsg.INVALID_COLUMN.getMsg(columnName));
         }
-
-        Set<Integer> columnIndexes = new HashSet<Integer>();
-        for (String columnName : columnNames) {
-          boolean found = false;
-          for (int columnIndex = 0; columnIndex < cols.size(); columnIndex++) {
-            if (columnName.equalsIgnoreCase(cols.get(columnIndex).getName())) {
-              columnIndexes.add(columnIndex);
-              found = true;
-              break;
-            }
-          }
-          // Throw an exception if the user is trying to truncate a column which doesn't exist
-          if (!found) {
-            throw new SemanticException(ErrorMsg.INVALID_COLUMN.getMsg(columnName));
-          }
-          // Throw an exception if the table/partition is bucketed on one of the columns
-          for (String bucketCol : bucketCols) {
-            if (bucketCol.equalsIgnoreCase(columnName)) {
-              throw new SemanticException(ErrorMsg.TRUNCATE_BUCKETED_COLUMN.getMsg(columnName));
-            }
+        // Throw an exception if the table/partition is bucketed on one of the columns
+        for (String bucketCol : bucketCols) {
+          if (bucketCol.equalsIgnoreCase(columnName)) {
+            throw new SemanticException(ErrorMsg.TRUNCATE_BUCKETED_COLUMN.getMsg(columnName));
           }
-          if (isListBucketed) {
-            for (String listBucketCol : listBucketColNames) {
-              if (listBucketCol.equalsIgnoreCase(columnName)) {
-                throw new SemanticException(
-                    ErrorMsg.TRUNCATE_LIST_BUCKETED_COLUMN.getMsg(columnName));
-              }
+        }
+        if (isListBucketed) {
+          for (String listBucketCol : listBucketColNames) {
+            if (listBucketCol.equalsIgnoreCase(columnName)) {
+              throw new SemanticException(
+                  ErrorMsg.TRUNCATE_LIST_BUCKETED_COLUMN.getMsg(columnName));
            }
          }
        }
+      }

-        truncateTblDesc.setColumnIndexes(new ArrayList<Integer>(columnIndexes));
-        truncateTblDesc.setInputDir(oldTblPartLoc);
-        truncateTblDesc.setLbCtx(lbCtx);
-
-        addInputsOutputsAlterTable(tableName, partSpec, null, AlterTableType.TRUNCATE, false);
-        ddlWork.setNeedLock(true);
-        TableDesc tblDesc = Utilities.getTableDesc(table);
-        // Write the output to temporary directory and move it to the final location at the end
-        // so the operation is atomic.
-        Path queryTmpdir = ctx.getExternalTmpPath(newTblPartLoc);
-        truncateTblDesc.setOutputDir(queryTmpdir);
-        LoadTableDesc ltd = new LoadTableDesc(queryTmpdir, tblDesc,
-            partSpec == null ? new HashMap<>() : partSpec);
-        ltd.setLbCtx(lbCtx);
-        Task<MoveWork> moveTsk =
-            TaskFactory.get(new MoveWork(null, null, ltd, null, false));
-        truncateTask.addDependentTask(moveTsk);
-
-        // Recalculate the HDFS stats if auto gather stats is set
-        if (conf.getBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER)) {
-          BasicStatsWork basicStatsWork;
-          if (oldTblPartLoc.equals(newTblPartLoc)) {
-            // If we're merging to the same location, we can avoid some metastore calls
-            TableSpec tablepart = new TableSpec(this.db, conf, root);
-            basicStatsWork = new BasicStatsWork(tablepart);
-          } else {
-            basicStatsWork = new BasicStatsWork(ltd);
-          }
-          basicStatsWork.setNoStatsAggregator(true);
-          basicStatsWork.setClearAggregatorStats(true);
-          StatsWork columnStatsWork = new StatsWork(table, basicStatsWork, conf);
+      Path queryTmpdir = ctx.getExternalTmpPath(newTblPartLoc);
+      TruncateTableDesc truncateTblDesc = new TruncateTableDesc(tableName, partSpec, null, table,
+          new ArrayList<Integer>(columnIndexes), oldTblPartLoc, queryTmpdir, lbCtx);
+      if (truncateTblDesc.mayNeedWriteId()) {
+        setAcidDdlDesc(truncateTblDesc);
+      }

-          Task<?> statTask = TaskFactory.get(columnStatsWork);
-          moveTsk.addDependentTask(statTask);
+      DDLWork ddlWork = new DDLWork(getInputs(), getOutputs(), truncateTblDesc);
+      Task<?> truncateTask = TaskFactory.get(ddlWork);
+
+      addInputsOutputsAlterTable(tableName, partSpec, null, AlterTableType.TRUNCATE, false);
+      ddlWork.setNeedLock(true);
+      TableDesc tblDesc = Utilities.getTableDesc(table);
+      // Write the output to temporary directory and move it to the final location at the end
+      // so the operation is atomic.
+      LoadTableDesc ltd = new LoadTableDesc(queryTmpdir, tblDesc, partSpec == null ? new HashMap<>() : partSpec);
+      ltd.setLbCtx(lbCtx);
+      Task<MoveWork> moveTsk = TaskFactory.get(new MoveWork(null, null, ltd, null, false));
+      truncateTask.addDependentTask(moveTsk);
+
+      // Recalculate the HDFS stats if auto gather stats is set
+      if (conf.getBoolVar(HiveConf.ConfVars.HIVESTATSAUTOGATHER)) {
+        BasicStatsWork basicStatsWork;
+        if (oldTblPartLoc.equals(newTblPartLoc)) {
+          // If we're merging to the same location, we can avoid some metastore calls
+          TableSpec tablepart = new TableSpec(this.db, conf, root);
+          basicStatsWork = new BasicStatsWork(tablepart);
+        } else {
+          basicStatsWork = new BasicStatsWork(ltd);
         }
-      } catch (HiveException e) {
-        throw new SemanticException(e);
+        basicStatsWork.setNoStatsAggregator(true);
+        basicStatsWork.setClearAggregatorStats(true);
+        StatsWork columnStatsWork = new StatsWork(table, basicStatsWork, conf);
+
+        Task<?> statTask = TaskFactory.get(columnStatsWork);
+        moveTsk.addDependentTask(statTask);
       }
-    }
-    rootTasks.add(truncateTask);
+      return truncateTask;
+    } catch (HiveException e) {
+      throw new SemanticException(e);
+    }
   }

   public static boolean isFullSpec(Table table, Map<String, String> partSpec) {
diff --git a/ql/src/test/results/clientpositive/explain_ddl.q.out b/ql/src/test/results/clientpositive/explain_ddl.q.out
index 45c8339..e0ec1c5 100644
--- a/ql/src/test/results/clientpositive/explain_ddl.q.out
+++ b/ql/src/test/results/clientpositive/explain_ddl.q.out
@@ -795,7 +795,7 @@ STAGE DEPENDENCIES:
 STAGE PLANS:
   Stage: Stage-0
     Truncate Table or Partition
-      TableName: M1
+      table name: M1

 PREHOOK: query: select count(*) from M1 where key > 0
 PREHOOK: type: QUERY
diff --git a/ql/src/test/results/clientpositive/temp_table_truncate.q.out b/ql/src/test/results/clientpositive/temp_table_truncate.q.out
index 7c470f4..0fa4f96 100644
--- a/ql/src/test/results/clientpositive/temp_table_truncate.q.out
+++ b/ql/src/test/results/clientpositive/temp_table_truncate.q.out
@@ -82,7 +82,7 @@ STAGE DEPENDENCIES:
 STAGE PLANS:
   Stage: Stage-0
     Truncate Table or Partition
-      TableName: tmp_src
+      table name: tmp_src

 PREHOOK: query: TRUNCATE TABLE tmp_src
 PREHOOK: type: TRUNCATETABLE
@@ -111,7 +111,7 @@ STAGE DEPENDENCIES:
 STAGE PLANS:
   Stage: Stage-0
     Truncate Table or Partition
-      TableName: tmp_srcpart
+      table name: tmp_srcpart

 PREHOOK: query: TRUNCATE TABLE tmp_srcpart
 PREHOOK: type: TRUNCATETABLE
diff --git a/ql/src/test/results/clientpositive/truncate_table.q.out b/ql/src/test/results/clientpositive/truncate_table.q.out
index ba35012..a8b4cab 100644
--- a/ql/src/test/results/clientpositive/truncate_table.q.out
+++ b/ql/src/test/results/clientpositive/truncate_table.q.out
@@ -126,7 +126,7 @@ STAGE DEPENDENCIES:
 STAGE PLANS:
   Stage: Stage-0
     Truncate Table or Partition
-      TableName: src_truncate
+      table name: src_truncate

 PREHOOK: query: TRUNCATE TABLE src_truncate
 PREHOOK: type: TRUNCATETABLE
@@ -163,10 +163,10 @@ STAGE DEPENDENCIES:
 STAGE PLANS:
   Stage: Stage-0
     Truncate Table or Partition
-      Partition Spec:
+      partition spec:
         ds 2008-04-08
         hr 11
-      TableName: srcpart_truncate
+      table name: srcpart_truncate

 PREHOOK: query: TRUNCATE TABLE srcpart_truncate partition (ds='2008-04-08', hr='11')
 PREHOOK: type: TRUNCATETABLE
@@ -207,10 +207,10 @@ STAGE DEPENDENCIES:
 STAGE PLANS:
   Stage: Stage-0
     Truncate Table or Partition
-      Partition Spec:
+      partition spec:
         ds
         hr 12
-      TableName: srcpart_truncate
+      table name: srcpart_truncate

 PREHOOK: query: TRUNCATE TABLE srcpart_truncate partition (ds, hr='12')
 PREHOOK: type: TRUNCATETABLE
@@ -259,7 +259,7 @@ STAGE DEPENDENCIES:
 STAGE PLANS:
   Stage: Stage-0
     Truncate Table or Partition
-      TableName: srcpart_truncate
+      table name: srcpart_truncate

 PREHOOK: query: TRUNCATE TABLE srcpart_truncate
 PREHOOK: type: TRUNCATETABLE
