This is an automated email from the ASF dual-hosted git repository.

kuczoram pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 6bddac42fbe HIVE-27956: Query based compactor implementation separation (#5192). (Marta Kuczora, reviewed by Ayush Saxena, Zoltan Ratkai, Attila Turoczy and Laszlo Vegh)
6bddac42fbe is described below

commit 6bddac42fbe573d84a93c0ac9f08193c193aca6c
Author: kuczoram <[email protected]>
AuthorDate: Sat May 18 13:24:40 2024 +0200

    HIVE-27956: Query based compactor implementation separation (#5192). (Marta Kuczora, reviewed by Ayush Saxena, Zoltan Ratkai, Attila Turoczy and Laszlo Vegh)
---
 .../ql/txn/compactor/CompactionQueryBuilder.java   | 551 ++++++---------------
 .../compactor/CompactionQueryBuilderFactory.java   |  44 ++
 .../CompactionQueryBuilderForInsertOnly.java       | 274 ++++++++++
 .../compactor/CompactionQueryBuilderForMajor.java  |  52 ++
 .../compactor/CompactionQueryBuilderForMinor.java  | 105 ++++
 .../CompactionQueryBuilderForRebalance.java        |  76 +++
 .../hive/ql/txn/compactor/MajorQueryCompactor.java |  29 +-
 .../hive/ql/txn/compactor/MinorQueryCompactor.java |  37 +-
 .../ql/txn/compactor/MmMajorQueryCompactor.java    |  30 +-
 .../ql/txn/compactor/MmMinorQueryCompactor.java    |  45 +-
 .../ql/txn/compactor/RebalanceQueryCompactor.java  |  32 +-
 .../compactor/CompactionQueryBuilderTestBase.java  | 150 ++++++
 ...stCompactionQueryBuilderForMajorCompaction.java | 255 ++++++++++
 ...stCompactionQueryBuilderForMinorCompaction.java | 186 +++++++
 .../TestCompactionQueryBuilderForMmCompaction.java | 466 +++++++++++++++++
 ...mpactionQueryBuilderForRebalanceCompaction.java | 171 +++++++
 16 files changed, 1999 insertions(+), 504 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilder.java
index 3ca930d5cf5..98196d20bc3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilder.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilder.java
@@ -17,72 +17,51 @@
  */
 package org.apache.hadoop.hive.ql.txn.compactor;
 
-import org.apache.commons.lang3.ArrayUtils;
 import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.hive.common.StatsSetupConst;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.metastore.ColumnType;
 import org.apache.hadoop.hive.metastore.api.CompactionType;
 import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.Order;
 import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.SerDeInfo;
-import org.apache.hadoop.hive.metastore.api.SkewedInfo;
 import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
 import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
 import org.apache.hadoop.hive.ql.exec.DDLPlanUtils;
 import org.apache.hadoop.hive.ql.io.AcidDirectory;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
-import org.apache.hadoop.hive.ql.metadata.Hive;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.util.DirectionUtils;
 import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
 import org.apache.hive.common.util.HiveStringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.ArrayList;
 import java.util.HashMap;
-import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
-import java.util.Set;
 import java.util.stream.Collectors;
 
-/**
- * Builds query strings that help with query-based compaction of CRUD and 
insert-only tables.
- */
-class CompactionQueryBuilder {
-
-  private static final Logger LOG = 
LoggerFactory.getLogger(CompactionQueryBuilder.class.getName());
-
+abstract class CompactionQueryBuilder {
   // required fields, set in constructor
-  private final Operation operation;
-  private final String resultTableName;
+  protected Operation operation;
+  protected String resultTableName;
 
   // required for some types of compaction. Required...
-  private Table sourceTab; // for Create and for Insert in CRUD and 
insert-only major
-  private StorageDescriptor storageDescriptor; // for Create in insert-only
-  private String location; // for Create
-  private ValidWriteIdList validWriteIdList; // for Alter/Insert in minor and 
CRUD
-  private AcidDirectory dir; // for Alter in minor
-  private Partition sourcePartition; // for Insert in major and insert-only 
minor
-  private String sourceTabForInsert; // for Insert
-  private int numberOfBuckets;  //for rebalance
-  private String orderByClause;  //for rebalance
+  protected Table sourceTab; // for Create and for Insert in CRUD and 
insert-only major
+  protected String location; // for Create
+  protected ValidWriteIdList validWriteIdList; // for Alter/Insert in minor 
and CRUD
+  protected AcidDirectory dir; // for Alter in minor
+  protected Partition sourcePartition; // for Insert in major and insert-only 
minor
+  protected String sourceTabForInsert; // for Insert
+  protected String orderByClause;  //for rebalance
+  protected StorageDescriptor storageDescriptor; // for Create in insert-only
+  protected int numberOfBuckets;
 
   // settable booleans
-  private boolean isPartitioned; // for Create
-  private boolean isBucketed; // for Create in CRUD
-  private boolean isDeleteDelta; // for Alter in CRUD minor
+  protected boolean isPartitioned; // for Create
+  protected boolean isBucketed; // for Create in CRUD
+  protected boolean isDeleteDelta; // for Alter in CRUD minor
 
   // internal use only, for legibility
-  private final boolean insertOnly;
-  private final CompactionType compactionType;
+  protected boolean insertOnly;
+  protected CompactionType compactionType;
 
   enum Operation {
     CREATE, ALTER, INSERT, DROP
@@ -100,17 +79,6 @@ class CompactionQueryBuilder {
     return this;
   }
 
-  /**
-   * Set the StorageDescriptor of the table or partition to compact.
-   * Required for Create operations in insert-only compaction.
-   *
-   * @param storageDescriptor StorageDescriptor of the table or partition to 
compact, not null
-   */
-  CompactionQueryBuilder setStorageDescriptor(StorageDescriptor 
storageDescriptor) {
-    this.storageDescriptor = storageDescriptor;
-    return this;
-  }
-
   /**
    * Set the location of the temp tables.
    * Used for Create operations.
@@ -164,15 +132,6 @@ class CompactionQueryBuilder {
     return this;
   }
 
-  /**
-   * Sets the target number of implicit buckets for a rebalancing compaction
-   * @param numberOfBuckets The target number of buckets
-   */
-  public CompactionQueryBuilder setNumberOfBuckets(int numberOfBuckets) {
-    this.numberOfBuckets = numberOfBuckets;
-    return this;
-  }
-
   /**
    * Sets the order by clause for a rebalancing compaction. It will be used to 
re-order the data in the table during
    * the compaction.
@@ -195,7 +154,7 @@ class CompactionQueryBuilder {
    * If true, Create operations for CRUD minor compaction will result in a 
bucketed table.
    */
   CompactionQueryBuilder setBucketed(boolean bucketed) {
-    if(CompactionType.REBALANCE.equals(compactionType) && bucketed) {
+    if (CompactionType.REBALANCE.equals(compactionType) && bucketed) {
       throw new IllegalArgumentException("Rebalance compaction is supported 
only on implicitly-bucketed tables!");
     }
     isBucketed = bucketed;
@@ -212,28 +171,50 @@ class CompactionQueryBuilder {
     return this;
   }
 
+  /**
+   * Set the StorageDescriptor of the table or partition to compact.
+   * Required for Create operations in insert-only compaction.
+   *
+   * @param storageDescriptor StorageDescriptor of the table or partition to 
compact, not null
+   */
+  CompactionQueryBuilder setStorageDescriptor(StorageDescriptor 
storageDescriptor) {
+    this.storageDescriptor = storageDescriptor;
+    return this;
+  }
+
+  /**
+   * Sets the target number of implicit buckets for a rebalancing compaction
+   * @param numberOfBuckets The target number of buckets
+   */
+  CompactionQueryBuilder setNumberOfBuckets(int numberOfBuckets) {
+    this.numberOfBuckets = numberOfBuckets;
+    return this;
+  }
+
+  CompactionQueryBuilder setOperation(Operation operation) {
+    this.operation = operation;
+    return this;
+  }
+
+  CompactionQueryBuilder setResultTableName(String resultTableName) {
+    this.resultTableName = resultTableName;
+    return this;
+  }
+
   /**
    * Construct a CompactionQueryBuilder with required params.
    *
    * @param compactionType major or minor; crud or insert-only, e.g. 
CompactionType.MAJOR_CRUD.
    *                       Cannot be null.
-   * @param operation query's Operation e.g. Operation.CREATE.
    * @param insertOnly Indicates if the table is not full ACID but Insert-only 
(Micromanaged)
    * @throws IllegalArgumentException if compactionType is null
    */
-  CompactionQueryBuilder(CompactionType compactionType, Operation operation, 
boolean insertOnly,
-      String resultTableName) {
+  CompactionQueryBuilder(CompactionType compactionType, boolean insertOnly) {
     if (compactionType == null) {
       throw new 
IllegalArgumentException("CompactionQueryBuilder.CompactionType cannot be 
null");
     }
     this.compactionType = compactionType;
-    this.operation = operation;
-    this.resultTableName = resultTableName;
     this.insertOnly = insertOnly;
-
-    if (CompactionType.REBALANCE.equals(compactionType) && insertOnly) {
-      throw new IllegalArgumentException("Rebalance compaction is supported 
only on full ACID tables!");
-    }
   }
 
   /**
@@ -275,137 +256,104 @@ class CompactionQueryBuilder {
     case DROP:
     default:
     }
-
     return query.toString();
   }
 
+  protected void getDdlForCreate(StringBuilder query) {
+    defineColumns(query);
+
+    // PARTITIONED BY. Used for parts of minor compaction.
+    if (isPartitioned) {
+      query.append(" PARTITIONED BY (`file_name` STRING) ");
+    }
+
+    // STORED AS / ROW FORMAT SERDE + INPUTFORMAT + OUTPUTFORMAT
+    query.append(" stored as orc");
+
+    // LOCATION
+    if (location != null) {
+      query.append(" LOCATION 
'").append(HiveStringUtils.escapeHiveCommand(location)).append("'");
+    }
+
+    addTblProperties(query, false, 0);
+  }
+
+  /**
+   * Part of Create operation. All tmp tables are not transactional and are 
marked as
+   * compaction tables. Additionally...
+   * - Crud compaction temp tables need tblproperty, "compactiontable."
+   * - Minor crud compaction temp tables need bucketing version tblproperty, 
if table is bucketed.
+   */
+  protected void addTblProperties(StringBuilder query, boolean 
addBucketingVersion, int bucketingVersion) {
+    Map<String, String> tblProperties = new HashMap<>();
+    tblProperties.put("transactional", "false");
+    tblProperties.put(AcidUtils.COMPACTOR_TABLE_PROPERTY, 
compactionType.name());
+    if (addBucketingVersion) {
+      tblProperties.put("bucketing_version", String.valueOf(bucketingVersion));
+    }
+    if (sourceTab != null) { // to avoid NPEs, skip this part if sourceTab is 
null
+      sourceTab.getParameters().entrySet().stream().filter(e -> 
e.getKey().startsWith("orc."))
+          .forEach(e -> tblProperties.put(e.getKey(), 
HiveStringUtils.escapeHiveCommand(e.getValue())));
+    }
+    addTblProperties(query, tblProperties);
+  }
+
+  protected void addTblProperties(StringBuilder query, Map<String, String> 
tblProperties) {
+    if (tblProperties != null && !tblProperties.isEmpty()) {
+      // add TBLPROPERTIES clause to query
+      boolean isFirst;
+      query.append(" TBLPROPERTIES (");
+      isFirst = true;
+      for (Map.Entry<String, String> property : tblProperties.entrySet()) {
+        if (!isFirst) {
+          query.append(", ");
+        }
+        
query.append("'").append(property.getKey()).append("'='").append(property.getValue()).append("'");
+        isFirst = false;
+      }
+      query.append(")");
+    }
+  }
+
   private void buildAddClauseForAlter(StringBuilder query) {
     if (validWriteIdList == null || dir == null) {
       query.setLength(0);
       return;  // avoid NPEs, don't throw an exception but return an empty 
query
     }
-    long minWriteID =
-        validWriteIdList.getMinOpenWriteId() == null ? 1 : 
validWriteIdList.getMinOpenWriteId();
+    long minWriteID = validWriteIdList.getMinOpenWriteId() == null ? 1 : 
validWriteIdList.getMinOpenWriteId();
     long highWatermark = validWriteIdList.getHighWatermark();
     List<AcidUtils.ParsedDelta> deltas = 
dir.getCurrentDirectories().stream().filter(
-        delta -> delta.isDeleteDelta() == isDeleteDelta && 
delta.getMaxWriteId() <= highWatermark
-            && delta.getMinWriteId() >= minWriteID)
+            delta -> delta.isDeleteDelta() == isDeleteDelta && 
delta.getMaxWriteId() <= highWatermark && delta.getMinWriteId() >= minWriteID)
         .collect(Collectors.toList());
     if (deltas.isEmpty()) {
       query.setLength(0); // no alter query needed; clear StringBuilder
       return;
     }
     query.append(" add ");
-    deltas.forEach(delta -> query.append("partition (file_name='")
-        .append(delta.getPath().getName()).append("')"
-            + " location '").append(delta.getPath()).append("' "));
-  }
-
-
-  private void buildSelectClauseForInsert(StringBuilder query) {
-    // Need list of columns for major crud, mmmajor partitioned, mmminor
-    List<FieldSchema> cols;
-    if (CompactionType.REBALANCE.equals(compactionType) ||
-        CompactionType.MAJOR.equals(compactionType) && (!insertOnly || 
sourcePartition != null) ||
-        CompactionType.MINOR.equals(compactionType) && insertOnly) {
-      if (sourceTab == null) {
-        return; // avoid NPEs, don't throw an exception but skip this part of 
the query
-      }
-      cols = sourceTab.getSd().getCols();
-    } else {
-      cols = null;
-    }
-    switch (compactionType) {
-      case REBALANCE: {
-        query.append("0, t2.writeId, t2.rowId DIV CEIL(numRows / ")
-            .append(numberOfBuckets)
-            .append("), t2.rowId, t2.writeId, t2.data from (select ")
-            .append("count(ROW__ID.writeId) over() as numRows, ");
-        if (StringUtils.isNotBlank(orderByClause)) {
-          // in case of reordering the data the writeids cannot be kept.
-          query.append("MAX(ROW__ID.writeId) over() as writeId, row_number() 
OVER (")
-            .append(orderByClause);
-        } else {
-          query.append("ROW__ID.writeId as writeId, row_number() OVER (order 
by ROW__ID.writeId ASC, ROW__ID.bucketId ASC, ROW__ID.rowId ASC");
-        }
-        query.append(") - 1 AS rowId, NAMED_STRUCT(");
-        for (int i = 0; i < cols.size(); ++i) {
-          query.append(i == 0 ? "'" : ", 
'").append(cols.get(i).getName()).append("', `")
-              .append(cols.get(i).getName()).append("`");
-        }
-        query.append(") as data");
-        break;
-      }
-      case MAJOR: {
-        if (insertOnly) {
-          if (sourcePartition != null) { //mmmajor and partitioned
-            appendColumns(query, cols, false);
-          } else { // mmmajor and unpartitioned
-            query.append("*");
-          }
-        } else {
-          query.append(
-              "validate_acid_sort_order(ROW__ID.writeId, ROW__ID.bucketId, 
ROW__ID.rowId), "
-                  + "ROW__ID.writeId, ROW__ID.bucketId, ROW__ID.rowId, 
ROW__ID.writeId, "
-                  + "NAMED_STRUCT(");
-          appendColumns(query, cols, true);
-          query.append(") ");
-        }
-        break;
-      }
-      case MINOR: {
-        if (insertOnly) {
-          appendColumns(query, cols, false);
-        } else {
-          query.append(
-              "`operation`, `originalTransaction`, `bucket`, `rowId`, 
`currentTransaction`, `row`");
-        }
-        break;
-      }
-    }
+    deltas.forEach(
+        delta -> query.append("partition 
(file_name='").append(delta.getPath().getName()).append("') location '")
+            .append(delta.getPath()).append("' "));
   }
 
-  private void appendColumns(StringBuilder query, List<FieldSchema> cols, 
boolean alias) {
-    if (cols == null) {
-      throw new IllegalStateException("Query could not be created: Source 
columns are unknown");
-    }
-    for (int i = 0; i < cols.size(); ++i) {
-      if (alias) {
-        query.append(i == 0 ? "'" : ", 
'").append(cols.get(i).getName()).append("', `")
-            .append(cols.get(i).getName()).append("`");
-      } else {
-        query.append(i == 0 ? "`" : ", 
`").append(cols.get(i).getName()).append("`");
-      }
-    }
-  }
+  protected abstract void buildSelectClauseForInsert(StringBuilder query);
 
-  private void getSourceForInsert(StringBuilder query) {
+  protected void getSourceForInsert(StringBuilder query) {
     if (sourceTabForInsert != null) {
       query.append(sourceTabForInsert);
     } else {
       
query.append(sourceTab.getDbName()).append(".").append(sourceTab.getTableName());
     }
     query.append(" ");
-    if (CompactionType.REBALANCE.equals(compactionType)) {
-      if (StringUtils.isNotBlank(orderByClause)) {
-        query.append(orderByClause);
-      } else {
-        query.append("order by ROW__ID.writeId ASC, ROW__ID.bucketId ASC, 
ROW__ID.rowId ASC");
-      }
-      query.append(") t2");
-    } else if (CompactionType.MAJOR.equals(compactionType) && insertOnly && 
StringUtils.isNotBlank(orderByClause)) {
-      query.append(orderByClause);
-    }
   }
 
-  private void buildWhereClauseForInsert(StringBuilder query) {
-    if (CompactionType.MAJOR.equals(compactionType) && sourcePartition != null 
&& sourceTab != null) {
+  protected void buildWhereClauseForInsert(StringBuilder query) {
+    if (sourcePartition != null && sourceTab != null) {
       List<String> vals = sourcePartition.getValues();
       List<FieldSchema> keys = sourceTab.getPartitionKeys();
       if (keys.size() != vals.size()) {
-        throw new IllegalStateException("source partition values ("
-            + Arrays.toString(vals.toArray()) + ") do not match source table 
values ("
-            + Arrays.toString(keys.toArray()) + "). Failing compaction.");
+        throw new IllegalStateException("source partition values (" + 
Arrays.toString(
+            vals.toArray()) + ") do not match source table values (" + 
Arrays.toString(
+            keys.toArray()) + "). Failing compaction.");
       }
 
       query.append(" where ");
@@ -419,67 +367,36 @@ class CompactionQueryBuilder {
         }
       }
     }
-
-    if (CompactionType.MINOR.equals(compactionType) && !insertOnly && 
validWriteIdList != null) {
-      long[] invalidWriteIds = validWriteIdList.getInvalidWriteIds();
-      if (invalidWriteIds.length > 0) {
-        query.append(" where `originalTransaction` not in (").append(
-            StringUtils.join(ArrayUtils.toObject(invalidWriteIds), ","))
-            .append(")");
-      }
-    }
   }
 
-  private void getDdlForCreate(StringBuilder query) {
-    defineColumns(query);
-
-    // PARTITIONED BY. Used for parts of minor compaction.
-    if (isPartitioned) {
-      query.append(" PARTITIONED BY (`file_name` STRING) ");
-    }
-
-    // CLUSTERED BY. (bucketing)
-    int bucketingVersion = 0;
-    if (!insertOnly && CompactionType.MINOR.equals(compactionType)) {
-      bucketingVersion = getMinorCrudBucketing(query, bucketingVersion);
-    } else if (insertOnly) {
-      getMmBucketing(query);
-    }
-
-    // SKEWED BY
-    if (insertOnly) {
-      getSkewedByClause(query);
-    }
-
-    // STORED AS / ROW FORMAT SERDE + INPUTFORMAT + OUTPUTFORMAT
-    if (!insertOnly) {
-      query.append(" stored as orc");
-    } else {
-      copySerdeFromSourceTable(query);
+  protected void appendColumns(StringBuilder query, List<FieldSchema> cols, 
boolean alias) {
+    if (cols == null) {
+      throw new IllegalStateException("Query could not be created: Source 
columns are unknown");
     }
-
-    // LOCATION
-    if (location != null) {
-      query.append(" LOCATION 
'").append(HiveStringUtils.escapeHiveCommand(location)).append("'");
+    for (int i = 0; i < cols.size(); ++i) {
+      if (alias) {
+        query.append(i == 0 ? "'" : ", 
'").append(cols.get(i).getName()).append("', `").append(cols.get(i).getName())
+            .append("`");
+      } else {
+        query.append(i == 0 ? "`" : ", 
`").append(cols.get(i).getName()).append("`");
+      }
     }
-
-    // TBLPROPERTIES
-    addTblProperties(query, bucketingVersion);
   }
 
   /**
    * Define columns of the create query.
    */
-  private void defineColumns(StringBuilder query) {
-    if (sourceTab == null) {
-      return; // avoid NPEs, don't throw an exception but skip this part of 
the query
-    }
-    query.append("(");
-    if (!insertOnly) {
-      query.append(
-          "`operation` int, `originalTransaction` bigint, `bucket` int, 
`rowId` bigint, "
-              + "`currentTransaction` bigint, `row` struct<");
+  protected void defineColumns(StringBuilder query) {
+    if (sourceTab != null) {
+      query.append("(")
+          .append("`operation` int, `originalTransaction` bigint, `bucket` 
int, `rowId` bigint, `currentTransaction` bigint, ")
+          .append("`row` struct<")
+          .append(StringUtils.join(getColumnDescs(), ','))
+          .append(">) ");
     }
+  }
+
+  protected List<String> getColumnDescs() {
     List<FieldSchema> cols = sourceTab.getSd().getCols();
     List<String> columnDescs = new ArrayList<>();
     for (FieldSchema col : cols) {
@@ -487,197 +404,7 @@ class CompactionQueryBuilder {
       String columnDesc = "`" + col.getName() + "` " + (!insertOnly ? ":" : 
"") + columnType;
       columnDescs.add(columnDesc);
     }
-    query.append(StringUtils.join(columnDescs, ','));
-    query.append(!insertOnly ? ">" : "");
-    query.append(") ");
+    return columnDescs;
   }
 
-  /**
-   * Part of Create operation. Copy source table bucketing for insert-only 
compaction.
-   */
-  private void getMmBucketing(StringBuilder query) {
-    if (sourceTab == null) {
-      return; // avoid NPEs, don't throw an exception but skip this part of 
the query
-    }
-    boolean isFirst;
-    List<String> buckCols = sourceTab.getSd().getBucketCols();
-    if (buckCols.size() > 0) {
-      query.append("CLUSTERED BY (").append(StringUtils.join(buckCols, 
",")).append(") ");
-      List<Order> sortCols = sourceTab.getSd().getSortCols();
-      if (sortCols.size() > 0) {
-        query.append("SORTED BY (");
-        isFirst = true;
-        for (Order sortCol : sortCols) {
-          if (!isFirst) {
-            query.append(", ");
-          }
-          isFirst = false;
-          query.append(sortCol.getCol()).append(" ")
-              .append(DirectionUtils.codeToText(sortCol.getOrder()));
-        }
-        query.append(") ");
-      }
-      query.append("INTO ").append(sourceTab.getSd().getNumBuckets()).append(" 
BUCKETS");
-    }
-  }
-
-  /**
-   * Part of Create operation. Minor crud compaction uses its own bucketing 
system.
-   */
-  private int getMinorCrudBucketing(StringBuilder query, int bucketingVersion) 
{
-    if (isBucketed && sourceTab != null) { // skip if sourceTab is null to 
avoid NPEs
-      int numBuckets = 1;
-      try {
-        org.apache.hadoop.hive.ql.metadata.Table t =
-            Hive.get().getTable(sourceTab.getDbName(), 
sourceTab.getTableName());
-        numBuckets = Math.max(t.getNumBuckets(), numBuckets);
-        bucketingVersion = t.getBucketingVersion();
-      } catch (HiveException e) {
-        LOG.info("Error finding table {}. Minor compaction result will use 0 
buckets.",
-            sourceTab.getTableName());
-      } finally {
-        query.append(" clustered by (`bucket`)")
-            .append(" sorted by (`originalTransaction`, `bucket`, `rowId`)")
-            .append(" into ").append(numBuckets).append(" buckets");
-      }
-    }
-    return bucketingVersion;
-  }
-
-  /**
-   * Part of Create operation. Insert-only compaction tables copy source 
tables.
-   */
-  private void getSkewedByClause(StringBuilder query) {
-    if (sourceTab == null) {
-      return; // avoid NPEs, don't throw an exception but skip this part of 
the query
-    }
-    boolean isFirst; // Stored as directories. We don't care about the skew 
otherwise.
-    if (sourceTab.getSd().isStoredAsSubDirectories()) {
-      SkewedInfo skewedInfo = sourceTab.getSd().getSkewedInfo();
-      if (skewedInfo != null && !skewedInfo.getSkewedColNames().isEmpty()) {
-        query.append(" SKEWED BY (")
-            .append(StringUtils.join(skewedInfo.getSkewedColNames(), ", 
")).append(") ON ");
-        isFirst = true;
-        for (List<String> colValues : skewedInfo.getSkewedColValues()) {
-          if (!isFirst) {
-            query.append(", ");
-          }
-          isFirst = false;
-          query.append("('").append(StringUtils.join(colValues, 
"','")).append("')");
-        }
-        query.append(") STORED AS DIRECTORIES");
-      }
-    }
-  }
-
-  /**
-   * Part of Create operation. Insert-only compaction tables copy source 
tables' serde.
-   */
-  private void copySerdeFromSourceTable(StringBuilder query) {
-    if (storageDescriptor == null) {
-      return; // avoid NPEs, don't throw an exception but skip this part of 
the query
-    }
-    ensureTableToCompactIsNative();
-    SerDeInfo serdeInfo = storageDescriptor.getSerdeInfo();
-    Map<String, String> serdeParams = serdeInfo.getParameters();
-    query.append(" ROW FORMAT SERDE '")
-        
.append(HiveStringUtils.escapeHiveCommand(serdeInfo.getSerializationLib())).append("'");
-    // WITH SERDEPROPERTIES
-    if (!serdeParams.isEmpty()) {
-      DDLPlanUtils.appendSerdeParams(query, serdeParams);
-    }
-    query.append("STORED AS INPUTFORMAT '")
-        
.append(HiveStringUtils.escapeHiveCommand(storageDescriptor.getInputFormat())).append("'")
-        .append(" OUTPUTFORMAT '")
-        
.append(HiveStringUtils.escapeHiveCommand(storageDescriptor.getOutputFormat()))
-        .append("'");
-  }
-
-  /**
-   * Part of Create operation. All tmp tables are not transactional and are 
marked as
-   * compaction tables. Additionally...
-   * - Crud compaction temp tables need tblproperty, "compactiontable."
-   * - Minor crud compaction temp tables need bucketing version tblproperty, 
if table is bucketed.
-   * - Insert-only compaction tables copy source tables' tblproperties, except 
metastore/statistics
-   *   properties.
-   */
-  private void addTblProperties(StringBuilder query, int bucketingVersion) {
-    Map<String, String> tblProperties = new HashMap<>();
-    tblProperties.put("transactional", "false");
-    if (!insertOnly) {
-      tblProperties.put(AcidUtils.COMPACTOR_TABLE_PROPERTY, 
compactionType.name());
-      if (CompactionType.MINOR.equals(compactionType) && isBucketed) {
-        tblProperties.put("bucketing_version", 
String.valueOf(bucketingVersion));
-      }
-    }
-    if (sourceTab != null) { // to avoid NPEs, skip this part if sourceTab is 
null
-      if (insertOnly) {
-        // Exclude all standard table properties.
-        Set<String> excludes = getHiveMetastoreConstants();
-        excludes.addAll(StatsSetupConst.TABLE_PARAMS_STATS_KEYS);
-        for (Map.Entry<String, String> e : 
sourceTab.getParameters().entrySet()) {
-          if (e.getValue() == null) {
-            continue;
-          }
-          if (excludes.contains(e.getKey())) {
-            continue;
-          }
-          tblProperties.put(e.getKey(), 
HiveStringUtils.escapeHiveCommand(e.getValue()));
-        }
-      } else {
-        for (Map.Entry<String, String> e : 
sourceTab.getParameters().entrySet()) {
-          if (e.getKey().startsWith("orc.")) {
-            tblProperties.put(e.getKey(), 
HiveStringUtils.escapeHiveCommand(e.getValue()));
-          }
-        }
-      }
-    }
-
-    // add TBLPROPERTIES clause to query
-    boolean isFirst;
-    query.append(" TBLPROPERTIES (");
-    isFirst = true;
-    for (Map.Entry<String, String> property : tblProperties.entrySet()) {
-      if (!isFirst) {
-        query.append(", ");
-      }
-      
query.append("'").append(property.getKey()).append("'='").append(property.getValue())
-          .append("'");
-      isFirst = false;
-    }
-    query.append(")");
-  }
-
-  private static Set<String> getHiveMetastoreConstants() {
-    Set<String> result = new HashSet<>();
-    for (Field f : hive_metastoreConstants.class.getDeclaredFields()) {
-      if (!Modifier.isFinal(f.getModifiers())) {
-        continue;
-      }
-      if (!String.class.equals(f.getType())) {
-        continue;
-      }
-      f.setAccessible(true);
-      try {
-        result.add((String) f.get(null));
-      } catch (IllegalAccessException e) {
-        throw new RuntimeException(e);
-      }
-    }
-    return result;
-  }
-
-  private void ensureTableToCompactIsNative() {
-    if (sourceTab == null) {
-      return;
-    }
-    String storageHandler =
-        
sourceTab.getParameters().get(hive_metastoreConstants.META_TABLE_STORAGE);
-    if (storageHandler != null) {
-      String message = "Table " + sourceTab.getTableName() + "has a storage 
handler ("
-          + storageHandler + "). Failing compaction for this non-native 
table.";
-      LOG.error(message);
-      throw new RuntimeException(message);
-    }
-  }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilderFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilderFactory.java
new file mode 100644
index 00000000000..bb7a001ebd4
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilderFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+
+class CompactionQueryBuilderFactory {
+  public CompactionQueryBuilder getCompactionQueryBuilder(CompactionType 
compactionType, boolean insertOnly) {
+
+    if (compactionType == null) {
+      throw new 
IllegalArgumentException("CompactionQueryBuilder.CompactionType cannot be 
null");
+    }
+    if (insertOnly) {
+      return new CompactionQueryBuilderForInsertOnly(compactionType);
+    } else {
+      switch (compactionType) {
+      case MAJOR:
+        return new CompactionQueryBuilderForMajor();
+      case MINOR:
+        return new CompactionQueryBuilderForMinor();
+      case REBALANCE:
+        return new CompactionQueryBuilderForRebalance();
+      }
+    }
+    throw new IllegalArgumentException(
+        String.format("Compaction cannot be created with type %s", 
compactionType));
+  }
+
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilderForInsertOnly.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilderForInsertOnly.java
new file mode 100644
index 00000000000..f0bd64bfd7b
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilderForInsertOnly.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.SkewedInfo;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.exec.DDLPlanUtils;
+import org.apache.hadoop.hive.ql.util.DirectionUtils;
+import org.apache.hive.common.util.HiveStringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Builds query strings that help with query-based compaction of insert-only tables.
+ */
+class CompactionQueryBuilderForInsertOnly extends CompactionQueryBuilder {
+
+  private static final Logger LOG = LoggerFactory.getLogger(CompactionQueryBuilderForInsertOnly.class.getName());
+
+  /**
+   * Construct a CompactionQueryBuilderForInsertOnly with required params.
+   *
+   * @param compactionType major or minor, e.g. CompactionType.MAJOR. Cannot be null.
+   *                       Rebalance compaction is not supported for insert-only tables.
+   * @throws IllegalArgumentException if compactionType is null or the compaction type is REBALANCE
+   */
+  CompactionQueryBuilderForInsertOnly(CompactionType compactionType) {
+    super(compactionType, true);
+    if (CompactionType.REBALANCE.equals(compactionType)) {
+      throw new IllegalArgumentException("Rebalance compaction can only be supported on full ACID tables.");
+    }
+  }
+
+  /**
+   * Appends the select list of the insert query.
+   * <ul>
+   *   <li>MAJOR with a source partition: the columns of the source table</li>
+   *   <li>MAJOR without a source partition: {@code *}</li>
+   *   <li>MINOR: the columns of the source table</li>
+   * </ul>
+   * Whenever the column list is needed but the source table is null, nothing is appended.
+   *
+   * @param query the query under construction
+   */
+  @Override
+  protected void buildSelectClauseForInsert(StringBuilder query) {
+    switch (compactionType) {
+    case MAJOR:
+      if (sourcePartition != null) { // mmmajor and partitioned
+        appendSourceColumns(query);
+      } else { // mmmajor and unpartitioned
+        query.append("*");
+      }
+      break;
+    case MINOR:
+      appendSourceColumns(query);
+      break;
+    default:
+      // REBALANCE is rejected by the constructor; nothing to add for any other type.
+      break;
+    }
+  }
+
+  /**
+   * Appends the source table's columns to the query; does nothing if the source table is null.
+   */
+  private void appendSourceColumns(StringBuilder query) {
+    if (sourceTab != null) {
+      List<FieldSchema> cols = sourceTab.getSd().getCols();
+      appendColumns(query, cols, false);
+    }
+  }
+
+  /**
+   * Appends the source of the insert query and, for MAJOR compaction only, the order by clause
+   * when one is set.
+   *
+   * @param query the query under construction
+   */
+  @Override
+  protected void getSourceForInsert(StringBuilder query) {
+    super.getSourceForInsert(query);
+    if (CompactionType.MAJOR.equals(compactionType) && StringUtils.isNotBlank(orderByClause)) {
+      query.append(orderByClause);
+    }
+  }
+
+  /**
+   * Appends the where clause of the insert query. Only MAJOR compaction gets one; it is produced
+   * by the base class.
+   *
+   * @param query the query under construction
+   */
+  @Override
+  protected void buildWhereClauseForInsert(StringBuilder query) {
+    if (CompactionType.MAJOR.equals(compactionType)) {
+      super.buildWhereClauseForInsert(query);
+    }
+  }
+
+  /**
+   * Appends the table definition part of the create query: columns, partitioning, bucketing,
+   * skew, serde/storage format, location and table properties, in this order.
+   *
+   * @param query the query under construction
+   */
+  @Override
+  protected void getDdlForCreate(StringBuilder query) {
+    defineColumns(query);
+
+    // PARTITIONED BY. Used for parts of minor compaction.
+    if (isPartitioned) {
+      query.append(" PARTITIONED BY (`file_name` STRING) ");
+    }
+
+    // CLUSTERED BY. (bucketing)
+    getMmBucketing(query);
+
+    // SKEWED BY
+    getSkewedByClause(query);
+
+    // STORED AS / ROW FORMAT SERDE + INPUTFORMAT + OUTPUTFORMAT
+    copySerdeFromSourceTable(query);
+
+    // LOCATION
+    if (location != null) {
+      query.append(" LOCATION '").append(HiveStringUtils.escapeHiveCommand(location)).append("'");
+    }
+
+    // TBLPROPERTIES
+    addTblProperties(query);
+  }
+
+  /**
+   * Define columns of the create query. Nothing is appended if the source table is null.
+   *
+   * @param query the query under construction
+   */
+  @Override
+  protected void defineColumns(StringBuilder query) {
+    if (sourceTab != null) {
+      query.append("(");
+      List<String> columnDescs = getColumnDescs();
+      query.append(StringUtils.join(columnDescs, ','));
+      query.append(") ");
+    }
+  }
+
+  /**
+   * Part of Create operation. Copy source table bucketing for insert-only compaction: the
+   * CLUSTERED BY columns, the optional SORTED BY columns with their direction and the number
+   * of buckets. Nothing is appended if the source table is null or has no bucketing columns.
+   */
+  private void getMmBucketing(StringBuilder query) {
+    if (sourceTab == null) {
+      return;
+    }
+    List<String> buckCols = sourceTab.getSd().getBucketCols();
+    if (buckCols.isEmpty()) {
+      return;
+    }
+    query.append("CLUSTERED BY (").append(StringUtils.join(buckCols, ",")).append(") ");
+    List<Order> sortCols = sourceTab.getSd().getSortCols();
+    if (!sortCols.isEmpty()) {
+      query.append("SORTED BY (");
+      boolean isFirst = true;
+      for (Order sortCol : sortCols) {
+        if (!isFirst) {
+          query.append(", ");
+        }
+        isFirst = false;
+        query.append(sortCol.getCol()).append(" ").append(DirectionUtils.codeToText(sortCol.getOrder()));
+      }
+      query.append(") ");
+    }
+    query.append("INTO ").append(sourceTab.getSd().getNumBuckets()).append(" BUCKETS");
+  }
+
+  /**
+   * Part of Create operation. Insert-only compaction tables copy the skew of the source table,
+   * but only when the source table is stored as sub-directories; the skew is ignored otherwise.
+   * <p>
+   * The generated clause has the form
+   * {@code SKEWED BY (c1, c2) ON (('a','b'), ('c','d')) STORED AS DIRECTORIES}.
+   */
+  private void getSkewedByClause(StringBuilder query) {
+    if (sourceTab == null || !sourceTab.getSd().isStoredAsSubDirectories()) {
+      return;
+    }
+    SkewedInfo skewedInfo = sourceTab.getSd().getSkewedInfo();
+    if (skewedInfo == null || skewedInfo.getSkewedColNames().isEmpty()) {
+      return;
+    }
+    // The list of value tuples has to be enclosed in its own parentheses: the opening one is
+    // emitted right after ON and is closed just before STORED AS DIRECTORIES.
+    query.append(" SKEWED BY (").append(StringUtils.join(skewedInfo.getSkewedColNames(), ", ")).append(") ON (");
+    boolean isFirst = true;
+    for (List<String> colValues : skewedInfo.getSkewedColValues()) {
+      if (!isFirst) {
+        query.append(", ");
+      }
+      isFirst = false;
+      query.append("('").append(StringUtils.join(colValues, "','")).append("')");
+    }
+    query.append(") STORED AS DIRECTORIES");
+  }
+
+  /**
+   * Collects, via reflection, the values of all final String fields declared on
+   * {@link hive_metastoreConstants}.
+   *
+   * @return a new mutable set holding the constant values
+   */
+  private static Set<String> getHiveMetastoreConstants() {
+    Set<String> result = new HashSet<>();
+    for (Field f : hive_metastoreConstants.class.getDeclaredFields()) {
+      if (!Modifier.isFinal(f.getModifiers())) {
+        continue;
+      }
+      if (!String.class.equals(f.getType())) {
+        continue;
+      }
+      f.setAccessible(true);
+      try {
+        result.add((String) f.get(null));
+      } catch (IllegalAccessException e) {
+        throw new RuntimeException(e);
+      }
+    }
+    return result;
+  }
+
+  /**
+   * Part of Create operation. The tmp table is created as not transactional. Additionally,
+   * insert-only compaction tables copy the source table's tblproperties, except the
+   * metastore/statistics properties and the ones without a value. The collected properties are
+   * passed on to the base class, which appends them to the query.
+   *
+   * @param query the query under construction
+   */
+  protected void addTblProperties(StringBuilder query) {
+    Map<String, String> tblProperties = new HashMap<>();
+    tblProperties.put("transactional", "false");
+    if (sourceTab != null) { // to avoid NPEs, skip this part if sourceTab is null
+      // Exclude all standard table properties.
+      Set<String> excludes = getHiveMetastoreConstants();
+      excludes.addAll(StatsSetupConst.TABLE_PARAMS_STATS_KEYS);
+      for (Map.Entry<String, String> e : sourceTab.getParameters().entrySet()) {
+        if (e.getValue() == null) {
+          continue;
+        }
+        if (excludes.contains(e.getKey())) {
+          continue;
+        }
+        tblProperties.put(e.getKey(), HiveStringUtils.escapeHiveCommand(e.getValue()));
+      }
+    }
+    super.addTblProperties(query, tblProperties);
+  }
+
+  /**
+   * Part of Create operation. Insert-only compaction tables copy source tables' serde: the
+   * serialization library, the serde parameters (if any) and the input/output formats.
+   * Nothing is appended if no storage descriptor is set.
+   *
+   * @param query the query under construction
+   * @throws RuntimeException if the source table has a storage handler (non-native table)
+   */
+  protected void copySerdeFromSourceTable(StringBuilder query) {
+    if (storageDescriptor != null) {
+      ensureTableToCompactIsNative();
+      SerDeInfo serdeInfo = storageDescriptor.getSerdeInfo();
+      Map<String, String> serdeParams = serdeInfo.getParameters();
+      query.append(" ROW FORMAT SERDE '").append(HiveStringUtils.escapeHiveCommand(serdeInfo.getSerializationLib()))
+          .append("'");
+      // WITH SERDEPROPERTIES
+      if (!serdeParams.isEmpty()) {
+        DDLPlanUtils.appendSerdeParams(query, serdeParams);
+      }
+      // NOTE(review): no separating whitespace is emitted in front of STORED AS; kept as is.
+      query.append("STORED AS INPUTFORMAT '")
+          .append(HiveStringUtils.escapeHiveCommand(storageDescriptor.getInputFormat())).append("'")
+          .append(" OUTPUTFORMAT '").append(HiveStringUtils.escapeHiveCommand(storageDescriptor.getOutputFormat()))
+          .append("'");
+    }
+
+  }
+
+  /**
+   * Fails the compaction if the source table declares a storage handler. Does nothing if the
+   * source table is null.
+   *
+   * @throws RuntimeException if the source table has a storage handler
+   */
+  private void ensureTableToCompactIsNative() {
+    if (sourceTab != null) {
+      String storageHandler = sourceTab.getParameters().get(hive_metastoreConstants.META_TABLE_STORAGE);
+      if (storageHandler != null) {
+        String message =
+            "Table " + sourceTab.getTableName() + " has a storage handler (" + storageHandler
+                + "). Failing compaction for this non-native table.";
+        LOG.error(message);
+        throw new RuntimeException(message);
+      }
+    }
+  }
+
+}
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilderForMajor.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilderForMajor.java
new file mode 100644
index 00000000000..c95b3cfee67
--- /dev/null
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilderForMajor.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+
+import java.util.List;
+
+/**
+ * Query builder for query-based MAJOR compaction of full CRUD tables.
+ */
+class CompactionQueryBuilderForMajor extends CompactionQueryBuilder {
+
+  CompactionQueryBuilderForMajor() {
+    super(CompactionType.MAJOR, false);
+  }
+
+  /**
+   * Appends the select list of the insert query: the sort order validation call, the ROW__ID
+   * parts and a NAMED_STRUCT built from the columns of the source table.
+   * If the source table is null, no exception is thrown and nothing is appended.
+   *
+   * @param query the query under construction
+   */
+  @Override
+  protected void buildSelectClauseForInsert(StringBuilder query) {
+    if (sourceTab == null) {
+      // The column list cannot be produced without the source table, so skip this part.
+      return;
+    }
+    List<FieldSchema> columns = sourceTab.getSd().getCols();
+    query.append("validate_acid_sort_order(ROW__ID.writeId, ROW__ID.bucketId, ROW__ID.rowId), ")
+        .append("ROW__ID.writeId, ROW__ID.bucketId, ROW__ID.rowId, ROW__ID.writeId, NAMED_STRUCT(");
+    appendColumns(query, columns, true);
+    query.append(") ");
+  }
+
+  /**
+   * Appends the where clause of the insert query exactly as the base class defines it.
+   *
+   * @param query the query under construction
+   */
+  @Override
+  protected void buildWhereClauseForInsert(StringBuilder query) {
+    super.buildWhereClauseForInsert(query);
+  }
+}
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilderForMinor.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilderForMinor.java
new file mode 100644
index 00000000000..ebaec5fccd4
--- /dev/null
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilderForMinor.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hive.common.util.HiveStringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Builds query strings that help with query-based MINOR compaction of CRUD.
+ */
+class CompactionQueryBuilderForMinor extends CompactionQueryBuilder {
+
+  private static final Logger LOG = LoggerFactory.getLogger(CompactionQueryBuilderForMinor.class.getName());
+
+  CompactionQueryBuilderForMinor() {
+    super(CompactionType.MINOR, false);
+  }
+
+  /**
+   * Appends the fixed select list of the insert query.
+   *
+   * @param query the query under construction
+   */
+  @Override
+  protected void buildSelectClauseForInsert(StringBuilder query) {
+    query.append("`operation`, `originalTransaction`, `bucket`, `rowId`, `currentTransaction`, `row`");
+  }
+
+  /**
+   * Appends a where clause that filters out the rows written with invalid write ids.
+   * Nothing is appended if no valid write id list is set or it has no invalid write ids.
+   *
+   * @param query the query under construction
+   */
+  @Override
+  protected void buildWhereClauseForInsert(StringBuilder query) {
+    if (validWriteIdList != null) {
+      long[] invalidWriteIds = validWriteIdList.getInvalidWriteIds();
+      if (invalidWriteIds.length > 0) {
+        query.append(" where `originalTransaction` not in (")
+            .append(StringUtils.join(ArrayUtils.toObject(invalidWriteIds), ",")).append(")");
+      }
+    }
+  }
+
+  /**
+   * Appends the table definition part of the create query: columns, partitioning, bucketing,
+   * storage format, location and table properties, in this order.
+   *
+   * @param query the query under construction
+   */
+  @Override
+  protected void getDdlForCreate(StringBuilder query) {
+    defineColumns(query);
+
+    // PARTITIONED BY. Used for parts of minor compaction.
+    if (isPartitioned) {
+      query.append(" PARTITIONED BY (`file_name` STRING) ");
+    }
+
+    // CLUSTERED BY. (bucketing)
+    int bucketingVersion = getMinorCrudBucketing(query);
+
+    // STORED AS / ROW FORMAT SERDE + INPUTFORMAT + OUTPUTFORMAT
+    query.append(" stored as orc");
+
+    // LOCATION
+    if (location != null) {
+      query.append(" LOCATION '").append(HiveStringUtils.escapeHiveCommand(location)).append("'");
+    }
+
+    // TBLPROPERTIES
+    addTblProperties(query, isBucketed, bucketingVersion);
+  }
+
+  /**
+   * Part of Create operation. Minor crud compaction uses its own bucketing system: the result is
+   * clustered by `bucket` and sorted by `originalTransaction`, `bucket`, `rowId`.
+   * The clause is only appended for bucketed tables with a non-null source table.
+   * <p>
+   * If the table cannot be looked up, the failure is logged and the clause falls back to one
+   * bucket and bucketing version 0.
+   *
+   * @param query the query under construction
+   * @return the bucketing version of the table, or 0 if it was not looked up
+   */
+  private int getMinorCrudBucketing(StringBuilder query) {
+    int bucketingVersion = 0;
+    if (isBucketed && sourceTab != null) { // skip if sourceTab is null to avoid NPEs
+      int numBuckets = 1;
+      try {
+        org.apache.hadoop.hive.ql.metadata.Table t = getTable();
+        numBuckets = Math.max(t.getNumBuckets(), numBuckets);
+        bucketingVersion = t.getBucketingVersion();
+      } catch (HiveException e) {
+        // Best effort: keep going with the defaults, but report the values that are really
+        // used and keep the cause of the failed lookup in the log.
+        LOG.warn("Error finding table {}. Minor compaction result will use {} bucket(s) and bucketing version {}.",
+            sourceTab.getTableName(), numBuckets, bucketingVersion, e);
+      } finally {
+        query.append(" clustered by (`bucket`)").append(" sorted by (`originalTransaction`, `bucket`, `rowId`)")
+            .append(" into ").append(numBuckets).append(" buckets");
+      }
+    }
+    return bucketingVersion;
+  }
+
+  /**
+   * Looks up the source table by its database and table name.
+   *
+   * @return the table object
+   * @throws HiveException if the lookup fails
+   */
+  protected org.apache.hadoop.hive.ql.metadata.Table getTable() throws HiveException {
+    return Hive.get().getTable(sourceTab.getDbName(), sourceTab.getTableName());
+  }
+
+}
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilderForRebalance.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilderForRebalance.java
new file mode 100644
index 00000000000..91dca3ffd1b
--- /dev/null
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilderForRebalance.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+
+import java.util.List;
+
+/**
+ * Query builder for REBALANCE compaction of full CRUD tables.
+ */
+class CompactionQueryBuilderForRebalance extends CompactionQueryBuilder {
+
+  CompactionQueryBuilderForRebalance() {
+    super(CompactionType.REBALANCE, false);
+  }
+
+  /**
+   * Appends the select list of the insert query together with the opening of the inner select
+   * that numbers the rows. If the source table is null, no exception is thrown and nothing is
+   * appended.
+   *
+   * @param query the query under construction
+   */
+  @Override
+  protected void buildSelectClauseForInsert(StringBuilder query) {
+    if (sourceTab == null) {
+      // The column list cannot be produced without the source table, so skip this part.
+      return;
+    }
+    List<FieldSchema> columns = sourceTab.getSd().getCols();
+
+    query.append("0, t2.writeId, t2.rowId DIV CEIL(numRows / ");
+    query.append(numberOfBuckets);
+    query.append("), t2.rowId, t2.writeId, t2.data from (select ");
+    query.append("count(ROW__ID.writeId) over() as numRows, ");
+
+    if (StringUtils.isNotBlank(orderByClause)) {
+      // The original write ids cannot be kept once the data gets reordered.
+      query.append("MAX(ROW__ID.writeId) over() as writeId, row_number() OVER (");
+      query.append(orderByClause);
+    } else {
+      query.append("ROW__ID.writeId as writeId, row_number() OVER (")
+          .append("order by ROW__ID.writeId ASC, ROW__ID.bucketId ASC, ROW__ID.rowId ASC");
+    }
+    query.append(") - 1 AS rowId, NAMED_STRUCT(");
+
+    // Every column contributes a 'name', `name` pair; all but the first are comma separated.
+    String prefix = "'";
+    for (FieldSchema column : columns) {
+      query.append(prefix).append(column.getName()).append("', `").append(column.getName()).append("`");
+      prefix = ", '";
+    }
+    query.append(") as data");
+  }
+
+  /**
+   * Appends the source of the insert query, followed by the ordering (the configured order by
+   * clause, or the ROW__ID based default) and the closing of the inner select.
+   *
+   * @param query the query under construction
+   */
+  @Override
+  protected void getSourceForInsert(StringBuilder query) {
+    super.getSourceForInsert(query);
+    String ordering = StringUtils.isNotBlank(orderByClause)
+        ? orderByClause
+        : "order by ROW__ID.writeId ASC, ROW__ID.bucketId ASC, ROW__ID.rowId ASC";
+    query.append(ordering).append(") t2");
+  }
+
+  /**
+   * Intentionally empty: the insert query of rebalance compaction has no where clause.
+   *
+   * @param query the query under construction
+   */
+  @Override
+  protected void buildWhereClauseForInsert(StringBuilder query) {
+    // No where clause in the insert query of rebalance compaction
+  }
+
+}
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java 
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java
index 75fb798ac33..2dc49851531 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java
@@ -72,23 +72,22 @@ final class MajorQueryCompactor extends QueryCompactor {
    * See {@link 
org.apache.hadoop.hive.conf.HiveConf.ConfVars#SPLIT_GROUPING_MODE} for the 
config description.
    */
   private List<String> getCreateQueries(String fullName, Table t, String 
tmpTableLocation) {
-    return Lists.newArrayList(new CompactionQueryBuilder(
-        CompactionType.MAJOR,
-        CompactionQueryBuilder.Operation.CREATE,
-        false,
-        fullName)
+    return Lists.newArrayList(new 
CompactionQueryBuilderFactory().getCompactionQueryBuilder(
+        CompactionType.MAJOR, false)
+        .setOperation(CompactionQueryBuilder.Operation.CREATE)
+        .setResultTableName(fullName)
         .setSourceTab(t)
         .setLocation(tmpTableLocation)
         .build());
   }
 
   private List<String> getCompactionQueries(Table t, Partition p, String 
tmpName) {
+
     return Lists.newArrayList(
-        new CompactionQueryBuilder(
-            CompactionType.MAJOR,
-            CompactionQueryBuilder.Operation.INSERT,
-            false,
-            tmpName)
+        new CompactionQueryBuilderFactory().getCompactionQueryBuilder(
+            CompactionType.MAJOR, false)
+            .setOperation(CompactionQueryBuilder.Operation.INSERT)
+            .setResultTableName(tmpName)
             .setSourceTab(t)
             .setSourcePartition(p)
         .build());
@@ -96,10 +95,10 @@ final class MajorQueryCompactor extends QueryCompactor {
 
   private List<String> getDropQueries(String tmpTableName) {
     return Lists.newArrayList(
-        new CompactionQueryBuilder(
-            CompactionType.MAJOR,
-            CompactionQueryBuilder.Operation.DROP,
-            false,
-            tmpTableName).build());
+        new CompactionQueryBuilderFactory().getCompactionQueryBuilder(
+            CompactionType.MAJOR, false)
+            .setOperation(CompactionQueryBuilder.Operation.DROP)
+            .setResultTableName(tmpTableName)
+            .build());
   }
 }
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java 
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java
index cc1c3e92167..9234443a9d6 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java
@@ -150,11 +150,10 @@ final class MinorQueryCompactor extends QueryCompactor {
    */
   private String buildCreateTableQuery(Table table, String newTableName, 
boolean isPartitioned,
       boolean isBucketed, String location) {
-    return new CompactionQueryBuilder(
-        CompactionType.MINOR,
-        CompactionQueryBuilder.Operation.CREATE,
-        false,
-        newTableName)
+    return new CompactionQueryBuilderFactory().getCompactionQueryBuilder(
+        CompactionType.MINOR, false)
+        .setOperation(CompactionQueryBuilder.Operation.CREATE)
+        .setResultTableName(newTableName)
         .setSourceTab(table)
         .setBucketed(isBucketed)
         .setPartitioned(isPartitioned)
@@ -173,11 +172,10 @@ final class MinorQueryCompactor extends QueryCompactor {
    */
   private String buildAlterTableQuery(String tableName, AcidDirectory dir,
       ValidWriteIdList validWriteIdList, boolean isDeleteDelta) {
-    return new CompactionQueryBuilder(
-        CompactionType.MINOR,
-        CompactionQueryBuilder.Operation.ALTER,
-        false,
-        tableName)
+    return new CompactionQueryBuilderFactory().getCompactionQueryBuilder(
+        CompactionType.MINOR, false)
+        .setOperation(CompactionQueryBuilder.Operation.ALTER)
+        .setResultTableName(tableName)
         .setDir(dir)
         .setValidWriteIdList(validWriteIdList)
         .setIsDeleteDelta(isDeleteDelta)
@@ -214,11 +212,10 @@ final class MinorQueryCompactor extends QueryCompactor {
    */
   private String buildCompactionQuery(String sourceTableName, String 
resultTableName, Table table,
       ValidWriteIdList validWriteIdList) {
-    return new CompactionQueryBuilder(
-        CompactionType.MINOR,
-        CompactionQueryBuilder.Operation.INSERT,
-        false,
-        resultTableName)
+    return new CompactionQueryBuilderFactory().getCompactionQueryBuilder(
+        CompactionType.MINOR, false)
+        .setOperation(CompactionQueryBuilder.Operation.INSERT)
+        .setResultTableName(resultTableName)
         .setSourceTabForInsert(sourceTableName)
         .setSourceTab(table)
         .setValidWriteIdList(validWriteIdList)
@@ -239,10 +236,10 @@ final class MinorQueryCompactor extends QueryCompactor {
   }
 
   private String getDropQuery(String tableToDrop) {
-    return new CompactionQueryBuilder(
-        CompactionType.MINOR,
-        CompactionQueryBuilder.Operation.DROP,
-        false,
-        tableToDrop).build();
+    return new CompactionQueryBuilderFactory().getCompactionQueryBuilder(
+        CompactionType.MINOR, false)
+        .setOperation(CompactionQueryBuilder.Operation.DROP)
+        .setResultTableName(tableToDrop)
+        .build();
   }
 }
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java
index ecdae6439ca..80a0d8f932e 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java
@@ -82,13 +82,12 @@ final class MmMajorQueryCompactor extends QueryCompactor {
   private List<String> getCreateQueries(String tmpTableName, Table table,
       StorageDescriptor storageDescriptor, String baseLocation) {
     return Lists.newArrayList(
-        new CompactionQueryBuilder(
-            CompactionType.MAJOR,
-            CompactionQueryBuilder.Operation.CREATE,
-            true,
-            tmpTableName)
-            .setSourceTab(table)
+        new CompactionQueryBuilderFactory().getCompactionQueryBuilder(
+            CompactionType.MAJOR, true)
+            .setOperation(CompactionQueryBuilder.Operation.CREATE)
+            .setResultTableName(tmpTableName)
             .setStorageDescriptor(storageDescriptor)
+            .setSourceTab(table)
             .setLocation(baseLocation)
             .build()
     );
@@ -96,11 +95,10 @@ final class MmMajorQueryCompactor extends QueryCompactor {
 
   private List<String> getCompactionQueries(Table t, Partition p, String 
tmpName) {
     return Lists.newArrayList(
-        new CompactionQueryBuilder(
-            CompactionType.MAJOR,
-            CompactionQueryBuilder.Operation.INSERT,
-            true,
-            tmpName)
+        new CompactionQueryBuilderFactory().getCompactionQueryBuilder(
+            CompactionType.MAJOR, true)
+            .setOperation(CompactionQueryBuilder.Operation.INSERT)
+            .setResultTableName(tmpName)
             .setSourceTab(t)
             .setSourcePartition(p)
             .build()
@@ -109,10 +107,10 @@ final class MmMajorQueryCompactor extends QueryCompactor {
 
   private List<String> getDropQueries(String tmpTableName) {
     return Lists.newArrayList(
-        new CompactionQueryBuilder(
-            CompactionType.MAJOR,
-            CompactionQueryBuilder.Operation.DROP,
-            true,
-            tmpTableName).build());
+        new CompactionQueryBuilderFactory().getCompactionQueryBuilder(
+            CompactionType.MAJOR, true)
+            .setOperation(CompactionQueryBuilder.Operation.DROP)
+            .setResultTableName(tmpTableName)
+            .build());
   }
 }
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java
index 81e7b4cc19f..b3f8019a251 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java
@@ -108,13 +108,12 @@ final class MmMinorQueryCompactor extends QueryCompactor {
 
   private String getCreateQuery(String newTableName, Table t, 
StorageDescriptor sd,
       String location, boolean isPartitioned) {
-    return new CompactionQueryBuilder(
-        CompactionType.MINOR,
-        CompactionQueryBuilder.Operation.CREATE,
-        true,
-        newTableName)
-        .setSourceTab(t)
+    return new CompactionQueryBuilderFactory().getCompactionQueryBuilder(
+        CompactionType.MINOR, true)
+        .setOperation(CompactionQueryBuilder.Operation.CREATE)
+        .setResultTableName(newTableName)
         .setStorageDescriptor(sd)
+        .setSourceTab(t)
         .setLocation(location)
         .setPartitioned(isPartitioned)
         .build();
@@ -130,11 +129,10 @@ final class MmMinorQueryCompactor extends QueryCompactor {
    */
   private String buildAlterTableQuery(String tableName, AcidDirectory dir,
       ValidWriteIdList validWriteIdList) {
-    return new CompactionQueryBuilder(
-        CompactionType.MINOR,
-        CompactionQueryBuilder.Operation.ALTER,
-        true,
-        tableName)
+    return new CompactionQueryBuilderFactory().getCompactionQueryBuilder(
+        CompactionType.MINOR, true)
+        .setOperation(CompactionQueryBuilder.Operation.ALTER)
+        .setResultTableName(tableName)
         .setDir(dir)
         .setValidWriteIdList(validWriteIdList)
         .build();
@@ -154,14 +152,13 @@ final class MmMinorQueryCompactor extends QueryCompactor {
   private List<String> getCompactionQueries(String sourceTmpTableName, String 
resultTmpTableName,
       Table sourceTable) {
     return Lists.newArrayList(
-        new CompactionQueryBuilder(
-            CompactionType.MINOR,
-            CompactionQueryBuilder.Operation.INSERT,
-            true,
-            resultTmpTableName)
-        .setSourceTabForInsert(sourceTmpTableName)
-        .setSourceTab(sourceTable)
-        .build()
+        new CompactionQueryBuilderFactory().getCompactionQueryBuilder(
+            CompactionType.MINOR, true)
+            .setOperation(CompactionQueryBuilder.Operation.INSERT)
+            .setResultTableName(resultTmpTableName)
+            .setSourceTabForInsert(sourceTmpTableName)
+            .setSourceTab(sourceTable)
+            .build()
     );
   }
 
@@ -178,11 +175,11 @@ final class MmMinorQueryCompactor extends QueryCompactor {
   }
 
   private String getDropQuery(String tableToDrop) {
-    return new CompactionQueryBuilder(
-        CompactionType.MINOR,
-        CompactionQueryBuilder.Operation.DROP,
-        true,
-        tableToDrop).build();
+    return new CompactionQueryBuilderFactory().getCompactionQueryBuilder(
+        CompactionType.MINOR, true)
+        .setOperation(CompactionQueryBuilder.Operation.DROP)
+        .setResultTableName(tableToDrop)
+        .build();
   }
 
   private HiveConf setUpDriverSession(HiveConf hiveConf) {
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/RebalanceQueryCompactor.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/RebalanceQueryCompactor.java
index a9849404c7a..9e9d76f070c 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/RebalanceQueryCompactor.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/RebalanceQueryCompactor.java
@@ -73,11 +73,10 @@ final class RebalanceQueryCompactor extends QueryCompactor {
   }
 
   private List<String> getCreateQueries(String fullName, Table t, String 
tmpTableLocation) {
-    return Lists.newArrayList(new CompactionQueryBuilder(
-        CompactionType.REBALANCE,
-        CompactionQueryBuilder.Operation.CREATE,
-        false,
-        fullName)
+    return Lists.newArrayList(new 
CompactionQueryBuilderFactory().getCompactionQueryBuilder(
+        CompactionType.REBALANCE, false)
+        .setOperation(CompactionQueryBuilder.Operation.CREATE)
+        .setResultTableName(fullName)
         .setSourceTab(t)
         .setLocation(tmpTableLocation)
         .build());
@@ -85,24 +84,23 @@ final class RebalanceQueryCompactor extends QueryCompactor {
 
   private List<String> getCompactionQueries(Table t, Partition p, String 
tmpName, int numberOfBuckets, String orderByClause) {
     return Lists.newArrayList(
-        new CompactionQueryBuilder(
-            CompactionType.REBALANCE,
-            CompactionQueryBuilder.Operation.INSERT,
-            false,
-            tmpName)
-            .setSourceTab(t)
-            .setSourcePartition(p)
+        new CompactionQueryBuilderFactory().getCompactionQueryBuilder(
+            CompactionType.REBALANCE, false)
+            .setOperation(CompactionQueryBuilder.Operation.INSERT)
+            .setResultTableName(tmpName)
             .setNumberOfBuckets(numberOfBuckets)
             .setOrderByClause(orderByClause)
+            .setSourceTab(t)
+            .setSourcePartition(p)
             .build());
   }
 
   private List<String> getDropQueries(String tmpTableName) {
     return Lists.newArrayList(
-        new CompactionQueryBuilder(
-            CompactionType.REBALANCE,
-            CompactionQueryBuilder.Operation.DROP,
-            false,
-            tmpTableName).build());
+        new CompactionQueryBuilderFactory().getCompactionQueryBuilder(
+            CompactionType.REBALANCE, false)
+            .setOperation(CompactionQueryBuilder.Operation.DROP)
+            .setResultTableName(tmpTableName)
+            .build());
   }
 }
diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilderTestBase.java
 
b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilderTestBase.java
new file mode 100644
index 00000000000..243f6a67063
--- /dev/null
+++ 
b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilderTestBase.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.ValidCompactorWriteIdList;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.io.AcidDirectory;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.mockito.Mockito.when;
+
+public class CompactionQueryBuilderTestBase {
+
+  public static final String DB_NAME = "comp_test_db";
+  public static final String SOURCE_TABLE_NAME = "comp_test_source_table";
+  public static final String RESULT_TABLE_NAME = "comp_test_result_table";
+  public static final String COLUMN_1_NAME = "column_1";
+  public static final String COLUMN_2_NAME = "column_2";
+  public static final String COLUMN_3_NAME = "column_3";
+  public static final String SOME_TEST_LOCATION = "some_test_path";
+  public static final String COMP_TEST_SOURCE_TABLE_FOR_INSERT = 
"comp_test_db.comp_test_insert_table";
+
+  public Table createSourceTable() {
+    return createSourceTable(false, false, false);
+  }
+
+  public Table createSourceTableWithProperties() {
+    return createSourceTable(true, false, false);
+  }
+
+  public Table createSourceTableBucketed() {
+    return createSourceTable(false, true, false);
+  }
+
+  public Table createSourceTableBucketedSorted() {
+    return createSourceTable(false, true, true);
+  }
+
+  /**
+   * Builds the metastore {@link Table} used as the compaction source in the query builder tests.
+   * The table is named {@code DB_NAME.SOURCE_TABLE_NAME} and always has three columns:
+   * {@code column_1} (string), {@code column_2} (int, no comment) and {@code column_3} (boolean).
+   *
+   * @param addTableProperties when true, four table parameters are added; otherwise the parameter map is empty
+   * @param bucketed when true, the table is bucketed on {@code column_1} and {@code column_3} into 4 buckets;
+   *                 otherwise the bucket column list is set to an empty list
+   * @param sorted when true, sort columns on {@code column_1} and {@code column_2} are added;
+   *               otherwise the sort column list is set to an empty list
+   * @return the populated source table
+   */
+  private Table createSourceTable(boolean addTableProperties, boolean 
bucketed, boolean sorted) {
+    Table sourceTable = new Table();
+    sourceTable.setDbName(DB_NAME);
+    sourceTable.setTableName(SOURCE_TABLE_NAME);
+
+    StorageDescriptor sd = new StorageDescriptor();
+    List<FieldSchema> columns = new ArrayList<>();
+    FieldSchema col1 = new FieldSchema(COLUMN_1_NAME, "string", "First 
column");
+    FieldSchema col2 = new FieldSchema(COLUMN_2_NAME, "int", null);
+    FieldSchema col3 = new FieldSchema(COLUMN_3_NAME, "boolean", "Third 
column");
+    columns.add(col1);
+    columns.add(col2);
+    columns.add(col3);
+    sd.setCols(columns);
+
+    if (bucketed) {
+      sd.addToBucketCols(COLUMN_1_NAME);
+      sd.addToBucketCols(COLUMN_3_NAME);
+      sd.setNumBuckets(4);
+    } else {
+      // Explicit empty list rather than leaving the field unset.
+      sd.setBucketCols(Collections.emptyList());
+    }
+
+    if (sorted) {
+      sd.addToSortCols(new Order(COLUMN_1_NAME, 0));
+      sd.addToSortCols(new Order(COLUMN_2_NAME, 1));
+    } else {
+      // Explicit empty list rather than leaving the field unset.
+      sd.setSortCols(Collections.emptyList());
+    }
+
+    Map<String, String> parameters = new HashMap<>();
+    if (addTableProperties) {
+      // NOTE(review): the mix of plain, "orc."-prefixed and stats/column keys presumably lets tests
+      // check which properties a builder copies into TBLPROPERTIES - confirm against the builders.
+      parameters.put("property_1", "true");
+      parameters.put("orc.property_2", "44");
+      parameters.put("COLUMN_STATS_ACCURATE", "false");
+      parameters.put("columns.types", "test");
+    }
+    sourceTable.setParameters(parameters);
+    sourceTable.setSd(sd);
+    return sourceTable;
+  }
+
+  /**
+   * Builds a mocked {@link AcidDirectory} whose {@code getCurrentDirectories()} returns five mocked
+   * deltas located under {@code /compaction/test/table}:
+   * <pre>
+   *   name          delete delta  min write id  max write id
+   *   test_delta_1  true          7             11
+   *   test_delta_2  true          1             11
+   *   test_delta_3  true          5             15
+   *   test_delta_4  true          7             20
+   *   test_delta_5  false         6             11
+   * </pre>
+   * NOTE(review): the differing write id ranges and the single non-delete delta presumably exist so
+   * that ALTER tests can check which deltas a builder selects - confirm against the builder code.
+   *
+   * @return the mocked directory
+   */
+  protected AcidDirectory createAcidDirectory() {
+    AcidDirectory dir = Mockito.mock(AcidDirectory.class);
+    AcidUtils.ParsedDelta d1 = Mockito.mock(AcidUtils.ParsedDelta.class);
+    AcidUtils.ParsedDelta d2 = Mockito.mock(AcidUtils.ParsedDelta.class);
+    AcidUtils.ParsedDelta d3 = Mockito.mock(AcidUtils.ParsedDelta.class);
+    AcidUtils.ParsedDelta d4 = Mockito.mock(AcidUtils.ParsedDelta.class);
+    AcidUtils.ParsedDelta d5 = Mockito.mock(AcidUtils.ParsedDelta.class);
+
+    List<AcidUtils.ParsedDelta> dirs = new ArrayList<>();
+    dirs.add(d1);
+    dirs.add(d2);
+    dirs.add(d3);
+    dirs.add(d4);
+    dirs.add(d5);
+
+    when(dir.getCurrentDirectories()).thenReturn(dirs);
+    // d1..d4 are delete deltas with different write id ranges.
+    when(d1.isDeleteDelta()).thenReturn(true);
+    when(d1.getMinWriteId()).thenReturn(7L);
+    when(d1.getMaxWriteId()).thenReturn(11L);
+    when(d1.getPath()).thenReturn(new Path("/compaction/test/table", 
"test_delta_1"));
+    when(d2.isDeleteDelta()).thenReturn(true);
+    when(d2.getMinWriteId()).thenReturn(1L);
+    when(d2.getMaxWriteId()).thenReturn(11L);
+    when(d2.getPath()).thenReturn(new Path("/compaction/test/table", 
"test_delta_2"));
+    when(d3.isDeleteDelta()).thenReturn(true);
+    when(d3.getMinWriteId()).thenReturn(5L);
+    when(d3.getMaxWriteId()).thenReturn(15L);
+    when(d3.getPath()).thenReturn(new Path("/compaction/test/table", 
"test_delta_3"));
+    when(d4.isDeleteDelta()).thenReturn(true);
+    when(d4.getMinWriteId()).thenReturn(7L);
+    when(d4.getMaxWriteId()).thenReturn(20L);
+    when(d4.getPath()).thenReturn(new Path("/compaction/test/table", 
"test_delta_4"));
+    // d5 is the only entry that is not a delete delta.
+    when(d5.isDeleteDelta()).thenReturn(false);
+    when(d5.getMinWriteId()).thenReturn(6L);
+    when(d5.getMaxWriteId()).thenReturn(11L);
+    when(d5.getPath()).thenReturn(new Path("/compaction/test/table", 
"test_delta_5"));
+    return dir;
+  }
+
+  /**
+   * Creates the write id list handed to the query builders by the ALTER tests.
+   * The list is created for {@link #SOURCE_TABLE_NAME} with a single aborted write id (1111)
+   * and no aborted bits.
+   * NOTE(review): the fixed {@code 15L} argument is presumably the high watermark - confirm against
+   * the {@code ValidCompactorWriteIdList} constructor.
+   *
+   * @param minWriteId value passed as the last constructor argument of the write id list
+   * @return the write id list
+   */
+  protected ValidCompactorWriteIdList createWriteId(long minWriteId) {
+    long[] abortedWriteIdList = { 1111L };
+    // Use the shared constant instead of repeating the table name literal.
+    return new ValidCompactorWriteIdList(SOURCE_TABLE_NAME, abortedWriteIdList, null, 15L, minWriteId);
+  }
+}
diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionQueryBuilderForMajorCompaction.java
 
b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionQueryBuilderForMajorCompaction.java
new file mode 100644
index 00000000000..99f8b5303a5
--- /dev/null
+++ 
b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionQueryBuilderForMajorCompaction.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.hadoop.hive.common.ValidCompactorWriteIdList;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.io.AcidDirectory;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.Collections;
+
+import static org.mockito.Mockito.when;
+
+public class TestCompactionQueryBuilderForMajorCompaction extends 
CompactionQueryBuilderTestBase {
+
+  @Test
+  public void testCreateNoSourceTable() {
+    CompactionQueryBuilder queryBuilder = 
getMajorCompactionQueryBuilderForCreate();
+    String query = queryBuilder.build();
+    String expectedQuery =
+        "CREATE temporary external table comp_test_result_table stored as orc 
TBLPROPERTIES ('compactiontable'='MAJOR', 'transactional'='false')";
+    Assert.assertEquals(expectedQuery, query);
+  }
+
+  @Test
+  public void testCreate() {
+    CompactionQueryBuilder queryBuilder = 
getMajorCompactionQueryBuilderForCreate();
+    Table sourceTable = createSourceTable();
+    queryBuilder.setSourceTab(sourceTable);
+    String query = queryBuilder.build();
+    String expectedQuery =
+        "CREATE temporary external table comp_test_result_table(`operation` 
int, `originalTransaction` bigint, `bucket` int, `rowId` bigint, 
`currentTransaction` bigint, `row` struct<`column_1` :string,`column_2` 
:int,`column_3` :boolean>)  stored as orc TBLPROPERTIES 
('compactiontable'='MAJOR', 'transactional'='false')";
+    Assert.assertEquals(expectedQuery, query);
+  }
+
+  @Test
+  public void testCreateWithSourceTableProperties() {
+    CompactionQueryBuilder queryBuilder = 
getMajorCompactionQueryBuilderForCreate();
+    Table sourceTable = createSourceTableWithProperties();
+    queryBuilder.setSourceTab(sourceTable);
+    String query = queryBuilder.build();
+    String expectedQuery =
+        "CREATE temporary external table comp_test_result_table(`operation` 
int, `originalTransaction` bigint, `bucket` int, `rowId` bigint, 
`currentTransaction` bigint, `row` struct<`column_1` :string,`column_2` 
:int,`column_3` :boolean>)  stored as orc TBLPROPERTIES 
('compactiontable'='MAJOR', 'orc.property_2'='44', 'transactional'='false')";
+    Assert.assertEquals(expectedQuery, query);
+  }
+
+  @Test
+  public void testCreateWithSourceTableLocation() {
+    CompactionQueryBuilder queryBuilder = 
getMajorCompactionQueryBuilderForCreate();
+    Table sourceTable = createSourceTable();
+    queryBuilder.setSourceTab(sourceTable);
+    queryBuilder.setLocation(SOME_TEST_LOCATION);
+    String query = queryBuilder.build();
+    String expectedQuery =
+        "CREATE temporary external table comp_test_result_table(`operation` 
int, `originalTransaction` bigint, `bucket` int, `rowId` bigint, 
`currentTransaction` bigint, `row` struct<`column_1` :string,`column_2` 
:int,`column_3` :boolean>)  stored as orc LOCATION 'some_test_path' 
TBLPROPERTIES ('compactiontable'='MAJOR', 'transactional'='false')";
+    Assert.assertEquals(expectedQuery, query);
+  }
+
+  @Test
+  public void testCreateWithPartitionedSourceTable() {
+    CompactionQueryBuilder queryBuilder = 
getMajorCompactionQueryBuilderForCreate();
+    Table sourceTable = createSourceTable();
+    queryBuilder.setSourceTab(sourceTable);
+    queryBuilder.setPartitioned(true);
+    String query = queryBuilder.build();
+    String expectedQuery =
+        "CREATE temporary external table comp_test_result_table(`operation` 
int, `originalTransaction` bigint, `bucket` int, `rowId` bigint, 
`currentTransaction` bigint, `row` struct<`column_1` :string,`column_2` 
:int,`column_3` :boolean>)  PARTITIONED BY (`file_name` STRING)  stored as orc 
TBLPROPERTIES ('compactiontable'='MAJOR', 'transactional'='false')";
+    Assert.assertEquals(expectedQuery, query);
+  }
+
+  @Test
+  public void testInsert() {
+    CompactionQueryBuilder queryBuilder = 
getMajorCompactionQueryBuilderForInsert();
+    Table sourceTable = createSourceTable();
+    queryBuilder.setSourceTabForInsert(COMP_TEST_SOURCE_TABLE_FOR_INSERT);
+    Partition sourcePartition = new Partition();
+    sourcePartition.addToValues("source_part_1");
+    sourcePartition.addToValues("true");
+    sourcePartition.addToValues("4444");
+
+    sourceTable.addToPartitionKeys(new FieldSchema("source_part_1", "string", 
"comment 1"));
+    sourceTable.addToPartitionKeys(new FieldSchema("source_part_2", "boolean", 
"comment 2"));
+    sourceTable.addToPartitionKeys(new FieldSchema("source_part_3", "int", 
"comment 3"));
+    queryBuilder.setSourceTab(sourceTable);
+    queryBuilder.setSourcePartition(sourcePartition);
+
+    String query = queryBuilder.build();
+    String expectedQuery =
+        "INSERT into table comp_test_result_table select 
validate_acid_sort_order(ROW__ID.writeId, ROW__ID.bucketId, ROW__ID.rowId), 
ROW__ID.writeId, ROW__ID.bucketId, ROW__ID.rowId, ROW__ID.writeId, 
NAMED_STRUCT('column_1', `column_1`, 'column_2', `column_2`, 'column_3', 
`column_3`)  from comp_test_db.comp_test_insert_table  where 
`source_part_1`='source_part_1' and `source_part_2`=true and 
`source_part_3`='4444'";
+    Assert.assertEquals(expectedQuery, query);
+  }
+
+  /**
+   * INSERT must fail with an {@link IllegalStateException} when the source partition has more values
+   * (three) than the source table has partition keys (two), and the exception message must list both.
+   */
+  @Test
+  public void testInsertPartitionMismatch() {
+    CompactionQueryBuilder queryBuilder = getMajorCompactionQueryBuilderForInsert();
+    Table sourceTable = createSourceTable();
+    queryBuilder.setSourceTabForInsert(COMP_TEST_SOURCE_TABLE_FOR_INSERT);
+    Partition sourcePartition = new Partition();
+    sourcePartition.addToValues("source_part_1");
+    sourcePartition.addToValues("true");
+    sourcePartition.addToValues("4444");
+
+    sourceTable.addToPartitionKeys(new FieldSchema("source_part_1", "string", "comment 1"));
+    sourceTable.addToPartitionKeys(new FieldSchema("source_part_2", "boolean", "comment 2"));
+    queryBuilder.setSourceTab(sourceTable);
+    queryBuilder.setSourcePartition(sourcePartition);
+
+    String expectedMessage =
+        "source partition values ([source_part_1, true, 4444]) do not match source table values ([FieldSchema(name:source_part_1, type:string, comment:comment 1), FieldSchema(name:source_part_2, type:boolean, comment:comment 2)]). Failing compaction.";
+    // assertThrows(String, Class, ThrowingRunnable) uses the String only as the assertion failure
+    // message, so the exception text has to be compared explicitly on the returned exception.
+    IllegalStateException thrown = Assert.assertThrows(IllegalStateException.class, queryBuilder::build);
+    Assert.assertEquals(expectedMessage, thrown.getMessage());
+  }
+
+  @Test
+  public void testInsertNoSourcePartition() {
+    CompactionQueryBuilder queryBuilder = 
getMajorCompactionQueryBuilderForInsert();
+    Table sourceTable = createSourceTable();
+    queryBuilder.setSourceTabForInsert(COMP_TEST_SOURCE_TABLE_FOR_INSERT);
+    queryBuilder.setSourceTab(sourceTable);
+    String query = queryBuilder.build();
+    String expectedQuery =
+        "INSERT into table comp_test_result_table select 
validate_acid_sort_order(ROW__ID.writeId, ROW__ID.bucketId, ROW__ID.rowId), 
ROW__ID.writeId, ROW__ID.bucketId, ROW__ID.rowId, ROW__ID.writeId, 
NAMED_STRUCT('column_1', `column_1`, 'column_2', `column_2`, 'column_3', 
`column_3`)  from comp_test_db.comp_test_insert_table ";
+    Assert.assertEquals(expectedQuery, query);
+  }
+
+  @Test
+  public void testInsertNoSourceTableForInsert() {
+    CompactionQueryBuilder queryBuilder = 
getMajorCompactionQueryBuilderForInsert();
+    Table sourceTable = createSourceTable();
+    queryBuilder.setSourceTab(sourceTable);
+    String query = queryBuilder.build();
+    String expectedQuery =
+        "INSERT into table comp_test_result_table select 
validate_acid_sort_order(ROW__ID.writeId, ROW__ID.bucketId, ROW__ID.rowId), 
ROW__ID.writeId, ROW__ID.bucketId, ROW__ID.rowId, ROW__ID.writeId, 
NAMED_STRUCT('column_1', `column_1`, 'column_2', `column_2`, 'column_3', 
`column_3`)  from comp_test_db.comp_test_source_table ";
+    Assert.assertEquals(expectedQuery, query);
+  }
+
+  @Test
+  public void testInsertNoSourceTable() {
+    CompactionQueryBuilder queryBuilder = 
getMajorCompactionQueryBuilderForInsert();
+    queryBuilder.setSourceTabForInsert(COMP_TEST_SOURCE_TABLE_FOR_INSERT);
+    String query = queryBuilder.build();
+    String expectedQuery = "INSERT into table comp_test_result_table select  
from comp_test_db.comp_test_insert_table ";
+    Assert.assertEquals(expectedQuery, query);
+  }
+
+  @Test
+  public void testAlter() {
+    CompactionQueryBuilder queryBuilder = 
getMajorCompactionQueryBuilderForAlter();
+    AcidDirectory dir = createAcidDirectory();
+    ValidCompactorWriteIdList writeIds = createWriteId(5L);
+    queryBuilder.setValidWriteIdList(writeIds);
+    queryBuilder.setDir(dir);
+    queryBuilder.setIsDeleteDelta(true);
+    String query = queryBuilder.build();
+    String expectedQuery =
+        "ALTER table comp_test_result_table add partition 
(file_name='test_delta_1') location '/compaction/test/table/test_delta_1' 
partition (file_name='test_delta_3') location 
'/compaction/test/table/test_delta_3' ";
+    Assert.assertEquals(expectedQuery, query);
+  }
+
+  @Test
+  public void testAlterEmptyDir() {
+    CompactionQueryBuilder queryBuilder = 
getMajorCompactionQueryBuilderForAlter();
+    AcidDirectory dir = Mockito.mock(AcidDirectory.class);
+    when(dir.getCurrentDirectories()).thenReturn(Collections.emptyList());
+    ValidCompactorWriteIdList writeIds = createWriteId(5L);
+    queryBuilder.setValidWriteIdList(writeIds);
+    queryBuilder.setDir(dir);
+    queryBuilder.setIsDeleteDelta(true);
+    String query = queryBuilder.build();
+    Assert.assertTrue(query.isEmpty());
+  }
+
+  @Test
+  public void testAlterMinWriteIdIsNull() {
+    CompactionQueryBuilder queryBuilder = 
getMajorCompactionQueryBuilderForAlter();
+    AcidDirectory dir = createAcidDirectory();
+    ValidCompactorWriteIdList writeIds = createWriteId(Long.MAX_VALUE);
+    queryBuilder.setValidWriteIdList(writeIds);
+    queryBuilder.setDir(dir);
+    String query = queryBuilder.build();
+    String expectedQuery =
+        "ALTER table comp_test_result_table add partition 
(file_name='test_delta_5') location '/compaction/test/table/test_delta_5' ";
+    Assert.assertEquals(expectedQuery, query);
+  }
+
+  @Test
+  public void testAlterDirIsNull() {
+    CompactionQueryBuilder queryBuilder = 
getMajorCompactionQueryBuilderForAlter();
+    ValidCompactorWriteIdList writeIds = createWriteId(Long.MAX_VALUE);
+    queryBuilder.setValidWriteIdList(writeIds);
+    queryBuilder.setDir(null);
+    String query = queryBuilder.build();
+    Assert.assertTrue(query.isEmpty());
+  }
+
+  /**
+   * ALTER with a populated directory but a null valid write id list must produce an empty query.
+   */
+  @Test
+  public void testAlterValidWriteIdListIsNull() {
+    CompactionQueryBuilder queryBuilder = getMajorCompactionQueryBuilderForAlter();
+    AcidDirectory dir = createAcidDirectory();
+    queryBuilder.setValidWriteIdList(null);
+    queryBuilder.setDir(dir);
+    String query = queryBuilder.build();
+    Assert.assertTrue(query.isEmpty());
+  }
+
+  @Test
+  public void testDrop() {
+    CompactionQueryBuilder queryBuilder = 
getMajorCompactionQueryBuilderForDrop();
+    String query = queryBuilder.build();
+    String expectedQuery = "DROP table if exists comp_test_result_table";
+    Assert.assertEquals(expectedQuery, query);
+  }
+
+  private CompactionQueryBuilder getMajorCompactionQueryBuilderForCreate() {
+    return 
getMajorCompactionBuilder().setOperation(CompactionQueryBuilder.Operation.CREATE);
+  }
+
+  private CompactionQueryBuilder getMajorCompactionQueryBuilderForInsert() {
+    return 
getMajorCompactionBuilder().setOperation(CompactionQueryBuilder.Operation.INSERT);
+  }
+
+  private CompactionQueryBuilder getMajorCompactionQueryBuilderForAlter() {
+    return 
getMajorCompactionBuilder().setOperation(CompactionQueryBuilder.Operation.ALTER);
+  }
+
+  private CompactionQueryBuilder getMajorCompactionQueryBuilderForDrop() {
+    return 
getMajorCompactionBuilder().setOperation(CompactionQueryBuilder.Operation.DROP);
+  }
+
+  private CompactionQueryBuilder getMajorCompactionBuilder() {
+    CompactionQueryBuilder compactionQueryBuilder =
+        new 
CompactionQueryBuilderFactory().getCompactionQueryBuilder(CompactionType.MAJOR, 
false);
+    return compactionQueryBuilder.setResultTableName(RESULT_TABLE_NAME);
+  }
+
+}
diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionQueryBuilderForMinorCompaction.java
 
b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionQueryBuilderForMinorCompaction.java
new file mode 100644
index 00000000000..eef965b1a92
--- /dev/null
+++ 
b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionQueryBuilderForMinorCompaction.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.hadoop.hive.common.ValidCompactorWriteIdList;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.io.AcidDirectory;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import static org.mockito.Mockito.when;
+
+public class TestCompactionQueryBuilderForMinorCompaction extends 
CompactionQueryBuilderTestBase {
+
+  /**
+   * Minor compaction query builder whose {@link #getTable()} is replaced for testing: it either
+   * returns a mocked table (bucketing version 2, 4 buckets) or throws a {@link HiveException},
+   * depending on {@link #setThrowException(boolean)}.
+   */
+  static class CompactionQueryBuilderForMinorMock extends CompactionQueryBuilderForMinor {
+    private boolean throwException = false;
+
+    /** Switches {@link #getTable()} between returning the mocked table and failing. */
+    public void setThrowException(boolean throwException) {
+      this.throwException = throwException;
+    }
+
+    @Override
+    public org.apache.hadoop.hive.ql.metadata.Table getTable() throws HiveException {
+      if (throwException) {
+        throw new HiveException();
+      }
+      org.apache.hadoop.hive.ql.metadata.Table mockedTable =
+          Mockito.mock(org.apache.hadoop.hive.ql.metadata.Table.class);
+      when(mockedTable.getBucketingVersion()).thenReturn(2);
+      when(mockedTable.getNumBuckets()).thenReturn(4);
+      return mockedTable;
+    }
+  }
+
+  @Test
+  public void testCreateWithoutSourceTable() {
+    CompactionQueryBuilder queryBuilder = 
getMinorCompactionQueryBuilderForCreate();
+    String query = queryBuilder.build();
+    String expectedQuery =
+        "CREATE temporary external table comp_test_result_table stored as orc 
TBLPROPERTIES ('compactiontable'='MINOR', 'transactional'='false')";
+    Assert.assertEquals(expectedQuery, query);
+  }
+
+  @Test
+  public void testCreateWithTablePropertiesWithLocation() {
+    CompactionQueryBuilder queryBuilder = 
getMinorCompactionQueryBuilderForCreate();
+    Table sourceTable = createSourceTableWithProperties();
+    queryBuilder.setSourceTab(sourceTable);
+    queryBuilder.setLocation("some_test_path");
+    String query = queryBuilder.build();
+    String expectedQuery =
+        "CREATE temporary external table comp_test_result_table(`operation` 
int, `originalTransaction` bigint, `bucket` int, `rowId` bigint, 
`currentTransaction` bigint, `row` struct<`column_1` :string,`column_2` 
:int,`column_3` :boolean>)  stored as orc LOCATION 'some_test_path' 
TBLPROPERTIES ('compactiontable'='MINOR', 'orc.property_2'='44', 
'transactional'='false')";
+    Assert.assertEquals(expectedQuery, query);
+  }
+
+  @Test
+  public void testCreatePartitioned() {
+    CompactionQueryBuilder queryBuilder = 
getMinorCompactionQueryBuilderForCreate();
+    Table sourceTable = createSourceTable();
+    queryBuilder.setSourceTab(sourceTable);
+    queryBuilder.setPartitioned(true);
+    queryBuilder.setLocation("some_test_path");
+    String query = queryBuilder.build();
+    String expectedQuery =
+        "CREATE temporary external table comp_test_result_table(`operation` 
int, `originalTransaction` bigint, `bucket` int, `rowId` bigint, 
`currentTransaction` bigint, `row` struct<`column_1` :string,`column_2` 
:int,`column_3` :boolean>)  PARTITIONED BY (`file_name` STRING)  stored as orc 
LOCATION 'some_test_path' TBLPROPERTIES ('compactiontable'='MINOR', 
'transactional'='false')";
+    Assert.assertEquals(expectedQuery, query);
+  }
+
+  @Test
+  public void testCreateBucketed() {
+    CompactionQueryBuilder queryBuilder = new 
CompactionQueryBuilderForMinorMock();
+    queryBuilder.setOperation(CompactionQueryBuilder.Operation.CREATE);
+    queryBuilder.setResultTableName(RESULT_TABLE_NAME);
+    Table sourceTable = createSourceTable();
+    queryBuilder.setSourceTab(sourceTable);
+    queryBuilder.setBucketed(true);
+    String query = queryBuilder.build();
+    String expectedQuery =
+        "CREATE temporary external table comp_test_result_table(`operation` 
int, `originalTransaction` bigint, `bucket` int, `rowId` bigint, 
`currentTransaction` bigint, `row` struct<`column_1` :string,`column_2` 
:int,`column_3` :boolean>)  clustered by (`bucket`) sorted by 
(`originalTransaction`, `bucket`, `rowId`) into 4 buckets stored as orc 
TBLPROPERTIES ('compactiontable'='MINOR', 'bucketing_version'='2', 
'transactional'='false')";
+    Assert.assertEquals(expectedQuery, query);
+  }
+
+  @Test
+  public void testCreateBucketedExceptionThrown() {
+    CompactionQueryBuilderForMinorMock queryBuilder = new 
CompactionQueryBuilderForMinorMock();
+    queryBuilder.setThrowException(true);
+    queryBuilder.setOperation(CompactionQueryBuilder.Operation.CREATE);
+    queryBuilder.setResultTableName(RESULT_TABLE_NAME);
+    Table sourceTable = createSourceTable();
+    queryBuilder.setSourceTab(sourceTable);
+    queryBuilder.setBucketed(true);
+    String query = queryBuilder.build();
+    String expectedQuery =
+        "CREATE temporary external table comp_test_result_table(`operation` 
int, `originalTransaction` bigint, `bucket` int, `rowId` bigint, 
`currentTransaction` bigint, `row` struct<`column_1` :string,`column_2` 
:int,`column_3` :boolean>)  clustered by (`bucket`) sorted by 
(`originalTransaction`, `bucket`, `rowId`) into 1 buckets stored as orc 
TBLPROPERTIES ('compactiontable'='MINOR', 'bucketing_version'='0', 
'transactional'='false')";
+    Assert.assertEquals(expectedQuery, query);
+  }
+
+  /**
+   * INSERT for minor compaction: selects the ACID columns from the table set via
+   * {@code setSourceTabForInsert} and filters out the aborted write ids (1111, 2222).
+   */
+  @Test
+  public void testInsert() {
+    CompactionQueryBuilder queryBuilder = getMinorCompactionQueryBuilderForInsert();
+    Table sourceTable = createSourceTable();
+    // Shared constants from the test base instead of repeated literals.
+    queryBuilder.setSourceTabForInsert(COMP_TEST_SOURCE_TABLE_FOR_INSERT);
+    long[] abortedWriteIdList = { 1111L, 2222L };
+    ValidCompactorWriteIdList writeIds =
+        new ValidCompactorWriteIdList(SOURCE_TABLE_NAME, abortedWriteIdList, null, 111111L);
+    queryBuilder.setValidWriteIdList(writeIds);
+    queryBuilder.setSourceTab(sourceTable);
+    String query = queryBuilder.build();
+    String expectedQuery =
+        "INSERT into table comp_test_result_table select `operation`, `originalTransaction`, `bucket`, `rowId`, `currentTransaction`, `row` from comp_test_db.comp_test_insert_table  where `originalTransaction` not in (1111,2222)";
+    Assert.assertEquals(expectedQuery, query);
+  }
+
+  @Test
+  public void testInsertWithoutValidWriteIdsAndSourceTableForInsert() {
+    CompactionQueryBuilder queryBuilder = 
getMinorCompactionQueryBuilderForInsert();
+    Table sourceTable = createSourceTable();
+    queryBuilder.setSourceTab(sourceTable);
+    String query = queryBuilder.build();
+    String expectedQuery =
+        "INSERT into table comp_test_result_table select `operation`, 
`originalTransaction`, `bucket`, `rowId`, `currentTransaction`, `row` from 
comp_test_db.comp_test_source_table ";
+    Assert.assertEquals(expectedQuery, query);
+  }
+
+  @Test
+  public void testAlter() {
+    CompactionQueryBuilder queryBuilder = 
getMinorCompactionQueryBuilderForAlter();
+    AcidDirectory dir = createAcidDirectory();
+    ValidCompactorWriteIdList writeIds = createWriteId(5L);
+    queryBuilder.setValidWriteIdList(writeIds);
+    queryBuilder.setDir(dir);
+    queryBuilder.setIsDeleteDelta(true);
+    String query = queryBuilder.build();
+    String expectedQuery =
+        "ALTER table comp_test_result_table add partition 
(file_name='test_delta_1') location '/compaction/test/table/test_delta_1' 
partition (file_name='test_delta_3') location 
'/compaction/test/table/test_delta_3' ";
+    Assert.assertEquals(expectedQuery, query);
+  }
+
+  @Test
+  public void testDrop() {
+    CompactionQueryBuilder queryBuilder = 
getMinorCompactionQueryBuilderForDrop();
+    String query = queryBuilder.build();
+    String expectedQuery = "DROP table if exists comp_test_result_table";
+    Assert.assertEquals(expectedQuery, query);
+  }
+
+  private CompactionQueryBuilder getMinorCompactionQueryBuilderForCreate() {
+    return 
getMinorCompactionBuilder().setOperation(CompactionQueryBuilder.Operation.CREATE);
+  }
+
+  private CompactionQueryBuilder getMinorCompactionQueryBuilderForInsert() {
+    return 
getMinorCompactionBuilder().setOperation(CompactionQueryBuilder.Operation.INSERT);
+  }
+
+  private CompactionQueryBuilder getMinorCompactionQueryBuilderForAlter() {
+    return 
getMinorCompactionBuilder().setOperation(CompactionQueryBuilder.Operation.ALTER);
+  }
+
+  private CompactionQueryBuilder getMinorCompactionQueryBuilderForDrop() {
+    return 
getMinorCompactionBuilder().setOperation(CompactionQueryBuilder.Operation.DROP);
+  }
+
+  private CompactionQueryBuilder getMinorCompactionBuilder() {
+    CompactionQueryBuilder compactionQueryBuilder =
+        new 
CompactionQueryBuilderFactory().getCompactionQueryBuilder(CompactionType.MINOR, 
false);
+    return compactionQueryBuilder.setResultTableName(RESULT_TABLE_NAME);
+  }
+}
diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionQueryBuilderForMmCompaction.java
 
b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionQueryBuilderForMmCompaction.java
new file mode 100644
index 00000000000..2295f9cf715
--- /dev/null
+++ 
b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionQueryBuilderForMmCompaction.java
@@ -0,0 +1,466 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.hadoop.hive.common.ValidCompactorWriteIdList;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.SkewedInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.io.AcidDirectory;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class TestCompactionQueryBuilderForMmCompaction extends 
CompactionQueryBuilderTestBase {
+
+  @Test // CREATE with no source table: expected DDL has no column list, only 'transactional'='false'.
+  public void testMajorCompactionCreateWithoutSourceTable() {
+    CompactionQueryBuilder queryBuilder = 
getMmMajorCompactionQueryBuilderForCreate();
+    String query = queryBuilder.build();
+    String expectedQuery =
+        "CREATE temporary external table comp_test_result_table TBLPROPERTIES 
('transactional'='false')";
+    Assert.assertEquals(expectedQuery, query);
+  }
+
+  @Test // Source table with properties plus a location: expected DDL has LOCATION and both properties ahead of 'transactional'='false'.
+  public void testMajorCompactionCreateWithTablePropertiesWithLocation() {
+    CompactionQueryBuilder queryBuilder = 
getMmMajorCompactionQueryBuilderForCreate();
+    Table sourceTable = createSourceTableWithProperties();
+    queryBuilder.setSourceTab(sourceTable);
+    queryBuilder.setLocation(SOME_TEST_LOCATION);
+    String query = queryBuilder.build();
+    String expectedQuery =
+        "CREATE temporary external table comp_test_result_table(`column_1` 
string,`column_2` int,`column_3` boolean)  LOCATION 'some_test_path' 
TBLPROPERTIES ('property_1'='true', 'orc.property_2'='44', 
'transactional'='false')";
+    Assert.assertEquals(expectedQuery, query);
+  }
+
+  @Test // setPartitioned(true): expected DDL adds PARTITIONED BY (`file_name` STRING) before LOCATION.
+  public void testMajorCompactionCreatePartitioned() {
+    CompactionQueryBuilder queryBuilder = 
getMmMajorCompactionQueryBuilderForCreate();
+    Table sourceTable = createSourceTable();
+    queryBuilder.setSourceTab(sourceTable);
+    queryBuilder.setPartitioned(true);
+    queryBuilder.setLocation(SOME_TEST_LOCATION);
+    String query = queryBuilder.build();
+    String expectedQuery =
+        "CREATE temporary external table comp_test_result_table(`column_1` 
string,`column_2` int,`column_3` boolean)  PARTITIONED BY (`file_name` STRING)  
LOCATION 'some_test_path' TBLPROPERTIES ('transactional'='false')";
+    Assert.assertEquals(expectedQuery, query);
+  }
+
+  @Test // Bucketed source table: expected DDL contains CLUSTERED BY (column_1,column_3) INTO 4 BUCKETS.
+  public void testMajorCompactionCreateWithBucketedSourceTable() throws 
HiveException {
+    CompactionQueryBuilder queryBuilder = 
getMmMajorCompactionQueryBuilderForCreate();
+    Table sourceTable = createSourceTableBucketed();
+    queryBuilder.setSourceTab(sourceTable);
+    String query = queryBuilder.build();
+    String expectedQuery =
+        "CREATE temporary external table comp_test_result_table(`column_1` 
string,`column_2` int,`column_3` boolean) CLUSTERED BY (column_1,column_3) INTO 
4 BUCKETS TBLPROPERTIES ('transactional'='false')";
+    Assert.assertEquals(expectedQuery, query);
+  }
+
+  @Test // Bucketed and sorted source table: expected DDL puts SORTED BY between CLUSTERED BY and INTO 4 BUCKETS.
+  public void testMajorCompactionCreateWithBucketedSortedSourceTable() throws 
HiveException {
+    CompactionQueryBuilder queryBuilder = 
getMmMajorCompactionQueryBuilderForCreate();
+    Table sourceTable = createSourceTableBucketedSorted();
+    queryBuilder.setSourceTab(sourceTable);
+    String query = queryBuilder.build();
+    String expectedQuery =
+        "CREATE temporary external table comp_test_result_table(`column_1` 
string,`column_2` int,`column_3` boolean) CLUSTERED BY (column_1,column_3) 
SORTED BY (column_1 DESC, column_2 ASC) INTO 4 BUCKETS TBLPROPERTIES 
('transactional'='false')";
+    Assert.assertEquals(expectedQuery, query);
+  }
+
+  @Test // Explicit storage descriptor: expected DDL has ROW FORMAT SERDE, SERDEPROPERTIES and STORED AS INPUTFORMAT/OUTPUTFORMAT.
+  public void testMajorCompactionCreateWithStorageDescriptor() throws 
HiveException {
+    CompactionQueryBuilder queryBuilder = 
getMmMajorCompactionQueryBuilderForCreate();
+    Table sourceTable = createSourceTable();
+    queryBuilder.setSourceTab(sourceTable);
+    StorageDescriptor storageDescriptor = createStorageDescriptor();
+    queryBuilder.setStorageDescriptor(storageDescriptor);
+    String query = queryBuilder.build();
+    String expectedQuery = // NOTE(review): no space before WITH / STORED AS below; presumably pins the builder output as-is
+        "CREATE temporary external table comp_test_result_table(`column_1` 
string,`column_2` int,`column_3` boolean)  ROW FORMAT SERDE 
'/some/test/serialization_lib'WITH SERDEPROPERTIES ( \n" + "  
'test_param_1'='test_value', \n" + "  'test_param_2'='test_value')STORED AS 
INPUTFORMAT 'some.test.InputFormat' OUTPUTFORMAT 'some.test.OutputFormat' 
TBLPROPERTIES ('transactional'='false')";
+    Assert.assertEquals(expectedQuery, query);
+  }
+
+  @Test // Skewed source table stored as sub-directories: expected DDL has SKEWED BY ... ON (...) STORED AS DIRECTORIES.
+  public void testMajorCompactionCreateWithSkewedByClause() throws 
HiveException {
+    CompactionQueryBuilder queryBuilder = 
getMmMajorCompactionQueryBuilderForCreate();
+    Table sourceTable = createSourceTable();
+    StorageDescriptor storageDescriptor = sourceTable.getSd();
+    SkewedInfo skewedInfo = new SkewedInfo();
+    skewedInfo.addToSkewedColNames("column_1");
+    List<String> skewedColValues = new ArrayList<>();
+    skewedColValues.add("value1");
+    skewedColValues.add("value2");
+    skewedColValues.add("value3");
+    skewedInfo.addToSkewedColValues(skewedColValues); // one value tuple holding three values
+    storageDescriptor.setSkewedInfo(skewedInfo);
+    storageDescriptor.setStoredAsSubDirectories(true);
+    sourceTable.setSd(storageDescriptor);
+    queryBuilder.setSourceTab(sourceTable);
+    String query = queryBuilder.build();
+    String expectedQuery = // NOTE(review): the ON (...) list below ends with an unbalanced "))"; confirm whether this pins a builder bug
+        "CREATE temporary external table comp_test_result_table(`column_1` 
string,`column_2` int,`column_3` boolean)  SKEWED BY (column_1) ON 
('value1','value2','value3')) STORED AS DIRECTORIES TBLPROPERTIES 
('transactional'='false')";
+    Assert.assertEquals(expectedQuery, query);
+  }
+
+  @Test // Source table carrying a storage_handler parameter: build() must throw RuntimeException.
+  public void testMajorCompactionCreateWithNonNativeTable() throws 
HiveException {
+    CompactionQueryBuilder queryBuilder = 
getMmMajorCompactionQueryBuilderForCreate();
+    Table sourceTable = createSourceTable();
+    Map<String, String> parameters = new HashMap<>();
+    parameters.put("storage_handler", "test_storage_handler");
+    sourceTable.setParameters(parameters);
+    queryBuilder.setSourceTab(sourceTable);
+    StorageDescriptor storageDescriptor = createStorageDescriptor();
+    queryBuilder.setStorageDescriptor(storageDescriptor);
+    String expectedMessage =
+        "Table comp_test_source_table has a storage handler 
(test_storage_handler). Failing compaction for this non-native table.";
+    Assert.assertThrows(expectedMessage, RuntimeException.class, 
queryBuilder::build); // NOTE(review): JUnit 4 uses the first argument as the failure label only; the exception text is not verified
+  }
+
+  @Test // CREATE with no source table: expected DDL has no column list, only 'transactional'='false'.
+  public void testMinorCompactionCreateWithoutSourceTable() {
+    CompactionQueryBuilder queryBuilder = 
getMmMinorCompactionQueryBuilderForCreate();
+    String query = queryBuilder.build();
+    String expectedQuery =
+        "CREATE temporary external table comp_test_result_table TBLPROPERTIES 
('transactional'='false')";
+    Assert.assertEquals(expectedQuery, query);
+  }
+
+  @Test // Source table with properties plus a location: expected DDL has LOCATION and both properties ahead of 'transactional'='false'.
+  public void testMinorCompactionCreateWithTablePropertiesWithLocation() {
+    CompactionQueryBuilder queryBuilder = 
getMmMinorCompactionQueryBuilderForCreate();
+    Table sourceTable = createSourceTableWithProperties();
+    queryBuilder.setSourceTab(sourceTable);
+    queryBuilder.setLocation(SOME_TEST_LOCATION);
+    String query = queryBuilder.build();
+    String expectedQuery =
+        "CREATE temporary external table comp_test_result_table(`column_1` 
string,`column_2` int,`column_3` boolean)  LOCATION 'some_test_path' 
TBLPROPERTIES ('property_1'='true', 'orc.property_2'='44', 
'transactional'='false')";
+    Assert.assertEquals(expectedQuery, query);
+  }
+
+  @Test // setPartitioned(true): expected DDL adds PARTITIONED BY (`file_name` STRING) before LOCATION.
+  public void testMinorCompactionCreatePartitioned() {
+    CompactionQueryBuilder queryBuilder = 
getMmMinorCompactionQueryBuilderForCreate();
+    Table sourceTable = createSourceTable();
+    queryBuilder.setSourceTab(sourceTable);
+    queryBuilder.setPartitioned(true);
+    queryBuilder.setLocation(SOME_TEST_LOCATION);
+    String query = queryBuilder.build();
+    String expectedQuery =
+        "CREATE temporary external table comp_test_result_table(`column_1` 
string,`column_2` int,`column_3` boolean)  PARTITIONED BY (`file_name` STRING)  
LOCATION 'some_test_path' TBLPROPERTIES ('transactional'='false')";
+    Assert.assertEquals(expectedQuery, query);
+  }
+
+  @Test // Bucketed source table: expected DDL contains CLUSTERED BY (column_1,column_3) INTO 4 BUCKETS.
+  public void testMinorCompactionCreateWithBucketedSourceTable() throws 
HiveException {
+    CompactionQueryBuilder queryBuilder = 
getMmMinorCompactionQueryBuilderForCreate();
+    Table sourceTable = createSourceTableBucketed();
+    queryBuilder.setSourceTab(sourceTable);
+    String query = queryBuilder.build();
+    String expectedQuery =
+        "CREATE temporary external table comp_test_result_table(`column_1` 
string,`column_2` int,`column_3` boolean) CLUSTERED BY (column_1,column_3) INTO 
4 BUCKETS TBLPROPERTIES ('transactional'='false')";
+    Assert.assertEquals(expectedQuery, query);
+  }
+
+  @Test // Bucketed and sorted source table: expected DDL puts SORTED BY between CLUSTERED BY and INTO 4 BUCKETS.
+  public void testMinorCompactionCreateWithBucketedSortedSourceTable() throws 
HiveException {
+    CompactionQueryBuilder queryBuilder = 
getMmMinorCompactionQueryBuilderForCreate();
+    Table sourceTable = createSourceTableBucketedSorted();
+    queryBuilder.setSourceTab(sourceTable);
+    String query = queryBuilder.build();
+    String expectedQuery =
+        "CREATE temporary external table comp_test_result_table(`column_1` 
string,`column_2` int,`column_3` boolean) CLUSTERED BY (column_1,column_3) 
SORTED BY (column_1 DESC, column_2 ASC) INTO 4 BUCKETS TBLPROPERTIES 
('transactional'='false')";
+    Assert.assertEquals(expectedQuery, query);
+  }
+
+  @Test // Explicit storage descriptor: expected DDL has ROW FORMAT SERDE, SERDEPROPERTIES and STORED AS INPUTFORMAT/OUTPUTFORMAT.
+  public void testMinorCompactionCreateWithStorageDescriptor() throws 
HiveException {
+    CompactionQueryBuilder queryBuilder = 
getMmMinorCompactionQueryBuilderForCreate();
+    Table sourceTable = createSourceTable();
+    queryBuilder.setSourceTab(sourceTable);
+    StorageDescriptor storageDescriptor = createStorageDescriptor();
+    queryBuilder.setStorageDescriptor(storageDescriptor);
+    String query = queryBuilder.build();
+    String expectedQuery = // NOTE(review): no space before WITH / STORED AS below; presumably pins the builder output as-is
+        "CREATE temporary external table comp_test_result_table(`column_1` 
string,`column_2` int,`column_3` boolean)  ROW FORMAT SERDE 
'/some/test/serialization_lib'WITH SERDEPROPERTIES ( \n" + "  
'test_param_1'='test_value', \n" + "  'test_param_2'='test_value')STORED AS 
INPUTFORMAT 'some.test.InputFormat' OUTPUTFORMAT 'some.test.OutputFormat' 
TBLPROPERTIES ('transactional'='false')";
+    Assert.assertEquals(expectedQuery, query);
+  }
+
+  @Test // Skewed source table stored as sub-directories: expected DDL has SKEWED BY ... ON (...) STORED AS DIRECTORIES.
+  public void testMinorCompactionCreateWithSkewedByClause() throws 
HiveException {
+    CompactionQueryBuilder queryBuilder = 
getMmMinorCompactionQueryBuilderForCreate();
+    Table sourceTable = createSourceTable();
+    StorageDescriptor storageDescriptor = sourceTable.getSd();
+    SkewedInfo skewedInfo = new SkewedInfo();
+    skewedInfo.addToSkewedColNames("column_1");
+    List<String> skewedColValues = new ArrayList<>();
+    skewedColValues.add("value1");
+    skewedColValues.add("value2");
+    skewedColValues.add("value3");
+    skewedInfo.addToSkewedColValues(skewedColValues); // one value tuple holding three values
+    storageDescriptor.setSkewedInfo(skewedInfo);
+    storageDescriptor.setStoredAsSubDirectories(true);
+    sourceTable.setSd(storageDescriptor);
+    queryBuilder.setSourceTab(sourceTable);
+    String query = queryBuilder.build();
+    String expectedQuery = // NOTE(review): the ON (...) list below ends with an unbalanced "))"; confirm whether this pins a builder bug
+        "CREATE temporary external table comp_test_result_table(`column_1` 
string,`column_2` int,`column_3` boolean)  SKEWED BY (column_1) ON 
('value1','value2','value3')) STORED AS DIRECTORIES TBLPROPERTIES 
('transactional'='false')";
+    Assert.assertEquals(expectedQuery, query);
+  }
+
+  @Test // Source table carrying a storage_handler parameter: build() must throw RuntimeException.
+  public void testMinorCompactionCreateLWithNonNativeTable() throws 
HiveException {
+    CompactionQueryBuilder queryBuilder = 
getMmMinorCompactionQueryBuilderForCreate();
+    Table sourceTable = createSourceTable();
+    Map<String, String> parameters = new HashMap<>();
+    parameters.put("storage_handler", "test_storage_handler"); // marks the table as non-native for the builder
+    sourceTable.setParameters(parameters);
+    queryBuilder.setSourceTab(sourceTable);
+    StorageDescriptor storageDescriptor = createStorageDescriptor();
+    queryBuilder.setStorageDescriptor(storageDescriptor);
+    String expectedMessage = // space restored after the table name so the text matches the major-compaction sibling test
+        "Table comp_test_source_table has a storage handler 
(test_storage_handler). Failing compaction for this non-native table.";
+    Assert.assertThrows(expectedMessage, RuntimeException.class, 
queryBuilder::build); // NOTE(review): JUnit 4 uses the first argument as the failure label only; the exception text is not verified
+  }
+
+  @Test // INSERT with an insert-source table, an ORDER BY clause and three partition keys/values: expects a column list and a where clause.
+  public void testInsertMajorCompaction() {
+    CompactionQueryBuilder queryBuilder = 
getMmMajorCompactionQueryBuilderForInsert();
+    Table sourceTable = createSourceTableBucketedSorted();
+    queryBuilder.setSourceTabForInsert(COMP_TEST_SOURCE_TABLE_FOR_INSERT);
+    queryBuilder.setOrderByClause("ORDER BY column_1 ASC, column_2 DESC");
+    Partition sourcePartition = new Partition();
+    sourcePartition.addToValues("source_part_1");
+    sourcePartition.addToValues("true");
+    sourcePartition.addToValues("4444");
+
+    sourceTable.addToPartitionKeys(new FieldSchema("source_part_1", "string", 
"comment 1"));
+    sourceTable.addToPartitionKeys(new FieldSchema("source_part_2", "boolean", 
"comment 2"));
+    sourceTable.addToPartitionKeys(new FieldSchema("source_part_3", "int", 
"comment 3"));
+    queryBuilder.setSourceTab(sourceTable);
+    queryBuilder.setSourcePartition(sourcePartition);
+
+    String query = queryBuilder.build();
+    String expectedQuery = // NOTE(review): the expected text places ORDER BY before where; confirm this ordering is intended
+        "INSERT into table comp_test_result_table select `column_1`, 
`column_2`, `column_3` from comp_test_db.comp_test_insert_table ORDER BY 
column_1 ASC, column_2 DESC where `source_part_1`='source_part_1' and 
`source_part_2`=true and `source_part_3`='4444'";
+    Assert.assertEquals(expectedQuery, query);
+  }
+
+  @Test // Three partition values against two partition keys: build() must throw IllegalStateException.
+  public void testInsertMajorCompactionPartitionMismatch() {
+    CompactionQueryBuilder queryBuilder = 
getMmMajorCompactionQueryBuilderForInsert();
+    Table sourceTable = createSourceTable();
+    queryBuilder.setSourceTabForInsert(COMP_TEST_SOURCE_TABLE_FOR_INSERT);
+    Partition sourcePartition = new Partition();
+    sourcePartition.addToValues("source_part_1");
+    sourcePartition.addToValues("true");
+    sourcePartition.addToValues("4444");
+
+    sourceTable.addToPartitionKeys(new FieldSchema("source_part_1", "string", 
"comment 1"));
+    sourceTable.addToPartitionKeys(new FieldSchema("source_part_2", "boolean", 
"comment 2"));
+    queryBuilder.setSourceTab(sourceTable);
+    queryBuilder.setSourcePartition(sourcePartition);
+
+    String expectedMessage =
+        "source partition values ([source_part_1, true, 4444]) do not match 
source table values ([FieldSchema(name:source_part_1, type:string, 
comment:comment 1), FieldSchema(name:source_part_2, type:boolean, 
comment:comment 2)]). Failing compaction.";
+    Assert.assertThrows(expectedMessage, IllegalStateException.class, 
queryBuilder::build); // NOTE(review): JUnit 4 uses the first argument as the failure label only; the exception text is not verified
+  }
+
+  @Test // No insert-source table name set: expected query selects from comp_test_db.comp_test_source_table.
+  public void testInsertMajorCompactionNoSourceTabForInsert() {
+    CompactionQueryBuilder queryBuilder = 
getMmMajorCompactionQueryBuilderForInsert();
+    Table sourceTable = createSourceTable();
+    Partition sourcePartition = new Partition();
+    sourcePartition.addToValues("source_part_1");
+    sourceTable.addToPartitionKeys(new FieldSchema("source_part_1", "string", 
"comment 1"));
+    queryBuilder.setSourceTab(sourceTable);
+    queryBuilder.setSourcePartition(sourcePartition);
+
+    String query = queryBuilder.build();
+    String expectedQuery =
+        "INSERT into table comp_test_result_table select `column_1`, 
`column_2`, `column_3` from comp_test_db.comp_test_source_table  where 
`source_part_1`='source_part_1'";
+    Assert.assertEquals(expectedQuery, query);
+  }
+
+  @Test // Only the source table set, no partition: expected query is a plain "select *" from the source table.
+  public void testInsertMajorCompactionOnlySourceTableSet() {
+    CompactionQueryBuilder queryBuilder = 
getMmMajorCompactionQueryBuilderForInsert();
+    Table sourceTable = createSourceTable();
+    queryBuilder.setSourceTab(sourceTable);
+
+    String query = queryBuilder.build();
+    String expectedQuery =
+        "INSERT into table comp_test_result_table select * from 
comp_test_db.comp_test_source_table ";
+    Assert.assertEquals(expectedQuery, query);
+  }
+
+  @Test // Only the insert-source table name set: expected query is "select *" from comp_test_db.comp_test_insert_table.
+  public void testInsertMajorCompactionNoSourceTable() {
+    CompactionQueryBuilder queryBuilder = 
getMmMajorCompactionQueryBuilderForInsert();
+    queryBuilder.setSourceTabForInsert(COMP_TEST_SOURCE_TABLE_FOR_INSERT);
+    String query = queryBuilder.build();
+    String expectedQuery =
+        "INSERT into table comp_test_result_table select * from 
comp_test_db.comp_test_insert_table ";
+    Assert.assertEquals(expectedQuery, query);
+  }
+
+  @Test // Source table and insert-source table both set: expected query lists the columns and reads from the insert table.
+  public void testInsertMinorCompaction() {
+    CompactionQueryBuilder queryBuilder = 
getMmMinorCompactionQueryBuilderForInsert();
+    Table sourceTable = createSourceTable();
+    queryBuilder.setSourceTabForInsert(COMP_TEST_SOURCE_TABLE_FOR_INSERT);
+    queryBuilder.setSourceTab(sourceTable);
+    String query = queryBuilder.build();
+    String expectedQuery =
+        "INSERT into table comp_test_result_table select `column_1`, 
`column_2`, `column_3` from comp_test_db.comp_test_insert_table ";
+    Assert.assertEquals(expectedQuery, query);
+  }
+
+  @Test // Only the source table set: expected query lists the columns and reads from comp_test_db.comp_test_source_table.
+  public void testInsertMinorCompactionWithoutSourceTableForInsert() {
+    CompactionQueryBuilder queryBuilder = 
getMmMinorCompactionQueryBuilderForInsert();
+    Table sourceTable = createSourceTable();
+    queryBuilder.setSourceTab(sourceTable);
+    String query = queryBuilder.build();
+    String expectedQuery =
+        "INSERT into table comp_test_result_table select `column_1`, 
`column_2`, `column_3` from comp_test_db.comp_test_source_table ";
+    Assert.assertEquals(expectedQuery, query);
+  }
+
+  @Test // Only the insert-source table name set. NOTE(review): the expected text has an empty select list ("select  from"); confirm intended.
+  public void testInsertMinorCompactionNoSourceTable() {
+    CompactionQueryBuilder queryBuilder = 
getMmMinorCompactionQueryBuilderForInsert();
+    queryBuilder.setSourceTabForInsert(COMP_TEST_SOURCE_TABLE_FOR_INSERT);
+    String query = queryBuilder.build();
+    String expectedQuery = "INSERT into table comp_test_result_table select  
from comp_test_db.comp_test_insert_table ";
+    Assert.assertEquals(expectedQuery, query);
+  }
+
+  @Test // ALTER with an ACID directory, write ids and isDeleteDelta=true: expects partitions added for test_delta_1 and test_delta_3.
+  public void testAlterMajorCompaction() {
+    CompactionQueryBuilder queryBuilder = 
getMmMajorCompactionQueryBuilderForAlter();
+    AcidDirectory dir = createAcidDirectory();
+    ValidCompactorWriteIdList writeIds = createWriteId(5L);
+    queryBuilder.setValidWriteIdList(writeIds);
+    queryBuilder.setDir(dir);
+    queryBuilder.setIsDeleteDelta(true);
+    String query = queryBuilder.build();
+    String expectedQuery =
+        "ALTER table comp_test_result_table add partition 
(file_name='test_delta_1') location '/compaction/test/table/test_delta_1' 
partition (file_name='test_delta_3') location 
'/compaction/test/table/test_delta_3' ";
+    Assert.assertEquals(expectedQuery, query);
+  }
+
+  @Test // ALTER with an ACID directory, write ids and isDeleteDelta=true: expects partitions added for test_delta_1 and test_delta_3.
+  public void testAlterMinorCompaction() {
+    CompactionQueryBuilder queryBuilder = 
getMmMinorCompactionQueryBuilderForAlter();
+    AcidDirectory dir = createAcidDirectory();
+    ValidCompactorWriteIdList writeIds = createWriteId(5L);
+    queryBuilder.setValidWriteIdList(writeIds);
+    queryBuilder.setDir(dir);
+    queryBuilder.setIsDeleteDelta(true);
+    String query = queryBuilder.build();
+    String expectedQuery =
+        "ALTER table comp_test_result_table add partition 
(file_name='test_delta_1') location '/compaction/test/table/test_delta_1' 
partition (file_name='test_delta_3') location 
'/compaction/test/table/test_delta_3' ";
+    Assert.assertEquals(expectedQuery, query);
+  }
+
+  @Test // DROP is built from nothing but the result table name preset by the helper.
+  public void testDropMajorCompaction() {
+    String expected = "DROP table if exists comp_test_result_table";
+    CompactionQueryBuilder dropBuilder = getMmMajorCompactionQueryBuilderForDrop();
+    String actual = dropBuilder.build();
+    Assert.assertEquals(expected, actual);
+  }
+
+  @Test // DROP is built from nothing but the result table name preset by the helper.
+  public void testDropMinorCompaction() {
+    String expected = "DROP table if exists comp_test_result_table";
+    CompactionQueryBuilder dropBuilder = getMmMinorCompactionQueryBuilderForDrop();
+    String actual = dropBuilder.build();
+    Assert.assertEquals(expected, actual);
+  }
+
+  private StorageDescriptor createStorageDescriptor() { // fixture: serde lib, two serde params, input/output format
+    Map<String, String> params = new HashMap<>();
+    params.put("test_param_1", "test_value");
+    params.put("test_param_2", "test_value");
+    SerDeInfo serde = new SerDeInfo();
+    serde.setSerializationLib("/some/test/serialization_lib");
+    serde.setParameters(params);
+    StorageDescriptor sd = new StorageDescriptor();
+    sd.setInputFormat("some.test.InputFormat");
+    sd.setOutputFormat("some.test.OutputFormat");
+    sd.setSerdeInfo(serde);
+    return sd;
+  }
+
+  private CompactionQueryBuilder getMmMajorCompactionQueryBuilderForCreate() { // base MM major builder with Operation.CREATE
+    return 
getMmMajorCompactionBuilder().setOperation(CompactionQueryBuilder.Operation.CREATE);
+  }
+
+  private CompactionQueryBuilder getMmMajorCompactionQueryBuilderForInsert() { // base MM major builder with Operation.INSERT
+    return 
getMmMajorCompactionBuilder().setOperation(CompactionQueryBuilder.Operation.INSERT);
+  }
+
+  private CompactionQueryBuilder getMmMajorCompactionQueryBuilderForAlter() { // base MM major builder with Operation.ALTER
+    return 
getMmMajorCompactionBuilder().setOperation(CompactionQueryBuilder.Operation.ALTER);
+  }
+
+  private CompactionQueryBuilder getMmMajorCompactionQueryBuilderForDrop() { // base MM major builder with Operation.DROP
+    return 
getMmMajorCompactionBuilder().setOperation(CompactionQueryBuilder.Operation.DROP);
+  }
+
+  private CompactionQueryBuilder getMmMinorCompactionQueryBuilderForCreate() { // base MM minor builder with Operation.CREATE
+    return 
getMmMinorCompactionBuilder().setOperation(CompactionQueryBuilder.Operation.CREATE);
+  }
+
+  private CompactionQueryBuilder getMmMinorCompactionQueryBuilderForInsert() { // base MM minor builder with Operation.INSERT
+    return 
getMmMinorCompactionBuilder().setOperation(CompactionQueryBuilder.Operation.INSERT);
+  }
+
+  private CompactionQueryBuilder getMmMinorCompactionQueryBuilderForAlter() { // base MM minor builder with Operation.ALTER
+    return 
getMmMinorCompactionBuilder().setOperation(CompactionQueryBuilder.Operation.ALTER);
+  }
+
+  private CompactionQueryBuilder getMmMinorCompactionQueryBuilderForDrop() { // base MM minor builder with Operation.DROP
+    return 
getMmMinorCompactionBuilder().setOperation(CompactionQueryBuilder.Operation.DROP);
+  }
+
+  private CompactionQueryBuilder getMmMajorCompactionBuilder() { // MAJOR builder from the factory (second argument true), result table name preset
+    CompactionQueryBuilderFactory factory = new CompactionQueryBuilderFactory();
+    CompactionQueryBuilder builder = factory.getCompactionQueryBuilder(CompactionType.MAJOR, true);
+    return builder.setResultTableName(RESULT_TABLE_NAME);
+  }
+
+  private CompactionQueryBuilder getMmMinorCompactionBuilder() { // MINOR builder from the factory (second argument true), result table name preset
+    CompactionQueryBuilderFactory factory = new CompactionQueryBuilderFactory();
+    CompactionQueryBuilder builder = factory.getCompactionQueryBuilder(CompactionType.MINOR, true);
+    return builder.setResultTableName(RESULT_TABLE_NAME);
+  }
+}
diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionQueryBuilderForRebalanceCompaction.java
 
b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionQueryBuilderForRebalanceCompaction.java
new file mode 100644
index 00000000000..df96fb00314
--- /dev/null
+++ 
b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionQueryBuilderForRebalanceCompaction.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.hadoop.hive.common.ValidCompactorWriteIdList;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.io.AcidDirectory;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestCompactionQueryBuilderForRebalanceCompaction extends 
CompactionQueryBuilderTestBase {
+
+  /**
+   * Builds the CREATE statement with the table from {@code createSourceTableWithProperties()},
+   * a location and the partitioned flag set, and compares it with the expected DDL.
+   */
+  @Test
+  public void testCreate() {
+    CompactionQueryBuilder builder = getRebalanceCompactionQueryBuilderForCreate();
+    builder.setSourceTab(createSourceTableWithProperties());
+    builder.setLocation(SOME_TEST_LOCATION);
+    builder.setPartitioned(true);
+    String actualQuery = builder.build();
+    String expectedQuery =
+        "CREATE temporary external table comp_test_result_table(`operation` int, `originalTransaction` bigint, `bucket` int, `rowId` bigint, `currentTransaction` bigint, `row` struct<`column_1` :string,`column_2` :int,`column_3` :boolean>)  PARTITIONED BY (`file_name` STRING)  stored as orc LOCATION 'some_test_path' TBLPROPERTIES ('compactiontable'='REBALANCE', 'orc.property_2'='44', 'transactional'='false')";
+    Assert.assertEquals(expectedQuery, actualQuery);
+  }
+
+  /**
+   * Builds the CREATE statement with a source table and a location but without setting the
+   * partitioned flag, and compares it with the expected DDL (no PARTITIONED BY clause).
+   */
+  @Test
+  public void testCreateWithNonPartitionedSourceTable() {
+    CompactionQueryBuilder builder = getRebalanceCompactionQueryBuilderForCreate();
+    builder.setSourceTab(createSourceTableWithProperties());
+    builder.setLocation(SOME_TEST_LOCATION);
+    String actualQuery = builder.build();
+    String expectedQuery =
+        "CREATE temporary external table comp_test_result_table(`operation` int, `originalTransaction` bigint, `bucket` int, `rowId` bigint, `currentTransaction` bigint, `row` struct<`column_1` :string,`column_2` :int,`column_3` :boolean>)  stored as orc LOCATION 'some_test_path' TBLPROPERTIES ('compactiontable'='REBALANCE', 'orc.property_2'='44', 'transactional'='false')";
+    Assert.assertEquals(expectedQuery, actualQuery);
+  }
+
+  /**
+   * Builds the CREATE statement with only the table from {@code createSourceTable()} set
+   * (no location) and compares it with the expected DDL.
+   */
+  @Test
+  public void testCreateWithNoLocationAndNoTableProperties() {
+    CompactionQueryBuilder builder = getRebalanceCompactionQueryBuilderForCreate();
+    builder.setSourceTab(createSourceTable());
+    String actualQuery = builder.build();
+    String expectedQuery =
+        "CREATE temporary external table comp_test_result_table(`operation` int, `originalTransaction` bigint, `bucket` int, `rowId` bigint, `currentTransaction` bigint, `row` struct<`column_1` :string,`column_2` :int,`column_3` :boolean>)  stored as orc TBLPROPERTIES ('compactiontable'='REBALANCE', 'transactional'='false')";
+    Assert.assertEquals(expectedQuery, actualQuery);
+  }
+
+  /**
+   * Builds the INSERT statement with an explicit insert source, a source table, five buckets
+   * and a custom order-by clause, and compares it with the expected query.
+   */
+  @Test
+  public void testInsert() {
+    CompactionQueryBuilder builder = getRebalanceCompactionQueryBuilderForInsert();
+    Table srcTable = createSourceTable();
+    builder.setSourceTabForInsert(COMP_TEST_SOURCE_TABLE_FOR_INSERT);
+    builder.setSourceTab(srcTable);
+    builder.setNumberOfBuckets(5);
+    builder.setOrderByClause("ORDER BY column_1 ASC, column_3 DESC");
+    String actualQuery = builder.build();
+    String expectedQuery =
+        "INSERT overwrite table comp_test_result_table select 0, t2.writeId, t2.rowId DIV CEIL(numRows / 5), t2.rowId, t2.writeId, t2.data from (select count(ROW__ID.writeId) over() as numRows, MAX(ROW__ID.writeId) over() as writeId, row_number() OVER (ORDER BY column_1 ASC, column_3 DESC) - 1 AS rowId, NAMED_STRUCT('column_1', `column_1`, 'column_2', `column_2`, 'column_3', `column_3`) as data from comp_test_db.comp_test_insert_table ORDER BY column_1 ASC, column_3 DESC) t2";
+    Assert.assertEquals(expectedQuery, actualQuery);
+  }
+
+  /**
+   * Builds the INSERT statement for a source table that has a partition key and a source
+   * partition set, without setting the number of buckets or an order-by clause, and compares
+   * it with the expected query.
+   */
+  @Test
+  public void testInsertWithPartitionedTable() {
+    CompactionQueryBuilder queryBuilder = getRebalanceCompactionQueryBuilderForInsert();
+    Table sourceTable = createSourceTable();
+    queryBuilder.setSourceTabForInsert(COMP_TEST_SOURCE_TABLE_FOR_INSERT);
+    Partition sourcePartition = new Partition();
+    sourcePartition.addToValues("source_part_1");
+    sourceTable.addToPartitionKeys(new FieldSchema("source_part_1", "string", "comment 1"));
+    queryBuilder.setSourceTab(sourceTable);
+    queryBuilder.setSourcePartition(sourcePartition);
+
+    String query = queryBuilder.build();
+    // NOTE(review): the end of this literal (everything after "RO") was cut off in the archived
+    // mail; it is restored from the identical order-by clause used inside the OVER(...) part
+    // and in testInsertNoSourceTable - confirm against the repository.
+    String expectedQuery =
+        "INSERT overwrite table comp_test_result_table select 0, t2.writeId, t2.rowId DIV CEIL(numRows / 0), t2.rowId, t2.writeId, t2.data from (select count(ROW__ID.writeId) over() as numRows, ROW__ID.writeId as writeId, row_number() OVER (order by ROW__ID.writeId ASC, ROW__ID.bucketId ASC, ROW__ID.rowId ASC) - 1 AS rowId, NAMED_STRUCT('column_1', `column_1`, 'column_2', `column_2`, 'column_3', `column_3`) as data from comp_test_db.comp_test_insert_table order by ROW__ID.writeId ASC, ROW__ID.bucketId ASC, ROW__ID.rowId ASC) t2";
+    Assert.assertEquals(expectedQuery, query);
+  }
+
+  /**
+   * Builds the INSERT statement when only the source table is set (no separate insert source,
+   * bucket count or order-by clause) and compares it with the expected query.
+   */
+  @Test
+  public void testInsertOnlySourceTableIsSet() {
+    CompactionQueryBuilder queryBuilder = getRebalanceCompactionQueryBuilderForInsert();
+    Table sourceTable = createSourceTable();
+    queryBuilder.setSourceTab(sourceTable);
+    String query = queryBuilder.build();
+    // NOTE(review): the end of this literal (everything after "RO") was cut off in the archived
+    // mail; it is restored from the identical order-by clause used inside the OVER(...) part
+    // and in testInsertNoSourceTable - confirm against the repository.
+    String expectedQuery =
+        "INSERT overwrite table comp_test_result_table select 0, t2.writeId, t2.rowId DIV CEIL(numRows / 0), t2.rowId, t2.writeId, t2.data from (select count(ROW__ID.writeId) over() as numRows, ROW__ID.writeId as writeId, row_number() OVER (order by ROW__ID.writeId ASC, ROW__ID.bucketId ASC, ROW__ID.rowId ASC) - 1 AS rowId, NAMED_STRUCT('column_1', `column_1`, 'column_2', `column_2`, 'column_3', `column_3`) as data from comp_test_db.comp_test_source_table order by ROW__ID.writeId ASC, ROW__ID.bucketId ASC, ROW__ID.rowId ASC) t2";
+    Assert.assertEquals(expectedQuery, query);
+  }
+
+  /**
+   * Builds the INSERT statement when only the insert source name is set and no source table
+   * is given; the expected text pins the builder's output for that case (empty select list).
+   */
+  @Test
+  public void testInsertNoSourceTable() {
+    CompactionQueryBuilder builder = getRebalanceCompactionQueryBuilderForInsert();
+    builder.setSourceTabForInsert(COMP_TEST_SOURCE_TABLE_FOR_INSERT);
+    String actualQuery = builder.build();
+    String expectedQuery =
+        "INSERT overwrite table comp_test_result_table select  from comp_test_db.comp_test_insert_table order by ROW__ID.writeId ASC, ROW__ID.bucketId ASC, ROW__ID.rowId ASC) t2";
+    Assert.assertEquals(expectedQuery, actualQuery);
+  }
+
+  /**
+   * Builds the ALTER statement with a write-id list, an ACID directory and the delete-delta
+   * flag set, and compares it with the expected "add partition" query.
+   */
+  @Test
+  public void testAlter() {
+    CompactionQueryBuilder builder = getRebalanceCompactionQueryBuilderForAlter();
+    AcidDirectory acidDir = createAcidDirectory();
+    ValidCompactorWriteIdList validWriteIds = createWriteId(5L);
+    builder.setValidWriteIdList(validWriteIds);
+    builder.setDir(acidDir);
+    builder.setIsDeleteDelta(true);
+    String actualQuery = builder.build();
+    String expectedQuery =
+        "ALTER table comp_test_result_table add partition (file_name='test_delta_1') location '/compaction/test/table/test_delta_1' partition (file_name='test_delta_3') location '/compaction/test/table/test_delta_3' ";
+    Assert.assertEquals(expectedQuery, actualQuery);
+  }
+
+  /**
+   * Builds the DROP statement with no further configuration and compares it with the
+   * expected query.
+   */
+  @Test
+  public void testDrop() {
+    String actualQuery = getRebalanceCompactionQueryBuilderForDrop().build();
+    Assert.assertEquals("DROP table if exists comp_test_result_table", actualQuery);
+  }
+
+  /**
+   * Verifies that calling {@code setBucketed(true)} on a rebalance builder is rejected with an
+   * {@link IllegalArgumentException} and that the exception carries the expected message.
+   */
+  @Test
+  public void testRebalanceCompactionWithBuckets() {
+    CompactionQueryBuilder queryBuilder = getRebalanceCompactionQueryBuilderForInsert();
+    String expectedMessage = "Rebalance compaction is supported only on implicitly-bucketed tables!";
+    // assertThrows(String, Class, ThrowingRunnable) treats the String only as the description
+    // printed when the assertion fails, so the exception message is checked explicitly here.
+    IllegalArgumentException thrown =
+        Assert.assertThrows(IllegalArgumentException.class, () -> queryBuilder.setBucketed(true));
+    Assert.assertEquals(expectedMessage, thrown.getMessage());
+  }
+
+  /**
+   * Returns the builder produced by {@code getRebalanceCompactionBuilder()} with its operation
+   * set to {@code Operation.CREATE}.
+   */
+  private CompactionQueryBuilder getRebalanceCompactionQueryBuilderForCreate() {
+    CompactionQueryBuilder createBuilder = getRebalanceCompactionBuilder();
+    return createBuilder.setOperation(CompactionQueryBuilder.Operation.CREATE);
+  }
+
+  /**
+   * Returns the builder produced by {@code getRebalanceCompactionBuilder()} with its operation
+   * set to {@code Operation.INSERT}.
+   */
+  private CompactionQueryBuilder getRebalanceCompactionQueryBuilderForInsert() {
+    CompactionQueryBuilder insertBuilder = getRebalanceCompactionBuilder();
+    return insertBuilder.setOperation(CompactionQueryBuilder.Operation.INSERT);
+  }
+
+  /**
+   * Returns the builder produced by {@code getRebalanceCompactionBuilder()} with its operation
+   * set to {@code Operation.ALTER}.
+   */
+  private CompactionQueryBuilder getRebalanceCompactionQueryBuilderForAlter() {
+    CompactionQueryBuilder alterBuilder = getRebalanceCompactionBuilder();
+    return alterBuilder.setOperation(CompactionQueryBuilder.Operation.ALTER);
+  }
+
+  /**
+   * Returns the builder produced by {@code getRebalanceCompactionBuilder()} with its operation
+   * set to {@code Operation.DROP}.
+   */
+  private CompactionQueryBuilder getRebalanceCompactionQueryBuilderForDrop() {
+    CompactionQueryBuilder dropBuilder = getRebalanceCompactionBuilder();
+    return dropBuilder.setOperation(CompactionQueryBuilder.Operation.DROP);
+  }
+
+  /**
+   * Asks a new {@link CompactionQueryBuilderFactory} for a {@code CompactionType.REBALANCE}
+   * builder (second factory argument {@code false}) and sets {@code RESULT_TABLE_NAME} as its
+   * result table name.
+   */
+  private CompactionQueryBuilder getRebalanceCompactionBuilder() {
+    CompactionQueryBuilderFactory factory = new CompactionQueryBuilderFactory();
+    CompactionQueryBuilder rebalanceBuilder = factory.getCompactionQueryBuilder(CompactionType.REBALANCE, false);
+    return rebalanceBuilder.setResultTableName(RESULT_TABLE_NAME);
+  }
+}

Reply via email to