This is an automated email from the ASF dual-hosted git repository.
dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new de49826286e HIVE-27526: Cleaner should honor compaction writeIdHwm
(Denys Kuzmenko, reviewed by Laszlo Vegh, Sourabh Badhya)
de49826286e is described below
commit de49826286e57976be447c00f9b9abfc41bc305e
Author: Denys Kuzmenko <[email protected]>
AuthorDate: Mon Oct 2 14:50:51 2023 +0300
HIVE-27526: Cleaner should honor compaction writeIdHwm (Denys Kuzmenko,
reviewed by Laszlo Vegh, Sourabh Badhya)
Closes #4491
---
.../txn/compactor/handler/CompactionCleaner.java | 4 +-
.../hive/ql/txn/compactor/handler/TaskHandler.java | 7 +--
.../hadoop/hive/ql/txn/compactor/TestCleaner.java | 57 ++++++++++++++++++++++
.../hive/common/ValidCleanerWriteIdList.java | 52 ++++++++++++++++++++
.../hive/common/ValidCompactorWriteIdList.java | 2 +-
.../hadoop/hive/common/ValidReaderWriteIdList.java | 4 +-
6 files changed, 118 insertions(+), 8 deletions(-)
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/CompactionCleaner.java
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/CompactionCleaner.java
index a928f1438f4..c2be268b6f9 100644
---
a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/CompactionCleaner.java
+++
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/CompactionCleaner.java
@@ -90,7 +90,7 @@ class CompactionCleaner extends TaskHandler {
// when min_history_level is finally dropped, then every HMS will commit
compaction the new way
// and minTxnIdSeenOpen can be removed and minOpenTxnId can be used
instead.
return readyToClean.stream().map(ci -> {
- long cleanerWaterMark = (ci.minOpenWriteId > 0) ? ci.nextTxnId + 1 :
minTxnIdSeenOpen;
+ long cleanerWaterMark = (ci.minOpenWriteId >= 0) ? ci.nextTxnId + 1 :
minTxnIdSeenOpen;
LOG.info("Cleaning based on min open txn id: {}", cleanerWaterMark);
return ThrowingRunnable.unchecked(() -> clean(ci, cleanerWaterMark,
metricsEnabled));
}).collect(Collectors.toList());
@@ -292,7 +292,7 @@ class CompactionCleaner extends TaskHandler {
* should not touch the newer obsolete directories to not violate the
retentionTime for those.
*/
if (ci.highestWriteId < validWriteIdList.getHighWatermark()) {
- validWriteIdList =
validWriteIdList.updateHighWatermark(ci.highestWriteId);
+ validWriteIdList.setHighWatermark(ci.highestWriteId);
}
return validWriteIdList;
}
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/TaskHandler.java
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/TaskHandler.java
index ff35ce48423..8a105c0bc20 100644
---
a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/TaskHandler.java
+++
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/TaskHandler.java
@@ -19,6 +19,7 @@ package org.apache.hadoop.hive.ql.txn.compactor.handler;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.ValidCleanerWriteIdList;
import org.apache.hadoop.hive.common.ValidReadTxnList;
import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
import org.apache.hadoop.hive.common.ValidTxnList;
@@ -121,7 +122,8 @@ public abstract class TaskHandler {
// been some delta/base dirs
assert rsp != null && rsp.getTblValidWriteIdsSize() == 1;
- return
TxnCommonUtils.createValidReaderWriteIdList(rsp.getTblValidWriteIds().get(0));
+ return new ValidCleanerWriteIdList(
+
TxnCommonUtils.createValidReaderWriteIdList(rsp.getTblValidWriteIds().get(0)));
}
protected boolean cleanAndVerifyObsoleteDirectories(CompactionInfo info,
String location,
@@ -153,8 +155,7 @@ public abstract class TaskHandler {
// Make sure there are no leftovers below the compacted watermark
boolean success = false;
conf.set(ValidTxnList.VALID_TXNS_KEY, new ValidReadTxnList().toString());
- dir = AcidUtils.getAcidState(fs, path, conf, new ValidReaderWriteIdList(
- info.getFullTableName(), new long[0], new BitSet(),
info.highestWriteId, Long.MAX_VALUE),
+ dir = AcidUtils.getAcidState(fs, path, conf, new
ValidCleanerWriteIdList(info.getFullTableName(), info.highestWriteId),
Ref.from(false), false, dirSnapshots);
List<Path> remained = subtract(CompactorUtil.getObsoleteDirs(dir,
isDynPartAbort), deleted);
diff --git
a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
index b438524b4df..67bb1985328 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.ReplChangeManager;
import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsRequest;
import org.apache.hadoop.hive.metastore.api.AllocateTableWriteIdsResponse;
+import org.apache.hadoop.hive.metastore.api.AbortTxnRequest;
import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
import org.apache.hadoop.hive.metastore.api.CompactionRequest;
import org.apache.hadoop.hive.metastore.api.CompactionResponse;
@@ -60,6 +61,7 @@ import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
import static
org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETENTION_TIME;
import static
org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_DELAYED_CLEANUP_ENABLED;
@@ -1117,6 +1119,61 @@ public class TestCleaner extends CompactorTest {
Assert.assertEquals(TxnStore.CLEANING_RESPONSE,
rsp.getCompacts().get(0).getState());
}
+ @Test
+ public void testCompactionHighWatermarkIsHonored() throws Exception {
+ String dbName = "default";
+ String tblName = "trfcp";
+ String partName = "ds=today";
+ Table t = newTable(dbName, tblName, true);
+ Partition p = newPartition(t, "today");
+
+ // minor compaction
+ addBaseFile(t, p, 19L, 19);
+ addDeltaFile(t, p, 20L, 20L, 1);
+ addDeltaFile(t, p, 21L, 21L, 1);
+ addDeltaFile(t, p, 22L, 22L, 1);
+ burnThroughTransactions(dbName, tblName, 22);
+
+ CompactionRequest rqst = new CompactionRequest(dbName, tblName,
CompactionType.MINOR);
+ rqst.setPartitionname(partName);
+ long ctxnid = compactInTxn(rqst);
+ addDeltaFile(t, p, 20, 22, 3, ctxnid);
+
+ // block cleaner with an open txn
+ long openTxnId = openTxn();
+
+ //2nd minor
+ addDeltaFile(t, p, 23L, 23L, 1);
+ addDeltaFile(t, p, 24L, 24L, 1);
+ burnThroughTransactions(dbName, tblName, 2);
+
+ rqst = new CompactionRequest(dbName, tblName, CompactionType.MINOR);
+ rqst.setPartitionname(partName);
+ ctxnid = compactInTxn(rqst);
+ addDeltaFile(t, p, 20, 24, 5, ctxnid);
+
+ startCleaner();
+ txnHandler.abortTxn(new AbortTxnRequest(openTxnId));
+
+ ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+ Assert.assertEquals(2, rsp.getCompactsSize());
+ Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE,
rsp.getCompacts().get(0).getState());
+ Assert.assertEquals(TxnStore.CLEANING_RESPONSE,
rsp.getCompacts().get(1).getState());
+
+ List<String> actualDirs = getDirectories(conf, t, p).stream()
+ .map(Path::getName).sorted()
+ .collect(Collectors.toList());
+
+ List<String> expectedDirs = Arrays.asList(
+ "base_19",
+ addVisibilitySuffix(makeDeltaDirName(20, 22), 23),
+ addVisibilitySuffix(makeDeltaDirName(20, 24), 27),
+ makeDeltaDirName(23, 23),
+ makeDeltaDirName(24, 24)
+ );
+ Assert.assertEquals("Directories do not match", expectedDirs, actualDirs);
+ }
+
private void allocateTableWriteId(String dbName, String tblName, long txnId)
throws Exception {
AllocateTableWriteIdsRequest awiRqst = new
AllocateTableWriteIdsRequest(dbName, tblName);
awiRqst.setTxnIds(Collections.singletonList(txnId));
diff --git
a/storage-api/src/java/org/apache/hadoop/hive/common/ValidCleanerWriteIdList.java
b/storage-api/src/java/org/apache/hadoop/hive/common/ValidCleanerWriteIdList.java
new file mode 100644
index 00000000000..f58fe57a9f2
--- /dev/null
+++
b/storage-api/src/java/org/apache/hadoop/hive/common/ValidCleanerWriteIdList.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hive.common;
+
+import java.util.BitSet;
+import java.util.Optional;
+
+/**
+ * An implementation of {@link ValidWriteIdList} for use by the Cleaner.
+ * Uses the same logic as {@link ValidReaderWriteIdList} with the only
exception:
+ * returns NONE for any range that includes any unresolved write ids or write
id above {@code highWatermark}
+ */
+public class ValidCleanerWriteIdList extends ValidReaderWriteIdList {
+
+ public ValidCleanerWriteIdList(ValidReaderWriteIdList vrwil) {
+ super(vrwil.getTableName(), vrwil.exceptions, vrwil.abortedBits,
vrwil.getHighWatermark(),
+ Optional.ofNullable(vrwil.getMinOpenWriteId()).orElse(Long.MAX_VALUE));
+ }
+
+ public ValidCleanerWriteIdList(String tableName, long highWatermark) {
+ super(tableName, new long[0], new BitSet(), highWatermark, Long.MAX_VALUE);
+ }
+
+
+ /**
+ * Returns NONE if some of the write ids in the range are not resolved,
+ * otherwise uses {@link ValidReaderWriteIdList#isWriteIdRangeValid(long,
long)}
+ */
+ @Override
+ public RangeResponse isWriteIdRangeValid(long minWriteId, long maxWriteId) {
+ if (maxWriteId > highWatermark) {
+ return RangeResponse.NONE;
+ }
+ return super.isWriteIdRangeValid(minWriteId, maxWriteId);
+ }
+}
diff --git
a/storage-api/src/java/org/apache/hadoop/hive/common/ValidCompactorWriteIdList.java
b/storage-api/src/java/org/apache/hadoop/hive/common/ValidCompactorWriteIdList.java
index 5d74241815a..760d5e75d25 100644
---
a/storage-api/src/java/org/apache/hadoop/hive/common/ValidCompactorWriteIdList.java
+++
b/storage-api/src/java/org/apache/hadoop/hive/common/ValidCompactorWriteIdList.java
@@ -24,7 +24,7 @@ import java.util.BitSet;
/**
* An implementation of {@link ValidWriteIdList} for use by the compactor.
*
- * Compaction should only include txns up to smallest open txn (exclussive).
+ * Compaction should only include txns up to smallest open txn (exclusive).
* There may be aborted write ids in the snapshot represented by this
ValidCompactorWriteIdList.
* Thus {@link #isWriteIdRangeValid(long, long)} returns NONE for any range
that includes any unresolved
* write ids. Any write id above {@code highWatermark} is unresolved.
diff --git
a/storage-api/src/java/org/apache/hadoop/hive/common/ValidReaderWriteIdList.java
b/storage-api/src/java/org/apache/hadoop/hive/common/ValidReaderWriteIdList.java
index 4c2cf7c5aaf..1601b38d101 100644
---
a/storage-api/src/java/org/apache/hadoop/hive/common/ValidReaderWriteIdList.java
+++
b/storage-api/src/java/org/apache/hadoop/hive/common/ValidReaderWriteIdList.java
@@ -262,8 +262,8 @@ public class ValidReaderWriteIdList implements
ValidWriteIdList {
}
}
- public ValidReaderWriteIdList updateHighWatermark(long value) {
- return new ValidReaderWriteIdList(tableName, exceptions, abortedBits,
value, minOpenWriteId);
+ public void setHighWatermark(long value) {
+ this.highWatermark = value;
}
}