This is an automated email from the ASF dual-hosted git repository.

chenglei pushed a commit to branch branch-2.4
in repository https://gitbox.apache.org/repos/asf/hbase.git


The following commit(s) were added to refs/heads/branch-2.4 by this push:
     new 4bb1d75  HBASE-26869 RSRpcServices.scan should deep clone cells when 
RpcCallContext is null (#4265)
4bb1d75 is described below

commit 4bb1d75f674a53739c6f8d0deb975ae0ce2e4c09
Author: chenglei <[email protected]>
AuthorDate: Wed Mar 23 19:13:46 2022 +0800

    HBASE-26869 RSRpcServices.scan should deep clone cells when RpcCallContext 
is null (#4265)
---
 .../java/org/apache/hadoop/hbase/CellUtil.java     |  14 ++
 .../apache/hadoop/hbase/regionserver/HRegion.java  |   6 +-
 .../hadoop/hbase/regionserver/RSRpcServices.java   |  24 ++-
 .../hbase/regionserver/TestRegionServerScan.java   | 214 +++++++++++++++++++++
 4 files changed, 251 insertions(+), 7 deletions(-)

diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java 
b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
index 71f5a66..a98df84 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
@@ -1764,4 +1764,18 @@ public final class CellUtil {
     if (diff != 0) return diff;
     return compareQualifiers(left, right, rqoffset, rqlength);
   }
+
+  public static void cloneIfNecessary(ArrayList<Cell> cells) {
+    if (cells == null || cells.isEmpty()) {
+      return;
+    }
+    for (int i = 0; i < cells.size(); i++) {
+      Cell cell = cells.get(i);
+      cells.set(i, cloneIfNecessary(cell));
+    }
+  }
+
+  public static Cell cloneIfNecessary(Cell cell) {
+    return (cell instanceof ByteBufferExtendedCell ? 
KeyValueUtil.copyToNewKeyValue(cell) : cell);
+  }
 }
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index 9b230f3..48f4f32 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -77,7 +77,6 @@ import org.apache.hadoop.fs.FileStatus;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.LocatedFileStatus;
 import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.ByteBufferExtendedCell;
 import org.apache.hadoop.hbase.Cell;
 import org.apache.hadoop.hbase.CellBuilderType;
 import org.apache.hadoop.hbase.CellComparator;
@@ -93,7 +92,6 @@ import org.apache.hadoop.hbase.HConstants;
 import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
 import org.apache.hadoop.hbase.HDFSBlocksDistribution;
 import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
 import org.apache.hadoop.hbase.MetaCellComparator;
 import org.apache.hadoop.hbase.NamespaceDescriptor;
 import org.apache.hadoop.hbase.NotServingRegionException;
@@ -8282,8 +8280,8 @@ public class HRegion implements HeapSize, 
PropagatingConfigurationObserver, Regi
       // This can be an EXPENSIVE call. It may make an extra copy from offheap 
to onheap buffers.
       // See more details in HBASE-26036.
       for (Cell cell : tmp) {
-        results.add(cell instanceof ByteBufferExtendedCell ?
-            KeyValueUtil.copyToNewKeyValue(cell) : cell);
+        results.add(
+          CellUtil.cloneIfNecessary(cell));
       }
     }
 
diff --git 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index 594b0a8..63eba8b 100644
--- 
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ 
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -3306,7 +3306,7 @@ public class RSRpcServices implements 
HBaseRPCErrorHandler,
     // This is cells inside a row. Default size is 10 so if many versions or 
many cfs,
     // then we'll resize. Resizings show in profiler. Set it higher than 10. 
For now
     // arbitrary 32. TODO: keep record of general size of results being 
returned.
-    List<Cell> values = new ArrayList<>(32);
+    ArrayList<Cell> values = new ArrayList<>(32);
     region.startRegionOperation(Operation.SCAN);
     long before = EnvironmentEdgeManager.currentTime();
     // Used to check if we've matched the row limit set on the Scan
@@ -3367,9 +3367,16 @@ public class RSRpcServices implements 
HBaseRPCErrorHandler,
           // reset the batch progress between nextRaw invocations since we 
don't want the
           // batch progress from previous calls to affect future calls
           scannerContext.setBatchProgress(0);
+          assert values.isEmpty();
 
           // Collect values to be returned here
           moreRows = scanner.nextRaw(values, scannerContext);
+          if (context == null) {
+            // When there is no RpcCallContext, copy EC to heap, then the 
scanner would close.
+            // This can be an EXPENSIVE call. It may make an extra copy from 
offheap to onheap
+            // buffers. See more details in HBASE-26036.
+            CellUtil.cloneIfNecessary(values);
+          }
           numOfNextRawCalls++;
 
           if (!values.isEmpty()) {
@@ -3726,14 +3733,25 @@ public class RSRpcServices implements 
HBaseRPCErrorHandler,
         if (context != null) {
           context.setCallBack(rsh.shippedCallback);
         } else {
-          // When context != null, adding back the lease will be done in 
callback set above.
-          addScannerLeaseBack(lease);
+          // If context is null, here we call rsh.shippedCallback directly to 
reuse the logic in
+          // rsh.shippedCallback to release the internal resources in rsh, and 
lease is also added
+          // back to regionserver's LeaseManager in rsh.shippedCallback.
+          runShippedCallback(rsh);
         }
       }
       quota.close();
     }
   }
 
+  private void runShippedCallback(RegionScannerHolder rsh) throws 
ServiceException {
+    assert rsh.shippedCallback != null;
+    try {
+      rsh.shippedCallback.run();
+    } catch (IOException ioe) {
+      throw new ServiceException(ioe);
+    }
+  }
+
   private void closeScanner(HRegion region, RegionScanner scanner, String 
scannerName,
       RpcCallContext context) throws IOException {
     if (region.getCoprocessorHost() != null) {
diff --git 
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerScan.java
 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerScan.java
new file mode 100644
index 0000000..cf27726
--- /dev/null
+++ 
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerScan.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MiniHBaseCluster.MiniHBaseClusterRegionServer;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.ByteBuffAllocator;
+import org.apache.hadoop.hbase.io.DeallocateRewriteByteBuffAllocator;
+import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
+import org.apache.hadoop.hbase.ipc.RpcCall;
+import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
+import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
+
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
+import 
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
+
+@Category({ RegionServerTests.class, LargeTests.class })
+public class TestRegionServerScan {
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+      HBaseClassTestRule.forClass(TestRegionServerScan.class);
+
+  @Rule
+  public TestName name = new TestName();
+
+  private static final byte[] CF = Bytes.toBytes("CF");
+  private static final byte[] CQ = Bytes.toBytes("CQ");
+  private static final byte[] VALUE = new byte[1200];
+
+  private static final HBaseTestingUtility TEST_UTIL = new 
HBaseTestingUtility();
+  private static final Configuration conf = TEST_UTIL.getConfiguration();
+  private static Admin admin = null;
+  static final TableName tableName = TableName.valueOf("TestRegionServerScan");
+  static final byte[] r0 = Bytes.toBytes("row-0");
+  static final byte[] r1 = Bytes.toBytes("row-1");
+  static final byte[] r2 = Bytes.toBytes("row-2");
+
+  @BeforeClass
+  public static void setupBeforeClass() throws Exception {
+    /**
+     * Use {@link DeallocateRewriteByteBuffAllocator} to rewrite the 
bytebuffers right after
+     * they are released.
+     */
+    conf.set(ByteBuffAllocator.BYTEBUFF_ALLOCATOR_CLASS,
+      DeallocateRewriteByteBuffAllocator.class.getName());
+    conf.setBoolean(ByteBuffAllocator.ALLOCATOR_POOL_ENABLED_KEY, true);
+    conf.setInt(ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY, 0);
+    conf.setInt(BlockCacheFactory.BUCKET_CACHE_WRITER_THREADS_KEY, 20);
+    conf.setInt(ByteBuffAllocator.BUFFER_SIZE_KEY, 2048);
+    conf.set(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap");
+    conf.setInt(HConstants.BUCKET_CACHE_SIZE_KEY, 64);
+    conf.setStrings(HConstants.REGION_SERVER_IMPL, 
MyRegionServer.class.getName());
+    conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 30 * 60 * 1000);
+    conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 30 * 60 * 
1000);
+
+    conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 60 * 60 * 1000);
+    conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
+    conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 10000);
+    conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 1024 * 1024 * 
1024);
+    TEST_UTIL.startMiniCluster(1);
+    admin = TEST_UTIL.getAdmin();
+  }
+
+  @AfterClass
+  public static void tearDownAfterClass() throws Exception {
+    TEST_UTIL.shutdownMiniCluster();
+  }
+
+  @Test
+  public void testScannWhenRpcCallContextNull() throws Exception {
+    ResultScanner resultScanner = null;
+    Table table = null;
+    try {
+      table =
+          TEST_UTIL.createTable(tableName, new byte[][] { CF }, 1, 1024, null);
+      putToTable(table, r0);
+      putToTable(table, r1);
+      putToTable(table, r2);
+
+      admin.flush(table.getName());
+
+      Scan scan = new Scan();
+      scan.setCaching(2);
+      scan.withStartRow(r0, true).withStopRow(r2, true);
+
+      MyRSRpcServices.inTest = true;
+      resultScanner = table.getScanner(scan);
+      Result result = resultScanner.next();
+      byte[] rowKey = result.getRow();
+      assertTrue(Bytes.equals(r0, rowKey));
+
+      result = resultScanner.next();
+      rowKey = result.getRow();
+      assertTrue(Bytes.equals(r1, rowKey));
+
+      result = resultScanner.next();
+      rowKey = result.getRow();
+      assertTrue(Bytes.equals(r2, rowKey));
+      assertNull(resultScanner.next());
+      assertTrue(MyRSRpcServices.exceptionRef.get() == null);
+    } finally {
+      MyRSRpcServices.inTest = false;
+      if (resultScanner != null) {
+        resultScanner.close();
+      }
+      if (table != null) {
+        table.close();
+      }
+    }
+  }
+
+  private static void putToTable(Table table, byte[] rowkey) throws 
IOException {
+    Put put = new Put(rowkey);
+    put.addColumn(CF, CQ, VALUE);
+    table.put(put);
+  }
+
+  private static class MyRegionServer extends MiniHBaseClusterRegionServer {
+    public MyRegionServer(Configuration conf) throws IOException, 
InterruptedException {
+      super(conf);
+    }
+
+    @Override
+    protected RSRpcServices createRpcServices() throws IOException {
+      return new MyRSRpcServices(this);
+    }
+  }
+
+  private static class MyRSRpcServices extends RSRpcServices {
+    private static AtomicReference<Throwable> exceptionRef = new 
AtomicReference<Throwable>(null);
+    private static volatile boolean inTest = false;
+
+    public MyRSRpcServices(HRegionServer rs) throws IOException {
+      super(rs);
+    }
+
+    @Override
+    public ScanResponse scan(RpcController controller, ScanRequest request)
+        throws ServiceException {
+      try {
+        if (!inTest) {
+          return super.scan(controller, request);
+        }
+
+        HRegion region = null;
+        if (request.hasRegion()) {
+          region = this.getRegion(request.getRegion());
+        }
+
+        if (region != null
+            && !tableName.equals(region.getTableDescriptor().getTableName())) {
+          return super.scan(controller, request);
+        }
+
+        ScanResponse result = null;
+        // Simulate a null RpcCallContext for the test.
+        Optional<RpcCall> rpcCall = RpcServer.unsetCurrentCall();
+        try {
+          result = super.scan(controller, request);
+        } finally {
+          rpcCall.ifPresent(RpcServer::setCurrentCall);
+        }
+        return result;
+      } catch (Throwable e) {
+        exceptionRef.set(e);
+        throw new ServiceException(e);
+      }
+    }
+  }
+
+}

Reply via email to