stevenzwu commented on code in PR #7628: URL: https://github.com/apache/iceberg/pull/7628#discussion_r1320859179
########## flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkAlterTableUtil.java: ########## @@ -0,0 +1,247 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.util; + +import java.util.List; +import java.util.Map; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.TableChange; +import org.apache.flink.table.catalog.UniqueConstraint; +import org.apache.iceberg.Table; +import org.apache.iceberg.Transaction; +import org.apache.iceberg.UpdateProperties; +import org.apache.iceberg.UpdateSchema; +import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.types.Type; + +public class FlinkAlterTableUtil { + private FlinkAlterTableUtil() {} + + public static void commitChanges( + Table table, + String setLocation, + String setSnapshotId, + String pickSnapshotId, + Map<String, String> setProperties) { + commitManageSnapshots(table, setSnapshotId, pickSnapshotId); + + Transaction transaction = table.newTransaction(); + + if (setLocation != null) { + transaction.updateLocation().setLocation(setLocation).commit(); + } + + if (!setProperties.isEmpty()) { + UpdateProperties updateProperties = transaction.updateProperties(); + setProperties.forEach( + (k, v) -> { + if (v == null) { + updateProperties.remove(k); + } else { + updateProperties.set(k, v); + } + }); + updateProperties.commit(); + } + + transaction.commitTransaction(); + } + + public static void commitChanges( + Table table, + String setLocation, + String setSnapshotId, + String pickSnapshotId, + List<TableChange> schemaChanges, + List<TableChange> propertyChanges) { + commitManageSnapshots(table, setSnapshotId, pickSnapshotId); + + Transaction transaction = table.newTransaction(); + + if (setLocation != null) { + transaction.updateLocation().setLocation(setLocation).commit(); + } + + if (!schemaChanges.isEmpty()) { + UpdateSchema updateSchema = transaction.updateSchema(); + FlinkAlterTableUtil.applySchemaChanges(updateSchema, schemaChanges); + updateSchema.commit(); + } + + if (!propertyChanges.isEmpty()) { + UpdateProperties updateProperties = transaction.updateProperties(); + FlinkAlterTableUtil.applyPropertyChanges(updateProperties, propertyChanges); + updateProperties.commit(); + } + + transaction.commitTransaction(); + } + + public static void commitManageSnapshots( + Table table, String setSnapshotId, String cherrypickSnapshotId) { + // don't allow setting the snapshot and picking a commit at the same time because order is + // ambiguous and choosing + // one order leads to different results Review Comment: nit: fix the line break probably from google-java-format automation long time ago. ########## flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java: ########## @@ -514,7 +534,68 @@ public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean } }); - commitChanges(icebergTable, setLocation, setSnapshotId, pickSnapshotId, setProperties); + FlinkAlterTableUtil.commitChanges( + icebergTable, setLocation, setSnapshotId, pickSnapshotId, setProperties); + } + + @Override + public void alterTable( + ObjectPath tablePath, + CatalogBaseTable newTable, + List<TableChange> tableChanges, + boolean ignoreIfNotExists) + throws TableNotExistException, CatalogException { + validateFlinkTable(newTable); + + Table icebergTable; + try { + icebergTable = loadIcebergTable(tablePath); + } catch (TableNotExistException e) { + if (!ignoreIfNotExists) { + throw e; + } else { + return; + } + } + + // Does not support altering partition yet. + validateTablePartition(toCatalogTable(icebergTable), (CatalogTable) newTable); + + String setLocation = null; + String setSnapshotId = null; + String cherrypickSnapshotId = null; + + List<TableChange> propertyChanges = Lists.newArrayList(); + List<TableChange> schemaChanges = Lists.newArrayList(); + for (TableChange change : tableChanges) { + if (change instanceof TableChange.SetOption) { + TableChange.SetOption set = (TableChange.SetOption) change; + + if ("location".equalsIgnoreCase(set.getKey())) { + setLocation = set.getValue(); + } else if ("current-snapshot-id".equalsIgnoreCase(set.getKey())) { + setSnapshotId = set.getValue(); + } else if ("cherry-pick-snapshot-id".equalsIgnoreCase(set.getKey())) { + cherrypickSnapshotId = set.getValue(); + } else { + propertyChanges.add(change); + } + + } else if (change instanceof TableChange.ResetOption) { + propertyChanges.add(change); + Review Comment: nit: empty line not needed. same for a couple other places in this method ########## flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java: ########## @@ -322,39 +323,265 @@ public void testAlterTable() throws TableNotExistException { Assert.assertEquals(properties, table("tl").properties()); // remove property - CatalogTable catalogTable = catalogTable("tl"); + sql("ALTER TABLE tl RESET('oldK')"); properties.remove("oldK"); - getTableEnv() - .getCatalog(getTableEnv().getCurrentCatalog()) - .get() - .alterTable(new ObjectPath(DATABASE, "tl"), catalogTable.copy(properties), false); Assert.assertEquals(properties, table("tl").properties()); } @Test - public void testAlterTableWithPrimaryKey() throws TableNotExistException { - sql("CREATE TABLE tl(id BIGINT, PRIMARY KEY(id) NOT ENFORCED) WITH ('oldK'='oldV')"); - Map<String, String> properties = Maps.newHashMap(); - properties.put("oldK", "oldV"); + public void testAlterTableAddColumn() { + sql("CREATE TABLE tl(id BIGINT)"); + Schema schemaBefore = table("tl").schema(); + Assert.assertEquals( + new Schema(Types.NestedField.optional(1, "id", Types.LongType.get())).asStruct(), + schemaBefore.asStruct()); - // new - sql("ALTER TABLE tl SET('newK'='newV')"); - properties.put("newK", "newV"); - Assert.assertEquals(properties, table("tl").properties()); + sql("ALTER TABLE tl ADD (dt STRING)"); + Schema schemaAfter1 = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaAfter1.asStruct()); - // update old - sql("ALTER TABLE tl SET('oldK'='oldV2')"); - properties.put("oldK", "oldV2"); - Assert.assertEquals(properties, table("tl").properties()); + // Add multiple columns + sql("ALTER TABLE tl ADD (col1 STRING, col2 BIGINT)"); + Schema schemaAfter2 = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get()), + Types.NestedField.optional(3, "col1", Types.StringType.get()), + Types.NestedField.optional(4, "col2", Types.LongType.get())) + .asStruct(), + schemaAfter2.asStruct()); - // remove property - CatalogTable catalogTable = catalogTable("tl"); - properties.remove("oldK"); - getTableEnv() - .getCatalog(getTableEnv().getCurrentCatalog()) - .get() - .alterTable(new ObjectPath(DATABASE, "tl"), catalogTable.copy(properties), false); - Assert.assertEquals(properties, table("tl").properties()); + // Adding a required field should fail + Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl ADD (pk STRING NOT NULL)")) + .isInstanceOf(TableException.class); + + // Adding an existing field should fail + Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl ADD (id STRING)")) + .isInstanceOf(ValidationException.class); + } + + @Test + public void testAlterTableDropColumn() { + sql("CREATE TABLE tl(id BIGINT, dt STRING, col1 STRING, col2 BIGINT)"); + Schema schemaBefore = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get()), + Types.NestedField.optional(3, "col1", Types.StringType.get()), + Types.NestedField.optional(4, "col2", Types.LongType.get())) + .asStruct(), + schemaBefore.asStruct()); + + sql("ALTER TABLE tl DROP (dt)"); + Schema schemaAfter1 = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(3, "col1", Types.StringType.get()), + Types.NestedField.optional(4, "col2", Types.LongType.get())) + .asStruct(), + schemaAfter1.asStruct()); + + // Drop multiple columns + sql("ALTER TABLE tl DROP (col1, col2)"); + Schema schemaAfter2 = table("tl").schema(); + Assert.assertEquals( + new Schema(Types.NestedField.optional(1, "id", Types.LongType.get())).asStruct(), + schemaAfter2.asStruct()); + + // Dropping an non-existing field should fail + Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl DROP (foo)")) + .isInstanceOf(ValidationException.class); + + // Dropping an already-deleted field should fail + Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl DROP (dt)")) + .isInstanceOf(ValidationException.class); + } + + @Test + public void testAlterTableModifyColumnName() { + sql("CREATE TABLE tl(id BIGINT, dt STRING)"); + Schema schemaBefore = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaBefore.asStruct()); + + sql("ALTER TABLE tl RENAME dt TO data"); + Schema schemaAfter = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "data", Types.StringType.get())) + .asStruct(), + schemaAfter.asStruct()); + } + + @Test + public void testAlterTableModifyColumnType() { + sql("CREATE TABLE tl(id INTEGER, dt STRING)"); + Schema schemaBefore = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaBefore.asStruct()); + + // Promote type from Integer to Long + sql("ALTER TABLE tl MODIFY (id BIGINT)"); + Schema schemaAfter = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaAfter.asStruct()); + + // Type change that doesn't follow the type-promotion rule should fail + Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl MODIFY (dt INTEGER)")) + .isInstanceOf(TableException.class); + } + + @Test + public void testAlterTableModifyColumnNullability() { + sql("CREATE TABLE tl(id INTEGER NOT NULL, dt STRING)"); + Schema schemaBefore = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaBefore.asStruct()); + + // Cannot change nullability from optional to required + Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl MODIFY (dt STRING NOT NULL)")) + .isInstanceOf(TableException.class); Review Comment: nit: add assertion on error msg ########## flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java: ########## @@ -322,39 +323,265 @@ public void testAlterTable() throws TableNotExistException { Assert.assertEquals(properties, table("tl").properties()); // remove property - CatalogTable catalogTable = catalogTable("tl"); + sql("ALTER TABLE tl RESET('oldK')"); properties.remove("oldK"); - getTableEnv() - .getCatalog(getTableEnv().getCurrentCatalog()) - .get() - .alterTable(new ObjectPath(DATABASE, "tl"), catalogTable.copy(properties), false); Assert.assertEquals(properties, table("tl").properties()); } @Test - public void testAlterTableWithPrimaryKey() throws TableNotExistException { - sql("CREATE TABLE tl(id BIGINT, PRIMARY KEY(id) NOT ENFORCED) WITH ('oldK'='oldV')"); - Map<String, String> properties = Maps.newHashMap(); - properties.put("oldK", "oldV"); + public void testAlterTableAddColumn() { + sql("CREATE TABLE tl(id BIGINT)"); + Schema schemaBefore = table("tl").schema(); + Assert.assertEquals( + new Schema(Types.NestedField.optional(1, "id", Types.LongType.get())).asStruct(), + schemaBefore.asStruct()); - // new - sql("ALTER TABLE tl SET('newK'='newV')"); - properties.put("newK", "newV"); - Assert.assertEquals(properties, table("tl").properties()); + sql("ALTER TABLE tl ADD (dt STRING)"); + Schema schemaAfter1 = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaAfter1.asStruct()); - // update old - sql("ALTER TABLE tl SET('oldK'='oldV2')"); - properties.put("oldK", "oldV2"); - Assert.assertEquals(properties, table("tl").properties()); + // Add multiple columns + sql("ALTER TABLE tl ADD (col1 STRING, col2 BIGINT)"); + Schema schemaAfter2 = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get()), + Types.NestedField.optional(3, "col1", Types.StringType.get()), + Types.NestedField.optional(4, "col2", Types.LongType.get())) + .asStruct(), + schemaAfter2.asStruct()); - // remove property - CatalogTable catalogTable = catalogTable("tl"); - properties.remove("oldK"); - getTableEnv() - .getCatalog(getTableEnv().getCurrentCatalog()) - .get() - .alterTable(new ObjectPath(DATABASE, "tl"), catalogTable.copy(properties), false); - Assert.assertEquals(properties, table("tl").properties()); + // Adding a required field should fail + Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl ADD (pk STRING NOT NULL)")) + .isInstanceOf(TableException.class); + + // Adding an existing field should fail + Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl ADD (id STRING)")) + .isInstanceOf(ValidationException.class); + } + + @Test + public void testAlterTableDropColumn() { + sql("CREATE TABLE tl(id BIGINT, dt STRING, col1 STRING, col2 BIGINT)"); + Schema schemaBefore = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get()), + Types.NestedField.optional(3, "col1", Types.StringType.get()), + Types.NestedField.optional(4, "col2", Types.LongType.get())) + .asStruct(), + schemaBefore.asStruct()); + + sql("ALTER TABLE tl DROP (dt)"); + Schema schemaAfter1 = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(3, "col1", Types.StringType.get()), + Types.NestedField.optional(4, "col2", Types.LongType.get())) + .asStruct(), + schemaAfter1.asStruct()); + + // Drop multiple columns + sql("ALTER TABLE tl DROP (col1, col2)"); + Schema schemaAfter2 = table("tl").schema(); + Assert.assertEquals( + new Schema(Types.NestedField.optional(1, "id", Types.LongType.get())).asStruct(), + schemaAfter2.asStruct()); + + // Dropping an non-existing field should fail + Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl DROP (foo)")) + .isInstanceOf(ValidationException.class); + + // Dropping an already-deleted field should fail + Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl DROP (dt)")) + .isInstanceOf(ValidationException.class); + } + + @Test + public void testAlterTableModifyColumnName() { + sql("CREATE TABLE tl(id BIGINT, dt STRING)"); + Schema schemaBefore = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaBefore.asStruct()); + + sql("ALTER TABLE tl RENAME dt TO data"); + Schema schemaAfter = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "data", Types.StringType.get())) + .asStruct(), + schemaAfter.asStruct()); + } + + @Test + public void testAlterTableModifyColumnType() { + sql("CREATE TABLE tl(id INTEGER, dt STRING)"); + Schema schemaBefore = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaBefore.asStruct()); + + // Promote type from Integer to Long + sql("ALTER TABLE tl MODIFY (id BIGINT)"); + Schema schemaAfter = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaAfter.asStruct()); + + // Type change that doesn't follow the type-promotion rule should fail + Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl MODIFY (dt INTEGER)")) + .isInstanceOf(TableException.class); + } + + @Test + public void testAlterTableModifyColumnNullability() { + sql("CREATE TABLE tl(id INTEGER NOT NULL, dt STRING)"); + Schema schemaBefore = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaBefore.asStruct()); + + // Cannot change nullability from optional to required + Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl MODIFY (dt STRING NOT NULL)")) + .isInstanceOf(TableException.class); + + // Set nullability from required to optional + sql("ALTER TABLE tl MODIFY (id INTEGER)"); + Schema schemaAfter = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaAfter.asStruct()); + } + + @Test + public void testAlterTableModifyColumnPosition() { + sql("CREATE TABLE tl(id BIGINT, dt STRING)"); + Schema schemaBefore = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaBefore.asStruct()); + + sql("ALTER TABLE tl MODIFY (dt STRING FIRST)"); + Schema schemaAfter = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(2, "dt", Types.StringType.get()), + Types.NestedField.optional(1, "id", Types.LongType.get())) + .asStruct(), + schemaAfter.asStruct()); + + sql("ALTER TABLE tl MODIFY (dt STRING AFTER id)"); + Schema schemaAfterAfter = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaAfterAfter.asStruct()); + } + + @Test + public void testAlterTableModifyColumnComment() { + sql("CREATE TABLE tl(id BIGINT, dt STRING)"); + Schema schemaBefore = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaBefore.asStruct()); + + sql("ALTER TABLE tl MODIFY (dt STRING COMMENT 'some data')"); + Schema schemaAfter = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get(), "some data")) + .asStruct(), + schemaAfter.asStruct()); + } + + @Test + public void testAlterTableConstraint() { + sql("CREATE TABLE tl(id BIGINT NOT NULL, dt STRING NOT NULL, col1 STRING)"); + Schema schemaBefore = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.required(1, "id", Types.LongType.get()), + Types.NestedField.required(2, "dt", Types.StringType.get()), + Types.NestedField.optional(3, "col1", Types.StringType.get())) + .asStruct(), + schemaBefore.asStruct()); + Assert.assertEquals(ImmutableSet.of(), schemaBefore.identifierFieldNames()); + + sql("ALTER TABLE tl ADD (PRIMARY KEY (id) NOT ENFORCED)"); + Schema schemaAfterAdd = table("tl").schema(); + Assert.assertEquals(ImmutableSet.of("id"), schemaAfterAdd.identifierFieldNames()); + + sql("ALTER TABLE tl MODIFY (PRIMARY KEY (dt) NOT ENFORCED)"); + Schema schemaAfterModify = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.required(1, "id", Types.LongType.get()), + Types.NestedField.required(2, "dt", Types.StringType.get()), + Types.NestedField.optional(3, "col1", Types.StringType.get())) + .asStruct(), + schemaAfterModify.asStruct()); + Assert.assertEquals(ImmutableSet.of("dt"), schemaAfterModify.identifierFieldNames()); + + // Composite primary key + sql("ALTER TABLE tl MODIFY (PRIMARY KEY (id, dt) NOT ENFORCED)"); + Schema schemaAfterComposite = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.required(1, "id", Types.LongType.get()), + Types.NestedField.required(2, "dt", Types.StringType.get()), + Types.NestedField.optional(3, "col1", Types.StringType.get())) + .asStruct(), + schemaAfterComposite.asStruct()); + Assert.assertEquals(ImmutableSet.of("id", "dt"), schemaAfterComposite.identifierFieldNames()); + + // Setting an optional field as primary key should fail + Assertions.assertThatThrownBy( + () -> sql("ALTER TABLE tl MODIFY (PRIMARY KEY (col1) NOT ENFORCED)")) + .isInstanceOf(TableException.class); + + // Setting a composite key containing an optional field should fail + Assertions.assertThatThrownBy( + () -> sql("ALTER TABLE tl MODIFY (PRIMARY KEY (id, col1) NOT ENFORCED)")) + .isInstanceOf(TableException.class); Review Comment: nit: assert error msg too ########## flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java: ########## @@ -322,39 +323,265 @@ public void testAlterTable() throws TableNotExistException { Assert.assertEquals(properties, table("tl").properties()); // remove property - CatalogTable catalogTable = catalogTable("tl"); + sql("ALTER TABLE tl RESET('oldK')"); properties.remove("oldK"); - getTableEnv() - .getCatalog(getTableEnv().getCurrentCatalog()) - .get() - .alterTable(new ObjectPath(DATABASE, "tl"), catalogTable.copy(properties), false); Assert.assertEquals(properties, table("tl").properties()); } @Test - public void testAlterTableWithPrimaryKey() throws TableNotExistException { - sql("CREATE TABLE tl(id BIGINT, PRIMARY KEY(id) NOT ENFORCED) WITH ('oldK'='oldV')"); - Map<String, String> properties = Maps.newHashMap(); - properties.put("oldK", "oldV"); + public void testAlterTableAddColumn() { + sql("CREATE TABLE tl(id BIGINT)"); + Schema schemaBefore = table("tl").schema(); + Assert.assertEquals( + new Schema(Types.NestedField.optional(1, "id", Types.LongType.get())).asStruct(), + schemaBefore.asStruct()); - // new - sql("ALTER TABLE tl SET('newK'='newV')"); - properties.put("newK", "newV"); - Assert.assertEquals(properties, table("tl").properties()); + sql("ALTER TABLE tl ADD (dt STRING)"); + Schema schemaAfter1 = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaAfter1.asStruct()); - // update old - sql("ALTER TABLE tl SET('oldK'='oldV2')"); - properties.put("oldK", "oldV2"); - Assert.assertEquals(properties, table("tl").properties()); + // Add multiple columns + sql("ALTER TABLE tl ADD (col1 STRING, col2 BIGINT)"); + Schema schemaAfter2 = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get()), + Types.NestedField.optional(3, "col1", Types.StringType.get()), + Types.NestedField.optional(4, "col2", Types.LongType.get())) + .asStruct(), + schemaAfter2.asStruct()); - // remove property - CatalogTable catalogTable = catalogTable("tl"); - properties.remove("oldK"); - getTableEnv() - .getCatalog(getTableEnv().getCurrentCatalog()) - .get() - .alterTable(new ObjectPath(DATABASE, "tl"), catalogTable.copy(properties), false); - Assert.assertEquals(properties, table("tl").properties()); + // Adding a required field should fail + Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl ADD (pk STRING NOT NULL)")) + .isInstanceOf(TableException.class); Review Comment: nit: can we add assert on error msg too ########## flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java: ########## @@ -322,39 +323,265 @@ public void testAlterTable() throws TableNotExistException { Assert.assertEquals(properties, table("tl").properties()); // remove property - CatalogTable catalogTable = catalogTable("tl"); + sql("ALTER TABLE tl RESET('oldK')"); properties.remove("oldK"); - getTableEnv() - .getCatalog(getTableEnv().getCurrentCatalog()) - .get() - .alterTable(new ObjectPath(DATABASE, "tl"), catalogTable.copy(properties), false); Assert.assertEquals(properties, table("tl").properties()); } @Test - public void testAlterTableWithPrimaryKey() throws TableNotExistException { - sql("CREATE TABLE tl(id BIGINT, PRIMARY KEY(id) NOT ENFORCED) WITH ('oldK'='oldV')"); - Map<String, String> properties = Maps.newHashMap(); - properties.put("oldK", "oldV"); + public void testAlterTableAddColumn() { + sql("CREATE TABLE tl(id BIGINT)"); + Schema schemaBefore = table("tl").schema(); + Assert.assertEquals( + new Schema(Types.NestedField.optional(1, "id", Types.LongType.get())).asStruct(), + schemaBefore.asStruct()); - // new - sql("ALTER TABLE tl SET('newK'='newV')"); - properties.put("newK", "newV"); - Assert.assertEquals(properties, table("tl").properties()); + sql("ALTER TABLE tl ADD (dt STRING)"); + Schema schemaAfter1 = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaAfter1.asStruct()); - // update old - sql("ALTER TABLE tl SET('oldK'='oldV2')"); - properties.put("oldK", "oldV2"); - Assert.assertEquals(properties, table("tl").properties()); + // Add multiple columns + sql("ALTER TABLE tl ADD (col1 STRING, col2 BIGINT)"); + Schema schemaAfter2 = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get()), + Types.NestedField.optional(3, "col1", Types.StringType.get()), + Types.NestedField.optional(4, "col2", Types.LongType.get())) + .asStruct(), + schemaAfter2.asStruct()); - // remove property - CatalogTable catalogTable = catalogTable("tl"); - properties.remove("oldK"); - getTableEnv() - .getCatalog(getTableEnv().getCurrentCatalog()) - .get() - .alterTable(new ObjectPath(DATABASE, "tl"), catalogTable.copy(properties), false); - Assert.assertEquals(properties, table("tl").properties()); + // Adding a required field should fail + Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl ADD (pk STRING NOT NULL)")) + .isInstanceOf(TableException.class); + + // Adding an existing field should fail + Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl ADD (id STRING)")) + .isInstanceOf(ValidationException.class); Review Comment: nit: nit: can we add assert on error msg too ########## flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java: ########## @@ -322,39 +323,265 @@ public void testAlterTable() throws TableNotExistException { Assert.assertEquals(properties, table("tl").properties()); // remove property - CatalogTable catalogTable = catalogTable("tl"); + sql("ALTER TABLE tl RESET('oldK')"); properties.remove("oldK"); - getTableEnv() - .getCatalog(getTableEnv().getCurrentCatalog()) - .get() - .alterTable(new ObjectPath(DATABASE, "tl"), catalogTable.copy(properties), false); Assert.assertEquals(properties, table("tl").properties()); } @Test - public void testAlterTableWithPrimaryKey() throws TableNotExistException { - sql("CREATE TABLE tl(id BIGINT, PRIMARY KEY(id) NOT ENFORCED) WITH ('oldK'='oldV')"); - Map<String, String> properties = Maps.newHashMap(); - properties.put("oldK", "oldV"); + public void testAlterTableAddColumn() { + sql("CREATE TABLE tl(id BIGINT)"); + Schema schemaBefore = table("tl").schema(); + Assert.assertEquals( + new Schema(Types.NestedField.optional(1, "id", Types.LongType.get())).asStruct(), + schemaBefore.asStruct()); - // new - sql("ALTER TABLE tl SET('newK'='newV')"); - properties.put("newK", "newV"); - Assert.assertEquals(properties, table("tl").properties()); + sql("ALTER TABLE tl ADD (dt STRING)"); + Schema schemaAfter1 = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaAfter1.asStruct()); - // update old - sql("ALTER TABLE tl SET('oldK'='oldV2')"); - properties.put("oldK", "oldV2"); - Assert.assertEquals(properties, table("tl").properties()); + // Add multiple columns + sql("ALTER TABLE tl ADD (col1 STRING, col2 BIGINT)"); + Schema schemaAfter2 = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get()), + Types.NestedField.optional(3, "col1", Types.StringType.get()), + Types.NestedField.optional(4, "col2", Types.LongType.get())) + .asStruct(), + schemaAfter2.asStruct()); - // remove property - CatalogTable catalogTable = catalogTable("tl"); - properties.remove("oldK"); - getTableEnv() - .getCatalog(getTableEnv().getCurrentCatalog()) - .get() - .alterTable(new ObjectPath(DATABASE, "tl"), catalogTable.copy(properties), false); - Assert.assertEquals(properties, table("tl").properties()); + // Adding a required field should fail Review Comment: nit: can you comment which code perform the validation/check on adding a required field? ########## flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java: ########## @@ -322,39 +323,265 @@ public void testAlterTable() throws TableNotExistException { Assert.assertEquals(properties, table("tl").properties()); // remove property - CatalogTable catalogTable = catalogTable("tl"); + sql("ALTER TABLE tl RESET('oldK')"); properties.remove("oldK"); - getTableEnv() - .getCatalog(getTableEnv().getCurrentCatalog()) - .get() - .alterTable(new ObjectPath(DATABASE, "tl"), catalogTable.copy(properties), false); Assert.assertEquals(properties, table("tl").properties()); } @Test - public void testAlterTableWithPrimaryKey() throws TableNotExistException { - sql("CREATE TABLE tl(id BIGINT, PRIMARY KEY(id) NOT ENFORCED) WITH ('oldK'='oldV')"); - Map<String, String> properties = Maps.newHashMap(); - properties.put("oldK", "oldV"); + public void testAlterTableAddColumn() { + sql("CREATE TABLE tl(id BIGINT)"); + Schema schemaBefore = table("tl").schema(); + Assert.assertEquals( + new Schema(Types.NestedField.optional(1, "id", Types.LongType.get())).asStruct(), + schemaBefore.asStruct()); - // new - sql("ALTER TABLE tl SET('newK'='newV')"); - properties.put("newK", "newV"); - Assert.assertEquals(properties, table("tl").properties()); + sql("ALTER TABLE tl ADD (dt STRING)"); + Schema schemaAfter1 = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaAfter1.asStruct()); - // update old - sql("ALTER TABLE tl SET('oldK'='oldV2')"); - properties.put("oldK", "oldV2"); - Assert.assertEquals(properties, table("tl").properties()); + // Add multiple columns + sql("ALTER TABLE tl ADD (col1 STRING, col2 BIGINT)"); + Schema schemaAfter2 = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get()), + Types.NestedField.optional(3, "col1", Types.StringType.get()), + Types.NestedField.optional(4, "col2", Types.LongType.get())) + .asStruct(), + schemaAfter2.asStruct()); - // remove property - CatalogTable catalogTable = catalogTable("tl"); - properties.remove("oldK"); - getTableEnv() - .getCatalog(getTableEnv().getCurrentCatalog()) - .get() - .alterTable(new ObjectPath(DATABASE, "tl"), catalogTable.copy(properties), false); - Assert.assertEquals(properties, table("tl").properties()); + // Adding a required field should fail + Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl ADD (pk STRING NOT NULL)")) + .isInstanceOf(TableException.class); + + // Adding an existing field should fail + Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl ADD (id STRING)")) + .isInstanceOf(ValidationException.class); + } + + @Test + public void testAlterTableDropColumn() { + sql("CREATE TABLE tl(id BIGINT, dt STRING, col1 STRING, col2 BIGINT)"); + Schema schemaBefore = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get()), + Types.NestedField.optional(3, "col1", Types.StringType.get()), + Types.NestedField.optional(4, "col2", Types.LongType.get())) + .asStruct(), + schemaBefore.asStruct()); + + sql("ALTER TABLE tl DROP (dt)"); + Schema schemaAfter1 = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(3, "col1", Types.StringType.get()), + Types.NestedField.optional(4, "col2", Types.LongType.get())) + .asStruct(), + schemaAfter1.asStruct()); + + // Drop multiple columns + sql("ALTER TABLE tl DROP (col1, col2)"); + Schema schemaAfter2 = table("tl").schema(); + Assert.assertEquals( + new Schema(Types.NestedField.optional(1, "id", Types.LongType.get())).asStruct(), + schemaAfter2.asStruct()); + + // Dropping an non-existing field should fail + Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl DROP (foo)")) + .isInstanceOf(ValidationException.class); Review Comment: nit: add assertion on error msg ########## flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java: ########## @@ -322,39 +323,265 @@ public void testAlterTable() throws TableNotExistException { Assert.assertEquals(properties, table("tl").properties()); // remove property - CatalogTable catalogTable = catalogTable("tl"); + sql("ALTER TABLE tl RESET('oldK')"); properties.remove("oldK"); - getTableEnv() - .getCatalog(getTableEnv().getCurrentCatalog()) - .get() - .alterTable(new ObjectPath(DATABASE, "tl"), catalogTable.copy(properties), false); Assert.assertEquals(properties, table("tl").properties()); } @Test - public void testAlterTableWithPrimaryKey() throws TableNotExistException { - sql("CREATE TABLE tl(id BIGINT, PRIMARY KEY(id) NOT ENFORCED) WITH ('oldK'='oldV')"); - Map<String, String> properties = Maps.newHashMap(); - properties.put("oldK", "oldV"); + public void testAlterTableAddColumn() { + sql("CREATE TABLE tl(id BIGINT)"); + Schema schemaBefore = table("tl").schema(); + Assert.assertEquals( + new Schema(Types.NestedField.optional(1, "id", Types.LongType.get())).asStruct(), + schemaBefore.asStruct()); - // new - sql("ALTER TABLE tl SET('newK'='newV')"); - properties.put("newK", "newV"); - Assert.assertEquals(properties, table("tl").properties()); + sql("ALTER TABLE tl ADD (dt STRING)"); + Schema schemaAfter1 = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaAfter1.asStruct()); - // update old - sql("ALTER TABLE tl SET('oldK'='oldV2')"); - properties.put("oldK", "oldV2"); - Assert.assertEquals(properties, table("tl").properties()); + // Add multiple columns + sql("ALTER TABLE tl ADD (col1 STRING, col2 BIGINT)"); + Schema schemaAfter2 = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get()), + Types.NestedField.optional(3, "col1", Types.StringType.get()), + Types.NestedField.optional(4, "col2", Types.LongType.get())) + .asStruct(), + schemaAfter2.asStruct()); - // remove property - CatalogTable catalogTable = catalogTable("tl"); - properties.remove("oldK"); - getTableEnv() - .getCatalog(getTableEnv().getCurrentCatalog()) - .get() - .alterTable(new ObjectPath(DATABASE, "tl"), catalogTable.copy(properties), false); - Assert.assertEquals(properties, table("tl").properties()); + // Adding a required field should fail + Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl ADD (pk STRING NOT NULL)")) + .isInstanceOf(TableException.class); + + // Adding an existing field should fail + Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl ADD (id STRING)")) + .isInstanceOf(ValidationException.class); + } + + @Test + public void testAlterTableDropColumn() { + sql("CREATE TABLE tl(id BIGINT, dt STRING, col1 STRING, col2 BIGINT)"); + Schema schemaBefore = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get()), + Types.NestedField.optional(3, "col1", Types.StringType.get()), + Types.NestedField.optional(4, "col2", Types.LongType.get())) + .asStruct(), + schemaBefore.asStruct()); + + sql("ALTER TABLE tl DROP (dt)"); + Schema schemaAfter1 = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(3, "col1", Types.StringType.get()), + Types.NestedField.optional(4, "col2", Types.LongType.get())) + .asStruct(), + schemaAfter1.asStruct()); + + // Drop multiple columns + sql("ALTER TABLE tl DROP (col1, col2)"); + Schema schemaAfter2 = table("tl").schema(); + Assert.assertEquals( + new Schema(Types.NestedField.optional(1, "id", Types.LongType.get())).asStruct(), + schemaAfter2.asStruct()); + + // Dropping an non-existing field should fail + Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl DROP (foo)")) + .isInstanceOf(ValidationException.class); + + // Dropping an already-deleted field should fail + Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl DROP (dt)")) + .isInstanceOf(ValidationException.class); Review Comment: nit: add assertion on error msg ########## flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java: ########## @@ -322,39 +323,265 @@ public void testAlterTable() throws TableNotExistException { Assert.assertEquals(properties, table("tl").properties()); // remove property - CatalogTable catalogTable = catalogTable("tl"); + sql("ALTER TABLE tl RESET('oldK')"); properties.remove("oldK"); - getTableEnv() - .getCatalog(getTableEnv().getCurrentCatalog()) - .get() - .alterTable(new ObjectPath(DATABASE, "tl"), catalogTable.copy(properties), false); Assert.assertEquals(properties, table("tl").properties()); } @Test - public void testAlterTableWithPrimaryKey() throws TableNotExistException { - sql("CREATE TABLE tl(id BIGINT, PRIMARY KEY(id) NOT ENFORCED) WITH ('oldK'='oldV')"); - Map<String, String> properties = Maps.newHashMap(); - properties.put("oldK", "oldV"); + public void testAlterTableAddColumn() { + sql("CREATE TABLE tl(id BIGINT)"); + Schema schemaBefore = table("tl").schema(); + Assert.assertEquals( + new Schema(Types.NestedField.optional(1, "id", Types.LongType.get())).asStruct(), + schemaBefore.asStruct()); - // new - sql("ALTER TABLE tl SET('newK'='newV')"); - properties.put("newK", "newV"); - Assert.assertEquals(properties, table("tl").properties()); + sql("ALTER TABLE tl ADD (dt STRING)"); + Schema schemaAfter1 = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaAfter1.asStruct()); - // update old - sql("ALTER TABLE tl SET('oldK'='oldV2')"); - properties.put("oldK", "oldV2"); - Assert.assertEquals(properties, table("tl").properties()); + // Add multiple columns + sql("ALTER TABLE tl ADD (col1 STRING, col2 BIGINT)"); + Schema schemaAfter2 = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get()), + Types.NestedField.optional(3, "col1", Types.StringType.get()), + Types.NestedField.optional(4, "col2", Types.LongType.get())) + .asStruct(), + schemaAfter2.asStruct()); - // remove property - CatalogTable catalogTable = catalogTable("tl"); - properties.remove("oldK"); - getTableEnv() - .getCatalog(getTableEnv().getCurrentCatalog()) - .get() - .alterTable(new ObjectPath(DATABASE, "tl"), catalogTable.copy(properties), false); - Assert.assertEquals(properties, table("tl").properties()); + // Adding a required field should fail + Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl ADD (pk STRING NOT NULL)")) + .isInstanceOf(TableException.class); + + // Adding an existing field should fail + Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl ADD (id STRING)")) + .isInstanceOf(ValidationException.class); + } + + @Test + public void testAlterTableDropColumn() { + sql("CREATE TABLE tl(id BIGINT, dt STRING, col1 STRING, col2 BIGINT)"); + Schema schemaBefore = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get()), + Types.NestedField.optional(3, "col1", Types.StringType.get()), + Types.NestedField.optional(4, "col2", Types.LongType.get())) + .asStruct(), + schemaBefore.asStruct()); + + sql("ALTER TABLE tl DROP (dt)"); + Schema schemaAfter1 = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(3, "col1", Types.StringType.get()), + Types.NestedField.optional(4, "col2", Types.LongType.get())) + .asStruct(), + schemaAfter1.asStruct()); + + // Drop multiple columns + sql("ALTER TABLE tl DROP (col1, col2)"); + Schema schemaAfter2 = table("tl").schema(); + Assert.assertEquals( + new Schema(Types.NestedField.optional(1, "id", Types.LongType.get())).asStruct(), + schemaAfter2.asStruct()); + + // Dropping an non-existing field should fail + Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl DROP (foo)")) + .isInstanceOf(ValidationException.class); + + // Dropping an already-deleted field should fail + Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl DROP (dt)")) + .isInstanceOf(ValidationException.class); + } + + @Test + public void testAlterTableModifyColumnName() { + sql("CREATE TABLE tl(id BIGINT, dt STRING)"); + Schema schemaBefore = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaBefore.asStruct()); + + sql("ALTER TABLE tl RENAME dt TO data"); + Schema schemaAfter = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "data", Types.StringType.get())) + .asStruct(), + schemaAfter.asStruct()); + } + + @Test + public void testAlterTableModifyColumnType() { + sql("CREATE TABLE tl(id INTEGER, dt STRING)"); + Schema schemaBefore = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaBefore.asStruct()); + + // Promote type from Integer to Long + sql("ALTER TABLE tl MODIFY (id BIGINT)"); + Schema schemaAfter = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaAfter.asStruct()); + + // Type change that doesn't follow the type-promotion rule should fail + Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl MODIFY (dt INTEGER)")) + .isInstanceOf(TableException.class); + } + + @Test + public void testAlterTableModifyColumnNullability() { + sql("CREATE TABLE tl(id INTEGER NOT NULL, dt STRING)"); + Schema schemaBefore = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaBefore.asStruct()); + + // Cannot change nullability from optional to required + Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl MODIFY (dt STRING NOT NULL)")) + .isInstanceOf(TableException.class); + + // Set nullability from required to optional + sql("ALTER TABLE tl MODIFY (id INTEGER)"); + Schema schemaAfter = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaAfter.asStruct()); + } + + @Test + public void testAlterTableModifyColumnPosition() { Review Comment: can we add some failure tests? like non-exist column ########## flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java: ########## @@ -322,39 +323,265 @@ public void testAlterTable() throws TableNotExistException { Assert.assertEquals(properties, table("tl").properties()); // remove property - CatalogTable catalogTable = catalogTable("tl"); + sql("ALTER TABLE tl RESET('oldK')"); properties.remove("oldK"); - getTableEnv() - .getCatalog(getTableEnv().getCurrentCatalog()) - .get() - .alterTable(new ObjectPath(DATABASE, "tl"), catalogTable.copy(properties), false); Assert.assertEquals(properties, table("tl").properties()); } @Test - public void testAlterTableWithPrimaryKey() throws TableNotExistException { - sql("CREATE TABLE tl(id BIGINT, PRIMARY KEY(id) NOT ENFORCED) WITH ('oldK'='oldV')"); - Map<String, String> properties = Maps.newHashMap(); - properties.put("oldK", "oldV"); + public void testAlterTableAddColumn() { + sql("CREATE TABLE tl(id BIGINT)"); + Schema schemaBefore = table("tl").schema(); + Assert.assertEquals( + new Schema(Types.NestedField.optional(1, "id", Types.LongType.get())).asStruct(), + schemaBefore.asStruct()); - // new - sql("ALTER TABLE tl SET('newK'='newV')"); - properties.put("newK", "newV"); - Assert.assertEquals(properties, table("tl").properties()); + sql("ALTER TABLE tl ADD (dt STRING)"); + Schema schemaAfter1 = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaAfter1.asStruct()); - // update old - sql("ALTER TABLE tl SET('oldK'='oldV2')"); - properties.put("oldK", "oldV2"); - Assert.assertEquals(properties, table("tl").properties()); + // Add multiple columns + sql("ALTER TABLE tl ADD (col1 STRING, col2 BIGINT)"); + Schema schemaAfter2 = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get()), + Types.NestedField.optional(3, "col1", Types.StringType.get()), + Types.NestedField.optional(4, "col2", Types.LongType.get())) + .asStruct(), + schemaAfter2.asStruct()); - // remove property - CatalogTable catalogTable = catalogTable("tl"); - properties.remove("oldK"); - getTableEnv() - .getCatalog(getTableEnv().getCurrentCatalog()) - .get() - .alterTable(new ObjectPath(DATABASE, "tl"), catalogTable.copy(properties), false); - Assert.assertEquals(properties, table("tl").properties()); + // Adding a required field should fail + Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl ADD (pk STRING NOT NULL)")) + .isInstanceOf(TableException.class); + + // Adding an existing field should fail + Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl ADD (id STRING)")) + .isInstanceOf(ValidationException.class); + } + + @Test + public void testAlterTableDropColumn() { + sql("CREATE TABLE tl(id BIGINT, dt STRING, col1 STRING, col2 BIGINT)"); + Schema schemaBefore = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get()), + Types.NestedField.optional(3, "col1", Types.StringType.get()), + Types.NestedField.optional(4, "col2", Types.LongType.get())) + .asStruct(), + schemaBefore.asStruct()); + + sql("ALTER TABLE tl DROP (dt)"); + Schema schemaAfter1 = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(3, "col1", Types.StringType.get()), + Types.NestedField.optional(4, "col2", Types.LongType.get())) + .asStruct(), + schemaAfter1.asStruct()); + + // Drop multiple columns + sql("ALTER TABLE tl DROP (col1, col2)"); + Schema schemaAfter2 = table("tl").schema(); + Assert.assertEquals( + new Schema(Types.NestedField.optional(1, "id", Types.LongType.get())).asStruct(), + schemaAfter2.asStruct()); + + // Dropping an non-existing field should fail + Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl DROP (foo)")) + .isInstanceOf(ValidationException.class); + + // Dropping an already-deleted field should fail + Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl DROP (dt)")) + .isInstanceOf(ValidationException.class); + } + + @Test + public void testAlterTableModifyColumnName() { + sql("CREATE TABLE tl(id BIGINT, dt STRING)"); + Schema schemaBefore = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaBefore.asStruct()); + + sql("ALTER TABLE tl RENAME dt TO data"); + Schema schemaAfter = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "data", Types.StringType.get())) + .asStruct(), + schemaAfter.asStruct()); + } + + @Test + public void testAlterTableModifyColumnType() { + sql("CREATE TABLE tl(id INTEGER, dt STRING)"); + Schema schemaBefore = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaBefore.asStruct()); + + // Promote type from Integer to Long + sql("ALTER TABLE tl MODIFY (id BIGINT)"); + Schema schemaAfter = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaAfter.asStruct()); + + // Type change that doesn't follow the type-promotion rule should fail + Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl MODIFY (dt INTEGER)")) + .isInstanceOf(TableException.class); Review Comment: nit: add assertion on error msg ########## flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java: ########## @@ -322,39 +323,265 @@ public void testAlterTable() throws TableNotExistException { Assert.assertEquals(properties, table("tl").properties()); // remove property - CatalogTable catalogTable = catalogTable("tl"); + sql("ALTER TABLE tl RESET('oldK')"); properties.remove("oldK"); - getTableEnv() - .getCatalog(getTableEnv().getCurrentCatalog()) - .get() - .alterTable(new ObjectPath(DATABASE, "tl"), catalogTable.copy(properties), false); Assert.assertEquals(properties, table("tl").properties()); } @Test - public void testAlterTableWithPrimaryKey() throws TableNotExistException { - sql("CREATE TABLE tl(id BIGINT, PRIMARY KEY(id) NOT ENFORCED) WITH ('oldK'='oldV')"); - Map<String, String> properties = Maps.newHashMap(); - properties.put("oldK", "oldV"); + public void testAlterTableAddColumn() { + sql("CREATE TABLE tl(id BIGINT)"); + Schema schemaBefore = table("tl").schema(); + Assert.assertEquals( + new Schema(Types.NestedField.optional(1, "id", Types.LongType.get())).asStruct(), + schemaBefore.asStruct()); - // new - sql("ALTER TABLE tl SET('newK'='newV')"); - properties.put("newK", "newV"); - Assert.assertEquals(properties, table("tl").properties()); + sql("ALTER TABLE tl ADD (dt STRING)"); + Schema schemaAfter1 = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaAfter1.asStruct()); - // update old - sql("ALTER TABLE tl SET('oldK'='oldV2')"); - properties.put("oldK", "oldV2"); - Assert.assertEquals(properties, table("tl").properties()); + // Add multiple columns + sql("ALTER TABLE tl ADD (col1 STRING, col2 BIGINT)"); + Schema schemaAfter2 = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get()), + Types.NestedField.optional(3, "col1", Types.StringType.get()), + Types.NestedField.optional(4, "col2", Types.LongType.get())) + .asStruct(), + schemaAfter2.asStruct()); - // remove property - CatalogTable catalogTable = catalogTable("tl"); - properties.remove("oldK"); - getTableEnv() - .getCatalog(getTableEnv().getCurrentCatalog()) - .get() - .alterTable(new ObjectPath(DATABASE, "tl"), catalogTable.copy(properties), false); - Assert.assertEquals(properties, table("tl").properties()); + // Adding a required field should fail + Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl ADD (pk STRING NOT NULL)")) + .isInstanceOf(TableException.class); + + // Adding an existing field should fail + Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl ADD (id STRING)")) + .isInstanceOf(ValidationException.class); + } + + @Test + public void testAlterTableDropColumn() { + sql("CREATE TABLE tl(id BIGINT, dt STRING, col1 STRING, col2 BIGINT)"); + Schema schemaBefore = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get()), + Types.NestedField.optional(3, "col1", Types.StringType.get()), + Types.NestedField.optional(4, "col2", Types.LongType.get())) + .asStruct(), + schemaBefore.asStruct()); + + sql("ALTER TABLE tl DROP (dt)"); + Schema schemaAfter1 = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(3, "col1", Types.StringType.get()), + Types.NestedField.optional(4, "col2", Types.LongType.get())) + .asStruct(), + schemaAfter1.asStruct()); + + // Drop multiple columns + sql("ALTER TABLE tl DROP (col1, col2)"); + Schema schemaAfter2 = table("tl").schema(); + Assert.assertEquals( + new Schema(Types.NestedField.optional(1, "id", Types.LongType.get())).asStruct(), + schemaAfter2.asStruct()); + + // Dropping an non-existing field should fail + Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl DROP (foo)")) + .isInstanceOf(ValidationException.class); + + // Dropping an already-deleted field should fail + Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl DROP (dt)")) + .isInstanceOf(ValidationException.class); + } + + @Test + public void testAlterTableModifyColumnName() { + sql("CREATE TABLE tl(id BIGINT, dt STRING)"); + Schema schemaBefore = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaBefore.asStruct()); + + sql("ALTER TABLE tl RENAME dt TO data"); + Schema schemaAfter = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "data", Types.StringType.get())) + .asStruct(), + schemaAfter.asStruct()); + } + + @Test + public void testAlterTableModifyColumnType() { + sql("CREATE TABLE tl(id INTEGER, dt STRING)"); + Schema schemaBefore = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaBefore.asStruct()); + + // Promote type from Integer to Long + sql("ALTER TABLE tl MODIFY (id BIGINT)"); + Schema schemaAfter = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaAfter.asStruct()); + + // Type change that doesn't follow the type-promotion rule should fail + Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl MODIFY (dt INTEGER)")) + .isInstanceOf(TableException.class); + } + + @Test + public void testAlterTableModifyColumnNullability() { + sql("CREATE TABLE tl(id INTEGER NOT NULL, dt STRING)"); + Schema schemaBefore = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaBefore.asStruct()); + + // Cannot change nullability from optional to required + Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl MODIFY (dt STRING NOT NULL)")) + .isInstanceOf(TableException.class); + + // Set nullability from required to optional + sql("ALTER TABLE tl MODIFY (id INTEGER)"); + Schema schemaAfter = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaAfter.asStruct()); + } + + @Test + public void testAlterTableModifyColumnPosition() { + sql("CREATE TABLE tl(id BIGINT, dt STRING)"); + Schema schemaBefore = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaBefore.asStruct()); + + sql("ALTER TABLE tl MODIFY (dt STRING FIRST)"); + Schema schemaAfter = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(2, "dt", Types.StringType.get()), + Types.NestedField.optional(1, "id", Types.LongType.get())) + .asStruct(), + schemaAfter.asStruct()); + + sql("ALTER TABLE tl MODIFY (dt STRING AFTER id)"); + Schema schemaAfterAfter = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaAfterAfter.asStruct()); + } + + @Test + public void testAlterTableModifyColumnComment() { + sql("CREATE TABLE tl(id BIGINT, dt STRING)"); + Schema schemaBefore = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaBefore.asStruct()); + + sql("ALTER TABLE tl MODIFY (dt STRING COMMENT 'some data')"); Review Comment: nit: comment as `comment for dt field`? ########## flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java: ########## @@ -322,39 +323,265 @@ public void testAlterTable() throws TableNotExistException { Assert.assertEquals(properties, table("tl").properties()); // remove property - CatalogTable catalogTable = catalogTable("tl"); + sql("ALTER TABLE tl RESET('oldK')"); properties.remove("oldK"); - getTableEnv() - .getCatalog(getTableEnv().getCurrentCatalog()) - .get() - .alterTable(new ObjectPath(DATABASE, "tl"), catalogTable.copy(properties), false); Assert.assertEquals(properties, table("tl").properties()); } @Test - public void testAlterTableWithPrimaryKey() throws TableNotExistException { - sql("CREATE TABLE tl(id BIGINT, PRIMARY KEY(id) NOT ENFORCED) WITH ('oldK'='oldV')"); - Map<String, String> properties = Maps.newHashMap(); - properties.put("oldK", "oldV"); + public void testAlterTableAddColumn() { + sql("CREATE TABLE tl(id BIGINT)"); + Schema schemaBefore = table("tl").schema(); + Assert.assertEquals( + new Schema(Types.NestedField.optional(1, "id", Types.LongType.get())).asStruct(), + schemaBefore.asStruct()); - // new - sql("ALTER TABLE tl SET('newK'='newV')"); - properties.put("newK", "newV"); - Assert.assertEquals(properties, table("tl").properties()); + sql("ALTER TABLE tl ADD (dt STRING)"); + Schema schemaAfter1 = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaAfter1.asStruct()); - // update old - sql("ALTER TABLE tl SET('oldK'='oldV2')"); - properties.put("oldK", "oldV2"); - Assert.assertEquals(properties, table("tl").properties()); + // Add multiple columns + sql("ALTER TABLE tl ADD (col1 STRING, col2 BIGINT)"); + Schema schemaAfter2 = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get()), + Types.NestedField.optional(3, "col1", Types.StringType.get()), + Types.NestedField.optional(4, "col2", Types.LongType.get())) + .asStruct(), + schemaAfter2.asStruct()); - // remove property - CatalogTable catalogTable = catalogTable("tl"); - properties.remove("oldK"); - getTableEnv() - .getCatalog(getTableEnv().getCurrentCatalog()) - .get() - .alterTable(new ObjectPath(DATABASE, "tl"), catalogTable.copy(properties), false); - Assert.assertEquals(properties, table("tl").properties()); + // Adding a required field should fail + Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl ADD (pk STRING NOT NULL)")) + .isInstanceOf(TableException.class); + + // Adding an existing field should fail + Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl ADD (id STRING)")) + .isInstanceOf(ValidationException.class); + } + + @Test + public void testAlterTableDropColumn() { + sql("CREATE TABLE tl(id BIGINT, dt STRING, col1 STRING, col2 BIGINT)"); + Schema schemaBefore = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get()), + Types.NestedField.optional(3, "col1", Types.StringType.get()), + Types.NestedField.optional(4, "col2", Types.LongType.get())) + .asStruct(), + schemaBefore.asStruct()); + + sql("ALTER TABLE tl DROP (dt)"); + Schema schemaAfter1 = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(3, "col1", Types.StringType.get()), + Types.NestedField.optional(4, "col2", Types.LongType.get())) + .asStruct(), + schemaAfter1.asStruct()); + + // Drop multiple columns + sql("ALTER TABLE tl DROP (col1, col2)"); + Schema schemaAfter2 = table("tl").schema(); + Assert.assertEquals( + new Schema(Types.NestedField.optional(1, "id", Types.LongType.get())).asStruct(), + schemaAfter2.asStruct()); + + // Dropping an non-existing field should fail + Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl DROP (foo)")) + .isInstanceOf(ValidationException.class); + + // Dropping an already-deleted field should fail + Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl DROP (dt)")) + .isInstanceOf(ValidationException.class); + } + + @Test + public void testAlterTableModifyColumnName() { + sql("CREATE TABLE tl(id BIGINT, dt STRING)"); + Schema schemaBefore = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaBefore.asStruct()); + + sql("ALTER TABLE tl RENAME dt TO data"); + Schema schemaAfter = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "data", Types.StringType.get())) + .asStruct(), + schemaAfter.asStruct()); + } + + @Test + public void testAlterTableModifyColumnType() { + sql("CREATE TABLE tl(id INTEGER, dt STRING)"); + Schema schemaBefore = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaBefore.asStruct()); + + // Promote type from Integer to Long + sql("ALTER TABLE tl MODIFY (id BIGINT)"); + Schema schemaAfter = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaAfter.asStruct()); + + // Type change that doesn't follow the type-promotion rule should fail + Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl MODIFY (dt INTEGER)")) + .isInstanceOf(TableException.class); + } + + @Test + public void testAlterTableModifyColumnNullability() { + sql("CREATE TABLE tl(id INTEGER NOT NULL, dt STRING)"); + Schema schemaBefore = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaBefore.asStruct()); + + // Cannot change nullability from optional to required + Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl MODIFY (dt STRING NOT NULL)")) + .isInstanceOf(TableException.class); + + // Set nullability from required to optional + sql("ALTER TABLE tl MODIFY (id INTEGER)"); + Schema schemaAfter = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaAfter.asStruct()); + } + + @Test + public void testAlterTableModifyColumnPosition() { + sql("CREATE TABLE tl(id BIGINT, dt STRING)"); + Schema schemaBefore = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaBefore.asStruct()); + + sql("ALTER TABLE tl MODIFY (dt STRING FIRST)"); + Schema schemaAfter = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(2, "dt", Types.StringType.get()), + Types.NestedField.optional(1, "id", Types.LongType.get())) + .asStruct(), + schemaAfter.asStruct()); + + sql("ALTER TABLE tl MODIFY (dt STRING AFTER id)"); + Schema schemaAfterAfter = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaAfterAfter.asStruct()); + } + + @Test + public void testAlterTableModifyColumnComment() { + sql("CREATE TABLE tl(id BIGINT, dt STRING)"); + Schema schemaBefore = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaBefore.asStruct()); + + sql("ALTER TABLE tl MODIFY (dt STRING COMMENT 'some data')"); + Schema schemaAfter = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get(), "some data")) + .asStruct(), + schemaAfter.asStruct()); + } + + @Test + public void testAlterTableConstraint() { + sql("CREATE TABLE tl(id BIGINT NOT NULL, dt STRING NOT NULL, col1 STRING)"); + Schema schemaBefore = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.required(1, "id", Types.LongType.get()), + Types.NestedField.required(2, "dt", Types.StringType.get()), + Types.NestedField.optional(3, "col1", Types.StringType.get())) + .asStruct(), + schemaBefore.asStruct()); + Assert.assertEquals(ImmutableSet.of(), schemaBefore.identifierFieldNames()); + + sql("ALTER TABLE tl ADD (PRIMARY KEY (id) NOT ENFORCED)"); + Schema schemaAfterAdd = table("tl").schema(); + Assert.assertEquals(ImmutableSet.of("id"), schemaAfterAdd.identifierFieldNames()); + + sql("ALTER TABLE tl MODIFY (PRIMARY KEY (dt) NOT ENFORCED)"); + Schema schemaAfterModify = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.required(1, "id", Types.LongType.get()), + Types.NestedField.required(2, "dt", Types.StringType.get()), + Types.NestedField.optional(3, "col1", Types.StringType.get())) + .asStruct(), + schemaAfterModify.asStruct()); + Assert.assertEquals(ImmutableSet.of("dt"), schemaAfterModify.identifierFieldNames()); + + // Composite primary key + sql("ALTER TABLE tl MODIFY (PRIMARY KEY (id, dt) NOT ENFORCED)"); + Schema schemaAfterComposite = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.required(1, "id", Types.LongType.get()), + Types.NestedField.required(2, "dt", Types.StringType.get()), + Types.NestedField.optional(3, "col1", Types.StringType.get())) + .asStruct(), + schemaAfterComposite.asStruct()); + Assert.assertEquals(ImmutableSet.of("id", "dt"), schemaAfterComposite.identifierFieldNames()); + + // Setting an optional field as primary key should fail + Assertions.assertThatThrownBy( + () -> sql("ALTER TABLE tl MODIFY (PRIMARY KEY (col1) NOT ENFORCED)")) + .isInstanceOf(TableException.class); Review Comment: nit: assert error msg too ########## flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java: ########## @@ -322,39 +323,265 @@ public void testAlterTable() throws TableNotExistException { Assert.assertEquals(properties, table("tl").properties()); // remove property - CatalogTable catalogTable = catalogTable("tl"); + sql("ALTER TABLE tl RESET('oldK')"); properties.remove("oldK"); - getTableEnv() - .getCatalog(getTableEnv().getCurrentCatalog()) - .get() - .alterTable(new ObjectPath(DATABASE, "tl"), catalogTable.copy(properties), false); Assert.assertEquals(properties, table("tl").properties()); } @Test - public void testAlterTableWithPrimaryKey() throws TableNotExistException { - sql("CREATE TABLE tl(id BIGINT, PRIMARY KEY(id) NOT ENFORCED) WITH ('oldK'='oldV')"); - Map<String, String> properties = Maps.newHashMap(); - properties.put("oldK", "oldV"); + public void testAlterTableAddColumn() { + sql("CREATE TABLE tl(id BIGINT)"); + Schema schemaBefore = table("tl").schema(); + Assert.assertEquals( + new Schema(Types.NestedField.optional(1, "id", Types.LongType.get())).asStruct(), + schemaBefore.asStruct()); - // new - sql("ALTER TABLE tl SET('newK'='newV')"); - properties.put("newK", "newV"); - Assert.assertEquals(properties, table("tl").properties()); + sql("ALTER TABLE tl ADD (dt STRING)"); + Schema schemaAfter1 = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaAfter1.asStruct()); - // update old - sql("ALTER TABLE tl SET('oldK'='oldV2')"); - properties.put("oldK", "oldV2"); - Assert.assertEquals(properties, table("tl").properties()); + // Add multiple columns + sql("ALTER TABLE tl ADD (col1 STRING, col2 BIGINT)"); + Schema schemaAfter2 = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get()), + Types.NestedField.optional(3, "col1", Types.StringType.get()), + Types.NestedField.optional(4, "col2", Types.LongType.get())) + .asStruct(), + schemaAfter2.asStruct()); - // remove property - CatalogTable catalogTable = catalogTable("tl"); - properties.remove("oldK"); - getTableEnv() - .getCatalog(getTableEnv().getCurrentCatalog()) - .get() - .alterTable(new ObjectPath(DATABASE, "tl"), catalogTable.copy(properties), false); - Assert.assertEquals(properties, table("tl").properties()); + // Adding a required field should fail + Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl ADD (pk STRING NOT NULL)")) + .isInstanceOf(TableException.class); + + // Adding an existing field should fail + Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl ADD (id STRING)")) + .isInstanceOf(ValidationException.class); + } + + @Test + public void testAlterTableDropColumn() { + sql("CREATE TABLE tl(id BIGINT, dt STRING, col1 STRING, col2 BIGINT)"); + Schema schemaBefore = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get()), + Types.NestedField.optional(3, "col1", Types.StringType.get()), + Types.NestedField.optional(4, "col2", Types.LongType.get())) + .asStruct(), + schemaBefore.asStruct()); + + sql("ALTER TABLE tl DROP (dt)"); + Schema schemaAfter1 = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(3, "col1", Types.StringType.get()), + Types.NestedField.optional(4, "col2", Types.LongType.get())) + .asStruct(), + schemaAfter1.asStruct()); + + // Drop multiple columns + sql("ALTER TABLE tl DROP (col1, col2)"); + Schema schemaAfter2 = table("tl").schema(); + Assert.assertEquals( + new Schema(Types.NestedField.optional(1, "id", Types.LongType.get())).asStruct(), + schemaAfter2.asStruct()); + + // Dropping an non-existing field should fail + Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl DROP (foo)")) + .isInstanceOf(ValidationException.class); + + // Dropping an already-deleted field should fail + Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl DROP (dt)")) + .isInstanceOf(ValidationException.class); + } + + @Test + public void testAlterTableModifyColumnName() { + sql("CREATE TABLE tl(id BIGINT, dt STRING)"); + Schema schemaBefore = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaBefore.asStruct()); + + sql("ALTER TABLE tl RENAME dt TO data"); + Schema schemaAfter = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "data", Types.StringType.get())) + .asStruct(), + schemaAfter.asStruct()); + } + + @Test + public void testAlterTableModifyColumnType() { + sql("CREATE TABLE tl(id INTEGER, dt STRING)"); + Schema schemaBefore = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaBefore.asStruct()); + + // Promote type from Integer to Long + sql("ALTER TABLE tl MODIFY (id BIGINT)"); + Schema schemaAfter = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaAfter.asStruct()); + + // Type change that doesn't follow the type-promotion rule should fail + Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl MODIFY (dt INTEGER)")) + .isInstanceOf(TableException.class); + } + + @Test + public void testAlterTableModifyColumnNullability() { + sql("CREATE TABLE tl(id INTEGER NOT NULL, dt STRING)"); + Schema schemaBefore = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.required(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaBefore.asStruct()); + + // Cannot change nullability from optional to required + Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl MODIFY (dt STRING NOT NULL)")) + .isInstanceOf(TableException.class); + + // Set nullability from required to optional + sql("ALTER TABLE tl MODIFY (id INTEGER)"); + Schema schemaAfter = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.IntegerType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaAfter.asStruct()); + } + + @Test + public void testAlterTableModifyColumnPosition() { + sql("CREATE TABLE tl(id BIGINT, dt STRING)"); + Schema schemaBefore = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaBefore.asStruct()); + + sql("ALTER TABLE tl MODIFY (dt STRING FIRST)"); + Schema schemaAfter = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(2, "dt", Types.StringType.get()), + Types.NestedField.optional(1, "id", Types.LongType.get())) + .asStruct(), + schemaAfter.asStruct()); + + sql("ALTER TABLE tl MODIFY (dt STRING AFTER id)"); + Schema schemaAfterAfter = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaAfterAfter.asStruct()); + } + + @Test + public void testAlterTableModifyColumnComment() { + sql("CREATE TABLE tl(id BIGINT, dt STRING)"); + Schema schemaBefore = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaBefore.asStruct()); + + sql("ALTER TABLE tl MODIFY (dt STRING COMMENT 'some data')"); + Schema schemaAfter = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get(), "some data")) + .asStruct(), + schemaAfter.asStruct()); + } + + @Test + public void testAlterTableConstraint() { + sql("CREATE TABLE tl(id BIGINT NOT NULL, dt STRING NOT NULL, col1 STRING)"); + Schema schemaBefore = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.required(1, "id", Types.LongType.get()), + Types.NestedField.required(2, "dt", Types.StringType.get()), + Types.NestedField.optional(3, "col1", Types.StringType.get())) + .asStruct(), + schemaBefore.asStruct()); + Assert.assertEquals(ImmutableSet.of(), schemaBefore.identifierFieldNames()); + + sql("ALTER TABLE tl ADD (PRIMARY KEY (id) NOT ENFORCED)"); + Schema schemaAfterAdd = table("tl").schema(); + Assert.assertEquals(ImmutableSet.of("id"), schemaAfterAdd.identifierFieldNames()); + + sql("ALTER TABLE tl MODIFY (PRIMARY KEY (dt) NOT ENFORCED)"); + Schema schemaAfterModify = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.required(1, "id", Types.LongType.get()), + Types.NestedField.required(2, "dt", Types.StringType.get()), + Types.NestedField.optional(3, "col1", Types.StringType.get())) + .asStruct(), + schemaAfterModify.asStruct()); + Assert.assertEquals(ImmutableSet.of("dt"), schemaAfterModify.identifierFieldNames()); + + // Composite primary key + sql("ALTER TABLE tl MODIFY (PRIMARY KEY (id, dt) NOT ENFORCED)"); + Schema schemaAfterComposite = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.required(1, "id", Types.LongType.get()), + Types.NestedField.required(2, "dt", Types.StringType.get()), + Types.NestedField.optional(3, "col1", Types.StringType.get())) + .asStruct(), + schemaAfterComposite.asStruct()); + Assert.assertEquals(ImmutableSet.of("id", "dt"), schemaAfterComposite.identifierFieldNames()); + + // Setting an optional field as primary key should fail + Assertions.assertThatThrownBy( + () -> sql("ALTER TABLE tl MODIFY (PRIMARY KEY (col1) NOT ENFORCED)")) + .isInstanceOf(TableException.class); + + // Setting a composite key containing an optional field should fail + Assertions.assertThatThrownBy( + () -> sql("ALTER TABLE tl MODIFY (PRIMARY KEY (id, col1) NOT ENFORCED)")) + .isInstanceOf(TableException.class); + + // Dropping constraints is not supported yet + Assertions.assertThatThrownBy(() -> sql("ALTER TABLE tl DROP PRIMARY KEY")) + .isInstanceOf(TableException.class); Review Comment: nit: assert error msg too ########## flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/FlinkCatalog.java: ########## @@ -472,11 +482,16 @@ public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable, boolean CatalogTable table = toCatalogTable(icebergTable); - // Currently, Flink SQL only support altering table properties. + // This alterTable API only supports altering table properties. - // For current Flink Catalog API, support for adding/removing/renaming columns cannot be done by + // Support for adding/removing/renaming columns cannot be done by // comparing // CatalogTable instances, unless the Flink schema contains Iceberg column IDs. + + // To alter columns, use the other alterTable API and provide a list of TableChange's. + LOG.warn( Review Comment: but if this method is called for just changing table property, we will still see the warning msg. that can be confusing. I am not sure why upstream Flink didn't mark this as deprecated. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
