This is an automated email from the ASF dual-hosted git repository.

kuczoram pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 96cf347ae4b HIVE-29210: Minor compaction conditionally produces 
duplicates when the HMS instance running the initiator crashes (#6101)
96cf347ae4b is described below

commit 96cf347ae4b449b6fb8118d5c588641ea1a12106
Author: Tanishq Chugh <[email protected]>
AuthorDate: Mon Sep 29 13:27:11 2025 +0530

    HIVE-29210: Minor compaction conditionally produces duplicates when the 
HMS instance running the initiator crashes (#6101)
---
 .../org/apache/hadoop/hive/ql/io/AcidUtils.java    |  7 +++-
 .../hadoop/hive/ql/txn/compactor/TestCleaner.java  | 40 ++++++++++++++++++++++
 2 files changed, 46 insertions(+), 1 deletion(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java 
b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index dfaf138cc6e..c0ee570a64e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -1133,6 +1133,7 @@ boolean mayContainSideFile() {
      * overlapping writeId boundaries.  The sort order helps figure out the 
"best" set of files
      * to use to get data.
      * This sorts "wider" delta before "narrower" i.e. delta_5_20 sorts before 
delta_5_10 (and delta_11_20)
+     * In case of same min writeID, max writeID & statement ID, it sorts in 
the descending order of visibilityTxnID
      */
     @Override
     public int compareTo(ParsedDeltaLight parsedDelta) {
@@ -1163,6 +1164,9 @@ else if(statementId != parsedDelta.statementId) {
           return 1;
         }
       }
+      else if (visibilityTxnId != parsedDelta.visibilityTxnId) {
+        return visibilityTxnId < parsedDelta.visibilityTxnId ? 1 : -1;
+      }
       else {
         return path.compareTo(parsedDelta.path);
       }
@@ -1452,7 +1456,8 @@ else if(next.maxWriteId == current && lastStmtId >= 0) {
       }
       else if (prev != null && next.maxWriteId == prev.maxWriteId
                   && next.minWriteId == prev.minWriteId
-                  && next.statementId == prev.statementId) {
+                  && next.statementId == prev.statementId
+                  && (next.isDeleteDelta || prev.isDeleteDelta)) {
         // The 'next' parsedDelta may have everything equal to the 'prev' 
parsedDelta, except
         // the path. This may happen when we have split update and we have two 
types of delta
         // directories- 'delta_x_y' and 'delete_delta_x_y' for the SAME txn 
range.
diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java 
b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
index d435a07c2df..ef8c928e6e8 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
@@ -1175,6 +1175,46 @@ public void testCompactionHighWatermarkIsHonored() 
throws Exception {
     Assert.assertEquals("Directories do not match", expectedDirs, actualDirs);
   }
 
+  @Test
+  public void testCleanupOnConcurrentMinorCompactions() throws Exception {
+    String dbName = "default";
+    String tblName = "tcocmc";
+    Table t = newTable(dbName, tblName, false);
+    
+    addBaseFile(t, null, 20L, 20, 21);
+    addDeltaFile(t, null, 22L, 22L, 1);
+    addDeltaFile(t, null, 23L, 23L, 1);
+
+    // Overlapping compacted deltas with different visibilityTxnIDs simulating 
concurrent compaction from two workers
+    addDeltaFile(t, null, 22L, 23L, 2, 24);
+    addDeltaFile(t, null, 22L, 23L, 2, 25);
+    burnThroughTransactions(dbName, tblName, 25);
+    
+    CompactionRequest rqst = new CompactionRequest(dbName, tblName, 
CompactionType.MINOR);
+    compactInTxn(rqst);
+    
+    startCleaner();
+
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    Assert.assertEquals(1, rsp.getCompactsSize());
+    Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, 
rsp.getCompacts().get(0).getState());
+
+    List<Path> paths = getDirectories(conf, t, null);
+    Assert.assertEquals(2, paths.size());
+    boolean sawBase = false, sawDelta = false;
+    for (Path path : paths) {
+      if (path.getName().equals("base_20_v0000021")) {
+        sawBase = true;
+      } else if 
(path.getName().equals(addVisibilitySuffix(makeDeltaDirNameCompacted(22, 23), 
25))) {
+        sawDelta = true;
+      } else {
+        Assert.fail("Unexpected file " + path.getName());
+      }
+    }
+    Assert.assertTrue(sawBase);
+    Assert.assertTrue(sawDelta);
+  }
+
   private void allocateTableWriteId(String dbName, String tblName, long txnId) 
throws Exception {
     AllocateTableWriteIdsRequest awiRqst = new 
AllocateTableWriteIdsRequest(dbName, tblName);
     awiRqst.setTxnIds(Collections.singletonList(txnId));

Reply via email to