This is an automated email from the ASF dual-hosted git repository.

chenyz pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 9c66a3cdc83 Fix group by hash when process TsBlock more than 1024 
lines (#14989)
9c66a3cdc83 is described below

commit 9c66a3cdc83ba22c0c282f049c750c8838457c8a
Author: Weihao Li <[email protected]>
AuthorDate: Fri Feb 28 16:57:37 2025 +0800

    Fix group by hash when process TsBlock more than 1024 lines (#14989)
    
    Fix group by hash when process TsBlock more than 1024 lines
---
 .../aggregation/grouped/hash/HashStrategy.java     |   2 +-
 .../relational/analyzer/GroupByLargeDataTest.java  | 179 +++++++++++++++++++++
 2 files changed, 180 insertions(+), 1 deletion(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/hash/HashStrategy.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/hash/HashStrategy.java
index bd6a4a52ebe..6a5cf351dd9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/hash/HashStrategy.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/source/relational/aggregation/grouped/hash/HashStrategy.java
@@ -189,7 +189,7 @@ public class HashStrategy implements FlatHashStrategy {
   @Override
   public void hashBatched(Column[] columns, long[] hashes, int offset, int 
length) {
     for (int i = 0; i < length; i++) {
-      hashes[i] = hash(columns, i);
+      hashes[i] = hash(columns, i + offset);
     }
   }
 
diff --git 
a/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/GroupByLargeDataTest.java
 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/GroupByLargeDataTest.java
new file mode 100644
index 00000000000..1cdc2321290
--- /dev/null
+++ 
b/iotdb-core/datanode/src/test/java/org/apache/iotdb/db/queryengine/plan/relational/analyzer/GroupByLargeDataTest.java
@@ -0,0 +1,179 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.plan.relational.analyzer;
+
+import org.apache.iotdb.commons.concurrent.IoTDBThreadPoolFactory;
+import org.apache.iotdb.db.queryengine.common.FragmentInstanceId;
+import org.apache.iotdb.db.queryengine.common.PlanFragmentId;
+import org.apache.iotdb.db.queryengine.common.QueryId;
+import org.apache.iotdb.db.queryengine.execution.driver.DriverContext;
+import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext;
+import 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceStateMachine;
+import org.apache.iotdb.db.queryengine.execution.operator.Operator;
+import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator;
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.HashAggregationOperator;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.AggregationNode;
+
+import com.google.common.util.concurrent.ListenableFuture;
+import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.block.column.ColumnBuilder;
+import org.apache.tsfile.enums.TSDataType;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.common.block.TsBlockBuilder;
+import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn;
+import org.apache.tsfile.read.common.type.TimestampType;
+import org.junit.Test;
+
+import java.util.Collections;
+
+import static 
org.apache.iotdb.db.queryengine.execution.fragment.FragmentInstanceContext.createFragmentInstanceContext;
+import static 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.AbstractTableScanOperator.TIME_COLUMN_TEMPLATE;
+import static 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.aggregation.grouped.hash.GroupByHash.DEFAULT_GROUP_NUMBER;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.fail;
+
+public class GroupByLargeDataTest {
+
+  @Test
+  public void test() {
+    try (HashAggregationOperator aggregationOperator = 
genHashAggregationOperator()) {
+      ListenableFuture<?> listenableFuture = aggregationOperator.isBlocked();
+      listenableFuture.get();
+      while (!aggregationOperator.isFinished() && 
aggregationOperator.hasNext()) {
+        TsBlock tsBlock = aggregationOperator.next();
+        if (tsBlock != null && !tsBlock.isEmpty()) {
+          assertEquals(2, tsBlock.getPositionCount());
+          Column column = tsBlock.getColumn(0);
+          if (column.getLong(0) == 1) {
+            assertEquals(2, column.getLong(1));
+          } else {
+            assertEquals(2, column.getLong(0));
+            assertEquals(1, column.getLong(1));
+          }
+        }
+        listenableFuture = aggregationOperator.isBlocked();
+        listenableFuture.get();
+      }
+
+    } catch (Exception e) {
+      e.printStackTrace();
+      fail(e.getMessage());
+    }
+  }
+
+  // construct a AggregationHashOperator has more than 1024 lines in input 
TsBlock
+  private HashAggregationOperator genHashAggregationOperator() {
+
+    // Construct operator tree
+    QueryId queryId = new QueryId("stub_query");
+
+    FragmentInstanceId instanceId =
+        new FragmentInstanceId(new PlanFragmentId(queryId, 0), 
"stub-instance");
+    FragmentInstanceStateMachine stateMachine =
+        new FragmentInstanceStateMachine(
+            instanceId,
+            IoTDBThreadPoolFactory.newFixedThreadPool(
+                1, "aggregationHashOperator-test-instance-notification"));
+    FragmentInstanceContext fragmentInstanceContext =
+        createFragmentInstanceContext(instanceId, stateMachine);
+    DriverContext driverContext = new DriverContext(fragmentInstanceContext, 
0);
+    PlanNodeId planNodeId1 = new PlanNodeId("1");
+    driverContext.addOperatorContext(1, planNodeId1, 
TableScanOperator.class.getSimpleName());
+    PlanNodeId planNodeId2 = new PlanNodeId("2");
+    driverContext.addOperatorContext(2, planNodeId2, 
HashAggregationOperator.class.getSimpleName());
+    Operator childOperator =
+        new Operator() {
+          boolean finished = false;
+
+          @Override
+          public OperatorContext getOperatorContext() {
+            return driverContext.getOperatorContexts().get(0);
+          }
+
+          @Override
+          public TsBlock next() {
+            TsBlockBuilder builder =
+                new 
TsBlockBuilder(Collections.singletonList(TSDataType.TIMESTAMP));
+            ColumnBuilder columnBuilder = builder.getValueColumnBuilders()[0];
+            for (int i = 0; i < 1000; i++) {
+              columnBuilder.writeLong(1);
+            }
+            for (int i = 1000; i < 1025; i++) {
+              columnBuilder.writeLong(2);
+            }
+            builder.declarePositions(1025);
+            TsBlock result =
+                builder.build(
+                    new RunLengthEncodedColumn(TIME_COLUMN_TEMPLATE, 
builder.getPositionCount()));
+            finished = true;
+            return result;
+          }
+
+          @Override
+          public boolean hasNext() throws Exception {
+            return !finished;
+          }
+
+          @Override
+          public void close() throws Exception {}
+
+          @Override
+          public boolean isFinished() throws Exception {
+            return finished;
+          }
+
+          @Override
+          public long calculateMaxPeekMemory() {
+            return 0;
+          }
+
+          @Override
+          public long calculateMaxReturnSize() {
+            return 0;
+          }
+
+          @Override
+          public long calculateRetainedSizeAfterCallingNext() {
+            return 0;
+          }
+
+          @Override
+          public long ramBytesUsed() {
+            return 0;
+          }
+        };
+
+    OperatorContext operatorContext = 
driverContext.getOperatorContexts().get(1);
+
+    return new HashAggregationOperator(
+        operatorContext,
+        childOperator,
+        Collections.singletonList(TimestampType.TIMESTAMP),
+        Collections.singletonList(0),
+        Collections.emptyList(),
+        AggregationNode.Step.SINGLE,
+        DEFAULT_GROUP_NUMBER,
+        Long.MAX_VALUE,
+        false,
+        Long.MAX_VALUE);
+  }
+}

Reply via email to