This is an automated email from the ASF dual-hosted git repository.

lpinter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
     new 4b93296  HIVE-20948: Eliminate file rename in compactor (Laszlo Pinter, reviewed by Peter Vary)
4b93296 is described below

commit 4b932966428c7bb58f1307e459849ac092fa9cbc
Author: Laszlo Pinter <lpin...@cloudera.com>
AuthorDate: Wed Feb 26 09:42:01 2020 +0100

    HIVE-20948: Eliminate file rename in compactor (Laszlo Pinter, reviewed by Peter Vary)
---
 .../hadoop/hive/ql/exec/FileSinkOperator.java      |  21 +++-
 .../hadoop/hive/ql/exec/tez/SplitGrouper.java      |  23 +---
 .../org/apache/hadoop/hive/ql/io/AcidUtils.java    |  50 ++++-----
 .../apache/hadoop/hive/ql/plan/FileSinkDesc.java   |   8 ++
 .../hadoop/hive/ql/txn/compactor/CompactorMR.java  |   4 +-
 .../hive/ql/txn/compactor/MajorQueryCompactor.java |  66 +++++------
 .../hive/ql/txn/compactor/MinorQueryCompactor.java | 114 ++++++++-----------
 .../hive/ql/txn/compactor/QueryCompactor.java      | 123 ++++++++-------------
 8 files changed, 181 insertions(+), 228 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index d5e1b5b..d0f452b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -84,6 +84,7 @@ import org.apache.hadoop.hive.shims.HadoopShims.StoragePolicyShim;
 import org.apache.hadoop.hive.shims.HadoopShims.StoragePolicyValue;
 import org.apache.hadoop.hive.shims.ShimLoader;
+import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
@@ -473,6 +474,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
   protected transient boolean multiFileSpray;
   protected transient final Map<Integer, Integer> bucketMap = new HashMap<Integer, Integer>();
   private transient boolean isBucketed = false;
+  private transient int bucketId;
   private transient ObjectInspector[] partitionObjectInspectors;
   protected transient HivePartitioner<HiveKey, Object> prtner;
@@ -805,7 +807,12 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
   protected void createBucketForFileIdx(FSPaths fsp, int filesIdx) throws HiveException {
     try {
-      fsp.initializeBucketPaths(filesIdx, taskId, isNativeTable(), isSkewedStoredAsSubDirectories);
+      if (conf.isCompactionTable()) {
+        fsp.initializeBucketPaths(filesIdx, AcidUtils.BUCKET_PREFIX + String.format(AcidUtils.BUCKET_DIGITS, bucketId),
+            isNativeTable(), isSkewedStoredAsSubDirectories);
+      } else {
+        fsp.initializeBucketPaths(filesIdx, taskId, isNativeTable(), isSkewedStoredAsSubDirectories);
+      }
       if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
         Utilities.FILE_OP_LOGGER.trace("createBucketForFileIdx " + filesIdx + ": final path " + fsp.finalPaths[filesIdx]
           + "; out path " + fsp.outPaths[filesIdx] + " (spec path " + specPath + ", tmp path "
@@ -828,7 +835,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
       //todo IOW integration. Full Acid uses the else if block to create Acid's RecordUpdater (HiveFileFormatUtils)
       // and that will set writingBase(conf.getInsertOverwrite())
       // If MM wants to create a new base for IOW (instead of delta dir), it should specify it here
-      if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID || conf.isMmTable()) {
+      if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID || conf.isMmTable() || conf.isCompactionTable()) {
        Path outPath = fsp.outPaths[filesIdx];
        if (conf.isMmTable() && !FileUtils.mkdir(fs, outPath.getParent(), hconf)) {
@@ -960,6 +967,10 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
           createNewPaths(null, lbDirName);
         }
       } else {
+        if (conf.isCompactionTable()) {
+          int bucketProperty = ((IntWritable)((Object[])row)[2]).get();
+          bucketId = BucketCodec.determineVersion(bucketProperty).decodeWriterId(bucketProperty);
+        }
         createBucketFiles(fsp);
       }
     }
@@ -1049,7 +1060,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
       // for a given operator branch prediction should work quite nicely on it.
       // RecordUpdater expects to get the actual row, not a serialized version of it. Thus we
       // pass the row rather than recordValue.
-      if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID || conf.isMmTable()) {
+      if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID || conf.isMmTable() || conf.isCompactionTable()) {
         rowOutWriters[findWriterOffset(row)].write(recordValue);
       } else if (conf.getWriteType() == AcidUtils.Operation.INSERT) {
         fpaths.updaters[findWriterOffset(row)].insert(conf.getTableWriteId(), row);
@@ -1107,7 +1118,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
   protected boolean areAllTrue(boolean[] statsFromRW) {
     // If we are doing an acid operation they will always all be true as RecordUpdaters always
     // collect stats
-    if (conf.getWriteType() != AcidUtils.Operation.NOT_ACID && !conf.isMmTable()) {
+    if (conf.getWriteType() != AcidUtils.Operation.NOT_ACID && !conf.isMmTable() && !conf.isCompactionTable()) {
       return true;
     }
     for(boolean b : statsFromRW) {
@@ -1366,7 +1377,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
       // record writer already gathers the statistics, it can simply return the
       // accumulated statistics which will be aggregated in case of spray writers
       if (conf.isGatherStats() && isCollectRWStats) {
-        if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID || conf.isMmTable()) {
+        if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID || conf.isMmTable() || conf.isCompactionTable()) {
          for (int idx = 0; idx < fsp.outWriters.length; idx++) {
            RecordWriter outWriter = fsp.outWriters[idx];
            if (outWriter != null) {
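Not part of the commit, but useful context: the new FileSinkOperator branch above names output files after the bucket id encoded in the row's bucketProperty field (column 2 of the ACID row). A minimal sketch of that round trip, with an invented class name and bucket value; BucketCodec and AcidOutputFormat are the real Hive classes used above:

    import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
    import org.apache.hadoop.hive.ql.io.BucketCodec;

    public class BucketIdRoundTripSketch {
      public static void main(String[] args) {
        // Encode bucket 3 the way ACID writers do (codec version V1).
        int bucketProperty = BucketCodec.V1.encode(new AcidOutputFormat.Options(null).bucket(3));
        // Decode it back to the writer id, mirroring the new compaction-table branch above.
        int bucketId = BucketCodec.determineVersion(bucketProperty).decodeWriterId(bucketProperty);
        System.out.println(bucketId); // prints 3
      }
    }

Because every row of a compaction helper table already carries this value, the operator can name each output file bucket_NNNNN directly, which is what makes the post-compaction rename (and the removed AcidUtils.parseBucketIdFromRow below) unnecessary.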
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java
index 076b778..2295edc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java
@@ -29,9 +29,9 @@ import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -191,7 +191,7 @@ public class SplitGrouper {
         if ((op != null) && (op instanceof TableScanOperator)) {
           TableScanOperator tableScan = (TableScanOperator) op;
           PartitionDesc partitionDesc = mapWork.getAliasToPartnInfo().get(aliases.get(0));
-          isMinorCompaction &= isQueryBasedMinorComp(partitionDesc);
+          isMinorCompaction &= AcidUtils.isCompactionTable(partitionDesc.getTableDesc().getProperties());
           if (!tableScan.getConf().isTranscationalTable() && !isMinorCompaction) {
             String splitPath = getFirstSplitPath(splits);
             String errorMessage =
@@ -233,25 +233,6 @@ public class SplitGrouper {
     this.group(jobConf, schemaGroupedSplitMultiMap, availableSlots, waves, locationProvider);
     return groupedSplits;
   }
-
-  /**
-   * Determines from the partition description, whether the table is used as a compaction helper table. It looks
-   * for a table property 'queryminorcomp', which is set in
-   * {@link org.apache.hadoop.hive.ql.txn.compactor.MinorQueryCompactor}
-   * @param desc partition description of the table, must be not null
-   * @return true, if the table is a query based minor compaction helper table
-   */
-  private boolean isQueryBasedMinorComp(PartitionDesc desc) {
-    if (desc != null) {
-      Properties tblProperties = desc.getTableDesc().getProperties();
-      final String minorCompProperty = "queryminorcomp";
-      if (tblProperties != null && tblProperties.containsKey(minorCompProperty) && tblProperties
-          .getProperty(minorCompProperty).equalsIgnoreCase("true")) {
-        return true;
-      }
-    }
-    return false;
-  }
 
   // Returns the path of the first split in this list for logging purposes
   private String getFirstSplitPath(InputSplit[] splits) {
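The AcidUtils.isCompactionTable overloads that replace the removed 'queryminorcomp' lookup above are introduced in the next file. A hypothetical usage sketch; the property key is the real constant from the diff, everything else is invented:

    import java.util.Properties;
    import org.apache.hadoop.hive.ql.io.AcidUtils;

    public class CompactionTableCheckSketch {
      public static void main(String[] args) {
        Properties props = new Properties();
        // What the query compactors now put into the temp table DDL:
        // tblproperties (... 'compactiontable'='true')
        props.setProperty(AcidUtils.COMPACTOR_TABLE_PROPERTY, "true");
        System.out.println(AcidUtils.isCompactionTable(props)); // true
      }
    }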
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 76ea6c9..dbbe6f1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -32,7 +32,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -75,7 +74,6 @@ import org.apache.hadoop.hive.ql.io.orc.OrcFile;
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
 import org.apache.hadoop.hive.ql.io.orc.OrcRecordUpdater;
 import org.apache.hadoop.hive.ql.io.orc.Reader;
-import org.apache.hadoop.hive.ql.io.orc.RecordReader;
 import org.apache.hadoop.hive.ql.io.orc.Writer;
 import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
 import org.apache.hadoop.hive.ql.lockmgr.LockException;
@@ -87,11 +85,9 @@ import org.apache.hadoop.hive.ql.parse.LoadSemanticAnalyzer;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
 import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.shims.HadoopShims;
 import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
 import org.apache.hadoop.hive.shims.ShimLoader;
-import org.apache.hadoop.io.IntWritable;
 import org.apache.hive.common.util.Ref;
 import org.apache.orc.FileFormatException;
 import org.apache.orc.impl.OrcAcidUtils;
@@ -113,6 +109,7 @@ public class AcidUtils {
   // This key will be put in the conf file when planning an acid operation
   public static final String CONF_ACID_KEY = "hive.doing.acid";
   public static final String BASE_PREFIX = "base_";
+  public static final String COMPACTOR_TABLE_PROPERTY = "compactiontable";
   public static final PathFilter baseFileFilter = new PathFilter() {
     @Override
     public boolean accept(Path path) {
@@ -383,6 +380,7 @@ public class AcidUtils {
     return baseOrDeltaDir + VISIBILITY_PREFIX + String.format(DELTA_DIGITS, visibilityTxnId);
   }
 
+
   /**
    * Represents bucketId and copy_N suffix
    */
@@ -426,6 +424,29 @@ public class AcidUtils {
       this.copyNumber = copyNumber;
     }
   }
+
+  /**
+   * Determine if a table is used during query based compaction.
+   * @param tblProperties table properties
+   * @return true, if the tblProperties contains {@link AcidUtils#COMPACTOR_TABLE_PROPERTY}
+   */
+  public static boolean isCompactionTable(Properties tblProperties) {
+    if (tblProperties != null && tblProperties.containsKey(COMPACTOR_TABLE_PROPERTY) && tblProperties
+        .getProperty(COMPACTOR_TABLE_PROPERTY).equalsIgnoreCase("true")) {
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Determine if a table is used during query based compaction.
+   * @param parameters table properties map
+   * @return true, if the parameters contains {@link AcidUtils#COMPACTOR_TABLE_PROPERTY}
+   */
+  public static boolean isCompactionTable(Map<String, String> parameters) {
+    return Boolean.valueOf(parameters.getOrDefault(COMPACTOR_TABLE_PROPERTY, "false"));
+  }
+
   /**
    * Get the bucket id from the file path
    * @param bucketFile - bucket file path
@@ -463,27 +484,6 @@ public class AcidUtils {
   }
 
   /**
-   * Read the first row of an ORC file and determine the bucket ID based on the bucket column. This only works with
-   * files with ACID schema.
-   * @param fs the resolved file system
-   * @param orcFile path to ORC file
-   * @return resolved bucket number
-   * @throws IOException during the parsing of the ORC file
-   */
-  public static Optional<Integer> parseBucketIdFromRow(FileSystem fs, Path orcFile) throws IOException {
-    Reader reader = OrcFile.createReader(fs, orcFile);
-    StructObjectInspector objectInspector = (StructObjectInspector) reader.getObjectInspector();
-    RecordReader records = reader.rows();
-    while (records.hasNext()) {
-      Object row = records.next(null);
-      List<Object> fields = objectInspector.getStructFieldsDataAsList(row);
-      int bucketProperty = ((IntWritable) fields.get(2)).get();
-      return Optional.of(BucketCodec.determineVersion(bucketProperty).decodeWriterId(bucketProperty));
-    }
-    return Optional.empty();
-  }
-
-  /**
    * Parse a bucket filename back into the options that would have created
    * the file.
    * @param bucketFile the path to a bucket file
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
index bbf73cb..f55c6ae 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
@@ -351,6 +351,14 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe
     }
   }
 
+  /**
+   * @return true, if the table is used during compaction
+   */
+  public boolean isCompactionTable() {
+    return getTable() != null ? AcidUtils.isCompactionTable(table.getParameters())
+        : AcidUtils.isCompactionTable(getTableInfo().getProperties());
+  }
+
   public boolean isMaterialization() {
     return materialization;
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 25c14e0..c44f2b50 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.hive.ql.io.AcidUtils.Directory;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.io.IOConstants;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
 import org.apache.hadoop.hive.shims.ShimLoader;
@@ -210,7 +211,8 @@ public class CompactorMR {
    * @throws java.io.IOException if the job fails
    */
   void run(HiveConf conf, String jobName, Table t, Partition p, StorageDescriptor sd, ValidWriteIdList writeIds,
-      CompactionInfo ci, Worker.StatsUpdater su, IMetaStoreClient msc, Directory dir) throws IOException {
+      CompactionInfo ci, Worker.StatsUpdater su, IMetaStoreClient msc, Directory dir)
+      throws IOException, HiveException {
     if(conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST) && conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION)) {
       throw new RuntimeException(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION.name() + "=true");
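The heart of the rename elimination is shown in the next file: the temp table's LOCATION is the final base directory, computed up front from the write id range and the compactor's transaction id. A sketch under assumed ids (the path and all numbers are illustrative only; baseOrDeltaSubdirPath is the real helper the commit calls):

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
    import org.apache.hadoop.hive.ql.io.AcidUtils;

    public class BasePathSketch {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        // Assumed: min open write id 1, high watermark 7, compactor txn id 9.
        AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf)
            .writingBase(true).writingDeleteDelta(false).isCompressed(false)
            .minimumWriteId(1).maximumWriteId(7).statementId(-1).visibilityTxnId(9);
        Path base = AcidUtils.baseOrDeltaSubdirPath(new Path("/warehouse/t"), options);
        // Expected by the ACID naming convention: /warehouse/t/base_0000007_v0000009
        System.out.println(base);
      }
    }

Since FileSinkOperator writes bucket_NNNNN files straight into this directory, commitCompaction shrinks to Util.cleanupEmptyDir, which only removes the directory again if the query produced no rows.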
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java
index f238eb5..9385080 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -37,8 +38,9 @@ import java.util.List;
  */
 final class MajorQueryCompactor extends QueryCompactor {
 
-  @Override void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageDescriptor storageDescriptor,
-      ValidWriteIdList writeIds, CompactionInfo compactionInfo) throws IOException {
+  @Override
+  void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageDescriptor storageDescriptor,
+      ValidWriteIdList writeIds, CompactionInfo compactionInfo) throws IOException, HiveException {
     AcidUtils
         .setAcidOperationalProperties(hiveConf, true, AcidUtils.getAcidOperationalProperties(table.getParameters()));
 
@@ -53,25 +55,26 @@ final class MajorQueryCompactor extends QueryCompactor {
     String tmpPrefix = table.getDbName() + "_tmp_compactor_" + table.getTableName() + "_";
     String tmpTableName = tmpPrefix + System.currentTimeMillis();
-    List<String> createQueries = getCreateQueries(tmpTableName, table);
+
+    long minOpenWriteId = writeIds.getMinOpenWriteId() == null ? 1 : writeIds.getMinOpenWriteId();
+    long highWaterMark = writeIds.getHighWatermark();
+    long compactorTxnId = CompactorMR.CompactorMap.getCompactorTxnId(conf);
+    AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf).writingBase(true)
+        .writingDeleteDelta(false).isCompressed(false).minimumWriteId(minOpenWriteId)
+        .maximumWriteId(highWaterMark).statementId(-1).visibilityTxnId(compactorTxnId);
+    Path tmpTablePath = AcidUtils.baseOrDeltaSubdirPath(new Path(storageDescriptor.getLocation()), options);
+
+    List<String> createQueries = getCreateQueries(tmpTableName, table, tmpTablePath.toString());
     List<String> compactionQueries = getCompactionQueries(table, partition, tmpTableName);
     List<String> dropQueries = getDropQueries(tmpTableName);
     runCompactionQueries(conf, tmpTableName, storageDescriptor, writeIds, compactionInfo, createQueries,
         compactionQueries, dropQueries);
   }
 
-  /**
-   * Move and rename bucket files from the temp table (tmpTableName), to the new base path under the source table/ptn.
-   * Since the temp table is a non-transactional table, it has file names in the "original" format.
-   * Also, due to split grouping in
-   * {@link org.apache.hadoop.hive.ql.exec.tez.SplitGrouper#getCompactorSplitGroups(InputSplit[],
-   * Configuration, boolean)}, we will end up with one file per bucket.
-   */
-  @Override protected void commitCompaction(String dest, String tmpTableName, HiveConf conf,
+  @Override
+  protected void commitCompaction(String dest, String tmpTableName, HiveConf conf,
       ValidWriteIdList actualWriteIds, long compactorTxnId) throws IOException, HiveException {
-    org.apache.hadoop.hive.ql.metadata.Table tempTable = Hive.get().getTable(tmpTableName);
-    Util.moveContents(new Path(tempTable.getSd().getLocation()), new Path(dest), true, false, conf, actualWriteIds,
-        compactorTxnId);
+    Util.cleanupEmptyDir(conf, tmpTableName);
   }
 
   /**
@@ -83,25 +86,26 @@ final class MajorQueryCompactor extends QueryCompactor {
    * See {@link org.apache.hadoop.hive.ql.exec.tez.SplitGrouper#getCompactorSplitGroups(InputSplit[], Configuration)}
    * for details on the mechanism.
    */
-  private List<String> getCreateQueries(String fullName, Table t) {
-    StringBuilder query = new StringBuilder("create temporary table ").append(fullName).append(" (");
-    // Acid virtual columns
-    query.append(
-        "`operation` int, `originalTransaction` bigint, `bucket` int, `rowId` bigint, `currentTransaction` bigint, "
-            + "`row` struct<");
-    List<FieldSchema> cols = t.getSd().getCols();
-    boolean isFirst = true;
-    // Actual columns
-    for (FieldSchema col : cols) {
-      if (!isFirst) {
-        query.append(", ");
-      }
-      isFirst = false;
-      query.append("`").append(col.getName()).append("` ").append(":").append(col.getType());
+  private List<String> getCreateQueries(String fullName, Table t, String tmpTableLocation) throws HiveException {
+    StringBuilder query = new StringBuilder(Util.getCreateTempTableQueryWithAcidColumns(fullName, t));
+    org.apache.hadoop.hive.ql.metadata.Table table = Hive.get().getTable(t.getDbName(), t.getTableName(), false);
+    int numBuckets = 1;
+    int bucketingVersion = 0;
+    if (table != null) {
+      numBuckets = Math.max(table.getNumBuckets(), numBuckets);
+      bucketingVersion = table.getBucketingVersion();
     }
-    query.append(">)");
+    query.append(" clustered by (`bucket`) into ").append(numBuckets).append(" buckets");
     query.append(" stored as orc");
-    query.append(" tblproperties ('transactional'='false')");
+    query.append(" location '");
+    query.append(tmpTableLocation);
+    query.append("' tblproperties ('transactional'='false',");
+    query.append(" 'bucketing_version'='");
+    query.append(bucketingVersion);
+    query.append("','");
+    query.append(AcidUtils.COMPACTOR_TABLE_PROPERTY);
+    query.append("'='true'");
+    query.append(")");
    return Lists.newArrayList(query.toString());
   }
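Put together, the DDL the new getCreateQueries builds for the major compaction temp table comes out roughly as below. This is an assumed rendering with invented identifiers, ids and bucket count; the ACID column list is the one produced by Util.getCreateTempTableQueryWithAcidColumns further below:

    public class MajorTempTableDdlSketch {
      public static void main(String[] args) {
        // Assumed shape only; not emitted verbatim by the commit.
        String ddl =
            "create temporary external table db_tmp_compactor_t_1582702921000 ("
                + "`operation` int, `originalTransaction` bigint, `bucket` int, `rowId` bigint, "
                + "`currentTransaction` bigint, `row` struct<`a` :int, `b` :string>)"
                + " clustered by (`bucket`) into 4 buckets stored as orc"
                + " location '/warehouse/t/base_0000007_v0000009'"
                + " tblproperties ('transactional'='false', 'bucketing_version'='2','compactiontable'='true')";
        System.out.println(ddl);
      }
    }

Pointing the table at the base_... directory means the INSERT that performs the compaction materializes the result in place; only the empty-result case needs cleanup.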
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java
index 59dcf2c..01cd2fc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java
@@ -22,11 +22,11 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -45,11 +45,11 @@ import java.util.stream.Collectors;
  */
 final class MinorQueryCompactor extends QueryCompactor {
 
-  public static final String MINOR_COMP_TBL_PROP = "queryminorcomp";
   private static final Logger LOG = LoggerFactory.getLogger(MinorQueryCompactor.class.getName());
 
-  @Override void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageDescriptor storageDescriptor,
-      ValidWriteIdList writeIds, CompactionInfo compactionInfo) throws IOException {
+  @Override
+  void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageDescriptor storageDescriptor,
+      ValidWriteIdList writeIds, CompactionInfo compactionInfo) throws IOException, HiveException {
     LOG.info("Running query based minor compaction");
     AcidUtils
         .setAcidOperationalProperties(hiveConf, true, AcidUtils.getAcidOperationalProperties(table.getParameters()));
@@ -64,7 +64,8 @@ final class MinorQueryCompactor extends QueryCompactor {
     conf.setBoolVar(HiveConf.ConfVars.HIVE_STATS_ESTIMATE_STATS, false);
     String tmpTableName =
         table.getDbName() + "_tmp_compactor_" + table.getTableName() + "_" + System.currentTimeMillis();
-    List<String> createQueries = getCreateQueries(table, tmpTableName, dir, writeIds);
+
+    List<String> createQueries = getCreateQueries(table, tmpTableName, dir, writeIds, conf, storageDescriptor);
     List<String> compactionQueries = getCompactionQueries(tmpTableName, writeIds.getInvalidWriteIds());
     List<String> dropQueries = getDropQueries(tmpTableName);
 
@@ -72,14 +73,11 @@ final class MinorQueryCompactor extends QueryCompactor {
         compactionQueries, dropQueries);
   }
 
-  @Override protected void commitCompaction(String dest, String tmpTableName, HiveConf conf,
+  @Override
+  protected void commitCompaction(String dest, String tmpTableName, HiveConf conf,
       ValidWriteIdList actualWriteIds, long compactorTxnId) throws IOException, HiveException {
-    // get result temp tables;
-    String deltaTableName = AcidUtils.DELTA_PREFIX + tmpTableName + "_result";
-    commitCompaction(deltaTableName, dest, false, conf, actualWriteIds, compactorTxnId);
-
-    String deleteDeltaTableName = AcidUtils.DELETE_DELTA_PREFIX + tmpTableName + "_result";
-    commitCompaction(deleteDeltaTableName, dest, true, conf, actualWriteIds, compactorTxnId);
+    Util.cleanupEmptyDir(conf, AcidUtils.DELTA_PREFIX + tmpTableName + "_result");
+    Util.cleanupEmptyDir(conf, AcidUtils.DELETE_DELTA_PREFIX + tmpTableName + "_result");
   }
 
   /**
@@ -95,24 +93,41 @@ final class MinorQueryCompactor extends QueryCompactor {
    * @param tempTableBase a unique identifier which is used to create delta/delete-delta temp tables
    * @param dir the directory, where the delta directories reside
    * @param writeIds list of valid write ids, used to filter out delta directories which are not relevant for compaction
+   * @param conf hive configuration
+   * @param storageDescriptor this is the resolved storage descriptor
    * @return list of create/alter queries, always non-null
    */
   private List<String> getCreateQueries(Table table, String tempTableBase, AcidUtils.Directory dir,
-      ValidWriteIdList writeIds) {
+      ValidWriteIdList writeIds, HiveConf conf, StorageDescriptor storageDescriptor) throws HiveException {
     List<String> queries = new ArrayList<>();
+    long minOpenWriteId = writeIds.getMinOpenWriteId() == null ? 1 : writeIds.getMinOpenWriteId();
+    long highWatermark = writeIds.getHighWatermark();
+    long compactorTxnId = CompactorMR.CompactorMap.getCompactorTxnId(conf);
     // create delta temp table
     String tmpTableName = AcidUtils.DELTA_PREFIX + tempTableBase;
-    queries.add(buildCreateTableQuery(table, tmpTableName, true, true, false));
+    queries.add(buildCreateTableQuery(table, tmpTableName, true, false, null));
     buildAlterTableQuery(tmpTableName, dir, writeIds, false).ifPresent(queries::add);
+    AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf).writingBase(false)
+        .writingDeleteDelta(false).isCompressed(false).minimumWriteId(minOpenWriteId)
+        .maximumWriteId(highWatermark).statementId(-1).visibilityTxnId(compactorTxnId);
+    Path location = new Path(storageDescriptor.getLocation());
+    String tmpTableResultLocation = AcidUtils.baseOrDeltaSubdirPath(location,
+        options).toString();
     // create delta result temp table
-    queries.add(buildCreateTableQuery(table, tmpTableName + "_result", false, false, true));
+    queries.add(buildCreateTableQuery(table, tmpTableName + "_result", false, true,
+        tmpTableResultLocation));
     // create delete delta temp tables
     String tmpDeleteTableName = AcidUtils.DELETE_DELTA_PREFIX + tempTableBase;
-    queries.add(buildCreateTableQuery(table, tmpDeleteTableName, true, true, false));
+    queries.add(buildCreateTableQuery(table, tmpDeleteTableName, true, false, null));
     buildAlterTableQuery(tmpDeleteTableName, dir, writeIds, true).ifPresent(queries::add);
+    options = new AcidOutputFormat.Options(conf).writingBase(false).writingDeleteDelta(true).isCompressed(false)
+        .minimumWriteId(minOpenWriteId).maximumWriteId(highWatermark).statementId(-1).visibilityTxnId(compactorTxnId);
+    String tmpTableDeleteResultLocation = AcidUtils.baseOrDeltaSubdirPath(location,
+        options).toString();
     // create delete delta result temp table
-    queries.add(buildCreateTableQuery(table, tmpDeleteTableName + "_result", false, false, true));
+    queries.add(buildCreateTableQuery(table, tmpDeleteTableName + "_result", false, true,
+        tmpTableDeleteResultLocation));
     return queries;
   }
 
@@ -121,9 +136,9 @@ final class MinorQueryCompactor extends QueryCompactor {
    * the schema of the table is the same as an ORC ACID file schema.
    * @param table the source table, where the compaction is running on
    * @param newTableName name of the table to be created
-   * @param isExternal true, if new table should be external
   * @param isPartitioned true, if new table should be partitioned
   * @param isBucketed true, if the new table should be bucketed
+   * @param location location of the table, can be null
   * @return a create table statement, always non-null. Example:
   * <p>
   * if source table schema is: (a:int, b:int)
   * the corresponding create statement:
   * CREATE TEMPORARY EXTERNAL TABLE tmp_table (`operation` int, `originalTransaction` bigint, `bucket` int,
   * `rowId` bigint, `currentTransaction` bigint, `row` struct<`a` :int, `b` :int>) PARTITIONED BY (`file_name` STRING)
   * STORED AS ORC TBLPROPERTIES ('transactional'='false','queryminorcomp'='true');
   * </p>
   */
-  private String buildCreateTableQuery(Table table, String newTableName, boolean isExternal, boolean isPartitioned,
-      boolean isBucketed) {
-    StringBuilder query = new StringBuilder("create temporary ");
-    if (isExternal) {
-      query.append("external ");
-    }
-    query.append("table ").append(newTableName).append(" (");
-    // Acid virtual columns
-    query.append(
-        "`operation` int, `originalTransaction` bigint, `bucket` int, `rowId` bigint, `currentTransaction` bigint, "
-            + "`row` struct<");
-    List<FieldSchema> cols = table.getSd().getCols();
-    boolean isFirst = true;
-    // Actual columns
-    for (FieldSchema col : cols) {
-      if (!isFirst) {
-        query.append(", ");
-      }
-      isFirst = false;
-      query.append("`").append(col.getName()).append("` ").append(":").append(col.getType());
-    }
-    query.append(">)");
+  private String buildCreateTableQuery(Table table, String newTableName, boolean isPartitioned,
+      boolean isBucketed, String location) throws HiveException {
+    StringBuilder query = new StringBuilder(Util.getCreateTempTableQueryWithAcidColumns(newTableName, table));
     if (isPartitioned) {
       query.append(" partitioned by (`file_name` string)");
     }
     int bucketingVersion = 0;
     if (isBucketed) {
       int numBuckets = 1;
-      try {
-        org.apache.hadoop.hive.ql.metadata.Table t = Hive.get().getTable(table.getDbName(), table.getTableName());
+      org.apache.hadoop.hive.ql.metadata.Table t = Hive.get().getTable(table.getDbName(), table.getTableName(), false);
+      if (t != null) {
         numBuckets = Math.max(t.getNumBuckets(), numBuckets);
         bucketingVersion = t.getBucketingVersion();
-      } catch (HiveException e) {
-        LOG.info("Error finding table {}. Minor compaction result will use 0 buckets.", table.getTableName());
-      } finally {
-        query.append(" clustered by (`bucket`)").append(" sorted by (`bucket`, `originalTransaction`, `rowId`)")
-            .append(" into ").append(numBuckets).append(" buckets");
       }
+      query.append(" clustered by (`bucket`)").append(" sorted by (`bucket`, `originalTransaction`, `rowId`)")
+          .append(" into ").append(numBuckets).append(" buckets");
     }
-
     query.append(" stored as orc");
+    if (location != null && !location.isEmpty()) {
+      query.append(" location '");
+      query.append(location);
+      query.append("'");
+    }
     query.append(" tblproperties ('transactional'='false'");
     query.append(", '");
-    query.append(MINOR_COMP_TBL_PROP);
+    query.append(AcidUtils.COMPACTOR_TABLE_PROPERTY);
     query.append("'='true'");
     if (isBucketed) {
       query.append(", 'bucketing_version'='")
@@ -272,23 +269,4 @@ final class MinorQueryCompactor extends QueryCompactor {
     queries.add(dropStm + AcidUtils.DELETE_DELTA_PREFIX + tmpTableBase + "_result");
     return queries;
   }
-
-  /**
-   * Creates the delta directory and moves the result files.
-   * @param deltaTableName name of the temporary table, where the results are stored
-   * @param dest destination path, where the result should be moved
-   * @param isDeleteDelta is the destination a delete delta directory
-   * @param conf hive configuration
-   * @param actualWriteIds list of valid write Ids
-   * @param compactorTxnId transaction Id of the compaction
-   * @throws HiveException the result files cannot be moved
-   * @throws IOException the destination delta directory cannot be created
-   */
-  private void commitCompaction(String deltaTableName, String dest, boolean isDeleteDelta, HiveConf conf,
-      ValidWriteIdList actualWriteIds, long compactorTxnId) throws HiveException, IOException {
-    org.apache.hadoop.hive.ql.metadata.Table deltaTable = Hive.get().getTable(deltaTableName);
-    Util.moveContents(new Path(deltaTable.getSd().getLocation()), new Path(dest), false, isDeleteDelta, conf,
-        actualWriteIds, compactorTxnId);
-  }
-
 }
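The minor path does the same trick twice: the delta and delete-delta "_result" tables above are created on the final delta directories. Under the same assumed ids as the earlier sketch, the two locations come out as below (names follow the ACID delta convention; the values are invented):

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.hive.conf.HiveConf;
    import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
    import org.apache.hadoop.hive.ql.io.AcidUtils;

    public class DeltaPathSketch {
      public static void main(String[] args) {
        HiveConf conf = new HiveConf();
        AcidOutputFormat.Options delta = new AcidOutputFormat.Options(conf)
            .writingBase(false).writingDeleteDelta(false).isCompressed(false)
            .minimumWriteId(1).maximumWriteId(7).statementId(-1).visibilityTxnId(9);
        AcidOutputFormat.Options deleteDelta = new AcidOutputFormat.Options(conf)
            .writingBase(false).writingDeleteDelta(true).isCompressed(false)
            .minimumWriteId(1).maximumWriteId(7).statementId(-1).visibilityTxnId(9);
        Path root = new Path("/warehouse/t");
        // Expected: /warehouse/t/delta_0000001_0000007_v0000009
        System.out.println(AcidUtils.baseOrDeltaSubdirPath(root, delta));
        // Expected: /warehouse/t/delete_delta_0000001_0000007_v0000009
        System.out.println(AcidUtils.baseOrDeltaSubdirPath(root, deleteDelta));
      }
    }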
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java
index 3ce4dde..7a9e48f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java
@@ -18,19 +18,17 @@
 package org.apache.hadoop.hive.ql.txn.compactor;
 
 import org.apache.commons.lang3.exception.ExceptionUtils;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
 import org.apache.hadoop.hive.ql.DriverUtils;
-import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
-import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.session.SessionState;
@@ -40,7 +38,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.Optional;
 import java.util.UUID;
 
 /**
@@ -62,7 +59,7 @@ abstract class QueryCompactor {
    * @throws IOException compaction cannot be finished.
    */
   abstract void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageDescriptor storageDescriptor,
-      ValidWriteIdList writeIds, CompactionInfo compactionInfo) throws IOException;
+      ValidWriteIdList writeIds, CompactionInfo compactionInfo) throws IOException, HiveException;
 
   /**
    * This is the final step of the compaction, which can vary based on compaction type. Usually this involves some file
@@ -91,7 +88,7 @@ abstract class QueryCompactor {
    * @param dropQueries queries which drop the temporary tables.
    * @throws IOException error during the run of the compaction.
    */
-  protected void runCompactionQueries(HiveConf conf, String tmpTableName, StorageDescriptor storageDescriptor,
+  void runCompactionQueries(HiveConf conf, String tmpTableName, StorageDescriptor storageDescriptor,
       ValidWriteIdList writeIds, CompactionInfo compactionInfo, List<String> createQueries,
       List<String> compactionQueries, List<String> dropQueries) throws IOException {
     Util.disableLlapCaching(conf);
@@ -151,88 +148,60 @@ abstract class QueryCompactor {
   }
 
   /**
-   * Check whether the result directory exists and contains compacted result files. If no splits are found, create
-   * an empty directory at the destination path, matching a base/delta directory naming convention.
-   * @param sourcePath the checked source location
-   * @param destPath the destination, where the new directory should be created
-   * @param isMajorCompaction is called from a major compaction
-   * @param isDeleteDelta is the output used as delete delta directory
-   * @param conf hive configuration
-   * @param validWriteIdList maximum transaction id
-   * @return true, if the check was successful
-   * @throws IOException the new directory cannot be created
+   * Unless caching is explicitly required for ETL queries this method disables it.
+   * LLAP cache content lookup is file based, and since compaction alters the file structure it is not beneficial to
+   * cache anything here, as it won't (and actually can't) ever be looked up later.
+   * @param conf the Hive configuration
    */
-  private static boolean resultHasSplits(Path sourcePath, Path destPath, boolean isMajorCompaction,
-      boolean isDeleteDelta, HiveConf conf, ValidWriteIdList validWriteIdList) throws IOException {
-    FileSystem fs = sourcePath.getFileSystem(conf);
-    long minOpenWriteId = validWriteIdList.getMinOpenWriteId() == null ? 1 : validWriteIdList.getMinOpenWriteId();
-    long highWatermark = validWriteIdList.getHighWatermark();
-    AcidOutputFormat.Options options =
-        new AcidOutputFormat.Options(conf).writingBase(isMajorCompaction).writingDeleteDelta(isDeleteDelta)
-            .isCompressed(false).minimumWriteId(minOpenWriteId)
-            .maximumWriteId(highWatermark).bucket(0).statementId(-1);
-    Path newDeltaDir = AcidUtils.createFilename(destPath, options).getParent();
-    if (!fs.exists(sourcePath)) {
-      LOG.info("{} not found. Assuming 0 splits. Creating {}", sourcePath, newDeltaDir);
-      fs.mkdirs(newDeltaDir);
-      return false;
+  private static void disableLlapCaching(HiveConf conf) {
+    String llapIOETLSkipFormat = conf.getVar(HiveConf.ConfVars.LLAP_IO_ETL_SKIP_FORMAT);
+    if (!"none".equals(llapIOETLSkipFormat)) {
+      // Unless caching is explicitly required for ETL queries - disable it.
+      conf.setVar(HiveConf.ConfVars.LLAP_IO_ETL_SKIP_FORMAT, "all");
     }
-    return true;
   }
 
   /**
-   * Create the base/delta directory matching the naming conventions and move the result files of the compaction
-   * into it.
-   * @param sourcePath location of the result files
-   * @param destPath destination path of the result files, without the base/delta directory
-   * @param isMajorCompaction is this called from a major compaction
-   * @param isDeleteDelta is the destination a delete delta directory
-   * @param conf hive configuration
-   * @param validWriteIdList list of valid write Ids
-   * @param compactorTxnId transaction Id of the compaction
-   * @throws IOException the destination directory cannot be created
-   * @throws HiveException the result files cannot be moved to the destination directory
-   */
-  static void moveContents(Path sourcePath, Path destPath, boolean isMajorCompaction, boolean isDeleteDelta,
-      HiveConf conf, ValidWriteIdList validWriteIdList, long compactorTxnId) throws IOException, HiveException {
-    if (!resultHasSplits(sourcePath, destPath, isMajorCompaction, isDeleteDelta, conf, validWriteIdList)) {
-      return;
-    }
-    LOG.info("Moving contents of {} to {}", sourcePath, destPath);
-    FileSystem fs = sourcePath.getFileSystem(conf);
-    long minOpenWriteId = validWriteIdList.getMinOpenWriteId() == null ? 1 : validWriteIdList.getMinOpenWriteId();
-    long highWatermark = validWriteIdList.getHighWatermark();
-    for (FileStatus fileStatus : fs.listStatus(sourcePath)) {
-      String originalFileName = fileStatus.getPath().getName();
-      if (AcidUtils.ORIGINAL_PATTERN.matcher(originalFileName).matches()) {
-        Optional<Integer> bucketId = AcidUtils.parseBucketIdFromRow(fs, fileStatus.getPath());
-        if (bucketId.isPresent()) {
-          AcidOutputFormat.Options options =
-              new AcidOutputFormat.Options(conf).writingBase(isMajorCompaction).writingDeleteDelta(isDeleteDelta)
-                  .isCompressed(false).minimumWriteId(minOpenWriteId)
-                  .maximumWriteId(highWatermark).bucket(bucketId.get()).statementId(-1)
-                  .visibilityTxnId(compactorTxnId);
-          Path finalBucketFile = AcidUtils.createFilename(destPath, options);
-          Hive.moveFile(conf, fileStatus.getPath(), finalBucketFile, true, false, false);
-        }
+  /**
+   * Get a create temporary table query string with Orc ACID columns.
+   * @param tableName name of the new temporary table
+   * @param table the table where the compaction is running
+   * @return create query
+   */
+  static String getCreateTempTableQueryWithAcidColumns(String tableName, Table table) {
+    StringBuilder query = new StringBuilder("create temporary external table ").append(tableName).append(" (");
+    // Acid virtual columns
+    query.append("`operation` int, `originalTransaction` bigint, `bucket` int, `rowId` bigint, `currentTransaction` "
+        + "bigint, `row` struct<");
+    List<FieldSchema> cols = table.getSd().getCols();
+    boolean isFirst = true;
+    // Actual columns
+    for (FieldSchema col : cols) {
+      if (!isFirst) {
+        query.append(", ");
       }
+      isFirst = false;
+      query.append("`").append(col.getName()).append("` ").append(":").append(col.getType());
     }
-    fs.delete(sourcePath, true);
+    query.append(">)");
+    return query.toString();
   }
 
   /**
-   * Unless caching is explicitly required for ETL queries this method disables it.
-   * LLAP cache content lookup is file based, and since compaction alters the file structure it is not beneficial to
-   * cache anything here, as it won't (and actually can't) ever be looked up later.
+   * Remove the root directory of a table if it's empty.
    * @param conf the Hive configuration
+   * @param tmpTableName name of the table
+   * @throws IOException the directory cannot be deleted
+   * @throws HiveException the table is not found
    */
-  static void disableLlapCaching(HiveConf conf) {
-    String llapIOETLSkipFormat = conf.getVar(HiveConf.ConfVars.LLAP_IO_ETL_SKIP_FORMAT);
-    if (!"none".equals(llapIOETLSkipFormat)) {
-      // Unless caching is explicitly required for ETL queries - disable it.
-      conf.setVar(HiveConf.ConfVars.LLAP_IO_ETL_SKIP_FORMAT, "all");
+  static void cleanupEmptyDir(HiveConf conf, String tmpTableName) throws IOException, HiveException {
+    org.apache.hadoop.hive.ql.metadata.Table tmpTable = Hive.get().getTable(tmpTableName);
+    if (tmpTable != null) {
+      Path path = new Path(tmpTable.getSd().getLocation());
+      FileSystem fs = path.getFileSystem(conf);
+      if (!fs.listFiles(path, false).hasNext()) {
+        fs.delete(path, true);
+      }
     }
   }
-
 }
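For reference, the statement Util.getCreateTempTableQueryWithAcidColumns produces for a hypothetical source table (a int, b string) has the shape below; the five leading columns mirror the ORC ACID file schema, so a compaction helper table can hold raw ACID rows. The table name is invented and the string is derived from the code above, not quoted from a test:

    public class AcidColumnsDdlSketch {
      public static void main(String[] args) {
        // Assumed output for table name "tmp_x".
        String ddl =
            "create temporary external table tmp_x ("
                + "`operation` int, `originalTransaction` bigint, `bucket` int, `rowId` bigint, "
                + "`currentTransaction` bigint, `row` struct<`a` :int, `b` :string>)";
        System.out.println(ddl);
      }
    }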