This is an automated email from the ASF dual-hosted git repository.

krisztiankasa pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 6b0139188ab HIVE-26903: Compactor threads should gracefully shutdown 
(Laszlo Vegh, reviewed by Krisztian Kasa)
6b0139188ab is described below

commit 6b0139188aba6a95808c8d1bec63a651ec9e4bdc
Author: veghlaci05 <[email protected]>
AuthorDate: Mon Jan 9 09:46:25 2023 +0100

    HIVE-26903: Compactor threads should gracefully shutdown (Laszlo Vegh, 
reviewed by Krisztian Kasa)
---
 .../hadoop/hive/ql/txn/compactor/Cleaner.java      | 33 +++++++++++-----
 .../hive/ql/txn/compactor/CompactorThread.java     | 14 ++++---
 .../hadoop/hive/ql/txn/compactor/Initiator.java    | 45 ++++++++++++++--------
 .../hadoop/hive/ql/txn/compactor/Worker.java       | 33 +++++++---------
 4 files changed, 76 insertions(+), 49 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java 
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
index 9d203c880ed..dd06ac9aa4c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
@@ -139,7 +139,12 @@ public class Cleaner extends MetaStoreCompactorThread {
 
           long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
 
+          checkInterrupt();
+
           List<CompactionInfo> readyToClean = 
txnHandler.findReadyToClean(minOpenTxnId, retentionTime);
+
+          checkInterrupt();
+
           if (!readyToClean.isEmpty()) {
             long minTxnIdSeenOpen = txnHandler.findMinTxnIdSeenOpen();
             final long cleanerWaterMark =
@@ -154,6 +159,10 @@ public class Cleaner extends MetaStoreCompactorThread {
            // when min_history_level is finally dropped, then every HMS will 
commit compaction the new way
             // and minTxnIdSeenOpen can be removed and minOpenTxnId can be 
used instead.
             for (CompactionInfo compactionInfo : readyToClean) {
+
+              //Check for interruption before scheduling each compactionInfo 
and return if necessary
+              checkInterrupt();
+
               CompletableFuture<Void> asyncJob =
                   CompletableFuture.runAsync(
                           ThrowingRunnable.unchecked(() -> 
clean(compactionInfo, cleanerWaterMark, metricsEnabled)),
@@ -164,8 +173,13 @@ public class Cleaner extends MetaStoreCompactorThread {
                       });
               cleanerList.add(asyncJob);
             }
-            CompletableFuture.allOf(cleanerList.toArray(new 
CompletableFuture[0])).join();
+
+            //Use get instead of join, so we can receive InterruptedException 
and shutdown gracefully
+            CompletableFuture.allOf(cleanerList.toArray(new 
CompletableFuture[0])).get();
           }
+        } catch (InterruptedException e) {
+          // do not ignore interruption requests
+          return;
         } catch (Throwable t) {
           LOG.error("Caught an exception in the main loop of compactor 
cleaner, " +
               StringUtils.stringifyException(t));
@@ -179,15 +193,17 @@ public class Cleaner extends MetaStoreCompactorThread {
           stopCycleUpdater();
         }
         // Now, go back to bed until it's time to do this again
-        long elapsedTime = System.currentTimeMillis() - startedAt;
-        doPostLoopActions(elapsedTime, CompactorThreadType.CLEANER);
+        doPostLoopActions(System.currentTimeMillis() - startedAt);
       } while (!stop.get());
     } catch (InterruptedException ie) {
       LOG.error("Compactor cleaner thread interrupted, exiting " +
         StringUtils.stringifyException(ie));
     } finally {
+      if (Thread.currentThread().isInterrupted()) {
+        LOG.info("Interrupt received, Cleaner is shutting down.");
+      }
       if (cleanerExecutor != null) {
-        this.cleanerExecutor.shutdownNow();
+        cleanerExecutor.shutdownNow();
       }
     }
   }
@@ -342,7 +358,7 @@ public class Cleaner extends MetaStoreCompactorThread {
   }
 
   private boolean removeFiles(String location, long minOpenTxnGLB, 
CompactionInfo ci, boolean dropPartition)
-      throws MetaException, IOException, NoSuchObjectException, 
NoSuchTxnException {
+      throws MetaException, IOException, NoSuchTxnException {
 
     if (dropPartition) {
       LockRequest lockRequest = createLockRequest(ci, 0, LockType.EXCL_WRITE, 
DataOperationType.DELETE);
@@ -414,11 +430,11 @@ public class Cleaner extends MetaStoreCompactorThread {
    * @return true if any files were removed
    */
   private boolean removeFiles(String location, ValidWriteIdList writeIdList, 
CompactionInfo ci)
-      throws IOException, NoSuchObjectException, MetaException {
+      throws IOException, MetaException {
     Path path = new Path(location);
     FileSystem fs = path.getFileSystem(conf);
     
-    // Collect all of the files/dirs
+    // Collect all the files/dirs
     Map<Path, AcidUtils.HdfsDirSnapshot> dirSnapshots = 
AcidUtils.getHdfsDirSnapshotsForCleaner(fs, path);
     AcidDirectory dir = AcidUtils.getAcidState(fs, path, conf, writeIdList, 
Ref.from(false), false, 
         dirSnapshots);
@@ -470,8 +486,7 @@ public class Cleaner extends MetaStoreCompactorThread {
     return obsoleteDirs;
   }
 
-  private boolean removeFiles(String location, CompactionInfo ci)
-      throws NoSuchObjectException, IOException, MetaException {
+  private boolean removeFiles(String location, CompactionInfo ci) throws 
IOException, MetaException {
     String strIfPurge = ci.getProperty("ifPurge");
     boolean ifPurge = strIfPurge != null || 
Boolean.parseBoolean(ci.getProperty("ifPurge"));
     
diff --git 
a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java 
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
index 215e38d37c5..f26e832e088 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
@@ -67,8 +67,6 @@ public abstract class CompactorThread extends Thread 
implements Configurable {
 
   protected long checkInterval = 0;
 
-  enum CompactorThreadType {INITIATOR, WORKER, CLEANER}
-
   @Override
   public void setConf(Configuration configuration) {
     // TODO MS-SPLIT for now, keep a copy of HiveConf around as we need to 
call other methods with
@@ -86,12 +84,18 @@ public abstract class CompactorThread extends Thread 
implements Configurable {
 
   public void init(AtomicBoolean stop) throws Exception {
     setPriority(MIN_PRIORITY);
-    setDaemon(true); // this means the process will exit without waiting for 
this thread
+    setDaemon(false);
     this.stop = stop;
     this.hostName = ServerUtils.hostname();
     this.runtimeVersion = getRuntimeVersion();
   }
 
+  protected void checkInterrupt() throws InterruptedException {
+    if (Thread.interrupted()) {
+      throw new InterruptedException(getClass().getName() + " execution is 
interrupted.");
+    }
+  }
+
   /**
    * Find the table being compacted
    * @param ci compaction info returned from the compaction queue
@@ -228,8 +232,8 @@ public abstract class CompactorThread extends Thread 
implements Configurable {
     return requestBuilder.build();
   }
 
-  protected void doPostLoopActions(long elapsedTime, CompactorThreadType type) 
throws InterruptedException {
-    String threadTypeName = type.name();
+  protected void doPostLoopActions(long elapsedTime) throws 
InterruptedException {
+    String threadTypeName = getClass().getName();
     if (elapsedTime < checkInterval && !stop.get()) {
       Thread.sleep(checkInterval - elapsedTime);
     }
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java 
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index 08848a61390..612cd492ca1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -152,9 +152,11 @@ public class Initiator extends MetaStoreCompactorThread {
 
           final ShowCompactResponse currentCompactions = 
txnHandler.showCompact(new ShowCompactRequest());
 
+          checkInterrupt();
+
           // Currently we invalidate all entries after each cycle, because the 
bootstrap replication is marked via
           // table property hive.repl.first.inc.pending which would be cached.
-          metaCache.ifPresent(c -> c.invalidateAll());
+          metaCache.ifPresent(Cache::invalidateAll);
           Set<String> skipDBs = Sets.newConcurrentHashSet();
           Set<String> skipTables = Sets.newConcurrentHashSet();
 
@@ -166,6 +168,8 @@ public class Initiator extends MetaStoreCompactorThread {
           LOG.debug("Found " + potentials.size() + " potential compactions, " +
               "checking to see if we should compact any of them");
 
+          checkInterrupt();
+
           Map<String, String> tblNameOwners = new HashMap<>();
           List<CompletableFuture<Void>> compactionList = new ArrayList<>();
 
@@ -177,6 +181,11 @@ public class Initiator extends MetaStoreCompactorThread {
 
           for (CompactionInfo ci : potentials) {
             try {
+              //Check for interruption before scheduling each compactionInfo 
and return if necessary
+              if (Thread.currentThread().isInterrupted()) {
+                return;
+              }
+
               Table t = computeIfAbsent(ci.getFullTableName(),() -> 
resolveTable(ci));
               String poolName = getPoolName(ci, t);
               Partition p = resolvePartition(ci);
@@ -208,10 +217,14 @@ public class Initiator extends MetaStoreCompactorThread {
             }
           }
 
-          CompletableFuture.allOf(compactionList.toArray(new 
CompletableFuture[0])).join();
+          //Use get instead of join, so we can receive InterruptedException 
and shutdown gracefully
+          CompletableFuture.allOf(compactionList.toArray(new 
CompletableFuture[0])).get();
 
           // Check for timed out remote workers.
           recoverFailedCompactions(true);
+        } catch (InterruptedException e) {
+          // do not ignore interruption requests
+          return;
         } catch (Throwable t) {
           LOG.error("Initiator loop caught unexpected exception this time 
through the loop", t);
           exceptionally = true;
@@ -226,15 +239,16 @@ public class Initiator extends MetaStoreCompactorThread {
           stopCycleUpdater();
         }
 
-        long elapsedTime = System.currentTimeMillis() - startedAt;
-        doPostLoopActions(elapsedTime, CompactorThreadType.INITIATOR);
-
+        doPostLoopActions(System.currentTimeMillis() - startedAt);
       } while (!stop.get());
     } catch (Throwable t) {
       LOG.error("Caught an exception in the main loop of compactor initiator, 
exiting.", t);
     } finally {
+      if (Thread.currentThread().isInterrupted()) {
+        LOG.info("Interrupt received, Initiator is shutting down.");
+      }
       if (compactionExecutor != null) {
-        this.compactionExecutor.shutdownNow();
+        compactionExecutor.shutdownNow();
       }
     }
   }
@@ -245,12 +259,18 @@ public class Initiator extends MetaStoreCompactorThread {
     StorageDescriptor sd = resolveStorageDescriptor(t, p);
     try {
       ValidWriteIdList validWriteIds = resolveValidWriteIds(t);
+
+      checkInterrupt();
+
       CompactionType type = checkForCompaction(ci, validWriteIds, sd, 
t.getParameters(), runAs);
       if (type != null) {
         ci.type = type;
         ci.poolName = poolName;
         requestCompaction(ci, runAs);
       }
+    } catch (InterruptedException e) {
+      //Handle InterruptedException separately so the compactioninfo won't be 
marked as failed.
+      LOG.info("Initiator pool is being shut down, task received 
interruption.");
     } catch (Throwable ex) {
       String errorMessage = "Caught exception while trying to determine if we 
should compact " + ci + ". Marking "
           + "failed to avoid repeated failures, " + ex;
@@ -439,12 +459,8 @@ public class Initiator extends MetaStoreCompactorThread {
         UserGroupInformation.getLoginUser());
       CompactionType compactionType;
       try {
-        compactionType = ugi.doAs(new 
PrivilegedExceptionAction<CompactionType>() {
-          @Override
-          public CompactionType run() throws Exception {
-            return determineCompactionType(ci, acidDirectory, tblproperties, 
baseSize, deltaSize);
-          }
-        });
+        compactionType = ugi.doAs(
+            (PrivilegedExceptionAction<CompactionType>) () -> 
determineCompactionType(ci, acidDirectory, tblproperties, baseSize, deltaSize));
       } finally {
         try {
           FileSystem.closeAllForUGI(ugi);
@@ -477,7 +493,7 @@ public class Initiator extends MetaStoreCompactorThread {
           Float.parseFloat(deltaPctProp);
       boolean bigEnough =   (float)deltaSize/(float)baseSize > 
deltaPctThreshold;
       boolean multiBase = dir.getObsolete().stream()
-              .filter(path -> 
path.getName().startsWith(AcidUtils.BASE_PREFIX)).findAny().isPresent();
+              .anyMatch(path -> 
path.getName().startsWith(AcidUtils.BASE_PREFIX));
 
       boolean initiateMajor =  bigEnough || (deltaSize == 0  && multiBase);
       if (LOG.isDebugEnabled()) {
@@ -536,11 +552,10 @@ public class Initiator extends MetaStoreCompactorThread {
   }
 
   private long getDirSize(FileSystem fs, ParsedDirectory dir) throws 
IOException {
-    long size = dir.getFiles(fs, Ref.from(false)).stream()
+    return dir.getFiles(fs, Ref.from(false)).stream()
         .map(HdfsFileStatusWithId::getFileStatus)
         .mapToLong(FileStatus::getLen)
         .sum();
-    return size;
   }
 
   private void requestCompaction(CompactionInfo ci, String runAs) throws 
MetaException {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java 
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
index adadc415d4d..e48b7488e67 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -105,6 +105,7 @@ public class Worker extends RemoteCompactorThread 
implements MetaStoreThread {
     try {
       do {
         long startedAt = System.currentTimeMillis();
+        launchedJob = true;
         Future<Boolean> singleRun = executor.submit(() -> 
findNextCompactionAndExecute(genericStats, mrStats));
         try {
           launchedJob = singleRun.get(timeout, TimeUnit.MILLISECONDS);
@@ -115,33 +116,31 @@ public class Worker extends RemoteCompactorThread 
implements MetaStoreThread {
           singleRun.cancel(true);
           executor.shutdownNow();
           executor = getTimeoutHandlingExecutor();
-          launchedJob = true;
         } catch (ExecutionException e) {
           LOG.info("Exception during executing compaction", e);
-          launchedJob = true;
         } catch (InterruptedException ie) {
-          // Do not do anything - stop should be set anyway
-          launchedJob = true;
+          // do not ignore interruption requests
+          return;
         }
 
+        doPostLoopActions(System.currentTimeMillis() - startedAt);
+
         // If we didn't try to launch a job it either means there was no work 
to do or we got
         // here as the result of a communication failure with the DB.  Either 
way we want to wait
-        // a bit before we restart the loop.
+        // a bit before restarting; otherwise we can start the loop over immediately.
         if (!launchedJob && !stop.get()) {
-          try {
-            Thread.sleep(SLEEP_TIME);
-          } catch (InterruptedException e) {
-          }
+          Thread.sleep(SLEEP_TIME);
         }
-        long elapsedTime = System.currentTimeMillis() - startedAt;
-        doPostLoopActions(elapsedTime, CompactorThreadType.WORKER);
       } while (!stop.get());
+    } catch (InterruptedException e) {
+      // do not ignore interruption requests
     } catch (Throwable t) {
       LOG.error("Caught an exception in the main loop of compactor worker, 
exiting.", t);
     } finally {
-      if (executor != null) {
-        executor.shutdownNow();
+      if (Thread.currentThread().isInterrupted()) {
+        LOG.info("Interrupt received, Worker is shutting down.");
       }
+      executor.shutdownNow();
       if (msc != null) {
         msc.close();
       }
@@ -271,7 +270,7 @@ public class Worker extends RemoteCompactorThread 
implements MetaStoreThread {
       if (ci == null) {
         return false;
       }
-      if ((runtimeVersion != null || ci.initiatorVersion != null) && 
!runtimeVersion.equals(ci.initiatorVersion)) {
+      if ((runtimeVersion == null && ci.initiatorVersion != null) || 
(runtimeVersion != null && !runtimeVersion.equals(ci.initiatorVersion))) {
         LOG.warn("Worker and Initiator versions do not match. Worker: v{}, 
Initiator: v{}", runtimeVersion, ci.initiatorVersion);
       }
 
@@ -563,12 +562,6 @@ public class Worker extends RemoteCompactorThread 
implements MetaStoreThread {
     }
   }
 
-  private void checkInterrupt() throws InterruptedException {
-    if (Thread.interrupted()) {
-      throw new InterruptedException("Compaction execution is interrupted");
-    }
-  }
-
   private static boolean isDynPartAbort(Table t, CompactionInfo ci) {
     return t.getPartitionKeys() != null && t.getPartitionKeys().size() > 0
         && ci.partName == null;

Reply via email to