This is an automated email from the ASF dual-hosted git repository.

bbeaudreault pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/master by this push:
     new e2af8f4f14d HBASE-28128 Reject requests at RPC layer when RegionServer 
is aborting (#5447)
e2af8f4f14d is described below

commit e2af8f4f14debc50bff6ef0181bd07575ba3b8de
Author: Bryan Beaudreault <bbeaudrea...@apache.org>
AuthorDate: Thu Oct 5 08:25:03 2023 -0400

    HBASE-28128 Reject requests at RPC layer when RegionServer is aborting 
(#5447)
    
    Signed-off-by: Nick Dimiduk <ndimi...@apache.org>
    Reviewed-by: Duo Zhang <zhang...@apache.org>
---
 .../hadoop/hbase/ipc/ServerRpcConnection.java      |  40 +++-
 .../TestRegionServerRejectDuringAbort.java         | 253 +++++++++++++++++++++
 2 files changed, 281 insertions(+), 12 deletions(-)

diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java
index e0f69e4b84c..695f1e7050c 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/ipc/ServerRpcConnection.java
@@ -46,6 +46,7 @@ import org.apache.hadoop.hbase.io.ByteBufferOutputStream;
 import org.apache.hadoop.hbase.io.crypto.aes.CryptoAES;
 import org.apache.hadoop.hbase.ipc.RpcServer.CallCleanup;
 import org.apache.hadoop.hbase.nio.ByteBuff;
+import org.apache.hadoop.hbase.regionserver.RegionServerAbortedException;
 import org.apache.hadoop.hbase.security.AccessDeniedException;
 import org.apache.hadoop.hbase.security.HBaseSaslRpcServer;
 import org.apache.hadoop.hbase.security.SaslStatus;
@@ -548,6 +549,19 @@ abstract class ServerRpcConnection implements Closeable {
     Span span = TraceUtil.createRemoteSpan("RpcServer.process", traceCtx);
     try (Scope ignored = span.makeCurrent()) {
       int id = header.getCallId();
+      // HBASE-28128 - if server is aborting, don't bother trying to process. 
It will
+      // fail at the handler layer, but worse might result in 
CallQueueTooBigException if the
+      // queue is full but server is not properly processing requests. Better 
to throw an aborted
+      // exception here so that the client can properly react.
+      if (rpcServer.server != null && rpcServer.server.isAborted()) {
+        RegionServerAbortedException serverIsAborted = new 
RegionServerAbortedException(
+          "Server " + rpcServer.server.getServerName() + " aborting");
+        this.rpcServer.metrics.exception(serverIsAborted);
+        sendErrorResponseForCall(id, totalRequestSize, span, 
serverIsAborted.getMessage(),
+          serverIsAborted);
+        return;
+      }
+
       if (RpcServer.LOG.isTraceEnabled()) {
         RpcServer.LOG.trace("RequestHeader " + 
TextFormat.shortDebugString(header)
           + " totalRequestSize: " + totalRequestSize + " bytes");
@@ -559,14 +573,11 @@ abstract class ServerRpcConnection implements Closeable {
         (totalRequestSize + this.rpcServer.callQueueSizeInBytes.sum())
             > this.rpcServer.maxQueueSizeInBytes
       ) {
-        final ServerCall<?> callTooBig = createCall(id, this.service, null, 
null, null, null,
-          totalRequestSize, null, 0, this.callCleanup);
         
this.rpcServer.metrics.exception(RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION);
-        callTooBig.setResponse(null, null, 
RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION,
+        sendErrorResponseForCall(id, totalRequestSize, span,
           "Call queue is full on " + this.rpcServer.server.getServerName()
-            + ", is hbase.ipc.server.max.callqueue.size too small?");
-        TraceUtil.setError(span, RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION);
-        callTooBig.sendResponseIfReady();
+            + ", is hbase.ipc.server.max.callqueue.size too small?",
+          RpcServer.CALL_QUEUE_TOO_BIG_EXCEPTION);
         return;
       }
       MethodDescriptor md = null;
@@ -621,12 +632,8 @@ abstract class ServerRpcConnection implements Closeable {
           responseThrowable = thrown;
         }
 
-        ServerCall<?> readParamsFailedCall = createCall(id, this.service, 
null, null, null, null,
-          totalRequestSize, null, 0, this.callCleanup);
-        readParamsFailedCall.setResponse(null, null, responseThrowable,
-          msg + "; " + responseThrowable.getMessage());
-        TraceUtil.setError(span, responseThrowable);
-        readParamsFailedCall.sendResponseIfReady();
+        sendErrorResponseForCall(id, totalRequestSize, span,
+          msg + "; " + responseThrowable.getMessage(), responseThrowable);
         return;
       }
 
@@ -656,6 +663,15 @@ abstract class ServerRpcConnection implements Closeable {
     }
   }
 
+  private void sendErrorResponseForCall(int id, long totalRequestSize, Span 
span, String msg,
+    Throwable responseThrowable) throws IOException {
+    ServerCall<?> failedcall = createCall(id, this.service, null, null, null, 
null,
+      totalRequestSize, null, 0, this.callCleanup);
+    failedcall.setResponse(null, null, responseThrowable, msg);
+    TraceUtil.setError(span, responseThrowable);
+    failedcall.sendResponseIfReady();
+  }
+
   protected final RpcResponse getErrorResponse(String msg, Exception e) throws 
IOException {
     ResponseHeader.Builder headerBuilder = 
ResponseHeader.newBuilder().setCallId(-1);
     ServerCall.setExceptionResponse(e, msg, headerBuilder);
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerRejectDuringAbort.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerRejectDuringAbort.java
new file mode 100644
index 00000000000..8add191f9ab
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerRejectDuringAbort.java
@@ -0,0 +1,253 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtil;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.HRegionLocation;
+import org.apache.hadoop.hbase.ServerName;
+import org.apache.hadoop.hbase.StartTestingClusterOption;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionInfo;
+import org.apache.hadoop.hbase.client.RetriesExhaustedException;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.TableDescriptor;
+import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
+import org.apache.hadoop.hbase.coprocessor.ObserverContext;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessor;
+import org.apache.hadoop.hbase.coprocessor.RegionCoprocessorEnvironment;
+import org.apache.hadoop.hbase.coprocessor.RegionObserver;
+import org.apache.hadoop.hbase.ipc.CallRunner;
+import org.apache.hadoop.hbase.ipc.PluggableBlockingQueue;
+import org.apache.hadoop.hbase.ipc.PriorityFunction;
+import org.apache.hadoop.hbase.ipc.TestPluggableQueueImpl;
+import org.apache.hadoop.hbase.testclassification.MediumTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.hadoop.hbase.util.JVMClusterUtil;
+import org.apache.hadoop.hbase.util.Threads;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+@Category({ RegionServerTests.class, MediumTests.class })
+public class TestRegionServerRejectDuringAbort {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestRegionServerRejectDuringAbort.class);
+
+  private static final Logger LOG =
+    LoggerFactory.getLogger(TestRegionServerRejectDuringAbort.class);
+
+  private static final HBaseTestingUtil UTIL = new HBaseTestingUtil();
+
+  private static TableName TABLE_NAME = TableName.valueOf("RSRejectOnAbort");
+
+  private static byte[] CF = Bytes.toBytes("cf");
+
+  private static final int REGIONS_NUM = 5;
+
+  private static final AtomicReference<Exception> THROWN_EXCEPTION = new 
AtomicReference<>(null);
+
+  private static volatile boolean shouldThrowTooBig = false;
+
+  @BeforeClass
+  public static void setUp() throws Exception {
+    // Install a pluggable call queue so the test can force queue offers to be rejected on demand
+    UTIL.getConfiguration().set("hbase.ipc.server.callqueue.type", 
"pluggable");
+    
UTIL.getConfiguration().setClass("hbase.ipc.server.callqueue.pluggable.queue.class.name",
+      CallQueueTooBigThrowingQueue.class, PluggableBlockingQueue.class);
+    StartTestingClusterOption option =
+      StartTestingClusterOption.builder().numRegionServers(2).build();
+    UTIL.startMiniCluster(option);
+    TableDescriptor td = TableDescriptorBuilder.newBuilder(TABLE_NAME)
+      .setCoprocessor(SleepWhenCloseCoprocessor.class.getName())
+      
.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(CF).build()).build();
+    UTIL.getAdmin().createTable(td, Bytes.toBytes("0"), Bytes.toBytes("9"), 
REGIONS_NUM);
+  }
+
+  public static final class CallQueueTooBigThrowingQueue extends 
TestPluggableQueueImpl {
+
+    public CallQueueTooBigThrowingQueue(int maxQueueLength, PriorityFunction 
priority,
+      Configuration conf) {
+      super(maxQueueLength, priority, conf);
+    }
+
+    @Override
+    public boolean offer(CallRunner callRunner) {
+      if (shouldThrowTooBig && 
callRunner.getRpcCall().getRequestAttribute("test") != null) {
+        return false;
+      }
+      return super.offer(callRunner);
+    }
+  }
+
+  @AfterClass
+  public static void tearDown() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }
+
+  /**
+   * Tests that the logic in ServerRpcConnection works such that if the server is aborted, it short
+   * circuits any other logic. This means we no longer even attempt to enqueue the request onto the
+   * call queue. We verify this by using a special call queue which we can trigger to always reject
+   * offers, which would normally surface as CallQueueTooBigException. If the logic works, despite
+   * forcing those rejections, we should not see that exception.
+   */
+  @Test
+  public void testRejectRequestsOnAbort() throws Exception {
+    // We don't want to disrupt the server carrying meta, because we plan to disrupt requests to
+    // the server. Disrupting meta requests messes with the test.
+    HRegionServer serverWithoutMeta = null;
+    for (JVMClusterUtil.RegionServerThread regionServerThread : UTIL.getMiniHBaseCluster()
+      .getRegionServerThreads()) {
+      HRegionServer regionServer = regionServerThread.getRegionServer();
+      if (
+        regionServer.getRegions(TableName.META_TABLE_NAME).isEmpty()
+          && !regionServer.getRegions(TABLE_NAME).isEmpty()
+      ) {
+        serverWithoutMeta = regionServer;
+        break;
+      }
+    }
+
+    assertNotNull("couldn't find a server without meta, but with test table regions",
+      serverWithoutMeta);
+
+    Thread writer = new Thread(getWriterThreadRunnable(serverWithoutMeta.getServerName()));
+    writer.setDaemon(true);
+    writer.start();
+
+    // Trigger the abort. Our WriterThread will detect the first RegionServerAbortedException
+    // and trigger our custom queue to reject any more requests. This would typically result in
+    // CallQueueTooBigException, unless our logic in ServerRpcConnection to preempt the processing
+    // of a request is working.
+    serverWithoutMeta.abort("Abort RS for test");
+
+    UTIL.waitFor(60_000, () -> THROWN_EXCEPTION.get() != null);
+    assertEquals(RegionServerAbortedException.class, THROWN_EXCEPTION.get().getCause().getClass());
+  }
+
+  private Runnable getWriterThreadRunnable(ServerName loadServer) {
+    return () -> {
+      try {
+        Configuration conf = new Configuration(UTIL.getConfiguration()); // private copy
+        conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 5); // must not leak into shared conf
+        try (Connection conn = ConnectionFactory.createConnection(conf);
+          Table table = conn.getTableBuilder(TABLE_NAME, null)
+            .setRequestAttribute("test", new byte[] { 0 }).build()) {
+          // find the first region to exist on our test server, then submit requests to it
+          for (HRegionLocation regionLocation : table.getRegionLocator().getAllRegionLocations()) {
+            if (regionLocation.getServerName().equals(loadServer)) {
+              submitRequestsToRegion(table, regionLocation.getRegion());
+              return;
+            }
+          }
+          throw new RuntimeException("Failed to find any regions for loadServer " + loadServer);
+        }
+      } catch (Exception e) {
+        LOG.warn("Failed to load data", e);
+        synchronized (THROWN_EXCEPTION) {
+          THROWN_EXCEPTION.set(e);
+          THROWN_EXCEPTION.notifyAll();
+        }
+      }
+    };
+  }
+
+  private void submitRequestsToRegion(Table table, RegionInfo regionInfo) 
throws IOException {
+    // We will block closes of the regions with a CP, so no need to worry 
about the region getting
+    // reassigned. Just use the same rowkey always.
+    byte[] rowKey = getRowKeyWithin(regionInfo);
+
+    int i = 0;
+    while (true) {
+      try {
+        i++;
+        table.put(new Put(rowKey).addColumn(CF, Bytes.toBytes(i), 
Bytes.toBytes(i)));
+      } catch (IOException e) {
+        // only catch RegionServerAbortedException once. After that, the next 
exception thrown
+        // is our test case
+        if (
+          !shouldThrowTooBig && e instanceof RetriesExhaustedException
+            && e.getCause() instanceof RegionServerAbortedException
+        ) {
+          shouldThrowTooBig = true;
+        } else {
+          throw e;
+        }
+      }
+
+      // small sleep to relieve pressure
+      Threads.sleep(10);
+    }
+  }
+
+  private byte[] getRowKeyWithin(RegionInfo regionInfo) {
+    byte[] rowKey;
+    // region is start of table, find one after start key
+    if (regionInfo.getStartKey().length == 0) {
+      if (regionInfo.getEndKey().length == 0) {
+        // doesn't matter, single region table
+        return Bytes.toBytes(1);
+      } else {
+        // find a row just before endkey
+        rowKey = Bytes.copy(regionInfo.getEndKey());
+        rowKey[rowKey.length - 1]--;
+        return rowKey;
+      }
+    } else {
+      return regionInfo.getStartKey();
+    }
+  }
+
+  public static class SleepWhenCloseCoprocessor implements RegionCoprocessor, 
RegionObserver {
+
+    public SleepWhenCloseCoprocessor() {
+    }
+
+    @Override
+    public Optional<RegionObserver> getRegionObserver() {
+      return Optional.of(this);
+    }
+
+    @Override
+    public void preClose(ObserverContext<RegionCoprocessorEnvironment> c, 
boolean abortRequested)
+      throws IOException {
+      // Wait so that the region can't close until we get the information we 
need from our test
+      UTIL.waitFor(60_000, () -> THROWN_EXCEPTION.get() != null);
+    }
+  }
+}

Reply via email to