This is an automated email from the ASF dual-hosted git repository.

yiguolei pushed a commit to branch branch-4.0
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/branch-4.0 by this push:
     new a6e83c974a6 branch-4.0: [feature](iceberg) Support Partition Evolution 
DDL for Iceberg Tables (#57972) (#58538)
a6e83c974a6 is described below

commit a6e83c974a696abbf5efdc39ea0309fc57336ab1
Author: Socrates <[email protected]>
AuthorDate: Tue Dec 2 09:58:12 2025 +0800

    branch-4.0: [feature](iceberg) Support Partition Evolution DDL for Iceberg 
Tables (#57972) (#58538)
    
    bp: #57972
---
 .../antlr4/org/apache/doris/nereids/DorisParser.g4 |  12 +
 .../main/java/org/apache/doris/alter/Alter.java    |  42 +++
 .../java/org/apache/doris/alter/AlterOpType.java   |   4 +
 .../doris/analysis/AddPartitionFieldClause.java    | 108 ++++++++
 .../apache/doris/analysis/AlterTableClause.java    |  14 +
 .../doris/analysis/DropPartitionFieldClause.java   | 113 ++++++++
 .../analysis/ReplacePartitionFieldClause.java      | 145 ++++++++++
 .../datasource/iceberg/IcebergExternalCatalog.java |  52 ++++
 .../datasource/iceberg/IcebergMetadataOps.java     | 139 ++++++++++
 .../doris/nereids/parser/LogicalPlanBuilder.java   | 104 +++++++
 .../trees/plans/commands/AlterTableCommand.java    |   8 +-
 .../plans/commands/info/AddPartitionFieldOp.java   | 113 ++++++++
 .../plans/commands/info/DropPartitionFieldOp.java  | 126 +++++++++
 .../commands/info/ReplacePartitionFieldOp.java     | 157 +++++++++++
 .../plans/commands/AlterTableCommandTest.java      | 104 ++++++-
 .../commands/info/AddPartitionFieldOpTest.java     | 183 ++++++++++++
 .../commands/info/DropPartitionFieldOpTest.java    |  82 ++++++
 .../commands/info/ReplacePartitionFieldOpTest.java | 156 +++++++++++
 .../test_iceberg_partition_evolution_ddl.out       | 119 ++++++++
 ...est_iceberg_partition_evolution_query_write.out | 142 ++++++++++
 .../test_iceberg_mtmv_with_partition_evolution.out |  13 +
 .../test_iceberg_partition_evolution_ddl.groovy    | 306 +++++++++++++++++++++
 ..._iceberg_partition_evolution_query_write.groovy | 245 +++++++++++++++++
 ...st_iceberg_mtmv_with_partition_evolution.groovy | 170 ++++++++++++
 24 files changed, 2653 insertions(+), 4 deletions(-)

diff --git a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4 
b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
index 5985917fff8..c747c6a5531 100644
--- a/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
+++ b/fe/fe-core/src/main/antlr4/org/apache/doris/nereids/DorisParser.g4
@@ -753,6 +753,12 @@ alterTableClause
     | createOrReplaceBranchClause                                              
     #createOrReplaceBranchClauses
     | dropBranchClause                                                         
     #dropBranchClauses
     | dropTagClause                                                            
     #dropTagClauses
+    | ADD PARTITION KEY partitionTransform (AS partitionFieldName=identifier)? 
     #addPartitionFieldClause
+    | DROP PARTITION KEY (partitionFieldName=identifier | partitionTransform)  
     #dropPartitionFieldClause
+    | REPLACE PARTITION KEY
+        (oldPartitionFieldName=identifier | 
oldPartitionTransform=partitionTransform)
+        WITH newPartitionTransform=partitionTransform (AS 
newPartitionFieldName=identifier)?
+                                                                               
     #replacePartitionFieldClause
     ;
 
 createOrReplaceTagClause
@@ -799,6 +805,12 @@ dropTagClause
     : DROP TAG (IF EXISTS)? name=identifier
     ;
 
+partitionTransform
+    : identifier LEFT_PAREN INTEGER_VALUE COMMA identifier RIGHT_PAREN  
#partitionTransformWithArgs
+    | identifier LEFT_PAREN identifier RIGHT_PAREN                      
#partitionTransformWithColumn
+    | identifier                                                        
#partitionTransformIdentity
+    ;
+
 columnPosition
     : FIRST
     | AFTER position=identifier
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java 
b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
index aad6ab45479..c69250b500e 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
@@ -20,6 +20,7 @@ package org.apache.doris.alter;
 import org.apache.doris.analysis.AddColumnClause;
 import org.apache.doris.analysis.AddColumnsClause;
 import org.apache.doris.analysis.AddPartitionClause;
+import org.apache.doris.analysis.AddPartitionFieldClause;
 import org.apache.doris.analysis.AddPartitionLikeClause;
 import org.apache.doris.analysis.AlterClause;
 import org.apache.doris.analysis.AlterMultiPartitionClause;
@@ -29,6 +30,7 @@ import org.apache.doris.analysis.CreateOrReplaceTagClause;
 import org.apache.doris.analysis.DropBranchClause;
 import org.apache.doris.analysis.DropColumnClause;
 import org.apache.doris.analysis.DropPartitionClause;
+import org.apache.doris.analysis.DropPartitionFieldClause;
 import org.apache.doris.analysis.DropPartitionFromIndexClause;
 import org.apache.doris.analysis.DropTagClause;
 import org.apache.doris.analysis.ModifyColumnClause;
@@ -41,6 +43,7 @@ import org.apache.doris.analysis.ModifyTablePropertiesClause;
 import org.apache.doris.analysis.PartitionRenameClause;
 import org.apache.doris.analysis.ReorderColumnsClause;
 import org.apache.doris.analysis.ReplacePartitionClause;
+import org.apache.doris.analysis.ReplacePartitionFieldClause;
 import org.apache.doris.analysis.ReplaceTableClause;
 import org.apache.doris.analysis.RollupRenameClause;
 import org.apache.doris.analysis.TableRenameClause;
@@ -75,6 +78,8 @@ import org.apache.doris.common.util.MetaLockUtils;
 import org.apache.doris.common.util.PropertyAnalyzer;
 import org.apache.doris.common.util.PropertyAnalyzer.RewriteProperty;
 import org.apache.doris.datasource.ExternalTable;
+import org.apache.doris.datasource.iceberg.IcebergExternalCatalog;
+import org.apache.doris.datasource.iceberg.IcebergExternalTable;
 import org.apache.doris.mtmv.BaseTableInfo;
 import org.apache.doris.nereids.trees.plans.commands.AlterSystemCommand;
 import org.apache.doris.nereids.trees.plans.commands.AlterTableCommand;
@@ -181,6 +186,19 @@ public class Alter {
         AlterOperations currentAlterOps = new AlterOperations();
         currentAlterOps.checkConflict(alterClauses);
 
+        // Check for unsupported operations on internal tables
+        for (AlterClause clause : alterClauses) {
+            if (clause instanceof AddPartitionFieldClause) {
+                throw new UserException("ADD PARTITION KEY is only supported 
for Iceberg tables");
+            }
+            if (clause instanceof DropPartitionFieldClause) {
+                throw new UserException("DROP PARTITION KEY is only supported 
for Iceberg tables");
+            }
+            if (clause instanceof ReplacePartitionFieldClause) {
+                throw new UserException("REPLACE PARTITION KEY is only 
supported for Iceberg tables");
+            }
+        }
+
         for (AlterClause clause : alterClauses) {
             Map<String, String> properties = null;
             try {
@@ -415,6 +433,30 @@ public class Alter {
             } else if (alterClause instanceof ReorderColumnsClause) {
                 ReorderColumnsClause reorderColumns = (ReorderColumnsClause) 
alterClause;
                 table.getCatalog().reorderColumns(table, 
reorderColumns.getColumnsByPos());
+            } else if (alterClause instanceof AddPartitionFieldClause) {
+                AddPartitionFieldClause addPartitionField = 
(AddPartitionFieldClause) alterClause;
+                if (table instanceof IcebergExternalTable) {
+                    ((IcebergExternalCatalog) 
table.getCatalog()).addPartitionField(
+                            (IcebergExternalTable) table, addPartitionField);
+                } else {
+                    throw new UserException("ADD PARTITION KEY is only 
supported for Iceberg tables");
+                }
+            } else if (alterClause instanceof DropPartitionFieldClause) {
+                DropPartitionFieldClause dropPartitionField = 
(DropPartitionFieldClause) alterClause;
+                if (table instanceof IcebergExternalTable) {
+                    ((IcebergExternalCatalog) 
table.getCatalog()).dropPartitionField(
+                            (IcebergExternalTable) table, dropPartitionField);
+                } else {
+                    throw new UserException("DROP PARTITION KEY is only 
supported for Iceberg tables");
+                }
+            } else if (alterClause instanceof ReplacePartitionFieldClause) {
+                ReplacePartitionFieldClause replacePartitionField = 
(ReplacePartitionFieldClause) alterClause;
+                if (table instanceof IcebergExternalTable) {
+                    ((IcebergExternalCatalog) 
table.getCatalog()).replacePartitionField(
+                            (IcebergExternalTable) table, 
replacePartitionField);
+                } else {
+                    throw new UserException("REPLACE PARTITION KEY is only 
supported for Iceberg tables");
+                }
             } else {
                 throw new UserException("Invalid alter operations for external 
table: " + alterClauses);
             }
diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterOpType.java 
b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterOpType.java
index 06777976ff6..ce701e63f17 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/AlterOpType.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/AlterOpType.java
@@ -44,6 +44,10 @@ public enum AlterOpType {
     MODIFY_ENGINE,
     ALTER_BRANCH,
     ALTER_TAG,
+    // partition evolution of iceberg table
+    ADD_PARTITION_FIELD,
+    DROP_PARTITION_FIELD,
+    REPLACE_PARTITION_FIELD,
     INVALID_OP; // INVALID_OP must be the last one
 
     // true means 2 operations have no conflict.
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/AddPartitionFieldClause.java
 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/AddPartitionFieldClause.java
new file mode 100644
index 00000000000..c465f8d6f1a
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/AddPartitionFieldClause.java
@@ -0,0 +1,108 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.alter.AlterOpType;
+import org.apache.doris.common.UserException;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * AddPartitionFieldClause for Iceberg partition evolution
+ */
+public class AddPartitionFieldClause extends AlterTableClause {
+    private final String transformName;
+    private final Integer transformArg;
+    private final String columnName;
+    private final String partitionFieldName;
+
+    public AddPartitionFieldClause(String transformName, Integer transformArg, 
String columnName,
+            String partitionFieldName) {
+        super(AlterOpType.ADD_PARTITION_FIELD);
+        this.transformName = transformName;
+        this.transformArg = transformArg;
+        this.columnName = columnName;
+        this.partitionFieldName = partitionFieldName;
+    }
+
+    public String getTransformName() {
+        return transformName;
+    }
+
+    public Integer getTransformArg() {
+        return transformArg;
+    }
+
+    public String getColumnName() {
+        return columnName;
+    }
+
+    public String getPartitionFieldName() {
+        return partitionFieldName;
+    }
+
+    @Override
+    public void analyze() throws UserException {
+        // Validation will be done in IcebergMetadataOps
+    }
+
+    @Override
+    public String toSql() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("ADD PARTITION KEY ");
+        if (transformName != null) {
+            sb.append(transformName);
+            if (transformArg != null) {
+                sb.append("(").append(transformArg);
+                if (columnName != null) {
+                    sb.append(", ").append(columnName);
+                }
+                sb.append(")");
+            } else if (columnName != null) {
+                sb.append("(").append(columnName).append(")");
+            }
+        } else if (columnName != null) {
+            sb.append(columnName);
+        }
+        if (partitionFieldName != null) {
+            sb.append(" AS ").append(partitionFieldName);
+        }
+        return sb.toString();
+    }
+
+    @Override
+    public String toString() {
+        return toSql();
+    }
+
+    @Override
+    public boolean allowOpMTMV() {
+        return false;
+    }
+
+    @Override
+    public boolean needChangeMTMVState() {
+        return false;
+    }
+
+    @Override
+    public Map<String, String> getProperties() {
+        return Collections.emptyMap();
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterTableClause.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterTableClause.java
index 392f4e7b2e0..a63481f3a9a 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterTableClause.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AlterTableClause.java
@@ -39,7 +39,21 @@ public abstract class AlterTableClause extends AlterClause {
         this.tableName = tableName;
     }
 
+    /**
+     * Check if this alter operation is allowed on Materialized View (MTMV).
+     * This method is declared as abstract to force each table operation to 
explicitly
+     * declare its MTMV support, as different operations have different MTMV 
compatibility.
+     *
+     * @return true if this operation is allowed on MTMV, false otherwise
+     */
     public abstract boolean allowOpMTMV();
 
+    /**
+     * Check if this alter operation requires changing the MTMV state.
+     * This method is declared as abstract to force each table operation to 
explicitly
+     * declare whether it affects MTMV state (e.g., schema changes that 
require MTMV rebuild).
+     *
+     * @return true if this operation requires changing MTMV state, false 
otherwise
+     */
     public abstract boolean needChangeMTMVState();
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/DropPartitionFieldClause.java
 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/DropPartitionFieldClause.java
new file mode 100644
index 00000000000..aa763107520
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/DropPartitionFieldClause.java
@@ -0,0 +1,113 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.alter.AlterOpType;
+import org.apache.doris.common.UserException;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * DropPartitionFieldClause for Iceberg partition evolution
+ */
+public class DropPartitionFieldClause extends AlterTableClause {
+    private final String partitionFieldName;
+    private final String transformName;
+    private final Integer transformArg;
+    private final String columnName;
+
+    public DropPartitionFieldClause(String partitionFieldName, String 
transformName,
+            Integer transformArg, String columnName) {
+        super(AlterOpType.DROP_PARTITION_FIELD);
+        this.partitionFieldName = partitionFieldName;
+        this.transformName = transformName;
+        this.transformArg = transformArg;
+        this.columnName = columnName;
+    }
+
+    public String getPartitionFieldName() {
+        return partitionFieldName;
+    }
+
+    public String getTransformName() {
+        return transformName;
+    }
+
+    public Integer getTransformArg() {
+        return transformArg;
+    }
+
+    public String getColumnName() {
+        return columnName;
+    }
+
+    @Override
+    public void analyze() throws UserException {
+        // Validation will be done in IcebergMetadataOps
+    }
+
+    @Override
+    public String toSql() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("DROP PARTITION KEY ");
+        if (partitionFieldName != null) {
+            sb.append(partitionFieldName);
+        } else {
+            appendPartitionTransform(sb);
+        }
+        return sb.toString();
+    }
+
+    private void appendPartitionTransform(StringBuilder sb) {
+        if (transformName != null) {
+            sb.append(transformName);
+            if (transformArg != null) {
+                sb.append("(").append(transformArg);
+                if (columnName != null) {
+                    sb.append(", ").append(columnName);
+                }
+                sb.append(")");
+            } else if (columnName != null) {
+                sb.append("(").append(columnName).append(")");
+            }
+        } else if (columnName != null) {
+            sb.append(columnName);
+        }
+    }
+
+    @Override
+    public String toString() {
+        return toSql();
+    }
+
+    @Override
+    public boolean allowOpMTMV() {
+        return false;
+    }
+
+    @Override
+    public boolean needChangeMTMVState() {
+        return false;
+    }
+
+    @Override
+    public Map<String, String> getProperties() {
+        return Collections.emptyMap();
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/ReplacePartitionFieldClause.java
 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ReplacePartitionFieldClause.java
new file mode 100644
index 00000000000..1e0b02a0d2e
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ReplacePartitionFieldClause.java
@@ -0,0 +1,145 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.analysis;
+
+import org.apache.doris.alter.AlterOpType;
+import org.apache.doris.common.UserException;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * ReplacePartitionFieldClause for Iceberg partition evolution
+ */
+public class ReplacePartitionFieldClause extends AlterTableClause {
+    private final String oldPartitionFieldName;
+    private final String oldTransformName;
+    private final Integer oldTransformArg;
+    private final String oldColumnName;
+    private final String newTransformName;
+    private final Integer newTransformArg;
+    private final String newColumnName;
+    private final String newPartitionFieldName;
+
+    public ReplacePartitionFieldClause(String oldPartitionFieldName, String 
oldTransformName,
+            Integer oldTransformArg, String oldColumnName,
+            String newTransformName, Integer newTransformArg, String 
newColumnName,
+            String newPartitionFieldName) {
+        super(AlterOpType.REPLACE_PARTITION_FIELD);
+        this.oldPartitionFieldName = oldPartitionFieldName;
+        this.oldTransformName = oldTransformName;
+        this.oldTransformArg = oldTransformArg;
+        this.oldColumnName = oldColumnName;
+        this.newTransformName = newTransformName;
+        this.newTransformArg = newTransformArg;
+        this.newColumnName = newColumnName;
+        this.newPartitionFieldName = newPartitionFieldName;
+    }
+
+    public String getOldPartitionFieldName() {
+        return oldPartitionFieldName;
+    }
+
+    public String getOldTransformName() {
+        return oldTransformName;
+    }
+
+    public Integer getOldTransformArg() {
+        return oldTransformArg;
+    }
+
+    public String getOldColumnName() {
+        return oldColumnName;
+    }
+
+    public String getNewTransformName() {
+        return newTransformName;
+    }
+
+    public Integer getNewTransformArg() {
+        return newTransformArg;
+    }
+
+    public String getNewColumnName() {
+        return newColumnName;
+    }
+
+    public String getNewPartitionFieldName() {
+        return newPartitionFieldName;
+    }
+
+    @Override
+    public String toSql() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("REPLACE PARTITION KEY ");
+        if (oldPartitionFieldName != null) {
+            sb.append(oldPartitionFieldName);
+        } else {
+            appendPartitionTransform(sb, oldTransformName, oldTransformArg, 
oldColumnName);
+        }
+        sb.append(" WITH ");
+        appendPartitionTransform(sb, newTransformName, newTransformArg, 
newColumnName);
+        if (newPartitionFieldName != null) {
+            sb.append(" AS ").append(newPartitionFieldName);
+        }
+        return sb.toString();
+    }
+
+    private void appendPartitionTransform(StringBuilder sb, String 
transformName, Integer transformArg,
+            String columnName) {
+        if (transformName != null) {
+            sb.append(transformName);
+            if (transformArg != null) {
+                sb.append("(").append(transformArg);
+                if (columnName != null) {
+                    sb.append(", ").append(columnName);
+                }
+                sb.append(")");
+            } else if (columnName != null) {
+                sb.append("(").append(columnName).append(")");
+            }
+        } else if (columnName != null) {
+            sb.append(columnName);
+        }
+    }
+
+    @Override
+    public String toString() {
+        return toSql();
+    }
+
+    @Override
+    public void analyze() throws UserException {
+        // Analysis will be done in IcebergMetadataOps
+    }
+
+    @Override
+    public boolean allowOpMTMV() {
+        return false;
+    }
+
+    @Override
+    public boolean needChangeMTMVState() {
+        return false;
+    }
+
+    @Override
+    public Map<String, String> getProperties() {
+        return Collections.emptyMap();
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
index f1a655456e0..02d24cc0146 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergExternalCatalog.java
@@ -17,9 +17,15 @@
 
 package org.apache.doris.datasource.iceberg;
 
+import org.apache.doris.analysis.AddPartitionFieldClause;
+import org.apache.doris.analysis.DropPartitionFieldClause;
+import org.apache.doris.analysis.ReplacePartitionFieldClause;
+import org.apache.doris.catalog.Env;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.ThreadPoolManager;
+import org.apache.doris.common.UserException;
 import org.apache.doris.datasource.ExternalCatalog;
+import org.apache.doris.datasource.ExternalObjectLog;
 import org.apache.doris.datasource.InitCatalogLog;
 import org.apache.doris.datasource.SessionContext;
 import org.apache.doris.datasource.operations.ExternalMetadataOperations;
@@ -144,4 +150,50 @@ public abstract class IcebergExternalCatalog extends 
ExternalCatalog {
     public boolean viewExists(String dbName, String viewName) {
         return metadataOps.viewExists(dbName, viewName);
     }
+
+    /**
+     * Add partition field to Iceberg table for partition evolution
+     */
+    public void addPartitionField(IcebergExternalTable table, 
AddPartitionFieldClause clause) throws UserException {
+        makeSureInitialized();
+        if (metadataOps == null) {
+            throw new UserException("Add partition field operation is not 
supported for catalog: " + getName());
+        }
+        ((IcebergMetadataOps) metadataOps).addPartitionField(table, clause);
+        Env.getCurrentEnv().getEditLog()
+                .logRefreshExternalTable(
+                        
ExternalObjectLog.createForRefreshTable(table.getCatalog().getId(),
+                                table.getDbName(), table.getName()));
+    }
+
+    /**
+     * Drop partition field from Iceberg table for partition evolution
+     */
+    public void dropPartitionField(IcebergExternalTable table, 
DropPartitionFieldClause clause) throws UserException {
+        makeSureInitialized();
+        if (metadataOps == null) {
+            throw new UserException("Drop partition field operation is not 
supported for catalog: " + getName());
+        }
+        ((IcebergMetadataOps) metadataOps).dropPartitionField(table, clause);
+        Env.getCurrentEnv().getEditLog()
+                .logRefreshExternalTable(
+                        
ExternalObjectLog.createForRefreshTable(table.getCatalog().getId(),
+                                table.getDbName(), table.getName()));
+    }
+
+    /**
+     * Replace partition field in Iceberg table for partition evolution
+     */
+    public void replacePartitionField(IcebergExternalTable table,
+            ReplacePartitionFieldClause clause) throws UserException {
+        makeSureInitialized();
+        if (metadataOps == null) {
+            throw new UserException("Replace partition field operation is not 
supported for catalog: " + getName());
+        }
+        ((IcebergMetadataOps) metadataOps).replacePartitionField(table, 
clause);
+        Env.getCurrentEnv().getEditLog()
+                .logRefreshExternalTable(
+                        
ExternalObjectLog.createForRefreshTable(table.getCatalog().getId(),
+                                table.getDbName(), table.getName()));
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
index 39e7d840e4a..8148579743e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
@@ -17,7 +17,10 @@
 
 package org.apache.doris.datasource.iceberg;
 
+import org.apache.doris.analysis.AddPartitionFieldClause;
 import org.apache.doris.analysis.ColumnPosition;
+import org.apache.doris.analysis.DropPartitionFieldClause;
+import org.apache.doris.analysis.ReplacePartitionFieldClause;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.StructField;
@@ -52,6 +55,7 @@ import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.UpdatePartitionSpec;
 import org.apache.iceberg.UpdateSchema;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.Namespace;
@@ -59,7 +63,9 @@ import org.apache.iceberg.catalog.SupportsNamespaces;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.catalog.ViewCatalog;
 import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.expressions.Expressions;
 import org.apache.iceberg.expressions.Literal;
+import org.apache.iceberg.expressions.Term;
 import org.apache.iceberg.types.Type;
 import org.apache.iceberg.types.Types.NestedField;
 import org.apache.iceberg.view.View;
@@ -767,6 +773,139 @@ public class IcebergMetadataOps implements 
ExternalMetadataOps {
         return executionAuthenticator;
     }
 
+    private Term getTransform(String transformName, String columnName, Integer 
transformArg) throws UserException {
+        if (columnName == null) {
+            throw new UserException("Column name is required for partition 
transform");
+        }
+        if (transformName == null) {
+            // identity transform
+            return Expressions.ref(columnName);
+        }
+        switch (transformName.toLowerCase()) {
+            case "bucket":
+                if (transformArg == null) {
+                    throw new UserException("Bucket transform requires a 
bucket count argument");
+                }
+                return Expressions.bucket(columnName, transformArg);
+            case "truncate":
+                if (transformArg == null) {
+                    throw new UserException("Truncate transform requires a 
width argument");
+                }
+                return Expressions.truncate(columnName, transformArg);
+            case "year":
+                return Expressions.year(columnName);
+            case "month":
+                return Expressions.month(columnName);
+            case "day":
+                return Expressions.day(columnName);
+            case "hour":
+                return Expressions.hour(columnName);
+            default:
+                throw new UserException("Unsupported partition transform: " + 
transformName);
+        }
+    }
+
+    /**
+     * Add partition field to Iceberg table for partition evolution
+     */
+    public void addPartitionField(ExternalTable dorisTable, 
AddPartitionFieldClause clause) throws UserException {
+        Table icebergTable = IcebergUtils.getIcebergTable(dorisTable);
+        UpdatePartitionSpec updateSpec = icebergTable.updateSpec();
+
+        String transformName = clause.getTransformName();
+        Integer transformArg = clause.getTransformArg();
+        String columnName = clause.getColumnName();
+        String partitionFieldName = clause.getPartitionFieldName();
+        Term transform = getTransform(transformName, columnName, transformArg);
+
+        if (partitionFieldName != null) {
+            updateSpec.addField(partitionFieldName, transform);
+        } else {
+            updateSpec.addField(transform);
+        }
+
+        try {
+            executionAuthenticator.execute(() -> updateSpec.commit());
+        } catch (Exception e) {
+            throw new UserException("Failed to add partition field to table: " 
+ icebergTable.name()
+                    + ", error message is: " + 
ExceptionUtils.getRootCauseMessage(e), e);
+        }
+        refreshTable(dorisTable);
+        // Reset cached isValidRelatedTable flag after partition evolution
+        ((IcebergExternalTable) 
dorisTable).setIsValidRelatedTableCached(false);
+    }
+
+    /**
+     * Drop partition field from Iceberg table for partition evolution
+     */
+    public void dropPartitionField(ExternalTable dorisTable, 
DropPartitionFieldClause clause) throws UserException {
+        Table icebergTable = IcebergUtils.getIcebergTable(dorisTable);
+        UpdatePartitionSpec updateSpec = icebergTable.updateSpec();
+
+        if (clause.getPartitionFieldName() != null) {
+            updateSpec.removeField(clause.getPartitionFieldName());
+        } else {
+            String transformName = clause.getTransformName();
+            Integer transformArg = clause.getTransformArg();
+            String columnName = clause.getColumnName();
+            Term transform = getTransform(transformName, columnName, 
transformArg);
+            updateSpec.removeField(transform);
+        }
+
+        try {
+            executionAuthenticator.execute(() -> updateSpec.commit());
+        } catch (Exception e) {
+            throw new UserException("Failed to drop partition field from 
table: " + icebergTable.name()
+                    + ", error message is: " + 
ExceptionUtils.getRootCauseMessage(e), e);
+        }
+        refreshTable(dorisTable);
+        // Reset cached isValidRelatedTable flag after partition evolution
+        ((IcebergExternalTable) 
dorisTable).setIsValidRelatedTableCached(false);
+    }
+
+    /**
+     * Replace partition field in Iceberg table for partition evolution
+     */
+    public void replacePartitionField(ExternalTable dorisTable, 
ReplacePartitionFieldClause clause)
+            throws UserException {
+        Table icebergTable = IcebergUtils.getIcebergTable(dorisTable);
+        UpdatePartitionSpec updateSpec = icebergTable.updateSpec();
+
+        // remove old partition field
+        if (clause.getOldPartitionFieldName() != null) {
+            updateSpec.removeField(clause.getOldPartitionFieldName());
+        } else {
+            String oldTransformName = clause.getOldTransformName();
+            Integer oldTransformArg = clause.getOldTransformArg();
+            String oldColumnName = clause.getOldColumnName();
+            Term oldTransform = getTransform(oldTransformName, oldColumnName, 
oldTransformArg);
+            updateSpec.removeField(oldTransform);
+        }
+
+        // add new partition field
+        String newPartitionFieldName = clause.getNewPartitionFieldName();
+        String newTransformName = clause.getNewTransformName();
+        Integer newTransformArg = clause.getNewTransformArg();
+        String newColumnName = clause.getNewColumnName();
+        Term newTransform = getTransform(newTransformName, newColumnName, 
newTransformArg);
+
+        if (newPartitionFieldName != null) {
+            updateSpec.addField(newPartitionFieldName, newTransform);
+        } else {
+            updateSpec.addField(newTransform);
+        }
+
+        try {
+            executionAuthenticator.execute(() -> updateSpec.commit());
+        } catch (Exception e) {
+            throw new UserException("Failed to replace partition field in 
table: " + icebergTable.name()
+                    + ", error message is: " + 
ExceptionUtils.getRootCauseMessage(e), e);
+        }
+        refreshTable(dorisTable);
+        // Reset cached isValidRelatedTable flag after partition evolution
+        ((IcebergExternalTable) 
dorisTable).setIsValidRelatedTableCached(false);
+    }
+
     @Override
     public Table loadTable(String dbName, String tblName) {
         try {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
index 905a6008c6c..7d08db6595e 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/parser/LogicalPlanBuilder.java
@@ -80,6 +80,7 @@ import 
org.apache.doris.nereids.DorisParser.AddColumnsClauseContext;
 import org.apache.doris.nereids.DorisParser.AddConstraintContext;
 import org.apache.doris.nereids.DorisParser.AddIndexClauseContext;
 import org.apache.doris.nereids.DorisParser.AddPartitionClauseContext;
+import org.apache.doris.nereids.DorisParser.AddPartitionFieldClauseContext;
 import org.apache.doris.nereids.DorisParser.AddRollupClauseContext;
 import org.apache.doris.nereids.DorisParser.AdminCancelRebalanceDiskContext;
 import org.apache.doris.nereids.DorisParser.AdminCheckTabletsContext;
@@ -195,6 +196,7 @@ import 
org.apache.doris.nereids.DorisParser.DropIndexTokenFilterContext;
 import org.apache.doris.nereids.DorisParser.DropIndexTokenizerContext;
 import org.apache.doris.nereids.DorisParser.DropMVContext;
 import org.apache.doris.nereids.DorisParser.DropPartitionClauseContext;
+import org.apache.doris.nereids.DorisParser.DropPartitionFieldClauseContext;
 import org.apache.doris.nereids.DorisParser.DropProcedureContext;
 import org.apache.doris.nereids.DorisParser.DropRepositoryContext;
 import org.apache.doris.nereids.DorisParser.DropRoleContext;
@@ -275,6 +277,10 @@ import 
org.apache.doris.nereids.DorisParser.OptScanParamsContext;
 import org.apache.doris.nereids.DorisParser.OutFileClauseContext;
 import org.apache.doris.nereids.DorisParser.ParenthesizedExpressionContext;
 import org.apache.doris.nereids.DorisParser.PartitionSpecContext;
+import org.apache.doris.nereids.DorisParser.PartitionTransformContext;
+import org.apache.doris.nereids.DorisParser.PartitionTransformIdentityContext;
+import org.apache.doris.nereids.DorisParser.PartitionTransformWithArgsContext;
+import 
org.apache.doris.nereids.DorisParser.PartitionTransformWithColumnContext;
 import org.apache.doris.nereids.DorisParser.PartitionValueDefContext;
 import org.apache.doris.nereids.DorisParser.PartitionValueListContext;
 import org.apache.doris.nereids.DorisParser.PartitionsDefContext;
@@ -315,6 +321,7 @@ import 
org.apache.doris.nereids.DorisParser.RenameRollupClauseContext;
 import org.apache.doris.nereids.DorisParser.ReorderColumnsClauseContext;
 import org.apache.doris.nereids.DorisParser.ReplaceContext;
 import org.apache.doris.nereids.DorisParser.ReplacePartitionClauseContext;
+import org.apache.doris.nereids.DorisParser.ReplacePartitionFieldClauseContext;
 import org.apache.doris.nereids.DorisParser.ReplaceTableClauseContext;
 import org.apache.doris.nereids.DorisParser.ResumeMTMVContext;
 import org.apache.doris.nereids.DorisParser.RollupDefContext;
@@ -885,6 +892,7 @@ import 
org.apache.doris.nereids.trees.plans.commands.info.AddColumnOp;
 import org.apache.doris.nereids.trees.plans.commands.info.AddColumnsOp;
 import org.apache.doris.nereids.trees.plans.commands.info.AddFollowerOp;
 import org.apache.doris.nereids.trees.plans.commands.info.AddObserverOp;
+import org.apache.doris.nereids.trees.plans.commands.info.AddPartitionFieldOp;
 import org.apache.doris.nereids.trees.plans.commands.info.AddPartitionOp;
 import org.apache.doris.nereids.trees.plans.commands.info.AddRollupOp;
 import org.apache.doris.nereids.trees.plans.commands.info.AlterLoadErrorUrlOp;
@@ -931,6 +939,7 @@ import 
org.apache.doris.nereids.trees.plans.commands.info.DropFollowerOp;
 import org.apache.doris.nereids.trees.plans.commands.info.DropIndexOp;
 import org.apache.doris.nereids.trees.plans.commands.info.DropMTMVInfo;
 import org.apache.doris.nereids.trees.plans.commands.info.DropObserverOp;
+import org.apache.doris.nereids.trees.plans.commands.info.DropPartitionFieldOp;
 import 
org.apache.doris.nereids.trees.plans.commands.info.DropPartitionFromIndexOp;
 import org.apache.doris.nereids.trees.plans.commands.info.DropPartitionOp;
 import org.apache.doris.nereids.trees.plans.commands.info.DropRollupOp;
@@ -967,6 +976,7 @@ import 
org.apache.doris.nereids.trees.plans.commands.info.RenamePartitionOp;
 import org.apache.doris.nereids.trees.plans.commands.info.RenameRollupOp;
 import org.apache.doris.nereids.trees.plans.commands.info.RenameTableOp;
 import org.apache.doris.nereids.trees.plans.commands.info.ReorderColumnsOp;
+import 
org.apache.doris.nereids.trees.plans.commands.info.ReplacePartitionFieldOp;
 import org.apache.doris.nereids.trees.plans.commands.info.ReplacePartitionOp;
 import org.apache.doris.nereids.trees.plans.commands.info.ReplaceTableOp;
 import org.apache.doris.nereids.trees.plans.commands.info.ResumeMTMVInfo;
@@ -5712,6 +5722,100 @@ public class LogicalPlanBuilder extends 
DorisParserBaseVisitor<Object> {
         return new ReorderColumnsOp(columnsByPos, rollupName, properties);
     }
 
+    // Helper class to reduce parameter passing
+    private static class PartitionFieldInfo {
+        final String fieldName;
+        final String transformName;
+        final Integer transformArg;
+        final String columnName;
+
+        PartitionFieldInfo(String fieldName, String transformName, Integer 
transformArg, String columnName) {
+            this.fieldName = fieldName;
+            this.transformName = transformName;
+            this.transformArg = transformArg;
+            this.columnName = columnName;
+        }
+    }
+
+    private PartitionFieldInfo 
extractOldPartitionFieldInfo(ReplacePartitionFieldClauseContext ctx) {
+        PartitionTransformContext oldTransformCtx = ctx.oldPartitionTransform;
+        if (oldTransformCtx == null) {
+            // old specified as identifier
+            return new PartitionFieldInfo(ctx.oldPartitionFieldName.getText(), 
null, null, null);
+        }
+
+        PartitionFieldInfo info = extractTransformInfo(oldTransformCtx);
+        return new PartitionFieldInfo(null, info.transformName, 
info.transformArg, info.columnName);
+    }
+
+    private PartitionFieldInfo 
extractNewPartitionFieldInfo(ReplacePartitionFieldClauseContext ctx) {
+        PartitionFieldInfo info = 
extractTransformInfo(ctx.newPartitionTransform);
+        String partitionFieldName = ctx.newPartitionFieldName != null
+                ? ctx.newPartitionFieldName.getText()
+                : null;
+        return new PartitionFieldInfo(partitionFieldName,
+            info.transformName, info.transformArg, info.columnName);
+    }
+
+    private PartitionFieldInfo extractTransformInfo(PartitionTransformContext 
ctx) {
+        if (ctx instanceof PartitionTransformWithArgsContext) {
+            PartitionTransformWithArgsContext argsCtx = 
(PartitionTransformWithArgsContext) ctx;
+            return new PartitionFieldInfo(null,
+                    argsCtx.identifier(0).getText(),
+                    Integer.parseInt(argsCtx.INTEGER_VALUE().getText()),
+                    argsCtx.identifier(1).getText());
+        }
+
+        if (ctx instanceof PartitionTransformWithColumnContext) {
+            PartitionTransformWithColumnContext colCtx = 
(PartitionTransformWithColumnContext) ctx;
+            return new PartitionFieldInfo(null,
+                    colCtx.identifier(0).getText(),
+                    null,
+                    colCtx.identifier(1).getText());
+        }
+
+        if (ctx instanceof PartitionTransformIdentityContext) {
+            PartitionTransformIdentityContext idCtx = 
(PartitionTransformIdentityContext) ctx;
+            return new PartitionFieldInfo(null, null, null, 
idCtx.identifier().getText());
+        }
+
+        return new PartitionFieldInfo(null, null, null, null);
+    }
+
+    @Override
+    public AlterTableOp 
visitAddPartitionFieldClause(AddPartitionFieldClauseContext ctx) {
+        PartitionFieldInfo info = 
extractTransformInfo(ctx.partitionTransform());
+        String partitionFieldName = ctx.partitionFieldName != null
+                ? ctx.partitionFieldName.getText()
+                : null;
+        return new AddPartitionFieldOp(info.transformName, info.transformArg,
+                info.columnName, partitionFieldName);
+    }
+
+    @Override
+    public AlterTableOp 
visitDropPartitionFieldClause(DropPartitionFieldClauseContext ctx) {
+        PartitionTransformContext transform = ctx.partitionTransform();
+        if (transform == null) {
+            // Identifier case
+            return new DropPartitionFieldOp(ctx.partitionFieldName.getText());
+        }
+        PartitionFieldInfo info = extractTransformInfo(transform);
+        return new DropPartitionFieldOp(null, info.transformName, 
info.transformArg, info.columnName);
+    }
+
+    @Override
+    public AlterTableOp 
visitReplacePartitionFieldClause(ReplacePartitionFieldClauseContext ctx) {
+        // Extract old partition field info (key name or partition expression)
+        PartitionFieldInfo oldInfo = extractOldPartitionFieldInfo(ctx);
+
+        // Extract new partition transform info
+        PartitionFieldInfo newInfo = extractNewPartitionFieldInfo(ctx);
+
+        return new ReplacePartitionFieldOp(oldInfo.fieldName, 
oldInfo.transformName,
+                oldInfo.transformArg, oldInfo.columnName,
+                newInfo.transformName, newInfo.transformArg, 
newInfo.columnName, newInfo.fieldName);
+    }
+
     @Override
     public AlterTableOp visitAddPartitionClause(AddPartitionClauseContext ctx) 
{
         boolean isTempPartition = ctx.TEMPORARY() != null;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterTableCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterTableCommand.java
index 38d2d6ba307..f4d37c85eb9 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterTableCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterTableCommand.java
@@ -40,6 +40,7 @@ import org.apache.doris.mysql.privilege.PrivPredicate;
 import org.apache.doris.nereids.trees.plans.PlanType;
 import org.apache.doris.nereids.trees.plans.commands.info.AddColumnOp;
 import org.apache.doris.nereids.trees.plans.commands.info.AddColumnsOp;
+import org.apache.doris.nereids.trees.plans.commands.info.AddPartitionFieldOp;
 import org.apache.doris.nereids.trees.plans.commands.info.AddRollupOp;
 import org.apache.doris.nereids.trees.plans.commands.info.AlterTableOp;
 import org.apache.doris.nereids.trees.plans.commands.info.ColumnDefinition;
@@ -47,6 +48,7 @@ import 
org.apache.doris.nereids.trees.plans.commands.info.CreateOrReplaceBranchO
 import org.apache.doris.nereids.trees.plans.commands.info.CreateOrReplaceTagOp;
 import org.apache.doris.nereids.trees.plans.commands.info.DropBranchOp;
 import org.apache.doris.nereids.trees.plans.commands.info.DropColumnOp;
+import org.apache.doris.nereids.trees.plans.commands.info.DropPartitionFieldOp;
 import org.apache.doris.nereids.trees.plans.commands.info.DropRollupOp;
 import org.apache.doris.nereids.trees.plans.commands.info.DropTagOp;
 import org.apache.doris.nereids.trees.plans.commands.info.EnableFeatureOp;
@@ -56,6 +58,7 @@ import 
org.apache.doris.nereids.trees.plans.commands.info.ModifyTablePropertiesO
 import org.apache.doris.nereids.trees.plans.commands.info.RenameColumnOp;
 import org.apache.doris.nereids.trees.plans.commands.info.RenameTableOp;
 import org.apache.doris.nereids.trees.plans.commands.info.ReorderColumnsOp;
+import 
org.apache.doris.nereids.trees.plans.commands.info.ReplacePartitionFieldOp;
 import org.apache.doris.nereids.trees.plans.commands.info.TableNameInfo;
 import org.apache.doris.nereids.trees.plans.visitor.PlanVisitor;
 import org.apache.doris.nereids.types.DataType;
@@ -252,7 +255,10 @@ public class AlterTableCommand extends Command implements 
ForwardWithSync {
                     || alterClause instanceof CreateOrReplaceBranchOp
                     || alterClause instanceof CreateOrReplaceTagOp
                     || alterClause instanceof DropBranchOp
-                    || alterClause instanceof DropTagOp) {
+                    || alterClause instanceof DropTagOp
+                    || alterClause instanceof AddPartitionFieldOp
+                    || alterClause instanceof DropPartitionFieldOp
+                    || alterClause instanceof ReplacePartitionFieldOp) {
                 alterTableOps.add(alterClause);
             } else {
                 throw new AnalysisException(table.getType().toString() + " [" 
+ table.getName() + "] "
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/AddPartitionFieldOp.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/AddPartitionFieldOp.java
new file mode 100644
index 00000000000..fb05901a00a
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/AddPartitionFieldOp.java
@@ -0,0 +1,113 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands.info;
+
+import org.apache.doris.alter.AlterOpType;
+import org.apache.doris.analysis.AddPartitionFieldClause;
+import org.apache.doris.analysis.AlterTableClause;
+import org.apache.doris.common.UserException;
+import org.apache.doris.qe.ConnectContext;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * AddPartitionFieldOp for Iceberg partition evolution
+ */
+public class AddPartitionFieldOp extends AlterTableOp {
+    private final String transformName;
+    private final Integer transformArg;
+    private final String columnName;
+    private final String partitionFieldName;
+
+    public AddPartitionFieldOp(String transformName, Integer transformArg, 
String columnName,
+            String partitionFieldName) {
+        super(AlterOpType.ADD_PARTITION_FIELD);
+        this.transformName = transformName;
+        this.transformArg = transformArg;
+        this.columnName = columnName;
+        this.partitionFieldName = partitionFieldName;
+    }
+
+    public String getTransformName() {
+        return transformName;
+    }
+
+    public Integer getTransformArg() {
+        return transformArg;
+    }
+
+    public String getColumnName() {
+        return columnName;
+    }
+
+    public String getPartitionFieldName() {
+        return partitionFieldName;
+    }
+
+    @Override
+    public void validate(ConnectContext ctx) throws UserException {
+        if (columnName == null) {
+            throw new UserException("Column name must be specified");
+        }
+    }
+
+    @Override
+    public AlterTableClause translateToLegacyAlterClause() {
+        return new AddPartitionFieldClause(transformName, transformArg, 
columnName, partitionFieldName);
+    }
+
+    @Override
+    public Map<String, String> getProperties() {
+        return Collections.emptyMap();
+    }
+
+    @Override
+    public boolean allowOpMTMV() {
+        return false;
+    }
+
+    @Override
+    public boolean needChangeMTMVState() {
+        return false;
+    }
+
+    @Override
+    public String toSql() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("ADD PARTITION KEY ");
+        if (transformName != null) {
+            sb.append(transformName);
+            if (transformArg != null) {
+                sb.append("(").append(transformArg);
+                if (columnName != null) {
+                    sb.append(", ").append(columnName);
+                }
+                sb.append(")");
+            } else if (columnName != null) {
+                sb.append("(").append(columnName).append(")");
+            }
+        } else if (columnName != null) {
+            sb.append(columnName);
+        }
+        if (partitionFieldName != null) {
+            sb.append(" AS ").append(partitionFieldName);
+        }
+        return sb.toString();
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/DropPartitionFieldOp.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/DropPartitionFieldOp.java
new file mode 100644
index 00000000000..b53765f3180
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/DropPartitionFieldOp.java
@@ -0,0 +1,126 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands.info;
+
+import org.apache.doris.alter.AlterOpType;
+import org.apache.doris.analysis.AlterTableClause;
+import org.apache.doris.analysis.DropPartitionFieldClause;
+import org.apache.doris.common.UserException;
+import org.apache.doris.qe.ConnectContext;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * DropPartitionFieldOp for Iceberg partition evolution
+ */
+public class DropPartitionFieldOp extends AlterTableOp {
+    private final String partitionFieldName;
+    private final String transformName;
+    private final Integer transformArg;
+    private final String columnName;
+
+    public DropPartitionFieldOp(String partitionFieldName) {
+        this(partitionFieldName, null, null, null);
+    }
+
+    public DropPartitionFieldOp(String transformName, Integer transformArg, 
String columnName) {
+        this(null, transformName, transformArg, columnName);
+    }
+
+    public DropPartitionFieldOp(String partitionFieldName, String 
transformName,
+            Integer transformArg, String columnName) {
+        super(AlterOpType.DROP_PARTITION_FIELD);
+        this.partitionFieldName = partitionFieldName;
+        this.transformName = transformName;
+        this.transformArg = transformArg;
+        this.columnName = columnName;
+    }
+
+    public String getPartitionFieldName() {
+        return partitionFieldName;
+    }
+
+    public String getTransformName() {
+        return transformName;
+    }
+
+    public Integer getTransformArg() {
+        return transformArg;
+    }
+
+    public String getColumnName() {
+        return columnName;
+    }
+
+    @Override
+    public void validate(ConnectContext ctx) throws UserException {
+        if (partitionFieldName == null && columnName == null) {
+            throw new UserException("Partition field name or column name must 
be specified");
+        }
+    }
+
+    @Override
+    public AlterTableClause translateToLegacyAlterClause() {
+        return new DropPartitionFieldClause(partitionFieldName, transformName, 
transformArg, columnName);
+    }
+
+    @Override
+    public Map<String, String> getProperties() {
+        return Collections.emptyMap();
+    }
+
+    @Override
+    public boolean allowOpMTMV() {
+        return false;
+    }
+
+    @Override
+    public boolean needChangeMTMVState() {
+        return false;
+    }
+
+    @Override
+    public String toSql() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("DROP PARTITION KEY ");
+        if (partitionFieldName != null) {
+            sb.append(partitionFieldName);
+        } else {
+            appendPartitionTransform(sb);
+        }
+        return sb.toString();
+    }
+
+    private void appendPartitionTransform(StringBuilder sb) {
+        if (transformName != null) {
+            sb.append(transformName);
+            if (transformArg != null) {
+                sb.append("(").append(transformArg);
+                if (columnName != null) {
+                    sb.append(", ").append(columnName);
+                }
+                sb.append(")");
+            } else if (columnName != null) {
+                sb.append("(").append(columnName).append(")");
+            }
+        } else if (columnName != null) {
+            sb.append(columnName);
+        }
+    }
+}
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/ReplacePartitionFieldOp.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/ReplacePartitionFieldOp.java
new file mode 100644
index 00000000000..f75a513b606
--- /dev/null
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/ReplacePartitionFieldOp.java
@@ -0,0 +1,157 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands.info;
+
+import org.apache.doris.alter.AlterOpType;
+import org.apache.doris.analysis.AlterTableClause;
+import org.apache.doris.analysis.ReplacePartitionFieldClause;
+import org.apache.doris.common.UserException;
+import org.apache.doris.qe.ConnectContext;
+
+import java.util.Collections;
+import java.util.Map;
+
+/**
+ * ReplacePartitionFieldOp for Iceberg partition evolution
+ */
+public class ReplacePartitionFieldOp extends AlterTableOp {
+    private final String oldPartitionFieldName;
+    private final String oldTransformName;
+    private final Integer oldTransformArg;
+    private final String oldColumnName;
+    private final String newTransformName;
+    private final Integer newTransformArg;
+    private final String newColumnName;
+    private final String newPartitionFieldName;
+
+    /**
+     * Constructor for ReplacePartitionFieldOp
+     */
+    public ReplacePartitionFieldOp(String oldPartitionFieldName, String 
oldTransformName,
+            Integer oldTransformArg, String oldColumnName,
+            String newTransformName, Integer newTransformArg, String 
newColumnName,
+            String newPartitionFieldName) {
+        super(AlterOpType.REPLACE_PARTITION_FIELD);
+        this.oldPartitionFieldName = oldPartitionFieldName;
+        this.oldTransformName = oldTransformName;
+        this.oldTransformArg = oldTransformArg;
+        this.oldColumnName = oldColumnName;
+        this.newTransformName = newTransformName;
+        this.newTransformArg = newTransformArg;
+        this.newColumnName = newColumnName;
+        this.newPartitionFieldName = newPartitionFieldName;
+    }
+
+    public String getOldPartitionFieldName() {
+        return oldPartitionFieldName;
+    }
+
+    public String getOldTransformName() {
+        return oldTransformName;
+    }
+
+    public Integer getOldTransformArg() {
+        return oldTransformArg;
+    }
+
+    public String getOldColumnName() {
+        return oldColumnName;
+    }
+
+    public String getNewTransformName() {
+        return newTransformName;
+    }
+
+    public Integer getNewTransformArg() {
+        return newTransformArg;
+    }
+
+    public String getNewColumnName() {
+        return newColumnName;
+    }
+
+    public String getNewPartitionFieldName() {
+        return newPartitionFieldName;
+    }
+
+    @Override
+    public void validate(ConnectContext ctx) throws UserException {
+        if (oldPartitionFieldName == null && oldColumnName == null) {
+            throw new UserException("Old partition field name or old column 
name must be specified");
+        }
+        if (newColumnName == null) {
+            throw new UserException("New column name must be specified");
+        }
+    }
+
+    @Override
+    public AlterTableClause translateToLegacyAlterClause() {
+        return new ReplacePartitionFieldClause(oldPartitionFieldName, 
oldTransformName, oldTransformArg,
+                oldColumnName, newTransformName, newTransformArg, 
newColumnName, newPartitionFieldName);
+    }
+
+    @Override
+    public Map<String, String> getProperties() {
+        return Collections.emptyMap();
+    }
+
+    @Override
+    public boolean allowOpMTMV() {
+        return false;
+    }
+
+    @Override
+    public boolean needChangeMTMVState() {
+        return false;
+    }
+
+    @Override
+    public String toSql() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("REPLACE PARTITION KEY ");
+        if (oldPartitionFieldName != null) {
+            sb.append(oldPartitionFieldName);
+        } else {
+            appendPartitionTransform(sb, oldTransformName, oldTransformArg, 
oldColumnName);
+        }
+        sb.append(" WITH ");
+        appendPartitionTransform(sb, newTransformName, newTransformArg, 
newColumnName);
+        if (newPartitionFieldName != null) {
+            sb.append(" AS ").append(newPartitionFieldName);
+        }
+        return sb.toString();
+    }
+
+    private void appendPartitionTransform(StringBuilder sb, String 
transformName, Integer transformArg,
+            String columnName) {
+        if (transformName != null) {
+            sb.append(transformName);
+            if (transformArg != null) {
+                sb.append("(").append(transformArg);
+                if (columnName != null) {
+                    sb.append(", ").append(columnName);
+                }
+                sb.append(")");
+            } else if (columnName != null) {
+                sb.append("(").append(columnName).append(")");
+            }
+        } else if (columnName != null) {
+            sb.append(columnName);
+        }
+    }
+}
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/AlterTableCommandTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/AlterTableCommandTest.java
index 8f199da39ff..65ee9fd5fa1 100644
--- 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/AlterTableCommandTest.java
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/AlterTableCommandTest.java
@@ -17,8 +17,11 @@
 
 package org.apache.doris.nereids.trees.plans.commands;
 
+import org.apache.doris.nereids.trees.plans.commands.info.AddPartitionFieldOp;
 import org.apache.doris.nereids.trees.plans.commands.info.AlterTableOp;
+import org.apache.doris.nereids.trees.plans.commands.info.DropPartitionFieldOp;
 import org.apache.doris.nereids.trees.plans.commands.info.EnableFeatureOp;
+import 
org.apache.doris.nereids.trees.plans.commands.info.ReplacePartitionFieldOp;
 import org.apache.doris.nereids.trees.plans.commands.info.TableNameInfo;
 
 import org.junit.jupiter.api.Assertions;
@@ -40,14 +43,109 @@ public class AlterTableCommandTest {
         ops.clear();
         ops.add(new EnableFeatureOp("UPDATE_FLEXIBLE_COLUMNS"));
         alterTableCommand = new AlterTableCommand(new TableNameInfo("db", 
"test"), ops);
-        Assertions.assertEquals(alterTableCommand.toSql(), "ALTER TABLE 
`db`.`test` ENABLE FEATURE \"UPDATE_FLEXIBLE_COLUMNS\"");
+        Assertions.assertEquals(alterTableCommand.toSql(),
+                "ALTER TABLE `db`.`test` ENABLE FEATURE 
\"UPDATE_FLEXIBLE_COLUMNS\"");
 
         ops.clear();
         Map<String, String> properties = new HashMap<>();
         properties.put("function_column.sequence_type", "int");
         ops.add(new EnableFeatureOp("SEQUENCE_LOAD", properties));
         alterTableCommand = new AlterTableCommand(new TableNameInfo("db", 
"test"), ops);
-        Assertions.assertEquals(alterTableCommand.toSql(), "ALTER TABLE 
`db`.`test` ENABLE FEATURE \"SEQUENCE_LOAD\" WITH PROPERTIES 
(\"function_column.sequence_type\" = \"int\")");
+        Assertions.assertEquals(alterTableCommand.toSql(),
+                "ALTER TABLE `db`.`test` ENABLE FEATURE \"SEQUENCE_LOAD\" WITH 
PROPERTIES (\"function_column.sequence_type\" = \"int\")");
     }
-}
 
+    @Test
+    void testAddPartitionFieldOp() {
+        List<AlterTableOp> ops = new ArrayList<>();
+        ops.add(new AddPartitionFieldOp("bucket", 16, "id", null));
+        AlterTableCommand alterTableCommand = new AlterTableCommand(new 
TableNameInfo("db", "test"), ops);
+        Assertions.assertEquals(alterTableCommand.toSql(), "ALTER TABLE 
`db`.`test` ADD PARTITION KEY bucket(16, id)");
+
+        ops.clear();
+        ops.add(new AddPartitionFieldOp("year", null, "ts", null));
+        alterTableCommand = new AlterTableCommand(new TableNameInfo("db", 
"test"), ops);
+        Assertions.assertEquals(alterTableCommand.toSql(), "ALTER TABLE 
`db`.`test` ADD PARTITION KEY year(ts)");
+
+        ops.clear();
+        ops.add(new AddPartitionFieldOp(null, null, "category", null));
+        alterTableCommand = new AlterTableCommand(new TableNameInfo("db", 
"test"), ops);
+        Assertions.assertEquals(alterTableCommand.toSql(), "ALTER TABLE 
`db`.`test` ADD PARTITION KEY category");
+
+        // Test with custom partition field name
+        ops.clear();
+        ops.add(new AddPartitionFieldOp("day", null, "ts", "ts_day"));
+        alterTableCommand = new AlterTableCommand(new TableNameInfo("db", 
"test"), ops);
+        Assertions.assertEquals(alterTableCommand.toSql(),
+                "ALTER TABLE `db`.`test` ADD PARTITION KEY day(ts) AS ts_day");
+    }
+
+    @Test
+    void testDropPartitionFieldOp() {
+        List<AlterTableOp> ops = new ArrayList<>();
+        ops.add(new DropPartitionFieldOp("id_bucket_16"));
+        AlterTableCommand alterTableCommand = new AlterTableCommand(new 
TableNameInfo("db", "test"), ops);
+        Assertions.assertEquals(alterTableCommand.toSql(), "ALTER TABLE 
`db`.`test` DROP PARTITION KEY id_bucket_16");
+
+        ops.clear();
+        ops.add(new DropPartitionFieldOp("ts_year"));
+        alterTableCommand = new AlterTableCommand(new TableNameInfo("db", 
"test"), ops);
+        Assertions.assertEquals(alterTableCommand.toSql(), "ALTER TABLE 
`db`.`test` DROP PARTITION KEY ts_year");
+
+        ops.clear();
+        ops.add(new DropPartitionFieldOp("category"));
+        alterTableCommand = new AlterTableCommand(new TableNameInfo("db", 
"test"), ops);
+        Assertions.assertEquals(alterTableCommand.toSql(), "ALTER TABLE 
`db`.`test` DROP PARTITION KEY category");
+    }
+
+    @Test
+    void testMultiplePartitionFieldOps() {
+        List<AlterTableOp> ops = new ArrayList<>();
+        ops.add(new AddPartitionFieldOp("day", null, "ts", null));
+        ops.add(new AddPartitionFieldOp("bucket", 8, "id", null));
+        AlterTableCommand alterTableCommand = new AlterTableCommand(new 
TableNameInfo("db", "test"), ops);
+        String sql = alterTableCommand.toSql();
+        Assertions.assertTrue(sql.contains("ADD PARTITION KEY day(ts)"));
+        Assertions.assertTrue(sql.contains("ADD PARTITION KEY bucket(8, id)"));
+    }
+
+    @Test
+    void testReplacePartitionFieldOp() {
+        List<AlterTableOp> ops = new ArrayList<>();
+        ops.add(new ReplacePartitionFieldOp("ts_year", null, null, null,
+                "month", null, "ts", null));
+        AlterTableCommand alterTableCommand = new AlterTableCommand(new 
TableNameInfo("db", "test"), ops);
+        Assertions.assertEquals(alterTableCommand.toSql(),
+                "ALTER TABLE `db`.`test` REPLACE PARTITION KEY ts_year WITH 
month(ts)");
+
+        ops.clear();
+        ops.add(new ReplacePartitionFieldOp("id_bucket_10", null, null, null,
+                "bucket", 16, "id", null));
+        alterTableCommand = new AlterTableCommand(new TableNameInfo("db", 
"test"), ops);
+        Assertions.assertEquals(alterTableCommand.toSql(),
+                "ALTER TABLE `db`.`test` REPLACE PARTITION KEY id_bucket_10 
WITH bucket(16, id)");
+
+        ops.clear();
+        ops.add(new ReplacePartitionFieldOp("category", null, null, null,
+                "bucket", 8, "id", null));
+        alterTableCommand = new AlterTableCommand(new TableNameInfo("db", 
"test"), ops);
+        Assertions.assertEquals(alterTableCommand.toSql(),
+                "ALTER TABLE `db`.`test` REPLACE PARTITION KEY category WITH 
bucket(8, id)");
+
+        // Test with custom partition field name
+        ops.clear();
+        ops.add(new ReplacePartitionFieldOp("ts_year", null, null, null,
+                "day", null, "ts", "day_of_ts"));
+        alterTableCommand = new AlterTableCommand(new TableNameInfo("db", 
"test"), ops);
+        Assertions.assertEquals(alterTableCommand.toSql(),
+                "ALTER TABLE `db`.`test` REPLACE PARTITION KEY ts_year WITH 
day(ts) AS day_of_ts");
+
+        // Test with old partition expression
+        ops.clear();
+        ops.add(new ReplacePartitionFieldOp(null, "bucket", 16, "id",
+                "truncate", 5, "code", "code_trunc"));
+        alterTableCommand = new AlterTableCommand(new TableNameInfo("db", 
"test"), ops);
+        Assertions.assertEquals(alterTableCommand.toSql(),
+                "ALTER TABLE `db`.`test` REPLACE PARTITION KEY bucket(16, id) 
WITH truncate(5, code) AS code_trunc");
+    }
+}
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/info/AddPartitionFieldOpTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/info/AddPartitionFieldOpTest.java
new file mode 100644
index 00000000000..56d3a4f813b
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/info/AddPartitionFieldOpTest.java
@@ -0,0 +1,183 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands.info;
+
+import org.apache.doris.alter.AlterOpType;
+import org.apache.doris.analysis.AddPartitionFieldClause;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+
+public class AddPartitionFieldOpTest {
+
+    @Test
+    public void testIdentityTransform() {
+        AddPartitionFieldOp op = new AddPartitionFieldOp(null, null, 
"category", null);
+        Assertions.assertEquals(AlterOpType.ADD_PARTITION_FIELD, 
op.getOpType());
+        Assertions.assertNull(op.getTransformName());
+        Assertions.assertNull(op.getTransformArg());
+        Assertions.assertEquals("category", op.getColumnName());
+        Assertions.assertNull(op.getPartitionFieldName());
+        Assertions.assertEquals("ADD PARTITION KEY category", op.toSql());
+
+        AddPartitionFieldClause clause = (AddPartitionFieldClause) 
op.translateToLegacyAlterClause();
+        Assertions.assertNotNull(clause);
+        Assertions.assertNull(clause.getTransformName());
+        Assertions.assertNull(clause.getTransformArg());
+        Assertions.assertEquals("category", clause.getColumnName());
+        Assertions.assertNull(clause.getPartitionFieldName());
+    }
+
+    @Test
+    public void testIdentityTransformWithCustomName() {
+        AddPartitionFieldOp op = new AddPartitionFieldOp(null, null, 
"category", "category_partition");
+        Assertions.assertEquals("category", op.getColumnName());
+        Assertions.assertEquals("category_partition", 
op.getPartitionFieldName());
+        Assertions.assertEquals("ADD PARTITION KEY category AS 
category_partition", op.toSql());
+
+        AddPartitionFieldClause clause = (AddPartitionFieldClause) 
op.translateToLegacyAlterClause();
+        Assertions.assertNotNull(clause);
+        Assertions.assertNull(clause.getTransformName());
+        Assertions.assertNull(clause.getTransformArg());
+        Assertions.assertEquals("category", clause.getColumnName());
+        Assertions.assertEquals("category_partition", 
clause.getPartitionFieldName());
+    }
+
+    @Test
+    public void testTimeTransform() {
+        AddPartitionFieldOp op = new AddPartitionFieldOp("year", null, "ts", 
null);
+        Assertions.assertEquals("year", op.getTransformName());
+        Assertions.assertNull(op.getTransformArg());
+        Assertions.assertEquals("ts", op.getColumnName());
+        Assertions.assertEquals("ADD PARTITION KEY year(ts)", op.toSql());
+    }
+
+    @Test
+    public void testTimeTransformWithCustomName() {
+        AddPartitionFieldOp op = new AddPartitionFieldOp("year", null, "ts", 
"ts_year");
+        Assertions.assertEquals("year", op.getTransformName());
+        Assertions.assertEquals("ts", op.getColumnName());
+        Assertions.assertEquals("ts_year", op.getPartitionFieldName());
+        Assertions.assertEquals("ADD PARTITION KEY year(ts) AS ts_year", 
op.toSql());
+
+        AddPartitionFieldClause clause = (AddPartitionFieldClause) 
op.translateToLegacyAlterClause();
+        Assertions.assertEquals("year", clause.getTransformName());
+        Assertions.assertNull(clause.getTransformArg());
+        Assertions.assertEquals("ts", clause.getColumnName());
+        Assertions.assertEquals("ts_year", clause.getPartitionFieldName());
+    }
+
+    @Test
+    public void testBucketTransform() {
+        AddPartitionFieldOp op = new AddPartitionFieldOp("bucket", 16, "id", 
null);
+        Assertions.assertEquals("bucket", op.getTransformName());
+        Assertions.assertEquals(16, op.getTransformArg());
+        Assertions.assertEquals("id", op.getColumnName());
+        Assertions.assertEquals("ADD PARTITION KEY bucket(16, id)", 
op.toSql());
+    }
+
+    @Test
+    public void testBucketTransformWithCustomName() {
+        AddPartitionFieldOp op = new AddPartitionFieldOp("bucket", 16, "id", 
"id_bucket_16");
+        Assertions.assertEquals("bucket", op.getTransformName());
+        Assertions.assertEquals(16, op.getTransformArg());
+        Assertions.assertEquals("id", op.getColumnName());
+        Assertions.assertEquals("id_bucket_16", op.getPartitionFieldName());
+        Assertions.assertEquals("ADD PARTITION KEY bucket(16, id) AS 
id_bucket_16", op.toSql());
+
+        AddPartitionFieldClause clause = (AddPartitionFieldClause) 
op.translateToLegacyAlterClause();
+        Assertions.assertEquals("bucket", clause.getTransformName());
+        Assertions.assertEquals(16, clause.getTransformArg());
+        Assertions.assertEquals("id", clause.getColumnName());
+        Assertions.assertEquals("id_bucket_16", 
clause.getPartitionFieldName());
+    }
+
+    @Test
+    public void testTruncateTransform() {
+        AddPartitionFieldOp op = new AddPartitionFieldOp("truncate", 10, 
"name", null);
+        Assertions.assertEquals("truncate", op.getTransformName());
+        Assertions.assertEquals(10, op.getTransformArg());
+        Assertions.assertEquals("name", op.getColumnName());
+        Assertions.assertEquals("ADD PARTITION KEY truncate(10, name)", 
op.toSql());
+    }
+
+    @Test
+    public void testTruncateTransformWithCustomName() {
+        AddPartitionFieldOp op = new AddPartitionFieldOp("truncate", 10, 
"name", "name_truncate");
+        Assertions.assertEquals("name_truncate", op.getPartitionFieldName());
+        Assertions.assertEquals("ADD PARTITION KEY truncate(10, name) AS 
name_truncate", op.toSql());
+    }
+
+    @Test
+    public void testMonthTransform() {
+        AddPartitionFieldOp op = new AddPartitionFieldOp("month", null, 
"created_date", null);
+        Assertions.assertEquals("ADD PARTITION KEY month(created_date)", 
op.toSql());
+    }
+
+    @Test
+    public void testMonthTransformWithCustomName() {
+        AddPartitionFieldOp op = new AddPartitionFieldOp("month", null, 
"created_date", "date_month");
+        Assertions.assertEquals("ADD PARTITION KEY month(created_date) AS 
date_month", op.toSql());
+    }
+
+    @Test
+    public void testDayTransform() {
+        AddPartitionFieldOp op = new AddPartitionFieldOp("day", null, "ts", 
null);
+        Assertions.assertEquals("ADD PARTITION KEY day(ts)", op.toSql());
+    }
+
+    @Test
+    public void testDayTransformWithCustomName() {
+        AddPartitionFieldOp op = new AddPartitionFieldOp("day", null, "ts", 
"ts_day");
+        Assertions.assertEquals("ADD PARTITION KEY day(ts) AS ts_day", 
op.toSql());
+    }
+
+    @Test
+    public void testHourTransform() {
+        AddPartitionFieldOp op = new AddPartitionFieldOp("hour", null, "ts", 
null);
+        Assertions.assertEquals("ADD PARTITION KEY hour(ts)", op.toSql());
+    }
+
+    @Test
+    public void testHourTransformWithCustomName() {
+        AddPartitionFieldOp op = new AddPartitionFieldOp("hour", null, "ts", 
"ts_hour");
+        Assertions.assertEquals("ADD PARTITION KEY hour(ts) AS ts_hour", 
op.toSql());
+    }
+
+    @Test
+    public void testProperties() {
+        AddPartitionFieldOp op = new AddPartitionFieldOp("bucket", 32, "id", 
null);
+        Map<String, String> properties = op.getProperties();
+        Assertions.assertNotNull(properties);
+        Assertions.assertTrue(properties.isEmpty());
+    }
+
+    @Test
+    public void testAllowOpMTMV() {
+        AddPartitionFieldOp op = new AddPartitionFieldOp("bucket", 16, "id", 
null);
+        Assertions.assertFalse(op.allowOpMTMV());
+    }
+
+    @Test
+    public void testNeedChangeMTMVState() {
+        AddPartitionFieldOp op = new AddPartitionFieldOp("bucket", 16, "id", 
null);
+        Assertions.assertFalse(op.needChangeMTMVState());
+    }
+}
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/info/DropPartitionFieldOpTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/info/DropPartitionFieldOpTest.java
new file mode 100644
index 00000000000..9b387afcc5b
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/info/DropPartitionFieldOpTest.java
@@ -0,0 +1,82 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands.info;
+
+import org.apache.doris.alter.AlterOpType;
+import org.apache.doris.analysis.DropPartitionFieldClause;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+
+public class DropPartitionFieldOpTest {
+
+    @Test
+    public void testDropPartitionFieldByName() {
+        DropPartitionFieldOp op = new DropPartitionFieldOp("ts_year");
+        Assertions.assertEquals(AlterOpType.DROP_PARTITION_FIELD, 
op.getOpType());
+        Assertions.assertEquals("ts_year", op.getPartitionFieldName());
+        Assertions.assertNull(op.getTransformName());
+        Assertions.assertEquals("DROP PARTITION KEY ts_year", op.toSql());
+
+        DropPartitionFieldClause clause = (DropPartitionFieldClause) 
op.translateToLegacyAlterClause();
+        Assertions.assertNotNull(clause);
+        Assertions.assertEquals("ts_year", clause.getPartitionFieldName());
+        Assertions.assertNull(clause.getTransformName());
+    }
+
+    @Test
+    public void testDropPartitionFieldByTransform() {
+        DropPartitionFieldOp op = new DropPartitionFieldOp("bucket", 16, "id");
+        Assertions.assertEquals(AlterOpType.DROP_PARTITION_FIELD, 
op.getOpType());
+        Assertions.assertNull(op.getPartitionFieldName());
+        Assertions.assertEquals("bucket", op.getTransformName());
+        Assertions.assertEquals(16, op.getTransformArg());
+        Assertions.assertEquals("id", op.getColumnName());
+        Assertions.assertEquals("DROP PARTITION KEY bucket(16, id)", 
op.toSql());
+
+        DropPartitionFieldClause clause = (DropPartitionFieldClause) 
op.translateToLegacyAlterClause();
+        Assertions.assertNotNull(clause);
+        Assertions.assertNull(clause.getPartitionFieldName());
+        Assertions.assertEquals("bucket", clause.getTransformName());
+        Assertions.assertEquals(16, clause.getTransformArg());
+        Assertions.assertEquals("id", clause.getColumnName());
+    }
+
+    @Test
+    public void testProperties() {
+        DropPartitionFieldOp op = new DropPartitionFieldOp("id_bucket_16");
+        Map<String, String> properties = op.getProperties();
+        Assertions.assertNotNull(properties);
+        Assertions.assertTrue(properties.isEmpty());
+    }
+
+    @Test
+    public void testAllowOpMTMV() {
+        DropPartitionFieldOp op = new DropPartitionFieldOp("id_bucket_16");
+        Assertions.assertFalse(op.allowOpMTMV());
+    }
+
+    @Test
+    public void testNeedChangeMTMVState() {
+        DropPartitionFieldOp op = new DropPartitionFieldOp("id_bucket_16");
+        Assertions.assertFalse(op.needChangeMTMVState());
+    }
+}
+
diff --git 
a/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/info/ReplacePartitionFieldOpTest.java
 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/info/ReplacePartitionFieldOpTest.java
new file mode 100644
index 00000000000..d7ca738fb83
--- /dev/null
+++ 
b/fe/fe-core/src/test/java/org/apache/doris/nereids/trees/plans/commands/info/ReplacePartitionFieldOpTest.java
@@ -0,0 +1,156 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+package org.apache.doris.nereids.trees.plans.commands.info;
+
+import org.apache.doris.alter.AlterOpType;
+import org.apache.doris.analysis.ReplacePartitionFieldClause;
+
+import org.junit.jupiter.api.Assertions;
+import org.junit.jupiter.api.Test;
+
+import java.util.Map;
+
+public class ReplacePartitionFieldOpTest {
+
+    @Test
+    public void testReplaceTimeTransform() {
+        ReplacePartitionFieldOp op = new ReplacePartitionFieldOp("ts_year", 
null, null, null,
+                "month", null, "ts", null);
+        Assertions.assertEquals(AlterOpType.REPLACE_PARTITION_FIELD, 
op.getOpType());
+        Assertions.assertEquals("ts_year", op.getOldPartitionFieldName());
+        Assertions.assertEquals("month", op.getNewTransformName());
+        Assertions.assertEquals("ts", op.getNewColumnName());
+        Assertions.assertNull(op.getNewPartitionFieldName());
+        Assertions.assertEquals("REPLACE PARTITION KEY ts_year WITH 
month(ts)", op.toSql());
+    }
+
+    @Test
+    public void testReplaceTimeTransformWithCustomName() {
+        ReplacePartitionFieldOp op = new ReplacePartitionFieldOp("ts_year", 
null, null, null,
+                "month", null, "ts", "ts_month");
+        Assertions.assertEquals("ts_year", op.getOldPartitionFieldName());
+        Assertions.assertEquals("month", op.getNewTransformName());
+        Assertions.assertEquals("ts", op.getNewColumnName());
+        Assertions.assertEquals("ts_month", op.getNewPartitionFieldName());
+        Assertions.assertEquals("REPLACE PARTITION KEY ts_year WITH month(ts) 
AS ts_month", op.toSql());
+
+        ReplacePartitionFieldClause clause = (ReplacePartitionFieldClause) 
op.translateToLegacyAlterClause();
+        Assertions.assertNotNull(clause);
+        Assertions.assertEquals("ts_year", clause.getOldPartitionFieldName());
+        Assertions.assertNull(clause.getOldTransformName());
+        Assertions.assertEquals("month", clause.getNewTransformName());
+    }
+
+    @Test
+    public void testReplaceBucketTransform() {
+        ReplacePartitionFieldOp op = new 
ReplacePartitionFieldOp("id_bucket_10", null, null, null,
+                "bucket", 16, "id", null);
+        Assertions.assertEquals("id_bucket_10", op.getOldPartitionFieldName());
+        Assertions.assertEquals("bucket", op.getNewTransformName());
+        Assertions.assertEquals(16, op.getNewTransformArg());
+        Assertions.assertEquals("id", op.getNewColumnName());
+        Assertions.assertEquals("REPLACE PARTITION KEY id_bucket_10 WITH 
bucket(16, id)", op.toSql());
+    }
+
+    @Test
+    public void testReplaceBucketTransformWithCustomName() {
+        ReplacePartitionFieldOp op = new 
ReplacePartitionFieldOp("id_bucket_10", null, null, null,
+                "bucket", 16, "id", "id_bucket_16");
+        Assertions.assertEquals("id_bucket_10", op.getOldPartitionFieldName());
+        Assertions.assertEquals("id_bucket_16", op.getNewPartitionFieldName());
+        Assertions.assertEquals("REPLACE PARTITION KEY id_bucket_10 WITH 
bucket(16, id) AS id_bucket_16", op.toSql());
+    }
+
+    @Test
+    public void testReplaceIdentityToTransform() {
+        ReplacePartitionFieldOp op = new ReplacePartitionFieldOp("category", 
null, null, null,
+                "bucket", 8, "id", null);
+        Assertions.assertEquals("category", op.getOldPartitionFieldName());
+        Assertions.assertEquals("bucket", op.getNewTransformName());
+        Assertions.assertEquals(8, op.getNewTransformArg());
+        Assertions.assertEquals("REPLACE PARTITION KEY category WITH bucket(8, 
id)", op.toSql());
+    }
+
+    @Test
+    public void testReplaceIdentityToTransformWithCustomName() {
+        ReplacePartitionFieldOp op = new ReplacePartitionFieldOp("category", 
null, null, null,
+                "bucket", 8, "id", "id_bucket");
+        Assertions.assertEquals("category", op.getOldPartitionFieldName());
+        Assertions.assertEquals("id_bucket", op.getNewPartitionFieldName());
+        Assertions.assertEquals("REPLACE PARTITION KEY category WITH bucket(8, 
id) AS id_bucket", op.toSql());
+    }
+
+    @Test
+    public void testReplaceTransformToIdentity() {
+        ReplacePartitionFieldOp op = new ReplacePartitionFieldOp("ts_year", 
null, null, null,
+                null, null, "category", null);
+        Assertions.assertEquals("ts_year", op.getOldPartitionFieldName());
+        Assertions.assertNull(op.getNewTransformName());
+        Assertions.assertEquals("category", op.getNewColumnName());
+        Assertions.assertEquals("REPLACE PARTITION KEY ts_year WITH category", 
op.toSql());
+    }
+
+    @Test
+    public void testReplaceTransformToIdentityWithCustomName() {
+        ReplacePartitionFieldOp op = new ReplacePartitionFieldOp("ts_year", 
null, null, null,
+                null, null, "category", "category_partition");
+        Assertions.assertEquals("ts_year", op.getOldPartitionFieldName());
+        Assertions.assertEquals("category_partition", 
op.getNewPartitionFieldName());
+        Assertions.assertEquals("REPLACE PARTITION KEY ts_year WITH category 
AS category_partition", op.toSql());
+    }
+
+    @Test
+    public void testReplaceByOldTransformExpression() {
+        ReplacePartitionFieldOp op = new ReplacePartitionFieldOp(null, 
"bucket", 16, "id",
+                "truncate", 5, "code", "code_trunc");
+        Assertions.assertNull(op.getOldPartitionFieldName());
+        Assertions.assertEquals("bucket", op.getOldTransformName());
+        Assertions.assertEquals(16, op.getOldTransformArg());
+        Assertions.assertEquals("id", op.getOldColumnName());
+        Assertions.assertEquals("REPLACE PARTITION KEY bucket(16, id) WITH 
truncate(5, code) AS code_trunc", op.toSql());
+
+        ReplacePartitionFieldClause clause = (ReplacePartitionFieldClause) 
op.translateToLegacyAlterClause();
+        Assertions.assertNull(clause.getOldPartitionFieldName());
+        Assertions.assertEquals("bucket", clause.getOldTransformName());
+        Assertions.assertEquals(16, clause.getOldTransformArg());
+        Assertions.assertEquals("id", clause.getOldColumnName());
+    }
+
+    @Test
+    public void testProperties() {
+        ReplacePartitionFieldOp op = new 
ReplacePartitionFieldOp("id_bucket_10", null, null, null,
+                "bucket", 16, "id", null);
+        Map<String, String> properties = op.getProperties();
+        Assertions.assertNotNull(properties);
+        Assertions.assertTrue(properties.isEmpty());
+    }
+
+    @Test
+    public void testAllowOpMTMV() {
+        ReplacePartitionFieldOp op = new ReplacePartitionFieldOp("ts_year", 
null, null, null,
+                "month", null, "ts", null);
+        Assertions.assertFalse(op.allowOpMTMV());
+    }
+
+    @Test
+    public void testNeedChangeMTMVState() {
+        ReplacePartitionFieldOp op = new ReplacePartitionFieldOp("ts_year", 
null, null, null,
+                "month", null, "ts", null);
+        Assertions.assertFalse(op.needChangeMTMVState());
+    }
+}
diff --git 
a/regression-test/data/external_table_p0/iceberg/test_iceberg_partition_evolution_ddl.out
 
b/regression-test/data/external_table_p0/iceberg/test_iceberg_partition_evolution_ddl.out
new file mode 100644
index 00000000000..79b40be1514
--- /dev/null
+++ 
b/regression-test/data/external_table_p0/iceberg/test_iceberg_partition_evolution_ddl.out
@@ -0,0 +1,119 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !add_identity_2 --
+1      Alice   A
+2      Bob     B
+3      Charlie C
+
+-- !partitions_after_1 --
+{"category":"C"}       1       1
+{"category":null}      0       2
+
+-- !partitions_year --
+{"ts_year":54} 1       1
+{"ts_year":null}       0       2
+
+-- !partitions_month --
+{"ts_year":54, "ts_month":650} 2       1
+{"ts_year":54, "ts_month":null}        1       1
+{"ts_year":null, "ts_month":null}      0       2
+
+-- !add_time_1 --
+1      Alice   2024-01-01T10:00
+2      Bob     2024-02-01T11:00
+3      Charlie 2024-01-15T10:00
+4      David   2024-03-01T12:00
+
+-- !add_bucket_2 --
+1      Alice   100
+2      Bob     200
+3      Charlie 300
+
+-- !partitions_bucket --
+{"id_bucket_16":3}     1       1
+{"id_bucket_16":null}  0       2
+
+-- !add_truncate_2 --
+1      Alice   ABCDE
+2      Bob     FGHIJ
+3      Charlie KLMNO
+
+-- !partitions_truncate --
+{"code_trunc_5":"KLMNO"}       1       1
+{"code_trunc_5":null}  0       2
+
+-- !drop_identity_2 --
+1      Alice   A
+2      Bob     B
+3      Charlie C
+
+-- !partitions_after_drop --
+{"category":"A"}       0       1
+{"category":"B"}       0       1
+{"category":null}      1       1
+
+-- !drop_time_1 --
+id     int     Yes     true    \N      
+name   text    Yes     true    \N      
+created_date   date    Yes     true    \N      
+
+-- !drop_time_2 --
+1      Alice   2023-01-01
+2      Bob     2024-01-01
+3      Charlie 2025-01-01
+
+-- !drop_bucket_1 --
+id     int     Yes     true    \N      
+name   text    Yes     true    \N      
+value  double  Yes     true    \N      
+
+-- !drop_bucket_2 --
+1      Alice   100
+2      Bob     200
+3      Charlie 300
+
+-- !multiple_2 --
+1      Alice   2024-01-01T10:00        A
+2      Bob     2024-02-01T11:00        B
+
+-- !partitions_multiple_add --
+{"ts_day":"2024-02-01", "category":"B", "id_bucket_8":4}       3       1
+{"ts_day":null, "category":null, "id_bucket_8":null}   0       1
+
+-- !multiple_4 --
+1      Alice   2024-01-01T10:00        A
+2      Bob     2024-02-01T11:00        B
+3      Charlie 2024-03-01T12:00        C
+
+-- !partitions_multiple_after_drop --
+{"ts_day":"2024-02-01", "category":"B", "id_bucket_8":4}       3       1
+{"ts_day":"2024-03-01", "category":null, "id_bucket_8":null}   1       1
+{"ts_day":null, "category":null, "id_bucket_8":null}   0       1
+
+-- !add_partition_key_alias --
+1      2024-01-01T08:00
+2      2024-02-02T09:00
+3      2025-03-03T10:00
+
+-- !add_partition_key_alias_partitions --
+{"ts_year":55} 1       1
+{"ts_year":null}       0       2
+
+-- !drop_partition_key_alias --
+1      2024-01-01T08:00
+2      2024-02-02T09:00
+3      2025-03-03T10:00
+4      2026-04-04T11:00
+
+-- !replace_partition_key_data --
+1      2024-01-01T00:00
+2      2024-02-01T00:00
+3      2024-01-05T00:00
+4      2024-03-15T00:00
+5      2025-04-20T00:00
+
+-- !replace_partition_key_partitions --
+{"ts_day":"2024-01-05", "ts_month":null, "ts_year":null}       1       1
+{"ts_day":null, "ts_month":650, "ts_year":null}        2       1
+{"ts_day":null, "ts_month":null, "ts_year":55} 3       1
+{"ts_day":null, "ts_month":null, "ts_year":null}       0       2
+
diff --git 
a/regression-test/data/external_table_p0/iceberg/test_iceberg_partition_evolution_query_write.out
 
b/regression-test/data/external_table_p0/iceberg/test_iceberg_partition_evolution_query_write.out
new file mode 100644
index 00000000000..23d2b4b1ba3
--- /dev/null
+++ 
b/regression-test/data/external_table_p0/iceberg/test_iceberg_partition_evolution_query_write.out
@@ -0,0 +1,142 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !add_partition_before --
+1      Alice   2024-01-01T08:00
+2      Bob     2024-02-02T09:00
+
+-- !add_partition_after --
+1      Alice   2024-01-01T08:00
+2      Bob     2024-02-02T09:00
+3      Charlie 2025-03-03T10:00
+4      David   2026-04-04T11:00
+
+-- !add_partition_after_partitions --
+{"ts_year":55} 1       1
+{"ts_year":56} 1       1
+{"ts_year":null}       0       2
+
+-- !add_partition_after_drop --
+1      Alice   2024-01-01T08:00
+2      Bob     2024-02-02T09:00
+3      Charlie 2025-03-03T10:00
+4      David   2026-04-04T11:00
+5      Eve     2027-05-05T12:00
+6      Frank   2028-06-06T13:00
+
+-- !drop_partition_before --
+1      Alice   2024-01-01T08:00
+2      Bob     2024-01-02T09:00
+
+-- !drop_partition_after --
+1      Alice   2024-01-01T08:00
+2      Bob     2024-01-02T09:00
+3      Charlie 2024-01-03T10:00
+4      David   2024-01-04T11:00
+
+-- !multi_partition_before --
+1      Alice   2024-01-01T00:00
+2      Bob     2024-02-01T00:00
+
+-- !multi_partition_after_adds --
+1      Alice   2024-01-01T00:00
+2      Bob     2024-02-01T00:00
+3      Charlie 2024-01-05T00:00
+4      David   2024-01-06T00:00
+
+-- !multi_partition_after_adds_partitions --
+{"ts_day":"2024-01-05", "shard":null}  1       1
+{"ts_day":"2024-01-06", "shard":6}     2       1
+{"ts_day":null, "shard":null}  0       2
+
+-- !multi_partition_after_replace --
+1      Alice   2024-01-01T00:00
+2      Bob     2024-02-01T00:00
+3      Charlie 2024-01-05T00:00
+4      David   2024-01-06T00:00
+5      Eve     2024-03-01T00:00
+
+-- !multi_partition_after_replace_partitions --
+{"ts_day":"2024-01-05", "shard":null, "ts_month":null} 1       1
+{"ts_day":"2024-01-06", "shard":6, "ts_month":null}    2       1
+{"ts_day":null, "shard":7, "ts_month":650}     3       1
+{"ts_day":null, "shard":null, "ts_month":null} 0       2
+
+-- !multi_partition_after_drop_all --
+1      Alice   2024-01-01T00:00
+2      Bob     2024-02-01T00:00
+3      Charlie 2024-01-05T00:00
+4      David   2024-01-06T00:00
+5      Eve     2024-03-01T00:00
+6      Frank   2024-04-01T00:00
+
+-- !string_partition_before --
+A001   apple   1.1
+B002   banana  2.2
+
+-- !string_partition_after_add --
+A001   apple   1.1
+B002   banana  2.2
+C003   candy   3.3
+D004   durian  4.4
+
+-- !string_partition_after_add --
+{"sku":"C003"} 1       1
+{"sku":"D004"} 1       1
+{"sku":null}   0       2
+
+-- !string_partition_after_drop --
+A001   apple   1.1
+B002   banana  2.2
+C003   candy   3.3
+D004   durian  4.4
+E005   espresso        5.5
+
+-- !date_partition_before --
+1      2023-12-31      10.00
+2      2024-01-01      20.00
+
+-- !date_partition_after_add --
+1      2023-12-31      10.00
+2      2024-01-01      20.00
+3      2024-02-15      30.00
+4      2025-03-20      40.00
+
+-- !date_partition_after_add --
+{"order_year":54, "order_date_month":649}      2       1
+{"order_year":55, "order_date_month":662}      2       1
+{"order_year":null, "order_date_month":null}   0       2
+
+-- !date_partition_after_replace --
+1      2023-12-31      10.00
+2      2024-01-01      20.00
+3      2024-02-15      30.00
+4      2025-03-20      40.00
+5      2025-03-21      50.00
+
+-- !date_partition_after_replace --
+{"order_year":54, "order_date_month":649, "order_day":null}    2       1
+{"order_year":55, "order_date_month":662, "order_day":null}    2       1
+{"order_year":55, "order_date_month":null, "order_day":"2025-03-21"}   3       
1
+{"order_year":null, "order_date_month":null, "order_day":null} 0       2
+
+-- !numeric_partition_before --
+1      10001   10
+2      10002   20
+
+-- !numeric_partition_after_add --
+1      10001   10
+2      10002   20
+3      10003   30
+4      20004   40
+
+-- !numeric_partition_after_add --
+{"item_bucket":2, "category_id_trunc_3":20004} 2       1
+{"item_bucket":3, "category_id_trunc_3":10002} 2       1
+{"item_bucket":null, "category_id_trunc_3":null}       0       2
+
+-- !numeric_partition_after_drop --
+1      10001   10
+2      10002   20
+3      10003   30
+4      20004   40
+5      30005   50
+
diff --git 
a/regression-test/data/mtmv_p0/test_iceberg_mtmv_with_partition_evolution.out 
b/regression-test/data/mtmv_p0/test_iceberg_mtmv_with_partition_evolution.out
new file mode 100644
index 00000000000..110593582aa
--- /dev/null
+++ 
b/regression-test/data/mtmv_p0/test_iceberg_mtmv_with_partition_evolution.out
@@ -0,0 +1,13 @@
+-- This file is automatically generated. You should know what you did if you 
want to edit this
+-- !evolution_day_to_year_initial --
+2024-01-15T10:00       1
+2024-02-20T11:00       2
+
+-- !evolution_change_column_initial --
+2024-01-15T10:00       2024-01-16T10:00        1
+2024-02-20T11:00       2024-02-21T11:00        2
+
+-- !evolution_year_to_identity_initial --
+2024-01-15T10:00       1
+2024-02-20T11:00       2
+
diff --git 
a/regression-test/suites/external_table_p0/iceberg/test_iceberg_partition_evolution_ddl.groovy
 
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_partition_evolution_ddl.groovy
new file mode 100644
index 00000000000..70942e4484b
--- /dev/null
+++ 
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_partition_evolution_ddl.groovy
@@ -0,0 +1,306 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_iceberg_partition_evolution_ddl", 
"p0,external,doris,external_docker,external_docker_doris") {
+    String enabled = context.config.otherConfigs.get("enableIcebergTest")
+    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+        logger.info("disable iceberg test.")
+        return
+    }
+
+    String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port")
+    String minio_port = context.config.otherConfigs.get("iceberg_minio_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    String catalog_name = "test_iceberg_partition_evolution_ddl"
+
+    sql """drop catalog if exists ${catalog_name}"""
+    sql """
+    CREATE CATALOG ${catalog_name} PROPERTIES (
+        'type'='iceberg',
+        'iceberg.catalog.type'='rest',
+        'uri' = 'http://${externalEnvIp}:${rest_port}',
+        "s3.access_key" = "admin",
+        "s3.secret_key" = "password",
+        "s3.endpoint" = "http://${externalEnvIp}:${minio_port}",
+        "s3.region" = "us-east-1"
+    );"""
+
+    logger.info("catalog " + catalog_name + " created")
+    sql """switch ${catalog_name};"""
+    logger.info("switched to catalog " + catalog_name)
+    sql """drop database if exists test_partition_evolution_db force"""
+    sql """create database test_partition_evolution_db"""
+    sql """use test_partition_evolution_db;"""
+
+    sql """set enable_fallback_to_original_planner=false;"""
+
+    // Test 1: Add identity partition field
+    String table1 = "test_add_identity_partition"
+    sql """drop table if exists ${table1}"""
+    sql """
+    CREATE TABLE ${table1} (
+        id INT,
+        name STRING,
+        category STRING
+    );
+    """
+    sql """INSERT INTO ${table1} VALUES (1, 'Alice', 'A'), (2, 'Bob', 'B')"""
+    
+    sql """ALTER TABLE ${table1} ADD PARTITION KEY category"""
+    // Insert data after adding partition field to see new partition info
+    sql """INSERT INTO ${table1} VALUES (3, 'Charlie', 'C')"""
+    qt_add_identity_2 """SELECT * FROM ${table1} ORDER BY id"""
+    order_qt_partitions_after_1 """SELECT `partition`, spec_id, record_count 
FROM ${table1}\$partitions ORDER BY `partition`"""
+
+    // Test 2: Add time-based partition fields
+    String table2 = "test_add_time_partition"
+    sql """drop table if exists ${table2}"""
+    sql """
+    CREATE TABLE ${table2} (
+        id INT,
+        name STRING,
+        ts DATETIME
+    );
+    """
+    sql """INSERT INTO ${table2} VALUES (1, 'Alice', '2024-01-01 10:00:00'), 
(2, 'Bob', '2024-02-01 11:00:00')"""
+    
+    sql """ALTER TABLE ${table2} ADD PARTITION KEY year(ts)"""
+    // Insert data after adding partition field to see new partition info
+    sql """INSERT INTO ${table2} VALUES (3, 'Charlie', '2024-01-15 
10:00:00')"""
+    order_qt_partitions_year """SELECT `partition`, spec_id, record_count FROM 
${table2}\$partitions ORDER BY `partition`"""
+    
+    sql """ALTER TABLE ${table2} ADD PARTITION KEY month(ts)"""
+    // Insert data after adding partition field to see new partition info
+    sql """INSERT INTO ${table2} VALUES (4, 'David', '2024-03-01 12:00:00')"""
+    order_qt_partitions_month """SELECT `partition`, spec_id, record_count 
FROM ${table2}\$partitions ORDER BY `partition`"""
+    
+    qt_add_time_1 """SELECT * FROM ${table2} ORDER BY id"""
+
+    // Test 3: Add bucket partition field
+    String table3 = "test_add_bucket_partition"
+    sql """drop table if exists ${table3}"""
+    sql """
+    CREATE TABLE ${table3} (
+        id INT,
+        name STRING,
+        value DOUBLE
+    );
+    """
+    sql """INSERT INTO ${table3} VALUES (1, 'Alice', 100.0), (2, 'Bob', 
200.0)"""
+    
+    sql """ALTER TABLE ${table3} ADD PARTITION KEY bucket(16, id)"""
+    // Insert data after adding partition field to see new partition info
+    sql """INSERT INTO ${table3} VALUES (3, 'Charlie', 300.0)"""
+    qt_add_bucket_2 """SELECT * FROM ${table3} ORDER BY id"""
+    order_qt_partitions_bucket """SELECT `partition`, spec_id, record_count 
FROM ${table3}\$partitions ORDER BY `partition`"""
+
+    // Test 4: Add truncate partition field
+    String table4 = "test_add_truncate_partition"
+    sql """drop table if exists ${table4}"""
+    sql """
+    CREATE TABLE ${table4} (
+        id INT,
+        name STRING,
+        code STRING
+    );
+    """
+    sql """INSERT INTO ${table4} VALUES (1, 'Alice', 'ABCDE'), (2, 'Bob', 
'FGHIJ')"""
+    
+    sql """ALTER TABLE ${table4} ADD PARTITION KEY truncate(5, code)"""
+    // Insert data after adding partition field to see new partition info
+    sql """INSERT INTO ${table4} VALUES (3, 'Charlie', 'KLMNO')"""
+    qt_add_truncate_2 """SELECT * FROM ${table4} ORDER BY id"""
+    order_qt_partitions_truncate """SELECT `partition`, spec_id, record_count 
FROM ${table4}\$partitions ORDER BY `partition`"""
+
+    // Test 5: Drop partition field - identity
+    String table5 = "test_drop_identity_partition"
+    sql """drop table if exists ${table5}"""
+    sql """
+    CREATE TABLE ${table5} (
+        id INT,
+        name STRING,
+        category STRING
+    )
+    PARTITION BY LIST (category) ();
+    """
+    sql """INSERT INTO ${table5} VALUES (1, 'Alice', 'A'), (2, 'Bob', 'B')"""
+
+    // Drop partition field
+    sql """ALTER TABLE ${table5} DROP PARTITION KEY category"""
+    // Insert data after dropping partition field to see new partition info
+    sql """INSERT INTO ${table5} VALUES (3, 'Charlie', 'C')"""
+    qt_drop_identity_2 """SELECT * FROM ${table5} ORDER BY id"""
+    order_qt_partitions_after_drop """SELECT `partition`, spec_id, 
record_count FROM ${table5}\$partitions ORDER BY `partition`"""
+
+    // Test 6: Drop partition field - time-based
+    String table6 = "test_drop_time_partition"
+    sql """drop table if exists ${table6}"""
+    sql """
+    CREATE TABLE ${table6} (
+        id INT,
+        name STRING,
+        created_date DATE
+    )
+    PARTITION BY LIST (year(created_date)) ();
+    """
+    sql """INSERT INTO ${table6} VALUES (1, 'Alice', '2023-01-01'), (2, 'Bob', 
'2024-01-01')"""
+    
+    sql """ALTER TABLE ${table6} DROP PARTITION KEY year(created_date)"""
+    qt_drop_time_1 """DESC ${table6}"""
+    
+    sql """INSERT INTO ${table6} VALUES (3, 'Charlie', '2025-01-01')"""
+    qt_drop_time_2 """SELECT * FROM ${table6} ORDER BY id"""
+
+
+    // Test 7: Drop partition field - bucket
+    String table7 = "test_drop_bucket_partition"
+    sql """drop table if exists ${table7}"""
+    sql """
+    CREATE TABLE ${table7} (
+        id INT,
+        name STRING,
+        value DOUBLE
+    )
+    PARTITION BY LIST (bucket(16, id)) ();
+    """
+    sql """INSERT INTO ${table7} VALUES (1, 'Alice', 100.0), (2, 'Bob', 
200.0)"""
+    
+    sql """ALTER TABLE ${table7} DROP PARTITION KEY bucket(16, id)"""
+    qt_drop_bucket_1 """DESC ${table7}"""
+    
+    sql """INSERT INTO ${table7} VALUES (3, 'Charlie', 300.0)"""
+    qt_drop_bucket_2 """SELECT * FROM ${table7} ORDER BY id"""
+
+    // Test 8: Multiple partition evolution operations
+    String table8 = "test_multiple_evolution"
+    sql """drop table if exists ${table8}"""
+    sql """
+    CREATE TABLE ${table8} (
+        id INT,
+        name STRING,
+        ts DATETIME,
+        category STRING
+    );
+    """
+    sql """INSERT INTO ${table8} VALUES (1, 'Alice', '2024-01-01 10:00:00', 
'A')"""
+    
+    // Add multiple partition fields
+    sql """ALTER TABLE ${table8} ADD PARTITION KEY day(ts)"""
+    sql """ALTER TABLE ${table8} ADD PARTITION KEY category"""
+    sql """ALTER TABLE ${table8} ADD PARTITION KEY bucket(8, id)"""
+    // Insert data after adding partition fields to see new partition info
+    sql """INSERT INTO ${table8} VALUES (2, 'Bob', '2024-02-01 11:00:00', 
'B')"""
+    qt_multiple_2 """SELECT * FROM ${table8} ORDER BY id"""
+    order_qt_partitions_multiple_add """SELECT `partition`, spec_id, 
record_count FROM ${table8}\$partitions ORDER BY `partition`"""
+    
+    // Drop some partition fields
+    sql """ALTER TABLE ${table8} DROP PARTITION KEY bucket(8, id)"""
+    sql """ALTER TABLE ${table8} DROP PARTITION KEY category"""
+    // Insert data after dropping partition fields to see new partition info
+    sql """INSERT INTO ${table8} VALUES (3, 'Charlie', '2024-03-01 12:00:00', 
'C')"""
+    qt_multiple_4 """SELECT * FROM ${table8} ORDER BY id"""
+    order_qt_partitions_multiple_after_drop """SELECT `partition`, spec_id, 
record_count FROM ${table8}\$partitions ORDER BY `partition`"""
+
+    // Test 9: Error cases - drop non-existent partition field
+    String table9 = "test_error_cases"
+    sql """drop table if exists ${table9}"""
+    sql """
+    CREATE TABLE ${table9} (
+        id INT,
+        name STRING
+    );
+    """
+    
+    test {
+        sql """ALTER TABLE ${table9} DROP PARTITION KEY bucket(16, id)"""
+        exception "Cannot find partition field to remove"
+    }
+
+    // Test 10: Error cases - invalid transform
+    test {
+        sql """ALTER TABLE ${table9} ADD PARTITION KEY invalid_transform(id)"""
+        exception "Unsupported partition transform"
+    }
+
+    // Test 11: Error cases - missing argument for bucket
+    test {
+        sql """ALTER TABLE ${table9} ADD PARTITION KEY bucket(id)"""
+        exception "Bucket transform requires a bucket count argument"
+    }
+
+    // Test 12: Error cases - not an Iceberg table
+    sql """create database if not exists internal.test_internal_table_db"""
+    sql """drop table if exists 
internal.test_internal_table_db.test_internal_table"""
+    sql """
+    CREATE TABLE internal.test_internal_table_db.test_internal_table (
+        id INT,
+        name STRING
+    ) DISTRIBUTED BY HASH(id) BUCKETS 1
+    PROPERTIES("replication_num" = "1");
+    """
+
+    test {
+        sql """ALTER TABLE internal.test_internal_table_db.test_internal_table 
ADD PARTITION KEY id"""
+        exception "ADD PARTITION KEY is only supported for Iceberg tables"
+    }
+
+    // Test 13: Add partition field with AS key name and drop by key name
+    String table13 = "test_add_partition_key_with_alias"
+    sql """drop table if exists ${table13}"""
+    sql """
+    CREATE TABLE ${table13} (
+        id INT,
+        ts DATETIME
+    );
+    """
+    sql """INSERT INTO ${table13} VALUES (1, '2024-01-01 08:00:00'), (2, 
'2024-02-02 09:00:00')"""
+
+    sql """ALTER TABLE ${table13} ADD PARTITION KEY year(ts) AS ts_year"""
+    sql """INSERT INTO ${table13} VALUES (3, '2025-03-03 10:00:00')"""
+    qt_add_partition_key_alias """SELECT * FROM ${table13} ORDER BY id"""
+    order_qt_add_partition_key_alias_partitions """SELECT `partition`, 
spec_id, record_count FROM ${table13}\$partitions ORDER BY `partition`"""
+
+    // drop by custom key name
+    sql """ALTER TABLE ${table13} DROP PARTITION KEY ts_year"""
+    sql """INSERT INTO ${table13} VALUES (4, '2026-04-04 11:00:00')"""
+    qt_drop_partition_key_alias """SELECT * FROM ${table13} ORDER BY id"""
+
+    // Test 14: Replace partition field with/without AS key name
+    String table14 = "test_replace_partition_field"
+    sql """drop table if exists ${table14}"""
+    sql """
+    CREATE TABLE ${table14} (
+        id INT,
+        ts DATETIME
+    );
+    """
+    sql """INSERT INTO ${table14} VALUES (1, '2024-01-01 00:00:00'), (2, 
'2024-02-01 00:00:00')"""
+
+    sql """ALTER TABLE ${table14} ADD PARTITION KEY day(ts) AS ts_day"""
+    sql """INSERT INTO ${table14} VALUES (3, '2024-01-05 00:00:00')"""
+
+    // Replace without AS (key name becomes default transform name)
+    sql """ALTER TABLE ${table14} REPLACE PARTITION KEY ts_day WITH 
month(ts)"""
+    sql """INSERT INTO ${table14} VALUES (4, '2024-03-15 00:00:00')"""
+
+    // Replace with AS to specify new name explicitly
+    sql """ALTER TABLE ${table14} REPLACE PARTITION KEY ts_month WITH year(ts) 
AS ts_year"""
+    sql """INSERT INTO ${table14} VALUES (5, '2025-04-20 00:00:00')"""
+
+    qt_replace_partition_key_data """SELECT * FROM ${table14} ORDER BY id"""
+    order_qt_replace_partition_key_partitions """SELECT `partition`, spec_id, 
record_count FROM ${table14}\$partitions ORDER BY `partition`"""
+}
diff --git 
a/regression-test/suites/external_table_p0/iceberg/test_iceberg_partition_evolution_query_write.groovy
 
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_partition_evolution_query_write.groovy
new file mode 100644
index 00000000000..5f9754f0442
--- /dev/null
+++ 
b/regression-test/suites/external_table_p0/iceberg/test_iceberg_partition_evolution_query_write.groovy
@@ -0,0 +1,245 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_iceberg_partition_evolution_query_write", 
"p0,external,doris,external_docker,external_docker_doris") {
+    String enabled = context.config.otherConfigs.get("enableIcebergTest")
+    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+        logger.info("disable iceberg test.")
+        return
+    }
+
+    String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port")
+    String minio_port = context.config.otherConfigs.get("iceberg_minio_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    String catalog_name = "test_iceberg_partition_evolution_query_write"
+
+    sql """drop catalog if exists ${catalog_name}"""
+    sql """
+    CREATE CATALOG ${catalog_name} PROPERTIES (
+        'type'='iceberg',
+        'iceberg.catalog.type'='rest',
+        'uri' = 'http://${externalEnvIp}:${rest_port}',
+        "s3.access_key" = "admin",
+        "s3.secret_key" = "password",
+        "s3.endpoint" = "http://${externalEnvIp}:${minio_port}",
+        "s3.region" = "us-east-1"
+    );"""
+
+    logger.info("catalog " + catalog_name + " created")
+    sql """switch ${catalog_name};"""
+    logger.info("switched to catalog " + catalog_name)
+
+    sql """drop database if exists test_partition_evolution_query_write_db 
force"""
+    sql """create database test_partition_evolution_query_write_db"""
+    sql """use test_partition_evolution_query_write_db;"""
+
+    sql """set enable_fallback_to_original_planner=false;"""
+
+    // Scenario 1: Start as non-partition table -> ADD partition key -> 
query/write -> DROP back to non partition
+    String addTable = "test_query_after_add_partition"
+    sql """drop table if exists ${addTable}"""
+    sql """
+    CREATE TABLE ${addTable} (
+        id INT,
+        name STRING,
+        ts DATETIME
+    );
+    """
+    sql """INSERT INTO ${addTable} VALUES
+        (1, 'Alice', '2024-01-01 08:00:00'),
+        (2, 'Bob', '2024-02-02 09:00:00')"""
+    qt_add_partition_before """SELECT * FROM ${addTable} ORDER BY id"""
+
+    sql """ALTER TABLE ${addTable} ADD PARTITION KEY year(ts) AS ts_year"""
+    sql """INSERT INTO ${addTable} VALUES
+        (3, 'Charlie', '2025-03-03 10:00:00'),
+        (4, 'David', '2026-04-04 11:00:00')"""
+
+    qt_add_partition_after """SELECT * FROM ${addTable} ORDER BY id"""
+    order_qt_add_partition_after_partitions """
+        SELECT `partition`, spec_id, record_count
+        FROM ${addTable}\$partitions ORDER BY `partition`
+    """
+    // drop partition key to turn table back to non-partitioned
+    sql """ALTER TABLE ${addTable} DROP PARTITION KEY ts_year"""
+    sql """INSERT INTO ${addTable} VALUES
+        (5, 'Eve', '2027-05-05 12:00:00'),
+        (6, 'Frank', '2028-06-06 13:00:00')"""
+    qt_add_partition_after_drop """SELECT * FROM ${addTable} ORDER BY id"""
+
+    // Scenario 2: Table starts partitioned -> DROP to become non-partitioned 
-> query/write
+    String dropTable = "test_query_after_drop_partition"
+    sql """drop table if exists ${dropTable}"""
+    sql """
+    CREATE TABLE ${dropTable} (
+        id INT,
+        name STRING,
+        ts DATETIME
+    ) PARTITION BY LIST (DAY(ts)) ();
+    """
+    sql """INSERT INTO ${dropTable} VALUES
+        (1, 'Alice', '2024-01-01 08:00:00'),
+        (2, 'Bob', '2024-01-02 09:00:00')"""
+    qt_drop_partition_before """SELECT * FROM ${dropTable} ORDER BY id"""
+
+    sql """ALTER TABLE ${dropTable} DROP PARTITION KEY day(ts)"""
+    sql """INSERT INTO ${dropTable} VALUES
+        (3, 'Charlie', '2024-01-03 10:00:00'),
+        (4, 'David', '2024-01-04 11:00:00')"""
+    qt_drop_partition_after """SELECT * FROM ${dropTable} ORDER BY id"""
+
+    // Scenario 3: Multiple ADDs -> REPLACE -> DROP sequence
+    String multiTable = "test_multi_add_replace_drop"
+    sql """drop table if exists ${multiTable}"""
+    sql """
+    CREATE TABLE ${multiTable} (
+        id INT,
+        name STRING,
+        ts DATETIME
+    );
+    """
+    sql """INSERT INTO ${multiTable} VALUES
+        (1, 'Alice', '2024-01-01 00:00:00'),
+        (2, 'Bob', '2024-02-01 00:00:00')"""
+    qt_multi_partition_before """SELECT * FROM ${multiTable} ORDER BY id"""
+
+    sql """ALTER TABLE ${multiTable} ADD PARTITION KEY day(ts) AS ts_day"""
+    sql """INSERT INTO ${multiTable} VALUES (3, 'Charlie', '2024-01-05 
00:00:00')"""
+    sql """ALTER TABLE ${multiTable} ADD PARTITION KEY bucket(8, id) AS 
shard"""
+    sql """INSERT INTO ${multiTable} VALUES (4, 'David', '2024-01-06 
00:00:00')"""
+    qt_multi_partition_after_adds """SELECT * FROM ${multiTable} ORDER BY id"""
+    order_qt_multi_partition_after_adds_partitions """
+        SELECT `partition`, spec_id, record_count
+        FROM ${multiTable}\$partitions ORDER BY `partition`
+    """
+
+    sql """ALTER TABLE ${multiTable} REPLACE PARTITION KEY ts_day WITH 
month(ts) AS ts_month"""
+    sql """INSERT INTO ${multiTable} VALUES (5, 'Eve', '2024-03-01 
00:00:00')"""
+    qt_multi_partition_after_replace """SELECT * FROM ${multiTable} ORDER BY 
id"""
+    order_qt_multi_partition_after_replace_partitions """
+        SELECT `partition`, spec_id, record_count
+        FROM ${multiTable}\$partitions ORDER BY `partition`
+    """
+
+    sql """ALTER TABLE ${multiTable} DROP PARTITION KEY shard"""
+    sql """ALTER TABLE ${multiTable} DROP PARTITION KEY ts_month"""
+    sql """INSERT INTO ${multiTable} VALUES (6, 'Frank', '2024-04-01 
00:00:00')"""
+    qt_multi_partition_after_drop_all """SELECT * FROM ${multiTable} ORDER BY 
id"""
+
+    // Scenario 4: String column identity partition
+    String stringTable = "test_string_partition"
+    sql """drop table if exists ${stringTable}"""
+    sql """
+    CREATE TABLE ${stringTable} (
+        sku STRING,
+        descr STRING,
+        price DOUBLE
+    );
+    """
+    sql """INSERT INTO ${stringTable} VALUES
+        ('A001', 'apple', 1.1),
+        ('B002', 'banana', 2.2)"""
+    qt_string_partition_before """SELECT * FROM ${stringTable} ORDER BY sku"""
+
+    sql """ALTER TABLE ${stringTable} ADD PARTITION KEY sku"""
+    sql """INSERT INTO ${stringTable} VALUES
+        ('C003', 'candy', 3.3),
+        ('D004', 'durian', 4.4)"""
+    qt_string_partition_after_add """SELECT * FROM ${stringTable} ORDER BY 
sku"""
+    order_qt_string_partition_after_add """
+        SELECT `partition`, spec_id, record_count
+        FROM ${stringTable}\$partitions ORDER BY `partition`
+    """
+
+    sql """ALTER TABLE ${stringTable} DROP PARTITION KEY sku"""
+    sql """INSERT INTO ${stringTable} VALUES ('E005', 'espresso', 5.5)"""
+    qt_string_partition_after_drop """SELECT * FROM ${stringTable} ORDER BY 
sku"""
+
+    // Scenario 5: DATE column using year/month transforms
+    String dateTable = "test_date_partition_chain"
+    sql """drop table if exists ${dateTable}"""
+    sql """
+    CREATE TABLE ${dateTable} (
+        order_id INT,
+        order_date DATE,
+        amount DECIMAL(10,2)
+    );
+    """
+    sql """INSERT INTO ${dateTable} VALUES
+        (1, '2023-12-31', 10.00),
+        (2, '2024-01-01', 20.00)"""
+    qt_date_partition_before """SELECT * FROM ${dateTable} ORDER BY order_id"""
+
+    sql """ALTER TABLE ${dateTable} ADD PARTITION KEY year(order_date) AS 
order_year"""
+    sql """ALTER TABLE ${dateTable} ADD PARTITION KEY month(order_date)"""
+    sql """INSERT INTO ${dateTable} VALUES
+        (3, '2024-02-15', 30.00),
+        (4, '2025-03-20', 40.00)"""
+    qt_date_partition_after_add """SELECT * FROM ${dateTable} ORDER BY 
order_id"""
+    order_qt_date_partition_after_add """
+        SELECT `partition`, spec_id, record_count
+        FROM ${dateTable}\$partitions ORDER BY `partition`
+    """
+
+    sql """ALTER TABLE ${dateTable} REPLACE PARTITION KEY month(order_date) 
WITH day(order_date) AS order_day"""
+    sql """INSERT INTO ${dateTable} VALUES (5, '2025-03-21', 50.00)"""
+    qt_date_partition_after_replace """SELECT * FROM ${dateTable} ORDER BY 
order_id"""
+    order_qt_date_partition_after_replace """
+        SELECT `partition`, spec_id, record_count
+        FROM ${dateTable}\$partitions ORDER BY `partition`
+    """
+
+    // Scenario 6: Numeric column bucket/truncate transforms
+    String numericTable = "test_numeric_partition"
+    sql """drop table if exists ${numericTable}"""
+    sql """
+    CREATE TABLE ${numericTable} (
+        item_id INT,
+        category_id BIGINT,
+        stock INT
+    );
+    """
+    sql """INSERT INTO ${numericTable} VALUES
+        (1, 10001, 10),
+        (2, 10002, 20)"""
+    qt_numeric_partition_before """SELECT * FROM ${numericTable} ORDER BY 
item_id"""
+
+    sql """ALTER TABLE ${numericTable} ADD PARTITION KEY bucket(4, item_id) AS 
item_bucket"""
+    sql """ALTER TABLE ${numericTable} ADD PARTITION KEY truncate(3, 
category_id)"""
+    sql """INSERT INTO ${numericTable} VALUES
+        (3, 10003, 30),
+        (4, 20004, 40)"""
+    qt_numeric_partition_after_add """SELECT * FROM ${numericTable} ORDER BY 
item_id"""
+    order_qt_numeric_partition_after_add """
+        SELECT `partition`, spec_id, record_count
+        FROM ${numericTable}\$partitions ORDER BY `partition`
+    """
+
+    sql """ALTER TABLE ${numericTable} DROP PARTITION KEY item_bucket"""
+    sql """INSERT INTO ${numericTable} VALUES (5, 30005, 50)"""
+    qt_numeric_partition_after_drop """SELECT * FROM ${numericTable} ORDER BY 
item_id"""
+
+    sql """drop table if exists ${addTable}"""
+    sql """drop table if exists ${dropTable}"""
+    sql """drop table if exists ${multiTable}"""
+    sql """drop table if exists ${stringTable}"""
+    sql """drop table if exists ${dateTable}"""
+    sql """drop table if exists ${numericTable}"""
+
+    sql """ drop catalog if exists ${catalog_name} """
+}
+
diff --git 
a/regression-test/suites/mtmv_p0/test_iceberg_mtmv_with_partition_evolution.groovy
 
b/regression-test/suites/mtmv_p0/test_iceberg_mtmv_with_partition_evolution.groovy
new file mode 100644
index 00000000000..69395e48433
--- /dev/null
+++ 
b/regression-test/suites/mtmv_p0/test_iceberg_mtmv_with_partition_evolution.groovy
@@ -0,0 +1,170 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("test_iceberg_mtmv_with_partition_evolution", 
"p0,external,iceberg,external_docker,external_docker_hive") {
+    String enabled = context.config.otherConfigs.get("enableIcebergTest")
+    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+        logger.info("disable iceberg test.")
+        return
+    }
+
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port")
+    String minio_port = context.config.otherConfigs.get("iceberg_minio_port")
+    String catalog_name = "iceberg_mtmv_catalog_evolution"
+    String dbName = "regression_test_mtmv_partition_evolution_p0"
+    String icebergDb = "iceberg_mtmv_partition_evolution"
+
+    sql """drop catalog if exists ${catalog_name} """
+    sql """CREATE CATALOG ${catalog_name} PROPERTIES (
+        'type'='iceberg',
+        'iceberg.catalog.type'='rest',
+        'uri' = 'http://${externalEnvIp}:${rest_port}',
+        "s3.access_key" = "admin",
+        "s3.secret_key" = "password",
+        "s3.endpoint" = "http://${externalEnvIp}:${minio_port}",
+        "s3.region" = "us-east-1"
+    );"""
+
+    sql """switch internal"""
+    sql """drop database if exists ${dbName}"""
+    sql """create database if not exists ${dbName}"""
+    sql """use ${dbName}"""
+
+    sql """drop database if exists ${catalog_name}.${icebergDb} force"""
+    sql """create database ${catalog_name}.${icebergDb}"""
+
+    def assertRefreshFailed = { mvName, expectedError ->
+        Thread.sleep(2000)
+        String showTasks = "select 
TaskId,JobId,JobName,MvId,Status,MvName,MvDatabaseName,ErrorMsg " +
+                "from tasks('type'='mv') where MvDatabaseName = '${dbName}' 
and MvName = '${mvName}' order by CreateTime DESC"
+        String status = "NULL"
+        List<List<Object>> result = []
+        long startTime = System.currentTimeMillis()
+        long timeoutTimestamp = startTime + 5 * 60 * 1000
+        while (timeoutTimestamp > System.currentTimeMillis()
+                && (status == 'PENDING' || status == 'RUNNING' || status == 
'NULL')) {
+            result = sql(showTasks)
+            if (result.isEmpty()) {
+                Thread.sleep(1000)
+                continue
+            }
+            status = result.get(0).get(4).toString()
+            Thread.sleep(1000)
+        }
+        logger.info("MTMV ${mvName} refresh result: " + result?.getAt(0))
+        assertTrue(status == "FAILED", "Task should fail but got status: " + 
status)
+        String errorMsg = result.get(0).get(7).toString()
+        logger.info("MTMV ${mvName} error message: " + errorMsg)
+        assertTrue(errorMsg.contains(expectedError),
+                "Error message should contain '${expectedError}' but got: " + 
errorMsg)
+    }
+
+    // Test partition evolution cases
+    // Test 1: Switch from day to year on same column - should fail (any 
partition evolution should fail)
+    String evolutionTable1 = "evolution_day_to_year"
+    String mvEvolution1 = "mv_evolution_day_to_year"
+    sql """drop table if exists 
${catalog_name}.${icebergDb}.${evolutionTable1}"""
+    sql """drop materialized view if exists ${mvEvolution1}"""
+
+    sql """
+        CREATE TABLE ${catalog_name}.${icebergDb}.${evolutionTable1} (
+          ts DATETIME,
+          value INT)
+        ENGINE=iceberg
+        PARTITION BY LIST (DAY(ts)) ();
+    """
+    sql """insert into ${catalog_name}.${icebergDb}.${evolutionTable1} values 
('2024-01-15 10:00:00', 1), ('2024-02-20 11:00:00', 2)"""
+    sql """CREATE MATERIALIZED VIEW ${mvEvolution1} BUILD DEFERRED REFRESH 
AUTO ON MANUAL partition by(`ts`) DISTRIBUTED BY RANDOM BUCKETS 2  PROPERTIES 
('replication_num' = '1') as SELECT * FROM 
${catalog_name}.${icebergDb}.${evolutionTable1}"""
+    sql """REFRESH MATERIALIZED VIEW ${mvEvolution1} complete"""
+    waitingMTMVTaskFinishedByMvName(mvEvolution1, dbName)
+    qt_evolution_day_to_year_initial "select * from ${mvEvolution1} order by 
value"
+
+    // Evolve partition from day to year on same column
+    sql """ALTER TABLE ${catalog_name}.${icebergDb}.${evolutionTable1} DROP 
PARTITION KEY day(ts)"""
+    sql """ALTER TABLE ${catalog_name}.${icebergDb}.${evolutionTable1} ADD 
PARTITION KEY year(ts)"""
+    sql """insert into ${catalog_name}.${icebergDb}.${evolutionTable1} values 
('2024-03-10 12:00:00', 3)"""
+
+    sql """REFRESH MATERIALIZED VIEW ${mvEvolution1} complete"""
+    assertRefreshFailed(mvEvolution1, "is not a valid pct table anymore")
+
+    sql """drop materialized view if exists ${mvEvolution1}"""
+    sql """drop table if exists 
${catalog_name}.${icebergDb}.${evolutionTable1}"""
+
+    // Test 2: Change partition column from c1 to c2 - should fail
+    String evolutionTable2 = "evolution_change_column"
+    String mvEvolution2 = "mv_evolution_change_column"
+    sql """drop table if exists 
${catalog_name}.${icebergDb}.${evolutionTable2}"""
+    sql """drop materialized view if exists ${mvEvolution2}"""
+
+    sql """
+        CREATE TABLE ${catalog_name}.${icebergDb}.${evolutionTable2} (
+          c1 DATETIME,
+          c2 DATETIME,
+          value INT)
+        ENGINE=iceberg
+        PARTITION BY LIST (YEAR(c1)) ();
+    """
+    sql """insert into ${catalog_name}.${icebergDb}.${evolutionTable2} values 
('2024-01-15 10:00:00', '2024-01-16 10:00:00', 1), ('2024-02-20 11:00:00', 
'2024-02-21 11:00:00', 2)"""
+    sql """CREATE MATERIALIZED VIEW ${mvEvolution2} BUILD DEFERRED REFRESH 
AUTO ON MANUAL partition by(`c1`) DISTRIBUTED BY RANDOM BUCKETS 2  PROPERTIES 
('replication_num' = '1') as SELECT * FROM 
${catalog_name}.${icebergDb}.${evolutionTable2}"""
+    sql """REFRESH MATERIALIZED VIEW ${mvEvolution2} complete"""
+    waitingMTMVTaskFinishedByMvName(mvEvolution2, dbName)
+    qt_evolution_change_column_initial "select * from ${mvEvolution2} order by 
value"
+
+    // Evolve partition from c1 to c2 - this should make the table invalid
+    sql """ALTER TABLE ${catalog_name}.${icebergDb}.${evolutionTable2} DROP 
PARTITION KEY year(c1)"""
+    sql """ALTER TABLE ${catalog_name}.${icebergDb}.${evolutionTable2} ADD 
PARTITION KEY year(c2)"""
+    sql """insert into ${catalog_name}.${icebergDb}.${evolutionTable2} values 
('2024-03-10 12:00:00', '2024-03-11 12:00:00', 3)"""
+
+    sql """REFRESH MATERIALIZED VIEW ${mvEvolution2} complete"""
+    assertRefreshFailed(mvEvolution2, "is not a valid pct table anymore")
+
+    sql """drop materialized view if exists ${mvEvolution2}"""
+    sql """drop table if exists 
${catalog_name}.${icebergDb}.${evolutionTable2}"""
+
+    // Test 3: Switch from year to identity - should fail
+    String evolutionTable3 = "evolution_year_to_identity"
+    String mvEvolution3 = "mv_evolution_year_to_identity"
+    sql """drop table if exists 
${catalog_name}.${icebergDb}.${evolutionTable3}"""
+    sql """drop materialized view if exists ${mvEvolution3}"""
+
+    sql """
+        CREATE TABLE ${catalog_name}.${icebergDb}.${evolutionTable3} (
+          ts DATETIME,
+          value INT)
+        ENGINE=iceberg
+        PARTITION BY LIST (YEAR(ts)) ();
+    """
+    sql """insert into ${catalog_name}.${icebergDb}.${evolutionTable3} values 
('2024-01-15 10:00:00', 1), ('2024-02-20 11:00:00', 2)"""
+    sql """CREATE MATERIALIZED VIEW ${mvEvolution3} BUILD DEFERRED REFRESH 
AUTO ON MANUAL partition by(`ts`) DISTRIBUTED BY RANDOM BUCKETS 2  PROPERTIES 
('replication_num' = '1') as SELECT * FROM 
${catalog_name}.${icebergDb}.${evolutionTable3}"""
+    sql """REFRESH MATERIALIZED VIEW ${mvEvolution3} complete"""
+    waitingMTMVTaskFinishedByMvName(mvEvolution3, dbName)
+    qt_evolution_year_to_identity_initial "select * from ${mvEvolution3} order 
by value"
+
+    // Evolve partition from year to identity - this should make the table 
invalid
+    sql """ALTER TABLE ${catalog_name}.${icebergDb}.${evolutionTable3} DROP 
PARTITION KEY year(ts)"""
+    sql """ALTER TABLE ${catalog_name}.${icebergDb}.${evolutionTable3} ADD 
PARTITION KEY ts"""
+
+    sql """REFRESH MATERIALIZED VIEW ${mvEvolution3} complete"""
+    assertRefreshFailed(mvEvolution3, "is not a valid pct table anymore")
+
+    sql """drop materialized view if exists ${mvEvolution3}"""
+    sql """drop table if exists 
${catalog_name}.${icebergDb}.${evolutionTable3}"""
+
+    sql """ drop catalog if exists ${catalog_name} """
+}
+


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to