This is an automated email from the ASF dual-hosted git repository.
tkhurana pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/phoenix.git
The following commit(s) were added to refs/heads/master by this push:
new 4abb39f364 PHOENIX-7302 Server Paging doesn't work on scans with limit
4abb39f364 is described below
commit 4abb39f364f19ffe593d8820d6b15cc1613c4904
Author: tkhurana <[email protected]>
AuthorDate: Mon Apr 8 12:40:15 2024 -0700
PHOENIX-7302 Server Paging doesn't work on scans with limit
Fix paging for scans with limit but no other filter
Co-authored-by: Tanuj Khurana <[email protected]>
---
.../apache/phoenix/iterate/SerialIterators.java | 13 +++-
.../java/org/apache/phoenix/util/ScanUtil.java | 11 +++-
.../org/apache/phoenix/end2end/ServerPagingIT.java | 74 ++++++++++++++++++++++
3 files changed, 95 insertions(+), 3 deletions(-)
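For context on the bug shape: a WHERE clause over leading primary-key columns compiles to a key range on the scan rather than a filter, so a limit-only query leaves the LIMIT's PageFilter as the scan's sole delegate filter. A hypothetical JDBC sketch of such a query (connection URL and table name are illustrative, not part of this patch):

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class LimitOnlyScanSketch {
    public static void main(String[] args) throws Exception {
        // WHERE over leading PK columns becomes a key range, not a filter,
        // so the LIMIT's PageFilter is the only delegate filter on the scan.
        try (Connection conn = DriverManager.getConnection("jdbc:phoenix:localhost");
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(
                 "SELECT * FROM MY_TABLE WHERE ID1 >= 3 LIMIT 10")) {
            while (rs.next()) {
                // Before this fix, such scans ran without a server-side
                // PagingFilter, i.e. server paging was silently disabled.
            }
        }
    }
}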
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/SerialIterators.java b/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
index e439086686..a783c3558a 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/iterate/SerialIterators.java
@@ -44,10 +44,13 @@ import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.schema.tuple.Tuple;
import org.apache.phoenix.schema.types.PInteger;
import org.apache.phoenix.trace.util.Tracing;
+import org.apache.phoenix.util.EnvironmentEdgeManager;
+import org.apache.phoenix.util.LogUtil;
import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.thirdparty.com.google.common.base.Preconditions;
import org.apache.phoenix.thirdparty.com.google.common.collect.Lists;
+import org.apache.phoenix.util.ScanUtil;
/**
@@ -188,7 +191,15 @@ public class SerialIterators extends BaseResultIterators {
renewLeaseThreshold, plan, scanGrouper,
caches, maxQueryEndTime);
PeekingResultIterator peekingItr =
iteratorFactory.newIterator(context, itr, currentScan, tableName, plan);
Tuple tuple;
- if ((tuple = peekingItr.peek()) == null) {
+ long startTime = EnvironmentEdgeManager.currentTimeMillis();
+ tuple = peekingItr.peek();
+ if (LOGGER.isDebugEnabled()) {
+ LOGGER.debug(LogUtil.addCustomAnnotations("Id: " + scanId + ", Time: " +
+ (EnvironmentEdgeManager.currentTimeMillis() - startTime) +
+ "ms, Table: " + tableName + ", Scan: " + currentScan,
+ ScanUtil.getCustomAnnotations(currentScan)));
+ }
+ if (tuple == null) {
peekingItr.close();
continue;
} else if ((remainingOffset =
QueryUtil.getRemainingOffset(tuple)) != null) {
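A note on the new logging above: it reads the clock through EnvironmentEdgeManager, Phoenix's indirection over System.currentTimeMillis() that lets tests inject a controllable clock. A minimal, self-contained sketch of the same timing pattern (the sleep stands in for the peek() call being measured):

import org.apache.phoenix.util.EnvironmentEdgeManager;

public class PeekTimingSketch {
    public static void main(String[] args) throws InterruptedException {
        long start = EnvironmentEdgeManager.currentTimeMillis();
        Thread.sleep(5); // stand-in for peekingItr.peek()
        long elapsedMs = EnvironmentEdgeManager.currentTimeMillis() - start;
        System.out.println("peek() stand-in took " + elapsedMs + "ms");
    }
}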
diff --git a/phoenix-core-client/src/main/java/org/apache/phoenix/util/ScanUtil.java b/phoenix-core-client/src/main/java/org/apache/phoenix/util/ScanUtil.java
index dd476653b2..c6411c79c5 100644
--- a/phoenix-core-client/src/main/java/org/apache/phoenix/util/ScanUtil.java
+++ b/phoenix-core-client/src/main/java/org/apache/phoenix/util/ScanUtil.java
@@ -1615,14 +1615,21 @@ public class ScanUtil {
public static PageFilter removePageFilter(Scan scan) {
Filter filter = scan.getFilter();
if (filter != null) {
+ PagingFilter pagingFilter = null;
if (filter instanceof PagingFilter) {
- filter = ((PagingFilter) filter).getDelegateFilter();
+ pagingFilter = (PagingFilter) filter;
+ filter = pagingFilter.getDelegateFilter();
if (filter == null) {
return null;
}
}
if (filter instanceof PageFilter) {
- scan.setFilter(null);
+ if (pagingFilter != null) {
+ pagingFilter.setDelegateFilter(null);
+ scan.setFilter(pagingFilter);
+ } else {
+ scan.setFilter(null);
+ }
return (PageFilter) filter;
} else if (filter instanceof FilterList) {
return removePageFilterFromFilterList((FilterList) filter);
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerPagingIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerPagingIT.java
index db19a06ee7..b82ac71454 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerPagingIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/ServerPagingIT.java
@@ -36,6 +36,7 @@ import java.util.Map;
import java.util.Properties;
import org.apache.hadoop.hbase.util.Bytes;
+import org.apache.phoenix.jdbc.PhoenixResultSet;
import org.apache.phoenix.monitoring.MetricType;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.types.PDate;
@@ -44,6 +45,7 @@ import org.apache.phoenix.util.ByteUtil;
import org.apache.phoenix.util.DateUtil;
import org.apache.phoenix.util.PhoenixRuntime;
import org.apache.phoenix.util.PropertiesUtil;
+import org.apache.phoenix.util.QueryUtil;
import org.apache.phoenix.util.ReadOnlyProps;
import org.junit.Assert;
import org.junit.BeforeClass;
@@ -77,6 +79,78 @@ public class ServerPagingIT extends ParallelStatsDisabledIT {
assertTrue(GLOBAL_PAGED_ROWS_COUNTER.getMetric().getValue() > 0);
}
+ @Test
+ public void testScanWithLimit() throws Exception {
+ final String tablename = "T_" + generateUniqueName();
+ final String indexName = "I_" + generateUniqueName();
+ Properties props = PropertiesUtil.deepCopy(TEST_PROPERTIES);
+
+ try (Connection conn = DriverManager.getConnection(getUrl(), props)) {
+ String ddl = "create table " + tablename +
+ "(id1 integer not null, id2 integer not null, val varchar
constraint pk primary key (id1, id2))";
+ conn.createStatement().execute(ddl);
+ conn.commit();
+ PreparedStatement stmt = conn.prepareStatement("upsert into " + tablename + " VALUES(?, ?, ?)");
+ // upsert 50 rows
+ for (int i = 0; i < 5; ++i) {
+ for (int j = 0; j < 10; ++j) {
+ stmt.setInt(1, i);
+ stmt.setInt(2, j);
+ stmt.setString(3, "abcdefghijklmnopqrstuvwxyz");
+ stmt.executeUpdate();
+ }
+ conn.commit();
+ }
+ // delete first 40 rows
+ stmt = conn.prepareStatement("delete from " + tablename + " where
id1 = ? and id2 = ?");
+ for (int i = 0; i < 4; ++i) {
+ for (int j = 0; j < 10; ++j) {
+ stmt.setInt(1, i);
+ stmt.setInt(2, j);
+ stmt.executeUpdate();
+ }
+ conn.commit();
+ }
+ int limit = 10;
+ stmt = conn.prepareStatement("select * from " + tablename + "
where id1 >= 3 limit " + limit);
+ try (ResultSet rs = stmt.executeQuery()) {
+ int expectedRowCount = 0;
+ int expectedId1 = 4;
+ int expectedId2 = 0;
+ while (rs.next()) {
+ ++expectedRowCount;
+ assertEquals(expectedId1, rs.getInt(1));
+ assertEquals(expectedId2, rs.getInt(2));
+ expectedId2++;
+ }
+ assertEquals(limit, expectedRowCount);
+ assertServerPagingMetric(tablename, rs, true);
+ }
+
+ ddl = "create index " + indexName + " ON " + tablename + " (id2,
id1) INCLUDE (val)";
+ conn.createStatement().execute(ddl);
+ conn.commit();
+
+ stmt = conn.prepareStatement("select * from " + tablename + "
limit " + limit);
+ try (ResultSet rs = stmt.executeQuery()) {
+ PhoenixResultSet prs = rs.unwrap(PhoenixResultSet.class);
+ String explainPlan = QueryUtil.getExplainPlan(prs.getUnderlyingIterator());
+ assertTrue(explainPlan.contains(indexName));
+ int expectedRowCount = 0;
+ int expectedId1 = 4;
+ int expectedId2 = 0;
+ while (rs.next()) {
+ ++expectedRowCount;
+ assertEquals(expectedId1, rs.getInt(1));
+ assertEquals(expectedId2, rs.getInt(2));
+ expectedId2++;
+ }
+ assertEquals(limit, expectedRowCount);
+ assertServerPagingMetric(indexName, rs, true);
+ }
+ }
+ }
+
@Test
public void testOrderByNonAggregation() throws Exception {
final String tablename = generateUniqueName();
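To run just the new IT locally, something like mvn verify -Dit.test=ServerPagingIT#testScanWithLimit -pl phoenix-core should work under the usual failsafe setup (the module and flags here are assumptions, not part of this patch).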