This is an automated email from the ASF dual-hosted git repository.
deniskuzZ pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 52b8b1a7615 HIVE-29619: Hive ACID: Long-running queries on one table
shouldn't block cleanup operations on other tables. (#6497)
52b8b1a7615 is described below
commit 52b8b1a76152db84003c1b7e510fc2b3fb909bab
Author: Denys Kuzmenko <[email protected]>
AuthorDate: Wed May 20 18:24:56 2026 +0300
HIVE-29619: Hive ACID: Long-running queries on one table shouldn't block
cleanup operations on other tables. (#6497)
---
.../org/apache/hadoop/hive/ql/TestAcidOnTez.java | 4 +
.../hive/ql/txn/compactor/CompactorOnTezTest.java | 2 +
.../hive/ql/txn/compactor/TestCompactorBase.java | 3 +
.../txn/compactor/handler/CompactionCleaner.java | 4 +-
.../compactor/service/AcidCompactionService.java | 4 +
.../hadoop/hive/ql/TxnCommandsBaseForTests.java | 2 +
.../ql/lockmgr/DbTxnManagerEndToEndTestBase.java | 5 +
.../hive/ql/txn/compactor/CompactorTest.java | 31 +++++-
.../TestCleanerWithMinHistoryWriteId.java | 107 +++++++++++++++++----
.../apache/hadoop/hive/metastore/txn/TxnUtils.java | 29 +++---
10 files changed, 147 insertions(+), 44 deletions(-)
diff --git
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
index a9ede32a591..0a9be7616bb 100644
---
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
+++
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/TestAcidOnTez.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql;
+import static
org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETENTION_TIME;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@@ -28,6 +29,7 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
+import java.util.concurrent.TimeUnit;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
@@ -127,6 +129,8 @@ public void setUp() throws Exception {
"org.apache.hadoop.hive.ql.security.authorization.plugin.sqlstd.SQLStdHiveAuthorizerFactory");
MetastoreConf.setBoolVar(hiveConf,
MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON, true);
MetastoreConf.setBoolVar(hiveConf,
MetastoreConf.ConfVars.COMPACTOR_CLEANER_ON, true);
+ HiveConf.setTimeVar(hiveConf, HIVE_COMPACTOR_CLEANER_RETENTION_TIME, 0,
TimeUnit.SECONDS);
+
TestTxnDbUtil.setConfValues(hiveConf);
hiveConf.setInt(MRJobConfig.MAP_MEMORY_MB, 1024);
hiveConf.setInt(MRJobConfig.REDUCE_MEMORY_MB, 1024);
diff --git
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java
index 428aedcd8f4..c8166d8ee7d 100644
---
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java
+++
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorOnTezTest.java
@@ -56,6 +56,7 @@
import java.util.regex.Matcher;
import java.util.regex.Pattern;
+import static
org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETENTION_TIME;
import static
org.apache.hadoop.hive.ql.txn.compactor.CompactorTestUtil.executeStatementOnDriverAndReturnResults;
import static
org.apache.hadoop.hive.ql.txn.compactor.TestCompactor.executeStatementOnDriver;
import static org.apache.hadoop.hive.ql.txn.compactor.TestCompactor.dropTables;
@@ -106,6 +107,7 @@ protected void setupWithConf(HiveConfForTest hiveConf)
throws Exception {
MetastoreConf.setTimeVar(hiveConf,
MetastoreConf.ConfVars.TXN_OPENTXN_TIMEOUT, 2, TimeUnit.SECONDS);
MetastoreConf.setBoolVar(hiveConf,
MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON, true);
MetastoreConf.setBoolVar(hiveConf,
MetastoreConf.ConfVars.COMPACTOR_CLEANER_ON, true);
+ HiveConf.setTimeVar(hiveConf, HIVE_COMPACTOR_CLEANER_RETENTION_TIME, 0,
TimeUnit.SECONDS);
TestTxnDbUtil.setConfValues(hiveConf);
TestTxnDbUtil.cleanDb(hiveConf);
diff --git
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactorBase.java
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactorBase.java
index b606515391c..49b231f086e 100644
---
a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactorBase.java
+++
b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCompactorBase.java
@@ -46,6 +46,8 @@
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import static
org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETENTION_TIME;
+
class TestCompactorBase {
private static final Logger LOG =
LoggerFactory.getLogger(TestCompactorBase.class);
@@ -92,6 +94,7 @@ public void setup() throws Exception {
MetastoreConf.setBoolVar(hiveConf,
MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON, true);
MetastoreConf.setBoolVar(hiveConf,
MetastoreConf.ConfVars.COMPACTOR_CLEANER_ON, true);
MetastoreConf.setBoolVar(hiveConf,
MetastoreConf.ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER, true);
+ HiveConf.setTimeVar(hiveConf, HIVE_COMPACTOR_CLEANER_RETENTION_TIME, 0,
TimeUnit.SECONDS);
TestTxnDbUtil.setConfValues(hiveConf);
TestTxnDbUtil.cleanDb(hiveConf);
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/CompactionCleaner.java
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/CompactionCleaner.java
index 2a593e4927f..5d9a4b14fbe 100644
---
a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/CompactionCleaner.java
+++
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/CompactionCleaner.java
@@ -36,6 +36,7 @@
import org.apache.hadoop.hive.metastore.metrics.Metrics;
import org.apache.hadoop.hive.metastore.metrics.MetricsConstants;
import org.apache.hadoop.hive.metastore.metrics.PerfLogger;
+import org.apache.hadoop.hive.metastore.txn.TxnHandler;
import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
@@ -73,7 +74,8 @@ public CompactionCleaner(HiveConf conf, TxnStore txnHandler,
@Override
public List<Runnable> getTasks(HiveConf conf) throws MetaException {
long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
- long retentionTime = HiveConf.getBoolVar(conf,
HIVE_COMPACTOR_DELAYED_CLEANUP_ENABLED)
+ long retentionTime = (HiveConf.getBoolVar(conf,
HIVE_COMPACTOR_DELAYED_CLEANUP_ENABLED)
+ || TxnHandler.ConfVars.useMinHistoryWriteId())
? HiveConf.getTimeVar(conf, HIVE_COMPACTOR_CLEANER_RETENTION_TIME,
TimeUnit.MILLISECONDS)
: 0;
List<CompactionInfo> readyToClean =
txnHandler.findReadyToClean(minOpenTxnId, retentionTime);
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/service/AcidCompactionService.java
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/service/AcidCompactionService.java
index 3538b07c169..d1c7e3972d0 100644
---
a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/service/AcidCompactionService.java
+++
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/service/AcidCompactionService.java
@@ -198,6 +198,10 @@ public Boolean compact(Table table, CompactionInfo ci)
throws Exception {
txnWriteIds.addTableValidWriteIdList(tblValidWriteIds);
conf.set(ValidTxnWriteIdList.VALID_TABLES_WRITEIDS_KEY,
txnWriteIds.toString());
+ // Register the compaction txn in MIN_HISTORY_WRITE_ID so the per-table cleaner admission blocks while it is open.
+ msc.addWriteIdsToMinHistory(compactionTxn.getTxnId(),
+ Map.of(fullTableName, txnWriteIds.getMinOpenWriteId(fullTableName)));
+
ci.highestWriteId = tblValidWriteIds.getHighWatermark();
//this writes TXN_COMPONENTS to ensure that if compactorTxnId fails, we keep metadata about
//it until after any data written by it are physically removed
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
index d82e1cad9a0..ba093d0ac96 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TxnCommandsBaseForTests.java
@@ -66,6 +66,7 @@
import org.slf4j.LoggerFactory;
import static org.apache.commons.lang3.StringUtils.isNotBlank;
+import static
org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETENTION_TIME;
import static
org.apache.hadoop.hive.metastore.DatabaseProduct.determineDatabaseProduct;
import static org.apache.hadoop.hive.metastore.txn.TxnUtils.getEpochFn;
@@ -152,6 +153,7 @@ void setUpInternal() throws Exception {
hiveConf.setBoolean("mapred.input.dir.recursive", true);
MetastoreConf.setBoolVar(hiveConf,
MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON, true);
MetastoreConf.setBoolVar(hiveConf,
MetastoreConf.ConfVars.COMPACTOR_CLEANER_ON, true);
+ HiveConf.setTimeVar(hiveConf, HIVE_COMPACTOR_CLEANER_RETENTION_TIME, 0,
TimeUnit.SECONDS);
TestTxnDbUtil.setConfValues(hiveConf);
TestTxnDbUtil.prepDb(hiveConf);
diff --git
a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java
b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java
index 628e51f2a38..8f151b523cb 100644
---
a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java
+++
b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java
@@ -38,6 +38,9 @@
import org.junit.BeforeClass;
import java.io.File;
+import java.util.concurrent.TimeUnit;
+
+import static
org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETENTION_TIME;
/**
* Base class for "end-to-end" tests for DbTxnManager and simulate concurrent queries.
@@ -59,6 +62,8 @@ public DbTxnManagerEndToEndTestBase() {
MetastoreConf.setVar(conf, MetastoreConf.ConfVars.WAREHOUSE,
getWarehouseDir());
MetastoreConf.setBoolVar(conf,
MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON, true);
MetastoreConf.setBoolVar(conf,
MetastoreConf.ConfVars.COMPACTOR_CLEANER_ON, true);
+ HiveConf.setTimeVar(conf, HIVE_COMPACTOR_CLEANER_RETENTION_TIME, 0,
TimeUnit.SECONDS);
+
TestTxnDbUtil.setConfValues(conf);
}
diff --git
a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
index 322045b0dad..d97f457c11c 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
@@ -26,6 +26,7 @@
import org.apache.hadoop.hive.common.ServerUtils;
import org.apache.hadoop.hive.common.ValidCompactorWriteIdList;
import org.apache.hadoop.hive.common.ValidTxnList;
+import org.apache.hadoop.hive.common.ValidTxnWriteIdList;
import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.HiveMetaStoreClient;
@@ -62,6 +63,7 @@
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf.ConfVars;
import org.apache.hadoop.hive.metastore.metrics.AcidMetricService;
+import org.apache.hadoop.hive.metastore.txn.TxnHandler;
import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo;
import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil;
@@ -113,6 +115,8 @@
import javax.management.MBeanServer;
import javax.management.ObjectName;
+import static
org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETENTION_TIME;
+
/**
* Super class for all of the compactor test modules.
*/
@@ -143,8 +147,12 @@ protected final void setup(HiveConf conf) throws Exception
{
MetastoreConf.setBoolVar(conf, ConfVars.COMPACTOR_INITIATOR_ON, true);
MetastoreConf.setBoolVar(conf, ConfVars.COMPACTOR_CLEANER_ON, true);
MetastoreConf.setBoolVar(conf, ConfVars.TXN_USE_MIN_HISTORY_WRITE_ID,
useMinHistoryWriteId());
+ TxnHandler.ConfVars.setUseMinHistoryWriteId(useMinHistoryWriteId());
MetastoreConf.setVar(conf, ConfVars.COMPACTOR_INITIATOR_TABLE_OPTIMIZERS,
"org.apache.hadoop.hive.ql.txn.compactor.AcidTableOptimizer");
+ if (useMinHistoryWriteId()) {
+ HiveConf.setTimeVar(conf, HIVE_COMPACTOR_CLEANER_RETENTION_TIME, 0,
TimeUnit.SECONDS);
+ }
// Set this config to true in the base class, there are extended test classes which set this config to false.
MetastoreConf.setBoolVar(conf,
ConfVars.COMPACTOR_CLEAN_ABORTS_USING_CLEANER, true);
TestTxnDbUtil.setConfValues(conf);
@@ -173,6 +181,12 @@ protected void startCleaner() throws Exception {
runOneLoopOfCompactorThread(CompactorThreadType.CLEANER);
}
+ void startCleaner(long retentionTime, TimeUnit timeUnit) throws Exception {
+ HiveConf.setTimeVar(conf,
+ HIVE_COMPACTOR_CLEANER_RETENTION_TIME, retentionTime, timeUnit);
+ startCleaner();
+ }
+
protected void runAcidMetricService() {
TestTxnDbUtil.setConfValues(conf);
AcidMetricService t = new AcidMetricService();
@@ -388,7 +402,7 @@ protected void burnThroughTransactions(String dbName,
String tblName, int num, S
txnHandler.commitTxn(new CommitTxnRequest(tid));
} else if (open.contains(tid) && useMinHistoryWriteId()){
txnHandler.addWriteIdsToMinHistory(tid,
- Collections.singletonMap(dbName + "." + tblName, minOpenWriteId));
+ Map.of(dbName + "." + tblName, minOpenWriteId));
}
}
}
@@ -754,14 +768,13 @@ long compactInTxn(CompactionRequest rqst, CommitAction
commitAction) throws Exce
ci.runAs = rqst.getRunas() == null ? System.getProperty("user.name") :
rqst.getRunas();
long compactorTxnId = openTxn(TxnType.COMPACTION);
+ String fullTableName = ci.getFullTableName().toLowerCase();
// Need to create a valid writeIdList to set the highestWriteId in ci
ValidTxnList validTxnList = TxnCommonUtils.createValidReadTxnList(
- txnHandler.getOpenTxns(Collections.singletonList(TxnType.READ_ONLY)),
compactorTxnId);
+ txnHandler.getOpenTxns(List.of(TxnType.READ_ONLY)), compactorTxnId);
- GetValidWriteIdsRequest writeIdsRequest = new GetValidWriteIdsRequest(
- Collections.singletonList(
- ci.getFullTableName().toLowerCase()));
+ GetValidWriteIdsRequest writeIdsRequest = new
GetValidWriteIdsRequest(List.of(fullTableName));
writeIdsRequest.setValidTxnList(validTxnList.writeToString());
// with this ValidWriteIdList is capped at whatever HWM validTxnList has
@@ -769,6 +782,14 @@ long compactInTxn(CompactionRequest rqst, CommitAction
commitAction) throws Exce
txnHandler.getValidWriteIds(writeIdsRequest).getTblValidWriteIds()
.getFirst());
+ if (useMinHistoryWriteId()) {
+ ValidTxnWriteIdList txnWriteIds = new
ValidTxnWriteIdList(compactorTxnId);
+ txnWriteIds.addTableValidWriteIdList(tblValidWriteIds);
+
+ txnHandler.addWriteIdsToMinHistory(compactorTxnId,
+ Map.of(fullTableName, txnWriteIds.getMinOpenWriteId(fullTableName)));
+ }
+
ci.highestWriteId = tblValidWriteIds.getHighWatermark();
txnHandler.updateCompactorState(ci, compactorTxnId);
diff --git
a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithMinHistoryWriteId.java
b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithMinHistoryWriteId.java
index 8b1216335ac..6399bb5adbe 100644
---
a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithMinHistoryWriteId.java
+++
b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithMinHistoryWriteId.java
@@ -20,6 +20,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.metastore.api.AbortTxnsRequest;
import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
import org.apache.hadoop.hive.metastore.api.CompactionRequest;
import org.apache.hadoop.hive.metastore.api.CompactionType;
@@ -32,9 +33,12 @@
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import java.util.Collections;
import java.util.List;
+import java.util.Map;
+import java.util.concurrent.TimeUnit;
-import static org.apache.hadoop.hive.metastore.txn.TxnStore.FAILED_RESPONSE;
+import static org.apache.hadoop.hive.metastore.txn.TxnStore.CLEANING_RESPONSE;
import static org.apache.hadoop.hive.metastore.txn.TxnStore.SUCCEEDED_RESPONSE;
import static org.apache.hadoop.hive.metastore.txn.TxnStore.WORKING_RESPONSE;
import static org.apache.hadoop.hive.metastore.txn.TxnStore.INITIATED_STATE;
@@ -59,7 +63,7 @@ protected boolean useMinHistoryWriteId() {
@Test
public void cleanupAfterAbortedAndRetriedMajorCompaction() throws Exception {
- Table t = prepareTestTable();
+ Table t = prepareTestTable("camtc");
CompactionRequest rqst = new CompactionRequest("default", "camtc",
CompactionType.MAJOR);
long compactTxn = compactInTxn(rqst, CommitAction.ABORT);
addBaseFile(t, null, 25L, 25, compactTxn);
@@ -83,10 +87,10 @@ public void cleanupAfterAbortedAndRetriedMajorCompaction()
throws Exception {
@Test
public void cleanupAfterKilledAndRetriedMajorCompaction() throws Exception {
- Table t = prepareTestTable();
+ Table t = prepareTestTable("camtc");
CompactionRequest rqst = new CompactionRequest("default", "camtc",
CompactionType.MAJOR);
- long compactTxn = compactInTxn(rqst, CommitAction.NONE);
- addBaseFile(t, null, 25L, 25, compactTxn);
+ long compactTxn1 = compactInTxn(rqst, CommitAction.NONE);
+ addBaseFile(t, null, 25L, 25, compactTxn1);
txnHandler.revokeTimedoutWorkers(1L);
// an open txn should prevent the retry
@@ -96,20 +100,32 @@ public void cleanupAfterKilledAndRetriedMajorCompaction()
throws Exception {
// force retry
revokeTimedoutWorkers(conf);
- compactTxn = compactInTxn(rqst);
- addBaseFile(t, null, 25L, 25, compactTxn);
+ long compactTxn2 = compactInTxn(rqst);
+ addBaseFile(t, null, 25L, 25, compactTxn2);
startCleaner();
- // Validate that the cleanup attempt has failed.
+ // Validate that the cleanup attempt was skipped.
rsp = txnHandler.showCompact(new ShowCompactRequest());
assertEquals(1, rsp.getCompactsSize());
- assertEquals(FAILED_RESPONSE, rsp.getCompacts().getFirst().getState());
- assertEquals("txnid:26 is open and <= hwm: 27",
rsp.getCompacts().getFirst().getErrorMessage());
+ assertEquals(CLEANING_RESPONSE, rsp.getCompacts().getFirst().getState());
// Check that the files are not removed
List<Path> paths = getDirectories(conf, t, null);
assertEquals(6, paths.size());
+
+ // Abort the open compaction txn, so that the Cleaner can proceed.
+ txnHandler.abortTxns(
+ new AbortTxnsRequest(Collections.singletonList(compactTxn1)));
+ startCleaner();
+
+ rsp = txnHandler.showCompact(new ShowCompactRequest());
+ assertEquals(1, rsp.getCompactsSize());
+ assertEquals(SUCCEEDED_RESPONSE, rsp.getCompacts().getFirst().getState());
+
+ // Check that the files are removed
+ paths = getDirectories(conf, t, null);
+ assertEquals(1, paths.size());
}
private static void revokeTimedoutWorkers(Configuration conf) throws
Exception {
@@ -121,39 +137,88 @@ private static void revokeTimedoutWorkers(Configuration
conf) throws Exception {
}
@Test
- public void cleanupAfterMajorCompactionWithQueryWaitingToLockTheSnapshot()
throws Exception {
- Table t = prepareTestTable();
+ public void cleanupAndDanglingOpenTxnOnSameTable() throws Exception {
+ Table t = prepareTestTable("camtc");
CompactionRequest rqst = new CompactionRequest("default", "camtc",
CompactionType.MAJOR);
long compactTxn = compactInTxn(rqst, CommitAction.MARK_COMPACTED);
addBaseFile(t, null, 25L, 25, compactTxn);
- // Open a query during compaction,
+ // Open a readerTxn during compaction,
// Do not register minOpenWriteId (i.e. simulate delay locking the snapshot)
- openTxn();
+ long readerTxn = openTxn();
txnHandler.commitTxn(new CommitTxnRequest(compactTxn));
- startCleaner();
+ Thread.sleep(MetastoreConf.getTimeVar(
+ conf, ConfVars.TXN_OPENTXN_TIMEOUT, TimeUnit.MILLISECONDS));
- // Validate that the cleanup attempt has failed.
+ startCleaner(10, TimeUnit.SECONDS);
+
+ // Validate that the cleanup attempt was delayed by retention time.
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
assertEquals(1, rsp.getCompactsSize());
- assertEquals(FAILED_RESPONSE, rsp.getCompacts().getFirst().getState());
- assertEquals("txnid:27 is open and <= hwm: 27",
rsp.getCompacts().getFirst().getErrorMessage());
+ assertEquals(CLEANING_RESPONSE, rsp.getCompacts().getFirst().getState());
// Check that the files are not removed
List<Path> paths = getDirectories(conf, t, null);
assertEquals(5, paths.size());
+
+ // Register minOpenWriteId for the readerTxn.
+ txnHandler.addWriteIdsToMinHistory(readerTxn, Map.of("default.camtc", 1L));
+
+ startCleaner(0, TimeUnit.SECONDS);
+
+ // Validate that the cleanup attempt was blocked by readerTxn.
+ rsp = txnHandler.showCompact(new ShowCompactRequest());
+ assertEquals(1, rsp.getCompactsSize());
+ assertEquals(CLEANING_RESPONSE, rsp.getCompacts().getFirst().getState());
+
+ // Check that the files are not removed
+ paths = getDirectories(conf, t, null);
+ assertEquals(5, paths.size());
+ }
+
+ @Test
+ public void cleanupNotBlockedByOpenTxnOnAnotherTable() throws Exception {
+ Table t1 = prepareTestTable("camtc1");
+ Table t2 = prepareTestTable("camtc2");
+
+ // Open a readerTxn on t1 and register minOpenWriteId.
+ long readerTxn = openTxn();
+ txnHandler.addWriteIdsToMinHistory(readerTxn, Map.of("default.camtc1",
1L));
+
+ CompactionRequest rqstTbl1 = new CompactionRequest("default", "camtc1",
CompactionType.MAJOR);
+ long compactTxn = compactInTxn(rqstTbl1);
+ addBaseFile(t1, null, 25L, 25, compactTxn);
+
+ CompactionRequest rqstTbl2 = new CompactionRequest("default", "camtc2",
CompactionType.MAJOR);
+ compactTxn = compactInTxn(rqstTbl2);
+ addBaseFile(t2, null, 25L, 25, compactTxn);
+
+ startCleaner();
+
+ ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+ assertEquals(2, rsp.getCompactsSize());
+
+ assertEquals(SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState());
+ assertEquals("camtc2", rsp.getCompacts().get(0).getTablename());
+ // camtc2 was cleaned: only the new base remains.
+ assertEquals(1, getDirectories(conf, t2, null).size());
+
+ assertEquals(CLEANING_RESPONSE, rsp.getCompacts().get(1).getState());
+ assertEquals("camtc1", rsp.getCompacts().get(1).getTablename());
+ // camtc1 wasn't actually cleaned (admission filter held it back).
+ assertEquals(5, getDirectories(conf, t1, null).size());
}
- private Table prepareTestTable() throws Exception {
- Table t = newTable("default", "camtc", false);
+ private Table prepareTestTable(String tblName) throws Exception {
+ Table t = newTable("default", tblName, false);
addBaseFile(t, null, 20L, 20);
addDeltaFile(t, null, 21L, 22L, 2);
addDeltaFile(t, null, 23L, 24L, 2);
addDeltaFile(t, null, 25L, 25, 2);
- burnThroughTransactions("default", "camtc", 25);
+ burnThroughTransactions("default", tblName, 25);
return t;
}
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
index aa613aa8192..5c5ca7c7bfa 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnUtils.java
@@ -79,33 +79,28 @@ public static ValidTxnList
createValidTxnListForCleaner(GetOpenTxnsResponse txns
long highWatermark = minOpenTxn - 1;
long[] exceptions = new long[txns.getOpen_txnsSize()];
BitSet abortedBits = BitSet.valueOf(txns.getAbortedBits());
- int i = 0;
+ int i = 0, j = 0;
for (long txnId : txns.getOpen_txns()) {
if (txnId > highWatermark) {
break;
}
- if (abortedBits.get(i)) {
- exceptions[i] = txnId;
+ if (abortedBits.get(i) || isAbortCleanup) {
+ exceptions[j++] = txnId;
+ } else if (!TxnHandler.ConfVars.useMinHistoryWriteId()) {
+ throw new IllegalStateException(
+ JavaUtils.txnIdToString(txnId) + " is open and <= hwm: " +
highWatermark);
} else {
- if (isAbortCleanup) {
- exceptions[i] = txnId;
- } else {
- throw new IllegalStateException(
- JavaUtils.txnIdToString(txnId) + " is open and <= hwm: " +
highWatermark);
- }
+ LOG.debug("Ignoring open txn {} <= hwm: {}", txnId, highWatermark);
}
++i;
}
- exceptions = Arrays.copyOf(exceptions, i);
+ exceptions = Arrays.copyOf(exceptions, j);
+
+ BitSet bitSet = isAbortCleanup ? abortedBits : new BitSet(j);
if (!isAbortCleanup) {
- BitSet bitSet = new BitSet(exceptions.length);
- bitSet.set(0, exceptions.length);
- //add ValidCleanerTxnList? - could be problematic for all the places that read it from
- // string as they'd have to know which object to instantiate
- return new ValidReadTxnList(exceptions, bitSet, highWatermark,
Long.MAX_VALUE);
- } else {
- return new ValidReadTxnList(exceptions, abortedBits, highWatermark,
Long.MAX_VALUE);
+ bitSet.set(0, j);
}
+ return new ValidReadTxnList(exceptions, bitSet, highWatermark,
Long.MAX_VALUE);
}
/**