This is an automated email from the ASF dual-hosted git repository. tkhurana pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push: new b47bb8bccb PHOENIX-7701 UngroupedAggregateRegionScanner doesn't release rpc handler on page timeouts (#2289) b47bb8bccb is described below commit b47bb8bccb5b0853fa6b30878900aaa8cd24a377 Author: tkhurana <khurana.ta...@gmail.com> AuthorDate: Fri Sep 19 12:55:34 2025 -0700 PHOENIX-7701 UngroupedAggregateRegionScanner doesn't release rpc handler on page timeouts (#2289) --- .../hbase/regionserver/ScannerContextUtil.java | 8 ++++ .../phoenix/coprocessor/DelegateRegionScanner.java | 5 ++- .../UngroupedAggregateRegionScanner.java | 14 +++++++ .../org/apache/phoenix/end2end/ServerPagingIT.java | 45 ++++++++++++++++++++++ 4 files changed, 70 insertions(+), 2 deletions(-) diff --git a/phoenix-core-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContextUtil.java b/phoenix-core-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContextUtil.java index 64e73461dc..23bf60cde0 100644 --- a/phoenix-core-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContextUtil.java +++ b/phoenix-core-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContextUtil.java @@ -56,4 +56,12 @@ public class ScannerContextUtil { public static void setReturnImmediately(ScannerContext sc) { sc.returnImmediately(); } + + /** + * returnImmediately is a private field in ScannerContext and there is no getter API on it But the + * checkTimeLimit API on the ScannerContext will return true if returnImmediately is set + */ + public static boolean checkTimeLimit(ScannerContext sc) { + return sc.checkTimeLimit(ScannerContext.LimitScope.BETWEEN_ROWS); + } } diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java index 30c140a374..ad64e3ae58 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java @@ -106,8 +106,9 @@ public class DelegateRegionScanner implements RegionScanner { ScannerContext noLimitContext = ScannerContextUtil.copyNoLimitScanner(scannerContext); boolean hasMore = raw ? delegate.nextRaw(result, noLimitContext) : delegate.next(result, noLimitContext); - if (isDummy(result)) { - // when a dummy row is returned by a lower layer, set returnImmediately + if (isDummy(result) || ScannerContextUtil.checkTimeLimit(noLimitContext)) { + // when a dummy row is returned by a lower layer or if the result is valid but the lower + // layer signals us to return immediately, we need to set returnImmediately // on the ScannerContext to force HBase to return a response to the client ScannerContextUtil.setReturnImmediately(scannerContext); } diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java index 8f7c880b31..20ace9a02e 100644 --- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java +++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java @@ -64,6 +64,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable; import org.apache.hadoop.hbase.regionserver.Region; import org.apache.hadoop.hbase.regionserver.RegionScanner; import org.apache.hadoop.hbase.regionserver.ScannerContext; +import org.apache.hadoop.hbase.regionserver.ScannerContextUtil; import org.apache.hadoop.hbase.util.Bytes; import org.apache.hadoop.hbase.util.Pair; import org.apache.phoenix.cache.GlobalCache; @@ -603,6 +604,7 @@ public class UngroupedAggregateRegionScanner extends BaseRegionScanner { public boolean next(List<Cell> resultsToReturn, ScannerContext scannerContext) throws IOException { boolean hasMore; + boolean returnImmediately = false; long startTime = EnvironmentEdgeManager.currentTimeMillis(); Configuration conf = env.getConfiguration(); final TenantCache tenantCache = GlobalCache.getTenantCache(env, ScanUtil.getTenantId(scan)); @@ -646,6 +648,10 @@ public class UngroupedAggregateRegionScanner extends BaseRegionScanner { resultsToReturn.addAll(results); return true; } + // we got a dummy result from the lower scanner but hasAny is true which means that + // we have a valid result which can be returned to the client instead of a dummy. + // We need to signal the RPC handler to return. + returnImmediately = true; break; } if (!results.isEmpty()) { @@ -702,6 +708,10 @@ public class UngroupedAggregateRegionScanner extends BaseRegionScanner { } while ( hasMore && (EnvironmentEdgeManager.currentTimeMillis() - startTime) < pageSizeMs ); + if (EnvironmentEdgeManager.currentTimeMillis() - startTime >= pageSizeMs) { + // we hit a page scanner timeout, signal the RPC handler to return. + returnImmediately = true; + } if (!mutations.isEmpty()) { if (!isSingleRowDelete) { annotateAndCommit(mutations); @@ -753,6 +763,10 @@ public class UngroupedAggregateRegionScanner extends BaseRegionScanner { } resultsToReturn.add(keyValue); } + if (returnImmediately && scannerContext != null) { + // signal the RPC handler to return + ScannerContextUtil.setReturnImmediately(scannerContext); + } return hasMore; } } diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerPagingIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerPagingIT.java index dfd72201e3..1a43161568 100644 --- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerPagingIT.java +++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerPagingIT.java @@ -266,6 +266,51 @@ public class ServerPagingIT extends ParallelStatsDisabledIT { } } + @Test + public void testAggregateQuery() throws Exception { + final String tablename = generateUniqueName(); + Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES); + // use a higher timeout value so that we can trigger a page timeout from the scanner + // rather than the page filter + props.put(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, Long.toString(10)); + String ddl = "CREATE TABLE " + tablename + " (id VARCHAR NOT NULL,\n" + "k1 INTEGER NOT NULL,\n" + + "k2 INTEGER NOT NULL,\n" + "k3 INTEGER,\n" + "v1 VARCHAR,\n" + + "CONSTRAINT pk PRIMARY KEY (id, k1, k2)) "; + try (Connection conn = DriverManager.getConnection(getUrl(), props)) { + createTestTable(getUrl(), ddl); + String dml = "UPSERT INTO " + tablename + " VALUES(?, ?, ?, ?, ?)"; + PreparedStatement ps = conn.prepareStatement(dml); + int totalRows = 10000; + for (int i = 0; i < totalRows; ++i) { + ps.setString(1, "id_" + i % 3); + ps.setInt(2, i % 20); + ps.setInt(3, i); + ps.setInt(4, i % 10); + ps.setString(5, "val"); + ps.executeUpdate(); + if (i != 0 && i % 100 == 0) { + conn.commit(); + } + } + conn.commit(); + String dql = String.format("SELECT count(*) from %s where id = '%s'", tablename, "id_2"); + try (ResultSet rs = conn.createStatement().executeQuery(dql)) { + assertTrue(rs.next()); + assertEquals(totalRows / 3, rs.getInt(1)); + assertFalse(rs.next()); + assertServerPagingMetric(tablename, rs, false); // no dummy rows + Map<String, Map<MetricType, Long>> metrics = PhoenixRuntime.getRequestReadMetricInfo(rs); + for (Map.Entry<String, Map<MetricType, Long>> entry : metrics.entrySet()) { + Map<MetricType, Long> metricValues = entry.getValue(); + Long rpcCalls = metricValues.get(MetricType.COUNT_RPC_CALLS); + assertNotNull(rpcCalls); + // multiple scan rpcs will be executed for every page timeout + assertTrue(String.format("Got %d", rpcCalls.longValue()), rpcCalls > 1); + } + } + } + } + @Test public void testLimitOffset() throws SQLException { final String tablename = generateUniqueName();