This is an automated email from the ASF dual-hosted git repository.
jooger pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new da1523341c IGNITE-23335: Sql. Do not store aggregation state in
Accumulators (#4522)
da1523341c is described below
commit da1523341c5279c2690555201920888a823b2cba
Author: Max Zhuravkov <[email protected]>
AuthorDate: Tue Oct 15 13:17:00 2024 +0300
IGNITE-23335: Sql. Do not store aggregation state in Accumulators (#4522)
---
.../sql/engine/exec/exp/agg/Accumulator.java | 29 +-
.../engine/exec/exp/agg/AccumulatorWrapper.java | 21 +-
.../sql/engine/exec/exp/agg/Accumulators.java | 421 ++++++++++++---------
.../engine/exec/exp/agg/AccumulatorsFactory.java | 28 +-
.../sql/engine/exec/exp/agg/AccumulatorsState.java | 64 ++++
.../sql/engine/exec/exp/agg/AggregateRow.java | 67 +++-
.../agg/{Accumulator.java => MutableDouble.java} | 42 +-
.../exp/agg/{Accumulator.java => MutableLong.java} | 42 +-
.../sql/engine/exec/rel/HashAggregateNode.java | 32 +-
.../sql/engine/exec/rel/SortAggregateNode.java | 34 +-
.../engine/exec/exp/agg/AnyValAccumulatorTest.java | 57 +++
.../exec/exp/agg/DecimalAvgAccumulatorTest.java | 67 ++++
.../exec/exp/agg/DoubleAvgAccumulatorTest.java} | 35 +-
.../exec/exp/agg/LiteralValAccumulatorTest.java | 55 +++
.../exec/exp/agg/LongCountAccumulatorTest.java} | 34 +-
.../engine/exec/exp/agg/MinMaxAccumulatorTest.java | 138 +++++++
.../exec/exp/agg/SingleValAccumulatorTest.java | 70 ++++
.../engine/exec/exp/agg/StatefulAccumulator.java} | 33 +-
.../engine/exec/exp/agg/SumAccumulatorTest.java | 86 +++++
.../exec/exp/agg/SumIsZeroAccumulatorTest.java | 86 +++++
20 files changed, 1154 insertions(+), 287 deletions(-)
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/Accumulator.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/Accumulator.java
index dcb7d41831..8a126fb560 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/Accumulator.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/Accumulator.java
@@ -25,11 +25,36 @@ import
org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
* Accumulator interface.
*/
public interface Accumulator {
- void add(Object... args);
- Object end();
+ /**
+ * Updates this accumulator.
+ *
+ * @param state state of the accumulator.
+ * @param args arguments.
+ */
+ void add(AccumulatorsState state, Object[] args);
+ /**
+ * Computes result of this accumulator.
+ *
+ * @param state Accumulator state.
+ * @param result Result holder.
+ */
+ void end(AccumulatorsState state, AccumulatorsState result);
+
+ /**
+ * Returns types of arguments for this accumulator.
+ *
+ * @param typeFactory Type factory.
+ * @return List of argument types.
+ */
List<RelDataType> argumentTypes(IgniteTypeFactory typeFactory);
+ /**
+ * Returns a result type for this accumulator.
+ *
+ * @param typeFactory Type factory.
+ * @return A result type.
+ */
RelDataType returnType(IgniteTypeFactory typeFactory);
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/AccumulatorWrapper.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/AccumulatorWrapper.java
index 4ecc0edbf7..a5802ca9dd 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/AccumulatorWrapper.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/AccumulatorWrapper.java
@@ -17,12 +17,25 @@
package org.apache.ignite.internal.sql.engine.exec.exp.agg;
+import org.jetbrains.annotations.Nullable;
+
/**
- * AccumulatorWrapper interface.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+ * Adapter that provides means to convert accumulator arguments and return
types.
*/
public interface AccumulatorWrapper<RowT> {
- void add(RowT row);
- Object end();
+ /** Returns {@code true} if the accumulator function should be applied to
distinct elements. */
+ boolean isDistinct();
+
+ /** Returns the accumulator function. */
+ Accumulator accumulator();
+
+ /**
+ * Creates accumulator arguments from the given row. If this method
returns {@code null},
+ * then the accumulator function should not be applied to the given row.
+ */
+ Object @Nullable [] getArguments(RowT row);
+
+ /** Converts accumulator result. */
+ Object convertResult(@Nullable Object result);
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/Accumulators.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/Accumulators.java
index 54bec16a39..ad63934889 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/Accumulators.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/Accumulators.java
@@ -28,9 +28,7 @@ import static
org.apache.ignite.internal.util.ArrayUtils.nullOrEmpty;
import java.math.BigDecimal;
import java.util.Comparator;
-import java.util.HashSet;
import java.util.List;
-import java.util.Set;
import java.util.function.IntFunction;
import java.util.function.Supplier;
import org.apache.calcite.avatica.util.ByteString;
@@ -59,17 +57,10 @@ public class Accumulators {
}
/**
- * AccumulatorFactory.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+ * Returns a supplier that creates a accumulator functions for the given
aggregate call.
*/
public Supplier<Accumulator> accumulatorFactory(AggregateCall call) {
- if (!call.isDistinct()) {
- return accumulatorFunctionFactory(call);
- }
-
- Supplier<Accumulator> fac = accumulatorFunctionFactory(call);
-
- return () -> new DistinctAccumulator(fac);
+ return accumulatorFunctionFactory(call);
}
private Supplier<Accumulator> accumulatorFunctionFactory(AggregateCall
call) {
@@ -214,82 +205,102 @@ public class Accumulators {
}
/**
- * SingleVal.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+ * {@code SINGLE_VALUE(SUBQUERY)} accumulator. Pseudo accumulator that
returns a first value produced by a subquery and an error if a
+ * subquery returns more than one row.
*/
- private static class SingleVal extends AnyVal {
- private boolean touched;
+ public static class SingleVal implements Accumulator {
- private SingleVal(RelDataType type) {
- super(type);
- }
+ private final RelDataType type;
- static Supplier<Accumulator> newAccumulator(RelDataType type) {
+ public static Supplier<Accumulator> newAccumulator(RelDataType type) {
return () -> new SingleVal(type);
}
+ private SingleVal(RelDataType type) {
+ this.type = type;
+ }
+
/** {@inheritDoc} */
@Override
- public void add(Object... args) {
- if (touched) {
+ public void add(AccumulatorsState state, Object... args) {
+ assert args.length == 1;
+
+ if (state.hasValue()) {
throw new SqlException(Sql.RUNTIME_ERR, "Subquery returned
more than 1 value.");
+ } else {
+ state.set(args[0]);
}
- touched = true;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void end(AccumulatorsState state, AccumulatorsState result) {
+ result.set(state.get());
+ }
- super.add(args);
+ /** {@inheritDoc} */
+ @Override
+ public List<RelDataType> argumentTypes(IgniteTypeFactory typeFactory) {
+ return
List.of(typeFactory.createTypeWithNullability(typeFactory.createSqlType(ANY),
true));
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public RelDataType returnType(IgniteTypeFactory typeFactory) {
+ return type;
}
}
/**
- * LITERAL_AGG accumulator, return {@code true} if incoming data is not
empty, {@code false} otherwise.
- * Calcite`s implementation RexImpTable#LiteralAggImplementor.
+ * {@code LITERAL_AGG} accumulator, return {@code true} if incoming data
is not empty, {@code false} otherwise. Calcite`s implementation
+ * RexImpTable#LiteralAggImplementor.
*/
- private static class LiteralVal extends AnyVal {
+ public static class LiteralVal extends AnyVal {
private LiteralVal(RelDataType type) {
super(type);
}
- static Supplier<Accumulator> newAccumulator(RelDataType type) {
+ public static Supplier<Accumulator> newAccumulator(RelDataType type) {
return () -> new LiteralVal(type);
}
/** {@inheritDoc} */
@Override
- public Object end() {
- return holder != null;
+ public void end(AccumulatorsState state, AccumulatorsState result) {
+ Object val = state.get();
+ result.set(val != null);
}
}
/**
- * ANY_VALUE accumulator.
+ * {@code ANY_VALUE} accumulator.
*/
- private static class AnyVal implements Accumulator {
- protected Object holder;
-
+ public static class AnyVal implements Accumulator {
private final RelDataType type;
private AnyVal(RelDataType type) {
this.type = type;
}
- static Supplier<Accumulator> newAccumulator(RelDataType type) {
+ public static Supplier<Accumulator> newAccumulator(RelDataType type) {
return () -> new AnyVal(type);
}
/** {@inheritDoc} */
@Override
- public void add(Object... args) {
+ public void add(AccumulatorsState state, Object[] args) {
assert args.length == 1 : args.length;
- if (holder == null) {
- holder = args[0];
+ Object current = state.get();
+ if (current == null) {
+ state.set(args[0]);
}
}
/** {@inheritDoc} */
@Override
- public Object end() {
- return holder;
+ public void end(AccumulatorsState state, AccumulatorsState result) {
+ result.set(state.get());
}
/** {@inheritDoc} */
@@ -306,15 +317,17 @@ public class Accumulators {
}
/**
- * DecimalAvg.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+ * {AVG(DECIMAL)} accumulator.
*/
public static class DecimalAvg implements Accumulator {
public static final IntFunction<Accumulator> FACTORY = DecimalAvg::new;
- private BigDecimal sum = BigDecimal.ZERO;
+ /** State. */
+ public static class DecimalAvgState {
+ private BigDecimal sum = BigDecimal.ZERO;
- private BigDecimal cnt = BigDecimal.ZERO;
+ private BigDecimal cnt = BigDecimal.ZERO;
+ }
private final int precision;
@@ -327,21 +340,37 @@ public class Accumulators {
/** {@inheritDoc} */
@Override
- public void add(Object... args) {
+ public void add(AccumulatorsState state, Object... args) {
BigDecimal in = (BigDecimal) args[0];
if (in == null) {
return;
}
- sum = sum.add(in);
- cnt = cnt.add(BigDecimal.ONE);
+ DecimalAvgState sumState = (DecimalAvgState) state.get();
+ if (sumState == null) {
+ sumState = new DecimalAvgState();
+ state.set(sumState);
+ }
+
+ sumState.sum = sumState.sum.add(in);
+ sumState.cnt = sumState.cnt.add(BigDecimal.ONE);
}
/** {@inheritDoc} */
@Override
- public Object end() {
- return cnt.compareTo(BigDecimal.ZERO) == 0 ? null :
IgniteMath.decimalDivide(sum, cnt, precision, scale);
+ public void end(AccumulatorsState state, AccumulatorsState result) {
+ DecimalAvgState sumState = (DecimalAvgState) state.get();
+
+ if (sumState == null) {
+ result.set(null);
+ } else {
+ if (sumState.cnt.compareTo(BigDecimal.ZERO) == 0) {
+ result.set(null);
+ } else {
+ result.set(IgniteMath.decimalDivide(sumState.sum,
sumState.cnt, precision, scale));
+ }
+ }
}
/** {@inheritDoc} */
@@ -358,33 +387,51 @@ public class Accumulators {
}
/**
- * DoubleAvg.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+ * {@code AVG(DOUBLE)} accumulator.
*/
public static class DoubleAvg implements Accumulator {
public static final Supplier<Accumulator> FACTORY = DoubleAvg::new;
- private double sum;
+ /** State. */
+ public static class DoubleAvgState {
+ private double sum;
- private long cnt;
+ private long cnt;
+ }
/** {@inheritDoc} */
@Override
- public void add(Object... args) {
+ public void add(AccumulatorsState state, Object... args) {
Double in = (Double) args[0];
if (in == null) {
return;
}
- sum += in;
- cnt++;
+ DoubleAvgState avgState = (DoubleAvgState) state.get();
+ if (avgState == null) {
+ avgState = new DoubleAvgState();
+ state.set(avgState);
+ }
+
+ avgState.sum += in;
+ avgState.cnt++;
}
/** {@inheritDoc} */
@Override
- public Object end() {
- return cnt > 0 ? sum / cnt : null;
+ public void end(AccumulatorsState state, AccumulatorsState result) {
+ DoubleAvgState avgState = (DoubleAvgState) state.get();
+ if (avgState == null) {
+ result.set(null);
+ } else {
+ if (avgState.cnt > 0) {
+ result.set(avgState.sum / avgState.cnt);
+ } else {
+ result.set(null);
+ }
+ }
+
}
/** {@inheritDoc} */
@@ -400,25 +447,36 @@ public class Accumulators {
}
}
- private static class LongCount implements Accumulator {
+ /** {@code COUNT(LONG)} accumulator.. */
+ public static class LongCount implements Accumulator {
public static final Supplier<Accumulator> FACTORY = LongCount::new;
- private long cnt;
-
/** {@inheritDoc} */
@Override
- public void add(Object... args) {
+ public void add(AccumulatorsState state, Object... args) {
assert nullOrEmpty(args) || args.length == 1;
if (nullOrEmpty(args) || args[0] != null) {
- cnt++;
+ MutableLong cnt = (MutableLong) state.get();
+
+ if (cnt == null) {
+ cnt = new MutableLong();
+ state.set(cnt);
+ }
+
+ cnt.add(1);
}
}
/** {@inheritDoc} */
@Override
- public Object end() {
- return cnt;
+ public void end(AccumulatorsState state, AccumulatorsState result) {
+ if (state.get() == null) {
+ result.set(0L);
+ } else {
+ MutableLong cnt = (MutableLong) state.get();
+ result.set(cnt.longValue());
+ }
}
/** {@inheritDoc} */
@@ -434,62 +492,77 @@ public class Accumulators {
}
}
- private static class Sum implements Accumulator {
- private Accumulator acc;
-
- private boolean empty = true;
+ /** Wraps another sum accumulator and returns {@code null} if there was
updates. */
+ public static class Sum implements Accumulator {
+ private final Accumulator acc;
public Sum(Accumulator acc) {
this.acc = acc;
}
/** {@inheritDoc} */
- @Override public void add(Object... args) {
+ @Override
+ public void add(AccumulatorsState state, Object... args) {
if (args[0] == null) {
return;
}
- empty = false;
- acc.add(args[0]);
+ acc.add(state, args);
}
/** {@inheritDoc} */
- @Override public Object end() {
- return empty ? null : acc.end();
+ @Override
+ public void end(AccumulatorsState state, AccumulatorsState result) {
+ if (!state.hasValue()) {
+ result.set(null);
+ } else {
+ acc.end(state, result);
+ }
}
/** {@inheritDoc} */
- @Override public List<RelDataType> argumentTypes(IgniteTypeFactory
typeFactory) {
+ @Override
+ public List<RelDataType> argumentTypes(IgniteTypeFactory typeFactory) {
return acc.argumentTypes(typeFactory);
}
/** {@inheritDoc} */
- @Override public RelDataType returnType(IgniteTypeFactory typeFactory)
{
+ @Override
+ public RelDataType returnType(IgniteTypeFactory typeFactory) {
return acc.returnType(typeFactory);
}
}
- private static class DoubleSumEmptyIsZero implements Accumulator {
+ /** {@code SUM(DOUBLE)} accumulator. */
+ public static class DoubleSumEmptyIsZero implements Accumulator {
public static final Supplier<Accumulator> FACTORY =
DoubleSumEmptyIsZero::new;
- private double sum;
-
/** {@inheritDoc} */
@Override
- public void add(Object... args) {
+ public void add(AccumulatorsState state, Object... args) {
Double in = (Double) args[0];
if (in == null) {
return;
}
- sum += in;
+ MutableDouble sum = (MutableDouble) state.get();
+ if (sum == null) {
+ sum = new MutableDouble();
+ state.set(sum);
+ }
+ sum.add(in);
}
/** {@inheritDoc} */
@Override
- public Object end() {
- return sum;
+ public void end(AccumulatorsState state, AccumulatorsState result) {
+ if (!state.hasValue()) {
+ result.set(0.0d);
+ } else {
+ MutableDouble sum = (MutableDouble) state.get();
+ result.set(sum.doubleValue());
+ }
}
/** {@inheritDoc} */
@@ -505,27 +578,36 @@ public class Accumulators {
}
}
- private static class LongSumEmptyIsZero implements Accumulator {
+ /** {@code SUM(LONG)} accumulator. */
+ public static class LongSumEmptyIsZero implements Accumulator {
public static final Supplier<Accumulator> FACTORY =
LongSumEmptyIsZero::new;
- private long sum;
-
/** {@inheritDoc} */
@Override
- public void add(Object... args) {
+ public void add(AccumulatorsState state, Object... args) {
Long in = (Long) args[0];
if (in == null) {
return;
}
- sum += in;
+ MutableLong sum = (MutableLong) state.get();
+ if (sum == null) {
+ sum = new MutableLong();
+ state.set(sum);
+ }
+ sum.add(in);
}
/** {@inheritDoc} */
@Override
- public Object end() {
- return sum;
+ public void end(AccumulatorsState state, AccumulatorsState result) {
+ if (!state.hasValue()) {
+ result.set(0L);
+ } else {
+ MutableLong sum = (MutableLong) state.get();
+ result.set(sum.longValue());
+ }
}
/** {@inheritDoc} */
@@ -541,27 +623,35 @@ public class Accumulators {
}
}
- private static class DecimalSumEmptyIsZero implements Accumulator {
+ /** SUM(DECIMAL) accumulator. */
+ public static class DecimalSumEmptyIsZero implements Accumulator {
public static final Supplier<Accumulator> FACTORY =
DecimalSumEmptyIsZero::new;
- private BigDecimal sum = BigDecimal.ZERO;
-
/** {@inheritDoc} */
@Override
- public void add(Object... args) {
+ public void add(AccumulatorsState state, Object... args) {
BigDecimal in = (BigDecimal) args[0];
if (in == null) {
return;
}
- sum = sum.add(in);
+ BigDecimal sum = (BigDecimal) state.get();
+ if (sum == null) {
+ state.set(in);
+ } else {
+ state.set(sum.add(in));
+ }
}
/** {@inheritDoc} */
@Override
- public Object end() {
- return sum;
+ public void end(AccumulatorsState state, AccumulatorsState result) {
+ if (!state.hasValue()) {
+ result.set(BigDecimal.ZERO);
+ } else {
+ result.set(state.get());
+ }
}
/** {@inheritDoc} */
@@ -577,7 +667,8 @@ public class Accumulators {
}
}
- private static final class MinMaxAccumulator implements Accumulator {
+ /** {@code MIN/MAX} accumulator. */
+ public static final class MinMaxAccumulator implements Accumulator {
private final boolean min;
@@ -585,9 +676,6 @@ public class Accumulators {
private final RelDataType returnType;
- @SuppressWarnings({"rawtypes"})
- private Comparable val;
-
private MinMaxAccumulator(boolean min, RelDataTypeFactory typeFactory,
RelDataType relDataType) {
var nullableType =
typeFactory.createTypeWithNullability(relDataType, true);
@@ -596,23 +684,32 @@ public class Accumulators {
this.returnType = nullableType;
}
- static Supplier<Accumulator> newAccumulator(boolean min,
RelDataTypeFactory typeFactory, RelDataType type) {
+ public static Supplier<Accumulator> newAccumulator(boolean min,
RelDataTypeFactory typeFactory, RelDataType type) {
return () -> new MinMaxAccumulator(min, typeFactory, type);
}
/** {@inheritDoc} **/
@Override
@SuppressWarnings({"rawtypes"})
- public void add(Object... args) {
+ public void add(AccumulatorsState state, Object... args) {
Comparable in = (Comparable) args[0];
- doApply(in);
+ if (in == null) {
+ return;
+ }
+
+ Comparable current = (Comparable) state.get();
+ if (current == null) {
+ state.set(in);
+ } else {
+ state.set(doApply(in, current));
+ }
}
/** {@inheritDoc} **/
@Override
- public Object end() {
- return val;
+ public void end(AccumulatorsState state, AccumulatorsState result) {
+ result.set(state.get());
}
/** {@inheritDoc} **/
@@ -628,11 +725,7 @@ public class Accumulators {
}
@SuppressWarnings({"rawtypes", "unchecked"})
- private void doApply(Comparable in) {
- if (in == null) {
- return;
- }
-
+ private Comparable doApply(Comparable in, Comparable val) {
if (val == null) {
val = in;
} else {
@@ -643,44 +736,48 @@ public class Accumulators {
val = cmp < 0 ? in : val;
}
}
+ return val;
}
}
- private static class VarCharMinMax implements Accumulator {
+ /** {@code MIN/MAX} for {@code VARCHAR} type. */
+ public static class VarCharMinMax implements Accumulator {
public static final Supplier<Accumulator> MIN_FACTORY = () -> new
VarCharMinMax(true);
public static final Supplier<Accumulator> MAX_FACTORY = () -> new
VarCharMinMax(false);
private final boolean min;
- private CharSequence val;
-
- private boolean empty = true;
-
- private VarCharMinMax(boolean min) {
+ VarCharMinMax(boolean min) {
this.min = min;
}
/** {@inheritDoc} */
@Override
- public void add(Object... args) {
+ public void add(AccumulatorsState state, Object... args) {
CharSequence in = (CharSequence) args[0];
if (in == null) {
return;
}
- val = empty ? in : min
- ? (CharSeqComparator.INSTANCE.compare(val, in) < 0 ? val :
in) :
- (CharSeqComparator.INSTANCE.compare(val, in) < 0 ? in :
val);
+ CharSequence val = (CharSequence) state.get();
+
+ if (val == null) {
+ val = in;
+ } else if (min) {
+ val = CharSeqComparator.INSTANCE.compare(val, in) < 0 ? val :
in;
+ } else {
+ val = CharSeqComparator.INSTANCE.compare(val, in) < 0 ? in :
val;
+ }
- empty = false;
+ state.set(val);
}
/** {@inheritDoc} */
@Override
- public Object end() {
- return empty ? null : val;
+ public void end(AccumulatorsState state, AccumulatorsState result) {
+ result.set(state.get());
}
/** {@inheritDoc} */
@@ -717,103 +814,57 @@ public class Accumulators {
}
}
- private static class VarBinaryMinMax implements Accumulator {
+ /** {@code MIN/MAX} for {@code VARBINARY} type. */
+ public static class VarBinaryMinMax implements Accumulator {
public static final Supplier<Accumulator> MIN_FACTORY = () -> new
VarBinaryMinMax(true);
-
public static final Supplier<Accumulator> MAX_FACTORY = () -> new
VarBinaryMinMax(false);
-
private final boolean min;
-
- private ByteString val;
-
-
- private boolean empty = true;
-
-
- private VarBinaryMinMax(boolean min) {
+ VarBinaryMinMax(boolean min) {
this.min = min;
}
/** {@inheritDoc} */
@Override
- public void add(Object... args) {
+ public void add(AccumulatorsState state, Object... args) {
ByteString in = (ByteString) args[0];
if (in == null) {
return;
}
- val = empty ? in : min
- ? (val.compareTo(in) < 0 ? val : in)
- : (val.compareTo(in) < 0 ? in : val);
-
- empty = false;
- }
-
- /** {@inheritDoc} */
- @Override
- public Object end() {
- return empty ? null : val;
- }
-
- /** {@inheritDoc} */
- @Override
- public List<RelDataType> argumentTypes(IgniteTypeFactory typeFactory) {
- return
ArrayUtils.asList(typeFactory.createTypeWithNullability(typeFactory.createSqlType(VARBINARY),
true));
- }
-
- /** {@inheritDoc} */
- @Override
- public RelDataType returnType(IgniteTypeFactory typeFactory) {
- return
typeFactory.createTypeWithNullability(typeFactory.createSqlType(VARBINARY),
true);
- }
- }
-
- private static class DistinctAccumulator implements Accumulator {
- private final Accumulator acc;
-
- private final Set<Object> set = new HashSet<>();
+ ByteString val = (ByteString) state.get();
- private DistinctAccumulator(Supplier<Accumulator> accSup) {
- this.acc = accSup.get();
- }
-
- /** {@inheritDoc} */
- @Override
- public void add(Object... args) {
- Object in = args[0];
-
- if (in == null) {
- return;
+ if (val == null) {
+ val = in;
+ } else if (min) {
+ val = val.compareTo(in) < 0 ? val : in;
+ } else {
+ val = val.compareTo(in) < 0 ? in : val;
}
- set.add(in);
+ state.set(val);
}
/** {@inheritDoc} */
@Override
- public Object end() {
- for (Object o : set) {
- acc.add(o);
- }
-
- return acc.end();
+ public void end(AccumulatorsState state, AccumulatorsState result) {
+ result.set(state.get());
}
/** {@inheritDoc} */
@Override
public List<RelDataType> argumentTypes(IgniteTypeFactory typeFactory) {
- return acc.argumentTypes(typeFactory);
+ return
ArrayUtils.asList(typeFactory.createTypeWithNullability(typeFactory.createSqlType(VARBINARY),
true));
}
/** {@inheritDoc} */
@Override
public RelDataType returnType(IgniteTypeFactory typeFactory) {
- return acc.returnType(typeFactory);
+ return
typeFactory.createTypeWithNullability(typeFactory.createSqlType(VARBINARY),
true);
}
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/AccumulatorsFactory.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/AccumulatorsFactory.java
index 5d1599bebf..848c6d6a09 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/AccumulatorsFactory.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/AccumulatorsFactory.java
@@ -51,6 +51,7 @@ import
org.apache.ignite.internal.sql.engine.exec.exp.RexToLixTranslator;
import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
import org.apache.ignite.internal.sql.engine.util.Commons;
import org.apache.ignite.internal.sql.engine.util.Primitives;
+import org.jetbrains.annotations.Nullable;
/**
* AccumulatorsFactory.
@@ -274,7 +275,7 @@ public class AccumulatorsFactory<RowT> implements
Supplier<List<AccumulatorWrapp
this.accumulator = accumulator;
this.inAdapter = inAdapter;
this.outAdapter = outAdapter;
- distinct = call.isDistinct();
+ this.distinct = call.isDistinct();
// need to be refactored after
https://issues.apache.org/jira/browse/IGNITE-22320
literalAgg = call.getAggregation() == LITERAL_AGG;
@@ -285,11 +286,20 @@ public class AccumulatorsFactory<RowT> implements
Supplier<List<AccumulatorWrapp
handler = ctx.rowHandler();
}
- /** {@inheritDoc} */
@Override
- public void add(RowT row) {
+ public boolean isDistinct() {
+ return distinct;
+ }
+
+ @Override
+ public Accumulator accumulator() {
+ return accumulator;
+ }
+
+ @Override
+ public Object @Nullable [] getArguments(RowT row) {
if (type != AggregateType.REDUCE && filterArg >= 0 &&
!Boolean.TRUE.equals(handler.get(filterArg, row))) {
- return;
+ return null;
}
int params = argList.size();
@@ -309,18 +319,16 @@ public class AccumulatorsFactory<RowT> implements
Supplier<List<AccumulatorWrapp
args[i] = handler.get(argPos, row);
if (ignoreNulls && args[i] == null) {
- return;
+ return null;
}
}
- accumulator.add(inAdapter.apply(args));
+ return inAdapter.apply(args);
}
- /** {@inheritDoc} */
@Override
- public Object end() {
- return outAdapter.apply(accumulator.end());
+ public Object convertResult(Object result) {
+ return outAdapter.apply(result);
}
-
}
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/AccumulatorsState.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/AccumulatorsState.java
new file mode 100644
index 0000000000..d7fa472908
--- /dev/null
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/AccumulatorsState.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.exp.agg;
+
+import java.util.BitSet;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Stores states of multiple accumulator functions.
+ */
+public class AccumulatorsState {
+
+ private final Object[] row;
+
+ private final BitSet set = new BitSet();
+
+ private int index;
+
+ /** Constructor. */
+ public AccumulatorsState(int rowSize) {
+ this.row = new Object[rowSize];
+ }
+
+ /** Sets current field index. */
+ public void setIndex(int i) {
+ this.index = i;
+ }
+
+ /** Resets current field index. */
+ public void resetIndex() {
+ this.index = -1;
+ }
+
+ /** Returns a value of the current field. */
+ public @Nullable Object get() {
+ return row[index];
+ }
+
+ /** Set a value of the current field. */
+ public void set(@Nullable Object value) {
+ row[index] = value;
+ set.set(index);
+ }
+
+ /** Returns {@code true} if current field has been set. */
+ public boolean hasValue() {
+ return set.get(index);
+ }
+}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/AggregateRow.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/AggregateRow.java
index acf7b5089b..9437ec87d8 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/AggregateRow.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/AggregateRow.java
@@ -17,25 +17,30 @@
package org.apache.ignite.internal.sql.engine.exec.exp.agg;
+import it.unimi.dsi.fastutil.ints.Int2ObjectArrayMap;
import java.util.List;
+import java.util.Set;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.ignite.internal.sql.engine.exec.RowHandler;
/**
* Per group/grouping set row that contains state of accumulators.
*/
-public final class AggregateRow<RowT> {
+public class AggregateRow<RowT> {
/** A placeholder of absent group id. */
public static final byte NO_GROUP_ID = -1;
- private final List<AccumulatorWrapper<RowT>> accs;
+ private final AccumulatorsState state;
- private final AggregateType type;
+ private final Int2ObjectArrayMap<Set<Object>> distinctSets;
/** Constructor. */
- public AggregateRow(List<AccumulatorWrapper<RowT>> accs, AggregateType
type) {
- this.type = type;
- this.accs = accs;
+ public AggregateRow(
+ AccumulatorsState state,
+ Int2ObjectArrayMap<Set<Object>> distinctSets
+ ) {
+ this.state = state;
+ this.distinctSets = distinctSets;
}
/** Initialized an empty group if necessary. */
@@ -56,14 +61,30 @@ public final class AggregateRow<RowT> {
}
/** Updates this row by using data of the given row. */
- public void update(ImmutableBitSet allFields, RowHandler<RowT> handler,
RowT row) {
- for (AccumulatorWrapper<RowT> acc : accs) {
- acc.add(row);
+ public void update(List<AccumulatorWrapper<RowT>> accs, ImmutableBitSet
allFields, RowHandler<RowT> handler, RowT row) {
+ for (int i = 0; i < accs.size(); i++) {
+ AccumulatorWrapper<RowT> acc = accs.get(i);
+
+ Object[] args = acc.getArguments(row);
+ if (args == null) {
+ continue;
+ }
+
+ state.setIndex(i);
+
+ if (acc.isDistinct()) {
+ Set<Object> distinctSet = distinctSets.get(i);
+ distinctSet.add(args[0]);
+ } else {
+ acc.accumulator().add(state, args);
+ }
+
+ state.resetIndex();
}
}
/** Creates an empty array for fields to populate output row with. */
- public Object[] createOutput(ImmutableBitSet allFields, byte groupId) {
+ public Object[] createOutput(AggregateType type,
List<AccumulatorWrapper<RowT>> accs, ImmutableBitSet allFields, byte groupId) {
int extra = groupId == NO_GROUP_ID || type != AggregateType.MAP ? 0 :
1;
int rowSize = allFields.cardinality() + accs.size() + extra;
@@ -71,11 +92,31 @@ public final class AggregateRow<RowT> {
}
/** Writes aggregate state of the given row to given array. */
- public void writeTo(Object[] output, ImmutableBitSet allFields, byte
groupId) {
+ public void writeTo(AggregateType type, List<AccumulatorWrapper<RowT>>
accs, Object[] output, ImmutableBitSet allFields, byte groupId) {
int cardinality = allFields.cardinality();
+
+ AccumulatorsState result = new AccumulatorsState(accs.size());
+
for (int i = 0; i < accs.size(); i++) {
- AccumulatorWrapper<RowT> wrapper = accs.get(i);
- output[i + cardinality] = wrapper.end();
+ AccumulatorWrapper<RowT> acc = accs.get(i);
+
+ state.setIndex(i);
+ result.setIndex(i);
+
+ if (acc.isDistinct()) {
+ Set<Object> distinctSet = distinctSets.get(i);
+
+ for (var arg : distinctSet) {
+ acc.accumulator().add(state, new Object[]{arg});
+ }
+ }
+
+ acc.accumulator().end(state, result);
+
+ output[i + cardinality] = acc.convertResult(result.get());
+
+ state.resetIndex();
+ result.resetIndex();
}
if (groupId != NO_GROUP_ID && type == AggregateType.MAP) {
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/Accumulator.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/MutableDouble.java
similarity index 54%
copy from
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/Accumulator.java
copy to
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/MutableDouble.java
index dcb7d41831..2f0041e8c5 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/Accumulator.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/MutableDouble.java
@@ -17,19 +17,41 @@
package org.apache.ignite.internal.sql.engine.exec.exp.agg;
-import java.util.List;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
-
/**
- * Accumulator interface.
+ * Mutable variant of {@link Double} used by some {@link Accumulator}.
*/
-public interface Accumulator {
- void add(Object... args);
+final class MutableDouble extends Number {
+
+ private static final long serialVersionUID = -7021424478810048306L;
+
+ private double value;
+
+ /** Adds the given value to this double. */
+ public void add(double v) {
+ value += v;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public int intValue() {
+ return (int) value;
+ }
- Object end();
+ /** {@inheritDoc} */
+ @Override
+ public long longValue() {
+ return (long) value;
+ }
- List<RelDataType> argumentTypes(IgniteTypeFactory typeFactory);
+ /** {@inheritDoc} */
+ @Override
+ public float floatValue() {
+ return (float) value;
+ }
- RelDataType returnType(IgniteTypeFactory typeFactory);
+ /** {@inheritDoc} */
+ @Override
+ public double doubleValue() {
+ return value;
+ }
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/Accumulator.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/MutableLong.java
similarity index 55%
copy from
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/Accumulator.java
copy to
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/MutableLong.java
index dcb7d41831..50027ef57f 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/Accumulator.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/MutableLong.java
@@ -17,19 +17,41 @@
package org.apache.ignite.internal.sql.engine.exec.exp.agg;
-import java.util.List;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
-
/**
- * Accumulator interface.
+ * Mutable variant of {@link Long} used by some {@link Accumulator}.
*/
-public interface Accumulator {
- void add(Object... args);
+final class MutableLong extends Number {
+
+ private static final long serialVersionUID = -147172356021174032L;
+
+ private long value;
+
+ /** Adds the given value to this long. */
+ public void add(long v) {
+ value += v;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public int intValue() {
+ return (int) value;
+ }
- Object end();
+ /** {@inheritDoc} */
+ @Override
+ public long longValue() {
+ return value;
+ }
- List<RelDataType> argumentTypes(IgniteTypeFactory typeFactory);
+ /** {@inheritDoc} */
+ @Override
+ public float floatValue() {
+ return value;
+ }
- RelDataType returnType(IgniteTypeFactory typeFactory);
+ /** {@inheritDoc} */
+ @Override
+ public double doubleValue() {
+ return value;
+ }
}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/HashAggregateNode.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/HashAggregateNode.java
index 01b75bd4ae..c926f675da 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/HashAggregateNode.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/HashAggregateNode.java
@@ -20,19 +20,23 @@ package org.apache.ignite.internal.sql.engine.exec.rel;
import static java.util.stream.Collectors.toCollection;
import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
+import it.unimi.dsi.fastutil.ints.Int2ObjectArrayMap;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.function.Supplier;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
import org.apache.ignite.internal.sql.engine.exec.RowHandler;
import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
import org.apache.ignite.internal.sql.engine.exec.exp.agg.AccumulatorWrapper;
+import org.apache.ignite.internal.sql.engine.exec.exp.agg.AccumulatorsState;
import org.apache.ignite.internal.sql.engine.exec.exp.agg.AggregateRow;
import org.apache.ignite.internal.sql.engine.exec.exp.agg.AggregateType;
import org.apache.ignite.internal.sql.engine.exec.exp.agg.GroupKey;
@@ -44,9 +48,6 @@ import
org.apache.ignite.internal.sql.engine.exec.exp.agg.GroupKey;
public class HashAggregateNode<RowT> extends AbstractNode<RowT> implements
SingleNode<RowT>, Downstream<RowT> {
private final AggregateType type;
- /** May be {@code null} when there are not accumulators (DISTINCT
aggregate node). */
- private final Supplier<List<AccumulatorWrapper<RowT>>> accFactory;
-
private final RowFactory<RowT> rowFactory;
/** A bit set that contains fields included in all grouping sets. */
@@ -54,6 +55,8 @@ public class HashAggregateNode<RowT> extends
AbstractNode<RowT> implements Singl
private final List<Grouping> groupings;
+ private final List<AccumulatorWrapper<RowT>> accs;
+
private int requested;
private int waiting;
@@ -71,7 +74,6 @@ public class HashAggregateNode<RowT> extends
AbstractNode<RowT> implements Singl
super(ctx);
this.type = type;
- this.accFactory = accFactory;
this.rowFactory = rowFactory;
assert grpSets.size() <= Byte.MAX_VALUE : "Too many grouping sets";
@@ -79,6 +81,7 @@ public class HashAggregateNode<RowT> extends
AbstractNode<RowT> implements Singl
ImmutableBitSet.Builder b = ImmutableBitSet.builder();
groupings = new ArrayList<>(grpSets.size());
+ accs = accFactory != null ? accFactory.get() : Collections.emptyList();
for (byte i = 0; i < grpSets.size(); i++) {
ImmutableBitSet grpFields = grpSets.get(i);
@@ -257,7 +260,7 @@ public class HashAggregateNode<RowT> extends
AbstractNode<RowT> implements Singl
GroupKey grpKey = b.build();
AggregateRow<RowT> aggRow = groups.computeIfAbsent(grpKey, k ->
create());
- aggRow.update(allFields, handler, row);
+ aggRow.update(accs, allFields, handler, row);
}
/**
@@ -278,7 +281,7 @@ public class HashAggregateNode<RowT> extends
AbstractNode<RowT> implements Singl
GroupKey grpKey = entry.getKey();
AggregateRow<RowT> aggRow = entry.getValue();
- Object[] fields = aggRow.createOutput(allFields, grpId);
+ Object[] fields = aggRow.createOutput(type, accs, allFields,
grpId);
int j = 0;
int k = 0;
@@ -287,7 +290,7 @@ public class HashAggregateNode<RowT> extends
AbstractNode<RowT> implements Singl
fields[j++] = grpFields.get(field) ? grpKey.field(k++) :
null;
}
- aggRow.writeTo(fields, allFields, grpId);
+ aggRow.writeTo(type, accs, fields, allFields, grpId);
RowT row = rowFactory.create(fields);
@@ -299,15 +302,18 @@ public class HashAggregateNode<RowT> extends
AbstractNode<RowT> implements Singl
}
private AggregateRow<RowT> create() {
- List<AccumulatorWrapper<RowT>> wrappers;
+ Int2ObjectArrayMap<Set<Object>> distinctSets = new
Int2ObjectArrayMap<>();
- if (accFactory == null) {
- wrappers = Collections.emptyList();
- } else {
- wrappers = accFactory.get();
+ for (int i = 0; i < accs.size(); i++) {
+ AccumulatorWrapper<RowT> acc = accs.get(i);
+ if (acc.isDistinct()) {
+ distinctSets.put(i, new HashSet<>());
+ }
}
- return new AggregateRow<>(wrappers, type);
+ AccumulatorsState state = new AccumulatorsState(accs.size());
+
+ return new AggregateRow<>(state, distinctSets);
}
private boolean isEmpty() {
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/SortAggregateNode.java
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/SortAggregateNode.java
index 836a82a2ea..4be19d8f1f 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/SortAggregateNode.java
+++
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/SortAggregateNode.java
@@ -20,18 +20,22 @@ package org.apache.ignite.internal.sql.engine.exec.rel;
import static org.apache.ignite.internal.util.ArrayUtils.OBJECT_EMPTY_ARRAY;
import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
+import it.unimi.dsi.fastutil.ints.Int2ObjectArrayMap;
import java.util.ArrayDeque;
import java.util.Collections;
import java.util.Comparator;
import java.util.Deque;
+import java.util.HashSet;
import java.util.List;
import java.util.Objects;
+import java.util.Set;
import java.util.function.Supplier;
import org.apache.calcite.util.ImmutableBitSet;
import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
import org.apache.ignite.internal.sql.engine.exec.RowHandler;
import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
import org.apache.ignite.internal.sql.engine.exec.exp.agg.AccumulatorWrapper;
+import org.apache.ignite.internal.sql.engine.exec.exp.agg.AccumulatorsState;
import org.apache.ignite.internal.sql.engine.exec.exp.agg.AggregateRow;
import org.apache.ignite.internal.sql.engine.exec.exp.agg.AggregateType;
@@ -42,9 +46,6 @@ import
org.apache.ignite.internal.sql.engine.exec.exp.agg.AggregateType;
public class SortAggregateNode<RowT> extends AbstractNode<RowT> implements
SingleNode<RowT>, Downstream<RowT> {
private final AggregateType type;
- /** May be {@code null} when there are not accumulators (DISTINCT
aggregate node). */
- private final Supplier<List<AccumulatorWrapper<RowT>>> accFactory;
-
private final RowFactory<RowT> rowFactory;
private final ImmutableBitSet grpSet;
@@ -53,6 +54,8 @@ public class SortAggregateNode<RowT> extends
AbstractNode<RowT> implements Singl
private final Deque<RowT> outBuf = new ArrayDeque<>(inBufSize);
+ private final List<AccumulatorWrapper<RowT>> accs;
+
private RowT prevRow;
private Group grp;
@@ -85,10 +88,10 @@ public class SortAggregateNode<RowT> extends
AbstractNode<RowT> implements Singl
assert Objects.nonNull(comp);
this.type = type;
- this.accFactory = accFactory;
this.rowFactory = rowFactory;
this.grpSet = grpSet;
this.comp = comp;
+ this.accs = accFactory != null ? accFactory.get() :
Collections.emptyList();
init();
}
@@ -210,10 +213,6 @@ public class SortAggregateNode<RowT> extends
AbstractNode<RowT> implements Singl
return this;
}
- private boolean hasAccumulators() {
- return accFactory != null;
- }
-
private Group newGroup(RowT r) {
final Object[] grpKeys = new Object[grpSet.cardinality()];
List<Integer> fldIdxs = grpSet.asList();
@@ -248,16 +247,25 @@ public class SortAggregateNode<RowT> extends
AbstractNode<RowT> implements Singl
private Group(Object[] grpKeys) {
this.grpKeys = grpKeys;
- List<AccumulatorWrapper<RowT>> wrappers = hasAccumulators() ?
accFactory.get() : Collections.emptyList();
- aggRow = new AggregateRow<>(wrappers, type);
+ AccumulatorsState state = new AccumulatorsState(accs.size());
+
+ Int2ObjectArrayMap<Set<Object>> distinctSets = new
Int2ObjectArrayMap<>();
+ for (int i = 0; i < accs.size(); i++) {
+ AccumulatorWrapper<RowT> acc = accs.get(i);
+ if (acc.isDistinct()) {
+ distinctSets.put(i, new HashSet<>());
+ }
+ }
+
+ aggRow = new AggregateRow<>(state, distinctSets);
}
private void add(RowT row) {
- aggRow.update(grpSet, context().rowHandler(), row);
+ aggRow.update(accs, grpSet, context().rowHandler(), row);
}
private RowT row() {
- Object[] fields = aggRow.createOutput(grpSet,
AggregateRow.NO_GROUP_ID);
+ Object[] fields = aggRow.createOutput(type, accs, grpSet,
AggregateRow.NO_GROUP_ID);
int i = 0;
@@ -265,7 +273,7 @@ public class SortAggregateNode<RowT> extends
AbstractNode<RowT> implements Singl
fields[i++] = grpKey;
}
- aggRow.writeTo(fields, grpSet, AggregateRow.NO_GROUP_ID);
+ aggRow.writeTo(type, accs, fields, grpSet,
AggregateRow.NO_GROUP_ID);
return rowFactory.create(fields);
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/AnyValAccumulatorTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/AnyValAccumulatorTest.java
new file mode 100644
index 0000000000..a3e7a8a6a8
--- /dev/null
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/AnyValAccumulatorTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.exp.agg;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import java.util.function.Supplier;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.ignite.internal.sql.engine.exec.exp.agg.Accumulators.AnyVal;
+import org.apache.ignite.internal.sql.engine.util.Commons;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for {@code ANY_VAL} accumulator function.
+ */
+public class AnyValAccumulatorTest extends BaseIgniteAbstractTest {
+
+ @Test
+ public void test() {
+ StatefulAccumulator accumulator = newCall();
+
+ accumulator.add(new Object[]{null});
+ accumulator.add("1");
+ accumulator.add("2");
+
+ assertEquals("1", accumulator.end());
+ }
+
+ @Test
+ public void empty() {
+ StatefulAccumulator accumulator = newCall();
+
+ assertNull(accumulator.end());
+ }
+
+ private StatefulAccumulator newCall() {
+ Supplier<Accumulator> supplier =
AnyVal.newAccumulator(Commons.typeFactory().createSqlType(SqlTypeName.VARCHAR));
+ return new StatefulAccumulator(supplier);
+ }
+}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/DecimalAvgAccumulatorTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/DecimalAvgAccumulatorTest.java
new file mode 100644
index 0000000000..0e30e80e14
--- /dev/null
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/DecimalAvgAccumulatorTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.exp.agg;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import java.math.BigDecimal;
+import
org.apache.ignite.internal.sql.engine.exec.exp.agg.Accumulators.DecimalAvg;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for {@code AVG(DECIMAL)} accumulator function.
+ */
+public class DecimalAvgAccumulatorTest extends BaseIgniteAbstractTest {
+
+ @Test
+ public void testScale3() {
+ StatefulAccumulator acc = newCall(2);
+
+ acc.add(new BigDecimal("50"));
+ acc.add(new BigDecimal("20"));
+ acc.add(new Object[]{null});
+ acc.add(new BigDecimal("30"));
+
+ assertEquals(new BigDecimal("33.33"), acc.end());
+ }
+
+ @Test
+ public void testScale0() {
+ StatefulAccumulator acc = newCall(0);
+
+ acc.add(new BigDecimal("50"));
+ acc.add(new BigDecimal("20"));
+ acc.add(new Object[]{null});
+ acc.add(new BigDecimal("30"));
+
+ assertEquals(new BigDecimal("33"), acc.end());
+ }
+
+ @Test
+ public void empty() {
+ StatefulAccumulator acc = newCall(3);
+
+ assertNull(acc.end());
+ }
+
+ private StatefulAccumulator newCall(int scale) {
+ return new StatefulAccumulator(DecimalAvg.FACTORY.apply(scale));
+ }
+}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/Accumulator.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/DoubleAvgAccumulatorTest.java
similarity index 53%
copy from
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/Accumulator.java
copy to
modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/DoubleAvgAccumulatorTest.java
index dcb7d41831..2e2f0e320d 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/Accumulator.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/DoubleAvgAccumulatorTest.java
@@ -17,19 +17,36 @@
package org.apache.ignite.internal.sql.engine.exec.exp.agg;
-import java.util.List;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import
org.apache.ignite.internal.sql.engine.exec.exp.agg.Accumulators.DoubleAvg;
+import org.junit.jupiter.api.Test;
/**
- * Accumulator interface.
+ * Tests for {@code AVG(DOUBLE)} accumulator function.
*/
-public interface Accumulator {
- void add(Object... args);
+public class DoubleAvgAccumulatorTest {
+
+ @Test
+ public void test() {
+ StatefulAccumulator acc = newCall();
+
+ acc.add(3.0d);
+ acc.add(new Object[]{null});
+ acc.add(1.0d);
+
+ assertEquals(2.0d, acc.end());
+ }
- Object end();
+ @Test
+ public void empty() {
+ StatefulAccumulator acc = newCall();
- List<RelDataType> argumentTypes(IgniteTypeFactory typeFactory);
+ assertNull(acc.end());
+ }
- RelDataType returnType(IgniteTypeFactory typeFactory);
+ private StatefulAccumulator newCall() {
+ return new StatefulAccumulator(DoubleAvg.FACTORY);
+ }
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/LiteralValAccumulatorTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/LiteralValAccumulatorTest.java
new file mode 100644
index 0000000000..b766d303b8
--- /dev/null
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/LiteralValAccumulatorTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.exp.agg;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.util.function.Supplier;
+import org.apache.calcite.sql.type.SqlTypeName;
+import
org.apache.ignite.internal.sql.engine.exec.exp.agg.Accumulators.LiteralVal;
+import org.apache.ignite.internal.sql.engine.util.Commons;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for {@code LITERAL_AGG(EXPR)}.
+ */
+public class LiteralValAccumulatorTest extends BaseIgniteAbstractTest {
+
+ @Test
+ public void test() {
+ StatefulAccumulator accumulator = newCall();
+
+ accumulator.add("1");
+ accumulator.add("2");
+
+ assertEquals(true, accumulator.end());
+ }
+
+ @Test
+ public void empty() {
+ StatefulAccumulator accumulator = newCall();
+
+ assertEquals(false, accumulator.end());
+ }
+
+ private StatefulAccumulator newCall() {
+ Supplier<Accumulator> supplier =
LiteralVal.newAccumulator(Commons.typeFactory().createSqlType(SqlTypeName.VARCHAR));
+ return new StatefulAccumulator(supplier);
+ }
+}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/Accumulator.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/LongCountAccumulatorTest.java
similarity index 55%
copy from
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/Accumulator.java
copy to
modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/LongCountAccumulatorTest.java
index dcb7d41831..282e2a9030 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/Accumulator.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/LongCountAccumulatorTest.java
@@ -17,19 +17,35 @@
package org.apache.ignite.internal.sql.engine.exec.exp.agg;
-import java.util.List;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import
org.apache.ignite.internal.sql.engine.exec.exp.agg.Accumulators.LongCount;
+import org.junit.jupiter.api.Test;
/**
- * Accumulator interface.
+ * Tests for {@code COUNT(BIGINT)} accumulator function.
*/
-public interface Accumulator {
- void add(Object... args);
+public class LongCountAccumulatorTest {
+
+ @Test
+ public void test() {
+ StatefulAccumulator acc = newCall();
+
+ acc.add(1L);
+ acc.add(new Object[]{null});
+ acc.add(2L);
+
+ assertEquals(2L, acc.end());
+ }
- Object end();
+ @Test
+ public void empty() {
+ StatefulAccumulator acc = newCall();
- List<RelDataType> argumentTypes(IgniteTypeFactory typeFactory);
+ assertEquals(0L, acc.end());
+ }
- RelDataType returnType(IgniteTypeFactory typeFactory);
+ private StatefulAccumulator newCall() {
+ return new StatefulAccumulator(LongCount.FACTORY);
+ }
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/MinMaxAccumulatorTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/MinMaxAccumulatorTest.java
new file mode 100644
index 0000000000..242278f517
--- /dev/null
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/MinMaxAccumulatorTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.exp.agg;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.function.Supplier;
+import java.util.stream.Stream;
+import org.apache.calcite.avatica.util.ByteString;
+import org.apache.calcite.rel.type.RelDataType;
+import
org.apache.ignite.internal.sql.engine.exec.exp.agg.Accumulators.MinMaxAccumulator;
+import
org.apache.ignite.internal.sql.engine.exec.exp.agg.Accumulators.VarBinaryMinMax;
+import
org.apache.ignite.internal.sql.engine.exec.exp.agg.Accumulators.VarCharMinMax;
+import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
+import org.apache.ignite.internal.sql.engine.util.Commons;
+import org.apache.ignite.internal.sql.engine.util.TypeUtils;
+import org.apache.ignite.internal.type.NativeType;
+import org.apache.ignite.internal.type.NativeTypeSpec;
+import org.apache.ignite.internal.type.NativeTypes;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+/**
+ * Tests for {@code MIN/MAX} accumulator functions.
+ */
+public class MinMaxAccumulatorTest {
+
+ @ParameterizedTest
+ @MethodSource("minMaxArgs")
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ public void min(NativeType nativeType, Comparable[] vals) {
+ StatefulAccumulator acc = newCall(true, nativeType);
+
+ for (var val : vals) {
+ acc.add(val);
+ }
+
+ acc.add(new Object[]{null});
+
+ Object result =
Arrays.stream(vals).min(Comparator.naturalOrder()).orElseThrow();
+
+ assertEquals(result, acc.end());
+ }
+
+ @ParameterizedTest
+ @MethodSource("minMaxArgs")
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ public void max(NativeType nativeType, Comparable[] vals) {
+ StatefulAccumulator acc = newCall(false, nativeType);
+
+ for (var val : vals) {
+ acc.add(val);
+ }
+
+ acc.add(new Object[]{null});
+
+ Object result =
Arrays.stream(vals).max(Comparator.naturalOrder()).orElseThrow();
+
+ assertEquals(result, acc.end());
+ }
+
+ private static Stream<Arguments> minMaxArgs() {
+ return Stream.of(
+ Arguments.of(NativeTypes.INT8, new Comparable[]{(byte) 2,
(byte) 1}),
+ Arguments.of(NativeTypes.INT16, new Comparable[]{(short) 2,
(short) 1}),
+ Arguments.of(NativeTypes.INT32, new Comparable[]{2, 1}),
+ Arguments.of(NativeTypes.INT64, new Comparable[]{2L, 1L}),
+ Arguments.of(NativeTypes.FLOAT, new Comparable[]{2.0f, 1.0f}),
+ Arguments.of(NativeTypes.DOUBLE, new Comparable[]{2.0d, 1.0d}),
+ Arguments.of(NativeTypes.decimalOf(2, 1), new Comparable[]{new
BigDecimal("20.1"), new BigDecimal(("10.8"))}),
+ Arguments.of(NativeTypes.stringOf(10), new Comparable[]{"abc",
"abcd"}),
+ Arguments.of(NativeTypes.stringOf(10), new Comparable[]{"a",
"b"}),
+ Arguments.of(NativeTypes.blobOf(10), new
Comparable[]{newByteString("abc"), newByteString("abcd")}),
+ Arguments.of(NativeTypes.blobOf(10), new
Comparable[]{newByteString("a"), newByteString("b")})
+ );
+ }
+
+ private static ByteString newByteString(String str) {
+ return new ByteString(str.getBytes(StandardCharsets.UTF_8));
+ }
+
+ @ParameterizedTest
+ @MethodSource("emptyArgs")
+ public void rejectNull(NativeType nativeType) {
+ StatefulAccumulator acc = newCall(true, nativeType);
+
+ assertThrows(NullPointerException.class, () -> acc.add(null));
+ }
+
+ private static Stream<NativeType> emptyArgs() {
+ return Stream.of(
+ NativeTypes.INT8,
+ NativeTypes.INT16,
+ NativeTypes.INT32,
+ NativeTypes.INT64,
+ NativeTypes.FLOAT,
+ NativeTypes.DOUBLE,
+ NativeTypes.decimalOf(2, 1)
+ );
+ }
+
+ private StatefulAccumulator newCall(boolean min, NativeType nativeType) {
+ IgniteTypeFactory tf = Commons.typeFactory();
+ RelDataType dataType = TypeUtils.native2relationalType(tf, nativeType);
+
+ Supplier<Accumulator> supplier;
+ if (nativeType.spec() == NativeTypeSpec.STRING) {
+ supplier = min ? VarCharMinMax.MIN_FACTORY :
VarCharMinMax.MAX_FACTORY;
+ } else if (nativeType.spec() == NativeTypeSpec.BYTES) {
+ supplier = min ? VarBinaryMinMax.MIN_FACTORY :
VarBinaryMinMax.MAX_FACTORY;
+ } else {
+ supplier = MinMaxAccumulator.newAccumulator(min, tf, dataType);
+ }
+
+ return new StatefulAccumulator(supplier);
+ }
+}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/SingleValAccumulatorTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/SingleValAccumulatorTest.java
new file mode 100644
index 0000000000..2681e96bce
--- /dev/null
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/SingleValAccumulatorTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.exp.agg;
+
+import static
org.apache.ignite.internal.sql.engine.util.SqlTestUtils.assertThrowsSqlException;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import java.util.function.Supplier;
+import org.apache.calcite.sql.type.SqlTypeName;
+import
org.apache.ignite.internal.sql.engine.exec.exp.agg.Accumulators.SingleVal;
+import org.apache.ignite.internal.sql.engine.util.Commons;
+import org.apache.ignite.lang.ErrorGroups.Sql;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for {@code SINGLE_VAL(SUBQUERY)} accumulator function.
+ */
+public class SingleValAccumulatorTest {
+
+ @Test
+ public void test() {
+ StatefulAccumulator acc = newCall();
+ acc.add("1");
+
+ assertThrowsSqlException(Sql.RUNTIME_ERR, "Subquery returned more
than 1 value.", () -> acc.add("2"));
+ assertThrowsSqlException(Sql.RUNTIME_ERR, "Subquery returned more
than 1 value.", () -> acc.add(new Object[]{null}));
+
+ assertEquals("1", acc.end());
+ }
+
+ @Test
+ public void nullFirst() {
+ StatefulAccumulator acc = newCall();
+
+ acc.add(new Object[]{null});
+
+ assertThrowsSqlException(Sql.RUNTIME_ERR, "Subquery returned more
than 1 value.", () -> acc.add(new Object[]{null}));
+ assertThrowsSqlException(Sql.RUNTIME_ERR, "Subquery returned more
than 1 value.", () -> acc.add("2"));
+
+ assertNull(acc.end());
+ }
+
+ @Test
+ public void empty() {
+ StatefulAccumulator acc = newCall();
+
+ assertNull(acc.end());
+ }
+
+ private StatefulAccumulator newCall() {
+ Supplier<Accumulator> supplier =
SingleVal.newAccumulator(Commons.typeFactory().createSqlType(SqlTypeName.VARCHAR));
+ return new StatefulAccumulator(supplier);
+ }
+}
diff --git
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/Accumulator.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/StatefulAccumulator.java
similarity index 53%
copy from
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/Accumulator.java
copy to
modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/StatefulAccumulator.java
index dcb7d41831..42b859ea93 100644
---
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/Accumulator.java
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/StatefulAccumulator.java
@@ -17,19 +17,34 @@
package org.apache.ignite.internal.sql.engine.exec.exp.agg;
-import java.util.List;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
+import java.util.function.Supplier;
+import org.jetbrains.annotations.Nullable;
/**
- * Accumulator interface.
+ * A helper class for testing accumulator functions.
*/
-public interface Accumulator {
- void add(Object... args);
+public final class StatefulAccumulator {
- Object end();
+ private final Accumulator accumulator;
- List<RelDataType> argumentTypes(IgniteTypeFactory typeFactory);
+ private final AccumulatorsState state = new AccumulatorsState(1);
- RelDataType returnType(IgniteTypeFactory typeFactory);
+ private final AccumulatorsState result = new AccumulatorsState(1);
+
+ public StatefulAccumulator(Supplier<? extends Accumulator> supplier) {
+ this(supplier.get());
+ }
+
+ public StatefulAccumulator(Accumulator accumulator) {
+ this.accumulator = accumulator;
+ }
+
+ public void add(Object... args) {
+ accumulator.add(state, args);
+ }
+
+ public @Nullable Object end() {
+ accumulator.end(state, result);
+ return result.get();
+ }
}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/SumAccumulatorTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/SumAccumulatorTest.java
new file mode 100644
index 0000000000..a9a87b1caf
--- /dev/null
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/SumAccumulatorTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.exp.agg;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import java.math.BigDecimal;
+import java.util.function.Supplier;
+import java.util.stream.Stream;
+import
org.apache.ignite.internal.sql.engine.exec.exp.agg.Accumulators.DecimalSumEmptyIsZero;
+import
org.apache.ignite.internal.sql.engine.exec.exp.agg.Accumulators.DoubleSumEmptyIsZero;
+import
org.apache.ignite.internal.sql.engine.exec.exp.agg.Accumulators.LongSumEmptyIsZero;
+import org.apache.ignite.internal.sql.engine.exec.exp.agg.Accumulators.Sum;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.junit.jupiter.api.Named;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+/**
+ * Tests for {@code SUM} accumulator functions.
+ */
+public class SumAccumulatorTest extends BaseIgniteAbstractTest {
+
+ @ParameterizedTest
+ @MethodSource("testArgs")
+ public void test(Accumulator sum, Object result, Object[] args) {
+ StatefulAccumulator acc = newCall(sum);
+
+ for (var a : args) {
+ acc.add(a);
+ }
+
+ assertEquals(result, acc.end());
+ }
+
+ private static Stream<Arguments> testArgs() {
+ return Stream.of(
+ Arguments.of(namedAccumulator(DoubleSumEmptyIsZero.FACTORY),
4.0d, new Object[]{3.0d, 1.0d}),
+ Arguments.of(namedAccumulator(LongSumEmptyIsZero.FACTORY), 4L,
new Object[]{3L, 1L}),
+ Arguments.of(namedAccumulator(DecimalSumEmptyIsZero.FACTORY),
new BigDecimal("3.4"),
+ new Object[]{new BigDecimal("1.3"), new
BigDecimal("2.1")})
+ );
+ }
+
+ @ParameterizedTest
+ @MethodSource("emptyArgs")
+ public void empty(Accumulator sum) {
+ StatefulAccumulator acc = newCall(sum);
+
+ assertNull(acc.end());
+ }
+
+ private static Stream<Arguments> emptyArgs() {
+ return Stream.of(
+ Arguments.of(namedAccumulator(DoubleSumEmptyIsZero.FACTORY)),
+ Arguments.of(namedAccumulator(LongSumEmptyIsZero.FACTORY)),
+ Arguments.of(namedAccumulator(DecimalSumEmptyIsZero.FACTORY))
+ );
+ }
+
+ private StatefulAccumulator newCall(Accumulator accumulator) {
+ return new StatefulAccumulator(new Sum(accumulator));
+ }
+
+ private static Named<Accumulator> namedAccumulator(Supplier<Accumulator>
supplier) {
+ Accumulator accumulator = supplier.get();
+ return Named.of(accumulator.getClass().getName(), accumulator);
+ }
+}
diff --git
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/SumIsZeroAccumulatorTest.java
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/SumIsZeroAccumulatorTest.java
new file mode 100644
index 0000000000..f27db0c851
--- /dev/null
+++
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/SumIsZeroAccumulatorTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.exp.agg;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.math.BigDecimal;
+import java.util.function.Supplier;
+import java.util.stream.Stream;
+import
org.apache.ignite.internal.sql.engine.exec.exp.agg.Accumulators.DecimalSumEmptyIsZero;
+import
org.apache.ignite.internal.sql.engine.exec.exp.agg.Accumulators.DoubleSumEmptyIsZero;
+import
org.apache.ignite.internal.sql.engine.exec.exp.agg.Accumulators.LongSumEmptyIsZero;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.junit.jupiter.api.Named;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+/**
+ * Tests for {@code $SUM0} accumulator functions.
+ */
+public class SumIsZeroAccumulatorTest extends BaseIgniteAbstractTest {
+
+ @ParameterizedTest
+ @MethodSource("testArgs")
+ public void test(Accumulator sum, Object result, Object[] args) {
+ StatefulAccumulator acc = newCall(sum);
+
+ for (var a : args) {
+ acc.add(a);
+ }
+
+ acc.add(new Object[]{null});
+
+ assertEquals(result, acc.end());
+ }
+
+ private static Stream<Arguments> testArgs() {
+ return Stream.of(
+ Arguments.of(namedAccumulator(DoubleSumEmptyIsZero.FACTORY),
4.0d, new Object[]{3.0d, 1.0d}),
+ Arguments.of(namedAccumulator(LongSumEmptyIsZero.FACTORY), 4L,
new Object[]{3L, 1L}),
+ Arguments.of(namedAccumulator(DecimalSumEmptyIsZero.FACTORY),
new BigDecimal("3.4"),
+ new Object[]{new BigDecimal("1.3"), new
BigDecimal("2.1")})
+ );
+ }
+
+ @ParameterizedTest
+ @MethodSource("zeroArgs")
+ public void zero(Accumulator sum, Object zero) {
+ StatefulAccumulator acc = newCall(sum);
+
+ assertEquals(zero, acc.end());
+ }
+
+ private static Stream<Arguments> zeroArgs() {
+ return Stream.of(
+ Arguments.of(namedAccumulator(DoubleSumEmptyIsZero.FACTORY),
0.0d),
+ Arguments.of(namedAccumulator(LongSumEmptyIsZero.FACTORY), 0L),
+ Arguments.of(namedAccumulator(DecimalSumEmptyIsZero.FACTORY),
BigDecimal.ZERO)
+ );
+ }
+
+ private static StatefulAccumulator newCall(Accumulator sum) {
+ return new StatefulAccumulator(sum);
+ }
+
+ private static Named<Accumulator> namedAccumulator(Supplier<Accumulator>
supplier) {
+ Accumulator accumulator = supplier.get();
+ return Named.of(accumulator.getClass().getName(), accumulator);
+ }
+}