This is an automated email from the ASF dual-hosted git repository.
yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git
The following commit(s) were added to refs/heads/branch-4.0 by this push:
new 4b3061c2d9f branch-4.0: (query cache) query cache shouldn't be hit
when a session variable changes or a UDF is used (#60315) (#60661)
4b3061c2d9f is described below
commit 4b3061c2d9f0145b58de85e51f4533b62f91c465
Author: 924060929 <[email protected]>
AuthorDate: Fri Feb 13 09:36:06 2026 +0800
branch-4.0: (query cache) query cache shouldn't be hit when a session
variable changes or a UDF is used (#60315) (#60661)
cherry-picked from #60315
---
.../glue/translator/PhysicalPlanTranslator.java | 93 +++++++++++++++++++++-
.../org/apache/doris/planner/AggregationNode.java | 10 +++
.../planner/normalize/QueryCacheNormalizer.java | 35 +++++---
.../doris/planner/QueryCacheNormalizerTest.java | 1 +
.../suites/query_p0/cache/query_cache.groovy | 2 +
.../query_p0/cache/query_cache_with_context.groovy | 65 +++++++++++++++
6 files changed, 194 insertions(+), 12 deletions(-)
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
index f150c7f3bf4..0a98be0281b 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/nereids/glue/translator/PhysicalPlanTranslator.java
@@ -39,6 +39,7 @@ import org.apache.doris.analysis.TableRef;
import org.apache.doris.analysis.TableSample;
import org.apache.doris.analysis.TupleDescriptor;
import org.apache.doris.analysis.TupleId;
+import org.apache.doris.catalog.AliasFunction;
import org.apache.doris.catalog.Column;
import org.apache.doris.catalog.Env;
import org.apache.doris.catalog.Function.NullableMode;
@@ -105,9 +106,11 @@ import
org.apache.doris.nereids.trees.expressions.SessionVarGuardExpr;
import org.apache.doris.nereids.trees.expressions.Slot;
import org.apache.doris.nereids.trees.expressions.SlotReference;
import org.apache.doris.nereids.trees.expressions.WindowFrame;
+import org.apache.doris.nereids.trees.expressions.functions.Udf;
import
org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction;
import org.apache.doris.nereids.trees.expressions.functions.agg.AggregateParam;
import
org.apache.doris.nereids.trees.expressions.functions.scalar.GroupingScalarFunction;
+import
org.apache.doris.nereids.trees.expressions.functions.scalar.UniqueFunction;
import org.apache.doris.nereids.trees.plans.AbstractPlan;
import org.apache.doris.nereids.trees.plans.AggMode;
import org.apache.doris.nereids.trees.plans.AggPhase;
@@ -249,7 +252,9 @@ import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
+import java.util.Map.Entry;
import java.util.Objects;
+import java.util.Optional;
import java.util.Set;
import java.util.TreeMap;
import java.util.concurrent.atomic.AtomicBoolean;
@@ -506,7 +511,7 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
List<Expr> partitionExprs =
olapTableSink.getPartitionExprList().stream()
.map(e -> ExpressionTranslator.translate(e,
context)).collect(Collectors.toList());
Map<Long, Expr> syncMvWhereClauses = new HashMap<>();
- for (Map.Entry<Long, Expression> entry :
olapTableSink.getSyncMvWhereClauses().entrySet()) {
+ for (Entry<Long, Expression> entry :
olapTableSink.getSyncMvWhereClauses().entrySet()) {
syncMvWhereClauses.put(entry.getKey(),
ExpressionTranslator.translate(entry.getValue(), context));
}
OlapTableSink sink;
@@ -1270,6 +1275,11 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
aggregationNode.setCardinality((long)
aggregate.getStats().getRowCount());
}
updateLegacyPlanIdToPhysicalPlan(inputPlanFragment.getPlanRoot(),
aggregate);
+
+ if (ConnectContext.get().getSessionVariable().getEnableQueryCache()) {
+ setQueryCacheCandidate(aggregate, aggregationNode);
+ }
+
return inputPlanFragment;
}
@@ -2420,7 +2430,7 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
context.getTopnFilterContext().translateSource(topN, sortNode);
TopnFilter filter =
context.getTopnFilterContext().getTopnFilter(topN);
List<Pair<Integer, Integer>> targets = new ArrayList<>();
- for (Map.Entry<ScanNode, Expr> entry :
filter.legacyTargets.entrySet()) {
+ for (Entry<ScanNode, Expr> entry :
filter.legacyTargets.entrySet()) {
Set<SlotRef> inputSlots =
entry.getValue().getInputSlotRef();
if (inputSlots.size() != 1) {
LOG.warn("topn filter targets error: " + inputSlots);
@@ -3298,4 +3308,83 @@ public class PhysicalPlanTranslator extends
DefaultPlanVisitor<PlanFragment, Pla
}
return child instanceof PhysicalRelation;
}
+
+ private boolean setQueryCacheCandidate(
+ PhysicalHashAggregate<? extends Plan> aggregate, AggregationNode
aggregationNode) {
+ if (hasUndeterministicExpression(aggregate)) {
+ return false;
+ }
+
+ PlanNode child = aggregationNode.getChild(0);
+ if (child instanceof AggregationNode) {
+ if (((AggregationNode) child).isQueryCacheCandidate()) {
+ aggregationNode.setQueryCacheCandidate(true);
+ return true;
+ }
+ } else if (child instanceof OlapScanNode) {
+ aggregationNode.setQueryCacheCandidate(true);
+ return true;
+ }
+ return false;
+ }
+
+ private boolean hasUndeterministicExpression(Plan plan) {
+ String cacheKey = "hasUndeterministicExpression";
+ Optional<Boolean> hasUndeterministicExpressionCache =
plan.getMutableState(cacheKey);
+ if (hasUndeterministicExpressionCache.isPresent()) {
+ return hasUndeterministicExpressionCache.get();
+ }
+ boolean result;
+ if (plan instanceof PhysicalHashAggregate) {
+ PhysicalHashAggregate<? extends Plan> aggregate =
(PhysicalHashAggregate<? extends Plan>) plan;
+ if (hasUndeterministicExpression(aggregate.getGroupByExpressions())
+ ||
hasUndeterministicExpression(aggregate.getOutputExpressions())) {
+ result = true;
+ } else {
+ result = hasUndeterministicExpression(aggregate.child());
+ }
+ } else if (plan instanceof PhysicalFilter) {
+ PhysicalFilter<? extends Plan> filter = (PhysicalFilter<? extends
Plan>) plan;
+ if (hasUndeterministicExpression(filter.getExpressions())) {
+ result = true;
+ } else {
+ result = hasUndeterministicExpression(filter.child());
+ }
+ } else if (plan instanceof PhysicalProject) {
+ PhysicalProject<? extends Plan> project = (PhysicalProject<?
extends Plan>) plan;
+ if (hasUndeterministicExpression(project.getProjects())) {
+ result = true;
+ } else {
+ result = hasUndeterministicExpression(project.child());
+ }
+ } else if (plan instanceof PhysicalOlapScan) {
+ result = false;
+ } else {
+ // unsupported for query cache
+ result = true;
+ }
+ plan.setMutableState(cacheKey, result);
+ return result;
+ }
+
+ private boolean hasUndeterministicExpression(Collection<? extends
Expression> expressions) {
+ for (Expression groupByExpression : expressions) {
+ if (groupByExpression.containsType(AliasFunction.class, Udf.class,
UniqueFunction.class)) {
+ return true;
+ }
+
+ boolean nonDeterministic = groupByExpression.anyMatch(e -> {
+ if (e instanceof Expression) {
+ if (!((Expression) e).isDeterministic()) {
+ return true;
+ }
+ }
+ return false;
+ });
+ if (nonDeterministic) {
+ return true;
+ }
+ }
+ return false;
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
index de190387d59..1f475cc85fe 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/planner/AggregationNode.java
@@ -62,6 +62,8 @@ public class AggregationNode extends PlanNode {
private SortInfo sortByGroupKey;
+ private boolean queryCacheCandidate;
+
/**
* Create an agg node that is not an intermediate node.
* isIntermediate is true if it is a slave node in a 2-part agg plan.
@@ -272,4 +274,12 @@ public class AggregationNode extends PlanNode {
public void setSortByGroupKey(SortInfo sortByGroupKey) {
this.sortByGroupKey = sortByGroupKey;
}
+
+ public boolean isQueryCacheCandidate() {
+ return queryCacheCandidate;
+ }
+
+ public void setQueryCacheCandidate(boolean queryCacheCandidate) {
+ this.queryCacheCandidate = queryCacheCandidate;
+ }
}
diff --git
a/fe/fe-core/src/main/java/org/apache/doris/planner/normalize/QueryCacheNormalizer.java
b/fe/fe-core/src/main/java/org/apache/doris/planner/normalize/QueryCacheNormalizer.java
index 861e7c5e4cd..b74ebd0e7c3 100644
---
a/fe/fe-core/src/main/java/org/apache/doris/planner/normalize/QueryCacheNormalizer.java
+++
b/fe/fe-core/src/main/java/org/apache/doris/planner/normalize/QueryCacheNormalizer.java
@@ -30,8 +30,10 @@ import org.apache.doris.planner.OlapScanNode;
import org.apache.doris.planner.PlanFragment;
import org.apache.doris.planner.PlanNode;
import org.apache.doris.qe.ConnectContext;
+import org.apache.doris.qe.SessionVariable;
import org.apache.doris.thrift.TNormalizedPlanNode;
import org.apache.doris.thrift.TQueryCacheParam;
+import org.apache.doris.thrift.TStringLiteral;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableList;
@@ -53,9 +55,9 @@ import java.util.stream.Collectors;
public class QueryCacheNormalizer implements Normalizer {
private final PlanFragment fragment;
private final DescriptorTable descriptorTable;
- private final NormalizedIdGenerator normalizedPlanIds = new
NormalizedIdGenerator();
- private final NormalizedIdGenerator normalizedTupleIds = new
NormalizedIdGenerator();
- private final NormalizedIdGenerator normalizedSlotIds = new
NormalizedIdGenerator();
+ private final NormalizedIdGenerator normalizedPlanIds = new
NormalizedIdGenerator();
+ private final NormalizedIdGenerator normalizedTupleIds = new
NormalizedIdGenerator();
+ private final NormalizedIdGenerator normalizedSlotIds = new
NormalizedIdGenerator();
// result
private final TQueryCacheParam queryCacheParam = new TQueryCacheParam();
@@ -72,7 +74,7 @@ public class QueryCacheNormalizer implements Normalizer {
return Optional.empty();
}
List<TNormalizedPlanNode> normalizedDigestPlans =
normalizePlanTree(context, cachePoint.get());
- byte[] digest = computeDigest(normalizedDigestPlans);
+ byte[] digest = computeDigest(context, normalizedDigestPlans);
return setQueryCacheParam(cachePoint.get(), digest, context);
} catch (Throwable t) {
return Optional.empty();
@@ -90,11 +92,12 @@ public class QueryCacheNormalizer implements Normalizer {
private Optional<TQueryCacheParam> setQueryCacheParam(
CachePoint cachePoint, byte[] digest, ConnectContext context) {
+ SessionVariable sessionVariable = context.getSessionVariable();
queryCacheParam.setNodeId(cachePoint.cacheRoot.getId().asInt());
queryCacheParam.setDigest(digest);
-
queryCacheParam.setForceRefreshQueryCache(context.getSessionVariable().isQueryCacheForceRefresh());
-
queryCacheParam.setEntryMaxBytes(context.getSessionVariable().getQueryCacheEntryMaxBytes());
-
queryCacheParam.setEntryMaxRows(context.getSessionVariable().getQueryCacheEntryMaxRows());
+
queryCacheParam.setForceRefreshQueryCache(sessionVariable.isQueryCacheForceRefresh());
+
queryCacheParam.setEntryMaxBytes(sessionVariable.getQueryCacheEntryMaxBytes());
+
queryCacheParam.setEntryMaxRows(sessionVariable.getQueryCacheEntryMaxRows());
queryCacheParam.setOutputSlotMapping(
cachePoint.cacheRoot.getOutputTupleIds()
@@ -122,11 +125,16 @@ public class QueryCacheNormalizer implements Normalizer {
if (planRoot instanceof AggregationNode) {
PlanNode child = planRoot.getChild(0);
if (child instanceof OlapScanNode) {
- return Optional.of(new CachePoint(planRoot, planRoot));
+ if (((AggregationNode) planRoot).isQueryCacheCandidate()) {
+ return Optional.of(new CachePoint(planRoot, planRoot));
+ }
} else if (child instanceof AggregationNode) {
Optional<CachePoint> childCachePoint =
doComputeCachePoint(child);
if (childCachePoint.isPresent()) {
- return Optional.of(new CachePoint(planRoot, planRoot));
+ if (((AggregationNode) planRoot).isQueryCacheCandidate()) {
+ return Optional.of(new CachePoint(planRoot, planRoot));
+ }
+ return childCachePoint;
}
}
} else if (planRoot instanceof ExchangeNode) {
@@ -151,13 +159,20 @@ public class QueryCacheNormalizer implements Normalizer {
normalizedPlans.add(plan.normalize(this));
}
- public static byte[] computeDigest(List<TNormalizedPlanNode>
normalizedDigestPlans) throws Exception {
+ public static byte[] computeDigest(
+ ConnectContext context, List<TNormalizedPlanNode>
normalizedDigestPlans) throws Exception {
TSerializer serializer = new TSerializer(new
TCompactProtocol.Factory());
MessageDigest digest = MessageDigest.getInstance("SHA-256");
for (TNormalizedPlanNode node : normalizedDigestPlans) {
digest.update(serializer.serialize(node));
}
+
+ StringBuffer variables = new StringBuffer();
+ context.getSessionVariable().readAffectQueryResultVariables((k, v) -> {
+ variables.append(k).append("=").append(v).append("|");
+ });
+ digest.update(serializer.serialize(new
TStringLiteral(variables.toString())));
return digest.digest();
}
diff --git
a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryCacheNormalizerTest.java
b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryCacheNormalizerTest.java
index de0237acd21..5648484f49d 100644
---
a/fe/fe-core/src/test/java/org/apache/doris/planner/QueryCacheNormalizerTest.java
+++
b/fe/fe-core/src/test/java/org/apache/doris/planner/QueryCacheNormalizerTest.java
@@ -113,6 +113,7 @@ public class QueryCacheNormalizerTest extends
TestWithFeService {
createTables(nonPart, part1, part2, multiLeveParts);
connectContext.getSessionVariable().setDisableNereidsRules("PRUNE_EMPTY_PARTITION");
+ connectContext.getSessionVariable().setEnableQueryCache(true);
}
@Test
diff --git a/regression-test/suites/query_p0/cache/query_cache.groovy
b/regression-test/suites/query_p0/cache/query_cache.groovy
index 981b9f7d340..fb15e34a4f5 100644
--- a/regression-test/suites/query_p0/cache/query_cache.groovy
+++ b/regression-test/suites/query_p0/cache/query_cache.groovy
@@ -20,6 +20,8 @@ import java.util.stream.Collectors
suite("query_cache") {
def tableName =
"table_3_undef_partitions2_keys3_properties4_distributed_by53"
+ sql "set enable_sql_cache=false"
+
def test = {
sql "set enable_query_cache=false"
diff --git
a/regression-test/suites/query_p0/cache/query_cache_with_context.groovy
b/regression-test/suites/query_p0/cache/query_cache_with_context.groovy
new file mode 100644
index 00000000000..e96cd0ca78a
--- /dev/null
+++ b/regression-test/suites/query_p0/cache/query_cache_with_context.groovy
@@ -0,0 +1,65 @@
+import java.util.concurrent.atomic.AtomicReference
+
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("query_cache_with_context") {
+ multi_sql """
+ set enable_sql_cache=false;
+ set enable_query_cache=false;
+ drop table if exists query_cache_with_context;
+ create table query_cache_with_context(id int, value decimal(26, 4))
properties('replication_num'='1');
+ insert into query_cache_with_context values(1, 1), (2, 2), (3, 3);
+ set enable_query_cache=true;
+ """
+
+ def getDigest = { def sqlStr ->
+ AtomicReference<String> result = new AtomicReference<>()
+ explain {
+ sql sqlStr
+
+ check {exp ->
+ def digests = exp.split("\n").findAll { line ->
line.contains("DIGEST") }
+ if (!digests.isEmpty()) {
+ result.set(digests.get(0).split(":")[1].trim())
+ }
+ }
+ }
+ return result.get()
+ }
+
+ def test_session_variable_change = {
+ sql "set enable_decimal256=true"
+ def digest1 = getDigest("select id from query_cache_with_context group
by id")
+ sql "set enable_decimal256=false"
+ def digest2 = getDigest("select id from query_cache_with_context group
by id")
+ assertNotEquals(digest1, digest2)
+ }()
+
+ def test_udf_function = {
+ def jarPath =
"""${context.config.suitePath}/javaudf_p0/jars/java-udf-case-jar-with-dependencies.jar"""
+ scp_udf_file_to_all_be(jarPath)
+ sql("DROP FUNCTION IF EXISTS test_udf_with_query_cache(string, int,
int);")
+ sql """ CREATE FUNCTION test_udf_with_query_cache(string, int, int)
RETURNS string PROPERTIES (
+ "file"="file://${jarPath}",
+ "symbol"="org.apache.doris.udf.StringTest",
+ "type"="JAVA_UDF"
+ ); """
+ String digest = getDigest("select test_udf_with_query_cache(id, 2, 3)
from query_cache_with_context group by 1")
+ assertEquals(null, digest)
+ }()
+}
\ No newline at end of file
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]