This is an automated email from the ASF dual-hosted git repository.
mbod pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git
The following commit(s) were added to refs/heads/master by this push:
new 0e8b485 HIVE-25740: Avoid compaction heartbeater error logging in
race conditions (Marton Bod, reviewed by Adam Szita, Peter Vary, Karen Coppage)
0e8b485 is described below
commit 0e8b485759392a4bc58563d88c7c307903bfd61b
Author: Marton Bod <[email protected]>
AuthorDate: Thu Dec 2 17:27:10 2021 +0100
HIVE-25740: Avoid compaction heartbeater error logging in race conditions
(Marton Bod, reviewed by Adam Szita, Peter Vary, Karen Coppage)
---
.../hadoop/hive/ql/txn/compactor/Worker.java | 68 +++++++++++++---------
1 file changed, 40 insertions(+), 28 deletions(-)
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
index 6a3d719..bcd4833 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hive.ql.txn.compactor;
import com.google.common.annotations.VisibleForTesting;
+import java.util.concurrent.ScheduledExecutorService;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.common.ValidCompactorWriteIdList;
@@ -233,39 +234,35 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
private final CompactionTxn compactionTxn;
private final String tableName;
private final HiveConf conf;
- private final long txnTimeout;
+ private final AtomicBoolean errorLogEnabled;
public CompactionHeartbeater(CompactionTxn compactionTxn, String
tableName, HiveConf conf) {
this.tableName = Objects.requireNonNull(tableName);
this.compactionTxn = Objects.requireNonNull(compactionTxn);
this.conf = Objects.requireNonNull(conf);
-
- this.txnTimeout = MetastoreConf.getTimeVar(conf,
MetastoreConf.ConfVars.TXN_TIMEOUT, TimeUnit.MILLISECONDS);
+ this.errorLogEnabled = new AtomicBoolean(true);
setDaemon(true);
setPriority(MIN_PRIORITY);
setName("CompactionHeartbeater-" + compactionTxn.getTxnId());
}
+ public void shouldLogError(boolean shouldLogError) {
+ this.errorLogEnabled.set(shouldLogError);
+ }
+
@Override
public void run() {
LOG.debug("Heartbeating compaction transaction id {} for table: {}",
compactionTxn, tableName);
-
IMetaStoreClient msc = null;
try {
// Create a metastore client for each thread since it is not thread
safe
msc = HiveMetaStoreUtils.getHiveMetastoreClient(conf);
- while (true) {
- msc.heartbeat(compactionTxn.getTxnId(), compactionTxn.getLockId());
-
- // Send a heart beat before a timeout occurs. Scale the interval
based
- // on the server's transaction timeout allowance
- Thread.sleep(txnTimeout / 2);
- }
- } catch (InterruptedException ie) {
- LOG.debug("Successfully stop the heartbeating the transaction {}",
this.compactionTxn);
+ msc.heartbeat(compactionTxn.getTxnId(), compactionTxn.getLockId());
} catch (Exception e) {
- LOG.error("Error while heartbeating txn {}", compactionTxn, e);
+ if (errorLogEnabled.get()) {
+ LOG.error("Error while heartbeating transaction id {} for table:
{}", compactionTxn, tableName, e);
+ }
} finally {
if (msc != null) {
msc.close();
@@ -360,7 +357,6 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
PerfLogger perfLogger = SessionState.getPerfLogger(false);
String workerMetric = null;
- CompactionHeartbeater heartbeater = null;
CompactionInfo ci = null;
try (CompactionTxn compactionTxn = new CompactionTxn()) {
if (msc == null) {
@@ -455,9 +451,6 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
* multi-stmt txn. {@link Driver#setCompactionWriteIds(ValidWriteIdList,
long)} */
compactionTxn.open(ci);
- heartbeater = new CompactionHeartbeater(compactionTxn, fullTableName,
conf);
- heartbeater.start();
-
ValidTxnList validTxnList = msc.getValidTxns(compactionTxn.getTxnId());
//with this ValidWriteIdList is capped at whatever HWM validTxnList has
final ValidCompactorWriteIdList tblValidWriteIds =
@@ -521,7 +514,6 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
LOG.info("Will compact id: " + ci.id + " via MR job");
runCompactionViaMrJob(ci, t, p, sd, tblValidWriteIds, jobName, dir,
su);
}
- heartbeater.interrupt();
LOG.info("Completed " + ci.type.toString() + " compaction for " +
ci.getFullPartitionName() + " in "
+ compactionTxn + ", marking as compacted.");
@@ -561,9 +553,6 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
} catch (Throwable t) {
LOG.error("Caught an exception in the main loop of compactor worker " +
workerName, t);
} finally {
- if (heartbeater != null) {
- heartbeater.interrupt();
- }
if (workerMetric != null && MetastoreConf.getBoolVar(conf,
MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON)) {
perfLogger.perfLogEnd(CLASS_NAME, workerMetric);
}
@@ -715,6 +704,8 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
private TxnStatus status = TxnStatus.UNKNOWN;
private boolean succeessfulCompaction = false;
+ private CompactionHeartbeater heartbeater;
+ private ScheduledExecutorService heartbeatExecutor;
/**
* Try to open a new txn.
@@ -734,6 +725,11 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
throw new TException("Unable to acquire lock(S) on " +
ci.getFullPartitionName());
}
lockId = res.getLockid();
+
+ heartbeatExecutor = Executors.newSingleThreadScheduledExecutor();
+ long txnTimeout = MetastoreConf.getTimeVar(conf,
MetastoreConf.ConfVars.TXN_TIMEOUT, TimeUnit.MILLISECONDS);
+ heartbeater = new CompactionHeartbeater(this,
TxnUtils.getFullTableName(ci.dbname, ci.tableName), conf);
+ heartbeatExecutor.scheduleAtFixedRate(heartbeater, txnTimeout / 4,
txnTimeout / 2, TimeUnit.MILLISECONDS);
}
/**
@@ -748,13 +744,29 @@ public class Worker extends RemoteCompactorThread implements MetaStoreThread {
* @throws Exception
*/
@Override public void close() throws Exception {
- if (status == TxnStatus.UNKNOWN) {
- return;
+ if (status != TxnStatus.UNKNOWN) {
+ // turn off error logging in heartbeater in case of race condition
between commit/abort and heartbeating
+ heartbeater.shouldLogError(false);
+ if (succeessfulCompaction) {
+ commit();
+ } else {
+ abort();
+ }
}
- if (succeessfulCompaction) {
- commit();
- } else {
- abort();
+ shutdownHeartbeater();
+ }
+
+ private void shutdownHeartbeater() {
+ if (heartbeatExecutor != null) {
+ heartbeatExecutor.shutdownNow();
+ try {
+ if (!heartbeatExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+ heartbeatExecutor.shutdownNow();
+ }
+ LOG.debug("Successfully stopped heartbeating for transaction {}",
this);
+ } catch (InterruptedException ex) {
+ heartbeatExecutor.shutdownNow();
+ }
}
}