This is an automated email from the ASF dual-hosted git repository.

deniskuzZ pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 341b863ee5d HIVE-29571: ACID Compaction: Refused compaction Txn should not be aborted (#6517)
341b863ee5d is described below

commit 341b863ee5d42c0208407fad362aa905b6891f25
Author: Denys Kuzmenko <[email protected]>
AuthorDate: Thu Jun 4 18:45:46 2026 +0300

    HIVE-29571: ACID Compaction: Refused compaction Txn should not be aborted (#6517)
---
 .../compactor/service/AcidCompactionService.java   | 136 ++++++++++-----------
 1 file changed, 63 insertions(+), 73 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/service/AcidCompactionService.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/service/AcidCompactionService.java
index cf6e4c02e4c..f513d289fa9 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/service/AcidCompactionService.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/service/AcidCompactionService.java
@@ -126,57 +126,56 @@ public void cleanupResultDirs(CompactionInfo ci) {
   
   public Boolean compact(Table table, CompactionInfo ci) throws Exception {
 
-    try (CompactionTxn compactionTxn = new CompactionTxn()) {
-
-      if (ci.isRebalanceCompaction() && table.getSd().getNumBuckets() > 0) {
-        LOG.error("Cannot execute rebalancing compaction on bucketed tables.");
-        ci.errorMessage = "Cannot execute rebalancing compaction on bucketed 
tables.";
-        msc.markRefused(CompactionInfo.compactionInfoToStruct(ci));
-        return false;
-      }
+    if (ci.isRebalanceCompaction() && table.getSd().getNumBuckets() > 0) {
+      LOG.error("Cannot execute rebalancing compaction on bucketed tables.");
+      ci.errorMessage = "Cannot execute rebalancing compaction on bucketed 
tables.";
+      msc.markRefused(CompactionInfo.compactionInfoToStruct(ci));
+      return false;
+    }
 
-      if (!ci.type.equals(CompactionType.REBALANCE) && ci.numberOfBuckets > 0) 
{
-        if (LOG.isWarnEnabled()) {
-          LOG.warn("Only the REBALANCE compaction accepts the number of 
buckets clause (CLUSTERED INTO {N} BUCKETS). " +
-              "Since the compaction request is {}, it will be ignored.", 
ci.type);
-        }
+    if (!ci.type.equals(CompactionType.REBALANCE) && ci.numberOfBuckets > 0) {
+      if (LOG.isWarnEnabled()) {
+        LOG.warn("Only the REBALANCE compaction accepts the number of buckets 
clause (CLUSTERED INTO {N} BUCKETS). " +
+            "Since the compaction request is {}, it will be ignored.", 
ci.type);
       }
+    }
 
-      String fullTableName = TxnUtils.getFullTableName(table.getDbName(), 
table.getTableName());
+    String fullTableName = TxnUtils.getFullTableName(table.getDbName(), 
table.getTableName());
 
-      // Find the partition we will be working with, if there is one.
-      Partition p;
-      try {
-        p = CompactorUtil.resolvePartition(conf, msc, ci.dbname, ci.tableName, 
ci.partName,
-            CompactorUtil.METADATA_FETCH_MODE.REMOTE);
-        if (p == null && ci.partName != null) {
-          ci.errorMessage = "Unable to find partition " + 
ci.getFullPartitionName() + ", assuming it was dropped and moving on.";
-          LOG.warn(ci.errorMessage + " Compaction info: {}", ci);
-          msc.markRefused(CompactionInfo.compactionInfoToStruct(ci));
-          return false;
-        }
-      } catch (Exception e) {
-        LOG.error("Unexpected error during resolving partition.", e);
-        ci.errorMessage = e.getMessage();
-        msc.markFailed(CompactionInfo.compactionInfoToStruct(ci));
+    // Find the partition we will be working with, if there is one.
+    Partition p;
+    try {
+      p = CompactorUtil.resolvePartition(conf, msc, ci.dbname, ci.tableName, 
ci.partName,
+          CompactorUtil.METADATA_FETCH_MODE.REMOTE);
+      if (p == null && ci.partName != null) {
+        ci.errorMessage = "Unable to find partition " + 
ci.getFullPartitionName() + ", assuming it was dropped and moving on.";
+        LOG.warn(ci.errorMessage + " Compaction info: {}", ci);
+        msc.markRefused(CompactionInfo.compactionInfoToStruct(ci));
         return false;
       }
+    } catch (Exception e) {
+      LOG.error("Unexpected error during resolving partition.", e);
+      ci.errorMessage = e.getMessage();
+      msc.markFailed(CompactionInfo.compactionInfoToStruct(ci));
+      return false;
+    }
 
-      CompactorUtil.checkInterrupt(CLASS_NAME);
+    CompactorUtil.checkInterrupt(CLASS_NAME);
 
-      // Find the appropriate storage descriptor
-      sd =  CompactorUtil.resolveStorageDescriptor(table, p);
+    // Find the appropriate storage descriptor
+    sd =  CompactorUtil.resolveStorageDescriptor(table, p);
 
-      if (isTableSorted(sd, ci)) {
-        return false;
-      }
+    if (isTableSorted(sd, ci)) {
+      return false;
+    }
 
-      if (ci.runAs == null) {
-        ci.runAs = TxnUtils.findUserToRunAs(sd.getLocation(), table, conf);
-      }
+    if (ci.runAs == null) {
+      ci.runAs = TxnUtils.findUserToRunAs(sd.getLocation(), table, conf);
+    }
 
-      CompactorUtil.checkInterrupt(CLASS_NAME);
+    CompactorUtil.checkInterrupt(CLASS_NAME);
 
+    try (CompactionTxn compactionTxn = new CompactionTxn()) {
       /*
        * we cannot have Worker use HiveTxnManager (which is on ThreadLocal) 
since
        * then the Driver would already have the an open txn but then this txn 
would have
@@ -212,19 +211,21 @@ public Boolean compact(Table table, CompactionInfo ci) 
throws Exception {
 
       // Don't start compaction or cleaning if not necessary
       if (isDynPartAbort(table, ci)) {
-        compactionTxn.markForCommit(() -> 
msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci)));
+        compactionTxn.commit(() ->
+            msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci)));
         return false;
       }
       dir = getAcidStateForWorker(ci, sd, tblValidWriteIds);
       if (!isEnoughToCompact(ci, dir, sd)) {
         if (needsCleaning(dir, sd)) {
-          compactionTxn.markForCommit(() -> 
msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci)));
+          compactionTxn.commit(() ->
+              msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci)));
         } else {
           // do nothing
           ci.errorMessage = "None of the compaction thresholds met, compaction 
request is refused!";
           LOG.debug(ci.errorMessage + " Compaction info: {}", ci);
-          compactionTxn.markForCommit(() -> 
msc.markRefused(CompactionInfo.compactionInfoToStruct(ci)));
-
+          compactionTxn.commit(() ->
+              msc.markRefused(CompactionInfo.compactionInfoToStruct(ci)));
         }
         return false;
       }
@@ -232,7 +233,8 @@ public Boolean compact(Table table, CompactionInfo ci) 
throws Exception {
         ci.errorMessage = "Query based Minor compaction is not possible for 
full acid tables having raw format " +
             "(non-acid) data in them.";
         LOG.error(ci.errorMessage + " Compaction info: {}", ci);
-        compactionTxn.markForAbort(() -> 
msc.markRefused(CompactionInfo.compactionInfoToStruct(ci)));
+        compactionTxn.commit(() ->
+            msc.markRefused(CompactionInfo.compactionInfoToStruct(ci)));
         return false;
       }
       CompactorUtil.checkInterrupt(CLASS_NAME);
@@ -240,12 +242,12 @@ public Boolean compact(Table table, CompactionInfo ci) 
throws Exception {
       try {
         failCompactionIfSetForTest();
 
-      /*
-      First try to run compaction via HiveQL queries.
-      Compaction for MM tables happens here, or run compaction for Crud tables 
if query-based compaction is enabled.
-      todo Find a more generic approach to collecting files in the same 
logical bucket to compact within the same
-      task (currently we're using Tez split grouping).
-      */
+        /*
+        First try to run compaction via HiveQL queries.
+        Compaction for MM tables happens here, or run compaction for Crud 
tables if query-based compaction is enabled.
+        todo Find a more generic approach to collecting files in the same 
logical bucket to compact within the same
+        task (currently we're using Tez split grouping).
+        */
         CompactorPipeline compactorPipeline = 
compactorFactory.getCompactorPipeline(table, conf, ci, msc);
         computeStats = (compactorPipeline.isMRCompaction() && collectMrStats) 
|| collectGenericStats;
 
@@ -257,7 +259,8 @@ public Boolean compact(Table table, CompactionInfo ci) 
throws Exception {
 
         LOG.info("Completed " + ci.type.toString() + " compaction for " + 
ci.getFullPartitionName() + " in "
             + compactionTxn + ", marking as compacted.");
-        compactionTxn.markForCommit(() -> 
msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci)));
+        compactionTxn.commit(() ->
+            msc.markCompacted(CompactionInfo.compactionInfoToStruct(ci)));
 
         AcidMetricService.updateMetricsFromWorker(ci.dbname, ci.tableName, 
ci.partName, ci.type,
             dir.getCurrentDirectories().size(), dir.getDeleteDeltas().size(), 
conf, msc);
@@ -267,9 +270,6 @@ public Boolean compact(Table table, CompactionInfo ci) 
throws Exception {
       }
 
       return true;
-    } catch (Exception e) {
-      LOG.error("Caught exception in " + CLASS_NAME + " while trying to 
compact " + ci, e);
-      throw e;
     }
   }
 
@@ -341,11 +341,8 @@ class CompactionTxn implements AutoCloseable {
     private long lockId = 0;
 
     private TxnStatus status = TxnStatus.UNKNOWN;
-
-    private ThrowingRunnable onCommitSuccess;
-    private ThrowingRunnable onAbortSuccess;
-
     private boolean rollbackOnly = true;
+    private ThrowingRunnable<?> onCommitSuccess;
 
     /**
      * Try to open a new txn.
@@ -376,13 +373,9 @@ private LockRequest createLockRequest(CompactionInfo ci) {
       return CompactorUtil.createLockRequest(conf, ci, txnId, 
lockAndOpType.getKey(), lockAndOpType.getValue());
     }
 
-    void markForCommit(ThrowingRunnable action) {
+    void commit(ThrowingRunnable<?> postAction) {
       this.rollbackOnly = false;
-      this.onCommitSuccess = action;
-    }
-
-    void markForAbort(ThrowingRunnable action) {
-      this.onAbortSuccess = action;
+      this.onCommitSuccess = postAction;
     }
 
     /**
@@ -397,16 +390,13 @@ public void close() throws Exception {
         //the transaction is about to close, we can stop heartbeating 
regardless of it's state
         CompactionHeartbeatService.getInstance(conf).stopHeartbeat(txnId);
       } finally {
-        if (rollbackOnly) {
-          abort();
-          if (onAbortSuccess != null) {
-            onAbortSuccess.run();
+        if (!rollbackOnly) {
+          commit();
+          if (onCommitSuccess != null) {
+            onCommitSuccess.run();
           }
-          return;
-        }
-        commit();
-        if (onCommitSuccess != null) {
-          onCommitSuccess.run();
+        } else {
+          abort();
         }
       }
     }

Reply via email to