PHOENIX-4718 Decrease overhead of tracking aggregate heap size

Project: http://git-wip-us.apache.org/repos/asf/phoenix/repo
Commit: http://git-wip-us.apache.org/repos/asf/phoenix/commit/ea6771df
Tree: http://git-wip-us.apache.org/repos/asf/phoenix/tree/ea6771df
Diff: http://git-wip-us.apache.org/repos/asf/phoenix/diff/ea6771df

Branch: refs/heads/4.x-cdh5.12
Commit: ea6771df9cb1110cdf8d0bf8580d454cb86d395a
Parents: 540d5a9
Author: James Taylor <jtay...@salesforce.com>
Authored: Mon Apr 30 22:03:38 2018 -0700
Committer: James Taylor <jtay...@salesforce.com>
Committed: Tue May 1 11:30:11 2018 -0700

----------------------------------------------------------------------
 .../phoenix/end2end/SpillableGroupByIT.java     | 17 ++--
 .../GroupedAggregateRegionObserver.java         | 95 ++++++++++----------
 .../UngroupedAggregateRegionObserver.java       | 48 +++++-----
 .../phoenix/execute/ClientAggregatePlan.java    |  2 +-
 .../expression/aggregator/Aggregator.java       |  9 +-
 .../expression/aggregator/Aggregators.java      |  3 +-
 .../expression/aggregator/BaseAggregator.java   |  4 +
 .../aggregator/ClientAggregators.java           |  3 +-
 .../DistinctValueWithCountServerAggregator.java | 15 ++--
 .../NonSizeTrackingServerAggregators.java       | 42 +++++++++
 .../aggregator/ServerAggregators.java           | 42 +++++----
 .../SizeTrackingServerAggregators.java          | 59 ++++++++++++
 .../org/apache/phoenix/query/QueryServices.java |  1 +
 .../phoenix/query/QueryServicesOptions.java     |  1 +
 .../phoenix/compile/QueryCompilerTest.java      |  4 +-
 .../phoenix/query/QueryServicesTestImpl.java    |  2 +
 16 files changed, 231 insertions(+), 116 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/phoenix/blob/ea6771df/phoenix-core/src/it/java/org/apache/phoenix/end2end/SpillableGroupByIT.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SpillableGroupByIT.java b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SpillableGroupByIT.java
index 3689c4c..21b2ac9 100644
--- a/phoenix-core/src/it/java/org/apache/phoenix/end2end/SpillableGroupByIT.java
+++ b/phoenix-core/src/it/java/org/apache/phoenix/end2end/SpillableGroupByIT.java
@@ -53,9 +53,9 @@ public class SpillableGroupByIT extends BaseOwnClusterIT {
 
     private static final int NUM_ROWS_INSERTED = 1000;
     
-    // covers: COUNT, COUNT(DISTINCT) SUM, AVG, MIN, MAX 
+    // covers: COUNT, SUM, AVG, MIN, MAX 
     private static String GROUPBY1 = "select "
-            + "count(*), count(distinct uri), sum(appcpu), avg(appcpu), uri, 
min(id), max(id) from %s "
+            + "count(*), sum(appcpu), avg(appcpu), uri, min(id), max(id) from 
%s "
             + "group by uri";
     
     private static String GROUPBY2 = "select count(distinct uri) from %s";
@@ -135,13 +135,12 @@ public class SpillableGroupByIT extends BaseOwnClusterIT {
 
         int count = 0;
         while (rs.next()) {
-            String uri = rs.getString(5);
+            String uri = rs.getString(4);
             assertEquals(2, rs.getInt(1));
-            assertEquals(1, rs.getInt(2));
-            assertEquals(20, rs.getInt(3));
-            assertEquals(10, rs.getInt(4));
-            int a = Integer.valueOf(rs.getString(6)).intValue();
-            int b = Integer.valueOf(rs.getString(7)).intValue();
+            assertEquals(20, rs.getInt(2));
+            assertEquals(10, rs.getInt(3));
+            int a = Integer.valueOf(rs.getString(5)).intValue();
+            int b = Integer.valueOf(rs.getString(6)).intValue();
             assertEquals(Integer.valueOf(uri).intValue(), Math.min(a, b));
             assertEquals(NUM_ROWS_INSERTED / 2 + Integer.valueOf(uri), 
Math.max(a, b));
             count++;
@@ -206,4 +205,4 @@ public class SpillableGroupByIT extends BaseOwnClusterIT {
         }
 
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ea6771df/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
index 201bcec..a6fa6a5 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/GroupedAggregateRegionObserver.java
@@ -124,53 +124,56 @@ public class GroupedAggregateRegionObserver extends BaseScannerRegionObserver {
         }
 
         List<Expression> expressions = 
deserializeGroupByExpressions(expressionBytes, 0);
-        ServerAggregators aggregators =
-                ServerAggregators.deserialize(scan
-                        .getAttribute(BaseScannerRegionObserver.AGGREGATORS), c
-                        .getEnvironment().getConfiguration());
-
-        RegionScanner innerScanner = s;
-        boolean useProto = false;
-        byte[] localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD_PROTO);
-        useProto = localIndexBytes != null;
-        if (localIndexBytes == null) {
-            localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD);
-        }
-        List<IndexMaintainer> indexMaintainers = localIndexBytes == null ? 
null : IndexMaintainer.deserialize(localIndexBytes, useProto);
-        TupleProjector tupleProjector = null;
-        byte[][] viewConstants = null;
-        ColumnReference[] dataColumns = 
IndexUtil.deserializeDataTableColumnsToJoin(scan);
-
-        final TupleProjector p = 
TupleProjector.deserializeProjectorFromScan(scan);
-        final HashJoinInfo j = HashJoinInfo.deserializeHashJoinFromScan(scan);
-        boolean useQualifierAsIndex = 
EncodedColumnsUtil.useQualifierAsIndex(EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan));
-        if (ScanUtil.isLocalIndex(scan) || (j == null && p != null)) {
-            if (dataColumns != null) {
-                tupleProjector = IndexUtil.getTupleProjector(scan, 
dataColumns);
-                viewConstants = 
IndexUtil.deserializeViewConstantsFromScan(scan);
+        final TenantCache tenantCache = 
GlobalCache.getTenantCache(c.getEnvironment(), ScanUtil.getTenantId(scan));
+        try (MemoryChunk em = tenantCache.getMemoryManager().allocate(0)) {
+            ServerAggregators aggregators =
+                    ServerAggregators.deserialize(scan
+                            
.getAttribute(BaseScannerRegionObserver.AGGREGATORS), c
+                            .getEnvironment().getConfiguration(), em);
+    
+            RegionScanner innerScanner = s;
+            boolean useProto = false;
+            byte[] localIndexBytes = 
scan.getAttribute(LOCAL_INDEX_BUILD_PROTO);
+            useProto = localIndexBytes != null;
+            if (localIndexBytes == null) {
+                localIndexBytes = scan.getAttribute(LOCAL_INDEX_BUILD);
+            }
+            List<IndexMaintainer> indexMaintainers = localIndexBytes == null ? 
null : IndexMaintainer.deserialize(localIndexBytes, useProto);
+            TupleProjector tupleProjector = null;
+            byte[][] viewConstants = null;
+            ColumnReference[] dataColumns = 
IndexUtil.deserializeDataTableColumnsToJoin(scan);
+    
+            final TupleProjector p = 
TupleProjector.deserializeProjectorFromScan(scan);
+            final HashJoinInfo j = 
HashJoinInfo.deserializeHashJoinFromScan(scan);
+            boolean useQualifierAsIndex = 
EncodedColumnsUtil.useQualifierAsIndex(EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan));
+            if (ScanUtil.isLocalIndex(scan) || (j == null && p != null)) {
+                if (dataColumns != null) {
+                    tupleProjector = IndexUtil.getTupleProjector(scan, 
dataColumns);
+                    viewConstants = 
IndexUtil.deserializeViewConstantsFromScan(scan);
+                }
+                ImmutableBytesPtr tempPtr = new ImmutableBytesPtr();
+                innerScanner =
+                        getWrappedScanner(c, innerScanner, offset, scan, 
dataColumns, tupleProjector, 
+                                c.getEnvironment().getRegion(), 
indexMaintainers == null ? null : indexMaintainers.get(0), viewConstants, p, 
tempPtr, useQualifierAsIndex);
+            } 
+    
+            if (j != null) {
+                innerScanner =
+                        new HashJoinRegionScanner(innerScanner, p, j, 
ScanUtil.getTenantId(scan),
+                                c.getEnvironment(), useQualifierAsIndex, 
useNewValueColumnQualifier);
+            }
+    
+            long limit = Long.MAX_VALUE;
+            byte[] limitBytes = scan.getAttribute(GROUP_BY_LIMIT);
+            if (limitBytes != null) {
+                limit = PInteger.INSTANCE.getCodec().decodeInt(limitBytes, 0, 
SortOrder.getDefault());
+            }
+            if (keyOrdered) { // Optimize by taking advantage that the rows are
+                              // already in the required group by key order
+                return scanOrdered(c, scan, innerScanner, expressions, 
aggregators, limit);
+            } else { // Otherwse, collect them all up in an in memory map
+                return scanUnordered(c, scan, innerScanner, expressions, 
aggregators, limit);
             }
-            ImmutableBytesPtr tempPtr = new ImmutableBytesPtr();
-            innerScanner =
-                    getWrappedScanner(c, innerScanner, offset, scan, 
dataColumns, tupleProjector, 
-                            c.getEnvironment().getRegion(), indexMaintainers 
== null ? null : indexMaintainers.get(0), viewConstants, p, tempPtr, 
useQualifierAsIndex);
-        } 
-
-        if (j != null) {
-            innerScanner =
-                    new HashJoinRegionScanner(innerScanner, p, j, 
ScanUtil.getTenantId(scan),
-                            c.getEnvironment(), useQualifierAsIndex, 
useNewValueColumnQualifier);
-        }
-
-        long limit = Long.MAX_VALUE;
-        byte[] limitBytes = scan.getAttribute(GROUP_BY_LIMIT);
-        if (limitBytes != null) {
-            limit = PInteger.INSTANCE.getCodec().decodeInt(limitBytes, 0, 
SortOrder.getDefault());
-        }
-        if (keyOrdered) { // Optimize by taking advantage that the rows are
-                          // already in the required group by key order
-            return scanOrdered(c, scan, innerScanner, expressions, 
aggregators, limit);
-        } else { // Otherwse, collect them all up in an in memory map
-            return scanUnordered(c, scan, innerScanner, expressions, 
aggregators, limit);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ea6771df/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
index abdcf72..6bee65c 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/coprocessor/UngroupedAggregateRegionObserver.java
@@ -94,7 +94,6 @@ import org.apache.phoenix.hbase.index.exception.IndexWriteException;
 import org.apache.phoenix.hbase.index.util.GenericKeyValueBuilder;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.hbase.index.util.KeyValueBuilder;
-import org.apache.phoenix.hbase.index.write.IndexWriterUtils;
 import org.apache.phoenix.index.IndexMaintainer;
 import org.apache.phoenix.index.PhoenixIndexCodec;
 import org.apache.phoenix.index.PhoenixIndexFailurePolicy;
@@ -516,31 +515,33 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             maxBatchSizeBytes = conf.getLong(MUTATE_BATCH_SIZE_BYTES_ATTRIB,
                 QueryServicesOptions.DEFAULT_MUTATE_BATCH_SIZE_BYTES);
         }
-        Aggregators aggregators = ServerAggregators.deserialize(
-                scan.getAttribute(BaseScannerRegionObserver.AGGREGATORS), 
conf);
-        Aggregator[] rowAggregators = aggregators.getAggregators();
         boolean hasMore;
-        boolean hasAny = false;
-        Pair<Integer, Integer> minMaxQualifiers = 
EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan);
-        Tuple result = useQualifierAsIndex ? new 
PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple();
-        if (logger.isDebugEnabled()) {
-            logger.debug(LogUtil.addCustomAnnotations("Starting ungrouped 
coprocessor scan " + scan + " "+region.getRegionInfo(), 
ScanUtil.getCustomAnnotations(scan)));
-        }
         int rowCount = 0;
-        final RegionScanner innerScanner = theScanner;
-        boolean useIndexProto = true;
-        byte[] indexMaintainersPtr = 
scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD);
-        // for backward compatiblity fall back to look by the old attribute
-        if (indexMaintainersPtr == null) {
-            indexMaintainersPtr = 
scan.getAttribute(PhoenixIndexCodec.INDEX_MD);
-            useIndexProto = false;
-        }
-
-        byte[] clientVersionBytes = 
scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION);
+        boolean hasAny = false;
         boolean acquiredLock = false;
         boolean incrScanRefCount = false;
+        Aggregators aggregators = null;
+        Aggregator[] rowAggregators = null;
+        final RegionScanner innerScanner = theScanner;
         final TenantCache tenantCache = GlobalCache.getTenantCache(env, 
ScanUtil.getTenantId(scan));
         try (MemoryChunk em = tenantCache.getMemoryManager().allocate(0)) {
+            aggregators = ServerAggregators.deserialize(
+                    scan.getAttribute(BaseScannerRegionObserver.AGGREGATORS), 
conf, em);
+            rowAggregators = aggregators.getAggregators();
+            Pair<Integer, Integer> minMaxQualifiers = 
EncodedColumnsUtil.getMinMaxQualifiersFromScan(scan);
+            Tuple result = useQualifierAsIndex ? new 
PositionBasedMultiKeyValueTuple() : new MultiKeyValueTuple();
+            if (logger.isDebugEnabled()) {
+                logger.debug(LogUtil.addCustomAnnotations("Starting ungrouped 
coprocessor scan " + scan + " "+region.getRegionInfo(), 
ScanUtil.getCustomAnnotations(scan)));
+            }
+            boolean useIndexProto = true;
+            byte[] indexMaintainersPtr = 
scan.getAttribute(PhoenixIndexCodec.INDEX_PROTO_MD);
+            // for backward compatiblity fall back to look by the old attribute
+            if (indexMaintainersPtr == null) {
+                indexMaintainersPtr = 
scan.getAttribute(PhoenixIndexCodec.INDEX_MD);
+                useIndexProto = false;
+            }
+    
+            byte[] clientVersionBytes = 
scan.getAttribute(BaseScannerRegionObserver.CLIENT_VERSION);
             if(needToWrite) {
                 synchronized (lock) {
                     if (isRegionClosingOrSplitting) {
@@ -553,7 +554,6 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
             }
             region.startRegionOperation();
             acquiredLock = true;
-            long size = 0;
             synchronized (innerScanner) {
                 do {
                     List<Cell> results = useQualifierAsIndex ? new 
EncodedColumnQualiferCellsList(minMaxQualifiers.getFirst(), 
minMaxQualifiers.getSecond(), encodingScheme) : new ArrayList<Cell>();
@@ -785,11 +785,7 @@ public class UngroupedAggregateRegionObserver extends BaseScannerRegionObserver
                             commitBatch(region, indexMutations, 
blockingMemStoreSize);
                             indexMutations.clear();
                         }
-                        size += aggregators.aggregate(rowAggregators, result);
-                        while(size > em.getSize()) {
-                            logger.info("Request: {}, resizing {} by 
1024*1024", size, em.getSize());
-                            em.resize(em.getSize() + 1024*1024);
-                        }
+                        aggregators.aggregate(rowAggregators, result);
                         hasAny = true;
                     }
                 } while (hasMore);

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ea6771df/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
index 21cbc2d..c306aca 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/execute/ClientAggregatePlan.java
@@ -89,7 +89,7 @@ public class ClientAggregatePlan extends ClientProcessingPlan {
         // aggregators. We use the Configuration directly here to avoid the 
expense of creating
         // another one.
         this.serverAggregators = 
ServerAggregators.deserialize(context.getScan()
-                        .getAttribute(BaseScannerRegionObserver.AGGREGATORS), 
context.getConnection().getQueryServices().getConfiguration());
+                        .getAttribute(BaseScannerRegionObserver.AGGREGATORS), 
context.getConnection().getQueryServices().getConfiguration(), null);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ea6771df/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/Aggregator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/Aggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/Aggregator.java
index e436deb..6e57025 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/Aggregator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/Aggregator.java
@@ -18,7 +18,6 @@
 package org.apache.phoenix.expression.aggregator;
 
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
-
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.schema.tuple.Tuple;
 
@@ -43,4 +42,12 @@ public interface Aggregator extends Expression {
      * Get the size in bytes
      */
     public int getSize();
+    
+    /**
+     * Determines whether the heap size of this aggregator, as reported by
+     * {@link #getSize()}, should be tracked while it executes on the server side.
+     * @return true if the size should be tracked, false otherwise; one aggregator
+     * returning true suffices to enable tracking when a memory chunk is supplied.
+     */
+    public boolean trackSize();
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ea6771df/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/Aggregators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/Aggregators.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/Aggregators.java
index 2f6e6ee..b1dc658 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/Aggregators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/Aggregators.java
@@ -19,7 +19,6 @@ package org.apache.phoenix.expression.aggregator;
 
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.phoenix.expression.function.SingleAggregateFunction;
-import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
 import org.apache.phoenix.schema.KeyValueSchema;
 import org.apache.phoenix.schema.KeyValueSchema.KeyValueSchemaBuilder;
 import org.apache.phoenix.schema.ValueBitSet;
@@ -83,7 +82,7 @@ abstract public class Aggregators {
      * Aggregate over aggregators
      * @param result the single row Result from scan iteration
      */
-    abstract public long aggregate(Aggregator[] aggregators, Tuple result);
+    abstract public void aggregate(Aggregator[] aggregators, Tuple result);
 
     protected static int calculateSize(Aggregator[] aggregators) {
         

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ea6771df/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/BaseAggregator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/BaseAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/BaseAggregator.java
index 9b673bf..16ce588 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/BaseAggregator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/BaseAggregator.java
@@ -63,4 +63,8 @@ public abstract class BaseAggregator extends BaseTerminalExpression implements A
         return null;
     }
 
+    @Override
+    public boolean trackSize() {
+        return false; // default: opt out of heap-size tracking; subclasses whose size varies override this
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ea6771df/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/ClientAggregators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/ClientAggregators.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/ClientAggregators.java
index f1ed2a9..54d5690 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/ClientAggregators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/ClientAggregators.java
@@ -50,7 +50,7 @@ public class ClientAggregators extends Aggregators {
     }
     
     @Override
-    public long aggregate(Aggregator[] aggregators, Tuple result) {
+    public void aggregate(Aggregator[] aggregators, Tuple result) {
         TupleUtil.getAggregateValue(result, ptr);
         tempValueSet.clear();
         tempValueSet.or(ptr);
@@ -64,7 +64,6 @@ public class ClientAggregators extends Aggregators {
             }
             i++;
         }
-        return 0;
     }
     
     @Override

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ea6771df/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java
index ea7474f..af64900 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/DistinctValueWithCountServerAggregator.java
@@ -24,14 +24,13 @@ import java.util.Map.Entry;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.hbase.io.ImmutableBytesWritable;
 import org.apache.hadoop.hbase.util.Bytes;
-import org.apache.phoenix.cache.GlobalCache;
 import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
 import org.apache.phoenix.query.QueryServices;
 import org.apache.phoenix.query.QueryServicesOptions;
-import org.apache.phoenix.schema.types.PDataType;
-import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.schema.SortOrder;
 import org.apache.phoenix.schema.tuple.Tuple;
+import org.apache.phoenix.schema.types.PDataType;
+import org.apache.phoenix.schema.types.PVarbinary;
 import org.apache.phoenix.util.ByteUtil;
 import org.apache.phoenix.util.SizedUtil;
 import org.iq80.snappy.Snappy;
@@ -157,11 +156,11 @@ public class DistinctValueWithCountServerAggregator extends BaseAggregator {
 
     @Override
     public int getSize() {
-        // TODO make this size correct.??
-        // This size is being called initially at the begin of the scanner 
open. At that time we any
-        // way can not tell the exact size of the Map. The Aggregators get 
size from all Aggregator
-        // and stores in a variable for future use. This size of the 
Aggregators is being used in
-        // Grouped unordered scan. Do we need some changes there in that 
calculation?
         return super.getSize() + SizedUtil.ARRAY_SIZE + countMapHeapSize();
     }
+    
+    @Override
+    public boolean trackSize() {
+        return true; // getSize() includes countMapHeapSize(), which presumably grows with the distinct values seen
+    }
 }

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ea6771df/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/NonSizeTrackingServerAggregators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/NonSizeTrackingServerAggregators.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/NonSizeTrackingServerAggregators.java
new file mode 100644
index 0000000..8836c45
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/NonSizeTrackingServerAggregators.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.function.SingleAggregateFunction;
+import org.apache.phoenix.schema.tuple.Tuple;
+
+/**
+ * Server-side {@link Aggregators} that feed each row to their aggregators without
+ * tracking the heap those aggregators consume. {@link ServerAggregators#deserialize}
+ * returns this implementation when no memory chunk is supplied or when no aggregator
+ * reports {@link Aggregator#trackSize()} as true.
+ */
+public class NonSizeTrackingServerAggregators extends ServerAggregators {
+    /** Instance with no aggregators, returned when a scan carries no serialized aggregators. */
+    public static final ServerAggregators EMPTY_AGGREGATORS = new NonSizeTrackingServerAggregators(
+            new SingleAggregateFunction[0], new Aggregator[0], new Expression[0], 0);
+
+    /**
+     * Creates aggregators that do not track heap usage.
+     *
+     * @param functions the aggregate functions backing the aggregators
+     * @param aggregators the aggregators to drive
+     * @param expressions input expression for each aggregator, index-aligned with
+     *        {@code aggregators}; the superclass rejects mismatched lengths
+     * @param minNullableIndex passed through unchanged to the superclass
+     */
+    public NonSizeTrackingServerAggregators(SingleAggregateFunction[] functions, Aggregator[] aggregators,
+            Expression[] expressions, int minNullableIndex) {
+        super(functions, aggregators, expressions, minNullableIndex);
+    }
+
+    /**
+     * Aggregates one row. Each expression is evaluated against {@code result}; if it
+     * yields a non-empty value, the aggregator at the same index consumes it. Every
+     * expression is reset after evaluation, whether or not it produced a value.
+     *
+     * @param aggregators aggregators index-aligned with this instance's expressions;
+     *        must have at least as many entries as there are expressions
+     * @param result the row being aggregated
+     */
+    @Override
+    public void aggregate(Aggregator[] aggregators, Tuple result) {
+        for (int i = 0; i < expressions.length; i++) {
+            if (expressions[i].evaluate(result, ptr) && ptr.getLength() != 0) {
+                aggregators[i].aggregate(result, ptr);
+            }
+            expressions[i].reset();
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ea6771df/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/ServerAggregators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/ServerAggregators.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/ServerAggregators.java
index 790939c..ef9ca0f 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/ServerAggregators.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/ServerAggregators.java
@@ -30,6 +30,9 @@ import org.apache.hadoop.io.WritableUtils;
 import org.apache.phoenix.expression.Expression;
 import org.apache.phoenix.expression.ExpressionType;
 import org.apache.phoenix.expression.function.SingleAggregateFunction;
+import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
 import org.apache.phoenix.schema.tuple.Tuple;
 
 
@@ -38,11 +41,10 @@ import org.apache.phoenix.schema.tuple.Tuple;
  * Aggregators that execute on the server-side
  *
  */
-public class ServerAggregators extends Aggregators {
-    public static final ServerAggregators EMPTY_AGGREGATORS = new 
ServerAggregators(new SingleAggregateFunction[0], new Aggregator[0], new 
Expression[0], 0);
-    private final Expression[] expressions;
+public abstract class ServerAggregators extends Aggregators {
+    protected final Expression[] expressions;
     
-    private ServerAggregators(SingleAggregateFunction[] functions, 
Aggregator[] aggregators, Expression[] expressions, int minNullableIndex) {
+    protected ServerAggregators(SingleAggregateFunction[] functions, 
Aggregator[] aggregators, Expression[] expressions, int minNullableIndex) {
         super(functions, aggregators, minNullableIndex);
         if (aggregators.length != expressions.length) {
             throw new IllegalArgumentException("Number of aggregators (" + 
aggregators.length 
@@ -52,18 +54,7 @@ public class ServerAggregators extends Aggregators {
     }
     
     @Override
-    public long aggregate(Aggregator[] aggregators, Tuple result) {
-        long dsize = 0;
-        for (int i = 0; i < expressions.length; i++) {
-            if (expressions[i].evaluate(result, ptr) && ptr.getLength() != 0) {
-                dsize -= aggregators[i].getSize();
-                aggregators[i].aggregate(result, ptr);
-                dsize += aggregators[i].getSize();
-            }
-            expressions[i].reset();
-        }
-        return dsize;
-    }
+    public abstract void aggregate(Aggregator[] aggregators, Tuple result);
     
     /**
      * Serialize an Aggregator into a byte array
@@ -112,9 +103,9 @@ public class ServerAggregators extends Aggregators {
      * @param conf Server side configuration used by HBase
      * @return newly instantiated Aggregators instance
      */
-    public static ServerAggregators deserialize(byte[] b, Configuration conf) {
+    public static ServerAggregators deserialize(byte[] b, Configuration conf, 
MemoryChunk chunk) {
         if (b == null) {
-            return ServerAggregators.EMPTY_AGGREGATORS;
+            return NonSizeTrackingServerAggregators.EMPTY_AGGREGATORS;
         }
         ByteArrayInputStream stream = new ByteArrayInputStream(b);
         try {
@@ -131,7 +122,20 @@ public class ServerAggregators extends Aggregators {
                 aggregators[i] = aggFunc.getAggregator();
                 expressions[i] = aggFunc.getAggregatorExpression();
             }
-            return new ServerAggregators(functions, aggregators,expressions, 
minNullableIndex);
+            boolean trackSize = false;
+            if (chunk != null) {
+                for (Aggregator aggregator : aggregators) {
+                    if (aggregator.trackSize()) {
+                        trackSize = true;
+                        break;
+                    }
+                }
+            }
+            return trackSize ?
+                    new SizeTrackingServerAggregators(functions, 
aggregators,expressions, minNullableIndex, chunk, 
+                            
conf.getInt(QueryServices.AGGREGATE_CHUNK_SIZE_INCREASE_ATTRIB, 
+                                    
QueryServicesOptions.DEFAULT_AGGREGATE_CHUNK_SIZE_INCREASE)) :
+                    new NonSizeTrackingServerAggregators(functions, 
aggregators,expressions, minNullableIndex);
         } catch (IOException e) {
             throw new RuntimeException(e);
         } finally {

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ea6771df/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/SizeTrackingServerAggregators.java
----------------------------------------------------------------------
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/SizeTrackingServerAggregators.java b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/SizeTrackingServerAggregators.java
new file mode 100644
index 0000000..983968b
--- /dev/null
+++ b/phoenix-core/src/main/java/org/apache/phoenix/expression/aggregator/SizeTrackingServerAggregators.java
@@ -0,0 +1,113 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.expression.aggregator;
+
+import org.apache.phoenix.expression.Expression;
+import org.apache.phoenix.expression.function.SingleAggregateFunction;
+import org.apache.phoenix.memory.MemoryManager.MemoryChunk;
+import org.apache.phoenix.schema.tuple.Tuple;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link ServerAggregators} that grows a {@link MemoryChunk} as the state held by its
+ * {@link Aggregator}s grows.
+ * <p>
+ * After each {@link #aggregate(Aggregator[], Tuple)} call, the running total of
+ * {@link Aggregator#getSize()} is compared with the chunk size. When the total is larger, the
+ * chunk is resized once, to its current size plus the fewest whole size-increase steps that
+ * cover the total.
+ */
+public class SizeTrackingServerAggregators extends ServerAggregators {
+    private static final Logger logger =
+            LoggerFactory.getLogger(SizeTrackingServerAggregators.class);
+
+    /** Memory chunk kept at least as large as {@link #memoryUsed}. */
+    private final MemoryChunk chunk;
+    /**
+     * Bytes by which {@link #chunk} is grown per step; a non-positive value means the chunk
+     * is grown to exactly the required size.
+     */
+    private final int sizeIncrease;
+    /** Combined {@link Aggregator#getSize()} as of the end of the last aggregate call. */
+    private long memoryUsed = 0;
+
+    /**
+     * @param functions passed through to {@link ServerAggregators}
+     * @param aggregators passed through to {@link ServerAggregators}
+     * @param expressions input expression for each aggregator, passed through to
+     *            {@link ServerAggregators}
+     * @param minNullableIndex passed through to {@link ServerAggregators}
+     * @param chunk memory chunk to grow as aggregator state grows; must not be null
+     * @param sizeIncrease bytes to grow {@code chunk} by per step
+     */
+    public SizeTrackingServerAggregators(SingleAggregateFunction[] functions,
+            Aggregator[] aggregators, Expression[] expressions, int minNullableIndex,
+            MemoryChunk chunk, int sizeIncrease) {
+        super(functions, aggregators, expressions, minNullableIndex);
+        this.chunk = chunk;
+        this.sizeIncrease = sizeIncrease;
+    }
+
+    /**
+     * Feeds {@code result} to every aggregator whose input expression evaluates to a
+     * non-empty value, then grows {@link #chunk} if the aggregators' combined size now
+     * exceeds it.
+     */
+    @Override
+    public void aggregate(Aggregator[] aggregators, Tuple result) {
+        long dsize = memoryUsed;
+        for (int i = 0; i < expressions.length; i++) {
+            if (expressions[i].evaluate(result, ptr) && ptr.getLength() != 0) {
+                // Swap this aggregator's old size for its new size in the running total.
+                dsize -= aggregators[i].getSize();
+                aggregators[i].aggregate(result, ptr);
+                dsize += aggregators[i].getSize();
+            }
+            expressions[i].reset();
+        }
+        long chunkSize = chunk.getSize();
+        if (dsize > chunkSize) {
+            long newChunkSize = grownChunkSize(chunkSize, dsize);
+            logger.info("Request: {}, resizing {} to {}", dsize, chunkSize, newChunkSize);
+            // One resize instead of a step-by-step loop: fewer resize calls, and no endless
+            // loop when sizeIncrease is not positive.
+            chunk.resize(newChunkSize);
+        }
+        memoryUsed = dsize;
+    }
+
+    /**
+     * Computes the size to resize {@link #chunk} to so that it covers {@code required}.
+     *
+     * @param current current chunk size; expected to be less than {@code required}
+     * @param required combined aggregator size the chunk must cover
+     * @return {@code current} plus the fewest whole {@link #sizeIncrease} steps that reach
+     *         {@code required}, or exactly {@code required} if {@link #sizeIncrease} is not
+     *         positive
+     */
+    private long grownChunkSize(long current, long required) {
+        if (sizeIncrease <= 0) {
+            return required;
+        }
+        // required > current, so this is ceil((required - current) / sizeIncrease) without
+        // risking overflow in an intermediate addition.
+        long steps = (required - current - 1) / sizeIncrease + 1;
+        return current + steps * sizeIncrease;
+    }
+}

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ea6771df/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index 8cc156c..db0b10b 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -103,6 +103,7 @@ public interface QueryServices extends SQLCloseable {
     public static final String GROUPBY_SPILL_FILES_ATTRIB = 
"phoenix.groupby.spillFiles";
     public static final String GROUPBY_MAX_CACHE_SIZE_ATTRIB = 
"phoenix.groupby.maxCacheSize";
     public static final String GROUPBY_ESTIMATED_DISTINCT_VALUES_ATTRIB = 
"phoenix.groupby.estimatedDistinctValues";
+    public static final String AGGREGATE_CHUNK_SIZE_INCREASE_ATTRIB = 
"phoenix.aggregate.chunk_size_increase";
 
     public static final String CALL_QUEUE_PRODUCER_ATTRIB_NAME = 
"CALL_QUEUE_PRODUCER";
 

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ea6771df/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index 23aed7c..0e6e89f 100644
--- 
a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ 
b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -161,6 +161,7 @@ public class QueryServicesOptions {
     public static final int DEFAULT_SCAN_CACHE_SIZE = 1000;
     public static final int DEFAULT_MAX_INTRA_REGION_PARALLELIZATION = 
DEFAULT_MAX_QUERY_CONCURRENCY;
     public static final int DEFAULT_DISTINCT_VALUE_COMPRESS_THRESHOLD = 1024 * 
1024 * 1; // 1 Mb
+    public static final int DEFAULT_AGGREGATE_CHUNK_SIZE_INCREASE = 1024 * 
1024 * 1; // 1 Mb
     public static final int DEFAULT_INDEX_MUTATE_BATCH_SIZE_THRESHOLD = 3;
     public static final long DEFAULT_MAX_SPOOL_TO_DISK_BYTES = 1024000000;
     // Only the first chunked batches are fetched in parallel, so this default

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ea6771df/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java 
b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
index 07bd3a9..110afc2 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/compile/QueryCompilerTest.java
@@ -764,7 +764,7 @@ public class QueryCompilerTest extends 
BaseConnectionlessQueryTest {
             for (int i = 0; i < queries.length; i++) {
                 query = queries[i];
                 Scan scan = compileQuery(query, binds);
-                ServerAggregators aggregators = 
ServerAggregators.deserialize(scan.getAttribute(BaseScannerRegionObserver.AGGREGATORS),
 null);
+                ServerAggregators aggregators = 
ServerAggregators.deserialize(scan.getAttribute(BaseScannerRegionObserver.AGGREGATORS),
 null, null);
                 Aggregator aggregator = aggregators.getAggregators()[0];
                 assertTrue(aggregator instanceof CountAggregator);
             }
@@ -2431,7 +2431,7 @@ public class QueryCompilerTest extends 
BaseConnectionlessQueryTest {
             Scan scan = projectQuery("select A.i1 from X group by i1 order by 
avg(B.i2) " +
                     "desc");
             ServerAggregators aggregators = 
ServerAggregators.deserialize(scan.getAttribute
-                    (BaseScannerRegionObserver.AGGREGATORS), null);
+                    (BaseScannerRegionObserver.AGGREGATORS), null, null);
             assertEquals(2,aggregators.getAggregatorCount());
         } finally {
             conn.close();

http://git-wip-us.apache.org/repos/asf/phoenix/blob/ea6771df/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
----------------------------------------------------------------------
diff --git 
a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
 
b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
index a7569f7..e279074 100644
--- 
a/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
+++ 
b/phoenix-core/src/test/java/org/apache/phoenix/query/QueryServicesTestImpl.java
@@ -67,6 +67,7 @@ public final class QueryServicesTestImpl extends 
BaseQueryServicesImpl {
     public static final long DEFAULT_INDEX_POPULATION_WAIT_TIME = 0;
     public static final long DEFAULT_SEQUENCE_CACHE_SIZE = 3;
     public static final boolean DEFAULT_TRANSACTIONS_ENABLED = true;
+    public static final int DEFAULT_AGGREGATE_CHUNK_SIZE_INCREASE = 1000;
     /*
      * Effectively disable running the index rebuild task by having an 
infinite delay
      * because we want to control it's execution ourselves
@@ -122,6 +123,7 @@ public final class QueryServicesTestImpl extends 
BaseQueryServicesImpl {
                 .setMaxThreadsPerHTable(DEFAULT_HTABLE_MAX_THREADS)
                 
.setDefaultIndexPopulationWaitTime(DEFAULT_INDEX_POPULATION_WAIT_TIME)
                 
.setIndexRebuildTaskInitialDelay(DEFAULT_INDEX_REBUILD_TASK_INITIAL_DELAY)
+                .set(AGGREGATE_CHUNK_SIZE_INCREASE_ATTRIB, 
DEFAULT_AGGREGATE_CHUNK_SIZE_INCREASE)
                 // setup default configs for Tephra
                 .set(TxConstants.Manager.CFG_DO_PERSIST, false)
                 .set(TxConstants.Service.CFG_DATA_TX_CLIENT_RETRY_STRATEGY, 
"n-times")

Reply via email to