This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git
The following commit(s) were added to refs/heads/master by this push:
new c82ee7f2e [flink] support alter table for flink (#965)
c82ee7f2e is described below
commit c82ee7f2e4a27c6f3b594809409737764afc7bda
Author: JunZhang <[email protected]>
AuthorDate: Sat May 6 15:48:16 2023 +0800
[flink] support alter table for flink (#965)
---
docs/content/how-to/altering-tables.md | 65 ++++-
.../org/apache/paimon/schema/SchemaManager.java | 6 +
paimon-flink/paimon-flink-1.16/pom.xml | 24 ++
.../org/apache/paimon/flink/CatalogITCaseBase.java | 120 +++++++++
.../apache/paimon/flink/SchemaChangeITCase.java | 5 +-
.../src/test/resources/log4j2-test.properties | 28 +++
.../java/org/apache/paimon/flink/FlinkCatalog.java | 102 +++++++-
.../apache/paimon/flink/SchemaChangeITCase.java | 271 ++++++++++++++++++++-
8 files changed, 611 insertions(+), 10 deletions(-)
diff --git a/docs/content/how-to/altering-tables.md
b/docs/content/how-to/altering-tables.md
index e5ab0b3ea..d322f8c36 100644
--- a/docs/content/how-to/altering-tables.md
+++ b/docs/content/how-to/altering-tables.md
@@ -114,6 +114,14 @@ The following SQL adds two columns `c1` and `c2` to table
`my_table`.
{{< tabs "add-columns-example" >}}
+{{< tab "Flink" >}}
+
+```sql
+ALTER TABLE my_table ADD (c1 INT, c2 STRING);
+```
+
+{{< /tab >}}
+
{{< tab "Spark3" >}}
```sql
@@ -133,6 +141,17 @@ To add a new column with specified position, use FIRST or
AFTER col_name.
{{< tabs "add-column-position" >}}
+
+{{< tab "Flink" >}}
+
+```sql
+ALTER TABLE my_table ADD c INT FIRST;
+
+ALTER TABLE my_table ADD c INT AFTER b;
+```
+
+{{< /tab >}}
+
{{< tab "Spark3" >}}
```sql
@@ -150,6 +169,15 @@ The following SQL renames column `c0` in table `my_table`
to `c1`.
{{< tabs "rename-column-name-example" >}}
+{{< tab "Flink" >}}
+
+```sql
+ALTER TABLE my_table RENAME c0 TO c1;
+```
+
+{{< /tab >}}
+
+
{{< tab "Spark3" >}}
```sql
@@ -166,6 +194,14 @@ The following SQL drops two columns `c1` and `c2` from
table `my_table`.
{{< tabs "drop-columns-example" >}}
+{{< tab "Flink" >}}
+
+```sql
+ALTER TABLE my_table DROP (c1, c2);
+```
+
+{{< /tab >}}
+
{{< tab "Spark3" >}}
```sql
@@ -182,6 +218,16 @@ The following SQL sets column `coupon_info` to be nullable.
{{< tabs "change-nullability-example" >}}
+{{< tab "Flink" >}}
+
+```sql
+CREATE TABLE my_table (id INT PRIMARY KEY NOT ENFORCED, coupon_info FLOAT NOT
NULL);
+ALTER TABLE my_table MODIFY coupon_info FLOAT;
+```
+
+{{< /tab >}}
+
+
{{< tab "Spark3" >}}
```sql
@@ -216,6 +262,16 @@ To modify an existent column to a new position, use FIRST
or AFTER col_name.
{{< tabs "change-column-position" >}}
+{{< tab "Flink" >}}
+
+```sql
+ALTER TABLE my_table ALTER col_a FIRST;
+
+ALTER TABLE my_table ALTER col_a AFTER col_b;
+```
+
+{{< /tab >}}
+
{{< tab "Spark3" >}}
```sql
@@ -227,13 +283,20 @@ ALTER TABLE my_table ALTER COLUMN col_a AFTER col_b;
{{< /tab >}}
{{< /tabs >}}
-
## Changing Column Type
The following SQL changes type of column `col_a` to `DOUBLE`.
{{< tabs "change-column-type" >}}
+{{< tab "Flink" >}}
+
+```sql
+ALTER TABLE my_table MODIFY col_a DOUBLE;
+```
+
+{{< /tab >}}
+
{{< tab "Spark3" >}}
```sql
diff --git
a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
index 9eb6d349e..0dba66683 100644
--- a/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
+++ b/paimon-core/src/main/java/org/apache/paimon/schema/SchemaManager.java
@@ -485,4 +485,10 @@ public class SchemaManager implements Serializable {
String.format("Change '%s' is not supported yet.", key));
}
}
+
+ public static void checkAlterTablePath(String key) {
+ if (CoreOptions.PATH.key().equalsIgnoreCase(key)) {
+ throw new UnsupportedOperationException("Change path is not
supported yet.");
+ }
+ }
}
diff --git a/paimon-flink/paimon-flink-1.16/pom.xml
b/paimon-flink/paimon-flink-1.16/pom.xml
index 58432555d..64e2f2fd5 100644
--- a/paimon-flink/paimon-flink-1.16/pom.xml
+++ b/paimon-flink/paimon-flink-1.16/pom.xml
@@ -43,6 +43,30 @@ under the License.
<artifactId>paimon-flink-common</artifactId>
<version>${project.version}</version>
</dependency>
+
+ <!-- test dependencies -->
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+ <artifactId>flink-test-utils</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.flink</groupId>
+
<artifactId>flink-table-planner_${scala.binary.version}</artifactId>
+ <version>${flink.version}</version>
+ <scope>test</scope>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-hdfs-client</artifactId>
+ <version>${hadoop.version}</version>
+ <scope>test</scope>
+ </dependency>
+
</dependencies>
<build>
diff --git
a/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java
b/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java
new file mode 100644
index 000000000..54f686190
--- /dev/null
+++
b/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/CatalogITCaseBase.java
@@ -0,0 +1,120 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink;
+
+import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableList;
+
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.catalog.Catalog;
+import org.apache.flink.table.catalog.CatalogBaseTable;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.exceptions.TableNotExistException;
+import org.apache.flink.table.delegation.Parser;
+import org.apache.flink.table.operations.Operation;
+import org.apache.flink.table.operations.ddl.CreateCatalogOperation;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.junit.Before;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.List;
+
+import static
org.apache.flink.streaming.api.environment.ExecutionCheckpointingOptions.CHECKPOINTING_INTERVAL;
+
+/** ITCase for catalog. */
+public abstract class CatalogITCaseBase extends AbstractTestBase {
+
+ protected TableEnvironment tEnv;
+ protected TableEnvironment sEnv;
+ protected String path;
+
+ @Before
+ public void before() throws IOException {
+ tEnv =
TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
+ String catalog = "PAIMON";
+ path = getTempDirPath("paimon");
+ tEnv.executeSql(
+ String.format(
+ "CREATE CATALOG %s WITH (" + "'type'='paimon',
'warehouse'='%s')",
+ catalog, path));
+ tEnv.useCatalog(catalog);
+
+ sEnv =
TableEnvironment.create(EnvironmentSettings.newInstance().inStreamingMode().build());
+ sEnv.getConfig().getConfiguration().set(CHECKPOINTING_INTERVAL,
Duration.ofMillis(100));
+ sEnv.registerCatalog(catalog, tEnv.getCatalog(catalog).get());
+ sEnv.useCatalog(catalog);
+
+ setParallelism(defaultParallelism());
+ prepareEnv();
+ }
+
+ private void prepareEnv() {
+ Parser parser = ((TableEnvironmentImpl) tEnv).getParser();
+ for (String ddl : ddl()) {
+ tEnv.executeSql(ddl);
+ List<Operation> operations = parser.parse(ddl);
+ if (operations.size() == 1) {
+ Operation operation = operations.get(0);
+ if (operation instanceof CreateCatalogOperation) {
+ String name = ((CreateCatalogOperation)
operation).getCatalogName();
+ sEnv.registerCatalog(name,
tEnv.getCatalog(name).orElse(null));
+ }
+ }
+ }
+ }
+
+ protected void setParallelism(int parallelism) {
+ tEnv.getConfig()
+ .getConfiguration()
+
.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
parallelism);
+ sEnv.getConfig()
+ .getConfiguration()
+
.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM,
parallelism);
+ }
+
+ protected int defaultParallelism() {
+ return 2;
+ }
+
+ protected List<String> ddl() {
+ return Collections.emptyList();
+ }
+
+ protected List<Row> sql(String query, Object... args) {
+ try (CloseableIterator<Row> iter =
tEnv.executeSql(String.format(query, args)).collect()) {
+ return ImmutableList.copyOf(iter);
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ protected CatalogTable table(String tableName) throws
TableNotExistException {
+ Catalog catalog = tEnv.getCatalog(tEnv.getCurrentCatalog()).get();
+ CatalogBaseTable table =
+ catalog.getTable(new ObjectPath(catalog.getDefaultDatabase(),
tableName));
+ return (CatalogTable) table;
+ }
+}
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
b/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
similarity index 97%
copy from
paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
copy to
paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
index 5c6c030bd..2995049c7 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
+++
b/paimon-flink/paimon-flink-1.16/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
@@ -18,7 +18,7 @@
package org.apache.paimon.flink;
-import org.junit.jupiter.api.Test;
+import org.junit.Test;
import java.util.Map;
@@ -27,9 +27,6 @@ import static
org.assertj.core.api.Assertions.assertThatThrownBy;
/** ITCase for schema changes. */
public class SchemaChangeITCase extends CatalogITCaseBase {
-
- // TODO cover more cases once Flink supports more ALTER operations.
-
@Test
public void testSetAndRemoveOption() throws Exception {
sql("CREATE TABLE T (a STRING, b STRING, c STRING)");
diff --git
a/paimon-flink/paimon-flink-1.16/src/test/resources/log4j2-test.properties
b/paimon-flink/paimon-flink-1.16/src/test/resources/log4j2-test.properties
new file mode 100644
index 000000000..1b3980d15
--- /dev/null
+++ b/paimon-flink/paimon-flink-1.16/src/test/resources/log4j2-test.properties
@@ -0,0 +1,28 @@
+################################################################################
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+################################################################################
+
+# Set root logger level to OFF to not flood build logs
+# set manually to INFO for debugging purposes
+rootLogger.level = OFF
+rootLogger.appenderRef.test.ref = TestLogger
+
+appender.testlogger.name = TestLogger
+appender.testlogger.type = CONSOLE
+appender.testlogger.target = SYSTEM_ERR
+appender.testlogger.layout.type = PatternLayout
+appender.testlogger.layout.pattern = %-4r [%tid %t] %-5p %c %x - %m%n
diff --git
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
index e9d29ac1c..60431600b 100644
---
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
+++
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/FlinkCatalog.java
@@ -23,6 +23,7 @@ import org.apache.paimon.catalog.Catalog;
import org.apache.paimon.catalog.Identifier;
import org.apache.paimon.schema.Schema;
import org.apache.paimon.schema.SchemaChange;
+import org.apache.paimon.schema.SchemaManager;
import org.apache.paimon.table.FileStoreTable;
import org.apache.paimon.table.Table;
import org.apache.paimon.types.DataField;
@@ -42,6 +43,17 @@ import org.apache.flink.table.catalog.CatalogPartitionSpec;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.CatalogTableImpl;
import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.TableChange;
+import org.apache.flink.table.catalog.TableChange.AddColumn;
+import org.apache.flink.table.catalog.TableChange.After;
+import org.apache.flink.table.catalog.TableChange.ColumnPosition;
+import org.apache.flink.table.catalog.TableChange.DropColumn;
+import org.apache.flink.table.catalog.TableChange.First;
+import org.apache.flink.table.catalog.TableChange.ModifyColumnName;
+import org.apache.flink.table.catalog.TableChange.ModifyColumnPosition;
+import org.apache.flink.table.catalog.TableChange.ModifyPhysicalColumnType;
+import org.apache.flink.table.catalog.TableChange.ResetOption;
+import org.apache.flink.table.catalog.TableChange.SetOption;
import org.apache.flink.table.catalog.exceptions.CatalogException;
import org.apache.flink.table.catalog.exceptions.DatabaseAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.DatabaseNotEmptyException;
@@ -237,6 +249,50 @@ public class FlinkCatalog extends AbstractCatalog {
}
}
+ private SchemaChange toSchemaChange(TableChange change) {
+ if (change instanceof AddColumn) {
+ AddColumn add = (AddColumn) change;
+ String comment = add.getColumn().getComment().orElse(null);
+ SchemaChange.Move move = getMove(add.getPosition(),
add.getColumn().getName());
+ return SchemaChange.addColumn(
+ add.getColumn().getName(),
+ LogicalTypeConversion.toDataType(
+ add.getColumn().getDataType().getLogicalType()),
+ comment,
+ move);
+ } else if (change instanceof DropColumn) {
+ DropColumn drop = (DropColumn) change;
+ return SchemaChange.dropColumn(drop.getColumnName());
+ } else if (change instanceof ModifyColumnName) {
+ ModifyColumnName modify = (ModifyColumnName) change;
+ return SchemaChange.renameColumn(modify.getOldColumnName(),
modify.getNewColumnName());
+ } else if (change instanceof ModifyPhysicalColumnType) {
+ ModifyPhysicalColumnType modify = (ModifyPhysicalColumnType)
change;
+ return SchemaChange.updateColumnType(
+ modify.getOldColumn().getName(),
+
LogicalTypeConversion.toDataType(modify.getNewType().getLogicalType()));
+ } else if (change instanceof ModifyColumnPosition) {
+ ModifyColumnPosition modify = (ModifyColumnPosition) change;
+ SchemaChange.Move move =
+ getMove(modify.getNewPosition(),
modify.getNewColumn().getName());
+ return SchemaChange.updateColumnPosition(move);
+ } else if (change instanceof SetOption) {
+ SetOption setOption = (SetOption) change;
+ String key = setOption.getKey();
+ String value = setOption.getValue();
+
+ SchemaManager.checkAlterTablePath(key);
+
+ return SchemaChange.setOption(key, value);
+ } else if (change instanceof ResetOption) {
+ ResetOption resetOption = (ResetOption) change;
+ return SchemaChange.removeOption(resetOption.getKey());
+ } else {
+ throw new UnsupportedOperationException(
+ "Change is not supported: " + change.getClass());
+ }
+ }
+
@Override
public void alterTable(
ObjectPath tablePath, CatalogBaseTable newTable, boolean
ignoreIfNotExists)
@@ -283,6 +339,45 @@ public class FlinkCatalog extends AbstractCatalog {
}
}
+ @Override
+ public void alterTable(
+ ObjectPath tablePath,
+ CatalogBaseTable newTable,
+ List<TableChange> tableChanges,
+ boolean ignoreIfNotExists)
+ throws TableNotExistException, CatalogException {
+ if (ignoreIfNotExists && !tableExists(tablePath)) {
+ return;
+ }
+
+ CatalogTable table = getTable(tablePath);
+
+ validateAlterTable(table, (CatalogTable) newTable);
+
+ List<SchemaChange> changes = new ArrayList<>();
+ if (null != tableChanges) {
+ List<SchemaChange> schemaChanges =
+
tableChanges.stream().map(this::toSchemaChange).collect(Collectors.toList());
+ changes.addAll(schemaChanges);
+ }
+
+ try {
+ catalog.alterTable(toIdentifier(tablePath), changes,
ignoreIfNotExists);
+ } catch (Catalog.TableNotExistException e) {
+ throw new TableNotExistException(getName(), tablePath);
+ }
+ }
+
+ private SchemaChange.Move getMove(ColumnPosition columnPosition, String
fieldName) {
+ SchemaChange.Move move = null;
+ if (columnPosition instanceof First) {
+ move = SchemaChange.Move.first(fieldName);
+ } else if (columnPosition instanceof After) {
+ move = SchemaChange.Move.after(fieldName, ((After)
columnPosition).column());
+ }
+ return move;
+ }
+
private static void validateAlterTable(CatalogTable ct1, CatalogTable ct2)
{
org.apache.flink.table.api.TableSchema ts1 = ct1.getSchema();
org.apache.flink.table.api.TableSchema ts2 = ct2.getSchema();
@@ -300,10 +395,9 @@ public class FlinkCatalog extends AbstractCatalog {
pkEquality = true;
}
- if (!(Objects.equals(ts1.getTableColumns(), ts2.getTableColumns())
- && Objects.equals(ts1.getWatermarkSpecs(),
ts2.getWatermarkSpecs())
- && pkEquality)) {
- throw new UnsupportedOperationException("Altering schema is not
supported yet.");
+ if (!(Objects.equals(ts1.getWatermarkSpecs(), ts2.getWatermarkSpecs())
&& pkEquality)) {
+ throw new UnsupportedOperationException(
+ "Altering Watermark or primary key is not supported yet.");
}
if (!ct1.getPartitionKeys().equals(ct2.getPartitionKeys())) {
diff --git
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
index 5c6c030bd..33ce7ca48 100644
---
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
+++
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/SchemaChangeITCase.java
@@ -18,8 +18,12 @@
package org.apache.paimon.flink;
+import org.apache.paimon.testutils.assertj.AssertionUtils;
+
+import org.apache.flink.types.Row;
import org.junit.jupiter.api.Test;
+import java.util.List;
import java.util.Map;
import static org.assertj.core.api.Assertions.assertThat;
@@ -28,7 +32,272 @@ import static
org.assertj.core.api.Assertions.assertThatThrownBy;
/** ITCase for schema changes. */
public class SchemaChangeITCase extends CatalogITCaseBase {
- // TODO cover more cases once Flink supports more ALTER operations.
+ // TODO cover more cases.
+ @Test
+ public void testAddColumn() {
+ sql("CREATE TABLE T (a STRING, b DOUBLE, c FLOAT)");
+ sql("INSERT INTO T VALUES('aaa', 1.2, 3.4)");
+ sql("ALTER TABLE T ADD d INT");
+ List<Row> result = sql("SHOW CREATE TABLE T");
+ assertThat(result.toString())
+ .contains(
+ "CREATE TABLE `PAIMON`.`default`.`T` (\n"
+ + " `a` VARCHAR(2147483647),\n"
+ + " `b` DOUBLE,\n"
+ + " `c` FLOAT,\n"
+ + " `d` INT\n"
+ + ")");
+ sql("INSERT INTO T VALUES('bbb', 4.5, 5.6, 5)");
+ result = sql("SELECT * FROM T");
+ assertThat(result.toString()).isEqualTo("[+I[aaa, 1.2, 3.4, null],
+I[bbb, 4.5, 5.6, 5]]");
+
+ // add column with after position
+ sql("ALTER TABLE T ADD e INT AFTER b");
+ result = sql("SHOW CREATE TABLE T");
+ assertThat(result.toString())
+ .contains(
+ "CREATE TABLE `PAIMON`.`default`.`T` (\n"
+ + " `a` VARCHAR(2147483647),\n"
+ + " `b` DOUBLE,\n"
+ + " `e` INT,\n"
+ + " `c` FLOAT,\n"
+ + " `d` INT");
+ sql("INSERT INTO T VALUES('ccc', 2.3, 6, 5.6, 5)");
+ result = sql("SELECT * FROM T");
+ assertThat(result.toString())
+ .isEqualTo(
+ "[+I[aaa, 1.2, null, 3.4, null], +I[bbb, 4.5, null,
5.6, 5], +I[ccc, 2.3, 6, 5.6, 5]]");
+
+ // add column with first position
+ sql("ALTER TABLE T ADD f STRING FIRST");
+ result = sql("SHOW CREATE TABLE T");
+ assertThat(result.toString())
+ .contains(
+ "CREATE TABLE `PAIMON`.`default`.`T` (\n"
+ + " `f` VARCHAR(2147483647),\n"
+ + " `a` VARCHAR(2147483647),\n"
+ + " `b` DOUBLE,\n"
+ + " `e` INT,\n"
+ + " `c` FLOAT,\n"
+ + " `d` INT");
+
+ sql("INSERT INTO T VALUES('flink', 'fff', 45.34, 4, 2.45, 12)");
+ result = sql("SELECT * FROM T");
+ assertThat(result.toString())
+ .isEqualTo(
+ "[+I[null, aaa, 1.2, null, 3.4, null], +I[null, bbb,
4.5, null, 5.6, 5],"
+ + " +I[null, ccc, 2.3, 6, 5.6, 5], +I[flink,
fff, 45.34, 4, 2.45, 12]]");
+
+ // add multiple columns.
+ sql("ALTER TABLE T ADD ( g INT, h BOOLEAN ) ");
+ sql("INSERT INTO T VALUES('ggg', 'hhh', 23.43, 6, 2.34, 34, 23,
true)");
+ result = sql("SELECT * FROM T");
+ assertThat(result.toString())
+ .isEqualTo(
+ "[+I[null, aaa, 1.2, null, 3.4, null, null, null],
+I[null, bbb, 4.5, null, 5.6, 5, null, null],"
+ + " +I[null, ccc, 2.3, 6, 5.6, 5, null, null],
+I[flink, fff, 45.34, 4, 2.45, 12, null, null],"
+ + " +I[ggg, hhh, 23.43, 6, 2.34, 34, 23,
true]]");
+ }
+
+ @Test
+ public void testDropColumn() {
+ sql(
+ "CREATE TABLE T (a STRING PRIMARY KEY NOT ENFORCED, b STRING,
c STRING, d INT, e FLOAT)");
+ sql("INSERT INTO T VALUES('aaa', 'bbb', 'ccc', 10, 3.4)");
+ sql("ALTER TABLE T DROP e");
+ sql("INSERT INTO T VALUES('ddd', 'eee', 'fff', 20)");
+ List<Row> result = sql("SHOW CREATE TABLE T");
+ assertThat(result.toString())
+ .contains(
+ "CREATE TABLE `PAIMON`.`default`.`T` (\n"
+ + " `a` VARCHAR(2147483647) NOT NULL,\n"
+ + " `b` VARCHAR(2147483647),\n"
+ + " `c` VARCHAR(2147483647),\n"
+ + " `d` INT,");
+ result = sql("SELECT * FROM T");
+ assertThat(result.toString()).isEqualTo("[+I[aaa, bbb, ccc, 10],
+I[ddd, eee, fff, 20]]");
+
+ sql("ALTER TABLE T DROP (c, d)");
+ result = sql("SHOW CREATE TABLE T");
+ assertThat(result.toString())
+ .contains(
+ "CREATE TABLE `PAIMON`.`default`.`T` (\n"
+ + " `a` VARCHAR(2147483647) NOT NULL,\n"
+ + " `b` VARCHAR(2147483647),");
+
+ sql("INSERT INTO T VALUES('ggg', 'hhh')");
+ result = sql("SELECT * FROM T");
+ assertThat(result.toString()).isEqualTo("[+I[aaa, bbb], +I[ddd, eee],
+I[ggg, hhh]]");
+ }
+
+ @Test
+ public void testRenameColumn() {
+ sql("CREATE TABLE T (a STRING PRIMARY KEY NOT ENFORCED, b STRING, c
STRING)");
+ sql("INSERT INTO T VALUES('paimon', 'bbb', 'ccc')");
+ sql("ALTER TABLE T RENAME c TO c1");
+ List<Row> result = sql("SHOW CREATE TABLE T");
+ assertThat(result.toString())
+ .contains(
+ "CREATE TABLE `PAIMON`.`default`.`T` (\n"
+ + " `a` VARCHAR(2147483647) NOT NULL,\n"
+ + " `b` VARCHAR(2147483647),\n"
+ + " `c1` VARCHAR(2147483647)");
+ result = sql("SELECT a, b, c1 FROM T");
+ assertThat(result.toString()).isEqualTo("[+I[paimon, bbb, ccc]]");
+
+ // column do not exist.
+ assertThatThrownBy(() -> sql("ALTER TABLE T RENAME d TO d1"))
+ .hasMessageContaining("The column `d` does not exist in the
base table.");
+
+ // target column exist.
+ assertThatThrownBy(() -> sql("ALTER TABLE T RENAME a TO b"))
+ .hasMessageContaining("The column `b` already existed in table
schema.");
+ }
+
+ @Test
+ public void testDropPrimaryKey() {
+ sql("CREATE TABLE T (a STRING PRIMARY KEY NOT ENFORCED, b STRING, c
STRING)");
+ assertThatThrownBy(() -> sql("ALTER TABLE T DROP a"))
+ .hasMessageContaining(
+ "Failed to execute ALTER TABLE statement.\n"
+ + "The column `a` is used as the primary
key.");
+ }
+
+ @Test
+ public void testDropPartitionKey() {
+ sql(
+ "CREATE TABLE MyTable (\n"
+ + " user_id BIGINT,\n"
+ + " item_id BIGINT,\n"
+ + " behavior STRING,\n"
+ + " dt STRING,\n"
+ + " hh STRING,\n"
+ + " PRIMARY KEY (dt, hh, user_id) NOT ENFORCED\n"
+ + ") PARTITIONED BY (dt, hh)");
+ assertThatThrownBy(() -> sql("ALTER TABLE MyTable DROP dt"))
+ .hasMessageContaining(
+ "Failed to execute ALTER TABLE statement.\n"
+ + "The column `dt` is used as the partition
keys.");
+ }
+
+ @Test
+ public void testModifyColumnType() {
+ sql(
+ "CREATE TABLE T (a STRING PRIMARY KEY NOT ENFORCED, b STRING,
c STRING, d INT, e FLOAT)");
+ sql("INSERT INTO T VALUES('paimon', 'bbb', 'ccc', 1, 3.4)");
+ sql("ALTER TABLE T MODIFY e DOUBLE");
+ sql("INSERT INTO T VALUES('flink', 'ddd', 'eee', 4, 6.7)");
+ List<Row> result = sql("SHOW CREATE TABLE T");
+ assertThat(result.toString())
+ .contains(
+ "CREATE TABLE `PAIMON`.`default`.`T` (\n"
+ + " `a` VARCHAR(2147483647) NOT NULL,\n"
+ + " `b` VARCHAR(2147483647),\n"
+ + " `c` VARCHAR(2147483647),\n"
+ + " `d` INT,\n"
+ + " `e` DOUBLE,");
+ result = sql("SELECT * FROM T");
+ assertThat(result.toString())
+ .isEqualTo(
+ "[+I[flink, ddd, eee, 4, 6.7], +I[paimon, bbb, ccc, 1,
3.4000000953674316]]");
+
+ assertThatThrownBy(() -> sql("ALTER TABLE T MODIFY c DOUBLE"))
+ .hasMessageContaining(
+ "Could not execute ALTER TABLE PAIMON.default.T\n" + "
MODIFY `c` DOUBLE");
+ }
+
+ @Test
+ public void testModifyColumnPosition() {
+ sql(
+ "CREATE TABLE T (a STRING PRIMARY KEY NOT ENFORCED, b STRING,
c STRING, d INT, e DOUBLE)");
+ sql("INSERT INTO T VALUES('paimon', 'bbb', 'ccc', 1, 3.4)");
+ sql("ALTER TABLE T MODIFY b STRING FIRST");
+ List<Row> result = sql("SHOW CREATE TABLE T");
+ assertThat(result.toString())
+ .contains(
+ "CREATE TABLE `PAIMON`.`default`.`T` (\n"
+ + " `b` VARCHAR(2147483647),\n"
+ + " `a` VARCHAR(2147483647) NOT NULL,\n"
+ + " `c` VARCHAR(2147483647),\n"
+ + " `d` INT,\n"
+ + " `e` DOUBLE,");
+
+ sql("INSERT INTO T VALUES('aaa', 'flink', 'ddd', 2, 5.7)");
+ result = sql("SELECT * FROM T");
+ assertThat(result.toString())
+ .isEqualTo("[+I[aaa, flink, ddd, 2, 5.7], +I[bbb, paimon, ccc,
1, 3.4]]");
+
+ sql("ALTER TABLE T MODIFY e DOUBLE AFTER c");
+ result = sql("SHOW CREATE TABLE T");
+ assertThat(result.toString())
+ .contains(
+ "CREATE TABLE `PAIMON`.`default`.`T` (\n"
+ + " `b` VARCHAR(2147483647),\n"
+ + " `a` VARCHAR(2147483647) NOT NULL,\n"
+ + " `c` VARCHAR(2147483647),\n"
+ + " `e` DOUBLE,\n"
+ + " `d` INT,");
+
+ sql("INSERT INTO T VALUES('sss', 'ggg', 'eee', 4.7, 10)");
+ result = sql("SELECT * FROM T");
+ assertThat(result.toString())
+ .isEqualTo(
+ "[+I[aaa, flink, ddd, 5.7, 2], +I[sss, ggg, eee, 4.7,
10], +I[bbb, paimon, ccc, 3.4, 1]]");
+
+ // move self to first test
+ assertThatThrownBy(() -> sql("ALTER TABLE T MODIFY b STRING FIRST"))
+ .satisfies(
+ AssertionUtils.anyCauseMatches(
+ UnsupportedOperationException.class,
+ "Cannot move itself for column b"));
+
+ // move self to after test
+ assertThatThrownBy(() -> sql("ALTER TABLE T MODIFY b STRING AFTER b"))
+ .satisfies(
+ AssertionUtils.anyCauseMatches(
+ UnsupportedOperationException.class,
+ "Cannot move itself for column b"));
+
+ // missing column
+ assertThatThrownBy(() -> sql("ALTER TABLE T MODIFY h STRING FIRST"))
+ .hasMessageContaining(
+ "Try to modify a column `h` which does not exist in
the table");
+
+ assertThatThrownBy(() -> sql("ALTER TABLE T MODIFY h STRING AFTER d"))
+ .hasMessageContaining(
+ "Try to modify a column `h` which does not exist in
the table");
+ }
+
+ @Test
+ public void testModifyNullability() {
+ sql(
+ "CREATE TABLE T (a STRING PRIMARY KEY NOT ENFORCED, b STRING,
c STRING, d INT, e FLOAT NOT NULL)");
+ List<Row> result = sql("SHOW CREATE TABLE T");
+ assertThat(result.toString())
+ .contains(
+ "CREATE TABLE `PAIMON`.`default`.`T` (\n"
+ + " `a` VARCHAR(2147483647) NOT NULL,\n"
+ + " `b` VARCHAR(2147483647),\n"
+ + " `c` VARCHAR(2147483647),\n"
+ + " `d` INT,\n"
+ + " `e` FLOAT NOT NULL,");
+
+ sql("ALTER TABLE T MODIFY e FLOAT");
+ result = sql("SHOW CREATE TABLE T");
+ assertThat(result.toString())
+ .contains(
+ "CREATE TABLE `PAIMON`.`default`.`T` (\n"
+ + " `a` VARCHAR(2147483647) NOT NULL,\n"
+ + " `b` VARCHAR(2147483647),\n"
+ + " `c` VARCHAR(2147483647),\n"
+ + " `d` INT,\n"
+ + " `e` FLOAT");
+
+ assertThatThrownBy(() -> sql("ALTER TABLE T MODIFY c STRING NOT NULL"))
+ .hasMessageContaining(
+ "Could not execute ALTER TABLE PAIMON.default.T\n"
+ + " MODIFY `c` STRING NOT NULL");
+ }
@Test
public void testSetAndRemoveOption() throws Exception {