This is an automated email from the ASF dual-hosted git repository.
kgyrtkirk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new d3542e1 HIVE-25977: Enhance Compaction Cleaner to skip when there is nothing to do #2 (#2971) (Zoltan Haindrich reviewed by Karen Coppage and Denys Kuzmenko)
d3542e1 is described below
commit d3542e1b35bbdbaafb52ad742a5168bd29549cee
Author: Zoltan Haindrich <[email protected]>
AuthorDate: Wed Mar 9 13:04:11 2022 +0100
HIVE-25977: Enhance Compaction Cleaner to skip when there is nothing to do #2 (#2971) (Zoltan Haindrich reviewed by Karen Coppage and Denys Kuzmenko)
---
.../txn/compactor/TestCleanerWithReplication.java | 4 +-
.../hadoop/hive/ql/txn/compactor/Cleaner.java | 38 ++++++---
.../hive/ql/txn/compactor/CompactorTest.java | 33 ++++++--
.../hadoop/hive/ql/txn/compactor/TestCleaner.java | 95 ++++++++++++++++++++++
4 files changed, 148 insertions(+), 22 deletions(-)
diff --git a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java
index 429d55c..6353b37 100644
--- a/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java
+++ b/itests/hive-unit/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithReplication.java
@@ -143,7 +143,7 @@ public class TestCleanerWithReplication extends CompactorTest {
addDeltaFile(t, null, 23L, 24L, 2);
addDeltaFile(t, null, 21L, 24L, 4);
- burnThroughTransactions(dbName, "camitc", 25);
+ burnThroughTransactions(dbName, "camitc", 24);
CompactionRequest rqst = new CompactionRequest(dbName, "camitc", CompactionType.MINOR);
compactInTxn(rqst);
@@ -161,7 +161,7 @@ public class TestCleanerWithReplication extends CompactorTest {
addDeltaFile(t, p, 23L, 24L, 2);
addDeltaFile(t, p, 21L, 24L, 4);
- burnThroughTransactions(dbName, "camipc", 25);
+ burnThroughTransactions(dbName, "camipc", 24);
CompactionRequest rqst = new CompactionRequest(dbName, "camipc", CompactionType.MINOR);
rqst.setPartitionname("ds=today");
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
index 1e0dbf8..55a7802 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
@@ -17,7 +17,6 @@
*/
package org.apache.hadoop.hive.ql.txn.compactor;
-import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.metastore.ReplChangeManager;
import org.apache.hadoop.hive.metastore.api.DataOperationType;
@@ -62,6 +61,7 @@ import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.txn.CompactionInfo;
import org.apache.hadoop.hive.ql.io.AcidUtils;
import org.apache.hadoop.hive.ql.io.AcidUtils.ParsedBaseLight;
+import org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDelta;
import org.apache.hadoop.hive.ql.io.AcidUtils.ParsedDeltaLight;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.StringUtils;
@@ -71,14 +71,15 @@ import java.io.IOException;
import java.security.PrivilegedExceptionAction;
import java.util.ArrayList;
import java.util.Collections;
+import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.concurrent.Callable;
+import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import static org.apache.hadoop.hive.conf.Constants.COMPACTOR_CLEANER_THREAD_NAME_FORMAT;
@@ -87,8 +88,6 @@ import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_DELAY
import static org.apache.hadoop.hive.metastore.HMSHandler.getMSForConf;
import static org.apache.hadoop.hive.metastore.utils.MetaStoreUtils.getDefaultCatalog;
-import com.codahale.metrics.Counter;
-
/**
 * A class to clean directories after compactions. This will run in a separate thread.
 */
@@ -425,8 +424,10 @@ public class Cleaner extends MetaStoreCompactorThread {
// Including obsolete directories for partitioned tables can result in data loss.
obsoleteDirs = dir.getAbortedDirectories();
}
- if (obsoleteDirs.isEmpty() && !hasDataBelowWatermark(fs, path, writeIdList.getHighWatermark())) {
- LOG.info(idWatermark(ci) + " nothing to remove below watermark " + writeIdList.getHighWatermark() + ", ");
+
+ if (obsoleteDirs.isEmpty()
+ && !hasDataBelowWatermark(dir, fs, path, ci.highestWriteId, writeIdList.getHighWatermark())) {
+ LOG.info(idWatermark(ci) + " nothing to remove below watermark " + ci.highestWriteId + ", ");
return true;
}
StringBuilder extraDebugInfo = new StringBuilder("[").append(obsoleteDirs.stream()
@@ -439,29 +440,40 @@ public class Cleaner extends MetaStoreCompactorThread {
return success;
}
- private boolean hasDataBelowWatermark(FileSystem fs, Path path, long highWatermark) throws IOException {
- FileStatus[] children = fs.listStatus(path);
+ private boolean hasDataBelowWatermark(AcidDirectory acidDir, FileSystem fs, Path path, long highWatermark,
+ long minOpenTxn)
+ throws IOException {
+ Set<Path> acidPaths = new HashSet<>();
+ for (ParsedDelta delta : acidDir.getCurrentDirectories()) {
+ acidPaths.add(delta.getPath());
+ }
+ if (acidDir.getBaseDirectory() != null) {
+ acidPaths.add(acidDir.getBaseDirectory());
+ }
+ FileStatus[] children = fs.listStatus(path, p -> {
+ return !acidPaths.contains(p);
+ });
for (FileStatus child : children) {
- if (isFileBelowWatermark(child, highWatermark)) {
+ if (isFileBelowWatermark(child, highWatermark, minOpenTxn)) {
return true;
}
}
return false;
}
- private boolean isFileBelowWatermark(FileStatus child, long highWatermark) {
+ private boolean isFileBelowWatermark(FileStatus child, long highWatermark, long minOpenTxn) {
Path p = child.getPath();
String fn = p.getName();
if (!child.isDirectory()) {
- return false;
+ return true;
}
if (fn.startsWith(AcidUtils.BASE_PREFIX)) {
ParsedBaseLight b = ParsedBaseLight.parseBase(p);
- return b.getWriteId() < highWatermark;
+ return b.getWriteId() <= highWatermark;
}
if (fn.startsWith(AcidUtils.DELTA_PREFIX) || fn.startsWith(AcidUtils.DELETE_DELTA_PREFIX)) {
ParsedDeltaLight d = ParsedDeltaLight.parse(p);
- return d.getMaxWriteId() < highWatermark;
+ return d.getMaxWriteId() <= highWatermark;
}
return false;
}
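
[Editor's sketch] The crux of the Cleaner.java change above is twofold: directories that belong to the current valid ACID snapshot (the base plus the current deltas) are filtered out before inspection, and the boundary comparison is relaxed from '<' to '<=', so a base or delta written exactly at the compaction watermark now counts as removable leftover data. Below is a minimal, hedged sketch of that comparison; the class, the simplified name parsing, and the unpadded example names are illustrative assumptions, not Hive's ParsedBaseLight/ParsedDeltaLight logic.

// Hedged sketch, not the Hive implementation: report whether a directory
// name such as base_22 or delta_21_24 sits at or below the compaction
// watermark. Real ACID names carry zero-padding and optional suffixes
// (statement ids, visibility ids) that this toy parser does not handle.
public class WatermarkCheckSketch {

  static boolean isBelowWatermark(String dirName, long highWatermark) {
    if (dirName.startsWith("base_")) {
      long writeId = Long.parseLong(dirName.split("_")[1]);
      return writeId <= highWatermark; // the patch relaxes '<' to '<='
    }
    if (dirName.startsWith("delta_") || dirName.startsWith("delete_delta_")) {
      String[] parts = dirName.split("_");
      long maxWriteId = Long.parseLong(parts[parts.length - 1]);
      return maxWriteId <= highWatermark; // likewise '<' became '<='
    }
    return false; // unrecognized names are not the cleaner's business
  }

  public static void main(String[] args) {
    System.out.println(isBelowWatermark("base_22", 22));     // true after the fix
    System.out.println(isBelowWatermark("delta_23_24", 22)); // false either way
  }
}

Because the caller excludes snapshot paths first, a 'true' here means the directory is left over from before the compaction, so the cleaner still has work to do and must not skip.
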
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
index f4ce773..e486497 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/CompactorTest.java
@@ -197,7 +197,9 @@ public abstract class CompactorTest {
TransactionalValidationListener.INSERTONLY_TRANSACTIONAL_PROPERTY);
}
table.setParameters(parameters);
- if (isTemporary) table.setTemporary(true);
+ if (isTemporary) {
+ table.setTemporary(true);
+ }
// drop the table first, in case some previous test created it
ms.dropTable(dbName, tableName);
@@ -272,6 +274,11 @@ public abstract class CompactorTest {
}
protected void addDeltaFile(Table t, Partition p, long minTxn, long maxTxn, int numRecords,
+ long visibilityId) throws Exception {
+ addFile(t, p, minTxn, maxTxn, numRecords, FileType.DELTA, 2, true, visibilityId);
+ }
+
+ protected void addDeltaFile(Table t, Partition p, long minTxn, long maxTxn, int numRecords,
int numBuckets, boolean allBucketsPresent)
throws Exception {
addFile(t, p, minTxn, maxTxn, numRecords, FileType.DELTA, numBuckets, allBucketsPresent);
}
@@ -288,7 +295,9 @@ public abstract class CompactorTest {
FileSystem fs = FileSystem.get(conf);
FileStatus[] stats = fs.listStatus(dir);
List<Path> paths = new ArrayList<Path>(stats.length);
- for (int i = 0; i < stats.length; i++) paths.add(stats[i].getPath());
+ for (int i = 0; i < stats.length; i++) {
+ paths.add(stats[i].getPath());
+ }
return paths;
}
@@ -398,7 +407,9 @@ public abstract class CompactorTest {
FileSystem fs = FileSystem.get(conf);
for (int bucket = 0; bucket < numBuckets; bucket++) {
- if (bucket == 0 && !allBucketsPresent) continue; // skip one
+ if (bucket == 0 && !allBucketsPresent) {
+ continue; // skip one
+ }
Path partFile = null;
if (type == FileType.LEGACY) {
partFile = new Path(location, String.format(AcidUtils.LEGACY_FILE_BUCKET_DIGITS, bucket) + "_0");
@@ -443,7 +454,9 @@ public abstract class CompactorTest {
if (baseDirectory.getName().startsWith(AcidUtils.BASE_PREFIX)) {
Path p = AcidUtils.createBucketFile(baseDirectory, bucket);
FileSystem fs = p.getFileSystem(conf);
- if (fs.exists(p)) filesToRead.add(p);
+ if (fs.exists(p)) {
+ filesToRead.add(p);
+ }
} else {
filesToRead.add(new Path(baseDirectory, "000000_0"));
@@ -452,7 +465,9 @@ public abstract class CompactorTest {
for (int i = 0; i < deltaDirectory.length; i++) {
Path p = AcidUtils.createBucketFile(deltaDirectory[i], bucket);
FileSystem fs = p.getFileSystem(conf);
- if (fs.exists(p)) filesToRead.add(p);
+ if (fs.exists(p)) {
+ filesToRead.add(p);
+ }
}
return new MockRawReader(conf, filesToRead);
}
@@ -484,7 +499,9 @@ public abstract class CompactorTest {
MockRawReader(Configuration conf, List<Path> files) throws IOException {
filesToRead = new Stack<Path>();
- for (Path file : files) filesToRead.push(file);
+ for (Path file : files) {
+ filesToRead.push(file);
+ }
this.conf = conf;
fs = FileSystem.get(conf);
}
@@ -512,7 +529,9 @@ public abstract class CompactorTest {
public boolean next(RecordIdentifier identifier, Text text) throws IOException {
if (is == null) {
// Open the next file
- if (filesToRead.empty()) return false;
+ if (filesToRead.empty()) {
+ return false;
+ }
Path p = filesToRead.pop();
LOG.debug("Reading records from " + p.toString());
is = fs.open(p);
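
[Editor's sketch] Beyond the brace-style cleanups, the CompactorTest.java hunk above adds an addDeltaFile overload that takes a visibilityId, letting tests lay down a delta the way the compactor writes one, tagged with the compacting transaction's id (testReady below passes the id returned by compactInTxn). As a hedged illustration of the directory naming such a delta ends up with; the helper and its padding widths are assumptions, not code read from addFile.

// Hypothetical helper, not part of CompactorTest: format a
// compaction-written delta directory name that carries a visibility txn
// id, e.g. delta_0000020_0000022_v0000028. The 7-digit padding is an
// assumption based on ACID directory names seen elsewhere.
public class VisibilityNameSketch {

  static String compactedDeltaDir(long minWriteId, long maxWriteId, long visibilityTxnId) {
    return String.format("delta_%07d_%07d_v%07d", minWriteId, maxWriteId, visibilityTxnId);
  }

  public static void main(String[] args) {
    System.out.println(compactedDeltaDir(20, 22, 28)); // delta_0000020_0000022_v0000028
  }
}
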
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
index 1a09d50..2bb8ace 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/TestCleaner.java
@@ -43,8 +43,10 @@ import org.junit.Assert;
import org.junit.Test;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
@@ -852,4 +854,97 @@ public class TestCleaner extends CompactorTest {
Assert.assertEquals(1, paths.size());
Assert.assertEquals("base_22", paths.get(0).getName());
}
+
+ @Test
+ public void nothingToCleanAfterAbortsBase() throws Exception {
+ String dbName = "default";
+ String tableName = "camtc";
+ Table t = newTable(dbName, tableName, false);
+
+ addBaseFile(t, null, 20L, 1);
+ addDeltaFile(t, null, 21L, 21L, 2);
+ addDeltaFile(t, null, 22L, 22L, 2);
+ burnThroughTransactions(dbName, tableName, 22, null, new HashSet<Long>(Arrays.asList(21L, 22L)));
+
+ CompactionRequest rqst = new CompactionRequest(dbName, tableName, CompactionType.MAJOR);
+
+ compactInTxn(rqst);
+ compactInTxn(rqst);
+
+ startCleaner();
+ startCleaner();
+
+ ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+ Assert.assertEquals(2, rsp.getCompactsSize());
+ Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState());
+ Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(1).getState());
+
+ List<Path> paths = getDirectories(conf, t, null);
+ Assert.assertEquals(1, paths.size());
+ Assert.assertEquals("base_20", paths.get(0).getName());
+ }
+
+ @Test
+ public void nothingToCleanAfterAbortsDelta() throws Exception {
+ String dbName = "default";
+ String tableName = "camtc";
+ Table t = newTable(dbName, tableName, false);
+
+ addDeltaFile(t, null, 20L, 20L, 1);
+ addDeltaFile(t, null, 21L, 21L, 2);
+ addDeltaFile(t, null, 22L, 22L, 2);
+ burnThroughTransactions(dbName, tableName, 22, null, new HashSet<Long>(Arrays.asList(21L, 22L)));
+
+ CompactionRequest rqst = new CompactionRequest(dbName, tableName, CompactionType.MAJOR);
+
+ compactInTxn(rqst);
+ compactInTxn(rqst);
+
+ startCleaner();
+ startCleaner();
+
+ ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+ Assert.assertEquals(2, rsp.getCompactsSize());
+ Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(0).getState());
+ Assert.assertEquals(TxnStore.SUCCEEDED_RESPONSE, rsp.getCompacts().get(1).getState());
+
+ List<Path> paths = getDirectories(conf, t, null);
+ Assert.assertEquals(1, paths.size());
+ Assert.assertEquals("delta_0000020_0000020", paths.get(0).getName());
+ }
+
+ @Test
+ public void testReady() throws Exception {
+ String dbName = "default";
+ String tblName = "trfcp";
+ String partName = "ds=today";
+ Table t = newTable(dbName, tblName, true);
+ Partition p = newPartition(t, "today");
+
+ // minor compaction
+ addBaseFile(t, p, 19L, 19);
+ addDeltaFile(t, p, 20L, 20L, 1);
+ addDeltaFile(t, p, 21L, 21L, 1);
+ addDeltaFile(t, p, 22L, 22L, 1);
+ burnThroughTransactions(dbName, tblName, 22);
+
+ // block cleaner with an open txn
+ long blockingTxn = openTxn();
+
+ CompactionRequest rqst = new CompactionRequest(dbName, tblName, CompactionType.MINOR);
+ rqst.setPartitionname(partName);
+ long ctxnid = compactInTxn(rqst);
+ addDeltaFile(t, p, 20, 22, 2, ctxnid);
+ startCleaner();
+
+ // make sure cleaner didn't remove anything, and cleaning is still queued
+ List<Path> paths = getDirectories(conf, t, p);
+ Assert.assertEquals("Expected 5 files after minor compaction, instead
these files were present " + paths, 5,
+ paths.size());
+ ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
+ Assert.assertEquals("Expected 1 compaction in queue, got: " +
rsp.getCompacts(), 1, rsp.getCompactsSize());
+ Assert.assertEquals(TxnStore.CLEANING_RESPONSE, rsp.getCompacts().get(0).getState());
+ }
+
+
}
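
[Editor's sketch] The new tests close the loop: nothingToCleanAfterAbortsBase/Delta run two compaction cycles and assert that the second cleaning pass, finding nothing outside the valid snapshot, still marks its compaction SUCCEEDED, while testReady checks that an open transaction keeps the queue entry in CLEANING without deleting any files. For reference, a minimal sketch of the listing pattern the patched hasDataBelowWatermark builds on; LeftoverListingSketch and validAcidPaths are hypothetical names, standing in for the base and current-delta paths collected from the AcidDirectory.

import java.io.IOException;
import java.util.Set;

import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Hedged sketch: list only the children of a table/partition directory
// that the valid ACID snapshot does not claim; whatever remains is
// candidate leftover data for the watermark check.
public class LeftoverListingSketch {

  static FileStatus[] listLeftovers(FileSystem fs, Path dir, Set<Path> validAcidPaths)
      throws IOException {
    // fs.listStatus(Path, PathFilter): the lambda rejects snapshot paths.
    return fs.listStatus(dir, p -> !validAcidPaths.contains(p));
  }
}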