This is an automated email from the ASF dual-hosted git repository.
JackieTien97 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new d328639dd0f Move query state machine support to calc commons (#17912)
d328639dd0f is described below
commit d328639dd0fc44f48554b36e0827fd764c5355a8
Author: shuwenwei <[email protected]>
AuthorDate: Thu Jun 11 17:29:24 2026 +0800
Move query state machine support to calc commons (#17912)
---
.../org/apache/iotdb/calc/i18n/CalcMessages.java | 10 +++
.../org/apache/iotdb/calc/i18n/CalcMessages.java | 9 ++
.../calc/plan/planner/TableOperatorGenerator.java | 98 ++++++++++++++++++++--
.../iotdb/db/i18n/DataNodeQueryMessages.java | 9 --
.../iotdb/db/i18n/DataNodeQueryMessages.java | 9 --
.../db/queryengine/execution/StateMachine.java | 8 +-
6 files changed, 113 insertions(+), 30 deletions(-)
diff --git a/iotdb-core/calc-commons/src/main/i18n/en/org/apache/iotdb/calc/i18n/CalcMessages.java b/iotdb-core/calc-commons/src/main/i18n/en/org/apache/iotdb/calc/i18n/CalcMessages.java
index 9267e141fb5..624e9d130a3 100644
--- a/iotdb-core/calc-commons/src/main/i18n/en/org/apache/iotdb/calc/i18n/CalcMessages.java
+++ b/iotdb-core/calc-commons/src/main/i18n/en/org/apache/iotdb/calc/i18n/CalcMessages.java
@@ -151,4 +151,14 @@ public final class CalcMessages {
"pushed element is null";
public static final String FAILED_TO_DELETE_TEMP_DIR = "Failed to delete temp dir {}.";
+
+ // --- Execution ---
+
+ public static final String ERROR_SETTING_FUTURE_STATE_FOR =
+ "Error setting future state for {}";
+ public static final String ERROR_NOTIFYING_STATE_CHANGE_LISTENER_FOR =
+ "Error notifying state change listener for {}";
+ public static final String SERVER_IS_SHUTTING_DOWN =
+ "Server is shutting down";
+
}
diff --git a/iotdb-core/calc-commons/src/main/i18n/zh/org/apache/iotdb/calc/i18n/CalcMessages.java b/iotdb-core/calc-commons/src/main/i18n/zh/org/apache/iotdb/calc/i18n/CalcMessages.java
index 00a0b60be59..0d882fea6a1 100644
--- a/iotdb-core/calc-commons/src/main/i18n/zh/org/apache/iotdb/calc/i18n/CalcMessages.java
+++ b/iotdb-core/calc-commons/src/main/i18n/zh/org/apache/iotdb/calc/i18n/CalcMessages.java
@@ -143,4 +143,13 @@ public final class CalcMessages {
"推入的元素为 null";
public static final String FAILED_TO_DELETE_TEMP_DIR = "删除临时目录 {} 失败。";
+
+ // --- Execution ---
+
+ public static final String ERROR_SETTING_FUTURE_STATE_FOR =
+ "为 {} 设置 future 状态时出错";
+ public static final String ERROR_NOTIFYING_STATE_CHANGE_LISTENER_FOR =
+ "通知 {} 的状态变更监听器时出错";
+ public static final String SERVER_IS_SHUTTING_DOWN =
+ "服务器正在关闭";
}
diff --git a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/TableOperatorGenerator.java b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/TableOperatorGenerator.java
index a59016030e7..c5bf2e25094 100644
--- a/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/TableOperatorGenerator.java
+++ b/iotdb-core/calc-commons/src/main/java/org/apache/iotdb/calc/plan/planner/TableOperatorGenerator.java
@@ -422,7 +422,7 @@ public abstract class TableOperatorGenerator<
context);
}
- private List<TSDataType> getInputColumnTypes(PlanNode node, ITableTypeProvider typeProvider) {
+ protected List<TSDataType> getInputColumnTypes(PlanNode node, ITableTypeProvider typeProvider) {
// ignore "time" column
return node.getChildren().stream()
.map(PlanNode::getOutputSymbols)
@@ -1304,7 +1304,7 @@ public abstract class TableOperatorGenerator<
return planGroupByAggregation(node, child, context.getTableTypeProvider(), context);
}
- private Operator planGlobalAggregation(
+ protected AggregationOperator planGlobalAggregation(
AggregationNode node, Operator child, ITableTypeProvider typeProvider, C context) {
CommonOperatorContext operatorContext =
addOperatorContext(
@@ -1328,7 +1328,7 @@ public abstract class TableOperatorGenerator<
false,
null,
Collections.emptySet())));
- return new AggregationOperator(operatorContext, child, aggregatorBuilder.build());
+ return createAggregationOperator(operatorContext, child, aggregatorBuilder.build());
}
// timeColumnName and measurementColumnNames will only be set for AggTableScan.
@@ -1411,7 +1411,7 @@ public abstract class TableOperatorGenerator<
CommonOperatorContext operatorContext =
addOperatorContext(
context, node.getPlanNodeId(), StreamingAggregationOperator.class.getSimpleName());
- return new StreamingAggregationOperator(
+ return createStreamingAggregationOperator(
operatorContext,
child,
groupByTypes,
@@ -1457,7 +1457,7 @@ public abstract class TableOperatorGenerator<
context,
node.getPlanNodeId(),
StreamingHashAggregationOperator.class.getSimpleName());
- return new StreamingHashAggregationOperator(
+ return createStreamingHashAggregationOperator(
operatorContext,
child,
preGroupedChannels,
@@ -1484,7 +1484,7 @@ public abstract class TableOperatorGenerator<
addOperatorContext(
context, node.getPlanNodeId(), HashAggregationOperator.class.getSimpleName());
- return new HashAggregationOperator(
+ return createHashAggregationOperator(
operatorContext,
child,
groupByTypes,
@@ -1497,6 +1497,89 @@ public abstract class TableOperatorGenerator<
Long.MAX_VALUE);
}
+ protected AggregationOperator createAggregationOperator(
+ CommonOperatorContext operatorContext, Operator child, List<TableAggregator> aggregators) {
+ return new AggregationOperator(operatorContext, child, aggregators);
+ }
+
+ protected StreamingAggregationOperator createStreamingAggregationOperator(
+ CommonOperatorContext operatorContext,
+ Operator child,
+ List<Type> groupByTypes,
+ List<Integer> groupByChannels,
+ Comparator<SortKey> groupKeyComparator,
+ List<TableAggregator> aggregators,
+ long maxPartialMemory,
+ boolean spillEnabled,
+ long unSpillMemoryLimit) {
+ return new StreamingAggregationOperator(
+ operatorContext,
+ child,
+ groupByTypes,
+ groupByChannels,
+ groupKeyComparator,
+ aggregators,
+ maxPartialMemory,
+ spillEnabled,
+ unSpillMemoryLimit);
+ }
+
+ protected StreamingHashAggregationOperator createStreamingHashAggregationOperator(
+ CommonOperatorContext operatorContext,
+ Operator child,
+ List<Integer> preGroupedChannels,
+ List<Integer> preGroupedIndexInResult,
+ List<Type> unPreGroupedTypes,
+ List<Integer> unPreGroupedChannels,
+ List<Integer> unPreGroupedIndexInResult,
+ Comparator<SortKey> groupKeyComparator,
+ List<GroupedAggregator> aggregators,
+ AggregationNode.Step step,
+ int expectedGroups,
+ long maxPartialMemory,
+ boolean spillEnabled,
+ long unSpillMemoryLimit) {
+ return new StreamingHashAggregationOperator(
+ operatorContext,
+ child,
+ preGroupedChannels,
+ preGroupedIndexInResult,
+ unPreGroupedTypes,
+ unPreGroupedChannels,
+ unPreGroupedIndexInResult,
+ groupKeyComparator,
+ aggregators,
+ step,
+ expectedGroups,
+ maxPartialMemory,
+ spillEnabled,
+ unSpillMemoryLimit);
+ }
+
+ protected HashAggregationOperator createHashAggregationOperator(
+ CommonOperatorContext operatorContext,
+ Operator child,
+ List<Type> groupByTypes,
+ List<Integer> groupByChannels,
+ List<GroupedAggregator> aggregators,
+ AggregationNode.Step step,
+ int expectedGroups,
+ long maxPartialMemory,
+ boolean spillEnabled,
+ long unSpillMemoryLimit) {
+ return new HashAggregationOperator(
+ operatorContext,
+ child,
+ groupByTypes,
+ groupByChannels,
+ aggregators,
+ step,
+ expectedGroups,
+ maxPartialMemory,
+ spillEnabled,
+ unSpillMemoryLimit);
+ }
+
protected Comparator<SortKey> genGroupKeyComparator(
List<Type> groupTypes, List<Integer> groupByChannels) {
return getComparatorForTable(
@@ -2248,8 +2331,7 @@ public abstract class TableOperatorGenerator<
public Operator visitRowNumber(RowNumberNode node, C context) {
Operator child = node.getChild().accept(this, context);
CommonOperatorContext operatorContext =
- addOperatorContext(
- context, node.getPlanNodeId(), MappingCollectOperator.class.getSimpleName());
+ addOperatorContext(context, node.getPlanNodeId(), RowNumberOperator.class.getSimpleName());
List<Symbol> partitionBySymbols = node.getPartitionBy();
Map<Symbol, Integer> childLayout =
diff --git a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java
index 964163ce14f..b63739fef9b 100644
--- a/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java
+++ b/iotdb-core/datanode/src/main/i18n/en/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java
@@ -30,15 +30,6 @@ public final class DataNodeQueryMessages {
public static final String THIS_NODE_ISN_T_INSTANCE_OF_SCHEMAMEASUREMENTNODE =
"This node isn't instance of SchemaMeasurementNode.";
- // --- Execution ---
-
- public static final String ERROR_SETTING_FUTURE_STATE_FOR =
- "Error setting future state for {}";
- public static final String ERROR_NOTIFYING_STATE_CHANGE_LISTENER_FOR =
- "Error notifying state change listener for {}";
- public static final String SERVER_IS_SHUTTING_DOWN =
- "Server is shutting down";
-
// --- Execution / Aggregation ---
public static final String INVALID_AGGREGATION_FUNCTION =
diff --git a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java
index ae020f8c5a3..e70f00f2db9 100644
--- a/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java
+++ b/iotdb-core/datanode/src/main/i18n/zh/org/apache/iotdb/db/i18n/DataNodeQueryMessages.java
@@ -30,15 +30,6 @@ public final class DataNodeQueryMessages {
public static final String THIS_NODE_ISN_T_INSTANCE_OF_SCHEMAMEASUREMENTNODE =
"该节点不是 SchemaMeasurementNode 实例。";
- // --- Execution ---
-
- public static final String ERROR_SETTING_FUTURE_STATE_FOR =
- "为 {} 设置 future 状态时出错";
- public static final String ERROR_NOTIFYING_STATE_CHANGE_LISTENER_FOR =
- "通知 {} 的状态变更监听器时出错";
- public static final String SERVER_IS_SHUTTING_DOWN =
- "服务器正在关闭";
-
// --- Execution / Aggregation ---
public static final String INVALID_AGGREGATION_FUNCTION =
diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/StateMachine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/StateMachine.java
index 25f5ec1d02e..cae88f002f3 100644
--- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/StateMachine.java
+++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/StateMachine.java
@@ -19,7 +19,7 @@
package org.apache.iotdb.db.queryengine.execution;
-import org.apache.iotdb.db.i18n.DataNodeQueryMessages;
+import org.apache.iotdb.calc.i18n.CalcMessages;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
@@ -242,7 +242,7 @@ public class StateMachine<T> {
try {
futureStateChange.complete(newState);
} catch (Throwable e) {
- LOGGER.error(DataNodeQueryMessages.ERROR_SETTING_FUTURE_STATE_FOR, name, e);
+ LOGGER.error(CalcMessages.ERROR_SETTING_FUTURE_STATE_FOR, name, e);
}
for (StateChangeListener<T> stateChangeListener : stateChangeListeners) {
fireStateChangedListener(newState, stateChangeListener);
@@ -255,7 +255,7 @@ public class StateMachine<T> {
try {
stateChangeListener.stateChanged(newState);
} catch (Throwable e) {
- LOGGER.error(DataNodeQueryMessages.ERROR_NOTIFYING_STATE_CHANGE_LISTENER_FOR, name, e);
+ LOGGER.error(CalcMessages.ERROR_NOTIFYING_STATE_CHANGE_LISTENER_FOR, name, e);
}
}
@@ -327,7 +327,7 @@ public class StateMachine<T> {
executor.execute(command);
} catch (RejectedExecutionException e) {
if ((executor instanceof ExecutorService) && ((ExecutorService) executor).isShutdown()) {
- throw new RuntimeException(DataNodeQueryMessages.SERVER_IS_SHUTTING_DOWN, e);
+ throw new RuntimeException(CalcMessages.SERVER_IS_SHUTTING_DOWN, e);
}
throw e;
}