[ 
https://issues.apache.org/jira/browse/PHOENIX-6207?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17235002#comment-17235002
 ] 

ASF GitHub Bot commented on PHOENIX-6207:
-----------------------------------------

gjacoby126 commented on a change in pull request #947:
URL: https://github.com/apache/phoenix/pull/947#discussion_r526406972



##########
File path: 
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
##########
@@ -398,66 +414,84 @@ private RegionScanner 
scanUnordered(ObserverContext<RegionCoprocessorEnvironment
                             (int) (Bytes.toInt(estDistValsBytes) * 1.5f));
         }
         
-        Pair<Integer, Integer> minMaxQualifiers = 
EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan);
-        boolean useQualifierAsIndex = 
EncodedColumnsUtil.useQualifierAsIndex(EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan));
+        final Pair<Integer, Integer> minMaxQualifiers = 
EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan);
+        final boolean useQualifierAsIndex = 
EncodedColumnsUtil.useQualifierAsIndex(EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan));
         final boolean spillableEnabled =
                 conf.getBoolean(GROUPBY_SPILLABLE_ATTRIB, 
DEFAULT_GROUPBY_SPILLABLE);
         final PTable.QualifierEncodingScheme encodingScheme = 
EncodedColumnsUtil.getQualifierEncodingScheme(scan);
 
-        GroupByCache groupByCache =
+        final GroupByCache groupByCache =
                 GroupByCacheFactory.INSTANCE.newCache(
                         env, ScanUtil.getTenantId(scan), 
ScanUtil.getCustomAnnotations(scan),
                         aggregators, estDistVals);
-        boolean success = false;
-        try {
-            boolean hasMore;
-            Tuple result = useQualifierAsIndex ? new 
PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple();
-            if (LOGGER.isDebugEnabled()) {
-                LOGGER.debug(LogUtil.addCustomAnnotations(
-                        "Spillable groupby enabled: " + spillableEnabled,
-                        ScanUtil.getCustomAnnotations(scan)));
-            }
-            Region region = c.getEnvironment().getRegion();
-            boolean acquiredLock = false;
-            try {
-                region.startRegionOperation();
-                acquiredLock = true;
-                synchronized (scanner) {
-                    do {
-                        List<Cell> results = useQualifierAsIndex ? new 
EncodedColumnQualiferCellsList(minMaxQualifiers.getFirst(), 
minMaxQualifiers.getSecond(), encodingScheme) : new ArrayList<Cell>();
-                        // Results are potentially returned even when the 
return
-                        // value of s.next is false
-                        // since this is an indication of whether or not there 
are
-                        // more values after the
-                        // ones returned
-                        hasMore = scanner.nextRaw(results);
-                        if (!results.isEmpty()) {
-                            result.setKeyValues(results);
-                            ImmutableBytesPtr key =
-                                TupleUtil.getConcatenatedValue(result, 
expressions);
-                            Aggregator[] rowAggregators = 
groupByCache.cache(key);
-                            // Aggregate values here
-                            aggregators.aggregate(rowAggregators, result);
+        return new BaseRegionScanner(scanner) {
+            RegionScanner regionScanner = null;
+            @Override
+            public boolean next(List<Cell> resultsToReturn) throws IOException 
{
+                boolean hasMore;
+                long startTime = EnvironmentEdgeManager.currentTimeMillis();
+                long now;
+                Tuple result = useQualifierAsIndex ? new 
PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple();
+                if (LOGGER.isDebugEnabled()) {
+                    LOGGER.debug(LogUtil.addCustomAnnotations(
+                            "Spillable groupby enabled: " + spillableEnabled,
+                            ScanUtil.getCustomAnnotations(scan)));
+                }
+                Region region = c.getEnvironment().getRegion();
+                boolean acquiredLock = false;
+                try {
+                    region.startRegionOperation();
+                    acquiredLock = true;
+                    synchronized (scanner) {
+                        if (regionScanner != null) {
+                            return regionScanner.next(resultsToReturn);
                         }
-                    } while (hasMore && groupByCache.size() < limit);
+                        do {
+                            List<Cell> results = useQualifierAsIndex ? new 
EncodedColumnQualiferCellsList(minMaxQualifiers.getFirst(), 
minMaxQualifiers.getSecond(), encodingScheme) : new ArrayList<Cell>();

Review comment:
       tiny nit: good to stick in a newline in there, looks like more than 100 
chars wide. (If it's not, feel free to disregard)

##########
File path: 
phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
##########
@@ -398,66 +414,84 @@ private RegionScanner 
scanUnordered(ObserverContext<RegionCoprocessorEnvironment
                             (int) (Bytes.toInt(estDistValsBytes) * 1.5f));
         }
         
-        Pair<Integer, Integer> minMaxQualifiers = 
EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan);
-        boolean useQualifierAsIndex = 
EncodedColumnsUtil.useQualifierAsIndex(EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan));
+        final Pair<Integer, Integer> minMaxQualifiers = 
EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan);
+        final boolean useQualifierAsIndex = 
EncodedColumnsUtil.useQualifierAsIndex(EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan));
         final boolean spillableEnabled =
                 conf.getBoolean(GROUPBY_SPILLABLE_ATTRIB, 
DEFAULT_GROUPBY_SPILLABLE);
         final PTable.QualifierEncodingScheme encodingScheme = 
EncodedColumnsUtil.getQualifierEncodingScheme(scan);
 
-        GroupByCache groupByCache =
+        final GroupByCache groupByCache =
                 GroupByCacheFactory.INSTANCE.newCache(
                         env, ScanUtil.getTenantId(scan), 
ScanUtil.getCustomAnnotations(scan),
                         aggregators, estDistVals);
-        boolean success = false;
-        try {
-            boolean hasMore;
-            Tuple result = useQualifierAsIndex ? new 
PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple();
-            if (LOGGER.isDebugEnabled()) {
-                LOGGER.debug(LogUtil.addCustomAnnotations(
-                        "Spillable groupby enabled: " + spillableEnabled,
-                        ScanUtil.getCustomAnnotations(scan)));
-            }
-            Region region = c.getEnvironment().getRegion();
-            boolean acquiredLock = false;
-            try {
-                region.startRegionOperation();
-                acquiredLock = true;
-                synchronized (scanner) {
-                    do {
-                        List<Cell> results = useQualifierAsIndex ? new 
EncodedColumnQualiferCellsList(minMaxQualifiers.getFirst(), 
minMaxQualifiers.getSecond(), encodingScheme) : new ArrayList<Cell>();
-                        // Results are potentially returned even when the 
return
-                        // value of s.next is false
-                        // since this is an indication of whether or not there 
are
-                        // more values after the
-                        // ones returned
-                        hasMore = scanner.nextRaw(results);
-                        if (!results.isEmpty()) {
-                            result.setKeyValues(results);
-                            ImmutableBytesPtr key =
-                                TupleUtil.getConcatenatedValue(result, 
expressions);
-                            Aggregator[] rowAggregators = 
groupByCache.cache(key);
-                            // Aggregate values here
-                            aggregators.aggregate(rowAggregators, result);
+        return new BaseRegionScanner(scanner) {
+            RegionScanner regionScanner = null;

Review comment:
       Please extract to a named inner class -- 70ish lines is a bit too long 
IMO for an anonymous inner class. 

##########
File path: phoenix-core/src/main/java/org/apache/phoenix/util/ScanUtil.java
##########
@@ -957,4 +961,41 @@ public static int getClientVersion(Scan scan) {
     public static void setClientVersion(Scan scan, int version) {
         scan.setAttribute(BaseScannerRegionObserver.CLIENT_VERSION, 
Bytes.toBytes(version));
     }
+
+    public static void getDummyResult(byte[] rowKey, List<Cell> result) {
+        result.clear();
+        KeyValue keyValue =
+                KeyValueUtil.newKeyValue(rowKey, 0,
+                        rowKey.length, EMPTY_BYTE_ARRAY, EMPTY_BYTE_ARRAY,
+                        0, EMPTY_BYTE_ARRAY, 0, EMPTY_BYTE_ARRAY.length);
+        result.add(keyValue);
+    }
+
+    public static void getDummyResult(List<Cell> result) {
+        getDummyResult(EMPTY_BYTE_ARRAY, result);
+    }
+
+    public static boolean isDummy(Result result) {
+        // Check if the result is a dummy result
+        if (result.rawCells().length != 1) {
+            return false;
+        }
+        Cell cell = result.rawCells()[0];
+        return Bytes.compareTo(cell.getFamilyArray(), cell.getFamilyOffset(), 
cell.getFamilyLength(),

Review comment:
       nit: can replace both checks with CellUtil.matchingColumn




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


> Paged server side grouped aggregate operations
> ----------------------------------------------
>
>                 Key: PHOENIX-6207
>                 URL: https://issues.apache.org/jira/browse/PHOENIX-6207
>             Project: Phoenix
>          Issue Type: Improvement
>    Affects Versions: 5.0.0, 4.14.3
>            Reporter: Kadir OZDEMIR
>            Assignee: Kadir OZDEMIR
>            Priority: Major
>             Fix For: 4.16.0
>
>         Attachments: PHOENIX-6207.4.x.001.patch, PHOENIX-6207.4.x.002.patch, 
> PHOENIX-6207.4.x.003.patch
>
>
> Phoenix provides the option of performing query operations on the client or 
> server side. This is decided by the Phoenix optimizer based on configuration 
> parameters. For the server side option, the table operation is parallelized 
> such that multiple table regions are scanned. However, currently there is no 
> paging capability, and the server side operation can take long enough to lead to 
> HBase client timeouts. Putting a limit on the number of rows to be processed 
> within a single RPC call (i.e., the next operation on the scanner) on the 
> server side using Phoenix-level paging is highly desirable. This paging 
> mechanism has already been implemented for index rebuild and verification 
> operations and has proven effective at preventing timeouts. This Jira is for 
> implementing this paging for the server side grouped aggregate operations. 



--
This message was sent by Atlassian Jira
(v8.3.4#803005)

Reply via email to