This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch beyyes/join
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit bda7dc1b167d83631b7c3190943384680b508725
Author: Beyyes <[email protected]>
AuthorDate: Mon Aug 26 22:49:53 2024 +0800

    fix typeprovider
---
 .../db/it/IoTDBMultiIDsWithAttributesTableIT.java  |  3 --
 .../plan/planner/TableOperatorGenerator.java       |  6 ++++
 .../distribute/TableDistributedPlanGenerator.java  | 18 ++++++----
 .../TableModelTypeProviderExtractor.java           |  8 +++++
 .../plan/relational/planner/node/JoinNode.java     |  3 ++
 .../optimizations/PushPredicateIntoTableScan.java  | 38 +++++++++++++++-------
 6 files changed, 56 insertions(+), 20 deletions(-)

diff --git 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java
 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java
index 4457055bd3e..9eeea1bc77d 100644
--- 
a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java
+++ 
b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java
@@ -91,9 +91,6 @@ public class IoTDBMultiIDsWithAttributesTableIT {
     EnvFactory.getEnv()
         .getConfig()
         .getCommonConfig()
-        .setEnableSeqSpaceCompaction(false)
-        .setEnableUnseqSpaceCompaction(false)
-        .setEnableCrossSpaceCompaction(false)
         .setMaxTsBlockLineNumber(1)
         .setMaxNumberOfPointsInPage(1);
     insertData();
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
index 94fdb0c4df7..c4c7bd5f2ad 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java
@@ -70,6 +70,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.CollectNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.MergeSortNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OffsetNode;
@@ -737,6 +738,11 @@ public class TableOperatorGenerator extends 
PlanVisitor<Operator, LocalExecution
         TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber());
   }
 
+  @Override
+  public Operator visitJoin(JoinNode node, LocalExecutionPlanContext context) {
+    throw new IllegalStateException("JoinOperator is not implemented 
currently.");
+  }
+
   @Override
   public Operator visitCountMerge(
       final CountSchemaMergeNode node, final LocalExecutionPlanContext 
context) {
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
index efd17df6e23..ba9049ad776 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java
@@ -321,17 +321,23 @@ public class TableDistributedPlanGenerator
 
   @Override
   public List<PlanNode> visitJoin(JoinNode node, PlanContext context) {
+    // child of JoinNode must be SortNode, so after rewritten, the child must 
be MergeSortNode or
+    // SortNode
     List<PlanNode> leftChildrenNodes = node.getLeftChild().accept(this, 
context);
+    checkArgument(
+        leftChildrenNodes.size() == 1, "The size of left children node of 
JoinNode should be 1");
+    node.setLeftChild(leftChildrenNodes.get(0));
+
     OrderingScheme leftChildOrdering =
         nodeOrderingMap.get(leftChildrenNodes.get(0).getPlanNodeId());
-    PlanNode leftNode = mergeChildrenViaCollectOrMergeSort(leftChildOrdering, 
leftChildrenNodes);
-    node.setLeftChild(leftNode);
+    if (leftChildOrdering != null) {
+      nodeOrderingMap.put(node.getPlanNodeId(), leftChildOrdering);
+    }
 
     List<PlanNode> rightChildrenNodes = node.getRightChild().accept(this, 
context);
-    OrderingScheme rightChildOrdering =
-        nodeOrderingMap.get(rightChildrenNodes.get(0).getPlanNodeId());
-    PlanNode rightNode = 
mergeChildrenViaCollectOrMergeSort(rightChildOrdering, leftChildrenNodes);
-    node.setLeftChild(rightNode);
+    checkArgument(
+        rightChildrenNodes.size() == 1, "The size of right children node of 
JoinNode should be 1");
+    node.setRightChild(rightChildrenNodes.get(0));
 
     return Collections.singletonList(node);
   }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelTypeProviderExtractor.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelTypeProviderExtractor.java
index deb6678d4c6..971f92051a7 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelTypeProviderExtractor.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelTypeProviderExtractor.java
@@ -26,6 +26,7 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkN
 import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.CollectNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.MergeSortNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OffsetNode;
@@ -194,6 +195,13 @@ public class TableModelTypeProviderExtractor {
       return null;
     }
 
+    @Override
+    public Void visitJoin(JoinNode node, Void context) {
+      node.getLeftChild().accept(this, context);
+      node.getRightChild().accept(this, context);
+      return null;
+    }
+
     @Override
     public Void visitIdentitySink(IdentitySinkNode node, Void context) {
       node.getChildren().forEach(c -> c.accept(this, context));
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/JoinNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/JoinNode.java
index 45358d7d9e9..a729cd2bbec 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/JoinNode.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/JoinNode.java
@@ -104,6 +104,9 @@ public class JoinNode extends TwoChildProcessNode {
     checkArgument(
         leftSymbols.containsAll(leftOutputSymbols),
         "Left source inputs do not contain all left output symbols");
+    if (!rightSymbols.containsAll(rightOutputSymbols)) {
+      System.out.println("====");
+    }
     checkArgument(
         rightSymbols.containsAll(rightOutputSymbols),
         "Right source inputs do not contain all right output symbols");
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java
index 4c7d10a5b20..5f3ce5c8f49 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java
@@ -54,7 +54,6 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Node;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SymbolReference;
 
 import com.google.common.collect.ImmutableList;
-import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.ImmutableSet;
 import org.apache.tsfile.read.filter.basic.Filter;
 import org.apache.tsfile.utils.Pair;
@@ -64,6 +63,7 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.EnumSet;
 import java.util.HashSet;
+import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -306,7 +306,7 @@ public class PushPredicateIntoTableScan implements 
PlanOptimizer {
     @Override
     public PlanNode visitJoin(JoinNode node, Void context) {
       hasJoinNode = true;
-      Expression inheritedPredicate = predicate;
+      Expression inheritedPredicate = predicate != null ? predicate : 
TRUE_LITERAL;
 
       // See if we can rewrite outer joins in terms of a plain inner join
       node = tryNormalizeToOuterToInnerJoin(node, inheritedPredicate);
@@ -801,9 +801,10 @@ public class PushPredicateIntoTableScan implements 
PlanOptimizer {
     /** Get deviceEntries and DataPartition used in TableScan. */
     private PlanNode tableMetadataIndexScan(
         TableScanNode tableScanNode, List<Expression> metadataExpressions) {
-      // for join operator, columnSymbols in TableScanNode is renamed, which 
adds suffix for origin
+      // for join operator, columnSymbols in TableScanNode may be renamed, 
which adds suffix for
+      // origin
       // column name,
-      // add a new ProjectNode above TableScanNode.
+      // thus we add a new ProjectNode above TableScanNode.
       boolean tableScanNodeColumnsRenamed = false;
       for (Map.Entry<Symbol, ColumnSchema> entry : 
tableScanNode.getAssignments().entrySet()) {
         Symbol columnSymbol = entry.getKey();
@@ -820,17 +821,32 @@ public class PushPredicateIntoTableScan implements 
PlanOptimizer {
                 MetadataExpressionTransformForJoin.transform(
                     expression1, tableScanNode.getAssignments()));
 
-        Assignments.Builder projectAssignments = Assignments.builder();
-        ImmutableMap.Builder<Symbol, ColumnSchema> tableScanAssignments = 
ImmutableMap.builder();
+        List<Symbol> newTableScanSymbols = new 
ArrayList<>(tableScanNode.getOutputSymbols().size());
+        Map<Symbol, ColumnSchema> newTableScanAssignments =
+            new LinkedHashMap<>(tableScanNode.getOutputSymbols().size());
+        Map<Symbol, Expression> projectAssignments =
+            new LinkedHashMap<>(tableScanNode.getOutputSymbols().size());
         for (Map.Entry<Symbol, ColumnSchema> entry : 
tableScanNode.getAssignments().entrySet()) {
-          Symbol columnSymbol = entry.getKey();
+          Symbol originalSymbol = entry.getKey();
           ColumnSchema columnSchema = entry.getValue();
-          projectAssignments.put(columnSymbol, new 
SymbolReference(columnSchema.getName()));
-          tableScanAssignments.put(Symbol.of(columnSchema.getName()), 
columnSchema);
+
+          Symbol realSymbol = Symbol.of(columnSchema.getName());
+          newTableScanSymbols.add(realSymbol);
+          newTableScanAssignments.put(realSymbol, columnSchema);
+          projectAssignments.put(originalSymbol, new 
SymbolReference(columnSchema.getName()));
+          queryContext.getTypeProvider().putTableModelType(originalSymbol, 
columnSchema.getType());
+          if 
(tableScanNode.getIdAndAttributeIndexMap().containsKey(originalSymbol)) {
+            Integer idx = 
tableScanNode.getIdAndAttributeIndexMap().get(originalSymbol);
+            tableScanNode.getIdAndAttributeIndexMap().remove(originalSymbol);
+            tableScanNode.getIdAndAttributeIndexMap().put(realSymbol, idx);
+          }
         }
+
+        tableScanNode.setOutputSymbols(newTableScanSymbols);
+        tableScanNode.setAssignments(newTableScanAssignments);
         newProjectNode =
-            new ProjectNode(queryId.genPlanNodeId(), tableScanNode, 
projectAssignments.build());
-        tableScanNode.setAssignments(tableScanAssignments.build());
+            new ProjectNode(
+                queryId.genPlanNodeId(), tableScanNode, new 
Assignments(projectAssignments));
       }
 
       List<String> attributeColumns = new ArrayList<>();

Reply via email to