This is an automated email from the ASF dual-hosted git repository.

jooger pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git


The following commit(s) were added to refs/heads/main by this push:
     new da1523341c IGNITE-23335: Sql. Do not store aggregation state in 
Accumulators (#4522)
da1523341c is described below

commit da1523341c5279c2690555201920888a823b2cba
Author: Max Zhuravkov <[email protected]>
AuthorDate: Tue Oct 15 13:17:00 2024 +0300

    IGNITE-23335: Sql. Do not store aggregation state in Accumulators (#4522)
---
 .../sql/engine/exec/exp/agg/Accumulator.java       |  29 +-
 .../engine/exec/exp/agg/AccumulatorWrapper.java    |  21 +-
 .../sql/engine/exec/exp/agg/Accumulators.java      | 421 ++++++++++++---------
 .../engine/exec/exp/agg/AccumulatorsFactory.java   |  28 +-
 .../sql/engine/exec/exp/agg/AccumulatorsState.java |  64 ++++
 .../sql/engine/exec/exp/agg/AggregateRow.java      |  67 +++-
 .../agg/{Accumulator.java => MutableDouble.java}   |  42 +-
 .../exp/agg/{Accumulator.java => MutableLong.java} |  42 +-
 .../sql/engine/exec/rel/HashAggregateNode.java     |  32 +-
 .../sql/engine/exec/rel/SortAggregateNode.java     |  34 +-
 .../engine/exec/exp/agg/AnyValAccumulatorTest.java |  57 +++
 .../exec/exp/agg/DecimalAvgAccumulatorTest.java    |  67 ++++
 .../exec/exp/agg/DoubleAvgAccumulatorTest.java}    |  35 +-
 .../exec/exp/agg/LiteralValAccumulatorTest.java    |  55 +++
 .../exec/exp/agg/LongCountAccumulatorTest.java}    |  34 +-
 .../engine/exec/exp/agg/MinMaxAccumulatorTest.java | 138 +++++++
 .../exec/exp/agg/SingleValAccumulatorTest.java     |  70 ++++
 .../engine/exec/exp/agg/StatefulAccumulator.java}  |  33 +-
 .../engine/exec/exp/agg/SumAccumulatorTest.java    |  86 +++++
 .../exec/exp/agg/SumIsZeroAccumulatorTest.java     |  86 +++++
 20 files changed, 1154 insertions(+), 287 deletions(-)

diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/Accumulator.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/Accumulator.java
index dcb7d41831..8a126fb560 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/Accumulator.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/Accumulator.java
@@ -25,11 +25,36 @@ import 
org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
  * Accumulator interface.
  */
 public interface Accumulator {
-    void add(Object... args);
 
-    Object end();
+    /**
+     * Updates this accumulator.
+     *
+     * @param state state of the accumulator.
+     * @param args arguments.
+     */
+    void add(AccumulatorsState state, Object[] args);
 
+    /**
+     * Computes result of this accumulator.
+     *
+     * @param state Accumulator state.
+     * @param result Result holder.
+     */
+    void end(AccumulatorsState state, AccumulatorsState result);
+
+    /**
+     * Returns types of arguments for this accumulator.
+     *
+     * @param typeFactory Type factory.
+     * @return List of argument types.
+     */
     List<RelDataType> argumentTypes(IgniteTypeFactory typeFactory);
 
+    /**
+     * Returns a result type for this accumulator.
+     *
+     * @param typeFactory Type factory.
+     * @return A result type.
+     */
     RelDataType returnType(IgniteTypeFactory typeFactory);
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/AccumulatorWrapper.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/AccumulatorWrapper.java
index 4ecc0edbf7..a5802ca9dd 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/AccumulatorWrapper.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/AccumulatorWrapper.java
@@ -17,12 +17,25 @@
 
 package org.apache.ignite.internal.sql.engine.exec.exp.agg;
 
+import org.jetbrains.annotations.Nullable;
+
 /**
- * AccumulatorWrapper interface.
- * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+ * Adapter that provides means to convert accumulator arguments and return 
types.
  */
 public interface AccumulatorWrapper<RowT> {
-    void add(RowT row);
 
-    Object end();
+    /** Returns {@code true} if the accumulator function should be applied to 
distinct elements. */
+    boolean isDistinct();
+
+    /** Returns the accumulator function. */
+    Accumulator accumulator();
+
+    /**
+     * Creates accumulator arguments from the given row. If this method 
returns {@code null},
+     * then the accumulator function should not be applied to the given row.
+     */
+    Object @Nullable [] getArguments(RowT row);
+
+    /** Converts accumulator result. */
+    Object convertResult(@Nullable Object result);
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/Accumulators.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/Accumulators.java
index 54bec16a39..ad63934889 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/Accumulators.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/Accumulators.java
@@ -28,9 +28,7 @@ import static 
org.apache.ignite.internal.util.ArrayUtils.nullOrEmpty;
 
 import java.math.BigDecimal;
 import java.util.Comparator;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
 import java.util.function.IntFunction;
 import java.util.function.Supplier;
 import org.apache.calcite.avatica.util.ByteString;
@@ -59,17 +57,10 @@ public class Accumulators {
     }
 
     /**
-     * AccumulatorFactory.
-     * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+     * Returns a supplier that creates a accumulator functions for the given 
aggregate call.
      */
     public Supplier<Accumulator> accumulatorFactory(AggregateCall call) {
-        if (!call.isDistinct()) {
-            return accumulatorFunctionFactory(call);
-        }
-
-        Supplier<Accumulator> fac = accumulatorFunctionFactory(call);
-
-        return () -> new DistinctAccumulator(fac);
+        return accumulatorFunctionFactory(call);
     }
 
     private Supplier<Accumulator> accumulatorFunctionFactory(AggregateCall 
call) {
@@ -214,82 +205,102 @@ public class Accumulators {
     }
 
     /**
-     * SingleVal.
-     * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+     * {@code SINGLE_VALUE(SUBQUERY)} accumulator. Pseudo accumulator that 
returns a first value produced by a subquery and an error if a
+     * subquery returns more than one row.
      */
-    private static class SingleVal extends AnyVal {
-        private boolean touched;
+    public static class SingleVal implements Accumulator {
 
-        private SingleVal(RelDataType type) {
-            super(type);
-        }
+        private final RelDataType type;
 
-        static Supplier<Accumulator> newAccumulator(RelDataType type) {
+        public static Supplier<Accumulator> newAccumulator(RelDataType type) {
             return () -> new SingleVal(type);
         }
 
+        private SingleVal(RelDataType type) {
+            this.type = type;
+        }
+
         /** {@inheritDoc} */
         @Override
-        public void add(Object... args) {
-            if (touched) {
+        public void add(AccumulatorsState state, Object... args) {
+            assert args.length == 1;
+
+            if (state.hasValue()) {
                 throw new SqlException(Sql.RUNTIME_ERR, "Subquery returned 
more than 1 value.");
+            } else {
+                state.set(args[0]);
             }
-            touched = true;
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public void end(AccumulatorsState state, AccumulatorsState result) {
+            result.set(state.get());
+        }
 
-            super.add(args);
+        /** {@inheritDoc} */
+        @Override
+        public List<RelDataType> argumentTypes(IgniteTypeFactory typeFactory) {
+            return 
List.of(typeFactory.createTypeWithNullability(typeFactory.createSqlType(ANY), 
true));
+        }
+
+        /** {@inheritDoc} */
+        @Override
+        public RelDataType returnType(IgniteTypeFactory typeFactory) {
+            return type;
         }
     }
 
     /**
-     * LITERAL_AGG accumulator, return {@code true} if incoming data is not 
empty, {@code false} otherwise.
-     * Calcite`s implementation RexImpTable#LiteralAggImplementor.
+     * {@code LITERAL_AGG} accumulator, return {@code true} if incoming data 
is not empty, {@code false} otherwise. Calcite`s implementation
+     * RexImpTable#LiteralAggImplementor.
      */
-    private static class LiteralVal extends AnyVal {
+    public static class LiteralVal extends AnyVal {
         private LiteralVal(RelDataType type) {
             super(type);
         }
 
-        static Supplier<Accumulator> newAccumulator(RelDataType type) {
+        public static Supplier<Accumulator> newAccumulator(RelDataType type) {
             return () -> new LiteralVal(type);
         }
 
         /** {@inheritDoc} */
         @Override
-        public Object end() {
-            return holder != null;
+        public void end(AccumulatorsState state, AccumulatorsState result) {
+            Object val = state.get();
+            result.set(val != null);
         }
     }
 
     /**
-     * ANY_VALUE accumulator.
+     * {@code ANY_VALUE} accumulator.
      */
-    private static class AnyVal implements Accumulator {
-        protected Object holder;
-
+    public static class AnyVal implements Accumulator {
         private final RelDataType type;
 
         private AnyVal(RelDataType type) {
             this.type = type;
         }
 
-        static Supplier<Accumulator> newAccumulator(RelDataType type) {
+        public static Supplier<Accumulator> newAccumulator(RelDataType type) {
             return () -> new AnyVal(type);
         }
 
         /** {@inheritDoc} */
         @Override
-        public void add(Object... args) {
+        public void add(AccumulatorsState state, Object[] args) {
             assert args.length == 1 : args.length;
 
-            if (holder == null) {
-                holder = args[0];
+            Object current = state.get();
+            if (current == null) {
+                state.set(args[0]);
             }
         }
 
         /** {@inheritDoc} */
         @Override
-        public Object end() {
-            return holder;
+        public void end(AccumulatorsState state, AccumulatorsState result) {
+            result.set(state.get());
         }
 
         /** {@inheritDoc} */
@@ -306,15 +317,17 @@ public class Accumulators {
     }
 
     /**
-     * DecimalAvg.
-     * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+     * {AVG(DECIMAL)} accumulator.
      */
     public static class DecimalAvg implements Accumulator {
         public static final IntFunction<Accumulator> FACTORY = DecimalAvg::new;
 
-        private BigDecimal sum = BigDecimal.ZERO;
+        /** State. */
+        public static class DecimalAvgState {
+            private BigDecimal sum = BigDecimal.ZERO;
 
-        private BigDecimal cnt = BigDecimal.ZERO;
+            private BigDecimal cnt = BigDecimal.ZERO;
+        }
 
         private final int precision;
 
@@ -327,21 +340,37 @@ public class Accumulators {
 
         /** {@inheritDoc} */
         @Override
-        public void add(Object... args) {
+        public void add(AccumulatorsState state, Object... args) {
             BigDecimal in = (BigDecimal) args[0];
 
             if (in == null) {
                 return;
             }
 
-            sum = sum.add(in);
-            cnt = cnt.add(BigDecimal.ONE);
+            DecimalAvgState sumState = (DecimalAvgState) state.get();
+            if (sumState == null) {
+                sumState = new DecimalAvgState();
+                state.set(sumState);
+            }
+
+            sumState.sum = sumState.sum.add(in);
+            sumState.cnt = sumState.cnt.add(BigDecimal.ONE);
         }
 
         /** {@inheritDoc} */
         @Override
-        public Object end() {
-            return cnt.compareTo(BigDecimal.ZERO) == 0 ? null : 
IgniteMath.decimalDivide(sum, cnt, precision, scale);
+        public void end(AccumulatorsState state, AccumulatorsState result) {
+            DecimalAvgState sumState = (DecimalAvgState) state.get();
+
+            if (sumState == null) {
+                result.set(null);
+            } else {
+                if (sumState.cnt.compareTo(BigDecimal.ZERO) == 0) {
+                    result.set(null);
+                } else {
+                    result.set(IgniteMath.decimalDivide(sumState.sum, 
sumState.cnt, precision, scale));
+                }
+            }
         }
 
         /** {@inheritDoc} */
@@ -358,33 +387,51 @@ public class Accumulators {
     }
 
     /**
-     * DoubleAvg.
-     * TODO Documentation https://issues.apache.org/jira/browse/IGNITE-15859
+     * {@code AVG(DOUBLE)} accumulator.
      */
     public static class DoubleAvg implements Accumulator {
         public static final Supplier<Accumulator> FACTORY = DoubleAvg::new;
 
-        private double sum;
+        /** State. */
+        public static class DoubleAvgState {
+            private double sum;
 
-        private long cnt;
+            private long cnt;
+        }
 
         /** {@inheritDoc} */
         @Override
-        public void add(Object... args) {
+        public void add(AccumulatorsState state, Object... args) {
             Double in = (Double) args[0];
 
             if (in == null) {
                 return;
             }
 
-            sum += in;
-            cnt++;
+            DoubleAvgState avgState = (DoubleAvgState) state.get();
+            if (avgState == null) {
+                avgState = new DoubleAvgState();
+                state.set(avgState);
+            }
+
+            avgState.sum += in;
+            avgState.cnt++;
         }
 
         /** {@inheritDoc} */
         @Override
-        public Object end() {
-            return cnt > 0 ? sum / cnt : null;
+        public void end(AccumulatorsState state, AccumulatorsState result) {
+            DoubleAvgState avgState = (DoubleAvgState) state.get();
+            if (avgState == null) {
+                result.set(null);
+            } else {
+                if (avgState.cnt > 0) {
+                    result.set(avgState.sum / avgState.cnt);
+                } else {
+                    result.set(null);
+                }
+            }
+
         }
 
         /** {@inheritDoc} */
@@ -400,25 +447,36 @@ public class Accumulators {
         }
     }
 
-    private static class LongCount implements Accumulator {
+    /** {@code COUNT(LONG)} accumulator.. */
+    public static class LongCount implements Accumulator {
         public static final Supplier<Accumulator> FACTORY = LongCount::new;
 
-        private long cnt;
-
         /** {@inheritDoc} */
         @Override
-        public void add(Object... args) {
+        public void add(AccumulatorsState state, Object... args) {
             assert nullOrEmpty(args) || args.length == 1;
 
             if (nullOrEmpty(args) || args[0] != null) {
-                cnt++;
+                MutableLong cnt = (MutableLong) state.get();
+
+                if (cnt == null) {
+                    cnt = new MutableLong();
+                    state.set(cnt);
+                }
+
+                cnt.add(1);
             }
         }
 
         /** {@inheritDoc} */
         @Override
-        public Object end() {
-            return cnt;
+        public void end(AccumulatorsState state, AccumulatorsState result) {
+            if (state.get() == null) {
+                result.set(0L);
+            } else {
+                MutableLong cnt = (MutableLong) state.get();
+                result.set(cnt.longValue());
+            }
         }
 
         /** {@inheritDoc} */
@@ -434,62 +492,77 @@ public class Accumulators {
         }
     }
 
-    private static class Sum implements Accumulator {
-        private Accumulator acc;
-
-        private boolean empty = true;
+    /** Wraps another sum accumulator and returns {@code null} if there was 
updates. */
+    public static class Sum implements Accumulator {
+        private final Accumulator acc;
 
         public Sum(Accumulator acc) {
             this.acc = acc;
         }
 
         /** {@inheritDoc} */
-        @Override public void add(Object... args) {
+        @Override
+        public void add(AccumulatorsState state, Object... args) {
             if (args[0] == null) {
                 return;
             }
 
-            empty = false;
-            acc.add(args[0]);
+            acc.add(state, args);
         }
 
         /** {@inheritDoc} */
-        @Override public Object end() {
-            return empty ? null : acc.end();
+        @Override
+        public void end(AccumulatorsState state, AccumulatorsState result) {
+            if (!state.hasValue()) {
+                result.set(null);
+            } else {
+                acc.end(state, result);
+            }
         }
 
         /** {@inheritDoc} */
-        @Override public List<RelDataType> argumentTypes(IgniteTypeFactory 
typeFactory) {
+        @Override
+        public List<RelDataType> argumentTypes(IgniteTypeFactory typeFactory) {
             return acc.argumentTypes(typeFactory);
         }
 
         /** {@inheritDoc} */
-        @Override public RelDataType returnType(IgniteTypeFactory typeFactory) 
{
+        @Override
+        public RelDataType returnType(IgniteTypeFactory typeFactory) {
             return acc.returnType(typeFactory);
         }
     }
 
-    private static class DoubleSumEmptyIsZero implements Accumulator {
+    /** {@code SUM(DOUBLE)} accumulator. */
+    public static class DoubleSumEmptyIsZero implements Accumulator {
         public static final Supplier<Accumulator> FACTORY = 
DoubleSumEmptyIsZero::new;
 
-        private double sum;
-
         /** {@inheritDoc} */
         @Override
-        public void add(Object... args) {
+        public void add(AccumulatorsState state, Object... args) {
             Double in = (Double) args[0];
 
             if (in == null) {
                 return;
             }
 
-            sum += in;
+            MutableDouble sum = (MutableDouble) state.get();
+            if (sum == null) {
+                sum = new MutableDouble();
+                state.set(sum);
+            }
+            sum.add(in);
         }
 
         /** {@inheritDoc} */
         @Override
-        public Object end() {
-            return sum;
+        public void end(AccumulatorsState state, AccumulatorsState result) {
+            if (!state.hasValue()) {
+                result.set(0.0d);
+            } else {
+                MutableDouble sum = (MutableDouble) state.get();
+                result.set(sum.doubleValue());
+            }
         }
 
         /** {@inheritDoc} */
@@ -505,27 +578,36 @@ public class Accumulators {
         }
     }
 
-    private static class LongSumEmptyIsZero implements Accumulator {
+    /** {@code SUM(LONG)} accumulator. */
+    public static class LongSumEmptyIsZero implements Accumulator {
         public static final Supplier<Accumulator> FACTORY = 
LongSumEmptyIsZero::new;
 
-        private long sum;
-
         /** {@inheritDoc} */
         @Override
-        public void add(Object... args) {
+        public void add(AccumulatorsState state, Object... args) {
             Long in = (Long) args[0];
 
             if (in == null) {
                 return;
             }
 
-            sum += in;
+            MutableLong sum = (MutableLong) state.get();
+            if (sum == null) {
+                sum = new MutableLong();
+                state.set(sum);
+            }
+            sum.add(in);
         }
 
         /** {@inheritDoc} */
         @Override
-        public Object end() {
-            return sum;
+        public void end(AccumulatorsState state, AccumulatorsState result) {
+            if (!state.hasValue()) {
+                result.set(0L);
+            } else {
+                MutableLong sum = (MutableLong) state.get();
+                result.set(sum.longValue());
+            }
         }
 
         /** {@inheritDoc} */
@@ -541,27 +623,35 @@ public class Accumulators {
         }
     }
 
-    private static class DecimalSumEmptyIsZero implements Accumulator {
+    /** SUM(DECIMAL) accumulator. */
+    public static class DecimalSumEmptyIsZero implements Accumulator {
         public static final Supplier<Accumulator> FACTORY = 
DecimalSumEmptyIsZero::new;
 
-        private BigDecimal sum = BigDecimal.ZERO;
-
         /** {@inheritDoc} */
         @Override
-        public void add(Object... args) {
+        public void add(AccumulatorsState state, Object... args) {
             BigDecimal in = (BigDecimal) args[0];
 
             if (in == null) {
                 return;
             }
 
-            sum = sum.add(in);
+            BigDecimal sum = (BigDecimal) state.get();
+            if (sum == null) {
+                state.set(in);
+            } else {
+                state.set(sum.add(in));
+            }
         }
 
         /** {@inheritDoc} */
         @Override
-        public Object end() {
-            return sum;
+        public void end(AccumulatorsState state, AccumulatorsState result) {
+            if (!state.hasValue()) {
+                result.set(BigDecimal.ZERO);
+            } else {
+                result.set(state.get());
+            }
         }
 
         /** {@inheritDoc} */
@@ -577,7 +667,8 @@ public class Accumulators {
         }
     }
 
-    private static final class MinMaxAccumulator implements Accumulator {
+    /** {@code MIN/MAX} accumulator. */
+    public static final class MinMaxAccumulator implements Accumulator {
 
         private final boolean min;
 
@@ -585,9 +676,6 @@ public class Accumulators {
 
         private final RelDataType returnType;
 
-        @SuppressWarnings({"rawtypes"})
-        private Comparable val;
-
         private MinMaxAccumulator(boolean min, RelDataTypeFactory typeFactory, 
RelDataType relDataType) {
             var nullableType = 
typeFactory.createTypeWithNullability(relDataType, true);
 
@@ -596,23 +684,32 @@ public class Accumulators {
             this.returnType = nullableType;
         }
 
-        static Supplier<Accumulator> newAccumulator(boolean min, 
RelDataTypeFactory typeFactory, RelDataType type) {
+        public static Supplier<Accumulator> newAccumulator(boolean min, 
RelDataTypeFactory typeFactory, RelDataType type) {
             return () -> new MinMaxAccumulator(min, typeFactory, type);
         }
 
         /** {@inheritDoc} **/
         @Override
         @SuppressWarnings({"rawtypes"})
-        public void add(Object... args) {
+        public void add(AccumulatorsState state, Object... args) {
             Comparable in = (Comparable) args[0];
 
-            doApply(in);
+            if (in == null) {
+                return;
+            }
+
+            Comparable current = (Comparable) state.get();
+            if (current == null) {
+                state.set(in);
+            } else {
+                state.set(doApply(in, current));
+            }
         }
 
         /** {@inheritDoc} **/
         @Override
-        public Object end() {
-            return val;
+        public void end(AccumulatorsState state, AccumulatorsState result) {
+            result.set(state.get());
         }
 
         /** {@inheritDoc} **/
@@ -628,11 +725,7 @@ public class Accumulators {
         }
 
         @SuppressWarnings({"rawtypes", "unchecked"})
-        private void doApply(Comparable in) {
-            if (in == null) {
-                return;
-            }
-
+        private Comparable doApply(Comparable in, Comparable val) {
             if (val == null) {
                 val = in;
             } else {
@@ -643,44 +736,48 @@ public class Accumulators {
                     val = cmp < 0 ? in : val;
                 }
             }
+            return val;
         }
     }
 
-    private static class VarCharMinMax implements Accumulator {
+    /** {@code MIN/MAX} for {@code VARCHAR} type. */
+    public static class VarCharMinMax implements Accumulator {
         public static final Supplier<Accumulator> MIN_FACTORY = () -> new 
VarCharMinMax(true);
 
         public static final Supplier<Accumulator> MAX_FACTORY = () -> new 
VarCharMinMax(false);
 
         private final boolean min;
 
-        private CharSequence val;
-
-        private boolean empty = true;
-
-        private VarCharMinMax(boolean min) {
+        VarCharMinMax(boolean min) {
             this.min = min;
         }
 
         /** {@inheritDoc} */
         @Override
-        public void add(Object... args) {
+        public void add(AccumulatorsState state, Object... args) {
             CharSequence in = (CharSequence) args[0];
 
             if (in == null) {
                 return;
             }
 
-            val = empty ? in : min
-                    ? (CharSeqComparator.INSTANCE.compare(val, in) < 0 ? val : 
in) :
-                    (CharSeqComparator.INSTANCE.compare(val, in) < 0 ? in : 
val);
+            CharSequence val = (CharSequence) state.get();
+
+            if (val == null) {
+                val = in;
+            } else if (min) {
+                val = CharSeqComparator.INSTANCE.compare(val, in) < 0 ? val : 
in;
+            } else {
+                val = CharSeqComparator.INSTANCE.compare(val, in) < 0 ? in : 
val;
+            }
 
-            empty = false;
+            state.set(val);
         }
 
         /** {@inheritDoc} */
         @Override
-        public Object end() {
-            return empty ? null : val;
+        public void end(AccumulatorsState state, AccumulatorsState result) {
+            result.set(state.get());
         }
 
         /** {@inheritDoc} */
@@ -717,103 +814,57 @@ public class Accumulators {
         }
     }
 
-    private static class VarBinaryMinMax implements Accumulator {
+    /** {@code MIN/MAX} for {@code VARBINARY} type. */
+    public static class VarBinaryMinMax implements Accumulator {
 
         public static final Supplier<Accumulator> MIN_FACTORY = () -> new 
VarBinaryMinMax(true);
 
-
         public static final Supplier<Accumulator> MAX_FACTORY = () -> new 
VarBinaryMinMax(false);
 
-
         private final boolean min;
 
-
-        private ByteString val;
-
-
-        private boolean empty = true;
-
-
-        private VarBinaryMinMax(boolean min) {
+        VarBinaryMinMax(boolean min) {
             this.min = min;
         }
 
         /** {@inheritDoc} */
         @Override
-        public void add(Object... args) {
+        public void add(AccumulatorsState state, Object... args) {
             ByteString in = (ByteString) args[0];
 
             if (in == null) {
                 return;
             }
 
-            val = empty ? in : min
-                    ? (val.compareTo(in) < 0 ? val : in)
-                    : (val.compareTo(in) < 0 ? in : val);
-
-            empty = false;
-        }
-
-        /** {@inheritDoc} */
-        @Override
-        public Object end() {
-            return empty ? null : val;
-        }
-
-        /** {@inheritDoc} */
-        @Override
-        public List<RelDataType> argumentTypes(IgniteTypeFactory typeFactory) {
-            return 
ArrayUtils.asList(typeFactory.createTypeWithNullability(typeFactory.createSqlType(VARBINARY),
 true));
-        }
-
-        /** {@inheritDoc} */
-        @Override
-        public RelDataType returnType(IgniteTypeFactory typeFactory) {
-            return 
typeFactory.createTypeWithNullability(typeFactory.createSqlType(VARBINARY), 
true);
-        }
-    }
-
-    private static class DistinctAccumulator implements Accumulator {
-        private final Accumulator acc;
-
-        private final Set<Object> set = new HashSet<>();
+            ByteString val = (ByteString) state.get();
 
-        private DistinctAccumulator(Supplier<Accumulator> accSup) {
-            this.acc = accSup.get();
-        }
-
-        /** {@inheritDoc} */
-        @Override
-        public void add(Object... args) {
-            Object in = args[0];
-
-            if (in == null) {
-                return;
+            if (val == null) {
+                val = in;
+            } else if (min) {
+                val = val.compareTo(in) < 0 ? val : in;
+            } else {
+                val = val.compareTo(in) < 0 ? in : val;
             }
 
-            set.add(in);
+            state.set(val);
         }
 
         /** {@inheritDoc} */
         @Override
-        public Object end() {
-            for (Object o : set) {
-                acc.add(o);
-            }
-
-            return acc.end();
+        public void end(AccumulatorsState state, AccumulatorsState result) {
+            result.set(state.get());
         }
 
         /** {@inheritDoc} */
         @Override
         public List<RelDataType> argumentTypes(IgniteTypeFactory typeFactory) {
-            return acc.argumentTypes(typeFactory);
+            return 
ArrayUtils.asList(typeFactory.createTypeWithNullability(typeFactory.createSqlType(VARBINARY),
 true));
         }
 
         /** {@inheritDoc} */
         @Override
         public RelDataType returnType(IgniteTypeFactory typeFactory) {
-            return acc.returnType(typeFactory);
+            return 
typeFactory.createTypeWithNullability(typeFactory.createSqlType(VARBINARY), 
true);
         }
     }
 
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/AccumulatorsFactory.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/AccumulatorsFactory.java
index 5d1599bebf..848c6d6a09 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/AccumulatorsFactory.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/AccumulatorsFactory.java
@@ -51,6 +51,7 @@ import 
org.apache.ignite.internal.sql.engine.exec.exp.RexToLixTranslator;
 import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
 import org.apache.ignite.internal.sql.engine.util.Commons;
 import org.apache.ignite.internal.sql.engine.util.Primitives;
+import org.jetbrains.annotations.Nullable;
 
 /**
  * AccumulatorsFactory.
@@ -274,7 +275,7 @@ public class AccumulatorsFactory<RowT> implements 
Supplier<List<AccumulatorWrapp
             this.accumulator = accumulator;
             this.inAdapter = inAdapter;
             this.outAdapter = outAdapter;
-            distinct = call.isDistinct();
+            this.distinct = call.isDistinct();
 
             // need to be refactored after 
https://issues.apache.org/jira/browse/IGNITE-22320
             literalAgg = call.getAggregation() == LITERAL_AGG;
@@ -285,11 +286,20 @@ public class AccumulatorsFactory<RowT> implements 
Supplier<List<AccumulatorWrapp
             handler = ctx.rowHandler();
         }
 
-        /** {@inheritDoc} */
         @Override
-        public void add(RowT row) {
+        public boolean isDistinct() {
+            return distinct;
+        }
+
+        @Override
+        public Accumulator accumulator() {
+            return accumulator;
+        }
+
+        @Override
+        public Object @Nullable [] getArguments(RowT row) {
             if (type != AggregateType.REDUCE && filterArg >= 0 && 
!Boolean.TRUE.equals(handler.get(filterArg, row))) {
-                return;
+                return null;
             }
 
             int params = argList.size();
@@ -309,18 +319,16 @@ public class AccumulatorsFactory<RowT> implements 
Supplier<List<AccumulatorWrapp
                 args[i] = handler.get(argPos, row);
 
                 if (ignoreNulls && args[i] == null) {
-                    return;
+                    return null;
                 }
             }
 
-            accumulator.add(inAdapter.apply(args));
+            return inAdapter.apply(args);
         }
 
-        /** {@inheritDoc} */
         @Override
-        public Object end() {
-            return outAdapter.apply(accumulator.end());
+        public Object convertResult(Object result) {
+            return outAdapter.apply(result);
         }
-
     }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/AccumulatorsState.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/AccumulatorsState.java
new file mode 100644
index 0000000000..d7fa472908
--- /dev/null
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/AccumulatorsState.java
@@ -0,0 +1,64 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.exp.agg;
+
+import java.util.BitSet;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Stores states of multiple accumulator functions.
+ */
+public class AccumulatorsState {
+
+    private final Object[] row;
+
+    private final BitSet set = new BitSet();
+
+    private int index;
+
+    /** Constructor. */
+    public AccumulatorsState(int rowSize) {
+        this.row = new Object[rowSize];
+    }
+
+    /** Sets current field index. */
+    public void setIndex(int i) {
+        this.index = i;
+    }
+
+    /** Resets current field index. */
+    public void resetIndex() {
+        this.index = -1;
+    }
+
+    /** Returns a value of the current field. */
+    public @Nullable Object get() {
+        return row[index];
+    }
+
+    /** Set a value of the current field. */
+    public void set(@Nullable Object value) {
+        row[index] = value;
+        set.set(index);
+    }
+
+    /** Returns {@code true} if current field has been set. */
+    public boolean hasValue() {
+        return set.get(index);
+    }
+}
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/AggregateRow.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/AggregateRow.java
index acf7b5089b..9437ec87d8 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/AggregateRow.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/AggregateRow.java
@@ -17,25 +17,30 @@
 
 package org.apache.ignite.internal.sql.engine.exec.exp.agg;
 
+import it.unimi.dsi.fastutil.ints.Int2ObjectArrayMap;
 import java.util.List;
+import java.util.Set;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.ignite.internal.sql.engine.exec.RowHandler;
 
 /**
  * Per group/grouping set row that contains state of accumulators.
  */
-public final class AggregateRow<RowT> {
+public class AggregateRow<RowT> {
     /** A placeholder of absent group id. */
     public static final byte NO_GROUP_ID = -1;
 
-    private final List<AccumulatorWrapper<RowT>> accs;
+    private final AccumulatorsState state;
 
-    private final AggregateType type;
+    private final Int2ObjectArrayMap<Set<Object>> distinctSets;
 
     /** Constructor. */
-    public AggregateRow(List<AccumulatorWrapper<RowT>> accs, AggregateType 
type) {
-        this.type = type;
-        this.accs = accs;
+    public AggregateRow(
+            AccumulatorsState state,
+            Int2ObjectArrayMap<Set<Object>> distinctSets
+    ) {
+        this.state = state;
+        this.distinctSets = distinctSets;
     }
 
     /** Initialized an empty group if necessary. */
@@ -56,14 +61,30 @@ public final class AggregateRow<RowT> {
     }
 
     /** Updates this row by using data of the given row. */
-    public void update(ImmutableBitSet allFields, RowHandler<RowT> handler, 
RowT row) {
-        for (AccumulatorWrapper<RowT> acc : accs) {
-            acc.add(row);
+    public void update(List<AccumulatorWrapper<RowT>> accs, ImmutableBitSet 
allFields, RowHandler<RowT> handler, RowT row) {
+        for (int i = 0; i < accs.size(); i++) {
+            AccumulatorWrapper<RowT> acc = accs.get(i);
+
+            Object[] args = acc.getArguments(row);
+            if (args == null) {
+                continue;
+            }
+
+            state.setIndex(i);
+
+            if (acc.isDistinct()) {
+                Set<Object> distinctSet = distinctSets.get(i);
+                distinctSet.add(args[0]);
+            } else {
+                acc.accumulator().add(state, args);
+            }
+
+            state.resetIndex();
         }
     }
 
     /** Creates an empty array for fields to populate output row with. */
-    public Object[] createOutput(ImmutableBitSet allFields, byte groupId) {
+    public Object[] createOutput(AggregateType type, 
List<AccumulatorWrapper<RowT>> accs, ImmutableBitSet allFields, byte groupId) {
         int extra = groupId == NO_GROUP_ID || type != AggregateType.MAP ? 0 : 
1;
         int rowSize = allFields.cardinality() + accs.size() + extra;
 
@@ -71,11 +92,31 @@ public final class AggregateRow<RowT> {
     }
 
     /** Writes aggregate state of the given row to given array. */
-    public void writeTo(Object[] output, ImmutableBitSet allFields, byte 
groupId) {
+    public void writeTo(AggregateType type, List<AccumulatorWrapper<RowT>> 
accs, Object[] output, ImmutableBitSet allFields, byte groupId) {
         int cardinality = allFields.cardinality();
+
+        AccumulatorsState result = new AccumulatorsState(accs.size());
+
         for (int i = 0; i < accs.size(); i++) {
-            AccumulatorWrapper<RowT> wrapper = accs.get(i);
-            output[i + cardinality] = wrapper.end();
+            AccumulatorWrapper<RowT> acc = accs.get(i);
+
+            state.setIndex(i);
+            result.setIndex(i);
+
+            if (acc.isDistinct()) {
+                Set<Object> distinctSet = distinctSets.get(i);
+
+                for (var arg : distinctSet) {
+                    acc.accumulator().add(state, new Object[]{arg});
+                }
+            }
+
+            acc.accumulator().end(state, result);
+
+            output[i + cardinality] = acc.convertResult(result.get());
+
+            state.resetIndex();
+            result.resetIndex();
         }
 
         if (groupId != NO_GROUP_ID && type == AggregateType.MAP) {
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/Accumulator.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/MutableDouble.java
similarity index 54%
copy from 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/Accumulator.java
copy to 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/MutableDouble.java
index dcb7d41831..2f0041e8c5 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/Accumulator.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/MutableDouble.java
@@ -17,19 +17,41 @@
 
 package org.apache.ignite.internal.sql.engine.exec.exp.agg;
 
-import java.util.List;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
-
 /**
- * Accumulator interface.
+ * Mutable variant of {@link Double} used by some {@link Accumulator}.
  */
-public interface Accumulator {
-    void add(Object... args);
+final class MutableDouble extends Number {
+
+    private static final long serialVersionUID = -7021424478810048306L;
+
+    private double value;
+
+    /** Adds the given value to this double. */
+    public void add(double v) {
+        value += v;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int intValue() {
+        return (int) value;
+    }
 
-    Object end();
+    /** {@inheritDoc} */
+    @Override
+    public long longValue() {
+        return (long) value;
+    }
 
-    List<RelDataType> argumentTypes(IgniteTypeFactory typeFactory);
+    /** {@inheritDoc} */
+    @Override
+    public float floatValue() {
+        return (float) value;
+    }
 
-    RelDataType returnType(IgniteTypeFactory typeFactory);
+    /** {@inheritDoc} */
+    @Override
+    public double doubleValue() {
+        return value;
+    }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/Accumulator.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/MutableLong.java
similarity index 55%
copy from 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/Accumulator.java
copy to 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/MutableLong.java
index dcb7d41831..50027ef57f 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/Accumulator.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/MutableLong.java
@@ -17,19 +17,41 @@
 
 package org.apache.ignite.internal.sql.engine.exec.exp.agg;
 
-import java.util.List;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
-
 /**
- * Accumulator interface.
+ * Mutable variant of {@link Long} used by some {@link Accumulator}.
  */
-public interface Accumulator {
-    void add(Object... args);
+final class MutableLong extends Number {
+
+    private static final long serialVersionUID = -147172356021174032L;
+
+    private long value;
+
+    /** Adds the given value to this long. */
+    public void add(long v) {
+        value += v;
+    }
+
+    /** {@inheritDoc} */
+    @Override
+    public int intValue() {
+        return (int) value;
+    }
 
-    Object end();
+    /** {@inheritDoc} */
+    @Override
+    public long longValue() {
+        return value;
+    }
 
-    List<RelDataType> argumentTypes(IgniteTypeFactory typeFactory);
+    /** {@inheritDoc} */
+    @Override
+    public float floatValue() {
+        return value;
+    }
 
-    RelDataType returnType(IgniteTypeFactory typeFactory);
+    /** {@inheritDoc} */
+    @Override
+    public double doubleValue() {
+        return value;
+    }
 }
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/HashAggregateNode.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/HashAggregateNode.java
index 01b75bd4ae..c926f675da 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/HashAggregateNode.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/HashAggregateNode.java
@@ -20,19 +20,23 @@ package org.apache.ignite.internal.sql.engine.exec.rel;
 import static java.util.stream.Collectors.toCollection;
 import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
 
+import it.unimi.dsi.fastutil.ints.Int2ObjectArrayMap;
 import java.util.ArrayDeque;
 import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.function.Supplier;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
 import org.apache.ignite.internal.sql.engine.exec.RowHandler;
 import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
 import org.apache.ignite.internal.sql.engine.exec.exp.agg.AccumulatorWrapper;
+import org.apache.ignite.internal.sql.engine.exec.exp.agg.AccumulatorsState;
 import org.apache.ignite.internal.sql.engine.exec.exp.agg.AggregateRow;
 import org.apache.ignite.internal.sql.engine.exec.exp.agg.AggregateType;
 import org.apache.ignite.internal.sql.engine.exec.exp.agg.GroupKey;
@@ -44,9 +48,6 @@ import 
org.apache.ignite.internal.sql.engine.exec.exp.agg.GroupKey;
 public class HashAggregateNode<RowT> extends AbstractNode<RowT> implements 
SingleNode<RowT>, Downstream<RowT> {
     private final AggregateType type;
 
-    /** May be {@code null} when there are not accumulators (DISTINCT 
aggregate node). */
-    private final Supplier<List<AccumulatorWrapper<RowT>>> accFactory;
-
     private final RowFactory<RowT> rowFactory;
 
     /** A bit set that contains fields included in all grouping sets. */
@@ -54,6 +55,8 @@ public class HashAggregateNode<RowT> extends 
AbstractNode<RowT> implements Singl
 
     private final List<Grouping> groupings;
 
+    private final List<AccumulatorWrapper<RowT>> accs;
+
     private int requested;
 
     private int waiting;
@@ -71,7 +74,6 @@ public class HashAggregateNode<RowT> extends 
AbstractNode<RowT> implements Singl
         super(ctx);
 
         this.type = type;
-        this.accFactory = accFactory;
         this.rowFactory = rowFactory;
 
         assert grpSets.size() <= Byte.MAX_VALUE : "Too many grouping sets";
@@ -79,6 +81,7 @@ public class HashAggregateNode<RowT> extends 
AbstractNode<RowT> implements Singl
         ImmutableBitSet.Builder b = ImmutableBitSet.builder();
 
         groupings = new ArrayList<>(grpSets.size());
+        accs = accFactory != null ? accFactory.get() : Collections.emptyList();
 
         for (byte i = 0; i < grpSets.size(); i++) {
             ImmutableBitSet grpFields = grpSets.get(i);
@@ -257,7 +260,7 @@ public class HashAggregateNode<RowT> extends 
AbstractNode<RowT> implements Singl
             GroupKey grpKey = b.build();
 
             AggregateRow<RowT> aggRow = groups.computeIfAbsent(grpKey, k -> 
create());
-            aggRow.update(allFields, handler, row);
+            aggRow.update(accs, allFields, handler, row);
         }
 
         /**
@@ -278,7 +281,7 @@ public class HashAggregateNode<RowT> extends 
AbstractNode<RowT> implements Singl
                 GroupKey grpKey = entry.getKey();
                 AggregateRow<RowT> aggRow = entry.getValue();
 
-                Object[] fields = aggRow.createOutput(allFields, grpId);
+                Object[] fields = aggRow.createOutput(type, accs, allFields, 
grpId);
 
                 int j = 0;
                 int k = 0;
@@ -287,7 +290,7 @@ public class HashAggregateNode<RowT> extends 
AbstractNode<RowT> implements Singl
                     fields[j++] = grpFields.get(field) ? grpKey.field(k++) : 
null;
                 }
 
-                aggRow.writeTo(fields, allFields, grpId);
+                aggRow.writeTo(type, accs, fields, allFields, grpId);
 
                 RowT row = rowFactory.create(fields);
 
@@ -299,15 +302,18 @@ public class HashAggregateNode<RowT> extends 
AbstractNode<RowT> implements Singl
         }
 
         private AggregateRow<RowT> create() {
-            List<AccumulatorWrapper<RowT>> wrappers;
+            Int2ObjectArrayMap<Set<Object>> distinctSets = new 
Int2ObjectArrayMap<>();
 
-            if (accFactory == null) {
-                wrappers = Collections.emptyList();
-            } else {
-                wrappers = accFactory.get();
+            for (int i = 0; i < accs.size(); i++) {
+                AccumulatorWrapper<RowT> acc = accs.get(i);
+                if (acc.isDistinct()) {
+                    distinctSets.put(i, new HashSet<>());
+                }
             }
 
-            return new AggregateRow<>(wrappers, type);
+            AccumulatorsState state = new AccumulatorsState(accs.size());
+
+            return new AggregateRow<>(state, distinctSets);
         }
 
         private boolean isEmpty() {
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/SortAggregateNode.java
 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/SortAggregateNode.java
index 836a82a2ea..4be19d8f1f 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/SortAggregateNode.java
+++ 
b/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/rel/SortAggregateNode.java
@@ -20,18 +20,22 @@ package org.apache.ignite.internal.sql.engine.exec.rel;
 import static org.apache.ignite.internal.util.ArrayUtils.OBJECT_EMPTY_ARRAY;
 import static org.apache.ignite.internal.util.CollectionUtils.nullOrEmpty;
 
+import it.unimi.dsi.fastutil.ints.Int2ObjectArrayMap;
 import java.util.ArrayDeque;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.Deque;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Objects;
+import java.util.Set;
 import java.util.function.Supplier;
 import org.apache.calcite.util.ImmutableBitSet;
 import org.apache.ignite.internal.sql.engine.exec.ExecutionContext;
 import org.apache.ignite.internal.sql.engine.exec.RowHandler;
 import org.apache.ignite.internal.sql.engine.exec.RowHandler.RowFactory;
 import org.apache.ignite.internal.sql.engine.exec.exp.agg.AccumulatorWrapper;
+import org.apache.ignite.internal.sql.engine.exec.exp.agg.AccumulatorsState;
 import org.apache.ignite.internal.sql.engine.exec.exp.agg.AggregateRow;
 import org.apache.ignite.internal.sql.engine.exec.exp.agg.AggregateType;
 
@@ -42,9 +46,6 @@ import 
org.apache.ignite.internal.sql.engine.exec.exp.agg.AggregateType;
 public class SortAggregateNode<RowT> extends AbstractNode<RowT> implements 
SingleNode<RowT>, Downstream<RowT> {
     private final AggregateType type;
 
-    /** May be {@code null} when there are not accumulators (DISTINCT 
aggregate node). */
-    private final Supplier<List<AccumulatorWrapper<RowT>>> accFactory;
-
     private final RowFactory<RowT> rowFactory;
 
     private final ImmutableBitSet grpSet;
@@ -53,6 +54,8 @@ public class SortAggregateNode<RowT> extends 
AbstractNode<RowT> implements Singl
 
     private final Deque<RowT> outBuf = new ArrayDeque<>(inBufSize);
 
+    private final List<AccumulatorWrapper<RowT>> accs;
+
     private RowT prevRow;
 
     private Group grp;
@@ -85,10 +88,10 @@ public class SortAggregateNode<RowT> extends 
AbstractNode<RowT> implements Singl
         assert Objects.nonNull(comp);
 
         this.type = type;
-        this.accFactory = accFactory;
         this.rowFactory = rowFactory;
         this.grpSet = grpSet;
         this.comp = comp;
+        this.accs = accFactory != null ? accFactory.get() : 
Collections.emptyList();
 
         init();
     }
@@ -210,10 +213,6 @@ public class SortAggregateNode<RowT> extends 
AbstractNode<RowT> implements Singl
         return this;
     }
 
-    private boolean hasAccumulators() {
-        return accFactory != null;
-    }
-
     private Group newGroup(RowT r) {
         final Object[] grpKeys = new Object[grpSet.cardinality()];
         List<Integer> fldIdxs = grpSet.asList();
@@ -248,16 +247,25 @@ public class SortAggregateNode<RowT> extends 
AbstractNode<RowT> implements Singl
         private Group(Object[] grpKeys) {
             this.grpKeys = grpKeys;
 
-            List<AccumulatorWrapper<RowT>> wrappers = hasAccumulators() ? 
accFactory.get() : Collections.emptyList();
-            aggRow = new AggregateRow<>(wrappers, type);
+            AccumulatorsState state = new AccumulatorsState(accs.size());
+
+            Int2ObjectArrayMap<Set<Object>> distinctSets = new 
Int2ObjectArrayMap<>();
+            for (int i = 0; i < accs.size(); i++) {
+                AccumulatorWrapper<RowT> acc = accs.get(i);
+                if (acc.isDistinct()) {
+                    distinctSets.put(i, new HashSet<>());
+                }
+            }
+
+            aggRow = new AggregateRow<>(state, distinctSets);
         }
 
         private void add(RowT row) {
-            aggRow.update(grpSet, context().rowHandler(), row);
+            aggRow.update(accs, grpSet, context().rowHandler(), row);
         }
 
         private RowT row() {
-            Object[] fields = aggRow.createOutput(grpSet, 
AggregateRow.NO_GROUP_ID);
+            Object[] fields = aggRow.createOutput(type, accs, grpSet, 
AggregateRow.NO_GROUP_ID);
 
             int i = 0;
 
@@ -265,7 +273,7 @@ public class SortAggregateNode<RowT> extends 
AbstractNode<RowT> implements Singl
                 fields[i++] = grpKey;
             }
 
-            aggRow.writeTo(fields, grpSet, AggregateRow.NO_GROUP_ID);
+            aggRow.writeTo(type, accs, fields, grpSet, 
AggregateRow.NO_GROUP_ID);
 
             return rowFactory.create(fields);
         }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/AnyValAccumulatorTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/AnyValAccumulatorTest.java
new file mode 100644
index 0000000000..a3e7a8a6a8
--- /dev/null
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/AnyValAccumulatorTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.exp.agg;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import java.util.function.Supplier;
+import org.apache.calcite.sql.type.SqlTypeName;
+import org.apache.ignite.internal.sql.engine.exec.exp.agg.Accumulators.AnyVal;
+import org.apache.ignite.internal.sql.engine.util.Commons;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for {@code ANY_VAL} accumulator function.
+ */
+public class AnyValAccumulatorTest extends BaseIgniteAbstractTest {
+
+    @Test
+    public void test() {
+        StatefulAccumulator accumulator = newCall();
+
+        accumulator.add(new Object[]{null});
+        accumulator.add("1");
+        accumulator.add("2");
+
+        assertEquals("1", accumulator.end());
+    }
+
+    @Test
+    public void empty() {
+        StatefulAccumulator accumulator = newCall();
+
+        assertNull(accumulator.end());
+    }
+
+    private StatefulAccumulator newCall() {
+        Supplier<Accumulator> supplier = 
AnyVal.newAccumulator(Commons.typeFactory().createSqlType(SqlTypeName.VARCHAR));
+        return new StatefulAccumulator(supplier);
+    }
+}
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/DecimalAvgAccumulatorTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/DecimalAvgAccumulatorTest.java
new file mode 100644
index 0000000000..0e30e80e14
--- /dev/null
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/DecimalAvgAccumulatorTest.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.exp.agg;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import java.math.BigDecimal;
+import 
org.apache.ignite.internal.sql.engine.exec.exp.agg.Accumulators.DecimalAvg;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for {@code AVG(DECIMAL)} accumulator function.
+ */
+public class DecimalAvgAccumulatorTest extends BaseIgniteAbstractTest {
+
+    @Test
+    public void testScale3() {
+        StatefulAccumulator acc = newCall(2);
+
+        acc.add(new BigDecimal("50"));
+        acc.add(new BigDecimal("20"));
+        acc.add(new Object[]{null});
+        acc.add(new BigDecimal("30"));
+
+        assertEquals(new BigDecimal("33.33"), acc.end());
+    }
+
+    @Test
+    public void testScale0() {
+        StatefulAccumulator acc = newCall(0);
+
+        acc.add(new BigDecimal("50"));
+        acc.add(new BigDecimal("20"));
+        acc.add(new Object[]{null});
+        acc.add(new BigDecimal("30"));
+
+        assertEquals(new BigDecimal("33"), acc.end());
+    }
+
+    @Test
+    public void empty() {
+        StatefulAccumulator acc = newCall(3);
+
+        assertNull(acc.end());
+    }
+
+    private StatefulAccumulator newCall(int scale) {
+        return new StatefulAccumulator(DecimalAvg.FACTORY.apply(scale));
+    }
+}
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/Accumulator.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/DoubleAvgAccumulatorTest.java
similarity index 53%
copy from 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/Accumulator.java
copy to 
modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/DoubleAvgAccumulatorTest.java
index dcb7d41831..2e2f0e320d 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/Accumulator.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/DoubleAvgAccumulatorTest.java
@@ -17,19 +17,36 @@
 
 package org.apache.ignite.internal.sql.engine.exec.exp.agg;
 
-import java.util.List;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import 
org.apache.ignite.internal.sql.engine.exec.exp.agg.Accumulators.DoubleAvg;
+import org.junit.jupiter.api.Test;
 
 /**
- * Accumulator interface.
+ * Tests for {@code AVG(DOUBLE)} accumulator function.
  */
-public interface Accumulator {
-    void add(Object... args);
+public class DoubleAvgAccumulatorTest {
+
+    @Test
+    public void test() {
+        StatefulAccumulator acc = newCall();
+
+        acc.add(3.0d);
+        acc.add(new Object[]{null});
+        acc.add(1.0d);
+
+        assertEquals(2.0d, acc.end());
+    }
 
-    Object end();
+    @Test
+    public void empty() {
+        StatefulAccumulator acc = newCall();
 
-    List<RelDataType> argumentTypes(IgniteTypeFactory typeFactory);
+        assertNull(acc.end());
+    }
 
-    RelDataType returnType(IgniteTypeFactory typeFactory);
+    private StatefulAccumulator newCall() {
+        return new StatefulAccumulator(DoubleAvg.FACTORY);
+    }
 }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/LiteralValAccumulatorTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/LiteralValAccumulatorTest.java
new file mode 100644
index 0000000000..b766d303b8
--- /dev/null
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/LiteralValAccumulatorTest.java
@@ -0,0 +1,55 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.exp.agg;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.util.function.Supplier;
+import org.apache.calcite.sql.type.SqlTypeName;
+import 
org.apache.ignite.internal.sql.engine.exec.exp.agg.Accumulators.LiteralVal;
+import org.apache.ignite.internal.sql.engine.util.Commons;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for {@code LITERAL_AGG(EXPR)}.
+ */
+public class LiteralValAccumulatorTest extends BaseIgniteAbstractTest {
+
+    @Test
+    public void test() {
+        StatefulAccumulator accumulator = newCall();
+
+        accumulator.add("1");
+        accumulator.add("2");
+
+        assertEquals(true, accumulator.end());
+    }
+
+    @Test
+    public void empty() {
+        StatefulAccumulator accumulator = newCall();
+
+        assertEquals(false, accumulator.end());
+    }
+
+    private StatefulAccumulator newCall() {
+        Supplier<Accumulator> supplier = 
LiteralVal.newAccumulator(Commons.typeFactory().createSqlType(SqlTypeName.VARCHAR));
+        return new StatefulAccumulator(supplier);
+    }
+}
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/Accumulator.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/LongCountAccumulatorTest.java
similarity index 55%
copy from 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/Accumulator.java
copy to 
modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/LongCountAccumulatorTest.java
index dcb7d41831..282e2a9030 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/Accumulator.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/LongCountAccumulatorTest.java
@@ -17,19 +17,35 @@
 
 package org.apache.ignite.internal.sql.engine.exec.exp.agg;
 
-import java.util.List;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import 
org.apache.ignite.internal.sql.engine.exec.exp.agg.Accumulators.LongCount;
+import org.junit.jupiter.api.Test;
 
 /**
- * Accumulator interface.
+ * Tests for {@code COUNT(BIGINT)} accumulator function.
  */
-public interface Accumulator {
-    void add(Object... args);
+public class LongCountAccumulatorTest {
+
+    @Test
+    public void test() {
+        StatefulAccumulator acc = newCall();
+
+        acc.add(1L);
+        acc.add(new Object[]{null});
+        acc.add(2L);
+
+        assertEquals(2L, acc.end());
+    }
 
-    Object end();
+    @Test
+    public void empty() {
+        StatefulAccumulator acc = newCall();
 
-    List<RelDataType> argumentTypes(IgniteTypeFactory typeFactory);
+        assertEquals(0L, acc.end());
+    }
 
-    RelDataType returnType(IgniteTypeFactory typeFactory);
+    private StatefulAccumulator newCall() {
+        return new StatefulAccumulator(LongCount.FACTORY);
+    }
 }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/MinMaxAccumulatorTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/MinMaxAccumulatorTest.java
new file mode 100644
index 0000000000..242278f517
--- /dev/null
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/MinMaxAccumulatorTest.java
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.exp.agg;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertThrows;
+
+import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.function.Supplier;
+import java.util.stream.Stream;
+import org.apache.calcite.avatica.util.ByteString;
+import org.apache.calcite.rel.type.RelDataType;
+import 
org.apache.ignite.internal.sql.engine.exec.exp.agg.Accumulators.MinMaxAccumulator;
+import 
org.apache.ignite.internal.sql.engine.exec.exp.agg.Accumulators.VarBinaryMinMax;
+import 
org.apache.ignite.internal.sql.engine.exec.exp.agg.Accumulators.VarCharMinMax;
+import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
+import org.apache.ignite.internal.sql.engine.util.Commons;
+import org.apache.ignite.internal.sql.engine.util.TypeUtils;
+import org.apache.ignite.internal.type.NativeType;
+import org.apache.ignite.internal.type.NativeTypeSpec;
+import org.apache.ignite.internal.type.NativeTypes;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+/**
+ * Tests for {@code MIN/MAX} accumulator functions.
+ */
+public class MinMaxAccumulatorTest {
+
+    @ParameterizedTest
+    @MethodSource("minMaxArgs")
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    public void min(NativeType nativeType, Comparable[] vals) {
+        StatefulAccumulator acc = newCall(true, nativeType);
+
+        for (var val : vals) {
+            acc.add(val);
+        }
+
+        acc.add(new Object[]{null});
+
+        Object result = 
Arrays.stream(vals).min(Comparator.naturalOrder()).orElseThrow();
+
+        assertEquals(result, acc.end());
+    }
+
+    @ParameterizedTest
+    @MethodSource("minMaxArgs")
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    public void max(NativeType nativeType, Comparable[] vals) {
+        StatefulAccumulator acc = newCall(false, nativeType);
+
+        for (var val : vals) {
+            acc.add(val);
+        }
+
+        acc.add(new Object[]{null});
+
+        Object result = 
Arrays.stream(vals).max(Comparator.naturalOrder()).orElseThrow();
+
+        assertEquals(result, acc.end());
+    }
+
+    private static Stream<Arguments> minMaxArgs() {
+        return Stream.of(
+                Arguments.of(NativeTypes.INT8, new Comparable[]{(byte) 2, 
(byte) 1}),
+                Arguments.of(NativeTypes.INT16, new Comparable[]{(short) 2, 
(short) 1}),
+                Arguments.of(NativeTypes.INT32, new Comparable[]{2, 1}),
+                Arguments.of(NativeTypes.INT64, new Comparable[]{2L, 1L}),
+                Arguments.of(NativeTypes.FLOAT, new Comparable[]{2.0f, 1.0f}),
+                Arguments.of(NativeTypes.DOUBLE, new Comparable[]{2.0d, 1.0d}),
+                Arguments.of(NativeTypes.decimalOf(2, 1), new Comparable[]{new 
BigDecimal("20.1"), new BigDecimal(("10.8"))}),
+                Arguments.of(NativeTypes.stringOf(10), new Comparable[]{"abc", 
"abcd"}),
+                Arguments.of(NativeTypes.stringOf(10), new Comparable[]{"a", 
"b"}),
+                Arguments.of(NativeTypes.blobOf(10), new 
Comparable[]{newByteString("abc"), newByteString("abcd")}),
+                Arguments.of(NativeTypes.blobOf(10), new 
Comparable[]{newByteString("a"), newByteString("b")})
+        );
+    }
+
+    private static ByteString newByteString(String str) {
+        return new ByteString(str.getBytes(StandardCharsets.UTF_8));
+    }
+
+    @ParameterizedTest
+    @MethodSource("emptyArgs")
+    public void rejectNull(NativeType nativeType) {
+        StatefulAccumulator acc = newCall(true, nativeType);
+
+        assertThrows(NullPointerException.class, () -> acc.add(null));
+    }
+
+    private static Stream<NativeType> emptyArgs() {
+        return Stream.of(
+                NativeTypes.INT8,
+                NativeTypes.INT16,
+                NativeTypes.INT32,
+                NativeTypes.INT64,
+                NativeTypes.FLOAT,
+                NativeTypes.DOUBLE,
+                NativeTypes.decimalOf(2, 1)
+        );
+    }
+
+    private StatefulAccumulator newCall(boolean min, NativeType nativeType) {
+        IgniteTypeFactory tf = Commons.typeFactory();
+        RelDataType dataType = TypeUtils.native2relationalType(tf, nativeType);
+
+        Supplier<Accumulator> supplier;
+        if (nativeType.spec() == NativeTypeSpec.STRING) {
+            supplier = min ? VarCharMinMax.MIN_FACTORY : 
VarCharMinMax.MAX_FACTORY;
+        } else if (nativeType.spec() == NativeTypeSpec.BYTES) {
+            supplier = min ? VarBinaryMinMax.MIN_FACTORY : 
VarBinaryMinMax.MAX_FACTORY;
+        } else {
+            supplier = MinMaxAccumulator.newAccumulator(min, tf, dataType);
+        }
+
+        return new StatefulAccumulator(supplier);
+    }
+}
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/SingleValAccumulatorTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/SingleValAccumulatorTest.java
new file mode 100644
index 0000000000..2681e96bce
--- /dev/null
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/SingleValAccumulatorTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.exp.agg;
+
+import static 
org.apache.ignite.internal.sql.engine.util.SqlTestUtils.assertThrowsSqlException;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import java.util.function.Supplier;
+import org.apache.calcite.sql.type.SqlTypeName;
+import 
org.apache.ignite.internal.sql.engine.exec.exp.agg.Accumulators.SingleVal;
+import org.apache.ignite.internal.sql.engine.util.Commons;
+import org.apache.ignite.lang.ErrorGroups.Sql;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Tests for {@code SINGLE_VAL(SUBQUERY)} accumulator function.
+ */
+public class SingleValAccumulatorTest {
+
+    @Test
+    public void test() {
+        StatefulAccumulator acc = newCall();
+        acc.add("1");
+
+        assertThrowsSqlException(Sql.RUNTIME_ERR,  "Subquery returned more 
than 1 value.", () -> acc.add("2"));
+        assertThrowsSqlException(Sql.RUNTIME_ERR,  "Subquery returned more 
than 1 value.", () -> acc.add(new Object[]{null}));
+
+        assertEquals("1", acc.end());
+    }
+
+    @Test
+    public void nullFirst() {
+        StatefulAccumulator acc = newCall();
+
+        acc.add(new Object[]{null});
+
+        assertThrowsSqlException(Sql.RUNTIME_ERR,  "Subquery returned more 
than 1 value.", () -> acc.add(new Object[]{null}));
+        assertThrowsSqlException(Sql.RUNTIME_ERR,  "Subquery returned more 
than 1 value.", () -> acc.add("2"));
+
+        assertNull(acc.end());
+    }
+
+    @Test
+    public void empty() {
+        StatefulAccumulator acc = newCall();
+
+        assertNull(acc.end());
+    }
+
+    private StatefulAccumulator newCall() {
+        Supplier<Accumulator> supplier = 
SingleVal.newAccumulator(Commons.typeFactory().createSqlType(SqlTypeName.VARCHAR));
+        return new StatefulAccumulator(supplier);
+    }
+}
diff --git 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/Accumulator.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/StatefulAccumulator.java
similarity index 53%
copy from 
modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/Accumulator.java
copy to 
modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/StatefulAccumulator.java
index dcb7d41831..42b859ea93 100644
--- 
a/modules/sql-engine/src/main/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/Accumulator.java
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/StatefulAccumulator.java
@@ -17,19 +17,34 @@
 
 package org.apache.ignite.internal.sql.engine.exec.exp.agg;
 
-import java.util.List;
-import org.apache.calcite.rel.type.RelDataType;
-import org.apache.ignite.internal.sql.engine.type.IgniteTypeFactory;
+import java.util.function.Supplier;
+import org.jetbrains.annotations.Nullable;
 
 /**
- * Accumulator interface.
+ * A helper class for testing accumulator functions.
  */
-public interface Accumulator {
-    void add(Object... args);
+public final class StatefulAccumulator {
 
-    Object end();
+    private final Accumulator accumulator;
 
-    List<RelDataType> argumentTypes(IgniteTypeFactory typeFactory);
+    private final AccumulatorsState state = new AccumulatorsState(1);
 
-    RelDataType returnType(IgniteTypeFactory typeFactory);
+    private final AccumulatorsState result = new AccumulatorsState(1);
+
+    public StatefulAccumulator(Supplier<? extends Accumulator> supplier) {
+        this(supplier.get());
+    }
+
+    public StatefulAccumulator(Accumulator accumulator) {
+        this.accumulator = accumulator;
+    }
+
+    public void add(Object... args) {
+        accumulator.add(state, args);
+    }
+
+    public @Nullable Object end() {
+        accumulator.end(state, result);
+        return result.get();
+    }
 }
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/SumAccumulatorTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/SumAccumulatorTest.java
new file mode 100644
index 0000000000..a9a87b1caf
--- /dev/null
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/SumAccumulatorTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.exp.agg;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+
+import java.math.BigDecimal;
+import java.util.function.Supplier;
+import java.util.stream.Stream;
+import 
org.apache.ignite.internal.sql.engine.exec.exp.agg.Accumulators.DecimalSumEmptyIsZero;
+import 
org.apache.ignite.internal.sql.engine.exec.exp.agg.Accumulators.DoubleSumEmptyIsZero;
+import 
org.apache.ignite.internal.sql.engine.exec.exp.agg.Accumulators.LongSumEmptyIsZero;
+import org.apache.ignite.internal.sql.engine.exec.exp.agg.Accumulators.Sum;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.junit.jupiter.api.Named;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+/**
+ * Tests for {@code SUM} accumulator functions.
+ */
+public class SumAccumulatorTest extends BaseIgniteAbstractTest {
+
+    @ParameterizedTest
+    @MethodSource("testArgs")
+    public void test(Accumulator sum, Object result, Object[] args) {
+        StatefulAccumulator acc = newCall(sum);
+
+        for (var a : args) {
+            acc.add(a);
+        }
+
+        assertEquals(result, acc.end());
+    }
+
+    private static Stream<Arguments> testArgs() {
+        return Stream.of(
+                Arguments.of(namedAccumulator(DoubleSumEmptyIsZero.FACTORY), 
4.0d, new Object[]{3.0d, 1.0d}),
+                Arguments.of(namedAccumulator(LongSumEmptyIsZero.FACTORY), 4L, 
new Object[]{3L, 1L}),
+                Arguments.of(namedAccumulator(DecimalSumEmptyIsZero.FACTORY), 
new BigDecimal("3.4"),
+                        new Object[]{new BigDecimal("1.3"), new 
BigDecimal("2.1")})
+        );
+    }
+
+    @ParameterizedTest
+    @MethodSource("emptyArgs")
+    public void empty(Accumulator sum) {
+        StatefulAccumulator acc = newCall(sum);
+
+        assertNull(acc.end());
+    }
+
+    private static Stream<Arguments> emptyArgs() {
+        return Stream.of(
+                Arguments.of(namedAccumulator(DoubleSumEmptyIsZero.FACTORY)),
+                Arguments.of(namedAccumulator(LongSumEmptyIsZero.FACTORY)),
+                Arguments.of(namedAccumulator(DecimalSumEmptyIsZero.FACTORY))
+        );
+    }
+
+    private StatefulAccumulator newCall(Accumulator accumulator) {
+        return new StatefulAccumulator(new Sum(accumulator));
+    }
+
+    private static Named<Accumulator> namedAccumulator(Supplier<Accumulator> 
supplier) {
+        Accumulator accumulator = supplier.get();
+        return Named.of(accumulator.getClass().getName(), accumulator);
+    }
+}
diff --git 
a/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/SumIsZeroAccumulatorTest.java
 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/SumIsZeroAccumulatorTest.java
new file mode 100644
index 0000000000..f27db0c851
--- /dev/null
+++ 
b/modules/sql-engine/src/test/java/org/apache/ignite/internal/sql/engine/exec/exp/agg/SumIsZeroAccumulatorTest.java
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.sql.engine.exec.exp.agg;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+import java.math.BigDecimal;
+import java.util.function.Supplier;
+import java.util.stream.Stream;
+import 
org.apache.ignite.internal.sql.engine.exec.exp.agg.Accumulators.DecimalSumEmptyIsZero;
+import 
org.apache.ignite.internal.sql.engine.exec.exp.agg.Accumulators.DoubleSumEmptyIsZero;
+import 
org.apache.ignite.internal.sql.engine.exec.exp.agg.Accumulators.LongSumEmptyIsZero;
+import org.apache.ignite.internal.testframework.BaseIgniteAbstractTest;
+import org.junit.jupiter.api.Named;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+/**
+ * Tests for {@code $SUM0} accumulator functions.
+ */
+public class SumIsZeroAccumulatorTest extends BaseIgniteAbstractTest {
+
+    @ParameterizedTest
+    @MethodSource("testArgs")
+    public void test(Accumulator sum, Object result, Object[] args) {
+        StatefulAccumulator acc = newCall(sum);
+
+        for (var a : args) {
+            acc.add(a);
+        }
+
+        acc.add(new Object[]{null});
+
+        assertEquals(result, acc.end());
+    }
+
+    private static Stream<Arguments> testArgs() {
+        return Stream.of(
+                Arguments.of(namedAccumulator(DoubleSumEmptyIsZero.FACTORY), 
4.0d, new Object[]{3.0d, 1.0d}),
+                Arguments.of(namedAccumulator(LongSumEmptyIsZero.FACTORY), 4L, 
new Object[]{3L, 1L}),
+                Arguments.of(namedAccumulator(DecimalSumEmptyIsZero.FACTORY), 
new BigDecimal("3.4"),
+                        new Object[]{new BigDecimal("1.3"), new 
BigDecimal("2.1")})
+        );
+    }
+
+    @ParameterizedTest
+    @MethodSource("zeroArgs")
+    public void zero(Accumulator sum, Object zero) {
+        StatefulAccumulator acc = newCall(sum);
+
+        assertEquals(zero, acc.end());
+    }
+
+    private static Stream<Arguments> zeroArgs() {
+        return Stream.of(
+                Arguments.of(namedAccumulator(DoubleSumEmptyIsZero.FACTORY), 
0.0d),
+                Arguments.of(namedAccumulator(LongSumEmptyIsZero.FACTORY), 0L),
+                Arguments.of(namedAccumulator(DecimalSumEmptyIsZero.FACTORY), 
BigDecimal.ZERO)
+        );
+    }
+
+    private static StatefulAccumulator newCall(Accumulator sum) {
+        return new StatefulAccumulator(sum);
+    }
+
+    private static Named<Accumulator> namedAccumulator(Supplier<Accumulator> 
supplier) {
+        Accumulator accumulator = supplier.get();
+        return Named.of(accumulator.getClass().getName(), accumulator);
+    }
+}

Reply via email to