This is an automated email from the ASF dual-hosted git repository.
vjasani pushed a commit to branch branch-2.5
in repository https://gitbox.apache.org/repos/asf/hbase.git
The following commit(s) were added to refs/heads/branch-2.5 by this push:
     new 94fd5686c7d HBASE-29722 (backport portion of HBASE-27558) Coproc - possible data integrity issues for scan with heavy filters (#7470)
94fd5686c7d is described below
commit 94fd5686c7d95e56c531357d976088a57dd78431
Author: Viraj Jasani <[email protected]>
AuthorDate: Fri Nov 21 19:35:50 2025 +0530
    HBASE-29722 (backport portion of HBASE-27558) Coproc - possible data integrity issues for scan with heavy filters (#7470)
Signed-off-by: Istvan Toth <[email protected]>
Signed-off-by: sanjeet006py <[email protected]>
Signed-off-by: Aman Poonia <[email protected]>
Signed-off-by: Umesh <[email protected]>
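
For readers unfamiliar with the cursor mechanism this patch feeds: the user-visible effect is that a heavily filtered scan which reads through many blocks without producing a row now receives heartbeat/cursor responses instead of appearing stalled. Below is a minimal client-side sketch of consuming such a scan. The table name, heavyFilter, and process() helper are placeholders; setNeedCursorResult(), Result.isCursor(), and Result.getCursor() are the standard HBase 2.x client API:

    import java.io.IOException;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.Cursor;
    import org.apache.hadoop.hbase.client.Result;
    import org.apache.hadoop.hbase.client.ResultScanner;
    import org.apache.hadoop.hbase.client.Scan;
    import org.apache.hadoop.hbase.client.Table;
    import org.apache.hadoop.hbase.filter.Filter;
    import org.apache.hadoop.hbase.util.Bytes;

    public class CursorScanSketch {
      // Scan with cursors enabled so the server can report progress even when
      // the filter has discarded everything read so far.
      static void scanWithCursors(Connection conn, Filter heavyFilter) throws IOException {
        Scan scan = new Scan().setFilter(heavyFilter).setNeedCursorResult(true);
        try (Table table = conn.getTable(TableName.valueOf("my_table")); // placeholder name
            ResultScanner scanner = table.getScanner(scan)) {
          for (Result result : scanner) {
            if (result.isCursor()) {
              // No cells yet, but the server made progress; the cursor records
              // how far the scan has advanced.
              Cursor cursor = result.getCursor();
              System.out.println("scan progressed to " + Bytes.toStringBinary(cursor.getRow()));
              continue;
            }
            process(result); // a real row that passed the filter (placeholder hook)
          }
        }
      }

      static void process(Result result) {
        // application logic
      }
    }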
---
.../hadoop/hbase/regionserver/RSRpcServices.java | 10 +-
.../regionserver/TestScannerBlockSizeLimits.java | 140 +++++++++++++++++++++
2 files changed, 148 insertions(+), 2 deletions(-)
diff --git a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
index c63a3e827b2..95f94e3a622 100644
--- a/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
+++ b/hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/RSRpcServices.java
@@ -3469,12 +3469,18 @@ public class RSRpcServices
          limitReached = sizeLimitReached || timeLimitReached || resultsLimitReached;

          if (limitReached || !moreRows) {
+           // With block size limit, we may exceed size limit without collecting any results.
+           // In this case we want to send heartbeat and/or cursor. We don't want to send heartbeat
+           // or cursor if results were collected, for example for cell size or heap size limits.
+           boolean sizeLimitReachedWithoutResults = sizeLimitReached && results.isEmpty();
            // We only want to mark a ScanResponse as a heartbeat message in the event that
            // there are more values to be read server side. If there aren't more values,
            // marking it as a heartbeat is wasteful because the client will need to issue
            // another ScanRequest only to realize that they already have all the values
-           if (moreRows && timeLimitReached) {
-             // Heartbeat messages occur when the time limit has been reached.
+           if (moreRows && (timeLimitReached || sizeLimitReachedWithoutResults)) {
+             // Heartbeat messages occur when the time limit has been reached, or size limit has
+             // been reached before collecting any results. This can happen for heavily filtered
+             // scans which scan over too many blocks.
              builder.setHeartbeatMessage(true);
              if (rsh.needCursor) {
                Cell cursorCell = scannerContext.getLastPeekedCell();
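
Stripped of diff markers, the patched server-side logic reads as follows (a reconstruction of the hunk above; the surrounding scan() method and the cursor-building code below this point are elided):

    limitReached = sizeLimitReached || timeLimitReached || resultsLimitReached;

    if (limitReached || !moreRows) {
      // Only a size limit hit with zero collected results qualifies: if cells
      // were already collected (e.g. for cell-size or heap-size limits), the
      // response carries data and must not be flagged as a heartbeat.
      boolean sizeLimitReachedWithoutResults = sizeLimitReached && results.isEmpty();
      if (moreRows && (timeLimitReached || sizeLimitReachedWithoutResults)) {
        builder.setHeartbeatMessage(true);
        if (rsh.needCursor) {
          Cell cursorCell = scannerContext.getLastPeekedCell();
          // ... cursor attached to the response (not shown in this hunk)
        }
      }
    }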
diff --git a/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerBlockSizeLimits.java b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerBlockSizeLimits.java
new file mode 100644
index 00000000000..25cf47df647
--- /dev/null
+++ b/hbase-server/src/test/java/org/apache/hadoop/hbase/regionserver/TestScannerBlockSizeLimits.java
@@ -0,0 +1,140 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hbase.regionserver;
+
+import static org.junit.Assert.assertEquals;
+
+import java.io.IOException;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseClassTestRule;
+import org.apache.hadoop.hbase.HBaseTestingUtility;
+import org.apache.hadoop.hbase.HConstants;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.RegionLocator;
+import org.apache.hadoop.hbase.client.ResultScanner;
+import org.apache.hadoop.hbase.client.Scan;
+import org.apache.hadoop.hbase.client.Table;
+import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
+import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.util.Bytes;
+import org.junit.Before;
+import org.junit.BeforeClass;
+import org.junit.ClassRule;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+
+@Category({ LargeTests.class })
+public class TestScannerBlockSizeLimits {
+
+  @ClassRule
+  public static final HBaseClassTestRule CLASS_RULE =
+    HBaseClassTestRule.forClass(TestScannerBlockSizeLimits.class);
+
+  private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
+  private static final TableName TABLE = TableName.valueOf("TestScannerBlockSizeLimits");
+ private static final byte[] FAMILY1 = Bytes.toBytes("0");
+ private static final byte[] FAMILY2 = Bytes.toBytes("1");
+
+ private static final byte[] DATA = new byte[1000];
+ private static final byte[][] FAMILIES = new byte[][] { FAMILY1, FAMILY2 };
+
+ private static final byte[] COLUMN1 = Bytes.toBytes(0);
+ private static final byte[] COLUMN2 = Bytes.toBytes(1);
+ private static final byte[] COLUMN3 = Bytes.toBytes(2);
+ private static final byte[] COLUMN5 = Bytes.toBytes(5);
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ Configuration conf = TEST_UTIL.getConfiguration();
+ conf.setInt(HConstants.HBASE_SERVER_SCANNER_MAX_RESULT_SIZE_KEY, 4200);
+ TEST_UTIL.startMiniCluster(1);
+ TEST_UTIL.createTable(TABLE, FAMILIES, 1, 2048);
+ createTestData();
+ }
+
+  @Before
+  public void setupEach() throws Exception {
+    HRegionServer regionServer = TEST_UTIL.getMiniHBaseCluster().getRegionServer(0);
+    for (HRegion region : regionServer.getRegions(TABLE)) {
+      System.out.println("Clearing cache for region " + region.getRegionInfo().getEncodedName());
+      regionServer.clearRegionBlockCache(region);
+    }
+  }
+
+  private static void createTestData() throws IOException, InterruptedException {
+    RegionLocator locator = TEST_UTIL.getConnection().getRegionLocator(TABLE);
+    String regionName = locator.getAllRegionLocations().get(0).getRegion().getEncodedName();
+    HRegion region = TEST_UTIL.getRSForFirstRegionInTable(TABLE).getRegion(regionName);
+
+ for (int i = 1; i < 10; i++) {
+ // 5 columns per row, in 2 families
+      // Each column value is 1000 bytes, which is enough to fill a full block with row and header.
+ // So 5 blocks per row in FAMILY1
+ Put put = new Put(Bytes.toBytes(i));
+ for (int j = 0; j < 6; j++) {
+ put.addColumn(FAMILY1, Bytes.toBytes(j), DATA);
+ }
+
+ // Additional block in FAMILY2 (notably smaller than block size)
+ put.addColumn(FAMILY2, COLUMN1, DATA);
+
+ region.put(put);
+
+ if (i % 2 == 0) {
+ region.flush(true);
+ }
+ }
+
+ // we've created 10 storefiles at this point, 5 per family
+ region.flush(true);
+
+ }
+
+  /**
+   * Simplest test that ensures we don't count block sizes too much. These 2 requested cells are in
+   * the same block, so should be returned in 1 request. If we mis-counted blocks, it'd come in 2
+   * requests.
+   */
+ @Test
+ public void testSingleBlock() throws IOException {
+ Table table = TEST_UTIL.getConnection().getTable(TABLE);
+
+    ResultScanner scanner =
+      table.getScanner(getBaseScan().withStartRow(Bytes.toBytes(1)).withStopRow(Bytes.toBytes(2))
+        .addColumn(FAMILY1, COLUMN1).addColumn(FAMILY1, COLUMN2).setReadType(Scan.ReadType.STREAM));
+
+ ScanMetrics metrics = scanner.getScanMetrics();
+
+ scanner.next(100);
+
+ // we fetch 2 columns from 1 row, so about 2 blocks
+ assertEquals(1, metrics.countOfRowsScanned.get());
+ assertEquals(1, metrics.countOfRPCcalls.get());
+ }
+
+  /**
+   * We enable cursors and partial results to give us more granularity over counting of results,
+   * and we enable STREAM so that no auto switching from pread to stream occurs -- this throws off
+   * the rpc counts.
+   */
+ private Scan getBaseScan() {
+ return new Scan().setScanMetricsEnabled(true).setNeedCursorResult(true)
+ .setAllowPartialResults(true).setReadType(Scan.ReadType.STREAM);
+ }
+}
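
To try the backported test locally, the standard Maven single-test invocation should work from the repository root (plain Surefire flags, nothing specific to this patch):

    mvn test -pl hbase-server -Dtest=TestScannerBlockSizeLimits

As a LargeTests-category test it starts a single-node mini cluster (startMiniCluster(1) above), so expect it to run considerably longer than a unit test.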