This is an automated email from the ASF dual-hosted git repository.

mbod pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hive.git


The following commit(s) were added to refs/heads/master by this push:
     new 0e8b485  HIVE-25740: Avoid compaction heartbeater error logging in race conditions (Marton Bod, reviewed by Adam Szita, Peter Vary, Karen Coppage)
0e8b485 is described below

commit 0e8b485759392a4bc58563d88c7c307903bfd61b
Author: Marton Bod <[email protected]>
AuthorDate: Thu Dec 2 17:27:10 2021 +0100

    HIVE-25740: Avoid compaction heartbeater error logging in race conditions (Marton Bod, reviewed by Adam Szita, Peter Vary, Karen Coppage)
---
 .../hadoop/hive/ql/txn/compactor/Worker.java       | 68 +++++++++++++---------
 1 file changed, 40 insertions(+), 28 deletions(-)

diff --git a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
index 6a3d719..bcd4833 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/txn/compactor/Worker.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hive.ql.txn.compactor;
 
 import com.google.common.annotations.VisibleForTesting;
+import java.util.concurrent.ScheduledExecutorService;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.common.ValidCompactorWriteIdList;
@@ -233,39 +234,35 @@ public class Worker extends RemoteCompactorThread 
implements MetaStoreThread {
     private final CompactionTxn compactionTxn;
     private final String tableName;
     private final HiveConf conf;
-    private final long txnTimeout;
+    private final AtomicBoolean errorLogEnabled;
 
     public CompactionHeartbeater(CompactionTxn compactionTxn, String 
tableName, HiveConf conf) {
       this.tableName = Objects.requireNonNull(tableName);
       this.compactionTxn = Objects.requireNonNull(compactionTxn);
       this.conf = Objects.requireNonNull(conf);
-
-      this.txnTimeout = MetastoreConf.getTimeVar(conf, 
MetastoreConf.ConfVars.TXN_TIMEOUT, TimeUnit.MILLISECONDS);
+      this.errorLogEnabled = new AtomicBoolean(true);
 
       setDaemon(true);
       setPriority(MIN_PRIORITY);
       setName("CompactionHeartbeater-" + compactionTxn.getTxnId());
     }
 
+    public void shouldLogError(boolean shouldLogError) {
+      this.errorLogEnabled.set(shouldLogError);
+    }
+
     @Override
     public void run() {
       LOG.debug("Heartbeating compaction transaction id {} for table: {}", 
compactionTxn, tableName);
-
       IMetaStoreClient msc = null;
       try {
         // Create a metastore client for each thread since it is not thread 
safe
         msc = HiveMetaStoreUtils.getHiveMetastoreClient(conf);
-        while (true) {
-          msc.heartbeat(compactionTxn.getTxnId(), compactionTxn.getLockId());
-
-          // Send a heart beat before a timeout occurs. Scale the interval 
based
-          // on the server's transaction timeout allowance
-          Thread.sleep(txnTimeout / 2);
-        }
-      } catch (InterruptedException ie) {
-        LOG.debug("Successfully stop the heartbeating the transaction {}", 
this.compactionTxn);
+        msc.heartbeat(compactionTxn.getTxnId(), compactionTxn.getLockId());
       } catch (Exception e) {
-        LOG.error("Error while heartbeating txn {}", compactionTxn, e);
+        if (errorLogEnabled.get()) {
+          LOG.error("Error while heartbeating transaction id {} for table: 
{}", compactionTxn, tableName, e);
+        }
       } finally {
         if (msc != null) {
           msc.close();
@@ -360,7 +357,6 @@ public class Worker extends RemoteCompactorThread 
implements MetaStoreThread {
     PerfLogger perfLogger = SessionState.getPerfLogger(false);
     String workerMetric = null;
 
-    CompactionHeartbeater heartbeater = null;
     CompactionInfo ci = null;
     try (CompactionTxn compactionTxn = new CompactionTxn()) {
       if (msc == null) {
@@ -455,9 +451,6 @@ public class Worker extends RemoteCompactorThread 
implements MetaStoreThread {
        * multi-stmt txn. {@link Driver#setCompactionWriteIds(ValidWriteIdList, 
long)} */
       compactionTxn.open(ci);
 
-      heartbeater = new CompactionHeartbeater(compactionTxn, fullTableName, 
conf);
-      heartbeater.start();
-
       ValidTxnList validTxnList = msc.getValidTxns(compactionTxn.getTxnId());
       //with this ValidWriteIdList is capped at whatever HWM validTxnList has
       final ValidCompactorWriteIdList tblValidWriteIds =
@@ -521,7 +514,6 @@ public class Worker extends RemoteCompactorThread 
implements MetaStoreThread {
           LOG.info("Will compact id: " + ci.id + " via MR job");
           runCompactionViaMrJob(ci, t, p, sd, tblValidWriteIds, jobName, dir, 
su);
         }
-        heartbeater.interrupt();
 
         LOG.info("Completed " + ci.type.toString() + " compaction for " + 
ci.getFullPartitionName() + " in "
             + compactionTxn + ", marking as compacted.");
@@ -561,9 +553,6 @@ public class Worker extends RemoteCompactorThread 
implements MetaStoreThread {
     } catch (Throwable t) {
       LOG.error("Caught an exception in the main loop of compactor worker " + 
workerName, t);
     } finally {
-      if (heartbeater != null) {
-        heartbeater.interrupt();
-      }
       if (workerMetric != null && MetastoreConf.getBoolVar(conf, 
MetastoreConf.ConfVars.METASTORE_ACIDMETRICS_EXT_ON)) {
         perfLogger.perfLogEnd(CLASS_NAME, workerMetric);
       }
@@ -715,6 +704,8 @@ public class Worker extends RemoteCompactorThread 
implements MetaStoreThread {
 
     private TxnStatus status = TxnStatus.UNKNOWN;
     private boolean succeessfulCompaction = false;
+    private CompactionHeartbeater heartbeater;
+    private ScheduledExecutorService heartbeatExecutor;
 
     /**
      * Try to open a new txn.
@@ -734,6 +725,11 @@ public class Worker extends RemoteCompactorThread 
implements MetaStoreThread {
         throw new TException("Unable to acquire lock(S) on " + 
ci.getFullPartitionName());
       }
       lockId = res.getLockid();
+
+      heartbeatExecutor = Executors.newSingleThreadScheduledExecutor();
+      long txnTimeout = MetastoreConf.getTimeVar(conf, 
MetastoreConf.ConfVars.TXN_TIMEOUT, TimeUnit.MILLISECONDS);
+      heartbeater = new CompactionHeartbeater(this, 
TxnUtils.getFullTableName(ci.dbname, ci.tableName), conf);
+      heartbeatExecutor.scheduleAtFixedRate(heartbeater, txnTimeout / 4, 
txnTimeout / 2, TimeUnit.MILLISECONDS);
     }
 
     /**
@@ -748,13 +744,29 @@ public class Worker extends RemoteCompactorThread 
implements MetaStoreThread {
      * @throws Exception
      */
     @Override public void close() throws Exception {
-      if (status == TxnStatus.UNKNOWN) {
-        return;
+      if (status != TxnStatus.UNKNOWN) {
+        // turn off error logging in heartbeater in case of race condition 
between commit/abort and heartbeating
+        heartbeater.shouldLogError(false);
+        if (succeessfulCompaction) {
+          commit();
+        } else {
+          abort();
+        }
       }
-      if (succeessfulCompaction) {
-        commit();
-      } else {
-        abort();
+      shutdownHeartbeater();
+    }
+
+    private void shutdownHeartbeater() {
+      if (heartbeatExecutor != null) {
+        heartbeatExecutor.shutdownNow();
+        try {
+          if (!heartbeatExecutor.awaitTermination(5, TimeUnit.SECONDS)) {
+            heartbeatExecutor.shutdownNow();
+          }
+          LOG.debug("Successfully stopped heartbeating for transaction {}", 
this);
+        } catch (InterruptedException ex) {
+          heartbeatExecutor.shutdownNow();
+        }
       }
     }
 

Reply via email to