tkhurana commented on code in PR #1736:
URL: https://github.com/apache/phoenix/pull/1736#discussion_r1480385813
##########
phoenix-core-server/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java:
##########
@@ -411,6 +487,115 @@ private UnorderedGroupByRegionScanner(final
ObserverContext<RegionCoprocessorEnv
@Override
public boolean next(List<Cell> resultsToReturn) throws IOException {
+ if (firstScan && prevScanStartRowKey != null &&
prevScanIncludeStartRowKey != null) {
+ firstScan = false;
+ if (scanStartRowKey.length > 0 &&
!ScanUtil.isLocalIndex(scan)) {
+ if (Bytes.compareTo(prevScanStartRowKey, scanStartRowKey)
!= 0 ||
+ prevScanIncludeStartRowKey != includeStartRowKey) {
+ LOGGER.info("Region has moved.. Prev scan start rowkey
{} is not same as" +
+ " current scan start rowkey {}",
+ Bytes.toStringBinary(prevScanStartRowKey),
+ Bytes.toStringBinary(scanStartRowKey));
+ // If region has moved in the middle of the scan
operation, after resetting
+ // the scanner, hbase client uses (latest received
rowkey + \x00) as new
+ // start rowkey for resuming the scan operation on the
new scanner.
+ if (Bytes.compareTo(
+ ByteUtil.concat(prevScanStartRowKey,
Bytes.toBytesBinary("\\x00")),
+ scanStartRowKey) == 0) {
+
scan.setAttribute(QueryServices.PHOENIX_PAGING_NEW_SCAN_START_ROWKEY,
+ prevScanStartRowKey);
+ scan.setAttribute(
+
QueryServices.PHOENIX_PAGING_NEW_SCAN_START_ROWKEY_INCLUDE,
+ Bytes.toBytes(prevScanIncludeStartRowKey));
+ } else {
+ // This happens when the server side scanner has
already sent some
+ // rows back to the client and region has moved,
so now we need to
+ // use updateScannerBasedOnPrevRowKey flag and
also reset the scanner
+ // at paging region scanner level to re-read the
previously sent
+ // values in order to re-compute the aggregation
and then return
+ // only the next rowkey that was not yet sent back
to the client.
+ updateScannerBasedOnPrevRowKey = true;
+
scan.setAttribute(QueryServices.PHOENIX_PAGING_NEW_SCAN_START_ROWKEY,
+ prevScanStartRowKey);
+ scan.setAttribute(
+
QueryServices.PHOENIX_PAGING_NEW_SCAN_START_ROWKEY_INCLUDE,
+ Bytes.toBytes(prevScanIncludeStartRowKey));
+ }
+ }
+ }
+ }
+ boolean moreRows = next0(resultsToReturn);
+ if (ScanUtil.isDummy(resultsToReturn)) {
+ return true;
+ }
+ if (updateScannerBasedOnPrevRowKey) {
+ while (true) {
+ if (!moreRows) {
+ updateScannerBasedOnPrevRowKey = false;
+ if (resultsToReturn.size() > 0) {
+ lastReturnedRowKey =
CellUtil.cloneRow(resultsToReturn.get(0));
+ }
+ return moreRows;
+ }
+ Cell firstCell = resultsToReturn.get(0);
+ byte[] resultRowKey = new byte[firstCell.getRowLength()];
+ System.arraycopy(firstCell.getRowArray(),
firstCell.getRowOffset(),
+ resultRowKey, 0, resultRowKey.length);
+ if (Bytes.compareTo(resultRowKey, scanStartRowKey) == 0) {
+ updateScannerBasedOnPrevRowKey = false;
+ if (includeStartRowKey) {
+ if (resultsToReturn.size() > 0) {
+ lastReturnedRowKey =
CellUtil.cloneRow(resultsToReturn.get(0));
+ }
+ return moreRows;
+ }
+ resultsToReturn.clear();
+ moreRows = next0(resultsToReturn);
+ if (ScanUtil.isDummy(resultsToReturn)) {
+ return true;
+ }
+ if (resultsToReturn.size() > 0) {
+ lastReturnedRowKey =
CellUtil.cloneRow(resultsToReturn.get(0));
+ }
+ return moreRows;
+ } else if (
+ Bytes.compareTo(
+ ByteUtil.concat(resultRowKey,
Bytes.toBytesBinary("\\x00")),
Review Comment:
We can create a static final object for `Bytes.toBytesBinary("\\x00"))`
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]