This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 4256ea48ad1 Fix BigArray NPE of some aggregation functions (first, last, sum, extreme) when groups are more than 1024 in aggregation query
4256ea48ad1 is described below

commit 4256ea48ad1d5a01dfa55ab074aa0ca96b9cab69
Author: Weihao Li <[email protected]>
AuthorDate: Mon Mar 31 11:03:37 2025 +0800

    Fix BigArray NPE of some aggregation functions (first, last, sum, extreme) when groups are more than 1024 in aggregation query
---
 .../grouped/GroupedExtremeAccumulator.java         |   1 +
 .../grouped/GroupedFirstAccumulator.java           |   1 +
 .../grouped/GroupedLastAccumulator.java            |  21 ++-
 .../aggregation/grouped/GroupedSumAccumulator.java |   1 +
 .../analyzer/AggregationCornerCaseTest.java        | 179 +++++++++++++++++++++
 5 files changed, 200 insertions(+), 3 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedExtremeAccumulator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedExtremeAccumulator.java
index 14cb70e6414..d0893c5a833 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedExtremeAccumulator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedExtremeAccumulator.java
@@ -105,6 +105,7 @@ public class GroupedExtremeAccumulator implements 
GroupedAccumulator {
 
   @Override
   public void setGroupCount(long groupCount) {
+    inits.ensureCapacity(groupCount);
     switch (seriesDataType) {
       case INT32:
       case DATE:
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedFirstAccumulator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedFirstAccumulator.java
index d72cf25ae69..e4277bf640b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedFirstAccumulator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedFirstAccumulator.java
@@ -125,6 +125,7 @@ public class GroupedFirstAccumulator implements 
GroupedAccumulator {
 
   @Override
   public void setGroupCount(long groupCount) {
+    minTimes.ensureCapacity(groupCount);
     switch (seriesDataType) {
       case INT32:
       case DATE:
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedLastAccumulator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedLastAccumulator.java
index 71e69f7a457..8294f3c529b 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedLastAccumulator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedLastAccumulator.java
@@ -125,6 +125,7 @@ public class GroupedLastAccumulator implements 
GroupedAccumulator {
 
   @Override
   public void setGroupCount(long groupCount) {
+    maxTimes.ensureCapacity(groupCount);
     switch (seriesDataType) {
       case INT32:
       case DATE:
@@ -439,9 +440,23 @@ public class GroupedLastAccumulator implements 
GroupedAccumulator {
 
   private void addFloatInput(
       int[] groupIds, Column valueColumn, Column timeColumn, AggregationMask 
mask) {
-    for (int i = 0; i < groupIds.length; i++) {
-      if (!valueColumn.isNull(i)) {
-        updateFloatValue(groupIds[i], valueColumn.getFloat(i), 
timeColumn.getLong(i));
+    int positionCount = mask.getSelectedPositionCount();
+
+    if (mask.isSelectAll()) {
+      for (int i = 0; i < positionCount; i++) {
+        if (!valueColumn.isNull(i)) {
+          updateFloatValue(groupIds[i], valueColumn.getFloat(i), 
timeColumn.getLong(i));
+        }
+      }
+    } else {
+      int[] selectedPositions = mask.getSelectedPositions();
+      int position;
+      for (int i = 0; i < positionCount; i++) {
+        position = selectedPositions[i];
+        if (!valueColumn.isNull(position)) {
+          updateFloatValue(
+              groupIds[position], valueColumn.getFloat(position), 
timeColumn.getLong(position));
+        }
       }
     }
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedSumAccumulator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedSumAccumulator.java
index d52dd8088fd..395fddb4f2e 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedSumAccumulator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/GroupedSumAccumulator.java
@@ -48,6 +48,7 @@ public class GroupedSumAccumulator implements 
GroupedAccumulator {
 
   @Override
   public void setGroupCount(long groupCount) {
+    initResult.ensureCapacity(groupCount);
     sumValues.ensureCapacity(groupCount);
   }
 
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AggregationCornerCaseTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AggregationCornerCaseTest.java
index c83299df92b..6f133757a89 100644
--- 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AggregationCornerCaseTest.java
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/AggregationCornerCaseTest.java
@@ -29,6 +29,7 @@ import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateM
 import org.apache.iotdb.db.queryengine.execution.operator.Operator;
 import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator;
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.GroupedAggregator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.HashAggregationOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.StreamingHashAggregationOperator;
 import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
@@ -49,10 +50,17 @@ import org.junit.Test;
 
 import java.util.Collections;
 import java.util.List;
+import java.util.OptionalInt;
 
+import static 
org.apache.iotdb.commons.udf.builtin.relational.TableBuiltinAggregationFunction.EXTREME;
+import static 
org.apache.iotdb.commons.udf.builtin.relational.TableBuiltinAggregationFunction.FIRST;
+import static 
org.apache.iotdb.commons.udf.builtin.relational.TableBuiltinAggregationFunction.LAST;
+import static 
org.apache.iotdb.commons.udf.builtin.relational.TableBuiltinAggregationFunction.SUM;
+import static 
org.apache.iotdb.commons.udf.builtin.relational.TableBuiltinAggregationFunction.getAggregationTypeByFuncName;
 import static 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
 import static 
org.apache.iotdb.db.queryengine.execution.operator.process.join.merge.MergeSortComparator.getComparatorForTable;
 import static 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.AbstractTableScanOperator.TIME_COLUMN_TEMPLATE;
+import static 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.AccumulatorFactory.createGroupedAccumulator;
 import static 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.hash.GroupByHash.DEFAULT_GROUP_NUMBER;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.fail;
@@ -320,4 +328,175 @@ public class AggregationCornerCaseTest {
         false,
         Long.MAX_VALUE);
   }
+
+  @Test
+  public void groupMoreThan1024Test() {
+    try (HashAggregationOperator aggregationOperator = 
genHashAggregationOperator2()) {
+      ListenableFuture<?> listenableFuture = aggregationOperator.isBlocked();
+      listenableFuture.get();
+      while (!aggregationOperator.isFinished() && 
aggregationOperator.hasNext()) {
+        aggregationOperator.next();
+        listenableFuture = aggregationOperator.isBlocked();
+        listenableFuture.get();
+      }
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  // Construct a HashAggregationOperator whose input TsBlock has more than 1024 groups
+  private HashAggregationOperator genHashAggregationOperator2() {
+
+    // Construct operator tree
+    QueryId queryId = new QueryId("stub_query");
+
+    FragmentInstanceId instanceId =
+        new FragmentInstanceId(new PlanFragmentId(queryId, 0), 
"stub-instance");
+    FragmentInstanceStateMachine stateMachine =
+        new FragmentInstanceStateMachine(
+            instanceId,
+            IoTDBThreadPoolFactory.newFixedThreadPool(
+                1, "aggregationHashOperator-test-instance-notification"));
+    FragmentInstanceContext fragmentInstanceContext =
+        createFragmentInstanceContext(instanceId, stateMachine);
+    DriverContext driverContext = new DriverContext(fragmentInstanceContext, 
0);
+    PlanNodeId planNodeId1 = new PlanNodeId("1");
+    driverContext.addOperatorContext(1, planNodeId1, 
TableScanOperator.class.getSimpleName());
+    PlanNodeId planNodeId2 = new PlanNodeId("2");
+    driverContext.addOperatorContext(2, planNodeId2, 
HashAggregationOperator.class.getSimpleName());
+    Operator childOperator =
+        new Operator() {
+          boolean finished = false;
+
+          @Override
+          public OperatorContext getOperatorContext() {
+            return driverContext.getOperatorContexts().get(0);
+          }
+
+          @Override
+          public TsBlock next() {
+            TsBlockBuilder builder =
+                new TsBlockBuilder(ImmutableList.of(TSDataType.TIMESTAMP, 
TSDataType.INT32));
+            ColumnBuilder[] columnBuilders = builder.getValueColumnBuilders();
+            for (int i = 0; i < 1025; i++) {
+              columnBuilders[0].writeLong(i);
+            }
+            for (int i = 0; i < 1025; i++) {
+              columnBuilders[1].writeInt(i);
+            }
+            builder.declarePositions(1025);
+            TsBlock result =
+                builder.build(
+                    new RunLengthEncodedColumn(TIME_COLUMN_TEMPLATE, 
builder.getPositionCount()));
+            finished = true;
+            return result;
+          }
+
+          @Override
+          public boolean hasNext() {
+            return !finished;
+          }
+
+          @Override
+          public void close() {}
+
+          @Override
+          public boolean isFinished() {
+            return finished;
+          }
+
+          @Override
+          public long calculateMaxPeekMemory() {
+            return 0;
+          }
+
+          @Override
+          public long calculateMaxReturnSize() {
+            return 0;
+          }
+
+          @Override
+          public long calculateRetainedSizeAfterCallingNext() {
+            return 0;
+          }
+
+          @Override
+          public long ramBytesUsed() {
+            return 0;
+          }
+        };
+
+    OperatorContext operatorContext = 
driverContext.getOperatorContexts().get(1);
+
+    GroupedAggregator firstAggregator =
+        new GroupedAggregator(
+            createGroupedAccumulator(
+                FIRST.getFunctionName(),
+                getAggregationTypeByFuncName(FIRST.getFunctionName()),
+                ImmutableList.of(TSDataType.INT32, TSDataType.TIMESTAMP),
+                Collections.emptyList(),
+                Collections.emptyMap(),
+                true,
+                false),
+            AggregationNode.Step.SINGLE,
+            TSDataType.INT32,
+            ImmutableList.of(1, 0),
+            OptionalInt.empty());
+    GroupedAggregator lastAggregator =
+        new GroupedAggregator(
+            createGroupedAccumulator(
+                LAST.getFunctionName(),
+                getAggregationTypeByFuncName(LAST.getFunctionName()),
+                ImmutableList.of(TSDataType.INT32, TSDataType.TIMESTAMP),
+                Collections.emptyList(),
+                Collections.emptyMap(),
+                true,
+                false),
+            AggregationNode.Step.SINGLE,
+            TSDataType.INT32,
+            ImmutableList.of(1, 0),
+            OptionalInt.empty());
+    GroupedAggregator sumAggregator =
+        new GroupedAggregator(
+            createGroupedAccumulator(
+                SUM.getFunctionName(),
+                getAggregationTypeByFuncName(SUM.getFunctionName()),
+                ImmutableList.of(TSDataType.INT32),
+                Collections.emptyList(),
+                Collections.emptyMap(),
+                true,
+                false),
+            AggregationNode.Step.SINGLE,
+            TSDataType.DOUBLE,
+            ImmutableList.of(1),
+            OptionalInt.empty());
+    GroupedAggregator extremeAggregator =
+        new GroupedAggregator(
+            createGroupedAccumulator(
+                EXTREME.getFunctionName(),
+                getAggregationTypeByFuncName(EXTREME.getFunctionName()),
+                ImmutableList.of(TSDataType.INT32),
+                Collections.emptyList(),
+                Collections.emptyMap(),
+                true,
+                false),
+            AggregationNode.Step.SINGLE,
+            TSDataType.INT32,
+            ImmutableList.of(1),
+            OptionalInt.empty());
+
+    return new HashAggregationOperator(
+        operatorContext,
+        childOperator,
+        ImmutableList.of(IntType.INT32),
+        Collections.singletonList(1),
+        ImmutableList.of(firstAggregator, lastAggregator, sumAggregator, 
extremeAggregator),
+        AggregationNode.Step.SINGLE,
+        DEFAULT_GROUP_NUMBER,
+        Long.MAX_VALUE,
+        false,
+        Long.MAX_VALUE);
+  }
 }

Reply via email to