This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 69aec11  HIVE-25777: ACID: Pick the compactor transaction over insert 
dir (Denys Kuzmenko, reviewed by Karen Coppage)
69aec11 is described below

commit 69aec11e711293eaf5eb8a3fbb44905bc11841c9
Author: Denys Kuzmenko <[email protected]>
AuthorDate: Thu Jan 27 11:41:35 2022 +0200

    HIVE-25777: ACID: Pick the compactor transaction over insert dir (Denys 
Kuzmenko, reviewed by Karen Coppage)
    
    Closes #2968
---
 .../org/apache/hadoop/hive/ql/io/AcidUtils.java    | 10 ++++++--
 .../hadoop/hive/ql/txn/compactor/TestCleaner.java  | 28 ++++++++++++++++++++++
 2 files changed, 36 insertions(+), 2 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java 
b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 71931d9..3dda052 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -1806,13 +1806,19 @@ public class AcidUtils {
       // keep track for error reporting
       directory.setOldestBase(parsedBase);
     }
+    boolean isCompactedBase = isCompactedBase(parsedBase, directory.getFs(), 
dirSnapshot);
     // Handle aborted IOW base.
-    if (writeIdList.isWriteIdAborted(writeId) && !isCompactedBase(parsedBase, 
directory.getFs(), dirSnapshot)) {
+    if (writeIdList.isWriteIdAborted(writeId) && !isCompactedBase) {
       directory.getAbortedDirectories().add(baseDir);
       directory.getAbortedWriteIds().add(parsedBase.writeId);
       return;
     }
-    if (directory.getBase() == null || directory.getBase().getWriteId() < 
writeId) {
+    if (directory.getBase() == null || directory.getBase().getWriteId() < 
writeId
+      // If there are two competing versions of a particular write-id, one 
from the compactor and another from IOW, 
+      // always pick the compactor one once it is committed.
+      || directory.getBase().getWriteId() == writeId && 
+          isCompactedBase && 
validTxnList.isTxnValid(parsedBase.getVisibilityTxnId())) {
+      
       if (isValidBase(parsedBase, writeIdList, directory.getFs(), 
dirSnapshot)) {
         List<HdfsFileStatusWithId> files = null;
         if (dirSnapshot != null) {
diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java 
b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
index f258005..1d0112f 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
@@ -93,6 +93,34 @@ public class TestCleaner extends CompactorTest {
   }
 
   @Test
+  public void cleanupAfterIOWAndMajorTableCompaction() throws Exception {
+    Table t = newTable("default", "camtc", false);
+
+    addBaseFile(t, null, 20L, 20);
+    addDeltaFile(t, null, 21L, 22L, 2);
+    addDeltaFile(t, null, 23L, 24L, 2);
+    addBaseFile(t, null, 25L, 25); //IOW
+
+    burnThroughTransactions("default", "camtc", 25);
+
+    CompactionRequest rqst = new CompactionRequest("default", "camtc", 
CompactionType.MAJOR);
+    long compactTxn = compactInTxn(rqst);
+    addBaseFile(t, null, 25L, 25, compactTxn);
+
+    startCleaner();
+
+    // Check there are no compaction requests left.
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    Assert.assertEquals(1, rsp.getCompactsSize());
+    Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, 
rsp.getCompacts().get(0).getState());
+
+    // Check that the files are removed
+    List<Path> paths = getDirectories(conf, t, null);
+    Assert.assertEquals(1, paths.size());
+    Assert.assertEquals("base_25_v26", paths.get(0).getName());
+  }
+  
+  @Test
   public void cleanupAfterMajorTableCompactionWithLongRunningQuery() throws 
Exception {
     Table t = newTable("default", "camtc", false);
 

Reply via email to