This is an automated email from the ASF dual-hosted git repository.
kuczoram pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 96cf347ae4b HIVE-29210: Minor compaction produces duplicates
conditionally in case of HMS instance running initiator crash (#6101)
96cf347ae4b is described below
commit 96cf347ae4b449b6fb8118d5c588641ea1a12106
Author: Tanishq Chugh <[email protected]>
AuthorDate: Mon Sep 29 13:27:11 2025 +0530
HIVE-29210: Minor compaction produces duplicates conditionally in case of
HMS instance running initiator crash (#6101)
---
.../org/apache/hadoop/hive/ql/io/AcidUtils.java | 7 +++-
.../hadoop/hive/ql/txn/compactor/TestCleaner.java | 40 ++++++++++++++++++++++
2 files changed, 46 insertions(+), 1 deletion(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index dfaf138cc6e..c0ee570a64e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -1133,6 +1133,7 @@ boolean mayContainSideFile() {
* overlapping writeId boundaries. The sort order helps figure out the "best" set of files
* to use to get data.
* This sorts "wider" delta before "narrower" i.e. delta_5_20 sorts before delta_5_10 (and delta_11_20)
+ * In case of same min writeID, max writeID & statement ID, it sorts in the descending order of visibilityTxnID
*/
@Override
public int compareTo(ParsedDeltaLight parsedDelta) {
@@ -1163,6 +1164,9 @@ else if(statementId != parsedDelta.statementId) {
return 1;
}
}
+ else if (visibilityTxnId != parsedDelta.visibilityTxnId) {
+ return visibilityTxnId < parsedDelta.visibilityTxnId ? 1 : -1;
+ }
else {
return path.compareTo(parsedDelta.path);
}
@@ -1452,7 +1456,8 @@ else if(next.maxWriteId == current && lastStmtId >= 0) {
}
else if (prev != null && next.maxWriteId == prev.maxWriteId
&& next.minWriteId == prev.minWriteId
- && next.statementId == prev.statementId) {
+ && next.statementId == prev.statementId
+ && (next.isDeleteDelta || prev.isDeleteDelta)) {
// The 'next' parsedDelta may have everything equal to the 'prev' parsedDelta, except
// the path. This may happen when we have split update and we have two types of delta
// directories- 'delta_x_y' and 'delete_delta_x_y' for the SAME txn range.
diff --git
a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
index d435a07c2df..ef8c928e6e8 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
@@ -1175,6 +1175,46 @@ public void testCompactionHighWatermarkIsHonored()
throws Exception {
Assert.assertEquals("Directories do not match", expectedDirs, actualDirs);
}
+ @Test
+ public void testCleanupOnConcurrentMinorCompactions() throws Exception {
+ String dbName = "default";
+ String tblName = "tcocmc";
+ Table t = newTable(dbName, tblName, false);
+
+ addBaseFile(t, null, 20L, 20, 21);
+ addDeltaFile(t, null, 22L, 22L, 1);
+ addDeltaFile(t, null, 23L, 23L, 1);
+
+ // Overlapping compacted deltas with different visibilityTxnIDs simulating concurrent compaction from two workers
+ addDeltaFile(t, null, 22L, 23L, 2, 24);
+ addDeltaFile(t, null, 22L, 23L, 2, 25);
+ burnThroughTransactions(dbName, tblName, 25);
+
+ CompactionRequest rqst = new CompactionRequest(dbName, tblName,
CompactionType.MINOR);
+ compactInTxn(rqst);
+
+ startCleaner();
+
+ ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+ Assert.assertEquals(1, rsp.getCompactsSize());
+ Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE,
rsp.getCompacts().get(0).getState());
+
+ List<Path> paths = getDirectories(conf, t, null);
+ Assert.assertEquals(2, paths.size());
+ boolean sawBase = false, sawDelta = false;
+ for (Path path : paths) {
+ if (path.getName().equals("base_20_v0000021")) {
+ sawBase = true;
+ } else if
(path.getName().equals(addVisibilitySuffix(makeDeltaDirNameCompacted(22, 23),
25))) {
+ sawDelta = true;
+ } else {
+ Assert.fail("Unexpected file " + path.getName());
+ }
+ }
+ Assert.assertTrue(sawBase);
+ Assert.assertTrue(sawDelta);
+ }
+
private void allocateTableWriteId(String dbName, String tblName, long txnId)
throws Exception {
AllocateTableWriteIdsRequest awiRqst = new
AllocateTableWriteIdsRequest(dbName, tblName);
awiRqst.setTxnIds(Collections.singletonList(txnId));