This is an automated email from the ASF dual-hosted git repository.

JackieTien97 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new d328639dd0f Move query state machine support to calc commons (#17912)
d328639dd0f is described below

commit d328639dd0fc44f48554b36e0827fd764c5355a8
Author: shuwenwei <[email protected]>
AuthorDate: Thu Jun 11 17:29:24 2026 +0800

    Move query state machine support to calc commons (#17912)
---
 .../org/apache/iotdb/calc/i18n/CalcMessages.java   | 10 +++
 .../org/apache/iotdb/calc/i18n/CalcMessages.java   |  9 ++
 .../calc/plan/planner/TableOperatorGenerator.java  | 98 ++++++++++++++++++++--
 .../iotdb/db/i18n/DataNodeQueryMessages.java       |  9 --
 .../iotdb/db/i18n/DataNodeQueryMessages.java       |  9 --
 .../db/queryengine/execution/StateMachine.java     |  8 +-
 6 files changed, 113 insertions(+), 30 deletions(-)

diff --git a/iotdb-core/calc-commons/src/main/i18n/en/org/apache/iotdb/calc/i18n/CalcMessages.java b/iotdb-core/calc-commons/src/main/i18n/en/org/apache/iotdb/calc/i18n/CalcMessages.java
index 9267e141fb5..624e9d130a3 100644
--- a/iotdb-core/calc-commons/src/main/i18n/en/org/apache/iotdb/calc/i18n/CalcMessages.java
+++ b/iotdb-core/calc-commons/src/main/i18n/en/org/apache/iotdb/calc/i18n/CalcMessages.java
@@ -151,4 +151,14 @@ public final class CalcMessages {
       "pushed element is null";
 
  public static final String FAILED_TO_DELETE_TEMP_DIR = "Failed to delete temp dir {}.";
+
+  // --- Execution ---
+
+  public static final String ERROR_SETTING_FUTURE_STATE_FOR =
+      "Error setting future state for {}";
+  public static final String ERROR_NOTIFYING_STATE_CHANGE_LISTENER_FOR =
+      "Error notifying state change listener for {}";
+  public static final String SERVER_IS_SHUTTING_DOWN =
+      "Server is shutting down";
+
 }
diff --git a/iotdb-core/calc-commons/src/main/i18n/zh/org/apache/iotdb/calc/i18n/CalcMessages.java b/iotdb-core/calc-commons/src/main/i18n/zh/org/apache/iotdb/calc/i18n/CalcMessages.java
index 00a0b60be59..0d882fea6a1 100644
--- a/iotdb-core/calc-commons/src/main/i18n/zh/org/apache/iotdb/calc/i18n/CalcMessages.java
+++ b/iotdb-core/calc-commons/src/main/i18n/zh/org/apache/iotdb/calc/i18n/CalcMessages.java
@@ -143,4 +143,13 @@ public final class CalcMessages {
       "推入的元素为 null";
 
   public static final String FAILED_TO_DELETE_TEMP_DIR = "删除临时目录 {} 失败。";
+
+  // --- Execution ---
+
+  public static final String ERROR_SETTING_FUTURE_STATE_FOR =
+      "为 {} 设置 future 状态时出错";
+  public static final String ERROR_NOTIFYING_STATE_CHANGE_LISTENER_FOR =
+      "通知 {} 的状态变更监听器时出错";
+  public static final String SERVER_IS_SHUTTING_DOWN =
+      "服务器正在关闭";
 }
diff --git a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/TableOperatorGenerator.java b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/TableOperatorGenerator.java
index a59016030e7..c5bf2e25094 100644
--- a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/TableOperatorGenerator.java
+++ b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/TableOperatorGenerator.java
@@ -422,7 +422,7 @@ public abstract class TableOperatorGenerator<
         context);
   }
 
-  private List<TSDataType> getInputColumnTypes(PlanNode node, ITableTypeProvider typeProvider) {
+  protected List<TSDataType> getInputColumnTypes(PlanNode node, ITableTypeProvider typeProvider) {
     // ignore "time" column
     return node.getChildren().stream()
         .map(PlanNode::getOutputSymbols)
@@ -1304,7 +1304,7 @@ public abstract class TableOperatorGenerator<
     return planGroupByAggregation(node, child, context.getTableTypeProvider(), context);
   }
 
-  private Operator planGlobalAggregation(
+  protected AggregationOperator planGlobalAggregation(
       AggregationNode node, Operator child, ITableTypeProvider typeProvider, C context) {
     CommonOperatorContext operatorContext =
         addOperatorContext(
@@ -1328,7 +1328,7 @@ public abstract class TableOperatorGenerator<
                         false,
                         null,
                         Collections.emptySet())));
-    return new AggregationOperator(operatorContext, child, aggregatorBuilder.build());
+    return createAggregationOperator(operatorContext, child, aggregatorBuilder.build());
   }
 
   // timeColumnName and measurementColumnNames will only be set for AggTableScan.
@@ -1411,7 +1411,7 @@ public abstract class TableOperatorGenerator<
         CommonOperatorContext operatorContext =
             addOperatorContext(
                 context, node.getPlanNodeId(), StreamingAggregationOperator.class.getSimpleName());
-        return new StreamingAggregationOperator(
+        return createStreamingAggregationOperator(
             operatorContext,
             child,
             groupByTypes,
@@ -1457,7 +1457,7 @@ public abstract class TableOperatorGenerator<
               context,
               node.getPlanNodeId(),
               StreamingHashAggregationOperator.class.getSimpleName());
-      return new StreamingHashAggregationOperator(
+      return createStreamingHashAggregationOperator(
           operatorContext,
           child,
           preGroupedChannels,
@@ -1484,7 +1484,7 @@ public abstract class TableOperatorGenerator<
         addOperatorContext(
             context, node.getPlanNodeId(), HashAggregationOperator.class.getSimpleName());
 
-    return new HashAggregationOperator(
+    return createHashAggregationOperator(
         operatorContext,
         child,
         groupByTypes,
@@ -1497,6 +1497,89 @@ public abstract class TableOperatorGenerator<
         Long.MAX_VALUE);
   }
 
+  protected AggregationOperator createAggregationOperator(
+      CommonOperatorContext operatorContext, Operator child, List<TableAggregator> aggregators) {
+    return new AggregationOperator(operatorContext, child, aggregators);
+  }
+
+  protected StreamingAggregationOperator createStreamingAggregationOperator(
+      CommonOperatorContext operatorContext,
+      Operator child,
+      List<Type> groupByTypes,
+      List<Integer> groupByChannels,
+      Comparator<SortKey> groupKeyComparator,
+      List<TableAggregator> aggregators,
+      long maxPartialMemory,
+      boolean spillEnabled,
+      long unSpillMemoryLimit) {
+    return new StreamingAggregationOperator(
+        operatorContext,
+        child,
+        groupByTypes,
+        groupByChannels,
+        groupKeyComparator,
+        aggregators,
+        maxPartialMemory,
+        spillEnabled,
+        unSpillMemoryLimit);
+  }
+
+  protected StreamingHashAggregationOperator createStreamingHashAggregationOperator(
+      CommonOperatorContext operatorContext,
+      Operator child,
+      List<Integer> preGroupedChannels,
+      List<Integer> preGroupedIndexInResult,
+      List<Type> unPreGroupedTypes,
+      List<Integer> unPreGroupedChannels,
+      List<Integer> unPreGroupedIndexInResult,
+      Comparator<SortKey> groupKeyComparator,
+      List<GroupedAggregator> aggregators,
+      AggregationNode.Step step,
+      int expectedGroups,
+      long maxPartialMemory,
+      boolean spillEnabled,
+      long unSpillMemoryLimit) {
+    return new StreamingHashAggregationOperator(
+        operatorContext,
+        child,
+        preGroupedChannels,
+        preGroupedIndexInResult,
+        unPreGroupedTypes,
+        unPreGroupedChannels,
+        unPreGroupedIndexInResult,
+        groupKeyComparator,
+        aggregators,
+        step,
+        expectedGroups,
+        maxPartialMemory,
+        spillEnabled,
+        unSpillMemoryLimit);
+  }
+
+  protected HashAggregationOperator createHashAggregationOperator(
+      CommonOperatorContext operatorContext,
+      Operator child,
+      List<Type> groupByTypes,
+      List<Integer> groupByChannels,
+      List<GroupedAggregator> aggregators,
+      AggregationNode.Step step,
+      int expectedGroups,
+      long maxPartialMemory,
+      boolean spillEnabled,
+      long unSpillMemoryLimit) {
+    return new HashAggregationOperator(
+        operatorContext,
+        child,
+        groupByTypes,
+        groupByChannels,
+        aggregators,
+        step,
+        expectedGroups,
+        maxPartialMemory,
+        spillEnabled,
+        unSpillMemoryLimit);
+  }
+
   protected Comparator<SortKey> genGroupKeyComparator(
       List<Type> groupTypes, List<Integer> groupByChannels) {
     return getComparatorForTable(
@@ -2248,8 +2331,7 @@ public abstract class TableOperatorGenerator<
   public Operator visitRowNumber(RowNumberNode node, C context) {
     Operator child = node.getChild().accept(this, context);
     CommonOperatorContext operatorContext =
-        addOperatorContext(
-            context, node.getPlanNodeId(), MappingCollectOperator.class.getSimpleName());
+        addOperatorContext(context, node.getPlanNodeId(), RowNumberOperator.class.getSimpleName());
 
     List<Symbol> partitionBySymbols = node.getPartitionBy();
     Map<Symbol, Integer> childLayout =
diff --git a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java
index 964163ce14f..b63739fef9b 100644
--- a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java
+++ b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java
@@ -30,15 +30,6 @@ public final class DataNodeQueryMessages {
  public static final String THIS_NODE_ISN_T_INSTANCE_OF_SCHEMAMEASUREMENTNODE =
       "This node isn't instance of SchemaMeasurementNode.";
 
-  // --- Execution ---
-
-  public static final String ERROR_SETTING_FUTURE_STATE_FOR =
-      "Error setting future state for {}";
-  public static final String ERROR_NOTIFYING_STATE_CHANGE_LISTENER_FOR =
-      "Error notifying state change listener for {}";
-  public static final String SERVER_IS_SHUTTING_DOWN =
-      "Server is shutting down";
-
   // --- Execution / Aggregation ---
 
   public static final String INVALID_AGGREGATION_FUNCTION =
diff --git a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java
index ae020f8c5a3..e70f00f2db9 100644
--- a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java
+++ b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java
@@ -30,15 +30,6 @@ public final class DataNodeQueryMessages {
  public static final String THIS_NODE_ISN_T_INSTANCE_OF_SCHEMAMEASUREMENTNODE =
       "该节点不是 SchemaMeasurementNode 实例。";
 
-  // --- Execution ---
-
-  public static final String ERROR_SETTING_FUTURE_STATE_FOR =
-      "为 {} 设置 future 状态时出错";
-  public static final String ERROR_NOTIFYING_STATE_CHANGE_LISTENER_FOR =
-      "通知 {} 的状态变更监听器时出错";
-  public static final String SERVER_IS_SHUTTING_DOWN =
-      "服务器正在关闭";
-
   // --- Execution / Aggregation ---
 
   public static final String INVALID_AGGREGATION_FUNCTION =
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/StateMachine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/StateMachine.java
index 25f5ec1d02e..cae88f002f3 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/StateMachine.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/StateMachine.java
@@ -19,7 +19,7 @@
 
 package org.apache.iotdb.db.queryengine.execution;
 
-import org.apache.iotdb.db.i18n.DataNodeQueryMessages;
+import org.apache.iotdb.calc.i18n.CalcMessages;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.collect.ImmutableList;
@@ -242,7 +242,7 @@ public class StateMachine<T> {
           try {
             futureStateChange.complete(newState);
           } catch (Throwable e) {
-            LOGGER.error(DataNodeQueryMessages.ERROR_SETTING_FUTURE_STATE_FOR, name, e);
+            LOGGER.error(CalcMessages.ERROR_SETTING_FUTURE_STATE_FOR, name, e);
           }
           for (StateChangeListener<T> stateChangeListener : stateChangeListeners) {
             fireStateChangedListener(newState, stateChangeListener);
@@ -255,7 +255,7 @@ public class StateMachine<T> {
     try {
       stateChangeListener.stateChanged(newState);
     } catch (Throwable e) {
-      LOGGER.error(DataNodeQueryMessages.ERROR_NOTIFYING_STATE_CHANGE_LISTENER_FOR, name, e);
+      LOGGER.error(CalcMessages.ERROR_NOTIFYING_STATE_CHANGE_LISTENER_FOR, name, e);
     }
   }
 
@@ -327,7 +327,7 @@ public class StateMachine<T> {
       executor.execute(command);
     } catch (RejectedExecutionException e) {
       if ((executor instanceof ExecutorService) && ((ExecutorService) executor).isShutdown()) {
-        throw new RuntimeException(DataNodeQueryMessages.SERVER_IS_SHUTTING_DOWN, e);
+        throw new RuntimeException(CalcMessages.SERVER_IS_SHUTTING_DOWN, e);
       }
       throw e;
     }

Reply via email to