This is an automated email from the ASF dual-hosted git repository.
lijibing pushed a commit to branch high-priority-column
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/high-priority-column by this push:
new ab5b7c19c58 Collect high priority columns. (#31235)
ab5b7c19c58 is described below
commit ab5b7c19c58241bf240a6f24043740c5c15af045
Author: Jibing-Li <[email protected]>
AuthorDate: Wed Feb 28 11:54:40 2024 +0800
Collect high priority columns. (#31235)
---
.../doris/nereids/jobs/executor/Rewriter.java | 4 +-
.../org/apache/doris/nereids/rules/RuleType.java | 1 +
.../expression/HighPriorityColumnCollector.java | 202 +++++++++++++++++++++
.../apache/doris/statistics/AnalysisManager.java | 44 +++++
.../doris/statistics/HighPriorityColumn.java | 55 ++++++
5 files changed, 305 insertions(+), 1 deletion(-)
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java
index e2d386dc910..fdba72fbf6e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java
@@ -32,6 +32,7 @@ import org.apache.doris.nereids.rules.expression.CheckLegalityAfterRewrite;
import org.apache.doris.nereids.rules.expression.ExpressionNormalization;
import org.apache.doris.nereids.rules.expression.ExpressionOptimization;
import org.apache.doris.nereids.rules.expression.ExpressionRewrite;
+import org.apache.doris.nereids.rules.expression.HighPriorityColumnCollector;
import org.apache.doris.nereids.rules.rewrite.AddDefaultLimit;
import org.apache.doris.nereids.rules.rewrite.AdjustConjunctsReturnType;
import org.apache.doris.nereids.rules.rewrite.AdjustNullable;
@@ -440,7 +441,8 @@ public class Rewriter extends AbstractBatchJobExecutor {
new CollectFilterAboveConsumer(),
new CollectProjectAboveConsumer()
)
- )
+ ),
+ topic("Collect used column", custom(RuleType.COLLECT_COLUMNS,
HighPriorityColumnCollector::new))
);
private static final List<RewriteJob> WHOLE_TREE_REWRITE_JOBS
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
index 653deb47bdf..898d944fcbb 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/RuleType.java
@@ -295,6 +295,7 @@ public enum RuleType {
LEADING_JOIN(RuleTypeClass.REWRITE),
REWRITE_SENTINEL(RuleTypeClass.REWRITE),
+ COLLECT_COLUMNS(RuleTypeClass.REWRITE),
// topn opts
DEFER_MATERIALIZE_TOP_N_RESULT(RuleTypeClass.REWRITE),
diff --git a/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/HighPriorityColumnCollector.java b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/HighPriorityColumnCollector.java
new file mode 100644
index 00000000000..ed67ad97005
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/nereids/rules/expression/HighPriorityColumnCollector.java
@@ -0,0 +1,202 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.expression;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.Env;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.nereids.jobs.JobContext;
+import org.apache.doris.nereids.rules.expression.HighPriorityColumnCollector.CollectorContext;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
+import org.apache.doris.nereids.trees.plans.logical.LogicalCatalogRelation;
+import org.apache.doris.nereids.trees.plans.logical.LogicalFileScan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalFilter;
+import org.apache.doris.nereids.trees.plans.logical.LogicalHaving;
+import org.apache.doris.nereids.trees.plans.logical.LogicalJoin;
+import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
+import org.apache.doris.nereids.trees.plans.visitor.CustomRewriter;
+import org.apache.doris.nereids.trees.plans.visitor.DefaultPlanRewriter;
+import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.statistics.AnalysisManager;
+import org.apache.doris.statistics.util.StatisticsUtil;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.Set;
+import java.util.stream.Collectors;
+
+/**
+ * Collects the table columns a query plan references, so that their statistics can be given a higher
+ * priority by auto-analyze.
+ *
+ * <p>Slots referenced by join conditions, group-by keys, HAVING expressions and filter predicates are
+ * traced back through project expressions to their source columns and reported as predicate columns.
+ * Columns output by OLAP and file scans are reported as queried columns, except those that a project
+ * sitting directly above the scan (or above a single filter over the scan) does not reference, and
+ * except those already reported as predicate columns.
+ *
+ * <p>The plan is never modified: {@link #rewriteRoot} always returns the plan it was given.
+ */
+public class HighPriorityColumnCollector extends DefaultPlanRewriter<CollectorContext> implements CustomRewriter {
+
+    @Override
+    public Plan rewriteRoot(Plan plan, JobContext jobContext) {
+        ConnectContext connectContext = ConnectContext.get();
+        // Plans compiled in an internal session are not recorded.
+        if (connectContext != null && connectContext.getSessionVariable().internalSession) {
+            return plan;
+        }
+        // The collected columns are only handed to the AnalysisManager when auto-analyze is enabled,
+        // so skip walking the plan altogether otherwise.
+        if (!StatisticsUtil.enableAutoAnalyze()) {
+            return plan;
+        }
+        CollectorContext context = new CollectorContext();
+        plan.accept(this, context);
+        // A column that appears in a predicate is reported only as a predicate column.
+        context.queried.removeAll(context.usedInPredicate);
+        AnalysisManager analysisManager = Env.getCurrentEnv().getAnalysisManager();
+        analysisManager.updateColumnUsedInPredicate(context.usedInPredicate);
+        analysisManager.updateQueriedColumn(context.queried);
+        return plan;
+    }
+
+    /**
+     * State accumulated while walking a single plan.
+     */
+    public static class CollectorContext {
+        /** Maps each project output slot to the project expression that produces it. */
+        public final Map<Slot, NamedExpression> projects = new HashMap<>();
+
+        /** Column-backed slots referenced by join conditions, group-by keys, HAVING or filters. */
+        public final Set<Slot> usedInPredicate = new HashSet<>();
+
+        /** Slots output by scans that are still considered read by the query. */
+        public final Set<Slot> queried = new HashSet<>();
+    }
+
+    @Override
+    public Plan visitLogicalProject(LogicalProject<? extends Plan> project, CollectorContext context) {
+        project.child().accept(this, context);
+        // Remember how every output slot is computed, so slots referenced higher up in the plan can be
+        // traced back to the columns they come from.
+        List<NamedExpression> projects = project.getOutputs();
+        List<Slot> slots = project.computeOutput();
+        for (int i = 0; i < slots.size(); i++) {
+            context.projects.put(slots.get(i), projects.get(i));
+        }
+        // For a project directly above a scan (optionally through one filter), scan output columns that
+        // the project does not reference are removed from the queried set.
+        if (project.child() instanceof LogicalCatalogRelation
+                || (project.child() instanceof LogicalFilter
+                    && ((LogicalFilter<?>) project.child()).child() instanceof LogicalCatalogRelation)) {
+            Set<Slot> allUsed = project.getExpressions()
+                    .stream()
+                    .flatMap(e -> e.<Set<SlotReference>>collect(n -> n instanceof SlotReference).stream())
+                    .collect(Collectors.toSet());
+            LogicalCatalogRelation scan = project.child() instanceof LogicalCatalogRelation
+                    ? (LogicalCatalogRelation) project.child()
+                    : (LogicalCatalogRelation) project.child().child(0);
+            for (Slot slot : scan.getOutput()) {
+                if (!allUsed.contains(slot)) {
+                    context.queried.remove(slot);
+                }
+            }
+        }
+        return project;
+    }
+
+    @Override
+    public Plan visitLogicalJoin(LogicalJoin<? extends Plan, ? extends Plan> join, CollectorContext context) {
+        join.child(0).accept(this, context);
+        join.child(1).accept(this, context);
+        // For mark joins only the left condition slots are taken into account.
+        context.usedInPredicate.addAll(
+                (join.isMarkJoin() ? join.getLeftConditionSlot() : join.getConditionSlot())
+                        .stream()
+                        .flatMap(s -> backtrace(s, context).stream())
+                        .collect(Collectors.toSet()));
+        return join;
+    }
+
+    @Override
+    public Plan visitLogicalAggregate(LogicalAggregate<? extends Plan> aggregate, CollectorContext context) {
+        aggregate.child(0).accept(this, context);
+        context.usedInPredicate.addAll(backtraceReferencedSlots(aggregate.getGroupByExpressions(), context));
+        return aggregate;
+    }
+
+    @Override
+    public Plan visitLogicalHaving(LogicalHaving<? extends Plan> having, CollectorContext context) {
+        having.child(0).accept(this, context);
+        context.usedInPredicate.addAll(backtraceReferencedSlots(having.getExpressions(), context));
+        return having;
+    }
+
+    @Override
+    public Plan visitLogicalOlapScan(LogicalOlapScan olapScan, CollectorContext context) {
+        context.queried.addAll(olapScan.getOutput());
+        return olapScan;
+    }
+
+    @Override
+    public Plan visitLogicalFileScan(LogicalFileScan fileScan, CollectorContext context) {
+        context.queried.addAll(fileScan.getOutput());
+        return fileScan;
+    }
+
+    @Override
+    public Plan visitLogicalFilter(LogicalFilter<? extends Plan> filter, CollectorContext context) {
+        filter.child(0).accept(this, context);
+        context.usedInPredicate.addAll(backtraceReferencedSlots(filter.getExpressions(), context));
+        return filter;
+    }
+
+    /**
+     * Collects every {@link SlotReference} inside {@code expressions} and traces each one back to the
+     * column-backed slots it is derived from.
+     */
+    private Set<Slot> backtraceReferencedSlots(List<? extends Expression> expressions,
+            CollectorContext context) {
+        return expressions.stream()
+                .flatMap(e -> e.<Set<SlotReference>>collect(n -> n instanceof SlotReference).stream())
+                .flatMap(s -> backtrace(s, context).stream())
+                .collect(Collectors.toSet());
+    }
+
+    /**
+     * Traces {@code slot} back through the recorded project expressions to the slots that carry both a
+     * column and a table.
+     */
+    private Set<Slot> backtrace(Slot slot, CollectorContext context) {
+        return backtrace(slot, new HashSet<>(), context);
+    }
+
+    /**
+     * Recursive worker for {@link #backtrace(Slot, CollectorContext)}.
+     *
+     * @param path slots already visited during this trace; a revisited slot contributes nothing,
+     *         which guards against cycles in the project map
+     */
+    private Set<Slot> backtrace(Slot slot, Set<Slot> path, CollectorContext context) {
+        if (!path.add(slot)) {
+            return Collections.emptySet();
+        }
+        if (slot instanceof SlotReference) {
+            SlotReference slotReference = (SlotReference) slot;
+            Optional<Column> col = slotReference.getColumn();
+            Optional<TableIf> table = slotReference.getTable();
+            // A slot that still knows its column and table ends the trace.
+            if (col.isPresent() && table.isPresent()) {
+                return Collections.singleton(slot);
+            }
+        }
+        NamedExpression namedExpression = context.projects.get(slot);
+        if (namedExpression == null) {
+            return Collections.emptySet();
+        }
+        Set<SlotReference> slotReferences = namedExpression.<Set<SlotReference>>collect(n -> n instanceof SlotReference);
+        Set<Slot> refCol = new HashSet<>();
+        for (SlotReference slotReference : slotReferences) {
+            refCol.addAll(backtrace(slotReference, path, context));
+        }
+        return refCol;
+    }
+
+}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
index eac50b40757..da80a48081b 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/AnalysisManager.java
@@ -47,6 +47,8 @@ import org.apache.doris.datasource.CatalogIf;
import org.apache.doris.datasource.ExternalTable;
import org.apache.doris.datasource.hive.HMSExternalTable;
import org.apache.doris.mysql.privilege.PrivPredicate;
+import org.apache.doris.nereids.trees.expressions.Slot;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.persist.AnalyzeDeletionLog;
import org.apache.doris.persist.gson.GsonUtils;
import org.apache.doris.qe.ConnectContext;
@@ -89,9 +91,11 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.NavigableMap;
import java.util.Optional;
+import java.util.Queue;
import java.util.Set;
import java.util.StringJoiner;
import java.util.TreeMap;
+import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
@@ -104,6 +108,16 @@ public class AnalysisManager implements Writable {
private static final Logger LOG =
LogManager.getLogger(AnalysisManager.class);
+    /**
+     * Capacity of each in-memory high priority column queue. {@link Queue#offer} on a full
+     * {@link ArrayBlockingQueue} returns {@code false} instead of blocking, so columns reported while a
+     * queue is full are dropped.
+     */
+    private static final int HIGH_PRIORITY_COLUMN_QUEUE_CAPACITY = 100;
+
+    /**
+     * Columns reported through {@link #updateColumnUsedInPredicate}. Kept in memory only.
+     */
+    public final Queue<HighPriorityColumn> predicateColumns =
+            new ArrayBlockingQueue<>(HIGH_PRIORITY_COLUMN_QUEUE_CAPACITY);
+
+    /**
+     * Columns reported through {@link #updateQueriedColumn}. Kept in memory only.
+     */
+    public final Queue<HighPriorityColumn> queryColumns =
+            new ArrayBlockingQueue<>(HIGH_PRIORITY_COLUMN_QUEUE_CAPACITY);
+
// Tracking running manually submitted async tasks, keep in mem only
protected final ConcurrentMap<Long, Map<Long, BaseAnalysisTask>>
analysisJobIdToTaskMap = new ConcurrentHashMap<>();
@@ -1086,4 +1100,34 @@ public class AnalysisManager implements Writable {
}
return false;
}
+
+
+    /**
+     * Records columns that a query references in predicates into {@code predicateColumns}.
+     *
+     * <p>Accepts any {@link Collection}, matching {@link #updateQueriedColumn} and the
+     * {@link #updateColumn} delegate; existing callers passing a {@code Set} are unaffected.
+     *
+     * @param slotReferences slots to record; conversion is done by {@link #updateColumn}
+     */
+    public void updateColumnUsedInPredicate(Collection<Slot> slotReferences) {
+        updateColumn(slotReferences, predicateColumns);
+    }
+
+    /**
+     * Records columns that a query reads into {@code queryColumns}.
+     *
+     * @param slotReferences slots to record; conversion is done by {@link #updateColumn}
+     */
+    public void updateQueriedColumn(Collection<Slot> slotReferences) {
+        Queue<HighPriorityColumn> target = this.queryColumns;
+        this.updateColumn(slotReferences, target);
+    }
+
+    /**
+     * Converts each slot that is backed by a table column into a {@link HighPriorityColumn} and offers
+     * it to {@code queue}.
+     *
+     * <p>A slot is skipped if it is not a {@link SlotReference}, if it lacks a column or a table, or if
+     * its table's database or that database's catalog is {@code null}. Skipping one slot never stops the
+     * remaining slots from being processed. The result of {@link Queue#offer} is ignored, so entries are
+     * dropped when a capacity-restricted queue is full.
+     *
+     * @param slotReferences slots to convert
+     * @param queue destination queue
+     */
+    protected void updateColumn(Collection<Slot> slotReferences, Queue<HighPriorityColumn> queue) {
+        for (Slot s : slotReferences) {
+            // Skip non column references instead of returning, which would drop every remaining slot.
+            if (!(s instanceof SlotReference)) {
+                continue;
+            }
+            SlotReference slotReference = (SlotReference) s;
+            Optional<Column> optionalColumn = slotReference.getColumn();
+            Optional<TableIf> optionalTable = slotReference.getTable();
+            if (!optionalColumn.isPresent() || !optionalTable.isPresent()) {
+                continue;
+            }
+            TableIf table = optionalTable.get();
+            DatabaseIf database = table.getDatabase();
+            if (database == null) {
+                continue;
+            }
+            CatalogIf catalog = database.getCatalog();
+            if (catalog == null) {
+                continue;
+            }
+            queue.offer(new HighPriorityColumn(catalog.getId(), database.getId(), table.getId(),
+                    optionalColumn.get().getName()));
+        }
+    }
}
diff --git a/fe/fe-core/src/main/java/org/apache/doris/statistics/HighPriorityColumn.java b/fe/fe-core/src/main/java/org/apache/doris/statistics/HighPriorityColumn.java
new file mode 100644
index 00000000000..c4bc20c399a
--- /dev/null
+++ b/fe/fe-core/src/main/java/org/apache/doris/statistics/HighPriorityColumn.java
@@ -0,0 +1,55 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.statistics;
+
+import java.util.Objects;
+
+/**
+ * Identifies one column, by catalog id, database id, table id and column name, that has been reported
+ * as a high priority column.
+ *
+ * <p>All fields are final and equality is value-based, so instances can be used as hash keys.
+ * {@code colName} may be {@code null}; {@link #equals} and {@link #hashCode} both tolerate it.
+ */
+public class HighPriorityColumn {
+
+    public final long catalogId;
+    public final long dbId;
+    public final long tblId;
+    public final String colName;
+
+    public HighPriorityColumn(long catalogId, long dbId, long tblId, String colName) {
+        this.catalogId = catalogId;
+        this.dbId = dbId;
+        this.tblId = tblId;
+        this.colName = colName;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(catalogId, dbId, tblId, colName);
+    }
+
+    @Override
+    public boolean equals(Object other) {
+        if (this == other) {
+            return true;
+        }
+        if (!(other instanceof HighPriorityColumn)) {
+            return false;
+        }
+        HighPriorityColumn that = (HighPriorityColumn) other;
+        // Objects.equals keeps equals consistent with Objects.hash when colName is null.
+        return this.catalogId == that.catalogId
+                && this.dbId == that.dbId
+                && this.tblId == that.tblId
+                && Objects.equals(this.colName, that.colName);
+    }
+
+    @Override
+    public String toString() {
+        return "HighPriorityColumn{catalogId=" + catalogId + ", dbId=" + dbId + ", tblId=" + tblId
+                + ", colName=" + colName + "}";
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]