This is an automated email from the ASF dual-hosted git repository.
tkhurana pushed a commit to branch 5.2
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/5.2 by this push:
new 8c2c4ffc73 PHOENIX-7701 UngroupedAggregateRegionScanner doesn't release rpc handler on page timeouts (#2289)
8c2c4ffc73 is described below
commit 8c2c4ffc7339666dab783fc65aad0e8a9c33e193
Author: tkhurana <[email protected]>
AuthorDate: Fri Sep 19 12:55:34 2025 -0700
PHOENIX-7701 UngroupedAggregateRegionScanner doesn't release rpc handler on page timeouts (#2289)
---
.../hbase/regionserver/ScannerContextUtil.java | 8 ++++
.../phoenix/coprocessor/DelegateRegionScanner.java | 5 ++-
.../UngroupedAggregateRegionScanner.java | 14 +++++++
.../org/apache/phoenix/end2end/ServerPagingIT.java | 45 ++++++++++++++++++++++
4 files changed, 70 insertions(+), 2 deletions(-)
diff --git a/phoenix-core-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContextUtil.java b/phoenix-core-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContextUtil.java
index 64e73461dc..23bf60cde0 100644
--- a/phoenix-core-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContextUtil.java
+++ b/phoenix-core-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContextUtil.java
@@ -56,4 +56,12 @@ public class ScannerContextUtil {
public static void setReturnImmediately(ScannerContext sc) {
sc.returnImmediately();
}
+
+ /**
+ * returnImmediately is a private field in ScannerContext and there is no getter API on it But the
+ * checkTimeLimit API on the ScannerContext will return true if returnImmediately is set
+ */
+ public static boolean checkTimeLimit(ScannerContext sc) {
+ return sc.checkTimeLimit(ScannerContext.LimitScope.BETWEEN_ROWS);
+ }
}
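For illustration, a minimal sketch of the relationship the javadoc above describes: once setReturnImmediately has been requested on a ScannerContext, the new checkTimeLimit helper observes it, even though the underlying field has no getter. The ReturnImmediatelyProbe class below is hypothetical and exists only to demonstrate the two existing helpers.

    import org.apache.hadoop.hbase.regionserver.ScannerContext;
    import org.apache.hadoop.hbase.regionserver.ScannerContextUtil;

    // Hypothetical probe, for illustration only; not part of the patch.
    public final class ReturnImmediatelyProbe {
      private ReturnImmediatelyProbe() {
      }

      // Requests an immediate return on the given context, then observes the request
      // through the new helper; per the javadoc above, this is expected to be true.
      public static boolean requestAndObserve(ScannerContext sc) {
        ScannerContextUtil.setReturnImmediately(sc);
        return ScannerContextUtil.checkTimeLimit(sc);
      }
    }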
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java
index 30c140a374..ad64e3ae58 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java
@@ -106,8 +106,9 @@ public class DelegateRegionScanner implements RegionScanner {
ScannerContext noLimitContext = ScannerContextUtil.copyNoLimitScanner(scannerContext);
boolean hasMore =
raw ? delegate.nextRaw(result, noLimitContext) : delegate.next(result, noLimitContext);
- if (isDummy(result)) {
- // when a dummy row is returned by a lower layer, set returnImmediately
+ if (isDummy(result) || ScannerContextUtil.checkTimeLimit(noLimitContext)) {
+ // when a dummy row is returned by a lower layer or if the result is valid but the lower
+ // layer signals us to return immediately, we need to set returnImmediately
// on the ScannerContext to force HBase to return a response to the client
ScannerContextUtil.setReturnImmediately(scannerContext);
}
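Put together, the pattern this change relies on looks roughly like the hypothetical wrapper below: run the delegate under a no-limit copy of the caller's ScannerContext, then propagate any returnImmediately request (surfaced through checkTimeLimit) back to the caller's context so HBase responds to the client and frees the RPC handler. The WrappingScanner class is illustrative only; DelegateRegionScanner's real method also handles dummy rows and the other cases shown in the hunk above.

    import java.io.IOException;
    import java.util.List;

    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.regionserver.RegionScanner;
    import org.apache.hadoop.hbase.regionserver.ScannerContext;
    import org.apache.hadoop.hbase.regionserver.ScannerContextUtil;

    // Illustrative wrapper (not part of the patch): shows how a coprocessor scanner
    // can forward a lower layer's "return immediately" signal to the caller's context.
    public class WrappingScanner {
      private final RegionScanner delegate;

      public WrappingScanner(RegionScanner delegate) {
        this.delegate = delegate;
      }

      public boolean next(List<Cell> results, ScannerContext callerContext) throws IOException {
        // Run the delegate without the caller's limits so partial progress is not lost.
        ScannerContext noLimitContext = ScannerContextUtil.copyNoLimitScanner(callerContext);
        boolean hasMore = delegate.next(results, noLimitContext);
        if (ScannerContextUtil.checkTimeLimit(noLimitContext)) {
          // The delegate asked to return immediately; forward the request so HBase
          // sends a response to the client instead of holding the RPC handler.
          ScannerContextUtil.setReturnImmediately(callerContext);
        }
        return hasMore;
      }
    }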
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
index ec21ddb869..fd8951fe91 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
@@ -63,6 +63,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
import org.apache.hadoop.hbase.regionserver.Region;
import org.apache.hadoop.hbase.regionserver.RegionScanner;
import org.apache.hadoop.hbase.regionserver.ScannerContext;
+import org.apache.hadoop.hbase.regionserver.ScannerContextUtil;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.phoenix.cache.GlobalCache;
@@ -582,6 +583,7 @@ public class UngroupedAggregateRegionScanner extends BaseRegionScanner {
public boolean next(List<Cell> resultsToReturn, ScannerContext scannerContext)
throws IOException {
boolean hasMore;
+ boolean returnImmediately = false;
long startTime = EnvironmentEdgeManager.currentTimeMillis();
Configuration conf = env.getConfiguration();
final TenantCache tenantCache = GlobalCache.getTenantCache(env, ScanUtil.getTenantId(scan));
@@ -624,6 +626,10 @@ public class UngroupedAggregateRegionScanner extends BaseRegionScanner {
resultsToReturn.addAll(results);
return true;
}
+ // we got a dummy result from the lower scanner but hasAny is true which means that
+ // we have a valid result which can be returned to the client instead of a dummy.
+ // We need to signal the RPC handler to return.
+ returnImmediately = true;
break;
}
if (!results.isEmpty()) {
@@ -676,6 +682,10 @@ public class UngroupedAggregateRegionScanner extends BaseRegionScanner {
} while (
hasMore && (EnvironmentEdgeManager.currentTimeMillis() - startTime) < pageSizeMs
);
+ if (EnvironmentEdgeManager.currentTimeMillis() - startTime >= pageSizeMs) {
+ // we hit a page scanner timeout, signal the RPC handler to return.
+ returnImmediately = true;
+ }
if (!mutations.isEmpty()) {
annotateAndCommit(mutations);
}
@@ -717,6 +727,10 @@ public class UngroupedAggregateRegionScanner extends BaseRegionScanner {
}
resultsToReturn.add(keyValue);
}
+ if (returnImmediately && scannerContext != null) {
+ // signal the RPC handler to return
+ ScannerContextUtil.setReturnImmediately(scannerContext);
+ }
return hasMore;
}
}
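The aggregate-side change follows the same idea: the scan loop in next() is bounded by pageSizeMs, and when that budget is exhausted (or a dummy result arrives while a valid partial result is already available) the scanner now asks HBase to return the current response immediately instead of keeping the RPC handler busy. A simplified, hypothetical sketch of that loop structure is below; the scanOnePage method and its aggregation placeholder are not real Phoenix code.

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hadoop.hbase.Cell;
    import org.apache.hadoop.hbase.regionserver.RegionScanner;
    import org.apache.hadoop.hbase.regionserver.ScannerContext;
    import org.apache.hadoop.hbase.regionserver.ScannerContextUtil;
    import org.apache.phoenix.util.EnvironmentEdgeManager;

    // Simplified illustration of the paging pattern added by the patch; not real Phoenix code.
    public class PagedAggregationSketch {

      public static boolean scanOnePage(RegionScanner innerScanner, ScannerContext scannerContext,
          long pageSizeMs) throws IOException {
        boolean returnImmediately = false;
        boolean hasMore;
        long startTime = EnvironmentEdgeManager.currentTimeMillis();
        List<Cell> results = new ArrayList<>();
        do {
          results.clear();
          hasMore = innerScanner.nextRaw(results);
          // ... aggregate the cells in results (elided) ...
        } while (hasMore && (EnvironmentEdgeManager.currentTimeMillis() - startTime) < pageSizeMs);
        if (EnvironmentEdgeManager.currentTimeMillis() - startTime >= pageSizeMs) {
          // we hit a page timeout, signal the RPC handler to return
          returnImmediately = true;
        }
        if (returnImmediately && scannerContext != null) {
          ScannerContextUtil.setReturnImmediately(scannerContext);
        }
        return hasMore;
      }
    }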
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerPagingIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerPagingIT.java
index dfd72201e3..1a43161568 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerPagingIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerPagingIT.java
@@ -266,6 +266,51 @@ public class ServerPagingIT extends ParallelStatsDisabledIT {
}
}
+ @Test
+ public void testAggregateQuery() throws Exception {
+ final String tablename = generateUniqueName();
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+ // use a higher timeout value so that we can trigger a page timeout from the scanner
+ // rather than the page filter
+ props.put(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, Long.toString(10));
+ String ddl = "CREATE TABLE " + tablename + " (id VARCHAR NOT NULL,\n" +
"k1 INTEGER NOT NULL,\n"
+ + "k2 INTEGER NOT NULL,\n" + "k3 INTEGER,\n" + "v1 VARCHAR,\n"
+ + "CONSTRAINT pk PRIMARY KEY (id, k1, k2)) ";
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ createTestTable(getUrl(), ddl);
+ String dml = "UPSERT INTO " + tablename + " VALUES(?, ?, ?, ?, ?)";
+ PreparedStatement ps = conn.prepareStatement(dml);
+ int totalRows = 10000;
+ for (int i = 0; i < totalRows; ++i) {
+ ps.setString(1, "id_" + i % 3);
+ ps.setInt(2, i % 20);
+ ps.setInt(3, i);
+ ps.setInt(4, i % 10);
+ ps.setString(5, "val");
+ ps.executeUpdate();
+ if (i != 0 && i % 100 == 0) {
+ conn.commit();
+ }
+ }
+ conn.commit();
+ String dql = String.format("SELECT count(*) from %s where id = '%s'", tablename, "id_2");
+ try (ResultSet rs = conn.createStatement().executeQuery(dql)) {
+ assertTrue(rs.next());
+ assertEquals(totalRows / 3, rs.getInt(1));
+ assertFalse(rs.next());
+ assertServerPagingMetric(tablename, rs, false); // no dummy rows
+ Map<String, Map<MetricType, Long>> metrics = PhoenixRuntime.getRequestReadMetricInfo(rs);
+ for (Map.Entry<String, Map<MetricType, Long>> entry : metrics.entrySet()) {
+ Map<MetricType, Long> metricValues = entry.getValue();
+ Long rpcCalls = metricValues.get(MetricType.COUNT_RPC_CALLS);
+ assertNotNull(rpcCalls);
+ // multiple scan rpcs will be executed for every page timeout
+ assertTrue(String.format("Got %d", rpcCalls.longValue()), rpcCalls > 1);
+ }
+ }
+ }
+ }
+
@Test
public void testLimitOffset() throws SQLException {
final String tablename = generateUniqueName();