[
https://issues.apache.org/jira/browse/PHOENIX-7106?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17814948#comment-17814948
]
ASF GitHub Bot commented on PHOENIX-7106:
-----------------------------------------
tkhurana commented on code in PR #1736:
URL: https://github.com/apache/phoenix/pull/1736#discussion_r1480385813
##########
phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java:
##########
@@ -411,6 +487,115 @@ private UnorderedGroupByRegionScanner(final ObserverContext<RegionCoprocessorEnv
        @Override
        public boolean next(List<Cell> resultsToReturn) throws IOException {
+            if (firstScan && prevScanStartRowKey != null && prevScanIncludeStartRowKey != null) {
+                firstScan = false;
+                if (scanStartRowKey.length > 0 && !ScanUtil.isLocalIndex(scan)) {
+                    if (Bytes.compareTo(prevScanStartRowKey, scanStartRowKey) != 0 ||
+                            prevScanIncludeStartRowKey != includeStartRowKey) {
+                        LOGGER.info("Region has moved.. Prev scan start rowkey {} is not same as" +
+                                " current scan start rowkey {}",
+                                Bytes.toStringBinary(prevScanStartRowKey),
+                                Bytes.toStringBinary(scanStartRowKey));
+                        // If region has moved in the middle of the scan operation, after
+                        // resetting the scanner, hbase client uses (latest received rowkey
+                        // + \x00) as new start rowkey for resuming the scan operation on
+                        // the new scanner.
+                        if (Bytes.compareTo(
+                                ByteUtil.concat(prevScanStartRowKey, Bytes.toBytesBinary("\\x00")),
+                                scanStartRowKey) == 0) {
+                            scan.setAttribute(QueryServices.PHOENIX_PAGING_NEW_SCAN_START_ROWKEY,
+                                    prevScanStartRowKey);
+                            scan.setAttribute(
+                                    QueryServices.PHOENIX_PAGING_NEW_SCAN_START_ROWKEY_INCLUDE,
+                                    Bytes.toBytes(prevScanIncludeStartRowKey));
+                        } else {
+                            // This happens when the server side scanner has already sent some
+                            // rows back to the client and region has moved, so now we need to
+                            // use updateScannerBasedOnPrevRowKey flag and also reset the
+                            // scanner at paging region scanner level to re-read the previously
+                            // sent values in order to re-compute the aggregation and then
+                            // return only the next rowkey that was not yet sent back to the
+                            // client.
+                            updateScannerBasedOnPrevRowKey = true;
+                            scan.setAttribute(QueryServices.PHOENIX_PAGING_NEW_SCAN_START_ROWKEY,
+                                    prevScanStartRowKey);
+                            scan.setAttribute(
+                                    QueryServices.PHOENIX_PAGING_NEW_SCAN_START_ROWKEY_INCLUDE,
+                                    Bytes.toBytes(prevScanIncludeStartRowKey));
+                        }
+                    }
+                }
+            }
+            boolean moreRows = next0(resultsToReturn);
+            if (ScanUtil.isDummy(resultsToReturn)) {
+                return true;
+            }
+            if (updateScannerBasedOnPrevRowKey) {
+                while (true) {
+                    if (!moreRows) {
+                        updateScannerBasedOnPrevRowKey = false;
+                        if (resultsToReturn.size() > 0) {
+                            lastReturnedRowKey = CellUtil.cloneRow(resultsToReturn.get(0));
+                        }
+                        return moreRows;
+                    }
+                    Cell firstCell = resultsToReturn.get(0);
+                    byte[] resultRowKey = new byte[firstCell.getRowLength()];
+                    System.arraycopy(firstCell.getRowArray(), firstCell.getRowOffset(),
+                            resultRowKey, 0, resultRowKey.length);
+                    if (Bytes.compareTo(resultRowKey, scanStartRowKey) == 0) {
+                        updateScannerBasedOnPrevRowKey = false;
+                        if (includeStartRowKey) {
+                            if (resultsToReturn.size() > 0) {
+                                lastReturnedRowKey = CellUtil.cloneRow(resultsToReturn.get(0));
+                            }
+                            return moreRows;
+                        }
+                        resultsToReturn.clear();
+                        moreRows = next0(resultsToReturn);
+                        if (ScanUtil.isDummy(resultsToReturn)) {
+                            return true;
+                        }
+                        if (resultsToReturn.size() > 0) {
+                            lastReturnedRowKey = CellUtil.cloneRow(resultsToReturn.get(0));
+                        }
+                        return moreRows;
+                    } else if (
+                            Bytes.compareTo(
+                                    ByteUtil.concat(resultRowKey, Bytes.toBytesBinary("\\x00")),
Review Comment:
We can create a static final object for `Bytes.toBytesBinary("\\x00")`
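A minimal sketch of that suggestion: hoisting the decoded separator into a shared constant avoids re-parsing the `"\\x00"` escape on every `next()` call, and the resume-key concatenation the patch performs with `ByteUtil.concat` can reuse it. The class, field, and method names below are illustrative, not from the Phoenix codebase.

```java
// Hedged sketch of the reviewer's suggestion: precompute the single zero byte
// that Bytes.toBytesBinary("\\x00") decodes to, instead of re-decoding it on
// every invocation. Names (RowKeySeparator, NULL_BYTE, closestRowAfter) are
// illustrative, not from the PR.
public final class RowKeySeparator {
    // The decoded form of the "\x00" escape: one zero byte.
    static final byte[] NULL_BYTE = new byte[] { 0x00 };

    // prevKey + \x00, matching the "closest row after" resume key that the
    // HBase client uses when it reopens a scanner after a region move.
    static byte[] closestRowAfter(byte[] prevKey) {
        byte[] out = new byte[prevKey.length + 1];
        System.arraycopy(prevKey, 0, out, 0, prevKey.length);
        out[prevKey.length] = NULL_BYTE[0];
        return out;
    }

    private RowKeySeparator() {
    }
}
```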
> Data Integrity issues due to invalid rowkeys returned by various coprocessors
> -----------------------------------------------------------------------------
>
> Key: PHOENIX-7106
> URL: https://issues.apache.org/jira/browse/PHOENIX-7106
> Project: Phoenix
> Issue Type: Improvement
> Affects Versions: 5.2.0, 5.1.4
> Reporter: Viraj Jasani
> Assignee: Viraj Jasani
> Priority: Blocker
> Fix For: 5.2.0, 5.1.4
>
>
> The HBase scanner interface expects the server to scan cells from the HFile
> or block cache and return consistent data, i.e. the rowkeys of the returned
> cells should stay within the scan boundaries. When a region moves and the
> scanner needs to be reset, or when the current row is too large and the
> server returns a partial row, the subsequent scanner#next call is supposed
> to return the remaining cells. When this happens, the cell rowkeys returned
> by the server, i.e. by any coprocessors, are expected to be within the scan
> boundary range so that the server can reliably perform its validation and
> return the remaining cells as expected.
> The Phoenix client initiates serial or parallel scans from the aggregators
> based on the region boundaries, and the scan boundaries are sometimes
> adjusted based on the key ranges provided by the optimizer, to include
> tenant boundaries, salt boundaries, etc. After the client opens the scanner
> and performs the scan operation, some of the coprocessors return an invalid
> rowkey in the following cases:
> # Grouped aggregate queries
> # Some Ungrouped aggregate queries
> # Offset queries
> # Dummy cells returned with empty rowkey
> # Update statistics queries
> # Uncovered Index queries
> # Ordered results at server side
> # ORDER BY DESC on rowkey
> # Global Index read-repair
> # Paging region scanner with HBase scanner reopen
> # ORDER BY on non-pk column(s) with/without paging
> # GROUP BY on non-pk column(s) with/without paging
> Since many of these cases return reserved rowkeys, they are likely not going
> to match the scan or region boundaries. This has the potential to cause data
> integrity issues in certain scenarios, as explained above. An empty rowkey
> returned by the server can be treated as the end of the region scan by the
> HBase client.
> With the paging feature enabled, if the page size is kept low, scanners have
> a higher chance of returning dummy cells, which improves latency and avoids
> timeouts at the cost of an increased number of RPC calls. We should return
> only valid rowkeys within the scan range for all the cases where we perform
> the above-mentioned operations, such as complex aggregate or offset queries.
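The boundary invariant described above can be sketched as a standalone check. This is a sketch only: Phoenix's actual validation lives elsewhere (e.g. ScanUtil), and the comparison below reimplements the unsigned lexicographic order of HBase's Bytes.compareTo so the snippet stays self-contained. Note that an empty rowkey fails the check by construction, which is exactly why a dummy cell carrying an empty rowkey is hazardous for a plain HBase client.

```java
// Hedged sketch of the scan-boundary invariant: a rowkey returned by a
// coprocessor must fall inside [startRow, stopRow). An empty rowkey can never
// satisfy this, which is why an empty (dummy) rowkey can be misread as
// end-of-scan. Class and method names are illustrative, not Phoenix APIs.
public final class ScanBoundaryCheck {
    static boolean inRange(byte[] rowKey, byte[] startRow, byte[] stopRow) {
        if (rowKey.length == 0) {
            return false; // empty rowkey: dummy cell or end-of-scan marker
        }
        if (startRow.length > 0 && compare(rowKey, startRow) < 0) {
            return false; // before the scan's inclusive start boundary
        }
        if (stopRow.length > 0 && compare(rowKey, stopRow) >= 0) {
            return false; // at or past the exclusive stop boundary
        }
        return true;
    }

    // Unsigned lexicographic comparison, same ordering as HBase's
    // Bytes.compareTo (an empty startRow/stopRow means "unbounded").
    static int compare(byte[] a, byte[] b) {
        int n = Math.min(a.length, b.length);
        for (int i = 0; i < n; i++) {
            int d = (a[i] & 0xff) - (b[i] & 0xff);
            if (d != 0) {
                return d;
            }
        }
        return a.length - b.length;
    }

    private ScanBoundaryCheck() {
    }
}
```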
--
This message was sent by Atlassian Jira
(v8.20.10#820010)