This is an automated email from the ASF dual-hosted git repository.
krisztiankasa pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 6b0139188ab HIVE-26903: Compactor threads should gracefully shutdown (Laszlo Vegh, reviewed by Krisztian Kasa)
6b0139188ab is described below
commit 6b0139188aba6a95808c8d1bec63a651ec9e4bdc
Author: veghlaci05 <[email protected]>
AuthorDate: Mon Jan 9 09:46:25 2023 +0100
HIVE-26903: Compactor threads should gracefully shutdown (Laszlo Vegh, reviewed by Krisztian Kasa)
---
.../hadoop/hive/ql/txn/compactor/Cleaner.java | 33 +++++++++++-----
.../hive/ql/txn/compactor/CompactorThread.java | 14 ++++---
.../hadoop/hive/ql/txn/compactor/Initiator.java | 45 ++++++++++++++--------
.../hadoop/hive/ql/txn/compactor/Worker.java | 33 +++++++---------
4 files changed, 76 insertions(+), 49 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
index 9d203c880ed..dd06ac9aa4c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
@@ -139,7 +139,12 @@ public class Cleaner extends MetaStoreCompactorThread {
long minOpenTxnId = txnHandler.findMinOpenTxnIdForCleaner();
+ checkInterrupt();
+
List<CompactionInfo> readyToClean =
txnHandler.findReadyToClean(minOpenTxnId, retentionTime);
+
+ checkInterrupt();
+
if (!readyToClean.isEmpty()) {
long minTxnIdSeenOpen = txnHandler.findMinTxnIdSeenOpen();
final long cleanerWaterMark =
@@ -154,6 +159,10 @@ public class Cleaner extends MetaStoreCompactorThread {
// when min_history_level is finally dropped, than every HMS will
commit compaction the new way
// and minTxnIdSeenOpen can be removed and minOpenTxnId can be
used instead.
for (CompactionInfo compactionInfo : readyToClean) {
+
+ //Check for interruption before scheduling each compactionInfo
and return if necessary
+ checkInterrupt();
+
CompletableFuture<Void> asyncJob =
CompletableFuture.runAsync(
ThrowingRunnable.unchecked(() ->
clean(compactionInfo, cleanerWaterMark, metricsEnabled)),
@@ -164,8 +173,13 @@ public class Cleaner extends MetaStoreCompactorThread {
});
cleanerList.add(asyncJob);
}
- CompletableFuture.allOf(cleanerList.toArray(new
CompletableFuture[0])).join();
+
+ //Use get instead of join, so we can receive InterruptedException
and shutdown gracefully
+ CompletableFuture.allOf(cleanerList.toArray(new
CompletableFuture[0])).get();
}
+ } catch (InterruptedException e) {
+ // do not ignore interruption requests
+ return;
} catch (Throwable t) {
LOG.error("Caught an exception in the main loop of compactor
cleaner, " +
StringUtils.stringifyException(t));
@@ -179,15 +193,17 @@ public class Cleaner extends MetaStoreCompactorThread {
stopCycleUpdater();
}
// Now, go back to bed until it's time to do this again
- long elapsedTime = System.currentTimeMillis() - startedAt;
- doPostLoopActions(elapsedTime, CompactorThreadType.CLEANER);
+ doPostLoopActions(System.currentTimeMillis() - startedAt);
} while (!stop.get());
} catch (InterruptedException ie) {
LOG.error("Compactor cleaner thread interrupted, exiting " +
StringUtils.stringifyException(ie));
} finally {
+ if (Thread.currentThread().isInterrupted()) {
+ LOG.info("Interrupt received, Cleaner is shutting down.");
+ }
if (cleanerExecutor != null) {
- this.cleanerExecutor.shutdownNow();
+ cleanerExecutor.shutdownNow();
}
}
}
@@ -342,7 +358,7 @@ public class Cleaner extends MetaStoreCompactorThread {
}
private boolean removeFiles(String location, long minOpenTxnGLB,
CompactionInfo ci, boolean dropPartition)
- throws MetaException, IOException, NoSuchObjectException,
NoSuchTxnException {
+ throws MetaException, IOException, NoSuchTxnException {
if (dropPartition) {
LockRequest lockRequest = createLockRequest(ci, 0, LockType.EXCL_WRITE,
DataOperationType.DELETE);
@@ -414,11 +430,11 @@ public class Cleaner extends MetaStoreCompactorThread {
* @return true if any files were removed
*/
private boolean removeFiles(String location, ValidWriteIdList writeIdList,
CompactionInfo ci)
- throws IOException, NoSuchObjectException, MetaException {
+ throws IOException, MetaException {
Path path = new Path(location);
FileSystem fs = path.getFileSystem(conf);
- // Collect all of the files/dirs
+ // Collect all the files/dirs
Map<Path, AcidUtils.HdfsDirSnapshot> dirSnapshots =
AcidUtils.getHdfsDirSnapshotsForCleaner(fs, path);
AcidDirectory dir = AcidUtils.getAcidState(fs, path, conf, writeIdList,
Ref.from(false), false,
dirSnapshots);
@@ -470,8 +486,7 @@ public class Cleaner extends MetaStoreCompactorThread {
return obsoleteDirs;
}
- private boolean removeFiles(String location, CompactionInfo ci)
- throws NoSuchObjectException, IOException, MetaException {
+ private boolean removeFiles(String location, CompactionInfo ci) throws
IOException, MetaException {
String strIfPurge = ci.getProperty("ifPurge");
boolean ifPurge = strIfPurge != null ||
Boolean.parseBoolean(ci.getProperty("ifPurge"));
diff --git
a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
index 215e38d37c5..f26e832e088 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
@@ -67,8 +67,6 @@ public abstract class CompactorThread extends Thread
implements Configurable {
protected long checkInterval = 0;
- enum CompactorThreadType {INITIATOR, WORKER, CLEANER}
-
@Override
public void setConf(Configuration configuration) {
// TODO MS-SPLIT for now, keep a copy of HiveConf around as we need to
call other methods with
@@ -86,12 +84,18 @@ public abstract class CompactorThread extends Thread
implements Configurable {
public void init(AtomicBoolean stop) throws Exception {
setPriority(MIN_PRIORITY);
- setDaemon(true); // this means the process will exit without waiting for
this thread
+ setDaemon(false);
this.stop = stop;
this.hostName = ServerUtils.hostname();
this.runtimeVersion = getRuntimeVersion();
}
+ protected void checkInterrupt() throws InterruptedException {
+ if (Thread.interrupted()) {
+ throw new InterruptedException(getClass().getName() + " execution is
interrupted.");
+ }
+ }
+
/**
* Find the table being compacted
* @param ci compaction info returned from the compaction queue
@@ -228,8 +232,8 @@ public abstract class CompactorThread extends Thread
implements Configurable {
return requestBuilder.build();
}
- protected void doPostLoopActions(long elapsedTime, CompactorThreadType type)
throws InterruptedException {
- String threadTypeName = type.name();
+ protected void doPostLoopActions(long elapsedTime) throws
InterruptedException {
+ String threadTypeName = getClass().getName();
if (elapsedTime < checkInterval && !stop.get()) {
Thread.sleep(checkInterval - elapsedTime);
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index 08848a61390..612cd492ca1 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -152,9 +152,11 @@ public class Initiator extends MetaStoreCompactorThread {
final ShowCompactResponse currentCompactions =
txnHandler.showCompact(new ShowCompactRequest());
+ checkInterrupt();
+
// Currently we invalidate all entries after each cycle, because the
bootstrap replication is marked via
// table property hive.repl.first.inc.pending which would be cached.
- metaCache.ifPresent(c -> c.invalidateAll());
+ metaCache.ifPresent(Cache::invalidateAll);
Set<String> skipDBs = Sets.newConcurrentHashSet();
Set<String> skipTables = Sets.newConcurrentHashSet();
@@ -166,6 +168,8 @@ public class Initiator extends MetaStoreCompactorThread {
LOG.debug("Found " + potentials.size() + " potential compactions, " +
"checking to see if we should compact any of them");
+ checkInterrupt();
+
Map<String, String> tblNameOwners = new HashMap<>();
List<CompletableFuture<Void>> compactionList = new ArrayList<>();
@@ -177,6 +181,11 @@ public class Initiator extends MetaStoreCompactorThread {
for (CompactionInfo ci : potentials) {
try {
+ //Check for interruption before scheduling each compactionInfo
and return if necessary
+ if (Thread.currentThread().isInterrupted()) {
+ return;
+ }
+
Table t = computeIfAbsent(ci.getFullTableName(),() ->
resolveTable(ci));
String poolName = getPoolName(ci, t);
Partition p = resolvePartition(ci);
@@ -208,10 +217,14 @@ public class Initiator extends MetaStoreCompactorThread {
}
}
- CompletableFuture.allOf(compactionList.toArray(new
CompletableFuture[0])).join();
+ //Use get instead of join, so we can receive InterruptedException
and shutdown gracefully
+ CompletableFuture.allOf(compactionList.toArray(new
CompletableFuture[0])).get();
// Check for timed out remote workers.
recoverFailedCompactions(true);
+ } catch (InterruptedException e) {
+ // do not ignore interruption requests
+ return;
} catch (Throwable t) {
LOG.error("Initiator loop caught unexpected exception this time
through the loop", t);
exceptionally = true;
@@ -226,15 +239,16 @@ public class Initiator extends MetaStoreCompactorThread {
stopCycleUpdater();
}
- long elapsedTime = System.currentTimeMillis() - startedAt;
- doPostLoopActions(elapsedTime, CompactorThreadType.INITIATOR);
-
+ doPostLoopActions(System.currentTimeMillis() - startedAt);
} while (!stop.get());
} catch (Throwable t) {
LOG.error("Caught an exception in the main loop of compactor initiator,
exiting.", t);
} finally {
+ if (Thread.currentThread().isInterrupted()) {
+ LOG.info("Interrupt received, Initiator is shutting down.");
+ }
if (compactionExecutor != null) {
- this.compactionExecutor.shutdownNow();
+ compactionExecutor.shutdownNow();
}
}
}
@@ -245,12 +259,18 @@ public class Initiator extends MetaStoreCompactorThread {
StorageDescriptor sd = resolveStorageDescriptor(t, p);
try {
ValidWriteIdList validWriteIds = resolveValidWriteIds(t);
+
+ checkInterrupt();
+
CompactionType type = checkForCompaction(ci, validWriteIds, sd,
t.getParameters(), runAs);
if (type != null) {
ci.type = type;
ci.poolName = poolName;
requestCompaction(ci, runAs);
}
+ } catch (InterruptedException e) {
+ //Handle InterruptedException separately so the compactioninfo won't be
marked as failed.
+ LOG.info("Initiator pool is being shut down, task received
interruption.");
} catch (Throwable ex) {
String errorMessage = "Caught exception while trying to determine if we
should compact " + ci + ". Marking "
+ "failed to avoid repeated failures, " + ex;
@@ -439,12 +459,8 @@ public class Initiator extends MetaStoreCompactorThread {
UserGroupInformation.getLoginUser());
CompactionType compactionType;
try {
- compactionType = ugi.doAs(new
PrivilegedExceptionAction<CompactionType>() {
- @Override
- public CompactionType run() throws Exception {
- return determineCompactionType(ci, acidDirectory, tblproperties,
baseSize, deltaSize);
- }
- });
+ compactionType = ugi.doAs(
+ (PrivilegedExceptionAction<CompactionType>) () ->
determineCompactionType(ci, acidDirectory, tblproperties, baseSize, deltaSize));
} finally {
try {
FileSystem.closeAllForUGI(ugi);
@@ -477,7 +493,7 @@ public class Initiator extends MetaStoreCompactorThread {
Float.parseFloat(deltaPctProp);
boolean bigEnough = (float)deltaSize/(float)baseSize >
deltaPctThreshold;
boolean multiBase = dir.getObsolete().stream()
- .filter(path ->
path.getName().startsWith(AcidUtils.BASE_PREFIX)).findAny().isPresent();
+ .anyMatch(path ->
path.getName().startsWith(AcidUtils.BASE_PREFIX));
boolean initiateMajor = bigEnough || (deltaSize == 0 && multiBase);
if (LOG.isDebugEnabled()) {
@@ -536,11 +552,10 @@ public class Initiator extends MetaStoreCompactorThread {
}
private long getDirSize(FileSystem fs, ParsedDirectory dir) throws
IOException {
- long size = dir.getFiles(fs, Ref.from(false)).stream()
+ return dir.getFiles(fs, Ref.from(false)).stream()
.map(HdfsFileStatusWithId::getFileStatus)
.mapToLong(FileStatus::getLen)
.sum();
- return size;
}
private void requestCompaction(CompactionInfo ci, String runAs) throws
MetaException {
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
index adadc415d4d..e48b7488e67 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -105,6 +105,7 @@ public class Worker extends RemoteCompactorThread
implements MetaStoreThread {
try {
do {
long startedAt = System.currentTimeMillis();
+ launchedJob = true;
Future<Boolean> singleRun = executor.submit(() ->
findNextCompactionAndExecute(genericStats, mrStats));
try {
launchedJob = singleRun.get(timeout, TimeUnit.MILLISECONDS);
@@ -115,33 +116,31 @@ public class Worker extends RemoteCompactorThread
implements MetaStoreThread {
singleRun.cancel(true);
executor.shutdownNow();
executor = getTimeoutHandlingExecutor();
- launchedJob = true;
} catch (ExecutionException e) {
LOG.info("Exception during executing compaction", e);
- launchedJob = true;
} catch (InterruptedException ie) {
- // Do not do anything - stop should be set anyway
- launchedJob = true;
+ // do not ignore interruption requests
+ return;
}
+ doPostLoopActions(System.currentTimeMillis() - startedAt);
+
// If we didn't try to launch a job it either means there was no work
to do or we got
// here as the result of a communication failure with the DB. Either
way we want to wait
- // a bit before we restart the loop.
+ // a bit before, otherwise we can start over the loop immediately.
if (!launchedJob && !stop.get()) {
- try {
- Thread.sleep(SLEEP_TIME);
- } catch (InterruptedException e) {
- }
+ Thread.sleep(SLEEP_TIME);
}
- long elapsedTime = System.currentTimeMillis() - startedAt;
- doPostLoopActions(elapsedTime, CompactorThreadType.WORKER);
} while (!stop.get());
+ } catch (InterruptedException e) {
+ // do not ignore interruption requests
} catch (Throwable t) {
LOG.error("Caught an exception in the main loop of compactor worker,
exiting.", t);
} finally {
- if (executor != null) {
- executor.shutdownNow();
+ if (Thread.currentThread().isInterrupted()) {
+ LOG.info("Interrupt received, Worker is shutting down.");
}
+ executor.shutdownNow();
if (msc != null) {
msc.close();
}
@@ -271,7 +270,7 @@ public class Worker extends RemoteCompactorThread
implements MetaStoreThread {
if (ci == null) {
return false;
}
- if ((runtimeVersion != null || ci.initiatorVersion != null) &&
!runtimeVersion.equals(ci.initiatorVersion)) {
+ if ((runtimeVersion == null && ci.initiatorVersion != null) ||
(runtimeVersion != null && !runtimeVersion.equals(ci.initiatorVersion))) {
LOG.warn("Worker and Initiator versions do not match. Worker: v{},
Initiator: v{}", runtimeVersion, ci.initiatorVersion);
}
@@ -563,12 +562,6 @@ public class Worker extends RemoteCompactorThread
implements MetaStoreThread {
}
}
- private void checkInterrupt() throws InterruptedException {
- if (Thread.interrupted()) {
- throw new InterruptedException("Compaction execution is interrupted");
- }
- }
-
private static boolean isDynPartAbort(Table t, CompactionInfo ci) {
return t.getPartitionKeys() != null && t.getPartitionKeys().size() > 0
&& ci.partName == null;