This is an automated email from the ASF dual-hosted git repository.

stevenwu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/iceberg.git


The following commit(s) were added to refs/heads/master by this push:
     new 1e52f2edc4 Flink: Support alter table column (#7628)
1e52f2edc4 is described below

commit 1e52f2edc4ec79179662b2179d64833d88f3311d
Author: Yanghao Lin <[email protected]>
AuthorDate: Wed Sep 27 12:47:15 2023 +0800

    Flink: Support alter table column (#7628)
---
 .../org/apache/iceberg/flink/FlinkCatalog.java     | 141 ++++++----
 .../org/apache/iceberg/flink/FlinkSchemaUtil.java  |  10 +
 .../org/apache/iceberg/flink/FlinkTypeToType.java  |   4 +
 .../iceberg/flink/util/FlinkAlterTableUtil.java    | 246 ++++++++++++++++
 .../iceberg/flink/util/FlinkCompatibilityUtil.java |   5 +
 .../iceberg/flink/TestFlinkCatalogTable.java       | 311 +++++++++++++++++++--
 6 files changed, 634 insertions(+), 83 deletions(-)

diff --git 
a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java 
b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
index 825816fdf4..f022c8abcb 100644
--- a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
+++ b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java
@@ -38,6 +38,7 @@ import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTableImpl;
 import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.TableChange;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
@@ -60,8 +61,6 @@ import org.apache.iceberg.PartitionSpec;
 import org.apache.iceberg.Schema;
 import org.apache.iceberg.StructLike;
 import org.apache.iceberg.Table;
-import org.apache.iceberg.Transaction;
-import org.apache.iceberg.UpdateProperties;
 import org.apache.iceberg.catalog.Catalog;
 import org.apache.iceberg.catalog.Namespace;
 import org.apache.iceberg.catalog.SupportsNamespaces;
@@ -69,6 +68,7 @@ import org.apache.iceberg.catalog.TableIdentifier;
 import org.apache.iceberg.exceptions.AlreadyExistsException;
 import org.apache.iceberg.exceptions.NamespaceNotEmptyException;
 import org.apache.iceberg.exceptions.NoSuchNamespaceException;
+import org.apache.iceberg.flink.util.FlinkAlterTableUtil;
 import org.apache.iceberg.flink.util.FlinkCompatibilityUtil;
 import org.apache.iceberg.io.CloseableIterable;
 import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
@@ -91,7 +91,6 @@ import 
org.apache.iceberg.relocated.com.google.common.collect.Sets;
  * independent of the partition of Flink.
  */
 public class FlinkCatalog extends AbstractCatalog {
-
   private final CatalogLoader catalogLoader;
   private final Catalog icebergCatalog;
   private final Namespace baseNamespace;
@@ -439,14 +438,35 @@ public class FlinkCatalog extends AbstractCatalog {
     if (!(Objects.equals(ts1.getTableColumns(), ts2.getTableColumns())
         && Objects.equals(ts1.getWatermarkSpecs(), ts2.getWatermarkSpecs())
         && equalsPrimary)) {
-      throw new UnsupportedOperationException("Altering schema is not 
supported yet.");
+      throw new UnsupportedOperationException(
+          "Altering schema is not supported in the old alterTable API. "
+              + "To alter schema, use the other alterTable API and provide a 
list of TableChange's.");
     }
 
+    validateTablePartition(ct1, ct2);
+  }
+
+  private static void validateTablePartition(CatalogTable ct1, CatalogTable 
ct2) {
     if (!ct1.getPartitionKeys().equals(ct2.getPartitionKeys())) {
       throw new UnsupportedOperationException("Altering partition keys is not 
supported yet.");
     }
   }
 
+  /**
+   * This alterTable API only supports altering table properties.
+   *
+   * <p>Support for adding/removing/renaming columns cannot be done by 
comparing CatalogTable
+   * instances, unless the Flink schema contains Iceberg column IDs.
+   *
+   * <p>To alter columns, use the other alterTable API and provide a list of 
{@link TableChange}s.
+   *
+   * @param tablePath path of the table or view to be modified
+   * @param newTable the new table definition
+   * @param ignoreIfNotExists flag to specify behavior when the table or view 
does not exist: if set
+   *     to false, throw an exception, if set to true, do nothing.
+   * @throws CatalogException in case of any runtime exception
+   * @throws TableNotExistException if the table does not exist
+   */
   @Override
   public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, 
boolean ignoreIfNotExists)
       throws CatalogException, TableNotExistException {
@@ -464,12 +484,6 @@ public class FlinkCatalog extends AbstractCatalog {
     }
 
     CatalogTable table = toCatalogTable(icebergTable);
-
-    // Currently, Flink SQL only support altering table properties.
-
-    // For current Flink Catalog API, support for adding/removing/renaming 
columns cannot be done by
-    // comparing
-    // CatalogTable instances, unless the Flink schema contains Iceberg column 
IDs.
     validateTableSchemaAndPartition(table, (CatalogTable) newTable);
 
     Map<String, String> oldProperties = table.getOptions();
@@ -507,7 +521,66 @@ public class FlinkCatalog extends AbstractCatalog {
               }
             });
 
-    commitChanges(icebergTable, setLocation, setSnapshotId, pickSnapshotId, 
setProperties);
+    FlinkAlterTableUtil.commitChanges(
+        icebergTable, setLocation, setSnapshotId, pickSnapshotId, 
setProperties);
+  }
+
+  @Override
+  public void alterTable(
+      ObjectPath tablePath,
+      CatalogBaseTable newTable,
+      List<TableChange> tableChanges,
+      boolean ignoreIfNotExists)
+      throws TableNotExistException, CatalogException {
+    validateFlinkTable(newTable);
+
+    Table icebergTable;
+    try {
+      icebergTable = loadIcebergTable(tablePath);
+    } catch (TableNotExistException e) {
+      if (!ignoreIfNotExists) {
+        throw e;
+      } else {
+        return;
+      }
+    }
+
+    // Does not support altering partition yet.
+    validateTablePartition(toCatalogTable(icebergTable), (CatalogTable) 
newTable);
+
+    String setLocation = null;
+    String setSnapshotId = null;
+    String cherrypickSnapshotId = null;
+
+    List<TableChange> propertyChanges = Lists.newArrayList();
+    List<TableChange> schemaChanges = Lists.newArrayList();
+    for (TableChange change : tableChanges) {
+      if (change instanceof TableChange.SetOption) {
+        TableChange.SetOption set = (TableChange.SetOption) change;
+
+        if ("location".equalsIgnoreCase(set.getKey())) {
+          setLocation = set.getValue();
+        } else if ("current-snapshot-id".equalsIgnoreCase(set.getKey())) {
+          setSnapshotId = set.getValue();
+        } else if ("cherry-pick-snapshot-id".equalsIgnoreCase(set.getKey())) {
+          cherrypickSnapshotId = set.getValue();
+        } else {
+          propertyChanges.add(change);
+        }
+      } else if (change instanceof TableChange.ResetOption) {
+        propertyChanges.add(change);
+      } else {
+        schemaChanges.add(change);
+      }
+    }
+
+    FlinkAlterTableUtil.commitChanges(
+        icebergTable,
+        setLocation,
+        setSnapshotId,
+        cherrypickSnapshotId,
+        schemaChanges,
+        propertyChanges);
   }
 
   private static void validateFlinkTable(CatalogBaseTable table) {
@@ -552,52 +625,6 @@ public class FlinkCatalog extends AbstractCatalog {
     return partitionKeysBuilder.build();
   }
 
-  private static void commitChanges(
-      Table table,
-      String setLocation,
-      String setSnapshotId,
-      String pickSnapshotId,
-      Map<String, String> setProperties) {
-    // don't allow setting the snapshot and picking a commit at the same time 
because order is
-    // ambiguous and choosing
-    // one order leads to different results
-    Preconditions.checkArgument(
-        setSnapshotId == null || pickSnapshotId == null,
-        "Cannot set the current snapshot ID and cherry-pick snapshot changes");
-
-    if (setSnapshotId != null) {
-      long newSnapshotId = Long.parseLong(setSnapshotId);
-      table.manageSnapshots().setCurrentSnapshot(newSnapshotId).commit();
-    }
-
-    // if updating the table snapshot, perform that update first in case it 
fails
-    if (pickSnapshotId != null) {
-      long newSnapshotId = Long.parseLong(pickSnapshotId);
-      table.manageSnapshots().cherrypick(newSnapshotId).commit();
-    }
-
-    Transaction transaction = table.newTransaction();
-
-    if (setLocation != null) {
-      transaction.updateLocation().setLocation(setLocation).commit();
-    }
-
-    if (!setProperties.isEmpty()) {
-      UpdateProperties updateProperties = transaction.updateProperties();
-      setProperties.forEach(
-          (k, v) -> {
-            if (v == null) {
-              updateProperties.remove(k);
-            } else {
-              updateProperties.set(k, v);
-            }
-          });
-      updateProperties.commit();
-    }
-
-    transaction.commitTransaction();
-  }
-
   static CatalogTable toCatalogTable(Table table) {
     TableSchema schema = FlinkSchemaUtil.toSchema(table.schema());
     List<String> partitionKeys = toPartitionKeys(table.spec(), table.schema());
diff --git 
a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java 
b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java
index 25725639c3..a6b53879ad 100644
--- 
a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java
+++ 
b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkSchemaUtil.java
@@ -134,6 +134,16 @@ public class FlinkSchemaUtil {
     return TypeUtil.visit(type, new TypeToFlinkType());
   }
 
+  /**
+   * Convert a {@link LogicalType Flink type} to a {@link Type}.
+   *
+   * @param flinkType a Flink logical type
+   * @return the equivalent Iceberg type
+   */
+  public static Type convert(LogicalType flinkType) {
+    return flinkType.accept(new FlinkTypeToType());
+  }
+
   /**
    * Convert a {@link RowType} to a {@link TableSchema}.
    *
diff --git 
a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkTypeToType.java 
b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkTypeToType.java
index 6f8bfef2ef..408065f060 100644
--- 
a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkTypeToType.java
+++ 
b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkTypeToType.java
@@ -49,6 +49,10 @@ class FlinkTypeToType extends FlinkTypeVisitor<Type> {
   private final RowType root;
   private int nextId;
 
+  FlinkTypeToType() {
+    this.root = null;
+  }
+
   FlinkTypeToType(RowType root) {
     this.root = root;
     // the root struct's fields use the first ids
diff --git 
a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkAlterTableUtil.java
 
b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkAlterTableUtil.java
new file mode 100644
index 0000000000..f0b9bf64fb
--- /dev/null
+++ 
b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkAlterTableUtil.java
@@ -0,0 +1,246 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.iceberg.flink.util;
+
+import java.util.List;
+import java.util.Map;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.TableChange;
+import org.apache.flink.table.catalog.UniqueConstraint;
+import org.apache.iceberg.Table;
+import org.apache.iceberg.Transaction;
+import org.apache.iceberg.UpdateProperties;
+import org.apache.iceberg.UpdateSchema;
+import org.apache.iceberg.flink.FlinkSchemaUtil;
+import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
+import org.apache.iceberg.types.Type;
+
+public class FlinkAlterTableUtil {
+  private FlinkAlterTableUtil() {}
+
+  public static void commitChanges(
+      Table table,
+      String setLocation,
+      String setSnapshotId,
+      String pickSnapshotId,
+      Map<String, String> setProperties) {
+    commitManageSnapshots(table, setSnapshotId, pickSnapshotId);
+
+    Transaction transaction = table.newTransaction();
+
+    if (setLocation != null) {
+      transaction.updateLocation().setLocation(setLocation).commit();
+    }
+
+    if (!setProperties.isEmpty()) {
+      UpdateProperties updateProperties = transaction.updateProperties();
+      setProperties.forEach(
+          (k, v) -> {
+            if (v == null) {
+              updateProperties.remove(k);
+            } else {
+              updateProperties.set(k, v);
+            }
+          });
+      updateProperties.commit();
+    }
+
+    transaction.commitTransaction();
+  }
+
+  public static void commitChanges(
+      Table table,
+      String setLocation,
+      String setSnapshotId,
+      String pickSnapshotId,
+      List<TableChange> schemaChanges,
+      List<TableChange> propertyChanges) {
+    commitManageSnapshots(table, setSnapshotId, pickSnapshotId);
+
+    Transaction transaction = table.newTransaction();
+
+    if (setLocation != null) {
+      transaction.updateLocation().setLocation(setLocation).commit();
+    }
+
+    if (!schemaChanges.isEmpty()) {
+      UpdateSchema updateSchema = transaction.updateSchema();
+      FlinkAlterTableUtil.applySchemaChanges(updateSchema, schemaChanges);
+      updateSchema.commit();
+    }
+
+    if (!propertyChanges.isEmpty()) {
+      UpdateProperties updateProperties = transaction.updateProperties();
+      FlinkAlterTableUtil.applyPropertyChanges(updateProperties, 
propertyChanges);
+      updateProperties.commit();
+    }
+
+    transaction.commitTransaction();
+  }
+
+  public static void commitManageSnapshots(
+      Table table, String setSnapshotId, String cherrypickSnapshotId) {
+    // don't allow setting the snapshot and picking a commit at the same time 
because order is
+    // ambiguous and choosing one order leads to different results
+    Preconditions.checkArgument(
+        setSnapshotId == null || cherrypickSnapshotId == null,
+        "Cannot set the current snapshot ID and cherry-pick snapshot changes");
+
+    if (setSnapshotId != null) {
+      long newSnapshotId = Long.parseLong(setSnapshotId);
+      table.manageSnapshots().setCurrentSnapshot(newSnapshotId).commit();
+    }
+
+    // if updating the table snapshot, perform that update first in case it 
fails
+    if (cherrypickSnapshotId != null) {
+      long newSnapshotId = Long.parseLong(cherrypickSnapshotId);
+      table.manageSnapshots().cherrypick(newSnapshotId).commit();
+    }
+  }
+
+  /**
+   * Applies a list of Flink table changes to an {@link UpdateSchema} 
operation.
+   *
+   * @param pendingUpdate an uncommitted UpdateSchema operation to configure
+   * @param schemaChanges a list of Flink table changes
+   */
+  public static void applySchemaChanges(
+      UpdateSchema pendingUpdate, List<TableChange> schemaChanges) {
+    for (TableChange change : schemaChanges) {
+      if (change instanceof TableChange.AddColumn) {
+        TableChange.AddColumn addColumn = (TableChange.AddColumn) change;
+        Column flinkColumn = addColumn.getColumn();
+        Preconditions.checkArgument(
+            FlinkCompatibilityUtil.isPhysicalColumn(flinkColumn),
+            "Unsupported table change: Adding computed column %s.",
+            flinkColumn.getName());
+        Type icebergType = 
FlinkSchemaUtil.convert(flinkColumn.getDataType().getLogicalType());
+        if (flinkColumn.getDataType().getLogicalType().isNullable()) {
+          pendingUpdate.addColumn(flinkColumn.getName(), icebergType);
+        } else {
+          pendingUpdate.addRequiredColumn(flinkColumn.getName(), icebergType);
+        }
+      } else if (change instanceof TableChange.ModifyColumn) {
+        TableChange.ModifyColumn modifyColumn = (TableChange.ModifyColumn) 
change;
+        applyModifyColumn(pendingUpdate, modifyColumn);
+      } else if (change instanceof TableChange.DropColumn) {
+        TableChange.DropColumn dropColumn = (TableChange.DropColumn) change;
+        pendingUpdate.deleteColumn(dropColumn.getColumnName());
+      } else if (change instanceof TableChange.AddWatermark) {
+        throw new UnsupportedOperationException("Unsupported table change: 
AddWatermark.");
+      } else if (change instanceof TableChange.ModifyWatermark) {
+        throw new UnsupportedOperationException("Unsupported table change: 
ModifyWatermark.");
+      } else if (change instanceof TableChange.DropWatermark) {
+        throw new UnsupportedOperationException("Unsupported table change: 
DropWatermark.");
+      } else if (change instanceof TableChange.AddUniqueConstraint) {
+        TableChange.AddUniqueConstraint addPk = 
(TableChange.AddUniqueConstraint) change;
+        applyUniqueConstraint(pendingUpdate, addPk.getConstraint());
+      } else if (change instanceof TableChange.ModifyUniqueConstraint) {
+        TableChange.ModifyUniqueConstraint modifyPk = 
(TableChange.ModifyUniqueConstraint) change;
+        applyUniqueConstraint(pendingUpdate, modifyPk.getNewConstraint());
+      } else if (change instanceof TableChange.DropConstraint) {
+        throw new UnsupportedOperationException("Unsupported table change: 
DropConstraint.");
+      } else {
+        throw new UnsupportedOperationException("Cannot apply unknown table 
change: " + change);
+      }
+    }
+  }
+
+  /**
+   * Applies a list of Flink table property changes to an {@link 
UpdateProperties} operation.
+   *
+   * @param pendingUpdate an uncommitted UpdateProperties operation to configure
+   * @param propertyChanges a list of Flink table changes
+   */
+  public static void applyPropertyChanges(
+      UpdateProperties pendingUpdate, List<TableChange> propertyChanges) {
+    for (TableChange change : propertyChanges) {
+      if (change instanceof TableChange.SetOption) {
+        TableChange.SetOption setOption = (TableChange.SetOption) change;
+        pendingUpdate.set(setOption.getKey(), setOption.getValue());
+      } else if (change instanceof TableChange.ResetOption) {
+        TableChange.ResetOption resetOption = (TableChange.ResetOption) change;
+        pendingUpdate.remove(resetOption.getKey());
+      } else {
+        throw new UnsupportedOperationException(
+            "The given table change is not a property change: " + change);
+      }
+    }
+  }
+
+  private static void applyModifyColumn(
+      UpdateSchema pendingUpdate, TableChange.ModifyColumn modifyColumn) {
+    if (modifyColumn instanceof TableChange.ModifyColumnName) {
+      TableChange.ModifyColumnName modifyName = (TableChange.ModifyColumnName) 
modifyColumn;
+      pendingUpdate.renameColumn(modifyName.getOldColumnName(), 
modifyName.getNewColumnName());
+    } else if (modifyColumn instanceof TableChange.ModifyColumnPosition) {
+      TableChange.ModifyColumnPosition modifyPosition =
+          (TableChange.ModifyColumnPosition) modifyColumn;
+      applyModifyColumnPosition(pendingUpdate, modifyPosition);
+    } else if (modifyColumn instanceof TableChange.ModifyPhysicalColumnType) {
+      TableChange.ModifyPhysicalColumnType modifyType =
+          (TableChange.ModifyPhysicalColumnType) modifyColumn;
+      Type type = 
FlinkSchemaUtil.convert(modifyType.getNewType().getLogicalType());
+      String columnName = modifyType.getOldColumn().getName();
+      pendingUpdate.updateColumn(columnName, type.asPrimitiveType());
+      if 
(modifyType.getNewColumn().getDataType().getLogicalType().isNullable()) {
+        pendingUpdate.makeColumnOptional(columnName);
+      } else {
+        pendingUpdate.requireColumn(columnName);
+      }
+    } else if (modifyColumn instanceof TableChange.ModifyColumnComment) {
+      TableChange.ModifyColumnComment modifyComment =
+          (TableChange.ModifyColumnComment) modifyColumn;
+      pendingUpdate.updateColumnDoc(
+          modifyComment.getOldColumn().getName(), 
modifyComment.getNewComment());
+    } else {
+      throw new UnsupportedOperationException(
+          "Cannot apply unknown modify-column change: " + modifyColumn);
+    }
+  }
+
+  private static void applyModifyColumnPosition(
+      UpdateSchema pendingUpdate, TableChange.ModifyColumnPosition 
modifyColumnPosition) {
+    TableChange.ColumnPosition newPosition = 
modifyColumnPosition.getNewPosition();
+    if (newPosition instanceof TableChange.First) {
+      pendingUpdate.moveFirst(modifyColumnPosition.getOldColumn().getName());
+    } else if (newPosition instanceof TableChange.After) {
+      TableChange.After after = (TableChange.After) newPosition;
+      pendingUpdate.moveAfter(modifyColumnPosition.getOldColumn().getName(), 
after.column());
+    } else {
+      throw new UnsupportedOperationException(
+          "Cannot apply unknown modify-column-position change: " + 
modifyColumnPosition);
+    }
+  }
+
+  private static void applyUniqueConstraint(
+      UpdateSchema pendingUpdate, UniqueConstraint constraint) {
+    switch (constraint.getType()) {
+      case PRIMARY_KEY:
+        pendingUpdate.setIdentifierFields(constraint.getColumns());
+        break;
+      case UNIQUE_KEY:
+        throw new UnsupportedOperationException(
+            "Unsupported table change: setting unique key constraints.");
+      default:
+        throw new UnsupportedOperationException(
+            "Cannot apply unknown unique constraint: " + 
constraint.getType().name());
+    }
+  }
+}
diff --git 
a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkCompatibilityUtil.java
 
b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkCompatibilityUtil.java
index 2c5c587f4e..f02af894e8 100644
--- 
a/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkCompatibilityUtil.java
+++ 
b/flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkCompatibilityUtil.java
@@ -20,6 +20,7 @@ package org.apache.iceberg.flink.util;
 
 import org.apache.flink.api.common.typeinfo.TypeInformation;
 import org.apache.flink.table.api.TableColumn;
+import org.apache.flink.table.catalog.Column;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.runtime.typeutils.InternalTypeInfo;
 import org.apache.flink.table.types.logical.RowType;
@@ -39,4 +40,8 @@ public class FlinkCompatibilityUtil {
   public static boolean isPhysicalColumn(TableColumn column) {
     return column.isPhysical();
   }
+
+  public static boolean isPhysicalColumn(Column column) {
+    return column.isPhysical();
+  }
 }
diff --git 
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
 
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
index 16dcf4a9f4..8f5ddde918 100644
--- 
a/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
+++ 
b/flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java
@@ -28,6 +28,7 @@ import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.StreamSupport;
 import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TableSchema;
 import org.apache.flink.table.api.ValidationException;
 import org.apache.flink.table.api.constraints.UniqueConstraint;
@@ -297,7 +298,7 @@ public class TestFlinkCatalogTable extends 
FlinkCatalogTestBase {
   }
 
   @Test
-  public void testAlterTable() throws TableNotExistException {
+  public void testAlterTableProperties() throws TableNotExistException {
     sql("CREATE TABLE tl(id BIGINT) WITH ('oldK'='oldV')");
     Map<String, String> properties = Maps.newHashMap();
     properties.put("oldK", "oldV");
@@ -313,39 +314,297 @@ public class TestFlinkCatalogTable extends 
FlinkCatalogTestBase {
     assertThat(table("tl").properties()).containsAllEntriesOf(properties);
 
     // remove property
-    CatalogTable catalogTable = catalogTable("tl");
+    sql("ALTER TABLE tl RESET('oldK')");
     properties.remove("oldK");
-    getTableEnv()
-        .getCatalog(getTableEnv().getCurrentCatalog())
-        .get()
-        .alterTable(new ObjectPath(DATABASE, "tl"), 
catalogTable.copy(properties), false);
     assertThat(table("tl").properties()).containsAllEntriesOf(properties);
   }
 
   @Test
-  public void testAlterTableWithPrimaryKey() throws TableNotExistException {
-    sql("CREATE TABLE tl(id BIGINT, PRIMARY KEY(id) NOT ENFORCED) WITH 
('oldK'='oldV')");
-    Map<String, String> properties = Maps.newHashMap();
-    properties.put("oldK", "oldV");
+  public void testAlterTableAddColumn() {
+    sql("CREATE TABLE tl(id BIGINT)");
+    Schema schemaBefore = table("tl").schema();
+    Assert.assertEquals(
+        new Schema(Types.NestedField.optional(1, "id", 
Types.LongType.get())).asStruct(),
+        schemaBefore.asStruct());
 
-    // new
-    sql("ALTER TABLE tl SET('newK'='newV')");
-    properties.put("newK", "newV");
-    assertThat(table("tl").properties()).containsAllEntriesOf(properties);
+    sql("ALTER TABLE tl ADD (dt STRING)");
+    Schema schemaAfter1 = table("tl").schema();
+    Assert.assertEquals(
+        new Schema(
+                Types.NestedField.optional(1, "id", Types.LongType.get()),
+                Types.NestedField.optional(2, "dt", Types.StringType.get()))
+            .asStruct(),
+        schemaAfter1.asStruct());
 
-    // update old
-    sql("ALTER TABLE tl SET('oldK'='oldV2')");
-    properties.put("oldK", "oldV2");
-    assertThat(table("tl").properties()).containsAllEntriesOf(properties);
+    // Add multiple columns
+    sql("ALTER TABLE tl ADD (col1 STRING, col2 BIGINT)");
+    Schema schemaAfter2 = table("tl").schema();
+    Assert.assertEquals(
+        new Schema(
+                Types.NestedField.optional(1, "id", Types.LongType.get()),
+                Types.NestedField.optional(2, "dt", Types.StringType.get()),
+                Types.NestedField.optional(3, "col1", Types.StringType.get()),
+                Types.NestedField.optional(4, "col2", Types.LongType.get()))
+            .asStruct(),
+        schemaAfter2.asStruct());
 
-    // remove property
-    CatalogTable catalogTable = catalogTable("tl");
-    properties.remove("oldK");
-    getTableEnv()
-        .getCatalog(getTableEnv().getCurrentCatalog())
-        .get()
-        .alterTable(new ObjectPath(DATABASE, "tl"), 
catalogTable.copy(properties), false);
-    assertThat(table("tl").properties()).containsAllEntriesOf(properties);
+    // Adding a required field should fail because Iceberg's SchemaUpdate does 
not allow
+    // incompatible changes.
+    Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl ADD (pk STRING NOT 
NULL)"))
+        .hasRootCauseInstanceOf(IllegalArgumentException.class)
+        .hasRootCauseMessage("Incompatible change: cannot add required column: 
pk");
+
+    // Adding an existing field should fail due to Flink's internal validation.
+    Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl ADD (id STRING)"))
+        .isInstanceOf(ValidationException.class)
+        .hasMessageContaining("Try to add a column `id` which already exists 
in the table.");
+  }
+
+  @Test
+  public void testAlterTableDropColumn() {
+    sql("CREATE TABLE tl(id BIGINT, dt STRING, col1 STRING, col2 BIGINT)");
+    Schema schemaBefore = table("tl").schema();
+    Assert.assertEquals(
+        new Schema(
+                Types.NestedField.optional(1, "id", Types.LongType.get()),
+                Types.NestedField.optional(2, "dt", Types.StringType.get()),
+                Types.NestedField.optional(3, "col1", Types.StringType.get()),
+                Types.NestedField.optional(4, "col2", Types.LongType.get()))
+            .asStruct(),
+        schemaBefore.asStruct());
+
+    sql("ALTER TABLE tl DROP (dt)");
+    Schema schemaAfter1 = table("tl").schema();
+    Assert.assertEquals(
+        new Schema(
+                Types.NestedField.optional(1, "id", Types.LongType.get()),
+                Types.NestedField.optional(3, "col1", Types.StringType.get()),
+                Types.NestedField.optional(4, "col2", Types.LongType.get()))
+            .asStruct(),
+        schemaAfter1.asStruct());
+
+    // Drop multiple columns
+    sql("ALTER TABLE tl DROP (col1, col2)");
+    Schema schemaAfter2 = table("tl").schema();
+    Assert.assertEquals(
+        new Schema(Types.NestedField.optional(1, "id", 
Types.LongType.get())).asStruct(),
+        schemaAfter2.asStruct());
+
+    // Dropping a non-existing field should fail due to Flink's internal 
validation.
+    Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl DROP (foo)"))
+        .isInstanceOf(ValidationException.class)
+        .hasMessageContaining("The column `foo` does not exist in the base 
table.");
+
+    // Dropping an already-deleted field should fail due to Flink's internal 
validation.
+    Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl DROP (dt)"))
+        .isInstanceOf(ValidationException.class)
+        .hasMessageContaining("The column `dt` does not exist in the base 
table.");
+  }
+
+  @Test
+  public void testAlterTableModifyColumnName() {
+    sql("CREATE TABLE tl(id BIGINT, dt STRING)");
+    Schema schemaBefore = table("tl").schema();
+    Assert.assertEquals(
+        new Schema(
+                Types.NestedField.optional(1, "id", Types.LongType.get()),
+                Types.NestedField.optional(2, "dt", Types.StringType.get()))
+            .asStruct(),
+        schemaBefore.asStruct());
+
+    sql("ALTER TABLE tl RENAME dt TO data");
+    Schema schemaAfter = table("tl").schema();
+    Assert.assertEquals(
+        new Schema(
+                Types.NestedField.optional(1, "id", Types.LongType.get()),
+                Types.NestedField.optional(2, "data", Types.StringType.get()))
+            .asStruct(),
+        schemaAfter.asStruct());
+  }
+
+  // Checks that MODIFY can widen a column type (INTEGER -> BIGINT) and that a type change
+  // outside the promotion rules (STRING -> INTEGER) is rejected.
+  @Test
+  public void testAlterTableModifyColumnType() {
+    sql("CREATE TABLE tl(id INTEGER, dt STRING)");
+    Schema schemaBefore = table("tl").schema();
+    Assert.assertEquals(
+        new Schema(
+                Types.NestedField.optional(1, "id", Types.IntegerType.get()),
+                Types.NestedField.optional(2, "dt", Types.StringType.get()))
+            .asStruct(),
+        schemaBefore.asStruct());
+
+    // Promote type from Integer to Long
+    sql("ALTER TABLE tl MODIFY (id BIGINT)");
+    Schema schemaAfter = table("tl").schema();
+    Assert.assertEquals(
+        new Schema(
+                Types.NestedField.optional(1, "id", Types.LongType.get()),
+                Types.NestedField.optional(2, "dt", Types.StringType.get()))
+            .asStruct(),
+        schemaAfter.asStruct());
+
+    // Type change that doesn't follow the type-promotion rule should fail due to Iceberg's
+    // validation.
+    Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl MODIFY (dt INTEGER)"))
+        .isInstanceOf(TableException.class)
+        .hasRootCauseInstanceOf(IllegalArgumentException.class)
+        .hasRootCauseMessage("Cannot change column type: dt: string -> int");
+  }
+
+  // Checks both nullability directions: optional -> required must be rejected, while
+  // required -> optional is applied to the table schema.
+  @Test
+  public void testAlterTableModifyColumnNullability() {
+    sql("CREATE TABLE tl(id INTEGER NOT NULL, dt STRING)");
+    Schema schemaBefore = table("tl").schema();
+    Assert.assertEquals(
+        new Schema(
+                Types.NestedField.required(1, "id", Types.IntegerType.get()),
+                Types.NestedField.optional(2, "dt", Types.StringType.get()))
+            .asStruct(),
+        schemaBefore.asStruct());
+
+    // Changing nullability from optional to required should fail
+    // because Iceberg's SchemaUpdate does not allow incompatible changes.
+    Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl MODIFY (dt STRING NOT NULL)"))
+        .isInstanceOf(TableException.class)
+        .hasRootCauseInstanceOf(IllegalArgumentException.class)
+        .hasRootCauseMessage("Cannot change column nullability: dt: optional -> required");
+
+    // Set nullability from required to optional
+    sql("ALTER TABLE tl MODIFY (id INTEGER)");
+    Schema schemaAfter = table("tl").schema();
+    Assert.assertEquals(
+        new Schema(
+                Types.NestedField.optional(1, "id", Types.IntegerType.get()),
+                Types.NestedField.optional(2, "dt", Types.StringType.get()))
+            .asStruct(),
+        schemaAfter.asStruct());
+  }
+
+  // Checks column reordering with FIRST and AFTER, and that references to columns that do not
+  // exist are rejected.
+  @Test
+  public void testAlterTableModifyColumnPosition() {
+    sql("CREATE TABLE tl(id BIGINT, dt STRING)");
+    Schema schemaBefore = table("tl").schema();
+    Assert.assertEquals(
+        new Schema(
+                Types.NestedField.optional(1, "id", Types.LongType.get()),
+                Types.NestedField.optional(2, "dt", Types.StringType.get()))
+            .asStruct(),
+        schemaBefore.asStruct());
+
+    // Move dt to the front; field ids stay attached to their columns.
+    sql("ALTER TABLE tl MODIFY (dt STRING FIRST)");
+    Schema schemaAfter = table("tl").schema();
+    Assert.assertEquals(
+        new Schema(
+                Types.NestedField.optional(2, "dt", Types.StringType.get()),
+                Types.NestedField.optional(1, "id", Types.LongType.get()))
+            .asStruct(),
+        schemaAfter.asStruct());
+
+    // Move dt back behind id, restoring the original column order.
+    sql("ALTER TABLE tl MODIFY (dt STRING AFTER id)");
+    Schema schemaAfterAfter = table("tl").schema();
+    Assert.assertEquals(
+        new Schema(
+                Types.NestedField.optional(1, "id", Types.LongType.get()),
+                Types.NestedField.optional(2, "dt", Types.StringType.get()))
+            .asStruct(),
+        schemaAfterAfter.asStruct());
+
+    // Modifying the position of a non-existing column should fail due to Flink's internal
+    // validation.
+    Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl MODIFY (non_existing STRING FIRST)"))
+        .isInstanceOf(ValidationException.class)
+        .hasMessageContaining(
+            "Try to modify a column `non_existing` which does not exist in the table.");
+
+    // Moving a column after a non-existing column should fail due to Flink's internal validation.
+    Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl MODIFY (dt STRING AFTER non_existing)"))
+        .isInstanceOf(ValidationException.class)
+        .hasMessageContaining(
+            "Referenced column `non_existing` by 'AFTER' does not exist in the table.");
+  }
+
+  // Checks that a COMMENT given in MODIFY ends up as the doc string of the matching field.
+  @Test
+  public void testAlterTableModifyColumnComment() {
+    sql("CREATE TABLE tl(id BIGINT, dt STRING)");
+    Schema schemaBefore = table("tl").schema();
+    Assert.assertEquals(
+        new Schema(
+                Types.NestedField.optional(1, "id", Types.LongType.get()),
+                Types.NestedField.optional(2, "dt", Types.StringType.get()))
+            .asStruct(),
+        schemaBefore.asStruct());
+
+    sql("ALTER TABLE tl MODIFY (dt STRING COMMENT 'comment for dt field')");
+    Schema schemaAfter = table("tl").schema();
+    Assert.assertEquals(
+        new Schema(
+                Types.NestedField.optional(1, "id", Types.LongType.get()),
+                Types.NestedField.optional(2, "dt", Types.StringType.get(), "comment for dt field"))
+            .asStruct(),
+        schemaAfter.asStruct());
+  }
+
+  // Checks that adding or modifying a PRIMARY KEY updates the schema's identifier field names
+  // without changing the struct, that keys on optional columns are rejected, and that
+  // DROP PRIMARY KEY is reported as unsupported.
+  @Test
+  public void testAlterTableConstraint() {
+    sql("CREATE TABLE tl(id BIGINT NOT NULL, dt STRING NOT NULL, col1 STRING)");
+    Schema schemaBefore = table("tl").schema();
+    Assert.assertEquals(
+        new Schema(
+                Types.NestedField.required(1, "id", Types.LongType.get()),
+                Types.NestedField.required(2, "dt", Types.StringType.get()),
+                Types.NestedField.optional(3, "col1", Types.StringType.get()))
+            .asStruct(),
+        schemaBefore.asStruct());
+    Assert.assertEquals(ImmutableSet.of(), schemaBefore.identifierFieldNames());
+
+    sql("ALTER TABLE tl ADD (PRIMARY KEY (id) NOT ENFORCED)");
+    Schema schemaAfterAdd = table("tl").schema();
+    Assert.assertEquals(ImmutableSet.of("id"), schemaAfterAdd.identifierFieldNames());
+
+    sql("ALTER TABLE tl MODIFY (PRIMARY KEY (dt) NOT ENFORCED)");
+    Schema schemaAfterModify = table("tl").schema();
+    Assert.assertEquals(
+        new Schema(
+                Types.NestedField.required(1, "id", Types.LongType.get()),
+                Types.NestedField.required(2, "dt", Types.StringType.get()),
+                Types.NestedField.optional(3, "col1", Types.StringType.get()))
+            .asStruct(),
+        schemaAfterModify.asStruct());
+    Assert.assertEquals(ImmutableSet.of("dt"), schemaAfterModify.identifierFieldNames());
+
+    // Composite primary key
+    sql("ALTER TABLE tl MODIFY (PRIMARY KEY (id, dt) NOT ENFORCED)");
+    Schema schemaAfterComposite = table("tl").schema();
+    Assert.assertEquals(
+        new Schema(
+                Types.NestedField.required(1, "id", Types.LongType.get()),
+                Types.NestedField.required(2, "dt", Types.StringType.get()),
+                Types.NestedField.optional(3, "col1", Types.StringType.get()))
+            .asStruct(),
+        schemaAfterComposite.asStruct());
+    Assert.assertEquals(ImmutableSet.of("id", "dt"), schemaAfterComposite.identifierFieldNames());
+
+    // Setting an optional field as primary key should fail
+    // because Iceberg's SchemaUpdate does not allow incompatible changes.
+    Assertions.assertThatThrownBy(
+            () -> sql("ALTER TABLE tl MODIFY (PRIMARY KEY (col1) NOT ENFORCED)"))
+        .isInstanceOf(TableException.class)
+        .hasRootCauseInstanceOf(IllegalArgumentException.class)
+        .hasRootCauseMessage("Cannot add field col1 as an identifier field: not a required field");
+
+    // Setting a composite key containing an optional field should fail
+    // because Iceberg's SchemaUpdate does not allow incompatible changes.
+    Assertions.assertThatThrownBy(
+            () -> sql("ALTER TABLE tl MODIFY (PRIMARY KEY (id, col1) NOT ENFORCED)"))
+        .isInstanceOf(TableException.class)
+        .hasRootCauseInstanceOf(IllegalArgumentException.class)
+        .hasRootCauseMessage("Cannot add field col1 as an identifier field: not a required field");
+
+    // Dropping constraints is not supported yet
+    Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl DROP PRIMARY KEY"))
+        .isInstanceOf(TableException.class)
+        .hasRootCauseInstanceOf(UnsupportedOperationException.class)
+        .hasRootCauseMessage("Unsupported table change: DropConstraint.");
   }
 
   @Test


Reply via email to