This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new c82ee7f2e [flink] support alter table for flink (#965)
c82ee7f2e is described below

commit c82ee7f2e4a27c6f3b594809409737764afc7bda
Author: JunZhang <[email protected]>
AuthorDate: Sat May 6 15:48:16 2023 +0800

    [flink] support alter table for flink (#965)
---
 docs/content/how-to/altering-tables.md             |  65 ++++-
 .../org/apache/paimon/schema/SchemaManager.java    |   6 +
 paimon-flink/paimon-flink-1.16/pom.xml             |  24 ++
 .../org/apache/paimon/flink/CatalogITCaseBase.java | 120 +++++++++
 .../apache/paimon/flink/SchemaChangeITCase.java    |   5 +-
 .../src/test/resources/log4j2-test.properties      |  28 +++
 .../java/org/apache/paimon/flink/FlinkCatalog.java | 102 +++++++-
 .../apache/paimon/flink/SchemaChangeITCase.java    | 271 ++++++++++++++++++++-
 8 files changed, 611 insertions(+), 10 deletions(-)

diff --git a/docs/content/how-to/altering-tables.md b/docs/content/how-to/altering-tables.md
index e5ab0b3ea..d322f8c36 100644
--- a/docs/content/how-to/altering-tables.md
+++ b/docs/content/how-to/altering-tables.md
@@ -114,6 +114,14 @@ The following SQL adds two columns `c1` and `c2` to table `my_table`.
 
 {{< tabs "add-columns-example" >}}
 
+{{< tab "Flink" >}}
+
+```sql
+ALTER TABLE my_table ADD (c1 INT, c2 STRING);
+```
+
+{{< /tab >}}
+
 {{< tab "Spark3" >}}
 
 ```sql
@@ -133,6 +141,17 @@ To add a new column with specified position, use FIRST or AFTER col_name.
 
 {{< tabs "add-column-position" >}}
 
+
+{{< tab "Flink" >}}
+
+```sql
+ALTER TABLE my_table ADD c INT FIRST;
+
+ALTER TABLE my_table ADD c INT AFTER b;
+```
+
+{{< /tab >}}
+
 {{< tab "Spark3" >}}
 
 ```sql
@@ -150,6 +169,15 @@ The following SQL renames column `c0` in table `my_table` to `c1`.
 
 {{< tabs "rename-column-name-example" >}}
 
+{{< tab "Flink" >}}
+
+```sql
+ALTER TABLE my_table RENAME c0 TO c1;
+```
+
+{{< /tab >}}
+
+
 {{< tab "Spark3" >}}
 
 ```sql
@@ -166,6 +194,14 @@ The following SQL drops two columns `c1` and `c2` from table `my_table`.
 
 {{< tabs "drop-columns-example" >}}
 
+{{< tab "Flink" >}}
+
+```sql
+ALTER TABLE my_table DROP (c1, c2);
+```
+
+{{< /tab >}}
+
 {{< tab "Spark3" >}}
 
 ```sql
@@ -182,6 +218,16 @@ The following SQL sets column `coupon_info` to be nullable.
 
 {{< tabs "change-nullability-example" >}}
 
+{{< tab "Flink" >}}
+
+```sql
+CREATE TABLE my_table (id INT PRIMARY KEY NOT ENFORCED, coupon_info FLOAT NOT NULL);
+ALTER TABLE my_table MODIFY coupon_info FLOAT;
+```
+
+{{< /tab >}}
+
+
 {{< tab "Spark3" >}}
 
 ```sql
@@ -216,6 +262,16 @@ To modify an existent column to a new position, use FIRST or AFTER col_name.
 
 {{< tabs "change-column-position" >}}
 
+{{< tab "Flink" >}}
+
+```sql
+ALTER TABLE my_table ALTER col_a FIRST;
+
+ALTER TABLE my_table ALTER col_a AFTER col_b;
+```
+
+{{< /tab >}}
+
 {{< tab "Spark3" >}}
 
 ```sql
@@ -227,13 +283,20 @@ ALTER TABLE my_table ALTER COLUMN col_a AFTER col_b;
 {{< /tab >}}
 
 {{< /tabs >}}
-
 ## Changing Column Type
 
 The following SQL changes type of column `col_a` to `DOUBLE`.
 
 {{< tabs "change-column-type" >}}
 
+{{< tab "Flink" >}}
+
+```sql
+ALTER TABLE my_table MODIFY col_a DOUBLE;
+```
+
+{{< /tab >}}
+
 {{< tab "Spark3" >}}
 
 ```sql
diff --git a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index 9eb6d349e..0dba66683 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -485,4 +485,10 @@ public class SchemaManager implements Serializable {
                     String.format("Change '%s' is not supported yet.", key));
         }
     }
+
+    public static void checkAlterTablePath(String key) {
+        if (CoreOptions.PATH.key().equalsIgnoreCase(key)) {
+            throw new UnsupportedOperationException("Change path is not 
supported yet.");
+        }
+    }
 }
diff --git a/paimon-flink/paimon-flink-1.16/pom.xml b/paimon-flink/paimon-flink-1.16/pom.xml
index 58432555d..64e2f2fd5 100644
--- a/paimon-flink/paimon-flink-1.16/pom.xml
+++ b/paimon-flink/paimon-flink-1.16/pom.xml
@@ -43,6 +43,30 @@ under the License.
             <artifactId>paimon-flink-common</artifactId>
             <version>${project.version}</version>
         </dependency>
+
+        <!-- test dependencies -->
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            <artifactId>flink-test-utils</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.flink</groupId>
+            
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+            <version>${flink.version}</version>
+            <scope>test</scope>
+        </dependency>
+
+        <dependency>
+            <groupId>org.apache.hadoop</groupId>
+            <artifactId>hadoop-hdfs-client</artifactId>
+            <version>${hadoop.version}</version>
+            <scope>test</scope>
+        </dependency>
+
     </dependencies>
 
     <build>
diff --git a/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java b/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java
new file mode 100644
index 000000000..54f686190
--- /dev/null
+++ b/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
+
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.delegation.Parser;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.ddl.CreateCatalogOperation;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.junit.Before;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+
+import static 
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL;
+
+/** ITCase for catalog. */
+public abstract class CatalogITCaseBase extends AbstractTestBase {
+
+    protected TableEnvironment tEnv;
+    protected TableEnvironment sEnv;
+    protected String path;
+
+    @Before
+    public void before() throws IOException {
+        tEnv = 
TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
+        String catalog = "PAIMON";
+        path = getTempDirPath("paimon");
+        tEnv.executeSql(
+                String.format(
+                        "CREATE CATALOG %s WITH (" + "'type'='paimon', 
'warehouse'='%s')",
+                        catalog, path));
+        tEnv.useCatalog(catalog);
+
+        sEnv = 
TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
+        sEnv.getConfig().getConfiguration().set(CHECKPOINTING_INTERVAL, 
Duration.ofMillis(100));
+        sEnv.registerCatalog(catalog, tEnv.getCatalog(catalog).get());
+        sEnv.useCatalog(catalog);
+
+        setParallelism(defaultParallelism());
+        prepareEnv();
+    }
+
+    private void prepareEnv() {
+        Parser parser = ((TableEnvironmentImpl) tEnv).getParser();
+        for (String ddl : ddl()) {
+            tEnv.executeSql(ddl);
+            List<Operation> operations = parser.parse(ddl);
+            if (operations.size() == 1) {
+                Operation operation = operations.get(0);
+                if (operation instanceof CreateCatalogOperation) {
+                    String name = ((CreateCatalogOperation) 
operation).getCatalogName();
+                    sEnv.registerCatalog(name, 
tEnv.getCatalog(name).orElse(null));
+                }
+            }
+        }
+    }
+
+    protected void setParallelism(int parallelism) {
+        tEnv.getConfig()
+                .getConfiguration()
+                
.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 
parallelism);
+        sEnv.getConfig()
+                .getConfiguration()
+                
.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 
parallelism);
+    }
+
+    protected int defaultParallelism() {
+        return 2;
+    }
+
+    protected List<String> ddl() {
+        return Collections.emptyList();
+    }
+
+    protected List<Row> sql(String query, Object... args) {
+        try (CloseableIterator<Row> iter = 
tEnv.executeSql(String.format(query, args)).collect()) {
+            return ImmutableList.copyOf(iter);
+        } catch (Exception e) {
+            throw new RuntimeException(e);
+        }
+    }
+
+    protected CatalogTable table(String tableName) throws 
TableNotExistException {
+        Catalog catalog = tEnv.getCatalog(tEnv.getCurrentCatalog()).get();
+        CatalogBaseTable table =
+                catalog.getTable(new ObjectPath(catalog.getDefaultDatabase(), 
tableName));
+        return (CatalogTable) table;
+    }
+}
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java b/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
similarity index 97%
copy from paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
copy to paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
index 5c6c030bd..2995049c7 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
+++ b/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
@@ -18,7 +18,7 @@
 
 package org.apache.paimon.flink;
 
-import org.junit.jupiter.api.Test;
+import org.junit.Test;
 
 import java.util.Map;
 
@@ -27,9 +27,6 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** ITCase for schema changes. */
 public class SchemaChangeITCase extends CatalogITCaseBase {
-
-    // TODO cover more cases once Flink supports more ALTER operations.
-
     @Test
     public void testSetAndRemoveOption() throws Exception {
         sql("CREATE TABLE T (a STRING, b STRING, c STRING)");
diff --git a/paimon-flink/paimon-flink-1.16/src/test/resources/log4j2-test.properties b/paimon-flink/paimon-flink-1.16/src/test/resources/log4j2-test.properties
new file mode 100644
index 000000000..1b3980d15
--- /dev/null
+++ b/paimon-flink/paimon-flink-1.16/src/test/resources/log4j2-test.properties
@@ -0,0 +1,28 @@
+################################################################################
+#  Licensed to the Apache Software Foundation (ASF) under one
+#  or more contributor license agreements.  See the NOTICE file
+#  distributed with this work for additional information
+#  regarding copyright ownership.  The ASF licenses this file
+#  to you under the Apache License, Version 2.0 (the
+#  "License"); you may not use this file except in compliance
+#  with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+#  Unless required by applicable law or agreed to in writing, software
+#  distributed under the License is distributed on an "AS IS" BASIS,
+#  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+#  See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level = OFF
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%tid %t] %-5p %c %x - %m%n
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index e9d29ac1c..60431600b 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -23,6 +23,7 @@ import org.apache.paimon.catalog.Catalog;
 import org.apache.paimon.catalog.Identifier;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
 import org.apache.paimon.types.DataField;
@@ -42,6 +43,17 @@ import org.apache.flink.table.catalog.CatalogPartitionSpec;
 import org.apache.flink.table.catalog.CatalogTable;
 import org.apache.flink.table.catalog.CatalogTableImpl;
 import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.TableChange;
+import org.apache.flink.table.catalog.TableChange.AddColumn;
+import org.apache.flink.table.catalog.TableChange.After;
+import org.apache.flink.table.catalog.TableChange.ColumnPosition;
+import org.apache.flink.table.catalog.TableChange.DropColumn;
+import org.apache.flink.table.catalog.TableChange.First;
+import org.apache.flink.table.catalog.TableChange.ModifyColumnName;
+import org.apache.flink.table.catalog.TableChange.ModifyColumnPosition;
+import org.apache.flink.table.catalog.TableChange.ModifyPhysicalColumnType;
+import org.apache.flink.table.catalog.TableChange.ResetOption;
+import org.apache.flink.table.catalog.TableChange.SetOption;
 import org.apache.flink.table.catalog.exceptions.CatalogException;
 import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
 import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
@@ -237,6 +249,50 @@ public class FlinkCatalog extends AbstractCatalog {
         }
     }
 
+    private SchemaChange toSchemaChange(TableChange change) {
+        if (change instanceof AddColumn) {
+            AddColumn add = (AddColumn) change;
+            String comment = add.getColumn().getComment().orElse(null);
+            SchemaChange.Move move = getMove(add.getPosition(), 
add.getColumn().getName());
+            return SchemaChange.addColumn(
+                    add.getColumn().getName(),
+                    LogicalTypeConversion.toDataType(
+                            add.getColumn().getDataType().getLogicalType()),
+                    comment,
+                    move);
+        } else if (change instanceof DropColumn) {
+            DropColumn drop = (DropColumn) change;
+            return SchemaChange.dropColumn(drop.getColumnName());
+        } else if (change instanceof ModifyColumnName) {
+            ModifyColumnName modify = (ModifyColumnName) change;
+            return SchemaChange.renameColumn(modify.getOldColumnName(), 
modify.getNewColumnName());
+        } else if (change instanceof ModifyPhysicalColumnType) {
+            ModifyPhysicalColumnType modify = (ModifyPhysicalColumnType) 
change;
+            return SchemaChange.updateColumnType(
+                    modify.getOldColumn().getName(),
+                    
LogicalTypeConversion.toDataType(modify.getNewType().getLogicalType()));
+        } else if (change instanceof ModifyColumnPosition) {
+            ModifyColumnPosition modify = (ModifyColumnPosition) change;
+            SchemaChange.Move move =
+                    getMove(modify.getNewPosition(), 
modify.getNewColumn().getName());
+            return SchemaChange.updateColumnPosition(move);
+        } else if (change instanceof SetOption) {
+            SetOption setOption = (SetOption) change;
+            String key = setOption.getKey();
+            String value = setOption.getValue();
+
+            SchemaManager.checkAlterTablePath(key);
+
+            return SchemaChange.setOption(key, value);
+        } else if (change instanceof ResetOption) {
+            ResetOption resetOption = (ResetOption) change;
+            return SchemaChange.removeOption(resetOption.getKey());
+        } else {
+            throw new UnsupportedOperationException(
+                    "Change is not supported: " + change.getClass());
+        }
+    }
+
     @Override
     public void alterTable(
             ObjectPath tablePath, CatalogBaseTable newTable, boolean 
ignoreIfNotExists)
@@ -283,6 +339,45 @@ public class FlinkCatalog extends AbstractCatalog {
         }
     }
 
+    @Override
+    public void alterTable(
+            ObjectPath tablePath,
+            CatalogBaseTable newTable,
+            List<TableChange> tableChanges,
+            boolean ignoreIfNotExists)
+            throws TableNotExistException, CatalogException {
+        if (ignoreIfNotExists && !tableExists(tablePath)) {
+            return;
+        }
+
+        CatalogTable table = getTable(tablePath);
+
+        validateAlterTable(table, (CatalogTable) newTable);
+
+        List<SchemaChange> changes = new ArrayList<>();
+        if (null != tableChanges) {
+            List<SchemaChange> schemaChanges =
+                    
tableChanges.stream().map(this::toSchemaChange).collect(Collectors.toList());
+            changes.addAll(schemaChanges);
+        }
+
+        try {
+            catalog.alterTable(toIdentifier(tablePath), changes, 
ignoreIfNotExists);
+        } catch (Catalog.TableNotExistException e) {
+            throw new TableNotExistException(getName(), tablePath);
+        }
+    }
+
+    private SchemaChange.Move getMove(ColumnPosition columnPosition, String 
fieldName) {
+        SchemaChange.Move move = null;
+        if (columnPosition instanceof First) {
+            move = SchemaChange.Move.first(fieldName);
+        } else if (columnPosition instanceof After) {
+            move = SchemaChange.Move.after(fieldName, ((After) 
columnPosition).column());
+        }
+        return move;
+    }
+
     private static void validateAlterTable(CatalogTable ct1, CatalogTable ct2) 
{
         org.apache.flink.table.api.TableSchema ts1 = ct1.getSchema();
         org.apache.flink.table.api.TableSchema ts2 = ct2.getSchema();
@@ -300,10 +395,9 @@ public class FlinkCatalog extends AbstractCatalog {
             pkEquality = true;
         }
 
-        if (!(Objects.equals(ts1.getTableColumns(), ts2.getTableColumns())
-                && Objects.equals(ts1.getWatermarkSpecs(), 
ts2.getWatermarkSpecs())
-                && pkEquality)) {
-            throw new UnsupportedOperationException("Altering schema is not 
supported yet.");
+        if (!(Objects.equals(ts1.getWatermarkSpecs(), ts2.getWatermarkSpecs()) 
&& pkEquality)) {
+            throw new UnsupportedOperationException(
+                    "Altering Watermark or primary key is not supported yet.");
         }
 
         if (!ct1.getPartitionKeys().equals(ct2.getPartitionKeys())) {
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
index 5c6c030bd..33ce7ca48 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
@@ -18,8 +18,12 @@
 
 package org.apache.paimon.flink;
 
+import org.apache.paimon.testutils.assertj.AssertionUtils;
+
+import org.apache.flink.types.Row;
 import org.junit.jupiter.api.Test;
 
+import java.util.List;
 import java.util.Map;
 
 import static org.assertj.core.api.Assertions.assertThat;
@@ -28,7 +32,272 @@ import static org.assertj.core.api.Assertions.assertThatThrownBy;
 /** ITCase for schema changes. */
 public class SchemaChangeITCase extends CatalogITCaseBase {
 
-    // TODO cover more cases once Flink supports more ALTER operations.
+    // TODO cover more cases.
+    @Test
+    public void testAddColumn() {
+        sql("CREATE TABLE T (a STRING, b DOUBLE, c FLOAT)");
+        sql("INSERT INTO T VALUES('aaa', 1.2, 3.4)");
+        sql("ALTER TABLE T ADD d INT");
+        List<Row> result = sql("SHOW CREATE TABLE T");
+        assertThat(result.toString())
+                .contains(
+                        "CREATE TABLE `PAIMON`.`default`.`T` (\n"
+                                + "  `a` VARCHAR(2147483647),\n"
+                                + "  `b` DOUBLE,\n"
+                                + "  `c` FLOAT,\n"
+                                + "  `d` INT\n"
+                                + ")");
+        sql("INSERT INTO T VALUES('bbb', 4.5, 5.6, 5)");
+        result = sql("SELECT * FROM T");
+        assertThat(result.toString()).isEqualTo("[+I[aaa, 1.2, 3.4, null], 
+I[bbb, 4.5, 5.6, 5]]");
+
+        // add column with after position
+        sql("ALTER TABLE T ADD e INT AFTER b");
+        result = sql("SHOW CREATE TABLE T");
+        assertThat(result.toString())
+                .contains(
+                        "CREATE TABLE `PAIMON`.`default`.`T` (\n"
+                                + "  `a` VARCHAR(2147483647),\n"
+                                + "  `b` DOUBLE,\n"
+                                + "  `e` INT,\n"
+                                + "  `c` FLOAT,\n"
+                                + "  `d` INT");
+        sql("INSERT INTO T VALUES('ccc', 2.3, 6, 5.6, 5)");
+        result = sql("SELECT * FROM T");
+        assertThat(result.toString())
+                .isEqualTo(
+                        "[+I[aaa, 1.2, null, 3.4, null], +I[bbb, 4.5, null, 
5.6, 5], +I[ccc, 2.3, 6, 5.6, 5]]");
+
+        // add column with first position
+        sql("ALTER TABLE T ADD f STRING FIRST");
+        result = sql("SHOW CREATE TABLE T");
+        assertThat(result.toString())
+                .contains(
+                        "CREATE TABLE `PAIMON`.`default`.`T` (\n"
+                                + "  `f` VARCHAR(2147483647),\n"
+                                + "  `a` VARCHAR(2147483647),\n"
+                                + "  `b` DOUBLE,\n"
+                                + "  `e` INT,\n"
+                                + "  `c` FLOAT,\n"
+                                + "  `d` INT");
+
+        sql("INSERT INTO T VALUES('flink', 'fff', 45.34, 4, 2.45, 12)");
+        result = sql("SELECT * FROM T");
+        assertThat(result.toString())
+                .isEqualTo(
+                        "[+I[null, aaa, 1.2, null, 3.4, null], +I[null, bbb, 
4.5, null, 5.6, 5],"
+                                + " +I[null, ccc, 2.3, 6, 5.6, 5], +I[flink, 
fff, 45.34, 4, 2.45, 12]]");
+
+        // add multiple columns.
+        sql("ALTER TABLE T ADD ( g INT, h BOOLEAN ) ");
+        sql("INSERT INTO T VALUES('ggg', 'hhh', 23.43, 6, 2.34, 34, 23, 
true)");
+        result = sql("SELECT * FROM T");
+        assertThat(result.toString())
+                .isEqualTo(
+                        "[+I[null, aaa, 1.2, null, 3.4, null, null, null], 
+I[null, bbb, 4.5, null, 5.6, 5, null, null],"
+                                + " +I[null, ccc, 2.3, 6, 5.6, 5, null, null], 
+I[flink, fff, 45.34, 4, 2.45, 12, null, null],"
+                                + " +I[ggg, hhh, 23.43, 6, 2.34, 34, 23, 
true]]");
+    }
+
+    @Test
+    public void testDropColumn() {
+        sql(
+                "CREATE TABLE T (a STRING PRIMARY KEY NOT ENFORCED, b STRING, 
c STRING, d INT, e FLOAT)");
+        sql("INSERT INTO T VALUES('aaa', 'bbb', 'ccc', 10, 3.4)");
+        sql("ALTER TABLE T DROP e");
+        sql("INSERT INTO T VALUES('ddd', 'eee', 'fff', 20)");
+        List<Row> result = sql("SHOW CREATE TABLE T");
+        assertThat(result.toString())
+                .contains(
+                        "CREATE TABLE `PAIMON`.`default`.`T` (\n"
+                                + "  `a` VARCHAR(2147483647) NOT NULL,\n"
+                                + "  `b` VARCHAR(2147483647),\n"
+                                + "  `c` VARCHAR(2147483647),\n"
+                                + "  `d` INT,");
+        result = sql("SELECT * FROM T");
+        assertThat(result.toString()).isEqualTo("[+I[aaa, bbb, ccc, 10], 
+I[ddd, eee, fff, 20]]");
+
+        sql("ALTER TABLE T DROP (c, d)");
+        result = sql("SHOW CREATE TABLE T");
+        assertThat(result.toString())
+                .contains(
+                        "CREATE TABLE `PAIMON`.`default`.`T` (\n"
+                                + "  `a` VARCHAR(2147483647) NOT NULL,\n"
+                                + "  `b` VARCHAR(2147483647),");
+
+        sql("INSERT INTO T VALUES('ggg', 'hhh')");
+        result = sql("SELECT * FROM T");
+        assertThat(result.toString()).isEqualTo("[+I[aaa, bbb], +I[ddd, eee], 
+I[ggg, hhh]]");
+    }
+
+    @Test
+    public void testRenameColumn() {
+        sql("CREATE TABLE T (a STRING PRIMARY KEY NOT ENFORCED, b STRING, c 
STRING)");
+        sql("INSERT INTO T VALUES('paimon', 'bbb', 'ccc')");
+        sql("ALTER TABLE T RENAME c TO c1");
+        List<Row> result = sql("SHOW CREATE TABLE T");
+        assertThat(result.toString())
+                .contains(
+                        "CREATE TABLE `PAIMON`.`default`.`T` (\n"
+                                + "  `a` VARCHAR(2147483647) NOT NULL,\n"
+                                + "  `b` VARCHAR(2147483647),\n"
+                                + "  `c1` VARCHAR(2147483647)");
+        result = sql("SELECT a, b, c1 FROM T");
+        assertThat(result.toString()).isEqualTo("[+I[paimon, bbb, ccc]]");
+
+        // column do not exist.
+        assertThatThrownBy(() -> sql("ALTER TABLE T RENAME d TO d1"))
+                .hasMessageContaining("The column `d` does not exist in the 
base table.");
+
+        // target column exist.
+        assertThatThrownBy(() -> sql("ALTER TABLE T RENAME a TO b"))
+                .hasMessageContaining("The column `b` already existed in table 
schema.");
+    }
+
+    @Test
+    public void testDropPrimaryKey() {
+        sql("CREATE TABLE T (a STRING PRIMARY KEY NOT ENFORCED, b STRING, c 
STRING)");
+        assertThatThrownBy(() -> sql("ALTER TABLE T DROP a"))
+                .hasMessageContaining(
+                        "Failed to execute ALTER TABLE statement.\n"
+                                + "The column `a` is used as the primary 
key.");
+    }
+
+    @Test
+    public void testDropPartitionKey() {
+        sql(
+                "CREATE TABLE MyTable (\n"
+                        + "    user_id BIGINT,\n"
+                        + "    item_id BIGINT,\n"
+                        + "    behavior STRING,\n"
+                        + "    dt STRING,\n"
+                        + "    hh STRING,\n"
+                        + "    PRIMARY KEY (dt, hh, user_id) NOT ENFORCED\n"
+                        + ") PARTITIONED BY (dt, hh)");
+        assertThatThrownBy(() -> sql("ALTER TABLE MyTable DROP dt"))
+                .hasMessageContaining(
+                        "Failed to execute ALTER TABLE statement.\n"
+                                + "The column `dt` is used as the partition 
keys.");
+    }
+
+    @Test
+    public void testModifyColumnType() {
+        sql(
+                "CREATE TABLE T (a STRING PRIMARY KEY NOT ENFORCED, b STRING, 
c STRING, d INT, e FLOAT)");
+        sql("INSERT INTO T VALUES('paimon', 'bbb', 'ccc', 1, 3.4)");
+        sql("ALTER TABLE T MODIFY e DOUBLE");
+        sql("INSERT INTO T VALUES('flink', 'ddd', 'eee', 4, 6.7)");
+        List<Row> result = sql("SHOW CREATE TABLE T");
+        assertThat(result.toString())
+                .contains(
+                        "CREATE TABLE `PAIMON`.`default`.`T` (\n"
+                                + "  `a` VARCHAR(2147483647) NOT NULL,\n"
+                                + "  `b` VARCHAR(2147483647),\n"
+                                + "  `c` VARCHAR(2147483647),\n"
+                                + "  `d` INT,\n"
+                                + "  `e` DOUBLE,");
+        result = sql("SELECT * FROM T");
+        assertThat(result.toString())
+                .isEqualTo(
+                        "[+I[flink, ddd, eee, 4, 6.7], +I[paimon, bbb, ccc, 1, 
3.4000000953674316]]");
+
+        assertThatThrownBy(() -> sql("ALTER TABLE T MODIFY c DOUBLE"))
+                .hasMessageContaining(
+                        "Could not execute ALTER TABLE PAIMON.default.T\n" + " 
 MODIFY `c` DOUBLE");
+    }
+
+    @Test
+    public void testModifyColumnPosition() {
+        sql(
+                "CREATE TABLE T (a STRING PRIMARY KEY NOT ENFORCED, b STRING, 
c STRING, d INT, e DOUBLE)");
+        sql("INSERT INTO T VALUES('paimon', 'bbb', 'ccc', 1, 3.4)");
+        sql("ALTER TABLE T MODIFY b STRING FIRST");
+        List<Row> result = sql("SHOW CREATE TABLE T");
+        assertThat(result.toString())
+                .contains(
+                        "CREATE TABLE `PAIMON`.`default`.`T` (\n"
+                                + "  `b` VARCHAR(2147483647),\n"
+                                + "  `a` VARCHAR(2147483647) NOT NULL,\n"
+                                + "  `c` VARCHAR(2147483647),\n"
+                                + "  `d` INT,\n"
+                                + "  `e` DOUBLE,");
+
+        sql("INSERT INTO T VALUES('aaa', 'flink', 'ddd', 2, 5.7)");
+        result = sql("SELECT * FROM T");
+        assertThat(result.toString())
+                .isEqualTo("[+I[aaa, flink, ddd, 2, 5.7], +I[bbb, paimon, ccc, 
1, 3.4]]");
+
+        sql("ALTER TABLE T MODIFY e DOUBLE AFTER c");
+        result = sql("SHOW CREATE TABLE T");
+        assertThat(result.toString())
+                .contains(
+                        "CREATE TABLE `PAIMON`.`default`.`T` (\n"
+                                + "  `b` VARCHAR(2147483647),\n"
+                                + "  `a` VARCHAR(2147483647) NOT NULL,\n"
+                                + "  `c` VARCHAR(2147483647),\n"
+                                + "  `e` DOUBLE,\n"
+                                + "  `d` INT,");
+
+        sql("INSERT INTO T VALUES('sss', 'ggg', 'eee', 4.7, 10)");
+        result = sql("SELECT * FROM T");
+        assertThat(result.toString())
+                .isEqualTo(
+                        "[+I[aaa, flink, ddd, 5.7, 2], +I[sss, ggg, eee, 4.7, 
10], +I[bbb, paimon, ccc, 3.4, 1]]");
+
+        //  move self to first test
+        assertThatThrownBy(() -> sql("ALTER TABLE T MODIFY b STRING FIRST"))
+                .satisfies(
+                        AssertionUtils.anyCauseMatches(
+                                UnsupportedOperationException.class,
+                                "Cannot move itself for column b"));
+
+        //  move self to after test
+        assertThatThrownBy(() -> sql("ALTER TABLE T MODIFY b STRING AFTER b"))
+                .satisfies(
+                        AssertionUtils.anyCauseMatches(
+                                UnsupportedOperationException.class,
+                                "Cannot move itself for column b"));
+
+        // missing column
+        assertThatThrownBy(() -> sql("ALTER TABLE T MODIFY h STRING FIRST"))
+                .hasMessageContaining(
+                        "Try to modify a column `h` which does not exist in 
the table");
+
+        assertThatThrownBy(() -> sql("ALTER TABLE T MODIFY h STRING AFTER d"))
+                .hasMessageContaining(
+                        "Try to modify a column `h` which does not exist in 
the table");
+    }
+
+    @Test
+    public void testModifyNullability() {
+        sql(
+                "CREATE TABLE T (a STRING PRIMARY KEY NOT ENFORCED, b STRING, 
c STRING, d INT, e FLOAT NOT NULL)");
+        List<Row> result = sql("SHOW CREATE TABLE T");
+        assertThat(result.toString())
+                .contains(
+                        "CREATE TABLE `PAIMON`.`default`.`T` (\n"
+                                + "  `a` VARCHAR(2147483647) NOT NULL,\n"
+                                + "  `b` VARCHAR(2147483647),\n"
+                                + "  `c` VARCHAR(2147483647),\n"
+                                + "  `d` INT,\n"
+                                + "  `e` FLOAT NOT NULL,");
+
+        sql("ALTER TABLE T MODIFY e FLOAT");
+        result = sql("SHOW CREATE TABLE T");
+        assertThat(result.toString())
+                .contains(
+                        "CREATE TABLE `PAIMON`.`default`.`T` (\n"
+                                + "  `a` VARCHAR(2147483647) NOT NULL,\n"
+                                + "  `b` VARCHAR(2147483647),\n"
+                                + "  `c` VARCHAR(2147483647),\n"
+                                + "  `d` INT,\n"
+                                + "  `e` FLOAT");
+
+        assertThatThrownBy(() -> sql("ALTER TABLE T MODIFY c STRING NOT NULL"))
+                .hasMessageContaining(
+                        "Could not execute ALTER TABLE PAIMON.default.T\n"
+                                + "  MODIFY `c` STRING NOT NULL");
+    }
 
     @Test
     public void testSetAndRemoveOption() throws Exception {

Reply via email to