This is an automated email from the ASF dual-hosted git repository.

chenglei pushed a commit to branch revert-4262-scanbuff2
in repository https://gitbox.apache.org/repos/asf/hbase.git
commit 6ec89095c43d11186152ca0d843e2294c6cd87b6 Author: chenglei <[email protected]> AuthorDate: Wed Mar 23 17:51:39 2022 +0800 Revert "HBASE-26869 RSRpcServices.scan should deep clone cells when RpcCallContext is null (#4262)" This reverts commit d889746bc6d3cc7bebf532d3974e060c5fc2de4c. --- .../java/org/apache/hadoop/hbase/CellUtil.java | 14 -- .../apache/hadoop/hbase/regionserver/HRegion.java | 6 +- .../hadoop/hbase/regionserver/RSRpcServices.java | 24 +-- .../hbase/regionserver/TestRegionServerScan.java | 214 --------------------- 4 files changed, 7 insertions(+), 251 deletions(-) diff --git a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java index a98df84..71f5a66 100644 --- a/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java +++ b/hbase-common/src/main/java/org/apache/hadoop/hbase/CellUtil.java @@ -1764,18 +1764,4 @@ public final class CellUtil { if (diff != 0) return diff; return compareQualifiers(left, right, rqoffset, rqlength); } - - public static void cloneIfNecessary(ArrayList<Cell> cells) { - if (cells == null || cells.isEmpty()) { - return; - } - for (int i = 0; i < cells.size(); i++) { - Cell cell = cells.get(i); - cells.set(i, cloneIfNecessary(cell)); - } - } - - public static Cell cloneIfNecessary(Cell cell) { - return (cell instanceof ByteBufferExtendedCell ? 
KeyValueUtil.copyToNewKeyValue(cell) : cell); - } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java index a17fbc8..e0496b4 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/HRegion.java @@ -81,6 +81,7 @@ import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.LocatedFileStatus; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hbase.ByteBufferExtendedCell; import org.apache.hadoop.hbase.Cell; import org.apache.hadoop.hbase.CellBuilderType; import org.apache.hadoop.hbase.CellComparator; @@ -96,6 +97,7 @@ import org.apache.hadoop.hbase.HConstants; import org.apache.hadoop.hbase.HConstants.OperationStatusCode; import org.apache.hadoop.hbase.HDFSBlocksDistribution; import org.apache.hadoop.hbase.KeyValue; +import org.apache.hadoop.hbase.KeyValueUtil; import org.apache.hadoop.hbase.MetaCellComparator; import org.apache.hadoop.hbase.NamespaceDescriptor; import org.apache.hadoop.hbase.NotServingRegionException; @@ -7583,8 +7585,8 @@ public class HRegion implements HeapSize, PropagatingConfigurationObserver, Regi // This can be an EXPENSIVE call. It may make an extra copy from offheap to onheap buffers. // See more details in HBASE-26036. for (Cell cell : tmp) { - results.add( - CellUtil.cloneIfNecessary(cell)); + results.add(cell instanceof ByteBufferExtendedCell ? 
+ KeyValueUtil.copyToNewKeyValue(cell) : cell); } } diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java index abbf35f..b0b6dd0 100644 --- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java +++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java @@ -3340,7 +3340,7 @@ public class RSRpcServices implements HBaseRPCErrorHandler, AdminService.Blockin // This is cells inside a row. Default size is 10 so if many versions or many cfs, // then we'll resize. Resizings show in profiler. Set it higher than 10. For now // arbitrary 32. TODO: keep record of general size of results being returned. - ArrayList<Cell> values = new ArrayList<>(32); + List<Cell> values = new ArrayList<>(32); region.startRegionOperation(Operation.SCAN); long before = EnvironmentEdgeManager.currentTime(); // Used to check if we've matched the row limit set on the Scan @@ -3401,16 +3401,9 @@ public class RSRpcServices implements HBaseRPCErrorHandler, AdminService.Blockin // reset the batch progress between nextRaw invocations since we don't want the // batch progress from previous calls to affect future calls scannerContext.setBatchProgress(0); - assert values.isEmpty(); // Collect values to be returned here moreRows = scanner.nextRaw(values, scannerContext); - if (context == null) { - // When there is no RpcCallContext,copy EC to heap, then the scanner would close, - // This can be an EXPENSIVE call. It may make an extra copy from offheap to onheap - // buffers.See more details in HBASE-26036. 
- CellUtil.cloneIfNecessary(values); - } numOfNextRawCalls++; if (!values.isEmpty()) { @@ -3767,25 +3760,14 @@ public class RSRpcServices implements HBaseRPCErrorHandler, AdminService.Blockin if (context != null) { context.setCallBack(rsh.shippedCallback); } else { - // If context is null,here we call rsh.shippedCallback directly to reuse the logic in - // rsh.shippedCallback to release the internal resources in rsh,and lease is also added - // back to regionserver's LeaseManager in rsh.shippedCallback. - runShippedCallback(rsh); + // When context != null, adding back the lease will be done in callback set above. + addScannerLeaseBack(lease); } } quota.close(); } } - private void runShippedCallback(RegionScannerHolder rsh) throws ServiceException { - assert rsh.shippedCallback != null; - try { - rsh.shippedCallback.run(); - } catch (IOException ioe) { - throw new ServiceException(ioe); - } - } - private void closeScanner(HRegion region, RegionScanner scanner, String scannerName, RpcCallContext context) throws IOException { if (region.getCoprocessorHost() != null) { diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerScan.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerScan.java deleted file mode 100644 index 4357669..0000000 --- a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestRegionServerScan.java +++ /dev/null @@ -1,214 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. 
You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.hadoop.hbase.regionserver; - -import static org.junit.Assert.assertNull; -import static org.junit.Assert.assertTrue; - -import java.io.IOException; -import java.util.Optional; -import java.util.concurrent.atomic.AtomicReference; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.hbase.HBaseClassTestRule; -import org.apache.hadoop.hbase.HBaseTestingUtil; -import org.apache.hadoop.hbase.HConstants; -import org.apache.hadoop.hbase.SingleProcessHBaseCluster.MiniHBaseClusterRegionServer; -import org.apache.hadoop.hbase.TableName; -import org.apache.hadoop.hbase.client.Admin; -import org.apache.hadoop.hbase.client.Put; -import org.apache.hadoop.hbase.client.Result; -import org.apache.hadoop.hbase.client.ResultScanner; -import org.apache.hadoop.hbase.client.Scan; -import org.apache.hadoop.hbase.client.Table; -import org.apache.hadoop.hbase.io.ByteBuffAllocator; -import org.apache.hadoop.hbase.io.DeallocateRewriteByteBuffAllocator; -import org.apache.hadoop.hbase.io.hfile.BlockCacheFactory; -import org.apache.hadoop.hbase.ipc.RpcCall; -import org.apache.hadoop.hbase.ipc.RpcServer; -import org.apache.hadoop.hbase.testclassification.LargeTests; -import org.apache.hadoop.hbase.testclassification.RegionServerTests; -import org.apache.hadoop.hbase.util.Bytes; -import org.junit.AfterClass; -import org.junit.BeforeClass; -import org.junit.ClassRule; -import org.junit.Rule; -import org.junit.Test; -import org.junit.experimental.categories.Category; -import org.junit.rules.TestName; - -import 
org.apache.hbase.thirdparty.com.google.protobuf.RpcController; -import org.apache.hbase.thirdparty.com.google.protobuf.ServiceException; - -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest; -import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanResponse; - -@Category({ RegionServerTests.class, LargeTests.class }) -public class TestRegionServerScan { - @ClassRule - public static final HBaseClassTestRule CLASS_RULE = - HBaseClassTestRule.forClass(TestRegionServerScan.class); - - @Rule - public TestName name = new TestName(); - - private static final byte[] CF = Bytes.toBytes("CF"); - private static final byte[] CQ = Bytes.toBytes("CQ"); - private static final byte[] VALUE = new byte[1200]; - - private static final HBaseTestingUtil TEST_UTIL = new HBaseTestingUtil(); - private static final Configuration conf = TEST_UTIL.getConfiguration(); - private static Admin admin = null; - static final TableName tableName = TableName.valueOf("TestRegionServerScan"); - static final byte[] r0 = Bytes.toBytes("row-0"); - static final byte[] r1 = Bytes.toBytes("row-1"); - static final byte[] r2 = Bytes.toBytes("row-2"); - - @BeforeClass - public static void setupBeforeClass() throws Exception { - /** - * Use {@link DeallocateRewriteByteBuffAllocator} to rewrite the bytebuffers right after - * released. 
- */ - conf.set(ByteBuffAllocator.BYTEBUFF_ALLOCATOR_CLASS, - DeallocateRewriteByteBuffAllocator.class.getName()); - conf.setBoolean(ByteBuffAllocator.ALLOCATOR_POOL_ENABLED_KEY, true); - conf.setInt(ByteBuffAllocator.MIN_ALLOCATE_SIZE_KEY, 0); - conf.setInt(BlockCacheFactory.BUCKET_CACHE_WRITER_THREADS_KEY, 20); - conf.setInt(ByteBuffAllocator.BUFFER_SIZE_KEY, 2048); - conf.set(HConstants.BUCKET_CACHE_IOENGINE_KEY, "offheap"); - conf.setInt(HConstants.BUCKET_CACHE_SIZE_KEY, 64); - conf.setStrings(HConstants.REGION_SERVER_IMPL, MyRegionServer.class.getName()); - conf.setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 30 * 60 * 1000); - conf.setInt(HConstants.HBASE_CLIENT_SCANNER_TIMEOUT_PERIOD, 30 * 60 * 1000); - - conf.setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 60 * 60 * 1000); - conf.setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 1); - conf.setInt(HConstants.HBASE_CLIENT_PAUSE, 10000); - conf.setLong(StoreScanner.STORESCANNER_PREAD_MAX_BYTES, 1024 * 1024 * 1024); - TEST_UTIL.startMiniCluster(1); - admin = TEST_UTIL.getAdmin(); - } - - @AfterClass - public static void tearDownAfterClass() throws Exception { - TEST_UTIL.shutdownMiniCluster(); - } - - @Test - public void testScannWhenRpcCallContextNull() throws Exception { - ResultScanner resultScanner = null; - Table table = null; - try { - table = - TEST_UTIL.createTable(tableName, new byte[][] { CF }, 1, 1024, null); - putToTable(table, r0); - putToTable(table, r1); - putToTable(table, r2); - - admin.flush(table.getName()); - - Scan scan = new Scan(); - scan.setCaching(2); - scan.withStartRow(r0, true).withStopRow(r2, true); - - MyRSRpcServices.inTest = true; - resultScanner = table.getScanner(scan); - Result result = resultScanner.next(); - byte[] rowKey = result.getRow(); - assertTrue(Bytes.equals(r0, rowKey)); - - result = resultScanner.next(); - rowKey = result.getRow(); - assertTrue(Bytes.equals(r1, rowKey)); - - result = resultScanner.next(); - rowKey = result.getRow(); - assertTrue(Bytes.equals(r2, rowKey)); 
- assertNull(resultScanner.next()); - assertTrue(MyRSRpcServices.exceptionRef.get() == null); - } finally { - MyRSRpcServices.inTest = false; - if (resultScanner != null) { - resultScanner.close(); - } - if (table != null) { - table.close(); - } - } - } - - private static void putToTable(Table table, byte[] rowkey) throws IOException { - Put put = new Put(rowkey); - put.addColumn(CF, CQ, VALUE); - table.put(put); - } - - private static class MyRegionServer extends MiniHBaseClusterRegionServer { - public MyRegionServer(Configuration conf) throws IOException, InterruptedException { - super(conf); - } - - @Override - protected RSRpcServices createRpcServices() throws IOException { - return new MyRSRpcServices(this); - } - } - - private static class MyRSRpcServices extends RSRpcServices { - private static AtomicReference<Throwable> exceptionRef = new AtomicReference<Throwable>(null); - private static volatile boolean inTest = false; - - public MyRSRpcServices(HRegionServer rs) throws IOException { - super(rs); - } - - @Override - public ScanResponse scan(RpcController controller, ScanRequest request) - throws ServiceException { - try { - if (!inTest) { - return super.scan(controller, request); - } - - HRegion region = null; - if (request.hasRegion()) { - region = this.getRegion(request.getRegion()); - } - - if (region != null - && !tableName.equals(region.getTableDescriptor().getTableName())) { - return super.scan(controller, request); - } - - ScanResponse result = null; - //Simulate RpcCallContext is null for test. - Optional<RpcCall> rpcCall = RpcServer.unsetCurrentCall(); - try { - result = super.scan(controller, request); - } finally { - rpcCall.ifPresent(RpcServer::setCurrentCall); - } - return result; - } catch (Throwable e) { - exceptionRef.set(e); - throw new ServiceException(e); - } - } - } - -}
