This is an automated email from the ASF dual-hosted git repository.
dkuzmenko pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new e8fca5116d3 HIVE-26770: Make end of loop compaction logs appear more
selectively and reduce code duplication (Akshat Mathur, reviewed by Denys
Kuzmenko)
e8fca5116d3 is described below
commit e8fca5116d34bc915f0575fba360a267ff59004b
Author: Akshat <[email protected]>
AuthorDate: Wed Dec 7 15:44:34 2022 +0530
HIVE-26770: Make end of loop compaction logs appear more selectively and
reduce code duplication (Akshat Mathur, reviewed by Denys Kuzmenko)
Closes #3832
---
.../hadoop/hive/ql/txn/compactor/Cleaner.java | 8 ++------
.../hive/ql/txn/compactor/CompactorThread.java | 24 +++++++++++++++++++++-
.../hadoop/hive/ql/txn/compactor/Initiator.java | 6 +-----
.../hadoop/hive/ql/txn/compactor/Worker.java | 6 +++++-
4 files changed, 31 insertions(+), 13 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
index 2dc2fa873ed..9d203c880ed 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Cleaner.java
@@ -95,7 +95,6 @@ public class Cleaner extends MetaStoreCompactorThread {
static final private String CLASS_NAME = Cleaner.class.getName();
static final private Logger LOG = LoggerFactory.getLogger(CLASS_NAME);
- private long cleanerCheckInterval = 0;
private boolean metricsEnabled = false;
private ReplChangeManager replChangeManager;
@@ -105,7 +104,7 @@ public class Cleaner extends MetaStoreCompactorThread {
public void init(AtomicBoolean stop) throws Exception {
super.init(stop);
replChangeManager = ReplChangeManager.getInstance(conf);
- cleanerCheckInterval = conf.getTimeVar(
+ checkInterval = conf.getTimeVar(
HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_RUN_INTERVAL,
TimeUnit.MILLISECONDS);
cleanerExecutor = CompactorUtil.createExecutorWithThreadFactory(
conf.getIntVar(HiveConf.ConfVars.HIVE_COMPACTOR_CLEANER_THREADS_NUM),
@@ -181,10 +180,7 @@ public class Cleaner extends MetaStoreCompactorThread {
}
// Now, go back to bed until it's time to do this again
long elapsedTime = System.currentTimeMillis() - startedAt;
- if (elapsedTime < cleanerCheckInterval && !stop.get()) {
- Thread.sleep(cleanerCheckInterval - elapsedTime);
- }
- LOG.debug("Cleaner thread finished one loop.");
+ doPostLoopActions(elapsedTime, CompactorThreadType.CLEANER);
} while (!stop.get());
} catch (InterruptedException ie) {
LOG.error("Compactor cleaner thread interrupted, exiting " +
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
index 167b2728f54..215e38d37c5 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/CompactorThread.java
@@ -61,6 +61,14 @@ public abstract class CompactorThread extends Thread implements Configurable {
protected String hostName;
protected String runtimeVersion;
+ //Time threshold for compactor thread log
+ //In milliseconds:
+ private static final Integer MAX_WARN_LOG_TIME = 1200000; //20 min
+
+ protected long checkInterval = 0;
+
+ enum CompactorThreadType {INITIATOR, WORKER, CLEANER}
+
@Override
public void setConf(Configuration configuration) {
// TODO MS-SPLIT for now, keep a copy of HiveConf around as we need to call other methods with
@@ -196,7 +204,7 @@ public abstract class CompactorThread extends Thread implements Configurable {
protected String getRuntimeVersion() {
return this.getClass().getPackage().getImplementationVersion();
}
-
+
protected LockRequest createLockRequest(CompactionInfo ci, long txnId,
LockType lockType, DataOperationType opType) {
String agentInfo = Thread.currentThread().getName();
LockRequestBuilder requestBuilder = new LockRequestBuilder(agentInfo);
@@ -219,4 +227,18 @@ public abstract class CompactorThread extends Thread implements Configurable {
!conf.getBoolVar(HiveConf.ConfVars.TXN_WRITE_X_LOCK));
return requestBuilder.build();
}
+
+ protected void doPostLoopActions(long elapsedTime, CompactorThreadType type) throws InterruptedException {
+ String threadTypeName = type.name();
+ if (elapsedTime < checkInterval && !stop.get()) {
+ Thread.sleep(checkInterval - elapsedTime);
+ }
+
+ if (elapsedTime < MAX_WARN_LOG_TIME) {
+ LOG.debug("{} loop took {} seconds to finish.", threadTypeName, elapsedTime/1000);
+ } else {
+ LOG.warn("Possible {} slowdown, loop took {} seconds to finish.", threadTypeName, elapsedTime/1000);
+ }
+
+ }
}
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
index a434bacfdd6..08848a61390 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Initiator.java
@@ -100,7 +100,6 @@ public class Initiator extends MetaStoreCompactorThread {
static final private String COMPACTORTHRESHOLD_PREFIX = "compactorthreshold.";
- private long checkInterval;
private ExecutorService compactionExecutor;
private Optional<Cache<String, TBase>> metaCache = Optional.empty();
private boolean metricsEnabled;
@@ -228,11 +227,8 @@ public class Initiator extends MetaStoreCompactorThread {
}
long elapsedTime = System.currentTimeMillis() - startedAt;
- if (elapsedTime < checkInterval && !stop.get()) {
- Thread.sleep(checkInterval - elapsedTime);
- }
+ doPostLoopActions(elapsedTime, CompactorThreadType.INITIATOR);
- LOG.info("Initiator thread finished one loop.");
} while (!stop.get());
} catch (Throwable t) {
LOG.error("Caught an exception in the main loop of compactor initiator, exiting.", t);
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
index fea7ff02289..7667cfc971e 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -95,6 +95,7 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
ExecutorService executor = getTimeoutHandlingExecutor();
try {
do {
+ long startedAt = System.currentTimeMillis();
Future<Boolean> singleRun = executor.submit(() -> findNextCompactionAndExecute(genericStats, mrStats));
try {
launchedJob = singleRun.get(timeout, TimeUnit.MILLISECONDS);
@@ -123,8 +124,11 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
} catch (InterruptedException e) {
}
}
- LOG.info("Worker thread finished one loop.");
+ long elapsedTime = System.currentTimeMillis() - startedAt;
+ doPostLoopActions(elapsedTime, CompactorThreadType.WORKER);
} while (!stop.get());
+ } catch (Throwable t) {
+ LOG.error("Caught an exception in the main loop of compactor worker, exiting.", t);
} finally {
if (executor != null) {
executor.shutdownNow();