This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 2fa85ab5f66 HIVE-29420: Extra test coverage for Cleaner with minOpenWriteId flag (#6286)
2fa85ab5f66 is described below

commit 2fa85ab5f6683e16125b30b63b4189b95b098b5a
Author: Denys Kuzmenko <[email protected]>
AuthorDate: Tue Feb 24 13:51:28 2026 +0200

    HIVE-29420: Extra test coverage for Cleaner with minOpenWriteId flag (#6286)
---
 .../hive/ql/txn/compactor/CompactorTest.java       | 27 +++++++------
 .../hadoop/hive/ql/txn/compactor/TestCleaner.java  | 46 +++++++++++-----------
 .../TestCleanerWithMinHistoryWriteId.java          | 28 ++++++++++++-
 3 files changed, 64 insertions(+), 37 deletions(-)

diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java 
b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
index 8a1d0fe7677..403b3ec9d1a 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
@@ -456,18 +456,16 @@ private void addFile(Table t, Partition p, long minTxn, 
long maxTxn, int numReco
       boolean allBucketsPresent, long visibilityId) throws Exception {
     String partValue = (p == null) ? null : p.getValues().getFirst();
     Path location = new Path(getLocation(t.getTableName(), partValue));
-    String filename = null;
 
-    switch (type) {
-      case BASE -> filename = 
AcidUtils.addVisibilitySuffix(AcidUtils.BASE_PREFIX + maxTxn, visibilityId);
+    String filename = switch (type) {
+      case BASE -> AcidUtils.addVisibilitySuffix(AcidUtils.BASE_PREFIX + 
maxTxn, visibilityId);
       case LENGTH_FILE, // Fall through to delta
-           DELTA -> filename = 
AcidUtils.addVisibilitySuffix(makeDeltaDirName(minTxn, maxTxn), visibilityId);
-      case LEGACY -> {
-        // handled below
-      }
+           DELTA -> AcidUtils.addVisibilitySuffix(makeDeltaDirName(minTxn, 
maxTxn), visibilityId);
+      case LEGACY -> // handled below
+          null;
       case null, default ->
           throw new IllegalStateException("Unexpected type: " + type);
-    }
+    };
 
     FileSystem fs = FileSystem.get(conf);
     for (int bucket = 0; bucket < numBuckets; bucket++) {
@@ -735,7 +733,13 @@ enum CommitAction {
   }
 
   protected long compactInTxn(CompactionRequest rqst) throws Exception {
-    return compactInTxn(rqst, CommitAction.COMMIT);
+    long compactorTxnId = compactInTxn(rqst, CommitAction.COMMIT);
+
+    // Wait for the cooldown period so the Cleaner can see the last committed 
txn as the highest committed watermark
+    // TODO: doesn't belong here, should probably be moved to 
CompactorTest#startCleaner()
+    Thread.sleep(MetastoreConf.getTimeVar(
+        conf, ConfVars.TXN_OPENTXN_TIMEOUT, TimeUnit.MILLISECONDS));
+    return compactorTxnId;
   }
 
   long compactInTxn(CompactionRequest rqst, CommitAction commitAction) throws 
Exception {
@@ -769,14 +773,11 @@ long compactInTxn(CompactionRequest rqst, CommitAction 
commitAction) throws Exce
 
     switch (commitAction) {
       case MARK_COMPACTED ->
-        txnHandler.markCompacted(ci);
+          txnHandler.markCompacted(ci);
 
       case COMMIT -> {
         txnHandler.markCompacted(ci);
         txnHandler.commitTxn(new CommitTxnRequest(compactorTxnId));
-
-        Thread.sleep(MetastoreConf.getTimeVar(
-            conf, ConfVars.TXN_OPENTXN_TIMEOUT, TimeUnit.MILLISECONDS));
       }
       case ABORT ->
           txnHandler.abortTxn(new AbortTxnRequest(compactorTxnId));
diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java 
b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
index 5686af14194..30815442ff2 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
@@ -20,7 +20,6 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.ReplChangeManager;
-import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
 import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
 import org.apache.hadoop.hive.metastore.api.CompactionRequest;
 import org.apache.hadoop.hive.metastore.api.CompactionResponse;
@@ -32,6 +31,7 @@
 import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
 import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
 import org.apache.hadoop.hive.ql.testutil.TxnStoreHelper;
@@ -94,8 +94,8 @@ public void testRetryAfterFailedCleanupDelayDisabled() throws 
Exception {
   public void testRetryAfterFailedCleanup(boolean delayEnabled) throws 
Exception {
     HiveConf.setBoolVar(conf, HIVE_COMPACTOR_DELAYED_CLEANUP_ENABLED, 
delayEnabled);
     HiveConf.setTimeVar(conf, HIVE_COMPACTOR_CLEANER_RETENTION_TIME, 2, 
TimeUnit.SECONDS);
-    MetastoreConf.setLongVar(conf, 
MetastoreConf.ConfVars.HIVE_COMPACTOR_CLEANER_MAX_RETRY_ATTEMPTS, 3);
-    MetastoreConf.setTimeVar(conf, 
MetastoreConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME, 100, 
TimeUnit.MILLISECONDS);
+    MetastoreConf.setLongVar(conf, 
ConfVars.HIVE_COMPACTOR_CLEANER_MAX_RETRY_ATTEMPTS, 3);
+    MetastoreConf.setTimeVar(conf, 
ConfVars.HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME, 100, 
TimeUnit.MILLISECONDS);
     String errorMessage = "No cleanup here!";
 
     //Prevent cleaner from marking the compaction as cleaned
@@ -287,19 +287,19 @@ public void 
cleanupAfterMajorTableCompactionWithLongRunningQuery() throws Except
     addBaseFile(t, null, 20L, 20);
     addDeltaFile(t, null, 21L, 22L, 2);
     addDeltaFile(t, null, 23L, 24L, 2);
-    addBaseFile(t, null, 25L, 25, 26);
 
     burnThroughTransactions("default", "camtc", 25);
 
     CompactionRequest rqst = new CompactionRequest("default", "camtc", 
CompactionType.MAJOR);
     long compactTxn = compactInTxn(rqst, CommitAction.MARK_COMPACTED);
+    addBaseFile(t, null, 25L, 25, 26);
+
     // Open a query during compaction
     long longQuery = openTxn();
     TxnStoreHelper.wrap(txnHandler)
         .registerMinOpenWriteId("default", "camtc", longQuery);
 
     txnHandler.commitTxn(new CommitTxnRequest(compactTxn));
-
     startCleaner();
 
     // The long-running query should prevent the cleanup
@@ -313,7 +313,8 @@ public void 
cleanupAfterMajorTableCompactionWithLongRunningQuery() throws Except
 
     // After the commit cleaning can proceed
     txnHandler.commitTxn(new CommitTxnRequest(longQuery));
-    Thread.sleep(MetastoreConf.getTimeVar(conf, 
MetastoreConf.ConfVars.TXN_OPENTXN_TIMEOUT, TimeUnit.MILLISECONDS));
+    Thread.sleep(MetastoreConf.getTimeVar(conf, ConfVars.TXN_OPENTXN_TIMEOUT, 
TimeUnit.MILLISECONDS));
+
     startCleaner();
 
     rsp = txnHandler.showCompact(new ShowCompactRequest());
@@ -725,20 +726,22 @@ public void testReadyForCleaningPileup() throws Exception 
{
     Table t = newTable(dbName, tblName, true);
     Partition p = newPartition(t, "today");
 
-    // block cleaner with an open txn
-    long blockingTxn = openTxn();
-
     // minor compaction
     addBaseFile(t, p, 20L, 20);
     addDeltaFile(t, p, 21L, 21L, 1);
     addDeltaFile(t, p, 22L, 22L, 1);
     burnThroughTransactions(dbName, tblName, 22);
+
+    // block cleaner with an open txn
+    long blockingTxn = openTxn();
+    TxnStoreHelper.wrap(txnHandler)
+        .registerMinOpenWriteId(dbName, tblName, blockingTxn);
+
     CompactionRequest rqst = new CompactionRequest(dbName, tblName, 
CompactionType.MINOR);
     rqst.setPartitionname(partName);
     long compactTxn = compactInTxn(rqst);
     addDeltaFile(t, p, 21, 22, 2);
 
-    txnHandler.addWriteIdsToMinHistory(1, 
Collections.singletonMap("default.trfcp", 23L));
     startCleaner();
 
     // make sure cleaner didn't remove anything, and cleaning is still queued
@@ -1081,14 +1084,15 @@ public void testReady() throws Exception {
     burnThroughTransactions(dbName, tblName, 22);
 
     // block cleaner with an open txn
-    long txnId = openTxn();
+    long blockingTxn = openTxn();
     TxnStoreHelper.wrap(txnHandler)
-        .registerMinOpenWriteId(dbName, tblName, txnId);
+        .registerMinOpenWriteId(dbName, tblName, blockingTxn);
 
     CompactionRequest rqst = new CompactionRequest(dbName, tblName, 
CompactionType.MINOR);
     rqst.setPartitionname(partName);
-    long ctxnid = compactInTxn(rqst);
-    addDeltaFile(t, p, 20, 22, 2, ctxnid);
+    long compactTxn = compactInTxn(rqst);
+    addDeltaFile(t, p, 20, 22, 2, compactTxn);
+
     startCleaner();
 
     // make sure cleaner didn't remove anything, and cleaning is still queued
@@ -1118,11 +1122,8 @@ public void testCompactionHighWatermarkIsHonored() 
throws Exception {
 
     CompactionRequest rqst = new CompactionRequest(dbName, tblName, 
CompactionType.MINOR);
     rqst.setPartitionname(partName);
-    long ctxnid = compactInTxn(rqst);
-    addDeltaFile(t, p, 20, 22, 3, ctxnid);
-
-    // block cleaner with an open txn
-    long openTxnId = openTxn();
+    long compactTxn = compactInTxn(rqst);
+    addDeltaFile(t, p, 20, 22, 3, compactTxn);
 
     //2nd minor
     addDeltaFile(t, p, 23L, 23L, 1);
@@ -1131,11 +1132,10 @@ public void testCompactionHighWatermarkIsHonored() 
throws Exception {
 
     rqst = new CompactionRequest(dbName, tblName, CompactionType.MINOR);
     rqst.setPartitionname(partName);
-    ctxnid = compactInTxn(rqst);
-    addDeltaFile(t, p, 20, 24, 5, ctxnid);
+    compactTxn = compactInTxn(rqst);
+    addDeltaFile(t, p, 20, 24, 5, compactTxn);
 
     startCleaner();
-    txnHandler.abortTxn(new AbortTxnRequest(openTxnId));
 
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
     assertEquals(2, rsp.getCompactsSize());
@@ -1149,7 +1149,7 @@ public void testCompactionHighWatermarkIsHonored() throws 
Exception {
     List<String> expectedDirs = Arrays.asList(
       "base_19",
       addVisibilitySuffix(makeDeltaDirName(20, 22), 23),
-      addVisibilitySuffix(makeDeltaDirName(20, 24), 27),
+      addVisibilitySuffix(makeDeltaDirName(20, 24), 26),
       makeDeltaDirName(23, 23),
       makeDeltaDirName(24, 24)
     );
diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithMinHistoryWriteId.java
 
b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithMinHistoryWriteId.java
index 1e9079df25e..8b1216335ac 100644
--- 
a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithMinHistoryWriteId.java
+++ 
b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithMinHistoryWriteId.java
@@ -20,6 +20,7 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
 import org.apache.hadoop.hive.metastore.api.CompactionRequest;
 import org.apache.hadoop.hive.metastore.api.CompactionType;
 import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
@@ -100,7 +101,7 @@ public void cleanupAfterKilledAndRetriedMajorCompaction() 
throws Exception {
 
     startCleaner();
 
-    // Check there are no compactions requests left.
+    // Validate that the cleanup attempt has failed.
     rsp = txnHandler.showCompact(new ShowCompactRequest());
     assertEquals(1, rsp.getCompactsSize());
     assertEquals(FAILED_RESPONSE, rsp.getCompacts().getFirst().getState());
@@ -119,6 +120,31 @@ private static void revokeTimedoutWorkers(Configuration 
conf) throws Exception {
         """.formatted(INITIATED_STATE, WORKING_STATE));
   }
 
+  @Test
+  public void cleanupAfterMajorCompactionWithQueryWaitingToLockTheSnapshot() 
throws Exception {
+    Table t = prepareTestTable();
+    CompactionRequest rqst = new CompactionRequest("default", "camtc", 
CompactionType.MAJOR);
+    long compactTxn = compactInTxn(rqst, CommitAction.MARK_COMPACTED);
+    addBaseFile(t, null, 25L, 25, compactTxn);
+
+    // Open a query during compaction,
+    // Do not register minOpenWriteId (i.e. simulate delay locking the 
snapshot)
+    openTxn();
+
+    txnHandler.commitTxn(new CommitTxnRequest(compactTxn));
+    startCleaner();
+
+    // Validate that the cleanup attempt has failed.
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    assertEquals(1, rsp.getCompactsSize());
+    assertEquals(FAILED_RESPONSE, rsp.getCompacts().getFirst().getState());
+    assertEquals("txnid:27 is open and <= hwm: 27", 
rsp.getCompacts().getFirst().getErrorMessage());
+
+    // Check that the files are not removed
+    List<Path> paths = getDirectories(conf, t, null);
+    assertEquals(5, paths.size());
+  }
+
   private Table prepareTestTable() throws Exception {
     Table t = newTable("default", "camtc", false);
 

Reply via email to