This is an automated email from the ASF dual-hosted git repository.
dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 2e1af9b37b9 HIVE-29291: Hive ACID: Use minHistoryWriteId by default
(#6154)
2e1af9b37b9 is described below
commit 2e1af9b37b998d721c31132d231fa13fc4375353
Author: Denys Kuzmenko <[email protected]>
AuthorDate: Thu Nov 6 10:54:15 2025 +0100
HIVE-29291: Hive ACID: Use minHistoryWriteId by default (#6154)
---
.../org/apache/hadoop/hive/ql/TestTxnCommands.java | 6 +-
.../apache/hadoop/hive/ql/TestTxnCommands2.java | 4 +
.../apache/hadoop/hive/ql/TestTxnCommands3.java | 4 +-
.../ql/lockmgr/DbTxnManagerEndToEndTestBase.java | 5 +-
.../hadoop/hive/ql/lockmgr/TestDbTxnManager2.java | 17 ++-
.../ql/txn/compactor/TestCompactionMetrics.java | 1 +
.../hadoop/hive/metastore/conf/MetastoreConf.java | 3 +-
.../hive/metastore/txn/CompactionTxnHandler.java | 6 +-
.../hadoop/hive/metastore/txn/TxnHandler.java | 2 +-
.../apache/hadoop/hive/metastore/txn/TxnStore.java | 2 +-
.../functions/CleanTxnToWriteIdTableFunction.java | 39 +-----
.../txn/jdbc/functions/CommitTxnFunction.java | 69 +++++++----
.../txn/jdbc/queries/AbortTxnInfoHandler.java | 131 ---------------------
.../jdbc/queries/LatestTxnIdInConflictHandler.java | 6 +-
.../jdbc/queries/MinUncommittedTxnIdHandler.java | 5 +-
15 files changed, 90 insertions(+), 210 deletions(-)
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
index 464c4fcb65f..3d6b8e0c840 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands.java
@@ -1856,7 +1856,9 @@ public void testDropTableWithSuffix() throws Exception {
}
MetastoreTaskThread houseKeeperService = new AcidHouseKeeperService();
houseKeeperService.setConf(hiveConf);
-
+
+ Thread.sleep(MetastoreConf.getTimeVar(hiveConf,
+ MetastoreConf.ConfVars.TXN_OPENTXN_TIMEOUT, TimeUnit.MILLISECONDS));
houseKeeperService.run();
count = TestTxnDbUtil.countQueryAgent(hiveConf,
"select count(*) from TXN_TO_WRITE_ID where T2W_TABLE = '" + tableName +
"'");
@@ -1945,6 +1947,8 @@ public void testDropMaterializedViewWithSuffix() throws
Exception {
MetastoreTaskThread houseKeeperService = new AcidHouseKeeperService();
houseKeeperService.setConf(hiveConf);
+ Thread.sleep(MetastoreConf.getTimeVar(hiveConf,
+ MetastoreConf.ConfVars.TXN_OPENTXN_TIMEOUT, TimeUnit.MILLISECONDS));
houseKeeperService.run();
count = TestTxnDbUtil.countQueryAgent(hiveConf,
"select count(*) from TXN_TO_WRITE_ID where T2W_TABLE = '" + mviewName +
"'");
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
index 77d18fdffcd..c992c5970e1 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands2.java
@@ -2373,6 +2373,7 @@ public void testCleanerForTxnToWriteId() throws Exception
{
txnHandler.compact(new CompactionRequest("default",
Table.ACIDTBL.name().toLowerCase(), CompactionType.MAJOR));
runWorker(hiveConf);
runCleaner(hiveConf);
+ txnHandler.performWriteSetGC();
txnHandler.cleanTxnToWriteIdTable();
// After compaction/cleanup, all entries from TXN_TO_WRITE_ID should be
cleaned up as all txns are committed.
@@ -2416,6 +2417,7 @@ public void testCleanerForTxnToWriteId() throws Exception
{
// aborted txn would be removed from TXNS only after the compaction. Also,
committed txn > open txn is retained.
// As open txn doesn't allocate writeid, the 2 entries for aborted and
committed should be retained.
txnHandler.cleanEmptyAbortedAndCommittedTxns();
+ txnHandler.performWriteSetGC();
txnHandler.cleanTxnToWriteIdTable();
Assert.assertEquals(TestTxnDbUtil.queryToString(hiveConf, "select * from
TXN_TO_WRITE_ID" + acidTblWhereClause),
3, TestTxnDbUtil.countQueryAgent(hiveConf, "select count(*) from
TXN_TO_WRITE_ID" + acidTblWhereClause));
@@ -2428,6 +2430,7 @@ public void testCleanerForTxnToWriteId() throws Exception
{
runWorker(hiveConf);
runCleaner(hiveConf);
txnHandler.cleanEmptyAbortedAndCommittedTxns();
+ txnHandler.performWriteSetGC();
txnHandler.cleanTxnToWriteIdTable();
Assert.assertEquals(TestTxnDbUtil.queryToString(hiveConf, "select * from
TXN_TO_WRITE_ID"),
3, TestTxnDbUtil.countQueryAgent(hiveConf, "select count(*) from
TXN_TO_WRITE_ID"));
@@ -2439,6 +2442,7 @@ public void testCleanerForTxnToWriteId() throws Exception
{
// The txn opened after the compaction commit should not effect the Cleaner
runCleaner(hiveConf);
txnHandler.cleanEmptyAbortedAndCommittedTxns();
+ txnHandler.performWriteSetGC();
txnHandler.cleanTxnToWriteIdTable();
Assert.assertEquals(TestTxnDbUtil.queryToString(hiveConf, "select * from
TXN_TO_WRITE_ID"),
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java
b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java
index a2446d63ad1..6301b6d0290 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/TestTxnCommands3.java
@@ -110,7 +110,7 @@ public void testRenameTable() throws Exception {
"select count(*) from COMPLETED_TXN_COMPONENTS where
CTC_TABLE='s'"));
Assert.assertEquals(1, TestTxnDbUtil.countQueryAgent(hiveConf,
"select count(*) from COMPACTION_QUEUE where CQ_TABLE='s'"));
- Assert.assertEquals(1, TestTxnDbUtil.countQueryAgent(hiveConf,
+ Assert.assertEquals(2, TestTxnDbUtil.countQueryAgent(hiveConf,
"select count(*) from WRITE_SET where WS_TABLE='s'"));
Assert.assertEquals(3, TestTxnDbUtil.countQueryAgent(hiveConf,
"select count(*) from TXN_TO_WRITE_ID where T2W_TABLE='s'"));
@@ -125,7 +125,7 @@ public void testRenameTable() throws Exception {
"select count(*) from COMPLETED_TXN_COMPONENTS where
CTC_TABLE='bar'"));
Assert.assertEquals(1, TestTxnDbUtil.countQueryAgent(hiveConf,
"select count(*) from COMPACTION_QUEUE where CQ_TABLE='bar'"));
- Assert.assertEquals(1, TestTxnDbUtil.countQueryAgent(hiveConf,
+ Assert.assertEquals(2, TestTxnDbUtil.countQueryAgent(hiveConf,
"select count(*) from WRITE_SET where WS_TABLE='bar'"));
Assert.assertEquals(4, TestTxnDbUtil.countQueryAgent(hiveConf,
"select count(*) from TXN_TO_WRITE_ID where T2W_TABLE='bar'"));
diff --git
a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java
b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java
index 5787951a3bf..628e51f2a38 100644
---
a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java
+++
b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/DbTxnManagerEndToEndTestBase.java
@@ -23,6 +23,7 @@
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.conf.HiveConfForTest;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.txn.TxnHandler;
import org.apache.hadoop.hive.metastore.utils.TestTxnDbUtil;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
@@ -78,7 +79,9 @@ public void setUp() throws Exception {
HiveConf.setIntVar(conf, HiveConf.ConfVars.HIVE_LOCKS_PARTITION_THRESHOLD,
-1);
HiveConf.setBoolVar(conf,
HiveConf.ConfVars.HIVE_ACID_LOCKLESS_READS_ENABLED, false);
HiveConf.setBoolVar(conf, HiveConf.ConfVars.TXN_WRITE_X_LOCK, false);
- MetastoreConf.setBoolVar(conf,
MetastoreConf.ConfVars.TXN_USE_MIN_HISTORY_LEVEL, true);
+
+ MetastoreConf.setBoolVar(conf,
MetastoreConf.ConfVars.TXN_USE_MIN_HISTORY_WRITE_ID, true);
+ TxnHandler.ConfVars.setUseMinHistoryWriteId(true);
driver = new Driver(new
QueryState.Builder().withHiveConf(conf).nonIsolated().build());
driver2 = new Driver(new QueryState.Builder().withHiveConf(conf).build());
diff --git
a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
index 89b1b2a13ec..4ab68b1efb2 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/lockmgr/TestDbTxnManager2.java
@@ -37,6 +37,7 @@
import org.apache.hadoop.hive.metastore.api.TxnType;
import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
import org.apache.hadoop.hive.metastore.conf.MetastoreConf;
+import org.apache.hadoop.hive.metastore.txn.TxnHandler;
import
org.apache.hadoop.hive.metastore.txn.service.CompactionHouseKeeperService;
import org.apache.hadoop.hive.metastore.txn.service.AcidHouseKeeperService;
import org.apache.hadoop.hive.ql.Context;
@@ -1235,6 +1236,9 @@ public void testWriteSetTracking4() throws Exception {
*/
@Test
public void testWriteSetTracking5() throws Exception {
+ MetastoreConf.setBoolVar(conf,
MetastoreConf.ConfVars.TXN_USE_MIN_HISTORY_WRITE_ID, false);
+ TxnHandler.ConfVars.setUseMinHistoryWriteId(false);
+
dropTable(new String[] {"TAB_PART"});
Assert.assertEquals(0, TestTxnDbUtil.countQueryAgent(conf, "select
count(*) from \"WRITE_SET\""));
driver.run("create table if not exists TAB_PART (a int, b int) " +
@@ -2109,6 +2113,9 @@ public void testMergeUnpartitionedConflictSharedWrite()
throws Exception {
* @param causeConflict true to make 2 operations such that they update the
same entity
*/
private void testMergeUnpartitioned(boolean causeConflict, boolean
sharedWrite) throws Exception {
+ MetastoreConf.setBoolVar(conf,
MetastoreConf.ConfVars.TXN_USE_MIN_HISTORY_WRITE_ID, false);
+ TxnHandler.ConfVars.setUseMinHistoryWriteId(false);
+
dropTable(new String[] {"target","source"});
conf.setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, !sharedWrite);
@@ -2873,6 +2880,9 @@ public void testMergePartitionedConflictSharedWrite()
throws Exception {
* @param causeConflict - true to make the operations cause a Write conflict
*/
private void testMergePartitioned(boolean causeConflict, boolean
sharedWrite) throws Exception {
+ MetastoreConf.setBoolVar(conf,
MetastoreConf.ConfVars.TXN_USE_MIN_HISTORY_WRITE_ID, false);
+ TxnHandler.ConfVars.setUseMinHistoryWriteId(false);
+
dropTable(new String[] {"target","source"});
conf.setBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK, !sharedWrite);
@@ -3537,7 +3547,8 @@ public void testSkipAcquireLocksForExplain() throws
Exception {
@Test
public void testInsertSnapshotIsolationMinHistoryDisabled() throws Exception
{
- MetastoreConf.setBoolVar(conf,
MetastoreConf.ConfVars.TXN_USE_MIN_HISTORY_LEVEL, false);
+ TxnHandler.ConfVars.setUseMinHistoryWriteId(false);
+ MetastoreConf.setBoolVar(conf,
MetastoreConf.ConfVars.TXN_USE_MIN_HISTORY_WRITE_ID, false);
testInsertSnapshotIsolation();
}
@@ -3555,6 +3566,7 @@ public void testInsertSnapshotIsolation() throws
Exception {
swapTxnManager(txnMgr);
driver.run();
+ txnHandler.performWriteSetGC();
txnHandler.cleanTxnToWriteIdTable();
swapTxnManager(txnMgr2);
@@ -3566,7 +3578,8 @@ public void testInsertSnapshotIsolation() throws
Exception {
@Test
public void testUpdateSnapshotIsolationMinHistoryDisabled() throws Exception
{
- MetastoreConf.setBoolVar(conf,
MetastoreConf.ConfVars.TXN_USE_MIN_HISTORY_LEVEL, false);
+ TxnHandler.ConfVars.setUseMinHistoryWriteId(false);
+ MetastoreConf.setBoolVar(conf,
MetastoreConf.ConfVars.TXN_USE_MIN_HISTORY_WRITE_ID, false);
testUpdateSnapshotIsolation();
}
diff --git
a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionMetrics.java
b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionMetrics.java
index b33f8917c74..42b0d6cc6df 100644
---
a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionMetrics.java
+++
b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCompactionMetrics.java
@@ -86,6 +86,7 @@ public class TestCompactionMetrics extends CompactorTest {
public void setUp() throws Exception {
MetastoreConf.setBoolVar(conf, MetastoreConf.ConfVars.METRICS_ENABLED,
true);
MetastoreConf.setBoolVar(conf,
MetastoreConf.ConfVars.TXN_USE_MIN_HISTORY_LEVEL, true);
+ TxnHandler.ConfVars.setUseMinHistoryLevel(true);
MetastoreConf.setBoolVar(conf,
MetastoreConf.ConfVars.COMPACTOR_INITIATOR_ON, true);
MetastoreConf.setBoolVar(conf,
MetastoreConf.ConfVars.COMPACTOR_CLEANER_ON, true);
// re-initialize metrics
diff --git
a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
index e3e29aa92bd..71e99b0d32e 100644
---
a/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
+++
b/standalone-metastore/metastore-common/src/main/java/org/apache/hadoop/hive/metastore/conf/MetastoreConf.java
@@ -1682,10 +1682,11 @@ public enum ConfVars {
"time after which transactions are declared aborted if the client has
not sent a heartbeat."),
TXN_OPENTXN_TIMEOUT("metastore.txn.opentxn.timeout",
"hive.txn.opentxn.timeout", 1000, TimeUnit.MILLISECONDS,
"Time before an open transaction operation should persist, otherwise
it is considered invalid and rolled back"),
+ @Deprecated
TXN_USE_MIN_HISTORY_LEVEL("metastore.txn.use.minhistorylevel",
"hive.txn.use.minhistorylevel", true,
"Set this to false, for the TxnHandler and Cleaner to not use
MIN_HISTORY_LEVEL table and take advantage of openTxn optimisation.\n"
+ "If the table is dropped HMS will switch this flag to false, any
other value changes need a restart to take effect."),
- TXN_USE_MIN_HISTORY_WRITE_ID("metastore.txn.use.minhistorywriteid",
"hive.txn.use.minhistorywriteid", false,
+ TXN_USE_MIN_HISTORY_WRITE_ID("metastore.txn.use.minhistorywriteid",
"hive.txn.use.minhistorywriteid", true,
"Set this to true, to avoid global minOpenTxn check in Cleaner.\n"
+ "If the table is dropped HMS will switch this flag to false."),
LOCK_NUMRETRIES("metastore.lock.numretries", "hive.lock.numretries", 100,
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
index 393cb9443e5..4d7e9349003 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/CompactionTxnHandler.java
@@ -212,7 +212,7 @@ public void markCleaned(CompactionInfo info) throws
MetaException {
/**
* Clean up entries from TXN_TO_WRITE_ID table less than
min_uncommited_txnid as found by
- * min(max(TXNS.txn_id), min(WRITE_SET.WS_COMMIT_ID), min(Aborted
TXNS.txn_id)).
+ * min(max(TXNS.txn_id), min(WRITE_SET.WS_TXNID), min(Aborted TXNS.txn_id)).
*/
@Override
@RetrySemantics.SafeToRetry
@@ -374,7 +374,7 @@ private void updateStatus(CompactionInfo ci) throws
MetaException {
String strState = CompactionState.fromSqlConst(ci.state).toString();
LOG.debug("Marking as {}: CompactionInfo: {}", strState, ci);
- CompactionInfo ciActual = jdbcResource.execute(new
GetCompactionInfoHandler(ci.id, false));
+ CompactionInfo ciActual = jdbcResource.execute(new
GetCompactionInfoHandler(ci.id, false));
long endTime = getDbTime().getTime();
if (ciActual != null) {
@@ -505,7 +505,7 @@ public long findMinOpenTxnIdForCleaner() throws
MetaException {
@RetrySemantics.Idempotent
@Deprecated
public long findMinTxnIdSeenOpen() {
- if (!ConfVars.useMinHistoryLevel() || ConfVars.useMinHistoryWriteId()) {
+ if (!ConfVars.useMinHistoryLevel()) {
return Long.MAX_VALUE;
}
try {
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
index e1e060c1256..be7917f4ce7 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnHandler.java
@@ -205,7 +205,7 @@ private ConfVars() {}
private boolean useMinHistoryWriteId;
public boolean useMinHistoryLevel() {
- return useMinHistoryLevel;
+ return useMinHistoryLevel && !useMinHistoryWriteId;
}
public void setUseMinHistoryLevel(boolean useMinHistoryLevel) {
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
index b77dd08601e..8092768b91d 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/TxnStore.java
@@ -716,7 +716,7 @@ Set<CompactionInfo> findPotentialCompactions(int
abortedThreshold, long abortedT
/**
* Clean up entries from TXN_TO_WRITE_ID table less than
min_uncommited_txnid as found by
- * min(max(TXNS.txn_id), min(WRITE_SET.WS_COMMIT_ID), min(Aborted
TXNS.txn_id)).
+ * min(max(TXNS.txn_id), min(WRITE_SET.WS_TXNID), min(Aborted TXNS.txn_id)).
*/
@SqlRetry
@Transactional(POOL_COMPACTOR)
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/CleanTxnToWriteIdTableFunction.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/CleanTxnToWriteIdTableFunction.java
index 6457cd27f04..946ecb39347 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/CleanTxnToWriteIdTableFunction.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/CleanTxnToWriteIdTableFunction.java
@@ -18,36 +18,18 @@
package org.apache.hadoop.hive.metastore.txn.jdbc.functions;
import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.txn.TxnHandler;
-import org.apache.hadoop.hive.metastore.txn.entities.TxnStatus;
import org.apache.hadoop.hive.metastore.txn.jdbc.MultiDataSourceJdbcResource;
import org.apache.hadoop.hive.metastore.txn.jdbc.TransactionalFunction;
+import
org.apache.hadoop.hive.metastore.txn.jdbc.queries.MinUncommittedTxnIdHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
import org.springframework.jdbc.core.namedparam.NamedParameterJdbcTemplate;
-import java.sql.Types;
-
public class CleanTxnToWriteIdTableFunction implements
TransactionalFunction<Void> {
private static final Logger LOG =
LoggerFactory.getLogger(CleanTxnToWriteIdTableFunction.class);
- //language=SQL
- private static String minHistoryLevelSql = "SELECT MIN(\"RES\".\"ID\") AS
\"ID\" FROM (" +
- " SELECT MAX(\"TXN_ID\") + 1 AS \"ID\" FROM \"TXNS\"" +
- " UNION" +
- " SELECT MIN(\"TXN_ID\") AS \"ID\" FROM \"TXNS\" WHERE \"TXN_STATE\" =
:abortedState) \"RES\"";
- //language=SQL
- private static String noMinHistoryLevelSql = "SELECT MIN(\"RES\".\"ID\") AS
\"ID\" FROM (" +
- " SELECT MAX(\"TXN_ID\") + 1 AS \"ID\" FROM \"TXNS\"" +
- " UNION" +
- " SELECT MIN(\"WS_TXNID\") AS \"ID\" FROM \"WRITE_SET\"" +
- " UNION" +
- " SELECT MIN(\"TXN_ID\") AS \"ID\" FROM \"TXNS\" WHERE \"TXN_STATE\" = "
+ TxnStatus.ABORTED +
- " OR \"TXN_STATE\" = " + TxnStatus.OPEN +
- " ) \"RES\"";
-
private final long minTxnIdSeenOpen;
public CleanTxnToWriteIdTableFunction(long minTxnIdSeenOpen) {
@@ -56,32 +38,17 @@ public CleanTxnToWriteIdTableFunction(long
minTxnIdSeenOpen) {
@Override
public Void execute(MultiDataSourceJdbcResource jdbcResource) throws
MetaException {
- NamedParameterJdbcTemplate jdbcTemplate = jdbcResource.getJdbcTemplate();
- String sql = TxnHandler.ConfVars.useMinHistoryLevel() ? minHistoryLevelSql
: noMinHistoryLevelSql;
- MapSqlParameterSource params = new MapSqlParameterSource()
- .addValue("abortedState", TxnStatus.ABORTED.getSqlConst(), Types.CHAR);
- if (!TxnHandler.ConfVars.useMinHistoryLevel()) {
- params.addValue("openState", TxnStatus.OPEN.getSqlConst(), Types.CHAR);
- }
-
// First need to find the min_uncommitted_txnid which is currently seen by
any open transactions.
// If there are no txns which are currently open or aborted in the system,
then current value of
// max(TXNS.txn_id) could be min_uncommitted_txnid.
- Long minTxnId = jdbcTemplate.query(sql, params, rs -> {
- if (rs.next()) {
- return rs.getLong(1);
- } else {
- return null;
- }
- });
-
+ Long minTxnId = jdbcResource.execute(new MinUncommittedTxnIdHandler());
if (minTxnId == null) {
throw new MetaException("Transaction tables not properly initialized, no
record found in TXNS");
}
long minUncommitedTxnid = Math.min(minTxnId, minTxnIdSeenOpen);
-
// As all txns below min_uncommitted_txnid are either committed or
empty_aborted, we are allowed
// to clean up the entries less than min_uncommitted_txnid from the
TXN_TO_WRITE_ID table.
+ NamedParameterJdbcTemplate jdbcTemplate = jdbcResource.getJdbcTemplate();
int rc = jdbcTemplate.update("DELETE FROM \"TXN_TO_WRITE_ID\" WHERE
\"T2W_TXNID\" < :txnId",
new MapSqlParameterSource("txnId", minUncommitedTxnid));
LOG.info("Removed {} rows from TXN_TO_WRITE_ID with Txn Low-Water-Mark:
{}", rc, minUncommitedTxnid);
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/CommitTxnFunction.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/CommitTxnFunction.java
index bcd5226e014..661f7b37e6e 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/CommitTxnFunction.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/functions/CommitTxnFunction.java
@@ -36,11 +36,11 @@
import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo;
import org.apache.hadoop.hive.metastore.txn.entities.OperationType;
import org.apache.hadoop.hive.metastore.txn.TxnErrorMsg;
-import org.apache.hadoop.hive.metastore.txn.TxnHandler;
import org.apache.hadoop.hive.metastore.txn.entities.TxnStatus;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.metastore.txn.entities.TxnWriteDetails;
+import org.apache.hadoop.hive.metastore.txn.jdbc.InClauseBatchCommand;
import
org.apache.hadoop.hive.metastore.txn.jdbc.commands.DeleteReplTxnMapEntryCommand;
import
org.apache.hadoop.hive.metastore.txn.jdbc.commands.InsertCompletedTxnComponentsCommand;
import
org.apache.hadoop.hive.metastore.txn.jdbc.commands.RemoveTxnsFromMinHistoryLevelCommand;
@@ -71,9 +71,11 @@
import java.util.Collections;
import java.util.List;
import java.util.Set;
+import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import static org.apache.hadoop.hive.metastore.txn.TxnHandler.ConfVars;
import static
org.apache.hadoop.hive.metastore.txn.TxnHandler.notifyCommitOrAbortEvent;
import static org.apache.hadoop.hive.metastore.txn.TxnUtils.getEpochFn;
import static
org.apache.hadoop.hive.metastore.utils.StringUtils.normalizeIdentifier;
@@ -126,7 +128,7 @@ public TxnType execute(MultiDataSourceJdbcResource
jdbcResource) throws MetaExce
throw new RollbackException(null);
}
assert targetTxnIds.size() == 1;
- txnid = targetTxnIds.get(0);
+ txnid = targetTxnIds.getFirst();
}
/**
@@ -154,21 +156,32 @@ public TxnType execute(MultiDataSourceJdbcResource
jdbcResource) throws MetaExce
TxnUtils.raiseTxnUnexpectedState(actualTxnStatus, txnid);
}
- String conflictSQLSuffix = "FROM \"TXN_COMPONENTS\" WHERE \"TC_TXNID\"=" +
txnid + " AND \"TC_OPERATION_TYPE\" IN (" +
- OperationType.UPDATE + "," + OperationType.DELETE + ")";
+ String conflictSQLSuffix = String.format("""
+ FROM "TXN_COMPONENTS" WHERE "TC_TXNID" = :txnId AND
"TC_OPERATION_TYPE" IN (%s, %s)
+ """, OperationType.UPDATE, OperationType.DELETE);
long tempCommitId = TxnUtils.generateTemporaryId();
if (txnType == TxnType.SOFT_DELETE || txnType == TxnType.COMPACTION) {
- new AcquireTxnLockFunction(false).execute(jdbcResource);
+ if (!ConfVars.useMinHistoryWriteId()) {
+ new AcquireTxnLockFunction(false).execute(jdbcResource);
+ }
commitId = jdbcResource.execute(new GetHighWaterMarkHandler());
} else if (txnType != TxnType.READ_ONLY && !isReplayedReplTxn) {
- String writeSetInsertSql = "INSERT INTO \"WRITE_SET\" (\"WS_DATABASE\",
\"WS_TABLE\", \"WS_PARTITION\"," +
- " \"WS_TXNID\", \"WS_COMMIT_ID\", \"WS_OPERATION_TYPE\")" +
- " SELECT DISTINCT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\",
\"TC_TXNID\", " + tempCommitId + ", \"TC_OPERATION_TYPE\" ";
+ String writeSetInsertSql = """
+ INSERT INTO "WRITE_SET"
+ ("WS_DATABASE", "WS_TABLE", "WS_PARTITION", "WS_TXNID",
"WS_COMMIT_ID", "WS_OPERATION_TYPE")
+ SELECT DISTINCT
+ "TC_DATABASE", "TC_TABLE", "TC_PARTITION", "TC_TXNID",
+ :commitId,
+ "TC_OPERATION_TYPE"
+ """;
boolean isUpdateOrDelete =
Boolean.TRUE.equals(jdbcResource.getJdbcTemplate().query(
- jdbcResource.getSqlGenerator().addLimitClause(1,
"\"TC_OPERATION_TYPE\" " + conflictSQLSuffix),
+ jdbcResource.getSqlGenerator()
+ .addLimitClause(1, "\"TC_OPERATION_TYPE\" " + conflictSQLSuffix),
+ new MapSqlParameterSource()
+ .addValue("txnId", txnid),
ResultSet::next));
if (isUpdateOrDelete) {
@@ -188,11 +201,13 @@ public TxnType execute(MultiDataSourceJdbcResource
jdbcResource) throws MetaExce
*/
Object undoWriteSetForCurrentTxn = context.createSavepoint();
jdbcResource.getJdbcTemplate().update(
- writeSetInsertSql + (TxnHandler.ConfVars.useMinHistoryLevel() ?
conflictSQLSuffix :
- "FROM \"TXN_COMPONENTS\" WHERE \"TC_TXNID\"= :txnId AND
\"TC_OPERATION_TYPE\" <> :type"),
+ writeSetInsertSql + (ConfVars.useMinHistoryLevel() ?
conflictSQLSuffix :
+ "FROM \"TXN_COMPONENTS\" WHERE \"TC_TXNID\"= :txnId" + (
+ (txnType != TxnType.REBALANCE_COMPACTION) ? "" : " AND
\"TC_OPERATION_TYPE\" <> :type")),
new MapSqlParameterSource()
.addValue("txnId", txnid)
- .addValue("type", OperationType.COMPACT.getSqlConst()));
+ .addValue("type", OperationType.COMPACT.getSqlConst())
+ .addValue("commitId", tempCommitId));
/**
* This S4U will mutex with other commitTxn() and openTxns().
@@ -235,12 +250,11 @@ public TxnType execute(MultiDataSourceJdbcResource
jdbcResource) throws MetaExce
throw new TxnAbortedException(msg);
}
}
- } else if (!TxnHandler.ConfVars.useMinHistoryLevel()) {
- jdbcResource.getJdbcTemplate().update(writeSetInsertSql + "FROM
\"TXN_COMPONENTS\" WHERE \"TC_TXNID\" = :txnId AND \"TC_OPERATION_TYPE\" <>
:type",
+ } else if (!ConfVars.useMinHistoryLevel()) {
+ jdbcResource.getJdbcTemplate().update(writeSetInsertSql + "FROM
\"TXN_COMPONENTS\" WHERE \"TC_TXNID\" = :txnId",
new MapSqlParameterSource()
.addValue("txnId", txnid)
- .addValue("type", OperationType.COMPACT.getSqlConst()));
- commitId = jdbcResource.execute(new GetHighWaterMarkHandler());
+ .addValue("commitId", jdbcResource.execute(new
GetHighWaterMarkHandler())));
}
} else {
/*
@@ -256,7 +270,6 @@ public TxnType execute(MultiDataSourceJdbcResource
jdbcResource) throws MetaExce
assert true;
}
-
if (txnType != TxnType.READ_ONLY && !isReplayedReplTxn &&
!MetaStoreServerUtils.isCompactionTxn(txnType)) {
moveTxnComponentsToCompleted(jdbcResource, txnid, isUpdateDelete);
} else if (isReplayedReplTxn) {
@@ -266,8 +279,7 @@ public TxnType execute(MultiDataSourceJdbcResource
jdbcResource) throws MetaExce
jdbcResource.execute(new DeleteReplTxnMapEntryCommand(sourceTxnId,
rqst.getReplPolicy()));
}
updateWSCommitIdAndCleanUpMetadata(jdbcResource, txnid, txnType, commitId,
tempCommitId);
- jdbcResource.execute(new
RemoveTxnsFromMinHistoryLevelCommand(ImmutableList.of(txnid)));
- jdbcResource.execute(new
RemoveWriteIdsFromMinHistoryCommand(ImmutableList.of(txnid)));
+
if (rqst.isSetKeyValue()) {
updateKeyValueAssociatedWithTxn(jdbcResource, rqst);
}
@@ -554,11 +566,8 @@ private void
updateKeyValueAssociatedWithTxn(MultiDataSourceJdbcResource jdbcRes
}
}
- /**
- * See overridden method in CompactionTxnHandler also.
- */
- private void updateWSCommitIdAndCleanUpMetadata(MultiDataSourceJdbcResource
jdbcResource, long txnid, TxnType txnType,
- Long commitId, long
tempId) throws MetaException {
+ private void updateWSCommitIdAndCleanUpMetadata(MultiDataSourceJdbcResource
jdbcResource,
+ long txnid, TxnType txnType, Long commitId, long tempId) throws
MetaException {
List<String> queryBatch = new ArrayList<>(6);
// update write_set with real commitId
if (commitId != null) {
@@ -575,13 +584,21 @@ private void
updateWSCommitIdAndCleanUpMetadata(MultiDataSourceJdbcResource jdbc
if (txnType == TxnType.MATER_VIEW_REBUILD) {
queryBatch.add("DELETE FROM \"MATERIALIZATION_REBUILD_LOCKS\" WHERE
\"MRL_TXN_ID\" = " + txnid);
}
- if (txnType == TxnType.SOFT_DELETE || txnType == TxnType.COMPACTION) {
+ if (txnType == TxnType.SOFT_DELETE ||
MetaStoreServerUtils.isCompactionTxn(txnType)) {
queryBatch.add("UPDATE \"COMPACTION_QUEUE\" SET \"CQ_NEXT_TXN_ID\" = " +
commitId + ", \"CQ_COMMIT_TIME\" = " +
getEpochFn(jdbcResource.getDatabaseProduct()) + " WHERE
\"CQ_TXN_ID\" = " + txnid);
}
-
+
// execute all in one batch
jdbcResource.getJdbcTemplate().getJdbcTemplate().batchUpdate(queryBatch.toArray(new
String[0]));
+
+ List<Function<List<Long>, InClauseBatchCommand<Long>>> commands = List.of(
+ RemoveTxnsFromMinHistoryLevelCommand::new,
+ RemoveWriteIdsFromMinHistoryCommand::new
+ );
+ for (var cmd : commands) {
+ jdbcResource.execute(cmd.apply(ImmutableList.of(txnid)));
+ }
}
/**
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/AbortTxnInfoHandler.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/AbortTxnInfoHandler.java
deleted file mode 100644
index cf7f50956d4..00000000000
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/AbortTxnInfoHandler.java
+++ /dev/null
@@ -1,131 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hadoop.hive.metastore.txn.jdbc.queries;
-
-import org.apache.hadoop.hive.metastore.DatabaseProduct;
-import org.apache.hadoop.hive.metastore.api.CompactionType;
-import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo;
-import org.apache.hadoop.hive.metastore.txn.entities.TxnStatus;
-import org.apache.hadoop.hive.metastore.txn.TxnStore;
-import org.apache.hadoop.hive.metastore.txn.jdbc.QueryHandler;
-import org.springframework.dao.DataAccessException;
-import org.springframework.jdbc.core.namedparam.MapSqlParameterSource;
-import org.springframework.jdbc.core.namedparam.SqlParameterSource;
-
-import java.sql.ResultSet;
-import java.sql.SQLException;
-import java.sql.Types;
-import java.util.ArrayList;
-import java.util.List;
-
-import static org.apache.hadoop.hive.metastore.txn.TxnStore.READY_FOR_CLEANING;
-import static org.apache.hadoop.hive.metastore.txn.TxnUtils.getEpochFn;
-
-public class AbortTxnInfoHandler implements QueryHandler<List<CompactionInfo>>
{
-
- // Three inner sub-queries which are under left-join to fetch the required
data for aborted txns.
- //language=SQL
- private static final String SELECT_ABORTS_WITH_MIN_OPEN_WRITETXN_QUERY =
- " \"res1\".\"TC_DATABASE\" AS \"DB\", \"res1\".\"TC_TABLE\" AS \"TBL\",
\"res1\".\"TC_PARTITION\" AS \"PART\", " +
- " \"res1\".\"MIN_TXN_START_TIME\" AS \"MIN_TXN_START_TIME\",
\"res1\".\"ABORTED_TXN_COUNT\" AS \"ABORTED_TXN_COUNT\", " +
- " \"res2\".\"MIN_OPEN_WRITE_TXNID\" AS \"MIN_OPEN_WRITE_TXNID\",
\"res3\".\"RETRY_RETENTION\" AS \"RETRY_RETENTION\", " +
- " \"res3\".\"ID\" AS \"RETRY_CQ_ID\" " +
- " FROM " +
- // First sub-query - Gets the aborted txns with min txn start time,
number of aborted txns
- // for corresponding db, table, partition.
- " ( SELECT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\",
MIN(\"TXN_STARTED\") AS \"MIN_TXN_START_TIME\", " +
- " COUNT(*) AS \"ABORTED_TXN_COUNT\" FROM \"TXNS\",
\"TXN_COMPONENTS\" " +
- " WHERE \"TXN_ID\" = \"TC_TXNID\" AND \"TXN_STATE\" = :abortedState"
+
- " GROUP BY \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\" %s )
\"res1\" " +
- " LEFT JOIN" +
- // Second sub-query - Gets the min open txn id for corresponding db,
table, partition.
- "( SELECT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\",
MIN(\"TC_TXNID\") AS \"MIN_OPEN_WRITE_TXNID\" " +
- " FROM \"TXNS\", \"TXN_COMPONENTS\" " +
- " WHERE \"TXN_ID\" = \"TC_TXNID\" AND \"TXN_STATE\" = :openState" +
- " GROUP BY \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\" )
\"res2\"" +
- " ON \"res1\".\"TC_DATABASE\" = \"res2\".\"TC_DATABASE\"" +
- " AND \"res1\".\"TC_TABLE\" = \"res2\".\"TC_TABLE\"" +
- " AND (\"res1\".\"TC_PARTITION\" = \"res2\".\"TC_PARTITION\" " +
- " OR (\"res1\".\"TC_PARTITION\" IS NULL AND
\"res2\".\"TC_PARTITION\" IS NULL)) " +
- " LEFT JOIN " +
- // Third sub-query - Gets the retry entries for corresponding db,
table, partition.
- "( SELECT \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\",
MAX(\"CQ_ID\") AS \"ID\", " +
- " MAX(\"CQ_RETRY_RETENTION\") AS \"RETRY_RETENTION\", " +
- " MIN(\"CQ_COMMIT_TIME\") - %s + MAX(\"CQ_RETRY_RETENTION\") AS
\"RETRY_RECORD_CHECK\" FROM \"COMPACTION_QUEUE\" " +
- " WHERE \"CQ_TYPE\" = :type" +
- " GROUP BY \"CQ_DATABASE\", \"CQ_TABLE\", \"CQ_PARTITION\") \"res3\"
" +
- " ON \"res1\".\"TC_DATABASE\" = \"res3\".\"CQ_DATABASE\" " +
- " AND \"res1\".\"TC_TABLE\" = \"res3\".\"CQ_TABLE\" " +
- " AND (\"res1\".\"TC_PARTITION\" = \"res3\".\"CQ_PARTITION\" " +
- " OR (\"res1\".\"TC_PARTITION\" IS NULL AND
\"res3\".\"CQ_PARTITION\" IS NULL))" +
- " WHERE \"res3\".\"RETRY_RECORD_CHECK\" <= 0 OR
\"res3\".\"RETRY_RECORD_CHECK\" IS NULL";
-
- private final long abortedTimeThreshold;
- private final int abortedThreshold;
- private final int fetchSize;
-
- public String getParameterizedQueryString(DatabaseProduct dbProduct) throws
MetaException {
- return dbProduct.addLimitClause(
- fetchSize,
-
String.format(AbortTxnInfoHandler.SELECT_ABORTS_WITH_MIN_OPEN_WRITETXN_QUERY,
- abortedTimeThreshold >= 0 ? "" : " HAVING COUNT(*) > " +
abortedThreshold, getEpochFn(dbProduct)));
- }
-
- @Override
- public SqlParameterSource getQueryParameters() {
- return new MapSqlParameterSource()
- .addValue("abortedState", TxnStatus.ABORTED.getSqlConst(), Types.CHAR)
- .addValue("openState", TxnStatus.OPEN.getSqlConst(), Types.CHAR)
- .addValue("type", Character.toString(TxnStore.ABORT_TXN_CLEANUP_TYPE),
Types.CHAR);
- }
-
- @Override
- public List<CompactionInfo> extractData(ResultSet rs) throws
DataAccessException, SQLException {
- List<CompactionInfo> readyToCleanAborts = new ArrayList<>();
- long systemTime = System.currentTimeMillis();
- boolean checkAbortedTimeThreshold = abortedTimeThreshold >= 0;
- while (rs.next()) {
- boolean pastTimeThreshold =
- checkAbortedTimeThreshold && rs.getLong("MIN_TXN_START_TIME") +
abortedTimeThreshold < systemTime;
- int numAbortedTxns = rs.getInt("ABORTED_TXN_COUNT");
- if (numAbortedTxns > abortedThreshold || pastTimeThreshold) {
- CompactionInfo info = new CompactionInfo();
- info.dbname = rs.getString("DB");
- info.tableName = rs.getString("TBL");
- info.partName = rs.getString("PART");
- // In this case, this field contains min open write txn ID.
- long value = rs.getLong("MIN_OPEN_WRITE_TXNID");
- info.minOpenWriteTxnId = value > 0 ? value : Long.MAX_VALUE;
- // The specific type, state assigned to abort cleanup.
- info.type = CompactionType.ABORT_TXN_CLEANUP;
- info.state = READY_FOR_CLEANING;
- info.retryRetention = rs.getLong("RETRY_RETENTION");
- info.id = rs.getLong("RETRY_CQ_ID");
- readyToCleanAborts.add(info);
- }
- }
- return readyToCleanAborts;
- }
-
- public AbortTxnInfoHandler(long abortedTimeThreshold, int abortedThreshold,
int fetchSize) {
- this.abortedTimeThreshold = abortedTimeThreshold;
- this.abortedThreshold = abortedThreshold;
- this.fetchSize = fetchSize;
- }
-}
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/LatestTxnIdInConflictHandler.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/LatestTxnIdInConflictHandler.java
index f63748ca966..ccbe0c512da 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/LatestTxnIdInConflictHandler.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/LatestTxnIdInConflictHandler.java
@@ -58,15 +58,15 @@ public String getParameterizedQueryString(DatabaseProduct
databaseProduct) throw
" ) \"CUR\"" +
" ON \"COMMITTED\".\"WS_DATABASE\" = \"CUR\".\"TC_DATABASE\"" +
" AND \"COMMITTED\".\"WS_TABLE\" = \"CUR\".\"TC_TABLE\"" +
- (TxnHandler.ConfVars.useMinHistoryLevel() ? "" :
- " AND \"COMMITTED\".\"WS_OPERATION_TYPE\" != :wsType") +
// For partitioned table we always track writes at partition level
(never at table)
// and for non partitioned - always at table level, thus the same
table should never
// have entries with partition key and w/o
" AND (\"COMMITTED\".\"WS_PARTITION\" = \"CUR\".\"TC_PARTITION\" OR"
+
" \"CUR\".\"TC_PARTITION\" IS NULL) " +
// txns overlap
- " WHERE \"CUR\".\"TC_TXNID\" <= \"COMMITTED\".\"WS_COMMIT_ID\"";
+ " WHERE \"CUR\".\"TC_TXNID\" <= \"COMMITTED\".\"WS_COMMIT_ID\"" +
+ (TxnHandler.ConfVars.useMinHistoryLevel() ? "" :
+ " AND \"COMMITTED\".\"WS_OPERATION_TYPE\" != :wsType");
}
@Override
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/MinUncommittedTxnIdHandler.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/MinUncommittedTxnIdHandler.java
index 327963a5a8d..716fdd19e3a 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/MinUncommittedTxnIdHandler.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/MinUncommittedTxnIdHandler.java
@@ -19,6 +19,7 @@
import org.apache.hadoop.hive.metastore.DatabaseProduct;
import org.apache.hadoop.hive.metastore.api.MetaException;
+import org.apache.hadoop.hive.metastore.txn.TxnHandler;
import org.apache.hadoop.hive.metastore.txn.entities.TxnStatus;
import org.apache.hadoop.hive.metastore.txn.jdbc.QueryHandler;
import org.springframework.dao.DataAccessException;
@@ -48,8 +49,8 @@ public class MinUncommittedTxnIdHandler implements
QueryHandler<Long> {
private final boolean useMinHistoryLevel;
- public MinUncommittedTxnIdHandler(boolean useMinHistoryLevel) {
- this.useMinHistoryLevel = useMinHistoryLevel;
+ public MinUncommittedTxnIdHandler() {
+ this.useMinHistoryLevel = TxnHandler.ConfVars.useMinHistoryLevel();
}
@Override