This is an automated email from the ASF dual-hosted git repository.

deniskuzZ pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 52b8b1a7615 HIVE-29619: Hive ACID: Long-running queries on one table 
shouldn't block cleanup operations on other tables. (#6497)
52b8b1a7615 is described below

commit 52b8b1a76152db84003c1b7e510fc2b3fb909bab
Author: Denys Kuzmenko <[email protected]>
AuthorDate: Wed May 20 18:24:56 2026 +0300

    HIVE-29619: Hive ACID: Long-running queries on one table shouldn't block 
cleanup operations on other tables. (#6497)
---
 .../org/apache/hadoop/hive/ql/TestAcidOnTez.java   |   4 +
 .../hive/ql/txn/compactor/CompactorOnTezTest.java  |   2 +
 .../hive/ql/txn/compactor/TestCompactorBase.java   |   3 +
 .../txn/compactor/handler/CompactionCleaner.java   |   4 +-
 .../compactor/service/AcidCompactionService.java   |   4 +
 .../hadoop/hive/ql/TxnCommandsBaseForTests.java    |   2 +
 .../ql/lockmgr/DbTxnManagerEndToEndTestBase.java   |   5 +
 .../hive/ql/txn/compactor/CompactorTest.java       |  31 +++++-
 .../TestCleanerWithMinHistoryWriteId.java          | 107 +++++++++++++++++----
 .../apache/hadoop/hive/metastore/txn/TxnUtils.java |  29 +++---
 10 files changed, 147 insertions(+), 44 deletions(-)

diff --git 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
index a9ede32a591..0a9be7616bb 100644
--- 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
+++ 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
@@ -18,6 +18,7 @@
 
 package org.apache.hadoop.hive.ql;
 
+import static 
org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETENTION_TIME;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 
@@ -28,6 +29,7 @@
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
+import java.util.concurrent.TimeUnit;
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileStatus;
@@ -127,6 +129,8 @@ public void setUp() throws Exception {
         
"org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
     MetastoreConf.setBoolVar(hiveConf, 
MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON, true);
     MetastoreConf.setBoolVar(hiveConf, 
MetastoreConf.ConfVars.COMPACTOR_CLEANER_ON, true);
+    HiveConf.setTimeVar(hiveConf, HIVE_COMPACTOR_CLEANER_RETENTION_TIME, 0, 
TimeUnit.SECONDS);
+
     TestTxnDbUtil.setConfValues(hiveConf);
     hiveConf.setInt(MRJobConfig.MAP_MEMORY_MB, 1024);
     hiveConf.setInt(MRJobConfig.REDUCE_MEMORY_MB, 1024);
diff --git 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java
 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java
index 428aedcd8f4..c8166d8ee7d 100644
--- 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java
+++ 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java
@@ -56,6 +56,7 @@
 import java.util.regex.Matcher;
 import java.util.regex.Pattern;
 
+import static 
org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETENTION_TIME;
 import static 
org.apache.hadoop.hive.ql.txn.compactor.CompactorTestUtil.executeStatementOnDriverAndReturnResults;
 import static 
org.apache.hadoop.hive.ql.txn.compactor.TestCompactor.executeStatementOnDriver;
 import static org.apache.hadoop.hive.ql.txn.compactor.TestCompactor.dropTables;
@@ -106,6 +107,7 @@ protected void setupWithConf(HiveConfForTest hiveConf) 
throws Exception {
     MetastoreConf.setTimeVar(hiveConf, 
MetastoreConf.ConfVars.TXN_OPENTXN_TIMEOUT, 2, TimeUnit.SECONDS);
     MetastoreConf.setBoolVar(hiveConf, 
MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON, true);
     MetastoreConf.setBoolVar(hiveConf, 
MetastoreConf.ConfVars.COMPACTOR_CLEANER_ON, true);
+    HiveConf.setTimeVar(hiveConf, HIVE_COMPACTOR_CLEANER_RETENTION_TIME, 0, 
TimeUnit.SECONDS);
 
     TestTxnDbUtil.setConfValues(hiveConf);
     TestTxnDbUtil.cleanDb(hiveConf);
diff --git 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactorBase.java
 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactorBase.java
index b606515391c..49b231f086e 100644
--- 
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactorBase.java
+++ 
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactorBase.java
@@ -46,6 +46,8 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicInteger;
 
+import static 
org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETENTION_TIME;
+
 class TestCompactorBase {
 
   private static final Logger LOG = 
LoggerFactory.getLogger(TestCompactorBase.class);
@@ -92,6 +94,7 @@ public void setup() throws Exception {
     MetastoreConf.setBoolVar(hiveConf, 
MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON, true);
     MetastoreConf.setBoolVar(hiveConf, 
MetastoreConf.ConfVars.COMPACTOR_CLEANER_ON, true);
     MetastoreConf.setBoolVar(hiveConf, 
MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER, true);
+    HiveConf.setTimeVar(hiveConf, HIVE_COMPACTOR_CLEANER_RETENTION_TIME, 0, 
TimeUnit.SECONDS);
 
     TestTxnDbUtil.setConfValues(hiveConf);
     TestTxnDbUtil.cleanDb(hiveConf);
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/CompactionCleaner.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/CompactionCleaner.java
index 2a593e4927f..5d9a4b14fbe 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/CompactionCleaner.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/CompactionCleaner.java
@@ -36,6 +36,7 @@
 import org.apache.hadoop.hive.metastore.metrics.Metrics;
 import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
 import org.apache.hadoop.hive.metastore.metrics.PerfLogger;
+import org.apache.hadoop.hive.metastore.txn.TxnHandler;
 import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.metastore.txn.TxnUtils;
@@ -73,7 +74,8 @@ public CompactionCleaner(HiveConf conf, TxnStore txnHandler,
   @Override
   public List<Runnable> getTasks(HiveConf conf) throws MetaException {
     long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
-    long retentionTime = HiveConf.getBoolVar(conf, 
HIVE_COMPACTOR_DELAYED_CLEANUP_ENABLED)
+    long retentionTime = (HiveConf.getBoolVar(conf, 
HIVE_COMPACTOR_DELAYED_CLEANUP_ENABLED)
+          || TxnHandler.ConfVars.useMinHistoryWriteId())
         ? HiveConf.getTimeVar(conf, HIVE_COMPACTOR_CLEANER_RETENTION_TIME, 
TimeUnit.MILLISECONDS)
         : 0;
     List<CompactionInfo> readyToClean = 
txnHandler.findReadyToClean(minOpenTxnId, retentionTime);
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/service/AcidCompactionService.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/service/AcidCompactionService.java
index 3538b07c169..d1c7e3972d0 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/service/AcidCompactionService.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/service/AcidCompactionService.java
@@ -198,6 +198,10 @@ public Boolean compact(Table table, CompactionInfo ci) 
throws Exception {
       txnWriteIds.addTableValidWriteIdList(tblValidWriteIds);
       conf.set(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY, 
txnWriteIds.toString());
 
+      // Register in MIN_HISTORY_WRITE_ID so the per-table cleaner admission 
blocks while open.
+      msc.addWriteIdsToMinHistory(compactionTxn.getTxnId(),
+          Map.of(fullTableName, txnWriteIds.getMinOpenWriteId(fullTableName)));
+
       ci.highestWriteId = tblValidWriteIds.getHighWatermark();
       //this writes TXN_COMPONENTS to ensure that if compactorTxnId fails, we 
keep metadata about
       //it until after any data written by it are physically removed
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java 
b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
index d82e1cad9a0..ba093d0ac96 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
@@ -66,6 +66,7 @@
 import org.slf4j.LoggerFactory;
 
 import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static 
org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETENTION_TIME;
 import static 
org.apache.hadoop.hive.metastore.DatabaseProduct.determineDatabaseProduct;
 import static org.apache.hadoop.hive.metastore.txn.TxnUtils.getEpochFn;
 
@@ -152,6 +153,7 @@ void setUpInternal() throws Exception {
     hiveConf.setBoolean("mapred.input.dir.recursive", true);
     MetastoreConf.setBoolVar(hiveConf, 
MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON, true);
     MetastoreConf.setBoolVar(hiveConf, 
MetastoreConf.ConfVars.COMPACTOR_CLEANER_ON, true);
+    HiveConf.setTimeVar(hiveConf, HIVE_COMPACTOR_CLEANER_RETENTION_TIME, 0, 
TimeUnit.SECONDS);
       
     TestTxnDbUtil.setConfValues(hiveConf);
     TestTxnDbUtil.prepDb(hiveConf);
diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java
 
b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java
index 628e51f2a38..8f151b523cb 100644
--- 
a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java
+++ 
b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java
@@ -38,6 +38,9 @@
 import org.junit.BeforeClass;
 
 import java.io.File;
+import java.util.concurrent.TimeUnit;
+
+import static 
org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETENTION_TIME;
 
 /**
  * Base class for "end-to-end" tests for DbTxnManager and simulate concurrent 
queries.
@@ -59,6 +62,8 @@ public DbTxnManagerEndToEndTestBase() {
     MetastoreConf.setVar(conf, MetastoreConf.ConfVars.WAREHOUSE, 
getWarehouseDir());
     MetastoreConf.setBoolVar(conf, 
MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON, true);
     MetastoreConf.setBoolVar(conf, 
MetastoreConf.ConfVars.COMPACTOR_CLEANER_ON, true);
+    HiveConf.setTimeVar(conf, HIVE_COMPACTOR_CLEANER_RETENTION_TIME, 0, 
TimeUnit.SECONDS);
+
     TestTxnDbUtil.setConfValues(conf);
   }
   
diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java 
b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
index 322045b0dad..d97f457c11c 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
@@ -26,6 +26,7 @@
 import org.apache.hadoop.hive.common.ServerUtils;
 import org.apache.hadoop.hive.common.ValidCompactorWriteIdList;
 import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
 import org.apache.hadoop.hive.common.ValidWriteIdList;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
@@ -62,6 +63,7 @@
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
 import org.apache.hadoop.hive.metastore.metrics.AcidMetricService;
+import org.apache.hadoop.hive.metastore.txn.TxnHandler;
 import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo;
 import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
 import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil;
@@ -113,6 +115,8 @@
 import javax.management.MBeanServer;
 import javax.management.ObjectName;
 
+import static 
org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETENTION_TIME;
+
 /**
  * Super class for all of the compactor test modules.
  */
@@ -143,8 +147,12 @@ protected final void setup(HiveConf conf) throws Exception 
{
     MetastoreConf.setBoolVar(conf, ConfVars.COMPACTOR_INITIATOR_ON, true);
     MetastoreConf.setBoolVar(conf, ConfVars.COMPACTOR_CLEANER_ON, true);
     MetastoreConf.setBoolVar(conf, ConfVars.TXN_USE_MIN_HISTORY_WRITE_ID, 
useMinHistoryWriteId());
+    TxnHandler.ConfVars.setUseMinHistoryWriteId(useMinHistoryWriteId());
     MetastoreConf.setVar(conf, ConfVars.COMPACTOR_INITIATOR_TABLE_OPTIMIZERS,
         "org.apache.hadoop.hive.ql.txn.compactor.AcidTableOptimizer");
+    if (useMinHistoryWriteId()) {
+      HiveConf.setTimeVar(conf, HIVE_COMPACTOR_CLEANER_RETENTION_TIME, 0, 
TimeUnit.SECONDS);
+    }
     // Set this config to true in the base class, there are extended test 
classes which set this config to false.
     MetastoreConf.setBoolVar(conf, 
ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER, true);
     TestTxnDbUtil.setConfValues(conf);
@@ -173,6 +181,12 @@ protected void startCleaner() throws Exception {
     runOneLoopOfCompactorThread(CompactorThreadType.CLEANER);
   }
 
+  /**
+   * Runs one Cleaner loop with {@code HIVE_COMPACTOR_CLEANER_RETENTION_TIME} overridden for
+   * that run only. The previously configured value is restored afterwards, so the override
+   * does not leak into later cleaner runs that share {@code conf}.
+   *
+   * @param retentionTime cleaner retention time to use for this run
+   * @param timeUnit unit of {@code retentionTime}
+   */
+  void startCleaner(long retentionTime, TimeUnit timeUnit) throws Exception {
+    long previousRetentionMs = HiveConf.getTimeVar(conf,
+        HIVE_COMPACTOR_CLEANER_RETENTION_TIME, TimeUnit.MILLISECONDS);
+    HiveConf.setTimeVar(conf,
+        HIVE_COMPACTOR_CLEANER_RETENTION_TIME, retentionTime, timeUnit);
+    try {
+      startCleaner();
+    } finally {
+      // Restore the fixture-level value so subsequent startCleaner() calls are unaffected.
+      HiveConf.setTimeVar(conf,
+          HIVE_COMPACTOR_CLEANER_RETENTION_TIME, previousRetentionMs, TimeUnit.MILLISECONDS);
+    }
+  }
+
   protected void runAcidMetricService() {
     TestTxnDbUtil.setConfValues(conf);
     AcidMetricService t = new AcidMetricService();
@@ -388,7 +402,7 @@ protected void burnThroughTransactions(String dbName, 
String tblName, int num, S
         txnHandler.commitTxn(new CommitTxnRequest(tid));
       } else if (open.contains(tid) && useMinHistoryWriteId()){
         txnHandler.addWriteIdsToMinHistory(tid,
-          Collections.singletonMap(dbName + "." + tblName, minOpenWriteId));
+            Map.of(dbName + "." + tblName, minOpenWriteId));
       }
     }
   }
@@ -754,14 +768,13 @@ long compactInTxn(CompactionRequest rqst, CommitAction 
commitAction) throws Exce
     ci.runAs = rqst.getRunas() == null ? System.getProperty("user.name") : 
rqst.getRunas();
 
     long compactorTxnId = openTxn(TxnType.COMPACTION);
+    String fullTableName = ci.getFullTableName().toLowerCase();
 
     // Need to create a valid writeIdList to set the highestWriteId in ci
     ValidTxnList validTxnList = TxnCommonUtils.createValidReadTxnList(
-        txnHandler.getOpenTxns(Collections.singletonList(TxnType.READ_ONLY)), 
compactorTxnId);
+        txnHandler.getOpenTxns(List.of(TxnType.READ_ONLY)), compactorTxnId);
 
-    GetValidWriteIdsRequest writeIdsRequest = new GetValidWriteIdsRequest(
-        Collections.singletonList(
-            ci.getFullTableName().toLowerCase()));
+    GetValidWriteIdsRequest writeIdsRequest = new 
GetValidWriteIdsRequest(List.of(fullTableName));
     writeIdsRequest.setValidTxnList(validTxnList.writeToString());
 
     // with this ValidWriteIdList is capped at whatever HWM validTxnList has
@@ -769,6 +782,14 @@ long compactInTxn(CompactionRequest rqst, CommitAction 
commitAction) throws Exce
         txnHandler.getValidWriteIds(writeIdsRequest).getTblValidWriteIds()
             .getFirst());
 
+    if (useMinHistoryWriteId()) {
+      ValidTxnWriteIdList txnWriteIds = new 
ValidTxnWriteIdList(compactorTxnId);
+      txnWriteIds.addTableValidWriteIdList(tblValidWriteIds);
+
+      txnHandler.addWriteIdsToMinHistory(compactorTxnId,
+          Map.of(fullTableName, txnWriteIds.getMinOpenWriteId(fullTableName)));
+    }
+
     ci.highestWriteId = tblValidWriteIds.getHighWatermark();
     txnHandler.updateCompactorState(ci, compactorTxnId);
 
diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithMinHistoryWriteId.java
 
b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithMinHistoryWriteId.java
index 8b1216335ac..6399bb5adbe 100644
--- 
a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithMinHistoryWriteId.java
+++ 
b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithMinHistoryWriteId.java
@@ -20,6 +20,7 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.AbortTxnsRequest;
 import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
 import org.apache.hadoop.hive.metastore.api.CompactionRequest;
 import org.apache.hadoop.hive.metastore.api.CompactionType;
@@ -32,9 +33,12 @@
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 
+import java.util.Collections;
 import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
 
-import static org.apache.hadoop.hive.metastore.txn.TxnStore.FAILED_RESPONSE;
+import static org.apache.hadoop.hive.metastore.txn.TxnStore.CLEANING_RESPONSE;
 import static org.apache.hadoop.hive.metastore.txn.TxnStore.SUCCEEDED_RESPONSE;
 import static org.apache.hadoop.hive.metastore.txn.TxnStore.WORKING_RESPONSE;
 import static org.apache.hadoop.hive.metastore.txn.TxnStore.INITIATED_STATE;
@@ -59,7 +63,7 @@ protected boolean useMinHistoryWriteId() {
 
   @Test
   public void cleanupAfterAbortedAndRetriedMajorCompaction() throws Exception {
-    Table t = prepareTestTable();
+    Table t = prepareTestTable("camtc");
     CompactionRequest rqst = new CompactionRequest("default", "camtc", 
CompactionType.MAJOR);
     long compactTxn = compactInTxn(rqst, CommitAction.ABORT);
     addBaseFile(t, null, 25L, 25, compactTxn);
@@ -83,10 +87,10 @@ public void cleanupAfterAbortedAndRetriedMajorCompaction() 
throws Exception {
 
   @Test
   public void cleanupAfterKilledAndRetriedMajorCompaction() throws Exception {
-    Table t = prepareTestTable();
+    Table t = prepareTestTable("camtc");
     CompactionRequest rqst = new CompactionRequest("default", "camtc", 
CompactionType.MAJOR);
-    long compactTxn = compactInTxn(rqst, CommitAction.NONE);
-    addBaseFile(t, null, 25L, 25, compactTxn);
+    long compactTxn1 = compactInTxn(rqst, CommitAction.NONE);
+    addBaseFile(t, null, 25L, 25, compactTxn1);
 
     txnHandler.revokeTimedoutWorkers(1L);
     // an open txn should prevent the retry
@@ -96,20 +100,32 @@ public void cleanupAfterKilledAndRetriedMajorCompaction() 
throws Exception {
 
     // force retry
     revokeTimedoutWorkers(conf);
-    compactTxn = compactInTxn(rqst);
-    addBaseFile(t, null, 25L, 25, compactTxn);
+    long compactTxn2 = compactInTxn(rqst);
+    addBaseFile(t, null, 25L, 25, compactTxn2);
 
     startCleaner();
 
-    // Validate that the cleanup attempt has failed.
+    // Validate that the cleanup attempt was skipped.
     rsp = txnHandler.showCompact(new ShowCompactRequest());
     assertEquals(1, rsp.getCompactsSize());
-    assertEquals(FAILED_RESPONSE, rsp.getCompacts().getFirst().getState());
-    assertEquals("txnid:26 is open and <= hwm: 27", 
rsp.getCompacts().getFirst().getErrorMessage());
+    assertEquals(CLEANING_RESPONSE, rsp.getCompacts().getFirst().getState());
 
     // Check that the files are not removed
     List<Path> paths = getDirectories(conf, t, null);
     assertEquals(6, paths.size());
+
+    // Abort the open compaction txn, so that the Cleaner can proceed.
+    txnHandler.abortTxns(
+        new AbortTxnsRequest(Collections.singletonList(compactTxn1)));
+    startCleaner();
+
+    rsp = txnHandler.showCompact(new ShowCompactRequest());
+    assertEquals(1, rsp.getCompactsSize());
+    assertEquals(SUCCEEDED_RESPONSE, rsp.getCompacts().getFirst().getState());
+
+    // Check that the files are removed
+    paths = getDirectories(conf, t, null);
+    assertEquals(1, paths.size());
   }
 
   private static void revokeTimedoutWorkers(Configuration conf) throws 
Exception {
@@ -121,39 +137,88 @@ private static void revokeTimedoutWorkers(Configuration 
conf) throws Exception {
   }
 
   @Test
-  public void cleanupAfterMajorCompactionWithQueryWaitingToLockTheSnapshot() 
throws Exception {
-    Table t = prepareTestTable();
+  public void cleanupAndDanglingOpenTxnOnSameTable() throws Exception {
+    Table t = prepareTestTable("camtc");
     CompactionRequest rqst = new CompactionRequest("default", "camtc", 
CompactionType.MAJOR);
     long compactTxn = compactInTxn(rqst, CommitAction.MARK_COMPACTED);
     addBaseFile(t, null, 25L, 25, compactTxn);
 
-    // Open a query during compaction,
+    // Open a readerTxn during compaction,
     // Do not register minOpenWriteId (i.e. simulate delay locking the 
snapshot)
-    openTxn();
+    long readerTxn = openTxn();
 
     txnHandler.commitTxn(new CommitTxnRequest(compactTxn));
-    startCleaner();
+    Thread.sleep(MetastoreConf.getTimeVar(
+        conf, ConfVars.TXN_OPENTXN_TIMEOUT, TimeUnit.MILLISECONDS));
 
-    // Validate that the cleanup attempt has failed.
+    startCleaner(10, TimeUnit.SECONDS);
+
+    // Validate that the cleanup attempt was delayed by retention time.
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
     assertEquals(1, rsp.getCompactsSize());
-    assertEquals(FAILED_RESPONSE, rsp.getCompacts().getFirst().getState());
-    assertEquals("txnid:27 is open and <= hwm: 27", 
rsp.getCompacts().getFirst().getErrorMessage());
+    assertEquals(CLEANING_RESPONSE, rsp.getCompacts().getFirst().getState());
 
     // Check that the files are not removed
     List<Path> paths = getDirectories(conf, t, null);
     assertEquals(5, paths.size());
+
+    // Register minOpenWriteId for the readerTxn.
+    txnHandler.addWriteIdsToMinHistory(readerTxn, Map.of("default.camtc", 1L));
+
+    startCleaner(0, TimeUnit.SECONDS);
+
+    // Validate that the cleanup attempt was blocked by readerTxn.
+    rsp = txnHandler.showCompact(new ShowCompactRequest());
+    assertEquals(1, rsp.getCompactsSize());
+    assertEquals(CLEANING_RESPONSE, rsp.getCompacts().getFirst().getState());
+
+    // Check that the files are not removed
+    paths = getDirectories(conf, t, null);
+    assertEquals(5, paths.size());
+  }
+
+  @Test
+  public void cleanupNotBlockedByOpenTxnOnAnotherTable() throws Exception {
+    Table t1 = prepareTestTable("camtc1");
+    Table t2 = prepareTestTable("camtc2");
+
+    // Open a readerTxn on t1 and register minOpenWriteId.
+    long readerTxn = openTxn();
+    txnHandler.addWriteIdsToMinHistory(readerTxn, Map.of("default.camtc1", 
1L));
+
+    CompactionRequest rqstTbl1 = new CompactionRequest("default", "camtc1", 
CompactionType.MAJOR);
+    long compactTxn = compactInTxn(rqstTbl1);
+    addBaseFile(t1, null, 25L, 25, compactTxn);
+
+    CompactionRequest rqstTbl2 = new CompactionRequest("default", "camtc2", 
CompactionType.MAJOR);
+    compactTxn = compactInTxn(rqstTbl2);
+    addBaseFile(t2, null, 25L, 25, compactTxn);
+
+    startCleaner();
+
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    assertEquals(2, rsp.getCompactsSize());
+
+    assertEquals(SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState());
+    assertEquals("camtc2", rsp.getCompacts().get(0).getTablename());
+    // camtc2 was cleaned: only the new base remains.
+    assertEquals(1, getDirectories(conf, t2, null).size());
+
+    assertEquals(CLEANING_RESPONSE, rsp.getCompacts().get(1).getState());
+    assertEquals("camtc1", rsp.getCompacts().get(1).getTablename());
+    // camtc1 wasn't actually cleaned (admission filter held it back).
+    assertEquals(5, getDirectories(conf, t1, null).size());
   }
 
-  private Table prepareTestTable() throws Exception {
-    Table t = newTable("default", "camtc", false);
+  private Table prepareTestTable(String tblName) throws Exception {
+    Table t = newTable("default", tblName, false);
 
     addBaseFile(t, null, 20L, 20);
     addDeltaFile(t, null, 21L, 22L, 2);
     addDeltaFile(t, null, 23L, 24L, 2);
     addDeltaFile(t, null, 25L, 25, 2);
 
-    burnThroughTransactions("default", "camtc", 25);
+    burnThroughTransactions("default", tblName, 25);
     return t;
   }
 
diff --git 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
index aa613aa8192..5c5ca7c7bfa 100644
--- 
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
+++ 
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
@@ -79,33 +79,28 @@ public static ValidTxnList 
createValidTxnListForCleaner(GetOpenTxnsResponse txns
     long highWatermark = minOpenTxn - 1;
     long[] exceptions = new long[txns.getOpen_txnsSize()];
     BitSet abortedBits = BitSet.valueOf(txns.getAbortedBits());
-    int i = 0;
+    int i = 0, j = 0;
     for (long txnId : txns.getOpen_txns()) {
       if (txnId > highWatermark) {
         break;
       }
-      if (abortedBits.get(i)) {
-        exceptions[i] = txnId;
+      if (abortedBits.get(i) || isAbortCleanup) {
+        exceptions[j++] = txnId;
+      } else if (!TxnHandler.ConfVars.useMinHistoryWriteId()) {
+        throw new IllegalStateException(
+            JavaUtils.txnIdToString(txnId) + " is open and <= hwm: " + 
highWatermark);
       } else {
-        if (isAbortCleanup) {
-          exceptions[i] = txnId;
-        } else {
-          throw new IllegalStateException(
-              JavaUtils.txnIdToString(txnId) + " is open and <= hwm: " + 
highWatermark);
-        }
+        LOG.debug("Ignoring open txn {} <= hwm: {}", txnId, highWatermark);
       }
       ++i;
     }
-    exceptions = Arrays.copyOf(exceptions, i);
+    exceptions = Arrays.copyOf(exceptions, j);
+
+    BitSet bitSet = isAbortCleanup ? abortedBits : new BitSet(j);
     if (!isAbortCleanup) {
-      BitSet bitSet = new BitSet(exceptions.length);
-      bitSet.set(0, exceptions.length);
-      //add ValidCleanerTxnList? - could be problematic for all the places 
that read it from
-      // string as they'd have to know which object to instantiate
-      return new ValidReadTxnList(exceptions, bitSet, highWatermark, 
Long.MAX_VALUE);
-    } else {
-      return new ValidReadTxnList(exceptions, abortedBits, highWatermark, 
Long.MAX_VALUE);
+      bitSet.set(0, j);
     }
+    return new ValidReadTxnList(exceptions, bitSet, highWatermark, 
Long.MAX_VALUE);
   }
 
   /**

Reply via email to