This is an automated email from the ASF dual-hosted git repository.
dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 2fa85ab5f66 HIVE-29420: Extra test coverage for Cleaner with minOpenWriteId flag (#6286)
2fa85ab5f66 is described below
commit 2fa85ab5f6683e16125b30b63b4189b95b098b5a
Author: Denys Kuzmenko <[email protected]>
AuthorDate: Tue Feb 24 13:51:28 2026 +0200
HIVE-29420: Extra test coverage for Cleaner with minOpenWriteId flag (#6286)
---
.../hive/ql/txn/compactor/CompactorTest.java | 27 +++++++------
.../hadoop/hive/ql/txn/compactor/TestCleaner.java | 46 +++++++++++-----------
.../TestCleanerWithMinHistoryWriteId.java | 28 ++++++++++++-
3 files changed, 64 insertions(+), 37 deletions(-)
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
index 8a1d0fe7677..403b3ec9d1a 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
@@ -456,18 +456,16 @@ private void addFile(Table t, Partition p, long minTxn, long maxTxn, int numReco
boolean allBucketsPresent, long visibilityId) throws Exception {
String partValue = (p == null) ? null : p.getValues().getFirst();
Path location = new Path(getLocation(t.getTableName(), partValue));
- String filename = null;
- switch (type) {
- case BASE -> filename = AcidUtils.addVisibilitySuffix(AcidUtils.BASE_PREFIX + maxTxn, visibilityId);
+ String filename = switch (type) {
+ case BASE -> AcidUtils.addVisibilitySuffix(AcidUtils.BASE_PREFIX + maxTxn, visibilityId);
case LENGTH_FILE, // Fall through to delta
- DELTA -> filename = AcidUtils.addVisibilitySuffix(makeDeltaDirName(minTxn, maxTxn), visibilityId);
- case LEGACY -> {
- // handled below
- }
+ DELTA -> AcidUtils.addVisibilitySuffix(makeDeltaDirName(minTxn, maxTxn), visibilityId);
+ case LEGACY -> // handled below
+ null;
case null, default ->
throw new IllegalStateException("Unexpected type: " + type);
- }
+ };
FileSystem fs = FileSystem.get(conf);
for (int bucket = 0; bucket < numBuckets; bucket++) {
@@ -735,7 +733,13 @@ enum CommitAction {
}
protected long compactInTxn(CompactionRequest rqst) throws Exception {
- return compactInTxn(rqst, CommitAction.COMMIT);
+ long compactorTxnId = compactInTxn(rqst, CommitAction.COMMIT);
+
+ // Wait for the cooldown period so the Cleaner can see the last committed txn as the highest committed watermark
+ // TODO: doesn't belong here, should probably be moved to CompactorTest#startCleaner()
+ Thread.sleep(MetastoreConf.getTimeVar(
+ conf, ConfVars.TXN_OPENTXN_TIMEOUT, TimeUnit.MILLISECONDS));
+ return compactorTxnId;
}
long compactInTxn(CompactionRequest rqst, CommitAction commitAction) throws Exception {
@@ -769,14 +773,11 @@ long compactInTxn(CompactionRequest rqst, CommitAction commitAction) throws Exce
switch (commitAction) {
case MARK_COMPACTED ->
- txnHandler.markCompacted(ci);
+ txnHandler.markCompacted(ci);
case COMMIT -> {
txnHandler.markCompacted(ci);
txnHandler.commitTxn(new CommitTxnRequest(compactorTxnId));
-
- Thread.sleep(MetastoreConf.getTimeVar(
- conf, ConfVars.TXN_OPENTXN_TIMEOUT, TimeUnit.MILLISECONDS));
}
case ABORT ->
txnHandler.abortTxn(new AbortTxnRequest(compactorTxnId));
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
index 5686af14194..30815442ff2 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
@@ -20,7 +20,6 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.ReplChangeManager;
-import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
import org.apache.hadoop.hive.metastore.api.CompactionRequest;
import org.apache.hadoop.hive.metastore.api.CompactionResponse;
@@ -32,6 +31,7 @@
import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.ql.testutil.TxnStoreHelper;
@@ -94,8 +94,8 @@ public void testRetryAfterFailedCleanupDelayDisabled() throws Exception {
public void testRetryAfterFailedCleanup(boolean delayEnabled) throws Exception {
HiveConf.setBoolVar(conf, HIVE_COMPACTOR_DELAYED_CLEANUP_ENABLED, delayEnabled);
HiveConf.setTimeVar(conf, HIVE_COMPACTOR_CLEANER_RETENTION_TIME, 2, TimeUnit.SECONDS);
- MetastoreConf.setLongVar(conf, MetastoreConf.ConfVars.HIVE_COMPACTOR_CLEANER_MAX_RETRY_ATTEMPTS, 3);
- MetastoreConf.setTimeVar(conf, MetastoreConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME, 100, TimeUnit.MILLISECONDS);
+ MetastoreConf.setLongVar(conf, ConfVars.HIVE_COMPACTOR_CLEANER_MAX_RETRY_ATTEMPTS, 3);
+ MetastoreConf.setTimeVar(conf, ConfVars.HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME, 100, TimeUnit.MILLISECONDS);
String errorMessage = "No cleanup here!";
//Prevent cleaner from marking the compaction as cleaned
@@ -287,19 +287,19 @@ public void cleanupAfterMajorTableCompactionWithLongRunningQuery() throws Except
addBaseFile(t, null, 20L, 20);
addDeltaFile(t, null, 21L, 22L, 2);
addDeltaFile(t, null, 23L, 24L, 2);
- addBaseFile(t, null, 25L, 25, 26);
burnThroughTransactions("default", "camtc", 25);
CompactionRequest rqst = new CompactionRequest("default", "camtc", CompactionType.MAJOR);
long compactTxn = compactInTxn(rqst, CommitAction.MARK_COMPACTED);
+ addBaseFile(t, null, 25L, 25, 26);
+
// Open a query during compaction
long longQuery = openTxn();
TxnStoreHelper.wrap(txnHandler)
.registerMinOpenWriteId("default", "camtc", longQuery);
txnHandler.commitTxn(new CommitTxnRequest(compactTxn));
-
startCleaner();
// The long-running query should prevent the cleanup
@@ -313,7 +313,8 @@ public void cleanupAfterMajorTableCompactionWithLongRunningQuery() throws Except
// After the commit cleaning can proceed
txnHandler.commitTxn(new CommitTxnRequest(longQuery));
- Thread.sleep(MetastoreConf.getTimeVar(conf, MetastoreConf.ConfVars.TXN_OPENTXN_TIMEOUT, TimeUnit.MILLISECONDS));
+ Thread.sleep(MetastoreConf.getTimeVar(conf, ConfVars.TXN_OPENTXN_TIMEOUT, TimeUnit.MILLISECONDS));
+
startCleaner();
rsp = txnHandler.showCompact(new ShowCompactRequest());
@@ -725,20 +726,22 @@ public void testReadyForCleaningPileup() throws Exception {
Table t = newTable(dbName, tblName, true);
Partition p = newPartition(t, "today");
- // block cleaner with an open txn
- long blockingTxn = openTxn();
-
// minor compaction
addBaseFile(t, p, 20L, 20);
addDeltaFile(t, p, 21L, 21L, 1);
addDeltaFile(t, p, 22L, 22L, 1);
burnThroughTransactions(dbName, tblName, 22);
+
+ // block cleaner with an open txn
+ long blockingTxn = openTxn();
+ TxnStoreHelper.wrap(txnHandler)
+ .registerMinOpenWriteId(dbName, tblName, blockingTxn);
+
CompactionRequest rqst = new CompactionRequest(dbName, tblName, CompactionType.MINOR);
rqst.setPartitionname(partName);
long compactTxn = compactInTxn(rqst);
addDeltaFile(t, p, 21, 22, 2);
- txnHandler.addWriteIdsToMinHistory(1, Collections.singletonMap("default.trfcp", 23L));
startCleaner();
// make sure cleaner didn't remove anything, and cleaning is still queued
@@ -1081,14 +1084,15 @@ public void testReady() throws Exception {
burnThroughTransactions(dbName, tblName, 22);
// block cleaner with an open txn
- long txnId = openTxn();
+ long blockingTxn = openTxn();
TxnStoreHelper.wrap(txnHandler)
- .registerMinOpenWriteId(dbName, tblName, txnId);
+ .registerMinOpenWriteId(dbName, tblName, blockingTxn);
CompactionRequest rqst = new CompactionRequest(dbName, tblName, CompactionType.MINOR);
rqst.setPartitionname(partName);
- long ctxnid = compactInTxn(rqst);
- addDeltaFile(t, p, 20, 22, 2, ctxnid);
+ long compactTxn = compactInTxn(rqst);
+ addDeltaFile(t, p, 20, 22, 2, compactTxn);
+
startCleaner();
// make sure cleaner didn't remove anything, and cleaning is still queued
@@ -1118,11 +1122,8 @@ public void testCompactionHighWatermarkIsHonored() throws Exception {
CompactionRequest rqst = new CompactionRequest(dbName, tblName, CompactionType.MINOR);
rqst.setPartitionname(partName);
- long ctxnid = compactInTxn(rqst);
- addDeltaFile(t, p, 20, 22, 3, ctxnid);
-
- // block cleaner with an open txn
- long openTxnId = openTxn();
+ long compactTxn = compactInTxn(rqst);
+ addDeltaFile(t, p, 20, 22, 3, compactTxn);
//2nd minor
addDeltaFile(t, p, 23L, 23L, 1);
@@ -1131,11 +1132,10 @@ public void testCompactionHighWatermarkIsHonored() throws Exception {
rqst = new CompactionRequest(dbName, tblName, CompactionType.MINOR);
rqst.setPartitionname(partName);
- ctxnid = compactInTxn(rqst);
- addDeltaFile(t, p, 20, 24, 5, ctxnid);
+ compactTxn = compactInTxn(rqst);
+ addDeltaFile(t, p, 20, 24, 5, compactTxn);
startCleaner();
- txnHandler.abortTxn(new AbortTxnRequest(openTxnId));
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
assertEquals(2, rsp.getCompactsSize());
@@ -1149,7 +1149,7 @@ public void testCompactionHighWatermarkIsHonored() throws Exception {
List<String> expectedDirs = Arrays.asList(
"base_19",
addVisibilitySuffix(makeDeltaDirName(20, 22), 23),
- addVisibilitySuffix(makeDeltaDirName(20, 24), 27),
+ addVisibilitySuffix(makeDeltaDirName(20, 24), 26),
makeDeltaDirName(23, 23),
makeDeltaDirName(24, 24)
);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithMinHistoryWriteId.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithMinHistoryWriteId.java
index 1e9079df25e..8b1216335ac 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithMinHistoryWriteId.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithMinHistoryWriteId.java
@@ -20,6 +20,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
import org.apache.hadoop.hive.metastore.api.CompactionRequest;
import org.apache.hadoop.hive.metastore.api.CompactionType;
import org.apache.hadoop.hive.metastore.api.ShowCompactRequest;
@@ -100,7 +101,7 @@ public void cleanupAfterKilledAndRetriedMajorCompaction() throws Exception {
startCleaner();
- // Check there are no compactions requests left.
+ // Validate that the cleanup attempt has failed.
rsp = txnHandler.showCompact(new ShowCompactRequest());
assertEquals(1, rsp.getCompactsSize());
assertEquals(FAILED_RESPONSE, rsp.getCompacts().getFirst().getState());
@@ -119,6 +120,31 @@ private static void revokeTimedoutWorkers(Configuration conf) throws Exception {
""".formatted(INITIATED_STATE, WORKING_STATE));
}
+ @Test
+ public void cleanupAfterMajorCompactionWithQueryWaitingToLockTheSnapshot() throws Exception {
+ Table t = prepareTestTable();
+ CompactionRequest rqst = new CompactionRequest("default", "camtc", CompactionType.MAJOR);
+ long compactTxn = compactInTxn(rqst, CommitAction.MARK_COMPACTED);
+ addBaseFile(t, null, 25L, 25, compactTxn);
+
+ // Open a query during compaction,
+ // Do not register minOpenWriteId (i.e. simulate delay locking the snapshot)
+ openTxn();
+
+ txnHandler.commitTxn(new CommitTxnRequest(compactTxn));
+ startCleaner();
+
+ // Validate that the cleanup attempt has failed.
+ ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+ assertEquals(1, rsp.getCompactsSize());
+ assertEquals(FAILED_RESPONSE, rsp.getCompacts().getFirst().getState());
+ assertEquals("txnid:27 is open and <= hwm: 27",
rsp.getCompacts().getFirst().getErrorMessage());
+
+ // Check that the files are not removed
+ List<Path> paths = getDirectories(conf, t, null);
+ assertEquals(5, paths.size());
+ }
+
private Table prepareTestTable() throws Exception {
Table t = newTable("default", "camtc", false);