This is an automated email from the ASF dual-hosted git repository.
deniskuzZ pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 341b863ee5d HIVE-29571: ACID Compaction: Refused compaction Txn should not be aborted (#6517)
341b863ee5d is described below
commit 341b863ee5d42c0208407fad362aa905b6891f25
Author: Denys Kuzmenko <[email protected]>
AuthorDate: Thu Jun 4 18:45:46 2026 +0300
HIVE-29571: ACID Compaction: Refused compaction Txn should not be aborted (#6517)
---
.../compactor/service/AcidCompactionService.java | 136 ++++++++++-----------
1 file changed, 63 insertions(+), 73 deletions(-)
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/service/AcidCompactionService.java
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/service/AcidCompactionService.java
index cf6e4c02e4c..f513d289fa9 100644
---
a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/service/AcidCompactionService.java
+++
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/service/AcidCompactionService.java
@@ -126,57 +126,56 @@ public void cleanupResultDirs(CompactionInfo ci) {
public Boolean compact(Table table, CompactionInfo ci) throws Exception {
- try (CompactionTxn compactionTxn = new CompactionTxn()) {
-
- if (ci.isRebalanceCompaction() && table.getSd().getNumBuckets() > 0) {
- LOG.error("Cannot execute rebalancing compaction on bucketed tables.");
- ci.errorMessage = "Cannot execute rebalancing compaction on bucketed
tables.";
- msc.markRefused(CompactionInfo.compactionInfoToStruct(ci));
- return false;
- }
+ if (ci.isRebalanceCompaction() && table.getSd().getNumBuckets() > 0) {
+ LOG.error("Cannot execute rebalancing compaction on bucketed tables.");
+ ci.errorMessage = "Cannot execute rebalancing compaction on bucketed
tables.";
+ msc.markRefused(CompactionInfo.compactionInfoToStruct(ci));
+ return false;
+ }
- if (!ci.type.equals(CompactionType.REBALANCE) && ci.numberOfBuckets > 0)
{
- if (LOG.isWarnEnabled()) {
- LOG.warn("Only the REBALANCE compaction accepts the number of
buckets clause (CLUSTERED INTO {N} BUCKETS). " +
- "Since the compaction request is {}, it will be ignored.",
ci.type);
- }
+ if (!ci.type.equals(CompactionType.REBALANCE) && ci.numberOfBuckets > 0) {
+ if (LOG.isWarnEnabled()) {
+ LOG.warn("Only the REBALANCE compaction accepts the number of buckets
clause (CLUSTERED INTO {N} BUCKETS). " +
+ "Since the compaction request is {}, it will be ignored.",
ci.type);
}
+ }
- String fullTableName = TxnUtils.getFullTableName(table.getDbName(),
table.getTableName());
+ String fullTableName = TxnUtils.getFullTableName(table.getDbName(),
table.getTableName());
- // Find the partition we will be working with, if there is one.
- Partition p;
- try {
- p = CompactorUtil.resolvePartition(conf, msc, ci.dbname, ci.tableName,
ci.partName,
- CompactorUtil.METADATA_FETCH_MODE.REMOTE);
- if (p == null && ci.partName != null) {
- ci.errorMessage = "Unable to find partition " +
ci.getFullPartitionName() + ", assuming it was dropped and moving on.";
- LOG.warn(ci.errorMessage + " Compaction info: {}", ci);
- msc.markRefused(CompactionInfo.compactionInfoToStruct(ci));
- return false;
- }
- } catch (Exception e) {
- LOG.error("Unexpected error during resolving partition.", e);
- ci.errorMessage = e.getMessage();
- msc.markFailed(CompactionInfo.compactionInfoToStruct(ci));
+ // Find the partition we will be working with, if there is one.
+ Partition p;
+ try {
+ p = CompactorUtil.resolvePartition(conf, msc, ci.dbname, ci.tableName,
ci.partName,
+ CompactorUtil.METADATA_FETCH_MODE.REMOTE);
+ if (p == null && ci.partName != null) {
+ ci.errorMessage = "Unable to find partition " +
ci.getFullPartitionName() + ", assuming it was dropped and moving on.";
+ LOG.warn(ci.errorMessage + " Compaction info: {}", ci);
+ msc.markRefused(CompactionInfo.compactionInfoToStruct(ci));
return false;
}
+ } catch (Exception e) {
+ LOG.error("Unexpected error during resolving partition.", e);
+ ci.errorMessage = e.getMessage();
+ msc.markFailed(CompactionInfo.compactionInfoToStruct(ci));
+ return false;
+ }
- CompactorUtil.checkInterrupt(CLASS_NAME);
+ CompactorUtil.checkInterrupt(CLASS_NAME);
- // Find the appropriate storage descriptor
- sd = CompactorUtil.resolveStorageDescriptor(table, p);
+ // Find the appropriate storage descriptor
+ sd = CompactorUtil.resolveStorageDescriptor(table, p);
- if (isTableSorted(sd, ci)) {
- return false;
- }
+ if (isTableSorted(sd, ci)) {
+ return false;
+ }
- if (ci.runAs == null) {
- ci.runAs = TxnUtils.findUserToRunAs(sd.getLocation(), table, conf);
- }
+ if (ci.runAs == null) {
+ ci.runAs = TxnUtils.findUserToRunAs(sd.getLocation(), table, conf);
+ }
- CompactorUtil.checkInterrupt(CLASS_NAME);
+ CompactorUtil.checkInterrupt(CLASS_NAME);
+ try (CompactionTxn compactionTxn = new CompactionTxn()) {
/*
* we cannot have Worker use HiveTxnManager (which is on ThreadLocal)
since
* then the Driver would already have the an open txn but then this txn
would have
@@ -212,19 +211,21 @@ public Boolean compact(Table table, CompactionInfo ci)
throws Exception {
// Don't start compaction or cleaning if not necessary
if (isDynPartAbort(table, ci)) {
- compactionTxn.markForCommit(() ->
msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci)));
+ compactionTxn.commit(() ->
+ msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci)));
return false;
}
dir = getAcidStateForWorker(ci, sd, tblValidWriteIds);
if (!isEnoughToCompact(ci, dir, sd)) {
if (needsCleaning(dir, sd)) {
- compactionTxn.markForCommit(() ->
msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci)));
+ compactionTxn.commit(() ->
+ msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci)));
} else {
// do nothing
ci.errorMessage = "None of the compaction thresholds met, compaction
request is refused!";
LOG.debug(ci.errorMessage + " Compaction info: {}", ci);
- compactionTxn.markForCommit(() ->
msc.markRefused(CompactionInfo.compactionInfoToStruct(ci)));
-
+ compactionTxn.commit(() ->
+ msc.markRefused(CompactionInfo.compactionInfoToStruct(ci)));
}
return false;
}
@@ -232,7 +233,8 @@ public Boolean compact(Table table, CompactionInfo ci)
throws Exception {
ci.errorMessage = "Query based Minor compaction is not possible for
full acid tables having raw format " +
"(non-acid) data in them.";
LOG.error(ci.errorMessage + " Compaction info: {}", ci);
- compactionTxn.markForAbort(() ->
msc.markRefused(CompactionInfo.compactionInfoToStruct(ci)));
+ compactionTxn.commit(() ->
+ msc.markRefused(CompactionInfo.compactionInfoToStruct(ci)));
return false;
}
CompactorUtil.checkInterrupt(CLASS_NAME);
@@ -240,12 +242,12 @@ public Boolean compact(Table table, CompactionInfo ci)
throws Exception {
try {
failCompactionIfSetForTest();
- /*
- First try to run compaction via HiveQL queries.
- Compaction for MM tables happens here, or run compaction for Crud tables
if query-based compaction is enabled.
- todo Find a more generic approach to collecting files in the same
logical bucket to compact within the same
- task (currently we're using Tez split grouping).
- */
+ /*
+ First try to run compaction via HiveQL queries.
+ Compaction for MM tables happens here, or run compaction for Crud
tables if query-based compaction is enabled.
+ todo Find a more generic approach to collecting files in the same
logical bucket to compact within the same
+ task (currently we're using Tez split grouping).
+ */
CompactorPipeline compactorPipeline =
compactorFactory.getCompactorPipeline(table, conf, ci, msc);
computeStats = (compactorPipeline.isMRCompaction() && collectMrStats)
|| collectGenericStats;
@@ -257,7 +259,8 @@ public Boolean compact(Table table, CompactionInfo ci)
throws Exception {
LOG.info("Completed " + ci.type.toString() + " compaction for " +
ci.getFullPartitionName() + " in "
+ compactionTxn + ", marking as compacted.");
- compactionTxn.markForCommit(() ->
msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci)));
+ compactionTxn.commit(() ->
+ msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci)));
AcidMetricService.updateMetricsFromWorker(ci.dbname, ci.tableName,
ci.partName, ci.type,
dir.getCurrentDirectories().size(), dir.getDeleteDeltas().size(),
conf, msc);
@@ -267,9 +270,6 @@ public Boolean compact(Table table, CompactionInfo ci)
throws Exception {
}
return true;
- } catch (Exception e) {
- LOG.error("Caught exception in " + CLASS_NAME + " while trying to
compact " + ci, e);
- throw e;
}
}
@@ -341,11 +341,8 @@ class CompactionTxn implements AutoCloseable {
private long lockId = 0;
private TxnStatus status = TxnStatus.UNKNOWN;
-
- private ThrowingRunnable onCommitSuccess;
- private ThrowingRunnable onAbortSuccess;
-
private boolean rollbackOnly = true;
+ private ThrowingRunnable<?> onCommitSuccess;
/**
* Try to open a new txn.
@@ -376,13 +373,9 @@ private LockRequest createLockRequest(CompactionInfo ci) {
return CompactorUtil.createLockRequest(conf, ci, txnId,
lockAndOpType.getKey(), lockAndOpType.getValue());
}
- void markForCommit(ThrowingRunnable action) {
+ void commit(ThrowingRunnable<?> postAction) {
this.rollbackOnly = false;
- this.onCommitSuccess = action;
- }
-
- void markForAbort(ThrowingRunnable action) {
- this.onAbortSuccess = action;
+ this.onCommitSuccess = postAction;
}
/**
@@ -397,16 +390,13 @@ public void close() throws Exception {
//the transaction is about to close, we can stop heartbeating
regardless of it's state
CompactionHeartbeatService.getInstance(conf).stopHeartbeat(txnId);
} finally {
- if (rollbackOnly) {
- abort();
- if (onAbortSuccess != null) {
- onAbortSuccess.run();
+ if (!rollbackOnly) {
+ commit();
+ if (onCommitSuccess != null) {
+ onCommitSuccess.run();
}
- return;
- }
- commit();
- if (onCommitSuccess != null) {
- onCommitSuccess.run();
+ } else {
+ abort();
}
}
}