This is an automated email from the ASF dual-hosted git repository.

jackietien pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iotdb.git


The following commit(s) were added to refs/heads/master by this push:
     new 07d457e36df Fix union when cross region or need mapping from child 
input
07d457e36df is described below

commit 07d457e36df50a23bf4b3fe1aacd301578f1bb93
Author: Weihao Li <[email protected]>
AuthorDate: Tue Oct 21 17:38:08 2025 +0800

    Fix union when cross region or need mapping from child input
---
 .../it/query/recent/IoTDBUnionTable2IT.java        | 45 +++++++++++++
 .../it/query/recent/IoTDBUnionTableIT.java         | 18 +++++
 .../operator/process/CollectOperator.java          |  6 +-
 .../operator/process/MappingCollectOperator.java   | 76 ++++++++++++++++++++++
 .../plan/planner/TableOperatorGenerator.java       | 20 +++++-
 .../distribute/TableDistributedPlanGenerator.java  | 34 +++++++++-
 .../relational/planner/node/SetOperationNode.java  |  4 +-
 .../plan/relational/planner/node/UnionNode.java    | 31 ++++++++-
 .../plan/relational/type/CompatibleResolver.java   |  4 ++
 9 files changed, 228 insertions(+), 10 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBUnionTable2IT.java
 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBUnionTable2IT.java
new file mode 100644
index 00000000000..e3235032cb2
--- /dev/null
+++ 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBUnionTable2IT.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.relational.it.query.recent;
+
+import org.apache.iotdb.it.env.EnvFactory;
+import org.apache.iotdb.it.framework.IoTDBTestRunner;
+import org.apache.iotdb.itbase.category.TableClusterIT;
+import org.apache.iotdb.itbase.category.TableLocalStandaloneIT;
+
+import org.junit.BeforeClass;
+import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+
+import static 
org.apache.iotdb.confignode.it.partition.IoTDBPartitionShuffleStrategyIT.SHUFFLE;
+import static org.apache.iotdb.db.it.utils.TestUtils.prepareTableData;
+
+/**
+ * Re-runs the test cases inherited from {@link IoTDBUnionTableIT} on an environment configured
+ * with a time partition interval of 2 and the SHUFFLE data partition allocation strategy.
+ *
+ * <p>NOTE(review): presumably these settings spread the test data over several regions so that
+ * the union queries are exercised across regions; confirm against the partition configuration.
+ */
+@RunWith(IoTDBTestRunner.class)
+@Category({TableLocalStandaloneIT.class, TableClusterIT.class})
+public class IoTDBUnionTable2IT extends IoTDBUnionTableIT {
+
+  // Applies the partition settings, then starts the environment and loads the test data.
+  // createSqls is not declared in this class; presumably it is inherited from IoTDBUnionTableIT.
+  @BeforeClass
+  public static void setUp() throws Exception {
+    
EnvFactory.getEnv().getConfig().getCommonConfig().setTimePartitionInterval(2);
+    
EnvFactory.getEnv().getConfig().getCommonConfig().setDataPartitionAllocationStrategy(SHUFFLE);
+    EnvFactory.getEnv().initClusterEnvironment();
+    prepareTableData(createSqls);
+  }
+}
diff --git 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBUnionTableIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBUnionTableIT.java
index 90ed5f04332..28065d1f33b 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBUnionTableIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/query/recent/IoTDBUnionTableIT.java
@@ -113,6 +113,24 @@ public class IoTDBUnionTableIT {
         DATABASE_NAME);
   }
 
+  // Both union inputs produce two columns (col_a, col_b) while the outer query keeps only col_a,
+  // and the inputs take col_a from different source columns (s1 of table1, s2 of table2).
+  @Test
+  public void mappingTest() {
+    String[] expectedHeader = new String[] {"col_a"};
+    // UNION: three rows are expected.
+    String[] retArray = new String[] {"1.0,", "2.0,", "3.0,"};
+    tableResultSetEqualTest(
+        "select col_a from ((select s1 as col_a, device as col_b from table1) 
union (select s2 as col_a, device as col_b from table2)) order by col_a",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+
+    // UNION ALL: the same query is expected to return four rows, with 1.0 appearing twice.
+    retArray = new String[] {"1.0,", "1.0,", "2.0,", "3.0,"};
+    tableResultSetEqualTest(
+        "select col_a from ((select s1 as col_a, device as col_b from table1) 
union all (select s2 as col_a, device as col_b from table2)) order by col_a",
+        expectedHeader,
+        retArray,
+        DATABASE_NAME);
+  }
+
   @Test
   public void exceptionTest() {
     tableAssertTestFail(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/CollectOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/CollectOperator.java
index 5c287332053..668cf193de9 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/CollectOperator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/CollectOperator.java
@@ -35,10 +35,10 @@ public class CollectOperator implements ProcessOperator {
       RamUsageEstimator.shallowSizeOfInstance(CollectOperator.class);
 
   private final OperatorContext operatorContext;
-  private final List<Operator> children;
+  protected final List<Operator> children;
   private boolean inited = false;
 
-  private int currentIndex;
+  protected int currentIndex;
 
   public CollectOperator(OperatorContext operatorContext, List<Operator> 
children) {
     this.operatorContext = operatorContext;
@@ -62,7 +62,7 @@ public class CollectOperator implements ProcessOperator {
     }
   }
 
-  private void closeCurrentChild(int index) throws Exception {
+  // Closes the child at the given index and replaces its slot in children with null.
+  // Widened from private to protected; MappingCollectOperator declares a method with the same
+  // signature that additionally clears the child's mapping.
+  protected void closeCurrentChild(int index) throws Exception {
     children.get(index).close();
     children.set(index, null);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/MappingCollectOperator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/MappingCollectOperator.java
new file mode 100644
index 00000000000..65d8c5a36f6
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/operator/process/MappingCollectOperator.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.iotdb.db.queryengine.execution.operator.process;
+
+import org.apache.iotdb.db.queryengine.execution.operator.Operator;
+import org.apache.iotdb.db.queryengine.execution.operator.OperatorContext;
+import 
org.apache.iotdb.db.queryengine.execution.operator.source.relational.TableScanOperator;
+
+import org.apache.tsfile.block.column.Column;
+import org.apache.tsfile.read.common.block.TsBlock;
+import org.apache.tsfile.read.common.block.column.RunLengthEncodedColumn;
+import org.apache.tsfile.utils.RamUsageEstimator;
+
+import java.util.List;
+
+/**
+ * A {@link CollectOperator} that re-maps the value columns of every child block before returning
+ * it.
+ *
+ * <p>For the child at position {@code c}, {@code mappings.get(c).get(i)} is the index of the child
+ * column that is emitted as output column {@code i}. The time column of every emitted block is a
+ * {@link RunLengthEncodedColumn} built from {@link TableScanOperator#TIME_COLUMN_TEMPLATE}.
+ */
+public class MappingCollectOperator extends CollectOperator {
+  // NOTE(review): this hides CollectOperator.INSTANCE_SIZE; confirm that the memory accounting in
+  // the parent class is meant to keep using its own constant.
+  protected static final long INSTANCE_SIZE =
+      RamUsageEstimator.shallowSizeOfInstance(MappingCollectOperator.class);
+
+  // record mapping for each child: output column index -> child column index
+  private final List<List<Integer>> mappings;
+
+  /**
+   * @param operatorContext context handed to the parent {@link CollectOperator}
+   * @param children child operators
+   * @param mappings per-child column mapping, parallel to {@code children}; an entry is replaced
+   *     by {@code null} when its child is closed, so the list must support {@code set}
+   */
+  public MappingCollectOperator(
+      OperatorContext operatorContext, List<Operator> children, List<List<Integer>> mappings) {
+    super(operatorContext, children);
+    this.mappings = mappings;
+  }
+
+  /**
+   * Returns the next block of the current child with its columns arranged according to that
+   * child's mapping.
+   *
+   * @return the re-mapped block, or {@code null} when the current child returned no block or has
+   *     no more data (in the latter case the child is closed and the operator moves on to the
+   *     next child)
+   */
+  @Override
+  public TsBlock next() throws Exception {
+    Operator currentChild = children.get(currentIndex);
+    if (!currentChild.hasNextWithTimer()) {
+      closeCurrentChild(currentIndex);
+      currentIndex++;
+      return null;
+    }
+
+    TsBlock tsBlock = currentChild.nextWithTimer();
+    if (tsBlock == null) {
+      return null;
+    }
+
+    // The output has exactly one column per mapping entry. Sizing the array by the child's own
+    // column count would read past the mapping when the child is wider than the output and leave
+    // trailing null columns when it is narrower.
+    List<Integer> mapping = mappings.get(currentIndex);
+    Column[] columns = new Column[mapping.size()];
+    for (int i = 0; i < columns.length; i++) {
+      columns[i] = tsBlock.getColumn(mapping.get(i));
+    }
+    int positionCount = tsBlock.getPositionCount();
+    return TsBlock.wrapBlocksWithoutCopy(
+        positionCount,
+        new RunLengthEncodedColumn(TableScanOperator.TIME_COLUMN_TEMPLATE, positionCount),
+        columns);
+  }
+
+  /** Closes the child at {@code index} and drops both the child and its mapping. */
+  @Override
+  protected void closeCurrentChild(int index) throws Exception {
+    children.get(index).close();
+    children.set(index, null);
+    mappings.set(index, null);
+  }
+}
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
index 9dfc1e0c48f..c5ffb2752ee 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
@@ -51,6 +51,7 @@ import 
org.apache.iotdb.db.queryengine.execution.operator.process.CollectOperato
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.EnforceSingleRowOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.FilterAndProjectOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.LimitOperator;
+import 
org.apache.iotdb.db.queryengine.execution.operator.process.MappingCollectOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.OffsetOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.PatternRecognitionOperator;
 import 
org.apache.iotdb.db.queryengine.execution.operator.process.PreviousFillWithGroupOperator;
@@ -246,6 +247,7 @@ import 
org.apache.iotdb.udf.api.relational.table.TableFunctionProcessorProvider;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ListMultimap;
 import org.apache.tsfile.block.column.Column;
 import org.apache.tsfile.common.conf.TSFileConfig;
 import org.apache.tsfile.common.conf.TSFileDescriptor;
@@ -4140,7 +4142,21 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
             .addOperatorContext(
                 context.getNextOperatorId(),
                 node.getPlanNodeId(),
-                CollectOperator.class.getSimpleName());
-    return new CollectOperator(operatorContext, children);
+                MappingCollectOperator.class.getSimpleName());
+
+    int size = children.size();
+    List<List<Integer>> mappings = new ArrayList<>(size);
+    List<Symbol> unionOutputs = node.getOutputSymbols();
+    ListMultimap<Symbol, Symbol> outputToInputs = node.getSymbolMapping();
+    for (int i = 0; i < size; i++) {
+      Map<Symbol, Integer> childOutputs =
+          
makeLayoutFromOutputSymbols(node.getChildren().get(i).getOutputSymbols());
+      int finalI = i;
+      mappings.add(
+          unionOutputs.stream()
+              .map(symbol -> 
childOutputs.get(outputToInputs.get(symbol).get(finalI)))
+              .collect(Collectors.toList()));
+    }
+    return new MappingCollectOperator(operatorContext, children, mappings);
   }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
index 5626f6865cb..36f92476cfd 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
@@ -101,7 +101,9 @@ import 
org.apache.iotdb.db.schemaengine.table.DataNodeTableCache;
 import org.apache.iotdb.db.schemaengine.table.DataNodeTreeViewSchemaUtils;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableListMultimap;
 import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.ListMultimap;
 import org.apache.tsfile.common.conf.TSFileConfig;
 import org.apache.tsfile.file.metadata.IDeviceID;
 import org.apache.tsfile.read.common.type.LongType;
@@ -111,6 +113,7 @@ import org.apache.tsfile.utils.Pair;
 import javax.annotation.Nonnull;
 
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
 import java.util.HashMap;
@@ -1779,7 +1782,36 @@ public class TableDistributedPlanGenerator
+  // Rewrites a UnionNode: every original child is visited and may yield several plan nodes; all
+  // of them become direct children of a single new UnionNode, and the symbol mapping is rebuilt so
+  // that each output symbol has one input symbol per new child.
   @Override
   public List<PlanNode> visitUnion(UnionNode node, PlanContext context) {
     context.clearExpectedOrderingScheme();
-    return visitMultiChildProcess(node, context);
+    // children.get(i) holds the plan nodes produced by visiting the i-th original child
+    List<List<PlanNode>> children =
+        node.getChildren().stream()
+            .map(child -> child.accept(this, context))
+            .collect(toImmutableList());
+
+    // flattened in original child order; the rebuilt mapping below follows the same order
+    List<PlanNode> newUnionChildren =
+        
children.stream().flatMap(Collection::stream).collect(toImmutableList());
+
+    // after rewrite, we need to reconstruct SymbolMapping
+    ListMultimap<Symbol, Symbol> oldSymbolMapping = node.getSymbolMapping();
+    ImmutableListMultimap.Builder<Symbol, Symbol> newSymbolMapping =
+        ImmutableListMultimap.builder();
+    for (Symbol symbol : oldSymbolMapping.keySet()) {
+      List<Symbol> oldSymbols = oldSymbolMapping.get(symbol);
+      for (int i = 0; i < oldSymbols.size(); i++) {
+        Symbol target = oldSymbols.get(i);
+        int duplicateSize = children.get(i).size();
+        // add the same Symbol for all children split from one original node
+        while (duplicateSize > 0) {
+          newSymbolMapping.put(symbol, target);
+          duplicateSize--;
+        }
+      }
+    }
+    return Collections.singletonList(
+        new UnionNode(
+            node.getPlanNodeId(),
+            newUnionChildren,
+            newSymbolMapping.build(),
+            node.getOutputSymbols()));
   }
 
   public static class PlanContext {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/SetOperationNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/SetOperationNode.java
index a23ba5f6cd4..6f399c1bedc 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/SetOperationNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/SetOperationNode.java
@@ -46,9 +46,7 @@ import static 
com.google.common.collect.ImmutableList.toImmutableList;
 import static java.util.Objects.requireNonNull;
 
 public abstract class SetOperationNode extends MultiChildProcessNode {
-  // Corresponding is not supported in UNION now, this field can be used for 
future expansion.
-  // We don't need to serialize this field now, consider it when support 
Corresponding.
-  private final transient ListMultimap<Symbol, Symbol> outputToInputs;
+  private final ListMultimap<Symbol, Symbol> outputToInputs;
   private final List<Symbol> outputs;
 
   protected SetOperationNode(
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/UnionNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/UnionNode.java
index 59fa1674368..4332542374c 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/UnionNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/UnionNode.java
@@ -69,6 +69,16 @@ public class UnionNode extends SetOperationNode {
     PlanNodeType.TABLE_UNION_NODE.serialize(byteBuffer);
     ReadWriteIOUtils.write(getOutputSymbols().size(), byteBuffer);
     getOutputSymbols().forEach(symbol -> Symbol.serialize(symbol, byteBuffer));
+
+    // Layout: number of keys, then for every key the key itself, the number of its values and
+    // the values. deserialize(ByteBuffer) reads the leading int as the number of keys, so it must
+    // be keySet().size(); multimap.size() is the total number of key-value entries and would make
+    // the reader loop past the written data whenever a key has more than one value.
+    ListMultimap<Symbol, Symbol> multimap = getSymbolMapping();
+    ReadWriteIOUtils.write(multimap.keySet().size(), byteBuffer);
+    for (Symbol key : multimap.keySet()) {
+      Symbol.serialize(key, byteBuffer);
+      ReadWriteIOUtils.write(multimap.get(key).size(), byteBuffer);
+      for (Symbol value : multimap.get(key)) {
+        Symbol.serialize(value, byteBuffer);
+      }
+    }
   }
 
   @Override
@@ -78,6 +88,16 @@ public class UnionNode extends SetOperationNode {
     for (Symbol symbol : getOutputSymbols()) {
       Symbol.serialize(symbol, stream);
     }
+
+    ListMultimap<Symbol, Symbol> multimap = getSymbolMapping();
+    ReadWriteIOUtils.write(multimap.keySet().size(), stream);
+    for (Symbol key : multimap.keySet()) {
+      Symbol.serialize(key, stream);
+      ReadWriteIOUtils.write(multimap.get(key).size(), stream);
+      for (Symbol value : multimap.get(key)) {
+        Symbol.serialize(value, stream);
+      }
+    }
   }
 
   public static UnionNode deserialize(ByteBuffer byteBuffer) {
@@ -86,8 +106,17 @@ public class UnionNode extends SetOperationNode {
     while (size-- > 0) {
       outputs.add(Symbol.deserialize(byteBuffer));
     }
+    ImmutableListMultimap.Builder<Symbol, Symbol> builder = 
ImmutableListMultimap.builder();
+    size = ReadWriteIOUtils.readInt(byteBuffer);
+    while (size-- > 0) {
+      Symbol key = Symbol.deserialize(byteBuffer);
+      int valueSize = ReadWriteIOUtils.readInt(byteBuffer);
+      for (int i = 0; i < valueSize; i++) {
+        builder.put(key, Symbol.deserialize(byteBuffer));
+      }
+    }
     PlanNodeId planNodeId = PlanNodeId.deserialize(byteBuffer);
-    return new UnionNode(planNodeId, ImmutableListMultimap.of(), outputs);
+    return new UnionNode(planNodeId, builder.build(), outputs);
   }
 
   @Override
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/type/CompatibleResolver.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/type/CompatibleResolver.java
index de7ac3327fb..44d0c232a29 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/type/CompatibleResolver.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/type/CompatibleResolver.java
@@ -95,6 +95,10 @@ public class CompatibleResolver {
     addCondition(UNKNOWN, DOUBLE, DOUBLE);
     addCondition(UNKNOWN, DATE, DATE);
     addCondition(UNKNOWN, TIMESTAMP, TIMESTAMP);
+    addCondition(UNKNOWN, BOOLEAN, BOOLEAN);
+    addCondition(UNKNOWN, TEXT, TEXT);
+    addCondition(UNKNOWN, STRING, STRING);
+    addCondition(UNKNOWN, BLOB, BLOB);
   }
 
   private static void addCondition(Type condition1, Type condition2, Type 
result) {

Reply via email to