This is an automated email from the ASF dual-hosted git repository.
vjasani pushed a commit to branch branch-2.2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.2 by this push:
new 35d3d75 HBASE-26012 Improve logging and dequeue logic in DelayQueue
(#3397)
35d3d75 is described below
commit 35d3d7543fa4dd0c9288d7842530794c08b2148c
Author: Viraj Jasani <[email protected]>
AuthorDate: Thu Jun 24 15:27:11 2021 +0530
HBASE-26012 Improve logging and dequeue logic in DelayQueue (#3397)
Signed-off-by: Duo Zhang <[email protected]>
---
.../apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java | 8 +++++++-
.../org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java | 4 +++-
.../java/org/apache/hadoop/hbase/procedure2/util/DelayedUtil.java | 5 +++--
3 files changed, 13 insertions(+), 4 deletions(-)
diff --git
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java
index fadf0fb..594caa4 100644
---
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java
+++
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/RemoteProcedureDispatcher.java
@@ -310,8 +310,14 @@ public abstract class RemoteProcedureDispatcher<TEnv,
TRemote extends Comparable
@Override
public void run() {
while (running.get()) {
- final DelayedWithTimeout task =
DelayedUtil.takeWithoutInterrupt(queue);
+ final DelayedWithTimeout task = DelayedUtil.takeWithoutInterrupt(queue,
+ 20, TimeUnit.SECONDS);
if (task == null || task == DelayedUtil.DELAYED_POISON) {
+ if (task == null && queue.size() > 0) {
+ LOG.error("DelayQueue for RemoteProcedureDispatcher is not empty
when timed waiting"
+ + " elapsed. If this is repeated consistently, it means no
element is getting expired"
+ + " from the queue and it might freeze the system. Queue: {}",
queue);
+ }
// the executor may be shutting down, and the task is just the
shutdown request
continue;
}
diff --git
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java
index 1e796d9..fc917b6 100644
---
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java
+++
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/TimeoutExecutorThread.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.procedure2;
import java.util.concurrent.DelayQueue;
+import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hbase.procedure2.util.DelayedUtil;
import org.apache.hadoop.hbase.procedure2.util.DelayedUtil.DelayedWithTimeout;
import org.apache.yetus.audience.InterfaceAudience;
@@ -52,7 +53,8 @@ class TimeoutExecutorThread<TEnvironment> extends
StoppableThread {
@Override
public void run() {
while (executor.isRunning()) {
- final DelayedWithTimeout task = DelayedUtil.takeWithoutInterrupt(queue);
+ final DelayedWithTimeout task = DelayedUtil.takeWithoutInterrupt(queue,
20,
+ TimeUnit.SECONDS);
if (task == null || task == DelayedUtil.DELAYED_POISON) {
// the executor may be shutting down,
// and the task is just the shutdown request
diff --git
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/DelayedUtil.java
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/DelayedUtil.java
index 4d3ebd9..fa796ae 100644
---
a/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/DelayedUtil.java
+++
b/hbase-procedure/src/main/java/org/apache/hadoop/hbase/procedure2/util/DelayedUtil.java
@@ -77,9 +77,10 @@ public final class DelayedUtil {
/**
* @return null (if an interrupt) or an instance of E; resets interrupt on
calling thread.
*/
- public static <E extends Delayed> E takeWithoutInterrupt(final DelayQueue<E>
queue) {
+ public static <E extends Delayed> E takeWithoutInterrupt(final DelayQueue<E>
queue,
+ final long timeout, final TimeUnit timeUnit) {
try {
- return queue.take();
+ return queue.poll(timeout, timeUnit);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return null;