This is an automated email from the ASF dual-hosted git repository.
kgyrtkirk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 5214369 HIVE-25883: Enhance Compaction Cleaner to skip when there is
nothing to do (#2958) (Zoltan Haindrich reviewed by Denys Kuzmenko)
5214369 is described below
commit 5214369d05ac02f36b6723c336cbdb953ea3c61c
Author: Zoltan Haindrich <[email protected]>
AuthorDate: Sun Jan 23 08:49:30 2022 +0100
HIVE-25883: Enhance Compaction Cleaner to skip when there is nothing to do
(#2958) (Zoltan Haindrich reviewed by Denys Kuzmenko)
---
.../hadoop/hive/ql/txn/compactor/Cleaner.java | 57 +++++++---
.../hadoop/hive/ql/txn/compactor/TestCleaner.java | 116 +++++++++++++++++++--
2 files changed, 154 insertions(+), 19 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
index db65250..8d724f6 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
@@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hive.ql.txn.compactor;
-import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.metastore.ReplChangeManager;
import org.apache.hadoop.hive.metastore.api.DataOperationType;
@@ -52,6 +51,7 @@ import
org.apache.hadoop.hive.ql.txn.compactor.CompactorUtil.ThrowingRunnable;
import
org.apache.hadoop.hive.ql.txn.compactor.metrics.DeltaFilesMetricReporter;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ValidWriteIdList;
@@ -60,6 +60,8 @@ import org.apache.hadoop.hive.common.StringableMap;
import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
import org.apache.hadoop.hive.ql.io.AcidUtils;
+import org.apache.hadoop.hive.ql.io.AcidUtils.ParsedBaseLight;
+import org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDeltaLight;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
import org.apache.hive.common.util.Ref;
@@ -208,7 +210,7 @@ public class Cleaner extends MetaStoreCompactorThread {
}
Optional<String> location =
Optional.ofNullable(ci.properties).map(StringableMap::new)
.map(config -> config.get("location"));
-
+
Callable<Boolean> cleanUpTask;
Table t = null;
Partition p = resolvePartition(ci);
@@ -248,12 +250,12 @@ public class Cleaner extends MetaStoreCompactorThread {
if (t != null) {
StorageDescriptor sd = resolveStorageDescriptor(t, p);
- cleanUpTask = () -> removeFiles(location.orElse(sd.getLocation()),
minOpenTxnGLB, ci,
+ cleanUpTask = () -> removeFiles(location.orElse(sd.getLocation()),
minOpenTxnGLB, ci,
ci.partName != null && p == null);
} else {
cleanUpTask = () -> removeFiles(location.get(), ci);
}
-
+
Ref<Boolean> removedFiles = Ref.from(false);
if (runJobAsSelf(ci.runAs)) {
removedFiles.value = cleanUpTask.call();
@@ -328,7 +330,7 @@ public class Cleaner extends MetaStoreCompactorThread {
if (dropPartition) {
LockRequest lockRequest = createLockRequest(ci, 0, LockType.EXCL_WRITE,
DataOperationType.DELETE);
LockResponse res = null;
-
+
try {
res = txnHandler.lock(lockRequest);
if (res.getState() == LockState.ACQUIRED) {
@@ -349,7 +351,7 @@ public class Cleaner extends MetaStoreCompactorThread {
}
}
}
-
+
ValidTxnList validTxnList =
TxnUtils.createValidTxnListForCleaner(txnHandler.getOpenTxns(),
minOpenTxnGLB);
//save it so that getAcidState() sees it
@@ -388,7 +390,7 @@ public class Cleaner extends MetaStoreCompactorThread {
// Creating 'reader' list since we are interested in the set of 'obsolete'
files
ValidReaderWriteIdList validWriteIdList = getValidCleanerWriteIdList(ci,
validTxnList);
LOG.debug("Cleaning based on writeIdList: {}", validWriteIdList);
-
+
return removeFiles(location, validWriteIdList, ci);
}
/**
@@ -419,6 +421,10 @@ public class Cleaner extends MetaStoreCompactorThread {
// Including obsolete directories for partitioned tables can result in
data loss.
obsoleteDirs = dir.getAbortedDirectories();
}
+ if (obsoleteDirs.isEmpty() && !hasDataBelowWatermark(fs, path,
writeIdList.getHighWatermark())) {
+ LOG.info(idWatermark(ci) + " nothing to remove below watermark " +
writeIdList.getHighWatermark() + ", ");
+ return true;
+ }
StringBuilder extraDebugInfo = new
StringBuilder("[").append(obsoleteDirs.stream()
.map(Path::getName).collect(Collectors.joining(",")));
boolean success = remove(location, ci, obsoleteDirs, true, fs,
extraDebugInfo);
@@ -428,6 +434,33 @@ public class Cleaner extends MetaStoreCompactorThread {
return success;
}
+ private boolean hasDataBelowWatermark(FileSystem fs, Path path, long
highWatermark) throws IOException { // true if any entry listed under path is an ACID dir strictly below highWatermark
+ FileStatus[] children = fs.listStatus(path); // single listing of path; nothing below the returned entries is inspected
+ for (FileStatus child : children) {
+ if (isFileBelowWatermark(child, highWatermark)) {
+ return true; // one match is enough, stop scanning
+ }
+ }
+ return false; // no base/delta directory below the watermark was found
+ }
+
+ private boolean isFileBelowWatermark(FileStatus child, long highWatermark) { // true only for base_/delta_/delete_delta_ directories whose write id is strictly below highWatermark
+ Path p = child.getPath();
+ String fn = p.getName(); // classification is done on the last path component only
+ if (!child.isDirectory()) {
+ return false; // plain files never count
+ }
+ if (fn.startsWith(AcidUtils.BASE_PREFIX)) {
+ ParsedBaseLight b = ParsedBaseLight.parseBase(p);
+ return b.getWriteId() < highWatermark; // strict comparison: a base exactly at the watermark does not count
+ }
+ if (fn.startsWith(AcidUtils.DELTA_PREFIX) ||
fn.startsWith(AcidUtils.DELETE_DELTA_PREFIX)) {
+ ParsedDeltaLight d = ParsedDeltaLight.parse(p);
+ return d.getMaxWriteId() < highWatermark; // uses the delta's max write id, again strictly below
+ }
+ return false; // any other directory name is ignored
+ }
+
private boolean removeFiles(String location, CompactionInfo ci)
throws NoSuchObjectException, IOException, MetaException {
Path path = new Path(location);
@@ -439,11 +472,11 @@ public class Cleaner extends MetaStoreCompactorThread {
return remove(location, ci, Collections.singletonList(path), ifPurge,
path.getFileSystem(conf), extraDebugInfo);
}
-
- private boolean remove(String location, CompactionInfo ci, List<Path>
filesToDelete, boolean ifPurge,
- FileSystem fs, StringBuilder extraDebugInfo)
+
+ private boolean remove(String location, CompactionInfo ci, List<Path>
filesToDelete, boolean ifPurge,
+ FileSystem fs, StringBuilder extraDebugInfo)
throws NoSuchObjectException, MetaException, IOException {
-
+
extraDebugInfo.setCharAt(extraDebugInfo.length() - 1, ']');
LOG.info(idWatermark(ci) + " About to remove " + filesToDelete.size() +
" obsolete directories from " + location + ". " +
extraDebugInfo.toString());
@@ -454,7 +487,7 @@ public class Cleaner extends MetaStoreCompactorThread {
}
Database db = getMSForConf(conf).getDatabase(getDefaultCatalog(conf),
ci.dbname);
boolean needCmRecycle = ReplChangeManager.isSourceOfReplication(db);
-
+
for (Path dead : filesToDelete) {
LOG.debug("Going to delete path " + dead.toString());
if (needCmRecycle) {
diff --git
a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
index 42c5a04..f258005 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
@@ -24,7 +24,6 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.api.CommitTxnRequest;
import org.apache.hadoop.hive.metastore.api.CompactionRequest;
import org.apache.hadoop.hive.metastore.api.CompactionType;
-import org.apache.hadoop.hive.metastore.api.GetPartitionRequest;
import org.apache.hadoop.hive.metastore.api.GetTableRequest;
import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest;
import org.apache.hadoop.hive.metastore.api.FindNextCompactRequest;
@@ -210,9 +209,13 @@ public class TestCleaner extends CompactorTest {
Assert.assertEquals(2, paths.size());
boolean sawBase = false, sawDelta = false;
for (Path p : paths) {
- if (p.getName().equals("base_20")) sawBase = true;
- else if (p.getName().equals(makeDeltaDirName(21, 24))) sawDelta = true;
- else Assert.fail("Unexpected file " + p.getName());
+ if (p.getName().equals("base_20")) {
+ sawBase = true;
+ } else if (p.getName().equals(makeDeltaDirName(21, 24))) {
+ sawDelta = true;
+ } else {
+ Assert.fail("Unexpected file " + p.getName());
+ }
}
Assert.assertTrue(sawBase);
Assert.assertTrue(sawDelta);
@@ -246,9 +249,13 @@ public class TestCleaner extends CompactorTest {
Assert.assertEquals(2, paths.size());
boolean sawBase = false, sawDelta = false;
for (Path path : paths) {
- if (path.getName().equals("base_20")) sawBase = true;
- else if (path.getName().equals(makeDeltaDirNameCompacted(21, 24)))
sawDelta = true;
- else Assert.fail("Unexpected file " + path.getName());
+ if (path.getName().equals("base_20")) {
+ sawBase = true;
+ } else if (path.getName().equals(makeDeltaDirNameCompacted(21, 24))) {
+ sawDelta = true;
+ } else {
+ Assert.fail("Unexpected file " + path.getName());
+ }
}
Assert.assertTrue(sawBase);
Assert.assertTrue(sawDelta);
@@ -722,4 +729,99 @@ public class TestCleaner extends CompactorTest {
Assert.assertTrue(sawBase);
Assert.assertTrue(sawDelta);
}
+
+ @Test
+ public void withSingleBaseCleanerSucceeds() throws Exception { // with a single base on disk, the compaction must still end up in the succeeded state
+ Map<String, String> parameters = new HashMap<>();
+
+ Table t = newTable("default", "dcamc", false, parameters);
+
+ addBaseFile(t, null, 25L, 25); // presumably creates base_25 for the unpartitioned table; helper is defined outside this diff
+
+ burnThroughTransactions("default", "dcamc", 25);
+
+ CompactionRequest rqst = new CompactionRequest("default", "dcamc",
CompactionType.MAJOR);
+ compactInTxn(rqst);
+
+ startCleaner();
+
+ ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+ Assert.assertEquals(1, rsp.getCompactsSize()); // exactly one compaction entry is expected
+ Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE,
rsp.getCompacts().get(0).getState());
+ }
+
+ @Test
+ public void withNewerBaseCleanerSucceeds() throws Exception { // a base added after the compaction request must not be removed by the cleaner
+ Map<String, String> parameters = new HashMap<>();
+
+ Table t = newTable("default", "dcamc", false, parameters);
+
+ addBaseFile(t, null, 25L, 25);
+
+ burnThroughTransactions("default", "dcamc", 25);
+
+ CompactionRequest rqst = new CompactionRequest("default", "dcamc",
CompactionType.MAJOR);
+ compactInTxn(rqst);
+
+ burnThroughTransactions("default", "dcamc", 1); // one more transaction after the compaction request
+ addBaseFile(t, null, 26L, 26); // second base, added only after compactInTxn
+
+ startCleaner();
+
+ ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+ Assert.assertEquals(1, rsp.getCompactsSize());
+ Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE,
rsp.getCompacts().get(0).getState());
+
+ List<Path> paths = getDirectories(conf, t, null);
+ // we should retain both 25 and 26
+ Assert.assertEquals(2, paths.size());
+ }
+
+ @Test
+ public void withNotYetVisibleBase() throws Exception { // cleaner run where the compacted base is tied to the compaction's own transaction id
+
+ String dbName = "default";
+ String tableName = "camtc";
+ Table t = newTable(dbName, tableName, false);
+
+ addBaseFile(t, null, 20L, 20);
+ burnThroughTransactions(dbName, tableName, 25);
+
+ CompactionRequest rqst = new CompactionRequest(dbName, tableName,
CompactionType.MAJOR);
+
+ long compactTxn = compactInTxn(rqst);
+ addBaseFile(t, null, 25L, 25, compactTxn); // NOTE(review): presumably produces a base carrying a visibility txn id — confirm against CompactorTest
+ startCleaner();
+
+ ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+ Assert.assertEquals(1, rsp.getCompactsSize());
+ Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE,
rsp.getCompacts().get(0).getState()); // only the final state is asserted; directory contents are not checked here
+ }
+
+ @Test
+ public void cleanMultipleTimesWithSameWatermark() throws Exception { // two compactions of the same table state must both succeed, leaving only base_22
+ String dbName = "default";
+ String tableName = "camtc";
+ Table t = newTable(dbName, tableName, false);
+
+ addBaseFile(t, null, 20L, 20);
+ addDeltaFile(t, null, 21L, 22L, 2);
+ burnThroughTransactions(dbName, tableName, 22);
+
+ CompactionRequest rqst = new CompactionRequest(dbName, tableName,
CompactionType.MAJOR);
+ addBaseFile(t, null, 22L, 22);
+ compactInTxn(rqst);
+ compactInTxn(rqst); // same request submitted a second time with no writes in between
+
+ startCleaner();
+
+ ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+ Assert.assertEquals(2, rsp.getCompactsSize());
+ Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE,
rsp.getCompacts().get(0).getState());
+ Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE,
rsp.getCompacts().get(1).getState()); // the second entry must succeed as well
+
+ List<Path> paths = getDirectories(conf, t, null);
+ Assert.assertEquals(1, paths.size()); // base_20 and the 21-22 delta are expected to be gone
+ Assert.assertEquals("base_22", paths.get(0).getName());
+ }
}