This is an automated email from the ASF dual-hosted git repository.

dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 2fa252a0474 HIVE-29251: Hive ACID: HiveConf object shouldn't be shared between multiple cleanup task threads (#6119)
2fa252a0474 is described below

commit 2fa252a04748e97e6581b7685f8dcc93825eb7f9
Author: Denys Kuzmenko <[email protected]>
AuthorDate: Mon Oct 13 10:26:36 2025 +0200

    HIVE-29251: Hive ACID: HiveConf object shouldn't be shared between multiple cleanup task threads (#6119)
---
 .../ql/txn/compactor/TestCleanerWithSecureDFS.java |  2 +-
 .../hadoop/hive/ql/txn/compactor/Cleaner.java      |  2 +-
 .../hive/ql/txn/compactor/MetadataCache.java       |  5 +-
 .../txn/compactor/handler/AbortedTxnCleaner.java   | 65 +++++++-------
 .../txn/compactor/handler/CompactionCleaner.java   | 75 +++++++++--------
 .../hive/ql/txn/compactor/handler/TaskHandler.java | 98 +++++++++++++++-------
 .../compactor/handler/TestAbortedTxnCleaner.java   | 14 ++--
 .../hive/ql/txn/compactor/handler/TestHandler.java |  5 +-
 .../txn/jdbc/queries/ReadyToCleanAbortHandler.java |  2 +-
 .../txn/jdbc/queries/ReadyToCleanHandler.java      |  3 +-
 10 files changed, 157 insertions(+), 114 deletions(-)
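
The heart of this change: previously every cleanup task thread mutated the single HiveConf held by its TaskHandler (for example to stamp the ValidTxnList snapshot into it), so concurrent tasks could clobber each other's settings. Each handler now hands every thread its own copy. A minimal sketch of that pattern, assuming only HiveConf's copy constructor; the class and field names below are illustrative, not the exact Hive code:

    import org.apache.hadoop.hive.conf.HiveConf;

    public abstract class PerThreadConfHandler {
      // Lazily gives each worker thread its own copy of the service-wide conf,
      // so per-task conf.set(...) calls cannot race with another thread's task.
      private final ThreadLocal<HiveConf> threadLocalConf;

      protected PerThreadConfHandler(HiveConf serviceConf) {
        this.threadLocalConf = ThreadLocal.withInitial(() -> new HiveConf(serviceConf));
      }

      protected HiveConf getConf() {
        return threadLocalConf.get();
      }
    }

In the patch itself this lives in TaskHandler, whose subclasses (CompactionCleaner, AbortedTxnCleaner) switch from the shared conf field to getConf().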

diff --git a/itests/hive-minikdc/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithSecureDFS.java b/itests/hive-minikdc/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithSecureDFS.java
index 2667a83d43d..f40e3d75c90 100644
--- a/itests/hive-minikdc/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithSecureDFS.java
+++ b/itests/hive-minikdc/src/test/java/org/apache/hadoop/hive/ql/txn/compactor/TestCleanerWithSecureDFS.java
@@ -173,7 +173,7 @@ public void testLeakAfterHistoryException() throws Exception {
     // Depending on the Xmx value the leak may lead to OOM; if you definitely want to see the OOM
     // increase the size of the configuration or the number of failed compactions.
     Assert.assertTrue("Allocated memory, " + diffMem + "bytes , exceeds acceptable variance of 250MB.",
-        diffMem < 250_000_000);
+        diffMem < 450_000_000);
   }
 
   @Override
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
index 6126f150e3a..b155e600dc8 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
@@ -91,7 +91,7 @@ public void run() {
           for (TaskHandler cleanupHandler : cleanupHandlers) {
             try {
               CompactorUtil.checkInterrupt(CLASS_NAME);
-              List<Runnable> tasks = cleanupHandler.getTasks();
+              List<Runnable> tasks = cleanupHandler.getTasks(conf);
               List<CompletableFuture<Void>> asyncTasks = new ArrayList<>();
               for (Runnable task : tasks) {
                 CompletableFuture<Void> asyncTask = CompletableFuture.runAsync(
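
For readers unfamiliar with the call site, the loop above fans each handler's Runnables out to a pool via CompletableFuture. A self-contained, generic sketch of that fan-out pattern (plain JDK code; the executor, task bodies, and class name are illustrative, not the actual Cleaner internals):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.CompletableFuture;
    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    public class AsyncTaskFanOut {
      public static void main(String[] args) {
        ExecutorService pool = Executors.newFixedThreadPool(4);
        List<Runnable> tasks = List.of(
            () -> System.out.println("cleanup task 1 on " + Thread.currentThread().getName()),
            () -> System.out.println("cleanup task 2 on " + Thread.currentThread().getName()));

        List<CompletableFuture<Void>> asyncTasks = new ArrayList<>();
        for (Runnable task : tasks) {
          // Each task runs on a pool thread, which is why per-thread state
          // (such as the ThreadLocal HiveConf copy) matters.
          asyncTasks.add(CompletableFuture.runAsync(task, pool));
        }
        // Block until the whole batch finishes before starting the next cycle.
        CompletableFuture.allOf(asyncTasks.toArray(new CompletableFuture[0])).join();
        pool.shutdown();
      }
    }
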
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetadataCache.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetadataCache.java
index d3c7e1d64ad..8898ac46c36 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetadataCache.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/MetadataCache.java
@@ -29,7 +29,10 @@ public class MetadataCache {
 
   public MetadataCache(boolean isCacheEnabled) {
     if (isCacheEnabled) {
-      metaCache = Caffeine.newBuilder().softValues().build();
+      metaCache = Caffeine.newBuilder()
+          .softValues()
+          .maximumSize(10_000)
+          .build();
     }
   }
 
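
The hunk above caps the Caffeine-backed metadata cache at 10,000 entries instead of relying on soft references alone. A standalone sketch of that builder usage, assuming the Caffeine dependency is on the classpath; the key/value types and the loader signature are simplified for illustration:

    import com.github.benmanes.caffeine.cache.Cache;
    import com.github.benmanes.caffeine.cache.Caffeine;
    import java.util.function.Supplier;

    public class BoundedMetadataCache {
      // Soft values let the JVM reclaim entries under memory pressure, while
      // maximumSize keeps the cache from growing without bound before that happens.
      private final Cache<String, Object> cache = Caffeine.newBuilder()
          .softValues()
          .maximumSize(10_000)
          .build();

      public Object computeIfAbsent(String key, Supplier<Object> loader) {
        return cache.get(key, k -> loader.get());
      }
    }
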
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/AbortedTxnCleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/AbortedTxnCleaner.java
index 2314ce4d2e4..9ad9191e144 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/AbortedTxnCleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/AbortedTxnCleaner.java
@@ -38,9 +38,10 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 
 import static java.util.Objects.isNull;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD;
+import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD;
 
 /**
  * Abort-cleanup based implementation of TaskHandler.
@@ -51,8 +52,7 @@ class AbortedTxnCleaner extends TaskHandler {
   private static final Logger LOG = LoggerFactory.getLogger(AbortedTxnCleaner.class.getName());
 
   public AbortedTxnCleaner(HiveConf conf, TxnStore txnHandler,
-                           MetadataCache metadataCache, boolean metricsEnabled,
-                           FSRemover fsRemover) {
+        MetadataCache metadataCache, boolean metricsEnabled, FSRemover fsRemover) {
     super(conf, txnHandler, metadataCache, metricsEnabled, fsRemover);
   }
 
@@ -73,23 +73,23 @@ a. Find the list of entries which are suitable for cleanup (This is done in {@li
       e. Fetch the aborted write IDs from the AcidState and use it to delete the associated metadata in the TXN_COMPONENTS table.
    **/
   @Override
-  public List<Runnable> getTasks() throws MetaException {
-    int abortedThreshold = HiveConf.getIntVar(conf,
-              HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD);
-    long abortedTimeThreshold = HiveConf
-              .getTimeVar(conf, HiveConf.ConfVars.HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD,
-                      TimeUnit.MILLISECONDS);
-    List<CompactionInfo> readyToCleanAborts = txnHandler.findReadyToCleanAborts(abortedTimeThreshold, abortedThreshold);
+  public List<Runnable> getTasks(HiveConf conf) throws MetaException {
+    int abortedThreshold = HiveConf.getIntVar(conf, HIVE_COMPACTOR_ABORTEDTXN_THRESHOLD);
+    long abortedTimeThreshold = HiveConf.getTimeVar(conf, HIVE_COMPACTOR_ABORTEDTXN_TIME_THRESHOLD,
+        TimeUnit.MILLISECONDS);
+    List<CompactionInfo> readyToCleanAborts =
+        txnHandler.findReadyToCleanAborts(abortedTimeThreshold, abortedThreshold);
 
     if (!readyToCleanAborts.isEmpty()) {
-      return readyToCleanAborts.stream().map(info -> ThrowingRunnable.unchecked(() ->
-                      clean(info, info.minOpenWriteTxnId, metricsEnabled)))
-              .collect(Collectors.toList());
+      return readyToCleanAborts.stream()
+          .map(info -> ThrowingRunnable.unchecked(
+              () -> clean(info, metricsEnabled)))
+          .toList();
     }
     return Collections.emptyList();
   }
 
-  private void clean(CompactionInfo info, long minOpenWriteTxn, boolean metricsEnabled) throws MetaException, InterruptedException {
+  private void clean(CompactionInfo info, boolean metricsEnabled) throws MetaException, InterruptedException {
     LOG.info("Starting cleaning for {}", info);
     PerfLogger perfLogger = PerfLogger.getPerfLogger(false);
     String cleanerMetric = MetricsConstants.COMPACTION_CLEANER_CYCLE + "_";
@@ -98,7 +98,8 @@ private void clean(CompactionInfo info, long minOpenWriteTxn, boolean metricsEna
         perfLogger.perfLogBegin(AbortedTxnCleaner.class.getName(), cleanerMetric);
       }
       Partition p = null;
-      Table t = metadataCache.computeIfAbsent(info.getFullTableName(), () -> resolveTable(info.dbname, info.tableName));
+      Table t = resolveTable(info);
+
       if (isNull(t)) {
         // The table was dropped before we got around to cleaning it.
         LOG.info("Unable to find table {}, assuming it was dropped.", 
info.getFullTableName());
@@ -109,25 +110,24 @@ private void clean(CompactionInfo info, long 
minOpenWriteTxn, boolean metricsEna
         p = resolvePartition(info.dbname, info.tableName, info.partName);
         if (isNull(p)) {
           // The partition was dropped before we got around to cleaning it.
-          LOG.info("Unable to find partition {}, assuming it was dropped.",
-                  info.getFullPartitionName());
+          LOG.info("Unable to find partition {}, assuming it was dropped.", 
info.getFullPartitionName());
           txnHandler.markCleaned(info);
           return;
         }
       }
 
-      String location = CompactorUtil.resolveStorageDescriptor(t,p).getLocation();
-      info.runAs = TxnUtils.findUserToRunAs(location, t, conf);
-      abortCleanUsingAcidDir(info, location, minOpenWriteTxn);
+      String location = CompactorUtil.resolveStorageDescriptor(t, p).getLocation();
+      info.runAs = TxnUtils.findUserToRunAs(location, t, getConf());
+      abortCleanUsingAcidDir(info, t, location);
 
     } catch (InterruptedException e) {
       LOG.error("Caught an interrupted exception when cleaning, unable to 
complete cleaning of {} due to {}", info,
-              e.getMessage());
+          e.getMessage());
       handleCleanerAttemptFailure(info, e.getMessage());
       throw e;
     } catch (Exception e) {
       LOG.error("Caught exception when cleaning, unable to complete cleaning 
of {} due to {}", info,
-              e.getMessage());
+          e.getMessage());
       handleCleanerAttemptFailure(info, e.getMessage());
       throw new MetaException(e.getMessage());
     } finally {
@@ -137,22 +137,24 @@ private void clean(CompactionInfo info, long minOpenWriteTxn, boolean metricsEna
     }
   }
 
-  private void abortCleanUsingAcidDir(CompactionInfo info, String location, long minOpenWriteTxn) throws Exception {
-    ValidTxnList validTxnList =
-            TxnUtils.createValidTxnListForCleaner(txnHandler.getOpenTxns(), minOpenWriteTxn, true);
+  private void abortCleanUsingAcidDir(CompactionInfo info, Table table, String location) throws Exception {
+    ValidTxnList validTxnList = TxnUtils.createValidTxnListForCleaner(
+        getOpenTxns(), info.minOpenWriteTxnId, true);
     //save it so that getAcidState() sees it
-    conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
+    getConf().set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
 
+    // Creating 'reader' list since we are interested in the set of 'obsolete' files
     ValidReaderWriteIdList validWriteIdList = getValidCleanerWriteIdList(info, validTxnList);
+    LOG.debug("Cleaning based on writeIdList: {}", validWriteIdList);
 
     // Set the highestWriteId of the cleanup equal to the min(minOpenWriteId - 1, highWatermark).
-    // This is necessary for looking at the complete state of the table till the min open write Id
+    // This is necessary for looking at the complete state of the table till the min open writeId
     // (if there is an open txn on the table) or the highestWatermark.
     // This is used later on while deleting the records in TXN_COMPONENTS table.
-    info.highestWriteId = Math.min(isNull(validWriteIdList.getMinOpenWriteId()) ?
-            Long.MAX_VALUE : validWriteIdList.getMinOpenWriteId() - 1, validWriteIdList.getHighWatermark());
-    Table table = metadataCache.computeIfAbsent(info.getFullTableName(), () -> resolveTable(info.dbname, info.tableName));
-    LOG.debug("Cleaning based on writeIdList: {}", validWriteIdList);
+    info.highestWriteId = Math.min(
+        isNull(validWriteIdList.getMinOpenWriteId()) ?
+            Long.MAX_VALUE : validWriteIdList.getMinOpenWriteId() - 1,
+        validWriteIdList.getHighWatermark());
 
     boolean success = cleanAndVerifyObsoleteDirectories(info, location, validWriteIdList, table);
     if (success || CompactorUtil.isDynPartAbort(table, info.partName)) {
@@ -160,6 +162,5 @@ private void abortCleanUsingAcidDir(CompactionInfo info, String location, long m
     } else {
       LOG.warn("Leaving aborted entry {} in TXN_COMPONENTS table.", info);
     }
-
   }
 }
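
The switch from conf.set(...) to getConf().set(...) in this file is the crux of HIVE-29251: each task writes its ValidTxnList snapshot into the configuration right before getAcidState() reads it back, so two tasks sharing one HiveConf could silently clean against the wrong snapshot. A hedged, generic sketch of the hazard and the per-thread remedy (the key name and snapshot strings are illustrative, and hadoop-common's Configuration stands in for HiveConf):

    import org.apache.hadoop.conf.Configuration;

    public class CleanerConfIsolation {
      // Stand-in for ValidTxnList.VALID_TXNS_KEY.
      private static final String KEY = "example.valid.txns.key";

      public static void main(String[] args) {
        Configuration shared = new Configuration(false);

        // Unsafe shape (shown for contrast, not started here): both cleanup threads
        // stamp their snapshot into ONE conf, so whichever writes last wins and the
        // other thread reads a foreign snapshot.
        Runnable unsafe = () -> shared.set(KEY, "snapshot-of-" + Thread.currentThread().getName());

        // Remedy used by the patch: every thread works on its own copy of the conf.
        ThreadLocal<Configuration> perThread =
            ThreadLocal.withInitial(() -> new Configuration(shared));
        Runnable safe = () -> {
          Configuration local = perThread.get();
          local.set(KEY, "snapshot-of-" + Thread.currentThread().getName());
          System.out.println(Thread.currentThread().getName() + " sees " + local.get(KEY));
        };

        new Thread(safe, "cleaner-1").start();
        new Thread(safe, "cleaner-2").start();
      }
    }
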
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/CompactionCleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/CompactionCleaner.java
index 634da542717..fd49bbb90f1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/CompactionCleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/CompactionCleaner.java
@@ -52,7 +52,6 @@
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
 
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RETENTION_TIME;
 import static org.apache.hadoop.hive.conf.HiveConf.ConfVars.HIVE_COMPACTOR_DELAYED_CLEANUP_ENABLED;
@@ -67,17 +66,16 @@ class CompactionCleaner extends TaskHandler {
   private static final Logger LOG = LoggerFactory.getLogger(CompactionCleaner.class.getName());
 
   public CompactionCleaner(HiveConf conf, TxnStore txnHandler,
-                                MetadataCache metadataCache, boolean metricsEnabled,
-                                FSRemover fsRemover) {
+        MetadataCache metadataCache, boolean metricsEnabled, FSRemover fsRemover) {
     super(conf, txnHandler, metadataCache, metricsEnabled, fsRemover);
   }
 
   @Override
-  public List<Runnable> getTasks() throws MetaException {
+  public List<Runnable> getTasks(HiveConf conf) throws MetaException {
     long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
     long retentionTime = HiveConf.getBoolVar(conf, HIVE_COMPACTOR_DELAYED_CLEANUP_ENABLED)
-            ? HiveConf.getTimeVar(conf, HIVE_COMPACTOR_CLEANER_RETENTION_TIME, TimeUnit.MILLISECONDS)
-            : 0;
+        ? HiveConf.getTimeVar(conf, HIVE_COMPACTOR_CLEANER_RETENTION_TIME, TimeUnit.MILLISECONDS)
+        : 0;
     List<CompactionInfo> readyToClean = txnHandler.findReadyToClean(minOpenTxnId, retentionTime);
     if (!readyToClean.isEmpty()) {
       long minTxnIdSeenOpen = Math.min(minOpenTxnId, txnHandler.findMinTxnIdSeenOpen());
@@ -87,20 +85,21 @@ public List<Runnable> getTasks() throws MetaException {
       // to the clean method, to avoid cleaning up deltas needed for running queries
       // when min_history_level is finally dropped, than every HMS will commit compaction the new way
       // and minTxnIdSeenOpen can be removed and minOpenTxnId can be used instead.
-      return readyToClean.stream().map(ci -> {
-        long cleanerWaterMark = (ci.minOpenWriteId >= 0) ? ci.nextTxnId + 1 : minTxnIdSeenOpen;
-        LOG.info("Cleaning based on min open txn id: {}", cleanerWaterMark);
-        return ThrowingRunnable.unchecked(() -> clean(ci, cleanerWaterMark, metricsEnabled));
-      }).collect(Collectors.toList());
+      return readyToClean.stream()
+          .map(ci -> ThrowingRunnable.unchecked(
+              () -> clean(ci, minTxnIdSeenOpen, metricsEnabled)))
+          .toList();
     }
     return Collections.emptyList();
   }
 
   private void clean(CompactionInfo ci, long minOpenTxn, boolean metricsEnabled) throws MetaException {
-    LOG.info("Starting cleaning for {}", ci);
+    LOG.info("Starting cleaning for {}, based on min open {}", ci,
+        (ci.minOpenWriteId > 0) ? "writeId: " + ci.minOpenWriteId : "txnId: " + minOpenTxn);
+
     PerfLogger perfLogger = PerfLogger.getPerfLogger(false);
     String cleanerMetric = MetricsConstants.COMPACTION_CLEANER_CYCLE + "_" +
-            (!isNull(ci.type) ? ci.type.toString().toLowerCase() : null);
+        (!isNull(ci.type) ? ci.type.toString().toLowerCase() : null);
     try {
       if (metricsEnabled) {
         perfLogger.perfLogBegin(CompactionCleaner.class.getName(), cleanerMetric);
@@ -111,11 +110,12 @@ private void clean(CompactionInfo ci, long minOpenTxn, boolean metricsEnabled) t
       Partition p = null;
 
       if (isNull(location)) {
-        t = metadataCache.computeIfAbsent(ci.getFullTableName(), () -> resolveTable(ci.dbname, ci.tableName));
+        t = resolveTable(ci);
+
         if (isNull(t)) {
           // The table was dropped before we got around to cleaning it.
           LOG.info("Unable to find table {}, assuming it was dropped. {}", 
ci.getFullTableName(),
-                  idWatermark(ci));
+              idWatermark(ci));
           txnHandler.markCleaned(ci);
           return;
         }
@@ -129,8 +129,8 @@ private void clean(CompactionInfo ci, long minOpenTxn, boolean metricsEnabled) t
           p = resolvePartition(ci.dbname, ci.tableName, ci.partName);
           if (isNull(p)) {
             // The partition was dropped before we got around to cleaning it.
-            LOG.info("Unable to find partition {}, assuming it was dropped. 
{}",
-                    ci.getFullPartitionName(), idWatermark(ci));
+            LOG.info("Unable to find partition {}, assuming it was dropped. 
{}", ci.getFullPartitionName(),
+                idWatermark(ci));
             txnHandler.markCleaned(ci);
             return;
           }
@@ -146,22 +146,23 @@ private void clean(CompactionInfo ci, long minOpenTxn, boolean metricsEnabled) t
 
       if (!isNull(t) || !isNull(ci.partName)) {
         String path = isNull(location)
-                ? CompactorUtil.resolveStorageDescriptor(t, p).getLocation()
-                : location;
+            ? CompactorUtil.resolveStorageDescriptor(t, p).getLocation()
+            : location;
         boolean dropPartition = !isNull(ci.partName) && isNull(p);
 
         //check if partition wasn't re-created
         if (dropPartition && isNull(resolvePartition(ci.dbname, ci.tableName, ci.partName))) {
           cleanUsingLocation(ci, path, true);
         } else {
-          cleanUsingAcidDir(ci, path, minOpenTxn);
+          long cleanerWaterMark = (ci.minOpenWriteId > 0) ? ci.nextTxnId + 1 : minOpenTxn;
+          cleanUsingAcidDir(ci, t, path, cleanerWaterMark);
         }
       } else {
         cleanUsingLocation(ci, location, false);
       }
     } catch (Exception e) {
       LOG.error("Caught exception when cleaning, unable to complete cleaning 
of {} due to {}", ci,
-              e.getMessage());
+          e.getMessage());
       if (metricsEnabled) {
         
Metrics.getOrCreateCounter(MetricsConstants.COMPACTION_CLEANER_FAILURE_COUNTER).inc();
       }
@@ -203,19 +204,18 @@ private void cleanUsingLocation(CompactionInfo ci, String path, boolean requires
     }
   }
 
-  private void cleanUsingAcidDir(CompactionInfo ci, String location, long minOpenTxn) throws Exception {
-    ValidTxnList validTxnList =
-            TxnUtils.createValidTxnListForCleaner(txnHandler.getOpenTxns(), minOpenTxn, false);
+  private void cleanUsingAcidDir(CompactionInfo ci, Table table, String location, long minOpenTxn) throws Exception {
+    ValidTxnList validTxnList = TxnUtils.createValidTxnListForCleaner(
+        getOpenTxns(), minOpenTxn, false);
     //save it so that getAcidState() sees it
-    conf.set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
+    getConf().set(ValidTxnList.VALID_TXNS_KEY, validTxnList.writeToString());
     /*
-     * {@code validTxnList} is capped by minOpenTxnGLB so if
+     * {@code validTxnList} is capped by global minOpenTxn so if
      * {@link AcidUtils#getAcidState(Path, Configuration, ValidWriteIdList)} sees a base/delta
      * produced by a compactor, that means every reader that could be active right now see it
-     * as well.  That means if this base/delta shadows some earlier base/delta, it will be
+     * as well. That means if this base/delta shadows some earlier base/delta, it will be
      * used in favor of any files that it shadows. Thus, the shadowed files are safe to delete.
      *
-     *
      * The metadata about aborted writeIds (and consequently aborted txn IDs) cannot be deleted
      * above COMPACTION_QUEUE.CQ_HIGHEST_WRITE_ID.
      * See {@link TxnStore#markCleaned(CompactionInfo)} for details.
@@ -241,7 +241,6 @@ private void cleanUsingAcidDir(CompactionInfo ci, String location, long minOpenT
 
     // Creating 'reader' list since we are interested in the set of 'obsolete' files
     ValidReaderWriteIdList validWriteIdList = getValidCleanerWriteIdList(ci, validTxnList);
-    Table table = metadataCache.computeIfAbsent(ci.getFullTableName(), () -> resolveTable(ci.dbname, ci.tableName));
     LOG.debug("Cleaning based on writeIdList: {}", validWriteIdList);
 
     boolean success = cleanAndVerifyObsoleteDirectories(ci, location, validWriteIdList, table);
@@ -249,12 +248,13 @@ private void cleanUsingAcidDir(CompactionInfo ci, String location, long minOpenT
       txnHandler.markCleaned(ci);
     } else {
       txnHandler.clearCleanerStart(ci);
-      LOG.warn("No files were removed. Leaving queue entry {} in ready for 
cleaning state.", ci);
+      LOG.warn("Leaving queue entry {} in ready for cleaning state.", ci);
     }
   }
 
   private LockRequest createLockRequest(CompactionInfo ci) {
-    return CompactorUtil.createLockRequest(conf, ci, 0, LockType.EXCL_WRITE, DataOperationType.DELETE);
+    return CompactorUtil.createLockRequest(
+        getConf(), ci, 0, LockType.EXCL_WRITE, DataOperationType.DELETE);
   }
 
   private static String idWatermark(CompactionInfo ci) {
@@ -263,7 +263,7 @@ private static String idWatermark(CompactionInfo ci) {
 
   @Override
   protected ValidReaderWriteIdList getValidCleanerWriteIdList(CompactionInfo ci, ValidTxnList validTxnList)
-          throws NoSuchTxnException, MetaException {
+      throws Exception {
     ValidReaderWriteIdList validWriteIdList = super.getValidCleanerWriteIdList(ci, validTxnList);
     /*
      * We need to filter the obsoletes dir list, to only remove directories that were made obsolete by this compaction
@@ -279,11 +279,14 @@ protected ValidReaderWriteIdList getValidCleanerWriteIdList(CompactionInfo ci, V
   private CleanupRequest getCleaningRequestBasedOnLocation(CompactionInfo ci, String location) {
     String strIfPurge = ci.getProperty("ifPurge");
     boolean ifPurge = strIfPurge != null || Boolean.parseBoolean(ci.getProperty("ifPurge"));
-
     Path obsoletePath = new Path(location);
+
     return new CleanupRequestBuilder()
-            .setLocation(location).setDbName(ci.dbname).setFullPartitionName(ci.getFullPartitionName())
-            .setRunAs(ci.runAs).setPurge(ifPurge).setObsoleteDirs(Collections.singletonList(obsoletePath))
-            .build();
+        .setLocation(location).setDbName(ci.dbname)
+        .setFullPartitionName(ci.getFullPartitionName())
+        .setRunAs(ci.runAs)
+        .setPurge(ifPurge)
+        .setObsoleteDirs(Collections.singletonList(obsoletePath))
+        .build();
   }
 }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/TaskHandler.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/TaskHandler.java
index f4d0a5adc15..d140eb4d9ac 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/TaskHandler.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/handler/TaskHandler.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.txn.compactor.handler;
 
+import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ValidCleanerWriteIdList;
@@ -24,13 +25,14 @@
 import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
 import org.apache.hadoop.hive.common.ValidTxnList;
 import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.metastore.api.GetOpenTxnsResponse;
 import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsRequest;
 import org.apache.hadoop.hive.metastore.api.GetValidWriteIdsResponse;
 import org.apache.hadoop.hive.metastore.api.MetaException;
-import org.apache.hadoop.hive.metastore.api.NoSuchTxnException;
 import org.apache.hadoop.hive.metastore.api.Partition;
 import org.apache.hadoop.hive.metastore.api.Table;
 import org.apache.hadoop.hive.metastore.metrics.AcidMetricService;
+import org.apache.hadoop.hive.metastore.txn.TxnUtils;
 import org.apache.hadoop.hive.metastore.txn.entities.CompactionInfo;
 import org.apache.hadoop.hive.metastore.txn.TxnCommonUtils;
 import org.apache.hadoop.hive.metastore.txn.TxnStore;
@@ -45,7 +47,6 @@
 import org.slf4j.LoggerFactory;
 
 import java.io.IOException;
-import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
@@ -63,57 +64,81 @@
 public abstract class TaskHandler {
 
   private static final Logger LOG = LoggerFactory.getLogger(TaskHandler.class.getName());
+
   protected final TxnStore txnHandler;
-  protected final HiveConf conf;
+  private final ThreadLocal<HiveConf> threadLocalConf;
   protected final boolean metricsEnabled;
-  protected final MetadataCache metadataCache;
+  private final MetadataCache metadataCache;
   protected final FSRemover fsRemover;
-  protected final long defaultRetention;
+  private final long defaultRetention;
 
   TaskHandler(HiveConf conf, TxnStore txnHandler, MetadataCache metadataCache,
-                         boolean metricsEnabled, FSRemover fsRemover) {
-    this.conf = conf;
+        boolean metricsEnabled, FSRemover fsRemover) {
+    this.threadLocalConf = ThreadLocal.withInitial(() -> new HiveConf(conf));
     this.txnHandler = txnHandler;
     this.metadataCache = metadataCache;
     this.metricsEnabled = metricsEnabled;
     this.fsRemover = fsRemover;
-    this.defaultRetention = getTimeVar(conf, HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME, TimeUnit.MILLISECONDS);
+    this.defaultRetention = getTimeVar(conf, HIVE_COMPACTOR_CLEANER_RETRY_RETENTION_TIME,
+        TimeUnit.MILLISECONDS);
+  }
+
+  public abstract List<Runnable> getTasks(HiveConf conf) throws MetaException;
+
+  protected HiveConf getConf() {
+    return threadLocalConf.get();
   }
 
-  public abstract List<Runnable> getTasks() throws MetaException;
+  protected GetOpenTxnsResponse getOpenTxns() throws Exception {
+    return metadataCache.computeIfAbsent("openTxns", txnHandler::getOpenTxns);
+  }
 
-  protected Table resolveTable(String dbName, String tableName) throws MetaException {
-    return CompactorUtil.resolveTable(conf, dbName, tableName);
+  protected Table resolveTable(CompactionInfo info) throws Exception {
+    return metadataCache.computeIfAbsent(info.getFullTableName(),
+        () -> resolveTable(info.dbname, info.tableName));
+  }
+
+  @VisibleForTesting
+  Table resolveTable(String dbName, String tableName) throws MetaException {
+    return CompactorUtil.resolveTable(getConf(), dbName, tableName);
   }
 
   protected Partition resolvePartition(String dbName, String tableName, String partName) throws MetaException {
-    return CompactorUtil.resolvePartition(conf, null, dbName, tableName, partName, CompactorUtil.METADATA_FETCH_MODE.LOCAL);
+    return CompactorUtil.resolvePartition(
+        getConf(), null, dbName, tableName, partName, CompactorUtil.METADATA_FETCH_MODE.LOCAL);
   }
 
   protected ValidReaderWriteIdList getValidCleanerWriteIdList(CompactionInfo info, ValidTxnList validTxnList)
-          throws NoSuchTxnException, MetaException {
-    List<String> tblNames = Collections.singletonList(AcidUtils.getFullTableName(info.dbname, info.tableName));
+      throws Exception {
+    List<String> tblNames = Collections.singletonList(
+        TxnUtils.getFullTableName(info.dbname, info.tableName));
+
     GetValidWriteIdsRequest request = new GetValidWriteIdsRequest(tblNames);
     request.setValidTxnList(validTxnList.writeToString());
-    GetValidWriteIdsResponse rsp = txnHandler.getValidWriteIds(request);
+    GetValidWriteIdsResponse rsp = metadataCache.computeIfAbsent(info.getFullTableName() + validTxnList.writeToString(),
+        () -> txnHandler.getValidWriteIds(request));
     // we could have no write IDs for a table if it was never written to but
     // since we are in the Cleaner phase of compactions, there must have
     // been some delta/base dirs
     assert rsp != null && rsp.getTblValidWriteIdsSize() == 1;
 
-    return new ValidCleanerWriteIdList(
-        TxnCommonUtils.createValidReaderWriteIdList(rsp.getTblValidWriteIds().get(0)));
+    ValidReaderWriteIdList validWriteIdList = TxnCommonUtils.createValidReaderWriteIdList(
+        rsp.getTblValidWriteIds().getFirst());
+    return new ValidCleanerWriteIdList(validWriteIdList);
   }
 
   protected boolean cleanAndVerifyObsoleteDirectories(CompactionInfo info, String location,
-                                                      ValidReaderWriteIdList validWriteIdList, Table table) throws MetaException, IOException {
+        ValidReaderWriteIdList validWriteIdList, Table table) throws MetaException, IOException {
     Path path = new Path(location);
-    FileSystem fs = path.getFileSystem(conf);
+    FileSystem fs = path.getFileSystem(getConf());
 
     // Collect all the files/dirs
     Map<Path, AcidUtils.HdfsDirSnapshot> dirSnapshots = AcidUtils.getHdfsDirSnapshotsForCleaner(fs, path);
-    AcidDirectory dir = AcidUtils.getAcidState(fs, path, conf, validWriteIdList, Ref.from(false), false,
-            dirSnapshots);
+
+    AcidDirectory dir = AcidUtils.getAcidState(
+        fs, path, getConf(), validWriteIdList, Ref.from(false), false,
+        dirSnapshots);
+
     boolean isDynPartAbort = CompactorUtil.isDynPartAbort(table, info.partName);
 
     List<Path> obsoleteDirs = CompactorUtil.getObsoleteDirs(dir, isDynPartAbort);
@@ -121,26 +146,35 @@ protected boolean cleanAndVerifyObsoleteDirectories(CompactionInfo info, String
       info.setWriteIds(dir.hasUncompactedAborts(), dir.getAbortedWriteIds());
     }
 
-    List<Path> deleted = fsRemover.clean(new CleanupRequest.CleanupRequestBuilder().setLocation(location)
-            .setDbName(info.dbname).setFullPartitionName(info.getFullPartitionName())
-            .setRunAs(info.runAs).setObsoleteDirs(obsoleteDirs).setPurge(true)
+    List<Path> deleted = fsRemover.clean(
+        new CleanupRequest.CleanupRequestBuilder()
+            .setLocation(location)
+            .setDbName(info.dbname)
+            .setFullPartitionName(info.getFullPartitionName())
+            .setRunAs(info.runAs)
+            .setObsoleteDirs(obsoleteDirs).setPurge(true)
             .build());
 
     if (!deleted.isEmpty()) {
-      AcidMetricService.updateMetricsFromCleaner(info.dbname, info.tableName, info.partName, dir.getObsolete(), conf,
-              txnHandler);
+      AcidMetricService.updateMetricsFromCleaner(
+          info.dbname, info.tableName, info.partName, dir.getObsolete(), getConf(),
+          txnHandler);
     }
 
     // Make sure there are no leftovers below the compacted watermark
     boolean success = false;
-    conf.set(ValidTxnList.VALID_TXNS_KEY, new ValidReadTxnList().toString());
-    dir = AcidUtils.getAcidState(fs, path, conf, new ValidCleanerWriteIdList(info.getFullTableName(), info.highestWriteId),
-            Ref.from(false), false, dirSnapshots);
+    getConf().set(ValidTxnList.VALID_TXNS_KEY, new ValidReadTxnList().toString());
+
+    dir = AcidUtils.getAcidState(
+        fs, path, getConf(),
+        new ValidCleanerWriteIdList(info.getFullTableName(), info.highestWriteId),
+        Ref.from(false), false,
+        dirSnapshots);
 
     List<Path> remained = subtract(CompactorUtil.getObsoleteDirs(dir, isDynPartAbort), deleted);
     if (!remained.isEmpty()) {
-      LOG.warn("Remained {} obsolete directories from {}. {}",
-              remained.size(), location, CompactorUtil.getDebugInfo(remained));
+      LOG.warn("Remained {} obsolete directories from {}. {}", 
remained.size(), location,
+          CompactorUtil.getDebugInfo(remained));
     } else {
       LOG.debug("All cleared below the watermark: {} from {}", 
info.highestWriteId, location);
       success = true;
@@ -160,7 +194,7 @@ protected void handleCleanerAttemptFailure(CompactionInfo info, String errorMess
       if (info.retryRetention > 0) {
         cleanAttempts = (int) (Math.log(info.retryRetention / defaultRetention) / Math.log(2)) + 1;
       }
-      if (cleanAttempts >= getIntVar(conf, HIVE_COMPACTOR_CLEANER_MAX_RETRY_ATTEMPTS)) {
+      if (cleanAttempts >= getIntVar(getConf(), HIVE_COMPACTOR_CLEANER_MAX_RETRY_ATTEMPTS)) {
         //Mark it as failed if the max attempt threshold is reached.
         txnHandler.markFailed(info);
       } else {
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestAbortedTxnCleaner.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestAbortedTxnCleaner.java
index 8f6814d4890..7e73d06c9ff 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestAbortedTxnCleaner.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestAbortedTxnCleaner.java
@@ -78,7 +78,7 @@ public void testCleaningOfAbortedDirectoriesForUnpartitionedTables() throws Exce
     cleaner.run();
 
     Mockito.verify(mockedFSRemover, Mockito.times(1)).clean(any(CleanupRequest.class));
-    Mockito.verify(mockedTaskHandler, Mockito.times(1)).getTasks();
+    Mockito.verify(mockedTaskHandler, Mockito.times(1)).getTasks(any(HiveConf.class));
 
     List<Path> directories = getDirectories(conf, t, null);
     // All aborted directories removed, hence 1 committed delta directory must be present
@@ -109,7 +109,7 @@ public void testCleaningOfAbortedDirectoriesForSinglePartition() throws Exceptio
     cleaner.run();
 
     Mockito.verify(mockedFSRemover, Mockito.times(1)).clean(any(CleanupRequest.class));
-    Mockito.verify(mockedTaskHandler, Mockito.times(1)).getTasks();
+    Mockito.verify(mockedTaskHandler, Mockito.times(1)).getTasks(any(HiveConf.class));
 
     List<Path> directories = getDirectories(conf, t, p);
     // All aborted directories removed, hence 1 committed delta directory must be present
@@ -147,7 +147,7 @@ public void testCleaningOfAbortedDirectoriesForMultiplePartitions() throws Excep
     cleaner.run();
 
     Mockito.verify(mockedFSRemover, Mockito.times(2)).clean(any(CleanupRequest.class));
-    Mockito.verify(mockedTaskHandler, Mockito.times(1)).getTasks();
+    Mockito.verify(mockedTaskHandler, Mockito.times(1)).getTasks(any(HiveConf.class));
 
     List<Path> directories = getDirectories(conf, t, p1);
     // All aborted directories removed, hence 1 committed delta directory must be present
@@ -190,7 +190,7 @@ public void testCleaningOfAbortedDirectoriesWithLongRunningOpenWriteTxn() throws
     cleaner.run();
 
     Mockito.verify(mockedFSRemover, Mockito.times(1)).clean(any(CleanupRequest.class));
-    Mockito.verify(mockedTaskHandler, Mockito.times(1)).getTasks();
+    Mockito.verify(mockedTaskHandler, Mockito.times(1)).getTasks(any(HiveConf.class));
 
     List<Path> directories = getDirectories(conf, t, null);
     // All aborted directories below min open write ID are removed,
@@ -241,7 +241,7 @@ public void testCleaningOfAbortedDirectoriesOnTopOfBase() throws Exception {
     cleaner.run();
 
     Mockito.verify(mockedFSRemover, Mockito.times(1)).clean(any(CleanupRequest.class));
-    Mockito.verify(mockedTaskHandler, Mockito.times(1)).getTasks();
+    Mockito.verify(mockedTaskHandler, Mockito.times(1)).getTasks(any(HiveConf.class));
 
     directories = getDirectories(conf, t, null);
     Assert.assertEquals(1, directories.size());
@@ -283,7 +283,7 @@ public void testCleaningOfAbortedDirectoriesBelowBase() throws Exception {
     cleaner.run();
 
     Mockito.verify(mockedFSRemover, Mockito.times(1)).clean(any(CleanupRequest.class));
-    Mockito.verify(mockedTaskHandler, Mockito.times(1)).getTasks();
+    Mockito.verify(mockedTaskHandler, Mockito.times(1)).getTasks(any(HiveConf.class));
 
     directories = getDirectories(conf, t, null);
     // The table is already compacted, so we must see 1 base delta
@@ -367,7 +367,7 @@ public void testAbortCleanupNotUpdatingSpecificCompactionTables(boolean isPartit
     cleaner.run();
 
     Mockito.verify(mockedFSRemover, Mockito.times(1)).clean(any(CleanupRequest.class));
-    Mockito.verify(mockedTaskHandler, Mockito.times(1)).getTasks();
+    Mockito.verify(mockedTaskHandler, Mockito.times(1)).getTasks(any(HiveConf.class));
 
     Assert.assertEquals(0, TestTxnDbUtil.countQueryAgent(conf, compactionQueuePresence));
     Assert.assertEquals(0, TestTxnDbUtil.countQueryAgent(conf, "SELECT COUNT(*) FROM \"COMPLETED_COMPACTIONS\" " +
diff --git a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestHandler.java b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestHandler.java
index b1d60b8a851..1279cd6cc18 100644
--- a/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestHandler.java
+++ b/ql/src/test/org/apache/hadoop/hive/ql/txn/compactor/handler/TestHandler.java
@@ -17,6 +17,7 @@
  */
 package org.apache.hadoop.hive.ql.txn.compactor.handler;
 
+import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.hadoop.hive.metastore.ReplChangeManager;
 import org.apache.hadoop.hive.metastore.api.ShowCompactResponseElement;
 import org.apache.hadoop.hive.metastore.api.CompactionRequest;
@@ -72,7 +73,7 @@ public void testCompactionHandlerAndFsRemover() throws Exception {
     cleaner.run();
 
     Mockito.verify(mockedFSRemover, Mockito.times(1)).clean(any(CleanupRequest.class));
-    Mockito.verify(mockedTaskHandler, Mockito.times(1)).getTasks();
+    Mockito.verify(mockedTaskHandler, Mockito.times(1)).getTasks(any(HiveConf.class));
   }
 
   @Test
@@ -105,7 +106,7 @@ public void testMetaCache() throws Exception {
     ShowCompactResponse rsp = txnHandler.showCompact(new ShowCompactRequest());
     List<ShowCompactResponseElement> compacts = rsp.getCompacts();
     Assert.assertEquals(1, compacts.size());
-    Mockito.verify(mockedMetadataCache, times(3)).computeIfAbsent(any(), any());
+    Mockito.verify(mockedMetadataCache, times(4)).computeIfAbsent(any(), any());
     Mockito.verify(mockedTaskHandler, times(1)).resolveTable(any(), any());
   }
 }
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/ReadyToCleanAbortHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/ReadyToCleanAbortHandler.java
index 4940d384095..ae88852ebef 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/ReadyToCleanAbortHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/ReadyToCleanAbortHandler.java
@@ -113,7 +113,7 @@ public List<CompactionInfo> extractData(ResultSet rs) throws DataAccessException
         info.partName = rs.getString("PART");
         // In this case, this field contains min open write txn ID.
         long value = rs.getLong("MIN_OPEN_WRITE_TXNID");
-        info.minOpenWriteTxnId = value > 0 ? value : Long.MAX_VALUE;
+        info.minOpenWriteTxnId = !rs.wasNull() ? value : Long.MAX_VALUE;
         // The specific type, state assigned to abort cleanup.
         info.type = CompactionType.ABORT_TXN_CLEANUP;
         info.state = READY_FOR_CLEANING;
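
The switch from "value > 0" to "!rs.wasNull()" above matters because ResultSet.getLong() returns 0 for SQL NULL, so the old check conflated "column is NULL" with a real 0 value. A small, generic JDBC sketch of the pattern, assuming an embedded Derby driver on the classpath; the table and column here are made up for illustration:

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    import java.sql.Statement;

    public class NullableLongColumn {
      static long readMinOpenWriteTxnId(Connection conn) throws SQLException {
        try (Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery("SELECT MIN_OPEN_WRITE_TXNID FROM EXAMPLE_QUEUE")) {
          if (rs.next()) {
            long value = rs.getLong(1);                     // returns 0 when the column is SQL NULL
            return !rs.wasNull() ? value : Long.MAX_VALUE;  // treat NULL as "no open write txn"
          }
          return Long.MAX_VALUE;
        }
      }

      public static void main(String[] args) throws SQLException {
        try (Connection conn = DriverManager.getConnection("jdbc:derby:memory:demo;create=true");
             Statement stmt = conn.createStatement()) {
          stmt.execute("CREATE TABLE EXAMPLE_QUEUE (MIN_OPEN_WRITE_TXNID BIGINT)");
          stmt.execute("INSERT INTO EXAMPLE_QUEUE VALUES (NULL)");
          System.out.println(readMinOpenWriteTxnId(conn));  // prints Long.MAX_VALUE, not 0
        }
      }
    }
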
diff --git a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/ReadyToCleanHandler.java b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/ReadyToCleanHandler.java
index 0f22b00e197..26bb2bf6d11 100644
--- a/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/ReadyToCleanHandler.java
+++ b/standalone-metastore/metastore-server/src/main/java/org/apache/hadoop/hive/metastore/txn/jdbc/queries/ReadyToCleanHandler.java
@@ -120,7 +120,8 @@ public List<CompactionInfo> extractData(ResultSet rs) throws SQLException, DataA
       info.retryRetention = rs.getInt(9);
       info.nextTxnId = rs.getLong(10);
       if (TxnHandler.ConfVars.useMinHistoryWriteId()) {
-        info.minOpenWriteId = rs.getLong(11);
+        long value = rs.getLong(11);
+        info.minOpenWriteId = !rs.wasNull() ? value : Long.MAX_VALUE;
       }
       infos.add(info);
     }

