This is an automated email from the ASF dual-hosted git repository.

morningman pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/doris.git


The following commit(s) were added to refs/heads/master by this push:
     new 57fcb6ba431 [feat](iceberg) support schema change for iceberg (#52591)
57fcb6ba431 is described below

commit 57fcb6ba431de1bd5420d397d35d83b6dc63180f
Author: Socrates <[email protected]>
AuthorDate: Sat Jul 19 11:57:35 2025 +0800

    [feat](iceberg) support schema change for iceberg (#52591)
    
    ### What problem does this PR solve?
    Support schema change ddl for iceberg table:
    - Adding, deleting, and renaming top-level columns (not support nested
    columns)
    - Reordering top-level columns (not support nested columns)
    - Widening the type of int, float, and decimal fields
---
 .../main/java/org/apache/doris/alter/Alter.java    |  31 +-
 .../org/apache/doris/analysis/AddColumnClause.java |   6 +-
 .../apache/doris/analysis/ModifyColumnClause.java  |   6 +-
 .../org/apache/doris/catalog/RefreshManager.java   |   8 +-
 .../org/apache/doris/datasource/CatalogIf.java     |  34 ++
 .../org/apache/doris/datasource/CatalogMgr.java    |   6 +
 .../apache/doris/datasource/ExternalCatalog.java   | 135 +++++++
 .../apache/doris/datasource/ExternalObjectLog.java |  13 +
 .../doris/datasource/hive/HiveMetadataOps.java     |   3 +-
 .../datasource/iceberg/IcebergMetadataOps.java     | 193 +++++++++
 .../doris/datasource/iceberg/IcebergUtils.java     |  73 ++++
 .../datasource/operations/ExternalMetadataOps.java |  97 +++++
 .../trees/plans/commands/AlterTableCommand.java    |   2 +
 .../trees/plans/commands/info/AddColumnOp.java     |   6 +-
 .../trees/plans/commands/info/AddColumnsOp.java    |   6 +-
 .../trees/plans/commands/info/DropColumnOp.java    |   6 +-
 .../trees/plans/commands/info/ModifyColumnOp.java  |   6 +-
 .../plans/commands/info/ReorderColumnsOp.java      |   6 +-
 .../iceberg/iceberg_schema_change_ddl.out          | Bin 0 -> 8460 bytes
 .../iceberg_schema_change_ddl_with_branch.out      | Bin 0 -> 6618 bytes
 .../iceberg/iceberg_schema_change_ddl.groovy       | 441 +++++++++++++++++++++
 .../iceberg_schema_change_ddl_with_branch.groovy   | 290 ++++++++++++++
 22 files changed, 1350 insertions(+), 18 deletions(-)

diff --git a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java 
b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
index 28a33d5c425..9e5fc8230ee 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/alter/Alter.java
@@ -17,6 +17,8 @@
 
 package org.apache.doris.alter;
 
+import org.apache.doris.analysis.AddColumnClause;
+import org.apache.doris.analysis.AddColumnsClause;
 import org.apache.doris.analysis.AddPartitionClause;
 import org.apache.doris.analysis.AddPartitionLikeClause;
 import org.apache.doris.analysis.AlterClause;
@@ -28,9 +30,11 @@ import org.apache.doris.analysis.CreateMaterializedViewStmt;
 import org.apache.doris.analysis.CreateOrReplaceBranchClause;
 import org.apache.doris.analysis.CreateOrReplaceTagClause;
 import org.apache.doris.analysis.DropBranchClause;
+import org.apache.doris.analysis.DropColumnClause;
 import org.apache.doris.analysis.DropPartitionClause;
 import org.apache.doris.analysis.DropPartitionFromIndexClause;
 import org.apache.doris.analysis.DropTagClause;
+import org.apache.doris.analysis.ModifyColumnClause;
 import org.apache.doris.analysis.ModifyColumnCommentClause;
 import org.apache.doris.analysis.ModifyDistributionClause;
 import org.apache.doris.analysis.ModifyEngineClause;
@@ -38,6 +42,7 @@ import org.apache.doris.analysis.ModifyPartitionClause;
 import org.apache.doris.analysis.ModifyTableCommentClause;
 import org.apache.doris.analysis.ModifyTablePropertiesClause;
 import org.apache.doris.analysis.PartitionRenameClause;
+import org.apache.doris.analysis.ReorderColumnsClause;
 import org.apache.doris.analysis.ReplacePartitionClause;
 import org.apache.doris.analysis.ReplaceTableClause;
 import org.apache.doris.analysis.RollupRenameClause;
@@ -406,6 +411,29 @@ public class Alter {
                 table.getCatalog().dropTag(
                         table,
                         ((DropTagClause) alterClause).getDropTagInfo());
+            } else if (alterClause instanceof TableRenameClause) {
+                TableRenameClause tableRename = (TableRenameClause) 
alterClause;
+                table.getCatalog().renameTable(
+                        table.getDbName(), table.getName(), 
tableRename.getNewTableName());
+            } else if (alterClause instanceof AddColumnClause) {
+                AddColumnClause addColumn = (AddColumnClause) alterClause;
+                table.getCatalog().addColumn(table, addColumn.getColumn(), 
addColumn.getColPos());
+            } else if (alterClause instanceof AddColumnsClause) {
+                AddColumnsClause addColumns = (AddColumnsClause) alterClause;
+                table.getCatalog().addColumns(table, addColumns.getColumns());
+            } else if (alterClause instanceof DropColumnClause) {
+                DropColumnClause dropColumn = (DropColumnClause) alterClause;
+                table.getCatalog().dropColumn(table, dropColumn.getColName());
+            } else if (alterClause instanceof ColumnRenameClause) {
+                ColumnRenameClause columnRename = (ColumnRenameClause) 
alterClause;
+                table.getCatalog().renameColumn(
+                        table, columnRename.getColName(), 
columnRename.getNewColName());
+            } else if (alterClause instanceof ModifyColumnClause) {
+                ModifyColumnClause modifyColumn = (ModifyColumnClause) 
alterClause;
+                table.getCatalog().modifyColumn(table, 
modifyColumn.getColumn(), modifyColumn.getColPos());
+            } else if (alterClause instanceof ReorderColumnsClause) {
+                ReorderColumnsClause reorderColumns = (ReorderColumnsClause) 
alterClause;
+                table.getCatalog().reorderColumns(table, 
reorderColumns.getColumnsByPos());
             } else {
                 throw new UserException("Invalid alter operations for external 
table: " + alterClauses);
             }
@@ -680,7 +708,7 @@ public class Alter {
         String dbName = dbTableName.getDb();
         String tableName = dbTableName.getTbl();
         DatabaseIf dbIf = Env.getCurrentEnv().getCatalogMgr()
-                .getCatalogOrException(ctlName, catalog -> new 
DdlException("Unknown catalog " + catalog))
+                .getCatalogOrDdlException(ctlName)
                 .getDbOrDdlException(dbName);
         TableIf tableIf = dbIf.getTableOrDdlException(tableName);
         List<AlterClause> alterClauses = Lists.newArrayList();
@@ -1183,7 +1211,6 @@ public class Alter {
                      .status == Tablet.TabletStatus.HEALTHY;
     }
 
-
     private Map<Tag, List<Replica>> getReplicasWithTag(Tablet tablet) {
         return tablet.getReplicas().stream()
                 .collect(Collectors.groupingBy(replica -> 
Env.getCurrentSystemInfo()
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/AddColumnClause.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/AddColumnClause.java
index f7ae9083502..48b629bfa92 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/AddColumnClause.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/AddColumnClause.java
@@ -22,7 +22,7 @@ import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.KeysType;
 import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.catalog.Table;
+import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.ErrorCode;
@@ -86,7 +86,9 @@ public class AddColumnClause extends AlterTableClause {
             throw new AnalysisException("No column definition in add column 
clause.");
         }
         if (tableName != null) {
-            Table table = 
Env.getCurrentInternalCatalog().getDbOrDdlException(tableName.getDb())
+            TableIf table = Env.getCurrentEnv().getCatalogMgr()
+                    .getCatalogOrDdlException(tableName.getCtl())
+                    .getDbOrDdlException(tableName.getDb())
                     .getTableOrDdlException(tableName.getTbl());
             if (table instanceof OlapTable && ((OlapTable) 
table).getKeysType() == KeysType.AGG_KEYS
                     && columnDef.getAggregateType() == null) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyColumnClause.java 
b/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyColumnClause.java
index 170c81796b8..18a6909eb19 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyColumnClause.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/analysis/ModifyColumnClause.java
@@ -22,7 +22,7 @@ import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.KeysType;
 import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.catalog.Table;
+import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.DdlException;
 
@@ -84,7 +84,9 @@ public class ModifyColumnClause extends AlterTableClause {
             throw new AnalysisException("No column definition in modify column 
clause.");
         }
         if (tableName != null) {
-            Table table = 
Env.getCurrentInternalCatalog().getDbOrDdlException(tableName.getDb())
+            TableIf table = Env.getCurrentEnv().getCatalogMgr()
+                    .getCatalogOrDdlException(tableName.getCtl())
+                    .getDbOrDdlException(tableName.getDb())
                     .getTableOrDdlException(tableName.getTbl());
             if (table instanceof OlapTable && ((OlapTable) 
table).getKeysType() == KeysType.AGG_KEYS
                     && columnDef.getAggregateType() == null) {
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java 
b/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java
index 685a1baee9c..65d05e30920 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/catalog/RefreshManager.java
@@ -179,7 +179,13 @@ public class RefreshManager {
             LOG.warn("failed to find table when replaying refresh table: {}", 
log.debugForRefreshTable());
             return;
         }
-        refreshTableInternal(db.get(), table.get(), log.getLastUpdateTime());
+        if (!Strings.isNullOrEmpty(log.getNewTableName())) {
+            // this is a rename table op
+            db.get().unregisterTable(log.getTableName());
+            db.get().resetMetaCacheNames();
+        } else {
+            refreshTableInternal(db.get(), table.get(), 
log.getLastUpdateTime());
+        }
     }
 
     public void refreshExternalTableFromEvent(String catalogName, String 
dbName, String tableName,
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogIf.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogIf.java
index 895c0331eae..50765a750f7 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogIf.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogIf.java
@@ -17,9 +17,11 @@
 
 package org.apache.doris.datasource;
 
+import org.apache.doris.analysis.ColumnPosition;
 import org.apache.doris.analysis.CreateTableStmt;
 import org.apache.doris.analysis.PartitionNames;
 import org.apache.doris.analysis.TableName;
+import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.DatabaseIf;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.TableIf;
@@ -198,6 +200,10 @@ public interface CatalogIf<T extends DatabaseIf> {
     void dropTable(String dbName, String tableName, boolean isView, boolean 
isMtmv, boolean ifExists,
                    boolean force) throws DdlException;
 
+    default void renameTable(String dbName, String oldTableName, String 
newTableName) throws DdlException {
+        throw new UnsupportedOperationException("Not support rename table 
operation");
+    }
+
     void truncateTable(String dbName, String tableName, PartitionNames 
partitionNames, boolean forceDrop,
             String rawTruncateSql)
             throws DdlException;
@@ -234,4 +240,32 @@ public interface CatalogIf<T extends DatabaseIf> {
     default void dropTag(TableIf dorisTable, DropTagInfo tagInfo) throws 
UserException {
         throw new UserException("Not support drop tag operation");
     }
+
+    // schema change operations:
+    // - Adding, deleting, modify and renaming columns
+    // - Reordering top-level columns fields
+
+    default void addColumn(TableIf table, Column column, ColumnPosition 
columnPosition) throws UserException {
+        throw new UserException("Not support add column operation");
+    }
+
+    default void addColumns(TableIf table, List<Column> columns) throws 
UserException {
+        throw new UserException("Not support add columns operation");
+    }
+
+    default void dropColumn(TableIf table, String name) throws UserException {
+        throw new UserException("Not support drop column operation");
+    }
+
+    default void renameColumn(TableIf table, String oldName, String newName) 
throws UserException {
+        throw new UserException("Not support rename column operation");
+    }
+
+    default void modifyColumn(TableIf table, Column column, ColumnPosition 
columnPosition) throws UserException {
+        throw new UserException("Not support update column operation");
+    }
+
+    default void reorderColumns(TableIf table, List<String> newOrder) throws 
UserException {
+        throw new UserException("Not support reorder columns operation");
+    }
 }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
index fb23573cd24..bf4d7e4b5e1 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/CatalogMgr.java
@@ -185,6 +185,12 @@ public class CatalogMgr implements Writable, 
GsonPostProcessable {
         return catalog;
     }
 
+    public CatalogIf getCatalogOrDdlException(String name) throws DdlException 
{
+        return getCatalogOrException(name,
+                catalog -> new 
DdlException(ErrorCode.ERR_UNKNOWN_CATALOG.formatErrorMsg(catalog),
+                        ErrorCode.ERR_UNKNOWN_CATALOG));
+    }
+
     public CatalogIf getCatalogOrAnalysisException(String name) throws 
AnalysisException {
         return getCatalogOrException(name,
                 catalog -> new 
AnalysisException(ErrorCode.ERR_UNKNOWN_CATALOG.formatErrorMsg(catalog),
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
index b3687bd6380..f64f15de1da 100644
--- a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
+++ b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalCatalog.java
@@ -17,9 +17,11 @@
 
 package org.apache.doris.datasource;
 
+import org.apache.doris.analysis.ColumnPosition;
 import org.apache.doris.analysis.CreateTableStmt;
 import org.apache.doris.analysis.PartitionNames;
 import org.apache.doris.analysis.TableName;
+import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.DatabaseIf;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.InfoSchemaDb;
@@ -1132,6 +1134,23 @@ public abstract class ExternalCatalog
         }
     }
 
+    @Override
+    public void renameTable(String dbName, String oldTableName, String 
newTableName) throws DdlException {
+        makeSureInitialized();
+        if (metadataOps == null) {
+            throw new DdlException("Rename table is not supported for catalog: 
" + getName());
+        }
+        try {
+            metadataOps.renameTable(dbName, oldTableName, newTableName);
+            Env.getCurrentEnv().getEditLog()
+                    .logRefreshExternalTable(
+                            ExternalObjectLog.createForRenameTable(getId(), 
dbName, oldTableName, newTableName));
+        } catch (Exception e) {
+            LOG.warn("Failed to rename table {} in database {}.", 
oldTableName, dbName, e);
+            throw e;
+        }
+    }
+
     @Override
     public void dropTable(String dbName, String tableName, boolean isView, 
boolean isMtmv, boolean ifExists,
                           boolean force) throws DdlException {
@@ -1472,5 +1491,121 @@ public abstract class ExternalCatalog
             resetToUninitialized(true);
         }
     }
+
+    // log the refresh external table operation
+    private void logRefreshExternalTable(ExternalTable dorisTable) {
+        Env.getCurrentEnv().getEditLog()
+                .logRefreshExternalTable(
+                        
ExternalObjectLog.createForRefreshTable(dorisTable.getCatalog().getId(),
+                                dorisTable.getDbName(), dorisTable.getName()));
+    }
+
+    @Override
+    public void addColumn(TableIf dorisTable, Column column, ColumnPosition 
position) throws UserException {
+        makeSureInitialized();
+        Preconditions.checkState(dorisTable instanceof ExternalTable, 
dorisTable.getName());
+        ExternalTable externalTable = (ExternalTable) dorisTable;
+        if (metadataOps == null) {
+            throw new DdlException("Add column operation is not supported for 
catalog: " + getName());
+        }
+        try {
+            metadataOps.addColumn(externalTable, column, position);
+            logRefreshExternalTable(externalTable);
+        } catch (Exception e) {
+            LOG.warn("Failed to add column {} to table {}.{} in catalog {}",
+                    column.getName(), externalTable.getDbName(), 
externalTable.getName(), getName(), e);
+            throw e;
+        }
+    }
+
+    @Override
+    public void addColumns(TableIf dorisTable, List<Column> columns) throws 
UserException {
+        makeSureInitialized();
+        Preconditions.checkState(dorisTable instanceof ExternalTable, 
dorisTable.getName());
+        ExternalTable externalTable = (ExternalTable) dorisTable;
+        if (metadataOps == null) {
+            throw new DdlException("Add columns operation is not supported for 
catalog: " + getName());
+        }
+        try {
+            metadataOps.addColumns(externalTable, columns);
+            logRefreshExternalTable(externalTable);
+        } catch (Exception e) {
+            LOG.warn("Failed to add columns to table {}.{} in catalog {}",
+                    externalTable.getDbName(), externalTable.getName(), 
getName(), e);
+            throw e;
+        }
+    }
+
+    @Override
+    public void dropColumn(TableIf dorisTable, String columnName) throws 
UserException {
+        makeSureInitialized();
+        Preconditions.checkState(dorisTable instanceof ExternalTable, 
dorisTable.getName());
+        ExternalTable externalTable = (ExternalTable) dorisTable;
+        if (metadataOps == null) {
+            throw new DdlException("Drop column operation is not supported for 
catalog: " + getName());
+        }
+        try {
+            metadataOps.dropColumn(externalTable, columnName);
+            logRefreshExternalTable(externalTable);
+        } catch (Exception e) {
+            LOG.warn("Failed to drop column {} from table {}.{} in catalog {}",
+                    columnName, externalTable.getDbName(), 
externalTable.getName(), getName(), e);
+            throw e;
+        }
+    }
+
+    @Override
+    public void renameColumn(TableIf dorisTable, String oldName, String 
newName) throws UserException {
+        makeSureInitialized();
+        Preconditions.checkState(dorisTable instanceof ExternalTable, 
dorisTable.getName());
+        ExternalTable externalTable = (ExternalTable) dorisTable;
+        if (metadataOps == null) {
+            throw new DdlException("Rename column operation is not supported 
for catalog: " + getName());
+        }
+        try {
+            metadataOps.renameColumn(externalTable, oldName, newName);
+            logRefreshExternalTable(externalTable);
+        } catch (Exception e) {
+            LOG.warn("Failed to rename column {} to {} in table {}.{} in 
catalog {}",
+                    oldName, newName, externalTable.getDbName(), 
externalTable.getName(), getName(), e);
+            throw e;
+        }
+    }
+
+    @Override
+    public void modifyColumn(TableIf dorisTable, Column column, ColumnPosition 
columnPosition) throws UserException {
+        makeSureInitialized();
+        Preconditions.checkState(dorisTable instanceof ExternalTable, 
dorisTable.getName());
+        ExternalTable externalTable = (ExternalTable) dorisTable;
+        if (metadataOps == null) {
+            throw new DdlException("Modify column operation is not supported 
for catalog: " + getName());
+        }
+        try {
+            metadataOps.modifyColumn(externalTable, column, columnPosition);
+            logRefreshExternalTable(externalTable);
+        } catch (Exception e) {
+            LOG.warn("Failed to modify column {} in table {}.{} in catalog {}",
+                    column.getName(), externalTable.getDbName(), 
externalTable.getName(), getName(), e);
+            throw e;
+        }
+    }
+
+    @Override
+    public void reorderColumns(TableIf dorisTable, List<String> newOrder) 
throws UserException {
+        makeSureInitialized();
+        Preconditions.checkState(dorisTable instanceof ExternalTable, 
dorisTable.getName());
+        ExternalTable externalTable = (ExternalTable) dorisTable;
+        if (metadataOps == null) {
+            throw new DdlException("Reorder columns operation is not supported 
for catalog: " + getName());
+        }
+        try {
+            metadataOps.reorderColumns(externalTable, newOrder);
+            logRefreshExternalTable(externalTable);
+        } catch (Exception e) {
+            LOG.warn("Failed to reorder columns in table {}.{} in catalog {}",
+                    externalTable.getDbName(), externalTable.getName(), 
getName(), e);
+            throw e;
+        }
+    }
 }
 
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java
index 411d40c07d2..cf7872f8b39 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/ExternalObjectLog.java
@@ -49,6 +49,9 @@ public class ExternalObjectLog implements Writable {
     @SerializedName(value = "tableName")
     private String tableName;
 
+    @SerializedName(value = "ntn")
+    private String newTableName; // for rename table op
+
     @SerializedName(value = "invalidCache")
     private boolean invalidCache;
 
@@ -77,6 +80,16 @@ public class ExternalObjectLog implements Writable {
         return externalObjectLog;
     }
 
+    public static ExternalObjectLog createForRenameTable(long catalogId, 
String dbName, String tblName,
+            String newTblName) {
+        ExternalObjectLog externalObjectLog = new ExternalObjectLog();
+        externalObjectLog.setCatalogId(catalogId);
+        externalObjectLog.setDbName(dbName);
+        externalObjectLog.setTableName(tblName);
+        externalObjectLog.setNewTableName(newTblName);
+        return externalObjectLog;
+    }
+
     @Override
     public void write(DataOutput out) throws IOException {
         Text.writeString(out, GsonUtils.GSON.toJson(this));
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
index 43c838aea77..040e68f86a7 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/hive/HiveMetadataOps.java
@@ -58,10 +58,11 @@ import java.util.Set;
 import java.util.function.Function;
 
 public class HiveMetadataOps implements ExternalMetadataOps {
+    private static final Logger LOG = 
LogManager.getLogger(HiveMetadataOps.class);
+
     public static final String LOCATION_URI_KEY = "location";
     public static final String FILE_FORMAT_KEY = "file_format";
     public static final Set<String> DORIS_HIVE_KEYS = 
ImmutableSet.of(FILE_FORMAT_KEY, LOCATION_URI_KEY);
-    private static final Logger LOG = 
LogManager.getLogger(HiveMetadataOps.class);
     private static final int MIN_CLIENT_POOL_SIZE = 8;
     private final HMSCachedClient client;
     private final HMSExternalCatalog catalog;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
index 956b2795703..9927168ff7a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergMetadataOps.java
@@ -17,6 +17,7 @@
 
 package org.apache.doris.datasource.iceberg;
 
+import org.apache.doris.analysis.ColumnPosition;
 import org.apache.doris.analysis.CreateTableStmt;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Env;
@@ -46,12 +47,15 @@ import org.apache.iceberg.Schema;
 import org.apache.iceberg.Snapshot;
 import org.apache.iceberg.SnapshotRef;
 import org.apache.iceberg.Table;
+import org.apache.iceberg.UpdateSchema;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.SupportsNamespaces;
 import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.catalog.ViewCatalog;
+import org.apache.iceberg.expressions.Literal;
 import org.apache.iceberg.types.Type;
+import org.apache.iceberg.types.Types.NestedField;
 import org.apache.iceberg.view.View;
 import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
@@ -356,6 +360,30 @@ public class IcebergMetadataOps implements 
ExternalMetadataOps {
         catalog.dropTable(getTableIdentifier(remoteDbName, remoteTblName), 
true);
     }
 
+    public void renameTableImpl(String dbName, String tblName, String 
newTblName) throws DdlException {
+        try {
+            preExecutionAuthenticator.execute(() -> {
+                catalog.renameTable(getTableIdentifier(dbName, tblName), 
getTableIdentifier(dbName, newTblName));
+                return null;
+            });
+        } catch (Exception e) {
+            throw new DdlException(
+                    "Failed to rename table: " + tblName + " to " + newTblName 
+ ", error message is:" + e.getMessage(),
+                    e);
+        }
+    }
+
+    @Override
+    public void afterRenameTable(String dbName, String oldName, String 
newName) {
+        Optional<ExternalDatabase<?>> db = dorisCatalog.getDbForReplay(dbName);
+        if (db.isPresent()) {
+            db.get().unregisterTable(oldName);
+            db.get().resetMetaCacheNames();
+        }
+        LOG.info("after rename table {}.{}.{} to {}, is db exists: {}",
+                dorisCatalog.getName(), dbName, oldName, newName, 
db.isPresent());
+    }
+
     @Override
     public void truncateTableImpl(ExternalTable dorisTable, List<String> 
partitions) {
         throw new UnsupportedOperationException("Truncate Iceberg table is not 
supported.");
@@ -512,6 +540,171 @@ public class IcebergMetadataOps implements 
ExternalMetadataOps {
         }
     }
 
+    private void addOneColumn(UpdateSchema updateSchema, Column column) throws 
UserException {
+        if (!column.isAllowNull()) {
+            throw new UserException("can't add a non-nullable column to an 
Iceberg table");
+        }
+        org.apache.iceberg.types.Type dorisType = 
IcebergUtils.dorisTypeToIcebergType(column.getType());
+        Literal<?> defaultValue = 
IcebergUtils.parseIcebergLiteral(column.getDefaultValue(), dorisType);
+        updateSchema.addColumn(column.getName(), dorisType, 
column.getComment(), defaultValue);
+    }
+
+    private void applyPosition(UpdateSchema updateSchema, ColumnPosition 
position, String columnName) {
+        if (position.isFirst()) {
+            updateSchema.moveFirst(columnName);
+        } else {
+            updateSchema.moveAfter(columnName, position.getLastCol());
+        }
+    }
+
+    private void refreshTable(ExternalTable dorisTable) {
+        Optional<ExternalDatabase<?>> db = 
dorisCatalog.getDbForReplay(dorisTable.getRemoteDbName());
+        if (db.isPresent()) {
+            Optional<?> tbl = 
db.get().getTableForReplay(dorisTable.getRemoteName());
+            if (tbl.isPresent()) {
+                Env.getCurrentEnv().getRefreshManager()
+                        .refreshTableInternal(db.get(), (ExternalTable) 
tbl.get(), System.currentTimeMillis());
+            }
+        }
+    }
+
+    @Override
+    public void addColumn(ExternalTable dorisTable, Column column, 
ColumnPosition position)
+            throws UserException {
+        validateCommonColumnInfo(column);
+        Table icebergTable = IcebergUtils.getIcebergTable(dorisTable);
+        UpdateSchema updateSchema = icebergTable.updateSchema();
+        addOneColumn(updateSchema, column);
+        if (position != null) {
+            applyPosition(updateSchema, position, column.getName());
+        }
+        try {
+            preExecutionAuthenticator.execute(() -> updateSchema.commit());
+        } catch (Exception e) {
+            throw new UserException("Failed to add column: " + 
column.getName() + " to table: "
+                    + icebergTable.name() + ", error message is: " + 
e.getMessage(), e);
+        }
+        refreshTable(dorisTable);
+    }
+
+    @Override
+    public void addColumns(ExternalTable dorisTable, List<Column> columns) 
throws UserException {
+        Table icebergTable = IcebergUtils.getIcebergTable(dorisTable);
+        UpdateSchema updateSchema = icebergTable.updateSchema();
+        for (Column column : columns) {
+            validateCommonColumnInfo(column);
+            addOneColumn(updateSchema, column);
+        }
+        try {
+            preExecutionAuthenticator.execute(() -> updateSchema.commit());
+        } catch (Exception e) {
+            throw new UserException("Failed to add columns to table: " + 
icebergTable.name()
+                    + ", error message is: " + e.getMessage(), e);
+        }
+        refreshTable(dorisTable);
+    }
+
+    @Override
+    public void dropColumn(ExternalTable dorisTable, String columnName) throws 
UserException {
+        Table icebergTable = IcebergUtils.getIcebergTable(dorisTable);
+        UpdateSchema updateSchema = icebergTable.updateSchema();
+        updateSchema.deleteColumn(columnName);
+        try {
+            preExecutionAuthenticator.execute(() -> updateSchema.commit());
+        } catch (Exception e) {
+            throw new UserException("Failed to drop column: " + columnName + " 
from table: "
+                    + icebergTable.name() + ", error message is: " + 
e.getMessage(), e);
+        }
+        refreshTable(dorisTable);
+    }
+
+    @Override
+    public void renameColumn(ExternalTable dorisTable, String oldName, String 
newName) throws UserException {
+        Table icebergTable = IcebergUtils.getIcebergTable(dorisTable);
+        UpdateSchema updateSchema = icebergTable.updateSchema();
+        updateSchema.renameColumn(oldName, newName);
+        try {
+            preExecutionAuthenticator.execute(() -> updateSchema.commit());
+        } catch (Exception e) {
+            throw new UserException("Failed to rename column: " + oldName + " 
to " + newName
+                    + " in table: " + icebergTable.name() + ", error message 
is: " + e.getMessage(), e);
+        }
+        refreshTable(dorisTable);
+    }
+
+    @Override
+    public void modifyColumn(ExternalTable dorisTable, Column column, 
ColumnPosition position)
+            throws UserException {
+        Table icebergTable = IcebergUtils.getIcebergTable(dorisTable);
+        validateForModifyColumn(column, icebergTable);
+        Type icebergType = 
IcebergUtils.dorisTypeToIcebergType(column.getType());
+        UpdateSchema updateSchema = icebergTable.updateSchema();
+        updateSchema.updateColumn(column.getName(), 
icebergType.asPrimitiveType(), column.getComment());
+        if (column.isAllowNull()) {
+            // we can change a required column to optional, but not the other 
way around
+            // because we don't know whether there is existing data with null 
values.
+            updateSchema.makeColumnOptional(column.getName());
+        }
+        if (position != null) {
+            applyPosition(updateSchema, position, column.getName());
+        }
+        try {
+            preExecutionAuthenticator.execute(() -> updateSchema.commit());
+        } catch (Exception e) {
+            throw new UserException("Failed to modify column: " + 
column.getName() + " in table: "
+                    + icebergTable.name() + ", error message is: " + 
e.getMessage(), e);
+        }
+        refreshTable(dorisTable);
+    }
+
+    private void validateForModifyColumn(Column column, Table icebergTable) 
throws UserException {
+        validateCommonColumnInfo(column);
+        // check complex type
+        if (column.getType().isComplexType()) {
+            throw new UserException("Modify column type to non-primitive type 
is not supported: " + column.getType());
+        }
+        // check exist
+        NestedField currentCol = 
icebergTable.schema().findField(column.getName());
+        if (currentCol == null) {
+            throw new UserException("Column " + column.getName() + " does not 
exist");
+        }
+        // check nullable
+        if (currentCol.isOptional() && !column.isAllowNull()) {
+            throw new UserException("Can not change nullable column " + 
column.getName() + " to not null");
+        }
+    }
+
+    private void validateCommonColumnInfo(Column column) throws UserException {
+        // check aggregation method
+        if (column.isAggregated()) {
+            throw new UserException("Can not specify aggregation method for 
iceberg table column");
+        }
+        // check auto inc
+        if (column.isAutoInc()) {
+            throw new UserException("Can not specify auto incremental iceberg 
table column");
+        }
+    }
+
+    @Override
+    public void reorderColumns(ExternalTable dorisTable, List<String> 
newOrder) throws UserException {
+        if (newOrder == null || newOrder.isEmpty()) {
+            throw new UserException("Reorder column failed, new order is 
empty.");
+        }
+        Table icebergTable = IcebergUtils.getIcebergTable(dorisTable);
+        UpdateSchema updateSchema = icebergTable.updateSchema();
+        updateSchema.moveFirst(newOrder.get(0));
+        for (int i = 1; i < newOrder.size(); i++) {
+            updateSchema.moveAfter(newOrder.get(i), newOrder.get(i - 1));
+        }
+        try {
+            preExecutionAuthenticator.execute(() -> updateSchema.commit());
+        } catch (Exception e) {
+            throw new UserException("Failed to reorder columns in table: " + 
icebergTable.name()
+                    + ", error message is: " + e.getMessage(), e);
+        }
+        refreshTable(dorisTable);
+    }
+
     public PreExecutionAuthenticator getPreExecutionAuthenticator() {
         return preExecutionAuthenticator;
     }
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
index b030ab61f6e..e27530a6f69 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/iceberg/IcebergUtils.java
@@ -90,6 +90,7 @@ import org.apache.iceberg.TableProperties;
 import org.apache.iceberg.expressions.And;
 import org.apache.iceberg.expressions.Expression;
 import org.apache.iceberg.expressions.Expressions;
+import org.apache.iceberg.expressions.Literal;
 import org.apache.iceberg.expressions.ManifestEvaluator;
 import org.apache.iceberg.expressions.Not;
 import org.apache.iceberg.expressions.Or;
@@ -108,6 +109,8 @@ import org.apache.logging.log4j.LogManager;
 import org.apache.logging.log4j.Logger;
 
 import java.io.IOException;
+import java.math.BigDecimal;
+import java.nio.ByteBuffer;
 import java.time.DateTimeException;
 import java.time.Instant;
 import java.time.LocalDateTime;
@@ -121,6 +124,7 @@ import java.util.Locale;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import java.util.UUID;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
@@ -604,6 +608,75 @@ public class IcebergUtils {
                 .getIcebergMetadataCache().getIcebergTable(dorisTable);
     }
 
+    public static org.apache.iceberg.types.Type dorisTypeToIcebergType(Type 
type) {
+        DorisTypeToIcebergType visitor = type.isStructType() ? new 
DorisTypeToIcebergType((StructType) type)
+                : new DorisTypeToIcebergType();
+        return DorisTypeToIcebergType.visit(type, visitor);
+    }
+
+    public static Literal<?> parseIcebergLiteral(String value, 
org.apache.iceberg.types.Type type) {
+        if (value == null) {
+            return null;
+        }
+        switch (type.typeId()) {
+            case BOOLEAN:
+                try {
+                    return Literal.of(Boolean.parseBoolean(value));
+                } catch (IllegalArgumentException e) {
+                    throw new IllegalArgumentException("Invalid Boolean 
string: " + value, e);
+                }
+            case INTEGER:
+            case DATE:
+                try {
+                    return Literal.of(Integer.parseInt(value));
+                } catch (NumberFormatException e) {
+                    throw new IllegalArgumentException("Invalid Int string: " 
+ value, e);
+                }
+            case LONG:
+            case TIME:
+            case TIMESTAMP:
+            case TIMESTAMP_NANO:
+                try {
+                    return Literal.of(Long.parseLong(value));
+                } catch (NumberFormatException e) {
+                    throw new IllegalArgumentException("Invalid Long string: " 
+ value, e);
+                }
+            case FLOAT:
+                try {
+                    return Literal.of(Float.parseFloat(value));
+                } catch (NumberFormatException e) {
+                    throw new IllegalArgumentException("Invalid Float string: 
" + value, e);
+                }
+            case DOUBLE:
+                try {
+                    return Literal.of(Double.parseDouble(value));
+                } catch (NumberFormatException e) {
+                    throw new IllegalArgumentException("Invalid Double string: 
" + value, e);
+                }
+            case STRING:
+                return Literal.of(value);
+            case UUID:
+                try {
+                    return Literal.of(UUID.fromString(value));
+                } catch (IllegalArgumentException e) {
+                    throw new IllegalArgumentException("Invalid UUID string: " 
+ value, e);
+                }
+            case FIXED:
+            case BINARY:
+            case GEOMETRY:
+            case GEOGRAPHY:
+                return Literal.of(ByteBuffer.wrap(value.getBytes()));
+            case DECIMAL:
+                try {
+                    return Literal.of(new BigDecimal(value));
+                } catch (NumberFormatException e) {
+                    throw new IllegalArgumentException("Invalid Decimal 
string: " + value, e);
+                }
+            default:
+                throw new IllegalArgumentException("Cannot parse unknown type: 
" + type);
+        }
+    }
+
     private static void updateIcebergColumnUniqueId(Column column, 
Types.NestedField icebergField) {
         column.setUniqueId(icebergField.fieldId());
         List<NestedField> icebergFields = Lists.newArrayList();
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOps.java
 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOps.java
index 5a31011f142..15d3348e11a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOps.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/datasource/operations/ExternalMetadataOps.java
@@ -17,7 +17,9 @@
 
 package org.apache.doris.datasource.operations;
 
+import org.apache.doris.analysis.ColumnPosition;
 import org.apache.doris.analysis.CreateTableStmt;
+import org.apache.doris.catalog.Column;
 import org.apache.doris.common.DdlException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.datasource.ExternalTable;
@@ -113,6 +115,26 @@ public interface ExternalMetadataOps {
     default void afterDropTable(String dbName, String tblName) {
     }
 
+    /**
+     * rename table in external metastore
+     * @param dbName
+     * @param oldName
+     * @param newName
+     * @throws DdlException
+     */
+    default void renameTable(String dbName, String oldName, String newName) 
throws DdlException {
+        renameTableImpl(dbName, oldName, newName);
+        afterRenameTable(dbName, oldName, newName);
+    }
+
+    default void renameTableImpl(String dbName, String oldName, String 
newName) throws DdlException {
+        throw new UnsupportedOperationException("Rename table operation is not 
supported for this table type.");
+    }
+
+    default void afterRenameTable(String dbName, String oldName, String 
newName) {
+        throw new UnsupportedOperationException("After rename table operation 
is not supported for this table type.");
+    }
+
     /**
      * truncate table in external metastore
      *
@@ -194,6 +216,81 @@ public interface ExternalMetadataOps {
 
     void dropBranchImpl(ExternalTable dorisTable, DropBranchInfo branchInfo) 
throws UserException;
 
+    /**
+     * add column for external table
+     *
+     * @param dorisTable
+     * @param column
+     * @param position
+     * @throws UserException
+     */
+    default void addColumn(ExternalTable dorisTable, Column column, 
ColumnPosition position)
+            throws UserException {
+        throw new UnsupportedOperationException("Add column operation is not 
supported for this table type.");
+    }
+
+    /**
+     * add columns for external table
+     *
+     * @param dorisTable
+     * @param columns
+     * @throws UserException
+     */
+    default void addColumns(ExternalTable dorisTable, List<Column> columns)
+            throws UserException {
+        throw new UnsupportedOperationException("Add columns operation is not 
supported for this table type.");
+    }
+
+    /**
+     * drop column for external table
+     *
+     * @param dorisTable
+     * @param columnName
+     * @throws UserException
+     */
+    default void dropColumn(ExternalTable dorisTable, String columnName)
+            throws UserException {
+        throw new UnsupportedOperationException("Drop column operation is not 
supported for this table type.");
+    }
+
+    /**
+     * rename column for external table
+     *
+     * @param dorisTable
+     * @param oldName
+     * @param newName
+     * @throws UserException
+     */
+    default void renameColumn(ExternalTable dorisTable, String oldName, String 
newName)
+            throws UserException {
+        throw new UnsupportedOperationException("Rename column operation is 
not supported for this table type.");
+    }
+
+    /**
+     * update column for external table
+     *
+     * @param dorisTable
+     * @param column
+     * @param position
+     * @throws UserException
+     */
+    default void modifyColumn(ExternalTable dorisTable, Column column, 
ColumnPosition position)
+            throws UserException {
+        throw new UnsupportedOperationException("Modify column operation is 
not supported for this table type.");
+    }
+
+    /**
+     * reorder columns for external table
+     *
+     * @param dorisTable
+     * @param newOrder
+     * @throws UserException
+     */
+    default void reorderColumns(ExternalTable dorisTable, List<String> 
newOrder)
+            throws UserException {
+        throw new UnsupportedOperationException("Reorder columns operation is 
not supported for this table type.");
+    }
+
     /**
      *
      * @return
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterTableCommand.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterTableCommand.java
index 6af1974e7dd..38d2d6ba307 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterTableCommand.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/AlterTableCommand.java
@@ -53,6 +53,7 @@ import 
org.apache.doris.nereids.trees.plans.commands.info.EnableFeatureOp;
 import org.apache.doris.nereids.trees.plans.commands.info.ModifyColumnOp;
 import org.apache.doris.nereids.trees.plans.commands.info.ModifyEngineOp;
 import 
org.apache.doris.nereids.trees.plans.commands.info.ModifyTablePropertiesOp;
+import org.apache.doris.nereids.trees.plans.commands.info.RenameColumnOp;
 import org.apache.doris.nereids.trees.plans.commands.info.RenameTableOp;
 import org.apache.doris.nereids.trees.plans.commands.info.ReorderColumnsOp;
 import org.apache.doris.nereids.trees.plans.commands.info.TableNameInfo;
@@ -243,6 +244,7 @@ public class AlterTableCommand extends Command implements 
ForwardWithSync {
                     || alterClause instanceof AddColumnOp
                     || alterClause instanceof AddColumnsOp
                     || alterClause instanceof DropColumnOp
+                    || alterClause instanceof RenameColumnOp
                     || alterClause instanceof ModifyColumnOp
                     || alterClause instanceof ReorderColumnsOp
                     || alterClause instanceof ModifyEngineOp
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/AddColumnOp.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/AddColumnOp.java
index 7c3831e3cd0..39e1d309c3a 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/AddColumnOp.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/AddColumnOp.java
@@ -27,7 +27,7 @@ import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.KeysType;
 import org.apache.doris.catalog.MaterializedIndexMeta;
 import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.catalog.Table;
+import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
@@ -137,7 +137,9 @@ public class AddColumnOp extends AlterTableOp {
         boolean isEnableMergeOnWrite = false;
         KeysType keysType = KeysType.DUP_KEYS;
         Set<String> clusterKeySet = 
Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
-        Table table = 
Env.getCurrentInternalCatalog().getDbOrDdlException(tableName.getDb())
+        TableIf table = Env.getCurrentEnv().getCatalogMgr()
+                .getCatalogOrDdlException(tableName.getCtl())
+                .getDbOrDdlException(tableName.getDb())
                 .getTableOrDdlException(tableName.getTbl());
         if (table instanceof OlapTable) {
             isOlap = true;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/AddColumnsOp.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/AddColumnsOp.java
index b064fb3d29f..f04a9338aa9 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/AddColumnsOp.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/AddColumnsOp.java
@@ -23,7 +23,7 @@ import org.apache.doris.analysis.AlterTableClause;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.catalog.Table;
+import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.qe.ConnectContext;
@@ -77,7 +77,9 @@ public class AddColumnsOp extends AlterTableOp {
             AddColumnOp.validateColumnDef(tableName, colDef, null, rollupName);
         }
 
-        Table table = 
Env.getCurrentInternalCatalog().getDbOrDdlException(tableName.getDb())
+        TableIf table = Env.getCurrentEnv().getCatalogMgr()
+                .getCatalogOrDdlException(tableName.getCtl())
+                .getDbOrDdlException(tableName.getDb())
                 .getTableOrDdlException(tableName.getTbl());
         if (table instanceof OlapTable) {
             boolean seeValueColumn = false;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/DropColumnOp.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/DropColumnOp.java
index cdc36ececad..de3b01c7865 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/DropColumnOp.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/DropColumnOp.java
@@ -25,7 +25,7 @@ import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.KeysType;
 import org.apache.doris.catalog.MaterializedIndexMeta;
 import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.catalog.Table;
+import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.ErrorCode;
 import org.apache.doris.common.ErrorReport;
@@ -73,7 +73,9 @@ public class DropColumnOp extends AlterTableOp {
                     colName, FeNameFormat.getColumnNameRegex());
         }
 
-        Table table = 
Env.getCurrentInternalCatalog().getDbOrDdlException(tableName.getDb())
+        TableIf table = Env.getCurrentEnv().getCatalogMgr()
+                .getCatalogOrDdlException(tableName.getCtl())
+                .getDbOrDdlException(tableName.getDb())
                 .getTableOrDdlException(tableName.getTbl());
         if (table instanceof OlapTable) {
             OlapTable olapTable = (OlapTable) table;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/ModifyColumnOp.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/ModifyColumnOp.java
index f1afdcf8564..73cf98c19cf 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/ModifyColumnOp.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/ModifyColumnOp.java
@@ -26,7 +26,7 @@ import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.KeysType;
 import org.apache.doris.catalog.MaterializedIndexMeta;
 import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.catalog.Table;
+import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.nereids.util.RelationUtil;
@@ -87,7 +87,9 @@ public class ModifyColumnOp extends AlterTableOp {
         KeysType keysType = KeysType.DUP_KEYS;
         Set<String> clusterKeySet = 
Sets.newTreeSet(String.CASE_INSENSITIVE_ORDER);
         Column originalColumn = null;
-        Table table = 
Env.getCurrentInternalCatalog().getDbOrDdlException(tableName.getDb())
+        TableIf table = Env.getCurrentEnv().getCatalogMgr()
+                .getCatalogOrDdlException(tableName.getCtl())
+                .getDbOrDdlException(tableName.getDb())
                 .getTableOrDdlException(tableName.getTbl());
         OlapTable olapTable = null;
         List<Column> schemaColumns = null;
diff --git 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/ReorderColumnsOp.java
 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/ReorderColumnsOp.java
index a920a972d86..f674f5a3a68 100644
--- 
a/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/ReorderColumnsOp.java
+++ 
b/fe/fe-core/src/main/java/org/apache/doris/nereids/trees/plans/commands/info/ReorderColumnsOp.java
@@ -23,7 +23,7 @@ import org.apache.doris.analysis.ReorderColumnsClause;
 import org.apache.doris.catalog.Column;
 import org.apache.doris.catalog.Env;
 import org.apache.doris.catalog.OlapTable;
-import org.apache.doris.catalog.Table;
+import org.apache.doris.catalog.TableIf;
 import org.apache.doris.common.AnalysisException;
 import org.apache.doris.common.UserException;
 import org.apache.doris.qe.ConnectContext;
@@ -62,7 +62,9 @@ public class ReorderColumnsOp extends AlterTableOp {
             throw new AnalysisException("No column in reorder columns 
clause.");
         }
 
-        Table table = 
Env.getCurrentInternalCatalog().getDbOrDdlException(tableName.getDb())
+        TableIf table = Env.getCurrentEnv().getCatalogMgr()
+                .getCatalogOrDdlException(tableName.getCtl())
+                .getDbOrDdlException(tableName.getDb())
                 .getTableOrDdlException(tableName.getTbl());
         if (table instanceof OlapTable) {
             OlapTable olapTable = (OlapTable) table;
diff --git 
a/regression-test/data/external_table_p0/iceberg/iceberg_schema_change_ddl.out 
b/regression-test/data/external_table_p0/iceberg/iceberg_schema_change_ddl.out
new file mode 100644
index 00000000000..2f68c298f37
Binary files /dev/null and 
b/regression-test/data/external_table_p0/iceberg/iceberg_schema_change_ddl.out 
differ
diff --git 
a/regression-test/data/external_table_p0/iceberg/iceberg_schema_change_ddl_with_branch.out
 
b/regression-test/data/external_table_p0/iceberg/iceberg_schema_change_ddl_with_branch.out
new file mode 100644
index 00000000000..7546005e2f2
Binary files /dev/null and 
b/regression-test/data/external_table_p0/iceberg/iceberg_schema_change_ddl_with_branch.out
 differ
diff --git 
a/regression-test/suites/external_table_p0/iceberg/iceberg_schema_change_ddl.groovy
 
b/regression-test/suites/external_table_p0/iceberg/iceberg_schema_change_ddl.groovy
new file mode 100644
index 00000000000..bc6002500ea
--- /dev/null
+++ 
b/regression-test/suites/external_table_p0/iceberg/iceberg_schema_change_ddl.groovy
@@ -0,0 +1,441 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("iceberg_schema_change_ddl", 
"p0,external,doris,external_docker,external_docker_doris") {
+
+    String enabled = context.config.otherConfigs.get("enableIcebergTest")
+    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+        logger.info("disable iceberg test.")
+        return
+    }
+
+    String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port")
+    String minio_port = context.config.otherConfigs.get("iceberg_minio_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    String catalog_name = "iceberg_schema_change_ddl"
+
+    sql """drop catalog if exists ${catalog_name}"""
+    sql """
+    CREATE CATALOG ${catalog_name} PROPERTIES (
+        'type'='iceberg',
+        'iceberg.catalog.type'='rest',
+        'uri' = 'http://${externalEnvIp}:${rest_port}',
+        "s3.access_key" = "admin",
+        "s3.secret_key" = "password",
+        "s3.endpoint" = "http://${externalEnvIp}:${minio_port}";,
+        "s3.region" = "us-east-1"
+    );"""
+
+    sql """switch ${catalog_name};"""
+    sql """drop database if exists iceberg_schema_change_ddl_db force"""
+    sql """create database iceberg_schema_change_ddl_db"""
+    sql """ use iceberg_schema_change_ddl_db;""" 
+
+    sql """ set enable_fallback_to_original_planner=false; """
+    sql """set show_column_comment_in_describe=true;"""
+        
+    // Test table name
+    String table_name = "iceberg_ddl_test"
+    String partition_table_name = "iceberg_ddl_partition_test"
+        
+    // Clean up existing table if exists
+    sql """ drop table if exists ${table_name} """
+    sql """ drop table if exists ${partition_table_name} """
+        
+    // Test 1: Create initial Iceberg table with basic schema
+    sql """
+    CREATE TABLE ${table_name} (
+        id INT not null,
+        name STRING not null,
+        age INT not null,
+        score DOUBLE not null
+    );
+    """
+    
+    // Insert initial data
+    sql """
+    INSERT INTO ${table_name} VALUES 
+    (1, 'Alice', 25, 95.5),
+    (2, 'Bob', 30, 87.2),
+    (3, 'Charlie', 22, 92.8)
+    """
+        
+    // Verify initial state
+    qt_init_1 """ DESC ${table_name} """
+    qt_init_2 """ SELECT * FROM ${table_name} ORDER BY id """
+
+    // Test 2: ADD COLUMN - basic type
+    test {
+        sql """ ALTER TABLE iceberg_ddl_test ADD COLUMN email STRING not null 
DEFAULT 'N/A'; """
+        exception "can't add a non-nullable column to an Iceberg table"
+    }
+    sql """ ALTER TABLE ${table_name} ADD COLUMN email STRING """
+    // not support initial default value for new column
+    // This will throw an exception in Iceberg v2, but is allowed in v3
+    test {
+        sql """ ALTER TABLE ${table_name} ADD COLUMN phone STRING DEFAULT 
'N/A' COMMENT 'User phone number' """
+        exception "Invalid initial default for phone: non-null default (N/A) 
is not supported until v3"
+    }
+    // Add column with comment and set position after age
+    sql """ ALTER TABLE ${table_name} ADD COLUMN phone STRING COMMENT 'User 
phone number' after age """
+    // Verify schema after adding column
+    qt_add_1 """ DESC ${table_name} """
+    
+    // Insert data with new column (existing rows should have NULL for new 
column)
+    sql """ INSERT INTO ${table_name} VALUES (4, 'David', 28, '123-456-7890', 
89.1, '[email protected]') """
+    // Verify data with new column
+    qt_add_2 """ SELECT * FROM ${table_name} ORDER BY id """
+    qt_add_3 """ SELECT id, email FROM ${table_name} WHERE email IS NOT NULL 
ORDER BY id """
+    qt_add_4 """ SELECT id, email FROM ${table_name} WHERE email IS NULL ORDER 
BY id """
+        
+    // Test 3: ADD complex type column
+    sql """ ALTER TABLE ${table_name} ADD COLUMN address STRUCT<city: STRING, 
country: STRING> """
+        
+    qt_add_multi_1 """ DESC ${table_name} """
+    
+    // Insert data with all columns
+    sql """ INSERT INTO ${table_name} VALUES (5, 'Eve', 26, '223-345-132', 
91.3, '[email protected]', STRUCT('New York', 'USA'))  """
+    
+    qt_add_multi_2 """ SELECT * FROM ${table_name} ORDER BY id """
+    qt_add_multi_3 """ SELECT id, address FROM ${table_name} WHERE id = 5 
ORDER BY id """
+
+    // Invalid add column
+    test {
+        sql """ALTER TABLE ${table_name} ADD COLUMN invalid_col1 int sum"""
+        exception "Can not specify aggregation method for iceberg table column"
+    }
+    test {
+        sql """ALTER TABLE ${table_name} ADD COLUMN invalid_col1 int 
AUTO_INCREMENT"""
+        exception "Can not specify auto incremental iceberg table column"
+    }
+        
+    // Test 4: RENAME COLUMN
+    sql """ ALTER TABLE ${table_name} RENAME COLUMN score grade """
+        
+    // Verify column renamed
+    qt_rename_1 """ DESC ${table_name} """
+    qt_rename_2 """ SELECT id, grade FROM ${table_name} ORDER BY id """
+    qt_rename_3 """ SELECT * FROM ${table_name} WHERE grade > 90 ORDER BY id 
"""
+
+    // Test 5: DROP COLUMN
+    sql """ ALTER TABLE ${table_name} DROP COLUMN name """
+        
+    // Verify column dropped
+    qt_drop_1 """ DESC ${table_name} """
+    qt_drop_2 """ SELECT * FROM ${table_name} ORDER BY id """
+    qt_drop_3 """ SELECT id, age FROM ${table_name} WHERE age > 25 ORDER BY id 
"""
+
+    // Test 6: ADD COLUMNS with default values
+    sql """ ALTER TABLE ${table_name} ADD COLUMN (col1 FLOAT COMMENT 'User 
defined column1', col2 STRING COMMENT 'User defined column2') """
+
+    // Verify new columns with default values
+    qt_add_columns_1 """ DESC ${table_name} """
+    qt_add_columns_2 """ SELECT id, col1, col2 FROM ${table_name} ORDER BY id 
"""
+
+    // invalid add columns
+    test {
+        sql """ALTER TABLE ${table_name} ADD COLUMN (col3 FLOAT COMMENT 'User 
defined column1', col4 int SUM COMMENT 'User defined column2')"""
+        exception "Can not specify aggregation method for iceberg table column"
+    }
+
+    // Test 7: modify column type
+    sql """ ALTER TABLE ${table_name} MODIFY COLUMN age BIGINT """
+    // modify column type and add comment
+    sql """ ALTER TABLE ${table_name} MODIFY COLUMN col1 DOUBLE COMMENT 
'Updated column1 type' """
+    // can't update column type by losing precision
+    test {
+        sql """ ALTER TABLE ${table_name} MODIFY COLUMN age int """
+        exception "Cannot change column type: age: long -> int"
+    }
+    // modify age to nullable and move to first position
+    sql """ ALTER TABLE ${table_name} MODIFY COLUMN age bigint NULL FIRST"""
+    sql """ insert into ${table_name} values (null, 6, '123-456-7890', 100.0, 
'[email protected]', STRUCT('Los Angeles', 'USA'), null, null)"""
+    // Verify column type changed
+    qt_modify_1 """ DESC ${table_name} """
+    qt_modify_2 """ SELECT id, age, col1 FROM ${table_name} ORDER BY id """
+    qt_modify_3 """ SELECT * FROM ${table_name} WHERE age > 25 ORDER BY id """
+    // modify age to not null, which is not allowed
+    test {
+        sql """ ALTER TABLE ${table_name} MODIFY COLUMN age bigint not null"""
+        exception "Can not change nullable column age to not null"
+    }
+    // invalid modify
+    test {
+        sql """ ALTER TABLE ${table_name} MODIFY COLUMN age bigint sum"""
+        exception "Can not specify aggregation method for iceberg table column"
+    }
+    test {
+        sql """ ALTER TABLE ${table_name} MODIFY COLUMN age bigint 
AUTO_INCREMENT"""
+        exception "Can not specify auto incremental iceberg table column"
+    }
+    test {
+        sql """ ALTER TABLE ${table_name} MODIFY COLUMN non_col bigint"""
+        exception "Column non_col does not exist"
+    }
+    // not comment, the comment will be removed
+    qt_before_no_comment "desc ${table_name}" 
+    sql """ ALTER TABLE ${table_name} MODIFY COLUMN col1 DOUBLE"""
+    qt_after_no_comment "desc ${table_name}"
+    sql """ ALTER TABLE ${table_name} MODIFY COLUMN col1 DOUBLE COMMENT 
'Updated column1 type'"""
+    qt_after_no_comment "desc ${table_name}"
+
+    // Test 7.1: More positive modify column type cases
+    // Add test columns for type conversion tests
+    sql """ ALTER TABLE ${table_name} ADD COLUMN test_float FLOAT """
+    sql """ ALTER TABLE ${table_name} ADD COLUMN test_decimal DECIMAL(5,2) """
+    sql """ INSERT INTO ${table_name} (id, test_float, test_decimal) VALUES 
(7, 3.14, 123.45) """
+    
+    // Positive case: float -> double
+    sql """ ALTER TABLE ${table_name} MODIFY COLUMN test_float DOUBLE """
+    
+    // Positive case: decimal precision expansion (scale unchanged)
+    sql """ ALTER TABLE ${table_name} MODIFY COLUMN test_decimal DECIMAL(10,2) 
"""
+    
+    // Verify positive type changes
+    qt_modify_positive_1 """ DESC ${table_name} """
+    qt_modify_positive_2 """ SELECT id, test_float, test_decimal FROM 
${table_name} WHERE id = 7 """
+
+    // Test 7.2: Negative modify column type cases (should all fail)
+    
+    // double -> float (precision loss)
+    test {
+        sql """ ALTER TABLE ${table_name} MODIFY COLUMN col1 FLOAT """
+        exception "Cannot change column type"
+    }
+    
+    // bigint -> int (precision loss) - already tested above but added here 
for completeness
+    test {
+        sql """ ALTER TABLE ${table_name} MODIFY COLUMN age INT """
+        exception "Cannot change column type"
+    }
+    
+    // string -> numeric types
+    test {
+        sql """ ALTER TABLE ${table_name} MODIFY COLUMN email INT """
+        exception "Cannot change column type"
+    }
+    
+    test {
+        sql """ ALTER TABLE ${table_name} MODIFY COLUMN email DOUBLE """
+        exception "Cannot change column type"
+    }
+    
+    test {
+        sql """ ALTER TABLE ${table_name} MODIFY COLUMN email DECIMAL(10,2) """
+        exception "Cannot change column type"
+    }
+    
+    // numeric -> string
+    test {
+        sql """ ALTER TABLE ${table_name} MODIFY COLUMN id STRING """
+        exception "Cannot change column type"
+    }
+    
+    test {
+        sql """ ALTER TABLE ${table_name} MODIFY COLUMN test_float STRING """
+        exception "Cannot change column type"
+    }
+    
+    // decimal precision reduction
+    test {
+        sql """ ALTER TABLE ${table_name} MODIFY COLUMN test_decimal 
DECIMAL(3,2) """
+        exception "Cannot change column type"
+    }
+    
+    // decimal scale change (even if precision increases)
+    test {
+        sql """ ALTER TABLE ${table_name} MODIFY COLUMN test_decimal 
DECIMAL(15,3) """
+        exception "Cannot change column type"
+    }
+    
+    test {
+        sql """ ALTER TABLE ${table_name} MODIFY COLUMN test_decimal 
DECIMAL(15,1) """
+        exception "Cannot change column type"
+    }
+    
+    // int -> float/double (may lose precision for large integers)
+    test {
+        sql """ ALTER TABLE ${table_name} MODIFY COLUMN id FLOAT """
+        exception "Cannot change column type"
+    }
+    
+    test {
+        sql """ ALTER TABLE ${table_name} MODIFY COLUMN id DOUBLE """
+        exception "Cannot change column type"
+    }
+    
+    // float/double -> decimal
+    test {
+        sql """ ALTER TABLE ${table_name} MODIFY COLUMN test_float 
DECIMAL(10,2) """
+        exception "Cannot change column type"
+    }
+    
+    // decimal -> float/double
+    test {
+        sql """ ALTER TABLE ${table_name} MODIFY COLUMN test_decimal FLOAT """
+        exception "Cannot change column type"
+    }
+    
+    test {
+        sql """ ALTER TABLE ${table_name} MODIFY COLUMN test_decimal DOUBLE """
+        exception "Cannot change column type"
+    }
+    
+    // struct/complex type changes
+    test {
+        sql """ ALTER TABLE ${table_name} MODIFY COLUMN address STRING """
+        exception "Cannot change column type"
+    }
+    
+    test {
+        sql """ ALTER TABLE ${table_name} MODIFY COLUMN email STRUCT<name: 
STRING> """
+        exception "Modify column type to non-primitive type is not supported"
+    }
+
+    // Test 8: reorder columns
+    sql """ ALTER TABLE ${table_name} ORDER BY (id, age, col1, col2, grade, 
phone, email, address) """
+    // Verify column order changed
+    qt_reorder_1 """ DESC ${table_name} """
+    qt_reorder_2 """ SELECT * FROM ${table_name} ORDER BY id """
+
+    // Test 9: Rename table
+    String renamed_table_name = "iceberg_ddl_test_renamed"
+    sql """ drop table if exists ${renamed_table_name} """
+    // before rename
+    qt_before_rename_show_tables_old """show tables like '${table_name}'"""
+    qt_before_rename_show_tables_new """show tables like 
'${renamed_table_name}'"""
+    sql """ ALTER TABLE ${table_name} RENAME ${renamed_table_name} """
+    qt_after_rename_show_tables_old """show tables like '${table_name}'"""
+    qt_after_rename_show_tables_new """show tables like 
'${renamed_table_name}'"""
+
+    // Verify table renamed
+    qt_rename_table_1 """ DESC ${renamed_table_name} """
+    qt_rename_table_2 """ SELECT * FROM ${renamed_table_name} ORDER BY id """
+    test {
+        sql """ select * from ${table_name} """
+        exception "Table [iceberg_ddl_test] does not exist in database 
[iceberg_schema_change_ddl_db]"
+    }
+    sql """ ALTER TABLE ${renamed_table_name} RENAME ${table_name} """
+    qt_rename_table_back_1 """ SELECT * FROM ${table_name} ORDER BY id """
+    
+    // Test partitioned table schema change
+    sql """ drop table if exists ${partition_table_name} """
+    sql """ CREATE TABLE ${partition_table_name} (
+        id INT,
+        name STRING,
+        age INT,
+        score DOUBLE
+    ) PARTITION BY LIST (name) ();"""
+    // Insert initial data
+    sql """ INSERT INTO ${partition_table_name} VALUES 
+    (1, 'Alice', 25, 95.5),
+    (2, 'Bob', 30, 87.2),
+    (3, 'Charlie', 22, 92.8) """;
+    // Verify initial state
+    qt_partition_init_1 """ DESC ${partition_table_name} """
+    qt_partition_init_2 """ SELECT * FROM ${partition_table_name} ORDER BY id 
"""
+
+    // can't drop partitioned column
+    test {
+        sql """ ALTER TABLE ${partition_table_name} DROP COLUMN name """
+        exception "Failed to drop column"
+    }
+
+    // add new columns to partitioned table
+    sql """ ALTER TABLE ${partition_table_name} ADD COLUMN email STRING """
+    sql """ ALTER TABLE ${partition_table_name} ADD COLUMN phone STRING 
COMMENT 'User phone number' """
+
+    // Verify schema after adding columns
+    qt_partition_add_1 """ DESC ${partition_table_name} """
+
+    // reorder columns in partitioned table
+    sql """ ALTER TABLE ${partition_table_name} ORDER BY (id, age, email, 
phone, score, name) """
+    // Verify column order changed
+    qt_partition_reorder_1 """ DESC ${partition_table_name} """
+    qt_partition_reorder_2 """ SELECT * FROM ${partition_table_name} ORDER BY 
id """
+
+    // Test: Modify partition column type (should fail)
+    // The 'name' column is the partition column for this table
+    
+    // Test modifying partition column type
+    // no change, still string
+    sql """ ALTER TABLE ${partition_table_name} MODIFY COLUMN name 
VARCHAR(100) """
+    qt_partition_alter "desc ${partition_table_name}"
+
+    // no change, still string
+    sql """ ALTER TABLE ${partition_table_name} MODIFY COLUMN name TEXT """
+    qt_partition_alter "desc ${partition_table_name}"
+    
+    test {
+        sql """ ALTER TABLE ${partition_table_name} MODIFY COLUMN name INT """
+        exception "Cannot change column type"
+    }
+    
+    sql """ ALTER TABLE ${partition_table_name} MODIFY COLUMN name STRING 
FIRST """
+    qt_reorder_partition_col """desc ${partition_table_name}"""
+    
+    // Test modifying partition column comment (might be allowed)
+    sql """ ALTER TABLE ${partition_table_name} MODIFY COLUMN name STRING 
COMMENT 'Updated partition column comment' """
+    qt_comment_partition_col """desc ${partition_table_name}"""
+    
+    // Create another partitioned table with integer partition column for more 
tests
+    String int_partition_table_name = "iceberg_ddl_int_partition_test"
+    sql """ drop table if exists ${int_partition_table_name} """
+    sql """ CREATE TABLE ${int_partition_table_name} (
+        id INT,
+        name STRING,
+        age INT,
+        department_id INT
+    ) PARTITION BY LIST (department_id) ();"""
+    
+    // Insert test data
+    sql """ INSERT INTO ${int_partition_table_name} VALUES 
+    (1, 'Alice', 25, 100),
+    (2, 'Bob', 30, 200),
+    (3, 'Charlie', 22, 100) """;
+    
+    // Test modifying integer partition column type
+    sql """ ALTER TABLE ${int_partition_table_name} MODIFY COLUMN 
department_id BIGINT """
+    qt_desc_partition_bigint "desc ${int_partition_table_name}"
+    sql """ INSERT INTO ${int_partition_table_name} VALUES 
+    (1, 'Alice', 25, 9223372036854775807),
+    (2, 'Bob', 30, 9223372036854775806),
+    (3, 'Charlie', 22, 9223372036854775805) """;
+    order_qt_select_partition_bigint "select * from 
${int_partition_table_name}"
+    
+    test {
+        sql """ ALTER TABLE ${int_partition_table_name} MODIFY COLUMN 
department_id STRING """
+        exception "Cannot change column type"
+    }
+    
+    test {
+        sql """ ALTER TABLE ${int_partition_table_name} MODIFY COLUMN 
department_id DOUBLE """
+        exception "Cannot change column type"
+    }
+    
+    // Verify that non-partition columns can still be modified in partitioned 
tables
+    sql """ ALTER TABLE ${int_partition_table_name} MODIFY COLUMN age BIGINT 
"""
+    sql """ ALTER TABLE ${int_partition_table_name} MODIFY COLUMN name 
VARCHAR(200) COMMENT 'Updated name column' """
+    
+    // Verify the changes
+    qt_partition_modify_1 """ DESC ${int_partition_table_name} """
+    order_qt_partition_modify_2 """ SELECT * FROM ${int_partition_table_name} 
ORDER BY id """
+    
+    // Clean up
+    sql """ drop table if exists ${int_partition_table_name} """
+}
+
diff --git 
a/regression-test/suites/external_table_p0/iceberg/iceberg_schema_change_ddl_with_branch.groovy
 
b/regression-test/suites/external_table_p0/iceberg/iceberg_schema_change_ddl_with_branch.groovy
new file mode 100644
index 00000000000..484cca71d45
--- /dev/null
+++ 
b/regression-test/suites/external_table_p0/iceberg/iceberg_schema_change_ddl_with_branch.groovy
@@ -0,0 +1,290 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements.  See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership.  The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License.  You may obtain a copy of the License at
+//
+//   http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied.  See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+suite("iceberg_schema_change_ddl_with_branch", 
"p0,external,doris,external_docker,external_docker_doris") {
+
+    String enabled = context.config.otherConfigs.get("enableIcebergTest")
+    if (enabled == null || !enabled.equalsIgnoreCase("true")) {
+        logger.info("disable iceberg test.")
+        return
+    }
+
+
+    String rest_port = context.config.otherConfigs.get("iceberg_rest_uri_port")
+    String minio_port = context.config.otherConfigs.get("iceberg_minio_port")
+    String externalEnvIp = context.config.otherConfigs.get("externalEnvIp")
+    String catalog_name = "iceberg_schema_change_ddl"
+
+    sql """drop catalog if exists ${catalog_name}"""
+    sql """
+    CREATE CATALOG ${catalog_name} PROPERTIES (
+        'type'='iceberg',
+        'iceberg.catalog.type'='rest',
+        'uri' = 'http://${externalEnvIp}:${rest_port}',
+        "s3.access_key" = "admin",
+        "s3.secret_key" = "password",
+        "s3.endpoint" = "http://${externalEnvIp}:${minio_port}";,
+        "s3.region" = "us-east-1"
+    );"""
+
+    sql """switch ${catalog_name};"""
+    sql """ use test_db;""" 
+
+    sql """ set enable_fallback_to_original_planner=false; """
+
+    // 
==================================================================================
+    // Test schema change isolation between different branches and tags
+    // 
==================================================================================
+    
+    String branch_table_name = "iceberg_branch_schema_test"
+    sql """ drop table if exists ${branch_table_name} """
+    
+    // Create a test table for branch schema change testing
+    sql """
+    CREATE TABLE ${branch_table_name} (
+        id INT,
+        name STRING,
+        age INT,
+        score DOUBLE
+    );
+    """
+    
+    // Step 1: Insert initial data and create branch1 & tag1
+    sql """
+    INSERT INTO ${branch_table_name} VALUES 
+    (1, 'Alice', 25, 95.5),
+    (2, 'Bob', 30, 87.2),
+    (3, 'Charlie', 22, 92.8)
+    """
+    
+    // Verify initial state on main branch
+    qt_initial_state_1 """ DESC ${branch_table_name} """
+    qt_initial_state_2 """ SELECT * FROM ${branch_table_name} ORDER BY id """
+    
+    // Create branch1 and tag1 after initial data (4 columns: id, name, age, 
score)
+    sql """ ALTER TABLE ${branch_table_name} CREATE BRANCH branch1 """
+    sql """ ALTER TABLE ${branch_table_name} CREATE TAG tag1 """
+    
+    // Verify branch1 and tag1 have same schema and data as main at this point
+    qt_branch1_initial """ SELECT * FROM ${branch_table_name}@branch(branch1) 
ORDER BY id """
+    qt_tag1_initial """ SELECT * FROM ${branch_table_name}@tag(tag1) ORDER BY 
id """
+    
+    // Step 2: Add email column to main branch and insert more data
+    sql """ ALTER TABLE ${branch_table_name} ADD COLUMN email STRING """
+    sql """ INSERT INTO ${branch_table_name} VALUES (4, 'David', 28, 89.1, 
'[email protected]') """
+    
+    // Main branch now has 5 columns: id, name, age, score, email
+    qt_main_with_email_1 """ DESC ${branch_table_name} """
+    qt_main_with_email_2 """ SELECT * FROM ${branch_table_name} ORDER BY id """
+    
+    // Create branch2 and tag2 after adding email column
+    sql """ ALTER TABLE ${branch_table_name} CREATE BRANCH branch2 """
+    sql """ ALTER TABLE ${branch_table_name} CREATE TAG tag2 """
+    
+    // Verify branch2 and tag2 have email column
+    qt_branch2_with_email """ SELECT * FROM 
${branch_table_name}@branch(branch2) ORDER BY id """
+    qt_tag2_with_email """ SELECT * FROM ${branch_table_name}@tag(tag2) ORDER 
BY id """
+    
+    // Step 3: Drop age column from main branch and insert more data
+    sql """ ALTER TABLE ${branch_table_name} DROP COLUMN age """
+    sql """ INSERT INTO ${branch_table_name} VALUES (5, 'Eve', 91.3, 
'[email protected]') """
+    
+    // Main branch now has 4 columns: id, name, score, email (no age)
+    qt_main_no_age_1 """ DESC ${branch_table_name} """
+    qt_main_no_age_2 """ SELECT * FROM ${branch_table_name} ORDER BY id """
+    
+    // Create branch3 and tag3 after dropping age column
+    sql """ ALTER TABLE ${branch_table_name} CREATE BRANCH branch3 """
+    sql """ ALTER TABLE ${branch_table_name} CREATE TAG tag3 """
+    
+    // Verify branch3 and tag3 don't have age column
+    qt_branch3_no_age """ SELECT * FROM ${branch_table_name}@branch(branch3) 
ORDER BY id """
+    qt_tag3_no_age """ SELECT * FROM ${branch_table_name}@tag(tag3) ORDER BY 
id """
+    
+    // Step 4: Rename score to grade in main branch and insert more data
+    sql """ ALTER TABLE ${branch_table_name} RENAME COLUMN score grade """
+    sql """ INSERT INTO ${branch_table_name} VALUES (6, 'Frank', 88.7, 
'[email protected]') """
+    
+    // Main branch now has 4 columns: id, name, grade, email (score renamed to 
grade)
+    qt_main_with_grade_1 """ DESC ${branch_table_name} """
+    qt_main_with_grade_2 """ SELECT * FROM ${branch_table_name} ORDER BY id """
+    
+    // Create branch4 and tag4 after renaming score to grade
+    sql """ ALTER TABLE ${branch_table_name} CREATE BRANCH branch4 """
+    sql """ ALTER TABLE ${branch_table_name} CREATE TAG tag4 """
+    
+    // Verify branch4 and tag4 have grade instead of score
+    qt_branch4_with_grade """ SELECT * FROM 
${branch_table_name}@branch(branch4) ORDER BY id """
+    qt_tag4_with_grade """ SELECT * FROM ${branch_table_name}@tag(tag4) ORDER 
BY id """
+    
+    // Step 5: Add phone column to main branch and insert final data
+    sql """ ALTER TABLE ${branch_table_name} ADD COLUMN phone STRING """
+    sql """ INSERT INTO ${branch_table_name} VALUES (7, 'Grace', 93.2, 
'[email protected]', '555-0123') """
+    
+    // Main branch now has 5 columns: id, name, grade, email, phone
+    qt_main_final_1 """ DESC ${branch_table_name} """
+    qt_main_final_2 """ SELECT * FROM ${branch_table_name} ORDER BY id """
+    
+    // Create final branch5 and tag5
+    sql """ ALTER TABLE ${branch_table_name} CREATE BRANCH branch5 """
+    sql """ ALTER TABLE ${branch_table_name} CREATE TAG tag5 """
+    
+    // Verify branch5 and tag5 have all columns including phone
+    qt_branch5_final """ SELECT * FROM ${branch_table_name}@branch(branch5) 
ORDER BY id """
+    qt_tag5_final """ SELECT * FROM ${branch_table_name}@tag(tag5) ORDER BY id 
"""
+    
+    // 
==================================================================================
+    // Verify schema behavior: branches get latest schema, tags keep 
creation-time schema
+    // 
==================================================================================
+    
+    // Test specific column queries to verify schema differences
+    
+    // IMPORTANT: Branches will use the LATEST schema from main branch
+    // Tags will use the schema from when they were created
+    
+    // branch1: should use LATEST schema (same as main) - id, name, grade, 
email, phone
+    qt_branch1_latest_schema """ SELECT * FROM 
${branch_table_name}@branch(branch1) ORDER BY id """
+    
+    // tag1: should have ORIGINAL schema when created - id, name, age, score 
(no email, phone, grade)
+    qt_tag1_original_schema """ SELECT * FROM ${branch_table_name}@tag(tag1) 
ORDER BY id """
+    qt_tag1_age_score """ SELECT id, age, score FROM 
${branch_table_name}@tag(tag1) ORDER BY id """
+    
+    // branch2: should use LATEST schema (same as main) - id, name, grade, 
email, phone
+    qt_branch2_latest_schema """ SELECT * FROM 
${branch_table_name}@branch(branch2) ORDER BY id """
+    
+    // tag2: should have schema when created - id, name, age, score, email (no 
phone, grade)
+    qt_tag2_creation_schema """ SELECT * FROM ${branch_table_name}@tag(tag2) 
ORDER BY id """
+    qt_tag2_age_email """ SELECT id, age, score, email FROM 
${branch_table_name}@tag(tag2) ORDER BY id """
+    
+    // branch3: should use LATEST schema (same as main) - id, name, grade, 
email, phone
+    qt_branch3_latest_schema """ SELECT * FROM 
${branch_table_name}@branch(branch3) ORDER BY id """
+    
+    // tag3: should have schema when created - id, name, score, email (no age, 
phone, grade)
+    qt_tag3_creation_schema """ SELECT * FROM ${branch_table_name}@tag(tag3) 
ORDER BY id """
+    qt_tag3_score_email """ SELECT id, score, email FROM 
${branch_table_name}@tag(tag3) ORDER BY id """
+    
+    // branch4: should use LATEST schema (same as main) - id, name, grade, 
email, phone
+    qt_branch4_latest_schema """ SELECT * FROM 
${branch_table_name}@branch(branch4) ORDER BY id """
+    
+    // tag4: should have schema when created - id, name, grade, email (no age, 
score, phone)
+    qt_tag4_creation_schema """ SELECT * FROM ${branch_table_name}@tag(tag4) 
ORDER BY id """
+    qt_tag4_grade_email """ SELECT id, grade, email FROM 
${branch_table_name}@tag(tag4) ORDER BY id """
+    
+    // branch5: should use LATEST schema (same as main) - id, name, grade, 
email, phone
+    qt_branch5_latest_schema """ SELECT * FROM 
${branch_table_name}@branch(branch5) ORDER BY id """
+    
+    // tag5: should have schema when created - id, name, grade, email, phone
+    qt_tag5_creation_schema """ SELECT * FROM ${branch_table_name}@tag(tag5) 
ORDER BY id """
+    qt_tag5_all_cols """ SELECT id, grade, email, phone FROM 
${branch_table_name}@tag(tag5) ORDER BY id """
+    
+    // 
==================================================================================
+    // Negative tests: verify schema behavior differences between branches and 
tags
+    // 
==================================================================================
+    
+    // ALL BRANCHES should have the LATEST schema (same as main)
+    // So all branches should have: id, name, grade, email, phone
+    
+    // Verify all branches have the latest columns
+    qt_all_branches_have_grade """ SELECT id, grade FROM 
${branch_table_name}@branch(branch1) WHERE grade > 0 ORDER BY id """
+    qt_all_branches_have_email """ SELECT id, email FROM 
${branch_table_name}@branch(branch2) WHERE email IS NOT NULL ORDER BY id """
+    qt_all_branches_have_phone """ SELECT id, phone FROM 
${branch_table_name}@branch(branch3) WHERE phone IS NOT NULL ORDER BY id """
+    
+    // All branches should NOT have old columns that were dropped/renamed
+    test {
+        sql """ SELECT age FROM ${branch_table_name}@branch(branch1) """
+        exception "Unknown column 'age'"
+    }
+    test {
+        sql """ SELECT score FROM ${branch_table_name}@branch(branch2) """
+        exception "Unknown column 'score'"
+    }
+    
+    // TAGS should have their CREATION-TIME schema
+    
+    // tag1 should not have email, phone, or grade (only has age, score)
+    test {
+        sql """ SELECT email FROM ${branch_table_name}@tag(tag1) """
+        exception "Unknown column 'email'"
+    }
+    test {
+        sql """ SELECT phone FROM ${branch_table_name}@tag(tag1) """
+        exception "Unknown column 'phone'"
+    }
+    test {
+        sql """ SELECT grade FROM ${branch_table_name}@tag(tag1) """
+        exception "Unknown column 'grade'"
+    }
+    
+    // tag2 should not have phone or grade (has age, score, email)
+    test {
+        sql """ SELECT phone FROM ${branch_table_name}@tag(tag2) """
+        exception "Unknown column 'phone'"
+    }
+    test {
+        sql """ SELECT grade FROM ${branch_table_name}@tag(tag2) """
+        exception "Unknown column 'grade'"
+    }
+    
+    // tag3 should not have age, phone, or grade (has score, email)
+    test {
+        sql """ SELECT age FROM ${branch_table_name}@tag(tag3) """
+        exception "Unknown column 'age'"
+    }
+    test {
+        sql """ SELECT phone FROM ${branch_table_name}@tag(tag3) """
+        exception "Unknown column 'phone'"
+    }
+    test {
+        sql """ SELECT grade FROM ${branch_table_name}@tag(tag3) """
+        exception "Unknown column 'grade'"
+    }
+    
+    // tag4 should not have age, score, or phone (has grade, email)
+    test {
+        sql """ SELECT age FROM ${branch_table_name}@tag(tag4) """
+        exception "Unknown column 'age'"
+    }
+    test {
+        sql """ SELECT score FROM ${branch_table_name}@tag(tag4) """
+        exception "Unknown column 'score'"
+    }
+    test {
+        sql """ SELECT phone FROM ${branch_table_name}@tag(tag4) """
+        exception "Unknown column 'phone'"
+    }
+    
+    // 
==================================================================================
+    // Summary: Final verification showing branch vs tag schema behavior
+    // 
==================================================================================
+    
+    // Main branch has the latest schema
+    qt_summary_main """ SELECT * FROM ${branch_table_name} ORDER BY id """
+    
+    // ALL BRANCHES use the LATEST schema (same as main)
+    qt_summary_branch1 """ SELECT * FROM ${branch_table_name}@branch(branch1) 
ORDER BY id """
+    qt_summary_branch2 """ SELECT * FROM ${branch_table_name}@branch(branch2) 
ORDER BY id """
+    qt_summary_branch3 """ SELECT * FROM ${branch_table_name}@branch(branch3) 
ORDER BY id """
+    qt_summary_branch4 """ SELECT * FROM ${branch_table_name}@branch(branch4) 
ORDER BY id """
+    qt_summary_branch5 """ SELECT * FROM ${branch_table_name}@branch(branch5) 
ORDER BY id """
+    
+    // TAGS use their CREATION-TIME schema
+    qt_summary_tag1 """ SELECT * FROM ${branch_table_name}@tag(tag1) ORDER BY 
id """      // Original: id, name, age, score
+    qt_summary_tag2 """ SELECT * FROM ${branch_table_name}@tag(tag2) ORDER BY 
id """      // With email: id, name, age, score, email  
+    qt_summary_tag3 """ SELECT * FROM ${branch_table_name}@tag(tag3) ORDER BY 
id """      // No age: id, name, score, email
+    qt_summary_tag4 """ SELECT * FROM ${branch_table_name}@tag(tag4) ORDER BY 
id """      // Renamed: id, name, grade, email
+    qt_summary_tag5 """ SELECT * FROM ${branch_table_name}@tag(tag5) ORDER BY 
id """      // Final: id, name, grade, email, phone
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to