This is an automated email from the ASF dual-hosted git repository.
kuczoram pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 6bddac42fbe HIVE-27956: Query based compactor implementation separation (#5192). (Marta Kuczora, reviewed by Ayush Saxena, Zoltan Ratkai, Attila Turoczy and Laszlo Vegh)
6bddac42fbe is described below
commit 6bddac42fbe573d84a93c0ac9f08193c193aca6c
Author: kuczoram <[email protected]>
AuthorDate: Sat May 18 13:24:40 2024 +0200
HIVE-27956: Query based compactor implementation separation (#5192). (Marta Kuczora, reviewed by Ayush Saxena, Zoltan Ratkai, Attila Turoczy and Laszlo Vegh)
---
.../ql/txn/compactor/CompactionQueryBuilder.java | 551 ++++++---------------
.../compactor/CompactionQueryBuilderFactory.java | 44 ++
.../CompactionQueryBuilderForInsertOnly.java | 274 ++++++++++
.../compactor/CompactionQueryBuilderForMajor.java | 52 ++
.../compactor/CompactionQueryBuilderForMinor.java | 105 ++++
.../CompactionQueryBuilderForRebalance.java | 76 +++
.../hive/ql/txn/compactor/MajorQueryCompactor.java | 29 +-
.../hive/ql/txn/compactor/MinorQueryCompactor.java | 37 +-
.../ql/txn/compactor/MmMajorQueryCompactor.java | 30 +-
.../ql/txn/compactor/MmMinorQueryCompactor.java | 45 +-
.../ql/txn/compactor/RebalanceQueryCompactor.java | 32 +-
.../compactor/CompactionQueryBuilderTestBase.java | 150 ++++++
...stCompactionQueryBuilderForMajorCompaction.java | 255 ++++++++++
...stCompactionQueryBuilderForMinorCompaction.java | 186 +++++++
.../TestCompactionQueryBuilderForMmCompaction.java | 466 +++++++++++++++++
...mpactionQueryBuilderForRebalanceCompaction.java | 171 +++++++
16 files changed, 1999 insertions(+), 504 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilder.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilder.java
index 3ca930d5cf5..98196d20bc3 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilder.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilder.java
@@ -17,72 +17,51 @@
*/
package org.apache.hadoop.hive.ql.txn.compactor;
-import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.metastore.ColumnType;
import org.apache.hadoop.hive.metastore.api.CompactionType;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
-import org.apache.hadoop.hive.metastore.api.Order;
import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.SerDeInfo;
-import org.apache.hadoop.hive.metastore.api.SkewedInfo;
import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
import org.apache.hadoop.hive.ql.exec.DDLPlanUtils;
import org.apache.hadoop.hive.ql.io.AcidDirectory;
import org.apache.hadoop.hive.ql.io.AcidUtils;
-import org.apache.hadoop.hive.ql.metadata.Hive;
-import org.apache.hadoop.hive.ql.metadata.HiveException;
-import org.apache.hadoop.hive.ql.util.DirectionUtils;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hive.common.util.HiveStringUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import java.lang.reflect.Field;
-import java.lang.reflect.Modifier;
-import java.util.ArrayList;
import java.util.Arrays;
+import java.util.ArrayList;
import java.util.HashMap;
-import java.util.HashSet;
import java.util.List;
import java.util.Map;
-import java.util.Set;
import java.util.stream.Collectors;
-/**
- * Builds query strings that help with query-based compaction of CRUD and insert-only tables.
- */
-class CompactionQueryBuilder {
-
- private static final Logger LOG = LoggerFactory.getLogger(CompactionQueryBuilder.class.getName());
-
+abstract class CompactionQueryBuilder {
// required fields, set in constructor
- private final Operation operation;
- private final String resultTableName;
+ protected Operation operation;
+ protected String resultTableName;
// required for some types of compaction. Required...
- private Table sourceTab; // for Create and for Insert in CRUD and insert-only major
- private StorageDescriptor storageDescriptor; // for Create in insert-only
- private String location; // for Create
- private ValidWriteIdList validWriteIdList; // for Alter/Insert in minor and CRUD
- private AcidDirectory dir; // for Alter in minor
- private Partition sourcePartition; // for Insert in major and insert-only minor
- private String sourceTabForInsert; // for Insert
- private int numberOfBuckets; //for rebalance
- private String orderByClause; //for rebalance
+ protected Table sourceTab; // for Create and for Insert in CRUD and insert-only major
+ protected String location; // for Create
+ protected ValidWriteIdList validWriteIdList; // for Alter/Insert in minor and CRUD
+ protected AcidDirectory dir; // for Alter in minor
+ protected Partition sourcePartition; // for Insert in major and insert-only minor
+ protected String sourceTabForInsert; // for Insert
+ protected String orderByClause; //for rebalance
+ protected StorageDescriptor storageDescriptor; // for Create in insert-only
+ protected int numberOfBuckets;
// settable booleans
- private boolean isPartitioned; // for Create
- private boolean isBucketed; // for Create in CRUD
- private boolean isDeleteDelta; // for Alter in CRUD minor
+ protected boolean isPartitioned; // for Create
+ protected boolean isBucketed; // for Create in CRUD
+ protected boolean isDeleteDelta; // for Alter in CRUD minor
// internal use only, for legibility
- private final boolean insertOnly;
- private final CompactionType compactionType;
+ protected boolean insertOnly;
+ protected CompactionType compactionType;
enum Operation {
CREATE, ALTER, INSERT, DROP
@@ -100,17 +79,6 @@ class CompactionQueryBuilder {
return this;
}
- /**
- * Set the StorageDescriptor of the table or partition to compact.
- * Required for Create operations in insert-only compaction.
- *
- * @param storageDescriptor StorageDescriptor of the table or partition to compact, not null
- */
- CompactionQueryBuilder setStorageDescriptor(StorageDescriptor storageDescriptor) {
- this.storageDescriptor = storageDescriptor;
- return this;
- }
-
/**
* Set the location of the temp tables.
* Used for Create operations.
@@ -164,15 +132,6 @@ class CompactionQueryBuilder {
return this;
}
- /**
- * Sets the target number of implicit buckets for a rebalancing compaction
- * @param numberOfBuckets The target number of buckets
- */
- public CompactionQueryBuilder setNumberOfBuckets(int numberOfBuckets) {
- this.numberOfBuckets = numberOfBuckets;
- return this;
- }
-
/**
* Sets the order by clause for a rebalancing compaction. It will be used to re-order the data in the table during
* the compaction.
@@ -195,7 +154,7 @@ class CompactionQueryBuilder {
* If true, Create operations for CRUD minor compaction will result in a bucketed table.
*/
CompactionQueryBuilder setBucketed(boolean bucketed) {
- if(CompactionType.REBALANCE.equals(compactionType) && bucketed) {
+ if (CompactionType.REBALANCE.equals(compactionType) && bucketed) {
throw new IllegalArgumentException("Rebalance compaction is supported only on implicitly-bucketed tables!");
}
isBucketed = bucketed;
@@ -212,28 +171,50 @@ class CompactionQueryBuilder {
return this;
}
+ /**
+ * Set the StorageDescriptor of the table or partition to compact.
+ * Required for Create operations in insert-only compaction.
+ *
+ * @param storageDescriptor StorageDescriptor of the table or partition to compact, not null
+ */
+ CompactionQueryBuilder setStorageDescriptor(StorageDescriptor storageDescriptor) {
+ this.storageDescriptor = storageDescriptor;
+ return this;
+ }
+
+ /**
+ * Sets the target number of implicit buckets for a rebalancing compaction
+ * @param numberOfBuckets The target number of buckets
+ */
+ CompactionQueryBuilder setNumberOfBuckets(int numberOfBuckets) {
+ this.numberOfBuckets = numberOfBuckets;
+ return this;
+ }
+
+ CompactionQueryBuilder setOperation(Operation operation) {
+ this.operation = operation;
+ return this;
+ }
+
+ CompactionQueryBuilder setResultTableName(String resultTableName) {
+ this.resultTableName = resultTableName;
+ return this;
+ }
+
/**
* Construct a CompactionQueryBuilder with required params.
*
* @param compactionType major or minor; crud or insert-only, e.g. CompactionType.MAJOR_CRUD.
* Cannot be null.
- * @param operation query's Operation e.g. Operation.CREATE.
* @param insertOnly Indicates if the table is not full ACID but Insert-only (Micromanaged)
* @throws IllegalArgumentException if compactionType is null
*/
- CompactionQueryBuilder(CompactionType compactionType, Operation operation, boolean insertOnly,
- String resultTableName) {
+ CompactionQueryBuilder(CompactionType compactionType, boolean insertOnly) {
if (compactionType == null) {
throw new IllegalArgumentException("CompactionQueryBuilder.CompactionType cannot be null");
}
this.compactionType = compactionType;
- this.operation = operation;
- this.resultTableName = resultTableName;
this.insertOnly = insertOnly;
-
- if (CompactionType.REBALANCE.equals(compactionType) && insertOnly) {
- throw new IllegalArgumentException("Rebalance compaction is supported only on full ACID tables!");
- }
}
/**
@@ -275,137 +256,104 @@ class CompactionQueryBuilder {
case DROP:
default:
}
-
return query.toString();
}
+ protected void getDdlForCreate(StringBuilder query) {
+ defineColumns(query);
+
+ // PARTITIONED BY. Used for parts of minor compaction.
+ if (isPartitioned) {
+ query.append(" PARTITIONED BY (`file_name` STRING) ");
+ }
+
+ // STORED AS / ROW FORMAT SERDE + INPUTFORMAT + OUTPUTFORMAT
+ query.append(" stored as orc");
+
+ // LOCATION
+ if (location != null) {
+ query.append(" LOCATION '").append(HiveStringUtils.escapeHiveCommand(location)).append("'");
+ }
+
+ addTblProperties(query, false, 0);
+ }
+
+ /**
+ * Part of Create operation. All tmp tables are not transactional and are marked as
+ * compaction tables. Additionally...
+ * - Crud compaction temp tables need tblproperty, "compactiontable."
+ * - Minor crud compaction temp tables need bucketing version tblproperty, if table is bucketed.
+ */
+ protected void addTblProperties(StringBuilder query, boolean addBucketingVersion, int bucketingVersion) {
+ Map<String, String> tblProperties = new HashMap<>();
+ tblProperties.put("transactional", "false");
+ tblProperties.put(AcidUtils.COMPACTOR_TABLE_PROPERTY, compactionType.name());
+ if (addBucketingVersion) {
+ tblProperties.put("bucketing_version", String.valueOf(bucketingVersion));
+ }
+ if (sourceTab != null) { // to avoid NPEs, skip this part if sourceTab is null
+ sourceTab.getParameters().entrySet().stream().filter(e -> e.getKey().startsWith("orc."))
+ .forEach(e -> tblProperties.put(e.getKey(), HiveStringUtils.escapeHiveCommand(e.getValue())));
+ }
+ addTblProperties(query, tblProperties);
+ }
+
+ protected void addTblProperties(StringBuilder query, Map<String, String> tblProperties) {
+ if (tblProperties != null && !tblProperties.isEmpty()) {
+ // add TBLPROPERTIES clause to query
+ boolean isFirst;
+ query.append(" TBLPROPERTIES (");
+ isFirst = true;
+ for (Map.Entry<String, String> property : tblProperties.entrySet()) {
+ if (!isFirst) {
+ query.append(", ");
+ }
+ query.append("'").append(property.getKey()).append("'='").append(property.getValue()).append("'");
+ isFirst = false;
+ }
+ query.append(")");
+ }
+ }
+
private void buildAddClauseForAlter(StringBuilder query) {
if (validWriteIdList == null || dir == null) {
query.setLength(0);
return; // avoid NPEs, don't throw an exception but return an empty query
}
- long minWriteID =
- validWriteIdList.getMinOpenWriteId() == null ? 1 : validWriteIdList.getMinOpenWriteId();
+ long minWriteID = validWriteIdList.getMinOpenWriteId() == null ? 1 : validWriteIdList.getMinOpenWriteId();
long highWatermark = validWriteIdList.getHighWatermark();
List<AcidUtils.ParsedDelta> deltas = dir.getCurrentDirectories().stream().filter(
- delta -> delta.isDeleteDelta() == isDeleteDelta && delta.getMaxWriteId() <= highWatermark
- && delta.getMinWriteId() >= minWriteID)
+ delta -> delta.isDeleteDelta() == isDeleteDelta && delta.getMaxWriteId() <= highWatermark && delta.getMinWriteId() >= minWriteID)
.collect(Collectors.toList());
if (deltas.isEmpty()) {
query.setLength(0); // no alter query needed; clear StringBuilder
return;
}
query.append(" add ");
- deltas.forEach(delta -> query.append("partition (file_name='")
- .append(delta.getPath().getName()).append("')" + " location '").append(delta.getPath()).append("' "));
- }
-
-
- private void buildSelectClauseForInsert(StringBuilder query) {
- // Need list of columns for major crud, mmmajor partitioned, mmminor
- List<FieldSchema> cols;
- if (CompactionType.REBALANCE.equals(compactionType) ||
- CompactionType.MAJOR.equals(compactionType) && (!insertOnly || sourcePartition != null) ||
- CompactionType.MINOR.equals(compactionType) && insertOnly) {
- if (sourceTab == null) {
- return; // avoid NPEs, don't throw an exception but skip this part of the query
- }
- cols = sourceTab.getSd().getCols();
- } else {
- cols = null;
- }
- switch (compactionType) {
- case REBALANCE: {
- query.append("0, t2.writeId, t2.rowId DIV CEIL(numRows / ")
- .append(numberOfBuckets)
- .append("), t2.rowId, t2.writeId, t2.data from (select ")
- .append("count(ROW__ID.writeId) over() as numRows, ");
- if (StringUtils.isNotBlank(orderByClause)) {
- // in case of reordering the data the writeids cannot be kept.
- query.append("MAX(ROW__ID.writeId) over() as writeId, row_number() OVER (")
- .append(orderByClause);
- } else {
- query.append("ROW__ID.writeId as writeId, row_number() OVER (order by ROW__ID.writeId ASC, ROW__ID.bucketId ASC, ROW__ID.rowId ASC");
- }
- query.append(") - 1 AS rowId, NAMED_STRUCT(");
- for (int i = 0; i < cols.size(); ++i) {
- query.append(i == 0 ? "'" : ", '").append(cols.get(i).getName()).append("', `")
- .append(cols.get(i).getName()).append("`");
- }
- query.append(") as data");
- break;
- }
- case MAJOR: {
- if (insertOnly) {
- if (sourcePartition != null) { //mmmajor and partitioned
- appendColumns(query, cols, false);
- } else { // mmmajor and unpartitioned
- query.append("*");
- }
- } else {
- query.append(
- "validate_acid_sort_order(ROW__ID.writeId, ROW__ID.bucketId, ROW__ID.rowId), "
- + "ROW__ID.writeId, ROW__ID.bucketId, ROW__ID.rowId, ROW__ID.writeId, "
- + "NAMED_STRUCT(");
- appendColumns(query, cols, true);
- query.append(") ");
- }
- break;
- }
- case MINOR: {
- if (insertOnly) {
- appendColumns(query, cols, false);
- } else {
- query.append(
- "`operation`, `originalTransaction`, `bucket`, `rowId`, `currentTransaction`, `row`");
- }
- break;
- }
- }
+ deltas.forEach(
+ delta -> query.append("partition (file_name='").append(delta.getPath().getName()).append("') location '")
+ .append(delta.getPath()).append("' "));
}
- private void appendColumns(StringBuilder query, List<FieldSchema> cols, boolean alias) {
- if (cols == null) {
- throw new IllegalStateException("Query could not be created: Source columns are unknown");
- }
- for (int i = 0; i < cols.size(); ++i) {
- if (alias) {
- query.append(i == 0 ? "'" : ", '").append(cols.get(i).getName()).append("', `")
- .append(cols.get(i).getName()).append("`");
- } else {
- query.append(i == 0 ? "`" : ", `").append(cols.get(i).getName()).append("`");
- }
- }
- }
+ protected abstract void buildSelectClauseForInsert(StringBuilder query);
- private void getSourceForInsert(StringBuilder query) {
+ protected void getSourceForInsert(StringBuilder query) {
if (sourceTabForInsert != null) {
query.append(sourceTabForInsert);
} else {
query.append(sourceTab.getDbName()).append(".").append(sourceTab.getTableName());
}
query.append(" ");
- if (CompactionType.REBALANCE.equals(compactionType)) {
- if (StringUtils.isNotBlank(orderByClause)) {
- query.append(orderByClause);
- } else {
- query.append("order by ROW__ID.writeId ASC, ROW__ID.bucketId ASC, ROW__ID.rowId ASC");
- }
- query.append(") t2");
- } else if (CompactionType.MAJOR.equals(compactionType) && insertOnly && StringUtils.isNotBlank(orderByClause)) {
- query.append(orderByClause);
- }
}
- private void buildWhereClauseForInsert(StringBuilder query) {
- if (CompactionType.MAJOR.equals(compactionType) && sourcePartition != null && sourceTab != null) {
+ protected void buildWhereClauseForInsert(StringBuilder query) {
+ if (sourcePartition != null && sourceTab != null) {
List<String> vals = sourcePartition.getValues();
List<FieldSchema> keys = sourceTab.getPartitionKeys();
if (keys.size() != vals.size()) {
- throw new IllegalStateException("source partition values ("
- + Arrays.toString(vals.toArray()) + ") do not match source table values ("
- + Arrays.toString(keys.toArray()) + "). Failing compaction.");
+ throw new IllegalStateException("source partition values (" + Arrays.toString(
+ vals.toArray()) + ") do not match source table values (" + Arrays.toString(
+ keys.toArray()) + "). Failing compaction.");
}
query.append(" where ");
@@ -419,67 +367,36 @@ class CompactionQueryBuilder {
}
}
}
-
- if (CompactionType.MINOR.equals(compactionType) && !insertOnly && validWriteIdList != null) {
- long[] invalidWriteIds = validWriteIdList.getInvalidWriteIds();
- if (invalidWriteIds.length > 0) {
- query.append(" where `originalTransaction` not in (").append(
- StringUtils.join(ArrayUtils.toObject(invalidWriteIds), ","))
- .append(")");
- }
- }
}
- private void getDdlForCreate(StringBuilder query) {
- defineColumns(query);
-
- // PARTITIONED BY. Used for parts of minor compaction.
- if (isPartitioned) {
- query.append(" PARTITIONED BY (`file_name` STRING) ");
- }
-
- // CLUSTERED BY. (bucketing)
- int bucketingVersion = 0;
- if (!insertOnly && CompactionType.MINOR.equals(compactionType)) {
- bucketingVersion = getMinorCrudBucketing(query, bucketingVersion);
- } else if (insertOnly) {
- getMmBucketing(query);
- }
-
- // SKEWED BY
- if (insertOnly) {
- getSkewedByClause(query);
- }
-
- // STORED AS / ROW FORMAT SERDE + INPUTFORMAT + OUTPUTFORMAT
- if (!insertOnly) {
- query.append(" stored as orc");
- } else {
- copySerdeFromSourceTable(query);
+ protected void appendColumns(StringBuilder query, List<FieldSchema> cols, boolean alias) {
+ if (cols == null) {
+ throw new IllegalStateException("Query could not be created: Source columns are unknown");
}
-
- // LOCATION
- if (location != null) {
- query.append(" LOCATION '").append(HiveStringUtils.escapeHiveCommand(location)).append("'");
+ for (int i = 0; i < cols.size(); ++i) {
+ if (alias) {
+ query.append(i == 0 ? "'" : ", '").append(cols.get(i).getName()).append("', `").append(cols.get(i).getName())
+ .append("`");
+ } else {
+ query.append(i == 0 ? "`" : ", `").append(cols.get(i).getName()).append("`");
+ }
}
-
- // TBLPROPERTIES
- addTblProperties(query, bucketingVersion);
}
/**
* Define columns of the create query.
*/
- private void defineColumns(StringBuilder query) {
- if (sourceTab == null) {
- return; // avoid NPEs, don't throw an exception but skip this part of the query
- }
- query.append("(");
- if (!insertOnly) {
- query.append(
- "`operation` int, `originalTransaction` bigint, `bucket` int, `rowId` bigint, "
- + "`currentTransaction` bigint, `row` struct<");
+ protected void defineColumns(StringBuilder query) {
+ if (sourceTab != null) {
+ query.append("(")
+ .append("`operation` int, `originalTransaction` bigint, `bucket` int, `rowId` bigint, `currentTransaction` bigint, ")
+ .append("`row` struct<")
+ .append(StringUtils.join(getColumnDescs(), ','))
+ .append(">) ");
}
+ }
+
+ protected List<String> getColumnDescs() {
List<FieldSchema> cols = sourceTab.getSd().getCols();
List<String> columnDescs = new ArrayList<>();
for (FieldSchema col : cols) {
@@ -487,197 +404,7 @@ class CompactionQueryBuilder {
String columnDesc = "`" + col.getName() + "` " + (!insertOnly ? ":" : "") + columnType;
columnDescs.add(columnDesc);
}
- query.append(StringUtils.join(columnDescs, ','));
- query.append(!insertOnly ? ">" : "");
- query.append(") ");
+ return columnDescs;
}
- /**
- * Part of Create operation. Copy source table bucketing for insert-only
compaction.
- */
- private void getMmBucketing(StringBuilder query) {
- if (sourceTab == null) {
- return; // avoid NPEs, don't throw an exception but skip this part of the query
- }
- boolean isFirst;
- List<String> buckCols = sourceTab.getSd().getBucketCols();
- if (buckCols.size() > 0) {
- query.append("CLUSTERED BY (").append(StringUtils.join(buckCols, ",")).append(") ");
- List<Order> sortCols = sourceTab.getSd().getSortCols();
- if (sortCols.size() > 0) {
- query.append("SORTED BY (");
- isFirst = true;
- for (Order sortCol : sortCols) {
- if (!isFirst) {
- query.append(", ");
- }
- isFirst = false;
- query.append(sortCol.getCol()).append(" ")
- .append(DirectionUtils.codeToText(sortCol.getOrder()));
- }
- query.append(") ");
- }
- query.append("INTO ").append(sourceTab.getSd().getNumBuckets()).append(" BUCKETS");
- }
- }
-
- /**
- * Part of Create operation. Minor crud compaction uses its own bucketing system.
- */
- private int getMinorCrudBucketing(StringBuilder query, int bucketingVersion) {
- if (isBucketed && sourceTab != null) { // skip if sourceTab is null to avoid NPEs
- int numBuckets = 1;
- try {
- org.apache.hadoop.hive.ql.metadata.Table t =
- Hive.get().getTable(sourceTab.getDbName(), sourceTab.getTableName());
- numBuckets = Math.max(t.getNumBuckets(), numBuckets);
- bucketingVersion = t.getBucketingVersion();
- } catch (HiveException e) {
- LOG.info("Error finding table {}. Minor compaction result will use 0 buckets.",
- sourceTab.getTableName());
- } finally {
- query.append(" clustered by (`bucket`)")
- .append(" sorted by (`originalTransaction`, `bucket`, `rowId`)")
- .append(" into ").append(numBuckets).append(" buckets");
- }
- }
- return bucketingVersion;
- }
-
- /**
- * Part of Create operation. Insert-only compaction tables copy source tables.
- */
- private void getSkewedByClause(StringBuilder query) {
- if (sourceTab == null) {
- return; // avoid NPEs, don't throw an exception but skip this part of the query
- }
- boolean isFirst; // Stored as directories. We don't care about the skew otherwise.
- if (sourceTab.getSd().isStoredAsSubDirectories()) {
- SkewedInfo skewedInfo = sourceTab.getSd().getSkewedInfo();
- if (skewedInfo != null && !skewedInfo.getSkewedColNames().isEmpty()) {
- query.append(" SKEWED BY (")
- .append(StringUtils.join(skewedInfo.getSkewedColNames(), ", ")).append(") ON ");
- isFirst = true;
- for (List<String> colValues : skewedInfo.getSkewedColValues()) {
- if (!isFirst) {
- query.append(", ");
- }
- isFirst = false;
- query.append("('").append(StringUtils.join(colValues, "','")).append("')");
- }
- query.append(") STORED AS DIRECTORIES");
- }
- }
- }
-
- /**
- * Part of Create operation. Insert-only compaction tables copy source tables' serde.
- */
- private void copySerdeFromSourceTable(StringBuilder query) {
- if (storageDescriptor == null) {
- return; // avoid NPEs, don't throw an exception but skip this part of the query
- }
- ensureTableToCompactIsNative();
- SerDeInfo serdeInfo = storageDescriptor.getSerdeInfo();
- Map<String, String> serdeParams = serdeInfo.getParameters();
- query.append(" ROW FORMAT SERDE '")
- .append(HiveStringUtils.escapeHiveCommand(serdeInfo.getSerializationLib())).append("'");
- // WITH SERDEPROPERTIES
- if (!serdeParams.isEmpty()) {
- DDLPlanUtils.appendSerdeParams(query, serdeParams);
- }
- query.append("STORED AS INPUTFORMAT '")
- .append(HiveStringUtils.escapeHiveCommand(storageDescriptor.getInputFormat())).append("'")
- .append(" OUTPUTFORMAT '")
- .append(HiveStringUtils.escapeHiveCommand(storageDescriptor.getOutputFormat()))
- .append("'");
- }
-
- /**
- * Part of Create operation. All tmp tables are not transactional and are marked as
- * compaction tables. Additionally...
- * - Crud compaction temp tables need tblproperty, "compactiontable."
- * - Minor crud compaction temp tables need bucketing version tblproperty, if table is bucketed.
- * - Insert-only compaction tables copy source tables' tblproperties, except metastore/statistics
- * properties.
- */
- private void addTblProperties(StringBuilder query, int bucketingVersion) {
- Map<String, String> tblProperties = new HashMap<>();
- tblProperties.put("transactional", "false");
- if (!insertOnly) {
- tblProperties.put(AcidUtils.COMPACTOR_TABLE_PROPERTY, compactionType.name());
- if (CompactionType.MINOR.equals(compactionType) && isBucketed) {
- tblProperties.put("bucketing_version", String.valueOf(bucketingVersion));
- }
- }
- if (sourceTab != null) { // to avoid NPEs, skip this part if sourceTab is null
- if (insertOnly) {
- // Exclude all standard table properties.
- Set<String> excludes = getHiveMetastoreConstants();
- excludes.addAll(StatsSetupConst.TABLE_PARAMS_STATS_KEYS);
- for (Map.Entry<String, String> e : sourceTab.getParameters().entrySet()) {
- if (e.getValue() == null) {
- continue;
- }
- if (excludes.contains(e.getKey())) {
- continue;
- }
- tblProperties.put(e.getKey(), HiveStringUtils.escapeHiveCommand(e.getValue()));
- }
- } else {
- for (Map.Entry<String, String> e : sourceTab.getParameters().entrySet()) {
- if (e.getKey().startsWith("orc.")) {
- tblProperties.put(e.getKey(), HiveStringUtils.escapeHiveCommand(e.getValue()));
- }
- }
- }
- }
-
- // add TBLPROPERTIES clause to query
- boolean isFirst;
- query.append(" TBLPROPERTIES (");
- isFirst = true;
- for (Map.Entry<String, String> property : tblProperties.entrySet()) {
- if (!isFirst) {
- query.append(", ");
- }
- query.append("'").append(property.getKey()).append("'='").append(property.getValue())
- .append("'");
- isFirst = false;
- }
- query.append(")");
- }
-
- private static Set<String> getHiveMetastoreConstants() {
- Set<String> result = new HashSet<>();
- for (Field f : hive_metastoreConstants.class.getDeclaredFields()) {
- if (!Modifier.isFinal(f.getModifiers())) {
- continue;
- }
- if (!String.class.equals(f.getType())) {
- continue;
- }
- f.setAccessible(true);
- try {
- result.add((String) f.get(null));
- } catch (IllegalAccessException e) {
- throw new RuntimeException(e);
- }
- }
- return result;
- }
-
- private void ensureTableToCompactIsNative() {
- if (sourceTab == null) {
- return;
- }
- String storageHandler =
- sourceTab.getParameters().get(hive_metastoreConstants.META_TABLE_STORAGE);
- if (storageHandler != null) {
- String message = "Table " + sourceTab.getTableName() + "has a storage handler ("
- + storageHandler + "). Failing compaction for this non-native table.";
- LOG.error(message);
- throw new RuntimeException(message);
- }
- }
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilderFactory.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilderFactory.java
new file mode 100644
index 00000000000..bb7a001ebd4
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilderFactory.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+
+class CompactionQueryBuilderFactory {
+ public CompactionQueryBuilder getCompactionQueryBuilder(CompactionType compactionType, boolean insertOnly) {
+
+ if (compactionType == null) {
+ throw new IllegalArgumentException("CompactionQueryBuilder.CompactionType cannot be null");
+ }
+ if (insertOnly) {
+ return new CompactionQueryBuilderForInsertOnly(compactionType);
+ } else {
+ switch (compactionType) {
+ case MAJOR:
+ return new CompactionQueryBuilderForMajor();
+ case MINOR:
+ return new CompactionQueryBuilderForMinor();
+ case REBALANCE:
+ return new CompactionQueryBuilderForRebalance();
+ }
+ }
+ throw new IllegalArgumentException(
+ String.format("Compaction cannot be created with type %s", compactionType));
+ }
+
+}
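
For illustration, a minimal sketch of how a caller drives the factory-based API (the helper method and its argument names are placeholders; the chained setters mirror the call sites updated in MajorQueryCompactor further below):

  // Sketch only (hypothetical helper, not part of this commit): CREATE query for a CRUD MAJOR compaction.
  private String sketchMajorCreateQuery(Table sourceTable, String tmpTableName, String tmpTableLocation) {
    return new CompactionQueryBuilderFactory()
        .getCompactionQueryBuilder(CompactionType.MAJOR, false)   // false = full ACID (CRUD) table
        .setOperation(CompactionQueryBuilder.Operation.CREATE)
        .setResultTableName(tmpTableName)                          // temp table that receives the compacted data
        .setSourceTab(sourceTable)                                 // metastore Table being compacted
        .setLocation(tmpTableLocation)                             // location of the temp table
        .build();
  }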
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilderForInsertOnly.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilderForInsertOnly.java
new file mode 100644
index 00000000000..f0bd64bfd7b
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilderForInsertOnly.java
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hive.common.StatsSetupConst;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.SkewedInfo;
+import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.ql.exec.DDLPlanUtils;
+import org.apache.hadoop.hive.ql.util.DirectionUtils;
+import org.apache.hive.common.util.HiveStringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * Builds query strings that help with query-based compaction of insert-only tables.
+ */
+class CompactionQueryBuilderForInsertOnly extends CompactionQueryBuilder {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CompactionQueryBuilderForInsertOnly.class.getName());
+
+ /**
+ * Construct a CompactionQueryBuilderForInsertOnly with required params.
+ *
+ * @param compactionType major or minor or rebalance, e.g. CompactionType.MAJOR.
+ * Cannot be null.
+ * Rebalance compaction is not supported for insert-only tables.
+ * @throws IllegalArgumentException if compactionType is null or the compaction type is REBALANCE
+ */
+ CompactionQueryBuilderForInsertOnly(CompactionType compactionType) {
+ super(compactionType, true);
+ if (CompactionType.REBALANCE.equals(compactionType)) {
+ throw new IllegalArgumentException("Rebalance compaction can only be supported on full ACID tables.");
+ }
+ }
+
+ @Override
+ protected void buildSelectClauseForInsert(StringBuilder query) {
+ switch (compactionType) {
+ case MAJOR: {
+ if (sourcePartition != null) { //mmmajor and partitioned
+ if (sourceTab != null) {
+ List<FieldSchema> cols = sourceTab.getSd().getCols();
+ appendColumns(query, cols, false);
+ // If the source table is null, just skip to add this part to the query
+ }
+ } else { // mmmajor and unpartitioned
+ query.append("*");
+ }
+ break;
+ }
+ case MINOR: {
+ if (sourceTab != null) {
+ List<FieldSchema> cols = sourceTab.getSd().getCols();
+ appendColumns(query, cols, false);
+ // If the source table is null, just skip to add this part to the query
+ }
+ }
+ }
+ }
+
+ @Override
+ protected void getSourceForInsert(StringBuilder query) {
+ super.getSourceForInsert(query);
+ if (CompactionType.MAJOR.equals(compactionType) && StringUtils.isNotBlank(orderByClause)) {
+ query.append(orderByClause);
+ }
+ }
+
+ @Override
+ protected void buildWhereClauseForInsert(StringBuilder query) {
+ if (CompactionType.MAJOR.equals(compactionType)) {
+ super.buildWhereClauseForInsert(query);
+ }
+ }
+
+ @Override
+ protected void getDdlForCreate(StringBuilder query) {
+ defineColumns(query);
+
+ // PARTITIONED BY. Used for parts of minor compaction.
+ if (isPartitioned) {
+ query.append(" PARTITIONED BY (`file_name` STRING) ");
+ }
+
+ // CLUSTERED BY. (bucketing)
+ getMmBucketing(query);
+
+ // SKEWED BY
+ getSkewedByClause(query);
+
+ // STORED AS / ROW FORMAT SERDE + INPUTFORMAT + OUTPUTFORMAT
+ copySerdeFromSourceTable(query);
+
+ // LOCATION
+ if (location != null) {
+ query.append(" LOCATION '").append(HiveStringUtils.escapeHiveCommand(location)).append("'");
+ }
+
+ // TBLPROPERTIES
+ addTblProperties(query);
+ }
+
+ /**
+ * Define columns of the create query.
+ */
+ @Override
+ protected void defineColumns(StringBuilder query) {
+ if (sourceTab != null) {
+ query.append("(");
+ List<String> columnDescs = getColumnDescs();
+ query.append(StringUtils.join(columnDescs, ','));
+ query.append(") ");
+ }
+ }
+
+ /**
+ * Part of Create operation. Copy source table bucketing for insert-only compaction.
+ */
+ private void getMmBucketing(StringBuilder query) {
+ if (sourceTab != null) {
+ boolean isFirst;
+ List<String> buckCols = sourceTab.getSd().getBucketCols();
+ if (!buckCols.isEmpty()) {
+ query.append("CLUSTERED BY (").append(StringUtils.join(buckCols, ",")).append(") ");
+ List<Order> sortCols = sourceTab.getSd().getSortCols();
+ if (!sortCols.isEmpty()) {
+ query.append("SORTED BY (");
+ isFirst = true;
+ for (Order sortCol : sortCols) {
+ if (!isFirst) {
+ query.append(", ");
+ }
+ isFirst = false;
+ query.append(sortCol.getCol()).append(" ").append(DirectionUtils.codeToText(sortCol.getOrder()));
+ }
+ query.append(") ");
+ }
+ query.append("INTO ").append(sourceTab.getSd().getNumBuckets()).append(" BUCKETS");
+ }
+ }
+ }
+
+ /**
+ * Part of Create operation. Insert-only compaction tables copy source tables.
+ */
+ private void getSkewedByClause(StringBuilder query) {
+ if (sourceTab != null) {
+ boolean isFirst; // Stored as directories. We don't care about the skew otherwise.
+ if (sourceTab.getSd().isStoredAsSubDirectories()) {
+ SkewedInfo skewedInfo = sourceTab.getSd().getSkewedInfo();
+ if (skewedInfo != null && !skewedInfo.getSkewedColNames().isEmpty()) {
+ query.append(" SKEWED BY (").append(StringUtils.join(skewedInfo.getSkewedColNames(), ", ")).append(") ON ");
+ isFirst = true;
+ for (List<String> colValues : skewedInfo.getSkewedColValues()) {
+ if (!isFirst) {
+ query.append(", ");
+ }
+ isFirst = false;
+ query.append("('").append(StringUtils.join(colValues, "','")).append("')");
+ }
+ query.append(") STORED AS DIRECTORIES");
+ }
+ }
+ }
+ }
+
+ private static Set<String> getHiveMetastoreConstants() {
+ Set<String> result = new HashSet<>();
+ for (Field f : hive_metastoreConstants.class.getDeclaredFields()) {
+ if (!Modifier.isFinal(f.getModifiers())) {
+ continue;
+ }
+ if (!String.class.equals(f.getType())) {
+ continue;
+ }
+ f.setAccessible(true);
+ try {
+ result.add((String) f.get(null));
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException(e);
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Part of Create operation. All tmp tables are not transactional and are marked as
+ * compaction tables. Additionally...
+ * - Insert-only compaction tables copy source tables' tblproperties, except metastore/statistics
+ * properties.
+ */
+ protected void addTblProperties(StringBuilder query) {
+ Map<String, String> tblProperties = new HashMap<>();
+ tblProperties.put("transactional", "false");
+ if (sourceTab != null) { // to avoid NPEs, skip this part if sourceTab is null
+ // Exclude all standard table properties.
+ Set<String> excludes = getHiveMetastoreConstants();
+ excludes.addAll(StatsSetupConst.TABLE_PARAMS_STATS_KEYS);
+ for (Map.Entry<String, String> e : sourceTab.getParameters().entrySet()) {
+ if (e.getValue() == null) {
+ continue;
+ }
+ if (excludes.contains(e.getKey())) {
+ continue;
+ }
+ tblProperties.put(e.getKey(), HiveStringUtils.escapeHiveCommand(e.getValue()));
+ }
+ }
+ super.addTblProperties(query, tblProperties);
+ }
+
+ /**
+ * Part of Create operation. Insert-only compaction tables copy source tables' serde.
+ */
+ protected void copySerdeFromSourceTable(StringBuilder query) {
+ if (storageDescriptor != null) {
+ ensureTableToCompactIsNative();
+ SerDeInfo serdeInfo = storageDescriptor.getSerdeInfo();
+ Map<String, String> serdeParams = serdeInfo.getParameters();
+ query.append(" ROW FORMAT SERDE '").append(HiveStringUtils.escapeHiveCommand(serdeInfo.getSerializationLib()))
+ .append("'");
+ // WITH SERDEPROPERTIES
+ if (!serdeParams.isEmpty()) {
+ DDLPlanUtils.appendSerdeParams(query, serdeParams);
+ }
+ query.append("STORED AS INPUTFORMAT '")
+ .append(HiveStringUtils.escapeHiveCommand(storageDescriptor.getInputFormat())).append("'")
+ .append(" OUTPUTFORMAT '").append(HiveStringUtils.escapeHiveCommand(storageDescriptor.getOutputFormat()))
+ .append("'");
+ }
+
+ }
+
+ private void ensureTableToCompactIsNative() {
+ if (sourceTab != null) {
+ String storageHandler = sourceTab.getParameters().get(hive_metastoreConstants.META_TABLE_STORAGE);
+ if (storageHandler != null) {
+ String message =
+ "Table " + sourceTab.getTableName() + " has a storage handler (" + storageHandler + "). Failing compaction for this non-native table.";
+ LOG.error(message);
+ throw new RuntimeException(message);
+ }
+ }
+ }
+
+}
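
As a hedged sketch of the insert-only path (hypothetical helper; it mirrors MmMajorQueryCompactor below), the CREATE operation additionally takes the source StorageDescriptor so the temp table can copy the source serde and input/output formats:

  // Sketch only: CREATE query for an insert-only (MM) MAJOR compaction; argument names are placeholders.
  private String sketchMmCreateQuery(Table sourceTable, StorageDescriptor sd, String tmpTableName, String baseLocation) {
    return new CompactionQueryBuilderFactory()
        .getCompactionQueryBuilder(CompactionType.MAJOR, true)    // true = insert-only table
        .setOperation(CompactionQueryBuilder.Operation.CREATE)
        .setResultTableName(tmpTableName)
        .setStorageDescriptor(sd)                                  // copied into ROW FORMAT SERDE / STORED AS
        .setSourceTab(sourceTable)
        .setLocation(baseLocation)
        .build();
  }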
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilderForMajor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilderForMajor.java
new file mode 100644
index 00000000000..c95b3cfee67
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilderForMajor.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+
+import java.util.List;
+
+/**
+ * Builds query strings that help with query-based MAJOR compaction of CRUD.
+ */
+class CompactionQueryBuilderForMajor extends CompactionQueryBuilder {
+
+ CompactionQueryBuilderForMajor() {
+ super(CompactionType.MAJOR, false);
+ }
+
+ @Override
+ protected void buildSelectClauseForInsert(StringBuilder query) {
+ // Need list of columns for major crud compaction.
+ // If the source table is null, don't throw an exception, but skip this part of the query
+ if (sourceTab != null) {
+ List<FieldSchema> cols = sourceTab.getSd().getCols();
+
+ query.append(
+ "validate_acid_sort_order(ROW__ID.writeId, ROW__ID.bucketId, ROW__ID.rowId), ROW__ID.writeId, ROW__ID.bucketId, ROW__ID.rowId, ROW__ID.writeId, NAMED_STRUCT(");
+ appendColumns(query, cols, true);
+ query.append(") ");
+ }
+ }
+
+ @Override
+ protected void buildWhereClauseForInsert(StringBuilder query) {
+ super.buildWhereClauseForInsert(query);
+ }
+}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilderForMinor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilderForMinor.java
new file mode 100644
index 00000000000..ebaec5fccd4
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilderForMinor.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.commons.lang3.ArrayUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.ql.metadata.Hive;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.apache.hive.common.util.HiveStringUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Builds query strings that help with query-based MINOR compaction of CRUD.
+ */
+class CompactionQueryBuilderForMinor extends CompactionQueryBuilder {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CompactionQueryBuilderForMinor.class.getName());
+
+ CompactionQueryBuilderForMinor() {
+ super(CompactionType.MINOR, false);
+ }
+
+ @Override
+ protected void buildSelectClauseForInsert(StringBuilder query) {
+ query.append("`operation`, `originalTransaction`, `bucket`, `rowId`, `currentTransaction`, `row`");
+ }
+
+ @Override
+ protected void buildWhereClauseForInsert(StringBuilder query) {
+ if (validWriteIdList != null) {
+ long[] invalidWriteIds = validWriteIdList.getInvalidWriteIds();
+ if (invalidWriteIds.length > 0) {
+ query.append(" where `originalTransaction` not in (")
+ .append(StringUtils.join(ArrayUtils.toObject(invalidWriteIds), ",")).append(")");
+ }
+ }
+ }
+
+ @Override
+ protected void getDdlForCreate(StringBuilder query) {
+ defineColumns(query);
+
+ // PARTITIONED BY. Used for parts of minor compaction.
+ if (isPartitioned) {
+ query.append(" PARTITIONED BY (`file_name` STRING) ");
+ }
+
+ // CLUSTERED BY. (bucketing)
+ int bucketingVersion = getMinorCrudBucketing(query);
+
+ // STORED AS / ROW FORMAT SERDE + INPUTFORMAT + OUTPUTFORMAT
+ query.append(" stored as orc");
+
+ // LOCATION
+ if (location != null) {
+ query.append(" LOCATION '").append(HiveStringUtils.escapeHiveCommand(location)).append("'");
+ }
+
+ // TBLPROPERTIES
+ addTblProperties(query, isBucketed, bucketingVersion);
+ }
+
+ /**
+ * Part of Create operation. Minor crud compaction uses its own bucketing system.
+ */
+ private int getMinorCrudBucketing(StringBuilder query) {
+ int bucketingVersion = 0;
+ if (isBucketed && sourceTab != null) { // skip if sourceTab is null to avoid NPEs
+ int numBuckets = 1;
+ try {
+ org.apache.hadoop.hive.ql.metadata.Table t = getTable();
+ numBuckets = Math.max(t.getNumBuckets(), numBuckets);
+ bucketingVersion = t.getBucketingVersion();
+ } catch (HiveException e) {
+ LOG.info("Error finding table {}. Minor compaction result will use 0 buckets.", sourceTab.getTableName());
+ } finally {
+ query.append(" clustered by (`bucket`)").append(" sorted by (`originalTransaction`, `bucket`, `rowId`)")
+ .append(" into ").append(numBuckets).append(" buckets");
+ }
+ }
+ return bucketingVersion;
+ }
+
+ protected org.apache.hadoop.hive.ql.metadata.Table getTable() throws HiveException {
+ return Hive.get().getTable(sourceTab.getDbName(), sourceTab.getTableName());
+ }
+
+}
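
A minimal sketch of the CRUD minor ALTER flow this builder participates in (hypothetical helper; it mirrors MinorQueryCompactor below), where the ValidWriteIdList and AcidDirectory bound which delta directories are added as partitions of the temp table:

  // Sketch only: ALTER query for a CRUD MINOR compaction over (delete) delta directories; names are placeholders.
  private String sketchMinorAlterQuery(String tmpTableName, AcidDirectory dir, ValidWriteIdList writeIds) {
    return new CompactionQueryBuilderFactory()
        .getCompactionQueryBuilder(CompactionType.MINOR, false)
        .setOperation(CompactionQueryBuilder.Operation.ALTER)
        .setResultTableName(tmpTableName)
        .setDir(dir)                                               // AcidDirectory listing the current deltas
        .setValidWriteIdList(writeIds)                             // bounds the write ids that get picked up
        .setIsDeleteDelta(true)                                    // true for the delete-delta variant of the temp table
        .build();
  }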
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilderForRebalance.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilderForRebalance.java
new file mode 100644
index 00000000000..91dca3ffd1b
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilderForRebalance.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+
+import java.util.List;
+
+/**
+ * Builds query strings that help with REBALANCE compaction of CRUD.
+ */
+class CompactionQueryBuilderForRebalance extends CompactionQueryBuilder {
+
+ CompactionQueryBuilderForRebalance() {
+ super(CompactionType.REBALANCE, false);
+ }
+
+ @Override
+ protected void buildSelectClauseForInsert(StringBuilder query) {
+ // Need list of columns. If the source table is null, don't throw
+ // an exception, but skip this part of the query
+ if (sourceTab != null) {
+ List<FieldSchema> cols = sourceTab.getSd().getCols();
+ query.append("0, t2.writeId, t2.rowId DIV CEIL(numRows / ").append(numberOfBuckets)
+ .append("), t2.rowId, t2.writeId, t2.data from (select ")
+ .append("count(ROW__ID.writeId) over() as numRows, ");
+ if (StringUtils.isNotBlank(orderByClause)) {
+ // in case of reordering the data the writeids cannot be kept.
+ query.append("MAX(ROW__ID.writeId) over() as writeId, row_number() OVER (").append(orderByClause);
+ } else {
+ query.append(
+ "ROW__ID.writeId as writeId, row_number() OVER (order by ROW__ID.writeId ASC, ROW__ID.bucketId ASC, ROW__ID.rowId ASC");
+ }
+ query.append(") - 1 AS rowId, NAMED_STRUCT(");
+ for (int i = 0; i < cols.size(); ++i) {
+ query.append(i == 0 ? "'" : ", '").append(cols.get(i).getName()).append("', `").append(cols.get(i).getName())
+ .append("`");
+ }
+ query.append(") as data");
+ }
+ }
+
+ @Override
+ protected void getSourceForInsert(StringBuilder query) {
+ super.getSourceForInsert(query);
+ if (StringUtils.isNotBlank(orderByClause)) {
+ query.append(orderByClause);
+ } else {
+ query.append("order by ROW__ID.writeId ASC, ROW__ID.bucketId ASC, ROW__ID.rowId ASC");
+ }
+ query.append(") t2");
+ }
+
+ @Override
+ protected void buildWhereClauseForInsert(StringBuilder query) {
+ // No where clause in the insert query of rebalance compaction
+ }
+
+}
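
A minimal sketch of the rebalance-specific knobs (hypothetical helper; it mirrors RebalanceQueryCompactor below): the target number of implicit buckets and an optional order-by clause supplied by the caller:

  // Sketch only: INSERT query for a REBALANCE compaction; bucket count and order-by clause are placeholders.
  private String sketchRebalanceInsertQuery(Table sourceTable, Partition sourcePartition, String tmpTableName) {
    return new CompactionQueryBuilderFactory()
        .getCompactionQueryBuilder(CompactionType.REBALANCE, false)
        .setOperation(CompactionQueryBuilder.Operation.INSERT)
        .setResultTableName(tmpTableName)
        .setNumberOfBuckets(4)                                     // target number of implicit buckets
        .setOrderByClause("order by col1 ASC")                     // optional; write ids cannot be kept when reordering
        .setSourceTab(sourceTable)
        .setSourcePartition(sourcePartition)
        .build();
  }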
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java
index 75fb798ac33..2dc49851531 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MajorQueryCompactor.java
@@ -72,23 +72,22 @@ final class MajorQueryCompactor extends QueryCompactor {
* See {@link org.apache.hadoop.hive.conf.HiveConf.ConfVars#SPLIT_GROUPING_MODE} for the config description.
*/
private List<String> getCreateQueries(String fullName, Table t, String tmpTableLocation) {
- return Lists.newArrayList(new CompactionQueryBuilder(
- CompactionType.MAJOR,
- CompactionQueryBuilder.Operation.CREATE,
- false,
- fullName)
+ return Lists.newArrayList(new CompactionQueryBuilderFactory().getCompactionQueryBuilder(
+ CompactionType.MAJOR, false)
+ .setOperation(CompactionQueryBuilder.Operation.CREATE)
+ .setResultTableName(fullName)
.setSourceTab(t)
.setLocation(tmpTableLocation)
.build());
}
private List<String> getCompactionQueries(Table t, Partition p, String tmpName) {
+
return Lists.newArrayList(
- new CompactionQueryBuilder(
- CompactionType.MAJOR,
- CompactionQueryBuilder.Operation.INSERT,
- false,
- tmpName)
+ new CompactionQueryBuilderFactory().getCompactionQueryBuilder(
+ CompactionType.MAJOR, false)
+ .setOperation(CompactionQueryBuilder.Operation.INSERT)
+ .setResultTableName(tmpName)
.setSourceTab(t)
.setSourcePartition(p)
.build());
@@ -96,10 +95,10 @@ final class MajorQueryCompactor extends QueryCompactor {
private List<String> getDropQueries(String tmpTableName) {
return Lists.newArrayList(
- new CompactionQueryBuilder(
- CompactionType.MAJOR,
- CompactionQueryBuilder.Operation.DROP,
- false,
- tmpTableName).build());
+ new CompactionQueryBuilderFactory().getCompactionQueryBuilder(
+ CompactionType.MAJOR, false)
+ .setOperation(CompactionQueryBuilder.Operation.DROP)
+ .setResultTableName(tmpTableName)
+ .build());
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java
index cc1c3e92167..9234443a9d6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MinorQueryCompactor.java
@@ -150,11 +150,10 @@ final class MinorQueryCompactor extends QueryCompactor {
*/
private String buildCreateTableQuery(Table table, String newTableName, boolean isPartitioned,
boolean isBucketed, String location) {
- return new CompactionQueryBuilder(
- CompactionType.MINOR,
- CompactionQueryBuilder.Operation.CREATE,
- false,
- newTableName)
+ return new CompactionQueryBuilderFactory().getCompactionQueryBuilder(
+ CompactionType.MINOR, false)
+ .setOperation(CompactionQueryBuilder.Operation.CREATE)
+ .setResultTableName(newTableName)
.setSourceTab(table)
.setBucketed(isBucketed)
.setPartitioned(isPartitioned)
@@ -173,11 +172,10 @@ final class MinorQueryCompactor extends QueryCompactor {
*/
private String buildAlterTableQuery(String tableName, AcidDirectory dir,
ValidWriteIdList validWriteIdList, boolean isDeleteDelta) {
- return new CompactionQueryBuilder(
- CompactionType.MINOR,
- CompactionQueryBuilder.Operation.ALTER,
- false,
- tableName)
+ return new CompactionQueryBuilderFactory().getCompactionQueryBuilder(
+ CompactionType.MINOR, false)
+ .setOperation(CompactionQueryBuilder.Operation.ALTER)
+ .setResultTableName(tableName)
.setDir(dir)
.setValidWriteIdList(validWriteIdList)
.setIsDeleteDelta(isDeleteDelta)
@@ -214,11 +212,10 @@ final class MinorQueryCompactor extends QueryCompactor {
*/
private String buildCompactionQuery(String sourceTableName, String resultTableName, Table table,
ValidWriteIdList validWriteIdList) {
- return new CompactionQueryBuilder(
- CompactionType.MINOR,
- CompactionQueryBuilder.Operation.INSERT,
- false,
- resultTableName)
+ return new CompactionQueryBuilderFactory().getCompactionQueryBuilder(
+ CompactionType.MINOR, false)
+ .setOperation(CompactionQueryBuilder.Operation.INSERT)
+ .setResultTableName(resultTableName)
.setSourceTabForInsert(sourceTableName)
.setSourceTab(table)
.setValidWriteIdList(validWriteIdList)
@@ -239,10 +236,10 @@ final class MinorQueryCompactor extends QueryCompactor {
}
private String getDropQuery(String tableToDrop) {
- return new CompactionQueryBuilder(
- CompactionType.MINOR,
- CompactionQueryBuilder.Operation.DROP,
- false,
- tableToDrop).build();
+ return new CompactionQueryBuilderFactory().getCompactionQueryBuilder(
+ CompactionType.MINOR, false)
+ .setOperation(CompactionQueryBuilder.Operation.DROP)
+ .setResultTableName(tableToDrop)
+ .build();
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java
index ecdae6439ca..80a0d8f932e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMajorQueryCompactor.java
@@ -82,13 +82,12 @@ final class MmMajorQueryCompactor extends QueryCompactor {
private List<String> getCreateQueries(String tmpTableName, Table table,
StorageDescriptor storageDescriptor, String baseLocation) {
return Lists.newArrayList(
- new CompactionQueryBuilder(
- CompactionType.MAJOR,
- CompactionQueryBuilder.Operation.CREATE,
- true,
- tmpTableName)
- .setSourceTab(table)
+ new CompactionQueryBuilderFactory().getCompactionQueryBuilder(
+ CompactionType.MAJOR, true)
+ .setOperation(CompactionQueryBuilder.Operation.CREATE)
+ .setResultTableName(tmpTableName)
.setStorageDescriptor(storageDescriptor)
+ .setSourceTab(table)
.setLocation(baseLocation)
.build()
);
@@ -96,11 +95,10 @@ final class MmMajorQueryCompactor extends QueryCompactor {
private List<String> getCompactionQueries(Table t, Partition p, String tmpName) {
return Lists.newArrayList(
- new CompactionQueryBuilder(
- CompactionType.MAJOR,
- CompactionQueryBuilder.Operation.INSERT,
- true,
- tmpName)
+ new CompactionQueryBuilderFactory().getCompactionQueryBuilder(
+ CompactionType.MAJOR, true)
+ .setOperation(CompactionQueryBuilder.Operation.INSERT)
+ .setResultTableName(tmpName)
.setSourceTab(t)
.setSourcePartition(p)
.build()
@@ -109,10 +107,10 @@ final class MmMajorQueryCompactor extends QueryCompactor {
private List<String> getDropQueries(String tmpTableName) {
return Lists.newArrayList(
- new CompactionQueryBuilder(
- CompactionType.MAJOR,
- CompactionQueryBuilder.Operation.DROP,
- true,
- tmpTableName).build());
+ new CompactionQueryBuilderFactory().getCompactionQueryBuilder(
+ CompactionType.MAJOR, true)
+ .setOperation(CompactionQueryBuilder.Operation.DROP)
+ .setResultTableName(tmpTableName)
+ .build());
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java
index 81e7b4cc19f..b3f8019a251 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MmMinorQueryCompactor.java
@@ -108,13 +108,12 @@ final class MmMinorQueryCompactor extends QueryCompactor {
private String getCreateQuery(String newTableName, Table t, StorageDescriptor sd,
String location, boolean isPartitioned) {
- return new CompactionQueryBuilder(
- CompactionType.MINOR,
- CompactionQueryBuilder.Operation.CREATE,
- true,
- newTableName)
- .setSourceTab(t)
+ return new CompactionQueryBuilderFactory().getCompactionQueryBuilder(
+ CompactionType.MINOR, true)
+ .setOperation(CompactionQueryBuilder.Operation.CREATE)
+ .setResultTableName(newTableName)
.setStorageDescriptor(sd)
+ .setSourceTab(t)
.setLocation(location)
.setPartitioned(isPartitioned)
.build();
@@ -130,11 +129,10 @@ final class MmMinorQueryCompactor extends QueryCompactor {
*/
private String buildAlterTableQuery(String tableName, AcidDirectory dir,
ValidWriteIdList validWriteIdList) {
- return new CompactionQueryBuilder(
- CompactionType.MINOR,
- CompactionQueryBuilder.Operation.ALTER,
- true,
- tableName)
+ return new CompactionQueryBuilderFactory().getCompactionQueryBuilder(
+ CompactionType.MINOR, true)
+ .setOperation(CompactionQueryBuilder.Operation.ALTER)
+ .setResultTableName(tableName)
.setDir(dir)
.setValidWriteIdList(validWriteIdList)
.build();
@@ -154,14 +152,13 @@ final class MmMinorQueryCompactor extends QueryCompactor {
private List<String> getCompactionQueries(String sourceTmpTableName, String resultTmpTableName,
Table sourceTable) {
return Lists.newArrayList(
- new CompactionQueryBuilder(
- CompactionType.MINOR,
- CompactionQueryBuilder.Operation.INSERT,
- true,
- resultTmpTableName)
- .setSourceTabForInsert(sourceTmpTableName)
- .setSourceTab(sourceTable)
- .build()
+ new CompactionQueryBuilderFactory().getCompactionQueryBuilder(
+ CompactionType.MINOR, true)
+ .setOperation(CompactionQueryBuilder.Operation.INSERT)
+ .setResultTableName(resultTmpTableName)
+ .setSourceTabForInsert(sourceTmpTableName)
+ .setSourceTab(sourceTable)
+ .build()
);
}
@@ -178,11 +175,11 @@ final class MmMinorQueryCompactor extends QueryCompactor {
}
private String getDropQuery(String tableToDrop) {
- return new CompactionQueryBuilder(
- CompactionType.MINOR,
- CompactionQueryBuilder.Operation.DROP,
- true,
- tableToDrop).build();
+ return new CompactionQueryBuilderFactory().getCompactionQueryBuilder(
+ CompactionType.MINOR, true)
+ .setOperation(CompactionQueryBuilder.Operation.DROP)
+ .setResultTableName(tableToDrop)
+ .build();
}
private HiveConf setUpDriverSession(HiveConf hiveConf) {
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/RebalanceQueryCompactor.java
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/RebalanceQueryCompactor.java
index a9849404c7a..9e9d76f070c 100644
---
a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/RebalanceQueryCompactor.java
+++
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/RebalanceQueryCompactor.java
@@ -73,11 +73,10 @@ final class RebalanceQueryCompactor extends QueryCompactor {
}
private List<String> getCreateQueries(String fullName, Table t, String
tmpTableLocation) {
- return Lists.newArrayList(new CompactionQueryBuilder(
- CompactionType.REBALANCE,
- CompactionQueryBuilder.Operation.CREATE,
- false,
- fullName)
+ return Lists.newArrayList(new
CompactionQueryBuilderFactory().getCompactionQueryBuilder(
+ CompactionType.REBALANCE, false)
+ .setOperation(CompactionQueryBuilder.Operation.CREATE)
+ .setResultTableName(fullName)
.setSourceTab(t)
.setLocation(tmpTableLocation)
.build());
@@ -85,24 +84,23 @@ final class RebalanceQueryCompactor extends QueryCompactor {
private List<String> getCompactionQueries(Table t, Partition p, String
tmpName, int numberOfBuckets, String orderByClause) {
return Lists.newArrayList(
- new CompactionQueryBuilder(
- CompactionType.REBALANCE,
- CompactionQueryBuilder.Operation.INSERT,
- false,
- tmpName)
- .setSourceTab(t)
- .setSourcePartition(p)
+ new CompactionQueryBuilderFactory().getCompactionQueryBuilder(
+ CompactionType.REBALANCE, false)
+ .setOperation(CompactionQueryBuilder.Operation.INSERT)
+ .setResultTableName(tmpName)
.setNumberOfBuckets(numberOfBuckets)
.setOrderByClause(orderByClause)
+ .setSourceTab(t)
+ .setSourcePartition(p)
.build());
}
private List<String> getDropQueries(String tmpTableName) {
return Lists.newArrayList(
- new CompactionQueryBuilder(
- CompactionType.REBALANCE,
- CompactionQueryBuilder.Operation.DROP,
- false,
- tmpTableName).build());
+ new CompactionQueryBuilderFactory().getCompactionQueryBuilder(
+ CompactionType.REBALANCE, false)
+ .setOperation(CompactionQueryBuilder.Operation.DROP)
+ .setResultTableName(tmpTableName)
+ .build());
}
}
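
The body of CompactionQueryBuilderFactory itself is not shown in this excerpt.
Purely as a hypothetical illustration of how
getCompactionQueryBuilder(CompactionType, boolean insertOnly) could map its
two arguments onto the per-type builder classes added by this commit, one
possible shape is sketched below; the branch structure and the constructors
are assumptions, not the committed implementation.

    package org.apache.hadoop.hive.ql.txn.compactor;

    import org.apache.hadoop.hive.metastore.api.CompactionType;

    // Hypothetical sketch, not the committed code: one plausible dispatch from
    // (compactionType, insertOnly) to the builder subclasses added by this
    // commit. The constructors shown here are assumed.
    class CompactionQueryBuilderFactorySketch {
      CompactionQueryBuilder getCompactionQueryBuilder(CompactionType compactionType, boolean insertOnly) {
        if (insertOnly) {
          // Insert-only (MM) tables share a dedicated builder in this sketch.
          return new CompactionQueryBuilderForInsertOnly(compactionType);
        }
        switch (compactionType) {
          case MAJOR:
            return new CompactionQueryBuilderForMajor();
          case MINOR:
            return new CompactionQueryBuilderForMinor();
          case REBALANCE:
            return new CompactionQueryBuilderForRebalance();
          default:
            throw new IllegalArgumentException("Unsupported compaction type: " + compactionType);
        }
      }
    }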
diff --git
a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilderTestBase.java
b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilderTestBase.java
new file mode 100644
index 00000000000..243f6a67063
--- /dev/null
+++
b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactionQueryBuilderTestBase.java
@@ -0,0 +1,150 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.ValidCompactorWriteIdList;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Order;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.io.AcidDirectory;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.mockito.Mockito.when;
+
+public class CompactionQueryBuilderTestBase {
+
+ public static final String DB_NAME = "comp_test_db";
+ public static final String SOURCE_TABLE_NAME = "comp_test_source_table";
+ public static final String RESULT_TABLE_NAME = "comp_test_result_table";
+ public static final String COLUMN_1_NAME = "column_1";
+ public static final String COLUMN_2_NAME = "column_2";
+ public static final String COLUMN_3_NAME = "column_3";
+ public static final String SOME_TEST_LOCATION = "some_test_path";
+ public static final String COMP_TEST_SOURCE_TABLE_FOR_INSERT =
"comp_test_db.comp_test_insert_table";
+
+ public Table createSourceTable() {
+ return createSourceTable(false, false, false);
+ }
+
+ public Table createSourceTableWithProperties() {
+ return createSourceTable(true, false, false);
+ }
+
+ public Table createSourceTableBucketed() {
+ return createSourceTable(false, true, false);
+ }
+
+ public Table createSourceTableBucketedSorted() {
+ return createSourceTable(false, true, true);
+ }
+
+ private Table createSourceTable(boolean addTableProperties, boolean
bucketed, boolean sorted) {
+ Table sourceTable = new Table();
+ sourceTable.setDbName(DB_NAME);
+ sourceTable.setTableName(SOURCE_TABLE_NAME);
+
+ StorageDescriptor sd = new StorageDescriptor();
+ List<FieldSchema> columns = new ArrayList<>();
+ FieldSchema col1 = new FieldSchema(COLUMN_1_NAME, "string", "First
column");
+ FieldSchema col2 = new FieldSchema(COLUMN_2_NAME, "int", null);
+ FieldSchema col3 = new FieldSchema(COLUMN_3_NAME, "boolean", "Third
column");
+ columns.add(col1);
+ columns.add(col2);
+ columns.add(col3);
+ sd.setCols(columns);
+
+ if (bucketed) {
+ sd.addToBucketCols(COLUMN_1_NAME);
+ sd.addToBucketCols(COLUMN_3_NAME);
+ sd.setNumBuckets(4);
+ } else {
+ sd.setBucketCols(Collections.emptyList());
+ }
+
+ if (sorted) {
+ sd.addToSortCols(new Order(COLUMN_1_NAME, 0));
+ sd.addToSortCols(new Order(COLUMN_2_NAME, 1));
+ } else {
+ sd.setSortCols(Collections.emptyList());
+ }
+
+ Map<String, String> parameters = new HashMap<>();
+ if (addTableProperties) {
+ parameters.put("property_1", "true");
+ parameters.put("orc.property_2", "44");
+ parameters.put("COLUMN_STATS_ACCURATE", "false");
+ parameters.put("columns.types", "test");
+ }
+ sourceTable.setParameters(parameters);
+ sourceTable.setSd(sd);
+ return sourceTable;
+ }
+
+ protected AcidDirectory createAcidDirectory() {
+ AcidDirectory dir = Mockito.mock(AcidDirectory.class);
+ AcidUtils.ParsedDelta d1 = Mockito.mock(AcidUtils.ParsedDelta.class);
+ AcidUtils.ParsedDelta d2 = Mockito.mock(AcidUtils.ParsedDelta.class);
+ AcidUtils.ParsedDelta d3 = Mockito.mock(AcidUtils.ParsedDelta.class);
+ AcidUtils.ParsedDelta d4 = Mockito.mock(AcidUtils.ParsedDelta.class);
+ AcidUtils.ParsedDelta d5 = Mockito.mock(AcidUtils.ParsedDelta.class);
+
+ List<AcidUtils.ParsedDelta> dirs = new ArrayList<>();
+ dirs.add(d1);
+ dirs.add(d2);
+ dirs.add(d3);
+ dirs.add(d4);
+ dirs.add(d5);
+
+ when(dir.getCurrentDirectories()).thenReturn(dirs);
+ when(d1.isDeleteDelta()).thenReturn(true);
+ when(d1.getMinWriteId()).thenReturn(7L);
+ when(d1.getMaxWriteId()).thenReturn(11L);
+ when(d1.getPath()).thenReturn(new Path("/compaction/test/table",
"test_delta_1"));
+ when(d2.isDeleteDelta()).thenReturn(true);
+ when(d2.getMinWriteId()).thenReturn(1L);
+ when(d2.getMaxWriteId()).thenReturn(11L);
+ when(d2.getPath()).thenReturn(new Path("/compaction/test/table",
"test_delta_2"));
+ when(d3.isDeleteDelta()).thenReturn(true);
+ when(d3.getMinWriteId()).thenReturn(5L);
+ when(d3.getMaxWriteId()).thenReturn(15L);
+ when(d3.getPath()).thenReturn(new Path("/compaction/test/table",
"test_delta_3"));
+ when(d4.isDeleteDelta()).thenReturn(true);
+ when(d4.getMinWriteId()).thenReturn(7L);
+ when(d4.getMaxWriteId()).thenReturn(20L);
+ when(d4.getPath()).thenReturn(new Path("/compaction/test/table",
"test_delta_4"));
+ when(d5.isDeleteDelta()).thenReturn(false);
+ when(d5.getMinWriteId()).thenReturn(6L);
+ when(d5.getMaxWriteId()).thenReturn(11L);
+ when(d5.getPath()).thenReturn(new Path("/compaction/test/table",
"test_delta_5"));
+ return dir;
+ }
+
+ protected ValidCompactorWriteIdList createWriteId(long minWriteId) {
+ long[] abortedWriteIdList = { 1111L };
+ return new ValidCompactorWriteIdList("comp_test_source_table",
abortedWriteIdList, null, 15L, minWriteId);
+ }
+}
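
The test classes added below all follow the same pattern: extend this base
class, obtain a builder from CompactionQueryBuilderFactory, configure it
through a small private helper, and assert on the generated SQL string. A
minimal sketch of that pattern, assuming JUnit 4 as used by the tests below;
the test class name and the chosen DROP example are illustrative, and the
expected string is copied from the drop assertions in the tests below.

    package org.apache.hadoop.hive.ql.txn.compactor;

    import org.apache.hadoop.hive.metastore.api.CompactionType;
    import org.junit.Assert;
    import org.junit.Test;

    // Minimal sketch of how the helpers above are consumed; illustrative only.
    public class ExampleCompactionQueryBuilderTest extends CompactionQueryBuilderTestBase {

      @Test
      public void exampleDropQuery() {
        CompactionQueryBuilder queryBuilder = new CompactionQueryBuilderFactory()
            .getCompactionQueryBuilder(CompactionType.MAJOR, false)
            .setOperation(CompactionQueryBuilder.Operation.DROP)
            .setResultTableName(RESULT_TABLE_NAME);
        // Matches the expected DROP statement asserted in the tests below.
        Assert.assertEquals("DROP table if exists comp_test_result_table", queryBuilder.build());
      }
    }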
diff --git
a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionQueryBuilderForMajorCompaction.java
b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionQueryBuilderForMajorCompaction.java
new file mode 100644
index 00000000000..99f8b5303a5
--- /dev/null
+++
b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionQueryBuilderForMajorCompaction.java
@@ -0,0 +1,255 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.hadoop.hive.common.ValidCompactorWriteIdList;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.io.AcidDirectory;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.Collections;
+
+import static org.mockito.Mockito.when;
+
+public class TestCompactionQueryBuilderForMajorCompaction extends
CompactionQueryBuilderTestBase {
+
+ @Test
+ public void testCreateNoSourceTable() {
+ CompactionQueryBuilder queryBuilder =
getMajorCompactionQueryBuilderForCreate();
+ String query = queryBuilder.build();
+ String expectedQuery =
+ "CREATE temporary external table comp_test_result_table stored as orc
TBLPROPERTIES ('compactiontable'='MAJOR', 'transactional'='false')";
+ Assert.assertEquals(expectedQuery, query);
+ }
+
+ @Test
+ public void testCreate() {
+ CompactionQueryBuilder queryBuilder =
getMajorCompactionQueryBuilderForCreate();
+ Table sourceTable = createSourceTable();
+ queryBuilder.setSourceTab(sourceTable);
+ String query = queryBuilder.build();
+ String expectedQuery =
+ "CREATE temporary external table comp_test_result_table(`operation`
int, `originalTransaction` bigint, `bucket` int, `rowId` bigint,
`currentTransaction` bigint, `row` struct<`column_1` :string,`column_2`
:int,`column_3` :boolean>) stored as orc TBLPROPERTIES
('compactiontable'='MAJOR', 'transactional'='false')";
+ Assert.assertEquals(expectedQuery, query);
+ }
+
+ @Test
+ public void testCreateWithSourceTableProperties() {
+ CompactionQueryBuilder queryBuilder =
getMajorCompactionQueryBuilderForCreate();
+ Table sourceTable = createSourceTableWithProperties();
+ queryBuilder.setSourceTab(sourceTable);
+ String query = queryBuilder.build();
+ String expectedQuery =
+ "CREATE temporary external table comp_test_result_table(`operation`
int, `originalTransaction` bigint, `bucket` int, `rowId` bigint,
`currentTransaction` bigint, `row` struct<`column_1` :string,`column_2`
:int,`column_3` :boolean>) stored as orc TBLPROPERTIES
('compactiontable'='MAJOR', 'orc.property_2'='44', 'transactional'='false')";
+ Assert.assertEquals(expectedQuery, query);
+ }
+
+ @Test
+ public void testCreateWithSourceTableLocation() {
+ CompactionQueryBuilder queryBuilder =
getMajorCompactionQueryBuilderForCreate();
+ Table sourceTable = createSourceTable();
+ queryBuilder.setSourceTab(sourceTable);
+ queryBuilder.setLocation(SOME_TEST_LOCATION);
+ String query = queryBuilder.build();
+ String expectedQuery =
+ "CREATE temporary external table comp_test_result_table(`operation`
int, `originalTransaction` bigint, `bucket` int, `rowId` bigint,
`currentTransaction` bigint, `row` struct<`column_1` :string,`column_2`
:int,`column_3` :boolean>) stored as orc LOCATION 'some_test_path'
TBLPROPERTIES ('compactiontable'='MAJOR', 'transactional'='false')";
+ Assert.assertEquals(expectedQuery, query);
+ }
+
+ @Test
+ public void testCreateWithPartitionedSourceTable() {
+ CompactionQueryBuilder queryBuilder =
getMajorCompactionQueryBuilderForCreate();
+ Table sourceTable = createSourceTable();
+ queryBuilder.setSourceTab(sourceTable);
+ queryBuilder.setPartitioned(true);
+ String query = queryBuilder.build();
+ String expectedQuery =
+ "CREATE temporary external table comp_test_result_table(`operation`
int, `originalTransaction` bigint, `bucket` int, `rowId` bigint,
`currentTransaction` bigint, `row` struct<`column_1` :string,`column_2`
:int,`column_3` :boolean>) PARTITIONED BY (`file_name` STRING) stored as orc
TBLPROPERTIES ('compactiontable'='MAJOR', 'transactional'='false')";
+ Assert.assertEquals(expectedQuery, query);
+ }
+
+ @Test
+ public void testInsert() {
+ CompactionQueryBuilder queryBuilder =
getMajorCompactionQueryBuilderForInsert();
+ Table sourceTable = createSourceTable();
+ queryBuilder.setSourceTabForInsert(COMP_TEST_SOURCE_TABLE_FOR_INSERT);
+ Partition sourcePartition = new Partition();
+ sourcePartition.addToValues("source_part_1");
+ sourcePartition.addToValues("true");
+ sourcePartition.addToValues("4444");
+
+ sourceTable.addToPartitionKeys(new FieldSchema("source_part_1", "string",
"comment 1"));
+ sourceTable.addToPartitionKeys(new FieldSchema("source_part_2", "boolean",
"comment 2"));
+ sourceTable.addToPartitionKeys(new FieldSchema("source_part_3", "int",
"comment 3"));
+ queryBuilder.setSourceTab(sourceTable);
+ queryBuilder.setSourcePartition(sourcePartition);
+
+ String query = queryBuilder.build();
+ String expectedQuery =
+ "INSERT into table comp_test_result_table select
validate_acid_sort_order(ROW__ID.writeId, ROW__ID.bucketId, ROW__ID.rowId),
ROW__ID.writeId, ROW__ID.bucketId, ROW__ID.rowId, ROW__ID.writeId,
NAMED_STRUCT('column_1', `column_1`, 'column_2', `column_2`, 'column_3',
`column_3`) from comp_test_db.comp_test_insert_table where
`source_part_1`='source_part_1' and `source_part_2`=true and
`source_part_3`='4444'";
+ Assert.assertEquals(expectedQuery, query);
+ }
+
+ @Test
+ public void testInsertPartitionMismatch() {
+ CompactionQueryBuilder queryBuilder =
getMajorCompactionQueryBuilderForInsert();
+ Table sourceTable = createSourceTable();
+ queryBuilder.setSourceTabForInsert(COMP_TEST_SOURCE_TABLE_FOR_INSERT);
+ Partition sourcePartition = new Partition();
+ sourcePartition.addToValues("source_part_1");
+ sourcePartition.addToValues("true");
+ sourcePartition.addToValues("4444");
+
+ sourceTable.addToPartitionKeys(new FieldSchema("source_part_1", "string",
"comment 1"));
+ sourceTable.addToPartitionKeys(new FieldSchema("source_part_2", "boolean",
"comment 2"));
+ queryBuilder.setSourceTab(sourceTable);
+ queryBuilder.setSourcePartition(sourcePartition);
+
+ String expectedMessage =
+ "source partition values ([source_part_1, true, 4444]) do not match
source table values ([FieldSchema(name:source_part_1, type:string,
comment:comment 1), FieldSchema(name:source_part_2, type:boolean,
comment:comment 2)]). Failing compaction.";
+ Assert.assertThrows(expectedMessage, IllegalStateException.class,
queryBuilder::build);
+ }
+
+ @Test
+ public void testInsertNoSourcePartition() {
+ CompactionQueryBuilder queryBuilder =
getMajorCompactionQueryBuilderForInsert();
+ Table sourceTable = createSourceTable();
+ queryBuilder.setSourceTabForInsert(COMP_TEST_SOURCE_TABLE_FOR_INSERT);
+ queryBuilder.setSourceTab(sourceTable);
+ String query = queryBuilder.build();
+ String expectedQuery =
+ "INSERT into table comp_test_result_table select
validate_acid_sort_order(ROW__ID.writeId, ROW__ID.bucketId, ROW__ID.rowId),
ROW__ID.writeId, ROW__ID.bucketId, ROW__ID.rowId, ROW__ID.writeId,
NAMED_STRUCT('column_1', `column_1`, 'column_2', `column_2`, 'column_3',
`column_3`) from comp_test_db.comp_test_insert_table ";
+ Assert.assertEquals(expectedQuery, query);
+ }
+
+ @Test
+ public void testInsertNoSourceTableForInsert() {
+ CompactionQueryBuilder queryBuilder =
getMajorCompactionQueryBuilderForInsert();
+ Table sourceTable = createSourceTable();
+ queryBuilder.setSourceTab(sourceTable);
+ String query = queryBuilder.build();
+ String expectedQuery =
+ "INSERT into table comp_test_result_table select
validate_acid_sort_order(ROW__ID.writeId, ROW__ID.bucketId, ROW__ID.rowId),
ROW__ID.writeId, ROW__ID.bucketId, ROW__ID.rowId, ROW__ID.writeId,
NAMED_STRUCT('column_1', `column_1`, 'column_2', `column_2`, 'column_3',
`column_3`) from comp_test_db.comp_test_source_table ";
+ Assert.assertEquals(expectedQuery, query);
+ }
+
+ @Test
+ public void testInsertNoSourceTable() {
+ CompactionQueryBuilder queryBuilder =
getMajorCompactionQueryBuilderForInsert();
+ queryBuilder.setSourceTabForInsert(COMP_TEST_SOURCE_TABLE_FOR_INSERT);
+ String query = queryBuilder.build();
+ String expectedQuery = "INSERT into table comp_test_result_table select
from comp_test_db.comp_test_insert_table ";
+ Assert.assertEquals(expectedQuery, query);
+ }
+
+ @Test
+ public void testAlter() {
+ CompactionQueryBuilder queryBuilder =
getMajorCompactionQueryBuilderForAlter();
+ AcidDirectory dir = createAcidDirectory();
+ ValidCompactorWriteIdList writeIds = createWriteId(5L);
+ queryBuilder.setValidWriteIdList(writeIds);
+ queryBuilder.setDir(dir);
+ queryBuilder.setIsDeleteDelta(true);
+ String query = queryBuilder.build();
+ String expectedQuery =
+ "ALTER table comp_test_result_table add partition
(file_name='test_delta_1') location '/compaction/test/table/test_delta_1'
partition (file_name='test_delta_3') location
'/compaction/test/table/test_delta_3' ";
+ Assert.assertEquals(expectedQuery, query);
+ }
+
+ @Test
+ public void testAlterEmptyDir() {
+ CompactionQueryBuilder queryBuilder =
getMajorCompactionQueryBuilderForAlter();
+ AcidDirectory dir = Mockito.mock(AcidDirectory.class);
+ when(dir.getCurrentDirectories()).thenReturn(Collections.emptyList());
+ ValidCompactorWriteIdList writeIds = createWriteId(5L);
+ queryBuilder.setValidWriteIdList(writeIds);
+ queryBuilder.setDir(dir);
+ queryBuilder.setIsDeleteDelta(true);
+ String query = queryBuilder.build();
+ Assert.assertTrue(query.isEmpty());
+ }
+
+ @Test
+ public void testAlterMinWriteIdIsNull() {
+ CompactionQueryBuilder queryBuilder =
getMajorCompactionQueryBuilderForAlter();
+ AcidDirectory dir = createAcidDirectory();
+ ValidCompactorWriteIdList writeIds = createWriteId(Long.MAX_VALUE);
+ queryBuilder.setValidWriteIdList(writeIds);
+ queryBuilder.setDir(dir);
+ String query = queryBuilder.build();
+ String expectedQuery =
+ "ALTER table comp_test_result_table add partition
(file_name='test_delta_5') location '/compaction/test/table/test_delta_5' ";
+ Assert.assertEquals(expectedQuery, query);
+ }
+
+ @Test
+ public void testAlterDirIsNull() {
+ CompactionQueryBuilder queryBuilder =
getMajorCompactionQueryBuilderForAlter();
+ ValidCompactorWriteIdList writeIds = createWriteId(Long.MAX_VALUE);
+ queryBuilder.setValidWriteIdList(writeIds);
+ queryBuilder.setDir(null);
+ String query = queryBuilder.build();
+ Assert.assertTrue(query.isEmpty());
+ }
+
+ @Test
+ public void testAlterValidWriteIdListIsNull() {
+ CompactionQueryBuilder queryBuilder =
getMajorCompactionQueryBuilderForAlter();
+ AcidDirectory dir = createAcidDirectory();
+ queryBuilder.setValidWriteIdList(null);
+ queryBuilder.setDir(dir);
+ String query = queryBuilder.build();
+ Assert.assertTrue(query.isEmpty());
+ }
+
+ @Test
+ public void testDrop() {
+ CompactionQueryBuilder queryBuilder =
getMajorCompactionQueryBuilderForDrop();
+ String query = queryBuilder.build();
+ String expectedQuery = "DROP table if exists comp_test_result_table";
+ Assert.assertEquals(expectedQuery, query);
+ }
+
+ private CompactionQueryBuilder getMajorCompactionQueryBuilderForCreate() {
+ return
getMajorCompactionBuilder().setOperation(CompactionQueryBuilder.Operation.CREATE);
+ }
+
+ private CompactionQueryBuilder getMajorCompactionQueryBuilderForInsert() {
+ return
getMajorCompactionBuilder().setOperation(CompactionQueryBuilder.Operation.INSERT);
+ }
+
+ private CompactionQueryBuilder getMajorCompactionQueryBuilderForAlter() {
+ return
getMajorCompactionBuilder().setOperation(CompactionQueryBuilder.Operation.ALTER);
+ }
+
+ private CompactionQueryBuilder getMajorCompactionQueryBuilderForDrop() {
+ return
getMajorCompactionBuilder().setOperation(CompactionQueryBuilder.Operation.DROP);
+ }
+
+ private CompactionQueryBuilder getMajorCompactionBuilder() {
+ CompactionQueryBuilder compactionQueryBuilder =
+ new
CompactionQueryBuilderFactory().getCompactionQueryBuilder(CompactionType.MAJOR,
false);
+ return compactionQueryBuilder.setResultTableName(RESULT_TABLE_NAME);
+ }
+
+}
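
A note on the ALTER assertions above: with createWriteId(5L) (minimum open
write id 5, high watermark 15) and setIsDeleteDelta(true), only test_delta_1
(write ids 7-11) and test_delta_3 (5-15) are delete deltas whose write-id
range lies entirely inside that window, so only those two are added as
partitions; test_delta_2 starts below the window, test_delta_4 ends above it,
and test_delta_5 is not a delete delta. A compact sketch of that call,
reusing the base-class helpers (the wrapper class is illustrative only):

    package org.apache.hadoop.hive.ql.txn.compactor;

    import org.apache.hadoop.hive.metastore.api.CompactionType;

    // Illustrative wrapper only: the ALTER configuration exercised by testAlter
    // above, reusing the mocked directory and write-id helpers from
    // CompactionQueryBuilderTestBase.
    class AlterQuerySketch extends CompactionQueryBuilderTestBase {
      String buildAlterQuery() {
        CompactionQueryBuilder queryBuilder = new CompactionQueryBuilderFactory()
            .getCompactionQueryBuilder(CompactionType.MAJOR, false)
            .setOperation(CompactionQueryBuilder.Operation.ALTER)
            .setResultTableName(RESULT_TABLE_NAME);
        queryBuilder.setValidWriteIdList(createWriteId(5L)); // min open 5, high watermark 15
        queryBuilder.setDir(createAcidDirectory());          // five mocked deltas from the base class
        queryBuilder.setIsDeleteDelta(true);                 // keep delete deltas only
        return queryBuilder.build();
        // -> ALTER table comp_test_result_table
        //      add partition (file_name='test_delta_1') location '/compaction/test/table/test_delta_1'
        //          partition (file_name='test_delta_3') location '/compaction/test/table/test_delta_3'
      }
    }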
diff --git
a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionQueryBuilderForMinorCompaction.java
b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionQueryBuilderForMinorCompaction.java
new file mode 100644
index 00000000000..eef965b1a92
--- /dev/null
+++
b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionQueryBuilderForMinorCompaction.java
@@ -0,0 +1,186 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.hadoop.hive.common.ValidCompactorWriteIdList;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.io.AcidDirectory;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.junit.Assert;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import static org.mockito.Mockito.when;
+
+public class TestCompactionQueryBuilderForMinorCompaction extends
CompactionQueryBuilderTestBase {
+
+ static class CompactionQueryBuilderForMinorMock extends
CompactionQueryBuilderForMinor {
+ private boolean throwException = false;
+
+ public void setThrowException(boolean throwException) {
+ this.throwException = throwException;
+ }
+
+ @Override
+ public org.apache.hadoop.hive.ql.metadata.Table getTable() throws
HiveException {
+ org.apache.hadoop.hive.ql.metadata.Table t =
Mockito.mock(org.apache.hadoop.hive.ql.metadata.Table.class);
+ if (throwException) {
+ throw new HiveException();
+ } else {
+ when(t.getBucketingVersion()).thenReturn(2);
+ when(t.getNumBuckets()).thenReturn(4);
+ return t;
+ }
+ }
+ }
+
+ @Test
+ public void testCreateWithoutSourceTable() {
+ CompactionQueryBuilder queryBuilder =
getMinorCompactionQueryBuilderForCreate();
+ String query = queryBuilder.build();
+ String expectedQuery =
+ "CREATE temporary external table comp_test_result_table stored as orc
TBLPROPERTIES ('compactiontable'='MINOR', 'transactional'='false')";
+ Assert.assertEquals(expectedQuery, query);
+ }
+
+ @Test
+ public void testCreateWithTablePropertiesWithLocation() {
+ CompactionQueryBuilder queryBuilder =
getMinorCompactionQueryBuilderForCreate();
+ Table sourceTable = createSourceTableWithProperties();
+ queryBuilder.setSourceTab(sourceTable);
+ queryBuilder.setLocation("some_test_path");
+ String query = queryBuilder.build();
+ String expectedQuery =
+ "CREATE temporary external table comp_test_result_table(`operation`
int, `originalTransaction` bigint, `bucket` int, `rowId` bigint,
`currentTransaction` bigint, `row` struct<`column_1` :string,`column_2`
:int,`column_3` :boolean>) stored as orc LOCATION 'some_test_path'
TBLPROPERTIES ('compactiontable'='MINOR', 'orc.property_2'='44',
'transactional'='false')";
+ Assert.assertEquals(expectedQuery, query);
+ }
+
+ @Test
+ public void testCreatePartitioned() {
+ CompactionQueryBuilder queryBuilder =
getMinorCompactionQueryBuilderForCreate();
+ Table sourceTable = createSourceTable();
+ queryBuilder.setSourceTab(sourceTable);
+ queryBuilder.setPartitioned(true);
+ queryBuilder.setLocation("some_test_path");
+ String query = queryBuilder.build();
+ String expectedQuery =
+ "CREATE temporary external table comp_test_result_table(`operation`
int, `originalTransaction` bigint, `bucket` int, `rowId` bigint,
`currentTransaction` bigint, `row` struct<`column_1` :string,`column_2`
:int,`column_3` :boolean>) PARTITIONED BY (`file_name` STRING) stored as orc
LOCATION 'some_test_path' TBLPROPERTIES ('compactiontable'='MINOR',
'transactional'='false')";
+ Assert.assertEquals(expectedQuery, query);
+ }
+
+ @Test
+ public void testCreateBucketed() {
+ CompactionQueryBuilder queryBuilder = new
CompactionQueryBuilderForMinorMock();
+ queryBuilder.setOperation(CompactionQueryBuilder.Operation.CREATE);
+ queryBuilder.setResultTableName(RESULT_TABLE_NAME);
+ Table sourceTable = createSourceTable();
+ queryBuilder.setSourceTab(sourceTable);
+ queryBuilder.setBucketed(true);
+ String query = queryBuilder.build();
+ String expectedQuery =
+ "CREATE temporary external table comp_test_result_table(`operation`
int, `originalTransaction` bigint, `bucket` int, `rowId` bigint,
`currentTransaction` bigint, `row` struct<`column_1` :string,`column_2`
:int,`column_3` :boolean>) clustered by (`bucket`) sorted by
(`originalTransaction`, `bucket`, `rowId`) into 4 buckets stored as orc
TBLPROPERTIES ('compactiontable'='MINOR', 'bucketing_version'='2',
'transactional'='false')";
+ Assert.assertEquals(expectedQuery, query);
+ }
+
+ @Test
+ public void testCreateBucketedExceptionThrown() {
+ CompactionQueryBuilderForMinorMock queryBuilder = new
CompactionQueryBuilderForMinorMock();
+ queryBuilder.setThrowException(true);
+ queryBuilder.setOperation(CompactionQueryBuilder.Operation.CREATE);
+ queryBuilder.setResultTableName(RESULT_TABLE_NAME);
+ Table sourceTable = createSourceTable();
+ queryBuilder.setSourceTab(sourceTable);
+ queryBuilder.setBucketed(true);
+ String query = queryBuilder.build();
+ String expectedQuery =
+ "CREATE temporary external table comp_test_result_table(`operation`
int, `originalTransaction` bigint, `bucket` int, `rowId` bigint,
`currentTransaction` bigint, `row` struct<`column_1` :string,`column_2`
:int,`column_3` :boolean>) clustered by (`bucket`) sorted by
(`originalTransaction`, `bucket`, `rowId`) into 1 buckets stored as orc
TBLPROPERTIES ('compactiontable'='MINOR', 'bucketing_version'='0',
'transactional'='false')";
+ Assert.assertEquals(expectedQuery, query);
+ }
+
+ @Test
+ public void testInsert() {
+ CompactionQueryBuilder queryBuilder =
getMinorCompactionQueryBuilderForInsert();
+ Table sourceTable = createSourceTable();
+ queryBuilder.setSourceTabForInsert("comp_test_db.comp_test_insert_table");
+ long[] abortedWriteIdList = { 1111L, 2222L };
+ ValidCompactorWriteIdList writeIds =
+ new ValidCompactorWriteIdList("comp_test_source_table",
abortedWriteIdList, null, 111111L);
+ queryBuilder.setValidWriteIdList(writeIds);
+ queryBuilder.setSourceTab(sourceTable);
+ String query = queryBuilder.build();
+ String expectedQuery =
+ "INSERT into table comp_test_result_table select `operation`,
`originalTransaction`, `bucket`, `rowId`, `currentTransaction`, `row` from
comp_test_db.comp_test_insert_table where `originalTransaction` not in
(1111,2222)";
+ Assert.assertEquals(expectedQuery, query);
+ }
+
+ @Test
+ public void testInsertWithoutValidWriteIdsAndSourceTableForInsert() {
+ CompactionQueryBuilder queryBuilder =
getMinorCompactionQueryBuilderForInsert();
+ Table sourceTable = createSourceTable();
+ queryBuilder.setSourceTab(sourceTable);
+ String query = queryBuilder.build();
+ String expectedQuery =
+ "INSERT into table comp_test_result_table select `operation`,
`originalTransaction`, `bucket`, `rowId`, `currentTransaction`, `row` from
comp_test_db.comp_test_source_table ";
+ Assert.assertEquals(expectedQuery, query);
+ }
+
+ @Test
+ public void testAlter() {
+ CompactionQueryBuilder queryBuilder =
getMinorCompactionQueryBuilderForAlter();
+ AcidDirectory dir = createAcidDirectory();
+ ValidCompactorWriteIdList writeIds = createWriteId(5L);
+ queryBuilder.setValidWriteIdList(writeIds);
+ queryBuilder.setDir(dir);
+ queryBuilder.setIsDeleteDelta(true);
+ String query = queryBuilder.build();
+ String expectedQuery =
+ "ALTER table comp_test_result_table add partition
(file_name='test_delta_1') location '/compaction/test/table/test_delta_1'
partition (file_name='test_delta_3') location
'/compaction/test/table/test_delta_3' ";
+ Assert.assertEquals(expectedQuery, query);
+ }
+
+ @Test
+ public void testDrop() {
+ CompactionQueryBuilder queryBuilder =
getMinorCompactionQueryBuilderForDrop();
+ String query = queryBuilder.build();
+ String expectedQuery = "DROP table if exists comp_test_result_table";
+ Assert.assertEquals(expectedQuery, query);
+ }
+
+ private CompactionQueryBuilder getMinorCompactionQueryBuilderForCreate() {
+ return
getMinorCompactionBuilder().setOperation(CompactionQueryBuilder.Operation.CREATE);
+ }
+
+ private CompactionQueryBuilder getMinorCompactionQueryBuilderForInsert() {
+ return
getMinorCompactionBuilder().setOperation(CompactionQueryBuilder.Operation.INSERT);
+ }
+
+ private CompactionQueryBuilder getMinorCompactionQueryBuilderForAlter() {
+ return
getMinorCompactionBuilder().setOperation(CompactionQueryBuilder.Operation.ALTER);
+ }
+
+ private CompactionQueryBuilder getMinorCompactionQueryBuilderForDrop() {
+ return
getMinorCompactionBuilder().setOperation(CompactionQueryBuilder.Operation.DROP);
+ }
+
+ private CompactionQueryBuilder getMinorCompactionBuilder() {
+ CompactionQueryBuilder compactionQueryBuilder =
+ new
CompactionQueryBuilderFactory().getCompactionQueryBuilder(CompactionType.MINOR,
false);
+ return compactionQueryBuilder.setResultTableName(RESULT_TABLE_NAME);
+ }
+}
diff --git
a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionQueryBuilderForMmCompaction.java
b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionQueryBuilderForMmCompaction.java
new file mode 100644
index 00000000000..2295f9cf715
--- /dev/null
+++
b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionQueryBuilderForMmCompaction.java
@@ -0,0 +1,466 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.hadoop.hive.common.ValidCompactorWriteIdList;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.SerDeInfo;
+import org.apache.hadoop.hive.metastore.api.SkewedInfo;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.io.AcidDirectory;
+import org.apache.hadoop.hive.ql.metadata.HiveException;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class TestCompactionQueryBuilderForMmCompaction extends
CompactionQueryBuilderTestBase {
+
+ @Test
+ public void testMajorCompactionCreateWithoutSourceTable() {
+ CompactionQueryBuilder queryBuilder =
getMmMajorCompactionQueryBuilderForCreate();
+ String query = queryBuilder.build();
+ String expectedQuery =
+ "CREATE temporary external table comp_test_result_table TBLPROPERTIES
('transactional'='false')";
+ Assert.assertEquals(expectedQuery, query);
+ }
+
+ @Test
+ public void testMajorCompactionCreateWithTablePropertiesWithLocation() {
+ CompactionQueryBuilder queryBuilder =
getMmMajorCompactionQueryBuilderForCreate();
+ Table sourceTable = createSourceTableWithProperties();
+ queryBuilder.setSourceTab(sourceTable);
+ queryBuilder.setLocation(SOME_TEST_LOCATION);
+ String query = queryBuilder.build();
+ String expectedQuery =
+ "CREATE temporary external table comp_test_result_table(`column_1`
string,`column_2` int,`column_3` boolean) LOCATION 'some_test_path'
TBLPROPERTIES ('property_1'='true', 'orc.property_2'='44',
'transactional'='false')";
+ Assert.assertEquals(expectedQuery, query);
+ }
+
+ @Test
+ public void testMajorCompactionCreatePartitioned() {
+ CompactionQueryBuilder queryBuilder =
getMmMajorCompactionQueryBuilderForCreate();
+ Table sourceTable = createSourceTable();
+ queryBuilder.setSourceTab(sourceTable);
+ queryBuilder.setPartitioned(true);
+ queryBuilder.setLocation(SOME_TEST_LOCATION);
+ String query = queryBuilder.build();
+ String expectedQuery =
+ "CREATE temporary external table comp_test_result_table(`column_1`
string,`column_2` int,`column_3` boolean) PARTITIONED BY (`file_name` STRING)
LOCATION 'some_test_path' TBLPROPERTIES ('transactional'='false')";
+ Assert.assertEquals(expectedQuery, query);
+ }
+
+ @Test
+ public void testMajorCompactionCreateWithBucketedSourceTable() throws
HiveException {
+ CompactionQueryBuilder queryBuilder =
getMmMajorCompactionQueryBuilderForCreate();
+ Table sourceTable = createSourceTableBucketed();
+ queryBuilder.setSourceTab(sourceTable);
+ String query = queryBuilder.build();
+ String expectedQuery =
+ "CREATE temporary external table comp_test_result_table(`column_1`
string,`column_2` int,`column_3` boolean) CLUSTERED BY (column_1,column_3) INTO
4 BUCKETS TBLPROPERTIES ('transactional'='false')";
+ Assert.assertEquals(expectedQuery, query);
+ }
+
+ @Test
+ public void testMajorCompactionCreateWithBucketedSortedSourceTable() throws
HiveException {
+ CompactionQueryBuilder queryBuilder =
getMmMajorCompactionQueryBuilderForCreate();
+ Table sourceTable = createSourceTableBucketedSorted();
+ queryBuilder.setSourceTab(sourceTable);
+ String query = queryBuilder.build();
+ String expectedQuery =
+ "CREATE temporary external table comp_test_result_table(`column_1`
string,`column_2` int,`column_3` boolean) CLUSTERED BY (column_1,column_3)
SORTED BY (column_1 DESC, column_2 ASC) INTO 4 BUCKETS TBLPROPERTIES
('transactional'='false')";
+ Assert.assertEquals(expectedQuery, query);
+ }
+
+ @Test
+ public void testMajorCompactionCreateWithStorageDescriptor() throws
HiveException {
+ CompactionQueryBuilder queryBuilder =
getMmMajorCompactionQueryBuilderForCreate();
+ Table sourceTable = createSourceTable();
+ queryBuilder.setSourceTab(sourceTable);
+ StorageDescriptor storageDescriptor = createStorageDescriptor();
+ queryBuilder.setStorageDescriptor(storageDescriptor);
+ String query = queryBuilder.build();
+ String expectedQuery =
+ "CREATE temporary external table comp_test_result_table(`column_1`
string,`column_2` int,`column_3` boolean) ROW FORMAT SERDE
'/some/test/serialization_lib'WITH SERDEPROPERTIES ( \n" + "
'test_param_1'='test_value', \n" + " 'test_param_2'='test_value')STORED AS
INPUTFORMAT 'some.test.InputFormat' OUTPUTFORMAT 'some.test.OutputFormat'
TBLPROPERTIES ('transactional'='false')";
+ Assert.assertEquals(expectedQuery, query);
+ }
+
+ @Test
+ public void testMajorCompactionCreateWithSkewedByClause() throws
HiveException {
+ CompactionQueryBuilder queryBuilder =
getMmMajorCompactionQueryBuilderForCreate();
+ Table sourceTable = createSourceTable();
+ StorageDescriptor storageDescriptor = sourceTable.getSd();
+ SkewedInfo skewedInfo = new SkewedInfo();
+ skewedInfo.addToSkewedColNames("column_1");
+ List<String> skewedColValues = new ArrayList<>();
+ skewedColValues.add("value1");
+ skewedColValues.add("value2");
+ skewedColValues.add("value3");
+ skewedInfo.addToSkewedColValues(skewedColValues);
+ storageDescriptor.setSkewedInfo(skewedInfo);
+ storageDescriptor.setStoredAsSubDirectories(true);
+ sourceTable.setSd(storageDescriptor);
+ queryBuilder.setSourceTab(sourceTable);
+ String query = queryBuilder.build();
+ String expectedQuery =
+ "CREATE temporary external table comp_test_result_table(`column_1`
string,`column_2` int,`column_3` boolean) SKEWED BY (column_1) ON
('value1','value2','value3')) STORED AS DIRECTORIES TBLPROPERTIES
('transactional'='false')";
+ Assert.assertEquals(expectedQuery, query);
+ }
+
+ @Test
+ public void testMajorCompactionCreateWithNonNativeTable() throws
HiveException {
+ CompactionQueryBuilder queryBuilder =
getMmMajorCompactionQueryBuilderForCreate();
+ Table sourceTable = createSourceTable();
+ Map<String, String> parameters = new HashMap<>();
+ parameters.put("storage_handler", "test_storage_handler");
+ sourceTable.setParameters(parameters);
+ queryBuilder.setSourceTab(sourceTable);
+ StorageDescriptor storageDescriptor = createStorageDescriptor();
+ queryBuilder.setStorageDescriptor(storageDescriptor);
+ String expectedMessage =
+ "Table comp_test_source_table has a storage handler
(test_storage_handler). Failing compaction for this non-native table.";
+ Assert.assertThrows(expectedMessage, RuntimeException.class,
queryBuilder::build);
+ }
+
+ @Test
+ public void testMinorCompactionCreateWithoutSourceTable() {
+ CompactionQueryBuilder queryBuilder =
getMmMinorCompactionQueryBuilderForCreate();
+ String query = queryBuilder.build();
+ String expectedQuery =
+ "CREATE temporary external table comp_test_result_table TBLPROPERTIES
('transactional'='false')";
+ Assert.assertEquals(expectedQuery, query);
+ }
+
+ @Test
+ public void testMinorCompactionCreateWithTablePropertiesWithLocation() {
+ CompactionQueryBuilder queryBuilder =
getMmMinorCompactionQueryBuilderForCreate();
+ Table sourceTable = createSourceTableWithProperties();
+ queryBuilder.setSourceTab(sourceTable);
+ queryBuilder.setLocation(SOME_TEST_LOCATION);
+ String query = queryBuilder.build();
+ String expectedQuery =
+ "CREATE temporary external table comp_test_result_table(`column_1`
string,`column_2` int,`column_3` boolean) LOCATION 'some_test_path'
TBLPROPERTIES ('property_1'='true', 'orc.property_2'='44',
'transactional'='false')";
+ Assert.assertEquals(expectedQuery, query);
+ }
+
+ @Test
+ public void testMinorCompactionCreatePartitioned() {
+ CompactionQueryBuilder queryBuilder =
getMmMinorCompactionQueryBuilderForCreate();
+ Table sourceTable = createSourceTable();
+ queryBuilder.setSourceTab(sourceTable);
+ queryBuilder.setPartitioned(true);
+ queryBuilder.setLocation(SOME_TEST_LOCATION);
+ String query = queryBuilder.build();
+ String expectedQuery =
+ "CREATE temporary external table comp_test_result_table(`column_1`
string,`column_2` int,`column_3` boolean) PARTITIONED BY (`file_name` STRING)
LOCATION 'some_test_path' TBLPROPERTIES ('transactional'='false')";
+ Assert.assertEquals(expectedQuery, query);
+ }
+
+ @Test
+ public void testMinorCompactionCreateWithBucketedSourceTable() throws
HiveException {
+ CompactionQueryBuilder queryBuilder =
getMmMinorCompactionQueryBuilderForCreate();
+ Table sourceTable = createSourceTableBucketed();
+ queryBuilder.setSourceTab(sourceTable);
+ String query = queryBuilder.build();
+ String expectedQuery =
+ "CREATE temporary external table comp_test_result_table(`column_1`
string,`column_2` int,`column_3` boolean) CLUSTERED BY (column_1,column_3) INTO
4 BUCKETS TBLPROPERTIES ('transactional'='false')";
+ Assert.assertEquals(expectedQuery, query);
+ }
+
+ @Test
+ public void testMinorCompactionCreateWithBucketedSortedSourceTable() throws
HiveException {
+ CompactionQueryBuilder queryBuilder =
getMmMinorCompactionQueryBuilderForCreate();
+ Table sourceTable = createSourceTableBucketedSorted();
+ queryBuilder.setSourceTab(sourceTable);
+ String query = queryBuilder.build();
+ String expectedQuery =
+ "CREATE temporary external table comp_test_result_table(`column_1`
string,`column_2` int,`column_3` boolean) CLUSTERED BY (column_1,column_3)
SORTED BY (column_1 DESC, column_2 ASC) INTO 4 BUCKETS TBLPROPERTIES
('transactional'='false')";
+ Assert.assertEquals(expectedQuery, query);
+ }
+
+ @Test
+ public void testMinorCompactionCreateWithStorageDescriptor() throws
HiveException {
+ CompactionQueryBuilder queryBuilder =
getMmMinorCompactionQueryBuilderForCreate();
+ Table sourceTable = createSourceTable();
+ queryBuilder.setSourceTab(sourceTable);
+ StorageDescriptor storageDescriptor = createStorageDescriptor();
+ queryBuilder.setStorageDescriptor(storageDescriptor);
+ String query = queryBuilder.build();
+ String expectedQuery =
+ "CREATE temporary external table comp_test_result_table(`column_1`
string,`column_2` int,`column_3` boolean) ROW FORMAT SERDE
'/some/test/serialization_lib'WITH SERDEPROPERTIES ( \n" + "
'test_param_1'='test_value', \n" + " 'test_param_2'='test_value')STORED AS
INPUTFORMAT 'some.test.InputFormat' OUTPUTFORMAT 'some.test.OutputFormat'
TBLPROPERTIES ('transactional'='false')";
+ Assert.assertEquals(expectedQuery, query);
+ }
+
+ @Test
+ public void testMinorCompactionCreateWithSkewedByClause() throws
HiveException {
+ CompactionQueryBuilder queryBuilder =
getMmMinorCompactionQueryBuilderForCreate();
+ Table sourceTable = createSourceTable();
+ StorageDescriptor storageDescriptor = sourceTable.getSd();
+ SkewedInfo skewedInfo = new SkewedInfo();
+ skewedInfo.addToSkewedColNames("column_1");
+ List<String> skewedColValues = new ArrayList<>();
+ skewedColValues.add("value1");
+ skewedColValues.add("value2");
+ skewedColValues.add("value3");
+ skewedInfo.addToSkewedColValues(skewedColValues);
+ storageDescriptor.setSkewedInfo(skewedInfo);
+ storageDescriptor.setStoredAsSubDirectories(true);
+ sourceTable.setSd(storageDescriptor);
+ queryBuilder.setSourceTab(sourceTable);
+ String query = queryBuilder.build();
+ String expectedQuery =
+ "CREATE temporary external table comp_test_result_table(`column_1`
string,`column_2` int,`column_3` boolean) SKEWED BY (column_1) ON
('value1','value2','value3')) STORED AS DIRECTORIES TBLPROPERTIES
('transactional'='false')";
+ Assert.assertEquals(expectedQuery, query);
+ }
+
+ @Test
+ public void testMinorCompactionCreateWithNonNativeTable() throws
HiveException {
+ CompactionQueryBuilder queryBuilder =
getMmMinorCompactionQueryBuilderForCreate();
+ Table sourceTable = createSourceTable();
+ Map<String, String> parameters = new HashMap<>();
+ parameters.put("storage_handler", "test_storage_handler");
+ sourceTable.setParameters(parameters);
+ queryBuilder.setSourceTab(sourceTable);
+ StorageDescriptor storageDescriptor = createStorageDescriptor();
+ queryBuilder.setStorageDescriptor(storageDescriptor);
+ String expectedMessage =
+ "Table comp_test_source_tablehas a storage handler
(test_storage_handler). Failing compaction for this non-native table.";
+ Assert.assertThrows(expectedMessage, RuntimeException.class,
queryBuilder::build);
+ }
+
+ @Test
+ public void testInsertMajorCompaction() {
+ CompactionQueryBuilder queryBuilder =
getMmMajorCompactionQueryBuilderForInsert();
+ Table sourceTable = createSourceTableBucketedSorted();
+ queryBuilder.setSourceTabForInsert(COMP_TEST_SOURCE_TABLE_FOR_INSERT);
+ queryBuilder.setOrderByClause("ORDER BY column_1 ASC, column_2 DESC");
+ Partition sourcePartition = new Partition();
+ sourcePartition.addToValues("source_part_1");
+ sourcePartition.addToValues("true");
+ sourcePartition.addToValues("4444");
+
+ sourceTable.addToPartitionKeys(new FieldSchema("source_part_1", "string",
"comment 1"));
+ sourceTable.addToPartitionKeys(new FieldSchema("source_part_2", "boolean",
"comment 2"));
+ sourceTable.addToPartitionKeys(new FieldSchema("source_part_3", "int",
"comment 3"));
+ queryBuilder.setSourceTab(sourceTable);
+ queryBuilder.setSourcePartition(sourcePartition);
+
+ String query = queryBuilder.build();
+ String expectedQuery =
+ "INSERT into table comp_test_result_table select `column_1`,
`column_2`, `column_3` from comp_test_db.comp_test_insert_table ORDER BY
column_1 ASC, column_2 DESC where `source_part_1`='source_part_1' and
`source_part_2`=true and `source_part_3`='4444'";
+ Assert.assertEquals(expectedQuery, query);
+ }
+
+ @Test
+ public void testInsertMajorCompactionPartitionMismatch() {
+ CompactionQueryBuilder queryBuilder =
getMmMajorCompactionQueryBuilderForInsert();
+ Table sourceTable = createSourceTable();
+ queryBuilder.setSourceTabForInsert(COMP_TEST_SOURCE_TABLE_FOR_INSERT);
+ Partition sourcePartition = new Partition();
+ sourcePartition.addToValues("source_part_1");
+ sourcePartition.addToValues("true");
+ sourcePartition.addToValues("4444");
+
+ sourceTable.addToPartitionKeys(new FieldSchema("source_part_1", "string",
"comment 1"));
+ sourceTable.addToPartitionKeys(new FieldSchema("source_part_2", "boolean",
"comment 2"));
+ queryBuilder.setSourceTab(sourceTable);
+ queryBuilder.setSourcePartition(sourcePartition);
+
+ String expectedMessage =
+ "source partition values ([source_part_1, true, 4444]) do not match
source table values ([FieldSchema(name:source_part_1, type:string,
comment:comment 1), FieldSchema(name:source_part_2, type:boolean,
comment:comment 2)]). Failing compaction.";
+ Assert.assertThrows(expectedMessage, IllegalStateException.class,
queryBuilder::build);
+ }
+
+ @Test
+ public void testInsertMajorCompactionNoSourceTabForInsert() {
+ CompactionQueryBuilder queryBuilder =
getMmMajorCompactionQueryBuilderForInsert();
+ Table sourceTable = createSourceTable();
+ Partition sourcePartition = new Partition();
+ sourcePartition.addToValues("source_part_1");
+ sourceTable.addToPartitionKeys(new FieldSchema("source_part_1", "string",
"comment 1"));
+ queryBuilder.setSourceTab(sourceTable);
+ queryBuilder.setSourcePartition(sourcePartition);
+
+ String query = queryBuilder.build();
+ String expectedQuery =
+ "INSERT into table comp_test_result_table select `column_1`,
`column_2`, `column_3` from comp_test_db.comp_test_source_table where
`source_part_1`='source_part_1'";
+ Assert.assertEquals(expectedQuery, query);
+ }
+
+ @Test
+ public void testInsertMajorCompactionOnlySourceTableSet() {
+ CompactionQueryBuilder queryBuilder =
getMmMajorCompactionQueryBuilderForInsert();
+ Table sourceTable = createSourceTable();
+ queryBuilder.setSourceTab(sourceTable);
+
+ String query = queryBuilder.build();
+ String expectedQuery =
+ "INSERT into table comp_test_result_table select * from
comp_test_db.comp_test_source_table ";
+ Assert.assertEquals(expectedQuery, query);
+ }
+
+ @Test
+ public void testInsertMajorCompactionNoSourceTable() {
+ CompactionQueryBuilder queryBuilder =
getMmMajorCompactionQueryBuilderForInsert();
+ queryBuilder.setSourceTabForInsert(COMP_TEST_SOURCE_TABLE_FOR_INSERT);
+ String query = queryBuilder.build();
+ String expectedQuery =
+ "INSERT into table comp_test_result_table select * from
comp_test_db.comp_test_insert_table ";
+ Assert.assertEquals(expectedQuery, query);
+ }
+
+ @Test
+ public void testInsertMinorCompaction() {
+ CompactionQueryBuilder queryBuilder =
getMmMinorCompactionQueryBuilderForInsert();
+ Table sourceTable = createSourceTable();
+ queryBuilder.setSourceTabForInsert(COMP_TEST_SOURCE_TABLE_FOR_INSERT);
+ queryBuilder.setSourceTab(sourceTable);
+ String query = queryBuilder.build();
+ String expectedQuery =
+ "INSERT into table comp_test_result_table select `column_1`,
`column_2`, `column_3` from comp_test_db.comp_test_insert_table ";
+ Assert.assertEquals(expectedQuery, query);
+ }
+
+ @Test
+ public void testInsertMinorCompactionWithoutSourceTableForInsert() {
+ CompactionQueryBuilder queryBuilder =
getMmMinorCompactionQueryBuilderForInsert();
+ Table sourceTable = createSourceTable();
+ queryBuilder.setSourceTab(sourceTable);
+ String query = queryBuilder.build();
+ String expectedQuery =
+ "INSERT into table comp_test_result_table select `column_1`,
`column_2`, `column_3` from comp_test_db.comp_test_source_table ";
+ Assert.assertEquals(expectedQuery, query);
+ }
+
+ @Test
+ public void testInsertMinorCompactionNoSourceTable() {
+ CompactionQueryBuilder queryBuilder =
getMmMinorCompactionQueryBuilderForInsert();
+ queryBuilder.setSourceTabForInsert(COMP_TEST_SOURCE_TABLE_FOR_INSERT);
+ String query = queryBuilder.build();
+ String expectedQuery = "INSERT into table comp_test_result_table select
from comp_test_db.comp_test_insert_table ";
+ Assert.assertEquals(expectedQuery, query);
+ }
+
+ @Test
+ public void testAlterMajorCompaction() {
+ CompactionQueryBuilder queryBuilder =
getMmMajorCompactionQueryBuilderForAlter();
+ AcidDirectory dir = createAcidDirectory();
+ ValidCompactorWriteIdList writeIds = createWriteId(5L);
+ queryBuilder.setValidWriteIdList(writeIds);
+ queryBuilder.setDir(dir);
+ queryBuilder.setIsDeleteDelta(true);
+ String query = queryBuilder.build();
+ String expectedQuery =
+ "ALTER table comp_test_result_table add partition
(file_name='test_delta_1') location '/compaction/test/table/test_delta_1'
partition (file_name='test_delta_3') location
'/compaction/test/table/test_delta_3' ";
+ Assert.assertEquals(expectedQuery, query);
+ }
+
+ @Test
+ public void testAlterMinorCompaction() {
+ CompactionQueryBuilder queryBuilder =
getMmMinorCompactionQueryBuilderForAlter();
+ AcidDirectory dir = createAcidDirectory();
+ ValidCompactorWriteIdList writeIds = createWriteId(5L);
+ queryBuilder.setValidWriteIdList(writeIds);
+ queryBuilder.setDir(dir);
+ queryBuilder.setIsDeleteDelta(true);
+ String query = queryBuilder.build();
+ String expectedQuery =
+ "ALTER table comp_test_result_table add partition
(file_name='test_delta_1') location '/compaction/test/table/test_delta_1'
partition (file_name='test_delta_3') location
'/compaction/test/table/test_delta_3' ";
+ Assert.assertEquals(expectedQuery, query);
+ }
+
+ @Test
+ public void testDropMajorCompaction() {
+ CompactionQueryBuilder queryBuilder =
getMmMajorCompactionQueryBuilderForDrop();
+ String query = queryBuilder.build();
+ String expectedQuery = "DROP table if exists comp_test_result_table";
+ Assert.assertEquals(expectedQuery, query);
+ }
+
+ @Test
+ public void testDropMinorCompaction() {
+ CompactionQueryBuilder queryBuilder =
getMmMinorCompactionQueryBuilderForDrop();
+ String query = queryBuilder.build();
+ String expectedQuery = "DROP table if exists comp_test_result_table";
+ Assert.assertEquals(expectedQuery, query);
+ }
+
+ private StorageDescriptor createStorageDescriptor() {
+ SerDeInfo serdeInfo = new SerDeInfo();
+ serdeInfo.setSerializationLib("/some/test/serialization_lib");
+ Map<String, String> serdeParams = new HashMap<>();
+ serdeParams.put("test_param_1", "test_value");
+ serdeParams.put("test_param_2", "test_value");
+ serdeInfo.setParameters(serdeParams);
+ StorageDescriptor storageDescriptor = new StorageDescriptor();
+ storageDescriptor.setSerdeInfo(serdeInfo);
+ storageDescriptor.setInputFormat("some.test.InputFormat");
+ storageDescriptor.setOutputFormat("some.test.OutputFormat");
+ return storageDescriptor;
+ }
+
+ private CompactionQueryBuilder getMmMajorCompactionQueryBuilderForCreate() {
+ return
getMmMajorCompactionBuilder().setOperation(CompactionQueryBuilder.Operation.CREATE);
+ }
+
+ private CompactionQueryBuilder getMmMajorCompactionQueryBuilderForInsert() {
+ return
getMmMajorCompactionBuilder().setOperation(CompactionQueryBuilder.Operation.INSERT);
+ }
+
+ private CompactionQueryBuilder getMmMajorCompactionQueryBuilderForAlter() {
+ return
getMmMajorCompactionBuilder().setOperation(CompactionQueryBuilder.Operation.ALTER);
+ }
+
+ private CompactionQueryBuilder getMmMajorCompactionQueryBuilderForDrop() {
+ return
getMmMajorCompactionBuilder().setOperation(CompactionQueryBuilder.Operation.DROP);
+ }
+
+ private CompactionQueryBuilder getMmMinorCompactionQueryBuilderForCreate() {
+ return
getMmMinorCompactionBuilder().setOperation(CompactionQueryBuilder.Operation.CREATE);
+ }
+
+ private CompactionQueryBuilder getMmMinorCompactionQueryBuilderForInsert() {
+ return
getMmMinorCompactionBuilder().setOperation(CompactionQueryBuilder.Operation.INSERT);
+ }
+
+ private CompactionQueryBuilder getMmMinorCompactionQueryBuilderForAlter() {
+ return
getMmMinorCompactionBuilder().setOperation(CompactionQueryBuilder.Operation.ALTER);
+ }
+
+ private CompactionQueryBuilder getMmMinorCompactionQueryBuilderForDrop() {
+ return
getMmMinorCompactionBuilder().setOperation(CompactionQueryBuilder.Operation.DROP);
+ }
+
+ private CompactionQueryBuilder getMmMajorCompactionBuilder() {
+ CompactionQueryBuilder compactionQueryBuilder =
+ new
CompactionQueryBuilderFactory().getCompactionQueryBuilder(CompactionType.MAJOR,
true);
+ return compactionQueryBuilder.setResultTableName(RESULT_TABLE_NAME);
+ }
+
+ private CompactionQueryBuilder getMmMinorCompactionBuilder() {
+ CompactionQueryBuilder compactionQueryBuilder =
+ new
CompactionQueryBuilderFactory().getCompactionQueryBuilder(CompactionType.MINOR,
true);
+ return compactionQueryBuilder.setResultTableName(RESULT_TABLE_NAME);
+ }
+}
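
The MM tests above make the contrast with the full-ACID builders in the
earlier test classes easy to see: the insert-only builders copy the source
schema as plain columns and omit both the ORC storage clause and the
'compactiontable' marker property, while the full-ACID builders emit the
ROW__ID-style ACID column layout, 'stored as orc' and the marker. A small
sketch (illustrative wrapper class, result table name copied from the tests)
placing the two simplest CREATE calls side by side, with the expected SQL
from the assertions above repeated as comments:

    package org.apache.hadoop.hive.ql.txn.compactor;

    import org.apache.hadoop.hive.metastore.api.CompactionType;

    // Illustrative comparison only: the two CREATE variants asserted above.
    class CreateQueryComparisonSketch {
      String crudMajorCreate() {
        // Full-ACID MAJOR builder: ORC storage plus the 'compactiontable'
        // marker, even without a source table.
        return new CompactionQueryBuilderFactory()
            .getCompactionQueryBuilder(CompactionType.MAJOR, false)
            .setOperation(CompactionQueryBuilder.Operation.CREATE)
            .setResultTableName("comp_test_result_table")
            .build();
        // -> CREATE temporary external table comp_test_result_table stored as orc
        //    TBLPROPERTIES ('compactiontable'='MAJOR', 'transactional'='false')
      }

      String insertOnlyMajorCreate() {
        // Insert-only (MM) MAJOR builder: no storage clause and no
        // 'compactiontable' marker.
        return new CompactionQueryBuilderFactory()
            .getCompactionQueryBuilder(CompactionType.MAJOR, true)
            .setOperation(CompactionQueryBuilder.Operation.CREATE)
            .setResultTableName("comp_test_result_table")
            .build();
        // -> CREATE temporary external table comp_test_result_table
        //    TBLPROPERTIES ('transactional'='false')
      }
    }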
diff --git
a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionQueryBuilderForRebalanceCompaction.java
b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionQueryBuilderForRebalanceCompaction.java
new file mode 100644
index 00000000000..df96fb00314
--- /dev/null
+++
b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionQueryBuilderForRebalanceCompaction.java
@@ -0,0 +1,171 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.hadoop.hive.common.ValidCompactorWriteIdList;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.FieldSchema;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.ql.io.AcidDirectory;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestCompactionQueryBuilderForRebalanceCompaction extends CompactionQueryBuilderTestBase {
+
+ @Test
+ public void testCreate() {
+ CompactionQueryBuilder queryBuilder = getRebalanceCompactionQueryBuilderForCreate();
+ Table sourceTable = createSourceTableWithProperties();
+ queryBuilder.setSourceTab(sourceTable);
+ queryBuilder.setLocation(SOME_TEST_LOCATION);
+ queryBuilder.setPartitioned(true);
+ String query = queryBuilder.build();
+ String expectedQuery =
+ "CREATE temporary external table comp_test_result_table(`operation` int, `originalTransaction` bigint, `bucket` int, `rowId` bigint, `currentTransaction` bigint, `row` struct<`column_1` :string,`column_2` :int,`column_3` :boolean>) PARTITIONED BY (`file_name` STRING) stored as orc LOCATION 'some_test_path' TBLPROPERTIES ('compactiontable'='REBALANCE', 'orc.property_2'='44', 'transactional'='false')";
+ Assert.assertEquals(expectedQuery, query);
+ }
+
+ @Test
+ public void testCreateWithNonPartitionedSourceTable() {
+ CompactionQueryBuilder queryBuilder = getRebalanceCompactionQueryBuilderForCreate();
+ Table sourceTable = createSourceTableWithProperties();
+ queryBuilder.setSourceTab(sourceTable);
+ queryBuilder.setLocation(SOME_TEST_LOCATION);
+ String query = queryBuilder.build();
+ String expectedQuery =
+ "CREATE temporary external table comp_test_result_table(`operation` int, `originalTransaction` bigint, `bucket` int, `rowId` bigint, `currentTransaction` bigint, `row` struct<`column_1` :string,`column_2` :int,`column_3` :boolean>) stored as orc LOCATION 'some_test_path' TBLPROPERTIES ('compactiontable'='REBALANCE', 'orc.property_2'='44', 'transactional'='false')";
+ Assert.assertEquals(expectedQuery, query);
+ }
+
+ @Test
+ public void testCreateWithNoLocationAndNoTableProperties() {
+ CompactionQueryBuilder queryBuilder = getRebalanceCompactionQueryBuilderForCreate();
+ Table sourceTable = createSourceTable();
+ queryBuilder.setSourceTab(sourceTable);
+ String query = queryBuilder.build();
+ String expectedQuery =
+ "CREATE temporary external table comp_test_result_table(`operation` int, `originalTransaction` bigint, `bucket` int, `rowId` bigint, `currentTransaction` bigint, `row` struct<`column_1` :string,`column_2` :int,`column_3` :boolean>) stored as orc TBLPROPERTIES ('compactiontable'='REBALANCE', 'transactional'='false')";
+ Assert.assertEquals(expectedQuery, query);
+ }
+
+ @Test
+ public void testInsert() {
+ CompactionQueryBuilder queryBuilder = getRebalanceCompactionQueryBuilderForInsert();
+ Table sourceTable = createSourceTable();
+ queryBuilder.setSourceTabForInsert(COMP_TEST_SOURCE_TABLE_FOR_INSERT);
+ queryBuilder.setSourceTab(sourceTable);
+ queryBuilder.setNumberOfBuckets(5);
+ queryBuilder.setOrderByClause("ORDER BY column_1 ASC, column_3 DESC");
+ String query = queryBuilder.build();
+ String expectedQuery =
+ "INSERT overwrite table comp_test_result_table select 0, t2.writeId, t2.rowId DIV CEIL(numRows / 5), t2.rowId, t2.writeId, t2.data from (select count(ROW__ID.writeId) over() as numRows, MAX(ROW__ID.writeId) over() as writeId, row_number() OVER (ORDER BY column_1 ASC, column_3 DESC) - 1 AS rowId, NAMED_STRUCT('column_1', `column_1`, 'column_2', `column_2`, 'column_3', `column_3`) as data from comp_test_db.comp_test_insert_table ORDER BY column_1 ASC, column_3 DESC) t2";
+ Assert.assertEquals(expectedQuery, query);
+ }
+
+ @Test
+ public void testInsertWithPartitionedTable() {
+ CompactionQueryBuilder queryBuilder = getRebalanceCompactionQueryBuilderForInsert();
+ Table sourceTable = createSourceTable();
+ queryBuilder.setSourceTabForInsert(COMP_TEST_SOURCE_TABLE_FOR_INSERT);
+ Partition sourcePartition = new Partition();
+ sourcePartition.addToValues("source_part_1");
+ sourceTable.addToPartitionKeys(new FieldSchema("source_part_1", "string", "comment 1"));
+ queryBuilder.setSourceTab(sourceTable);
+ queryBuilder.setSourcePartition(sourcePartition);
+
+ String query = queryBuilder.build();
+ String expectedQuery =
+ "INSERT overwrite table comp_test_result_table select 0, t2.writeId, t2.rowId DIV CEIL(numRows / 0), t2.rowId, t2.writeId, t2.data from (select count(ROW__ID.writeId) over() as numRows, ROW__ID.writeId as writeId, row_number() OVER (order by ROW__ID.writeId ASC, ROW__ID.bucketId ASC, ROW__ID.rowId ASC) - 1 AS rowId, NAMED_STRUCT('column_1', `column_1`, 'column_2', `column_2`, 'column_3', `column_3`) as data from comp_test_db.comp_test_insert_table order by ROW__ID.writeId ASC, RO [...]
+ Assert.assertEquals(expectedQuery, query);
+ }
+
+ @Test
+ public void testInsertOnlySourceTableIsSet() {
+ CompactionQueryBuilder queryBuilder = getRebalanceCompactionQueryBuilderForInsert();
+ Table sourceTable = createSourceTable();
+ queryBuilder.setSourceTab(sourceTable);
+ String query = queryBuilder.build();
+ String expectedQuery =
+ "INSERT overwrite table comp_test_result_table select 0, t2.writeId, t2.rowId DIV CEIL(numRows / 0), t2.rowId, t2.writeId, t2.data from (select count(ROW__ID.writeId) over() as numRows, ROW__ID.writeId as writeId, row_number() OVER (order by ROW__ID.writeId ASC, ROW__ID.bucketId ASC, ROW__ID.rowId ASC) - 1 AS rowId, NAMED_STRUCT('column_1', `column_1`, 'column_2', `column_2`, 'column_3', `column_3`) as data from comp_test_db.comp_test_source_table order by ROW__ID.writeId ASC, RO [...]
+ Assert.assertEquals(expectedQuery, query);
+ }
+
+ @Test
+ public void testInsertNoSourceTable() {
+ CompactionQueryBuilder queryBuilder = getRebalanceCompactionQueryBuilderForInsert();
+ queryBuilder.setSourceTabForInsert(COMP_TEST_SOURCE_TABLE_FOR_INSERT);
+ String query = queryBuilder.build();
+ String expectedQuery =
+ "INSERT overwrite table comp_test_result_table select from comp_test_db.comp_test_insert_table order by ROW__ID.writeId ASC, ROW__ID.bucketId ASC, ROW__ID.rowId ASC) t2";
+ Assert.assertEquals(expectedQuery, query);
+ }
+
+ @Test
+ public void testAlter() {
+ CompactionQueryBuilder queryBuilder = getRebalanceCompactionQueryBuilderForAlter();
+ AcidDirectory dir = createAcidDirectory();
+ ValidCompactorWriteIdList writeIds = createWriteId(5L);
+ queryBuilder.setValidWriteIdList(writeIds);
+ queryBuilder.setDir(dir);
+ queryBuilder.setIsDeleteDelta(true);
+ String query = queryBuilder.build();
+ String expectedQuery =
+ "ALTER table comp_test_result_table add partition (file_name='test_delta_1') location '/compaction/test/table/test_delta_1' partition (file_name='test_delta_3') location '/compaction/test/table/test_delta_3' ";
+ Assert.assertEquals(expectedQuery, query);
+ }
+
+ @Test
+ public void testDrop() {
+ CompactionQueryBuilder queryBuilder = getRebalanceCompactionQueryBuilderForDrop();
+ String query = queryBuilder.build();
+ String expectedQuery = "DROP table if exists comp_test_result_table";
+ Assert.assertEquals(expectedQuery, query);
+ }
+
+ @Test
+ public void testRebalanceCompactionWithBuckets() {
+ CompactionQueryBuilder queryBuilder = getRebalanceCompactionQueryBuilderForInsert();
+ String expectedMessage = "Rebalance compaction is supported only on implicitly-bucketed tables!";
+ Assert.assertThrows(expectedMessage, IllegalArgumentException.class, () -> {
+ queryBuilder.setBucketed(true);
+ });
+ }
+
+ private CompactionQueryBuilder getRebalanceCompactionQueryBuilderForCreate() {
+ return getRebalanceCompactionBuilder().setOperation(CompactionQueryBuilder.Operation.CREATE);
+ }
+
+ private CompactionQueryBuilder getRebalanceCompactionQueryBuilderForInsert() {
+ return getRebalanceCompactionBuilder().setOperation(CompactionQueryBuilder.Operation.INSERT);
+ }
+
+ private CompactionQueryBuilder getRebalanceCompactionQueryBuilderForAlter() {
+ return getRebalanceCompactionBuilder().setOperation(CompactionQueryBuilder.Operation.ALTER);
+ }
+
+ private CompactionQueryBuilder getRebalanceCompactionQueryBuilderForDrop() {
+ return getRebalanceCompactionBuilder().setOperation(CompactionQueryBuilder.Operation.DROP);
+ }
+
+ private CompactionQueryBuilder getRebalanceCompactionBuilder() {
+ CompactionQueryBuilder compactionQueryBuilder =
+ new CompactionQueryBuilderFactory().getCompactionQueryBuilder(CompactionType.REBALANCE, false);
+ return compactionQueryBuilder.setResultTableName(RESULT_TABLE_NAME);
+ }
+}
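
One detail of the rebalance INSERT expectations above that may not be obvious from the statement text alone: rows are spread across buckets with rowId DIV CEIL(numRows / numberOfBuckets). The following standalone snippet, plain Java with example numbers matching testInsert's five buckets, only illustrates that arithmetic; it is not part of the patch.

    // Illustration of the bucket assignment encoded in the expected rebalance INSERT query:
    // bucket = rowId DIV CEIL(numRows / numberOfBuckets).
    public class RebalanceBucketAssignmentSketch {
      public static void main(String[] args) {
        long numRows = 10;        // example row count
        int numberOfBuckets = 5;  // matches queryBuilder.setNumberOfBuckets(5) in testInsert
        long rowsPerBucket = (long) Math.ceil((double) numRows / numberOfBuckets); // CEIL(10 / 5) = 2
        for (long rowId = 0; rowId < numRows; rowId++) {
          long bucket = rowId / rowsPerBucket; // integer division, like Hive's DIV
          System.out.println("rowId " + rowId + " -> bucket " + bucket);
        }
        // rowIds 0-1 land in bucket 0, 2-3 in bucket 1, ..., 8-9 in bucket 4: an even spread.
      }
    }

Two related observations from the tests above: when setNumberOfBuckets is never called, the generated text contains the literal CEIL(numRows / 0), and setBucketed(true) is rejected with an IllegalArgumentException, since rebalance compaction is supported only on implicitly-bucketed tables.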