This is an automated email from the ASF dual-hosted git repository.

tkhurana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git


The following commit(s) were added to refs/heads/master by this push:
     new b47bb8bccb PHOENIX-7701 UngroupedAggregateRegionScanner doesn't release rpc handler on page timeouts (#2289)
b47bb8bccb is described below

commit b47bb8bccb5b0853fa6b30878900aaa8cd24a377
Author: tkhurana <khurana.ta...@gmail.com>
AuthorDate: Fri Sep 19 12:55:34 2025 -0700

    PHOENIX-7701 UngroupedAggregateRegionScanner doesn't release rpc handler on page timeouts (#2289)
---
 .../hbase/regionserver/ScannerContextUtil.java     |  8 ++++
 .../phoenix/coprocessor/DelegateRegionScanner.java |  5 ++-
 .../UngroupedAggregateRegionScanner.java           | 14 +++++++
 .../org/apache/phoenix/end2end/ServerPagingIT.java | 45 ++++++++++++++++++++++
 4 files changed, 70 insertions(+), 2 deletions(-)

diff --git a/phoenix-core-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContextUtil.java b/phoenix-core-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContextUtil.java
index 64e73461dc..23bf60cde0 100644
--- a/phoenix-core-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContextUtil.java
+++ b/phoenix-core-server/src/main/java/org/apache/hadoop/hbase/regionserver/ScannerContextUtil.java
@@ -56,4 +56,12 @@ public class ScannerContextUtil {
   public static void setReturnImmediately(ScannerContext sc) {
     sc.returnImmediately();
   }
+
+  /**
+   * returnImmediately is a private field in ScannerContext and there is no getter API on it But the
+   * checkTimeLimit API on the ScannerContext will return true if returnImmediately is set
+   */
+  public static boolean checkTimeLimit(ScannerContext sc) {
+    return sc.checkTimeLimit(ScannerContext.LimitScope.BETWEEN_ROWS);
+  }
 }
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java
index 30c140a374..ad64e3ae58 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/DelegateRegionScanner.java
@@ -106,8 +106,9 @@ public class DelegateRegionScanner implements RegionScanner {
       ScannerContext noLimitContext = ScannerContextUtil.copyNoLimitScanner(scannerContext);
       boolean hasMore =
         raw ? delegate.nextRaw(result, noLimitContext) : delegate.next(result, noLimitContext);
-      if (isDummy(result)) {
-        // when a dummy row is returned by a lower layer, set returnImmediately
+      if (isDummy(result) || ScannerContextUtil.checkTimeLimit(noLimitContext)) {
+        // when a dummy row is returned by a lower layer or if the result is valid but the lower
+        // layer signals us to return immediately, we need to set returnImmediately
         // on the ScannerContext to force HBase to return a response to the client
         ScannerContextUtil.setReturnImmediately(scannerContext);
       }
diff --git a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
index 8f7c880b31..20ace9a02e 100644
--- a/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
+++ b/phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
@@ -64,6 +64,7 @@ import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.regionserver.Region;
 import org.apache.hadoop.hbase.regionserver.RegionScanner;
 import org.apache.hadoop.hbase.regionserver.ScannerContext;
+import org.apache.hadoop.hbase.regionserver.ScannerContextUtil;
 import org.apache.hadoop.hbase.util.Bytes;
 import org.apache.hadoop.hbase.util.Pair;
 import org.apache.phoenix.cache.GlobalCache;
@@ -603,6 +604,7 @@ public class UngroupedAggregateRegionScanner extends BaseRegionScanner {
   public boolean next(List<Cell> resultsToReturn, ScannerContext scannerContext)
     throws IOException {
     boolean hasMore;
+    boolean returnImmediately = false;
     long startTime = EnvironmentEdgeManager.currentTimeMillis();
     Configuration conf = env.getConfiguration();
     final TenantCache tenantCache = GlobalCache.getTenantCache(env, ScanUtil.getTenantId(scan));
@@ -646,6 +648,10 @@ public class UngroupedAggregateRegionScanner extends BaseRegionScanner {
                 resultsToReturn.addAll(results);
                 return true;
               }
+              // we got a dummy result from the lower scanner but hasAny is true which means that
+              // we have a valid result which can be returned to the client instead of a dummy.
+              // We need to signal the RPC handler to return.
+              returnImmediately = true;
               break;
             }
             if (!results.isEmpty()) {
@@ -702,6 +708,10 @@ public class UngroupedAggregateRegionScanner extends BaseRegionScanner {
           } while (
             hasMore && (EnvironmentEdgeManager.currentTimeMillis() - startTime) < pageSizeMs
           );
+          if (EnvironmentEdgeManager.currentTimeMillis() - startTime >= pageSizeMs) {
+            // we hit a page scanner timeout, signal the RPC handler to return.
+            returnImmediately = true;
+          }
           if (!mutations.isEmpty()) {
             if (!isSingleRowDelete) {
               annotateAndCommit(mutations);
@@ -753,6 +763,10 @@ public class UngroupedAggregateRegionScanner extends BaseRegionScanner {
         }
         resultsToReturn.add(keyValue);
       }
+      if (returnImmediately && scannerContext != null) {
+        // signal the RPC handler to return
+        ScannerContextUtil.setReturnImmediately(scannerContext);
+      }
       return hasMore;
     }
   }
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerPagingIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerPagingIT.java
index dfd72201e3..1a43161568 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerPagingIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerPagingIT.java
@@ -266,6 +266,51 @@ public class ServerPagingIT extends ParallelStatsDisabledIT {
     }
   }
 
+  @Test
+  public void testAggregateQuery() throws Exception {
+    final String tablename = generateUniqueName();
+    Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+    // use a higher timeout value so that we can trigger a page timeout from the scanner
+    // rather than the page filter
+    props.put(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, Long.toString(10));
+    String ddl = "CREATE TABLE " + tablename + " (id VARCHAR NOT NULL,\n" + "k1 INTEGER NOT NULL,\n"
+      + "k2 INTEGER NOT NULL,\n" + "k3 INTEGER,\n" + "v1 VARCHAR,\n"
+      + "CONSTRAINT pk PRIMARY KEY (id, k1, k2)) ";
+    try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+      createTestTable(getUrl(), ddl);
+      String dml = "UPSERT INTO " + tablename + " VALUES(?, ?, ?, ?, ?)";
+      PreparedStatement ps = conn.prepareStatement(dml);
+      int totalRows = 10000;
+      for (int i = 0; i < totalRows; ++i) {
+        ps.setString(1, "id_" + i % 3);
+        ps.setInt(2, i % 20);
+        ps.setInt(3, i);
+        ps.setInt(4, i % 10);
+        ps.setString(5, "val");
+        ps.executeUpdate();
+        if (i != 0 && i % 100 == 0) {
+          conn.commit();
+        }
+      }
+      conn.commit();
+      String dql = String.format("SELECT count(*) from %s where id = '%s'", tablename, "id_2");
+      try (ResultSet rs = conn.createStatement().executeQuery(dql)) {
+        assertTrue(rs.next());
+        assertEquals(totalRows / 3, rs.getInt(1));
+        assertFalse(rs.next());
+        assertServerPagingMetric(tablename, rs, false); // no dummy rows
+        Map<String, Map<MetricType, Long>> metrics = PhoenixRuntime.getRequestReadMetricInfo(rs);
+        for (Map.Entry<String, Map<MetricType, Long>> entry : metrics.entrySet()) {
+          Map<MetricType, Long> metricValues = entry.getValue();
+          Long rpcCalls = metricValues.get(MetricType.COUNT_RPC_CALLS);
+          assertNotNull(rpcCalls);
+          // multiple scan rpcs will be executed for every page timeout
+          assertTrue(String.format("Got %d", rpcCalls.longValue()), rpcCalls > 1);
+        }
+      }
+    }
+  }
+
   @Test
   public void testLimitOffset() throws SQLException {
     final String tablename = generateUniqueName();

Reply via email to