This is an automated email from the ASF dual-hosted git repository.
sbadhya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new f3439697343 HIVE-27637: Compare highest write ID of compaction records
when trying to perform abort cleanup (#4740) (Zsolt Miskolczi reviewed by
Attila Turoczy, Sourabh Badhya)
f3439697343 is described below
commit f3439697343f3d5e1f1d007d8c878a6eb821713b
Author: InvisibleProgrammer <[email protected]>
AuthorDate: Fri Feb 2 09:32:26 2024 +0100
HIVE-27637: Compare highest write ID of compaction records when trying to
perform abort cleanup (#4740) (Zsolt Miskolczi reviewed by Attila Turoczy,
Sourabh Badhya)
---
.../metastore/txn/TestCompactionTxnHandler.java | 44 ++++++++++++++++++++++
.../compactor/handler/TestAbortedTxnCleaner.java | 17 ++++++++-
.../txn/jdbc/queries/ReadyToCleanAbortHandler.java | 12 +++++-
3 files changed, 70 insertions(+), 3 deletions(-)
diff --git a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
index d26f3774af7..cd02eb1ba3e 100644
--- a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
+++ b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
@@ -1056,6 +1056,50 @@ public class TestCompactionTxnHandler {
assertEquals(1, potentials.size());
}
+ @Test
+ public void testFindReadyToCleanAborts() throws Exception {
+ long txnId = openTxn();
+
+ List<LockComponent> components = new ArrayList<>();
+ components.add(createLockComponent(LockType.SHARED_WRITE, LockLevel.DB,
"mydb", "mytable", "mypartition=myvalue", DataOperationType.UPDATE));
+ components.add(createLockComponent(LockType.SHARED_WRITE, LockLevel.DB,
"mydb", "yourtable", "mypartition=myvalue", DataOperationType.UPDATE));
+
+ allocateTableWriteIds("mydb", "mytable", txnId);
+ allocateTableWriteIds("mydb", "yourtable", txnId);
+
+ LockRequest req = new LockRequest(components, "me", "localhost");
+ req.setTxnid(txnId);
+ LockResponse res = txnHandler.lock(req);
+ assertSame(res.getState(), LockState.ACQUIRED);
+
+ txnHandler.abortTxn(new AbortTxnRequest((txnId)));
+
+ txnId = openTxn();
+ components = new ArrayList<>();
+ components.add(createLockComponent(LockType.SHARED_WRITE, LockLevel.DB,
"mydb", "mytable", "mypartition=myvalue", DataOperationType.UPDATE));
+ allocateTableWriteIds("mydb", "mytable", txnId);
+
+ req = new LockRequest(components, "me", "localhost");
+ req.setTxnid(txnId);
+ res = txnHandler.lock(req);
+ assertSame(res.getState(), LockState.ACQUIRED);
+
+ CompactionRequest rqst = new CompactionRequest("mydb", "mytable",
CompactionType.MINOR);
+ rqst.setPartitionname("mypartition=myvalue");
+ txnHandler.compact(rqst);
+
+ CompactionInfo ci =
txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION));
+ assertNotNull(ci);
+ ci.highestWriteId = 41;
+ txnHandler.updateCompactorState(ci, 0);
+
+ List<CompactionInfo> potentials = txnHandler.findReadyToCleanAborts(1, 0);
+ assertEquals(1, potentials.size());
+ CompactionInfo potentialToCleanAbort = potentials.get(0);
+ assertEquals("mydb", potentialToCleanAbort.dbname);
+ assertEquals("yourtable", potentialToCleanAbort.tableName);
+ }
+
private static FindNextCompactRequest aFindNextCompactRequest(String
workerId, String workerVersion) {
FindNextCompactRequest request = new FindNextCompactRequest();
request.setWorkerId(workerId);
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestAbortedTxnCleaner.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestAbortedTxnCleaner.java
index 8f6814d4890..1d31aae9434 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestAbortedTxnCleaner.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestAbortedTxnCleaner.java
@@ -44,7 +44,9 @@ import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
+import java.util.ArrayList;
import java.util.Arrays;
+import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -282,8 +284,21 @@ public class TestAbortedTxnCleaner extends TestHandler {
cleaner.setCleanupHandlers(Arrays.asList(mockedTaskHandler));
cleaner.run();
- Mockito.verify(mockedFSRemover,
Mockito.times(1)).clean(any(CleanupRequest.class));
+ Mockito.verifyNoInteractions(mockedFSRemover);
Mockito.verify(mockedTaskHandler, Mockito.times(1)).getTasks();
+ String compactionQueuePresence = "SELECT COUNT(*) FROM
\"COMPACTION_QUEUE\" " +
+ " WHERE \"CQ_DATABASE\" = '" + dbName+ "' AND \"CQ_TABLE\" = '" +
tableName + "' AND \"CQ_PARTITION\" IS NULL";
+ Assert.assertEquals(1, TestTxnDbUtil.countQueryAgent(conf,
compactionQueuePresence));
+
+ directories = getDirectories(conf, t, null);
+ // Both base and delta files are present since the cleaner skips them as
there is a newer write.
+ Assert.assertEquals(5, directories.size());
+ Assert.assertEquals(1, directories.stream().filter(dir ->
dir.getName().startsWith(AcidUtils.BASE_PREFIX)).count());
+
+ // Run compaction and clean up
+ startInitiator();
+ startWorker();
+ startCleaner();
directories = getDirectories(conf, t, null);
// The table is already compacted, so we must see 1 base delta
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/ReadyToCleanAbortHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/ReadyToCleanAbortHandler.java
index 4940d384095..eebe29dc441 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/ReadyToCleanAbortHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/ReadyToCleanAbortHandler.java
@@ -52,7 +52,7 @@ public class ReadyToCleanAbortHandler implements QueryHandler<List<CompactionInf
// First sub-query - Gets the aborted txns with min txn start time,
number of aborted txns
// for corresponding db, table, partition.
" ( SELECT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\",
MIN(\"TXN_STARTED\") AS \"MIN_TXN_START_TIME\", " +
- " COUNT(*) AS \"ABORTED_TXN_COUNT\" FROM \"TXNS\",
\"TXN_COMPONENTS\" " +
+ " MAX(\"TC_WRITEID\") as \"MAX_ABORTED_WRITE_ID\", COUNT(*) AS
\"ABORTED_TXN_COUNT\" FROM \"TXNS\", \"TXN_COMPONENTS\" " +
" WHERE \"TXN_ID\" = \"TC_TXNID\" AND \"TXN_STATE\" = :abortedState"
+
" GROUP BY \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\" %s )
\"res1\" " +
" LEFT JOIN" +
@@ -76,7 +76,15 @@ public class ReadyToCleanAbortHandler implements QueryHandler<List<CompactionInf
" AND \"res1\".\"TC_TABLE\" = \"res3\".\"CQ_TABLE\" " +
" AND (\"res1\".\"TC_PARTITION\" = \"res3\".\"CQ_PARTITION\" " +
" OR (\"res1\".\"TC_PARTITION\" IS NULL AND
\"res3\".\"CQ_PARTITION\" IS NULL))" +
- " WHERE \"res3\".\"RETRY_RECORD_CHECK\" <= 0 OR
\"res3\".\"RETRY_RECORD_CHECK\" IS NULL";
+ " WHERE (\"res3\".\"RETRY_RECORD_CHECK\" <= 0 OR
\"res3\".\"RETRY_RECORD_CHECK\" IS NULL)" +
+ " AND NOT EXISTS (SELECT 1 " +
+ " FROM \"COMPACTION_QUEUE\" AS \"cq\" " +
+ " WHERE \"cq\".\"CQ_DATABASE\" = \"res1\".\"TC_DATABASE\" AND
\"cq\".\"CQ_TABLE\" = \"res1\".\"TC_TABLE\"" +
+ " AND (\"cq\".\"CQ_PARTITION\" = \"res1\".\"TC_PARTITION\"" +
+ " OR (\"cq\".\"CQ_PARTITION\" IS NULL AND \"res1\".\"TC_PARTITION\"
IS NULL))" +
+ " AND \"cq\".\"CQ_HIGHEST_WRITE_ID\" >
\"res1\".\"MAX_ABORTED_WRITE_ID\"" +
+ " AND \"cq\".\"CQ_STATE\" " +
+ " IN ('i', 'w', 'r'))";
private final long abortedTimeThreshold;
private final int abortedThreshold;