This is an automated email from the ASF dual-hosted git repository.

yanxinyi pushed a commit to branch 4.16
in repository https://gitbox.apache.org/repos/asf/phoenix.git

commit 7ac162da9151094a4a6efe38fa06931dca0dd532
Author: kadirozde <[email protected]>
AuthorDate: Wed Jan 27 11:04:27 2021 -0800

    PHOENIX-6339 Older client using aggregate queries shows incorrect results 
(#1111)
---
 .../coprocessor/BaseScannerRegionObserver.java      |  2 +-
 .../UngroupedAggregateRegionScanner.java            | 11 +++++++++--
 .../apache/phoenix/iterate/TableResultIterator.java | 21 +++++++++++++--------
 .../org/apache/phoenix/query/QueryServices.java     |  2 ++
 .../apache/phoenix/query/QueryServicesOptions.java  |  1 +
 .../main/java/org/apache/phoenix/util/ScanUtil.java |  9 ++++-----
 6 files changed, 30 insertions(+), 16 deletions(-)

diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
index 772d1c0..2ad520f 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java
@@ -253,7 +253,7 @@ abstract public class BaseScannerRegionObserver extends 
BaseRegionObserver {
             // last possible moment. You need to swap the start/stop and make 
the
             // start exclusive and the stop inclusive.
             ScanUtil.setupReverseScan(scan);
-            if (!(scan.getFilter() instanceof PagedFilter)) {
+            if (scan.getFilter() != null && !(scan.getFilter() instanceof 
PagedFilter)) {
                 byte[] pageSizeMsBytes = 
scan.getAttribute(BaseScannerRegionObserver.SERVER_PAGE_SIZE_MS);
                 if (pageSizeMsBytes != null) {
                     scan.setFilter(new PagedFilter(scan.getFilter(), 
getPageSizeMsForFilter(scan)));
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
index fcee172..c9ae09f 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java
@@ -29,6 +29,7 @@ import static 
org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver.se
 import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP;
 import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN;
 import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY;
+import static org.apache.phoenix.query.QueryConstants.UNGROUPED_AGG_ROW_KEY;
 import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB;
 import static 
org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB;
 import static org.apache.phoenix.query.QueryServices.SOURCE_OPERATION_ATTRIB;
@@ -625,8 +626,14 @@ public class UngroupedAggregateRegionScanner extends 
BaseRegionScanner {
             Cell cell;
             if (hasAny) {
                 byte[] value = aggregators.toBytes(rowAggregators);
-                cell = CellUtil.createCell(CellUtil.cloneRow(lastCell), 
SINGLE_COLUMN_FAMILY, SINGLE_COLUMN,
-                        AGG_TIMESTAMP, KeyValue.Type.Put.getCode(), value);
+                if (pageSizeMs == Long.MAX_VALUE) {
+                    // Paging is not set. To be compatible with older clients, 
do not set the row key
+                    cell = CellUtil.createCell(UNGROUPED_AGG_ROW_KEY, 
SINGLE_COLUMN_FAMILY, SINGLE_COLUMN,
+                            AGG_TIMESTAMP, KeyValue.Type.Put.getCode(), value);
+                } else {
+                    cell = CellUtil.createCell(CellUtil.cloneRow(lastCell), 
SINGLE_COLUMN_FAMILY, SINGLE_COLUMN,
+                            AGG_TIMESTAMP, KeyValue.Type.Put.getCode(), value);
+                }
                 resultsToReturn.add(cell);
             }
             return hasMore;
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
index da5edbd..e2d9bfa 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java
@@ -55,6 +55,7 @@ import org.apache.phoenix.join.HashCacheClient;
 import org.apache.phoenix.monitoring.ScanMetricsHolder;
 import org.apache.phoenix.query.QueryConstants;
 import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.PTable;
 import org.apache.phoenix.schema.tuple.Tuple;
 import org.apache.phoenix.util.ByteUtil;
@@ -139,15 +140,19 @@ public class TableResultIterator implements 
ResultIterator {
                 .getInt(QueryConstants.HASH_JOIN_CACHE_RETRIES, 
QueryConstants.DEFAULT_HASH_JOIN_CACHE_RETRIES);
         ScanUtil.setScanAttributesForIndexReadRepair(scan, table, 
plan.getContext().getConnection());
         ScanUtil.setScanAttributesForPhoenixTTL(scan, table, 
plan.getContext().getConnection());
-        long pageSizeMs = 
plan.getContext().getConnection().getQueryServices().getProps()
-                .getInt(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, -1);
-        if (pageSizeMs == -1) {
-            // Use the half of the HBase RPC timeout value as the the server 
page size to make sure that the HBase
-            // region server will be able to send a heartbeat message to the 
client before the client times out
-            pageSizeMs = (long) 
(plan.getContext().getConnection().getQueryServices().getProps()
-                    .getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, 
HConstants.DEFAULT_HBASE_RPC_TIMEOUT) * 0.5);
+        if 
(plan.getContext().getConnection().getQueryServices().getProps().getBoolean(
+                QueryServices.PHOENIX_SERVER_PAGING_ENABLED_ATTRIB,
+                QueryServicesOptions.DEFAULT_PHOENIX_SERVER_PAGING_ENABLED)) {
+            long pageSizeMs = 
plan.getContext().getConnection().getQueryServices().getProps()
+                    .getInt(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, -1);
+            if (pageSizeMs == -1) {
+                // Use the half of the HBase RPC timeout value as the the 
server page size to make sure that the HBase
+                // region server will be able to send a heartbeat message to 
the client before the client times out
+                pageSizeMs = (long) 
(plan.getContext().getConnection().getQueryServices().getProps()
+                        .getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, 
HConstants.DEFAULT_HBASE_RPC_TIMEOUT) * 0.5);
+            }
+            scan.setAttribute(BaseScannerRegionObserver.SERVER_PAGE_SIZE_MS, 
Bytes.toBytes(Long.valueOf(pageSizeMs)));
         }
-        scan.setAttribute(BaseScannerRegionObserver.SERVER_PAGE_SIZE_MS, 
Bytes.toBytes(Long.valueOf(pageSizeMs)));
     }
 
     @Override
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index faade11..2bf2350 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -326,6 +326,8 @@ public interface QueryServices extends SQLCloseable {
     public static final String 
GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB = 
"phoenix.global.index.row.age.threshold.to.delete.ms";
     // Enable the IndexRegionObserver Coprocessor
     public static final String INDEX_REGION_OBSERVER_ENABLED_ATTRIB = 
"phoenix.index.region.observer.enabled";
+    // Enable Phoenix server paging
+    public static final String PHOENIX_SERVER_PAGING_ENABLED_ATTRIB = 
"phoenix.server.paging.enabled";
     // Enable support for long view index(default is false)
     public static final String LONG_VIEW_INDEX_ENABLED_ATTRIB = 
"phoenix.index.longViewIndex.enabled";
     // The number of index rows to be rebuild in one RPC call
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 9c6ff29..95fb801 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -341,6 +341,7 @@ public class QueryServicesOptions {
 
     public static final long 
DEFAULT_GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS = 7*24*60*60*1000; /* 7 
days */
     public static final boolean DEFAULT_INDEX_REGION_OBSERVER_ENABLED = true;
+    public static final boolean DEFAULT_PHOENIX_SERVER_PAGING_ENABLED = true;
     public static final long DEFAULT_INDEX_REBUILD_PAGE_SIZE_IN_ROWS = 32*1024;
     public static final boolean 
DEFAULT_ALLOW_SPLITTABLE_SYSTEM_CATALOG_ROLLBACK = false;
 
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java 
b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
index a9c53e3..f8df8f9 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
@@ -1304,21 +1304,20 @@ public class ScanUtil {
      * each HBase RegionScanner#next() time which is controlled by PagedFilter 
is set to 0.3 * SERVER_PAGE_SIZE_MS.
      *
      */
-    private static long getPageSizeMs(Scan scan) {
+    private static long getPageSizeMs(Scan scan, double factor) {
         long pageSizeMs = Long.MAX_VALUE;
         byte[] pageSizeMsBytes = 
scan.getAttribute(BaseScannerRegionObserver.SERVER_PAGE_SIZE_MS);
         if (pageSizeMsBytes != null) {
             pageSizeMs = Bytes.toLong(pageSizeMsBytes);
+            pageSizeMs = (long) (pageSizeMs * factor);
         }
         return pageSizeMs;
     }
 
-    public static long getPageSizeMsForRegionScanner(Scan scan) {
-        return (long) (getPageSizeMs(scan) * 0.6);
-    }
+    public static long getPageSizeMsForRegionScanner(Scan scan)  { return 
getPageSizeMs(scan, 0.6); }
 
     public static long getPageSizeMsForFilter(Scan scan) {
-        return (long) (getPageSizeMs(scan) * 0.3);
+        return getPageSizeMs(scan, 0.3);
     }
 
     /**

Reply via email to