This is an automated email from the ASF dual-hosted git repository. caogaofei pushed a commit to branch beyyes/join in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit dcea59827265f4bb5563e047e605d5b6622ca0ec Author: Beyyes <[email protected]> AuthorDate: Mon Aug 12 14:39:35 2024 +0800 add basic join node --- .../plan/relational/planner/QueryPlanner.java | 85 +++++++ .../plan/relational/planner/RelationPlanner.java | 273 ++++++++++++++++++++- .../plan/relational/planner/node/JoinNode.java | 230 +++++++++++++++++ 3 files changed, 583 insertions(+), 5 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/QueryPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/QueryPlanner.java index 0a49d7c0492..b19917e18c5 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/QueryPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/QueryPlanner.java @@ -22,7 +22,9 @@ import org.apache.iotdb.db.queryengine.plan.relational.analyzer.NodeRef; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OffsetNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.ProjectNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.SortNode; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Cast; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Delete; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Node; @@ -34,6 +36,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.QuerySpecificatio import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SortItem; import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import 
org.apache.tsfile.read.common.type.Type; @@ -45,6 +48,7 @@ import java.util.Map; import java.util.Optional; import static com.google.common.collect.ImmutableList.toImmutableList; +import static java.lang.String.format; import static java.util.Objects.requireNonNull; import static org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingTranslator.sortItemToSortOrder; import static org.apache.iotdb.db.queryengine.plan.relational.planner.PlanBuilder.newPlanBuilder; @@ -250,6 +254,53 @@ public class QueryPlanner { } } + /** + * Creates a projection with any additional coercions by identity of the provided expressions. + * + * @return the new subplan and a mapping of each expression to the symbol representing the + * coercion or an existing symbol if a coercion wasn't needed + */ + public static PlanAndMappings coerce( + PlanBuilder subPlan, + List<Expression> expressions, + Analysis analysis, + QueryId idAllocator, + SymbolAllocator symbolAllocator) { + Assignments.Builder assignments = Assignments.builder(); + assignments.putIdentities(subPlan.getRoot().getOutputSymbols()); + + Map<NodeRef<Expression>, Symbol> mappings = new HashMap<>(); + for (Expression expression : expressions) { + Type coercion = analysis.getCoercion(expression); + + // expressions may be repeated, for example, when resolving ordinal references in a GROUP BY + // clause + if (!mappings.containsKey(NodeRef.of(expression))) { + if (coercion != null) { + Symbol symbol = symbolAllocator.newSymbol(expression, coercion); + + assignments.put( + symbol, + new Cast( + subPlan.rewrite(expression), + // TODO(beyyes) transfer toSqlType(coercion) method, + null, + false)); + + mappings.put(NodeRef.of(expression), symbol); + } else { + mappings.put(NodeRef.of(expression), subPlan.translate(expression)); + } + } + } + + subPlan = + subPlan.withNewRoot( + new ProjectNode(idAllocator.genPlanNodeId(), subPlan.getRoot(), assignments.build())); + + return new PlanAndMappings(subPlan, mappings); + } + private 
Optional<OrderingScheme> orderingScheme( PlanBuilder subPlan, Optional<OrderBy> orderBy, List<Expression> orderByExpressions) { if (!orderBy.isPresent() || (analysis.isOrderByRedundant(orderBy.get()))) { @@ -310,4 +361,38 @@ public class QueryPlanner { } return subPlan; } + + public static class PlanAndMappings { + private final PlanBuilder subPlan; + private final Map<NodeRef<Expression>, Symbol> mappings; + + public PlanAndMappings(PlanBuilder subPlan, Map<NodeRef<Expression>, Symbol> mappings) { + this.subPlan = subPlan; + this.mappings = ImmutableMap.copyOf(mappings); + } + + public PlanBuilder getSubPlan() { + return subPlan; + } + + public Symbol get(Expression expression) { + return tryGet(expression) + .orElseThrow( + () -> + new IllegalArgumentException( + format( + "No mapping for expression: %s (%s)", + expression, System.identityHashCode(expression)))); + } + + public Optional<Symbol> tryGet(Expression expression) { + Symbol result = mappings.get(NodeRef.of(expression)); + + if (result != null) { + return Optional.of(result); + } + + return Optional.empty(); + } + } } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java index a1c98cb14fc..7c44cccc88f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/RelationPlanner.java @@ -17,6 +17,7 @@ import org.apache.iotdb.commons.schema.table.column.TsTableColumnCategory; import org.apache.iotdb.db.queryengine.common.MPPQueryContext; import org.apache.iotdb.db.queryengine.common.QueryId; import org.apache.iotdb.db.queryengine.common.SessionInfo; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; import 
org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.InsertRowNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertRowNode; import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalInsertRowsNode; @@ -24,18 +25,27 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.write.RelationalIn import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Analysis; import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Field; import org.apache.iotdb.db.queryengine.plan.relational.analyzer.NodeRef; +import org.apache.iotdb.db.queryengine.plan.relational.analyzer.RelationType; import org.apache.iotdb.db.queryengine.plan.relational.analyzer.Scope; import org.apache.iotdb.db.queryengine.plan.relational.metadata.ColumnSchema; import org.apache.iotdb.db.queryengine.plan.relational.metadata.QualifiedObjectName; +import org.apache.iotdb.db.queryengine.plan.relational.metadata.TableMetadataImpl; +import org.apache.iotdb.db.queryengine.plan.relational.planner.ir.IrUtils; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.TableScanNode; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AliasedRelation; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.AstVisitor; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ComparisonExpression; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Except; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.InsertRow; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.InsertRows; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.InsertTablet; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Intersect; import 
org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Join; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.JoinUsing; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.LogicalExpression; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Node; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.QualifiedName; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Query; @@ -45,6 +55,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Table; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.TableSubquery; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Union; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Values; +import org.apache.iotdb.db.queryengine.plan.relational.type.InternalTypeManager; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertRowsStatement; import org.apache.iotdb.db.queryengine.plan.statement.crud.InsertTabletStatement; @@ -58,8 +69,18 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; +import java.util.Optional; +import java.util.Set; +import java.util.stream.Collectors; import static java.util.Objects.requireNonNull; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.PlanBuilder.newPlanBuilder; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.QueryPlanner.coerce; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.QueryPlanner.coerceIfNecessary; +import static org.apache.iotdb.db.queryengine.plan.relational.planner.ir.IrUtils.extractPredicates; +import static org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Join.Type.CROSS; +import static org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Join.Type.IMPLICIT; +import static org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Join.Type.INNER; public class 
RelationPlanner extends AstVisitor<RelationPlan, Void> { @@ -179,6 +200,253 @@ public class RelationPlanner extends AstVisitor<RelationPlan, Void> { return new RelationPlan(plan.getRoot(), analysis.getScope(node), plan.getFieldMappings()); } + @Override + protected RelationPlan visitJoin(Join node, Void context) { + RelationPlan leftPlan = process(node.getLeft(), context); + RelationPlan rightPlan = process(node.getRight(), context); + + if (node.getCriteria().isPresent() && node.getCriteria().get() instanceof JoinUsing) { + throw new IllegalStateException("JoinUsing is not supported in current version."); + // TODO return planJoinUsing(node, leftPlan, rightPlan); + } + + return planJoin( + analysis.getJoinCriteria(node), + node.getType(), + analysis.getScope(node), + leftPlan, + rightPlan, + analysis.getSubqueries(node)); + } + + public RelationPlan planJoin( + Expression criteria, + Join.Type type, + Scope scope, + RelationPlan leftPlan, + RelationPlan rightPlan, + Analysis.SubqueryAnalysis subqueries) { + // NOTE: symbols must be in the same order as the outputDescriptor + List<Symbol> outputSymbols = + ImmutableList.<Symbol>builder() + .addAll(leftPlan.getFieldMappings()) + .addAll(rightPlan.getFieldMappings()) + .build(); + + PlanBuilder leftPlanBuilder = + newPlanBuilder(leftPlan, analysis).withScope(scope, outputSymbols); + PlanBuilder rightPlanBuilder = + newPlanBuilder(rightPlan, analysis).withScope(scope, outputSymbols); + + ImmutableList.Builder<JoinNode.EquiJoinClause> equiClauses = ImmutableList.builder(); + List<Expression> complexJoinExpressions = new ArrayList<>(); + List<Expression> postInnerJoinConditions = new ArrayList<>(); + + RelationType left = leftPlan.getDescriptor(); + RelationType right = rightPlan.getDescriptor(); + + if (type != CROSS && type != IMPLICIT) { + List<Expression> leftComparisonExpressions = new ArrayList<>(); + List<Expression> rightComparisonExpressions = new ArrayList<>(); + List<ComparisonExpression.Operator> 
joinConditionComparisonOperators = new ArrayList<>(); + + for (Expression conjunct : extractPredicates(LogicalExpression.Operator.AND, criteria)) { + if (!isEqualComparisonExpression(conjunct) && type != INNER) { + complexJoinExpressions.add(conjunct); + continue; + } + + Set<QualifiedName> dependencies = + SymbolsExtractor.extractNames(conjunct, analysis.getColumnReferences()); + + if (dependencies.stream().allMatch(left::canResolve) + || dependencies.stream().allMatch(right::canResolve)) { + // If the conjunct can be evaluated entirely with the inputs on either side of the join, + // add + // it to the list of complex expressions and let the optimizers figure out how to push it + // down later. + complexJoinExpressions.add(conjunct); + } else if (conjunct instanceof ComparisonExpression) { + Expression firstExpression = ((ComparisonExpression) conjunct).getLeft(); + Expression secondExpression = ((ComparisonExpression) conjunct).getRight(); + ComparisonExpression.Operator comparisonOperator = + ((ComparisonExpression) conjunct).getOperator(); + Set<QualifiedName> firstDependencies = + SymbolsExtractor.extractNames(firstExpression, analysis.getColumnReferences()); + Set<QualifiedName> secondDependencies = + SymbolsExtractor.extractNames(secondExpression, analysis.getColumnReferences()); + + if (firstDependencies.stream().allMatch(left::canResolve) + && secondDependencies.stream().allMatch(right::canResolve)) { + leftComparisonExpressions.add(firstExpression); + rightComparisonExpressions.add(secondExpression); + joinConditionComparisonOperators.add(comparisonOperator); + } else if (firstDependencies.stream().allMatch(right::canResolve) + && secondDependencies.stream().allMatch(left::canResolve)) { + leftComparisonExpressions.add(secondExpression); + rightComparisonExpressions.add(firstExpression); + joinConditionComparisonOperators.add(comparisonOperator.flip()); + } else { + // the case when we mix symbols from both left and right join side on either side of + //
condition. + complexJoinExpressions.add(conjunct); + } + } else { + complexJoinExpressions.add(conjunct); + } + } + + // leftPlanBuilder = subqueryPlanner.handleSubqueries(leftPlanBuilder, + // leftComparisonExpressions, subqueries); + // rightPlanBuilder = subqueryPlanner.handleSubqueries(rightPlanBuilder, + // rightComparisonExpressions, subqueries); + + // Add projections for join criteria + leftPlanBuilder = + leftPlanBuilder.appendProjections( + leftComparisonExpressions, symbolAllocator, queryContext); + rightPlanBuilder = + rightPlanBuilder.appendProjections( + rightComparisonExpressions, symbolAllocator, queryContext); + + QueryPlanner.PlanAndMappings leftCoercions = + coerce( + leftPlanBuilder, leftComparisonExpressions, analysis, idAllocator, symbolAllocator); + leftPlanBuilder = leftCoercions.getSubPlan(); + QueryPlanner.PlanAndMappings rightCoercions = + coerce( + rightPlanBuilder, rightComparisonExpressions, analysis, idAllocator, symbolAllocator); + rightPlanBuilder = rightCoercions.getSubPlan(); + + for (int i = 0; i < leftComparisonExpressions.size(); i++) { + if (joinConditionComparisonOperators.get(i) == ComparisonExpression.Operator.EQUAL) { + Symbol leftSymbol = leftCoercions.get(leftComparisonExpressions.get(i)); + Symbol rightSymbol = rightCoercions.get(rightComparisonExpressions.get(i)); + + equiClauses.add(new JoinNode.EquiJoinClause(leftSymbol, rightSymbol)); + } else { + postInnerJoinConditions.add( + new ComparisonExpression( + joinConditionComparisonOperators.get(i), + leftCoercions.get(leftComparisonExpressions.get(i)).toSymbolReference(), + rightCoercions.get(rightComparisonExpressions.get(i)).toSymbolReference())); + } + } + } + + PlanNode root = + new JoinNode( + idAllocator.genPlanNodeId(), + mapJoinType(type), + leftPlanBuilder.getRoot(), + rightPlanBuilder.getRoot(), + equiClauses.build(), + leftPlanBuilder.getRoot().getOutputSymbols(), + rightPlanBuilder.getRoot().getOutputSymbols(), + false, + Optional.empty(), + 
Optional.empty(), + Optional.empty(), + Optional.empty()); + + if (type != INNER) { + for (Expression complexExpression : complexJoinExpressions) { + Set<QualifiedName> dependencies = + SymbolsExtractor.extractNamesNoSubqueries( + complexExpression, analysis.getColumnReferences()); + + // This is for handling uncorrelated subqueries. Correlated subqueries are not currently + // supported and are dealt with + // during analysis. + // Make best effort to plan the subquery in the branch of the join involving the other + // inputs to the expression. + // E.g., + // t JOIN u ON t.x = (...) gets planned on the t side + // t JOIN u ON u.x = (...) gets planned on the u side + // t JOIN u ON t.x + u.x = (...) gets planned on an arbitrary side + if (dependencies.stream().allMatch(left::canResolve)) { + // leftPlanBuilder = subqueryPlanner.handleSubqueries(leftPlanBuilder, complexExpression, + // subqueries); + } else { + // rightPlanBuilder = subqueryPlanner.handleSubqueries(rightPlanBuilder, + // complexExpression, subqueries); + } + } + } + TranslationMap translationMap = + new TranslationMap( + Optional.empty(), + scope, + analysis, + outputSymbols, + new PlannerContext(new TableMetadataImpl(), new InternalTypeManager())) + .withAdditionalMappings(leftPlanBuilder.getTranslations().getMappings()) + .withAdditionalMappings(rightPlanBuilder.getTranslations().getMappings()); + + if (type != INNER && !complexJoinExpressions.isEmpty()) { + root = + new JoinNode( + idAllocator.genPlanNodeId(), + mapJoinType(type), + leftPlanBuilder.getRoot(), + rightPlanBuilder.getRoot(), + equiClauses.build(), + leftPlanBuilder.getRoot().getOutputSymbols(), + rightPlanBuilder.getRoot().getOutputSymbols(), + false, + Optional.of( + IrUtils.and( + complexJoinExpressions.stream() + .map(e -> coerceIfNecessary(analysis, e, translationMap.rewrite(e))) + .collect(Collectors.toList()))), + Optional.empty(), + Optional.empty(), + Optional.empty()); + } + + if (type == INNER) { + // rewrite all the
other conditions using output symbols from left + right plan node. + PlanBuilder rootPlanBuilder = new PlanBuilder(translationMap, root); + // rootPlanBuilder = subqueryPlanner.handleSubqueries(rootPlanBuilder, complexJoinExpressions, + // subqueries); + + for (Expression expression : complexJoinExpressions) { + postInnerJoinConditions.add( + coerceIfNecessary(analysis, expression, rootPlanBuilder.rewrite(expression))); + } + root = rootPlanBuilder.getRoot(); + + Expression postInnerJoinCriteria; + if (!postInnerJoinConditions.isEmpty()) { + postInnerJoinCriteria = IrUtils.and(postInnerJoinConditions); + root = new FilterNode(idAllocator.genPlanNodeId(), root, postInnerJoinCriteria); + } + } + + return new RelationPlan(root, scope, outputSymbols); + } + + public static JoinNode.JoinType mapJoinType(Join.Type joinType) { + switch (joinType) { + case CROSS: + case IMPLICIT: + case INNER: + return JoinNode.JoinType.INNER; + case LEFT: + return JoinNode.JoinType.LEFT; + case RIGHT: + return JoinNode.JoinType.RIGHT; + case FULL: + return JoinNode.JoinType.FULL; + } + throw new UnsupportedOperationException(joinType + " Join type is not supported"); + } + + private static boolean isEqualComparisonExpression(Expression conjunct) { + return conjunct instanceof ComparisonExpression + && ((ComparisonExpression) conjunct).getOperator() == ComparisonExpression.Operator.EQUAL; + } + // ================================ Implemented later ===================================== @Override @@ -191,11 +459,6 @@ public class RelationPlanner extends AstVisitor<RelationPlan, Void> { throw new IllegalStateException("SubqueryExpression is not supported in current version."); } - @Override - protected RelationPlan visitJoin(Join node, Void context) { - throw new IllegalStateException("Join is not supported in current version."); - } - @Override protected RelationPlan visitAliasedRelation(AliasedRelation node, Void context) { throw new IllegalStateException("AliasedRelation is not supported 
in current version."); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/JoinNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/JoinNode.java new file mode 100644 index 00000000000..e412d8182c4 --- /dev/null +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/JoinNode.java @@ -0,0 +1,230 @@ +package org.apache.iotdb.db.queryengine.plan.relational.planner.node; + +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNode; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.PlanNodeId; +import org.apache.iotdb.db.queryengine.plan.planner.plan.node.process.MultiChildProcessNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.ComparisonExpression; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Expression; +import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.NullLiteral; + +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableSet; + +import java.io.DataOutputStream; +import java.io.IOException; +import java.nio.ByteBuffer; +import java.util.List; +import java.util.Objects; +import java.util.Optional; +import java.util.Set; + +import static com.google.common.base.Preconditions.checkArgument; +import static java.lang.String.format; +import static java.util.Objects.requireNonNull; + +public class JoinNode extends MultiChildProcessNode { + + private final JoinType type; + private final PlanNode left; + private final PlanNode right; + private final List<EquiJoinClause> criteria; + private final List<Symbol> leftOutputSymbols; + private final List<Symbol> rightOutputSymbols; + private final boolean maySkipOutputDuplicates; + private 
final Optional<Expression> filter; + private final Optional<Symbol> leftHashSymbol; + private final Optional<Symbol> rightHashSymbol; + // private final Optional<DistributionType> distributionType; + private final Optional<Boolean> spillable; + + // private final Map<DynamicFilterId, Symbol> dynamicFilters; + + @JsonCreator + public JoinNode( + @JsonProperty("id") PlanNodeId id, + @JsonProperty("type") JoinType type, + @JsonProperty("left") PlanNode left, + @JsonProperty("right") PlanNode right, + @JsonProperty("criteria") List<EquiJoinClause> criteria, + @JsonProperty("leftOutputSymbols") List<Symbol> leftOutputSymbols, + @JsonProperty("rightOutputSymbols") List<Symbol> rightOutputSymbols, + @JsonProperty("maySkipOutputDuplicates") boolean maySkipOutputDuplicates, + @JsonProperty("filter") Optional<Expression> filter, + @JsonProperty("leftHashSymbol") Optional<Symbol> leftHashSymbol, + @JsonProperty("rightHashSymbol") Optional<Symbol> rightHashSymbol, + // @JsonProperty("distributionType") Optional<DistributionType> distributionType, + @JsonProperty("spillable") Optional<Boolean> spillable) + // @JsonProperty("dynamicFilters") Map<DynamicFilterId, Symbol> dynamicFilters, + // @JsonProperty("reorderJoinStatsAndCost") Optional<PlanNodeStatsAndCostSummary> + // reorderJoinStatsAndCost) + { + super(id); + requireNonNull(type, "type is null"); + requireNonNull(left, "left is null"); + requireNonNull(right, "right is null"); + requireNonNull(criteria, "criteria is null"); + requireNonNull(leftOutputSymbols, "leftOutputSymbols is null"); + requireNonNull(rightOutputSymbols, "rightOutputSymbols is null"); + requireNonNull(filter, "filter is null"); + // The condition doesn't guarantee that filter is of type boolean, but was found to be a + // practical way to identify + // places where JoinNode could be created without appropriate coercions. 
+ checkArgument( + !filter.isPresent() || !(filter.get() instanceof NullLiteral), + "Filter must be an expression of boolean type: %s", + filter); + requireNonNull(leftHashSymbol, "leftHashSymbol is null"); + requireNonNull(rightHashSymbol, "rightHashSymbol is null"); + // requireNonNull(distributionType, "distributionType is null"); + requireNonNull(spillable, "spillable is null"); + + this.type = type; + this.left = left; + this.right = right; + this.criteria = ImmutableList.copyOf(criteria); + this.leftOutputSymbols = ImmutableList.copyOf(leftOutputSymbols); + this.rightOutputSymbols = ImmutableList.copyOf(rightOutputSymbols); + this.maySkipOutputDuplicates = maySkipOutputDuplicates; + this.filter = filter; + this.leftHashSymbol = leftHashSymbol; + this.rightHashSymbol = rightHashSymbol; + // this.distributionType = distributionType; + this.spillable = spillable; + // this.dynamicFilters = ImmutableMap.copyOf(requireNonNull(dynamicFilters, "dynamicFilters is + // null")); + // this.reorderJoinStatsAndCost = requireNonNull(reorderJoinStatsAndCost, + // "reorderJoinStatsAndCost is null"); + + Set<Symbol> leftSymbols = ImmutableSet.copyOf(left.getOutputSymbols()); + Set<Symbol> rightSymbols = ImmutableSet.copyOf(right.getOutputSymbols()); + + checkArgument( + leftSymbols.containsAll(leftOutputSymbols), + "Left source inputs do not contain all left output symbols"); + checkArgument( + rightSymbols.containsAll(rightOutputSymbols), + "Right source inputs do not contain all right output symbols"); + + checkArgument( + !(criteria.isEmpty() && leftHashSymbol.isPresent()), + "Left hash symbol is only valid in an equijoin"); + checkArgument( + !(criteria.isEmpty() && rightHashSymbol.isPresent()), + "Right hash symbol is only valid in an equijoin"); + + criteria.forEach( + equiJoinClause -> + checkArgument( + leftSymbols.contains(equiJoinClause.getLeft()) + && rightSymbols.contains(equiJoinClause.getRight()), + "Equality join criteria should be normalized according to join 
sides: %s", + equiJoinClause)); + + // if (distributionType.isPresent()) { + // // The implementation of full outer join only works if the data is hash + // partitioned. + // checkArgument( + // !(distributionType.get() == REPLICATED && (type == RIGHT || type == + // FULL)), + // "%s join do not work with %s distribution type", + // type, + // distributionType.get()); + // } + + // for (Symbol symbol : dynamicFilters.values()) { + // checkArgument(rightSymbols.contains(symbol), "Right join input doesn't contain + // symbol for dynamic filter: %s", symbol); + // } + } + + @Override + public PlanNode clone() { + return null; + } + + @Override + public List<String> getOutputColumnNames() { + throw new IllegalStateException(); + } + + @Override + protected void serializeAttributes(ByteBuffer byteBuffer) {} + + @Override + protected void serializeAttributes(DataOutputStream stream) throws IOException {} + + public static class EquiJoinClause { + private final Symbol left; + private final Symbol right; + + @JsonCreator + public EquiJoinClause(@JsonProperty("left") Symbol left, @JsonProperty("right") Symbol right) { + this.left = requireNonNull(left, "left is null"); + this.right = requireNonNull(right, "right is null"); + } + + @JsonProperty("left") + public Symbol getLeft() { + return left; + } + + @JsonProperty("right") + public Symbol getRight() { + return right; + } + + public ComparisonExpression toExpression() { + return new ComparisonExpression( + ComparisonExpression.Operator.EQUAL, left.toSymbolReference(), right.toSymbolReference()); + } + + public EquiJoinClause flip() { + return new EquiJoinClause(right, left); + } + + @Override + public boolean equals(Object obj) { + if (this == obj) { + return true; + } + + if (obj == null || !this.getClass().equals(obj.getClass())) { + return false; + } + + EquiJoinClause other = (EquiJoinClause) obj; + + return Objects.equals(this.left, other.left) && Objects.equals(this.right, other.right); + } + + @Override + public int 
hashCode() { + return Objects.hash(left, right); + } + + @Override + public String toString() { + return format("%s = %s", left, right); + } + } + + public enum JoinType { + INNER("InnerJoin"), + LEFT("LeftJoin"), + RIGHT("RightJoin"), + FULL("FullJoin"); + + private final String joinLabel; + + JoinType(String joinLabel) { + this.joinLabel = joinLabel; + } + + public String getJoinLabel() { + return joinLabel; + } + } +}
