This is an automated email from the ASF dual-hosted git repository. caogaofei pushed a commit to branch beyyes/join in repository https://gitbox.apache.org/repos/asf/iotdb.git
commit bda7dc1b167d83631b7c3190943384680b508725 Author: Beyyes <[email protected]> AuthorDate: Mon Aug 26 22:49:53 2024 +0800 fix typeprovider --- .../db/it/IoTDBMultiIDsWithAttributesTableIT.java | 3 -- .../plan/planner/TableOperatorGenerator.java | 6 ++++ .../distribute/TableDistributedPlanGenerator.java | 18 ++++++---- .../TableModelTypeProviderExtractor.java | 8 +++++ .../plan/relational/planner/node/JoinNode.java | 3 ++ .../optimizations/PushPredicateIntoTableScan.java | 38 +++++++++++++++------- 6 files changed, 56 insertions(+), 20 deletions(-) diff --git a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java index 4457055bd3e..9eeea1bc77d 100644 --- a/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java +++ b/integration-test/src/test/java/org/apache/iotdb/relational/it/db/it/IoTDBMultiIDsWithAttributesTableIT.java @@ -91,9 +91,6 @@ public class IoTDBMultiIDsWithAttributesTableIT { EnvFactory.getEnv() .getConfig() .getCommonConfig() - .setEnableSeqSpaceCompaction(false) - .setEnableUnseqSpaceCompaction(false) - .setEnableCrossSpaceCompaction(false) .setMaxTsBlockLineNumber(1) .setMaxNumberOfPointsInPage(1); insertData(); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java index 94fdb0c4df7..c4c7bd5f2ad 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/planner/TableOperatorGenerator.java @@ -70,6 +70,7 @@ import org.apache.iotdb.db.queryengine.plan.relational.planner.OrderingScheme; import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.CollectNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.MergeSortNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OffsetNode; @@ -737,6 +738,11 @@ public class TableOperatorGenerator extends PlanVisitor<Operator, LocalExecution TSFileDescriptor.getInstance().getConfig().getMaxTsBlockLineNumber()); } + @Override + public Operator visitJoin(JoinNode node, LocalExecutionPlanContext context) { + throw new IllegalStateException("JoinOperator is not implemented currently."); + } + @Override public Operator visitCountMerge( final CountSchemaMergeNode node, final LocalExecutionPlanContext context) { diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java index efd17df6e23..ba9049ad776 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableDistributedPlanGenerator.java @@ -321,17 +321,23 @@ public class TableDistributedPlanGenerator @Override public List<PlanNode> visitJoin(JoinNode node, PlanContext context) { + // child of JoinNode must be SortNode, so after rewritten, the child must be MergeSortNode or + // SortNode List<PlanNode> leftChildrenNodes = node.getLeftChild().accept(this, context); + checkArgument( + leftChildrenNodes.size() == 1, "The size of left children node of JoinNode should be 1"); + node.setLeftChild(leftChildrenNodes.get(0)); + OrderingScheme leftChildOrdering = nodeOrderingMap.get(leftChildrenNodes.get(0).getPlanNodeId()); - PlanNode leftNode = mergeChildrenViaCollectOrMergeSort(leftChildOrdering, leftChildrenNodes); - node.setLeftChild(leftNode); + if (leftChildOrdering != null) { + nodeOrderingMap.put(node.getPlanNodeId(), leftChildOrdering); + } List<PlanNode> rightChildrenNodes = node.getRightChild().accept(this, context); - OrderingScheme rightChildOrdering = - nodeOrderingMap.get(rightChildrenNodes.get(0).getPlanNodeId()); - PlanNode rightNode = mergeChildrenViaCollectOrMergeSort(rightChildOrdering, leftChildrenNodes); - node.setLeftChild(rightNode); + checkArgument( + rightChildrenNodes.size() == 1, "The size of right children node of JoinNode should be 1"); + node.setRightChild(rightChildrenNodes.get(0)); return Collections.singletonList(node); } diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelTypeProviderExtractor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelTypeProviderExtractor.java index deb6678d4c6..971f92051a7 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelTypeProviderExtractor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/distribute/TableModelTypeProviderExtractor.java @@ -26,6 +26,7 @@ import org.apache.iotdb.db.queryengine.plan.planner.plan.node.sink.IdentitySinkN import org.apache.iotdb.db.queryengine.plan.relational.planner.Symbol; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.CollectNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.FilterNode; +import org.apache.iotdb.db.queryengine.plan.relational.planner.node.JoinNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.LimitNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.MergeSortNode; import org.apache.iotdb.db.queryengine.plan.relational.planner.node.OffsetNode; @@ -194,6 +195,13 @@ public class TableModelTypeProviderExtractor { return null; } + @Override + public Void visitJoin(JoinNode node, Void context) { + node.getLeftChild().accept(this, context); + node.getRightChild().accept(this, context); + return null; + } + @Override public Void visitIdentitySink(IdentitySinkNode node, Void context) { node.getChildren().forEach(c -> c.accept(this, context)); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/JoinNode.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/JoinNode.java index 45358d7d9e9..a729cd2bbec 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/JoinNode.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/node/JoinNode.java @@ -104,6 +104,9 @@ public class JoinNode extends TwoChildProcessNode { checkArgument( leftSymbols.containsAll(leftOutputSymbols), "Left source inputs do not contain all left output symbols"); + if (!rightSymbols.containsAll(rightOutputSymbols)) { + System.out.println("===="); + } checkArgument( rightSymbols.containsAll(rightOutputSymbols), "Right source inputs do not contain all right output symbols"); diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java index 4c7d10a5b20..5f3ce5c8f49 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/plan/relational/planner/optimizations/PushPredicateIntoTableScan.java @@ -54,7 +54,6 @@ import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.Node; import org.apache.iotdb.db.queryengine.plan.relational.sql.ast.SymbolReference; import com.google.common.collect.ImmutableList; -import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableSet; import org.apache.tsfile.read.filter.basic.Filter; import org.apache.tsfile.utils.Pair; @@ -64,6 +63,7 @@ import java.util.Collection; import java.util.Collections; import java.util.EnumSet; import java.util.HashSet; +import java.util.LinkedHashMap; import java.util.List; import java.util.Map; import java.util.Objects; @@ -306,7 +306,7 @@ public class PushPredicateIntoTableScan implements PlanOptimizer { @Override public PlanNode visitJoin(JoinNode node, Void context) { hasJoinNode = true; - Expression inheritedPredicate = predicate; + Expression inheritedPredicate = predicate != null ? predicate : TRUE_LITERAL; // See if we can rewrite outer joins in terms of a plain inner join node = tryNormalizeToOuterToInnerJoin(node, inheritedPredicate); @@ -801,9 +801,10 @@ public class PushPredicateIntoTableScan implements PlanOptimizer { /** Get deviceEntries and DataPartition used in TableScan. */ private PlanNode tableMetadataIndexScan( TableScanNode tableScanNode, List<Expression> metadataExpressions) { - // for join operator, columnSymbols in TableScanNode is renamed, which adds suffix for origin + // for join operator, columnSymbols in TableScanNode may be renamed, which adds suffix for + // origin // column name, - // add a new ProjectNode above TableScanNode. + // thus we add a new ProjectNode above TableScanNode. boolean tableScanNodeColumnsRenamed = false; for (Map.Entry<Symbol, ColumnSchema> entry : tableScanNode.getAssignments().entrySet()) { Symbol columnSymbol = entry.getKey(); @@ -820,17 +821,32 @@ public class PushPredicateIntoTableScan implements PlanOptimizer { MetadataExpressionTransformForJoin.transform( expression1, tableScanNode.getAssignments())); - Assignments.Builder projectAssignments = Assignments.builder(); - ImmutableMap.Builder<Symbol, ColumnSchema> tableScanAssignments = ImmutableMap.builder(); + List<Symbol> newTableScanSymbols = new ArrayList<>(tableScanNode.getOutputSymbols().size()); + Map<Symbol, ColumnSchema> newTableScanAssignments = + new LinkedHashMap<>(tableScanNode.getOutputSymbols().size()); + Map<Symbol, Expression> projectAssignments = + new LinkedHashMap<>(tableScanNode.getOutputSymbols().size()); for (Map.Entry<Symbol, ColumnSchema> entry : tableScanNode.getAssignments().entrySet()) { - Symbol columnSymbol = entry.getKey(); + Symbol originalSymbol = entry.getKey(); ColumnSchema columnSchema = entry.getValue(); - projectAssignments.put(columnSymbol, new SymbolReference(columnSchema.getName())); - tableScanAssignments.put(Symbol.of(columnSchema.getName()), columnSchema); + + Symbol realSymbol = Symbol.of(columnSchema.getName()); + newTableScanSymbols.add(realSymbol); + newTableScanAssignments.put(realSymbol, columnSchema); + projectAssignments.put(originalSymbol, new SymbolReference(columnSchema.getName())); + queryContext.getTypeProvider().putTableModelType(originalSymbol, columnSchema.getType()); + if (tableScanNode.getIdAndAttributeIndexMap().containsKey(originalSymbol)) { + Integer idx = tableScanNode.getIdAndAttributeIndexMap().get(originalSymbol); + tableScanNode.getIdAndAttributeIndexMap().remove(originalSymbol); + tableScanNode.getIdAndAttributeIndexMap().put(realSymbol, idx); + } } + + tableScanNode.setOutputSymbols(newTableScanSymbols); + tableScanNode.setAssignments(newTableScanAssignments); newProjectNode = - new ProjectNode(queryId.genPlanNodeId(), tableScanNode, projectAssignments.build()); - tableScanNode.setAssignments(tableScanAssignments.build()); + new ProjectNode( + queryId.genPlanNodeId(), tableScanNode, new Assignments(projectAssignments)); } List<String> attributeColumns = new ArrayList<>();
