This is an automated email from the ASF dual-hosted git repository.
dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 69aec11 HIVE-25777: ACID: Pick the compactor transaction over insert
dir (Denys Kuzmenko, reviewed by Karen Coppage)
69aec11 is described below
commit 69aec11e711293eaf5eb8a3fbb44905bc11841c9
Author: Denys Kuzmenko <[email protected]>
AuthorDate: Thu Jan 27 11:41:35 2022 +0200
HIVE-25777: ACID: Pick the compactor transaction over insert dir (Denys
Kuzmenko, reviewed by Karen Coppage)
Closes #2968
---
.../org/apache/hadoop/hive/ql/io/AcidUtils.java | 10 ++++++--
.../hadoop/hive/ql/txn/compactor/TestCleaner.java | 28 ++++++++++++++++++++++
2 files changed, 36 insertions(+), 2 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
index 71931d9..3dda052 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/io/AcidUtils.java
@@ -1806,13 +1806,19 @@ public class AcidUtils {
// keep track for error reporting
directory.setOldestBase(parsedBase);
}
+ boolean isCompactedBase = isCompactedBase(parsedBase, directory.getFs(),
dirSnapshot);
// Handle aborted IOW base.
- if (writeIdList.isWriteIdAborted(writeId) && !isCompactedBase(parsedBase,
directory.getFs(), dirSnapshot)) {
+ if (writeIdList.isWriteIdAborted(writeId) && !isCompactedBase) {
directory.getAbortedDirectories().add(baseDir);
directory.getAbortedWriteIds().add(parsedBase.writeId);
return;
}
- if (directory.getBase() == null || directory.getBase().getWriteId() <
writeId) {
+ if (directory.getBase() == null || directory.getBase().getWriteId() <
writeId
+ // If there are two competing versions of a particular write-id, one
from the compactor and another from IOW,
+ // always pick the compactor one once it is committed.
+ || directory.getBase().getWriteId() == writeId &&
+ isCompactedBase &&
validTxnList.isTxnValid(parsedBase.getVisibilityTxnId())) {
+
if (isValidBase(parsedBase, writeIdList, directory.getFs(),
dirSnapshot)) {
List<HdfsFileStatusWithId> files = null;
if (dirSnapshot != null) {
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
index f258005..1d0112f 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
@@ -93,6 +93,34 @@ public class TestCleaner extends CompactorTest {
}
@Test
+ public void cleanupAfterIOWAndMajorTableCompaction() throws Exception {
+ Table t = newTable("default", "camtc", false);
+ // The Cleaner must keep the compactor's base when it competes with an IOW base for the same write id (25).
+ addBaseFile(t, null, 20L, 20);
+ addDeltaFile(t, null, 21L, 22L, 2);
+ addDeltaFile(t, null, 23L, 24L, 2);
+ addBaseFile(t, null, 25L, 25); // base written by IOW (insert overwrite) at write id 25, no compactor txn
+
+ burnThroughTransactions("default", "camtc", 25);
+ // Major compaction runs in its own txn and writes a second base for write id 25, tagged with compactTxn.
+ CompactionRequest rqst = new CompactionRequest("default", "camtc", CompactionType.MAJOR);
+ long compactTxn = compactInTxn(rqst);
+ addBaseFile(t, null, 25L, 25, compactTxn);
+
+ startCleaner();
+
+ // The single compaction request should be reported as succeeded after cleaning.
+ ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+ Assert.assertEquals(1, rsp.getCompactsSize());
+ Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState());
+
+ // Only the compactor's base (the one with a _v suffix) should remain; the older base, both deltas and the IOW base are removed.
+ List<Path> paths = getDirectories(conf, t, null);
+ Assert.assertEquals(1, paths.size());
+ Assert.assertEquals("base_25_v26", paths.get(0).getName());
+ }
+
+ @Test
public void cleanupAfterMajorTableCompactionWithLongRunningQuery() throws
Exception {
Table t = newTable("default", "camtc", false);