This is an automated email from the ASF dual-hosted git repository.

alsuliman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git


The following commit(s) were added to refs/heads/master by this push:
     new d088dac  [ASTERIXDB-2564][RT] Too many objects created in min() and 
max()
d088dac is described below

commit d088dacbfd7601e41ff2cf4b0ed96cdefa3363fd
Author: Ali Alsuliman <[email protected]>
AuthorDate: Sun May 12 04:39:57 2019 -0700

    [ASTERIXDB-2564][RT] Too many objects created in min() and max()
    
    - user model changes: no
    - storage format changes: no
    - interface changes: no
    
    Details:
    During min() and max() aggregation, the functions keep track of
    the aggregation type in order to handle heterogeneous  lists.
    It promotes the aggregation type if needed (e.g. encountered double).
    Don't switch to new aggregation type and create a new comparator
    when the new input value type is the same as the previously
    aggregated values. That is because canPromote(agg_type, new_val_type)
    will always return true for same types.
    
    Change-Id: I0bb9f0715985ae555de00bbf3173c80371d8968b
    Reviewed-on: https://asterix-gerrit.ics.uci.edu/3391
    Contrib: Jenkins <[email protected]>
    Sonar-Qube: Jenkins <[email protected]>
    Tested-by: Jenkins <[email protected]>
    Integration-Tests: Jenkins <[email protected]>
    Reviewed-by: Dmitry Lychagin <[email protected]>
---
 .../std/AbstractMinMaxAggregateFunction.java       | 103 +++++++++------------
 1 file changed, 43 insertions(+), 60 deletions(-)

diff --git 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractMinMaxAggregateFunction.java
 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractMinMaxAggregateFunction.java
index 86ae924..90f006d 100644
--- 
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractMinMaxAggregateFunction.java
+++ 
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractMinMaxAggregateFunction.java
@@ -30,7 +30,6 @@ import 
org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
 import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
 import org.apache.hyracks.api.context.IHyracksTaskContext;
 import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
 import org.apache.hyracks.api.exceptions.HyracksDataException;
 import org.apache.hyracks.api.exceptions.SourceLocation;
 import org.apache.hyracks.data.std.api.IPointable;
@@ -39,18 +38,16 @@ import 
org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
 import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
 
 public abstract class AbstractMinMaxAggregateFunction extends 
AbstractAggregateFunction {
-    private IPointable inputVal = new VoidPointable();
-    private ArrayBackedValueStorage outputVal = new ArrayBackedValueStorage();
-    private ArrayBackedValueStorage tempValForCasting = new 
ArrayBackedValueStorage();
-
-    protected ArrayBackedValueStorage resultStorage = new 
ArrayBackedValueStorage();
-    private IScalarEvaluator eval;
+    protected final ArrayBackedValueStorage resultStorage = new 
ArrayBackedValueStorage();
+    private final IPointable inputVal = new VoidPointable();
+    private final ArrayBackedValueStorage outputVal = new 
ArrayBackedValueStorage();
+    private final ArrayBackedValueStorage tempValForCasting = new 
ArrayBackedValueStorage();
+    private final IScalarEvaluator eval;
+    private final boolean isMin;
     protected ATypeTag aggType;
     private IBinaryComparator cmp;
-    private ITypeConvertComputer tpc;
-    private final boolean isMin;
 
-    public AbstractMinMaxAggregateFunction(IScalarEvaluatorFactory[] args, 
IHyracksTaskContext context, boolean isMin,
+    AbstractMinMaxAggregateFunction(IScalarEvaluatorFactory[] args, 
IHyracksTaskContext context, boolean isMin,
             SourceLocation sourceLoc) throws HyracksDataException {
         super(sourceLoc);
         eval = args[0].createScalarEvaluator(context);
@@ -82,9 +79,8 @@ public abstract class AbstractMinMaxAggregateFunction extends 
AbstractAggregateF
             // First value encountered. Set type, comparator, and initial 
value.
             aggType = typeTag;
             // Set comparator.
-            IBinaryComparatorFactory cmpFactory =
-                    
BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(aggType, 
isMin);
-            cmp = cmpFactory.createBinaryComparator();
+            cmp = 
BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(aggType, 
isMin)
+                    .createBinaryComparator();
             // Initialize min value.
             outputVal.assign(inputVal);
         } else if (typeTag != ATypeTag.SYSTEM_NULL && 
!ATypeHierarchy.isCompatible(typeTag, aggType)) {
@@ -94,56 +90,27 @@ public abstract class AbstractMinMaxAggregateFunction 
extends AbstractAggregateF
                 throw new IncompatibleTypeException(sourceLoc, "min/max", 
aggType.serialize(), typeTag.serialize());
             }
         } else {
-
             // If a system_null is encountered locally, it would be an error; 
otherwise if it is seen
             // by a global aggregator, it is simple ignored.
             if (typeTag == ATypeTag.SYSTEM_NULL) {
                 processSystemNull();
                 return;
             }
-
+            if (aggType == typeTag) {
+                compareAndUpdate(cmp, inputVal, outputVal);
+                return;
+            }
             if (ATypeHierarchy.canPromote(aggType, typeTag)) {
-                tpc = ATypeHierarchy.getTypePromoteComputer(aggType, typeTag);
-                aggType = typeTag;
-                cmp = 
BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(aggType, 
isMin)
+                // switch to new comp & aggregation type (i.e. current min/max 
is int and new input is double)
+                cmp = 
BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(typeTag, 
isMin)
                         .createBinaryComparator();
-                if (tpc != null) {
-                    tempValForCasting.reset();
-                    try {
-                        tpc.convertType(outputVal.getByteArray(), 
outputVal.getStartOffset() + 1,
-                                outputVal.getLength() - 1, 
tempValForCasting.getDataOutput());
-                    } catch (IOException e) {
-                        throw HyracksDataException.create(e);
-                    }
-                    outputVal.assign(tempValForCasting);
-                }
-                if (cmp.compare(inputVal.getByteArray(), 
inputVal.getStartOffset(), inputVal.getLength(),
-                        outputVal.getByteArray(), outputVal.getStartOffset(), 
outputVal.getLength()) < 0) {
-                    outputVal.assign(inputVal);
-                }
-
+                castValue(ATypeHierarchy.getTypePromoteComputer(aggType, 
typeTag), outputVal, tempValForCasting);
+                outputVal.assign(tempValForCasting);
+                compareAndUpdate(cmp, inputVal, outputVal);
+                aggType = typeTag;
             } else {
-                tpc = ATypeHierarchy.getTypePromoteComputer(typeTag, aggType);
-                if (tpc != null) {
-                    tempValForCasting.reset();
-                    try {
-                        tpc.convertType(inputVal.getByteArray(), 
inputVal.getStartOffset() + 1,
-                                inputVal.getLength() - 1, 
tempValForCasting.getDataOutput());
-                    } catch (IOException e) {
-                        throw HyracksDataException.create(e);
-                    }
-                    if (cmp.compare(tempValForCasting.getByteArray(), 
tempValForCasting.getStartOffset(),
-                            tempValForCasting.getLength(), 
outputVal.getByteArray(), outputVal.getStartOffset(),
-                            outputVal.getLength()) < 0) {
-                        outputVal.assign(tempValForCasting);
-                    }
-                } else {
-                    if (cmp.compare(inputVal.getByteArray(), 
inputVal.getStartOffset(), inputVal.getLength(),
-                            outputVal.getByteArray(), 
outputVal.getStartOffset(), outputVal.getLength()) < 0) {
-                        outputVal.assign(inputVal);
-                    }
-                }
-
+                castValue(ATypeHierarchy.getTypePromoteComputer(typeTag, 
aggType), inputVal, tempValForCasting);
+                compareAndUpdate(cmp, tempValForCasting, outputVal);
             }
         }
     }
@@ -153,20 +120,17 @@ public abstract class AbstractMinMaxAggregateFunction 
extends AbstractAggregateF
         resultStorage.reset();
         try {
             switch (aggType) {
-                case NULL: {
+                case NULL:
                     
resultStorage.getDataOutput().writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
                     result.set(resultStorage);
                     break;
-                }
-                case SYSTEM_NULL: {
+                case SYSTEM_NULL:
                     finishSystemNull();
                     result.set(resultStorage);
                     break;
-                }
-                default: {
+                default:
                     result.set(outputVal);
                     break;
-                }
             }
         } catch (IOException e) {
             throw HyracksDataException.create(e);
@@ -187,4 +151,23 @@ public abstract class AbstractMinMaxAggregateFunction 
extends AbstractAggregateF
     protected abstract void processSystemNull() throws HyracksDataException;
 
     protected abstract void finishSystemNull() throws IOException;
+
+    private static void compareAndUpdate(IBinaryComparator comp, IPointable 
newVal, ArrayBackedValueStorage oldVal)
+            throws HyracksDataException {
+        if (comp.compare(newVal.getByteArray(), newVal.getStartOffset(), 
newVal.getLength(), oldVal.getByteArray(),
+                oldVal.getStartOffset(), oldVal.getLength()) < 0) {
+            oldVal.assign(newVal);
+        }
+    }
+
+    private static void castValue(ITypeConvertComputer typeConverter, 
IPointable inputValue,
+            ArrayBackedValueStorage tempValForCasting) throws 
HyracksDataException {
+        tempValForCasting.reset();
+        try {
+            typeConverter.convertType(inputValue.getByteArray(), 
inputValue.getStartOffset() + 1,
+                    inputValue.getLength() - 1, 
tempValForCasting.getDataOutput());
+        } catch (IOException e) {
+            throw HyracksDataException.create(e);
+        }
+    }
 }

Reply via email to