This is an automated email from the ASF dual-hosted git repository.
chenglei pushed a commit to branch branch-2
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2 by this push:
new 5affa20 HBASE-26869 RSRpcServices.scan should deep clone cells when
RpcCallContext is null (#4264)
5affa20 is described below
commit 5affa20359b67cd54229b00d15ab4e0f3726fbba
Author: chenglei <[email protected]>
AuthorDate: Wed Mar 23 18:31:47 2022 +0800
HBASE-26869 RSRpcServices.scan should deep clone cells when RpcCallContext
is null (#4264)
---
.../java/org/apache/hadoop/hbase/CellUtil.java | 14 ++
.../apache/hadoop/hbase/regionserver/HRegion.java | 6 +-
.../hadoop/hbase/regionserver/RSRpcServices.java | 24 ++-
.../hbase/regionserver/TestRegionServerScan.java | 214 +++++++++++++++++++++
4 files changed, 251 insertions(+), 7 deletions(-)
diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
index 71f5a66..a98df84 100644
--- a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
+++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java
@@ -1764,4 +1764,18 @@ public final class CellUtil {
if (diff != 0) return diff;
return compareQualifiers(left, right, rqoffset, rqlength);
}
+
+ public static void cloneIfNecessary(ArrayList<Cell> cells) {
+ if (cells == null || cells.isEmpty()) {
+ return;
+ }
+ for (int i = 0; i < cells.size(); i++) {
+ Cell cell = cells.get(i);
+ cells.set(i, cloneIfNecessary(cell));
+ }
+ }
+
+ public static Cell cloneIfNecessary(Cell cell) {
+ return (cell instanceof ByteBufferExtendedCell ?
KeyValueUtil.copyToNewKeyValue(cell) : cell);
+ }
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
index e0496b4..a17fbc8 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java
@@ -81,7 +81,6 @@ import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.hbase.ByteBufferExtendedCell;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.CellBuilderType;
import org.apache.hadoop.hbase.CellComparator;
@@ -97,7 +96,6 @@ import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.HConstants.OperationStatusCode;
import org.apache.hadoop.hbase.HDFSBlocksDistribution;
import org.apache.hadoop.hbase.KeyValue;
-import org.apache.hadoop.hbase.KeyValueUtil;
import org.apache.hadoop.hbase.MetaCellComparator;
import org.apache.hadoop.hbase.NamespaceDescriptor;
import org.apache.hadoop.hbase.NotServingRegionException;
@@ -7585,8 +7583,8 @@ public class HRegion implements HeapSize,
PropagatingConfigurationObserver, Regi
// This can be an EXPENSIVE call. It may make an extra copy from offheap
to onheap buffers.
// See more details in HBASE-26036.
for (Cell cell : tmp) {
- results.add(cell instanceof ByteBufferExtendedCell ?
- KeyValueUtil.copyToNewKeyValue(cell) : cell);
+ results.add(
+ CellUtil.cloneIfNecessary(cell));
}
}
diff --git
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index b0b6dd0..abbf35f 100644
---
a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++
b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -3340,7 +3340,7 @@ public class RSRpcServices implements
HBaseRPCErrorHandler, AdminService.Blockin
// This is cells inside a row. Default size is 10 so if many versions or
many cfs,
// then we'll resize. Resizings show in profiler. Set it higher than 10.
For now
// arbitrary 32. TODO: keep record of general size of results being
returned.
- List<Cell> values = new ArrayList<>(32);
+ ArrayList<Cell> values = new ArrayList<>(32);
region.startRegionOperation(Operation.SCAN);
long before = EnvironmentEdgeManager.currentTime();
// Used to check if we've matched the row limit set on the Scan
@@ -3401,9 +3401,16 @@ public class RSRpcServices implements
HBaseRPCErrorHandler, AdminService.Blockin
// reset the batch progress between nextRaw invocations since we
don't want the
// batch progress from previous calls to affect future calls
scannerContext.setBatchProgress(0);
+ assert values.isEmpty();
// Collect values to be returned here
moreRows = scanner.nextRaw(values, scannerContext);
+ if (context == null) {
+          // When there is no RpcCallContext, copy EC to heap before the
scanner closes.
+          // This can be an EXPENSIVE call. It may make an extra copy from
offheap to onheap
+          // buffers. See more details in HBASE-26036.
+ CellUtil.cloneIfNecessary(values);
+ }
numOfNextRawCalls++;
if (!values.isEmpty()) {
@@ -3760,14 +3767,25 @@ public class RSRpcServices implements
HBaseRPCErrorHandler, AdminService.Blockin
if (context != null) {
context.setCallBack(rsh.shippedCallback);
} else {
- // When context != null, adding back the lease will be done in
callback set above.
- addScannerLeaseBack(lease);
+          // If context is null, here we call rsh.shippedCallback directly to
reuse the logic in
+          // rsh.shippedCallback to release the internal resources in rsh, and
the lease is also added
+          // back to the regionserver's LeaseManager in rsh.shippedCallback.
+ runShippedCallback(rsh);
}
}
quota.close();
}
}
+ private void runShippedCallback(RegionScannerHolder rsh) throws
ServiceException {
+ assert rsh.shippedCallback != null;
+ try {
+ rsh.shippedCallback.run();
+ } catch (IOException ioe) {
+ throw new ServiceException(ioe);
+ }
+ }
+
private void closeScanner(HRegion region, RegionScanner scanner, String
scannerName,
RpcCallContext context) throws IOException {
if (region.getCoprocessorHost() != null) {
diff --git
a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerScan.java
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerScan.java
new file mode 100644
index 0000000..cf27726
--- /dev/null
+++
b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerScan.java
@@ -0,0 +1,214 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
+
+import java.io.IOException;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicReference;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.MiniHBaseCluster.MiniHBaseClusterRegionServer;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Admin;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Result;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.io.ByteBuffAllocator;
+import org.apache.hadoop.hbase.io.DeallocateRewriteByteBuffAllocator;
+import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory;
+import org.apache.hadoop.hbase.ipc.RpcCall;
+import org.apache.hadoop.hbase.ipc.RpcServer;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.rules.TestName;
+
+import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;
+import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException;
+
+import
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
+import
org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse;
+
+@Category({ RegionServerTests.class, LargeTests.class })
+public class TestRegionServerScan {
+ @ClassRule
+ public static final HBaseClassTestRule CLASS_RULE =
+ HBaseClassTestRule.forClass(TestRegionServerScan.class);
+
+ @Rule
+ public TestName name = new TestName();
+
+ private static final byte[] CF = Bytes.toBytes("CF");
+ private static final byte[] CQ = Bytes.toBytes("CQ");
+ private static final byte[] VALUE = new byte[1200];
+
+ private static final HBaseTestingUtility TEST_UTIL = new
HBaseTestingUtility();
+ private static final Configuration conf = TEST_UTIL.getConfiguration();
+ private static Admin admin = null;
+ static final TableName tableName = TableName.valueOf("TestRegionServerScan");
+ static final byte[] r0 = Bytes.toBytes("row-0");
+ static final byte[] r1 = Bytes.toBytes("row-1");
+ static final byte[] r2 = Bytes.toBytes("row-2");
+
+ @BeforeClass
+ public static void setupBeforeClass() throws Exception {
+ /**
+ * Use {@link DeallocateRewriteByteBuffAllocator} to rewrite the
bytebuffers right after
+ * released.
+ */
+ conf.set(ByteBuffAllocator.BYTEBUFF_ALLOCATOR_CLASS,
+ DeallocateRewriteByteBuffAllocator.class.getName());
+ conf.setBoolean(ByteBuffAllocator.ALLOCATOR_POOL_ENABLED_KEY, true);
+ conf.setInt(ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY, 0);
+ conf.setInt(BlockCacheFactory.BUCKET_CACHE_WRITER_THREADS_KEY, 20);
+ conf.setInt(ByteBuffAllocator.BUFFER_SIZE_KEY, 2048);
+ conf.set(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap");
+ conf.setInt(HConstants.BUCKET_CACHE_SIZE_KEY, 64);
+ conf.setStrings(HConstants.REGION_SERVER_IMPL,
MyRegionServer.class.getName());
+ conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 30 * 60 * 1000);
+ conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 30 * 60 *
1000);
+
+ conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 60 * 60 * 1000);
+ conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1);
+ conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 10000);
+ conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 1024 * 1024 *
1024);
+ TEST_UTIL.startMiniCluster(1);
+ admin = TEST_UTIL.getAdmin();
+ }
+
+ @AfterClass
+ public static void tearDownAfterClass() throws Exception {
+ TEST_UTIL.shutdownMiniCluster();
+ }
+
+ @Test
+ public void testScannWhenRpcCallContextNull() throws Exception {
+ ResultScanner resultScanner = null;
+ Table table = null;
+ try {
+ table =
+ TEST_UTIL.createTable(tableName, new byte[][] { CF }, 1, 1024, null);
+ putToTable(table, r0);
+ putToTable(table, r1);
+ putToTable(table, r2);
+
+ admin.flush(table.getName());
+
+ Scan scan = new Scan();
+ scan.setCaching(2);
+ scan.withStartRow(r0, true).withStopRow(r2, true);
+
+ MyRSRpcServices.inTest = true;
+ resultScanner = table.getScanner(scan);
+ Result result = resultScanner.next();
+ byte[] rowKey = result.getRow();
+ assertTrue(Bytes.equals(r0, rowKey));
+
+ result = resultScanner.next();
+ rowKey = result.getRow();
+ assertTrue(Bytes.equals(r1, rowKey));
+
+ result = resultScanner.next();
+ rowKey = result.getRow();
+ assertTrue(Bytes.equals(r2, rowKey));
+ assertNull(resultScanner.next());
+ assertTrue(MyRSRpcServices.exceptionRef.get() == null);
+ } finally {
+ MyRSRpcServices.inTest = false;
+ if (resultScanner != null) {
+ resultScanner.close();
+ }
+ if (table != null) {
+ table.close();
+ }
+ }
+ }
+
+ private static void putToTable(Table table, byte[] rowkey) throws
IOException {
+ Put put = new Put(rowkey);
+ put.addColumn(CF, CQ, VALUE);
+ table.put(put);
+ }
+
+ private static class MyRegionServer extends MiniHBaseClusterRegionServer {
+ public MyRegionServer(Configuration conf) throws IOException,
InterruptedException {
+ super(conf);
+ }
+
+ @Override
+ protected RSRpcServices createRpcServices() throws IOException {
+ return new MyRSRpcServices(this);
+ }
+ }
+
+ private static class MyRSRpcServices extends RSRpcServices {
+ private static AtomicReference<Throwable> exceptionRef = new
AtomicReference<Throwable>(null);
+ private static volatile boolean inTest = false;
+
+ public MyRSRpcServices(HRegionServer rs) throws IOException {
+ super(rs);
+ }
+
+ @Override
+ public ScanResponse scan(RpcController controller, ScanRequest request)
+ throws ServiceException {
+ try {
+ if (!inTest) {
+ return super.scan(controller, request);
+ }
+
+ HRegion region = null;
+ if (request.hasRegion()) {
+ region = this.getRegion(request.getRegion());
+ }
+
+ if (region != null
+ && !tableName.equals(region.getTableDescriptor().getTableName())) {
+ return super.scan(controller, request);
+ }
+
+ ScanResponse result = null;
+ //Simulate RpcCallContext is null for test.
+ Optional<RpcCall> rpcCall = RpcServer.unsetCurrentCall();
+ try {
+ result = super.scan(controller, request);
+ } finally {
+ rpcCall.ifPresent(RpcServer::setCurrentCall);
+ }
+ return result;
+ } catch (Throwable e) {
+ exceptionRef.set(e);
+ throw new ServiceException(e);
+ }
+ }
+ }
+
+}