github-actions[bot] commented on code in PR #61183:
URL: https://github.com/apache/doris/pull/61183#discussion_r2910550834


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/jobs/executor/Rewriter.java:
##########
@@ -60,115 +60,7 @@
 import org.apache.doris.nereids.rules.rewrite.CountDistinctRewrite;
 import org.apache.doris.nereids.rules.rewrite.CountLiteralRewrite;
 import org.apache.doris.nereids.rules.rewrite.CreatePartitionTopNFromWindow;
-import 
org.apache.doris.nereids.rules.rewrite.DecomposeRepeatWithPreAggregation;
-import org.apache.doris.nereids.rules.rewrite.DecoupleEncodeDecode;

Review Comment:
   **[Critical / Build Break]** This diff removes ~109 explicit imports (e.g., 
`MergeFilters`, `EliminateSort`, `InferPredicates`, `ReorderJoin`, 
`MergeAggregate`, `EliminateAggCaseWhen`, etc.) that are all still referenced 
by simple name in the body of `Rewriter.java`. No wildcard import replaces 
them. This will cause a compilation failure.
   
   This appears to be an accidental IDE artifact — perhaps an auto-import 
cleanup that went wrong. Please restore all deleted imports and only add the 
new `RewriteSimpleAggToConstantRule` import.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/RewriteSimpleAggToConstantRule.java:
##########
@@ -0,0 +1,246 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.rewrite;
+
+import org.apache.doris.analysis.LiteralExpr;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.KeysType;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.nereids.StatementContext;
+import org.apache.doris.nereids.rules.Rule;
+import org.apache.doris.nereids.rules.RuleType;
+import org.apache.doris.nereids.stats.SimpleAggCacheMgr;
+import org.apache.doris.nereids.stats.SimpleAggCacheMgr.ColumnMinMax;
+import org.apache.doris.nereids.stats.SimpleAggCacheMgr.ColumnMinMaxKey;
+import org.apache.doris.nereids.trees.expressions.Alias;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import 
org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction;
+import org.apache.doris.nereids.trees.expressions.functions.agg.Count;
+import org.apache.doris.nereids.trees.expressions.functions.agg.Max;
+import org.apache.doris.nereids.trees.expressions.functions.agg.Min;
+import org.apache.doris.nereids.trees.expressions.literal.BigIntLiteral;
+import org.apache.doris.nereids.trees.expressions.literal.Literal;
+import org.apache.doris.nereids.trees.expressions.literal.NullLiteral;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
+import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalOneRowRelation;
+import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
+import org.apache.doris.statistics.util.StatisticsUtil;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Set;
+
+/**
+ * For simple aggregation queries like
+ * 'select count(*), count(not-null column), min(col), max(col) from 
olap_table',
+ * rewrite them to return constants directly from FE metadata, completely 
bypassing BE.
+ *
+ * <p>COUNT uses table.getRowCount().
+ * MIN/MAX uses an async FE-side cache ({@link SimpleAggCacheMgr}) that stores 
exact values
+ * obtained via internal SQL queries, NOT sampled ColumnStatistic.
+ *
+ * <p>Conditions:
+ * 1. DUP_KEYS table only (AGG_KEYS: rowCount inflated; UNIQUE_KEYS: min/max 
may be inaccurate
+ *    in MoW model before compaction merges delete-marked rows)
+ * 2. No GROUP BY
+ * 3. Only COUNT / MIN / MAX aggregate functions
+ * 4. COUNT(col): col must be NOT NULL (so count(col) == rowCount)
+ * 5. MIN/MAX(col): col must be numeric or date type, and not aggregated
+ */
+public class RewriteSimpleAggToConstantRule implements RewriteRuleFactory {
+
+    @Override
+    public List<Rule> buildRules() {
+        return ImmutableList.of(
+                // pattern: agg -> scan
+                logicalAggregate(logicalOlapScan())
+                        .thenApply(ctx -> {
+                            LogicalAggregate<LogicalOlapScan> agg = ctx.root;
+                            LogicalOlapScan olapScan = agg.child();
+                            return tryRewrite(agg, olapScan, 
ctx.statementContext);
+                        })
+                        .toRule(RuleType.REWRITE_SIMPLE_AGG_TO_CONSTANT),
+                // pattern: agg -> project -> scan
+                logicalAggregate(logicalProject(logicalOlapScan()))
+                        .thenApply(ctx -> {
+                            LogicalAggregate<?> agg = ctx.root;
+                            LogicalOlapScan olapScan = (LogicalOlapScan) 
ctx.root.child().child();
+                            return tryRewrite(agg, olapScan, 
ctx.statementContext);
+                        })
+                        .toRule(RuleType.REWRITE_SIMPLE_AGG_TO_CONSTANT)
+        );
+    }
+
+    private Plan tryRewrite(LogicalAggregate<?> agg, LogicalOlapScan olapScan,
+            StatementContext statementContext) {
+        OlapTable table = olapScan.getTable();
+        long selectedIndex = olapScan.getSelectedIndexId();
+
+        // Condition 1: DUP_KEYS only.
+        // - DUP_KEYS: FE rowCount equals actual count(*); min/max are 
accurate.
+        // - AGG_KEYS: rowCount may be inflated before full compaction.
+        // - UNIQUE_KEYS: in MoW model, min/max may include values from 
delete-marked rows
+        //   not yet compacted, so the result could be inaccurate.
+        if (table.getKeysType() != KeysType.DUP_KEYS) {
+            return null;
+        }
+
+        // Condition 2: No GROUP BY
+        if (!agg.getGroupByExpressions().isEmpty()) {
+            return null;
+        }
+
+        // Condition 3: Only COUNT / MIN / MAX aggregate functions.
+        Set<AggregateFunction> funcs = agg.getAggregateFunctions();
+        if (funcs.isEmpty()) {
+            return null;
+        }
+        for (AggregateFunction func : funcs) {
+            if (!(func instanceof Count) && !(func instanceof Min) && !(func 
instanceof Max)) {
+                return null;
+            }
+        }
+
+        // Try to compute a constant for each output expression.
+        // If ANY one cannot be replaced, we give up the entire rewrite.
+        List<NamedExpression> newOutputExprs = new ArrayList<>();
+        for (NamedExpression outputExpr : agg.getOutputExpressions()) {
+            if (!(outputExpr instanceof Alias)) {
+                // Unexpected: for no-group-by aggregates, outputs should all 
be aliases over agg funcs
+                return null;
+            }
+            Alias alias = (Alias) outputExpr;
+            Expression child = alias.child();
+            if (!(child instanceof AggregateFunction)) {
+                return null;
+            }
+            AggregateFunction func = (AggregateFunction) child;
+            Optional<Literal> constant = tryGetConstant(func, table, 
selectedIndex);
+            if (!constant.isPresent()) {
+                // Cannot replace this agg function — give up
+                return null;
+            }
+            newOutputExprs.add(new Alias(alias.getExprId(), constant.get(), 
alias.getName()));
+        }
+
+        if (newOutputExprs.isEmpty()) {
+            return null;
+        }
+
+        // Build: LogicalProject(constants) -> LogicalOneRowRelation(dummy)
+        // The OneRowRelation provides a single-row source; all real values 
come from the project.
+        LogicalOneRowRelation oneRowRelation = new LogicalOneRowRelation(
+                statementContext.getNextRelationId(),
+                ImmutableList.of(new Alias(new NullLiteral(), "__dummy__")));
+        return new LogicalProject<>(newOutputExprs, oneRowRelation);
+    }
+
+    /**
+     * Try to compute a compile-time constant value for the given aggregate 
function,
+     * using FE-side cached row-counts (for COUNT) or exact min/max cache (for 
MIN/MAX).
+     * All values are obtained via internal SQL queries (SELECT count/min/max),
+     * NOT from BE tablet stats reporting, to avoid delayed-reporting and 
version issues.
+     */
+    private Optional<Literal> tryGetConstant(AggregateFunction func, OlapTable 
table, long selectedIndex) {
+        if (func.isDistinct()) {
+            return Optional.empty();
+        }
+
+        long version = table.getVisibleVersionTime();
+

Review Comment:
   **[Nit]** The variable is named `version` but 
`OlapTable.getVisibleVersionTime()` returns a **millisecond timestamp**, not a 
version number. Consider renaming to `versionTime` or `visibleVersionTimeMs` 
for clarity, both here and in the cache loaders.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/RewriteSimpleAggToConstantRule.java:
##########
@@ -0,0 +1,246 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.rewrite;
+
+import org.apache.doris.analysis.LiteralExpr;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.KeysType;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.nereids.StatementContext;
+import org.apache.doris.nereids.rules.Rule;
+import org.apache.doris.nereids.rules.RuleType;
+import org.apache.doris.nereids.stats.SimpleAggCacheMgr;
+import org.apache.doris.nereids.stats.SimpleAggCacheMgr.ColumnMinMax;
+import org.apache.doris.nereids.stats.SimpleAggCacheMgr.ColumnMinMaxKey;
+import org.apache.doris.nereids.trees.expressions.Alias;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import 
org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction;
+import org.apache.doris.nereids.trees.expressions.functions.agg.Count;
+import org.apache.doris.nereids.trees.expressions.functions.agg.Max;
+import org.apache.doris.nereids.trees.expressions.functions.agg.Min;
+import org.apache.doris.nereids.trees.expressions.literal.BigIntLiteral;
+import org.apache.doris.nereids.trees.expressions.literal.Literal;
+import org.apache.doris.nereids.trees.expressions.literal.NullLiteral;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
+import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalOneRowRelation;
+import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
+import org.apache.doris.statistics.util.StatisticsUtil;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Set;
+
+/**
+ * For simple aggregation queries like
+ * 'select count(*), count(not-null column), min(col), max(col) from 
olap_table',
+ * rewrite them to return constants directly from FE metadata, completely 
bypassing BE.
+ *
+ * <p>COUNT uses table.getRowCount().
+ * MIN/MAX uses an async FE-side cache ({@link SimpleAggCacheMgr}) that stores 
exact values
+ * obtained via internal SQL queries, NOT sampled ColumnStatistic.
+ *
+ * <p>Conditions:
+ * 1. DUP_KEYS table only (AGG_KEYS: rowCount inflated; UNIQUE_KEYS: min/max 
may be inaccurate
+ *    in MoW model before compaction merges delete-marked rows)
+ * 2. No GROUP BY
+ * 3. Only COUNT / MIN / MAX aggregate functions
+ * 4. COUNT(col): col must be NOT NULL (so count(col) == rowCount)
+ * 5. MIN/MAX(col): col must be numeric or date type, and not aggregated
+ */
+public class RewriteSimpleAggToConstantRule implements RewriteRuleFactory {
+
+    @Override
+    public List<Rule> buildRules() {
+        return ImmutableList.of(
+                // pattern: agg -> scan
+                logicalAggregate(logicalOlapScan())
+                        .thenApply(ctx -> {
+                            LogicalAggregate<LogicalOlapScan> agg = ctx.root;
+                            LogicalOlapScan olapScan = agg.child();
+                            return tryRewrite(agg, olapScan, 
ctx.statementContext);
+                        })
+                        .toRule(RuleType.REWRITE_SIMPLE_AGG_TO_CONSTANT),
+                // pattern: agg -> project -> scan
+                logicalAggregate(logicalProject(logicalOlapScan()))
+                        .thenApply(ctx -> {
+                            LogicalAggregate<?> agg = ctx.root;
+                            LogicalOlapScan olapScan = (LogicalOlapScan) 
ctx.root.child().child();
+                            return tryRewrite(agg, olapScan, 
ctx.statementContext);
+                        })
+                        .toRule(RuleType.REWRITE_SIMPLE_AGG_TO_CONSTANT)
+        );
+    }
+
+    private Plan tryRewrite(LogicalAggregate<?> agg, LogicalOlapScan olapScan,

Review Comment:
   **[Critical / Data Correctness]** `tryRewrite()` does not check whether the 
`LogicalOlapScan` has:
   - `manuallySpecifiedPartitions` (e.g., `SELECT count(*) FROM t 
PARTITION(p1)`)
   - `manuallySpecifiedTabletIds` (e.g., `SELECT count(*) FROM t TABLET(12345)`)
   - `tableSample` (e.g., `SELECT count(*) FROM t TABLESAMPLE(10 ROWS)`)
   
   The cached values are for the **full table**, so returning them for a 
partition-/tablet-/sample-restricted scan produces wrong results.
   
   WHERE clause filters are safe because they produce a `LogicalFilter` node 
that prevents pattern matching, but the three restrictions above are properties 
of `LogicalOlapScan` itself.
   
   Suggested fix:
   ```java
   // After getting olapScan:
   if (!olapScan.getManuallySpecifiedPartitions().isEmpty()
           || !olapScan.getManuallySpecifiedTabletIds().isEmpty()
           || olapScan.getTableSample() != null) {
       return null;
   }
   ```
   Please also add negative test cases for these scenarios.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/stats/SimpleAggCacheMgr.java:
##########
@@ -0,0 +1,473 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.stats;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.ThreadPoolManager;
+import org.apache.doris.qe.AutoCloseConnectContext;
+import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.statistics.ResultRow;
+import org.apache.doris.statistics.util.StatisticsUtil;
+
+import com.github.benmanes.caffeine.cache.AsyncCacheLoader;
+import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.checkerframework.checker.nullness.qual.NonNull;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Async cache that stores exact MIN/MAX/COUNT values for OlapTable,
+ * used by {@code RewriteSimpleAggToConstantRule} to replace simple
+ * aggregations with constant values.
+ *
+ * <p>MIN/MAX values are obtained by executing
+ * {@code SELECT min(col), max(col) FROM table}, and COUNT values by
+ * {@code SELECT count(*) FROM table}, both as internal SQL queries
+ * inside FE. Results are cached with a version stamp derived from
+ * {@code OlapTable.getVisibleVersionTime()}.
+ * When a caller provides a version newer than the cached version,
+ * the stale entry is evicted and a background reload is triggered.
+ *
+ * <p>Only numeric and date/datetime columns are cached for MIN/MAX;
+ * aggregated columns are skipped.
+ */
+public class SimpleAggCacheMgr {
+
+    // ======================== Public inner types ========================
+
+    /**
+     * Holds exact min and max values for a column as strings.
+     */
+    public static class ColumnMinMax {
+        private final String minValue;
+        private final String maxValue;
+
+        public ColumnMinMax(String minValue, String maxValue) {
+            this.minValue = minValue;
+            this.maxValue = maxValue;
+        }
+
+        public String minValue() {
+            return minValue;
+        }
+
+        public String maxValue() {
+            return maxValue;
+        }
+    }
+
+    /**
+     * Cache key identifying a column by its table ID and column name.
+     */
+    public static final class ColumnMinMaxKey {
+        private final long tableId;
+        private final String columnName;
+
+        public ColumnMinMaxKey(long tableId, String columnName) {
+            this.tableId = tableId;
+            this.columnName = columnName;
+        }
+
+        public long getTableId() {
+            return tableId;
+        }
+
+        public String getColumnName() {
+            return columnName;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (!(o instanceof ColumnMinMaxKey)) {
+                return false;
+            }
+            ColumnMinMaxKey that = (ColumnMinMaxKey) o;
+            return tableId == that.tableId && 
columnName.equalsIgnoreCase(that.columnName);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(tableId, columnName.toLowerCase());
+        }
+
+        @Override
+        public String toString() {
+            return "ColumnMinMaxKey{tableId=" + tableId + ", column=" + 
columnName + "}";
+        }
+    }
+
+    private static class CacheValue {
+        private final ColumnMinMax minMax;
+        private final long version;
+
+        CacheValue(ColumnMinMax minMax, long version) {
+            this.minMax = minMax;
+            this.version = version;
+        }
+
+        ColumnMinMax minMax() {
+            return minMax;
+        }
+
+        long version() {
+            return version;
+        }
+    }
+
+    /**
+     * Cached row count with version stamp.
+     */
+    private static class RowCountValue {
+        private final long rowCount;
+        private final long version;
+
+        RowCountValue(long rowCount, long version) {
+            this.rowCount = rowCount;
+            this.version = version;
+        }
+
+        long rowCount() {
+            return rowCount;
+        }
+
+        long version() {
+            return version;
+        }
+    }
+
+    private static final Logger LOG = 
LogManager.getLogger(SimpleAggCacheMgr.class);
+
+    private static volatile SimpleAggCacheMgr INSTANCE;
+    private static volatile SimpleAggCacheMgr TEST_INSTANCE;
+
+    private final AsyncLoadingCache<ColumnMinMaxKey, Optional<CacheValue>> 
cache;
+    private final AsyncLoadingCache<Long, Optional<RowCountValue>> 
rowCountCache;
+
+    /**
+     * Protected no-arg constructor for test subclassing.
+     * Subclasses override {@link #getStats}, {@link #getRowCount}, etc.
+     */
+    protected SimpleAggCacheMgr() {
+        this.cache = null;
+        this.rowCountCache = null;
+    }
+
+    private SimpleAggCacheMgr(ExecutorService executor) {
+        this.cache = Caffeine.newBuilder()
+                .maximumSize(Config.stats_cache_size)
+                .executor(executor)
+                .buildAsync(new CacheLoader());
+        this.rowCountCache = Caffeine.newBuilder()
+                .maximumSize(Config.stats_cache_size)
+                .executor(executor)
+                .buildAsync(new RowCountLoader());
+    }
+
+    private static SimpleAggCacheMgr getInstance() {
+        if (INSTANCE == null) {
+            synchronized (SimpleAggCacheMgr.class) {
+                if (INSTANCE == null) {
+                    ExecutorService executor = 
ThreadPoolManager.newDaemonCacheThreadPool(
+                            4, "simple-agg-cache-pool", true);
+                    INSTANCE = new SimpleAggCacheMgr(executor);
+                }
+            }
+        }
+        return INSTANCE;
+    }
+
+    /**
+     * Returns the singleton instance backed by async-loading cache,
+     * or the test override if one has been set.
+     */
+    public static SimpleAggCacheMgr internalInstance() {
+        SimpleAggCacheMgr test = TEST_INSTANCE;
+        if (test != null) {
+            return test;
+        }
+        return getInstance();
+    }
+
+    /**
+     * Override used only in unit tests to inject a mock implementation.
+     */
+    @VisibleForTesting
+    public static void setTestInstance(SimpleAggCacheMgr instance) {
+        TEST_INSTANCE = instance;
+    }
+
+    /**
+     * Reset the test override so that subsequent calls go back to the real 
cache.
+     */
+    @VisibleForTesting
+    public static void clearTestInstance() {
+        TEST_INSTANCE = null;
+    }
+
+    /**
+     * Get the cached min/max for a column.
+     */
+    public Optional<ColumnMinMax> getStats(ColumnMinMaxKey key, long version) {
+        CompletableFuture<Optional<CacheValue>> future = cache.get(key);
+        if (future.isDone()) {
+            try {
+                Optional<CacheValue> cacheValue = future.get();
+                if (cacheValue.isPresent()) {
+                    CacheValue value = cacheValue.get();
+                    if (value.version() >= version) {
+                        return Optional.of(value.minMax());
+                    }
+                }
+                // Either empty (load failed / version changed during load)
+                // or stale — evict so next call triggers a fresh reload.
+                cache.synchronous().invalidate(key);
+            } catch (Exception e) {
+                LOG.warn("Failed to get MinMax for column: {}, version: {}", 
key, version, e);
+                cache.synchronous().invalidate(key);
+            }
+        }
+        return Optional.empty();
+    }
+
+    /**
+     * Evict the cached stats for a column, if present. Used when we know the 
data has changed
+     */
+    public void removeStats(ColumnMinMaxKey key) {
+        cache.synchronous().invalidate(key);
+    }
+
+    /**
+     * Get the cached row count for a table.
+     */
+    public OptionalLong getRowCount(long tableId, long version) {
+        CompletableFuture<Optional<RowCountValue>> future = 
rowCountCache.get(tableId);
+        if (future.isDone()) {
+            try {
+                Optional<RowCountValue> cached = future.get();
+                if (cached.isPresent()) {
+                    RowCountValue value = cached.get();
+                    if (value.version() >= version) {
+                        return OptionalLong.of(value.rowCount());
+                    }
+                }
+                // Either empty (load failed / version changed during load)
+                // or stale — evict so next call triggers a fresh reload.
+                rowCountCache.synchronous().invalidate(tableId);
+            } catch (Exception e) {
+                LOG.warn("Failed to get row count for table: {}, version: {}", 
tableId, version, e);
+                rowCountCache.synchronous().invalidate(tableId);
+            }
+        }
+        return OptionalLong.empty();
+    }
+
+    /**
+     * Generate the internal SQL for fetching exact min/max values.
+     */
+    @VisibleForTesting
+    public static String genMinMaxSql(List<String> qualifiers, String 
columnName) {
+        // qualifiers: [catalogName, dbName, tableName]
+        String quotedCol = "`" + StatisticsUtil.escapeColumnName(columnName) + 
"`";
+        String fullTable = "`" + qualifiers.get(0) + "`.`"
+                + qualifiers.get(1) + "`.`"
+                + qualifiers.get(2) + "`";
+        return "SELECT min(" + quotedCol + "), max(" + quotedCol + ") FROM " + 
fullTable;
+    }
+
+    /**
+     * Generate the internal SQL for fetching exact row count.
+     */
+    @VisibleForTesting
+    public static String genCountSql(List<String> qualifiers) {
+        String fullTable = "`" + qualifiers.get(0) + "`.`"
+                + qualifiers.get(1) + "`.`"
+                + qualifiers.get(2) + "`";
+        return "SELECT count(*) FROM " + fullTable;
+    }
+
+    /**
+     * Async cache loader that issues internal SQL queries to compute exact 
min/max.
+     */
+    protected static final class CacheLoader
+            implements AsyncCacheLoader<ColumnMinMaxKey, Optional<CacheValue>> 
{
+
+        @Override
+        public @NonNull CompletableFuture<Optional<CacheValue>> asyncLoad(
+                @NonNull ColumnMinMaxKey key, @NonNull Executor executor) {

Review Comment:
   **[Suggestion / Robustness]** The cache loaders execute `SELECT count(*)` / 
`SELECT min(col), max(col)` as internal SQL, which goes through the full 
Nereids rewrite pipeline — including `RewriteSimpleAggToConstantRule` itself.
   
   Currently this is *accidentally safe* because Caffeine deduplicates 
concurrent loads for the same key, and `isDone()` returns false for the 
in-flight future, so the rule bails out for the recursive internal query. But 
this is fragile and undocumented.
   
   Consider either:
   1. Adding a comment documenting why recursion is safe, or (better)
   2. Disabling the rule for internal queries, e.g., by setting 
`disable_nereids_rules=REWRITE_SIMPLE_AGG_TO_CONSTANT` in the `ConnectContext` 
created by `StatisticsUtil.buildConnectContext()`, or
   3. Checking `connectContext.getSessionVariable().isInternalQuery()` (or 
similar) in the rule itself.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/stats/SimpleAggCacheMgr.java:
##########
@@ -0,0 +1,473 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.stats;
+
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.catalog.TableIf;
+import org.apache.doris.common.Config;
+import org.apache.doris.common.ThreadPoolManager;
+import org.apache.doris.qe.AutoCloseConnectContext;
+import org.apache.doris.qe.StmtExecutor;
+import org.apache.doris.statistics.ResultRow;
+import org.apache.doris.statistics.util.StatisticsUtil;
+
+import com.github.benmanes.caffeine.cache.AsyncCacheLoader;
+import com.github.benmanes.caffeine.cache.AsyncLoadingCache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.google.common.annotations.VisibleForTesting;
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+import org.checkerframework.checker.nullness.qual.NonNull;
+
+import java.util.List;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+
+/**
+ * Async cache that stores exact MIN/MAX/COUNT values for OlapTable,
+ * used by {@code RewriteSimpleAggToConstantRule} to replace simple
+ * aggregations with constant values.
+ *
+ * <p>MIN/MAX values are obtained by executing
+ * {@code SELECT min(col), max(col) FROM table}, and COUNT values by
+ * {@code SELECT count(*) FROM table}, both as internal SQL queries
+ * inside FE. Results are cached with a version stamp derived from
+ * {@code OlapTable.getVisibleVersionTime()}.
+ * When a caller provides a version newer than the cached version,
+ * the stale entry is evicted and a background reload is triggered.
+ *
+ * <p>Only numeric and date/datetime columns are cached for MIN/MAX;
+ * aggregated columns are skipped.
+ */
+public class SimpleAggCacheMgr {
+
+    // ======================== Public inner types ========================
+
+    /**
+     * Holds exact min and max values for a column as strings.
+     */
+    public static class ColumnMinMax {
+        private final String minValue;
+        private final String maxValue;
+
+        public ColumnMinMax(String minValue, String maxValue) {
+            this.minValue = minValue;
+            this.maxValue = maxValue;
+        }
+
+        public String minValue() {
+            return minValue;
+        }
+
+        public String maxValue() {
+            return maxValue;
+        }
+    }
+
+    /**
+     * Cache key identifying a column by its table ID and column name.
+     */
+    public static final class ColumnMinMaxKey {
+        private final long tableId;
+        private final String columnName;
+
+        public ColumnMinMaxKey(long tableId, String columnName) {
+            this.tableId = tableId;
+            this.columnName = columnName;
+        }
+
+        public long getTableId() {
+            return tableId;
+        }
+
+        public String getColumnName() {
+            return columnName;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (!(o instanceof ColumnMinMaxKey)) {
+                return false;
+            }
+            ColumnMinMaxKey that = (ColumnMinMaxKey) o;
+            return tableId == that.tableId && 
columnName.equalsIgnoreCase(that.columnName);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(tableId, columnName.toLowerCase());
+        }
+
+        @Override
+        public String toString() {
+            return "ColumnMinMaxKey{tableId=" + tableId + ", column=" + 
columnName + "}";
+        }
+    }
+
+    private static class CacheValue {
+        private final ColumnMinMax minMax;
+        private final long version;
+
+        CacheValue(ColumnMinMax minMax, long version) {
+            this.minMax = minMax;
+            this.version = version;
+        }
+
+        ColumnMinMax minMax() {
+            return minMax;
+        }
+
+        long version() {
+            return version;
+        }
+    }
+
+    /**
+     * Cached row count with version stamp.
+     */
+    private static class RowCountValue {
+        private final long rowCount;
+        private final long version;
+
+        RowCountValue(long rowCount, long version) {
+            this.rowCount = rowCount;
+            this.version = version;
+        }
+
+        long rowCount() {
+            return rowCount;
+        }
+
+        long version() {
+            return version;
+        }
+    }
+
+    private static final Logger LOG = 
LogManager.getLogger(SimpleAggCacheMgr.class);
+
+    private static volatile SimpleAggCacheMgr INSTANCE;
+    private static volatile SimpleAggCacheMgr TEST_INSTANCE;
+
+    private final AsyncLoadingCache<ColumnMinMaxKey, Optional<CacheValue>> 
cache;
+    private final AsyncLoadingCache<Long, Optional<RowCountValue>> 
rowCountCache;
+
+    /**
+     * Protected no-arg constructor for test subclassing.
+     * Subclasses override {@link #getStats}, {@link #getRowCount}, etc.
+     */
+    protected SimpleAggCacheMgr() {
+        this.cache = null;
+        this.rowCountCache = null;
+    }
+
+    private SimpleAggCacheMgr(ExecutorService executor) {
+        this.cache = Caffeine.newBuilder()
+                .maximumSize(Config.stats_cache_size)
+                .executor(executor)
+                .buildAsync(new CacheLoader());
+        this.rowCountCache = Caffeine.newBuilder()
+                .maximumSize(Config.stats_cache_size)
+                .executor(executor)
+                .buildAsync(new RowCountLoader());
+    }
+
+    private static SimpleAggCacheMgr getInstance() {
+        if (INSTANCE == null) {
+            synchronized (SimpleAggCacheMgr.class) {
+                if (INSTANCE == null) {
+                    ExecutorService executor = 
ThreadPoolManager.newDaemonCacheThreadPool(
+                            4, "simple-agg-cache-pool", true);
+                    INSTANCE = new SimpleAggCacheMgr(executor);
+                }
+            }
+        }
+        return INSTANCE;
+    }
+
+    /**
+     * Returns the singleton instance backed by async-loading cache,
+     * or the test override if one has been set.
+     */
+    public static SimpleAggCacheMgr internalInstance() {
+        SimpleAggCacheMgr test = TEST_INSTANCE;
+        if (test != null) {
+            return test;
+        }
+        return getInstance();
+    }
+
+    /**
+     * Override used only in unit tests to inject a mock implementation.
+     */
+    @VisibleForTesting
+    public static void setTestInstance(SimpleAggCacheMgr instance) {
+        TEST_INSTANCE = instance;
+    }
+
+    /**
+     * Reset the test override so that subsequent calls go back to the real 
cache.
+     */
+    @VisibleForTesting
+    public static void clearTestInstance() {
+        TEST_INSTANCE = null;
+    }
+
+    /**
+     * Get the cached min/max for a column.
+     */
+    public Optional<ColumnMinMax> getStats(ColumnMinMaxKey key, long version) {
+        CompletableFuture<Optional<CacheValue>> future = cache.get(key);
+        if (future.isDone()) {
+            try {
+                Optional<CacheValue> cacheValue = future.get();
+                if (cacheValue.isPresent()) {
+                    CacheValue value = cacheValue.get();
+                    if (value.version() >= version) {
+                        return Optional.of(value.minMax());
+                    }
+                }
+                // Either empty (load failed / version changed during load)
+                // or stale — evict so next call triggers a fresh reload.
+                cache.synchronous().invalidate(key);
+            } catch (Exception e) {
+                LOG.warn("Failed to get MinMax for column: {}, version: {}", 
key, version, e);
+                cache.synchronous().invalidate(key);
+            }
+        }
+        return Optional.empty();
+    }
+
+    /**
+     * Evict the cached stats for a column, if present. Used when we know the 
data has changed
+     */
+    public void removeStats(ColumnMinMaxKey key) {
+        cache.synchronous().invalidate(key);
+    }
+
+    /**
+     * Get the cached row count for a table.
+     */
+    public OptionalLong getRowCount(long tableId, long version) {
+        CompletableFuture<Optional<RowCountValue>> future = 
rowCountCache.get(tableId);
+        if (future.isDone()) {
+            try {
+                Optional<RowCountValue> cached = future.get();
+                if (cached.isPresent()) {
+                    RowCountValue value = cached.get();
+                    if (value.version() >= version) {
+                        return OptionalLong.of(value.rowCount());
+                    }
+                }
+                // Either empty (load failed / version changed during load)
+                // or stale — evict so next call triggers a fresh reload.
+                rowCountCache.synchronous().invalidate(tableId);
+            } catch (Exception e) {
+                LOG.warn("Failed to get row count for table: {}, version: {}", 
tableId, version, e);
+                rowCountCache.synchronous().invalidate(tableId);
+            }
+        }
+        return OptionalLong.empty();
+    }
+
+    /**
+     * Generate the internal SQL for fetching exact min/max values.
+     */
+    @VisibleForTesting
+    public static String genMinMaxSql(List<String> qualifiers, String 
columnName) {
+        // qualifiers: [catalogName, dbName, tableName]
+        String quotedCol = "`" + StatisticsUtil.escapeColumnName(columnName) + 
"`";
+        String fullTable = "`" + qualifiers.get(0) + "`.`"
+                + qualifiers.get(1) + "`.`"
+                + qualifiers.get(2) + "`";
+        return "SELECT min(" + quotedCol + "), max(" + quotedCol + ") FROM " + 
fullTable;
+    }
+
+    /**
+     * Generate the internal SQL for fetching exact row count.
+     */
+    @VisibleForTesting
+    public static String genCountSql(List<String> qualifiers) {
+        String fullTable = "`" + qualifiers.get(0) + "`.`"
+                + qualifiers.get(1) + "`.`"
+                + qualifiers.get(2) + "`";
+        return "SELECT count(*) FROM " + fullTable;
+    }
+
+    /**
+     * Async cache loader that issues internal SQL queries to compute exact 
min/max.
+     */
+    protected static final class CacheLoader
+            implements AsyncCacheLoader<ColumnMinMaxKey, Optional<CacheValue>> 
{
+
+        @Override
+        public @NonNull CompletableFuture<Optional<CacheValue>> asyncLoad(
+                @NonNull ColumnMinMaxKey key, @NonNull Executor executor) {
+            return CompletableFuture.supplyAsync(() -> {
+                try {
+                    return doLoad(key);
+                } catch (Exception e) {
+                    LOG.warn("Failed to load MinMax for column: {}", key, e);
+                    return Optional.empty();
+                }
+            }, executor);
+        }
+
+        private Optional<CacheValue> doLoad(ColumnMinMaxKey key) throws 
Exception {
+            // Look up the table by its ID
+            TableIf tableIf = findTableById(key.getTableId());
+            if (!(tableIf instanceof OlapTable)) {
+                return Optional.empty();
+            }
+            OlapTable olapTable = (OlapTable) tableIf;
+
+            // Validate column exists and is eligible
+            Column column = olapTable.getColumn(key.getColumnName());
+            if (column == null) {
+                return Optional.empty();
+            }
+            if (!column.getType().isNumericType() && 
!column.getType().isDateType()) {
+                return Optional.empty();
+            }
+            if (column.isAggregated()) {
+                return Optional.empty();
+            }
+
+            // Use table-level visibleVersionTime, consistent with how the 
caller
+            // (RewriteSimpleAggToConstantRule) obtains the version for 
comparison.
+            long version = olapTable.getVisibleVersionTime();
+
+            // Build and execute internal SQL
+            List<String> qualifiers = olapTable.getFullQualifiers();
+            String sql = genMinMaxSql(qualifiers, column.getName());
+
+            List<ResultRow> rows;
+            try (AutoCloseConnectContext r = 
StatisticsUtil.buildConnectContext(false)) {
+                r.connectContext.getSessionVariable().setPipelineTaskNum("1");
+                StmtExecutor stmtExecutor = new StmtExecutor(r.connectContext, 
sql);
+                rows = stmtExecutor.executeInternalQuery();
+            }
+            if (rows == null || rows.isEmpty()) {
+                return Optional.empty();
+            }
+            ResultRow row = rows.get(0);
+            String minVal = row.get(0);
+            String maxVal = row.get(1);
+            if (minVal == null || maxVal == null) {
+                return Optional.empty();
+            }
+            // Re-check version after query execution to detect concurrent 
data changes.
+            // If the version changed during the query, the result is 
unreliable.
+            long versionAfter = olapTable.getVisibleVersionTime();
+            if (versionAfter != version) {
+                return Optional.empty();
+            }
+            return Optional.of(new CacheValue(new ColumnMinMax(minVal, 
maxVal), version));
+        }
+
+        /**
+         * Look up a table by its table ID across all internal databases.
+         */
+        private static TableIf findTableById(long tableId) {
+            try {

Review Comment:
   **[Performance]** `findTableById()` iterates over **all** database IDs 
(`Env.getCurrentInternalCatalog().getDbIds()`) and queries each one for the 
table. This is O(databases) per cache miss.
   
   Consider using a direct lookup if available, e.g.:
   ```java
   TableIf table = Env.getCurrentInternalCatalog().getTableByTableId(tableId);
   ```
   or caching the database ID in the cache key to avoid the linear scan.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/rewrite/RewriteSimpleAggToConstantRule.java:
##########
@@ -0,0 +1,246 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.rules.rewrite;
+
+import org.apache.doris.analysis.LiteralExpr;
+import org.apache.doris.catalog.Column;
+import org.apache.doris.catalog.KeysType;
+import org.apache.doris.catalog.OlapTable;
+import org.apache.doris.nereids.StatementContext;
+import org.apache.doris.nereids.rules.Rule;
+import org.apache.doris.nereids.rules.RuleType;
+import org.apache.doris.nereids.stats.SimpleAggCacheMgr;
+import org.apache.doris.nereids.stats.SimpleAggCacheMgr.ColumnMinMax;
+import org.apache.doris.nereids.stats.SimpleAggCacheMgr.ColumnMinMaxKey;
+import org.apache.doris.nereids.trees.expressions.Alias;
+import org.apache.doris.nereids.trees.expressions.Expression;
+import org.apache.doris.nereids.trees.expressions.NamedExpression;
+import org.apache.doris.nereids.trees.expressions.SlotReference;
+import 
org.apache.doris.nereids.trees.expressions.functions.agg.AggregateFunction;
+import org.apache.doris.nereids.trees.expressions.functions.agg.Count;
+import org.apache.doris.nereids.trees.expressions.functions.agg.Max;
+import org.apache.doris.nereids.trees.expressions.functions.agg.Min;
+import org.apache.doris.nereids.trees.expressions.literal.BigIntLiteral;
+import org.apache.doris.nereids.trees.expressions.literal.Literal;
+import org.apache.doris.nereids.trees.expressions.literal.NullLiteral;
+import org.apache.doris.nereids.trees.plans.Plan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalAggregate;
+import org.apache.doris.nereids.trees.plans.logical.LogicalOlapScan;
+import org.apache.doris.nereids.trees.plans.logical.LogicalOneRowRelation;
+import org.apache.doris.nereids.trees.plans.logical.LogicalProject;
+import org.apache.doris.statistics.util.StatisticsUtil;
+
+import com.google.common.collect.ImmutableList;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+import java.util.OptionalLong;
+import java.util.Set;
+
+/**
+ * For simple aggregation queries like
+ * 'select count(*), count(not-null column), min(col), max(col) from 
olap_table',
+ * rewrite them to return constants directly from FE metadata, completely 
bypassing BE.
+ *
+ * <p>COUNT uses table.getRowCount().
+ * MIN/MAX uses an async FE-side cache ({@link SimpleAggCacheMgr}) that stores 
exact values
+ * obtained via internal SQL queries, NOT sampled ColumnStatistic.
+ *
+ * <p>Conditions:
+ * 1. DUP_KEYS table only (AGG_KEYS: rowCount inflated; UNIQUE_KEYS: min/max 
may be inaccurate
+ *    in MoW model before compaction merges delete-marked rows)
+ * 2. No GROUP BY
+ * 3. Only COUNT / MIN / MAX aggregate functions
+ * 4. COUNT(col): col must be NOT NULL (so count(col) == rowCount)
+ * 5. MIN/MAX(col): col must be numeric or date type, and not aggregated
+ */
+public class RewriteSimpleAggToConstantRule implements RewriteRuleFactory {
+
+    @Override
+    public List<Rule> buildRules() {
+        return ImmutableList.of(
+                // pattern: agg -> scan
+                logicalAggregate(logicalOlapScan())
+                        .thenApply(ctx -> {
+                            LogicalAggregate<LogicalOlapScan> agg = ctx.root;
+                            LogicalOlapScan olapScan = agg.child();
+                            return tryRewrite(agg, olapScan, 
ctx.statementContext);
+                        })
+                        .toRule(RuleType.REWRITE_SIMPLE_AGG_TO_CONSTANT),
+                // pattern: agg -> project -> scan
+                logicalAggregate(logicalProject(logicalOlapScan()))
+                        .thenApply(ctx -> {
+                            LogicalAggregate<?> agg = ctx.root;
+                            LogicalOlapScan olapScan = (LogicalOlapScan) 
ctx.root.child().child();
+                            return tryRewrite(agg, olapScan, 
ctx.statementContext);
+                        })
+                        .toRule(RuleType.REWRITE_SIMPLE_AGG_TO_CONSTANT)
+        );
+    }
+
+    private Plan tryRewrite(LogicalAggregate<?> agg, LogicalOlapScan olapScan,
+            StatementContext statementContext) {
+        OlapTable table = olapScan.getTable();
+        long selectedIndex = olapScan.getSelectedIndexId();
+

Review Comment:
   **[Minor]** `selectedIndex` is captured here and passed to 
`tryGetConstant()`, but inside that method the parameter is **never read or 
used**. The internal SQL always runs against the base table regardless of 
selected materialized view index.
   
   Either:
   1. Remove the unused parameter, or
   2. If the intent is to restrict rewriting to base-table-only scans, add a 
check:
   ```java
   if (selectedIndex != olapScan.getTable().getBaseIndexId()) {
       return null;
   }
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to