Repository: hive
Updated Branches:
  refs/heads/branch-2.1 3eb16ebec -> 45c1775e1


HIVE-13833 : Add an initial delay when starting the heartbeat (Wei Zheng, 
reviewed by Eugene Koifman)


Project: http://git-wip-us.apache.org/repos/asf/hive/repo
Commit: http://git-wip-us.apache.org/repos/asf/hive/commit/45c1775e
Tree: http://git-wip-us.apache.org/repos/asf/hive/tree/45c1775e
Diff: http://git-wip-us.apache.org/repos/asf/hive/diff/45c1775e

Branch: refs/heads/branch-2.1
Commit: 45c1775e1b32234d576e7a474a372d9fa9326053
Parents: 3eb16eb
Author: Wei Zheng <w...@apache.org>
Authored: Tue Jun 14 15:30:56 2016 -0700
Committer: Wei Zheng <w...@apache.org>
Committed: Wed Jun 15 10:27:48 2016 -0700

----------------------------------------------------------------------
 .../hadoop/hive/ql/lockmgr/DbTxnManager.java    | 45 ++++++++++----------
 1 file changed, 22 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/hive/blob/45c1775e/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
----------------------------------------------------------------------
diff --git a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java 
b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
index 9988eec..5b6f20c 100644
--- a/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
+++ b/ql/src/java/org/apache/hadoop/hive/ql/lockmgr/DbTxnManager.java
@@ -20,7 +20,6 @@ package org.apache.hadoop.hive.ql.lockmgr;
 import com.google.common.annotations.VisibleForTesting;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hive.ql.io.AcidUtils;
-import org.apache.hadoop.hive.ql.parse.SemanticAnalyzer;
 import org.apache.hive.common.util.ShutdownHookManager;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -84,7 +83,7 @@ public class DbTxnManager extends HiveTxnManagerImpl {
   private static ScheduledExecutorService heartbeatExecutorService = null;
   private ScheduledFuture<?> heartbeatTask = null;
   private Runnable shutdownRunner = null;
-  static final int SHUTDOWN_HOOK_PRIORITY = 0;
+  private static final int SHUTDOWN_HOOK_PRIORITY = 0;
 
   DbTxnManager() {
     shutdownRunner = new Runnable() {
@@ -161,10 +160,11 @@ public class DbTxnManager extends HiveTxnManagerImpl {
     getLockManager();
 
     boolean atLeastOneLock = false;
+    queryId = plan.getQueryId();
 
-    LockRequestBuilder rqstBuilder = new LockRequestBuilder(plan.getQueryId());
+    LockRequestBuilder rqstBuilder = new LockRequestBuilder(queryId);
     //link queryId to txnId
-    LOG.info("Setting lock request transaction to " + 
JavaUtils.txnIdToString(txnId) + " for queryId=" + plan.getQueryId());
+    LOG.info("Setting lock request transaction to " + 
JavaUtils.txnIdToString(txnId) + " for queryId=" + queryId);
     rqstBuilder.setTransactionId(txnId)
         .setUser(username);
 
@@ -304,7 +304,7 @@ public class DbTxnManager extends HiveTxnManagerImpl {
     // Make sure we need locks.  It's possible there's nothing to lock in
     // this operation.
     if (!atLeastOneLock) {
-      LOG.debug("No locks needed for queryId" + plan.getQueryId());
+      LOG.debug("No locks needed for queryId" + queryId);
       return null;
     }
 
@@ -312,7 +312,7 @@ public class DbTxnManager extends HiveTxnManagerImpl {
     if(isTxnOpen()) {
       statementId++;
     }
-    LockState lockState = lockMgr.lock(rqstBuilder.build(), plan.getQueryId(), 
isBlocking, locks);
+    LockState lockState = lockMgr.lock(rqstBuilder.build(), queryId, 
isBlocking, locks);
     ctx.setHiveLocks(locks);
     return lockState;
   }
@@ -324,15 +324,13 @@ public class DbTxnManager extends HiveTxnManagerImpl {
     return t;
   }
   /**
-   * This is for testing only.
+   * This is for testing only. Normally client should call {@link 
#acquireLocks(QueryPlan, Context, String, boolean)}
    * @param delay time to delay for first heartbeat
-   * @return null if no locks were needed
    */
   @VisibleForTesting
   void acquireLocksWithHeartbeatDelay(QueryPlan plan, Context ctx, String 
username, long delay) throws LockException {
     acquireLocks(plan, ctx, username, true);
     ctx.setHeartbeater(startHeartbeat(delay));
-    queryId = plan.getQueryId();
   }
 
 
@@ -439,24 +437,25 @@ public class DbTxnManager extends HiveTxnManagerImpl {
     }
   }
 
-  private Heartbeater startHeartbeat() throws LockException {
-    return startHeartbeat(0);
-  }
-
   /**
-   *  This is for testing only.  Normally client should call {@link 
#startHeartbeat()}
-   *  Make the heartbeater start before an initial delay period.
-   *  @param delay time to delay before first execution, in milliseconds
-   *  @return heartbeater
+   * Schedule a new heartbeater at a fixed rate on the heartbeat executor and return it.
+   * @param initialDelay delay before first execution, in milliseconds; 0 selects the heartbeat interval
+   * @return heartbeater
    */
-  Heartbeater startHeartbeat(long delay) throws LockException {
+  private Heartbeater startHeartbeat(long initialDelay) throws LockException {
     long heartbeatInterval = getHeartbeatInterval(conf);
     assert heartbeatInterval > 0;
     Heartbeater heartbeater = new Heartbeater(this, conf);
+    // For negative testing: ignore the requested delay so the first heartbeat fires immediately.
+    if(conf.getBoolVar(HiveConf.ConfVars.HIVE_IN_TEST) && 
conf.getBoolVar(HiveConf.ConfVars.HIVETESTMODEFAILHEARTBEATER)) {
+      initialDelay = 0;
+    } else if (initialDelay == 0) {
+      initialDelay = heartbeatInterval;
+    }
     heartbeatTask = heartbeatExecutorService.scheduleAtFixedRate(
-        heartbeater, delay, heartbeatInterval, TimeUnit.MILLISECONDS);
-    LOG.info("Started heartbeat with delay/interval = " + 0 + "/" + 
heartbeatInterval + " " +
-        TimeUnit.MILLISECONDS + " for query: " + queryId);
+        heartbeater, initialDelay, heartbeatInterval, TimeUnit.MILLISECONDS);
+    LOG.info("Started heartbeat with delay/interval = " + initialDelay + "/" + 
heartbeatInterval +
+        " " + TimeUnit.MILLISECONDS + " for query: " + queryId);
     return heartbeater;
   }
 
@@ -584,7 +583,7 @@ public class DbTxnManager extends HiveTxnManagerImpl {
     return statementId;
   }
 
-  public static long getHeartbeatInterval(Configuration conf) throws 
LockException {
+  private static long getHeartbeatInterval(Configuration conf) throws 
LockException {
     // Retrieve HIVE_TXN_TIMEOUT in MILLISECONDS (it's defined as SECONDS),
     // then divide it by 2 to give us a safety factor.
     long interval =
@@ -612,7 +611,7 @@ public class DbTxnManager extends HiveTxnManagerImpl {
      *
      * @param txnMgr transaction manager for this operation
      */
-    public Heartbeater(HiveTxnManager txnMgr, HiveConf conf) {
+    Heartbeater(HiveTxnManager txnMgr, HiveConf conf) {
       this.txnMgr = txnMgr;
       this.conf = conf;
       lockException = null;

Reply via email to