This is an automated email from the ASF dual-hosted git repository.
englefly pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/master by this push:
new ee8d45fe3a1 [opt](nereids) support topn-filter for external table
(#34674)
ee8d45fe3a1 is described below
commit ee8d45fe3a16dd1e812b13b026f7553cf8a1cae3
Author: minghong <[email protected]>
AuthorDate: Wed May 15 10:36:01 2024 +0800
[opt](nereids) support topn-filter for external table (#34674)
* support topn-filter for external table
---
.../apache/doris/datasource/ExternalScanNode.java | 5 ++
.../org/apache/doris/datasource/FileScanNode.java | 9 +++-
.../doris/datasource/es/source/EsScanNode.java | 8 ++++
.../doris/datasource/jdbc/source/JdbcScanNode.java | 8 ++++
.../doris/datasource/odbc/source/OdbcScanNode.java | 8 ++++
.../glue/translator/PhysicalPlanTranslator.java | 34 ++++++++++----
.../doris/nereids/processor/post/TopNScanOpt.java | 54 ++++++++++++----------
.../nereids/processor/post/TopnFilterContext.java | 46 ++++++++++++------
.../org/apache/doris/planner/DataGenScanNode.java | 12 +++--
.../org/apache/doris/planner/MysqlScanNode.java | 8 ++++
.../org/apache/doris/planner/OlapScanNode.java | 37 +--------------
.../org/apache/doris/planner/OriginalPlanner.java | 2 +-
.../java/org/apache/doris/planner/ScanNode.java | 29 ++++++++++++
.../doris/planner/TestExternalTableScanNode.java | 9 ++++
.../main/java/org/apache/doris/qe/Coordinator.java | 6 +--
.../external_table_p0/jdbc/test_jdbc_query_pg.out | 20 ++++++++
.../jdbc/test_jdbc_query_pg.groovy | 4 +-
17 files changed, 203 insertions(+), 96 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalScanNode.java
index d41fab5916c..e85fed8b62a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalScanNode.java
@@ -24,6 +24,7 @@ import org.apache.doris.planner.PlanNodeId;
import org.apache.doris.planner.ScanNode;
import org.apache.doris.qe.ConnectContext;
import org.apache.doris.statistics.StatisticalType;
+import org.apache.doris.thrift.TPlanNode;
import org.apache.doris.thrift.TScanRangeLocations;
import org.apache.logging.log4j.LogManager;
@@ -96,4 +97,8 @@ public abstract class ExternalScanNode extends ScanNode {
public int getNumInstances() {
return scanRangeLocations.size();
}
+
+ protected void toThrift(TPlanNode msg) {
+ super.toThrift(msg);
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
index 227f636e67b..fba97bc5959 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/FileScanNode.java
@@ -55,6 +55,7 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
+import java.util.stream.Collectors;
/**
* Base class for External File Scan, including external query and load.
@@ -93,6 +94,7 @@ public abstract class FileScanNode extends ExternalScanNode {
fileScanNode.setTableName(desc.getTable().getName());
}
planNode.setFileScanNode(fileScanNode);
+ super.toThrift(planNode);
}
@Override
@@ -167,7 +169,12 @@ public abstract class FileScanNode extends
ExternalScanNode {
}
output.append(String.format("numNodes=%s", numNodes)).append("\n");
output.append(prefix).append(String.format("pushdown agg=%s",
pushDownAggNoGroupingOp)).append("\n");
-
+ if (useTopnFilter()) {
+ String topnFilterSources = String.join(",",
+ topnFilterSortNodes.stream()
+ .map(node -> node.getId().asInt() + "").collect(Collectors.toList()));
+ output.append(prefix).append("TOPN OPT:").append(topnFilterSources).append("\n");
+ }
return output.toString();
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/es/source/EsScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/es/source/EsScanNode.java
index 7bc4f9ea1c0..663657b5d95 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/es/source/EsScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/es/source/EsScanNode.java
@@ -67,6 +67,7 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.stream.Collectors;
/**
* ScanNode for Elasticsearch.
@@ -183,6 +184,7 @@ public class EsScanNode extends ExternalScanNode {
}
esScanNode.setProperties(properties);
msg.es_scan_node = esScanNode;
+ super.toThrift(msg);
}
// only do partition(es index level) prune
@@ -316,6 +318,12 @@ public class EsScanNode extends ExternalScanNode {
String indexName = table.getIndexName();
String typeName = table.getMappingType();
output.append(prefix).append(String.format("ES index/type: %s/%s",
indexName, typeName)).append("\n");
+ if (useTopnFilter()) {
+ String topnFilterSources = String.join(",",
+ topnFilterSortNodes.stream()
+ .map(node -> node.getId().asInt() + "").collect(Collectors.toList()));
+ output.append(prefix).append("TOPN OPT:").append(topnFilterSources).append("\n");
+ }
return output.toString();
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/source/JdbcScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/source/JdbcScanNode.java
index 58ab0f9d226..cd06a9a0d20 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/source/JdbcScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/jdbc/source/JdbcScanNode.java
@@ -60,6 +60,7 @@ import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
+import java.util.stream.Collectors;
public class JdbcScanNode extends ExternalScanNode {
private static final Logger LOG = LogManager.getLogger(JdbcScanNode.class);
@@ -257,6 +258,12 @@ public class JdbcScanNode extends ExternalScanNode {
output.append(prefix).append("PREDICATES: ").append(expr.toSql()).append("\n");
}
}
+ if (useTopnFilter()) {
+ String topnFilterSources = String.join(",",
+ topnFilterSortNodes.stream()
+ .map(node -> node.getId().asInt() + "").collect(Collectors.toList()));
+ output.append(prefix).append("TOPN OPT:").append(topnFilterSources).append("\n");
+ }
return output.toString();
}
@@ -308,6 +315,7 @@ public class JdbcScanNode extends ExternalScanNode {
msg.jdbc_scan_node.setQueryString(getJdbcQueryStr());
}
msg.jdbc_scan_node.setTableType(jdbcType);
+ super.toThrift(msg);
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/datasource/odbc/source/OdbcScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/datasource/odbc/source/OdbcScanNode.java
index 9b597dddb54..9b964e41fa9 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/datasource/odbc/source/OdbcScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/datasource/odbc/source/OdbcScanNode.java
@@ -54,6 +54,7 @@ import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
+import java.util.stream.Collectors;
/**
* Full scan of an ODBC table.
@@ -141,6 +142,12 @@ public class OdbcScanNode extends ExternalScanNode {
Expr expr = convertConjunctsToAndCompoundPredicate(conjuncts);
output.append(prefix).append("PREDICATES: ").append(expr.toSql()).append("\n");
}
+ if (useTopnFilter()) {
+ String topnFilterSources = String.join(",",
+ topnFilterSortNodes.stream()
+ .map(node -> node.getId().asInt() + "").collect(Collectors.toList()));
+ output.append(prefix).append("TOPN OPT:").append(topnFilterSources).append("\n");
+ }
return output.toString();
}
@@ -233,6 +240,7 @@ public class OdbcScanNode extends ExternalScanNode {
odbcScanNode.setQueryString(getOdbcQueryStr());
msg.odbc_scan_node = odbcScanNode;
+ super.toThrift(msg);
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index b8cae519296..fba5558c462 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -593,6 +593,10 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
expr ->
runtimeFilterGenerator.translateRuntimeFilterTarget(expr, finalScanNode,
context)
)
);
+ if (context.getTopnFilterContext().isTopnFilterTarget(fileScan)) {
+ context.getTopnFilterContext().addLegacyTarget(fileScan, scanNode);
+ }
+
Utils.execWithUncheckedException(scanNode::finalizeForNereids);
// Create PlanFragment
DataPartition dataPartition = DataPartition.RANDOM;
@@ -637,6 +641,9 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
expr ->
runtimeFilterGenerator.translateRuntimeFilterTarget(expr, esScanNode, context)
)
);
+ if (context.getTopnFilterContext().isTopnFilterTarget(esScan)) {
+ context.getTopnFilterContext().addLegacyTarget(esScan, esScanNode);
+ }
Utils.execWithUncheckedException(esScanNode::finalizeForNereids);
DataPartition dataPartition = DataPartition.RANDOM;
PlanFragment planFragment = new PlanFragment(context.nextFragmentId(),
esScanNode, dataPartition);
@@ -661,6 +668,9 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
expr ->
runtimeFilterGenerator.translateRuntimeFilterTarget(expr, jdbcScanNode, context)
)
);
+ if (context.getTopnFilterContext().isTopnFilterTarget(jdbcScan)) {
+ context.getTopnFilterContext().addLegacyTarget(jdbcScan,
jdbcScanNode);
+ }
Utils.execWithUncheckedException(jdbcScanNode::finalizeForNereids);
DataPartition dataPartition = DataPartition.RANDOM;
PlanFragment planFragment = new PlanFragment(context.nextFragmentId(),
jdbcScanNode, dataPartition);
@@ -685,6 +695,9 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
expr ->
runtimeFilterGenerator.translateRuntimeFilterTarget(expr, odbcScanNode, context)
)
);
+ if (context.getTopnFilterContext().isTopnFilterTarget(odbcScan)) {
+ context.getTopnFilterContext().addLegacyTarget(odbcScan,
odbcScanNode);
+ }
Utils.execWithUncheckedException(odbcScanNode::finalizeForNereids);
DataPartition dataPartition = DataPartition.RANDOM;
PlanFragment planFragment = new PlanFragment(context.nextFragmentId(),
odbcScanNode, dataPartition);
@@ -763,7 +776,6 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
);
olapScanNode.setPushDownAggNoGrouping(context.getRelationPushAggOp(olapScan.getRelationId()));
if (context.getTopnFilterContext().isTopnFilterTarget(olapScan)) {
- olapScanNode.setUseTopnOpt(true);
context.getTopnFilterContext().addLegacyTarget(olapScan,
olapScanNode);
}
// TODO: we need to remove all finalizeForNereids
@@ -790,7 +802,6 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
PlanFragment planFragment =
visitPhysicalOlapScan(deferMaterializeOlapScan.getPhysicalOlapScan(), context);
OlapScanNode olapScanNode = (OlapScanNode) planFragment.getPlanRoot();
if
(context.getTopnFilterContext().isTopnFilterTarget(deferMaterializeOlapScan)) {
- olapScanNode.setUseTopnOpt(true);
context.getTopnFilterContext().addLegacyTarget(deferMaterializeOlapScan,
olapScanNode);
}
TupleDescriptor tupleDescriptor =
context.getTupleDesc(olapScanNode.getTupleId());
@@ -2113,11 +2124,14 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
if (context.getTopnFilterContext().isTopnFilterSource(topN)) {
sortNode.setUseTopnOpt(true);
context.getTopnFilterContext().getTargets(topN).forEach(
- olapScan -> {
- Optional<OlapScanNode> legacyScan =
-
context.getTopnFilterContext().getLegacyScanNode(olapScan);
+ relation -> {
+ Optional<ScanNode> legacyScan =
+
context.getTopnFilterContext().getLegacyScanNode(relation);
Preconditions.checkState(legacyScan.isPresent(),
- "cannot find OlapScanNode for topn filter");
+ "cannot find ScanNode for topn filter:\n"
+ + "relation: %s \n%s",
+ relation.toString(),
+ context.getTopnFilterContext().toString());
legacyScan.get().addTopnFilterSortNode(sortNode);
}
);
@@ -2173,11 +2187,11 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
if (context.getTopnFilterContext().isTopnFilterSource(topN)) {
sortNode.setUseTopnOpt(true);
context.getTopnFilterContext().getTargets(topN).forEach(
- olapScan -> {
- Optional<OlapScanNode> legacyScan =
-
context.getTopnFilterContext().getLegacyScanNode(olapScan);
+ relation -> {
+ Optional<ScanNode> legacyScan =
+
context.getTopnFilterContext().getLegacyScanNode(relation);
Preconditions.checkState(legacyScan.isPresent(),
- "cannot find OlapScanNode for topn filter");
+ "cannot find ScanNode for topn filter");
legacyScan.get().addTopnFilterSortNode(sortNode);
}
);
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopNScanOpt.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopNScanOpt.java
index a8129639f5e..ec1e52d6426 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopNScanOpt.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopNScanOpt.java
@@ -23,9 +23,14 @@ import
org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.plans.Plan;
import org.apache.doris.nereids.trees.plans.SortPhase;
import org.apache.doris.nereids.trees.plans.algebra.Join;
-import org.apache.doris.nereids.trees.plans.algebra.OlapScan;
-import org.apache.doris.nereids.trees.plans.physical.PhysicalCatalogRelation;
+import
org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeOlapScan;
import
org.apache.doris.nereids.trees.plans.physical.PhysicalDeferMaterializeTopN;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalEsScan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalFileScan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalJdbcScan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalOdbcScan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalOlapScan;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation;
import org.apache.doris.nereids.trees.plans.physical.PhysicalTopN;
import org.apache.doris.nereids.trees.plans.physical.PhysicalWindow;
import org.apache.doris.qe.ConnectContext;
@@ -41,10 +46,9 @@ import java.util.Optional;
*/
public class TopNScanOpt extends PlanPostProcessor {
-
@Override
public PhysicalTopN<? extends Plan> visitPhysicalTopN(PhysicalTopN<?
extends Plan> topN, CascadesContext ctx) {
- Optional<OlapScan> scanOpt = findScanForTopnFilter(topN);
+ Optional<PhysicalRelation> scanOpt = findScanForTopnFilter(topN);
scanOpt.ifPresent(scan ->
ctx.getTopnFilterContext().addTopnFilter(topN, scan));
topN.child().accept(this, ctx);
return topN;
@@ -53,13 +57,13 @@ public class TopNScanOpt extends PlanPostProcessor {
@Override
public Plan
visitPhysicalDeferMaterializeTopN(PhysicalDeferMaterializeTopN<? extends Plan>
topN,
CascadesContext context) {
- Optional<OlapScan> scanOpt =
findScanForTopnFilter(topN.getPhysicalTopN());
+ Optional<PhysicalRelation> scanOpt =
findScanForTopnFilter(topN.getPhysicalTopN());
scanOpt.ifPresent(scan ->
context.getTopnFilterContext().addTopnFilter(topN, scan));
topN.child().accept(this, context);
return topN;
}
- private Optional<OlapScan> findScanForTopnFilter(PhysicalTopN<? extends
Plan> topN) {
+ private Optional<PhysicalRelation> findScanForTopnFilter(PhysicalTopN<?
extends Plan> topN) {
if (topN.getSortPhase() != SortPhase.LOCAL_SORT) {
return Optional.empty();
}
@@ -92,30 +96,23 @@ public class TopNScanOpt extends PlanPostProcessor {
}
boolean nullsFirst = topN.getOrderKeys().get(0).isNullFirst();
- OlapScan olapScan = findScanNodeBySlotReference(topN, (SlotReference)
firstKey, nullsFirst);
- if (olapScan != null
- && olapScan.getTable().isDupKeysOrMergeOnWrite()
- && olapScan instanceof PhysicalCatalogRelation) {
- return Optional.of(olapScan);
- }
-
- return Optional.empty();
+ return findScanNodeBySlotReference(topN, (SlotReference) firstKey,
nullsFirst);
}
- private OlapScan findScanNodeBySlotReference(Plan root, SlotReference
slot, boolean nullsFirst) {
+ private Optional<PhysicalRelation> findScanNodeBySlotReference(Plan root,
SlotReference slot, boolean nullsFirst) {
if (root instanceof PhysicalWindow) {
- return null;
+ return Optional.empty();
}
- if (root instanceof OlapScan) {
- if (root.getOutputSet().contains(slot)) {
- return (OlapScan) root;
+ if (root instanceof PhysicalRelation) {
+ if (root.getOutputSet().contains(slot) &&
supportPhysicalRelations((PhysicalRelation) root)) {
+ return Optional.of((PhysicalRelation) root);
} else {
- return null;
+ return Optional.empty();
}
}
- OlapScan target = null;
+ Optional<PhysicalRelation> target;
if (root instanceof Join) {
Join join = (Join) root;
if (nullsFirst && join.getJoinType().isOuterJoin()) {
@@ -123,7 +120,7 @@ public class TopNScanOpt extends PlanPostProcessor {
// and to the right child of rightOuterJoin.
// but we have rule to push topn down to the left/right side. and topn-filter
// will be generated according to the inferred topn node.
- return null;
+ return Optional.empty();
}
// try to push to both left and right child
if (root.child(0).getOutputSet().contains(slot)) {
@@ -139,12 +136,12 @@ public class TopNScanOpt extends PlanPostProcessor {
Plan child = root.child(0);
if (child.getOutputSet().contains(slot)) {
target = findScanNodeBySlotReference(child, slot, nullsFirst);
- if (target != null) {
+ if (target.isPresent()) {
return target;
}
}
}
- return target;
+ return Optional.empty();
}
private long getTopNOptLimitThreshold() {
@@ -153,4 +150,13 @@ public class TopNScanOpt extends PlanPostProcessor {
}
return -1;
}
+
+ private boolean supportPhysicalRelations(PhysicalRelation relation) {
+ return relation instanceof PhysicalOlapScan
+ || relation instanceof PhysicalOdbcScan
+ || relation instanceof PhysicalEsScan
+ || relation instanceof PhysicalFileScan
+ || relation instanceof PhysicalJdbcScan
+ || relation instanceof PhysicalDeferMaterializeOlapScan;
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopnFilterContext.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopnFilterContext.java
index b5f79defef4..6a4fe3123df 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopnFilterContext.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/processor/post/TopnFilterContext.java
@@ -17,9 +17,9 @@
package org.apache.doris.nereids.processor.post;
-import org.apache.doris.nereids.trees.plans.algebra.OlapScan;
import org.apache.doris.nereids.trees.plans.algebra.TopN;
-import org.apache.doris.planner.OlapScanNode;
+import org.apache.doris.nereids.trees.plans.physical.PhysicalRelation;
+import org.apache.doris.planner.ScanNode;
import org.apache.doris.planner.SortNode;
import com.google.common.collect.Lists;
@@ -35,20 +35,20 @@ import java.util.Set;
* topN runtime filter context
*/
public class TopnFilterContext {
- private final Map<TopN, List<OlapScan>> filters = Maps.newHashMap();
+ private final Map<TopN, List<PhysicalRelation>> filters =
Maps.newHashMap();
private final Set<TopN> sources = Sets.newHashSet();
- private final Set<OlapScan> targets = Sets.newHashSet();
- private final Map<OlapScan, OlapScanNode> legacyTargetsMap =
Maps.newHashMap();
+ private final Set<PhysicalRelation> targets = Sets.newHashSet();
+ private final Map<PhysicalRelation, ScanNode> legacyTargetsMap =
Maps.newHashMap();
private final Map<TopN, SortNode> legacySourceMap = Maps.newHashMap();
/**
* add topN filter
*/
- public void addTopnFilter(TopN topn, OlapScan scan) {
+ public void addTopnFilter(TopN topn, PhysicalRelation scan) {
targets.add(scan);
sources.add(topn);
- List<OlapScan> targets = filters.get(topn);
+ List<PhysicalRelation> targets = filters.get(topn);
if (targets == null) {
filters.put(topn, Lists.newArrayList(scan));
} else {
@@ -59,14 +59,14 @@ public class TopnFilterContext {
/**
* find the corresponding sortNode for topn filter
*/
- public Optional<OlapScanNode> getLegacyScanNode(OlapScan scan) {
- return legacyTargetsMap.keySet().contains(scan)
+ public Optional<ScanNode> getLegacyScanNode(PhysicalRelation scan) {
+ return legacyTargetsMap.containsKey(scan)
? Optional.of(legacyTargetsMap.get(scan))
: Optional.empty();
}
public Optional<SortNode> getLegacySortNode(TopN topn) {
- return legacyTargetsMap.keySet().contains(topn)
+ return legacyTargetsMap.containsKey(topn)
? Optional.of(legacySourceMap.get(topn))
: Optional.empty();
}
@@ -75,19 +75,35 @@ public class TopnFilterContext {
return sources.contains(topn);
}
- public boolean isTopnFilterTarget(OlapScan scan) {
- return targets.contains(scan);
+ public boolean isTopnFilterTarget(PhysicalRelation relation) {
+ return targets.contains(relation);
}
public void addLegacySource(TopN topn, SortNode sort) {
legacySourceMap.put(topn, sort);
}
- public void addLegacyTarget(OlapScan olapScan, OlapScanNode legacy) {
- legacyTargetsMap.put(olapScan, legacy);
+ public void addLegacyTarget(PhysicalRelation relation, ScanNode legacy) {
+ legacyTargetsMap.put(relation, legacy);
}
- public List<OlapScan> getTargets(TopN topn) {
+ public List<PhysicalRelation> getTargets(TopN topn) {
return filters.get(topn);
}
+
+ /**
+ * toString
+ */
+ public String toString() {
+ StringBuilder builder = new StringBuilder("TopnFilterContext\n");
+ String indent = " ";
+ String arrow = " -> ";
+ builder.append("filters:\n");
+ for (TopN topn : filters.keySet()) {
+ builder.append(indent).append(topn.toString()).append("\n");
+
builder.append(indent).append(arrow).append(filters.get(topn)).append("\n");
+ }
+ return builder.toString();
+
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/DataGenScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/DataGenScanNode.java
index 14a50160d63..1c760adb94a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/DataGenScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/DataGenScanNode.java
@@ -38,6 +38,7 @@ import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import java.util.List;
+import java.util.stream.Collectors;
/**
* This scan node is used for data source generated from memory.
@@ -79,6 +80,7 @@ public class DataGenScanNode extends ExternalScanNode {
dataGenScanNode.setTupleId(desc.getId().asInt());
dataGenScanNode.setFuncName(tvf.getDataGenFunctionName());
msg.data_gen_scan_node = dataGenScanNode;
+ super.toThrift(msg);
}
@Override
@@ -130,11 +132,13 @@ public class DataGenScanNode extends ExternalScanNode {
if (!conjuncts.isEmpty()) {
output.append(prefix).append("predicates: ").append(getExplainString(conjuncts)).append("\n");
}
-
output.append(prefix).append("table value function: ").append(tvf.getDataGenFunctionName()).append("\n");
-
-
-
+ if (useTopnFilter()) {
+ String topnFilterSources = String.join(",",
+ topnFilterSortNodes.stream()
+ .map(node -> node.getId().asInt() + "").collect(Collectors.toList()));
+ output.append(prefix).append("TOPN OPT:").append(topnFilterSources).append("\n");
+ }
return output.toString();
}
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/MysqlScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/MysqlScanNode.java
index 703566bbc0b..fe75530dae9 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/MysqlScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/MysqlScanNode.java
@@ -43,6 +43,7 @@ import org.apache.logging.log4j.Logger;
import java.util.ArrayList;
import java.util.List;
+import java.util.stream.Collectors;
/**
* Full scan of an MySQL table.
@@ -93,6 +94,12 @@ public class MysqlScanNode extends ExternalScanNode {
Expr expr = convertConjunctsToAndCompoundPredicate(conjuncts);
output.append(prefix).append("PREDICATES: ").append(expr.toSql()).append("\n");
}
+ if (useTopnFilter()) {
+ String topnFilterSources = String.join(",",
+ topnFilterSortNodes.stream()
+ .map(node -> node.getId().asInt() + "").collect(Collectors.toList()));
+ output.append(prefix).append("TOPN OPT:").append(topnFilterSources).append("\n");
+ }
return output.toString();
}
@@ -153,6 +160,7 @@ public class MysqlScanNode extends ExternalScanNode {
protected void toThrift(TPlanNode msg) {
msg.node_type = TPlanNodeType.MYSQL_SCAN_NODE;
msg.mysql_scan_node = new TMySQLScanNode(desc.getId().asInt(),
tblName, columns, filters);
+ super.toThrift(msg);
}
@Override
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
index 6b5128ba5eb..3e32853119b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OlapScanNode.java
@@ -178,12 +178,6 @@ public class OlapScanNode extends ScanNode {
// It's limit for scanner instead of scanNode so we add a new limit.
private long sortLimit = -1;
- // useTopnOpt is equivalent to !topnFilterSortNodes.isEmpty().
- // keep this flag for compatibility.
- private boolean useTopnOpt = false;
- // support multi topn filter
- private final List<SortNode> topnFilterSortNodes = Lists.newArrayList();
-
// List of tablets will be scanned by current olap_scan_node
private ArrayList<Long> scanTabletIds = Lists.newArrayList();
@@ -319,14 +313,6 @@ public class OlapScanNode extends ScanNode {
this.sortLimit = sortLimit;
}
- public boolean getUseTopnOpt() {
- return useTopnOpt;
- }
-
- public void setUseTopnOpt(boolean useTopnOpt) {
- this.useTopnOpt = useTopnOpt;
- }
-
public Collection<Long> getSelectedPartitionIds() {
return selectedPartitionIds;
}
@@ -1347,7 +1333,7 @@ public class OlapScanNode extends ScanNode {
if (sortLimit != -1) {
output.append(prefix).append("SORT LIMIT: ").append(sortLimit).append("\n");
}
- if (useTopnOpt) {
+ if (useTopnFilter()) {
String topnFilterSources = String.join(",",
topnFilterSortNodes.stream()
.map(node -> node.getId().asInt() + "").collect(Collectors.toList()));
@@ -1524,18 +1510,6 @@ public class OlapScanNode extends ScanNode {
if (sortLimit != -1) {
msg.olap_scan_node.setSortLimit(sortLimit);
}
- msg.olap_scan_node.setUseTopnOpt(useTopnOpt);
- List<Integer> topnFilterSourceNodeIds = getTopnFilterSortNodes()
- .stream()
- .map(sortNode -> sortNode.getId().asInt())
- .collect(Collectors.toList());
- if (!topnFilterSourceNodeIds.isEmpty()) {
- if (SessionVariable.enablePipelineEngineX()) {
- msg.setTopnFilterSourceNodeIds(topnFilterSourceNodeIds);
- } else {
-
msg.olap_scan_node.setTopnFilterSourceNodeIds(topnFilterSourceNodeIds);
- }
- }
msg.olap_scan_node.setKeyType(olapTable.getKeysType().toThrift());
String tableName = olapTable.getName();
if (selectedIndexId != -1) {
@@ -1556,6 +1530,7 @@ public class OlapScanNode extends ScanNode {
if (shouldColoScan || SessionVariable.enablePipelineEngineX()) {
msg.olap_scan_node.setDistributeColumnIds(new
ArrayList<>(distributionColumnIds));
}
+ super.toThrift(msg);
}
public void collectColumns(Analyzer analyzer, Set<String>
equivalenceColumns, Set<String> unequivalenceColumns) {
@@ -1813,14 +1788,6 @@ public class OlapScanNode extends ScanNode {
return getScanTabletIds().size();
}
- public void addTopnFilterSortNode(SortNode sortNode) {
- topnFilterSortNodes.add(sortNode);
- }
-
- public List<SortNode> getTopnFilterSortNodes() {
- return topnFilterSortNodes;
- }
-
@Override
public int numScanBackends() {
return scanBackendIds.size();
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java
index cfce6060542..06644a7ab9d 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/OriginalPlanner.java
@@ -550,7 +550,7 @@ public class OriginalPlanner extends Planner {
OlapScanNode scanNode = (OlapScanNode) child;
if (scanNode.isDupKeysOrMergeOnWrite()) {
sortNode.setUseTopnOpt(true);
- scanNode.setUseTopnOpt(true);
+ // scanNode.setUseTopnOpt(true);
}
}
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
index b88a2438d9a..c9febfa8047 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/ScanNode.java
@@ -49,11 +49,13 @@ import org.apache.doris.datasource.SplitGenerator;
import org.apache.doris.datasource.SplitSource;
import org.apache.doris.nereids.glue.translator.PlanTranslatorContext;
import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.SessionVariable;
import org.apache.doris.rpc.RpcException;
import org.apache.doris.statistics.StatisticalType;
import org.apache.doris.statistics.query.StatsDelta;
import org.apache.doris.system.Backend;
import org.apache.doris.thrift.TNetworkAddress;
+import org.apache.doris.thrift.TPlanNode;
import org.apache.doris.thrift.TScanRange;
import org.apache.doris.thrift.TScanRangeLocation;
import org.apache.doris.thrift.TScanRangeLocations;
@@ -99,6 +101,9 @@ public abstract class ScanNode extends PlanNode implements
SplitGenerator {
// create a mapping between output slot's id and project expr
Map<SlotId, Expr> outputSlotToProjectExpr = new HashMap<>();
+ // support multi topn filter
+ protected final List<SortNode> topnFilterSortNodes = Lists.newArrayList();
+
public ScanNode(PlanNodeId id, TupleDescriptor desc, String planNodeName,
StatisticalType statisticalType) {
super(id, desc.getId().asList(), planNodeName, statisticalType);
this.desc = desc;
@@ -812,4 +817,28 @@ public abstract class ScanNode extends PlanNode implements
SplitGenerator {
scanNode.updateScanRangeVersions(visibleVersionMap);
}
}
+
+ protected void toThrift(TPlanNode msg) {
+ // topn filter
+ if (useTopnFilter() && SessionVariable.enablePipelineEngineX()) {
+ List<Integer> topnFilterSourceNodeIds = getTopnFilterSortNodes()
+ .stream()
+ .map(sortNode -> sortNode.getId().asInt())
+ .collect(Collectors.toList());
+ msg.setTopnFilterSourceNodeIds(topnFilterSourceNodeIds);
+ }
+ }
+
+ public void addTopnFilterSortNode(SortNode sortNode) {
+ topnFilterSortNodes.add(sortNode);
+ }
+
+ public List<SortNode> getTopnFilterSortNodes() {
+ return topnFilterSortNodes;
+ }
+
+ public boolean useTopnFilter() {
+ return !topnFilterSortNodes.isEmpty();
+ }
+
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/TestExternalTableScanNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/TestExternalTableScanNode.java
index 3136444725f..3d6461b923c 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/TestExternalTableScanNode.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/TestExternalTableScanNode.java
@@ -33,6 +33,8 @@ import com.google.common.collect.Lists;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
+import java.util.stream.Collectors;
+
public class TestExternalTableScanNode extends ExternalScanNode {
private static final Logger LOG =
LogManager.getLogger(TestExternalTableScanNode.class);
private String tableName;
@@ -46,6 +48,12 @@ public class TestExternalTableScanNode extends
ExternalScanNode {
public String getNodeExplainString(String prefix, TExplainLevel
detailLevel) {
StringBuilder output = new StringBuilder();
output.append(prefix).append("TABLE: ").append(tableName).append("\n");
+ if (useTopnFilter()) {
+ String topnFilterSources = String.join(",",
+ topnFilterSortNodes.stream()
+ .map(node -> node.getId().asInt() + "").collect(Collectors.toList()));
+ output.append(prefix).append("TOPN OPT:").append(topnFilterSources).append("\n");
+ }
return output.toString();
}
@@ -74,6 +82,7 @@ public class TestExternalTableScanNode extends ExternalScanNode {
msg.test_external_scan_node = new TTestExternalScanNode();
msg.test_external_scan_node.setTupleId(desc.getId().asInt());
msg.test_external_scan_node.setTableName(tableName);
+ super.toThrift(msg);
}
@Override
diff --git a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
index 431e30896eb..a9bcc69c4a6 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/qe/Coordinator.java
@@ -3837,10 +3837,8 @@ public class Coordinator implements CoordInterface {
}
Set<Integer> topnFilterSources = Sets.newLinkedHashSet();
for (ScanNode scanNode : scanNodes) {
- if (scanNode instanceof OlapScanNode) {
- for (SortNode sortNode : ((OlapScanNode) scanNode).getTopnFilterSortNodes()) {
- topnFilterSources.add(sortNode.getId().asInt());
- }
+ for (SortNode sortNode : scanNode.getTopnFilterSortNodes()) {
+ topnFilterSources.add(sortNode.getId().asInt());
}
}
diff --git a/regression-test/data/external_table_p0/jdbc/test_jdbc_query_pg.out b/regression-test/data/external_table_p0/jdbc/test_jdbc_query_pg.out
index 06b0247814a..ec2058ca46e 100644
--- a/regression-test/data/external_table_p0/jdbc/test_jdbc_query_pg.out
+++ b/regression-test/data/external_table_p0/jdbc/test_jdbc_query_pg.out
@@ -1263,9 +1263,29 @@ true
1
-- !sql67 --
+true abc def 2022-10-11 1.234 1 2 0
2022-10-22T10:59:59 34.123 true abc def 2022-10-11 1.234
1 2 0 2022-10-22T10:59:59 34.123
+true abc def 2022-10-11 1.234 1 2 0
2022-10-22T10:59:59 34.123 true abc def 2022-10-11 1.234
1 2 1 2022-10-22T10:59:59 34.123
+true abc def 2022-10-11 1.234 1 2 0
2022-10-22T10:59:59 34.123 true abc def 2022-10-11 1.234
1 2 2 2022-10-22T10:59:59 34.123
+true abc def 2022-10-11 1.234 1 2 0
2022-10-22T10:59:59 34.123 true abc def 2022-10-11 1.234
1 2 3 2022-10-22T10:59:59 34.123
+true abc def 2022-10-11 1.234 1 2 0
2022-10-22T10:59:59 34.123 true abc def 2022-10-11 1.234
1 2 3 2022-10-22T10:59:59 34.123
+true abc def 2022-10-11 1.234 1 2 1
2022-10-22T10:59:59 34.123 true abc def 2022-10-11 1.234
1 2 0 2022-10-22T10:59:59 34.123
+true abc def 2022-10-11 1.234 1 2 1
2022-10-22T10:59:59 34.123 true abc def 2022-10-11 1.234
1 2 1 2022-10-22T10:59:59 34.123
+true abc def 2022-10-11 1.234 1 2 1
2022-10-22T10:59:59 34.123 true abc def 2022-10-11 1.234
1 2 2 2022-10-22T10:59:59 34.123
+true abc def 2022-10-11 1.234 1 2 1
2022-10-22T10:59:59 34.123 true abc def 2022-10-11 1.234
1 2 3 2022-10-22T10:59:59 34.123
+true abc def 2022-10-11 1.234 1 2 1
2022-10-22T10:59:59 34.123 true abc def 2022-10-11 1.234
1 2 3 2022-10-22T10:59:59 34.123
+true abc def 2022-10-11 1.234 1 2 2
2022-10-22T10:59:59 34.123 true abc def 2022-10-11 1.234
1 2 0 2022-10-22T10:59:59 34.123
+true abc def 2022-10-11 1.234 1 2 2
2022-10-22T10:59:59 34.123 true abc def 2022-10-11 1.234
1 2 1 2022-10-22T10:59:59 34.123
+true abc def 2022-10-11 1.234 1 2 2
2022-10-22T10:59:59 34.123 true abc def 2022-10-11 1.234
1 2 2 2022-10-22T10:59:59 34.123
+true abc def 2022-10-11 1.234 1 2 2
2022-10-22T10:59:59 34.123 true abc def 2022-10-11 1.234
1 2 3 2022-10-22T10:59:59 34.123
+true abc def 2022-10-11 1.234 1 2 2
2022-10-22T10:59:59 34.123 true abc def 2022-10-11 1.234
1 2 3 2022-10-22T10:59:59 34.123
true abc def 2022-10-11 1.234 1 2 3
2022-10-22T10:59:59 34.123 true abc def 2022-10-11 1.234
1 2 0 2022-10-22T10:59:59 34.123
+true abc def 2022-10-11 1.234 1 2 3
2022-10-22T10:59:59 34.123 true abc def 2022-10-11 1.234
1 2 0 2022-10-22T10:59:59 34.123
+true abc def 2022-10-11 1.234 1 2 3
2022-10-22T10:59:59 34.123 true abc def 2022-10-11 1.234
1 2 1 2022-10-22T10:59:59 34.123
true abc def 2022-10-11 1.234 1 2 3
2022-10-22T10:59:59 34.123 true abc def 2022-10-11 1.234
1 2 1 2022-10-22T10:59:59 34.123
true abc def 2022-10-11 1.234 1 2 3
2022-10-22T10:59:59 34.123 true abc def 2022-10-11 1.234
1 2 2 2022-10-22T10:59:59 34.123
+true abc def 2022-10-11 1.234 1 2 3
2022-10-22T10:59:59 34.123 true abc def 2022-10-11 1.234
1 2 2 2022-10-22T10:59:59 34.123
+true abc def 2022-10-11 1.234 1 2 3
2022-10-22T10:59:59 34.123 true abc def 2022-10-11 1.234
1 2 3 2022-10-22T10:59:59 34.123
+true abc def 2022-10-11 1.234 1 2 3
2022-10-22T10:59:59 34.123 true abc def 2022-10-11 1.234
1 2 3 2022-10-22T10:59:59 34.123
true abc def 2022-10-11 1.234 1 2 3
2022-10-22T10:59:59 34.123 true abc def 2022-10-11 1.234
1 2 3 2022-10-22T10:59:59 34.123
true abc def 2022-10-11 1.234 1 2 3
2022-10-22T10:59:59 34.123 true abc def 2022-10-11 1.234
1 2 3 2022-10-22T10:59:59 34.123
diff --git a/regression-test/suites/external_table_p0/jdbc/test_jdbc_query_pg.groovy b/regression-test/suites/external_table_p0/jdbc/test_jdbc_query_pg.groovy
index ae2566c445c..492fdeb349b 100644
--- a/regression-test/suites/external_table_p0/jdbc/test_jdbc_query_pg.groovy
+++ b/regression-test/suites/external_table_p0/jdbc/test_jdbc_query_pg.groovy
@@ -537,9 +537,9 @@ suite("test_jdbc_query_pg", "p0,external,pg,external_docker,external_docker_pg")
order_qt_sql63 """ SELECT * FROM (SELECT 1 a) x CROSS JOIN (SELECT 2 b) y """
order_qt_sql65 """ SELECT t.c FROM (SELECT 1) as t1 CROSS JOIN (SELECT 0 AS c UNION ALL SELECT 1) t """
order_qt_sql66 """ SELECT t.c FROM (SELECT 1) as a CROSS JOIN (SELECT 0 AS c UNION ALL SELECT 1) t """
- order_qt_sql67 """ SELECT * FROM (SELECT * FROM $jdbcPg14Table1 ORDER BY k8 LIMIT 5) a
+ order_qt_sql67 """ SELECT a.*, b.* FROM (SELECT * FROM $jdbcPg14Table1 ORDER BY k8 LIMIT 5) a
JOIN (SELECT * FROM $jdbcPg14Table1 ORDER BY k8 LIMIT 5) b ON 123 = 123
- order by a.k8 desc limit 5"""
+ order by a.k8, b.k8 desc limit 25"""
order_qt_sql68 """ SELECT id, count(1) as c FROM $dorisExTable1 GROUP BY id
HAVING c IN (select k8 from $jdbcPg14Table1 where k8 = 2) """
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]