This is an automated email from the ASF dual-hosted git repository.
sbadhya pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 0f8190fefa5 Revert "HIVE-27637: Compare highest write ID of compaction
records when trying to perform abort cleanup (Zsolt Miskolczi reviewed by
Attila Turoczy, Sourabh Badhya)" (#5058)
0f8190fefa5 is described below
commit 0f8190fefa56513a042566ef87b852e8343610aa
Author: Sourabh Badhya <[email protected]>
AuthorDate: Fri Feb 2 18:59:36 2024 +0530
Revert "HIVE-27637: Compare highest write ID of compaction records when
trying to perform abort cleanup (Zsolt Miskolczi reviewed by Attila Turoczy,
Sourabh Badhya)" (#5058)
This reverts commit f3439697343f3d5e1f1d007d8c878a6eb821713b.
---
.../metastore/txn/TestCompactionTxnHandler.java | 44 ----------------------
.../compactor/handler/TestAbortedTxnCleaner.java | 17 +--------
.../txn/jdbc/queries/ReadyToCleanAbortHandler.java | 12 +-----
3 files changed, 3 insertions(+), 70 deletions(-)
diff --git
a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
index cd02eb1ba3e..d26f3774af7 100644
---
a/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
+++
b/ql/src/test/org/apache/hadoop/hive/metastore/txn/TestCompactionTxnHandler.java
@@ -1056,50 +1056,6 @@ public class TestCompactionTxnHandler {
assertEquals(1, potentials.size());
}
- @Test
- public void testFindReadyToCleanAborts() throws Exception {
- long txnId = openTxn();
-
- List<LockComponent> components = new ArrayList<>();
- components.add(createLockComponent(LockType.SHARED_WRITE, LockLevel.DB,
"mydb", "mytable", "mypartition=myvalue", DataOperationType.UPDATE));
- components.add(createLockComponent(LockType.SHARED_WRITE, LockLevel.DB,
"mydb", "yourtable", "mypartition=myvalue", DataOperationType.UPDATE));
-
- allocateTableWriteIds("mydb", "mytable", txnId);
- allocateTableWriteIds("mydb", "yourtable", txnId);
-
- LockRequest req = new LockRequest(components, "me", "localhost");
- req.setTxnid(txnId);
- LockResponse res = txnHandler.lock(req);
- assertSame(res.getState(), LockState.ACQUIRED);
-
- txnHandler.abortTxn(new AbortTxnRequest((txnId)));
-
- txnId = openTxn();
- components = new ArrayList<>();
- components.add(createLockComponent(LockType.SHARED_WRITE, LockLevel.DB,
"mydb", "mytable", "mypartition=myvalue", DataOperationType.UPDATE));
- allocateTableWriteIds("mydb", "mytable", txnId);
-
- req = new LockRequest(components, "me", "localhost");
- req.setTxnid(txnId);
- res = txnHandler.lock(req);
- assertSame(res.getState(), LockState.ACQUIRED);
-
- CompactionRequest rqst = new CompactionRequest("mydb", "mytable",
CompactionType.MINOR);
- rqst.setPartitionname("mypartition=myvalue");
- txnHandler.compact(rqst);
-
- CompactionInfo ci =
txnHandler.findNextToCompact(aFindNextCompactRequest("fred", WORKER_VERSION));
- assertNotNull(ci);
- ci.highestWriteId = 41;
- txnHandler.updateCompactorState(ci, 0);
-
- List<CompactionInfo> potentials = txnHandler.findReadyToCleanAborts(1, 0);
- assertEquals(1, potentials.size());
- CompactionInfo potentialToCleanAbort = potentials.get(0);
- assertEquals("mydb", potentialToCleanAbort.dbname);
- assertEquals("yourtable", potentialToCleanAbort.tableName);
- }
-
private static FindNextCompactRequest aFindNextCompactRequest(String
workerId, String workerVersion) {
FindNextCompactRequest request = new FindNextCompactRequest();
request.setWorkerId(workerId);
diff --git
a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestAbortedTxnCleaner.java
b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestAbortedTxnCleaner.java
index 1d31aae9434..8f6814d4890 100644
---
a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestAbortedTxnCleaner.java
+++
b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestAbortedTxnCleaner.java
@@ -44,9 +44,7 @@ import org.junit.jupiter.params.provider.ValueSource;
import org.mockito.Mockito;
import org.mockito.invocation.InvocationOnMock;
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
@@ -284,21 +282,8 @@ public class TestAbortedTxnCleaner extends TestHandler {
cleaner.setCleanupHandlers(Arrays.asList(mockedTaskHandler));
cleaner.run();
- Mockito.verifyNoInteractions(mockedFSRemover);
+ Mockito.verify(mockedFSRemover,
Mockito.times(1)).clean(any(CleanupRequest.class));
Mockito.verify(mockedTaskHandler, Mockito.times(1)).getTasks();
- String compactionQueuePresence = "SELECT COUNT(*) FROM
\"COMPACTION_QUEUE\" " +
- " WHERE \"CQ_DATABASE\" = '" + dbName+ "' AND \"CQ_TABLE\" = '" +
tableName + "' AND \"CQ_PARTITION\" IS NULL";
- Assert.assertEquals(1, TestTxnDbUtil.countQueryAgent(conf,
compactionQueuePresence));
-
- directories = getDirectories(conf, t, null);
- // Both base and delta files are present since the cleaner skips them as
there is a newer write.
- Assert.assertEquals(5, directories.size());
- Assert.assertEquals(1, directories.stream().filter(dir ->
dir.getName().startsWith(AcidUtils.BASE_PREFIX)).count());
-
- // Run compaction and clean up
- startInitiator();
- startWorker();
- startCleaner();
directories = getDirectories(conf, t, null);
// The table is already compacted, so we must see 1 base delta
diff --git
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/ReadyToCleanAbortHandler.java
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/ReadyToCleanAbortHandler.java
index eebe29dc441..4940d384095 100644
---
a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/ReadyToCleanAbortHandler.java
+++
b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/ReadyToCleanAbortHandler.java
@@ -52,7 +52,7 @@ public class ReadyToCleanAbortHandler implements
QueryHandler<List<CompactionInf
// First sub-query - Gets the aborted txns with min txn start time,
number of aborted txns
// for corresponding db, table, partition.
" ( SELECT \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\",
MIN(\"TXN_STARTED\") AS \"MIN_TXN_START_TIME\", " +
- " MAX(\"TC_WRITEID\") as \"MAX_ABORTED_WRITE_ID\", COUNT(*) AS
\"ABORTED_TXN_COUNT\" FROM \"TXNS\", \"TXN_COMPONENTS\" " +
+ " COUNT(*) AS \"ABORTED_TXN_COUNT\" FROM \"TXNS\",
\"TXN_COMPONENTS\" " +
" WHERE \"TXN_ID\" = \"TC_TXNID\" AND \"TXN_STATE\" = :abortedState"
+
" GROUP BY \"TC_DATABASE\", \"TC_TABLE\", \"TC_PARTITION\" %s )
\"res1\" " +
" LEFT JOIN" +
@@ -76,15 +76,7 @@ public class ReadyToCleanAbortHandler implements
QueryHandler<List<CompactionInf
" AND \"res1\".\"TC_TABLE\" = \"res3\".\"CQ_TABLE\" " +
" AND (\"res1\".\"TC_PARTITION\" = \"res3\".\"CQ_PARTITION\" " +
" OR (\"res1\".\"TC_PARTITION\" IS NULL AND
\"res3\".\"CQ_PARTITION\" IS NULL))" +
- " WHERE (\"res3\".\"RETRY_RECORD_CHECK\" <= 0 OR
\"res3\".\"RETRY_RECORD_CHECK\" IS NULL)" +
- " AND NOT EXISTS (SELECT 1 " +
- " FROM \"COMPACTION_QUEUE\" AS \"cq\" " +
- " WHERE \"cq\".\"CQ_DATABASE\" = \"res1\".\"TC_DATABASE\" AND
\"cq\".\"CQ_TABLE\" = \"res1\".\"TC_TABLE\"" +
- " AND (\"cq\".\"CQ_PARTITION\" = \"res1\".\"TC_PARTITION\"" +
- " OR (\"cq\".\"CQ_PARTITION\" IS NULL AND \"res1\".\"TC_PARTITION\"
IS NULL))" +
- " AND \"cq\".\"CQ_HIGHEST_WRITE_ID\" >
\"res1\".\"MAX_ABORTED_WRITE_ID\"" +
- " AND \"cq\".\"CQ_STATE\" " +
- " IN ('i', 'w', 'r'))";
+ " WHERE \"res3\".\"RETRY_RECORD_CHECK\" <= 0 OR
\"res3\".\"RETRY_RECORD_CHECK\" IS NULL";
private final long abortedTimeThreshold;
private final int abortedThreshold;