morningman commented on code in PR #54033:
URL: https://github.com/apache/doris/pull/54033#discussion_r2243833209


##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtils.java:
##########
@@ -90,6 +98,190 @@
  */
 public class MaterializedViewUtils {
 
+    public static final String MIN = "min";
+    public static final String MAX = "max";
+    public static final String SUM = "sum";
+
+    /**
+     * paimon mv agg keys and functions
+     */
+    public static class MVAggInfo {
+        public List<String> keys = new ArrayList<>();
+        public Map<String, String> aggFunctions = new HashMap<>();
+    }
+
+    private static class MVAnalysis implements GeneratedPlanPatterns {
+        public static MVAnalysis instance = new MVAnalysis();
+        private static final ImmutableSet<String> ALLOW_MERGE_AGGREGATE_FUNCTIONS =
+                ImmutableSet.of(MIN, MAX, SUM);
+
+        /**
+         * Extract aggregation keys and aggregation functions according to the execution plan
+         */
+        public MVAggInfo checkAndGetAggInfo(Plan plan) {
+            if (plan instanceof LogicalResultSink) {
+                plan = plan.child(0);
+            }
+            boolean isAggMode = logicalAggregate(logicalProject(logicalFileScan()))
+                    .getPattern().matchPlanTree(plan);
+            if (!isAggMode) {
+                throw new RuntimeException(
+                        "mv query must be agg mode and table must a paimon table: " + plan);
+            }
+            LogicalAggregate<? extends Plan> aggregate = (LogicalAggregate) plan;
+            LogicalProject<? extends Plan> project = (LogicalProject) plan.child(0);
+            LogicalFileScan fileScan = (LogicalFileScan) project.child(0);
+
+            if (!(fileScan.getTable() instanceof PaimonExternalTable)) {
+                throw new RuntimeException("table " + fileScan.getTable() + " not support incremental mv");
+            }
+
+            // select a + 1 from t group a + 1  ->  a + 1 should has alias
+            project.getProjects().forEach(e -> {
+                if (e instanceof Alias) {
+                    Alias alias = (Alias) e;
+                    if (alias.isNameFromChild()) {
+                        throw new RuntimeException("complex expression must has alias: " + e);
+                    }
+                }
+            });
+
+            // select a, sum(b) from t group a  ->  sum(b) should has alias
+            aggregate.getOutputExpressions().forEach(e -> {
+                if (e instanceof Alias) {
+                    Alias alias = (Alias) e;
+                    if (alias.isNameFromChild()) {
+                        throw new RuntimeException("complex expression must has alias: " + e);
+                    }
+                }
+            });
+
+            PaimonExternalTable paimonExternalTable = (PaimonExternalTable) fileScan.getTable();
+
+            Table paimonTable = paimonExternalTable.getPaimonTable(Optional.empty());
+            Set<String> primaryKeys = new HashSet<>(paimonTable.primaryKeys());
+            List<Slot> projectInputSlots = project.getProjects().stream().map(NamedExpression::getInputSlots)
+                    .flatMap(Collection::stream).collect(Collectors.toList());
+
+            CoreOptions coreOptions = new CoreOptions(paimonTable.options());
+
+            Set<NamedExpression> primaryKeySlots = new HashSet<>();
+            Map<ExprId, String> exprIdToAggFunctionName = new HashMap<>();
+            List<Slot> fileScanOutputs = fileScan.getOutput();
+            for (Slot slot : fileScanOutputs) {
+                String fieldName = slot.getName();
+                if (primaryKeys.contains(fieldName)) {
+                    primaryKeySlots.add(slot);
+                    continue;
+                }
+
+                if (!projectInputSlots.contains(slot)) {
+                    continue;
+                }
+
+                String strAggFunc = coreOptions.fieldAggFunc(fieldName);
+                if (strAggFunc == null || strAggFunc.trim().isEmpty()) {
+                    String errorMsg = String.format(
+                            "paimon merge on doris failed: column %s is not a aggregate column!",
+                            fieldName);
+                    throw new RuntimeException(errorMsg);
+                }
+
+                switch (strAggFunc) {
+                    case SUM:
+                        exprIdToAggFunctionName.put(slot.getExprId(), SUM);
+                        break;
+                    case MAX:
+                        exprIdToAggFunctionName.put(slot.getExprId(), MAX);
+                        break;
+                    case MIN:
+                        exprIdToAggFunctionName.put(slot.getExprId(), MIN);
+                        break;
+                    default:
+                        String errorMsg = String.format(
+                                "paimon merge on doris failed: Use unsupported aggregation: %s",

Review Comment:
   This is a general class; it should not reference a specific data source name like `paimon`.



##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java:
##########
@@ -272,6 +285,220 @@ public void run() throws JobException {
         }
     }
 
+    public MTMVDataRefreshExec getMTMVDataRefreshExec() {
+        if (mtmv.getIncrementalRefresh()) {
+            return new MTMVIncrementalDataRefreshExec();
+        } else {
+            return new MTMVFullDataRefreshExec();
+        }
+    }
+
+    public abstract class MTMVDataRefreshExec {
+
+        protected abstract void prepare(MTMVRefreshContext context,
+                Map<String, MTMVRefreshPartitionSnapshot> execPartitionSnapshots) throws Exception;
+
+        private void executeWithRetry(Set<String> execPartitionNames, Map<TableIf, String> tableWithPartKey)
+                throws Exception {
+            int retryCount = 0;
+            int retryTime = Config.max_query_retry_time;
+            retryTime = retryTime <= 0 ? 1 : retryTime + 1;
+            Exception lastException = null;
+            while (retryCount < retryTime) {
+                try {
+                    exec(execPartitionNames, tableWithPartKey);
+                    break; // Exit loop if execution is successful
+                } catch (Exception e) {
+                    if (!(Config.isCloudMode() && e.getMessage().contains(FeConstants.CLOUD_RETRY_E230))) {
+                        throw e; // Re-throw if it's not a retryable exception
+                    }
+                    lastException = e;
+
+                    int randomMillis = 10 + (int) (Math.random() * 10);
+                    if (retryCount > retryTime / 2) {
+                        randomMillis = 20 + (int) (Math.random() * 10);
+                    }
+                    if (DebugPointUtil.isEnable("MTMVTask.retry.longtime")) {
+                        randomMillis = 1000;
+                    }
+
+                    retryCount++;
+                    LOG.warn("Retrying execution due to exception: {}. Attempt {}/{}, "
+                            + "taskId {} execPartitionNames {} lastQueryId {}, randomMillis {}",
+                            e.getMessage(), retryCount, retryTime, getTaskId(),
+                            execPartitionNames, lastQueryId, randomMillis);
+                    if (retryCount >= retryTime) {
+                        throw new Exception("Max retry attempts reached, original: " + lastException);
+                    }
+                    Thread.sleep(randomMillis);
+                }
+            }
+        }
+
+        private void exec(Set<String> refreshPartitionNames,
+                Map<TableIf, String> tableWithPartKey)
+                throws Exception {
+            ConnectContext ctx = MTMVPlanUtil.createMTMVContext(mtmv);
+            StatementContext statementContext = new StatementContext();
+            for (Entry<MvccTableInfo, MvccSnapshot> entry : snapshots.entrySet()) {
+                statementContext.setSnapshot(entry.getKey(), entry.getValue());
+            }
+            ctx.setStatementContext(statementContext);
+            TUniqueId queryId = generateQueryId();
+            lastQueryId = DebugUtil.printId(queryId);
+            // if SELF_MANAGE mv, only have default partition, will not have partitionItem, so we give empty set
+            Command command = getDataRefreshCommand(refreshPartitionNames, tableWithPartKey);
+            try {
+                executor = new StmtExecutor(ctx, new LogicalPlanAdapter(command, ctx.getStatementContext()));
+                ctx.setExecutor(executor);
+                ctx.setQueryId(queryId);
+                ctx.getState().setNereids(true);
+                command.run(ctx, executor);
+                if (getStatus() == TaskStatus.CANCELED) {
+                    // Throwing an exception to interrupt subsequent partition update tasks
+                    throw new JobException("task is CANCELED");
+                }
+                if (ctx.getState().getStateType() != MysqlStateType.OK) {
+                    throw new JobException(ctx.getState().getErrorMessage());
+                }
+            } finally {
+                if (executor != null) {
+                    AuditLogHelper.logAuditLog(ctx, getDummyStmt(refreshPartitionNames),
+                            executor.getParsedStmt(), executor.getQueryStatisticsForAuditLog(),
+                            true);
+                }
+            }
+        }
+
+        protected abstract Command getDataRefreshCommand(
+                Set<String> refreshPartitionNames, Map<TableIf, String> tableWithPartKey) throws UserException;
+
+        public List<String> calculateNeedRefreshPartitions(MTMVRefreshContext context)
+                throws AnalysisException {
+            List<String> needRefreshPartitionsByManual = calculateNeedRefreshPartitionsByManual(context);
+            if (!needRefreshPartitionsByManual.isEmpty()) {
+                return needRefreshPartitionsByManual;
+            }
+            return calculateNeedRefreshPartitionsInternal(context);
+        }
+
+        protected List<String> calculateNeedRefreshPartitionsInternal(MTMVRefreshContext context)
+                throws AnalysisException {
+            // if refreshMethod is COMPLETE, we must FULL refresh, avoid external table MTMV always not refresh
+            if (mtmv.getRefreshInfo().getRefreshMethod() == RefreshMethod.COMPLETE) {
+                return Lists.newArrayList(mtmv.getPartitionNames());
+            }
+            // check if data is fresh
+            // We need to use a newly generated relationship and cannot retrieve it using mtmv.getRelation()
+            // to avoid rebuilding the baseTable and causing a change in the tableId
+            boolean fresh = MTMVPartitionUtil.isMTMVSync(context, relation.getBaseTablesOneLevel(),
+                    mtmv.getExcludedTriggerTables());
+            if (fresh) {
+                return Lists.newArrayList();
+            }
+            // current, if partitionType is SELF_MANAGE, we can only FULL refresh
+            if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.SELF_MANAGE) {
+                return Lists.newArrayList(mtmv.getPartitionNames());
+            }
+            // We need to use a newly generated relationship and cannot retrieve it using mtmv.getRelation()
+            // to avoid rebuilding the baseTable and causing a change in the tableId
+            return MTMVPartitionUtil.getMTMVNeedRefreshPartitions(context, relation.getBaseTablesOneLevel());
+        }
+
+        public List<String> calculateNeedRefreshPartitionsByManual(MTMVRefreshContext context) {
+            // check whether the user manually triggers it
+            if (taskContext.getTriggerMode() == MTMVTaskTriggerMode.MANUAL) {
+                if (taskContext.isComplete()) {
+                    return Lists.newArrayList(mtmv.getPartitionNames());
+                } else if (!CollectionUtils.isEmpty(taskContext.getPartitions())) {
+                    return taskContext.getPartitions();
+                }
+            }
+            return Lists.newArrayList();
+        }
+    }
+
+    private class MTMVFullDataRefreshExec extends MTMVDataRefreshExec {

Review Comment:
   I think the name `MTMVFullDataRefreshExec` is not suitable, because "partition level refresh" is also a kind of `incremental` refresh. How about `PartitionRefresh` and `SnapshotRefresh`?
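   A rough sketch of that shape (hypothetical names, empty bodies, just to illustrate the split; not actual PR code):
   ```java
   // Both subclasses are refresh strategies, distinguished by what they consume
   // rather than by "full" vs "incremental".
   abstract class MTMVDataRefreshExec {
       protected abstract void refresh() throws Exception;
   }

   // was MTMVFullDataRefreshExec: rebuilds the selected partitions
   class PartitionRefreshExec extends MTMVDataRefreshExec {
       @Override
       protected void refresh() {
           // rebuild the partitions chosen by calculateNeedRefreshPartitions
       }
   }

   // was MTMVIncrementalDataRefreshExec: replays the delta between base-table snapshots
   class SnapshotRefreshExec extends MTMVDataRefreshExec {
       @Override
       protected void refresh() {
           // merge the changes between the last refreshed snapshot and the current one
       }
   }
   ```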



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtils.java:
##########
@@ -90,6 +98,190 @@
  */
 public class MaterializedViewUtils {
 
+    public static final String MIN = "min";
+    public static final String MAX = "max";
+    public static final String SUM = "sum";
+
+    /**
+     * paimon mv agg keys and functions
+     */
+    public static class MVAggInfo {
+        public List<String> keys = new ArrayList<>();
+        public Map<String, String> aggFunctions = new HashMap<>();
+    }
+
+    private static class MVAnalysis implements GeneratedPlanPatterns {
+        public static MVAnalysis instance = new MVAnalysis();
+        private static final ImmutableSet<String> ALLOW_MERGE_AGGREGATE_FUNCTIONS =
+                ImmutableSet.of(MIN, MAX, SUM);
+
+        /**
+         * Extract aggregation keys and aggregation functions according to the execution plan
+         */
+        public MVAggInfo checkAndGetAggInfo(Plan plan) {

Review Comment:
   Better to add a UT for this class to cover all of these situations, e.g.:
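   A minimal sketch of what such a UT could cover, assuming a hypothetical helper `analyzePlan(sql)` that parses and analyzes SQL into a logical `Plan` (the real test would wire this to the existing Nereids test utilities):
   ```java
   import org.apache.doris.nereids.rules.exploration.mv.MaterializedViewUtils;
   import org.apache.doris.nereids.trees.plans.Plan;

   import org.junit.jupiter.api.Assertions;
   import org.junit.jupiter.api.Test;

   public class MVAnalysisTest {
       // hypothetical helper: parse and analyze SQL into a logical Plan
       private Plan analyzePlan(String sql) {
           throw new UnsupportedOperationException("wire up to the Nereids test infra");
       }

       @Test
       public void testRejectNonAggQuery() {
           // a plain scan is not agg mode, so it should be rejected
           Assertions.assertThrows(RuntimeException.class,
                   () -> MaterializedViewUtils.checkPaimonIncrementalMV(
                           analyzePlan("select c1 from paimon_t")));
       }

       @Test
       public void testRejectDistinctAgg() {
           // distinct aggregates cannot be merged incrementally
           Assertions.assertThrows(RuntimeException.class,
                   () -> MaterializedViewUtils.checkPaimonIncrementalMV(
                           analyzePlan("select c1, sum(distinct c2) from paimon_t group by c1")));
       }

       @Test
       public void testExtractKeysAndAggFunctions() {
           // happy path: group-by keys and per-column agg functions are extracted
           MaterializedViewUtils.MVAggInfo info = MaterializedViewUtils.checkPaimonIncrementalMV(
                   analyzePlan("select c1, sum(c2) as s2 from paimon_t group by c1"));
           Assertions.assertEquals("sum", info.aggFunctions.get("s2"));
       }
   }
   ```
   Other cases worth covering: complex expressions without an alias, group-by columns outside the primary key, and a sql agg function that conflicts with the table's configured agg function.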



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtils.java:
##########
@@ -90,6 +98,190 @@
  */
 public class MaterializedViewUtils {
 
+    public static final String MIN = "min";
+    public static final String MAX = "max";
+    public static final String SUM = "sum";
+
+    /**
+     * paimon mv agg keys and functions
+     */
+    public static class MVAggInfo {
+        public List<String> keys = new ArrayList<>();
+        public Map<String, String> aggFunctions = new HashMap<>();
+    }
+
+    private static class MVAnalysis implements GeneratedPlanPatterns {
+        public static MVAnalysis instance = new MVAnalysis();
+        private static final ImmutableSet<String> ALLOW_MERGE_AGGREGATE_FUNCTIONS =
+                ImmutableSet.of(MIN, MAX, SUM);
+
+        /**
+         * Extract aggregation keys and aggregation functions according to the execution plan
+         */
+        public MVAggInfo checkAndGetAggInfo(Plan plan) {
+            if (plan instanceof LogicalResultSink) {
+                plan = plan.child(0);
+            }
+            boolean isAggMode = logicalAggregate(logicalProject(logicalFileScan()))
+                    .getPattern().matchPlanTree(plan);
+            if (!isAggMode) {
+                throw new RuntimeException(
+                        "mv query must be agg mode and table must a paimon table: " + plan);
+            }
+            LogicalAggregate<? extends Plan> aggregate = (LogicalAggregate) plan;
+            LogicalProject<? extends Plan> project = (LogicalProject) plan.child(0);
+            LogicalFileScan fileScan = (LogicalFileScan) project.child(0);
+
+            if (!(fileScan.getTable() instanceof PaimonExternalTable)) {
+                throw new RuntimeException("table " + fileScan.getTable() + " not support incremental mv");
+            }
+
+            // select a + 1 from t group a + 1  ->  a + 1 should has alias
+            project.getProjects().forEach(e -> {
+                if (e instanceof Alias) {
+                    Alias alias = (Alias) e;
+                    if (alias.isNameFromChild()) {
+                        throw new RuntimeException("complex expression must has alias: " + e);
+                    }
+                }
+            });
+
+            // select a, sum(b) from t group a  ->  sum(b) should has alias
+            aggregate.getOutputExpressions().forEach(e -> {
+                if (e instanceof Alias) {
+                    Alias alias = (Alias) e;
+                    if (alias.isNameFromChild()) {
+                        throw new RuntimeException("complex expression must has alias: " + e);
+                    }
+                }
+            });
+
+            PaimonExternalTable paimonExternalTable = (PaimonExternalTable) fileScan.getTable();
+
+            Table paimonTable = paimonExternalTable.getPaimonTable(Optional.empty());
+            Set<String> primaryKeys = new HashSet<>(paimonTable.primaryKeys());
+            List<Slot> projectInputSlots = project.getProjects().stream().map(NamedExpression::getInputSlots)
+                    .flatMap(Collection::stream).collect(Collectors.toList());
+
+            CoreOptions coreOptions = new CoreOptions(paimonTable.options());
+
+            Set<NamedExpression> primaryKeySlots = new HashSet<>();
+            Map<ExprId, String> exprIdToAggFunctionName = new HashMap<>();
+            List<Slot> fileScanOutputs = fileScan.getOutput();
+            for (Slot slot : fileScanOutputs) {
+                String fieldName = slot.getName();
+                if (primaryKeys.contains(fieldName)) {
+                    primaryKeySlots.add(slot);
+                    continue;
+                }
+
+                if (!projectInputSlots.contains(slot)) {
+                    continue;
+                }
+
+                String strAggFunc = coreOptions.fieldAggFunc(fieldName);
+                if (strAggFunc == null || strAggFunc.trim().isEmpty()) {
+                    String errorMsg = String.format(
+                            "paimon merge on doris failed: column %s is not a aggregate column!",
+                            fieldName);
+                    throw new RuntimeException(errorMsg);
+                }
+
+                switch (strAggFunc) {
+                    case SUM:
+                        exprIdToAggFunctionName.put(slot.getExprId(), SUM);
+                        break;
+                    case MAX:
+                        exprIdToAggFunctionName.put(slot.getExprId(), MAX);
+                        break;
+                    case MIN:
+                        exprIdToAggFunctionName.put(slot.getExprId(), MIN);
+                        break;
+                    default:
+                        String errorMsg = String.format(
+                                "paimon merge on doris failed: Use unsupported aggregation: %s",
+                                strAggFunc);
+                        throw new RuntimeException(errorMsg);
+                }
+            }
+
+            Set<Slot> replacedGroupBySlots = PlanUtils.replaceExpressionByProjections(
+                    project.getProjects(), new ArrayList<>(aggregate.getGroupByExpressions()))
+                    .stream().map(Expression::getInputSlots).flatMap(Collection::stream)
+                    .collect(Collectors.toSet());
+
+            if (!new HashSet<>(primaryKeySlots).containsAll(replacedGroupBySlots)) {
+                replacedGroupBySlots.removeAll(primaryKeySlots);
+                String errorMsg = String.format(
+                        "paimon merge on doris failed: exist non-primary key group columns: %s",

Review Comment:
   ```suggestion
                           "merge on doris failed: exist non-primary key group 
columns: %s",
   ```



##########
regression-test/suites/mtmv_p0/test_paimon_incremental_mtmv.groovy:
##########
@@ -0,0 +1,202 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_paimon_incremental_mtmv", 
"p0,external,mtmv,external_docker,external_docker_doris") {
+    logger.info("start test_paimon_incremental_mtmv")
+    String enabled = context.config.otherConfigs.get("enablePaimonTest")
+    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+        logger.info("disabled paimon test")
+        return
+    }
+    String suiteName = "test_paimon_mtmv"
+    String catalogName = "${suiteName}_catalog"
+    String mvName = "${suiteName}_incremental_mv"
+    String dbName = context.config.getDbNameByFile(context.file)
+    String otherDbName = "${suiteName}_otherdb"
+    String tableName = "${suiteName}_table"
+    String paimonDbName = "test_paimon_spark"
+    String paimonAggTableName = "paimon_incremental_mv_test_agg_table"
+
+    sql """drop database if exists ${otherDbName}"""
+    sql """create database ${otherDbName}"""
+    sql """
+        CREATE TABLE ${otherDbName}.${tableName} (
+          c1 INT,
+          c2 INT
+        ) ENGINE=OLAP
+        PROPERTIES ('replication_num' = '1')
+       """
+
+    String minio_port = context.config.otherConfigs.get("iceberg_minio_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+
+    sql """DROP CATALOG IF EXISTS ${catalogName}"""
+    sql """CREATE CATALOG ${catalogName} PROPERTIES (
+            'type'='paimon',
+            'warehouse' = 's3://warehouse/wh/',
+            "s3.access_key" = "admin",
+            "s3.secret_key" = "password",
+            "s3.endpoint" = "http://${externalEnvIp}:${minio_port}";,
+            "s3.region" = "us-east-1",
+            "fs.oss.connection.timeout" = "1000",

Review Comment:
   Do we need these `fs.oss` params?



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtils.java:
##########
@@ -90,6 +98,190 @@
  */
 public class MaterializedViewUtils {
 
+    public static final String MIN = "min";
+    public static final String MAX = "max";
+    public static final String SUM = "sum";
+
+    /**
+     * paimon mv agg keys and functions
+     */
+    public static class MVAggInfo {
+        public List<String> keys = new ArrayList<>();
+        public Map<String, String> aggFunctions = new HashMap<>();
+    }
+
+    private static class MVAnalysis implements GeneratedPlanPatterns {
+        public static MVAnalysis instance = new MVAnalysis();
+        private static final ImmutableSet<String> ALLOW_MERGE_AGGREGATE_FUNCTIONS =
+                ImmutableSet.of(MIN, MAX, SUM);
+
+        /**
+         * Extract aggregation keys and aggregation functions according to the execution plan
+         */
+        public MVAggInfo checkAndGetAggInfo(Plan plan) {
+            if (plan instanceof LogicalResultSink) {
+                plan = plan.child(0);
+            }
+            boolean isAggMode = logicalAggregate(logicalProject(logicalFileScan()))
+                    .getPattern().matchPlanTree(plan);
+            if (!isAggMode) {
+                throw new RuntimeException(
+                        "mv query must be agg mode and table must a paimon table: " + plan);

Review Comment:
   Here you have not yet checked whether this is a paimon table, so the error message needs to change.



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/UpdateMvByInrementalCommand.java:
##########
@@ -0,0 +1,37 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands;
+
+import org.apache.doris.nereids.trees.plans.commands.insert.InsertIntoTableCommand;
+import org.apache.doris.nereids.trees.plans.logical.LogicalPlan;
+
+import org.apache.logging.log4j.LogManager;
+import org.apache.logging.log4j.Logger;
+
+import java.util.Optional;
+
+/**
+ * Update mv by partition
+ */
+public class UpdateMvByInrementalCommand extends InsertIntoTableCommand {

Review Comment:
   ```suggestion
   public class UpdateMvByIncrementalCommand extends InsertIntoTableCommand {
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateMTMVInfo.java:
##########
@@ -283,6 +293,33 @@ public void analyzeQuery(ConnectContext ctx, Map<String, String> mvProperties) {
                     (distribution == null || CollectionUtils.isEmpty(distribution.getCols())) ? Sets.newHashSet()
                             : Sets.newHashSet(distribution.getCols()),
                     simpleColumnDefinitions, properties);
+            if (this.inremental) {
+                keys = paimonMVAggInfo.keys;
+                if (!keys.isEmpty()) {
+                    keysType = KeysType.AGG_KEYS;
+                }
+                columns = columns.stream()
+                        .map(c -> {
+                            String funcName = paimonMVAggInfo.aggFunctions.get(c.getName());
+                            if (funcName != null) {
+                                switch (funcName) {
+                                    case MaterializedViewUtils.MIN:
+                                        c.setAggType(AggregateType.MIN);
+                                        break;
+                                    case MaterializedViewUtils.MAX:
+                                        c.setAggType(AggregateType.MAX);
+                                        break;
+                                    case MaterializedViewUtils.SUM:
+                                        c.setAggType(AggregateType.SUM);
+                                        break;
+                                    default:
+                                        throw new RuntimeException("unsupport agg function: " + funcName);

Review Comment:
   ```suggestion
                                           throw new RuntimeException("unsupported agg function: " + funcName);
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java:
##########
@@ -272,6 +285,220 @@ public void run() throws JobException {
         }
     }
 
+    public MTMVDataRefreshExec getMTMVDataRefreshExec() {
+        if (mtmv.getIncrementalRefresh()) {
+            return new MTMVIncrementalDataRefreshExec();
+        } else {
+            return new MTMVFullDataRefreshExec();
+        }
+    }
+
+    public abstract class MTMVDataRefreshExec {
+
+        protected abstract void prepare(MTMVRefreshContext context,
+                Map<String, MTMVRefreshPartitionSnapshot> execPartitionSnapshots) throws Exception;
+
+        private void executeWithRetry(Set<String> execPartitionNames, Map<TableIf, String> tableWithPartKey)
+                throws Exception {
+            int retryCount = 0;
+            int retryTime = Config.max_query_retry_time;
+            retryTime = retryTime <= 0 ? 1 : retryTime + 1;
+            Exception lastException = null;
+            while (retryCount < retryTime) {
+                try {
+                    exec(execPartitionNames, tableWithPartKey);
+                    break; // Exit loop if execution is successful
+                } catch (Exception e) {
+                    if (!(Config.isCloudMode() && e.getMessage().contains(FeConstants.CLOUD_RETRY_E230))) {
+                        throw e; // Re-throw if it's not a retryable exception
+                    }
+                    lastException = e;
+
+                    int randomMillis = 10 + (int) (Math.random() * 10);
+                    if (retryCount > retryTime / 2) {
+                        randomMillis = 20 + (int) (Math.random() * 10);
+                    }
+                    if (DebugPointUtil.isEnable("MTMVTask.retry.longtime")) {
+                        randomMillis = 1000;
+                    }
+
+                    retryCount++;
+                    LOG.warn("Retrying execution due to exception: {}. Attempt {}/{}, "
+                            + "taskId {} execPartitionNames {} lastQueryId {}, randomMillis {}",
+                            e.getMessage(), retryCount, retryTime, getTaskId(),
+                            execPartitionNames, lastQueryId, randomMillis);
+                    if (retryCount >= retryTime) {
+                        throw new Exception("Max retry attempts reached, original: " + lastException);
+                    }
+                    Thread.sleep(randomMillis);
+                }
+            }
+        }
+
+        private void exec(Set<String> refreshPartitionNames,
+                Map<TableIf, String> tableWithPartKey)
+                throws Exception {
+            ConnectContext ctx = MTMVPlanUtil.createMTMVContext(mtmv);
+            StatementContext statementContext = new StatementContext();
+            for (Entry<MvccTableInfo, MvccSnapshot> entry : snapshots.entrySet()) {
+                statementContext.setSnapshot(entry.getKey(), entry.getValue());
+            }
+            ctx.setStatementContext(statementContext);
+            TUniqueId queryId = generateQueryId();
+            lastQueryId = DebugUtil.printId(queryId);
+            // if SELF_MANAGE mv, only have default partition, will not have partitionItem, so we give empty set
+            Command command = getDataRefreshCommand(refreshPartitionNames, tableWithPartKey);
+            try {
+                executor = new StmtExecutor(ctx, new LogicalPlanAdapter(command, ctx.getStatementContext()));
+                ctx.setExecutor(executor);
+                ctx.setQueryId(queryId);
+                ctx.getState().setNereids(true);
+                command.run(ctx, executor);
+                if (getStatus() == TaskStatus.CANCELED) {
+                    // Throwing an exception to interrupt subsequent partition update tasks
+                    throw new JobException("task is CANCELED");
+                }
+                if (ctx.getState().getStateType() != MysqlStateType.OK) {
+                    throw new JobException(ctx.getState().getErrorMessage());
+                }
+            } finally {
+                if (executor != null) {
+                    AuditLogHelper.logAuditLog(ctx, getDummyStmt(refreshPartitionNames),
+                            executor.getParsedStmt(), executor.getQueryStatisticsForAuditLog(),
+                            true);
+                }
+            }
+        }
+
+        protected abstract Command getDataRefreshCommand(
+                Set<String> refreshPartitionNames, Map<TableIf, String> tableWithPartKey) throws UserException;
+
+        public List<String> calculateNeedRefreshPartitions(MTMVRefreshContext context)
+                throws AnalysisException {
+            List<String> needRefreshPartitionsByManual = calculateNeedRefreshPartitionsByManual(context);
+            if (!needRefreshPartitionsByManual.isEmpty()) {
+                return needRefreshPartitionsByManual;
+            }
+            return calculateNeedRefreshPartitionsInternal(context);
+        }
+
+        protected List<String> calculateNeedRefreshPartitionsInternal(MTMVRefreshContext context)
+                throws AnalysisException {
+            // if refreshMethod is COMPLETE, we must FULL refresh, avoid external table MTMV always not refresh
+            if (mtmv.getRefreshInfo().getRefreshMethod() == RefreshMethod.COMPLETE) {
+                return Lists.newArrayList(mtmv.getPartitionNames());
+            }
+            // check if data is fresh
+            // We need to use a newly generated relationship and cannot retrieve it using mtmv.getRelation()
+            // to avoid rebuilding the baseTable and causing a change in the tableId
+            boolean fresh = MTMVPartitionUtil.isMTMVSync(context, relation.getBaseTablesOneLevel(),
+                    mtmv.getExcludedTriggerTables());
+            if (fresh) {
+                return Lists.newArrayList();
+            }
+            // current, if partitionType is SELF_MANAGE, we can only FULL refresh
+            if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.SELF_MANAGE) {
+                return Lists.newArrayList(mtmv.getPartitionNames());
+            }
+            // We need to use a newly generated relationship and cannot retrieve it using mtmv.getRelation()
+            // to avoid rebuilding the baseTable and causing a change in the tableId
+            return MTMVPartitionUtil.getMTMVNeedRefreshPartitions(context, relation.getBaseTablesOneLevel());
+        }
+
+        public List<String> calculateNeedRefreshPartitionsByManual(MTMVRefreshContext context) {
+            // check whether the user manually triggers it
+            if (taskContext.getTriggerMode() == MTMVTaskTriggerMode.MANUAL) {
+                if (taskContext.isComplete()) {
+                    return Lists.newArrayList(mtmv.getPartitionNames());
+                } else if (!CollectionUtils.isEmpty(taskContext.getPartitions())) {
+                    return taskContext.getPartitions();
+                }
+            }
+            return Lists.newArrayList();
+        }
+    }
+
+    private class MTMVFullDataRefreshExec extends MTMVDataRefreshExec {
+
+        @Override
+        protected void prepare(MTMVRefreshContext context,
+                Map<String, MTMVRefreshPartitionSnapshot> execPartitionSnapshots) {}
+
+        @Override
+        protected Command getDataRefreshCommand(
+                Set<String> refreshPartitionNames, Map<TableIf, String> tableWithPartKey) throws UserException {
+            return UpdateMvUtils
+                    .from(mtmv, mtmv.getMvPartitionInfo().getPartitionType() != MTMVPartitionType.SELF_MANAGE
+                            ? refreshPartitionNames : Sets.newHashSet(), tableWithPartKey, false, new HashMap<>());
+        }
+
+    }
+
+    private class MTMVIncrementalDataRefreshExec extends MTMVDataRefreshExec {
+        private boolean incremental;
+        private long tableSnapshotId;
+        private long mvSnapshotId;
+        private Map<String, String> params;
+
+        public void prepare(MTMVRefreshContext context,
+                Map<String, MTMVRefreshPartitionSnapshot> execPartitionSnapshots) throws Exception {
+            params = new HashMap<>();
+            Set<BaseTableInfo> baseTables = relation.getBaseTablesOneLevel();
+            if (baseTables.size() != 1) {
+                throw new JobException("Only support incremental refresh for single table MTMV");

Review Comment:
   ```suggestion
                throw new JobException("Only support incremental refresh for single table mv");
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/job/extensions/mtmv/MTMVTask.java:
##########
@@ -272,6 +285,220 @@ public void run() throws JobException {
         }
     }
 
+    public MTMVDataRefreshExec getMTMVDataRefreshExec() {
+        if (mtmv.getIncrementalRefresh()) {
+            return new MTMVIncrementalDataRefreshExec();
+        } else {
+            return new MTMVFullDataRefreshExec();
+        }
+    }
+
+    public abstract class MTMVDataRefreshExec {
+
+        protected abstract void prepare(MTMVRefreshContext context,
+                Map<String, MTMVRefreshPartitionSnapshot> execPartitionSnapshots) throws Exception;
+
+        private void executeWithRetry(Set<String> execPartitionNames, Map<TableIf, String> tableWithPartKey)
+                throws Exception {
+            int retryCount = 0;
+            int retryTime = Config.max_query_retry_time;
+            retryTime = retryTime <= 0 ? 1 : retryTime + 1;
+            Exception lastException = null;
+            while (retryCount < retryTime) {
+                try {
+                    exec(execPartitionNames, tableWithPartKey);
+                    break; // Exit loop if execution is successful
+                } catch (Exception e) {
+                    if (!(Config.isCloudMode() && e.getMessage().contains(FeConstants.CLOUD_RETRY_E230))) {
+                        throw e; // Re-throw if it's not a retryable exception
+                    }
+                    lastException = e;
+
+                    int randomMillis = 10 + (int) (Math.random() * 10);
+                    if (retryCount > retryTime / 2) {
+                        randomMillis = 20 + (int) (Math.random() * 10);
+                    }
+                    if (DebugPointUtil.isEnable("MTMVTask.retry.longtime")) {
+                        randomMillis = 1000;
+                    }
+
+                    retryCount++;
+                    LOG.warn("Retrying execution due to exception: {}. Attempt {}/{}, "
+                            + "taskId {} execPartitionNames {} lastQueryId {}, randomMillis {}",
+                            e.getMessage(), retryCount, retryTime, getTaskId(),
+                            execPartitionNames, lastQueryId, randomMillis);
+                    if (retryCount >= retryTime) {
+                        throw new Exception("Max retry attempts reached, original: " + lastException);
+                    }
+                    Thread.sleep(randomMillis);
+                }
+            }
+        }
+
+        private void exec(Set<String> refreshPartitionNames,
+                Map<TableIf, String> tableWithPartKey)
+                throws Exception {
+            ConnectContext ctx = MTMVPlanUtil.createMTMVContext(mtmv);
+            StatementContext statementContext = new StatementContext();
+            for (Entry<MvccTableInfo, MvccSnapshot> entry : snapshots.entrySet()) {
+                statementContext.setSnapshot(entry.getKey(), entry.getValue());
+            }
+            ctx.setStatementContext(statementContext);
+            TUniqueId queryId = generateQueryId();
+            lastQueryId = DebugUtil.printId(queryId);
+            // if SELF_MANAGE mv, only have default partition, will not have partitionItem, so we give empty set
+            Command command = getDataRefreshCommand(refreshPartitionNames, tableWithPartKey);
+            try {
+                executor = new StmtExecutor(ctx, new LogicalPlanAdapter(command, ctx.getStatementContext()));
+                ctx.setExecutor(executor);
+                ctx.setQueryId(queryId);
+                ctx.getState().setNereids(true);
+                command.run(ctx, executor);
+                if (getStatus() == TaskStatus.CANCELED) {
+                    // Throwing an exception to interrupt subsequent partition update tasks
+                    throw new JobException("task is CANCELED");
+                }
+                if (ctx.getState().getStateType() != MysqlStateType.OK) {
+                    throw new JobException(ctx.getState().getErrorMessage());
+                }
+            } finally {
+                if (executor != null) {
+                    AuditLogHelper.logAuditLog(ctx, getDummyStmt(refreshPartitionNames),
+                            executor.getParsedStmt(), executor.getQueryStatisticsForAuditLog(),
+                            true);
+                }
+            }
+        }
+
+        protected abstract Command getDataRefreshCommand(
+                Set<String> refreshPartitionNames, Map<TableIf, String> tableWithPartKey) throws UserException;
+
+        public List<String> calculateNeedRefreshPartitions(MTMVRefreshContext context)
+                throws AnalysisException {
+            List<String> needRefreshPartitionsByManual = calculateNeedRefreshPartitionsByManual(context);
+            if (!needRefreshPartitionsByManual.isEmpty()) {
+                return needRefreshPartitionsByManual;
+            }
+            return calculateNeedRefreshPartitionsInternal(context);
+        }
+
+        protected List<String> calculateNeedRefreshPartitionsInternal(MTMVRefreshContext context)
+                throws AnalysisException {
+            // if refreshMethod is COMPLETE, we must FULL refresh, avoid external table MTMV always not refresh
+            if (mtmv.getRefreshInfo().getRefreshMethod() == RefreshMethod.COMPLETE) {
+                return Lists.newArrayList(mtmv.getPartitionNames());
+            }
+            // check if data is fresh
+            // We need to use a newly generated relationship and cannot retrieve it using mtmv.getRelation()
+            // to avoid rebuilding the baseTable and causing a change in the tableId
+            boolean fresh = MTMVPartitionUtil.isMTMVSync(context, relation.getBaseTablesOneLevel(),
+                    mtmv.getExcludedTriggerTables());
+            if (fresh) {
+                return Lists.newArrayList();
+            }
+            // current, if partitionType is SELF_MANAGE, we can only FULL refresh
+            if (mtmv.getMvPartitionInfo().getPartitionType() == MTMVPartitionType.SELF_MANAGE) {
+                return Lists.newArrayList(mtmv.getPartitionNames());
+            }
+            // We need to use a newly generated relationship and cannot retrieve it using mtmv.getRelation()
+            // to avoid rebuilding the baseTable and causing a change in the tableId
+            return MTMVPartitionUtil.getMTMVNeedRefreshPartitions(context, relation.getBaseTablesOneLevel());
+        }
+
+        public List<String> calculateNeedRefreshPartitionsByManual(MTMVRefreshContext context) {
+            // check whether the user manually triggers it
+            if (taskContext.getTriggerMode() == MTMVTaskTriggerMode.MANUAL) {
+                if (taskContext.isComplete()) {
+                    return Lists.newArrayList(mtmv.getPartitionNames());
+                } else if (!CollectionUtils.isEmpty(taskContext.getPartitions())) {
+                    return taskContext.getPartitions();
+                }
+            }
+            return Lists.newArrayList();
+        }
+    }
+
+    private class MTMVFullDataRefreshExec extends MTMVDataRefreshExec {
+
+        @Override
+        protected void prepare(MTMVRefreshContext context,
+                Map<String, MTMVRefreshPartitionSnapshot> execPartitionSnapshots) {}
+
+        @Override
+        protected Command getDataRefreshCommand(
+                Set<String> refreshPartitionNames, Map<TableIf, String> tableWithPartKey) throws UserException {
+            return UpdateMvUtils
+                    .from(mtmv, mtmv.getMvPartitionInfo().getPartitionType() != MTMVPartitionType.SELF_MANAGE
+                            ? refreshPartitionNames : Sets.newHashSet(), tableWithPartKey, false, new HashMap<>());
+        }
+
+    }
+
+    private class MTMVIncrementalDataRefreshExec extends MTMVDataRefreshExec {
+        private boolean incremental;
+        private long tableSnapshotId;
+        private long mvSnapshotId;
+        private Map<String, String> params;
+
+        public void prepare(MTMVRefreshContext context,
+                Map<String, MTMVRefreshPartitionSnapshot> execPartitionSnapshots) throws Exception {
+            params = new HashMap<>();
+            Set<BaseTableInfo> baseTables = relation.getBaseTablesOneLevel();
+            if (baseTables.size() != 1) {
+                throw new JobException("Only support incremental refresh for single table MTMV");
+            }
+            BaseTableInfo baseTableInfo = baseTables.iterator().next();
+            MTMVSnapshotIf mvSnapshot = mtmv.getRefreshSnapshot().getMVSnapshot(mtmv.getName(), baseTableInfo);
+            mvSnapshotId = Optional.ofNullable(mvSnapshot).map(MTMVSnapshotIf::getSnapshotVersion).orElse(0L);
+            params.put(TableScanParams.READ_MODE, TableScanParams.INCREMENTAL_READ);

Review Comment:
   I think we should extract an interface for this "params generation" logic, because different datasources (like Paimon or Hudi) may have different parameter names.
   So:
   1. You moved fields like `DORIS_START_SNAPSHOT_ID` from `PaimonScanNode` to `TableScanParams`; I think you should move them back because they are only for Paimon.
   2. Use an interface to separate the concrete implementations for the different data sources (see the sketch below).
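   A hedged sketch of the separation (all names here are illustrative, not the actual PR code):
   ```java
   import java.util.HashMap;
   import java.util.Map;

   /** Per-datasource strategy for building the incremental-read scan params. */
   interface IncrementalScanParamsFactory {
       Map<String, String> buildParams(long startSnapshotId, long endSnapshotId);
   }

   /** Paimon-specific option names stay inside the Paimon implementation. */
   class PaimonIncrementalScanParamsFactory implements IncrementalScanParamsFactory {
       @Override
       public Map<String, String> buildParams(long startSnapshotId, long endSnapshotId) {
           Map<String, String> params = new HashMap<>();
           // e.g. the snapshot-id keys that currently live in PaimonScanNode
           params.put("startSnapshotId", String.valueOf(startSnapshotId));
           params.put("endSnapshotId", String.valueOf(endSnapshotId));
           return params;
       }
   }

   // A HudiIncrementalScanParamsFactory would map the same snapshot range
   // onto Hudi's own incremental-query options.
   ```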



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/CreateMTMVInfo.java:
##########
@@ -111,6 +113,10 @@ public class CreateMTMVInfo {
     private MTMVRelation relation;
     private MTMVPartitionInfo mvPartitionInfo;
 
+    private boolean inremental;
+    private MaterializedViewUtils.MVAggInfo paimonMVAggInfo;

Review Comment:
   ```suggestion
       private MaterializedViewUtils.MVAggInfo MVAggInfo;
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtils.java:
##########
@@ -90,6 +98,190 @@
  */
 public class MaterializedViewUtils {
 
+    public static final String MIN = "min";
+    public static final String MAX = "max";
+    public static final String SUM = "sum";
+
+    /**
+     * paimon mv agg keys and functions
+     */
+    public static class MVAggInfo {
+        public List<String> keys = new ArrayList<>();
+        public Map<String, String> aggFunctions = new HashMap<>();
+    }
+
+    private static class MVAnalysis implements GeneratedPlanPatterns {
+        public static MVAnalysis instance = new MVAnalysis();
+        private static final ImmutableSet<String> ALLOW_MERGE_AGGREGATE_FUNCTIONS =
+                ImmutableSet.of(MIN, MAX, SUM);
+
+        /**
+         * Extract aggregation keys and aggregation functions according to the execution plan
+         */
+        public MVAggInfo checkAndGetAggInfo(Plan plan) {
+            if (plan instanceof LogicalResultSink) {
+                plan = plan.child(0);
+            }
+            boolean isAggMode = logicalAggregate(logicalProject(logicalFileScan()))
+                    .getPattern().matchPlanTree(plan);
+            if (!isAggMode) {
+                throw new RuntimeException(
+                        "mv query must be agg mode and table must a paimon table: " + plan);
+            }
+            LogicalAggregate<? extends Plan> aggregate = (LogicalAggregate) plan;
+            LogicalProject<? extends Plan> project = (LogicalProject) plan.child(0);
+            LogicalFileScan fileScan = (LogicalFileScan) project.child(0);
+
+            if (!(fileScan.getTable() instanceof PaimonExternalTable)) {
+                throw new RuntimeException("table " + fileScan.getTable() + " not support incremental mv");
+            }
+
+            // select a + 1 from t group a + 1  ->  a + 1 should has alias
+            project.getProjects().forEach(e -> {
+                if (e instanceof Alias) {
+                    Alias alias = (Alias) e;
+                    if (alias.isNameFromChild()) {
+                        throw new RuntimeException("complex expression must has alias: " + e);
+                    }
+                }
+            });
+
+            // select a, sum(b) from t group a  ->  sum(b) should has alias
+            aggregate.getOutputExpressions().forEach(e -> {
+                if (e instanceof Alias) {
+                    Alias alias = (Alias) e;
+                    if (alias.isNameFromChild()) {
+                        throw new RuntimeException("complex expression must has alias: " + e);
+                    }
+                }
+            });
+
+            PaimonExternalTable paimonExternalTable = (PaimonExternalTable) fileScan.getTable();
+
+            Table paimonTable = paimonExternalTable.getPaimonTable(Optional.empty());
+            Set<String> primaryKeys = new HashSet<>(paimonTable.primaryKeys());
+            List<Slot> projectInputSlots = project.getProjects().stream().map(NamedExpression::getInputSlots)
+                    .flatMap(Collection::stream).collect(Collectors.toList());
+
+            CoreOptions coreOptions = new CoreOptions(paimonTable.options());
+
+            Set<NamedExpression> primaryKeySlots = new HashSet<>();
+            Map<ExprId, String> exprIdToAggFunctionName = new HashMap<>();
+            List<Slot> fileScanOutputs = fileScan.getOutput();
+            for (Slot slot : fileScanOutputs) {
+                String fieldName = slot.getName();
+                if (primaryKeys.contains(fieldName)) {
+                    primaryKeySlots.add(slot);
+                    continue;
+                }
+
+                if (!projectInputSlots.contains(slot)) {
+                    continue;
+                }
+
+                String strAggFunc = coreOptions.fieldAggFunc(fieldName);
+                if (strAggFunc == null || strAggFunc.trim().isEmpty()) {
+                    String errorMsg = String.format(
+                            "paimon merge on doris failed: column %s is not a aggregate column!",
+                            fieldName);
+                    throw new RuntimeException(errorMsg);
+                }
+
+                switch (strAggFunc) {
+                    case SUM:
+                        exprIdToAggFunctionName.put(slot.getExprId(), SUM);
+                        break;
+                    case MAX:
+                        exprIdToAggFunctionName.put(slot.getExprId(), MAX);
+                        break;
+                    case MIN:
+                        exprIdToAggFunctionName.put(slot.getExprId(), MIN);
+                        break;
+                    default:
+                        String errorMsg = String.format(
+                                "paimon merge on doris failed: Use unsupported aggregation: %s",
+                                strAggFunc);
+                        throw new RuntimeException(errorMsg);
+                }
+            }
+
+            Set<Slot> replacedGroupBySlots = PlanUtils.replaceExpressionByProjections(
+                    project.getProjects(), new ArrayList<>(aggregate.getGroupByExpressions()))
+                    .stream().map(Expression::getInputSlots).flatMap(Collection::stream)
+                    .collect(Collectors.toSet());
+
+            if (!new HashSet<>(primaryKeySlots).containsAll(replacedGroupBySlots)) {
+                replacedGroupBySlots.removeAll(primaryKeySlots);
+                String errorMsg = String.format(
+                        "paimon merge on doris failed: exist non-primary key group columns: %s",
+                        replacedGroupBySlots);
+                throw new RuntimeException(errorMsg);
+            }
+
+            List<Expression> replacedAggFunctions = PlanUtils.replaceExpressionByProjections(
+                    project.getProjects(), new ArrayList<>(aggregate.getOutputExpressions()));
+
+            MVAggInfo mvAggInfo = new MVAggInfo();
+            for (Expression e : replacedAggFunctions) {
+                if (!(e instanceof Alias) || !(e.child(0) instanceof AggregateFunction)) {
+                    continue;
+                }
+                Alias alias = (Alias) e;
+                AggregateFunction aggFunc = (AggregateFunction) alias.child(0);
+                if (!(ALLOW_MERGE_AGGREGATE_FUNCTIONS.contains(aggFunc.getName()))) {
+                    String errorMsg = String.format(
+                            "paimon merge on doris failed: Use unsupported aggregation: %s",
+                            aggFunc.getName());
+                    throw new RuntimeException(errorMsg);
+                }
+                if (aggFunc.isDistinct()) {
+                    String errorMsg = String.format(
+                            "paimon merge on doris failed: %s Use distinct",
+                            aggFunc);
+                    throw new RuntimeException(errorMsg);
+                }
+                // not support outerAggFunc: sum(a+1),sum(a+b)
+                if (!(aggFunc.child(0) instanceof SlotReference)) {
+                    String errorMsg = String.format(
+                            "paimon merge on doris failed: %s child is a complex expression",
+                            aggFunc);
+                    throw new RuntimeException(errorMsg);
+                }
+                ExprId childExprId = ((SlotReference) aggFunc.child(0)).getExprId();
+                if (exprIdToAggFunctionName.containsKey(childExprId)) {
+                    String aggFunctionName = exprIdToAggFunctionName.get(childExprId);
+                    if (!aggFunctionName.equals(aggFunc.getName())) {
+                        String errorMsg = String.format(
+                                "paimon merge on doris failed: sql agg function %s "
+                                + "is different from paimon agg function %s",
+                                aggFunc, aggFunctionName);
+                        throw new RuntimeException(errorMsg);
+                    }
+                } else {
+                    String errorMsg = String.format(
+                            "paimon merge on doris failed: Unable to find a reasonable "
+                            + "aggregation function for %s from paimon agg function",
+                            aggFunc);
+                    throw new RuntimeException(errorMsg);
+                }
+                mvAggInfo.aggFunctions.put(alias.getName(), aggFunc.getName());
+            }
+
+            aggregate.getGroupByExpressions()
+                    .forEach(slot -> mvAggInfo.keys.add(((SlotReference) slot).getName()));
+            return mvAggInfo;
+        }
+
+        @Override
+        public RulePromise defaultPromise() {
+            return RulePromise.ANALYSIS;
+        }
+    }
+
+    public static MVAggInfo checkPaimonIncrementalMV(Plan plan) {

Review Comment:
   ```suggestion
       public static MVAggInfo checkIncrementalMV(Plan plan) {
   ```



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtils.java:
##########
@@ -90,6 +98,190 @@
  */
 public class MaterializedViewUtils {
 
+    public static final String MIN = "min";
+    public static final String MAX = "max";
+    public static final String SUM = "sum";
+
+    /**
+     * paimon mv agg keys and functions
+     */
+    public static class MVAggInfo {
+        public List<String> keys = new ArrayList<>();
+        public Map<String, String> aggFunctions = new HashMap<>();
+    }
+
+    private static class MVAnalysis implements GeneratedPlanPatterns {
+        public static MVAnalysis instance = new MVAnalysis();
+        private static final ImmutableSet<String> 
ALLOW_MERGE_AGGREGATE_FUNCTIONS =
+                ImmutableSet.of(MIN, MAX, SUM);
+
+        /**
+         * Extract aggregation keys and aggregation functions according to the 
execution plan
+         */
+        public MVAggInfo checkAndGetAggInfo(Plan plan) {
+            if (plan instanceof LogicalResultSink) {
+                plan = plan.child(0);
+            }
+            boolean isAggMode = 
logicalAggregate(logicalProject(logicalFileScan()))
+                    .getPattern().matchPlanTree(plan);
+            if (!isAggMode) {
+                throw new RuntimeException(
+                        "mv query must be agg mode and table must a paimon 
table: " + plan);
+            }
+            LogicalAggregate<? extends Plan> aggregate = (LogicalAggregate) 
plan;
+            LogicalProject<? extends Plan> project = (LogicalProject) 
plan.child(0);
+            LogicalFileScan fileScan = (LogicalFileScan) project.child(0);
+
+            if (!(fileScan.getTable() instanceof PaimonExternalTable)) {
+                throw new RuntimeException("table " + fileScan.getTable() + " 
not support incremental mv");
+            }
+
+            // select a + 1 from t group a + 1  ->  a + 1 should has alias
+            project.getProjects().forEach(e -> {
+                if (e instanceof Alias) {
+                    Alias alias = (Alias) e;
+                    if (alias.isNameFromChild()) {
+                        throw new RuntimeException("complex expression must has alias: " + e);
+                    }
+                }
+            });
+
+            // select a, sum(b) from t group a  ->  sum(b) should has alias
+            aggregate.getOutputExpressions().forEach(e -> {
+                if (e instanceof Alias) {
+                    Alias alias = (Alias) e;
+                    if (alias.isNameFromChild()) {
+                        throw new RuntimeException("complex expression must has alias: " + e);
+                    }
+                }
+            });
+
+            PaimonExternalTable paimonExternalTable = (PaimonExternalTable) fileScan.getTable();
+
+            Table paimonTable = paimonExternalTable.getPaimonTable(Optional.empty());
+            Set<String> primaryKeys = new HashSet<>(paimonTable.primaryKeys());
+            List<Slot> projectInputSlots = project.getProjects().stream().map(NamedExpression::getInputSlots)
+                    .flatMap(Collection::stream).collect(Collectors.toList());
+
+            CoreOptions coreOptions = new CoreOptions(paimonTable.options());
+
+            Set<NamedExpression> primaryKeySlots = new HashSet<>();
+            Map<ExprId, String> exprIdToAggFunctionName = new HashMap<>();
+            List<Slot> fileScanOutputs = fileScan.getOutput();
+            for (Slot slot : fileScanOutputs) {
+                String fieldName = slot.getName();
+                if (primaryKeys.contains(fieldName)) {
+                    primaryKeySlots.add(slot);
+                    continue;
+                }
+
+                if (!projectInputSlots.contains(slot)) {
+                    continue;
+                }
+
+                String strAggFunc = coreOptions.fieldAggFunc(fieldName);
+                if (strAggFunc == null || strAggFunc.trim().isEmpty()) {
+                    String errorMsg = String.format(
+                            "paimon merge on doris failed: column %s is not a aggregate column!",

Review Comment:
   Should remove `paimon` from the error message.
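   
   For example, a sketch of the reworded message (same `fieldName` argument, with the grammar tidied as well):
   ```java
   String errorMsg = String.format(
           "merge on doris failed: column %s is not an aggregate column!",
           fieldName);
   throw new RuntimeException(errorMsg);
   ```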



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtils.java:
##########
@@ -90,6 +98,190 @@
  */
 public class MaterializedViewUtils {
 
+    public static final String MIN = "min";
+    public static final String MAX = "max";
+    public static final String SUM = "sum";
+
+    /**
+     * paimon mv agg keys and functions
+     */
+    public static class MVAggInfo {
+        public List<String> keys = new ArrayList<>();
+        public Map<String, String> aggFunctions = new HashMap<>();
+    }
+
+    private static class MVAnalysis implements GeneratedPlanPatterns {
+        public static MVAnalysis instance = new MVAnalysis();
+        private static final ImmutableSet<String> ALLOW_MERGE_AGGREGATE_FUNCTIONS =
+                ImmutableSet.of(MIN, MAX, SUM);
+
+        /**
+         * Extract aggregation keys and aggregation functions according to the execution plan
+         */
+        public MVAggInfo checkAndGetAggInfo(Plan plan) {
+            if (plan instanceof LogicalResultSink) {
+                plan = plan.child(0);
+            }
+            boolean isAggMode = logicalAggregate(logicalProject(logicalFileScan()))
+                    .getPattern().matchPlanTree(plan);
+            if (!isAggMode) {
+                throw new RuntimeException(
+                        "mv query must be agg mode and table must a paimon table: " + plan);
+            }
+            LogicalAggregate<? extends Plan> aggregate = (LogicalAggregate) plan;
+            LogicalProject<? extends Plan> project = (LogicalProject) plan.child(0);
+            LogicalFileScan fileScan = (LogicalFileScan) project.child(0);
+
+            if (!(fileScan.getTable() instanceof PaimonExternalTable)) {
+                throw new RuntimeException("table " + fileScan.getTable() + " not support incremental mv");
+            }
+
+            // select a + 1 from t group a + 1  ->  a + 1 should has alias
+            project.getProjects().forEach(e -> {
+                if (e instanceof Alias) {
+                    Alias alias = (Alias) e;
+                    if (alias.isNameFromChild()) {
+                        throw new RuntimeException("complex expression must has alias: " + e);
+                    }
+                }
+            });
+
+            // select a, sum(b) from t group a  ->  sum(b) should has alias
+            aggregate.getOutputExpressions().forEach(e -> {
+                if (e instanceof Alias) {
+                    Alias alias = (Alias) e;
+                    if (alias.isNameFromChild()) {
+                        throw new RuntimeException("complex expression must has alias: " + e);
+                    }
+                }
+            });
+
+            PaimonExternalTable paimonExternalTable = (PaimonExternalTable) fileScan.getTable();
+
+            Table paimonTable = paimonExternalTable.getPaimonTable(Optional.empty());
+            Set<String> primaryKeys = new HashSet<>(paimonTable.primaryKeys());
+            List<Slot> projectInputSlots = project.getProjects().stream().map(NamedExpression::getInputSlots)
+                    .flatMap(Collection::stream).collect(Collectors.toList());
+
+            CoreOptions coreOptions = new CoreOptions(paimonTable.options());
+
+            Set<NamedExpression> primaryKeySlots = new HashSet<>();
+            Map<ExprId, String> exprIdToAggFunctionName = new HashMap<>();
+            List<Slot> fileScanOutputs = fileScan.getOutput();
+            for (Slot slot : fileScanOutputs) {
+                String fieldName = slot.getName();
+                if (primaryKeys.contains(fieldName)) {
+                    primaryKeySlots.add(slot);
+                    continue;
+                }
+
+                if (!projectInputSlots.contains(slot)) {
+                    continue;
+                }
+
+                String strAggFunc = coreOptions.fieldAggFunc(fieldName);
+                if (strAggFunc == null || strAggFunc.trim().isEmpty()) {
+                    String errorMsg = String.format(
+                            "paimon merge on doris failed: column %s is not a aggregate column!",
+                            fieldName);
+                    throw new RuntimeException(errorMsg);
+                }
+
+                switch (strAggFunc) {
+                    case SUM:
+                        exprIdToAggFunctionName.put(slot.getExprId(), SUM);
+                        break;
+                    case MAX:
+                        exprIdToAggFunctionName.put(slot.getExprId(), MAX);
+                        break;
+                    case MIN:
+                        exprIdToAggFunctionName.put(slot.getExprId(), MIN);
+                        break;
+                    default:
+                        String errorMsg = String.format(
+                                "paimon merge on doris failed: Use unsupported aggregation: %s",
+                                strAggFunc);
+                        throw new RuntimeException(errorMsg);
+                }
+            }
+
+            Set<Slot> replacedGroupBySlots = PlanUtils.replaceExpressionByProjections(
+                    project.getProjects(), new ArrayList<>(aggregate.getGroupByExpressions()))
+                    .stream().map(Expression::getInputSlots).flatMap(Collection::stream)
+                    .collect(Collectors.toSet());
+
+            if (!new HashSet<>(primaryKeySlots).containsAll(replacedGroupBySlots)) {
+                replacedGroupBySlots.removeAll(primaryKeySlots);
+                String errorMsg = String.format(
+                        "paimon merge on doris failed: exist non-primary key group columns: %s",

Review Comment:
   Same for the other error messages in this class.
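   
   One way to keep them consistent (a hypothetical helper, not something in this PR) could be:
   ```java
   // Hypothetical helper: centralizes the shared prefix so every error
   // message in this class reads the same way.
   private static RuntimeException mergeFailure(String format, Object... args) {
       return new RuntimeException("merge on doris failed: " + String.format(format, args));
   }

   // Usage: throw mergeFailure("column %s is not an aggregate column!", fieldName);
   ```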



##########
fe/fe-core/src/main/java/org/apache/doris/nereids/rules/exploration/mv/MaterializedViewUtils.java:
##########
@@ -90,6 +98,190 @@
  */
 public class MaterializedViewUtils {
 
+    public static final String MIN = "min";
+    public static final String MAX = "max";
+    public static final String SUM = "sum";
+
+    /**
+     * paimon mv agg keys and functions
+     */
+    public static class MVAggInfo {
+        public List<String> keys = new ArrayList<>();
+        public Map<String, String> aggFunctions = new HashMap<>();
+    }
+
+    private static class MVAnalysis implements GeneratedPlanPatterns {
+        public static MVAnalysis instance = new MVAnalysis();
+        private static final ImmutableSet<String> ALLOW_MERGE_AGGREGATE_FUNCTIONS =
+                ImmutableSet.of(MIN, MAX, SUM);
+
+        /**
+         * Extract aggregation keys and aggregation functions according to the execution plan
+         */
+        public MVAggInfo checkAndGetAggInfo(Plan plan) {
+            if (plan instanceof LogicalResultSink) {
+                plan = plan.child(0);
+            }
+            boolean isAggMode = logicalAggregate(logicalProject(logicalFileScan()))
+                    .getPattern().matchPlanTree(plan);
+            if (!isAggMode) {
+                throw new RuntimeException(
+                        "mv query must be agg mode and table must a paimon table: " + plan);
+            }
+            LogicalAggregate<? extends Plan> aggregate = (LogicalAggregate) plan;
+            LogicalProject<? extends Plan> project = (LogicalProject) plan.child(0);
+            LogicalFileScan fileScan = (LogicalFileScan) project.child(0);
+
+            if (!(fileScan.getTable() instanceof PaimonExternalTable)) {
+                throw new RuntimeException("table " + fileScan.getTable() + " not support incremental mv");

Review Comment:
   How about using `NereidsException`? Same for the other `RuntimeException`s in this class.
   And change the exception message to: "Only Paimon tables support incremental mv"
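   
   A minimal sketch of that change, assuming `NereidsException` exposes a `(String)` constructor:
   ```java
   // Sketch only: swaps the generic RuntimeException for NereidsException
   // and uses the clearer message suggested above.
   if (!(fileScan.getTable() instanceof PaimonExternalTable)) {
       throw new NereidsException("Only Paimon tables support incremental mv");
   }
   ```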



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

