This is an automated email from the ASF dual-hosted git repository. huajianlan pushed a commit to branch local-exchange-planner in repository https://gitbox.apache.org/repos/asf/doris.git
commit 1e607583fa5dd619e5b1227d0d1f54fe1ae5278a Author: 924060929 <[email protected]> AuthorDate: Tue Jan 13 17:42:18 2026 +0800 local exchange planner --- be/CMakeLists.txt | 4 + be/src/io/cache/block_file_cache_factory.cpp | 8 + .../doris/datasource/tvf/source/TVFScanNode.java | 11 + .../org/apache/doris/nereids/NereidsPlanner.java | 7 +- .../glue/translator/PhysicalPlanTranslator.java | 71 +++++- .../apache/doris/nereids/trees/plans/PlanType.java | 1 + .../org/apache/doris/planner/AddLocalExchange.java | 67 +++++ .../org/apache/doris/planner/AggregationNode.java | 55 +++++ .../org/apache/doris/planner/AnalyticEvalNode.java | 37 +++ .../apache/doris/planner/AssertNumRowsNode.java | 27 ++ .../java/org/apache/doris/planner/DataSink.java | 5 + .../org/apache/doris/planner/DataStreamSink.java | 14 ++ .../org/apache/doris/planner/ExchangeNode.java | 16 ++ .../org/apache/doris/planner/HashJoinNode.java | 62 +++++ .../apache/doris/planner/LocalExchangeNode.java | 271 +++++++++++++++++++++ .../apache/doris/planner/NestedLoopJoinNode.java | 57 +++++ .../org/apache/doris/planner/OlapScanNode.java | 25 ++ .../java/org/apache/doris/planner/PlanNode.java | 58 ++++- .../org/apache/doris/planner/SetOperationNode.java | 61 +++++ .../java/org/apache/doris/planner/SortNode.java | 37 +++ .../java/org/apache/doris/qe/SessionVariable.java | 18 ++ gensrc/thrift/PaloInternalService.thrift | 3 + gensrc/thrift/Partitions.thrift | 33 +++ gensrc/thrift/PlanNodes.thrift | 22 +- 24 files changed, 954 insertions(+), 16 deletions(-) diff --git a/be/CMakeLists.txt b/be/CMakeLists.txt index 16f3737d494..a5d117405d1 100644 --- a/be/CMakeLists.txt +++ b/be/CMakeLists.txt @@ -51,6 +51,10 @@ elseif (CMAKE_SYSTEM_NAME MATCHES "Darwin") add_definitions(-D OS_MACOSX) endif () +if (OS_MACOSX) + add_definitions(-DBOOST_PROCESS_POSIX_NO_SIGTIMEDWAIT -UBOOST_POSIX_HAS_SIGTIMEDWAIT) +endif() + if (CMAKE_CXX_COMPILER_ID STREQUAL "GNU") set (COMPILER_GCC 1) option(ENABLE_PCH "enable pch" OFF) diff --git 
a/be/src/io/cache/block_file_cache_factory.cpp b/be/src/io/cache/block_file_cache_factory.cpp index ef12ca353de..f7f16c99d60 100644 --- a/be/src/io/cache/block_file_cache_factory.cpp +++ b/be/src/io/cache/block_file_cache_factory.cpp @@ -94,7 +94,11 @@ Status FileCacheFactory::create_file_cache(const std::string& cache_base_path, LOG_ERROR("").tag("file cache path", cache_base_path).tag("error", strerror(errno)); return Status::IOError("{} statfs error {}", cache_base_path, strerror(errno)); } +#if defined(__APPLE__) + const auto block_size = stat.f_bsize; +#else const auto block_size = stat.f_frsize ? stat.f_frsize : stat.f_bsize; +#endif size_t disk_capacity = static_cast<size_t>(static_cast<size_t>(stat.f_blocks) * static_cast<size_t>(block_size)); if (file_cache_settings.capacity == 0 || disk_capacity < file_cache_settings.capacity) { @@ -289,7 +293,11 @@ std::string validate_capacity(const std::string& path, int64_t new_capacity, valid_capacity = 0; // caller will handle the error return ret; } +#if defined(__APPLE__) + const auto block_size = stat.f_bsize; +#else const auto block_size = stat.f_frsize ? 
stat.f_frsize : stat.f_bsize; +#endif size_t disk_capacity = static_cast<size_t>(static_cast<size_t>(stat.f_blocks) * static_cast<size_t>(block_size)); if (new_capacity == 0 || disk_capacity < new_capacity) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java index d3c891c1b4c..a47006b6467 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/tvf/source/TVFScanNode.java @@ -23,6 +23,7 @@ import org.apache.doris.catalog.FunctionGenTable; import org.apache.doris.catalog.TableIf; import org.apache.doris.common.DdlException; import org.apache.doris.common.MetaNotFoundException; +import org.apache.doris.common.Pair; import org.apache.doris.common.UserException; import org.apache.doris.common.util.LocationPath; import org.apache.doris.common.util.Util; @@ -31,6 +32,10 @@ import org.apache.doris.datasource.FileSplit; import org.apache.doris.datasource.FileSplit.FileSplitCreator; import org.apache.doris.datasource.FileSplitter; import org.apache.doris.datasource.TableFormatType; +import org.apache.doris.nereids.glue.translator.PlanTranslatorContext; +import org.apache.doris.planner.LocalExchangeNode.LocalExchangeType; +import org.apache.doris.planner.LocalExchangeNode.LocalExchangeTypeRequire; +import org.apache.doris.planner.PlanNode; import org.apache.doris.planner.PlanNodeId; import org.apache.doris.qe.SessionVariable; import org.apache.doris.spi.Split; @@ -199,4 +204,10 @@ public class TVFScanNode extends FileQueryScanNode { public int getNumInstances() { return scanRangeLocations.size(); } + + @Override + public Pair<PlanNode, LocalExchangeType> enforceAndDeriveLocalExchange(PlanTranslatorContext translatorContext, + PlanNode parent, LocalExchangeTypeRequire parentRequire) { + return Pair.of(this, LocalExchangeType.PASSTHROUGH); + } } diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java index 364de658d12..c02f2935d64 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/NereidsPlanner.java @@ -75,6 +75,7 @@ import org.apache.doris.nereids.trees.plans.physical.PhysicalPlan; import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation; import org.apache.doris.nereids.trees.plans.physical.PhysicalSqlCache; import org.apache.doris.nereids.trees.plans.physical.TopnFilter; +import org.apache.doris.planner.AddLocalExchange; import org.apache.doris.planner.PlanFragment; import org.apache.doris.planner.PlanNodeId; import org.apache.doris.planner.Planner; @@ -623,7 +624,11 @@ public class NereidsPlanner extends Planner { scanNodeList.addAll(planTranslatorContext.getScanNodes()); physicalRelations.addAll(planTranslatorContext.getPhysicalRelations()); descTable = planTranslatorContext.getDescTable(); - fragments = new ArrayList<>(planTranslatorContext.getPlanFragments()); + List<PlanFragment> planFragments = planTranslatorContext.getPlanFragments(); + if (sessionVariable.isEnableLocalShufflePlanner()) { + new AddLocalExchange().addLocalExchange(planFragments, planTranslatorContext); + } + fragments = new ArrayList<>(planFragments); boolean enableQueryCache = sessionVariable.getEnableQueryCache(); for (int seq = 0; seq < fragments.size(); seq++) { diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java index 055ed7ab5d6..19d1ca3e2ad 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java @@ -340,7 +340,7 @@ public class 
PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla PlanTranslatorContext context) { Plan upstream = distribute.child(); // now they're in one fragment but will be split by ExchangeNode. PlanFragment upstreamFragment = upstream.accept(this, context); - List<List<Expr>> upstreamDistributeExprs = getDistributeExprs(upstream); + List<List<Expr>> upstreamDistributeExprs = getChildrenDistributeExprs(upstream); DistributionSpec targetDistribution = distribute.getDistributionSpec(); @@ -388,6 +388,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla // target data partition DataPartition targetDataPartition = toDataPartition(targetDistribution, validOutputIds, context); exchangeNode.setPartitionType(targetDataPartition.getType()); + exchangeNode.setDistributeExprLists(getDistributeExpr(distribute)); exchangeNode.setChildrenDistributeExprLists(upstreamDistributeExprs); // its source partition is targetDataPartition. and outputPartition is UNPARTITIONED now, will be set when // visit its SinkNode @@ -671,6 +672,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla fileScan.getTableSnapshot().ifPresent(fileQueryScanNode::setQueryTableSnapshot); fileScan.getScanParams().ifPresent(fileQueryScanNode::setScanParams); } + scanNode.setDistributeExprLists(getDistributeExpr(fileScan)); return getPlanFragmentForPhysicalFileScan(fileScan, context, scanNode, table, tupleDescriptor); } @@ -691,6 +693,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla PlanFragment planFragment = createPlanFragment(emptySetNode, DataPartition.UNPARTITIONED, emptyRelation); context.addPlanFragment(planFragment); + emptySetNode.setDistributeExprLists(getDistributeExpr(emptyRelation)); updateLegacyPlanIdToPhysicalPlan(planFragment.getPlanRoot(), emptyRelation); return planFragment; } @@ -724,6 +727,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla 
PlanFragment planFragment = new PlanFragment(context.nextFragmentId(), esScanNode, dataPartition); context.addPlanFragment(planFragment); updateLegacyPlanIdToPhysicalPlan(planFragment.getPlanRoot(), esScan); + esScanNode.setDistributeExprLists(getDistributeExpr(esScan)); return planFragment; } @@ -752,6 +756,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla } HudiScanNode hudiScanNode = (HudiScanNode) scanNode; hudiScanNode.setSelectedPartitions(fileScan.getSelectedPartitions()); + hudiScanNode.setDistributeExprLists(getDistributeExpr(hudiScan)); return getPlanFragmentForPhysicalFileScan(fileScan, context, scanNode, table, tupleDescriptor); } @@ -777,6 +782,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla DataPartition dataPartition = DataPartition.RANDOM; PlanFragment planFragment = createPlanFragment(scanNode, dataPartition, fileScan); context.addPlanFragment(planFragment); + scanNode.setDistributeExprLists(getDistributeExpr(fileScan)); updateLegacyPlanIdToPhysicalPlan(planFragment.getPlanRoot(), fileScan); return planFragment; } @@ -796,6 +802,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla DataPartition dataPartition = DataPartition.RANDOM; PlanFragment planFragment = new PlanFragment(context.nextFragmentId(), jdbcScanNode, dataPartition); context.addPlanFragment(planFragment); + jdbcScanNode.setDistributeExprLists(getDistributeExpr(jdbcScan)); updateLegacyPlanIdToPhysicalPlan(planFragment.getPlanRoot(), jdbcScan); return planFragment; } @@ -815,6 +822,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla DataPartition dataPartition = DataPartition.RANDOM; PlanFragment planFragment = new PlanFragment(context.nextFragmentId(), odbcScanNode, dataPartition); context.addPlanFragment(planFragment); + odbcScanNode.setDistributeExprLists(getDistributeExpr(odbcScan)); updateLegacyPlanIdToPhysicalPlan(planFragment.getPlanRoot(), 
odbcScan); return planFragment; } @@ -850,6 +858,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla OlapScanNode olapScanNode = new OlapScanNode(context.nextPlanNodeId(), tupleDescriptor, "OlapScanNode"); olapScanNode.setNereidsId(olapScan.getId()); context.getNereidsIdToPlanNodeIdMap().put(olapScan.getId(), olapScanNode.getId()); + olapScanNode.setDistributeExprLists(getDistributeExpr(olapScan)); // translate score topn info if (!olapScan.getScoreOrderKeys().isEmpty()) { @@ -983,6 +992,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla PhysicalDeferMaterializeOlapScan deferMaterializeOlapScan, PlanTranslatorContext context) { PlanFragment planFragment = visitPhysicalOlapScan(deferMaterializeOlapScan.getPhysicalOlapScan(), context); OlapScanNode olapScanNode = (OlapScanNode) planFragment.getPlanRoot(); + olapScanNode.setDistributeExprLists(getDistributeExpr(deferMaterializeOlapScan)); TupleDescriptor tupleDescriptor = context.getTupleDesc(olapScanNode.getTupleId()); context.createSlotDesc(tupleDescriptor, deferMaterializeOlapScan.getColumnIdSlot()); context.getTopnFilterContext().translateTarget(deferMaterializeOlapScan, olapScanNode, context); @@ -1011,6 +1021,8 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla PlanFragment planFragment = createPlanFragment(unionNode, DataPartition.UNPARTITIONED, oneRowRelation); context.addPlanFragment(planFragment); updateLegacyPlanIdToPhysicalPlan(planFragment.getPlanRoot(), oneRowRelation); + + unionNode.setDistributeExprLists(getDistributeExpr(oneRowRelation)); return planFragment; } @@ -1043,6 +1055,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla context.addScanNode(scanNode, schemaScan); PlanFragment planFragment = createPlanFragment(scanNode, DataPartition.RANDOM, schemaScan); context.addPlanFragment(planFragment); + scanNode.setDistributeExprLists(getDistributeExpr(schemaScan)); 
updateLegacyPlanIdToPhysicalPlan(planFragment.getPlanRoot(), schemaScan); return planFragment; } @@ -1059,6 +1072,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla PlanFragment planFragment = createPlanFragment(scanNode, DataPartition.RANDOM, workTableReference); context.addPlanFragment(planFragment); + scanNode.setDistributeExprLists(getDistributeExpr(workTableReference)); updateLegacyPlanIdToPhysicalPlan(planFragment.getPlanRoot(), workTableReference); return planFragment; } @@ -1079,6 +1093,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla TableValuedFunctionIf catalogFunction = tvfRelation.getFunction().getCatalogFunction(); SessionVariable sv = ConnectContext.get().getSessionVariable(); ScanNode scanNode = catalogFunction.getScanNode(context.nextPlanNodeId(), tupleDescriptor, sv); + scanNode.setDistributeExprLists(getDistributeExpr(tvfRelation)); scanNode.setNereidsId(tvfRelation.getId()); context.getNereidsIdToPlanNodeIdMap().put(tvfRelation.getId(), scanNode.getId()); Utils.execWithUncheckedException(scanNode::init); @@ -1117,7 +1132,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla PlanTranslatorContext context) { PlanFragment inputPlanFragment = aggregate.child(0).accept(this, context); - List<List<Expr>> distributeExprLists = getDistributeExprs(aggregate.child(0)); + List<List<Expr>> distributeExprLists = getChildrenDistributeExprs(aggregate.child(0)); List<Expression> groupByExpressions = aggregate.getGroupByExpressions(); List<NamedExpression> outputExpressions = aggregate.getOutputExpressions(); @@ -1198,6 +1213,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla AggregationNode aggregationNode = new AggregationNode(context.nextPlanNodeId(), inputPlanFragment.getPlanRoot(), aggInfo); + aggregationNode.setDistributeExprLists(getDistributeExpr(aggregate)); 
aggregationNode.setChildrenDistributeExprLists(distributeExprLists); aggregationNode.setNereidsId(aggregate.getId()); @@ -1315,7 +1331,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla public PlanFragment visitPhysicalAssertNumRows(PhysicalAssertNumRows<? extends Plan> assertNumRows, PlanTranslatorContext context) { PlanFragment currentFragment = assertNumRows.child().accept(this, context); - List<List<Expr>> distributeExprLists = getDistributeExprs(assertNumRows.child()); + List<List<Expr>> distributeExprLists = getChildrenDistributeExprs(assertNumRows.child()); // we need convert all columns to nullable in AssertNumRows node // create a tuple for AssertNumRowsNode @@ -1325,6 +1341,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla TupleDescriptor tupleDescriptor = generateTupleDesc(assertNumRows.getOutput(), null, context); AssertNumRowsNode assertNumRowsNode = new AssertNumRowsNode(context.nextPlanNodeId(), currentFragment.getPlanRoot(), assertion, tupleDescriptor); + assertNumRowsNode.setDistributeExprLists(getDistributeExpr(assertNumRows)); assertNumRowsNode.setChildrenDistributeExprLists(distributeExprLists); assertNumRowsNode.setNereidsId(assertNumRows.getId()); context.getNereidsIdToPlanNodeIdMap().put(assertNumRows.getId(), assertNumRowsNode.getId()); @@ -1376,6 +1393,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla } } CTEScanNode cteScanNode = new CTEScanNode(tupleDescriptor); + cteScanNode.setDistributeExprLists(getDistributeExpr(cteConsumer)); translateRuntimeFilter(cteConsumer, cteScanNode, context); context.getCteScanNodeMap().put(multiCastFragment.getFragmentId(), cteScanNode); @@ -1487,6 +1505,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla .collect(Collectors.toCollection(ArrayList::new)); TableFunctionNode tableFunctionNode = new TableFunctionNode(context.nextPlanNodeId(), currentFragment.getPlanRoot(), 
tupleDescriptor.getId(), functionCalls, outputSlotIds, conjuncts); + tableFunctionNode.setDistributeExprLists(getDistributeExpr(generate)); tableFunctionNode.setNereidsId(generate.getId()); context.getNereidsIdToPlanNodeIdMap().put(generate.getId(), tableFunctionNode.getId()); addPlanRoot(currentFragment, tableFunctionNode, generate); @@ -1550,7 +1569,8 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla // NOTICE: We must visit from right to left, to ensure the last fragment is root fragment PlanFragment rightFragment = hashJoin.child(1).accept(this, context); PlanFragment leftFragment = hashJoin.child(0).accept(this, context); - List<List<Expr>> distributeExprLists = getDistributeExprs(physicalHashJoin.left(), physicalHashJoin.right()); + List<List<Expr>> distributeExprLists + = getChildrenDistributeExprs(physicalHashJoin.left(), physicalHashJoin.right()); if (JoinUtils.shouldNestedLoopJoin(hashJoin)) { throw new RuntimeException("Physical hash join could not execute without equal join condition."); @@ -1595,6 +1615,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla markConjuncts, hashJoin.isMarkJoin()); hashJoinNode.setNereidsId(hashJoin.getId()); context.getNereidsIdToPlanNodeIdMap().put(hashJoin.getId(), hashJoinNode.getId()); + hashJoinNode.setDistributeExprLists(getDistributeExpr(hashJoin)); hashJoinNode.setChildrenDistributeExprLists(distributeExprLists); PlanFragment currentFragment = connectJoinNode(hashJoinNode, leftFragment, rightFragment, context, hashJoin); @@ -1809,7 +1830,8 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla // PhysicalPlan plan, PlanVisitor visitor, Context context). 
PlanFragment rightFragment = nestedLoopJoin.child(1).accept(this, context); PlanFragment leftFragment = nestedLoopJoin.child(0).accept(this, context); - List<List<Expr>> distributeExprLists = getDistributeExprs(nestedLoopJoin.child(0), nestedLoopJoin.child(1)); + List<List<Expr>> distributeExprLists + = getChildrenDistributeExprs(nestedLoopJoin.child(0), nestedLoopJoin.child(1)); PlanNode leftFragmentPlanRoot = leftFragment.getPlanRoot(); PlanNode rightFragmentPlanRoot = rightFragment.getPlanRoot(); @@ -1830,6 +1852,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla nestedLoopJoin.isMarkJoin()); nestedLoopJoinNode.setNereidsId(nestedLoopJoin.getId()); context.getNereidsIdToPlanNodeIdMap().put(nestedLoopJoin.getId(), nestedLoopJoinNode.getId()); + nestedLoopJoinNode.setDistributeExprLists(getDistributeExpr(nestedLoopJoin)); nestedLoopJoinNode.setChildrenDistributeExprLists(distributeExprLists); if (nestedLoopJoin.getStats() != null) { nestedLoopJoinNode.setCardinality((long) nestedLoopJoin.getStats().getRowCount()); @@ -2029,9 +2052,10 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla public PlanFragment visitPhysicalPartitionTopN(PhysicalPartitionTopN<? 
extends Plan> partitionTopN, PlanTranslatorContext context) { PlanFragment inputFragment = partitionTopN.child(0).accept(this, context); - List<List<Expr>> distributeExprLists = getDistributeExprs(partitionTopN.child(0)); + List<List<Expr>> distributeExprLists = getChildrenDistributeExprs(partitionTopN.child(0)); PartitionSortNode partitionSortNode = translatePartitionSortNode( partitionTopN, inputFragment.getPlanRoot(), context); + partitionSortNode.setDistributeExprLists(getDistributeExpr(partitionTopN)); partitionSortNode.setChildrenDistributeExprLists(distributeExprLists); addPlanRoot(inputFragment, partitionSortNode, partitionTopN); // in pipeline engine, we use parallel scan by default, but it broke the rule of data distribution @@ -2225,12 +2249,13 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla for (Plan plan : recursiveCte.children()) { childrenFragments.add(plan.accept(this, context)); } - List<List<Expr>> distributeExprLists = getDistributeExprs(recursiveCte.children().toArray(new Plan[0])); + List<List<Expr>> distributeExprLists = getChildrenDistributeExprs(recursiveCte.children().toArray(new Plan[0])); TupleDescriptor setTuple = generateTupleDesc(recursiveCte.getOutput(), null, context); RecursiveCteNode recursiveCteNode = new RecursiveCteNode(context.nextPlanNodeId(), setTuple.getId(), recursiveCte.getCteName(), recursiveCte.isUnionAll()); + recursiveCteNode.setDistributeExprLists(getDistributeExpr(recursiveCte)); recursiveCteNode.setChildrenDistributeExprLists(distributeExprLists); recursiveCteNode.setNereidsId(recursiveCte.getId()); context.getNereidsIdToPlanNodeIdMap().put(recursiveCte.getId(), recursiveCteNode.getId()); @@ -2298,7 +2323,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla for (Plan plan : setOperation.children()) { childrenFragments.add(plan.accept(this, context)); } - List<List<Expr>> distributeExprLists = getDistributeExprs(setOperation.children().toArray(new 
Plan[0])); + List<List<Expr>> distributeExprLists = getChildrenDistributeExprs(setOperation.children().toArray(new Plan[0])); TupleDescriptor setTuple = generateTupleDesc(setOperation.getOutput(), null, context); SetOperationNode setOperationNode; @@ -2312,6 +2337,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla } else { throw new RuntimeException("not support set operation type " + setOperation); } + setOperationNode.setDistributeExprLists(getDistributeExpr(setOperation)); setOperationNode.setChildrenDistributeExprLists(distributeExprLists); setOperationNode.setNereidsId(setOperation.getId()); context.getNereidsIdToPlanNodeIdMap().put(setOperation.getId(), setOperationNode.getId()); @@ -2424,12 +2450,13 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla public PlanFragment visitPhysicalQuickSort(PhysicalQuickSort<? extends Plan> sort, PlanTranslatorContext context) { PlanFragment inputFragment = sort.child(0).accept(this, context); - List<List<Expr>> distributeExprLists = getDistributeExprs(sort.child(0)); + List<List<Expr>> distributeExprLists = getChildrenDistributeExprs(sort.child(0)); // 2. 
According to the type of sort, generate physical plan if (!sort.getSortPhase().isMerge()) { // For localSort or Gather->Sort, we just need to add sortNode SortNode sortNode = translateSortNode(sort, inputFragment.getPlanRoot(), context); + sortNode.setDistributeExprLists(getDistributeExpr(sort)); sortNode.setChildrenDistributeExprLists(distributeExprLists); addPlanRoot(inputFragment, sortNode, sort); } else { @@ -2446,6 +2473,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla inputFragment.getChild(0).getSink().setMerge(true); } sortNode.setMergeByExchange(); + sortNode.setDistributeExprLists(getDistributeExpr(sort)); sortNode.setChildrenDistributeExprLists(distributeExprLists); } return inputFragment; @@ -2454,7 +2482,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla @Override public PlanFragment visitPhysicalTopN(PhysicalTopN<? extends Plan> topN, PlanTranslatorContext context) { PlanFragment inputFragment = topN.child(0).accept(this, context); - List<List<Expr>> distributeExprLists = getDistributeExprs(topN.child(0)); + List<List<Expr>> distributeExprLists = getChildrenDistributeExprs(topN.child(0)); // 2. According to the type of sort, generate physical plan if (!topN.getSortPhase().isMerge()) { // For localSort or Gather->Sort, we just need to add TopNNode @@ -2490,6 +2518,9 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla } } } + sortNode.setDistributeExprLists( + CollectionUtils.isEmpty(distributeExprLists) ? null : distributeExprLists.get(0) + ); sortNode.setChildrenDistributeExprLists(distributeExprLists); addPlanRoot(inputFragment, sortNode, topN); } else { @@ -2503,6 +2534,9 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla return inputFragment; } ExchangeNode exchangeNode = (ExchangeNode) inputFragment.getPlanRoot(); + exchangeNode.setDistributeExprLists( + CollectionUtils.isEmpty(distributeExprLists) ? 
null : distributeExprLists.get(0) + ); exchangeNode.setChildrenDistributeExprLists(distributeExprLists); exchangeNode.setMergeInfo(((SortNode) exchangeNode.getChild(0)).getSortInfo()); if (inputFragment.hasChild(0) && inputFragment.getChild(0).getSink() != null) { @@ -2534,7 +2568,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla @Override public PlanFragment visitPhysicalRepeat(PhysicalRepeat<? extends Plan> repeat, PlanTranslatorContext context) { PlanFragment inputPlanFragment = repeat.child(0).accept(this, context); - List<List<Expr>> distributeExprLists = getDistributeExprs(repeat.child(0)); + List<List<Expr>> distributeExprLists = getChildrenDistributeExprs(repeat.child(0)); List<Expression> flattenGroupingExpressions = repeat.getGroupByExpressions(); Set<Slot> preRepeatExpressions = Sets.newLinkedHashSet(); @@ -2585,6 +2619,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla allSlotId, repeat.computeGroupingFunctionsValues()); repeatNode.setNereidsId(repeat.getId()); context.getNereidsIdToPlanNodeIdMap().put(repeat.getId(), repeatNode.getId()); + repeatNode.setDistributeExprLists(getDistributeExpr(repeat)); repeatNode.setChildrenDistributeExprLists(distributeExprLists); addPlanRoot(inputPlanFragment, repeatNode, repeat); updateLegacyPlanIdToPhysicalPlan(inputPlanFragment.getPlanRoot(), repeat); @@ -2595,7 +2630,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla public PlanFragment visitPhysicalWindow(PhysicalWindow<? extends Plan> physicalWindow, PlanTranslatorContext context) { PlanFragment inputPlanFragment = physicalWindow.child(0).accept(this, context); - List<List<Expr>> distributeExprLists = getDistributeExprs(physicalWindow.child(0)); + List<List<Expr>> distributeExprLists = getChildrenDistributeExprs(physicalWindow.child(0)); // 1. 
translate to old optimizer variable // variable in Nereids @@ -2660,6 +2695,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla ); analyticEvalNode.setNereidsId(physicalWindow.getId()); context.getNereidsIdToPlanNodeIdMap().put(physicalWindow.getId(), analyticEvalNode.getId()); + analyticEvalNode.setDistributeExprLists(getDistributeExpr(physicalWindow)); analyticEvalNode.setChildrenDistributeExprLists(distributeExprLists); PlanNode root = inputPlanFragment.getPlanRoot(); if (root instanceof SortNode) { @@ -2697,6 +2733,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla materializeNode.setLazyColumns(materialize.getLazyColumns()); materializeNode.setLocations(materialize.getLazySlotLocations()); materializeNode.setIdxs(materialize.getlazyTableIdxs()); + materializeNode.setDistributeExprLists(getDistributeExpr(materialize)); List<Boolean> rowStoreFlags = new ArrayList<>(); for (Relation relation : materialize.getRelations()) { @@ -2732,6 +2769,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla TableValuedFunctionIf catalogFunction = tvfRelation.getFunction().getCatalogFunction(); SessionVariable sv = ConnectContext.get().getSessionVariable(); ScanNode scanNode = catalogFunction.getScanNode(context.nextPlanNodeId(), tupleDescriptor, sv); + scanNode.setDistributeExprLists(getDistributeExpr(tvfRelation)); scanNode.setNereidsId(tvfRelation.getId()); context.getNereidsIdToPlanNodeIdMap().put(tvfRelation.getId(), scanNode.getId()); Utils.execWithUncheckedException(scanNode::init); @@ -3177,7 +3215,7 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla return false; } - private List<List<Expr>> getDistributeExprs(Plan ... children) { + private List<List<Expr>> getChildrenDistributeExprs(Plan ... 
children) { List<List<Expr>> distributeExprLists = Lists.newArrayList(); for (Plan child : children) { DistributionSpec spec = ((PhysicalPlan) child).getPhysicalProperties().getDistributionSpec(); @@ -3186,6 +3224,13 @@ public class PhysicalPlanTranslator extends DefaultPlanVisitor<PlanFragment, Pla return distributeExprLists; } + private List<Expr> getDistributeExpr(PhysicalPlan physicalPlan) { + return getDistributeExpr( + physicalPlan.getOutputExprIds(), + physicalPlan.getPhysicalProperties().getDistributionSpec() + ); + } + private List<Expr> getDistributeExpr(List<ExprId> childOutputIds, DistributionSpec spec) { if (spec instanceof DistributionSpecHash) { DistributionSpecHash distributionSpecHash = (DistributionSpecHash) spec; diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java index 0c5a16fc3ef..dc26149dc63 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java +++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/PlanType.java @@ -130,6 +130,7 @@ public enum PlanType { PHYSICAL_RECURSIVE_CTE, PHYSICAL_RECURSIVE_CTE_RECURSIVE_CHILD, PHYSICAL_DISTRIBUTE, + PHYSICAL_LOCAL_DISTRIBUTE, PHYSICAL_EXCEPT, PHYSICAL_FILTER, PHYSICAL_GENERATE, diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/AddLocalExchange.java b/fe/fe-core/src/main/java/org/apache/doris/planner/AddLocalExchange.java new file mode 100644 index 00000000000..2fc8ad7801b --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AddLocalExchange.java @@ -0,0 +1,67 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. 
The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +package org.apache.doris.planner; + +import org.apache.doris.common.Pair; +import org.apache.doris.nereids.glue.translator.PlanTranslatorContext; +import org.apache.doris.planner.LocalExchangeNode.LocalExchangeType; +import org.apache.doris.planner.LocalExchangeNode.LocalExchangeTypeRequire; + +import java.util.List; + +/** AddLocalExchange */ +public class AddLocalExchange { + public void addLocalExchange(List<PlanFragment> fragments, PlanTranslatorContext context) { + for (PlanFragment fragment : fragments) { + DataSink sink = fragment.getSink(); + LocalExchangeTypeRequire require = sink == null + ? 
LocalExchangeTypeRequire.noRequire() : sink.getLocalExchangeTypeRequire(); + PlanNode root = fragment.getPlanRoot(); + Pair<PlanNode, LocalExchangeType> output = root + .enforceAndDeriveLocalExchange(context, null, require); + if (output.first != root) { + fragment.setPlanRoot(output.first); + } + } + } + + public static boolean isColocated(PlanNode plan) { + if (plan instanceof AggregationNode) { + return ((AggregationNode) plan).isColocate() && isColocated(plan.getChild(0)); + } else if (plan instanceof OlapScanNode) { + return true; + } else if (plan instanceof SelectNode) { + return isColocated(plan.getChild(0)); + } else if (plan instanceof HashJoinNode) { + return ((HashJoinNode) plan).isColocate() + && (isColocated(plan.getChild(0)) || isColocated(plan.getChild(1))); + } else if (plan instanceof SetOperationNode) { + if (!((SetOperationNode) plan).isColocate()) { + return false; + } + for (PlanNode child : plan.getChildren()) { + if (isColocated(child)) { + return true; + } + } + return false; + } else { + return false; + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java index b0e0bab2b2b..d41212819ab 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java @@ -26,7 +26,13 @@ import org.apache.doris.analysis.FunctionCallExpr; import org.apache.doris.analysis.SlotDescriptor; import org.apache.doris.analysis.SlotRef; import org.apache.doris.analysis.SortInfo; +import org.apache.doris.common.Pair; +import org.apache.doris.nereids.glue.translator.PlanTranslatorContext; +import org.apache.doris.planner.LocalExchangeNode.LocalExchangeType; +import org.apache.doris.planner.LocalExchangeNode.LocalExchangeTypeRequire; import org.apache.doris.planner.normalize.Normalizer; +import org.apache.doris.qe.ConnectContext; +import 
org.apache.doris.qe.SessionVariable; import org.apache.doris.thrift.TAggregationNode; import org.apache.doris.thrift.TExplainLevel; import org.apache.doris.thrift.TExpr; @@ -246,6 +252,10 @@ public class AggregationNode extends PlanNode { isColocate = colocate; } + public boolean isColocate() { + return isColocate; + } + public void setSortByGroupKey(SortInfo sortByGroupKey) { this.sortByGroupKey = sortByGroupKey; } @@ -257,4 +267,49 @@ public class AggregationNode extends PlanNode { public void setQueryCacheCandidate(boolean queryCacheCandidate) { this.queryCacheCandidate = queryCacheCandidate; } + + @Override + public Pair<PlanNode, LocalExchangeType> enforceAndDeriveLocalExchange( + PlanTranslatorContext translatorContext, PlanNode parent, LocalExchangeTypeRequire parentRequire) { + + ConnectContext connectContext = translatorContext.getConnectContext(); + SessionVariable sessionVariable = connectContext.getSessionVariable(); + + LocalExchangeTypeRequire requireChild; + if (needsFinalize && aggInfo.getGroupingExprs().isEmpty()) { + requireChild = LocalExchangeTypeRequire.noRequire(); + } else if (canUseDistinctStreamingAgg(sessionVariable)) { + boolean childUsePoolingScan = fragment.useSerialSource(translatorContext.getConnectContext()) + && (children.get(0) instanceof ScanNode); + if (needsFinalize || (aggInfo.getGroupingExprs().size() > 1 && !useStreamingPreagg)) { + if (AddLocalExchange.isColocated(this)) { + requireChild = LocalExchangeTypeRequire.requireBucketHash(); + } else { + requireChild = parentRequire.autoHash(); + } + } else if (childUsePoolingScan) { + requireChild = LocalExchangeTypeRequire.requirePassthrough(); + } else { + requireChild = LocalExchangeTypeRequire.noRequire(); + } + } else { + if (aggInfo.getGroupingExprs().isEmpty()) { + requireChild = LocalExchangeTypeRequire.requirePassthrough(); + } else if (AddLocalExchange.isColocated(this)) { + requireChild = LocalExchangeTypeRequire.requireBucketHash(); + } else { + requireChild = 
parentRequire.autoHash(); + } + } + + Pair<PlanNode, LocalExchangeType> enforceResult + = enforceChild(translatorContext, requireChild, children.get(0)); + children = Lists.newArrayList(enforceResult.first); + return Pair.of(this, enforceResult.second); + } + + private boolean canUseDistinctStreamingAgg(SessionVariable sessionVariable) { + return aggInfo.getAggregateExprs().isEmpty() && sortByGroupKey == null + && sessionVariable.enableDistinctStreamingAggregation; + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/AnalyticEvalNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/AnalyticEvalNode.java index 6992eeb001a..292df60cb26 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/AnalyticEvalNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AnalyticEvalNode.java @@ -24,6 +24,10 @@ import org.apache.doris.analysis.AnalyticWindow; import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.OrderByElement; import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.common.Pair; +import org.apache.doris.nereids.glue.translator.PlanTranslatorContext; +import org.apache.doris.planner.LocalExchangeNode.LocalExchangeType; +import org.apache.doris.planner.LocalExchangeNode.LocalExchangeTypeRequire; import org.apache.doris.thrift.TAnalyticNode; import org.apache.doris.thrift.TExplainLevel; import org.apache.doris.thrift.TPlanNode; @@ -152,4 +156,37 @@ public class AnalyticEvalNode extends PlanNode { public boolean isSerialOperator() { return partitionExprs.isEmpty(); } + + @Override + public Pair<PlanNode, LocalExchangeType> enforceAndDeriveLocalExchange(PlanTranslatorContext translatorContext, + PlanNode parent, LocalExchangeTypeRequire parentRequire) { + LocalExchangeTypeRequire requireChild; + LocalExchangeType outputType = null; + if (partitionExprs.isEmpty()) { + requireChild = LocalExchangeTypeRequire.requirePassthrough(); + outputType = LocalExchangeType.PASSTHROUGH; + } else if 
(orderByElements.isEmpty()) { + if (AddLocalExchange.isColocated(this)) { + requireChild = LocalExchangeTypeRequire.requireBucketHash(); + outputType = LocalExchangeType.BUCKET_HASH_SHUFFLE; + } else { + requireChild = parentRequire.autoHash(); + } + } else if (fragment.useSerialSource(translatorContext.getConnectContext()) + && children.get(0) instanceof ScanNode) { + requireChild = LocalExchangeTypeRequire.requirePassthrough(); + outputType = LocalExchangeType.PASSTHROUGH; + } else { + requireChild = LocalExchangeTypeRequire.noRequire(); + outputType = LocalExchangeType.NOOP; + } + + Pair<PlanNode, LocalExchangeType> enforceResult + = enforceChild(translatorContext, requireChild, children.get(0)); + children = Lists.newArrayList(enforceResult.first); + if (outputType == null) { + outputType = enforceResult.second; + } + return Pair.of(this, outputType); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/AssertNumRowsNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/AssertNumRowsNode.java index dcf683f0783..6874f4f0fad 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/AssertNumRowsNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AssertNumRowsNode.java @@ -19,11 +19,18 @@ package org.apache.doris.planner; import org.apache.doris.analysis.AssertNumRowsElement; import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.common.Pair; +import org.apache.doris.nereids.glue.translator.PlanTranslatorContext; +import org.apache.doris.planner.LocalExchangeNode.LocalExchangeType; +import org.apache.doris.planner.LocalExchangeNode.LocalExchangeTypeRequire; +import org.apache.doris.planner.LocalExchangeNode.RequireSpecific; import org.apache.doris.thrift.TAssertNumRowsNode; import org.apache.doris.thrift.TExplainLevel; import org.apache.doris.thrift.TPlanNode; import org.apache.doris.thrift.TPlanNodeType; +import com.google.common.collect.Lists; + /** * Assert num rows node is used to determine whether 
the number of rows is less than desired num of rows. * The rows are the result of subqueryString. @@ -81,4 +88,24 @@ public class AssertNumRowsNode extends PlanNode { public boolean isSerialOperator() { return true; } + + @Override + public Pair<PlanNode, LocalExchangeType> enforceAndDeriveLocalExchange(PlanTranslatorContext translatorContext, + PlanNode parent, LocalExchangeTypeRequire parentRequire) { + + PlanNode child = children.get(0); + RequireSpecific requireChild = LocalExchangeTypeRequire.requirePassthrough(); + Pair<PlanNode, LocalExchangeType> childOutput = child.enforceAndDeriveLocalExchange( + translatorContext, this, requireChild); + + if (!requireChild.satisfy(childOutput.second)) { + LocalExchangeType preferType = requireChild.preferType(); + LocalExchangeNode localExchangeNode + = new LocalExchangeNode(translatorContext.nextPlanNodeId(), child, preferType); + children = Lists.newArrayList(localExchangeNode); + } else { + children = Lists.newArrayList(childOutput.first); + } + return Pair.of(this, LocalExchangeType.PASSTHROUGH); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DataSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DataSink.java index 320fcae9eb0..199c428b561 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataSink.java @@ -27,6 +27,7 @@ import org.apache.doris.common.AnalysisException; import org.apache.doris.datasource.hive.HMSExternalTable; import org.apache.doris.datasource.iceberg.IcebergExternalTable; import org.apache.doris.datasource.odbc.sink.OdbcTableSink; +import org.apache.doris.planner.LocalExchangeNode.LocalExchangeTypeRequire; import org.apache.doris.thrift.TDataSink; import org.apache.doris.thrift.TExplainLevel; @@ -86,4 +87,8 @@ public abstract class DataSink { public void setMerge(boolean merge) { isMerge = merge; } + + public LocalExchangeTypeRequire getLocalExchangeTypeRequire() { + return 
LocalExchangeTypeRequire.noRequire(); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java b/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java index 155b63d6224..22f73ed2b28 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataStreamSink.java @@ -22,6 +22,7 @@ package org.apache.doris.planner; import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.planner.LocalExchangeNode.LocalExchangeTypeRequire; import org.apache.doris.thrift.TDataSink; import org.apache.doris.thrift.TDataSinkType; import org.apache.doris.thrift.TDataStreamSink; @@ -29,6 +30,7 @@ import org.apache.doris.thrift.TExplainLevel; import org.apache.doris.thrift.TOlapTableLocationParam; import org.apache.doris.thrift.TOlapTablePartitionParam; import org.apache.doris.thrift.TOlapTableSchemaParam; +import org.apache.doris.thrift.TPartitionType; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; @@ -239,4 +241,16 @@ public class DataStreamSink extends DataSink { result.setStreamSink(tStreamSink); return result; } + + @Override + public LocalExchangeTypeRequire getLocalExchangeTypeRequire() { + TPartitionType outputType = outputPartition.getType(); + if (outputType == TPartitionType.HASH_PARTITIONED) { + return LocalExchangeTypeRequire.requireExecutionHash(); + } else if (outputType == TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED) { + return LocalExchangeTypeRequire.requireBucketHash(); + } else { + return LocalExchangeTypeRequire.noRequire(); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java index 7274f731c5a..c24c3894b4e 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java +++ 
b/fe/fe-core/src/main/java/org/apache/doris/planner/ExchangeNode.java @@ -23,6 +23,10 @@ package org.apache.doris.planner; import org.apache.doris.analysis.SortInfo; import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.analysis.TupleId; +import org.apache.doris.common.Pair; +import org.apache.doris.nereids.glue.translator.PlanTranslatorContext; +import org.apache.doris.planner.LocalExchangeNode.LocalExchangeType; +import org.apache.doris.planner.LocalExchangeNode.LocalExchangeTypeRequire; import org.apache.doris.qe.ConnectContext; import org.apache.doris.thrift.TExchangeNode; import org.apache.doris.thrift.TExplainLevel; @@ -166,4 +170,16 @@ public class ExchangeNode extends PlanNode { public boolean hasSerialScanChildren() { return false; } + + @Override + public Pair<PlanNode, LocalExchangeType> enforceAndDeriveLocalExchange(PlanTranslatorContext translatorContext, + PlanNode parent, LocalExchangeTypeRequire parentRequire) { + if (partitionType == TPartitionType.HASH_PARTITIONED) { + return Pair.of(this, LocalExchangeType.GLOBAL_EXECUTION_HASH_SHUFFLE); + } else if (partitionType == TPartitionType.BUCKET_SHFFULE_HASH_PARTITIONED) { + return Pair.of(this, LocalExchangeType.BUCKET_HASH_SHUFFLE); + } else { + return Pair.of(this, LocalExchangeType.NOOP); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java index e5e40126b13..c748809cc35 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/HashJoinNode.java @@ -25,7 +25,11 @@ import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.JoinOperator; import org.apache.doris.analysis.SlotId; import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.common.Pair; +import org.apache.doris.nereids.glue.translator.PlanTranslatorContext; import 
org.apache.doris.nereids.trees.expressions.ExprId; +import org.apache.doris.planner.LocalExchangeNode.LocalExchangeType; +import org.apache.doris.planner.LocalExchangeNode.LocalExchangeTypeRequire; import org.apache.doris.thrift.TEqJoinCondition; import org.apache.doris.thrift.TExplainLevel; import org.apache.doris.thrift.THashJoinNode; @@ -118,6 +122,10 @@ public class HashJoinNode extends JoinNodeBase { colocateReason = reason; } + public boolean isColocate() { + return isColocate; + } + public Map<ExprId, SlotId> getHashOutputExprSlotIdMap() { return hashOutputExprSlotIdMap; } @@ -267,4 +275,58 @@ public class HashJoinNode extends JoinNodeBase { public List<Expr> getMarkJoinConjuncts() { return markJoinConjuncts; } + + @Override + public Pair<PlanNode, LocalExchangeType> enforceAndDeriveLocalExchange( + PlanTranslatorContext translatorContext, PlanNode parent, LocalExchangeTypeRequire parentRequire) { + + LocalExchangeTypeRequire probeSideRequire; + LocalExchangeTypeRequire buildSideRequire; + LocalExchangeType outputType = null; + + if (joinOp == JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN) { + buildSideRequire = probeSideRequire = LocalExchangeTypeRequire.noRequire(); + outputType = LocalExchangeType.NOOP; + } else if (distrMode == DistributionMode.BROADCAST) { + probeSideRequire = LocalExchangeTypeRequire.requirePassthrough(); + buildSideRequire = LocalExchangeTypeRequire.requirePassToOne(); + outputType = LocalExchangeType.PASSTHROUGH; + } else if (AddLocalExchange.isColocated(this) || isBucketShuffle()) { + buildSideRequire = probeSideRequire = LocalExchangeTypeRequire.requireBucketHash(); + outputType = LocalExchangeType.BUCKET_HASH_SHUFFLE; + } else { + buildSideRequire = probeSideRequire = parentRequire.autoHash(); + } + + PlanNode probeSide = children.get(0); + Pair<PlanNode, LocalExchangeType> probeSideOutput = probeSide.enforceAndDeriveLocalExchange( + translatorContext, this, probeSideRequire); + if (!probeSideRequire.satisfy(probeSideOutput.second)) { + 
LocalExchangeType preferType = probeSideRequire.preferType(); + probeSide = new LocalExchangeNode( + translatorContext.nextPlanNodeId(), probeSide, preferType + ); + } else { + probeSide = probeSideOutput.first; + } + + PlanNode buildSide = children.get(1); + Pair<PlanNode, LocalExchangeType> buildSideOutput = buildSide.enforceAndDeriveLocalExchange( + translatorContext, this, buildSideRequire); + if (!buildSideRequire.satisfy(buildSideOutput.second)) { + LocalExchangeType preferType = buildSideRequire.preferType(); + buildSide = new LocalExchangeNode( + translatorContext.nextPlanNodeId(), buildSide, preferType + ); + } else { + buildSide = buildSideOutput.first; + } + + this.children = Lists.newArrayList(probeSide, buildSide); + + if (outputType == null) { + outputType = probeSideOutput.second; + } + return Pair.of(this, outputType); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/LocalExchangeNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/LocalExchangeNode.java new file mode 100644 index 00000000000..840b1b5ed4d --- /dev/null +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/LocalExchangeNode.java @@ -0,0 +1,271 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. 
+// This file is copied from +// https://github.com/apache/impala/blob/branch-2.9.0/fe/src/main/java/org/apache/impala/ExchangeNode.java +// and modified by Doris + +package org.apache.doris.planner; + +import org.apache.doris.analysis.Expr; +import org.apache.doris.analysis.TupleDescriptor; +import org.apache.doris.qe.ConnectContext; +import org.apache.doris.thrift.TExplainLevel; +import org.apache.doris.thrift.TExpr; +import org.apache.doris.thrift.TLocalExchangeNode; +import org.apache.doris.thrift.TLocalPartitionType; +import org.apache.doris.thrift.TPlanNode; +import org.apache.doris.thrift.TPlanNodeType; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; + +/** LocalExchangeNode */ +public class LocalExchangeNode extends PlanNode { + public static final String EXCHANGE_NODE = "LOCAL-EXCHANGE"; + + private LocalExchangeType exchangeType; + + /** + * use for Nereids only. + */ + public LocalExchangeNode(PlanNodeId id, PlanNode inputNode, LocalExchangeType exchangeType) { + super(id, inputNode, EXCHANGE_NODE); + this.offset = 0; + this.limit = -1; + this.conjuncts = Collections.emptyList(); + this.children.add(inputNode); + this.exchangeType = exchangeType; + this.fragment = inputNode.getFragment(); + + List<Expr> distributeExprs = inputNode.getDistributeExprLists(); + boolean isHashShuffle = (exchangeType == LocalExchangeType.BUCKET_HASH_SHUFFLE + || exchangeType == LocalExchangeType.LOCAL_EXECUTION_HASH_SHUFFLE + || exchangeType == LocalExchangeType.GLOBAL_EXECUTION_HASH_SHUFFLE); + if (isHashShuffle && distributeExprs != null && !distributeExprs.isEmpty()) { + setDistributeExprLists(distributeExprs); + List<List<Expr>> distributeExprsList = new ArrayList<>(); + distributeExprsList.add(distributeExprs); + setChildrenDistributeExprLists(distributeExprsList); + } + TupleDescriptor outputTupleDesc = inputNode.getOutputTupleDesc(); + updateTupleIds(outputTupleDesc); + } + + public void updateTupleIds(TupleDescriptor 
outputTupleDesc) { + if (outputTupleDesc != null) { + clearTupleIds(); + tupleIds.add(outputTupleDesc.getId()); + } else { + clearTupleIds(); + tupleIds.addAll(getChild(0).getOutputTupleIds()); + } + } + + @Override + protected void toThrift(TPlanNode msg) { + msg.setIsSerialOperator((isSerialOperator() || fragment.hasSerialScanNode()) + && fragment.useSerialSource(ConnectContext.get())); + + msg.node_type = TPlanNodeType.LOCAL_EXCHANGE_NODE; + msg.local_exchange_node = new TLocalExchangeNode(exchangeType.toThrift()); + + if (exchangeType.isHashShuffle()) { + List<List<TExpr>> distributeExprLists = new ArrayList<>(); + for (List<Expr> exprList : childrenDistributeExprLists) { + List<TExpr> distributeExprList = new ArrayList<>(); + for (Expr expr : exprList) { + distributeExprList.add(expr.treeToThrift()); + } + distributeExprLists.add(distributeExprList); + } + msg.distribute_expr_lists = distributeExprLists; + } + } + + @Override + public String getNodeExplainString(String prefix, TExplainLevel detailLevel) { + return prefix + "type: " + exchangeType.name() + "\n"; + } + + /** LocalExchangeTypeRequire */ + public interface LocalExchangeTypeRequire { + boolean satisfy(LocalExchangeType provide); + + LocalExchangeType preferType(); + + default LocalExchangeTypeRequire autoHash() { + return RequireHash.INSTANCE; + } + + static NoRequire noRequire() { + return NoRequire.INSTANCE; + } + + static RequireHash requireHash() { + return RequireHash.INSTANCE; + } + + static RequireSpecific requirePassthrough() { + return requireSpecific(LocalExchangeType.PASSTHROUGH); + } + + static RequireSpecific requirePassToOne() { + return requireSpecific(LocalExchangeType.PASS_TO_ONE); + } + + static RequireSpecific requireBroadcast() { + return requireSpecific(LocalExchangeType.BROADCAST); + } + + static RequireSpecific requireAdaptivePassthrough() { + return requireSpecific(LocalExchangeType.ADAPTIVE_PASSTHROUGH); + } + + static RequireSpecific requireBucketHash() { + return 
requireSpecific(LocalExchangeType.BUCKET_HASH_SHUFFLE); + } + + static RequireSpecific requireExecutionHash() { + return requireSpecific(LocalExchangeType.GLOBAL_EXECUTION_HASH_SHUFFLE); + } + + static RequireSpecific requireSpecific(LocalExchangeType require) { + return new RequireSpecific(require); + } + + default LocalExchangeType noopTo(LocalExchangeType defaultType) { + LocalExchangeType preferType = preferType(); + return (preferType == LocalExchangeType.NOOP) ? defaultType : preferType; + } + } + + /** NoRequire */ + public static class NoRequire implements LocalExchangeTypeRequire { + public static final NoRequire INSTANCE = new NoRequire(); + + @Override + public boolean satisfy(LocalExchangeType provide) { + return true; + } + + @Override + public LocalExchangeType preferType() { + return LocalExchangeType.NOOP; + } + } + + /** RequireHash */ + public static class RequireHash implements LocalExchangeTypeRequire { + public static final RequireHash INSTANCE = new RequireHash(); + + @Override + public boolean satisfy(LocalExchangeType provide) { + switch (provide) { + case GLOBAL_EXECUTION_HASH_SHUFFLE: + case BUCKET_HASH_SHUFFLE: + return true; + default: + return false; + } + } + + @Override + public LocalExchangeType preferType() { + return LocalExchangeType.GLOBAL_EXECUTION_HASH_SHUFFLE; + } + + @Override + public LocalExchangeTypeRequire autoHash() { + return this; + } + } + + public static class RequireSpecific implements LocalExchangeTypeRequire { + LocalExchangeType requireType; + + public RequireSpecific(LocalExchangeType requireType) { + this.requireType = requireType; + } + + @Override + public boolean satisfy(LocalExchangeType provide) { + return requireType == provide; + } + + @Override + public LocalExchangeType preferType() { + return requireType; + } + + @Override + public LocalExchangeTypeRequire autoHash() { + if (requireType == LocalExchangeType.GLOBAL_EXECUTION_HASH_SHUFFLE + || requireType == LocalExchangeType.BUCKET_HASH_SHUFFLE) { + 
return this; + } + return RequireHash.INSTANCE; + } + } + + public enum LocalExchangeType { + NOOP, + GLOBAL_EXECUTION_HASH_SHUFFLE, + LOCAL_EXECUTION_HASH_SHUFFLE, + BUCKET_HASH_SHUFFLE, + PASSTHROUGH, + ADAPTIVE_PASSTHROUGH, + BROADCAST, + PASS_TO_ONE, + LOCAL_MERGE_SORT; + + public boolean isHashShuffle() { + switch (this) { + case GLOBAL_EXECUTION_HASH_SHUFFLE: + case LOCAL_EXECUTION_HASH_SHUFFLE: + case BUCKET_HASH_SHUFFLE: + return true; + default: + return false; + } + } + + public TLocalPartitionType toThrift() { + switch (this) { + case GLOBAL_EXECUTION_HASH_SHUFFLE: + return TLocalPartitionType.GLOBAL_EXECUTION_HASH_SHUFFLE; + case LOCAL_EXECUTION_HASH_SHUFFLE: + return TLocalPartitionType.LOCAL_EXECUTION_HASH_SHUFFLE; + case BUCKET_HASH_SHUFFLE: + return TLocalPartitionType.BUCKET_HASH_SHUFFLE; + case PASSTHROUGH: + return TLocalPartitionType.PASSTHROUGH; + case ADAPTIVE_PASSTHROUGH: + return TLocalPartitionType.ADAPTIVE_PASSTHROUGH; + case BROADCAST: + return TLocalPartitionType.BROADCAST; + case PASS_TO_ONE: + return TLocalPartitionType.PASS_TO_ONE; + case LOCAL_MERGE_SORT: + return TLocalPartitionType.LOCAL_MERGE_SORT; + default: { + throw new IllegalStateException("Unsupported LocalExchangeType: " + this); + } + } + } + } +} diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java index 520cbf937cd..f9cc1c3a1ff 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/NestedLoopJoinNode.java @@ -22,11 +22,17 @@ import org.apache.doris.analysis.JoinOperator; import org.apache.doris.analysis.SlotId; import org.apache.doris.analysis.TupleDescriptor; import org.apache.doris.analysis.TupleId; +import org.apache.doris.common.Pair; +import org.apache.doris.nereids.glue.translator.PlanTranslatorContext; +import 
org.apache.doris.planner.LocalExchangeNode.LocalExchangeType; +import org.apache.doris.planner.LocalExchangeNode.LocalExchangeTypeRequire; import org.apache.doris.thrift.TExplainLevel; import org.apache.doris.thrift.TNestedLoopJoinNode; import org.apache.doris.thrift.TPlanNode; import org.apache.doris.thrift.TPlanNodeType; +import com.google.common.collect.Lists; + import java.util.List; /** @@ -165,4 +171,55 @@ public class NestedLoopJoinNode extends JoinNodeBase { return joinOp == JoinOperator.RIGHT_OUTER_JOIN || joinOp == JoinOperator.RIGHT_ANTI_JOIN || joinOp == JoinOperator.RIGHT_SEMI_JOIN || joinOp == JoinOperator.FULL_OUTER_JOIN; } + + @Override + public Pair<PlanNode, LocalExchangeType> enforceAndDeriveLocalExchange(PlanTranslatorContext translatorContext, + PlanNode parent, LocalExchangeTypeRequire parentRequire) { + + boolean childUsePoolingScan = fragment.useSerialSource(translatorContext.getConnectContext()) + && ((children.get(0) instanceof ScanNode) || (children.get(1) instanceof ScanNode)); + + LocalExchangeTypeRequire probeSideRequire; + LocalExchangeTypeRequire buildSideRequire; + LocalExchangeType outputType; + if (joinOp == JoinOperator.NULL_AWARE_LEFT_ANTI_JOIN) { + probeSideRequire = buildSideRequire = LocalExchangeTypeRequire.noRequire(); + outputType = LocalExchangeType.NOOP; + } else if (childUsePoolingScan) { + probeSideRequire = LocalExchangeTypeRequire.requireAdaptivePassthrough(); + buildSideRequire = LocalExchangeTypeRequire.requireBroadcast(); + outputType = LocalExchangeType.ADAPTIVE_PASSTHROUGH; + } else { + probeSideRequire = LocalExchangeTypeRequire.requireAdaptivePassthrough(); + buildSideRequire = LocalExchangeTypeRequire.noRequire(); + outputType = LocalExchangeType.ADAPTIVE_PASSTHROUGH; + } + + PlanNode probeSide = children.get(0); + Pair<PlanNode, LocalExchangeType> probeSideOutput = probeSide.enforceAndDeriveLocalExchange( + translatorContext, this, probeSideRequire); + if (!probeSideRequire.satisfy(probeSideOutput.second)) { 
+ LocalExchangeType preferType = probeSideRequire.preferType(); + probeSide = new LocalExchangeNode( + translatorContext.nextPlanNodeId(), probeSide, preferType + ); + } else { + probeSide = probeSideOutput.first; + } + + PlanNode buildSide = children.get(1); + Pair<PlanNode, LocalExchangeType> buildSideOutput = buildSide.enforceAndDeriveLocalExchange( + translatorContext, this, buildSideRequire); + if (!buildSideRequire.satisfy(buildSideOutput.second)) { + LocalExchangeType preferType = buildSideRequire.preferType(); + buildSide = new LocalExchangeNode( + translatorContext.nextPlanNodeId(), buildSide, preferType + ); + } else { + buildSide = buildSideOutput.first; + } + + this.children = Lists.newArrayList(probeSide, buildSide); + return Pair.of(this, outputType); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java index d5fddd65c38..20ae85b6101 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java @@ -53,6 +53,8 @@ import org.apache.doris.common.UserException; import org.apache.doris.common.util.DebugUtil; import org.apache.doris.info.PartitionNamesInfo; import org.apache.doris.nereids.glue.translator.PlanTranslatorContext; +import org.apache.doris.planner.LocalExchangeNode.LocalExchangeType; +import org.apache.doris.planner.LocalExchangeNode.LocalExchangeTypeRequire; import org.apache.doris.planner.normalize.Normalizer; import org.apache.doris.planner.normalize.PartitionRangePredicateNormalizer; import org.apache.doris.qe.ConnectContext; @@ -1049,6 +1051,9 @@ public class OlapScanNode extends ScanNode { if (isPointQuery()) { output.append(prefix).append("SHORT-CIRCUIT\n"); } + if (fragment.useSerialSource(ConnectContext.get())) { + output.append(prefix).append("POOLING-SCAN\n"); + } printNestedColumns(output, prefix, getTupleDesc()); @@ -1384,4 +1389,24 @@ 
public class OlapScanNode extends ScanNode { } return super.getCatalogId(); } + + @Override + public Pair<PlanNode, LocalExchangeType> enforceAndDeriveLocalExchange( + PlanTranslatorContext translatorContext, PlanNode parent, + LocalExchangeTypeRequire parentRequire) { + boolean poolingScan = fragment.useSerialSource(translatorContext.getConnectContext()); + if (poolingScan) { + LocalExchangeType preferType = parentRequire.noopTo(LocalExchangeType.BUCKET_HASH_SHUFFLE); + + LocalExchangeNode exchangeNode = new LocalExchangeNode( + translatorContext.nextPlanNodeId(), this, preferType + ); + return Pair.of( + exchangeNode, + preferType + ); + } else { + return Pair.of(this, LocalExchangeType.NOOP); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java index 14923c381b8..21e31595a55 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/PlanNode.java @@ -32,6 +32,9 @@ import org.apache.doris.common.Pair; import org.apache.doris.common.TreeNode; import org.apache.doris.common.UserException; import org.apache.doris.datasource.iceberg.source.IcebergScanNode; +import org.apache.doris.nereids.glue.translator.PlanTranslatorContext; +import org.apache.doris.planner.LocalExchangeNode.LocalExchangeType; +import org.apache.doris.planner.LocalExchangeNode.LocalExchangeTypeRequire; import org.apache.doris.planner.normalize.Normalizer; import org.apache.doris.qe.ConnectContext; import org.apache.doris.thrift.TAccessPathType; @@ -139,7 +142,8 @@ public abstract class PlanNode extends TreeNode<PlanNode> { protected int nereidsId = -1; - private List<List<Expr>> childrenDistributeExprLists = new ArrayList<>(); + protected List<List<Expr>> childrenDistributeExprLists = new ArrayList<>(); + protected List<Expr> distributeExprLists = new ArrayList<>(); protected PlanNode(PlanNodeId id, List<TupleId> tupleIds, 
String planNodeName) { this.id = id; @@ -913,4 +917,56 @@ public abstract class PlanNode extends TreeNode<PlanNode> { } return StringUtils.join(mergeDisplayAccessPaths, ", "); } + + public Pair<PlanNode, LocalExchangeType> enforceAndDeriveLocalExchange( + PlanTranslatorContext translatorContext, PlanNode parent, LocalExchangeTypeRequire parentRequire) { + ArrayList<PlanNode> newChildren = Lists.newArrayList(); + for (int i = 0; i < children.size(); i++) { + PlanNode child = children.get(i); + Pair<PlanNode, LocalExchangeType> childOutput + = child.enforceAndDeriveLocalExchange(translatorContext, parent, parentRequire); + newChildren.add(childOutput.first); + } + this.children = newChildren; + return Pair.of(this, LocalExchangeType.NOOP); + } + + protected Pair<PlanNode, LocalExchangeType> enforceChild( + PlanTranslatorContext translatorContext, LocalExchangeTypeRequire requireChild, PlanNode originChild) { + Pair<PlanNode, LocalExchangeType> childOutput + = originChild.enforceAndDeriveLocalExchange(translatorContext, this, requireChild); + if (!requireChild.satisfy(childOutput.second)) { + LocalExchangeType preferType = requireChild.preferType(); + return Pair.of( + new LocalExchangeNode(translatorContext.nextPlanNodeId(), childOutput.first, preferType), + preferType + ); + } else { + return childOutput; + } + } + + protected List<Expr> getChildDistributeExprList(int childIndex) { + if ((childrenDistributeExprLists == null || childrenDistributeExprLists.size() <= childIndex)) { + return null; + } else { + return childrenDistributeExprLists.get(childIndex); + } + } + + public List<List<Expr>> getChildrenDistributeExprLists() { + return childrenDistributeExprLists; + } + + public List<Expr> getDistributeExprLists() { + return distributeExprLists; + } + + public void setDistributeExprLists(List<Expr> distributeExprLists) { + if (distributeExprLists == null) { + this.distributeExprLists = Collections.emptyList(); + } else { + this.distributeExprLists = 
distributeExprLists; + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SetOperationNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SetOperationNode.java index 2745fcaa135..acb808cef55 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/SetOperationNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SetOperationNode.java @@ -19,6 +19,11 @@ package org.apache.doris.planner; import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.TupleId; +import org.apache.doris.common.Pair; +import org.apache.doris.nereids.glue.translator.PlanTranslatorContext; +import org.apache.doris.planner.LocalExchangeNode.LocalExchangeType; +import org.apache.doris.planner.LocalExchangeNode.LocalExchangeTypeRequire; +import org.apache.doris.planner.LocalExchangeNode.NoRequire; import org.apache.doris.thrift.TExceptNode; import org.apache.doris.thrift.TExplainLevel; import org.apache.doris.thrift.TExpr; @@ -34,6 +39,7 @@ import org.apache.commons.collections4.CollectionUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; import java.util.List; import java.util.stream.Collectors; @@ -188,4 +194,59 @@ public abstract class SetOperationNode extends PlanNode { public boolean isBucketShuffle() { return distributionMode.equals(DistributionMode.BUCKET_SHUFFLE); } + + public boolean isColocate() { + return isColocate; + } + + @Override + public Pair<PlanNode, LocalExchangeType> enforceAndDeriveLocalExchange(PlanTranslatorContext translatorContext, + PlanNode parent, LocalExchangeTypeRequire parentRequire) { + if (this instanceof UnionNode) { + ArrayList<PlanNode> newChildren = Lists.newArrayList(); + NoRequire requireChild = LocalExchangeTypeRequire.noRequire(); + for (PlanNode child : children) { + Pair<PlanNode, LocalExchangeType> childOutput + = child.enforceAndDeriveLocalExchange(translatorContext, this, requireChild); + if (!requireChild.satisfy(childOutput.second)) { + 
LocalExchangeType preferType = requireChild.preferType(); + LocalExchangeNode localExchangeNode + = new LocalExchangeNode(translatorContext.nextPlanNodeId(), child, preferType); + newChildren.add(localExchangeNode); + } else { + newChildren.add(childOutput.first); + } + } + + this.children = newChildren; + return Pair.of(this, LocalExchangeType.NOOP); + } else { + LocalExchangeTypeRequire requireChild; + LocalExchangeType outputType; + if (AddLocalExchange.isColocated(this)) { + requireChild = LocalExchangeTypeRequire.requireBucketHash(); + outputType = LocalExchangeType.BUCKET_HASH_SHUFFLE; + } else { + requireChild = LocalExchangeTypeRequire.requireExecutionHash(); + outputType = LocalExchangeType.GLOBAL_EXECUTION_HASH_SHUFFLE; + } + + ArrayList<PlanNode> newChildren = Lists.newArrayList(); + for (PlanNode child : children) { + Pair<PlanNode, LocalExchangeType> childOutput + = child.enforceAndDeriveLocalExchange(translatorContext, this, requireChild); + if (!requireChild.satisfy(childOutput.second)) { + LocalExchangeType preferType = requireChild.preferType(); + LocalExchangeNode localExchangeNode + = new LocalExchangeNode(translatorContext.nextPlanNodeId(), child, preferType); + newChildren.add(localExchangeNode); + } else { + newChildren.add(childOutput.first); + } + } + + this.children = newChildren; + return Pair.of(this, outputType); + } + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java b/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java index a0e65a38011..fe30b55f428 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java +++ b/fe/fe-core/src/main/java/org/apache/doris/planner/SortNode.java @@ -23,6 +23,9 @@ package org.apache.doris.planner; import org.apache.doris.analysis.Expr; import org.apache.doris.analysis.SortInfo; import org.apache.doris.common.Pair; +import org.apache.doris.nereids.glue.translator.PlanTranslatorContext; +import 
org.apache.doris.planner.LocalExchangeNode.LocalExchangeType; +import org.apache.doris.planner.LocalExchangeNode.LocalExchangeTypeRequire; import org.apache.doris.qe.ConnectContext; import org.apache.doris.thrift.TExplainLevel; import org.apache.doris.thrift.TPlanNode; @@ -243,4 +246,38 @@ public class SortNode extends PlanNode { List<Pair<Integer, Integer>> topnFilterTargets) { this.topnFilterTargets = topnFilterTargets; } + + @Override + public Pair<PlanNode, LocalExchangeType> enforceAndDeriveLocalExchange(PlanTranslatorContext translatorContext, + PlanNode parent, LocalExchangeTypeRequire parentRequire) { + + LocalExchangeTypeRequire requireChild; + LocalExchangeType outputType = null; + if (children.get(0) instanceof AnalyticEvalNode) { + if (AddLocalExchange.isColocated(this)) { + requireChild = LocalExchangeTypeRequire.requireBucketHash(); + outputType = LocalExchangeType.BUCKET_HASH_SHUFFLE; + } else { + requireChild = parentRequire.autoHash(); + } + } else if (mergeByexchange) { + requireChild = LocalExchangeTypeRequire.requirePassthrough(); + outputType = LocalExchangeType.PASSTHROUGH; + } else if (fragment.useSerialSource(translatorContext.getConnectContext()) + && children.get(0) instanceof ScanNode) { + requireChild = LocalExchangeTypeRequire.requirePassthrough(); + outputType = LocalExchangeType.PASSTHROUGH; + } else { + requireChild = LocalExchangeTypeRequire.noRequire(); + outputType = LocalExchangeType.NOOP; + } + + Pair<PlanNode, LocalExchangeType> enforceResult + = enforceChild(translatorContext, requireChild, children.get(0)); + this.children = Lists.newArrayList(enforceResult.first); + if (outputType == null) { + outputType = enforceResult.second; + } + return Pair.of(this, outputType); + } } diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java index ed4c5d977f4..16c1aa0aaba 100644 --- a/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java 
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/SessionVariable.java @@ -329,6 +329,8 @@ public class SessionVariable implements Serializable, Writable { public static final String ENABLE_LOCAL_SHUFFLE = "enable_local_shuffle"; + public static final String ENABLE_LOCAL_SHUFFLE_PLANNER = "enable_local_shuffle_planner"; + public static final String FORCE_TO_LOCAL_SHUFFLE = "force_to_local_shuffle"; public static final String ENABLE_LOCAL_MERGE_SORT = "enable_local_merge_sort"; @@ -1526,6 +1528,12 @@ public class SessionVariable implements Serializable, Writable { "Whether to enable local shuffle on pipelineX engine."}, needForward = true) private boolean enableLocalShuffle = true; + @VariableMgr.VarAttr( + name = ENABLE_LOCAL_SHUFFLE_PLANNER, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL, + description = {"是否在FE规划Local Shuffle", + "Whether to plan local shuffle in frontend"}, needForward = true) + private boolean enableLocalShufflePlanner = true; + @VariableMgr.VarAttr( name = FORCE_TO_LOCAL_SHUFFLE, fuzzy = false, varType = VariableAnnotation.EXPERIMENTAL, description = {"是否在 pipelineX 引擎上强制开启 local shuffle 优化", @@ -4442,6 +4450,14 @@ public class SessionVariable implements Serializable, Writable { this.enableLocalShuffle = enableLocalShuffle; } + public boolean isEnableLocalShufflePlanner() { + return enableLocalShufflePlanner; + } + + public void setEnableLocalShufflePlanner(boolean enableLocalShufflePlanner) { + this.enableLocalShufflePlanner = enableLocalShufflePlanner; + } + public boolean enablePushDownNoGroupAgg() { return enablePushDownNoGroupAgg; } @@ -5274,6 +5290,8 @@ public class SessionVariable implements Serializable, Writable { // Set Iceberg write target file size tResult.setIcebergWriteTargetFileSizeBytes(icebergWriteTargetFileSizeBytes); + tResult.setEnableLocalShufflePlanner(enableLocalShufflePlanner); + return tResult; } diff --git a/gensrc/thrift/PaloInternalService.thrift b/gensrc/thrift/PaloInternalService.thrift index 
4ab93195984..8a27dcb59b5 100644 --- a/gensrc/thrift/PaloInternalService.thrift +++ b/gensrc/thrift/PaloInternalService.thrift @@ -436,6 +436,9 @@ struct TQueryOptions { 200: optional bool enable_adjust_conjunct_order_by_cost; + // enable plan local exchange node in fe + 201: optional bool enable_local_shuffle_planner; + // For cloud, to control if the content would be written into file cache // In write path, to control if the content would be written into file cache. // In read path, read from file cache or remote storage when execute query. diff --git a/gensrc/thrift/Partitions.thrift b/gensrc/thrift/Partitions.thrift index 86a2d9be555..a122cae6ac7 100644 --- a/gensrc/thrift/Partitions.thrift +++ b/gensrc/thrift/Partitions.thrift @@ -52,6 +52,39 @@ enum TPartitionType { HIVE_TABLE_SINK_UNPARTITIONED = 8 } +enum TLocalPartitionType { + // used to resume the global hash distribution because other distributions break the global hash distribution, + // such as PASSTHROUGH, and then JoinNode can shuffle data by the same hash distribution. + // + // for example: look here, we need to resume to GLOBAL_EXECUTION_HASH_SHUFFLE + // ↓ + // Node -> LocalExchangeNode(PASSTHROUGH) → JoinNode → LocalExchangeNode(GLOBAL_EXECUTION_HASH_SHUFFLE) → JoinNode + // ExchangeNode(BROADCAST) ↗ ↑ + // ExchangeNode(GLOBAL_EXECUTION_HASH_SHUFFLE) + GLOBAL_EXECUTION_HASH_SHUFFLE = 0, + // used to rebalance data and add parallelism + // + // for example: look here, we need to use LOCAL_EXECUTION_HASH_SHUFFLE to rebalance data + // ↓ + // Scan(hash(id)) -> LocalExchangeNode(LOCAL_EXECUTION_HASH_SHUFFLE(id, name)) → AggregationNode(group by(id,name)) + // + // the LOCAL_EXECUTION_HASH_SHUFFLE is necessary because the hash distribution of scan node is based on id, + // but the hash distribution of aggregation node is based on id and name, so we need to rebalance data by both + // id and name to make sure the data with same id and name can be sent to the same instance of aggregation node.
+ // and we cannot use GLOBAL_EXECUTION_HASH_SHUFFLE(id, name) here, because + // `TPipelineFragmentParams.shuffle_idx_to_instance_idx` is used to map a partial global instance index to a local + // instance index and discards the other backends' instance indexes; the data that does not belong to the local + // instance will be discarded, which causes data loss. + LOCAL_EXECUTION_HASH_SHUFFLE = 1, + BUCKET_HASH_SHUFFLE = 2, + // round-robin partition, used to rebalance data and add parallelism + PASSTHROUGH = 3, + ADAPTIVE_PASSTHROUGH = 4, + BROADCAST = 5, + PASS_TO_ONE = 6, + LOCAL_MERGE_SORT = 7 +} + enum TDistributionType { UNPARTITIONED = 0, diff --git a/gensrc/thrift/PlanNodes.thrift b/gensrc/thrift/PlanNodes.thrift index 1369c6e1045..1aa3969d1e2 100644 --- a/gensrc/thrift/PlanNodes.thrift +++ b/gensrc/thrift/PlanNodes.thrift @@ -62,7 +62,8 @@ enum TPlanNodeType { GROUP_COMMIT_SCAN_NODE = 33, MATERIALIZATION_NODE = 34, REC_CTE_NODE = 35, - REC_CTE_SCAN_NODE = 36 + REC_CTE_SCAN_NODE = 36, + LOCAL_EXCHANGE_NODE = 37 } struct TKeyRange { @@ -1287,6 +1288,24 @@ struct TExchangeNode { 4: optional Partitions.TPartitionType partition_type } +struct TLocalExchangeNode { + 1: required Partitions.TLocalPartitionType partition_type + // when partition_type in (GLOBAL_EXECUTION_HASH_SHUFFLE, LOCAL_EXECUTION_HASH_SHUFFLE, BUCKET_HASH_SHUFFLE), + // the distribute_expr_lists is not null, and the legacy `TPlanNode.distribute_expr_lists` is deprecated + // + // the hash computation: + // 1. for BUCKET_HASH_SHUFFLE, use distribute_expr_lists to compute the hash value and mod by + // `TPipelineFragmentParams.num_buckets`, and map the bucket index to a local instance id by + // `TPipelineFragmentParams.bucket_seq_to_instance_idx` + // 2.
for LOCAL_EXECUTION_HASH_SHUFFLE, use distribute_expr_lists to compute the hash value and mod by + `TPipelineFragmentParams.local_params.size`, and the backend will map the instance index to the local instance + by `i -> i`, for example: 1 -> 1, 2 -> 2, ... + 3. for GLOBAL_EXECUTION_HASH_SHUFFLE, use distribute_expr_lists to compute the hash value and mod by + `TPipelineFragmentParams.total_instances`, and map the global instance index to the local instance by + `TPipelineFragmentParams.shuffle_idx_to_instance_idx` + 2: optional list<Exprs.TExpr> distribute_expr_lists +} + struct TOlapRewriteNode { 1: required list<Exprs.TExpr> columns 2: required list<Types.TColumnType> column_types @@ -1503,6 +1522,7 @@ struct TPlanNode { 50: optional list<list<Exprs.TExpr>> distribute_expr_lists 51: optional bool is_serial_operator 52: optional TRecCTEScanNode rec_cte_scan_node + 53: optional TLocalExchangeNode local_exchange_node // projections is final projections, which means projecting into results and materializing them into the output block. 101: optional list<Exprs.TExpr> projections --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
