This is an automated email from the ASF dual-hosted git repository.

vjasani pushed a commit to branch branch-2.2
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.2 by this push:
     new 35d3d75  HBASE-26012 Improve logging and dequeue logic in DelayQueue (#3397)
35d3d75 is described below

commit 35d3d7543fa4dd0c9288d7842530794c08b2148c
Author: Viraj Jasani <[email protected]>
AuthorDate: Thu Jun 24 15:27:11 2021 +0530

    HBASE-26012 Improve logging and dequeue logic in DelayQueue (#3397)
    
    Signed-off-by: Duo Zhang <[email protected]>
---
 .../apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java | 8 +++++++-
 .../org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java | 4 +++-
 .../java/org/apache/hadoop/hbase/procedure2/util/DelayedUtil.java | 5 +++--
 3 files changed, 13 insertions(+), 4 deletions(-)

diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java
index fadf0fb..594caa4 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java
@@ -310,8 +310,14 @@ public abstract class RemoteProcedureDispatcher<TEnv, TRemote extends Comparable
     @Override
     public void run() {
       while (running.get()) {
-        final DelayedWithTimeout task = 
DelayedUtil.takeWithoutInterrupt(queue);
+        final DelayedWithTimeout task = DelayedUtil.takeWithoutInterrupt(queue,
+          20, TimeUnit.SECONDS);
         if (task == null || task == DelayedUtil.DELAYED_POISON) {
+          if (task == null && queue.size() > 0) {
+            LOG.error("DelayQueue for RemoteProcedureDispatcher is not empty 
when timed waiting"
+              + " elapsed. If this is repeated consistently, it means no 
element is getting expired"
+              + " from the queue and it might freeze the system. Queue: {}", 
queue);
+          }
           // the executor may be shutting down, and the task is just the 
shutdown request
           continue;
         }
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java
index 1e796d9..fc917b6 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java
@@ -18,6 +18,7 @@
 package org.apache.hadoop.hbase.procedure2;
 
 import java.util.concurrent.DelayQueue;
+import java.util.concurrent.TimeUnit;
 import org.apache.hadoop.hbase.procedure2.util.DelayedUtil;
 import org.apache.hadoop.hbase.procedure2.util.DelayedUtil.DelayedWithTimeout;
 import org.apache.yetus.audience.InterfaceAudience;
@@ -52,7 +53,8 @@ class TimeoutExecutorThread<TEnvironment> extends StoppableThread {
   @Override
   public void run() {
     while (executor.isRunning()) {
-      final DelayedWithTimeout task = DelayedUtil.takeWithoutInterrupt(queue);
+      final DelayedWithTimeout task = DelayedUtil.takeWithoutInterrupt(queue, 
20,
+        TimeUnit.SECONDS);
       if (task == null || task == DelayedUtil.DELAYED_POISON) {
         // the executor may be shutting down,
         // and the task is just the shutdown request
diff --git a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/DelayedUtil.java b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/DelayedUtil.java
index 4d3ebd9..fa796ae 100644
--- a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/DelayedUtil.java
+++ b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/DelayedUtil.java
@@ -77,9 +77,10 @@ public final class DelayedUtil {
   /**
    * @return null (if an interrupt) or an instance of E; resets interrupt on 
calling thread.
    */
-  public static <E extends Delayed> E takeWithoutInterrupt(final DelayQueue<E> 
queue) {
+  public static <E extends Delayed> E takeWithoutInterrupt(final DelayQueue<E> 
queue,
+      final long timeout, final TimeUnit timeUnit) {
     try {
-      return queue.take();
+      return queue.poll(timeout, timeUnit);
     } catch (InterruptedException e) {
       Thread.currentThread().interrupt();
       return null;

Reply via email to