This is an automated email from the ASF dual-hosted git repository.
jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git
The following commit(s) were added to refs/heads/master by this push:
new 07d457e36df Fix union when cross region or need mapping from child
input
07d457e36df is described below
commit 07d457e36df50a23bf4b3fe1aacd301578f1bb93
Author: Weihao Li <[email protected]>
AuthorDate: Tue Oct 21 17:38:08 2025 +0800
Fix union when cross region or need mapping from child input
---
.../it/query/recent/IoTDBUnionTable2IT.java | 45 +++++++++++++
.../it/query/recent/IoTDBUnionTableIT.java | 18 +++++
.../operator/process/CollectOperator.java | 6 +-
.../operator/process/MappingCollectOperator.java | 76 ++++++++++++++++++++++
.../plan/planner/TableOperatorGenerator.java | 20 +++++-
.../distribute/TableDistributedPlanGenerator.java | 34 +++++++++-
.../relational/planner/node/SetOperationNode.java | 4 +-
.../plan/relational/planner/node/UnionNode.java | 31 ++++++++-
.../plan/relational/type/CompatibleResolver.java | 4 ++
9 files changed, 228 insertions(+), 10 deletions(-)
diff --git
a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBUnionTable2IT.java
b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBUnionTable2IT.java
new file mode 100644
index 00000000000..e3235032cb2
--- /dev/null
+++
b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBUnionTable2IT.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.relational.it.query.recent;
+
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.TableClusterIT;
+import org.apache.iotdb.itbase.category.TableLocalStandaloneIT;
+
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import static
org.apache.iotdb.confignode.it.partition.IoTDBPartitionShuffleStrategyIT.SHUFFLE;
+import static org.apache.iotdb.db.it.utils.TestUtils.prepareTableData;
+
+/**
+ * Variant of {@link IoTDBUnionTableIT} that differs only in how the test environment is set up:
+ * the time partition interval is set to 2 and the data partition allocation strategy to SHUFFLE
+ * before the cluster is initialized. No test methods are declared here.
+ */
+@RunWith(IoTDBTestRunner.class)
+@Category({TableLocalStandaloneIT.class, TableClusterIT.class})
+public class IoTDBUnionTable2IT extends IoTDBUnionTableIT {
+
+ // Applies the two configuration overrides, starts the environment, then loads createSqls.
+ // NOTE(review): presumably the small partition interval plus SHUFFLE is meant to spread the
+ // data over several regions so the union is executed across regions - confirm.
+ @BeforeClass
+ public static void setUp() throws Exception {
+
EnvFactory.getEnv().getConfig().getCommonConfig().setTimePartitionInterval(2);
+
EnvFactory.getEnv().getConfig().getCommonConfig().setDataPartitionAllocationStrategy(SHUFFLE);
+ EnvFactory.getEnv().initClusterEnvironment();
+ prepareTableData(createSqls);
+ }
+}
diff --git
a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBUnionTableIT.java
b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBUnionTableIT.java
index 90ed5f04332..28065d1f33b 100644
---
a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBUnionTableIT.java
+++
b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBUnionTableIT.java
@@ -113,6 +113,24 @@ public class IoTDBUnionTableIT {
DATABASE_NAME);
}
+ // Runs UNION and UNION ALL over two sub-queries that each produce (col_a, col_b), while the
+ // outer query selects only col_a, and compares the ordered result with the expected rows.
+ @Test
+ public void mappingTest() {
+ String[] expectedHeader = new String[] {"col_a"};
+ // UNION variant: three rows are expected.
+ String[] retArray = new String[] {"1.0,", "2.0,", "3.0,"};
+ tableResultSetEqualTest(
+ "select col_a from ((select s1 as col_a, device as col_b from table1)
union (select s2 as col_a, device as col_b from table2)) order by col_a",
+ expectedHeader,
+ retArray,
+ DATABASE_NAME);
+
+ // UNION ALL variant: the value 1.0 is expected twice.
+ retArray = new String[] {"1.0,", "1.0,", "2.0,", "3.0,"};
+ tableResultSetEqualTest(
+ "select col_a from ((select s1 as col_a, device as col_b from table1)
union all (select s2 as col_a, device as col_b from table2)) order by col_a",
+ expectedHeader,
+ retArray,
+ DATABASE_NAME);
+ }
+
@Test
public void exceptionTest() {
tableAssertTestFail(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/CollectOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/CollectOperator.java
index 5c287332053..668cf193de9 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/CollectOperator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/CollectOperator.java
@@ -35,10 +35,10 @@ public class CollectOperator implements ProcessOperator {
RamUsageEstimator.shallowSizeOfInstance(CollectOperator.class);
private final OperatorContext operatorContext;
- private final List<Operator> children;
+ protected final List<Operator> children;
private boolean inited = false;
- private int currentIndex;
+ protected int currentIndex;
public CollectOperator(OperatorContext operatorContext, List<Operator>
children) {
this.operatorContext = operatorContext;
@@ -62,7 +62,7 @@ public class CollectOperator implements ProcessOperator {
}
}
- private void closeCurrentChild(int index) throws Exception {
+ // Closes the child operator at the given index and replaces its slot in the children list
+ // with null. Visibility is protected; MappingCollectOperator calls and redefines this method.
+ protected void closeCurrentChild(int index) throws Exception {
children.get(index).close();
children.set(index, null);
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/MappingCollectOperator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/MappingCollectOperator.java
new file mode 100644
index 00000000000..65d8c5a36f6
--- /dev/null
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/MappingCollectOperator.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator.process;
+
+import org.apache.iotdb.db.queryengine.execution.operator.Operator;
+import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+import
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator;
+
+import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn;
+import org.apache.tsfile.utils.RamUsageEstimator;
+
+import java.util.List;
+
+/**
+ * A {@link CollectOperator} that rearranges the columns of each child's {@link TsBlock} before
+ * handing the block to its consumer.
+ *
+ * <p>{@code mappings.get(c)} belongs to child {@code c}; its {@code j}-th entry is the index of the
+ * child column that is emitted as output column {@code j}. The number of output columns is
+ * therefore the size of that mapping, independent of how many columns the child itself produces.
+ */
+public class MappingCollectOperator extends CollectOperator {
+  protected static final long INSTANCE_SIZE =
+      RamUsageEstimator.shallowSizeOfInstance(MappingCollectOperator.class);
+
+  // record mapping for each child
+  private final List<List<Integer>> mappings;
+
+  /**
+   * @param operatorContext context passed on to {@link CollectOperator}
+   * @param children child operators, consumed one after another
+   * @param mappings one column mapping per child, in the same order as {@code children}; entries
+   *     are set to {@code null} as children are closed, so the list must be mutable
+   */
+  public MappingCollectOperator(
+      OperatorContext operatorContext, List<Operator> children, List<List<Integer>> mappings) {
+    super(operatorContext, children);
+    this.mappings = mappings;
+  }
+
+  /**
+   * Fetches the next block from the current child and re-maps its columns.
+   *
+   * @return the re-mapped block, or {@code null} when the current child returned {@code null} or
+   *     has just been exhausted (in which case it is closed and the next child becomes current)
+   */
+  @Override
+  public TsBlock next() throws Exception {
+    Operator child = children.get(currentIndex);
+    if (!child.hasNextWithTimer()) {
+      closeCurrentChild(currentIndex);
+      currentIndex++;
+      return null;
+    }
+
+    TsBlock tsBlock = child.nextWithTimer();
+    if (tsBlock == null) {
+      return null;
+    }
+
+    List<Integer> mapping = mappings.get(currentIndex);
+    // Size the output by the mapping rather than by the child's column count: the two can differ,
+    // and indexing the mapping with the child's column count either overruns the mapping or
+    // silently drops output columns.
+    Column[] columns = new Column[mapping.size()];
+    for (int i = 0; i < columns.length; i++) {
+      columns[i] = tsBlock.getColumn(mapping.get(i));
+    }
+
+    int positionCount = tsBlock.getPositionCount();
+    return TsBlock.wrapBlocksWithoutCopy(
+        positionCount,
+        new RunLengthEncodedColumn(TableScanOperator.TIME_COLUMN_TEMPLATE, positionCount),
+        columns);
+  }
+
+  /** Closes the child at {@code index} and drops both the child and its mapping. */
+  @Override
+  protected void closeCurrentChild(int index) throws Exception {
+    children.get(index).close();
+    children.set(index, null);
+    mappings.set(index, null);
+  }
+}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
index 9dfc1e0c48f..c5ffb2752ee 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
@@ -51,6 +51,7 @@ import
org.apache.iotdb.db.queryengine.execution.operator.process.CollectOperato
import
org.apache.iotdb.db.queryengine.execution.operator.process.EnforceSingleRowOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.process.FilterAndProjectOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.process.LimitOperator;
+import
org.apache.iotdb.db.queryengine.execution.operator.process.MappingCollectOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.process.OffsetOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.process.PatternRecognitionOperator;
import
org.apache.iotdb.db.queryengine.execution.operator.process.PreviousFillWithGroupOperator;
@@ -246,6 +247,7 @@ import
org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ListMultimap;
import org.apache.tsfile.block.column.Column;
import org.apache.tsfile.common.conf.TSFileConfig;
import org.apache.tsfile.common.conf.TSFileDescriptor;
@@ -4140,7 +4142,21 @@ public class TableOperatorGenerator extends
PlanVisitor<Operator, LocalExecution
.addOperatorContext(
context.getNextOperatorId(),
node.getPlanNodeId(),
- CollectOperator.class.getSimpleName());
- return new CollectOperator(operatorContext, children);
+ MappingCollectOperator.class.getSimpleName());
+
+ int size = children.size();
+ List<List<Integer>> mappings = new ArrayList<>(size);
+ List<Symbol> unionOutputs = node.getOutputSymbols();
+ ListMultimap<Symbol, Symbol> outputToInputs = node.getSymbolMapping();
+ for (int i = 0; i < size; i++) {
+ Map<Symbol, Integer> childOutputs =
+
makeLayoutFromOutputSymbols(node.getChildren().get(i).getOutputSymbols());
+ int finalI = i;
+ mappings.add(
+ unionOutputs.stream()
+ .map(symbol ->
childOutputs.get(outputToInputs.get(symbol).get(finalI)))
+ .collect(Collectors.toList()));
+ }
+ return new MappingCollectOperator(operatorContext, children, mappings);
}
}
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
index 5626f6865cb..36f92476cfd 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
@@ -101,7 +101,9 @@ import
org.apache.iotdb.db.schemaengine.table.DataNodeTableCache;
import org.apache.iotdb.db.schemaengine.table.DataNodeTreeViewSchemaUtils;
import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableListMultimap;
import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ListMultimap;
import org.apache.tsfile.common.conf.TSFileConfig;
import org.apache.tsfile.file.metadata.IDeviceID;
import org.apache.tsfile.read.common.type.LongType;
@@ -111,6 +113,7 @@ import org.apache.tsfile.utils.Pair;
import javax.annotation.Nonnull;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
@@ -1779,7 +1782,36 @@ public class TableDistributedPlanGenerator
+ // Rewrites every child of the union, flattens the rewritten children into one list and returns
+ // a single new UnionNode whose symbol mapping matches that flattened child list.
@Override
public List<PlanNode> visitUnion(UnionNode node, PlanContext context) {
context.clearExpectedOrderingScheme();
- return visitMultiChildProcess(node, context);
+ // children.get(i) holds the plan nodes produced by rewriting the i-th original child
+ List<List<PlanNode>> children =
+ node.getChildren().stream()
+ .map(child -> child.accept(this, context))
+ .collect(toImmutableList());
+
+ // flattened in original child order; these become the children of the new UnionNode
+ List<PlanNode> newUnionChildren =
+
children.stream().flatMap(Collection::stream).collect(toImmutableList());
+
+ // after rewrite, we need to reconstruct SymbolMapping
+ ListMultimap<Symbol, Symbol> oldSymbolMapping = node.getSymbolMapping();
+ ImmutableListMultimap.Builder<Symbol, Symbol> newSymbolMapping =
+ ImmutableListMultimap.builder();
+ for (Symbol symbol : oldSymbolMapping.keySet()) {
+ // oldSymbols.get(i) is the input symbol of the i-th original child for this output symbol
+ List<Symbol> oldSymbols = oldSymbolMapping.get(symbol);
+ for (int i = 0; i < oldSymbols.size(); i++) {
+ Symbol target = oldSymbols.get(i);
+ int duplicateSize = children.get(i).size();
+ // add the same Symbol for all children split from one original node
+ while (duplicateSize > 0) {
+ newSymbolMapping.put(symbol, target);
+ duplicateSize--;
+ }
+ }
+ }
+ return Collections.singletonList(
+ new UnionNode(
+ node.getPlanNodeId(),
+ newUnionChildren,
+ newSymbolMapping.build(),
+ node.getOutputSymbols()));
}
public static class PlanContext {
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/SetOperationNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/SetOperationNode.java
index a23ba5f6cd4..6f399c1bedc 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/SetOperationNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/SetOperationNode.java
@@ -46,9 +46,7 @@ import static
com.google.common.collect.ImmutableList.toImmutableList;
import static java.util.Objects.requireNonNull;
public abstract class SetOperationNode extends MultiChildProcessNode {
- // Corresponding is not supported in UNION now, this field can be used for
future expansion.
- // We don't need to serialize this field now, consider it when support
Corresponding.
- private final transient ListMultimap<Symbol, Symbol> outputToInputs;
+ private final ListMultimap<Symbol, Symbol> outputToInputs;
private final List<Symbol> outputs;
protected SetOperationNode(
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/UnionNode.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/UnionNode.java
index 59fa1674368..4332542374c 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/UnionNode.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/UnionNode.java
@@ -69,6 +69,16 @@ public class UnionNode extends SetOperationNode {
PlanNodeType.TABLE_UNION_NODE.serialize(byteBuffer);
ReadWriteIOUtils.write(getOutputSymbols().size(), byteBuffer);
getOutputSymbols().forEach(symbol -> Symbol.serialize(symbol, byteBuffer));
+
+ // Layout of the symbol mapping: key count, then for every key the key itself, its value
+ // count and its values. The leading number must be the count of distinct keys, because
+ // deserialize() loops that many times and the DataOutputStream variant writes the same
+ // quantity. Multimap.size() would be the number of key-value pairs instead.
+ ListMultimap<Symbol, Symbol> multimap = getSymbolMapping();
+ ReadWriteIOUtils.write(multimap.keySet().size(), byteBuffer);
+ for (Symbol key : multimap.keySet()) {
+   List<Symbol> values = multimap.get(key);
+   Symbol.serialize(key, byteBuffer);
+   ReadWriteIOUtils.write(values.size(), byteBuffer);
+   for (Symbol value : values) {
+     Symbol.serialize(value, byteBuffer);
+   }
+ }
}
@Override
@@ -78,6 +88,16 @@ public class UnionNode extends SetOperationNode {
for (Symbol symbol : getOutputSymbols()) {
Symbol.serialize(symbol, stream);
}
+
+ ListMultimap<Symbol, Symbol> multimap = getSymbolMapping();
+ ReadWriteIOUtils.write(multimap.keySet().size(), stream);
+ for (Symbol key : multimap.keySet()) {
+ Symbol.serialize(key, stream);
+ ReadWriteIOUtils.write(multimap.get(key).size(), stream);
+ for (Symbol value : multimap.get(key)) {
+ Symbol.serialize(value, stream);
+ }
+ }
}
public static UnionNode deserialize(ByteBuffer byteBuffer) {
@@ -86,8 +106,17 @@ public class UnionNode extends SetOperationNode {
while (size-- > 0) {
outputs.add(Symbol.deserialize(byteBuffer));
}
+ ImmutableListMultimap.Builder<Symbol, Symbol> builder =
ImmutableListMultimap.builder();
+ size = ReadWriteIOUtils.readInt(byteBuffer);
+ while (size-- > 0) {
+ Symbol key = Symbol.deserialize(byteBuffer);
+ int valueSize = ReadWriteIOUtils.readInt(byteBuffer);
+ for (int i = 0; i < valueSize; i++) {
+ builder.put(key, Symbol.deserialize(byteBuffer));
+ }
+ }
PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
- return new UnionNode(planNodeId, ImmutableListMultimap.of(), outputs);
+ return new UnionNode(planNodeId, builder.build(), outputs);
}
@Override
diff --git
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/type/CompatibleResolver.java
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/type/CompatibleResolver.java
index de7ac3327fb..44d0c232a29 100644
---
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/type/CompatibleResolver.java
+++
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/type/CompatibleResolver.java
@@ -95,6 +95,10 @@ public class CompatibleResolver {
addCondition(UNKNOWN, DOUBLE, DOUBLE);
addCondition(UNKNOWN, DATE, DATE);
addCondition(UNKNOWN, TIMESTAMP, TIMESTAMP);
+ addCondition(UNKNOWN, BOOLEAN, BOOLEAN);
+ addCondition(UNKNOWN, TEXT, TEXT);
+ addCondition(UNKNOWN, STRING, STRING);
+ addCondition(UNKNOWN, BLOB, BLOB);
}
private static void addCondition(Type condition1, Type condition2, Type
result) {