This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new de49826286e HIVE-27526: Cleaner should honor compaction writeIdHwm 
(Denys Kuzmenko, reviewed by Laszlo Vegh, Sourabh Badhya)
de49826286e is described below

commit de49826286e57976be447c00f9b9abfc41bc305e
Author: Denys Kuzmenko <[email protected]>
AuthorDate: Mon Oct 2 14:50:51 2023 +0300

    HIVE-27526: Cleaner should honor compaction writeIdHwm (Denys Kuzmenko, 
reviewed by Laszlo Vegh, Sourabh Badhya)
    
    Closes #4491
---
 .../txn/compactor/handler/CompactionCleaner.java   |  4 +-
 .../hive/ql/txn/compactor/handler/TaskHandler.java |  7 +--
 .../hadoop/hive/ql/txn/compactor/TestCleaner.java  | 57 ++++++++++++++++++++++
 .../hive/common/ValidCleanerWriteIdList.java       | 52 ++++++++++++++++++++
 .../hive/common/ValidCompactorWriteIdList.java     |  2 +-
 .../hadoop/hive/common/ValidReaderWriteIdList.java |  4 +-
 6 files changed, 118 insertions(+), 8 deletions(-)

diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/CompactionCleaner.java
 
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/CompactionCleaner.java
index a928f1438f4..c2be268b6f9 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/CompactionCleaner.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/CompactionCleaner.java
@@ -90,7 +90,7 @@ class CompactionCleaner extends TaskHandler {
      // when min_history_level is finally dropped, then every HMS will commit 
compaction the new way
       // and minTxnIdSeenOpen can be removed and minOpenTxnId can be used 
instead.
       return readyToClean.stream().map(ci -> {
-        long cleanerWaterMark = (ci.minOpenWriteId > 0) ? ci.nextTxnId + 1 : 
minTxnIdSeenOpen;
+        long cleanerWaterMark = (ci.minOpenWriteId >= 0) ? ci.nextTxnId + 1 : 
minTxnIdSeenOpen;
         LOG.info("Cleaning based on min open txn id: {}", cleanerWaterMark);
         return ThrowingRunnable.unchecked(() -> clean(ci, cleanerWaterMark, 
metricsEnabled));
       }).collect(Collectors.toList());
@@ -292,7 +292,7 @@ class CompactionCleaner extends TaskHandler {
      * should not touch the newer obsolete directories to not violate the 
retentionTime for those.
      */
     if (ci.highestWriteId < validWriteIdList.getHighWatermark()) {
-      validWriteIdList = 
validWriteIdList.updateHighWatermark(ci.highestWriteId);
+      validWriteIdList.setHighWatermark(ci.highestWriteId);
     }
     return validWriteIdList;
   }
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/TaskHandler.java 
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/TaskHandler.java
index ff35ce48423..8a105c0bc20 100644
--- 
a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/TaskHandler.java
+++ 
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/TaskHandler.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.txn.compactor.handler;
 
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.ValidCleanerWriteIdList;
 import org.apache.hadoop.hive.common.ValidReadTxnList;
 import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
 import org.apache.hadoop.hive.common.ValidTxnList;
@@ -121,7 +122,8 @@ public abstract class TaskHandler {
     // been some delta/base dirs
     assert rsp != null && rsp.getTblValidWriteIdsSize() == 1;
 
-    return 
TxnCommonUtils.createValidReaderWriteIdList(rsp.getTblValidWriteIds().get(0));
+    return new ValidCleanerWriteIdList(
+        
TxnCommonUtils.createValidReaderWriteIdList(rsp.getTblValidWriteIds().get(0)));
   }
 
   protected boolean cleanAndVerifyObsoleteDirectories(CompactionInfo info, 
String location,
@@ -153,8 +155,7 @@ public abstract class TaskHandler {
     // Make sure there are no leftovers below the compacted watermark
     boolean success = false;
     conf.set(ValidTxnList.VALID_TXNS_KEY, new ValidReadTxnList().toString());
-    dir = AcidUtils.getAcidState(fs, path, conf, new ValidReaderWriteIdList(
-                    info.getFullTableName(), new long[0], new BitSet(), 
info.highestWriteId, Long.MAX_VALUE),
+    dir = AcidUtils.getAcidState(fs, path, conf, new 
ValidCleanerWriteIdList(info.getFullTableName(), info.highestWriteId),
             Ref.from(false), false, dirSnapshots);
 
     List<Path> remained = subtract(CompactorUtil.getObsoleteDirs(dir, 
isDynPartAbort), deleted);
diff --git 
a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java 
b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
index b438524b4df..67bb1985328 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.ReplChangeManager;
 import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsRequest;
 import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsResponse;
+import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
 import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
 import org.apache.hadoop.hive.metastore.api.CompactionRequest;
 import org.apache.hadoop.hive.metastore.api.CompactionResponse;
@@ -60,6 +61,7 @@ import java.util.Optional;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 
 import static 
org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETENTION_TIME;
 import static 
org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_DELAYED_CLEANUP_ENABLED;
@@ -1117,6 +1119,61 @@ public class TestCleaner extends CompactorTest {
     Assert.assertEquals(TxnStore.CLEANING_RESPONSE, 
rsp.getCompacts().get(0).getState());
   }
 
+  @Test
+  public void testCompactionHighWatermarkIsHonored() throws Exception {
+    String dbName = "default";
+    String tblName = "trfcp";
+    String partName = "ds=today";
+    Table t = newTable(dbName, tblName, true);
+    Partition p = newPartition(t, "today");
+
+    // minor compaction
+    addBaseFile(t, p, 19L, 19);
+    addDeltaFile(t, p, 20L, 20L, 1);
+    addDeltaFile(t, p, 21L, 21L, 1);
+    addDeltaFile(t, p, 22L, 22L, 1);
+    burnThroughTransactions(dbName, tblName, 22);
+
+    CompactionRequest rqst = new CompactionRequest(dbName, tblName, 
CompactionType.MINOR);
+    rqst.setPartitionname(partName);
+    long ctxnid = compactInTxn(rqst);
+    addDeltaFile(t, p, 20, 22, 3, ctxnid);
+
+    // block cleaner with an open txn
+    long openTxnId = openTxn();
+
+    //2nd minor
+    addDeltaFile(t, p, 23L, 23L, 1);
+    addDeltaFile(t, p, 24L, 24L, 1);
+    burnThroughTransactions(dbName, tblName, 2);
+
+    rqst = new CompactionRequest(dbName, tblName, CompactionType.MINOR);
+    rqst.setPartitionname(partName);
+    ctxnid = compactInTxn(rqst);
+    addDeltaFile(t, p, 20, 24, 5, ctxnid);
+
+    startCleaner();
+    txnHandler.abortTxn(new AbortTxnRequest(openTxnId));
+
+    ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+    Assert.assertEquals(2, rsp.getCompactsSize());
+    Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, 
rsp.getCompacts().get(0).getState());
+    Assert.assertEquals(TxnStore.CLEANING_RESPONSE, 
rsp.getCompacts().get(1).getState());
+
+    List<String> actualDirs = getDirectories(conf, t, p).stream()
+      .map(Path::getName).sorted()
+      .collect(Collectors.toList());
+    
+    List<String> expectedDirs = Arrays.asList(
+      "base_19",
+      addVisibilitySuffix(makeDeltaDirName(20, 22), 23),
+      addVisibilitySuffix(makeDeltaDirName(20, 24), 27),
+      makeDeltaDirName(23, 23),
+      makeDeltaDirName(24, 24)
+    );
+    Assert.assertEquals("Directories do not match", expectedDirs, actualDirs);
+  }
+
   private void allocateTableWriteId(String dbName, String tblName, long txnId) 
throws Exception {
     AllocateTableWriteIdsRequest awiRqst = new 
AllocateTableWriteIdsRequest(dbName, tblName);
     awiRqst.setTxnIds(Collections.singletonList(txnId));
diff --git 
a/storage-api/src/java/org/apache/hadoop/hive/common/ValidCleanerWriteIdList.java
 
b/storage-api/src/java/org/apache/hadoop/hive/common/ValidCleanerWriteIdList.java
new file mode 100644
index 00000000000..f58fe57a9f2
--- /dev/null
+++ 
b/storage-api/src/java/org/apache/hadoop/hive/common/ValidCleanerWriteIdList.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common;
+
+import java.util.BitSet;
+import java.util.Optional;
+
+/**
+ * An implementation of {@link ValidWriteIdList} for use by the Cleaner.
+ * Uses the same logic as {@link ValidReaderWriteIdList} with the only 
exception: 
+ * returns NONE for any range that includes any unresolved write ids or write 
id above {@code highWatermark}
+ */
+public class ValidCleanerWriteIdList extends ValidReaderWriteIdList {
+
+  public ValidCleanerWriteIdList(ValidReaderWriteIdList vrwil) {
+    super(vrwil.getTableName(), vrwil.exceptions, vrwil.abortedBits, 
vrwil.getHighWatermark(),
+      Optional.ofNullable(vrwil.getMinOpenWriteId()).orElse(Long.MAX_VALUE));
+  }
+
+  public ValidCleanerWriteIdList(String tableName, long highWatermark) {
+    super(tableName, new long[0], new BitSet(), highWatermark, Long.MAX_VALUE);
+  }
+
+
+  /**
+   * Returns NONE if some of the write ids in the range are not resolved, 
+   * otherwise uses {@link ValidReaderWriteIdList#isWriteIdRangeValid(long, 
long)} 
+   */
+  @Override
+  public RangeResponse isWriteIdRangeValid(long minWriteId, long maxWriteId) {
+    if (maxWriteId > highWatermark) {
+      return RangeResponse.NONE;
+    }
+    return super.isWriteIdRangeValid(minWriteId, maxWriteId);
+  }
+}
diff --git 
a/storage-api/src/java/org/apache/hadoop/hive/common/ValidCompactorWriteIdList.java
 
b/storage-api/src/java/org/apache/hadoop/hive/common/ValidCompactorWriteIdList.java
index 5d74241815a..760d5e75d25 100644
--- 
a/storage-api/src/java/org/apache/hadoop/hive/common/ValidCompactorWriteIdList.java
+++ 
b/storage-api/src/java/org/apache/hadoop/hive/common/ValidCompactorWriteIdList.java
@@ -24,7 +24,7 @@ import java.util.BitSet;
 /**
  * An implementation of {@link ValidWriteIdList} for use by the compactor.
  *
- * Compaction should only include txns up to smallest open txn (exclussive).
+ * Compaction should only include txns up to smallest open txn (exclusive).
  * There may be aborted write ids in the snapshot represented by this 
ValidCompactorWriteIdList.
  * Thus {@link #isWriteIdRangeValid(long, long)} returns NONE for any range 
that includes any unresolved
  * write ids.  Any write id above {@code highWatermark} is unresolved.
diff --git 
a/storage-api/src/java/org/apache/hadoop/hive/common/ValidReaderWriteIdList.java
 
b/storage-api/src/java/org/apache/hadoop/hive/common/ValidReaderWriteIdList.java
index 4c2cf7c5aaf..1601b38d101 100644
--- 
a/storage-api/src/java/org/apache/hadoop/hive/common/ValidReaderWriteIdList.java
+++ 
b/storage-api/src/java/org/apache/hadoop/hive/common/ValidReaderWriteIdList.java
@@ -262,8 +262,8 @@ public class ValidReaderWriteIdList implements 
ValidWriteIdList {
     }
   }
 
-  public ValidReaderWriteIdList updateHighWatermark(long value) {
-    return new ValidReaderWriteIdList(tableName, exceptions, abortedBits, 
value, minOpenWriteId);
+  public void setHighWatermark(long value) {
+    this.highWatermark = value;
   }
 }
 

Reply via email to