This is an automated email from the ASF dual-hosted git repository.

lpinter pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 4b93296  HIVE-20948: Eliminate file rename in compactor (Laszlo Pinter, reviewed by Peter Vary)
4b93296 is described below

commit 4b932966428c7bb58f1307e459849ac092fa9cbc
Author: Laszlo Pinter <lpin...@cloudera.com>
AuthorDate: Wed Feb 26 09:42:01 2020 +0100

    HIVE-20948: Eliminate file rename in compactor (Laszlo Pinter, reviewed by Peter Vary)
---
 .../hadoop/hive/ql/exec/FileSinkOperator.java      |  21 +++-
 .../hadoop/hive/ql/exec/tez/SplitGrouper.java      |  23 +---
 .../org/apache/hadoop/hive/ql/io/AcidUtils.java    |  50 ++++-----
 .../apache/hadoop/hive/ql/plan/FileSinkDesc.java   |   8 ++
 .../hadoop/hive/ql/txn/compactor/CompactorMR.java  |   4 +-
 .../hive/ql/txn/compactor/MajorQueryCompactor.java |  66 +++++------
 .../hive/ql/txn/compactor/MinorQueryCompactor.java | 114 ++++++++-----------
 .../hive/ql/txn/compactor/QueryCompactor.java      | 123 ++++++++-------------
 8 files changed, 181 insertions(+), 228 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
index d5e1b5b..d0f452b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/FileSinkOperator.java
@@ -84,6 +84,7 @@ import org.apache.hadoop.hive.shims.HadoopShims.StoragePolicyShim;
 import org.apache.hadoop.hive.shims.HadoopShims.StoragePolicyValue;
 import org.apache.hadoop.hive.shims.ShimLoader;
 
+import org.apache.hadoop.io.IntWritable;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Writable;
 import org.apache.hadoop.mapred.JobConf;
@@ -473,6 +474,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
   protected transient boolean multiFileSpray;
   protected transient final Map<Integer, Integer> bucketMap = new HashMap<Integer, Integer>();
   private transient boolean isBucketed = false;
+  private transient int bucketId;
 
   private transient ObjectInspector[] partitionObjectInspectors;
   protected transient HivePartitioner<HiveKey, Object> prtner;
@@ -805,7 +807,12 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
   protected void createBucketForFileIdx(FSPaths fsp, int filesIdx)
       throws HiveException {
     try {
-      fsp.initializeBucketPaths(filesIdx, taskId, isNativeTable(), isSkewedStoredAsSubDirectories);
+      if (conf.isCompactionTable()) {
+        fsp.initializeBucketPaths(filesIdx, AcidUtils.BUCKET_PREFIX + String.format(AcidUtils.BUCKET_DIGITS, bucketId),
+            isNativeTable(), isSkewedStoredAsSubDirectories);
+      } else {
+        fsp.initializeBucketPaths(filesIdx, taskId, isNativeTable(), isSkewedStoredAsSubDirectories);
+      }
       if (Utilities.FILE_OP_LOGGER.isTraceEnabled()) {
         Utilities.FILE_OP_LOGGER.trace("createBucketForFileIdx " + filesIdx + ": final path " + fsp.finalPaths[filesIdx]
           + "; out path " + fsp.outPaths[filesIdx] +" (spec path " + specPath + ", tmp path "
@@ -828,7 +835,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
       //todo IOW integration. Full Acid uses the else if block to create Acid's RecordUpdater (HiveFileFormatUtils)
       // and that will set writingBase(conf.getInsertOverwrite())
       // If MM wants to create a new base for IOW (instead of delta dir), it should specify it here
-      if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID || conf.isMmTable()) {
+      if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID || conf.isMmTable() || conf.isCompactionTable()) {
         Path outPath = fsp.outPaths[filesIdx];
         if (conf.isMmTable()
             && !FileUtils.mkdir(fs, outPath.getParent(), hconf)) {
@@ -960,6 +967,10 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
           createNewPaths(null, lbDirName);
         }
       } else {
+        if (conf.isCompactionTable()) {
+          int bucketProperty = ((IntWritable)((Object[])row)[2]).get();
+          bucketId = BucketCodec.determineVersion(bucketProperty).decodeWriterId(bucketProperty);
+        }
         createBucketFiles(fsp);
       }
     }
@@ -1049,7 +1060,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
       // for a given operator branch prediction should work quite nicely on it.
       // RecordUpdater expects to get the actual row, not a serialized version of it.  Thus we
       // pass the row rather than recordValue.
-      if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID || conf.isMmTable()) {
+      if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID || conf.isMmTable() || conf.isCompactionTable()) {
         rowOutWriters[findWriterOffset(row)].write(recordValue);
       } else if (conf.getWriteType() == AcidUtils.Operation.INSERT) {
        fpaths.updaters[findWriterOffset(row)].insert(conf.getTableWriteId(), row);
@@ -1107,7 +1118,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
   protected boolean areAllTrue(boolean[] statsFromRW) {
     // If we are doing an acid operation they will always all be true as RecordUpdaters always
     // collect stats
-    if (conf.getWriteType() != AcidUtils.Operation.NOT_ACID && !conf.isMmTable()) {
+    if (conf.getWriteType() != AcidUtils.Operation.NOT_ACID && !conf.isMmTable() && !conf.isCompactionTable()) {
       return true;
     }
     for(boolean b : statsFromRW) {
@@ -1366,7 +1377,7 @@ public class FileSinkOperator extends TerminalOperator<FileSinkDesc> implements
         // record writer already gathers the statistics, it can simply return the
         // accumulated statistics which will be aggregated in case of spray writers
         if (conf.isGatherStats() && isCollectRWStats) {
-          if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID || conf.isMmTable()) {
+          if (conf.getWriteType() == AcidUtils.Operation.NOT_ACID || conf.isMmTable() || conf.isCompactionTable()) {
             for (int idx = 0; idx < fsp.outWriters.length; idx++) {
               RecordWriter outWriter = fsp.outWriters[idx];
               if (outWriter != null) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java
index 076b778..2295edc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/exec/tez/SplitGrouper.java
@@ -29,9 +29,9 @@ import java.util.LinkedHashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
-import java.util.Properties;
 import java.util.concurrent.ConcurrentHashMap;
 
+import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 import org.apache.hadoop.conf.Configuration;
@@ -191,7 +191,7 @@ public class SplitGrouper {
           if ((op != null) && (op instanceof TableScanOperator)) {
             TableScanOperator tableScan = (TableScanOperator) op;
             PartitionDesc partitionDesc = mapWork.getAliasToPartnInfo().get(aliases.get(0));
-            isMinorCompaction &= isQueryBasedMinorComp(partitionDesc);
+            isMinorCompaction &= AcidUtils.isCompactionTable(partitionDesc.getTableDesc().getProperties());
             if (!tableScan.getConf().isTranscationalTable() && !isMinorCompaction) {
               String splitPath = getFirstSplitPath(splits);
               String errorMessage =
@@ -233,25 +233,6 @@ public class SplitGrouper {
         this.group(jobConf, schemaGroupedSplitMultiMap, availableSlots, waves, 
locationProvider);
     return groupedSplits;
   }
-
-  /**
-   * Determines from the partition description, whether the table is used is used as compaction helper table. It looks
-   * for a table property 'queryminorcomp', which is set in
-   * {@link org.apache.hadoop.hive.ql.txn.compactor.MinorQueryCompactor}
-   * @param desc partition description of the table, must be not null
-   * @return true, if the table is a query based minor compaction helper table
-   */
-  private boolean isQueryBasedMinorComp(PartitionDesc desc) {
-    if (desc != null) {
-      Properties tblProperties = desc.getTableDesc().getProperties();
-      final String minorCompProperty = "queryminorcomp";
-      if (tblProperties != null && tblProperties.containsKey(minorCompProperty) && tblProperties
-          .getProperty(minorCompProperty).equalsIgnoreCase("true")) {
-        return true;
-      }
-    }
-    return false;
-  }
   
   // Returns the path of the first split in this list for logging purposes
   private String getFirstSplitPath(InputSplit[] splits) {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 76ea6c9..dbbe6f1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -32,7 +32,6 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.TimeUnit;
@@ -75,7 +74,6 @@ import org.apache.hadoop.hive.ql.io.orc.OrcFile;
 import org.apache.hadoop.hive.ql.io.orc.OrcInputFormat;
 import org.apache.hadoop.hive.ql.io.orc.OrcRecordUpdater;
 import org.apache.hadoop.hive.ql.io.orc.Reader;
-import org.apache.hadoop.hive.ql.io.orc.RecordReader;
 import org.apache.hadoop.hive.ql.io.orc.Writer;
 import org.apache.hadoop.hive.ql.lockmgr.HiveTxnManager;
 import org.apache.hadoop.hive.ql.lockmgr.LockException;
@@ -87,11 +85,9 @@ import org.apache.hadoop.hive.ql.parse.LoadSemanticAnalyzer;
 import org.apache.hadoop.hive.ql.parse.SemanticException;
 import org.apache.hadoop.hive.ql.plan.TableScanDesc;
 import org.apache.hadoop.hive.ql.session.SessionState;
-import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
 import org.apache.hadoop.hive.shims.HadoopShims;
 import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
 import org.apache.hadoop.hive.shims.ShimLoader;
-import org.apache.hadoop.io.IntWritable;
 import org.apache.hive.common.util.Ref;
 import org.apache.orc.FileFormatException;
 import org.apache.orc.impl.OrcAcidUtils;
@@ -113,6 +109,7 @@ public class AcidUtils {
   // This key will be put in the conf file when planning an acid operation
   public static final String CONF_ACID_KEY = "hive.doing.acid";
   public static final String BASE_PREFIX = "base_";
+  public static final String COMPACTOR_TABLE_PROPERTY = "compactiontable";
   public static final PathFilter baseFileFilter = new PathFilter() {
     @Override
     public boolean accept(Path path) {
@@ -383,6 +380,7 @@ public class AcidUtils {
     return baseOrDeltaDir + VISIBILITY_PREFIX
         + String.format(DELTA_DIGITS, visibilityTxnId);
   }
+
   /**
    * Represents bucketId and copy_N suffix
    */
@@ -426,6 +424,29 @@ public class AcidUtils {
       this.copyNumber = copyNumber;
     }
   }
+
+  /**
+   * Determine if a table is used during query based compaction.
+   * @param tblProperties table properties
+   * @return true, if the tblProperties contains {@link AcidUtils#COMPACTOR_TABLE_PROPERTY}
+   */
+  public static boolean isCompactionTable(Properties tblProperties) {
+    if (tblProperties != null && tblProperties.containsKey(COMPACTOR_TABLE_PROPERTY) && tblProperties
+        .getProperty(COMPACTOR_TABLE_PROPERTY).equalsIgnoreCase("true")) {
+      return true;
+    }
+    return false;
+  }
+
+  /**
+   * Determine if a table is used during query based compaction.
+   * @param parameters table properties map
+   * @return true, if the parameters contains {@link AcidUtils#COMPACTOR_TABLE_PROPERTY}
+   */
+  public static boolean isCompactionTable(Map<String, String> parameters) {
+    return Boolean.valueOf(parameters.getOrDefault(COMPACTOR_TABLE_PROPERTY, "false"));
+  }
+
   /**
    * Get the bucket id from the file path
    * @param bucketFile - bucket file path
@@ -463,27 +484,6 @@ public class AcidUtils {
   }
 
   /**
-   * Read the first row of an ORC file and determine the bucket ID based on the bucket column. This only works with
-   * files with ACID schema.
-   * @param fs the resolved file system
-   * @param orcFile path to ORC file
-   * @return resolved bucket number
-   * @throws IOException during the parsing of the ORC file
-   */
-  public static Optional<Integer> parseBucketIdFromRow(FileSystem fs, Path orcFile) throws IOException {
-    Reader reader = OrcFile.createReader(fs, orcFile);
-    StructObjectInspector objectInspector = (StructObjectInspector)reader.getObjectInspector();
-    RecordReader records = reader.rows();
-    while(records.hasNext()) {
-      Object row = records.next(null);
-      List<Object> fields = objectInspector.getStructFieldsDataAsList(row);
-      int bucketProperty = ((IntWritable) fields.get(2)).get();
-      return Optional.of(BucketCodec.determineVersion(bucketProperty).decodeWriterId(bucketProperty));
-    }
-    return Optional.empty();
-  }
-
-  /**
    * Parse a bucket filename back into the options that would have created
    * the file.
    * @param bucketFile the path to a bucket file
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
index bbf73cb..f55c6ae 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/plan/FileSinkDesc.java
@@ -351,6 +351,14 @@ public class FileSinkDesc extends AbstractOperatorDesc implements IStatsGatherDe
     }
   }
 
+  /**
+   * @return true, if the table is used during compaction
+   */
+  public boolean isCompactionTable() {
+    return getTable() != null ? AcidUtils.isCompactionTable(table.getParameters())
+        : AcidUtils.isCompactionTable(getTableInfo().getProperties());
+  }
+
   public boolean isMaterialization() {
     return materialization;
   }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
index 25c14e0..c44f2b50 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorMR.java
@@ -56,6 +56,7 @@ import org.apache.hadoop.hive.ql.io.AcidUtils.Directory;
 import org.apache.hadoop.hive.ql.io.HiveInputFormat;
 import org.apache.hadoop.hive.ql.io.IOConstants;
 import org.apache.hadoop.hive.ql.io.RecordIdentifier;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
 import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
 import org.apache.hadoop.hive.shims.ShimLoader;
@@ -210,7 +211,8 @@ public class CompactorMR {
    * @throws java.io.IOException if the job fails
    */
   void run(HiveConf conf, String jobName, Table t, Partition p, StorageDescriptor sd, ValidWriteIdList writeIds,
-           CompactionInfo ci, Worker.StatsUpdater su, IMetaStoreClient msc, Directory dir) throws IOException {
+           CompactionInfo ci, Worker.StatsUpdater su, IMetaStoreClient msc, Directory dir)
+      throws IOException, HiveException {
 
     if(conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST) && conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION)) {
       throw new RuntimeException(HiveConf.ConfVars.HIVETESTMODEFAILCOMPACTION.name() + "=true");
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java
index f238eb5..9385080 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -37,8 +38,9 @@ import java.util.List;
  */
 final class MajorQueryCompactor extends QueryCompactor {
 
-  @Override void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageDescriptor storageDescriptor,
-      ValidWriteIdList writeIds, CompactionInfo compactionInfo) throws IOException {
+  @Override
+  void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageDescriptor storageDescriptor,
+      ValidWriteIdList writeIds, CompactionInfo compactionInfo) throws IOException, HiveException {
     AcidUtils
         .setAcidOperationalProperties(hiveConf, true, AcidUtils.getAcidOperationalProperties(table.getParameters()));
 
@@ -53,25 +55,26 @@ final class MajorQueryCompactor extends QueryCompactor {
 
     String tmpPrefix = table.getDbName() + "_tmp_compactor_" + table.getTableName() + "_";
     String tmpTableName = tmpPrefix + System.currentTimeMillis();
-    List<String> createQueries = getCreateQueries(tmpTableName, table);
+
+    long minOpenWriteId = writeIds.getMinOpenWriteId() == null ? 1 : writeIds.getMinOpenWriteId();
+    long highWaterMark = writeIds.getHighWatermark();
+    long compactorTxnId = CompactorMR.CompactorMap.getCompactorTxnId(conf);
+    AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf).writingBase(true)
+        .writingDeleteDelta(false).isCompressed(false).minimumWriteId(minOpenWriteId)
+        .maximumWriteId(highWaterMark).statementId(-1).visibilityTxnId(compactorTxnId);
+    Path tmpTablePath = AcidUtils.baseOrDeltaSubdirPath(new Path(storageDescriptor.getLocation()), options);
+
+    List<String> createQueries = getCreateQueries(tmpTableName, table, tmpTablePath.toString());
     List<String> compactionQueries = getCompactionQueries(table, partition, tmpTableName);
     List<String> dropQueries = getDropQueries(tmpTableName);
     runCompactionQueries(conf, tmpTableName, storageDescriptor, writeIds, compactionInfo, createQueries,
         compactionQueries, dropQueries);
   }
 
-  /**
-   * Move and rename bucket files from the temp table (tmpTableName), to the new base path under the source table/ptn.
-   * Since the temp table is a non-transactional table, it has file names in the "original" format.
-   * Also, due to split grouping in
-   * {@link org.apache.hadoop.hive.ql.exec.tez.SplitGrouper#getCompactorSplitGroups(InputSplit[],
-   * Configuration, boolean)}, we will end up with one file per bucket.
-   */
-  @Override protected void commitCompaction(String dest, String tmpTableName, HiveConf conf,
+  @Override
+  protected void commitCompaction(String dest, String tmpTableName, HiveConf conf,
       ValidWriteIdList actualWriteIds, long compactorTxnId) throws IOException, HiveException {
-    org.apache.hadoop.hive.ql.metadata.Table tempTable = Hive.get().getTable(tmpTableName);
-    Util.moveContents(new Path(tempTable.getSd().getLocation()), new Path(dest), true, false, conf, actualWriteIds,
-        compactorTxnId);
+    Util.cleanupEmptyDir(conf, tmpTableName);
   }
 
   /**
@@ -83,25 +86,26 @@ final class MajorQueryCompactor extends QueryCompactor {
   * See {@link org.apache.hadoop.hive.ql.exec.tez.SplitGrouper#getCompactorSplitGroups(InputSplit[], Configuration)}
    *  for details on the mechanism.
    */
-  private List<String> getCreateQueries(String fullName, Table t) {
-    StringBuilder query = new StringBuilder("create temporary table ").append(fullName).append(" (");
-    // Acid virtual columns
-    query.append(
-        "`operation` int, `originalTransaction` bigint, `bucket` int, `rowId` bigint, `currentTransaction` bigint, "
-            + "`row` struct<");
-    List<FieldSchema> cols = t.getSd().getCols();
-    boolean isFirst = true;
-    // Actual columns
-    for (FieldSchema col : cols) {
-      if (!isFirst) {
-        query.append(", ");
-      }
-      isFirst = false;
-      query.append("`").append(col.getName()).append("` ").append(":").append(col.getType());
+  private List<String> getCreateQueries(String fullName, Table t, String tmpTableLocation) throws HiveException {
+    StringBuilder query = new StringBuilder(Util.getCreateTempTableQueryWithAcidColumns(fullName, t));
+    org.apache.hadoop.hive.ql.metadata.Table table = Hive.get().getTable(t.getDbName(), t.getTableName(), false);
+    int numBuckets = 1;
+    int bucketingVersion = 0;
+    if (table != null) {
+      numBuckets = Math.max(table.getNumBuckets(), numBuckets);
+      bucketingVersion = table.getBucketingVersion();
     }
-    query.append(">)");
+    query.append(" clustered by (`bucket`) into ").append(numBuckets).append(" buckets");
     query.append(" stored as orc");
-    query.append(" tblproperties ('transactional'='false')");
+    query.append(" location '");
+    query.append(tmpTableLocation);
+    query.append("' tblproperties ('transactional'='false',");
+    query.append(" 'bucketing_version'='");
+    query.append(bucketingVersion);
+    query.append("','");
+    query.append(AcidUtils.COMPACTOR_TABLE_PROPERTY);
+    query.append("'='true'");
+    query.append(")");
     return Lists.newArrayList(query.toString());
   }
 
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java
index 59dcf2c..01cd2fc 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java
@@ -22,11 +22,11 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
@@ -45,11 +45,11 @@ import java.util.stream.Collectors;
  */
 final class MinorQueryCompactor extends QueryCompactor {
 
-  public static final String MINOR_COMP_TBL_PROP = "queryminorcomp";
   private static final Logger LOG = LoggerFactory.getLogger(MinorQueryCompactor.class.getName());
 
-  @Override void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageDescriptor storageDescriptor,
-      ValidWriteIdList writeIds, CompactionInfo compactionInfo) throws IOException {
+  @Override
+  void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageDescriptor storageDescriptor,
+      ValidWriteIdList writeIds, CompactionInfo compactionInfo) throws IOException, HiveException {
     LOG.info("Running query based minor compaction");
     AcidUtils
         .setAcidOperationalProperties(hiveConf, true, AcidUtils.getAcidOperationalProperties(table.getParameters()));
@@ -64,7 +64,8 @@ final class MinorQueryCompactor extends QueryCompactor {
     conf.setBoolVar(HiveConf.ConfVars.HIVE_STATS_ESTIMATE_STATS, false);
     String tmpTableName =
         table.getDbName() + "_tmp_compactor_" + table.getTableName() + "_" + System.currentTimeMillis();
-    List<String> createQueries = getCreateQueries(table, tmpTableName, dir, writeIds);
+
+    List<String> createQueries = getCreateQueries(table, tmpTableName, dir, writeIds, conf, storageDescriptor);
     List<String> compactionQueries = getCompactionQueries(tmpTableName, writeIds.getInvalidWriteIds());
     List<String> dropQueries = getDropQueries(tmpTableName);
 
@@ -72,14 +73,11 @@ final class MinorQueryCompactor extends QueryCompactor {
         compactionQueries, dropQueries);
   }
 
-  @Override protected void commitCompaction(String dest, String tmpTableName, HiveConf conf,
+  @Override
+  protected void commitCompaction(String dest, String tmpTableName, HiveConf conf,
       ValidWriteIdList actualWriteIds, long compactorTxnId) throws IOException, HiveException {
-    // get result temp tables;
-    String deltaTableName = AcidUtils.DELTA_PREFIX + tmpTableName + "_result";
-    commitCompaction(deltaTableName, dest, false, conf, actualWriteIds, compactorTxnId);
-
-    String deleteDeltaTableName = AcidUtils.DELETE_DELTA_PREFIX + tmpTableName + "_result";
-    commitCompaction(deleteDeltaTableName, dest, true, conf, actualWriteIds, compactorTxnId);
+    Util.cleanupEmptyDir(conf, AcidUtils.DELTA_PREFIX + tmpTableName + "_result");
+    Util.cleanupEmptyDir(conf, AcidUtils.DELETE_DELTA_PREFIX + tmpTableName + "_result");
   }
 
   /**
@@ -95,24 +93,41 @@ final class MinorQueryCompactor extends QueryCompactor {
   * @param tempTableBase an unique identifier which is used to create delta/delete-delta temp tables
   * @param dir the directory, where the delta directories resides
   * @param writeIds list of valid write ids, used to filter out delta directories which are not relevant for compaction
+   * @param conf hive configuration
+   * @param storageDescriptor this is the resolved storage descriptor
    * @return list of create/alter queries, always non-null
    */
   private List<String> getCreateQueries(Table table, String tempTableBase, AcidUtils.Directory dir,
-      ValidWriteIdList writeIds) {
+      ValidWriteIdList writeIds, HiveConf conf, StorageDescriptor storageDescriptor) throws HiveException {
     List<String> queries = new ArrayList<>();
+    long minOpenWriteId = writeIds.getMinOpenWriteId() == null ? 1 : writeIds.getMinOpenWriteId();
+    long highWatermark = writeIds.getHighWatermark();
+    long compactorTxnId = CompactorMR.CompactorMap.getCompactorTxnId(conf);
     // create delta temp table
     String tmpTableName = AcidUtils.DELTA_PREFIX + tempTableBase;
-    queries.add(buildCreateTableQuery(table, tmpTableName, true, true, false));
+    queries.add(buildCreateTableQuery(table, tmpTableName, true, false, null));
     buildAlterTableQuery(tmpTableName, dir, writeIds, false).ifPresent(queries::add);
+    AcidOutputFormat.Options options = new AcidOutputFormat.Options(conf).writingBase(false)
+        .writingDeleteDelta(false).isCompressed(false).minimumWriteId(minOpenWriteId)
+        .maximumWriteId(highWatermark).statementId(-1).visibilityTxnId(compactorTxnId);
+    Path location = new Path(storageDescriptor.getLocation());
+    String tmpTableResultLocation = AcidUtils.baseOrDeltaSubdirPath(location,
+        options).toString();
     // create delta result temp table
-    queries.add(buildCreateTableQuery(table, tmpTableName + "_result", false, false, true));
+    queries.add(buildCreateTableQuery(table, tmpTableName + "_result", false, true,
+        tmpTableResultLocation));
 
     // create delete delta temp tables
     String tmpDeleteTableName = AcidUtils.DELETE_DELTA_PREFIX + tempTableBase;
-    queries.add(buildCreateTableQuery(table, tmpDeleteTableName, true, true, false));
+    queries.add(buildCreateTableQuery(table, tmpDeleteTableName,  true, false, null));
     buildAlterTableQuery(tmpDeleteTableName, dir, writeIds, true).ifPresent(queries::add);
+    options = new AcidOutputFormat.Options(conf).writingBase(false).writingDeleteDelta(true).isCompressed(false)
+        .minimumWriteId(minOpenWriteId).maximumWriteId(highWatermark).statementId(-1).visibilityTxnId(compactorTxnId);
+    String tmpTableDeleteResultLocation = AcidUtils.baseOrDeltaSubdirPath(location,
+        options).toString();
     // create delete delta result temp table
-    queries.add(buildCreateTableQuery(table, tmpDeleteTableName + "_result", false, false, true));
+    queries.add(buildCreateTableQuery(table, tmpDeleteTableName + "_result",  false, true,
+        tmpTableDeleteResultLocation));
     return queries;
   }
 
@@ -121,9 +136,9 @@ final class MinorQueryCompactor extends QueryCompactor {
    * the schema of the table is the same as an ORC ACID file schema.
    * @param table he source table, where the compaction is running on
    * @param newTableName name of the table to be created
-   * @param isExternal true, if new table should be external
    * @param isPartitioned true, if new table should be partitioned
    * @param isBucketed true, if the new table should be bucketed
+   * @param location location of the table, can be null
    * @return a create table statement, always non-null. Example:
    * <p>
    *   if source table schema is: (a:int, b:int)
@@ -135,50 +150,32 @@ final class MinorQueryCompactor extends QueryCompactor {
   *   STORED AS ORC TBLPROPERTIES ('transactional'='false','queryminorcomp'='true');
    * </p>
    */
-  private String buildCreateTableQuery(Table table, String newTableName, boolean isExternal, boolean isPartitioned,
-      boolean isBucketed) {
-    StringBuilder query = new StringBuilder("create temporary ");
-    if (isExternal) {
-      query.append("external ");
-    }
-    query.append("table ").append(newTableName).append(" (");
-    // Acid virtual columns
-    query.append(
-        "`operation` int, `originalTransaction` bigint, `bucket` int, `rowId` bigint, `currentTransaction` bigint, "
-            + "`row` struct<");
-    List<FieldSchema> cols = table.getSd().getCols();
-    boolean isFirst = true;
-    // Actual columns
-    for (FieldSchema col : cols) {
-      if (!isFirst) {
-        query.append(", ");
-      }
-      isFirst = false;
-      query.append("`").append(col.getName()).append("` ").append(":").append(col.getType());
-    }
-    query.append(">)");
+  private String buildCreateTableQuery(Table table, String newTableName, boolean isPartitioned,
+      boolean isBucketed, String location) throws HiveException {
+    StringBuilder query = new StringBuilder(Util.getCreateTempTableQueryWithAcidColumns(newTableName, table));
     if (isPartitioned) {
       query.append(" partitioned by (`file_name` string)");
     }
     int bucketingVersion = 0;
     if (isBucketed) {
       int numBuckets = 1;
-      try {
-        org.apache.hadoop.hive.ql.metadata.Table t = 
Hive.get().getTable(table.getDbName(), table.getTableName());
+      org.apache.hadoop.hive.ql.metadata.Table t = 
Hive.get().getTable(table.getDbName(), table.getTableName(), false);
+      if (t != null) {
         numBuckets = Math.max(t.getNumBuckets(), numBuckets);
         bucketingVersion = t.getBucketingVersion();
-      } catch (HiveException e) {
-        LOG.info("Error finding table {}. Minor compaction result will use 0 buckets.", table.getTableName());
-      } finally {
-        query.append(" clustered by (`bucket`)").append(" sorted by (`bucket`, `originalTransaction`, `rowId`)")
-            .append(" into ").append(numBuckets).append(" buckets");
       }
+      query.append(" clustered by (`bucket`)").append(" sorted by (`bucket`, `originalTransaction`, `rowId`)")
+              .append(" into ").append(numBuckets).append(" buckets");
     }
-
     query.append(" stored as orc");
+    if (location != null && !location.isEmpty()) {
+      query.append(" location '");
+      query.append(location);
+      query.append("'");
+    }
     query.append(" tblproperties ('transactional'='false'");
     query.append(", '");
-    query.append(MINOR_COMP_TBL_PROP);
+    query.append(AcidUtils.COMPACTOR_TABLE_PROPERTY);
     query.append("'='true'");
     if (isBucketed) {
       query.append(", 'bucketing_version'='")
@@ -272,23 +269,4 @@ final class MinorQueryCompactor extends QueryCompactor {
     queries.add(dropStm + AcidUtils.DELETE_DELTA_PREFIX + tmpTableBase + "_result");
     return queries;
   }
-
-  /**
-   * Creates the delta directory and moves the result files.
-   * @param deltaTableName name of the temporary table, where the results are stored
-   * @param dest destination path, where the result should be moved
-   * @param isDeleteDelta is the destination a delete delta directory
-   * @param conf hive configuration
-   * @param actualWriteIds list of valid write Ids
-   * @param compactorTxnId transaction Id of the compaction
-   * @throws HiveException the result files cannot be moved
-   * @throws IOException the destination delta directory cannot be created
-   */
-  private void commitCompaction(String deltaTableName, String dest, boolean isDeleteDelta, HiveConf conf,
-      ValidWriteIdList actualWriteIds, long compactorTxnId) throws HiveException, IOException {
-    org.apache.hadoop.hive.ql.metadata.Table deltaTable = Hive.get().getTable(deltaTableName);
-    Util.moveContents(new Path(deltaTable.getSd().getLocation()), new Path(dest), false, isDeleteDelta, conf,
-        actualWriteIds, compactorTxnId);
-  }
-
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java
index 3ce4dde..7a9e48f 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/QueryCompactor.java
@@ -18,19 +18,17 @@
 package org.apache.hadoop.hive.ql.txn.compactor;
 
 import org.apache.commons.lang3.exception.ExceptionUtils;
-import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.api.AlreadyExistsException;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
 import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
 import org.apache.hadoop.hive.ql.DriverUtils;
-import org.apache.hadoop.hive.ql.io.AcidOutputFormat;
-import org.apache.hadoop.hive.ql.io.AcidUtils;
 import org.apache.hadoop.hive.ql.metadata.Hive;
 import org.apache.hadoop.hive.ql.metadata.HiveException;
 import org.apache.hadoop.hive.ql.session.SessionState;
@@ -40,7 +38,6 @@ import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
 import java.util.List;
-import java.util.Optional;
 import java.util.UUID;
 
 /**
@@ -62,7 +59,7 @@ abstract class QueryCompactor {
    * @throws IOException compaction cannot be finished.
    */
   abstract void runCompaction(HiveConf hiveConf, Table table, Partition partition, StorageDescriptor storageDescriptor,
-      ValidWriteIdList writeIds, CompactionInfo compactionInfo) throws IOException;
+      ValidWriteIdList writeIds, CompactionInfo compactionInfo) throws IOException, HiveException;
 
   /**
   * This is the final step of the compaction, which can vary based on compaction type. Usually this involves some file
@@ -91,7 +88,7 @@ abstract class QueryCompactor {
    * @param dropQueries queries which drops the temporary tables.
    * @throws IOException error during the run of the compaction.
    */
-  protected void runCompactionQueries(HiveConf conf, String tmpTableName, StorageDescriptor storageDescriptor,
+  void runCompactionQueries(HiveConf conf, String tmpTableName, StorageDescriptor storageDescriptor,
       ValidWriteIdList writeIds, CompactionInfo compactionInfo, List<String> createQueries,
       List<String> compactionQueries, List<String> dropQueries) throws IOException {
     Util.disableLlapCaching(conf);
@@ -151,88 +148,60 @@ abstract class QueryCompactor {
     }
 
     /**
-     * Check whether the result directory exits and contains compacted result files. If no splits are found, create
-     * an empty directory at the destination path, matching a base/delta directory naming convention.
-     * @param sourcePath the checked source location
-     * @param destPath the destination, where the new directory should be created
-     * @param isMajorCompaction is called from a major compaction
-     * @param isDeleteDelta is the output used as delete delta directory
-     * @param conf hive configuration
-     * @param validWriteIdList maximum transaction id
-     * @return true, if the check was successful
-     * @throws IOException the new directory cannot be created
+     * Unless caching is explicitly required for ETL queries this method disables it.
+     * LLAP cache content lookup is file based, and since compaction alters the file structure it is not beneficial to
+     * cache anything here, as it won't (and actually can't) ever be looked up later.
+     * @param conf the Hive configuration
      */
-    private static boolean resultHasSplits(Path sourcePath, Path destPath, boolean isMajorCompaction,
-        boolean isDeleteDelta, HiveConf conf, ValidWriteIdList validWriteIdList) throws IOException {
-      FileSystem fs = sourcePath.getFileSystem(conf);
-      long minOpenWriteId = validWriteIdList.getMinOpenWriteId() == null ? 1 : validWriteIdList.getMinOpenWriteId();
-      long highWatermark = validWriteIdList.getHighWatermark();
-      AcidOutputFormat.Options options =
-          new AcidOutputFormat.Options(conf).writingBase(isMajorCompaction).writingDeleteDelta(isDeleteDelta)
-              .isCompressed(false).minimumWriteId(minOpenWriteId)
-              .maximumWriteId(highWatermark).bucket(0).statementId(-1);
-      Path newDeltaDir = AcidUtils.createFilename(destPath, options).getParent();
-      if (!fs.exists(sourcePath)) {
-        LOG.info("{} not found. Assuming 0 splits. Creating {}", sourcePath, newDeltaDir);
-        fs.mkdirs(newDeltaDir);
-        return false;
+    private static void disableLlapCaching(HiveConf conf) {
+      String llapIOETLSkipFormat = conf.getVar(HiveConf.ConfVars.LLAP_IO_ETL_SKIP_FORMAT);
+      if (!"none".equals(llapIOETLSkipFormat)) {
+        // Unless caching is explicitly required for ETL queries - disable it.
+        conf.setVar(HiveConf.ConfVars.LLAP_IO_ETL_SKIP_FORMAT, "all");
       }
-      return true;
     }
 
-    /**
-     * Create the base/delta directory matching the naming conventions and move the result files of the compaction
-     * into it.
-     * @param sourcePath location of the result files
-     * @param destPath destination path of the result files, without the base/delta directory
-     * @param isMajorCompaction is this called from a major compaction
-     * @param isDeleteDelta is the destination is a delete delta directory
-     * @param conf hive configuration
-     * @param validWriteIdList list of valid write Ids
-     * @param compactorTxnId transaction Id of the compaction
-     * @throws IOException the destination directory cannot be created
-     * @throws HiveException the result files cannot be moved to the destination directory
-     */
-    static void moveContents(Path sourcePath, Path destPath, boolean isMajorCompaction, boolean isDeleteDelta,
-        HiveConf conf, ValidWriteIdList validWriteIdList, long compactorTxnId) throws IOException, HiveException {
-      if (!resultHasSplits(sourcePath, destPath, isMajorCompaction, isDeleteDelta, conf, validWriteIdList)) {
-        return;
-      }
-      LOG.info("Moving contents of {} to {}", sourcePath, destPath);
-      FileSystem fs = sourcePath.getFileSystem(conf);
-      long minOpenWriteId = validWriteIdList.getMinOpenWriteId() == null ? 1 : validWriteIdList.getMinOpenWriteId();
-      long highWatermark = validWriteIdList.getHighWatermark();
-      for (FileStatus fileStatus : fs.listStatus(sourcePath)) {
-        String originalFileName = fileStatus.getPath().getName();
-        if (AcidUtils.ORIGINAL_PATTERN.matcher(originalFileName).matches()) {
-          Optional<Integer> bucketId = AcidUtils.parseBucketIdFromRow(fs, fileStatus.getPath());
-          if (bucketId.isPresent()) {
-            AcidOutputFormat.Options options =
-                new AcidOutputFormat.Options(conf).writingBase(isMajorCompaction).writingDeleteDelta(isDeleteDelta)
-                    .isCompressed(false).minimumWriteId(minOpenWriteId)
-                    .maximumWriteId(highWatermark).bucket(bucketId.get()).statementId(-1)
-                    .visibilityTxnId(compactorTxnId);
-            Path finalBucketFile = AcidUtils.createFilename(destPath, options);
-            Hive.moveFile(conf, fileStatus.getPath(), finalBucketFile, true, false, false);
-          }
+      /**
+       * Get a create temporary table query string with Orc ACID columns.
+       * @param tableName name of the new temporary table
+       * @param table the table where the compaction is running
+       * @return create query
+       */
+    static String getCreateTempTableQueryWithAcidColumns(String tableName, Table table) {
+      StringBuilder query = new StringBuilder("create temporary external table ").append(tableName).append(" (");
+      // Acid virtual columns
+      query.append("`operation` int, `originalTransaction` bigint, `bucket` int, `rowId` bigint, `currentTransaction` "
+              + "bigint, `row` struct<");
+      List<FieldSchema> cols = table.getSd().getCols();
+      boolean isFirst = true;
+      // Actual columns
+      for (FieldSchema col : cols) {
+        if (!isFirst) {
+          query.append(", ");
         }
+        isFirst = false;
+        query.append("`").append(col.getName()).append("` ").append(":").append(col.getType());
       }
-      fs.delete(sourcePath, true);
+      query.append(">)");
+      return query.toString();
     }
 
     /**
-     * Unless caching is explicitly required for ETL queries this method disables it.
-     * LLAP cache content lookup is file based, and since compaction alters the file structure it is not beneficial to
-     * cache anything here, as it won't (and actually can't) ever be looked up later.
+     * Remove the root directory of a table if it's empty.
      * @param conf the Hive configuration
+     * @param tmpTableName name of the table
+     * @throws IOException the directory cannot be deleted
+     * @throws HiveException the table is not found
      */
-    static void disableLlapCaching(HiveConf conf) {
-      String llapIOETLSkipFormat = conf.getVar(HiveConf.ConfVars.LLAP_IO_ETL_SKIP_FORMAT);
-      if (!"none".equals(llapIOETLSkipFormat)) {
-        // Unless caching is explicitly required for ETL queries - disable it.
-        conf.setVar(HiveConf.ConfVars.LLAP_IO_ETL_SKIP_FORMAT, "all");
+    static void cleanupEmptyDir(HiveConf conf, String tmpTableName) throws IOException, HiveException {
+      org.apache.hadoop.hive.ql.metadata.Table tmpTable = Hive.get().getTable(tmpTableName);
+      if (tmpTable != null) {
+        Path path = new Path(tmpTable.getSd().getLocation());
+        FileSystem fs = path.getFileSystem(conf);
+        if (!fs.listFiles(path, false).hasNext()) {
+          fs.delete(path, true);
+        }
       }
     }
-
   }
 }
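
How the rename is eliminated, in short: the query-based compactors now create their temporary helper tables with an explicit LOCATION that already points at the final base/delta directory (computed via AcidUtils.baseOrDeltaSubdirPath), and tag them with the new 'compactiontable' property so that FileSinkOperator writes the bucket_N files straight into that directory; commitCompaction then only has to clean up an empty directory instead of moving and renaming files. As a rough sketch, the DDL the major compactor would now generate for a source table t(a int, b string) looks like the following -- the table name, bucket count, location and the write-id/visibility suffix of the base directory are illustrative only, not taken from a real run:

    create temporary external table default_tmp_compactor_t_1582705321000 (
      `operation` int, `originalTransaction` bigint, `bucket` int, `rowId` bigint,
      `currentTransaction` bigint, `row` struct<`a` :int, `b` :string>)
    clustered by (`bucket`) into 2 buckets
    stored as orc
    location '/warehouse/tablespace/managed/hive/t/base_0000007_v0000123'
    tblproperties ('transactional'='false', 'bucketing_version'='2', 'compactiontable'='true')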
