This is an automated email from the ASF dual-hosted git repository.

caogaofei pushed a commit to branch beyyes/join
in repository https://gitbox.apache.org/repos/asf/iotdb.git

commit dcea59827265f4bb5563e047e605d5b6622ca0ec
Author: Beyyes <[email protected]>
AuthorDate: Mon Aug 12 14:39:35 2024 +0800

    add basic join node
---
 .../plan/relational/planner/QueryPlanner.java      |  85 +++++++
 .../plan/relational/planner/RelationPlanner.java   | 273 ++++++++++++++++++++-
 .../plan/relational/planner/node/JoinNode.java     | 230 +++++++++++++++++
 3 files changed, 583 insertions(+), 5 deletions(-)

diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/QueryPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/QueryPlanner.java
index 0a49d7c0492..b19917e18c5 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/QueryPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/QueryPlanner.java
@@ -22,7 +22,9 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.analyzer.NodeRef;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OffsetNode;
+import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode;
 import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Cast;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Delete;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Node;
@@ -34,6 +36,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.QuerySpecificatio
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SortItem;
 
 import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Iterables;
 import org.apache.tsfile.read.common.type.Type;
 
@@ -45,6 +48,7 @@ import java.util.Map;
 import java.util.Optional;
 
 import static com.google.common.collect.ImmutableList.toImmutableList;
+import static java.lang.String.format;
 import static java.util.Objects.requireNonNull;
 import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingTranslator.sortItemToSortOrder;
 import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.PlanBuilder.newPlanBuilder;
@@ -250,6 +254,53 @@ public class QueryPlanner {
     }
   }
 
+  /**
+   * Creates a projection with any additional coercions by identity of the 
provided expressions.
+   *
+   * @return the new subplan and a mapping of each expression to the symbol 
representing the
+   *     coercion or an existing symbol if a coercion wasn't needed
+   */
+  public static PlanAndMappings coerce(
+      PlanBuilder subPlan,
+      List<Expression> expressions,
+      Analysis analysis,
+      QueryId idAllocator,
+      SymbolAllocator symbolAllocator) {
+    Assignments.Builder assignments = Assignments.builder();
+    assignments.putIdentities(subPlan.getRoot().getOutputSymbols());
+
+    Map<NodeRef<Expression>, Symbol> mappings = new HashMap<>();
+    for (Expression expression : expressions) {
+      Type coercion = analysis.getCoercion(expression);
+
+      // expressions may be repeated, for example, when resolving ordinal 
references in a GROUP BY
+      // clause
+      if (!mappings.containsKey(NodeRef.of(expression))) {
+        if (coercion != null) {
+          Symbol symbol = symbolAllocator.newSymbol(expression, coercion);
+
+          assignments.put(
+              symbol,
+              new Cast(
+                  subPlan.rewrite(expression),
+                  // TODO(beyyes) transfer toSqlType(coercion) method,
+                  null,
+                  false));
+
+          mappings.put(NodeRef.of(expression), symbol);
+        } else {
+          mappings.put(NodeRef.of(expression), subPlan.translate(expression));
+        }
+      }
+    }
+
+    subPlan =
+        subPlan.withNewRoot(
+            new ProjectNode(idAllocator.genPlanNodeId(), subPlan.getRoot(), 
assignments.build()));
+
+    return new PlanAndMappings(subPlan, mappings);
+  }
+
   private Optional<OrderingScheme> orderingScheme(
       PlanBuilder subPlan, Optional<OrderBy> orderBy, List<Expression> 
orderByExpressions) {
     if (!orderBy.isPresent() || (analysis.isOrderByRedundant(orderBy.get()))) {
@@ -310,4 +361,38 @@ public class QueryPlanner {
     }
     return subPlan;
   }
+
+  public static class PlanAndMappings {
+    private final PlanBuilder subPlan;
+    private final Map<NodeRef<Expression>, Symbol> mappings;
+
+    public PlanAndMappings(PlanBuilder subPlan, Map<NodeRef<Expression>, 
Symbol> mappings) {
+      this.subPlan = subPlan;
+      this.mappings = ImmutableMap.copyOf(mappings);
+    }
+
+    public PlanBuilder getSubPlan() {
+      return subPlan;
+    }
+
+    public Symbol get(Expression expression) {
+      return tryGet(expression)
+          .orElseThrow(
+              () ->
+                  new IllegalArgumentException(
+                      format(
+                          "No mapping for expression: %s (%s)",
+                          expression, System.identityHashCode(expression))));
+    }
+
+    public Optional<Symbol> tryGet(Expression expression) {
+      Symbol result = mappings.get(NodeRef.of(expression));
+
+      if (result != null) {
+        return Optional.of(result);
+      }
+
+      return Optional.empty();
+    }
+  }
 }
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
index a1c98cb14fc..7c44cccc88f 100644
--- 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java
@@ -17,6 +17,7 @@ import 
org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory;
 import org.apache.iotdb.db.queryengine.common.MPPQueryContext;
 import org.apache.iotdb.db.queryengine.common.QueryId;
 import org.apache.iotdb.db.queryengine.common.SessionInfo;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertRowNode;
 import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertRowsNode;
@@ -24,18 +25,27 @@ import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalIn
 import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis;
 import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Field;
 import org.apache.iotdb.db.queryengine.plan.relational.analyzer.NodeRef;
+import org.apache.iotdb.db.queryengine.plan.relational.analyzer.RelationType;
 import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Scope;
 import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema;
 import 
org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName;
+import 
org.apache.iotdb.db.queryengine.plan.relational.metadata.TableMetadataImpl;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.ir.IrUtils;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinNode;
 import 
org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AliasedRelation;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AstVisitor;
+import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ComparisonExpression;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Except;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.InsertRow;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.InsertRows;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.InsertTablet;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Intersect;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Join;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.JoinUsing;
+import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LogicalExpression;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Node;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.QualifiedName;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Query;
@@ -45,6 +55,7 @@ import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Table;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.TableSubquery;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Union;
 import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Values;
+import 
org.apache.iotdb.db.queryengine.plan.relational.type.InternalTypeManager;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement;
 import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement;
 import 
org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement;
@@ -58,8 +69,18 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
 
 import static java.util.Objects.requireNonNull;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.PlanBuilder.newPlanBuilder;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.QueryPlanner.coerce;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.QueryPlanner.coerceIfNecessary;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.planner.ir.IrUtils.extractPredicates;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Join.Type.CROSS;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Join.Type.IMPLICIT;
+import static 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Join.Type.INNER;
 
 public class RelationPlanner extends AstVisitor<RelationPlan, Void> {
 
@@ -179,6 +200,253 @@ public class RelationPlanner extends 
AstVisitor<RelationPlan, Void> {
     return new RelationPlan(plan.getRoot(), analysis.getScope(node), 
plan.getFieldMappings());
   }
 
+  @Override
+  protected RelationPlan visitJoin(Join node, Void context) {
+    RelationPlan leftPlan = process(node.getLeft(), context);
+    RelationPlan rightPlan = process(node.getRight(), context);
+
+    if (node.getCriteria().isPresent() && node.getCriteria().get() instanceof 
JoinUsing) {
+      throw new IllegalStateException("JoinUsing is not supported in current 
version.");
+      // TODO return planJoinUsing(node, leftPlan, rightPlan);
+    }
+
+    return planJoin(
+        analysis.getJoinCriteria(node),
+        node.getType(),
+        analysis.getScope(node),
+        leftPlan,
+        rightPlan,
+        analysis.getSubqueries(node));
+  }
+
+  public RelationPlan planJoin(
+      Expression criteria,
+      Join.Type type,
+      Scope scope,
+      RelationPlan leftPlan,
+      RelationPlan rightPlan,
+      Analysis.SubqueryAnalysis subqueries) {
+    // NOTE: symbols must be in the same order as the outputDescriptor
+    List<Symbol> outputSymbols =
+        ImmutableList.<Symbol>builder()
+            .addAll(leftPlan.getFieldMappings())
+            .addAll(rightPlan.getFieldMappings())
+            .build();
+
+    PlanBuilder leftPlanBuilder =
+        newPlanBuilder(leftPlan, analysis).withScope(scope, outputSymbols);
+    PlanBuilder rightPlanBuilder =
+        newPlanBuilder(rightPlan, analysis).withScope(scope, outputSymbols);
+
+    ImmutableList.Builder<JoinNode.EquiJoinClause> equiClauses = 
ImmutableList.builder();
+    List<Expression> complexJoinExpressions = new ArrayList<>();
+    List<Expression> postInnerJoinConditions = new ArrayList<>();
+
+    RelationType left = leftPlan.getDescriptor();
+    RelationType right = rightPlan.getDescriptor();
+
+    if (type != CROSS && type != IMPLICIT) {
+      List<Expression> leftComparisonExpressions = new ArrayList<>();
+      List<Expression> rightComparisonExpressions = new ArrayList<>();
+      List<ComparisonExpression.Operator> joinConditionComparisonOperators = 
new ArrayList<>();
+
+      for (Expression conjunct : 
extractPredicates(LogicalExpression.Operator.AND, criteria)) {
+        if (!isEqualComparisonExpression(conjunct) && type != INNER) {
+          complexJoinExpressions.add(conjunct);
+          continue;
+        }
+
+        Set<QualifiedName> dependencies =
+            SymbolsExtractor.extractNames(conjunct, 
analysis.getColumnReferences());
+
+        if (dependencies.stream().allMatch(left::canResolve)
+            || dependencies.stream().allMatch(right::canResolve)) {
+          // If the conjunct can be evaluated entirely with the inputs on 
either side of the join,
+          // add
+          // it to the list complex expressions and let the optimizers figure 
out how to push it
+          // down later.
+          complexJoinExpressions.add(conjunct);
+        } else if (conjunct instanceof ComparisonExpression) {
+          Expression firstExpression = ((ComparisonExpression) 
conjunct).getLeft();
+          Expression secondExpression = ((ComparisonExpression) 
conjunct).getRight();
+          ComparisonExpression.Operator comparisonOperator =
+              ((ComparisonExpression) conjunct).getOperator();
+          Set<QualifiedName> firstDependencies =
+              SymbolsExtractor.extractNames(firstExpression, 
analysis.getColumnReferences());
+          Set<QualifiedName> secondDependencies =
+              SymbolsExtractor.extractNames(secondExpression, 
analysis.getColumnReferences());
+
+          if (firstDependencies.stream().allMatch(left::canResolve)
+              && secondDependencies.stream().allMatch(right::canResolve)) {
+            leftComparisonExpressions.add(firstExpression);
+            rightComparisonExpressions.add(secondExpression);
+            joinConditionComparisonOperators.add(comparisonOperator);
+          } else if (firstDependencies.stream().allMatch(right::canResolve)
+              && secondDependencies.stream().allMatch(left::canResolve)) {
+            leftComparisonExpressions.add(secondExpression);
+            rightComparisonExpressions.add(firstExpression);
+            joinConditionComparisonOperators.add(comparisonOperator.flip());
+          } else {
+            // the case when we mix symbols from both left and right join side 
on either side of
+            // condition.
+            complexJoinExpressions.add(conjunct);
+          }
+        } else {
+          complexJoinExpressions.add(conjunct);
+        }
+      }
+
+      // leftPlanBuilder = subqueryPlanner.handleSubqueries(leftPlanBuilder,
+      // leftComparisonExpressions, subqueries);
+      // rightPlanBuilder = subqueryPlanner.handleSubqueries(rightPlanBuilder,
+      // rightComparisonExpressions, subqueries);
+
+      // Add projections for join criteria
+      leftPlanBuilder =
+          leftPlanBuilder.appendProjections(
+              leftComparisonExpressions, symbolAllocator, queryContext);
+      rightPlanBuilder =
+          rightPlanBuilder.appendProjections(
+              rightComparisonExpressions, symbolAllocator, queryContext);
+
+      QueryPlanner.PlanAndMappings leftCoercions =
+          coerce(
+              leftPlanBuilder, leftComparisonExpressions, analysis, 
idAllocator, symbolAllocator);
+      leftPlanBuilder = leftCoercions.getSubPlan();
+      QueryPlanner.PlanAndMappings rightCoercions =
+          coerce(
+              rightPlanBuilder, rightComparisonExpressions, analysis, 
idAllocator, symbolAllocator);
+      rightPlanBuilder = rightCoercions.getSubPlan();
+
+      for (int i = 0; i < leftComparisonExpressions.size(); i++) {
+        if (joinConditionComparisonOperators.get(i) == 
ComparisonExpression.Operator.EQUAL) {
+          Symbol leftSymbol = 
leftCoercions.get(leftComparisonExpressions.get(i));
+          Symbol rightSymbol = 
rightCoercions.get(rightComparisonExpressions.get(i));
+
+          equiClauses.add(new JoinNode.EquiJoinClause(leftSymbol, 
rightSymbol));
+        } else {
+          postInnerJoinConditions.add(
+              new ComparisonExpression(
+                  joinConditionComparisonOperators.get(i),
+                  
leftCoercions.get(leftComparisonExpressions.get(i)).toSymbolReference(),
+                  
rightCoercions.get(rightComparisonExpressions.get(i)).toSymbolReference()));
+        }
+      }
+    }
+
+    PlanNode root =
+        new JoinNode(
+            idAllocator.genPlanNodeId(),
+            mapJoinType(type),
+            leftPlanBuilder.getRoot(),
+            rightPlanBuilder.getRoot(),
+            equiClauses.build(),
+            leftPlanBuilder.getRoot().getOutputSymbols(),
+            rightPlanBuilder.getRoot().getOutputSymbols(),
+            false,
+            Optional.empty(),
+            Optional.empty(),
+            Optional.empty(),
+            Optional.empty());
+
+    if (type != INNER) {
+      for (Expression complexExpression : complexJoinExpressions) {
+        Set<QualifiedName> dependencies =
+            SymbolsExtractor.extractNamesNoSubqueries(
+                complexExpression, analysis.getColumnReferences());
+
+        // This is for handling uncorreled subqueries. Correlated subqueries 
are not currently
+        // supported and are dealt with
+        // during analysis.
+        // Make best effort to plan the subquery in the branch of the join 
involving the other
+        // inputs to the expression.
+        // E.g.,
+        //  t JOIN u ON t.x = (...) get's planned on the t side
+        //  t JOIN u ON t.x = (...) get's planned on the u side
+        //  t JOIN u ON t.x + u.x = (...) get's planned on an arbitrary side
+        if (dependencies.stream().allMatch(left::canResolve)) {
+          // leftPlanBuilder = 
subqueryPlanner.handleSubqueries(leftPlanBuilder, complexExpression,
+          // subqueries);
+        } else {
+          // rightPlanBuilder = 
subqueryPlanner.handleSubqueries(rightPlanBuilder,
+          // complexExpression, subqueries);
+        }
+      }
+    }
+    TranslationMap translationMap =
+        new TranslationMap(
+                Optional.empty(),
+                scope,
+                analysis,
+                outputSymbols,
+                new PlannerContext(new TableMetadataImpl(), new 
InternalTypeManager()))
+            
.withAdditionalMappings(leftPlanBuilder.getTranslations().getMappings())
+            
.withAdditionalMappings(rightPlanBuilder.getTranslations().getMappings());
+
+    if (type != INNER && !complexJoinExpressions.isEmpty()) {
+      root =
+          new JoinNode(
+              idAllocator.genPlanNodeId(),
+              mapJoinType(type),
+              leftPlanBuilder.getRoot(),
+              rightPlanBuilder.getRoot(),
+              equiClauses.build(),
+              leftPlanBuilder.getRoot().getOutputSymbols(),
+              rightPlanBuilder.getRoot().getOutputSymbols(),
+              false,
+              Optional.of(
+                  IrUtils.and(
+                      complexJoinExpressions.stream()
+                          .map(e -> coerceIfNecessary(analysis, e, 
translationMap.rewrite(e)))
+                          .collect(Collectors.toList()))),
+              Optional.empty(),
+              Optional.empty(),
+              Optional.empty());
+    }
+
+    if (type == INNER) {
+      // rewrite all the other conditions using output symbols from left + 
right plan node.
+      PlanBuilder rootPlanBuilder = new PlanBuilder(translationMap, root);
+      // rootPlanBuilder = subqueryPlanner.handleSubqueries(rootPlanBuilder, 
complexJoinExpressions,
+      // subqueries);
+
+      for (Expression expression : complexJoinExpressions) {
+        postInnerJoinConditions.add(
+            coerceIfNecessary(analysis, expression, 
rootPlanBuilder.rewrite(expression)));
+      }
+      root = rootPlanBuilder.getRoot();
+
+      Expression postInnerJoinCriteria;
+      if (!postInnerJoinConditions.isEmpty()) {
+        postInnerJoinCriteria = IrUtils.and(postInnerJoinConditions);
+        root = new FilterNode(idAllocator.genPlanNodeId(), root, 
postInnerJoinCriteria);
+      }
+    }
+
+    return new RelationPlan(root, scope, outputSymbols);
+  }
+
+  public static JoinNode.JoinType mapJoinType(Join.Type joinType) {
+    switch (joinType) {
+      case CROSS:
+      case IMPLICIT:
+      case INNER:
+        return JoinNode.JoinType.INNER;
+      case LEFT:
+        return JoinNode.JoinType.LEFT;
+      case RIGHT:
+        return JoinNode.JoinType.RIGHT;
+      case FULL:
+        return JoinNode.JoinType.FULL;
+    }
+    throw new UnsupportedOperationException(joinType + " Join type is not 
supported");
+  }
+
+  private static boolean isEqualComparisonExpression(Expression conjunct) {
+    return conjunct instanceof ComparisonExpression
+        && ((ComparisonExpression) conjunct).getOperator() == 
ComparisonExpression.Operator.EQUAL;
+  }
+
   // ================================ Implemented later 
=====================================
 
   @Override
@@ -191,11 +459,6 @@ public class RelationPlanner extends 
AstVisitor<RelationPlan, Void> {
     throw new IllegalStateException("SubqueryExpression is not supported in 
current version.");
   }
 
-  @Override
-  protected RelationPlan visitJoin(Join node, Void context) {
-    throw new IllegalStateException("Join is not supported in current 
version.");
-  }
-
   @Override
   protected RelationPlan visitAliasedRelation(AliasedRelation node, Void 
context) {
     throw new IllegalStateException("AliasedRelation is not supported in 
current version.");
diff --git 
a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/JoinNode.java
 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/JoinNode.java
new file mode 100644
index 00000000000..e412d8182c4
--- /dev/null
+++ 
b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/JoinNode.java
@@ -0,0 +1,230 @@
+package org.apache.iotdb.db.queryengine.plan.relational.planner.node;
+
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode;
+import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId;
+import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MultiChildProcessNode;
+import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol;
+import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ComparisonExpression;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression;
+import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.NullLiteral;
+
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableSet;
+
+import java.io.DataOutputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+
+import static com.google.common.base.Preconditions.checkArgument;
+import static java.lang.String.format;
+import static java.util.Objects.requireNonNull;
+
+public class JoinNode extends MultiChildProcessNode {
+
+  private final JoinType type;
+  private final PlanNode left;
+  private final PlanNode right;
+  private final List<EquiJoinClause> criteria;
+  private final List<Symbol> leftOutputSymbols;
+  private final List<Symbol> rightOutputSymbols;
+  private final boolean maySkipOutputDuplicates;
+  private final Optional<Expression> filter;
+  private final Optional<Symbol> leftHashSymbol;
+  private final Optional<Symbol> rightHashSymbol;
+  // private final Optional<DistributionType> distributionType;
+  private final Optional<Boolean> spillable;
+
+  // private final Map<DynamicFilterId, Symbol> dynamicFilters;
+
+  @JsonCreator
+  public JoinNode(
+      @JsonProperty("id") PlanNodeId id,
+      @JsonProperty("type") JoinType type,
+      @JsonProperty("left") PlanNode left,
+      @JsonProperty("right") PlanNode right,
+      @JsonProperty("criteria") List<EquiJoinClause> criteria,
+      @JsonProperty("leftOutputSymbols") List<Symbol> leftOutputSymbols,
+      @JsonProperty("rightOutputSymbols") List<Symbol> rightOutputSymbols,
+      @JsonProperty("maySkipOutputDuplicates") boolean maySkipOutputDuplicates,
+      @JsonProperty("filter") Optional<Expression> filter,
+      @JsonProperty("leftHashSymbol") Optional<Symbol> leftHashSymbol,
+      @JsonProperty("rightHashSymbol") Optional<Symbol> rightHashSymbol,
+      // @JsonProperty("distributionType") Optional<DistributionType> 
distributionType,
+      @JsonProperty("spillable") Optional<Boolean> spillable)
+        // @JsonProperty("dynamicFilters") Map<DynamicFilterId, Symbol> 
dynamicFilters,
+        // @JsonProperty("reorderJoinStatsAndCost") 
Optional<PlanNodeStatsAndCostSummary>
+        // reorderJoinStatsAndCost)
+      {
+    super(id);
+    requireNonNull(type, "type is null");
+    requireNonNull(left, "left is null");
+    requireNonNull(right, "right is null");
+    requireNonNull(criteria, "criteria is null");
+    requireNonNull(leftOutputSymbols, "leftOutputSymbols is null");
+    requireNonNull(rightOutputSymbols, "rightOutputSymbols is null");
+    requireNonNull(filter, "filter is null");
+    // The condition doesn't guarantee that filter is of type boolean, but was 
found to be a
+    // practical way to identify
+    // places where JoinNode could be created without appropriate coercions.
+    checkArgument(
+        !filter.isPresent() || !(filter.get() instanceof NullLiteral),
+        "Filter must be an expression of boolean type: %s",
+        filter);
+    requireNonNull(leftHashSymbol, "leftHashSymbol is null");
+    requireNonNull(rightHashSymbol, "rightHashSymbol is null");
+    // requireNonNull(distributionType, "distributionType is null");
+    requireNonNull(spillable, "spillable is null");
+
+    this.type = type;
+    this.left = left;
+    this.right = right;
+    this.criteria = ImmutableList.copyOf(criteria);
+    this.leftOutputSymbols = ImmutableList.copyOf(leftOutputSymbols);
+    this.rightOutputSymbols = ImmutableList.copyOf(rightOutputSymbols);
+    this.maySkipOutputDuplicates = maySkipOutputDuplicates;
+    this.filter = filter;
+    this.leftHashSymbol = leftHashSymbol;
+    this.rightHashSymbol = rightHashSymbol;
+    // this.distributionType = distributionType;
+    this.spillable = spillable;
+    // this.dynamicFilters = 
ImmutableMap.copyOf(requireNonNull(dynamicFilters, "dynamicFilters is
+    // null"));
+    // this.reorderJoinStatsAndCost = requireNonNull(reorderJoinStatsAndCost,
+    // "reorderJoinStatsAndCost is null");
+
+    Set<Symbol> leftSymbols = ImmutableSet.copyOf(left.getOutputSymbols());
+    Set<Symbol> rightSymbols = ImmutableSet.copyOf(right.getOutputSymbols());
+
+    checkArgument(
+        leftSymbols.containsAll(leftOutputSymbols),
+        "Left source inputs do not contain all left output symbols");
+    checkArgument(
+        rightSymbols.containsAll(rightOutputSymbols),
+        "Right source inputs do not contain all right output symbols");
+
+    checkArgument(
+        !(criteria.isEmpty() && leftHashSymbol.isPresent()),
+        "Left hash symbol is only valid in an equijoin");
+    checkArgument(
+        !(criteria.isEmpty() && rightHashSymbol.isPresent()),
+        "Right hash symbol is only valid in an equijoin");
+
+    criteria.forEach(
+        equiJoinClause ->
+            checkArgument(
+                leftSymbols.contains(equiJoinClause.getLeft())
+                    && rightSymbols.contains(equiJoinClause.getRight()),
+                "Equality join criteria should be normalized according to join 
sides: %s",
+                equiJoinClause));
+
+    //        if (distributionType.isPresent()) {
+    //            // The implementation of full outer join only works if the 
data is hash
+    // partitioned.
+    //            checkArgument(
+    //                    !(distributionType.get() == REPLICATED && (type == 
RIGHT || type ==
+    // FULL)),
+    //                    "%s join do not work with %s distribution type",
+    //                    type,
+    //                    distributionType.get());
+    //        }
+
+    //        for (Symbol symbol : dynamicFilters.values()) {
+    //            checkArgument(rightSymbols.contains(symbol), "Right join 
input doesn't contain
+    // symbol for dynamic filter: %s", symbol);
+    //        }
+  }
+
+  @Override
+  public PlanNode clone() {
+    return null;
+  }
+
+  @Override
+  public List<String> getOutputColumnNames() {
+    throw new IllegalStateException();
+  }
+
+  @Override
+  protected void serializeAttributes(ByteBuffer byteBuffer) {}
+
+  @Override
+  protected void serializeAttributes(DataOutputStream stream) throws 
IOException {}
+
+  public static class EquiJoinClause {
+    private final Symbol left;
+    private final Symbol right;
+
+    @JsonCreator
+    public EquiJoinClause(@JsonProperty("left") Symbol left, 
@JsonProperty("right") Symbol right) {
+      this.left = requireNonNull(left, "left is null");
+      this.right = requireNonNull(right, "right is null");
+    }
+
+    @JsonProperty("left")
+    public Symbol getLeft() {
+      return left;
+    }
+
+    @JsonProperty("right")
+    public Symbol getRight() {
+      return right;
+    }
+
+    public ComparisonExpression toExpression() {
+      return new ComparisonExpression(
+          ComparisonExpression.Operator.EQUAL, left.toSymbolReference(), 
right.toSymbolReference());
+    }
+
+    public EquiJoinClause flip() {
+      return new EquiJoinClause(right, left);
+    }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+
+      if (obj == null || !this.getClass().equals(obj.getClass())) {
+        return false;
+      }
+
+      EquiJoinClause other = (EquiJoinClause) obj;
+
+      return Objects.equals(this.left, other.left) && 
Objects.equals(this.right, other.right);
+    }
+
+    @Override
+    public int hashCode() {
+      return Objects.hash(left, right);
+    }
+
+    @Override
+    public String toString() {
+      return format("%s = %s", left, right);
+    }
+  }
+
+  public enum JoinType {
+    INNER("InnerJoin"),
+    LEFT("LeftJoin"),
+    RIGHT("RightJoin"),
+    FULL("FullJoin");
+
+    private final String joinLabel;
+
+    JoinType(String joinLabel) {
+      this.joinLabel = joinLabel;
+    }
+
+    public String getJoinLabel() {
+      return joinLabel;
+    }
+  }
+}

Reply via email to