This is an automated email from the ASF dual-hosted git repository.

stack pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new 7f614ce  HBASE-6908: Add pluggable rpc queue implementation (#3522)
7f614ce is described below

commit 7f614ce77eee8b30533c81c7cbeba7b8975eeb6f
Author: Richard Marscher <[email protected]>
AuthorDate: Mon Aug 9 15:54:18 2021 -0400

    HBASE-6908: Add pluggable rpc queue implementation (#3522)
    
    Can pass in a FQCN to load as the call queue implementation.
    
    Standardized arguments to the constructor are the max queue length, the
    PriorityFunction, and the Configuration.
    
    `PluggableBlockingQueue` abstract class provided to help guide the
    correct constructor signature
    
    Hard fails if the class fails to load as a `BlockingQueue<CallRunner>`
    
    Signed-off-by: stack <[email protected]>
---
 .../hadoop/hbase/ipc/PluggableBlockingQueue.java   |  55 ++++++++
 .../hbase/ipc/PluggableRpcQueueNotFound.java       |  34 +++++
 .../org/apache/hadoop/hbase/ipc/RpcExecutor.java   |  55 +++++++-
 .../hadoop/hbase/ipc/SimpleRpcScheduler.java       |   3 +-
 .../hadoop/hbase/ipc/TestPluggableQueueImpl.java   | 155 +++++++++++++++++++++
 .../hadoop/hbase/ipc/TestSimpleRpcScheduler.java   |  75 ++++++++++
 6 files changed, 372 insertions(+), 5 deletions(-)

diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/PluggableBlockingQueue.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/PluggableBlockingQueue.java
new file mode 100644
index 0000000..0b88b6c
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/PluggableBlockingQueue.java
@@ -0,0 +1,55 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.util.concurrent.BlockingQueue;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+/**
+ * Abstract class template for defining a pluggable blocking queue 
implementation to be used
+ * by the 'pluggable' call queue type in the RpcExecutor.
+ *
+ * The intention is that the constructor shape helps re-inforce the expected 
parameters needed
+ * to match up to how the RpcExecutor will instantiate instances of the queue.
+ *
+ * If the implementation class implements the
+ * {@link org.apache.hadoop.hbase.conf.ConfigurationObserver} interface, it 
will also be wired
+ * into configuration changes.
+ *
+ * Instantiation requires a constructor with {@code
+ *     final int maxQueueLength,
+ *     final PriorityFunction priority,
+ *     final Configuration conf)}
+ *  as the arguments.
+ */
[email protected]
[email protected]
+public abstract class PluggableBlockingQueue implements 
BlockingQueue<CallRunner> {
+  protected final int maxQueueLength;
+  protected final PriorityFunction priority;
+  protected final Configuration conf;
+
+  public PluggableBlockingQueue(final int maxQueueLength,
+        final PriorityFunction priority, final Configuration conf) {
+    this.maxQueueLength = maxQueueLength;
+    this.priority = priority;
+    this.conf = conf;
+  }
+}
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/PluggableRpcQueueNotFound.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/PluggableRpcQueueNotFound.java
new file mode 100644
index 0000000..dade53c
--- /dev/null
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/PluggableRpcQueueNotFound.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import org.apache.yetus.audience.InterfaceAudience;
+import org.apache.yetus.audience.InterfaceStability;
+
+/**
+ * Internal runtime error type to indicate the RpcExecutor failed to execute a 
`pluggable`
+ * call queue type. Either the FQCN for the class was missing in 
Configuration, not found on the
+ * classpath, or is not a subtype of {@code BlockingQueue<CallRunner>}
+ */
[email protected]
[email protected]
+public class PluggableRpcQueueNotFound extends RuntimeException {
+  public PluggableRpcQueueNotFound(String message) {
+    super(message);
+  }
+}
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
index 3de5fa1..db51234 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/RpcExecutor.java
@@ -22,6 +22,7 @@ import java.util.ArrayList;
 import java.util.Comparator;
 import java.util.List;
 import java.util.Locale;
+import java.util.Optional;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.ThreadLocalRandom;
@@ -33,6 +34,7 @@ import java.util.HashMap;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.Abortable;
 import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.conf.ConfigurationObserver;
 import org.apache.hbase.thirdparty.io.netty.util.internal.StringUtil;
 import org.apache.yetus.audience.InterfaceAudience;
 import org.slf4j.Logger;
@@ -67,6 +69,7 @@ public abstract class RpcExecutor {
   public static final String CALL_QUEUE_TYPE_CODEL_CONF_VALUE = "codel";
   public static final String CALL_QUEUE_TYPE_DEADLINE_CONF_VALUE = "deadline";
   public static final String CALL_QUEUE_TYPE_FIFO_CONF_VALUE = "fifo";
+  public static final String CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE = 
"pluggable";
   public static final String CALL_QUEUE_TYPE_CONF_KEY = 
"hbase.ipc.server.callqueue.type";
   public static final String CALL_QUEUE_TYPE_CONF_DEFAULT = 
CALL_QUEUE_TYPE_FIFO_CONF_VALUE;
 
@@ -79,6 +82,9 @@ public abstract class RpcExecutor {
   public static final int CALL_QUEUE_CODEL_DEFAULT_INTERVAL = 100;
   public static final double CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD = 0.8;
 
+  public static final String PLUGGABLE_CALL_QUEUE_CLASS_NAME =
+    "hbase.ipc.server.callqueue.pluggable.queue.class.name";
+
   private LongAdder numGeneralCallsDropped = new LongAdder();
   private LongAdder numLifoModeSwitches = new LongAdder();
 
@@ -150,13 +156,23 @@ public abstract class RpcExecutor {
       int codelInterval = conf.getInt(CALL_QUEUE_CODEL_INTERVAL, 
CALL_QUEUE_CODEL_DEFAULT_INTERVAL);
       double codelLifoThreshold = 
conf.getDouble(CALL_QUEUE_CODEL_LIFO_THRESHOLD,
         CALL_QUEUE_CODEL_DEFAULT_LIFO_THRESHOLD);
-      queueInitArgs = new Object[] { maxQueueLength, codelTargetDelay, 
codelInterval,
+      this.queueInitArgs = new Object[] { maxQueueLength, codelTargetDelay, 
codelInterval,
           codelLifoThreshold, numGeneralCallsDropped, numLifoModeSwitches };
-      queueClass = AdaptiveLifoCoDelCallQueue.class;
+      this.queueClass = AdaptiveLifoCoDelCallQueue.class;
+    } else if (isPluggableQueueType(callQueueType)) {
+      Optional<Class<? extends BlockingQueue<CallRunner>>> pluggableQueueClass 
= getPluggableQueueClass();
+
+      if (!pluggableQueueClass.isPresent()) {
+        throw new PluggableRpcQueueNotFound("Pluggable call queue failed to 
load and selected call"
+          + " queue type required");
+      } else {
+        this.queueInitArgs = new Object[] { maxQueueLength, this.priority, 
conf };
+        this.queueClass = pluggableQueueClass.get();
+      }
     } else {
       this.name += ".Fifo";
-      queueInitArgs = new Object[] { maxQueueLength };
-      queueClass = LinkedBlockingQueue.class;
+      this.queueInitArgs = new Object[] { maxQueueLength };
+      this.queueClass = LinkedBlockingQueue.class;
     }
 
     LOG.info("Instantiated {} with queueClass={}; " +
@@ -445,6 +461,35 @@ public abstract class RpcExecutor {
     return callQueueType.equals(CALL_QUEUE_TYPE_FIFO_CONF_VALUE);
   }
 
+  public static boolean isPluggableQueueType(String callQueueType) {
+    return callQueueType.equals(CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE);
+  }
+
+  private Optional<Class<? extends BlockingQueue<CallRunner>>> 
getPluggableQueueClass() {
+    String queueClassName = conf.get(PLUGGABLE_CALL_QUEUE_CLASS_NAME);
+
+    if (queueClassName == null) {
+      LOG.error("Pluggable queue class config at " + 
PLUGGABLE_CALL_QUEUE_CLASS_NAME +
+        " was not found");
+      return Optional.empty();
+    }
+
+    try {
+      Class<?> clazz = Class.forName(queueClassName);
+
+      if (BlockingQueue.class.isAssignableFrom(clazz)) {
+        return Optional.of((Class<? extends BlockingQueue<CallRunner>>) clazz);
+      } else {
+        LOG.error("Pluggable Queue class " + queueClassName +
+          " does not extend BlockingQueue<CallRunner>");
+        return Optional.empty();
+      }
+    } catch (ClassNotFoundException exception) {
+      LOG.error("Could not find " + queueClassName + " on the classpath to 
load.");
+      return Optional.empty();
+    }
+  }
+
   public long getNumGeneralCallsDropped() {
     return numGeneralCallsDropped.longValue();
   }
@@ -522,6 +567,8 @@ public abstract class RpcExecutor {
       if (queue instanceof AdaptiveLifoCoDelCallQueue) {
         ((AdaptiveLifoCoDelCallQueue) queue).updateTunables(codelTargetDelay, 
codelInterval,
           codelLifoThreshold);
+      } else if (queue instanceof ConfigurationObserver) {
+        ((ConfigurationObserver)queue).onConfigurationChange(conf);
       }
     }
   }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
index b4bc347..3b7c0bb 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/SimpleRpcScheduler.java
@@ -154,7 +154,8 @@ public class SimpleRpcScheduler extends RpcScheduler 
implements ConfigurationObs
 
     String callQueueType = conf.get(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY,
       RpcExecutor.CALL_QUEUE_TYPE_CONF_DEFAULT);
-    if (RpcExecutor.isCodelQueueType(callQueueType)) {
+    if (RpcExecutor.isCodelQueueType(callQueueType) ||
+      RpcExecutor.isPluggableQueueType(callQueueType)) {
       callExecutor.onConfigurationChange(conf);
     }
   }
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestPluggableQueueImpl.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestPluggableQueueImpl.java
new file mode 100644
index 0000000..eeb057c
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestPluggableQueueImpl.java
@@ -0,0 +1,155 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.ipc;
+
+import java.util.Collection;
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.concurrent.TimeUnit;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.conf.ConfigurationObserver;
+import org.apache.hadoop.hbase.util.BoundedPriorityBlockingQueue;
+
+/**
+ * Implementation of the PluggableBlockingQueue abstract class.
+ *
+ * Used to verify that the pluggable call queue type for the RpcExecutor can 
load correctly
+ * via the FQCN reflection semantics.
+ */
+public class TestPluggableQueueImpl extends PluggableBlockingQueue implements
+  ConfigurationObserver {
+
+  private final BoundedPriorityBlockingQueue<CallRunner> inner;
+  private static boolean configurationRecentlyChanged = false;
+
+  public TestPluggableQueueImpl(int maxQueueLength, PriorityFunction priority, 
Configuration conf) {
+    super(maxQueueLength, priority, conf);
+    Comparator<CallRunner> comparator = Comparator.comparingInt(r -> 
r.getRpcCall().getPriority());
+    inner = new BoundedPriorityBlockingQueue<>(maxQueueLength, comparator);
+    configurationRecentlyChanged = false;
+  }
+
+  @Override public boolean add(CallRunner callRunner) {
+    return inner.add(callRunner);
+  }
+
+  @Override public boolean offer(CallRunner callRunner) {
+    return inner.offer(callRunner);
+  }
+
+  @Override public CallRunner remove() {
+    return inner.remove();
+  }
+
+  @Override public CallRunner poll() {
+    return inner.poll();
+  }
+
+  @Override public CallRunner element() {
+    return inner.element();
+  }
+
+  @Override public CallRunner peek() {
+    return inner.peek();
+  }
+
+  @Override public void put(CallRunner callRunner) throws InterruptedException 
{
+    inner.put(callRunner);
+  }
+
+  @Override public boolean offer(CallRunner callRunner, long timeout, TimeUnit 
unit)
+    throws InterruptedException {
+    return inner.offer(callRunner, timeout, unit);
+  }
+
+  @Override public CallRunner take() throws InterruptedException {
+    return inner.take();
+  }
+
+  @Override public CallRunner poll(long timeout, TimeUnit unit) throws 
InterruptedException {
+    return inner.poll(timeout, unit);
+  }
+
+  @Override public int remainingCapacity() {
+    return inner.remainingCapacity();
+  }
+
+  @Override public boolean remove(Object o) {
+    return inner.remove(o);
+  }
+
+  @Override public boolean containsAll(Collection<?> c) {
+    return inner.containsAll(c);
+  }
+
+  @Override public boolean addAll(Collection<? extends CallRunner> c) {
+    return inner.addAll(c);
+  }
+
+  @Override public boolean removeAll(Collection<?> c) {
+    return inner.removeAll(c);
+  }
+
+  @Override public boolean retainAll(Collection<?> c) {
+    return inner.retainAll(c);
+  }
+
+  @Override public void clear() {
+    inner.clear();
+  }
+
+  @Override public int size() {
+    return inner.size();
+  }
+
+  @Override public boolean isEmpty() {
+    return inner.isEmpty();
+  }
+
+  @Override public boolean contains(Object o) {
+    return inner.contains(o);
+  }
+
+  @Override public Iterator<CallRunner> iterator() {
+    return inner.iterator();
+  }
+
+  @Override public Object[] toArray() {
+    return inner.toArray();
+  }
+
+  @Override public <T> T[] toArray(T[] a) {
+    return inner.toArray(a);
+  }
+
+  @Override public int drainTo(Collection<? super CallRunner> c) {
+    return inner.drainTo(c);
+  }
+
+  @Override public int drainTo(Collection<? super CallRunner> c, int 
maxElements) {
+    return inner.drainTo(c, maxElements);
+  }
+
+  public static boolean hasObservedARecentConfigurationChange() {
+    return configurationRecentlyChanged;
+  }
+
+  @Override public void onConfigurationChange(Configuration conf) {
+    configurationRecentlyChanged = true;
+  }
+}
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
index f791421..7d32f35 100644
--- 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/ipc/TestSimpleRpcScheduler.java
@@ -21,6 +21,7 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotEquals;
 import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.ArgumentMatchers.eq;
 import static org.mockito.Mockito.doAnswer;
@@ -247,10 +248,84 @@ public class TestSimpleRpcScheduler {
     testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_FIFO_CONF_VALUE);
   }
 
+  @Test
+  public void testPluggableRpcQueue() throws Exception {
+    testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE,
+      "org.apache.hadoop.hbase.ipc.TestPluggableQueueImpl");
+
+    try {
+      testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE,
+        "MissingClass");
+      fail("Expected a PluggableRpcQueueNotFound for unloaded class");
+    } catch (PluggableRpcQueueNotFound e) {
+      // expected
+    } catch (Exception e) {
+      fail("Expected a PluggableRpcQueueNotFound for unloaded class, but 
instead got " + e);
+    }
+
+    try {
+      testRpcScheduler(RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE,
+        "org.apache.hadoop.hbase.ipc.SimpleRpcServer");
+      fail("Expected a PluggableRpcQueueNotFound for incompatible class");
+    } catch (PluggableRpcQueueNotFound e) {
+      // expected
+    } catch (Exception e) {
+      fail("Expected a PluggableRpcQueueNotFound for incompatible class, but 
instead got " + e);
+    }
+  }
+
+  @Test
+  public void testPluggableRpcQueueCanListenToConfigurationChanges() throws 
Exception {
+
+    Configuration schedConf = HBaseConfiguration.create();
+
+    schedConf.setInt(HConstants.REGION_SERVER_HANDLER_COUNT, 2);
+    schedConf.setInt("hbase.ipc.server.max.callqueue.length", 5);
+    schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY,
+      RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE);
+    schedConf.set(RpcExecutor.PLUGGABLE_CALL_QUEUE_CLASS_NAME,
+      "org.apache.hadoop.hbase.ipc.TestPluggableQueueImpl");
+
+    PriorityFunction priority = mock(PriorityFunction.class);
+    when(priority.getPriority(any(), any(), 
any())).thenReturn(HConstants.NORMAL_QOS);
+    SimpleRpcScheduler scheduler = new SimpleRpcScheduler(schedConf, 0, 0, 0, 
priority,
+      HConstants.QOS_THRESHOLD);
+    try {
+      scheduler.start();
+
+      CallRunner putCallTask = mock(CallRunner.class);
+      ServerCall putCall = mock(ServerCall.class);
+      putCall.param = RequestConverter.buildMutateRequest(
+        Bytes.toBytes("abc"), new Put(Bytes.toBytes("row")));
+      RequestHeader putHead = 
RequestHeader.newBuilder().setMethodName("mutate").build();
+      when(putCallTask.getRpcCall()).thenReturn(putCall);
+      when(putCall.getHeader()).thenReturn(putHead);
+
+      assertTrue(scheduler.dispatch(putCallTask));
+
+      schedConf.setInt("hbase.ipc.server.max.callqueue.length", 4);
+      scheduler.onConfigurationChange(schedConf);
+      
assertTrue(TestPluggableQueueImpl.hasObservedARecentConfigurationChange());
+      waitUntilQueueEmpty(scheduler);
+    } finally {
+      scheduler.stop();
+    }
+  }
+
   private void testRpcScheduler(final String queueType) throws Exception {
+    testRpcScheduler(queueType, null);
+  }
+
+  private void testRpcScheduler(final String queueType, final String 
pluggableQueueClass)
+    throws Exception {
+
     Configuration schedConf = HBaseConfiguration.create();
     schedConf.set(RpcExecutor.CALL_QUEUE_TYPE_CONF_KEY, queueType);
 
+    if (RpcExecutor.CALL_QUEUE_TYPE_PLUGGABLE_CONF_VALUE.equals(queueType)) {
+      schedConf.set(RpcExecutor.PLUGGABLE_CALL_QUEUE_CLASS_NAME, 
pluggableQueueClass);
+    }
+
     PriorityFunction priority = mock(PriorityFunction.class);
     when(priority.getPriority(any(), any(), 
any())).thenReturn(HConstants.NORMAL_QOS);
 

Reply via email to