This is an automated email from the ASF dual-hosted git repository. yanxinyi pushed a commit to branch 4.16 in repository https://gitbox.apache.org/repos/asf/phoenix.git
commit 7ac162da9151094a4a6efe38fa06931dca0dd532 Author: kadirozde <[email protected]> AuthorDate: Wed Jan 27 11:04:27 2021 -0800 PHOENIX-6339 Older client using aggregate queries shows incorrect results (#1111) --- .../coprocessor/BaseScannerRegionObserver.java | 2 +- .../UngroupedAggregateRegionScanner.java | 11 +++++++++-- .../apache/phoenix/iterate/TableResultIterator.java | 21 +++++++++++++-------- .../org/apache/phoenix/query/QueryServices.java | 2 ++ .../apache/phoenix/query/QueryServicesOptions.java | 1 + .../main/java/org/apache/phoenix/util/ScanUtil.java | 9 ++++----- 6 files changed, 30 insertions(+), 16 deletions(-) diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java index 772d1c0..2ad520f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/BaseScannerRegionObserver.java @@ -253,7 +253,7 @@ abstract public class BaseScannerRegionObserver extends BaseRegionObserver { // last possible moment. You need to swap the start/stop and make the // start exclusive and the stop inclusive. ScanUtil.setupReverseScan(scan); - if (!(scan.getFilter() instanceof PagedFilter)) { + if (scan.getFilter() != null && !(scan.getFilter() instanceof PagedFilter)) { byte[] pageSizeMsBytes = scan.getAttribute(BaseScannerRegionObserver.SERVER_PAGE_SIZE_MS); if (pageSizeMsBytes != null) { scan.setFilter(new PagedFilter(scan.getFilter(), getPageSizeMsForFilter(scan))); diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java index fcee172..c9ae09f 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionScanner.java @@ -29,6 +29,7 @@ import static org.apache.phoenix.coprocessor.UngroupedAggregateRegionObserver.se import static org.apache.phoenix.query.QueryConstants.AGG_TIMESTAMP; import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN; import static org.apache.phoenix.query.QueryConstants.SINGLE_COLUMN_FAMILY; +import static org.apache.phoenix.query.QueryConstants.UNGROUPED_AGG_ROW_KEY; import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_ATTRIB; import static org.apache.phoenix.query.QueryServices.MUTATE_BATCH_SIZE_BYTES_ATTRIB; import static org.apache.phoenix.query.QueryServices.SOURCE_OPERATION_ATTRIB; @@ -625,8 +626,14 @@ public class UngroupedAggregateRegionScanner extends BaseRegionScanner { Cell cell; if (hasAny) { byte[] value = aggregators.toBytes(rowAggregators); - cell = CellUtil.createCell(CellUtil.cloneRow(lastCell), SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, - AGG_TIMESTAMP, KeyValue.Type.Put.getCode(), value); + if (pageSizeMs == Long.MAX_VALUE) { + // Paging is not set. To be compatible with older clients, do not set the row key + cell = CellUtil.createCell(UNGROUPED_AGG_ROW_KEY, SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, + AGG_TIMESTAMP, KeyValue.Type.Put.getCode(), value); + } else { + cell = CellUtil.createCell(CellUtil.cloneRow(lastCell), SINGLE_COLUMN_FAMILY, SINGLE_COLUMN, + AGG_TIMESTAMP, KeyValue.Type.Put.getCode(), value); + } resultsToReturn.add(cell); } return hasMore; diff --git a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java index da5edbd..e2d9bfa 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/iterate/TableResultIterator.java @@ -55,6 +55,7 @@ import org.apache.phoenix.join.HashCacheClient; import org.apache.phoenix.monitoring.ScanMetricsHolder; import org.apache.phoenix.query.QueryConstants; import org.apache.phoenix.query.QueryServices; +import org.apache.phoenix.query.QueryServicesOptions; import org.apache.phoenix.schema.PTable; import org.apache.phoenix.schema.tuple.Tuple; import org.apache.phoenix.util.ByteUtil; @@ -139,15 +140,19 @@ public class TableResultIterator implements ResultIterator { .getInt(QueryConstants.HASH_JOIN_CACHE_RETRIES, QueryConstants.DEFAULT_HASH_JOIN_CACHE_RETRIES); ScanUtil.setScanAttributesForIndexReadRepair(scan, table, plan.getContext().getConnection()); ScanUtil.setScanAttributesForPhoenixTTL(scan, table, plan.getContext().getConnection()); - long pageSizeMs = plan.getContext().getConnection().getQueryServices().getProps() - .getInt(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, -1); - if (pageSizeMs == -1) { - // Use the half of the HBase RPC timeout value as the the server page size to make sure that the HBase - // region server will be able to send a heartbeat message to the client before the client times out - pageSizeMs = (long) (plan.getContext().getConnection().getQueryServices().getProps() - .getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT) * 0.5); + if (plan.getContext().getConnection().getQueryServices().getProps().getBoolean( + QueryServices.PHOENIX_SERVER_PAGING_ENABLED_ATTRIB, + QueryServicesOptions.DEFAULT_PHOENIX_SERVER_PAGING_ENABLED)) { + long pageSizeMs = plan.getContext().getConnection().getQueryServices().getProps() + .getInt(QueryServices.PHOENIX_SERVER_PAGE_SIZE_MS, -1); + if (pageSizeMs == -1) { + // Use the half of the HBase RPC timeout value as the the server page size to make sure that the HBase + // region server will be able to send a heartbeat message to the client before the client times out + pageSizeMs = (long) (plan.getContext().getConnection().getQueryServices().getProps() + .getLong(HConstants.HBASE_RPC_TIMEOUT_KEY, HConstants.DEFAULT_HBASE_RPC_TIMEOUT) * 0.5); + } + scan.setAttribute(BaseScannerRegionObserver.SERVER_PAGE_SIZE_MS, Bytes.toBytes(Long.valueOf(pageSizeMs))); } - scan.setAttribute(BaseScannerRegionObserver.SERVER_PAGE_SIZE_MS, Bytes.toBytes(Long.valueOf(pageSizeMs))); } @Override diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java index faade11..2bf2350 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java @@ -326,6 +326,8 @@ public interface QueryServices extends SQLCloseable { public static final String GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS_ATTRIB = "phoenix.global.index.row.age.threshold.to.delete.ms"; // Enable the IndexRegionObserver Coprocessor public static final String INDEX_REGION_OBSERVER_ENABLED_ATTRIB = "phoenix.index.region.observer.enabled"; + // Enable Phoenix server paging + public static final String PHOENIX_SERVER_PAGING_ENABLED_ATTRIB = "phoenix.server.paging.enabled"; // Enable support for long view index(default is false) public static final String LONG_VIEW_INDEX_ENABLED_ATTRIB = "phoenix.index.longViewIndex.enabled"; // The number of index rows to be rebuild in one RPC call diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java index 9c6ff29..95fb801 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java @@ -341,6 +341,7 @@ public class QueryServicesOptions { public static final long DEFAULT_GLOBAL_INDEX_ROW_AGE_THRESHOLD_TO_DELETE_MS = 7*24*60*60*1000; /* 7 days */ public static final boolean DEFAULT_INDEX_REGION_OBSERVER_ENABLED = true; + public static final boolean DEFAULT_PHOENIX_SERVER_PAGING_ENABLED = true; public static final long DEFAULT_INDEX_REBUILD_PAGE_SIZE_IN_ROWS = 32*1024; public static final boolean DEFAULT_ALLOW_SPLITTABLE_SYSTEM_CATALOG_ROLLBACK = false; diff --git a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java index a9c53e3..f8df8f9 100644 --- a/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java +++ b/phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java @@ -1304,21 +1304,20 @@ public class ScanUtil { * each HBase RegionScanner#next() time which is controlled by PagedFilter is set to 0.3 * SERVER_PAGE_SIZE_MS. * */ - private static long getPageSizeMs(Scan scan) { + private static long getPageSizeMs(Scan scan, double factor) { long pageSizeMs = Long.MAX_VALUE; byte[] pageSizeMsBytes = scan.getAttribute(BaseScannerRegionObserver.SERVER_PAGE_SIZE_MS); if (pageSizeMsBytes != null) { pageSizeMs = Bytes.toLong(pageSizeMsBytes); + pageSizeMs = (long) (pageSizeMs * factor); } return pageSizeMs; } - public static long getPageSizeMsForRegionScanner(Scan scan) { - return (long) (getPageSizeMs(scan) * 0.6); - } + public static long getPageSizeMsForRegionScanner(Scan scan) { return getPageSizeMs(scan, 0.6); } public static long getPageSizeMsForFilter(Scan scan) { - return (long) (getPageSizeMs(scan) * 0.3); + return getPageSizeMs(scan, 0.3); } /**
