This is an automated email from the ASF dual-hosted git repository.
veghlaci05 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new d2d4b86433e HIVE-27598 - Enhance alter table compact to work for
partitioned tables (Taraka Rama Rao Lethavadla, reviewed by Laszlo Vegh)
d2d4b86433e is described below
commit d2d4b86433ef356bf738edd72bfef74511a15444
Author: tarak271 <[email protected]>
AuthorDate: Tue Oct 31 15:08:20 2023 +0530
HIVE-27598 - Enhance alter table compact to work for partitioned tables
(Taraka Rama Rao Lethavadla, reviewed by Laszlo Vegh)
---
.../compact/AlterTableCompactOperation.java | 130 ++++---
.../hadoop/hive/ql/txn/compactor/Initiator.java | 249 +------------
.../hive/ql/txn/compactor/InitiatorBase.java | 309 ++++++++++++++++
.../queries/clientpositive/manual_compaction.q | 68 ++++
.../clientpositive/llap/manual_compaction.q.out | 387 +++++++++++++++++++++
5 files changed, 843 insertions(+), 300 deletions(-)
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactOperation.java
b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactOperation.java
index 9187101367b..cc896331aff 100644
---
a/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactOperation.java
+++
b/ql/src/java/org/apache/hadoop/hive/ql/ddl/table/storage/compact/AlterTableCompactOperation.java
@@ -18,27 +18,27 @@
package org.apache.hadoop.hive.ql.ddl.table.storage.compact;
-import org.apache.hadoop.hive.conf.Constants;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
-import org.apache.hadoop.hive.metastore.api.CompactionRequest;
-import org.apache.hadoop.hive.metastore.utils.JavaUtils;
-import org.apache.hadoop.hive.ql.ddl.DDLOperationContext;
-import org.apache.hadoop.hive.ql.io.AcidUtils;
-
-import java.util.List;
-import java.util.Map;
+import org.apache.hadoop.hive.metastore.api.CompactionRequest;
import org.apache.hadoop.hive.metastore.api.CompactionResponse;
-import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
+import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
+import org.apache.hadoop.hive.metastore.utils.JavaUtils;
import org.apache.hadoop.hive.ql.ErrorMsg;
import org.apache.hadoop.hive.ql.ddl.DDLOperation;
+import org.apache.hadoop.hive.ql.ddl.DDLOperationContext;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.metadata.Partition;
import org.apache.hadoop.hive.ql.metadata.Table;
+import org.apache.hadoop.hive.ql.txn.compactor.InitiatorBase;
+
+import java.util.*;
+import java.util.concurrent.atomic.AtomicBoolean;
import static
org.apache.hadoop.hive.ql.io.AcidUtils.compactionTypeStr2ThriftType;
@@ -50,80 +50,95 @@ public class AlterTableCompactOperation extends
DDLOperation<AlterTableCompactDe
super(context, desc);
}
- @Override
- public int execute() throws HiveException {
+ @Override public int execute() throws Exception {
Table table = context.getDb().getTable(desc.getTableName());
if (!AcidUtils.isTransactionalTable(table)) {
throw new HiveException(ErrorMsg.NONACID_COMPACTION_NOT_SUPPORTED,
table.getDbName(), table.getTableName());
}
- String partitionName = getPartitionName(table);
+ CompactionRequest compactionRequest = new
CompactionRequest(table.getDbName(), table.getTableName(),
+ compactionTypeStr2ThriftType(desc.getCompactionType()));
- CompactionResponse resp = compact(table, partitionName);
- if (!resp.isAccepted()) {
- String message = Constants.ERROR_MESSAGE_NO_DETAILS_AVAILABLE;
- if (resp.isSetErrormessage()) {
- message = resp.getErrormessage();
- }
- throw new HiveException(ErrorMsg.COMPACTION_REFUSED,
- table.getDbName(), table.getTableName(), partitionName == null ? ""
: "(partition=" + partitionName + ")", message);
- }
+ compactionRequest.setPoolName(desc.getPoolName());
+ compactionRequest.setProperties(desc.getProperties());
+ compactionRequest.setInitiatorId(JavaUtils.hostname() + "-" +
HiveMetaStoreClient.MANUALLY_INITIATED_COMPACTION);
+
compactionRequest.setInitiatorVersion(HiveMetaStoreClient.class.getPackage().getImplementationVersion());
+ compactionRequest.setOrderByClause(desc.getOrderByClause());
- if (desc.isBlocking() && resp.isAccepted()) {
- waitForCompactionToFinish(resp);
+ if (desc.getNumberOfBuckets() > 0) {
+ compactionRequest.setNumberOfBuckets(desc.getNumberOfBuckets());
}
+ InitiatorBase initiatorBase = new InitiatorBase();
+ initiatorBase.setConf(context.getConf());
+ initiatorBase.init(new AtomicBoolean());
+
+ Map<String, org.apache.hadoop.hive.metastore.api.Partition> partitionMap =
+ convertPartitionsFromThriftToDB(getPartitions(table, desc, context));
+
+ if(desc.getPartitionSpec() != null){
+ Optional<String> partitionName =
partitionMap.keySet().stream().findFirst();
+ partitionName.ifPresent(compactionRequest::setPartitionname);
+ }
+ List<CompactionResponse> compactionResponses =
+ initiatorBase.initiateCompactionForTable(compactionRequest,
table.getTTable(), partitionMap);
+ for (CompactionResponse compactionResponse : compactionResponses) {
+ if (!compactionResponse.isAccepted()) {
+ String message;
+ if (compactionResponse.isSetErrormessage()) {
+ message = compactionResponse.getErrormessage();
+ throw new HiveException(ErrorMsg.COMPACTION_REFUSED,
table.getDbName(), table.getTableName(),
+ "CompactionId: " + compactionResponse.getId(), message);
+ }
+ context.getConsole().printInfo(
+ "Compaction already enqueued with id " +
compactionResponse.getId() + "; State is "
+ + compactionResponse.getState());
+ continue;
+ }
+ context.getConsole().printInfo("Compaction enqueued with id " +
compactionResponse.getId());
+ if (desc.isBlocking() && compactionResponse.isAccepted()) {
+ waitForCompactionToFinish(compactionResponse, context);
+ }
+ }
return 0;
}
- private String getPartitionName(Table table) throws HiveException {
- String partitionName = null;
+ private List<Partition> getPartitions(Table table, AlterTableCompactDesc
desc, DDLOperationContext context)
+ throws HiveException {
+ List<Partition> partitions = new ArrayList<>();
+
if (desc.getPartitionSpec() == null) {
- if (table.isPartitioned()) { // Compaction can only be done on the whole
table if the table is non-partitioned.
- throw new HiveException(ErrorMsg.NO_COMPACTION_PARTITION);
+ if (table.isPartitioned()) {
+ // Compaction will get initiated for all the potential partitions that
meet the criteria
+ partitions = context.getDb().getPartitions(table);
}
} else {
Map<String, String> partitionSpec = desc.getPartitionSpec();
- List<Partition> partitions = context.getDb().getPartitions(table,
partitionSpec);
+ partitions = context.getDb().getPartitions(table, partitionSpec);
if (partitions.size() > 1) {
throw new HiveException(ErrorMsg.TOO_MANY_COMPACTION_PARTITIONS);
} else if (partitions.size() == 0) {
throw new HiveException(ErrorMsg.INVALID_PARTITION_SPEC);
}
- partitionName = partitions.get(0).getName();
}
- return partitionName;
+ return partitions;
}
- private CompactionResponse compact(Table table, String partitionName) throws
HiveException {
- CompactionRequest req = new CompactionRequest(table.getDbName(),
table.getTableName(),
- compactionTypeStr2ThriftType(desc.getCompactionType()));
- req.setPartitionname(partitionName);
- req.setPoolName(desc.getPoolName());
- req.setProperties(desc.getProperties());
- req.setInitiatorId(JavaUtils.hostname() + "-" +
HiveMetaStoreClient.MANUALLY_INITIATED_COMPACTION);
-
req.setInitiatorVersion(HiveMetaStoreClient.class.getPackage().getImplementationVersion());
- req.setOrderByClause(desc.getOrderByClause());
- if (desc.getNumberOfBuckets() > 0) {
- req.setNumberOfBuckets(desc.getNumberOfBuckets());
- }
- CompactionResponse resp = context.getDb().compact(req);
- if (resp.isAccepted()) {
- context.getConsole().printInfo("Compaction enqueued with id " +
resp.getId());
- } else {
- context.getConsole().printInfo("Compaction already enqueued with id " +
resp.getId() + "; State is " +
- resp.getState());
- }
- return resp;
+ private Map<String, org.apache.hadoop.hive.metastore.api.Partition>
convertPartitionsFromThriftToDB(
+ List<Partition> partitions) {
+ Map<String, org.apache.hadoop.hive.metastore.api.Partition> partitionMap =
new LinkedHashMap<>();
+ partitions.forEach(partition -> partitionMap.put(partition.getName(),
partition.getTPartition()));
+ return partitionMap;
}
- private void waitForCompactionToFinish(CompactionResponse resp) throws
HiveException {
+ private void waitForCompactionToFinish(CompactionResponse resp,
DDLOperationContext context) throws HiveException {
StringBuilder progressDots = new StringBuilder();
long waitTimeMs = 1000;
long waitTimeOut = HiveConf.getLongVar(context.getConf(),
HiveConf.ConfVars.HIVE_COMPACTOR_WAIT_TIMEOUT);
- wait: while (true) {
+ wait:
+ while (true) {
//double wait time until 5min
- waitTimeMs = waitTimeMs*2;
+ waitTimeMs = waitTimeMs * 2;
waitTimeMs = Math.min(waitTimeMs, waitTimeOut);
try {
Thread.sleep(waitTimeMs);
@@ -133,10 +148,11 @@ public class AlterTableCompactOperation extends
DDLOperation<AlterTableCompactDe
}
ShowCompactRequest request = new ShowCompactRequest();
request.setId(resp.getId());
-
+
ShowCompactResponse compaction =
context.getDb().showCompactions(request);
if (compaction.getCompactsSize() == 1) {
ShowCompactResponseElement comp = compaction.getCompacts().get(0);
+ LOG.debug("Response for cid: "+comp.getId()+" is "+comp.getState());
switch (comp.getState()) {
case TxnStore.WORKING_RESPONSE:
case TxnStore.INITIATED_RESPONSE:
@@ -146,11 +162,11 @@ public class AlterTableCompactOperation extends
DDLOperation<AlterTableCompactDe
continue wait;
default:
//done
- context.getConsole().printInfo("Compaction with id " +
resp.getId() + " finished with status: " +
- comp.getState());
+ context.getConsole()
+ .printInfo("Compaction with id " + resp.getId() + " finished
with status: " + comp.getState());
break wait;
}
- }else {
+ } else {
throw new HiveException("No suitable compaction found");
}
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index a86c18baad9..bb48c8f219b 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -20,33 +20,12 @@ package org.apache.hadoop.hive.ql.txn.compactor;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.Sets;
import org.apache.commons.lang3.StringUtils;
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ServerUtils;
-import org.apache.hadoop.hive.common.ValidReadTxnList;
import org.apache.hadoop.hive.common.ValidTxnList;
-import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.conf.Constants;
import org.apache.hadoop.hive.conf.HiveConf;
-import org.apache.hadoop.hive.metastore.api.CompactionRequest;
-import org.apache.hadoop.hive.metastore.api.CompactionResponse;
-import org.apache.hadoop.hive.metastore.api.CompactionType;
-import org.apache.hadoop.hive.metastore.api.Database;
-import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.NoSuchObjectException;
-import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
-import org.apache.hadoop.hive.metastore.api.Partition;
-import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
-import org.apache.hadoop.hive.metastore.api.ShowCompactResponse;
-import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
-import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
-import org.apache.hadoop.hive.metastore.api.Table;
-import org.apache.hadoop.hive.metastore.api.hive_metastoreConstants;
+import org.apache.hadoop.hive.metastore.api.*;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
-import org.apache.hadoop.hive.metastore.metrics.AcidMetricService;
-import org.apache.hadoop.hive.metastore.metrics.Metrics;
import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
import org.apache.hadoop.hive.metastore.metrics.PerfLogger;
import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
@@ -54,28 +33,14 @@ import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.metastore.utils.MetaStoreUtils;
-import org.apache.hadoop.hive.ql.io.AcidDirectory;
import org.apache.hadoop.hive.ql.io.AcidUtils;
-import org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDirectory;
-import org.apache.hadoop.hive.shims.HadoopShims.HdfsFileStatusWithId;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hive.common.util.Ref;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.security.PrivilegedExceptionAction;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Comparator;
-import java.util.HashMap;
-import java.util.List;
-import java.util.LongSummaryStatistics;
-import java.util.Map;
-import java.util.Objects;
-import java.util.Set;
-import java.util.concurrent.ExecutorService;
+import java.util.*;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.stream.Collectors;
@@ -86,12 +51,10 @@ import static
org.apache.hadoop.hive.conf.Constants.COMPACTOR_INTIATOR_THREAD_NA
* A class to initiate compactions. This will run in a separate thread.
* It's critical that there exactly 1 of these in a given warehouse.
*/
-public class Initiator extends MetaStoreCompactorThread {
+public class Initiator extends InitiatorBase {
static final private String CLASS_NAME = Initiator.class.getName();
static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
- static final private String COMPACTORTHRESHOLD_PREFIX =
"compactorthreshold.";
-
private ExecutorService compactionExecutor;
private boolean metricsEnabled;
@@ -178,7 +141,7 @@ public class Initiator extends MetaStoreCompactorThread {
}
Table t = metadataCache.computeIfAbsent(ci.getFullTableName(),
() -> resolveTable(ci));
- String poolName = getPoolName(ci, t);
+ ci.poolName = getPoolName(ci, t);
Partition p = resolvePartition(ci);
if (p == null && ci.partName != null) {
LOG.info("Can't find partition " + ci.getFullPartitionName() +
@@ -194,7 +157,7 @@ public class Initiator extends MetaStoreCompactorThread {
CompletableFuture<Void> asyncJob =
CompletableFuture.runAsync(
CompactorUtil.ThrowingRunnable.unchecked(() ->
- scheduleCompactionIfRequired(ci, t, p, poolName,
runAs, metricsEnabled)), compactionExecutor)
+ scheduleCompactionIfRequired(ci, t, p, runAs,
metricsEnabled)), compactionExecutor)
.exceptionally(exc -> {
LOG.error("Error while running scheduling the
compaction on the table {} / partition {}.", tableName, partition, exc);
return null;
@@ -250,36 +213,6 @@ public class Initiator extends MetaStoreCompactorThread {
MetastoreConf.ConfVars.COMPACTOR_INITIATOR_TABLECACHE_ON);
}
- private void scheduleCompactionIfRequired(CompactionInfo ci, Table t,
Partition p, String poolName,
- String runAs, boolean
metricsEnabled)
- throws MetaException {
- StorageDescriptor sd = resolveStorageDescriptor(t, p);
- try {
- ValidWriteIdList validWriteIds = resolveValidWriteIds(t);
-
- checkInterrupt();
-
- CompactionType type = checkForCompaction(ci, validWriteIds, sd,
t.getParameters(), runAs);
- if (type != null) {
- ci.type = type;
- ci.poolName = poolName;
- requestCompaction(ci, runAs);
- }
- } catch (InterruptedException e) {
- //Handle InterruptedException separately so the compactioninfo won't be
marked as failed.
- LOG.info("Initiator pool is being shut down, task received
interruption.");
- } catch (Throwable ex) {
- String errorMessage = "Caught exception while trying to determine if we
should compact " + ci + ". Marking "
- + "failed to avoid repeated failures, " + ex;
- LOG.error(errorMessage);
- ci.errorMessage = errorMessage;
- if (metricsEnabled) {
-
Metrics.getOrCreateCounter(MetricsConstants.COMPACTION_INITIATOR_FAILURE_COUNTER).inc();
- }
- txnHandler.markFailed(ci);
- }
- }
-
private String getPoolName(CompactionInfo ci, Table t) throws Exception {
Map<String, String> params = t.getParameters();
String poolName = params == null ? null :
params.get(Constants.HIVE_COMPACTOR_WORKER_POOL);
@@ -294,16 +227,7 @@ public class Initiator extends MetaStoreCompactorThread {
return CompactorUtil.resolveDatabase(conf, ci.dbname);
}
- private ValidWriteIdList resolveValidWriteIds(Table t) throws
NoSuchTxnException, MetaException {
- ValidTxnList validTxnList = new
ValidReadTxnList(conf.get(ValidTxnList.VALID_TXNS_KEY));
- // The response will have one entry per table and hence we get only one
ValidWriteIdList
- String fullTableName = TxnUtils.getFullTableName(t.getDbName(),
t.getTableName());
- GetValidWriteIdsRequest rqst = new
GetValidWriteIdsRequest(Collections.singletonList(fullTableName));
- rqst.setValidTxnList(validTxnList.writeToString());
- return TxnUtils.createValidCompactWriteIdList(
- txnHandler.getValidWriteIds(rqst).getTblValidWriteIds().get(0));
- }
@VisibleForTesting
protected String resolveUserToRunAs(Map<String, String> cache, Table t,
Partition p)
@@ -394,160 +318,6 @@ public class Initiator extends MetaStoreCompactorThread {
return false;
}
- private CompactionType checkForCompaction(final CompactionInfo ci,
- final ValidWriteIdList writeIds,
- final StorageDescriptor sd,
- final Map<String, String>
tblproperties,
- final String runAs)
- throws IOException, InterruptedException {
- // If it's marked as too many aborted, we already know we need to compact
- if (ci.tooManyAborts) {
- LOG.debug("Found too many aborted transactions for " +
ci.getFullPartitionName() + ", " +
- "initiating major compaction");
- return CompactionType.MAJOR;
- }
-
- if (ci.hasOldAbort) {
- HiveConf.ConfVars oldAbortedTimeoutProp =
- HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD;
- LOG.debug("Found an aborted transaction for " + ci.getFullPartitionName()
- + " with age older than threshold " + oldAbortedTimeoutProp + ": " +
conf
- .getTimeVar(oldAbortedTimeoutProp, TimeUnit.HOURS) + " hours. "
- + "Initiating minor compaction.");
- return CompactionType.MINOR;
- }
- AcidDirectory acidDirectory = getAcidDirectory(sd, writeIds);
- long baseSize = getBaseSize(acidDirectory);
- FileSystem fs = acidDirectory.getFs();
- Map<Path, Long> deltaSizes = new HashMap<>();
- for (AcidUtils.ParsedDelta delta : acidDirectory.getCurrentDirectories()) {
- deltaSizes.put(delta.getPath(), getDirSize(fs, delta));
- }
- long deltaSize = deltaSizes.values().stream().reduce(0L, Long::sum);
- AcidMetricService.updateMetricsFromInitiator(ci.dbname, ci.tableName,
ci.partName, conf, txnHandler,
- baseSize, deltaSizes, acidDirectory.getObsolete());
-
- if (runJobAsSelf(runAs)) {
- return determineCompactionType(ci, acidDirectory, tblproperties,
baseSize, deltaSize);
- } else {
- LOG.info("Going to initiate as user " + runAs + " for " +
ci.getFullPartitionName());
- UserGroupInformation ugi = UserGroupInformation.createProxyUser(runAs,
- UserGroupInformation.getLoginUser());
- CompactionType compactionType;
- try {
- compactionType = ugi.doAs(
- (PrivilegedExceptionAction<CompactionType>) () ->
determineCompactionType(ci, acidDirectory, tblproperties, baseSize, deltaSize));
- } finally {
- try {
- FileSystem.closeAllForUGI(ugi);
- } catch (IOException exception) {
- LOG.error("Could not clean up file-system handles for UGI: " + ugi +
" for " +
- ci.getFullPartitionName(), exception);
- }
- }
- return compactionType;
- }
- }
-
- private AcidDirectory getAcidDirectory(StorageDescriptor sd,
ValidWriteIdList writeIds) throws IOException {
- Path location = new Path(sd.getLocation());
- FileSystem fs = location.getFileSystem(conf);
- return AcidUtils.getAcidState(fs, location, conf, writeIds,
Ref.from(false), false);
- }
-
- private CompactionType determineCompactionType(CompactionInfo ci,
AcidDirectory dir, Map<String,
- String> tblproperties, long baseSize, long deltaSize) {
- boolean noBase = false;
- List<AcidUtils.ParsedDelta> deltas = dir.getCurrentDirectories();
- if (baseSize == 0 && deltaSize > 0) {
- noBase = true;
- } else {
- String deltaPctProp = tblproperties.get(COMPACTORTHRESHOLD_PREFIX +
- HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_PCT_THRESHOLD);
- float deltaPctThreshold = deltaPctProp == null ?
- HiveConf.getFloatVar(conf,
HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_PCT_THRESHOLD) :
- Float.parseFloat(deltaPctProp);
- boolean bigEnough = (float)deltaSize/(float)baseSize >
deltaPctThreshold;
- boolean multiBase = dir.getObsolete().stream()
- .anyMatch(path ->
path.getName().startsWith(AcidUtils.BASE_PREFIX));
-
- boolean initiateMajor = bigEnough || (deltaSize == 0 && multiBase);
- if (LOG.isDebugEnabled()) {
- StringBuilder msg = new StringBuilder("delta size: ");
- msg.append(deltaSize);
- msg.append(" base size: ");
- msg.append(baseSize);
- msg.append(" multiBase ");
- msg.append(multiBase);
- msg.append(" deltaSize ");
- msg.append(deltaSize);
- msg.append(" threshold: ");
- msg.append(deltaPctThreshold);
- msg.append(" delta/base ratio >
").append(HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_PCT_THRESHOLD.varname)
- .append(": ");
- msg.append(bigEnough);
- msg.append(".");
- if (!initiateMajor) {
- msg.append("not");
- }
- msg.append(" initiating major compaction.");
- LOG.debug(msg.toString());
- }
- if (initiateMajor) return CompactionType.MAJOR;
- }
-
- String deltaNumProp = tblproperties.get(COMPACTORTHRESHOLD_PREFIX +
- HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD);
- int deltaNumThreshold = deltaNumProp == null ?
- HiveConf.getIntVar(conf,
HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD) :
- Integer.parseInt(deltaNumProp);
- boolean enough = deltas.size() > deltaNumThreshold;
- if (!enough) {
- LOG.debug("Not enough deltas to initiate compaction for table=" +
ci.tableName + "partition=" + ci.partName
- + ". Found: " + deltas.size() + " deltas, threshold is " +
deltaNumThreshold);
- return null;
- }
- // If there's no base file, do a major compaction
- LOG.debug("Found " + deltas.size() + " delta files, and " + (noBase ? "no"
: "has") + " base," +
- "requesting " + (noBase ? "major" : "minor") + " compaction");
-
- return noBase || !isMinorCompactionSupported(tblproperties, dir) ?
- CompactionType.MAJOR : CompactionType.MINOR;
- }
-
- private long getBaseSize(AcidDirectory dir) throws IOException {
- long baseSize = 0;
- if (dir.getBase() != null) {
- baseSize = getDirSize(dir.getFs(), dir.getBase());
- } else {
- for (HdfsFileStatusWithId origStat : dir.getOriginalFiles()) {
- baseSize += origStat.getFileStatus().getLen();
- }
- }
- return baseSize;
- }
-
- private long getDirSize(FileSystem fs, ParsedDirectory dir) throws
IOException {
- return dir.getFiles(fs, Ref.from(false)).stream()
- .map(HdfsFileStatusWithId::getFileStatus)
- .mapToLong(FileStatus::getLen)
- .sum();
- }
-
- private void requestCompaction(CompactionInfo ci, String runAs) throws
MetaException {
- CompactionRequest rqst = new CompactionRequest(ci.dbname, ci.tableName,
ci.type);
- if (ci.partName != null) rqst.setPartitionname(ci.partName);
- rqst.setRunas(runAs);
- rqst.setInitiatorId(getInitiatorId(Thread.currentThread().getId()));
- rqst.setInitiatorVersion(this.runtimeVersion);
- rqst.setPoolName(ci.poolName);
- LOG.info("Requesting compaction: " + rqst);
- CompactionResponse resp = txnHandler.compact(rqst);
- if(resp.isAccepted()) {
- ci.id = resp.getId();
- }
- }
-
// Check if it's a dynamic partitioning case. If so, do not initiate
compaction for streaming ingest, only for aborts.
private static boolean isDynPartIngest(Table t, CompactionInfo ci){
if (t.getPartitionKeys() != null && t.getPartitionKeys().size() > 0 &&
@@ -637,13 +407,6 @@ public class Initiator extends MetaStoreCompactorThread {
return true;
}
- private String getInitiatorId(long threadId) {
- StringBuilder name = new StringBuilder(this.hostName);
- name.append("-");
- name.append(threadId);
- return name.toString();
- }
-
private static class InitiatorCycleUpdater implements Runnable {
private final String metric;
private final long startedAt;
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/InitiatorBase.java
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/InitiatorBase.java
new file mode 100644
index 00000000000..14dd2ebffe0
--- /dev/null
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/InitiatorBase.java
@@ -0,0 +1,309 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.ql.txn.compactor;
+
+import org.apache.commons.collections4.CollectionUtils;
+import org.apache.commons.lang3.StringUtils;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.ValidReadTxnList;
+import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
+import org.apache.hadoop.hive.metastore.api.CompactionRequest;
+import org.apache.hadoop.hive.metastore.api.CompactionResponse;
+import org.apache.hadoop.hive.metastore.api.CompactionType;
+import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest;
+import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
+import org.apache.hadoop.hive.metastore.api.Partition;
+import org.apache.hadoop.hive.metastore.api.StorageDescriptor;
+import org.apache.hadoop.hive.metastore.api.Table;
+import org.apache.hadoop.hive.metastore.metrics.AcidMetricService;
+import org.apache.hadoop.hive.metastore.metrics.Metrics;
+import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
+import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
+import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
+import org.apache.hadoop.hive.metastore.utils.JavaUtils;
+import org.apache.hadoop.hive.ql.io.AcidDirectory;
+import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.shims.HadoopShims;
+import org.apache.hadoop.security.UserGroupInformation;
+import org.apache.hive.common.util.Ref;
+
+import java.io.IOException;
+import java.security.PrivilegedExceptionAction;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
+
+public class InitiatorBase extends MetaStoreCompactorThread {
+
+ static final private String COMPACTOR_THRESHOLD_PREFIX =
"compactorthreshold.";
+
+ private List<CompactionResponse>
initiateCompactionForMultiplePartitions(Table table,
+ Map<String, Partition> partitions, CompactionRequest request) {
+ List<CompactionResponse> compactionResponses = new ArrayList<>();
+ partitions.entrySet().parallelStream().forEach(entry -> {
+ try {
+ StorageDescriptor sd = resolveStorageDescriptor(table,
entry.getValue());
+ String runAs = TxnUtils.findUserToRunAs(sd.getLocation(), table, conf);
+ CompactionInfo ci =
+ new CompactionInfo(table.getDbName(), table.getTableName(),
entry.getKey(), request.getType());
+ ci.initiatorId = request.getInitiatorId();
+ ci.orderByClause = request.getOrderByClause();
+ ci.initiatorVersion = request.getInitiatorVersion();
+ if (request.getNumberOfBuckets() > 0) {
+ ci.numberOfBuckets = request.getNumberOfBuckets();
+ }
+ ci.poolName = request.getPoolName();
+ LOG.info(
+ "Checking to see if we should compact partition " + entry.getKey()
+ " of table " + table.getDbName() + "."
+ + table.getTableName());
+ CollectionUtils.addIgnoreNull(compactionResponses,
+ scheduleCompactionIfRequired(ci, table, entry.getValue(), runAs,
false));
+ } catch (IOException | InterruptedException | MetaException e) {
+ LOG.error(
+ "Error occurred while Checking if we should compact partition " +
entry.getKey() + " of table " + table.getDbName() + "."
+ + table.getTableName() + " Exception: " + e.getMessage());
+ throw new RuntimeException(e);
+ }
+ });
+ return compactionResponses;
+ }
+
+ public List<CompactionResponse> initiateCompactionForTable(CompactionRequest
request, Table table, Map<String, Partition> partitions) throws Exception {
+ ValidTxnList validTxnList =
TxnCommonUtils.createValidReadTxnList(txnHandler.getOpenTxns(), 0);
+ conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
+
+ if (request.getPartitionname()!= null || partitions.isEmpty()) {
+ List<CompactionResponse> responses = new ArrayList<>();
+ responses.add(txnHandler.compact(request));
+ return responses;
+ } else {
+ return initiateCompactionForMultiplePartitions(table, partitions,
request);
+ }
+ }
+
+ @Override protected boolean isCacheEnabled() {
+ return false;
+ }
+
+ private String getInitiatorId(long threadId) {
+ return this.hostName + "-" + threadId;
+ }
+
+ private CompactionResponse requestCompaction(CompactionInfo ci, String
runAs) throws MetaException {
+ CompactionRequest compactionRequest = new CompactionRequest(ci.dbname,
ci.tableName, ci.type);
+ if (ci.partName != null)
+ compactionRequest.setPartitionname(ci.partName);
+ compactionRequest.setRunas(runAs);
+ if (StringUtils.isEmpty(ci.initiatorId)) {
+
compactionRequest.setInitiatorId(getInitiatorId(Thread.currentThread().getId()));
+ } else {
+ compactionRequest.setInitiatorId(ci.initiatorId);
+ }
+ compactionRequest.setInitiatorVersion(this.runtimeVersion);
+ compactionRequest.setPoolName(ci.poolName);
+ LOG.info("Requesting compaction: " + compactionRequest);
+ CompactionResponse resp = txnHandler.compact(compactionRequest);
+ if (resp.isAccepted()) {
+ ci.id = resp.getId();
+ }
+ return resp;
+ }
+
+ private AcidDirectory getAcidDirectory(StorageDescriptor sd,
ValidWriteIdList writeIds) throws IOException {
+ Path location = new Path(sd.getLocation());
+ FileSystem fs = location.getFileSystem(conf);
+ return AcidUtils.getAcidState(fs, location, conf, writeIds,
Ref.from(false), false);
+ }
+
+ /**
+  * Decides which compaction type, if any, should be requested for a directory.
+  * <p>
+  * A major compaction is chosen when the delta/base size ratio exceeds the percentage threshold, or when
+  * there are no deltas but an obsolete base directory exists. Otherwise the number of current deltas is
+  * compared against the delta-count threshold. Both thresholds are read from the table properties
+  * (prefixed with {@code COMPACTOR_THRESHOLD_PREFIX}) and fall back to the configuration.
+  *
+  * @param ci            compaction info, used only for log messages here
+  * @param dir           ACID directory state to evaluate
+  * @param tblProperties table properties that may override the thresholds
+  * @param baseSize      size of the base
+  * @param deltaSize     accumulated size of the deltas
+  * @return the compaction type to request, or {@code null} when there are not enough deltas
+  */
+ private CompactionType determineCompactionType(CompactionInfo ci, AcidDirectory dir,
+     Map<String, String> tblProperties, long baseSize, long deltaSize) {
+   boolean noBase = false;
+   List<AcidUtils.ParsedDelta> deltas = dir.getCurrentDirectories();
+   if (baseSize == 0 && deltaSize > 0) {
+     noBase = true;
+   } else {
+     String deltaPctProp =
+         tblProperties.get(COMPACTOR_THRESHOLD_PREFIX + HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_PCT_THRESHOLD);
+     float deltaPctThreshold = deltaPctProp == null
+         ? HiveConf.getFloatVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_PCT_THRESHOLD)
+         : Float.parseFloat(deltaPctProp);
+     // With baseSize == 0 and deltaSize == 0 the ratio is NaN, and NaN > threshold is false.
+     boolean bigEnough = (float) deltaSize / (float) baseSize > deltaPctThreshold;
+     boolean multiBase = dir.getObsolete().stream()
+         .anyMatch(path -> path.getName().startsWith(AcidUtils.BASE_PREFIX));
+
+     boolean initiateMajor = bigEnough || (deltaSize == 0 && multiBase);
+     if (LOG.isDebugEnabled()) {
+       StringBuilder msg = new StringBuilder("delta size: ");
+       msg.append(deltaSize);
+       msg.append(" base size: ");
+       msg.append(baseSize);
+       msg.append(" multiBase ");
+       msg.append(multiBase);
+       msg.append(" threshold: ");
+       msg.append(deltaPctThreshold);
+       msg.append(" delta/base ratio > ").append(HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_PCT_THRESHOLD.varname)
+           .append(": ");
+       msg.append(bigEnough);
+       msg.append(".");
+       if (!initiateMajor) {
+         // Leading space keeps the sentence readable: ". not initiating major compaction."
+         msg.append(" not");
+       }
+       msg.append(" initiating major compaction.");
+       LOG.debug(msg.toString());
+     }
+     if (initiateMajor) {
+       return CompactionType.MAJOR;
+     }
+   }
+
+   String deltaNumProp =
+       tblProperties.get(COMPACTOR_THRESHOLD_PREFIX + HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD);
+   int deltaNumThreshold = deltaNumProp == null
+       ? HiveConf.getIntVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_DELTA_NUM_THRESHOLD)
+       : Integer.parseInt(deltaNumProp);
+   boolean enough = deltas.size() > deltaNumThreshold;
+   if (!enough) {
+     LOG.debug("Not enough deltas to initiate compaction for table=" + ci.tableName + " partition=" + ci.partName
+         + ". Found: " + deltas.size() + " deltas, threshold is " + deltaNumThreshold);
+     return null;
+   }
+   // Without a base, or when minor compaction is not supported, a major compaction is requested.
+   // The type is resolved before logging so the message reports what is actually returned.
+   CompactionType type = noBase || !isMinorCompactionSupported(tblProperties, dir)
+       ? CompactionType.MAJOR : CompactionType.MINOR;
+   LOG.debug("Found " + deltas.size() + " delta files, and " + (noBase ? "no" : "has") + " base, requesting "
+       + (type == CompactionType.MAJOR ? "major" : "minor") + " compaction");
+
+   return type;
+ }
+
+ /**
+  * Computes the size of the base of an ACID directory.
+  * When a base directory exists its size is returned; otherwise the lengths of the original files are summed.
+  *
+  * @param dir ACID directory to measure
+  * @return base size in bytes, {@code 0} when there is neither a base nor original files
+  * @throws IOException if the base directory cannot be listed
+  */
+ private long getBaseSize(AcidDirectory dir) throws IOException {
+   if (dir.getBase() != null) {
+     return getDirSize(dir.getFs(), dir.getBase());
+   }
+   long originalFilesSize = 0;
+   for (HadoopShims.HdfsFileStatusWithId originalFile : dir.getOriginalFiles()) {
+     originalFilesSize += originalFile.getFileStatus().getLen();
+   }
+   return originalFilesSize;
+ }
+
+ /**
+  * Sums the lengths of all files reported for a parsed directory.
+  *
+  * @param fs  file system used to list the directory
+  * @param dir parsed directory whose files are measured
+  * @return total length in bytes of the listed files
+  * @throws IOException if listing the files fails
+  */
+ private long getDirSize(FileSystem fs, AcidUtils.ParsedDirectory dir) throws IOException {
+   long totalBytes = 0;
+   for (HadoopShims.HdfsFileStatusWithId file : dir.getFiles(fs, Ref.from(false))) {
+     totalBytes += file.getFileStatus().getLen();
+   }
+   return totalBytes;
+ }
+
+ private CompactionType checkForCompaction(final CompactionInfo ci, final
ValidWriteIdList writeIds,
+ final StorageDescriptor sd, final Map<String, String> tblProperties,
final String runAs)
+ throws IOException, InterruptedException {
+ // If it's marked as too many aborted, we already know we need to compact
+ if (ci.tooManyAborts) {
+ LOG.debug("Found too many aborted transactions for " +
ci.getFullPartitionName() + ", "
+ + "initiating major compaction");
+ return CompactionType.MAJOR;
+ }
+
+ if (ci.hasOldAbort) {
+ HiveConf.ConfVars oldAbortedTimeoutProp =
HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD;
+ LOG.debug("Found an aborted transaction for " +
ci.getFullPartitionName() + " with age older than threshold "
+ + oldAbortedTimeoutProp + ": " +
conf.getTimeVar(oldAbortedTimeoutProp, TimeUnit.HOURS) + " hours. "
+ + "Initiating minor compaction.");
+ return CompactionType.MINOR;
+ }
+ AcidDirectory acidDirectory = getAcidDirectory(sd, writeIds);
+ long baseSize = getBaseSize(acidDirectory);
+ FileSystem fs = acidDirectory.getFs();
+ Map<Path, Long> deltaSizes = new HashMap<>();
+ for (AcidUtils.ParsedDelta delta : acidDirectory.getCurrentDirectories()) {
+ deltaSizes.put(delta.getPath(), getDirSize(fs, delta));
+ }
+ long deltaSize = deltaSizes.values().stream().reduce(0L, Long::sum);
+ AcidMetricService.updateMetricsFromInitiator(ci.dbname, ci.tableName,
ci.partName, conf, txnHandler, baseSize,
+ deltaSizes, acidDirectory.getObsolete());
+
+ if (runJobAsSelf(runAs)) {
+ return determineCompactionType(ci, acidDirectory, tblProperties,
baseSize, deltaSize);
+ } else {
+ LOG.info("Going to initiate as user " + runAs + " for " +
ci.getFullPartitionName());
+ UserGroupInformation ugi = UserGroupInformation.createProxyUser(runAs,
UserGroupInformation.getLoginUser());
+ CompactionType compactionType;
+ try {
+ compactionType = ugi.doAs(
+ (PrivilegedExceptionAction<CompactionType>) () ->
determineCompactionType(ci, acidDirectory, tblProperties,
+ baseSize, deltaSize));
+ } finally {
+ try {
+ FileSystem.closeAllForUGI(ugi);
+ } catch (IOException exception) {
+ LOG.error("Could not clean up file-system handles for UGI: " + ugi +
" for " + ci.getFullPartitionName(),
+ exception);
+ }
+ }
+ return compactionType;
+ }
+ }
+
+ private ValidWriteIdList resolveValidWriteIds(Table t)
+ throws NoSuchTxnException, MetaException {
+ ValidTxnList validTxnList = new
ValidReadTxnList(conf.get(ValidTxnList.VALID_TXNS_KEY));
+ // The response will have one entry per table and hence we get only one
ValidWriteIdList
+ String fullTableName = TxnUtils.getFullTableName(t.getDbName(),
t.getTableName());
+ GetValidWriteIdsRequest validWriteIdsRequest = new
GetValidWriteIdsRequest(Collections.singletonList(fullTableName));
+ validWriteIdsRequest.setValidTxnList(validTxnList.writeToString());
+
+ return
TxnUtils.createValidCompactWriteIdList(txnHandler.getValidWriteIds(validWriteIdsRequest).getTblValidWriteIds().get(0));
+ }
+
+ /**
+  * Checks whether the table or partition described by {@code ci} needs compaction and, if so, requests it.
+  * <p>
+  * Any failure other than an interruption is logged, stored in {@code ci.errorMessage}, optionally counted
+  * in the initiator failure metric, and the compaction info is marked as failed through the txn handler.
+  *
+  * @param ci             compaction info to evaluate; its {@code type} is set when a compaction is requested
+  * @param t              table the compaction info refers to
+  * @param p              partition the compaction info refers to, passed on to {@code resolveStorageDescriptor}
+  * @param runAs          user on whose behalf the check and the request are made
+  * @param metricsEnabled whether the failure counter should be incremented on error
+  * @return the response of the compaction request, or {@code null} when no compaction was requested
+  *         (nothing to compact, interruption, or failure)
+  * @throws MetaException declared for failures while marking the compaction as failed
+  */
+ protected CompactionResponse scheduleCompactionIfRequired(CompactionInfo ci, Table t,
+     Partition p, String runAs, boolean metricsEnabled)
+     throws MetaException {
+   StorageDescriptor sd = resolveStorageDescriptor(t, p);
+   try {
+     ValidWriteIdList validWriteIds = resolveValidWriteIds(t);
+
+     checkInterrupt();
+
+     CompactionType type = checkForCompaction(ci, validWriteIds, sd, t.getParameters(), runAs);
+     if (type != null) {
+       ci.type = type;
+       return requestCompaction(ci, runAs);
+     }
+   } catch (InterruptedException e) {
+     //Handle InterruptedException separately so the compactionInfo won't be marked as failed.
+     LOG.info("Initiator pool is being shut down, task received interruption.");
+   } catch (Throwable ex) {
+     String errorMessage = "Caught exception while trying to determine if we should compact " + ci + ". Marking "
+         + "failed to avoid repeated failures, " + ex;
+     // Hand the throwable to the logger as well so the stack trace is not lost.
+     LOG.error(errorMessage, ex);
+     ci.errorMessage = errorMessage;
+     if (metricsEnabled) {
+       Metrics.getOrCreateCounter(MetricsConstants.COMPACTION_INITIATOR_FAILURE_COUNTER).inc();
+     }
+     txnHandler.markFailed(ci);
+   }
+   return null;
+ }
+
+}
diff --git a/ql/src/test/queries/clientpositive/manual_compaction.q
b/ql/src/test/queries/clientpositive/manual_compaction.q
new file mode 100644
index 00000000000..a794ff7555a
--- /dev/null
+++ b/ql/src/test/queries/clientpositive/manual_compaction.q
@@ -0,0 +1,68 @@
+-- Mask the enqueue time which is based on current time
+--! qt:replace:/(initiated\s+---\s+---\s+)[0-9]*(\s+---)/$1#Masked#$2/
+--! qt:replace:/(---\s+)[a-zA-Z0-9\-]+(\s+manual)/$1#Masked#$2/
+-- Mask the hostname in show compaction
+--! qt:replace:/(---\s+)[\S]*(\s+manual)/$1#Masked#$2/
+-- Mask compaction id as they will be allocated in parallel threads
+--! qt:replace:/^[0-9]/#Masked#/
+
+set hive.support.concurrency=true;
+set hive.txn.manager=org.apache.hadoop.hive.ql.lockmgr.DbTxnManager;
+
+create table UN_PARTITIONED_T(key string, val string) clustered by (val) into
2 buckets stored as ORC TBLPROPERTIES ('transactional'='true');
+
+create table UN_PARTITIONED_T_MINOR(key string, val string) clustered by (val)
into 2 buckets stored as ORC TBLPROPERTIES ('transactional'='true');
+
+create table PARTITIONED_T(key string, val string) partitioned by (dt string)
clustered by (val) into 2 buckets stored as ORC TBLPROPERTIES
('transactional'='true');
+
+alter table UN_PARTITIONED_T compact 'major';
+
+alter table UN_PARTITIONED_T_MINOR compact 'minor';
+
+alter table PARTITIONED_T add partition(dt='2023');
+
+insert into PARTITIONED_T partition(dt='2023') values ('k1','v1');
+insert into PARTITIONED_T partition(dt='2023') values ('k2','v2');
+insert into PARTITIONED_T partition(dt='2023') values ('k3','v3');
+
+alter table PARTITIONED_T partition(dt='2023') compact 'minor';
+
+SHOW COMPACTIONS ORDER BY 'PARTITION' DESC;
+
+alter table PARTITIONED_T add partition(dt='2024');
+
+insert into PARTITIONED_T partition(dt='2024') values ('k1','v1');
+insert into PARTITIONED_T partition(dt='2024') values ('k2','v2');
+insert into PARTITIONED_T partition(dt='2024') values ('k3','v3');
+insert into PARTITIONED_T partition(dt='2024') values ('k4','v4');
+insert into PARTITIONED_T partition(dt='2024') values ('k5','v5');
+insert into PARTITIONED_T partition(dt='2024') values ('k6','v6');
+insert into PARTITIONED_T partition(dt='2024') values ('k7','v7');
+insert into PARTITIONED_T partition(dt='2024') values ('k8','v8');
+insert into PARTITIONED_T partition(dt='2024') values ('k9','v9');
+insert into PARTITIONED_T partition(dt='2024') values ('k10','v10');
+insert into PARTITIONED_T partition(dt='2024') values ('k11','v11');
+
+insert into PARTITIONED_T partition(dt='2022') values ('k1','v1');
+insert into PARTITIONED_T partition(dt='2022') values ('k2','v2');
+insert into PARTITIONED_T partition(dt='2022') values ('k3','v3');
+insert into PARTITIONED_T partition(dt='2022') values ('k4','v4');
+insert into PARTITIONED_T partition(dt='2022') values ('k5','v5');
+insert into PARTITIONED_T partition(dt='2022') values ('k6','v6');
+insert into PARTITIONED_T partition(dt='2022') values ('k7','v7');
+insert into PARTITIONED_T partition(dt='2022') values ('k8','v8');
+insert into PARTITIONED_T partition(dt='2022') values ('k9','v9');
+insert into PARTITIONED_T partition(dt='2022') values ('k10','v10');
+insert into PARTITIONED_T partition(dt='2022') values ('k11','v11');
+
+explain alter table PARTITIONED_T compact 'major';
+
+alter table PARTITIONED_T compact 'major';
+
+SHOW COMPACTIONS ORDER BY 'PARTITION' DESC;
+
+drop table UN_PARTITIONED_T;
+
+drop table UN_PARTITIONED_T_MINOR;
+
+drop table PARTITIONED_T;
diff --git a/ql/src/test/results/clientpositive/llap/manual_compaction.q.out
b/ql/src/test/results/clientpositive/llap/manual_compaction.q.out
new file mode 100644
index 00000000000..90dddd709e1
--- /dev/null
+++ b/ql/src/test/results/clientpositive/llap/manual_compaction.q.out
@@ -0,0 +1,387 @@
+PREHOOK: query: create table UN_PARTITIONED_T(key string, val string)
clustered by (val) into 2 buckets stored as ORC TBLPROPERTIES
('transactional'='true')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@UN_PARTITIONED_T
+POSTHOOK: query: create table UN_PARTITIONED_T(key string, val string)
clustered by (val) into 2 buckets stored as ORC TBLPROPERTIES
('transactional'='true')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@UN_PARTITIONED_T
+PREHOOK: query: create table UN_PARTITIONED_T_MINOR(key string, val string)
clustered by (val) into 2 buckets stored as ORC TBLPROPERTIES
('transactional'='true')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@UN_PARTITIONED_T_MINOR
+POSTHOOK: query: create table UN_PARTITIONED_T_MINOR(key string, val string)
clustered by (val) into 2 buckets stored as ORC TBLPROPERTIES
('transactional'='true')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@UN_PARTITIONED_T_MINOR
+PREHOOK: query: create table PARTITIONED_T(key string, val string) partitioned
by (dt string) clustered by (val) into 2 buckets stored as ORC TBLPROPERTIES
('transactional'='true')
+PREHOOK: type: CREATETABLE
+PREHOOK: Output: database:default
+PREHOOK: Output: default@PARTITIONED_T
+POSTHOOK: query: create table PARTITIONED_T(key string, val string)
partitioned by (dt string) clustered by (val) into 2 buckets stored as ORC
TBLPROPERTIES ('transactional'='true')
+POSTHOOK: type: CREATETABLE
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@PARTITIONED_T
+PREHOOK: query: alter table UN_PARTITIONED_T compact 'major'
+PREHOOK: type: ALTERTABLE_COMPACT
+PREHOOK: Input: default@un_partitioned_t
+PREHOOK: Output: default@un_partitioned_t
+POSTHOOK: query: alter table UN_PARTITIONED_T compact 'major'
+POSTHOOK: type: ALTERTABLE_COMPACT
+POSTHOOK: Input: default@un_partitioned_t
+POSTHOOK: Output: default@un_partitioned_t
+PREHOOK: query: alter table UN_PARTITIONED_T_MINOR compact 'minor'
+PREHOOK: type: ALTERTABLE_COMPACT
+PREHOOK: Input: default@un_partitioned_t_minor
+PREHOOK: Output: default@un_partitioned_t_minor
+POSTHOOK: query: alter table UN_PARTITIONED_T_MINOR compact 'minor'
+POSTHOOK: type: ALTERTABLE_COMPACT
+POSTHOOK: Input: default@un_partitioned_t_minor
+POSTHOOK: Output: default@un_partitioned_t_minor
+PREHOOK: query: alter table PARTITIONED_T add partition(dt='2023')
+PREHOOK: type: ALTERTABLE_ADDPARTS
+PREHOOK: Output: default@partitioned_t
+POSTHOOK: query: alter table PARTITIONED_T add partition(dt='2023')
+POSTHOOK: type: ALTERTABLE_ADDPARTS
+POSTHOOK: Output: default@partitioned_t
+POSTHOOK: Output: default@partitioned_t@dt=2023
+PREHOOK: query: insert into PARTITIONED_T partition(dt='2023') values
('k1','v1')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@partitioned_t@dt=2023
+POSTHOOK: query: insert into PARTITIONED_T partition(dt='2023') values
('k1','v1')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@partitioned_t@dt=2023
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2023).key SCRIPT []
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2023).val SCRIPT []
+PREHOOK: query: insert into PARTITIONED_T partition(dt='2023') values
('k2','v2')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@partitioned_t@dt=2023
+POSTHOOK: query: insert into PARTITIONED_T partition(dt='2023') values
('k2','v2')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@partitioned_t@dt=2023
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2023).key SCRIPT []
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2023).val SCRIPT []
+PREHOOK: query: insert into PARTITIONED_T partition(dt='2023') values
('k3','v3')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@partitioned_t@dt=2023
+POSTHOOK: query: insert into PARTITIONED_T partition(dt='2023') values
('k3','v3')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@partitioned_t@dt=2023
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2023).key SCRIPT []
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2023).val SCRIPT []
+PREHOOK: query: alter table PARTITIONED_T partition(dt='2023') compact 'minor'
+PREHOOK: type: ALTERTABLE_COMPACT
+PREHOOK: Input: default@partitioned_t
+PREHOOK: Output: default@partitioned_t@dt=2023
+POSTHOOK: query: alter table PARTITIONED_T partition(dt='2023') compact 'minor'
+POSTHOOK: type: ALTERTABLE_COMPACT
+POSTHOOK: Input: default@partitioned_t
+POSTHOOK: Output: default@partitioned_t@dt=2023
+PREHOOK: query: SHOW COMPACTIONS ORDER BY 'PARTITION' DESC
+PREHOOK: type: SHOW COMPACTIONS
+POSTHOOK: query: SHOW COMPACTIONS ORDER BY 'PARTITION' DESC
+POSTHOOK: type: SHOW COMPACTIONS
+CompactionId Database Table Partition Type State Worker
host Worker Enqueue Time Start Time Duration(ms) HadoopJobId
Error message Initiator host Initiator Pool name TxnId Next
TxnId Commit Time Highest WriteId
+#Masked# default un_partitioned_t_minor --- MINOR initiated
--- --- #Masked# --- --- --- --- #Masked#
manual default 0 0 0 ---
+#Masked# default un_partitioned_t --- MAJOR initiated
--- --- #Masked# --- --- --- --- #Masked#
manual default 0 0 0 ---
+#Masked# default partitioned_t dt=2023 MINOR initiated ---
--- #Masked# --- --- --- --- #Masked# manual
default 0 0 0 ---
+PREHOOK: query: alter table PARTITIONED_T add partition(dt='2024')
+PREHOOK: type: ALTERTABLE_ADDPARTS
+PREHOOK: Output: default@partitioned_t
+POSTHOOK: query: alter table PARTITIONED_T add partition(dt='2024')
+POSTHOOK: type: ALTERTABLE_ADDPARTS
+POSTHOOK: Output: default@partitioned_t
+POSTHOOK: Output: default@partitioned_t@dt=2024
+PREHOOK: query: insert into PARTITIONED_T partition(dt='2024') values
('k1','v1')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@partitioned_t@dt=2024
+POSTHOOK: query: insert into PARTITIONED_T partition(dt='2024') values
('k1','v1')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@partitioned_t@dt=2024
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2024).key SCRIPT []
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2024).val SCRIPT []
+PREHOOK: query: insert into PARTITIONED_T partition(dt='2024') values
('k2','v2')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@partitioned_t@dt=2024
+POSTHOOK: query: insert into PARTITIONED_T partition(dt='2024') values
('k2','v2')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@partitioned_t@dt=2024
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2024).key SCRIPT []
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2024).val SCRIPT []
+PREHOOK: query: insert into PARTITIONED_T partition(dt='2024') values
('k3','v3')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@partitioned_t@dt=2024
+POSTHOOK: query: insert into PARTITIONED_T partition(dt='2024') values
('k3','v3')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@partitioned_t@dt=2024
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2024).key SCRIPT []
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2024).val SCRIPT []
+PREHOOK: query: insert into PARTITIONED_T partition(dt='2024') values
('k4','v4')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@partitioned_t@dt=2024
+POSTHOOK: query: insert into PARTITIONED_T partition(dt='2024') values
('k4','v4')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@partitioned_t@dt=2024
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2024).key SCRIPT []
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2024).val SCRIPT []
+PREHOOK: query: insert into PARTITIONED_T partition(dt='2024') values
('k5','v5')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@partitioned_t@dt=2024
+POSTHOOK: query: insert into PARTITIONED_T partition(dt='2024') values
('k5','v5')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@partitioned_t@dt=2024
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2024).key SCRIPT []
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2024).val SCRIPT []
+PREHOOK: query: insert into PARTITIONED_T partition(dt='2024') values
('k6','v6')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@partitioned_t@dt=2024
+POSTHOOK: query: insert into PARTITIONED_T partition(dt='2024') values
('k6','v6')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@partitioned_t@dt=2024
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2024).key SCRIPT []
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2024).val SCRIPT []
+PREHOOK: query: insert into PARTITIONED_T partition(dt='2024') values
('k7','v7')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@partitioned_t@dt=2024
+POSTHOOK: query: insert into PARTITIONED_T partition(dt='2024') values
('k7','v7')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@partitioned_t@dt=2024
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2024).key SCRIPT []
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2024).val SCRIPT []
+PREHOOK: query: insert into PARTITIONED_T partition(dt='2024') values
('k8','v8')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@partitioned_t@dt=2024
+POSTHOOK: query: insert into PARTITIONED_T partition(dt='2024') values
('k8','v8')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@partitioned_t@dt=2024
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2024).key SCRIPT []
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2024).val SCRIPT []
+PREHOOK: query: insert into PARTITIONED_T partition(dt='2024') values
('k9','v9')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@partitioned_t@dt=2024
+POSTHOOK: query: insert into PARTITIONED_T partition(dt='2024') values
('k9','v9')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@partitioned_t@dt=2024
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2024).key SCRIPT []
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2024).val SCRIPT []
+PREHOOK: query: insert into PARTITIONED_T partition(dt='2024') values
('k10','v10')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@partitioned_t@dt=2024
+POSTHOOK: query: insert into PARTITIONED_T partition(dt='2024') values
('k10','v10')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@partitioned_t@dt=2024
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2024).key SCRIPT []
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2024).val SCRIPT []
+PREHOOK: query: insert into PARTITIONED_T partition(dt='2024') values
('k11','v11')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@partitioned_t@dt=2024
+POSTHOOK: query: insert into PARTITIONED_T partition(dt='2024') values
('k11','v11')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@partitioned_t@dt=2024
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2024).key SCRIPT []
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2024).val SCRIPT []
+PREHOOK: query: insert into PARTITIONED_T partition(dt='2022') values
('k1','v1')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@partitioned_t@dt=2022
+POSTHOOK: query: insert into PARTITIONED_T partition(dt='2022') values
('k1','v1')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@partitioned_t@dt=2022
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2022).key SCRIPT []
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2022).val SCRIPT []
+PREHOOK: query: insert into PARTITIONED_T partition(dt='2022') values
('k2','v2')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@partitioned_t@dt=2022
+POSTHOOK: query: insert into PARTITIONED_T partition(dt='2022') values
('k2','v2')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@partitioned_t@dt=2022
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2022).key SCRIPT []
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2022).val SCRIPT []
+PREHOOK: query: insert into PARTITIONED_T partition(dt='2022') values
('k3','v3')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@partitioned_t@dt=2022
+POSTHOOK: query: insert into PARTITIONED_T partition(dt='2022') values
('k3','v3')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@partitioned_t@dt=2022
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2022).key SCRIPT []
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2022).val SCRIPT []
+PREHOOK: query: insert into PARTITIONED_T partition(dt='2022') values
('k4','v4')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@partitioned_t@dt=2022
+POSTHOOK: query: insert into PARTITIONED_T partition(dt='2022') values
('k4','v4')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@partitioned_t@dt=2022
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2022).key SCRIPT []
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2022).val SCRIPT []
+PREHOOK: query: insert into PARTITIONED_T partition(dt='2022') values
('k5','v5')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@partitioned_t@dt=2022
+POSTHOOK: query: insert into PARTITIONED_T partition(dt='2022') values
('k5','v5')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@partitioned_t@dt=2022
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2022).key SCRIPT []
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2022).val SCRIPT []
+PREHOOK: query: insert into PARTITIONED_T partition(dt='2022') values
('k6','v6')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@partitioned_t@dt=2022
+POSTHOOK: query: insert into PARTITIONED_T partition(dt='2022') values
('k6','v6')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@partitioned_t@dt=2022
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2022).key SCRIPT []
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2022).val SCRIPT []
+PREHOOK: query: insert into PARTITIONED_T partition(dt='2022') values
('k7','v7')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@partitioned_t@dt=2022
+POSTHOOK: query: insert into PARTITIONED_T partition(dt='2022') values
('k7','v7')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@partitioned_t@dt=2022
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2022).key SCRIPT []
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2022).val SCRIPT []
+PREHOOK: query: insert into PARTITIONED_T partition(dt='2022') values
('k8','v8')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@partitioned_t@dt=2022
+POSTHOOK: query: insert into PARTITIONED_T partition(dt='2022') values
('k8','v8')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@partitioned_t@dt=2022
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2022).key SCRIPT []
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2022).val SCRIPT []
+PREHOOK: query: insert into PARTITIONED_T partition(dt='2022') values
('k9','v9')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@partitioned_t@dt=2022
+POSTHOOK: query: insert into PARTITIONED_T partition(dt='2022') values
('k9','v9')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@partitioned_t@dt=2022
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2022).key SCRIPT []
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2022).val SCRIPT []
+PREHOOK: query: insert into PARTITIONED_T partition(dt='2022') values
('k10','v10')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@partitioned_t@dt=2022
+POSTHOOK: query: insert into PARTITIONED_T partition(dt='2022') values
('k10','v10')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@partitioned_t@dt=2022
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2022).key SCRIPT []
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2022).val SCRIPT []
+PREHOOK: query: insert into PARTITIONED_T partition(dt='2022') values
('k11','v11')
+PREHOOK: type: QUERY
+PREHOOK: Input: _dummy_database@_dummy_table
+PREHOOK: Output: default@partitioned_t@dt=2022
+POSTHOOK: query: insert into PARTITIONED_T partition(dt='2022') values
('k11','v11')
+POSTHOOK: type: QUERY
+POSTHOOK: Input: _dummy_database@_dummy_table
+POSTHOOK: Output: default@partitioned_t@dt=2022
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2022).key SCRIPT []
+POSTHOOK: Lineage: partitioned_t PARTITION(dt=2022).val SCRIPT []
+PREHOOK: query: explain alter table PARTITIONED_T compact 'major'
+PREHOOK: type: ALTERTABLE_COMPACT
+PREHOOK: Input: default@partitioned_t
+PREHOOK: Output: default@partitioned_t
+POSTHOOK: query: explain alter table PARTITIONED_T compact 'major'
+POSTHOOK: type: ALTERTABLE_COMPACT
+POSTHOOK: Input: default@partitioned_t
+POSTHOOK: Output: default@partitioned_t
+STAGE DEPENDENCIES:
+ Stage-0 is a root stage
+
+STAGE PLANS:
+ Stage: Stage-0
+ Compact
+ compaction type: major
+ table name: default.PARTITIONED_T
+ numberOfBuckets: 0
+ table name: default.PARTITIONED_T
+
+PREHOOK: query: alter table PARTITIONED_T compact 'major'
+PREHOOK: type: ALTERTABLE_COMPACT
+PREHOOK: Input: default@partitioned_t
+PREHOOK: Output: default@partitioned_t
+POSTHOOK: query: alter table PARTITIONED_T compact 'major'
+POSTHOOK: type: ALTERTABLE_COMPACT
+POSTHOOK: Input: default@partitioned_t
+POSTHOOK: Output: default@partitioned_t
+PREHOOK: query: SHOW COMPACTIONS ORDER BY 'PARTITION' DESC
+PREHOOK: type: SHOW COMPACTIONS
+POSTHOOK: query: SHOW COMPACTIONS ORDER BY 'PARTITION' DESC
+POSTHOOK: type: SHOW COMPACTIONS
+CompactionId Database Table Partition Type State Worker
host Worker Enqueue Time Start Time Duration(ms) HadoopJobId
Error message Initiator host Initiator Pool name TxnId Next
TxnId Commit Time Highest WriteId
+#Masked# default un_partitioned_t_minor --- MINOR initiated
--- --- #Masked# --- --- --- --- #Masked#
manual default 0 0 0 ---
+#Masked# default un_partitioned_t --- MAJOR initiated
--- --- #Masked# --- --- --- --- #Masked#
manual default 0 0 0 ---
+#Masked# default partitioned_t dt=2024 MAJOR initiated ---
--- #Masked# --- --- --- --- #Masked# manual
default 0 0 0 ---
+#Masked# default partitioned_t dt=2023 MINOR initiated ---
--- #Masked# --- --- --- --- #Masked# manual
default 0 0 0 ---
+#Masked# default partitioned_t dt=2022 MAJOR initiated ---
--- #Masked# --- --- --- --- #Masked# manual
default 0 0 0 ---
+PREHOOK: query: drop table UN_PARTITIONED_T
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@un_partitioned_t
+PREHOOK: Output: database:default
+PREHOOK: Output: default@un_partitioned_t
+POSTHOOK: query: drop table UN_PARTITIONED_T
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@un_partitioned_t
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@un_partitioned_t
+PREHOOK: query: drop table UN_PARTITIONED_T_MINOR
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@un_partitioned_t_minor
+PREHOOK: Output: database:default
+PREHOOK: Output: default@un_partitioned_t_minor
+POSTHOOK: query: drop table UN_PARTITIONED_T_MINOR
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@un_partitioned_t_minor
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@un_partitioned_t_minor
+PREHOOK: query: drop table PARTITIONED_T
+PREHOOK: type: DROPTABLE
+PREHOOK: Input: default@partitioned_t
+PREHOOK: Output: database:default
+PREHOOK: Output: default@partitioned_t
+POSTHOOK: query: drop table PARTITIONED_T
+POSTHOOK: type: DROPTABLE
+POSTHOOK: Input: default@partitioned_t
+POSTHOOK: Output: database:default
+POSTHOOK: Output: default@partitioned_t