This is an automated email from the ASF dual-hosted git repository.
dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 2fa252a0474 HIVE-29251: Hive ACID: HiveConf object shouldn't be shared between multiple cleanup task threads (#6119)
2fa252a0474 is described below
commit 2fa252a04748e97e6581b7685f8dcc93825eb7f9
Author: Denys Kuzmenko <[email protected]>
AuthorDate: Mon Oct 13 10:26:36 2025 +0200
HIVE-29251: Hive ACID: HiveConf object shouldn't be shared between multiple cleanup task threads (#6119)
---
.../ql/txn/compactor/TestCleanerWithSecureDFS.java | 2 +-
.../hadoop/hive/ql/txn/compactor/Cleaner.java | 2 +-
.../hive/ql/txn/compactor/MetadataCache.java | 5 +-
.../txn/compactor/handler/AbortedTxnCleaner.java | 65 +++++++-------
.../txn/compactor/handler/CompactionCleaner.java | 75 +++++++++--------
.../hive/ql/txn/compactor/handler/TaskHandler.java | 98 +++++++++++++++-------
.../compactor/handler/TestAbortedTxnCleaner.java | 14 ++--
.../hive/ql/txn/compactor/handler/TestHandler.java | 5 +-
.../txn/jdbc/queries/ReadyToCleanAbortHandler.java | 2 +-
.../txn/jdbc/queries/ReadyToCleanHandler.java | 3 +-
10 files changed, 157 insertions(+), 114 deletions(-)
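For readers skimming the diff below: the core of the change is that TaskHandler no longer keeps a single shared HiveConf field but hands each cleanup thread its own copy. A minimal sketch of that pattern (simplified from the patch; the holder class name is illustrative, not part of the change):

import org.apache.hadoop.hive.conf.HiveConf;

public class PerThreadConfSketch {
  // Each cleanup task thread lazily gets its own copy of the configuration, so
  // per-task mutations (e.g. storing a valid-txn-list snapshot) cannot leak into
  // tasks running concurrently on other threads.
  private final ThreadLocal<HiveConf> threadLocalConf;

  public PerThreadConfSketch(HiveConf conf) {
    // HiveConf's copy constructor clones the settings of the passed-in conf.
    this.threadLocalConf = ThreadLocal.withInitial(() -> new HiveConf(conf));
  }

  public HiveConf getConf() {
    return threadLocalConf.get();
  }
}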
diff --git a/itests/hive-minikdc/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithSecureDFS.java b/itests/hive-minikdc/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithSecureDFS.java
index 2667a83d43d..f40e3d75c90 100644
--- a/itests/hive-minikdc/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithSecureDFS.java
+++ b/itests/hive-minikdc/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithSecureDFS.java
@@ -173,7 +173,7 @@ public void testLeakAfterHistoryException() throws Exception {
// Depending on the Xmx value the leak may lead to OOM; if you definitely want to see the OOM
// increase the size of the configuration or the number of failed compactions.
Assert.assertTrue("Allocated memory, " + diffMem + "bytes , exceeds acceptable variance of 250MB.",
- diffMem < 250_000_000);
+ diffMem < 450_000_000);
}
@Override
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
index 6126f150e3a..b155e600dc8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
@@ -91,7 +91,7 @@ public void run() {
for (TaskHandler cleanupHandler : cleanupHandlers) {
try {
CompactorUtil.checkInterrupt(CLASS_NAME);
- List<Runnable> tasks = cleanupHandler.getTasks();
+ List<Runnable> tasks = cleanupHandler.getTasks(conf);
List<CompletableFuture<Void>> asyncTasks = new ArrayList<>();
for (Runnable task : tasks) {
CompletableFuture<Void> asyncTask = CompletableFuture.runAsync(
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetadataCache.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetadataCache.java
index d3c7e1d64ad..8898ac46c36 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetadataCache.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetadataCache.java
@@ -29,7 +29,10 @@ public class MetadataCache {
public MetadataCache(boolean isCacheEnabled) {
if (isCacheEnabled) {
- metaCache = Caffeine.newBuilder().softValues().build();
+ metaCache = Caffeine.newBuilder()
+ .softValues()
+ .maximumSize(10_000)
+ .build();
}
}
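Aside from the threading fix, the cache above is now bounded: softValues() on its own only frees entries under GC pressure, while maximumSize(10_000) adds a hard cap on the number of cached entries. A standalone Caffeine sketch of the same configuration (key/value types and the loader are illustrative):

import com.github.benmanes.caffeine.cache.Cache;
import com.github.benmanes.caffeine.cache.Caffeine;

public class BoundedSoftCacheSketch {
  public static void main(String[] args) {
    Cache<String, String> metaCache = Caffeine.newBuilder()
        .softValues()        // values may still be reclaimed under memory pressure
        .maximumSize(10_000) // hard cap on the number of entries
        .build();

    // compute-if-absent style lookup, analogous to MetadataCache.computeIfAbsent
    String table = metaCache.get("default.t1", key -> "resolved:" + key);
    System.out.println(table);
  }
}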
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/AbortedTxnCleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/AbortedTxnCleaner.java
index 2314ce4d2e4..9ad9191e144 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/AbortedTxnCleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/AbortedTxnCleaner.java
@@ -38,9 +38,10 @@
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
import static java.util.Objects.isNull;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD;
/**
* Abort-cleanup based implementation of TaskHandler.
@@ -51,8 +52,7 @@ class AbortedTxnCleaner extends TaskHandler {
private static final Logger LOG = LoggerFactory.getLogger(AbortedTxnCleaner.class.getName());
public AbortedTxnCleaner(HiveConf conf, TxnStore txnHandler,
- MetadataCache metadataCache, boolean metricsEnabled,
- FSRemover fsRemover) {
+ MetadataCache metadataCache, boolean metricsEnabled, FSRemover fsRemover) {
super(conf, txnHandler, metadataCache, metricsEnabled, fsRemover);
}
@@ -73,23 +73,23 @@ a. Find the list of entries which are suitable for cleanup (This is done in {@li
e. Fetch the aborted write IDs from the AcidState and use it to delete the associated metadata in the TXN_COMPONENTS table.
**/
@Override
- public List<Runnable> getTasks() throws MetaException {
- int abortedThreshold = HiveConf.getIntVar(conf,
- HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD);
- long abortedTimeThreshold = HiveConf
- .getTimeVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD,
- TimeUnit.MILLISECONDS);
- List<CompactionInfo> readyToCleanAborts = txnHandler.findReadyToCleanAborts(abortedTimeThreshold, abortedThreshold);
+ public List<Runnable> getTasks(HiveConf conf) throws MetaException {
+ int abortedThreshold = HiveConf.getIntVar(conf, HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD);
+ long abortedTimeThreshold = HiveConf.getTimeVar(conf, HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD,
+ TimeUnit.MILLISECONDS);
+ List<CompactionInfo> readyToCleanAborts =
+ txnHandler.findReadyToCleanAborts(abortedTimeThreshold, abortedThreshold);
if (!readyToCleanAborts.isEmpty()) {
- return readyToCleanAborts.stream().map(info -> ThrowingRunnable.unchecked(() ->
- clean(info, info.minOpenWriteTxnId, metricsEnabled)))
- .collect(Collectors.toList());
+ return readyToCleanAborts.stream()
+ .map(info -> ThrowingRunnable.unchecked(
+ () -> clean(info, metricsEnabled)))
+ .toList();
}
return Collections.emptyList();
}
- private void clean(CompactionInfo info, long minOpenWriteTxn, boolean metricsEnabled) throws MetaException, InterruptedException {
+ private void clean(CompactionInfo info, boolean metricsEnabled) throws MetaException, InterruptedException {
LOG.info("Starting cleaning for {}", info);
PerfLogger perfLogger = PerfLogger.getPerfLogger(false);
String cleanerMetric = MetricsConstants.COMPACTION_CLEANER_CYCLE + "_";
@@ -98,7 +98,8 @@ private void clean(CompactionInfo info, long minOpenWriteTxn, boolean metricsEna
perfLogger.perfLogBegin(AbortedTxnCleaner.class.getName(), cleanerMetric);
}
Partition p = null;
- Table t = metadataCache.computeIfAbsent(info.getFullTableName(), () -> resolveTable(info.dbname, info.tableName));
+ Table t = resolveTable(info);
+
if (isNull(t)) {
// The table was dropped before we got around to cleaning it.
LOG.info("Unable to find table {}, assuming it was dropped.",
info.getFullTableName());
@@ -109,25 +110,24 @@ private void clean(CompactionInfo info, long
minOpenWriteTxn, boolean metricsEna
p = resolvePartition(info.dbname, info.tableName, info.partName);
if (isNull(p)) {
// The partition was dropped before we got around to cleaning it.
- LOG.info("Unable to find partition {}, assuming it was dropped.",
- info.getFullPartitionName());
+ LOG.info("Unable to find partition {}, assuming it was dropped.",
info.getFullPartitionName());
txnHandler.markCleaned(info);
return;
}
}
- String location = CompactorUtil.resolveStorageDescriptor(t,p).getLocation();
- info.runAs = TxnUtils.findUserToRunAs(location, t, conf);
- abortCleanUsingAcidDir(info, location, minOpenWriteTxn);
+ String location = CompactorUtil.resolveStorageDescriptor(t, p).getLocation();
+ info.runAs = TxnUtils.findUserToRunAs(location, t, getConf());
+ abortCleanUsingAcidDir(info, t, location);
} catch (InterruptedException e) {
LOG.error("Caught an interrupted exception when cleaning, unable to
complete cleaning of {} due to {}", info,
- e.getMessage());
+ e.getMessage());
handleCleanerAttemptFailure(info, e.getMessage());
throw e;
} catch (Exception e) {
LOG.error("Caught exception when cleaning, unable to complete cleaning
of {} due to {}", info,
- e.getMessage());
+ e.getMessage());
handleCleanerAttemptFailure(info, e.getMessage());
throw new MetaException(e.getMessage());
} finally {
@@ -137,22 +137,24 @@ private void clean(CompactionInfo info, long minOpenWriteTxn, boolean metricsEna
}
}
- private void abortCleanUsingAcidDir(CompactionInfo info, String location, long minOpenWriteTxn) throws Exception {
- ValidTxnList validTxnList =
- TxnUtils.createValidTxnListForCleaner(txnHandler.getOpenTxns(), minOpenWriteTxn, true);
+ private void abortCleanUsingAcidDir(CompactionInfo info, Table table, String location) throws Exception {
+ ValidTxnList validTxnList = TxnUtils.createValidTxnListForCleaner(
+ getOpenTxns(), info.minOpenWriteTxnId, true);
//save it so that getAcidState() sees it
- conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
+ getConf().set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
+ // Creating 'reader' list since we are interested in the set of 'obsolete' files
ValidReaderWriteIdList validWriteIdList = getValidCleanerWriteIdList(info, validTxnList);
+ LOG.debug("Cleaning based on writeIdList: {}", validWriteIdList);
// Set the highestWriteId of the cleanup equal to the min(minOpenWriteId - 1, highWatermark).
- // This is necessary for looking at the complete state of the table till the min open write Id
+ // This is necessary for looking at the complete state of the table till the min open writeId
// (if there is an open txn on the table) or the highestWatermark.
// This is used later on while deleting the records in TXN_COMPONENTS table.
- info.highestWriteId = Math.min(isNull(validWriteIdList.getMinOpenWriteId()) ?
- Long.MAX_VALUE : validWriteIdList.getMinOpenWriteId() - 1, validWriteIdList.getHighWatermark());
- Table table = metadataCache.computeIfAbsent(info.getFullTableName(), () -> resolveTable(info.dbname, info.tableName));
- LOG.debug("Cleaning based on writeIdList: {}", validWriteIdList);
+ info.highestWriteId = Math.min(
+ isNull(validWriteIdList.getMinOpenWriteId()) ?
+ Long.MAX_VALUE : validWriteIdList.getMinOpenWriteId() - 1,
+ validWriteIdList.getHighWatermark());
boolean success = cleanAndVerifyObsoleteDirectories(info, location, validWriteIdList, table);
if (success || CompactorUtil.isDynPartAbort(table, info.partName)) {
@@ -160,6 +162,5 @@ private void abortCleanUsingAcidDir(CompactionInfo info, String location, long m
} else {
LOG.warn("Leaving aborted entry {} in TXN_COMPONENTS table.", info);
}
-
}
}
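The highestWriteId assignment above caps abort cleanup at min(minOpenWriteId - 1, highWatermark), i.e. everything below the smallest still-open write ID but never beyond the snapshot's high watermark. Restated as a standalone sketch (class and method names are illustrative):

final class AbortCleanupWatermarkSketch {
  // A null minOpenWriteId means there is no open write on the table, so only the
  // high watermark limits the cleanup.
  static long cleanupHighWatermark(Long minOpenWriteId, long highWatermark) {
    long belowMinOpen = (minOpenWriteId == null) ? Long.MAX_VALUE : minOpenWriteId - 1;
    return Math.min(belowMinOpen, highWatermark);
  }
}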
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/CompactionCleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/CompactionCleaner.java
index 634da542717..fd49bbb90f1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/CompactionCleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/CompactionCleaner.java
@@ -52,7 +52,6 @@
import java.util.Collections;
import java.util.List;
import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETENTION_TIME;
import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_DELAYED_CLEANUP_ENABLED;
@@ -67,17 +66,16 @@ class CompactionCleaner extends TaskHandler {
private static final Logger LOG = LoggerFactory.getLogger(CompactionCleaner.class.getName());
public CompactionCleaner(HiveConf conf, TxnStore txnHandler,
- MetadataCache metadataCache, boolean metricsEnabled,
- FSRemover fsRemover) {
+ MetadataCache metadataCache, boolean metricsEnabled, FSRemover fsRemover) {
super(conf, txnHandler, metadataCache, metricsEnabled, fsRemover);
}
@Override
- public List<Runnable> getTasks() throws MetaException {
+ public List<Runnable> getTasks(HiveConf conf) throws MetaException {
long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
long retentionTime = HiveConf.getBoolVar(conf, HIVE_COMPACTOR_DELAYED_CLEANUP_ENABLED)
- ? HiveConf.getTimeVar(conf, HIVE_COMPACTOR_CLEANER_RETENTION_TIME, TimeUnit.MILLISECONDS)
- : 0;
+ ? HiveConf.getTimeVar(conf, HIVE_COMPACTOR_CLEANER_RETENTION_TIME, TimeUnit.MILLISECONDS)
+ : 0;
List<CompactionInfo> readyToClean = txnHandler.findReadyToClean(minOpenTxnId, retentionTime);
if (!readyToClean.isEmpty()) {
long minTxnIdSeenOpen = Math.min(minOpenTxnId, txnHandler.findMinTxnIdSeenOpen());
@@ -87,20 +85,21 @@ public List<Runnable> getTasks() throws MetaException {
// to the clean method, to avoid cleaning up deltas needed for running queries
// when min_history_level is finally dropped, than every HMS will commit compaction the new way
// and minTxnIdSeenOpen can be removed and minOpenTxnId can be used instead.
- return readyToClean.stream().map(ci -> {
- long cleanerWaterMark = (ci.minOpenWriteId >= 0) ? ci.nextTxnId + 1 : minTxnIdSeenOpen;
- LOG.info("Cleaning based on min open txn id: {}", cleanerWaterMark);
- return ThrowingRunnable.unchecked(() -> clean(ci, cleanerWaterMark, metricsEnabled));
- }).collect(Collectors.toList());
+ return readyToClean.stream()
+ .map(ci -> ThrowingRunnable.unchecked(
+ () -> clean(ci, minTxnIdSeenOpen, metricsEnabled)))
+ .toList();
}
return Collections.emptyList();
}
private void clean(CompactionInfo ci, long minOpenTxn, boolean metricsEnabled) throws MetaException {
- LOG.info("Starting cleaning for {}", ci);
+ LOG.info("Starting cleaning for {}, based on min open {}", ci,
+ (ci.minOpenWriteId > 0) ? "writeId: " + ci.minOpenWriteId : "txnId: " + minOpenTxn);
+
PerfLogger perfLogger = PerfLogger.getPerfLogger(false);
String cleanerMetric = MetricsConstants.COMPACTION_CLEANER_CYCLE + "_" +
- (!isNull(ci.type) ? ci.type.toString().toLowerCase() : null);
+ (!isNull(ci.type) ? ci.type.toString().toLowerCase() : null);
try {
if (metricsEnabled) {
perfLogger.perfLogBegin(CompactionCleaner.class.getName(), cleanerMetric);
@@ -111,11 +110,12 @@ private void clean(CompactionInfo ci, long minOpenTxn, boolean metricsEnabled) t
Partition p = null;
if (isNull(location)) {
- t = metadataCache.computeIfAbsent(ci.getFullTableName(), () -> resolveTable(ci.dbname, ci.tableName));
+ t = resolveTable(ci);
+
if (isNull(t)) {
// The table was dropped before we got around to cleaning it.
LOG.info("Unable to find table {}, assuming it was dropped. {}",
ci.getFullTableName(),
- idWatermark(ci));
+ idWatermark(ci));
txnHandler.markCleaned(ci);
return;
}
@@ -129,8 +129,8 @@ private void clean(CompactionInfo ci, long minOpenTxn, boolean metricsEnabled) t
p = resolvePartition(ci.dbname, ci.tableName, ci.partName);
if (isNull(p)) {
// The partition was dropped before we got around to cleaning it.
- LOG.info("Unable to find partition {}, assuming it was dropped.
{}",
- ci.getFullPartitionName(), idWatermark(ci));
+ LOG.info("Unable to find partition {}, assuming it was dropped.
{}", ci.getFullPartitionName(),
+ idWatermark(ci));
txnHandler.markCleaned(ci);
return;
}
@@ -146,22 +146,23 @@ private void clean(CompactionInfo ci, long minOpenTxn, boolean metricsEnabled) t
if (!isNull(t) || !isNull(ci.partName)) {
String path = isNull(location)
- ? CompactorUtil.resolveStorageDescriptor(t, p).getLocation()
- : location;
+ ? CompactorUtil.resolveStorageDescriptor(t, p).getLocation()
+ : location;
boolean dropPartition = !isNull(ci.partName) && isNull(p);
//check if partition wasn't re-created
if (dropPartition && isNull(resolvePartition(ci.dbname, ci.tableName, ci.partName))) {
cleanUsingLocation(ci, path, true);
} else {
- cleanUsingAcidDir(ci, path, minOpenTxn);
+ long cleanerWaterMark = (ci.minOpenWriteId > 0) ? ci.nextTxnId + 1 : minOpenTxn;
+ cleanUsingAcidDir(ci, t, path, cleanerWaterMark);
}
} else {
cleanUsingLocation(ci, location, false);
}
} catch (Exception e) {
LOG.error("Caught exception when cleaning, unable to complete cleaning
of {} due to {}", ci,
- e.getMessage());
+ e.getMessage());
if (metricsEnabled) {
Metrics.getOrCreateCounter(MetricsConstants.COMPACTION_CLEANER_FAILURE_COUNTER).inc();
}
@@ -203,19 +204,18 @@ private void cleanUsingLocation(CompactionInfo ci, String path, boolean requires
}
}
- private void cleanUsingAcidDir(CompactionInfo ci, String location, long minOpenTxn) throws Exception {
- ValidTxnList validTxnList =
- TxnUtils.createValidTxnListForCleaner(txnHandler.getOpenTxns(), minOpenTxn, false);
+ private void cleanUsingAcidDir(CompactionInfo ci, Table table, String location, long minOpenTxn) throws Exception {
+ ValidTxnList validTxnList = TxnUtils.createValidTxnListForCleaner(
+ getOpenTxns(), minOpenTxn, false);
//save it so that getAcidState() sees it
- conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
+ getConf().set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
/*
- * {@code validTxnList} is capped by minOpenTxnGLB so if
+ * {@code validTxnList} is capped by global minOpenTxn so if
* {@link AcidUtils#getAcidState(Path, Configuration, ValidWriteIdList)} sees a base/delta
* produced by a compactor, that means every reader that could be active right now see it
- * as well. That means if this base/delta shadows some earlier base/delta, it will be
+ * as well. That means if this base/delta shadows some earlier base/delta, it will be
* used in favor of any files that it shadows. Thus, the shadowed files are safe to delete.
*
- *
* The metadata about aborted writeIds (and consequently aborted txn IDs) cannot be deleted
* above COMPACTION_QUEUE.CQ_HIGHEST_WRITE_ID.
* See {@link TxnStore#markCleaned(CompactionInfo)} for details.
@@ -241,7 +241,6 @@ private void cleanUsingAcidDir(CompactionInfo ci, String location, long minOpenT
// Creating 'reader' list since we are interested in the set of 'obsolete' files
ValidReaderWriteIdList validWriteIdList = getValidCleanerWriteIdList(ci, validTxnList);
- Table table = metadataCache.computeIfAbsent(ci.getFullTableName(), () -> resolveTable(ci.dbname, ci.tableName));
LOG.debug("Cleaning based on writeIdList: {}", validWriteIdList);
boolean success = cleanAndVerifyObsoleteDirectories(ci, location, validWriteIdList, table);
@@ -249,12 +248,13 @@ private void cleanUsingAcidDir(CompactionInfo ci, String location, long minOpenT
txnHandler.markCleaned(ci);
} else {
txnHandler.clearCleanerStart(ci);
- LOG.warn("No files were removed. Leaving queue entry {} in ready for
cleaning state.", ci);
+ LOG.warn("Leaving queue entry {} in ready for cleaning state.", ci);
}
}
private LockRequest createLockRequest(CompactionInfo ci) {
- return CompactorUtil.createLockRequest(conf, ci, 0, LockType.EXCL_WRITE, DataOperationType.DELETE);
+ return CompactorUtil.createLockRequest(
+ getConf(), ci, 0, LockType.EXCL_WRITE, DataOperationType.DELETE);
}
private static String idWatermark(CompactionInfo ci) {
@@ -263,7 +263,7 @@ private static String idWatermark(CompactionInfo ci) {
@Override
protected ValidReaderWriteIdList getValidCleanerWriteIdList(CompactionInfo ci, ValidTxnList validTxnList)
- throws NoSuchTxnException, MetaException {
+ throws Exception {
ValidReaderWriteIdList validWriteIdList = super.getValidCleanerWriteIdList(ci, validTxnList);
/*
* We need to filter the obsoletes dir list, to only remove directories that were made obsolete by this compaction
@@ -279,11 +279,14 @@ protected ValidReaderWriteIdList getValidCleanerWriteIdList(CompactionInfo ci, V
private CleanupRequest getCleaningRequestBasedOnLocation(CompactionInfo ci, String location) {
String strIfPurge = ci.getProperty("ifPurge");
boolean ifPurge = strIfPurge != null || Boolean.parseBoolean(ci.getProperty("ifPurge"));
-
Path obsoletePath = new Path(location);
+
return new CleanupRequestBuilder()
- .setLocation(location).setDbName(ci.dbname).setFullPartitionName(ci.getFullPartitionName())
- .setRunAs(ci.runAs).setPurge(ifPurge).setObsoleteDirs(Collections.singletonList(obsoletePath))
- .build();
+ .setLocation(location).setDbName(ci.dbname)
+ .setFullPartitionName(ci.getFullPartitionName())
+ .setRunAs(ci.runAs)
+ .setPurge(ifPurge)
+ .setObsoleteDirs(Collections.singletonList(obsoletePath))
+ .build();
}
}
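The cleanerWaterMark selection moved from getTasks() into clean(): when the metastore tracks a min-history write ID (ci.minOpenWriteId > 0) the cleaner uses ci.nextTxnId + 1, otherwise it falls back to the lowest txn ID ever seen open. A standalone restatement (names are illustrative; semantics follow the comment in getTasks() above):

final class CleanerWaterMarkSketch {
  static long cleanerWaterMark(long minOpenWriteId, long nextTxnId, long minTxnIdSeenOpen) {
    // minOpenWriteId > 0 signals the min-history write-id path; otherwise stay
    // conservative and use the oldest txn id that was ever observed open.
    return (minOpenWriteId > 0) ? nextTxnId + 1 : minTxnIdSeenOpen;
  }
}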
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/TaskHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/TaskHandler.java
index f4d0a5adc15..d140eb4d9ac 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/TaskHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/TaskHandler.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hive.ql.txn.compactor.handler;
+import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ValidCleanerWriteIdList;
@@ -24,13 +25,14 @@
import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
import org.apache.hadoop.hive.common.ValidTxnList;
import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest;
import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResponse;
import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
import org.apache.hadoop.hive.metastore.api.Partition;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.hadoop.hive.metastore.metrics.AcidMetricService;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo;
import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
import org.apache.hadoop.hive.metastore.txn.TxnStore;
@@ -45,7 +47,6 @@
import org.slf4j.LoggerFactory;
import java.io.IOException;
-import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
@@ -63,57 +64,81 @@
public abstract class TaskHandler {
private static final Logger LOG = LoggerFactory.getLogger(TaskHandler.class.getName());
+
protected final TxnStore txnHandler;
- protected final HiveConf conf;
+ private final ThreadLocal<HiveConf> threadLocalConf;
protected final boolean metricsEnabled;
- protected final MetadataCache metadataCache;
+ private final MetadataCache metadataCache;
protected final FSRemover fsRemover;
- protected final long defaultRetention;
+ private final long defaultRetention;
TaskHandler(HiveConf conf, TxnStore txnHandler, MetadataCache metadataCache,
- boolean metricsEnabled, FSRemover fsRemover) {
- this.conf = conf;
+ boolean metricsEnabled, FSRemover fsRemover) {
+ this.threadLocalConf = ThreadLocal.withInitial(() -> new HiveConf(conf));
this.txnHandler = txnHandler;
this.metadataCache = metadataCache;
this.metricsEnabled = metricsEnabled;
this.fsRemover = fsRemover;
- this.defaultRetention = getTimeVar(conf, HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME, TimeUnit.MILLISECONDS);
+ this.defaultRetention = getTimeVar(conf, HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME,
+ TimeUnit.MILLISECONDS);
+ }
+
+ public abstract List<Runnable> getTasks(HiveConf conf) throws MetaException;
+
+ protected HiveConf getConf() {
+ return threadLocalConf.get();
}
- public abstract List<Runnable> getTasks() throws MetaException;
+ protected GetOpenTxnsResponse getOpenTxns() throws Exception {
+ return metadataCache.computeIfAbsent("openTxns", txnHandler::getOpenTxns);
+ }
- protected Table resolveTable(String dbName, String tableName) throws MetaException {
- return CompactorUtil.resolveTable(conf, dbName, tableName);
+ protected Table resolveTable(CompactionInfo info) throws Exception {
+ return metadataCache.computeIfAbsent(info.getFullTableName(),
+ () -> resolveTable(info.dbname, info.tableName));
+ }
+
+ @VisibleForTesting
+ Table resolveTable(String dbName, String tableName) throws MetaException {
+ return CompactorUtil.resolveTable(getConf(), dbName, tableName);
}
protected Partition resolvePartition(String dbName, String tableName, String partName) throws MetaException {
- return CompactorUtil.resolvePartition(conf, null, dbName, tableName, partName, CompactorUtil.METADATA_FETCH_MODE.LOCAL);
+ return CompactorUtil.resolvePartition(
+ getConf(), null, dbName, tableName, partName, CompactorUtil.METADATA_FETCH_MODE.LOCAL);
}
protected ValidReaderWriteIdList getValidCleanerWriteIdList(CompactionInfo info, ValidTxnList validTxnList)
- throws NoSuchTxnException, MetaException {
- List<String> tblNames = Collections.singletonList(AcidUtils.getFullTableName(info.dbname, info.tableName));
+ throws Exception {
+ List<String> tblNames = Collections.singletonList(
+ TxnUtils.getFullTableName(info.dbname, info.tableName));
+
GetValidWriteIdsRequest request = new GetValidWriteIdsRequest(tblNames);
request.setValidTxnList(validTxnList.writeToString());
- GetValidWriteIdsResponse rsp = txnHandler.getValidWriteIds(request);
+ GetValidWriteIdsResponse rsp = metadataCache.computeIfAbsent(info.getFullTableName() + validTxnList.writeToString(),
+ () -> txnHandler.getValidWriteIds(request));
// we could have no write IDs for a table if it was never written to but
// since we are in the Cleaner phase of compactions, there must have
// been some delta/base dirs
assert rsp != null && rsp.getTblValidWriteIdsSize() == 1;
- return new ValidCleanerWriteIdList(
- TxnCommonUtils.createValidReaderWriteIdList(rsp.getTblValidWriteIds().get(0)));
+ ValidReaderWriteIdList validWriteIdList = TxnCommonUtils.createValidReaderWriteIdList(
+ rsp.getTblValidWriteIds().getFirst());
+ return new ValidCleanerWriteIdList(validWriteIdList);
}
protected boolean cleanAndVerifyObsoleteDirectories(CompactionInfo info, String location,
- ValidReaderWriteIdList validWriteIdList, Table table) throws MetaException, IOException {
+ ValidReaderWriteIdList validWriteIdList, Table table) throws MetaException, IOException {
Path path = new Path(location);
- FileSystem fs = path.getFileSystem(conf);
+ FileSystem fs = path.getFileSystem(getConf());
// Collect all the files/dirs
Map<Path, AcidUtils.HdfsDirSnapshot> dirSnapshots = AcidUtils.getHdfsDirSnapshotsForCleaner(fs, path);
- AcidDirectory dir = AcidUtils.getAcidState(fs, path, conf, validWriteIdList, Ref.from(false), false,
- dirSnapshots);
+
+ AcidDirectory dir = AcidUtils.getAcidState(
+ fs, path, getConf(), validWriteIdList, Ref.from(false), false, dirSnapshots);
+
boolean isDynPartAbort = CompactorUtil.isDynPartAbort(table, info.partName);
List<Path> obsoleteDirs = CompactorUtil.getObsoleteDirs(dir, isDynPartAbort);
@@ -121,26 +146,35 @@ protected boolean cleanAndVerifyObsoleteDirectories(CompactionInfo info, String
info.setWriteIds(dir.hasUncompactedAborts(), dir.getAbortedWriteIds());
}
- List<Path> deleted = fsRemover.clean(new CleanupRequest.CleanupRequestBuilder().setLocation(location)
- .setDbName(info.dbname).setFullPartitionName(info.getFullPartitionName())
- .setRunAs(info.runAs).setObsoleteDirs(obsoleteDirs).setPurge(true)
+ List<Path> deleted = fsRemover.clean(
+ new CleanupRequest.CleanupRequestBuilder()
+ .setLocation(location)
+ .setDbName(info.dbname)
+ .setFullPartitionName(info.getFullPartitionName())
+ .setRunAs(info.runAs)
+ .setObsoleteDirs(obsoleteDirs).setPurge(true)
.build());
if (!deleted.isEmpty()) {
- AcidMetricService.updateMetricsFromCleaner(info.dbname, info.tableName, info.partName, dir.getObsolete(), conf,
- txnHandler);
+ AcidMetricService.updateMetricsFromCleaner(
+ info.dbname, info.tableName, info.partName, dir.getObsolete(), getConf(),
+ txnHandler);
}
// Make sure there are no leftovers below the compacted watermark
boolean success = false;
- conf.set(ValidTxnList.VALID_TXNS_KEY, new ValidReadTxnList().toString());
- dir = AcidUtils.getAcidState(fs, path, conf, new ValidCleanerWriteIdList(info.getFullTableName(), info.highestWriteId),
- Ref.from(false), false, dirSnapshots);
+ getConf().set(ValidTxnList.VALID_TXNS_KEY, new ValidReadTxnList().toString());
+
+ dir = AcidUtils.getAcidState(
+ fs, path, getConf(),
+ new ValidCleanerWriteIdList(info.getFullTableName(), info.highestWriteId),
+ Ref.from(false), false,
+ dirSnapshots);
List<Path> remained = subtract(CompactorUtil.getObsoleteDirs(dir, isDynPartAbort), deleted);
if (!remained.isEmpty()) {
- LOG.warn("Remained {} obsolete directories from {}. {}",
- remained.size(), location, CompactorUtil.getDebugInfo(remained));
+ LOG.warn("Remained {} obsolete directories from {}. {}",
remained.size(), location,
+ CompactorUtil.getDebugInfo(remained));
} else {
LOG.debug("All cleared below the watermark: {} from {}",
info.highestWriteId, location);
success = true;
@@ -160,7 +194,7 @@ protected void handleCleanerAttemptFailure(CompactionInfo info, String errorMess
if (info.retryRetention > 0) {
cleanAttempts = (int) (Math.log(info.retryRetention / defaultRetention) / Math.log(2)) + 1;
}
- if (cleanAttempts >= getIntVar(conf, HIVE_COMPACTOR_CLEANER_MAX_RETRY_ATTEMPTS)) {
+ if (cleanAttempts >= getIntVar(getConf(), HIVE_COMPACTOR_CLEANER_MAX_RETRY_ATTEMPTS)) {
//Mark it as failed if the max attempt threshold is reached.
txnHandler.markFailed(info);
} else {
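TaskHandler also starts memoizing metastore lookups (getOpenTxns, getValidWriteIds) in the shared MetadataCache, keying the write-ID response by table name plus the serialized transaction snapshot so a cached answer is never reused against a different snapshot. A generic compute-if-absent sketch of that keying idea (not Hive's own MetadataCache API; the loader is a placeholder):

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class SnapshotKeyedCacheSketch {
  private final Map<String, Object> cache = new ConcurrentHashMap<>();

  Object validWriteIds(String fullTableName, String validTxnsSnapshot) {
    // Key combines the table and the txn snapshot, mirroring
    // info.getFullTableName() + validTxnList.writeToString() in the patch.
    String key = fullTableName + validTxnsSnapshot;
    return cache.computeIfAbsent(key, k -> fetchFromMetastore(fullTableName, validTxnsSnapshot));
  }

  private Object fetchFromMetastore(String table, String snapshot) {
    // placeholder for txnHandler.getValidWriteIds(request)
    return table + "@" + snapshot;
  }
}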
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestAbortedTxnCleaner.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestAbortedTxnCleaner.java
index 8f6814d4890..7e73d06c9ff 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestAbortedTxnCleaner.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestAbortedTxnCleaner.java
@@ -78,7 +78,7 @@ public void testCleaningOfAbortedDirectoriesForUnpartitionedTables() throws Exce
cleaner.run();
Mockito.verify(mockedFSRemover, Mockito.times(1)).clean(any(CleanupRequest.class));
- Mockito.verify(mockedTaskHandler, Mockito.times(1)).getTasks();
+ Mockito.verify(mockedTaskHandler, Mockito.times(1)).getTasks(any(HiveConf.class));
List<Path> directories = getDirectories(conf, t, null);
// All aborted directories removed, hence 1 committed delta directory must be present
@@ -109,7 +109,7 @@ public void testCleaningOfAbortedDirectoriesForSinglePartition() throws Exceptio
cleaner.run();
Mockito.verify(mockedFSRemover, Mockito.times(1)).clean(any(CleanupRequest.class));
- Mockito.verify(mockedTaskHandler, Mockito.times(1)).getTasks();
+ Mockito.verify(mockedTaskHandler, Mockito.times(1)).getTasks(any(HiveConf.class));
List<Path> directories = getDirectories(conf, t, p);
// All aborted directories removed, hence 1 committed delta directory must be present
@@ -147,7 +147,7 @@ public void testCleaningOfAbortedDirectoriesForMultiplePartitions() throws Excep
cleaner.run();
Mockito.verify(mockedFSRemover, Mockito.times(2)).clean(any(CleanupRequest.class));
- Mockito.verify(mockedTaskHandler, Mockito.times(1)).getTasks();
+ Mockito.verify(mockedTaskHandler, Mockito.times(1)).getTasks(any(HiveConf.class));
List<Path> directories = getDirectories(conf, t, p1);
// All aborted directories removed, hence 1 committed delta directory must be present
@@ -190,7 +190,7 @@ public void testCleaningOfAbortedDirectoriesWithLongRunningOpenWriteTxn() throws
cleaner.run();
Mockito.verify(mockedFSRemover, Mockito.times(1)).clean(any(CleanupRequest.class));
- Mockito.verify(mockedTaskHandler, Mockito.times(1)).getTasks();
+ Mockito.verify(mockedTaskHandler, Mockito.times(1)).getTasks(any(HiveConf.class));
List<Path> directories = getDirectories(conf, t, null);
// All aborted directories below min open write ID are removed,
@@ -241,7 +241,7 @@ public void testCleaningOfAbortedDirectoriesOnTopOfBase() throws Exception {
cleaner.run();
Mockito.verify(mockedFSRemover, Mockito.times(1)).clean(any(CleanupRequest.class));
- Mockito.verify(mockedTaskHandler, Mockito.times(1)).getTasks();
+ Mockito.verify(mockedTaskHandler, Mockito.times(1)).getTasks(any(HiveConf.class));
directories = getDirectories(conf, t, null);
Assert.assertEquals(1, directories.size());
@@ -283,7 +283,7 @@ public void testCleaningOfAbortedDirectoriesBelowBase() throws Exception {
cleaner.run();
Mockito.verify(mockedFSRemover, Mockito.times(1)).clean(any(CleanupRequest.class));
- Mockito.verify(mockedTaskHandler, Mockito.times(1)).getTasks();
+ Mockito.verify(mockedTaskHandler, Mockito.times(1)).getTasks(any(HiveConf.class));
directories = getDirectories(conf, t, null);
// The table is already compacted, so we must see 1 base delta
@@ -367,7 +367,7 @@ public void testAbortCleanupNotUpdatingSpecificCompactionTables(boolean isPartit
cleaner.run();
Mockito.verify(mockedFSRemover, Mockito.times(1)).clean(any(CleanupRequest.class));
- Mockito.verify(mockedTaskHandler, Mockito.times(1)).getTasks();
+ Mockito.verify(mockedTaskHandler, Mockito.times(1)).getTasks(any(HiveConf.class));
Assert.assertEquals(0, TestTxnDbUtil.countQueryAgent(conf, compactionQueuePresence));
Assert.assertEquals(0, TestTxnDbUtil.countQueryAgent(conf, "SELECT COUNT(*) FROM \"COMPLETED_COMPACTIONS\" " +
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestHandler.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestHandler.java
index b1d60b8a851..1279cd6cc18 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestHandler.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestHandler.java
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hive.ql.txn.compactor.handler;
+import org.apache.hadoop.hive.conf.HiveConf;
import org.apache.hadoop.hive.metastore.ReplChangeManager;
import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
import org.apache.hadoop.hive.metastore.api.CompactionRequest;
@@ -72,7 +73,7 @@ public void testCompactionHandlerAndFsRemover() throws
Exception {
cleaner.run();
Mockito.verify(mockedFSRemover, Mockito.times(1)).clean(any(CleanupRequest.class));
- Mockito.verify(mockedTaskHandler, Mockito.times(1)).getTasks();
+ Mockito.verify(mockedTaskHandler, Mockito.times(1)).getTasks(any(HiveConf.class));
}
@Test
@@ -105,7 +106,7 @@ public void testMetaCache() throws Exception {
ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
List<ShowCompactResponseElement> compacts = rsp.getCompacts();
Assert.assertEquals(1, compacts.size());
- Mockito.verify(mockedMetadataCache, times(3)).computeIfAbsent(any(), any());
+ Mockito.verify(mockedMetadataCache, times(4)).computeIfAbsent(any(), any());
Mockito.verify(mockedTaskHandler, times(1)).resolveTable(any(), any());
}
}
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/ReadyToCleanAbortHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/ReadyToCleanAbortHandler.java
index 4940d384095..ae88852ebef 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/ReadyToCleanAbortHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/ReadyToCleanAbortHandler.java
@@ -113,7 +113,7 @@ public List<CompactionInfo> extractData(ResultSet rs) throws DataAccessException
info.partName = rs.getString("PART");
// In this case, this field contains min open write txn ID.
long value = rs.getLong("MIN_OPEN_WRITE_TXNID");
- info.minOpenWriteTxnId = value > 0 ? value : Long.MAX_VALUE;
+ info.minOpenWriteTxnId = !rs.wasNull() ? value : Long.MAX_VALUE;
// The specific type, state assigned to abort cleanup.
info.type = CompactionType.ABORT_TXN_CLEANUP;
info.state = READY_FOR_CLEANING;
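The MIN_OPEN_WRITE_TXNID fix relies on a standard JDBC detail: ResultSet.getLong() returns 0 for SQL NULL, so a NULL column and a stored 0 are indistinguishable without wasNull(); checking wasNull() instead of "value > 0" is what drives the default to Long.MAX_VALUE. A small sketch of the idiom (helper name is illustrative):

import java.sql.ResultSet;
import java.sql.SQLException;

final class NullableLongColumnSketch {
  // Returns the column value, or defaultValue when the column was SQL NULL.
  static long readOrDefault(ResultSet rs, String column, long defaultValue) throws SQLException {
    long value = rs.getLong(column);
    return rs.wasNull() ? defaultValue : value;
  }
}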
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/ReadyToCleanHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/ReadyToCleanHandler.java
index 0f22b00e197..26bb2bf6d11 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/ReadyToCleanHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/ReadyToCleanHandler.java
@@ -120,7 +120,8 @@ public List<CompactionInfo> extractData(ResultSet rs) throws SQLException, DataA
info.retryRetention = rs.getInt(9);
info.nextTxnId = rs.getLong(10);
if (TxnHandler.ConfVars.useMinHistoryWriteId()) {
- info.minOpenWriteId = rs.getLong(11);
+ long value = rs.getLong(11);
+ info.minOpenWriteId = !rs.wasNull() ? value : Long.MAX_VALUE;
}
infos.add(info);
}