stevenzwu commented on code in PR #7628: URL: https://github.com/apache/iceberg/pull/7628#discussion_r1334926298
########## flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkAlterTableUtil.java: ########## @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.util; + +import java.util.List; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.TableChange; +import org.apache.flink.table.catalog.UniqueConstraint; +import org.apache.iceberg.UpdateProperties; +import org.apache.iceberg.UpdateSchema; +import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.types.Type; + +public class FlinkAlterTableUtil { + private FlinkAlterTableUtil() {} + + /** + * Applies a list of Flink table changes to an {@link UpdateSchema} operation. + * + * @param pendingUpdate an uncommitted UpdateSchema operation to configure + * @param schemaChanges a list of Flink table changes + */ + public static void applySchemaChanges( + UpdateSchema pendingUpdate, List<TableChange> schemaChanges) { + for (TableChange change : schemaChanges) { + if (change instanceof TableChange.AddColumn) { + TableChange.AddColumn addColumn = (TableChange.AddColumn) change; + Column flinkColumn = addColumn.getColumn(); + Preconditions.checkArgument( + FlinkCompatibilityUtil.isPhysicalColumn(flinkColumn), + "Adding computed columns is not supported yet: %s", + flinkColumn.getName()); + Type icebergType = FlinkSchemaUtil.convert(flinkColumn.getDataType().getLogicalType()); + pendingUpdate.addColumn(flinkColumn.getName(), icebergType); + + } else if (change instanceof TableChange.ModifyColumn) { + TableChange.ModifyColumn modifyColumn = (TableChange.ModifyColumn) change; + applyModifyColumn(pendingUpdate, modifyColumn); + + } else if (change instanceof TableChange.DropColumn) { + TableChange.DropColumn dropColumn = (TableChange.DropColumn) change; + pendingUpdate.deleteColumn(dropColumn.getColumnName()); + + } else if (change instanceof TableChange.AddWatermark) { + throw new UnsupportedOperationException("Adding watermark specs is not supported yet. "); Review Comment: this (the extra whitespace in the end) needs to be addressed. ########## flink/v1.17/flink/src/main/java/org/apache/iceberg/flink/util/FlinkAlterTableUtil.java: ########## @@ -0,0 +1,176 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.flink.util; + +import java.util.List; +import org.apache.flink.table.catalog.Column; +import org.apache.flink.table.catalog.TableChange; +import org.apache.flink.table.catalog.UniqueConstraint; +import org.apache.iceberg.UpdateProperties; +import org.apache.iceberg.UpdateSchema; +import org.apache.iceberg.flink.FlinkSchemaUtil; +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; +import org.apache.iceberg.types.Type; + +public class FlinkAlterTableUtil { + private FlinkAlterTableUtil() {} + + /** + * Applies a list of Flink table changes to an {@link UpdateSchema} operation. + * + * @param pendingUpdate an uncommitted UpdateSchema operation to configure + * @param schemaChanges a list of Flink table changes + */ + public static void applySchemaChanges( + UpdateSchema pendingUpdate, List<TableChange> schemaChanges) { + for (TableChange change : schemaChanges) { + if (change instanceof TableChange.AddColumn) { + TableChange.AddColumn addColumn = (TableChange.AddColumn) change; + Column flinkColumn = addColumn.getColumn(); + Preconditions.checkArgument( + FlinkCompatibilityUtil.isPhysicalColumn(flinkColumn), + "Adding computed columns is not supported yet: %s", + flinkColumn.getName()); + Type icebergType = FlinkSchemaUtil.convert(flinkColumn.getDataType().getLogicalType()); + pendingUpdate.addColumn(flinkColumn.getName(), icebergType); + + } else if (change instanceof TableChange.ModifyColumn) { + TableChange.ModifyColumn modifyColumn = (TableChange.ModifyColumn) change; + applyModifyColumn(pendingUpdate, modifyColumn); + + } else if (change instanceof TableChange.DropColumn) { + TableChange.DropColumn dropColumn = (TableChange.DropColumn) change; + pendingUpdate.deleteColumn(dropColumn.getColumnName()); + + } else if (change instanceof TableChange.AddWatermark) { + throw new UnsupportedOperationException("Adding watermark specs is not supported yet. "); Review Comment: also the error msg doesn't follow the coding style. you can referring to `ArrowVectorAccessor` for examples. Please review all the error msgs in this class E.g., it could be sth like `Unsupported schema change: adding watermark specs` ########## flink/v1.17/flink/src/test/java/org/apache/iceberg/flink/TestFlinkCatalogTable.java: ########## @@ -322,39 +323,293 @@ public void testAlterTable() throws TableNotExistException { Assert.assertEquals(properties, table("tl").properties()); // remove property - CatalogTable catalogTable = catalogTable("tl"); + sql("ALTER TABLE tl RESET('oldK')"); properties.remove("oldK"); - getTableEnv() - .getCatalog(getTableEnv().getCurrentCatalog()) - .get() - .alterTable(new ObjectPath(DATABASE, "tl"), catalogTable.copy(properties), false); Assert.assertEquals(properties, table("tl").properties()); } @Test - public void testAlterTableWithPrimaryKey() throws TableNotExistException { - sql("CREATE TABLE tl(id BIGINT, PRIMARY KEY(id) NOT ENFORCED) WITH ('oldK'='oldV')"); - Map<String, String> properties = Maps.newHashMap(); - properties.put("oldK", "oldV"); + public void testAlterTableAddColumn() { + sql("CREATE TABLE tl(id BIGINT)"); + Schema schemaBefore = table("tl").schema(); + Assert.assertEquals( + new Schema(Types.NestedField.optional(1, "id", Types.LongType.get())).asStruct(), + schemaBefore.asStruct()); - // new - sql("ALTER TABLE tl SET('newK'='newV')"); - properties.put("newK", "newV"); - Assert.assertEquals(properties, table("tl").properties()); + sql("ALTER TABLE tl ADD (dt STRING)"); + Schema schemaAfter1 = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get())) + .asStruct(), + schemaAfter1.asStruct()); - // update old - sql("ALTER TABLE tl SET('oldK'='oldV2')"); - properties.put("oldK", "oldV2"); - Assert.assertEquals(properties, table("tl").properties()); + // Add multiple columns + sql("ALTER TABLE tl ADD (col1 STRING, col2 BIGINT)"); + Schema schemaAfter2 = table("tl").schema(); + Assert.assertEquals( + new Schema( + Types.NestedField.optional(1, "id", Types.LongType.get()), + Types.NestedField.optional(2, "dt", Types.StringType.get()), + Types.NestedField.optional(3, "col1", Types.StringType.get()), + Types.NestedField.optional(4, "col2", Types.LongType.get())) + .asStruct(), + schemaAfter2.asStruct()); - // remove property - CatalogTable catalogTable = catalogTable("tl"); - properties.remove("oldK"); - getTableEnv() - .getCatalog(getTableEnv().getCurrentCatalog()) - .get() - .alterTable(new ObjectPath(DATABASE, "tl"), catalogTable.copy(properties), false); - Assert.assertEquals(properties, table("tl").properties()); + // Adding a required field should fail due to Iceberg's validation. Review Comment: oh. this is because `SchemaUpdate` by default doesn't allow incompatible change. can you add the note to the comment for easier understanding for future reader. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
