This is an automated email from the ASF dual-hosted git repository.
alsuliman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/asterixdb.git
The following commit(s) were added to refs/heads/master by this push:
new d088dac [ASTERIXDB-2564][RT] Too many objects created in min() and
max()
d088dac is described below
commit d088dacbfd7601e41ff2cf4b0ed96cdefa3363fd
Author: Ali Alsuliman <[email protected]>
AuthorDate: Sun May 12 04:39:57 2019 -0700
[ASTERIXDB-2564][RT] Too many objects created in min() and max()
- user model changes: no
- storage format changes: no
- interface changes: no
Details:
During min() and max() aggregation, the functions keep track of
the aggregation type in order to handle heterogeneous lists.
They promote the aggregation type if needed (e.g. when a double is encountered).
Don't switch to a new aggregation type and create a new comparator
when the type of the new input value is the same as the current
aggregation type. Previously this happened on every such input,
because canPromote(agg_type, new_val_type) always returns true
when both types are the same.
Change-Id: I0bb9f0715985ae555de00bbf3173c80371d8968b
Reviewed-on: https://asterix-gerrit.ics.uci.edu/3391
Contrib: Jenkins <[email protected]>
Sonar-Qube: Jenkins <[email protected]>
Tested-by: Jenkins <[email protected]>
Integration-Tests: Jenkins <[email protected]>
Reviewed-by: Dmitry Lychagin <[email protected]>
---
.../std/AbstractMinMaxAggregateFunction.java | 103 +++++++++------------
1 file changed, 43 insertions(+), 60 deletions(-)
diff --git
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractMinMaxAggregateFunction.java
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractMinMaxAggregateFunction.java
index 86ae924..90f006d 100644
---
a/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractMinMaxAggregateFunction.java
+++
b/asterixdb/asterix-runtime/src/main/java/org/apache/asterix/runtime/aggregates/std/AbstractMinMaxAggregateFunction.java
@@ -30,7 +30,6 @@ import
org.apache.hyracks.algebricks.runtime.base.IScalarEvaluator;
import org.apache.hyracks.algebricks.runtime.base.IScalarEvaluatorFactory;
import org.apache.hyracks.api.context.IHyracksTaskContext;
import org.apache.hyracks.api.dataflow.value.IBinaryComparator;
-import org.apache.hyracks.api.dataflow.value.IBinaryComparatorFactory;
import org.apache.hyracks.api.exceptions.HyracksDataException;
import org.apache.hyracks.api.exceptions.SourceLocation;
import org.apache.hyracks.data.std.api.IPointable;
@@ -39,18 +38,16 @@ import
org.apache.hyracks.data.std.util.ArrayBackedValueStorage;
import org.apache.hyracks.dataflow.common.data.accessors.IFrameTupleReference;
public abstract class AbstractMinMaxAggregateFunction extends
AbstractAggregateFunction {
- private IPointable inputVal = new VoidPointable();
- private ArrayBackedValueStorage outputVal = new ArrayBackedValueStorage();
- private ArrayBackedValueStorage tempValForCasting = new
ArrayBackedValueStorage();
-
- protected ArrayBackedValueStorage resultStorage = new
ArrayBackedValueStorage();
- private IScalarEvaluator eval;
+ protected final ArrayBackedValueStorage resultStorage = new
ArrayBackedValueStorage();
+ private final IPointable inputVal = new VoidPointable();
+ private final ArrayBackedValueStorage outputVal = new
ArrayBackedValueStorage();
+ private final ArrayBackedValueStorage tempValForCasting = new
ArrayBackedValueStorage();
+ private final IScalarEvaluator eval;
+ private final boolean isMin;
protected ATypeTag aggType;
private IBinaryComparator cmp;
- private ITypeConvertComputer tpc;
- private final boolean isMin;
- public AbstractMinMaxAggregateFunction(IScalarEvaluatorFactory[] args,
IHyracksTaskContext context, boolean isMin,
+ AbstractMinMaxAggregateFunction(IScalarEvaluatorFactory[] args,
IHyracksTaskContext context, boolean isMin,
SourceLocation sourceLoc) throws HyracksDataException {
super(sourceLoc);
eval = args[0].createScalarEvaluator(context);
@@ -82,9 +79,8 @@ public abstract class AbstractMinMaxAggregateFunction extends
AbstractAggregateF
// First value encountered. Set type, comparator, and initial
value.
aggType = typeTag;
// Set comparator.
- IBinaryComparatorFactory cmpFactory =
-
BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(aggType,
isMin);
- cmp = cmpFactory.createBinaryComparator();
+ cmp =
BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(aggType,
isMin)
+ .createBinaryComparator();
// Initialize min value.
outputVal.assign(inputVal);
} else if (typeTag != ATypeTag.SYSTEM_NULL &&
!ATypeHierarchy.isCompatible(typeTag, aggType)) {
@@ -94,56 +90,27 @@ public abstract class AbstractMinMaxAggregateFunction
extends AbstractAggregateF
throw new IncompatibleTypeException(sourceLoc, "min/max",
aggType.serialize(), typeTag.serialize());
}
} else {
-
// If a system_null is encountered locally, it would be an error;
otherwise if it is seen
// by a global aggregator, it is simple ignored.
if (typeTag == ATypeTag.SYSTEM_NULL) {
processSystemNull();
return;
}
-
+ if (aggType == typeTag) {
+ compareAndUpdate(cmp, inputVal, outputVal);
+ return;
+ }
if (ATypeHierarchy.canPromote(aggType, typeTag)) {
- tpc = ATypeHierarchy.getTypePromoteComputer(aggType, typeTag);
- aggType = typeTag;
- cmp =
BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(aggType,
isMin)
+ // switch to new comp & aggregation type (i.e. current min/max
is int and new input is double)
+ cmp =
BinaryComparatorFactoryProvider.INSTANCE.getBinaryComparatorFactory(typeTag,
isMin)
.createBinaryComparator();
- if (tpc != null) {
- tempValForCasting.reset();
- try {
- tpc.convertType(outputVal.getByteArray(),
outputVal.getStartOffset() + 1,
- outputVal.getLength() - 1,
tempValForCasting.getDataOutput());
- } catch (IOException e) {
- throw HyracksDataException.create(e);
- }
- outputVal.assign(tempValForCasting);
- }
- if (cmp.compare(inputVal.getByteArray(),
inputVal.getStartOffset(), inputVal.getLength(),
- outputVal.getByteArray(), outputVal.getStartOffset(),
outputVal.getLength()) < 0) {
- outputVal.assign(inputVal);
- }
-
+ castValue(ATypeHierarchy.getTypePromoteComputer(aggType,
typeTag), outputVal, tempValForCasting);
+ outputVal.assign(tempValForCasting);
+ compareAndUpdate(cmp, inputVal, outputVal);
+ aggType = typeTag;
} else {
- tpc = ATypeHierarchy.getTypePromoteComputer(typeTag, aggType);
- if (tpc != null) {
- tempValForCasting.reset();
- try {
- tpc.convertType(inputVal.getByteArray(),
inputVal.getStartOffset() + 1,
- inputVal.getLength() - 1,
tempValForCasting.getDataOutput());
- } catch (IOException e) {
- throw HyracksDataException.create(e);
- }
- if (cmp.compare(tempValForCasting.getByteArray(),
tempValForCasting.getStartOffset(),
- tempValForCasting.getLength(),
outputVal.getByteArray(), outputVal.getStartOffset(),
- outputVal.getLength()) < 0) {
- outputVal.assign(tempValForCasting);
- }
- } else {
- if (cmp.compare(inputVal.getByteArray(),
inputVal.getStartOffset(), inputVal.getLength(),
- outputVal.getByteArray(),
outputVal.getStartOffset(), outputVal.getLength()) < 0) {
- outputVal.assign(inputVal);
- }
- }
-
+ castValue(ATypeHierarchy.getTypePromoteComputer(typeTag,
aggType), inputVal, tempValForCasting);
+ compareAndUpdate(cmp, tempValForCasting, outputVal);
}
}
}
@@ -153,20 +120,17 @@ public abstract class AbstractMinMaxAggregateFunction
extends AbstractAggregateF
resultStorage.reset();
try {
switch (aggType) {
- case NULL: {
+ case NULL:
resultStorage.getDataOutput().writeByte(ATypeTag.SERIALIZED_NULL_TYPE_TAG);
result.set(resultStorage);
break;
- }
- case SYSTEM_NULL: {
+ case SYSTEM_NULL:
finishSystemNull();
result.set(resultStorage);
break;
- }
- default: {
+ default:
result.set(outputVal);
break;
- }
}
} catch (IOException e) {
throw HyracksDataException.create(e);
@@ -187,4 +151,23 @@ public abstract class AbstractMinMaxAggregateFunction
extends AbstractAggregateF
protected abstract void processSystemNull() throws HyracksDataException;
protected abstract void finishSystemNull() throws IOException;
+
+ private static void compareAndUpdate(IBinaryComparator comp, IPointable
newVal, ArrayBackedValueStorage oldVal)
+ throws HyracksDataException {
+ if (comp.compare(newVal.getByteArray(), newVal.getStartOffset(),
newVal.getLength(), oldVal.getByteArray(),
+ oldVal.getStartOffset(), oldVal.getLength()) < 0) {
+ oldVal.assign(newVal);
+ }
+ }
+
+ private static void castValue(ITypeConvertComputer typeConverter,
IPointable inputValue,
+ ArrayBackedValueStorage tempValForCasting) throws
HyracksDataException {
+ tempValForCasting.reset();
+ try {
+ typeConverter.convertType(inputValue.getByteArray(),
inputValue.getStartOffset() + 1,
+ inputValue.getLength() - 1,
tempValForCasting.getDataOutput());
+ } catch (IOException e) {
+ throw HyracksDataException.create(e);
+ }
+ }
}