This is an automated email from the ASF dual-hosted git repository.
JackieTien97 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 556cd6628b5 Fix multi InnerTimeJoinNode with different TimePartitions
in one FI (#17652)
556cd6628b5 is described below
commit 556cd6628b5e39adfa56959a97099da1a8228572
Author: Weihao Li <[email protected]>
AuthorDate: Thu May 14 14:02:03 2026 +0800
Fix multi InnerTimeJoinNode with different TimePartitions in one FI (#17652)
---
.../aggregation/IoTDBAlignByDeviceWildcardIT.java | 94 ++++++++++++++++++++++
.../planner/distribution/DistributionPlanner.java | 24 ++++--
.../planner/distribution/ExchangeNodeAdder.java | 83 +++++++++++++++----
.../planner/plan/node/process/ExchangeNode.java | 18 ++++-
4 files changed, 195 insertions(+), 24 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAlignByDeviceWildcardIT.java
b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAlignByDeviceWildcardIT.java
new file mode 100644
index 00000000000..e2bb07f23be
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/db/it/aggregation/IoTDBAlignByDeviceWildcardIT.java
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.it.aggregation;
+
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.LocalStandaloneIT;
+
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import static org.apache.iotdb.db.it.utils.TestUtils.prepareData;
+import static org.apache.iotdb.db.it.utils.TestUtils.resultSetEqualTest;
+
+@RunWith(IoTDBTestRunner.class)
+@Category({LocalStandaloneIT.class})
+public class IoTDBAlignByDeviceWildcardIT {
+
+ private static final String[] SQL_LIST =
+ new String[] {
+ "CREATE DATABASE root.min",
+ "CREATE TIMESERIES root.min.d1.s1 WITH DATATYPE=INT32, ENCODING=PLAIN",
+ "CREATE TIMESERIES root.min.d1.s2 WITH DATATYPE=INT32, ENCODING=PLAIN",
+ "CREATE TIMESERIES root.min.d2.s1 WITH DATATYPE=INT32, ENCODING=PLAIN",
+ "CREATE TIMESERIES root.min.d2.s2 WITH DATATYPE=INT32, ENCODING=PLAIN",
+ "INSERT INTO root.min.d1(time, s1, s2) VALUES(1, 1, 1)",
+ "INSERT INTO root.min.d1(time, s1, s2) VALUES(2, 1, 1)",
+ "FLUSH",
+ "INSERT INTO root.min.d2(time, s1, s2) VALUES(5, 1, 1)",
+ "FLUSH"
+ };
+
+ @BeforeClass
+ public static void setUp() throws Exception {
+ EnvFactory.getEnv()
+ .getConfig()
+ .getCommonConfig()
+ .setDefaultDataRegionGroupNumPerDatabase(1)
+ .setTimePartitionInterval(1)
+ .setEnableSeqSpaceCompaction(false)
+ .setEnableUnseqSpaceCompaction(false)
+ .setEnableCrossSpaceCompaction(false);
+ EnvFactory.getEnv().initClusterEnvironment();
+ prepareData(SQL_LIST);
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ EnvFactory.getEnv().cleanClusterEnvironment(); // release the cluster started in setUp()
+ }
+
+ @Test
+ public void testWildcardAlignByDeviceWithTimePartitionSplit() {
+ String sql =
+ "SELECT count(s1) FROM root.min.** "
+ + "WHERE s2 is not null and s1 is not null "
+ + "GROUP BY([1, 6), 1ms) ALIGN BY DEVICE";
+ String[] expectedHeader = new String[] {"Time", "Device", "count(s1)"};
+ String[] expectedRows =
+ new String[] {
+ "1,root.min.d1,1,",
+ "2,root.min.d1,1,",
+ "3,root.min.d1,0,",
+ "4,root.min.d1,0,",
+ "5,root.min.d1,0,",
+ "1,root.min.d2,0,",
+ "2,root.min.d2,0,",
+ "3,root.min.d2,0,",
+ "4,root.min.d2,0,",
+ "5,root.min.d2,1,",
+ };
+ resultSetEqualTest(sql, expectedHeader, expectedRows);
+ }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java
index 078c2c3e846..317d293b504 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/DistributionPlanner.java
@@ -145,13 +145,23 @@ public class DistributionPlanner {
ExchangeNode exchangeNode = (ExchangeNode) child;
TRegionReplicaSet regionOfChild =
context.getNodeDistribution(exchangeNode.getChild().getPlanNodeId()).getRegion();
- MultiChildrenSinkNode newChild =
- memo.computeIfAbsent(
- regionOfChild,
- tRegionReplicaSet ->
- needShuffleSinkNode
- ? new
ShuffleSinkNode(context.queryContext.getQueryId().genPlanNodeId())
- : new
IdentitySinkNode(context.queryContext.getQueryId().genPlanNodeId()));
+ MultiChildrenSinkNode newChild;
+ if (exchangeNode.isForcedExchange()) {
+ // Keep forced exchange branch isolated: do not merge into shared
sink memo.
+ newChild =
+ needShuffleSinkNode
+ ? new
ShuffleSinkNode(context.queryContext.getQueryId().genPlanNodeId())
+ : new
IdentitySinkNode(context.queryContext.getQueryId().genPlanNodeId());
+ } else {
+ newChild =
+ memo.computeIfAbsent(
+ regionOfChild,
+ tRegionReplicaSet ->
+ needShuffleSinkNode
+ ? new
ShuffleSinkNode(context.queryContext.getQueryId().genPlanNodeId())
+ : new IdentitySinkNode(
+
context.queryContext.getQueryId().genPlanNodeId()));
+ }
newChild.addChild(exchangeNode.getChild());
newChild.addDownStreamChannelLocation(
new
DownStreamChannelLocation(exchangeNode.getPlanNodeId().toString()));
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
index 2c3e560159a..db21b6a7af2 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/distribution/ExchangeNodeAdder.java
@@ -70,17 +70,17 @@ import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.RegionScanN
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesAggregationScanNode;
import
org.apache.iotdb.db.queryengine.plan.planner.plan.node.source.SeriesScanNode;
+import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
-import static com.google.common.collect.ImmutableList.toImmutableList;
-
public class ExchangeNodeAdder implements PlanVisitor<PlanNode,
NodeGroupContext> {
private final Analysis analysis;
+ private boolean containsInnerTimeJoinInCurrentSubtree = false;
public ExchangeNodeAdder(Analysis analysis) {
this.analysis = analysis;
@@ -93,10 +93,7 @@ public class ExchangeNodeAdder implements
PlanVisitor<PlanNode, NodeGroupContext
return node;
}
// Visit all the children of current node
- List<PlanNode> children =
- node.getChildren().stream()
- .map(child -> child.accept(this, context))
- .collect(toImmutableList());
+ List<PlanNode> children =
visitChildrenAndRecordInnerTimeJoin(node.getChildren(), context);
// Calculate the node distribution info according to its children
@@ -216,7 +213,13 @@ public class ExchangeNodeAdder implements
PlanVisitor<PlanNode, NodeGroupContext
@Override
public PlanNode visitDeviceView(DeviceViewNode node, NodeGroupContext
context) {
- return processMultiChildNode(node, context);
+ List<PlanNode> visitedChildren =
+ visitChildrenAndRecordInnerTimeJoin(node.getChildren(), context);
+ // Force Exchange for multi-child DeviceView if any child subtree contains
InnerTimeJoin.
+ if (node.getChildren().size() > 1 &&
containsInnerTimeJoinInCurrentSubtree) {
+ return processMultiChildNodeWithForcedExchange(node, context,
visitedChildren);
+ }
+ return processMultiChildNode(node, context, visitedChildren);
}
@Override
@@ -237,7 +240,13 @@ public class ExchangeNodeAdder implements
PlanVisitor<PlanNode, NodeGroupContext
@Override
public PlanNode visitMergeSort(MergeSortNode node, NodeGroupContext context)
{
- return processMultiChildNode(node, context);
+ List<PlanNode> visitedChildren =
+ visitChildrenAndRecordInnerTimeJoin(node.getChildren(), context);
+ // Force Exchange if any child subtree contains InnerTimeJoin.
+ if (containsInnerTimeJoinInCurrentSubtree) {
+ return processMultiChildNodeWithForcedExchange(node, context,
visitedChildren);
+ }
+ return processMultiChildNode(node, context, visitedChildren);
}
@Override
@@ -438,11 +447,14 @@ public class ExchangeNodeAdder implements
PlanVisitor<PlanNode, NodeGroupContext
return processMultiChildNodeByLocation(node, context);
}
- MultiChildProcessNode newNode = (MultiChildProcessNode) node.clone();
List<PlanNode> visitedChildren =
- node.getChildren().stream()
- .map(child -> visit(child, context))
- .collect(Collectors.toList());
+ visitChildrenAndRecordInnerTimeJoin(node.getChildren(), context);
+ return processMultiChildNode(node, context, visitedChildren);
+ }
+
+ private PlanNode processMultiChildNode(
+ MultiChildProcessNode node, NodeGroupContext context, List<PlanNode>
visitedChildren) {
+ MultiChildProcessNode newNode = (MultiChildProcessNode) node.clone();
// DataRegion in which node locates
TRegionReplicaSet dataRegion;
@@ -556,13 +568,47 @@ public class ExchangeNodeAdder implements
PlanVisitor<PlanNode, NodeGroupContext
}
private ExchangeNode genExchangeNode(NodeGroupContext context, PlanNode
child) {
+ return genExchangeNode(context, child, false);
+ }
+
+ private ExchangeNode genExchangeNode(
+ NodeGroupContext context, PlanNode child, boolean forcedExchange) {
ExchangeNode exchangeNode = new
ExchangeNode(context.queryContext.getQueryId().genPlanNodeId());
exchangeNode.setChild(child);
exchangeNode.setOutputColumnNames(child.getOutputColumnNames());
+ exchangeNode.setForcedExchange(forcedExchange);
context.hasExchangeNode = true;
return exchangeNode;
}
+ private PlanNode processMultiChildNodeWithForcedExchange(
+ MultiChildProcessNode node, NodeGroupContext context, List<PlanNode>
visitedChildren) {
+ MultiChildProcessNode newNode = (MultiChildProcessNode) node.clone();
+ for (PlanNode child : visitedChildren) {
+ newNode.addChild(genExchangeNode(context, child, true));
+ }
+ context.putNodeDistribution(
+ newNode.getPlanNodeId(),
+ new NodeDistribution(
+ NodeDistributionType.SAME_WITH_SOME_CHILD,
context.getMostlyUsedDataRegion()));
+ return newNode;
+ }
+
+ private List<PlanNode> visitChildrenAndRecordInnerTimeJoin(
+ List<PlanNode> children, NodeGroupContext context) {
+ List<PlanNode> result = new ArrayList<>(children.size());
+ boolean originalTimeJoin = containsInnerTimeJoinInCurrentSubtree;
+ boolean hasInnerTimeJoinInChildren = false;
+ for (PlanNode child : children) {
+ containsInnerTimeJoinInCurrentSubtree = false;
+ PlanNode visitedChild = visit(child, context);
+ hasInnerTimeJoinInChildren |= containsInnerTimeJoinInCurrentSubtree;
+ result.add(visitedChild);
+ }
+ containsInnerTimeJoinInCurrentSubtree = originalTimeJoin ||
hasInnerTimeJoinInChildren;
+ return result;
+ }
+
@Override
public PlanNode visitSlidingWindowAggregation(
SlidingWindowAggregationNode node, NodeGroupContext context) {
@@ -596,9 +642,9 @@ public class ExchangeNodeAdder implements
PlanVisitor<PlanNode, NodeGroupContext
if (region == null
&&
context.getNodeDistribution(child.getPlanNodeId()).getType()
== NodeDistributionType.SAME_WITH_ALL_CHILDREN) {
- return
calculateSchemaRegionByChildren(child.getChildren(), context);
+ region =
calculateSchemaRegionByChildren(child.getChildren(), context);
}
- return region;
+ return region == null ? DataPartition.NOT_ASSIGNED :
region;
},
Collectors.counting()));
@@ -656,6 +702,13 @@ public class ExchangeNodeAdder implements
PlanVisitor<PlanNode, NodeGroupContext
}
public PlanNode visit(PlanNode node, NodeGroupContext context) {
- return node.accept(this, context);
+ boolean originalTimeJoin = containsInnerTimeJoinInCurrentSubtree;
+ containsInnerTimeJoinInCurrentSubtree = false;
+ PlanNode visitedNode = node.accept(this, context);
+ containsInnerTimeJoinInCurrentSubtree =
+ originalTimeJoin
+ || containsInnerTimeJoinInCurrentSubtree
+ || node instanceof InnerTimeJoinNode;
+ return visitedNode;
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/ExchangeNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/ExchangeNode.java
index 084141e90a7..0902f52995c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/ExchangeNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/plan/node/process/ExchangeNode.java
@@ -51,6 +51,9 @@ public class ExchangeNode extends SingleChildProcessNode {
/** Exchange needs to know which child of IdentitySinkNode/ShuffleSinkNode
it matches */
private int indexOfUpstreamSinkHandle = 0;
+ /** Planner-only flag: this exchange is forced and should keep independent
upstream sink. */
+ private transient boolean forcedExchange = false;
+
public ExchangeNode(PlanNodeId id) {
super(id);
}
@@ -75,6 +78,7 @@ public class ExchangeNode extends SingleChildProcessNode {
ExchangeNode node = new ExchangeNode(getPlanNodeId());
node.setOutputColumnNames(outputColumnNames);
node.setIndexOfUpstreamSinkHandle(indexOfUpstreamSinkHandle);
+ node.setForcedExchange(forcedExchange);
return node;
}
@@ -171,6 +175,14 @@ public class ExchangeNode extends SingleChildProcessNode {
this.indexOfUpstreamSinkHandle = indexOfUpstreamSinkHandle;
}
+ public boolean isForcedExchange() {
+ return forcedExchange;
+ }
+
+ public void setForcedExchange(boolean forcedExchange) {
+ this.forcedExchange = forcedExchange;
+ }
+
public TEndPoint getUpstreamEndpoint() {
return upstreamEndpoint;
}
@@ -197,11 +209,13 @@ public class ExchangeNode extends SingleChildProcessNode {
ExchangeNode that = (ExchangeNode) o;
return Objects.equals(upstreamEndpoint, that.upstreamEndpoint)
&& Objects.equals(upstreamInstanceId, that.upstreamInstanceId)
- && Objects.equals(upstreamPlanNodeId, that.upstreamPlanNodeId);
+ && Objects.equals(upstreamPlanNodeId, that.upstreamPlanNodeId)
+ && forcedExchange == that.forcedExchange;
}
@Override
public int hashCode() {
- return Objects.hash(super.hashCode(), upstreamEndpoint,
upstreamInstanceId, upstreamPlanNodeId);
+ return Objects.hash(
+ super.hashCode(), upstreamEndpoint, upstreamInstanceId,
upstreamPlanNodeId, forcedExchange);
}
}