This is an automated email from the ASF dual-hosted git repository.
fanng pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/gravitino.git
The following commit(s) were added to refs/heads/main by this push:
new 3448dca2c [#3981]feat(flink-connector):Support alter table operation
for hive (#4097)
3448dca2c is described below
commit 3448dca2c8394b83214be79cc8d9f8f925095af6
Author: Peidian li <[email protected]>
AuthorDate: Wed Jul 24 21:05:46 2024 +0800
[#3981]feat(flink-connector):Support alter table operation for hive (#4097)
### What changes were proposed in this pull request?
- Support alter table operation for hive table.
### Why are the changes needed?
- Fix: #3981
### Does this PR introduce _any_ user-facing change?
- no
### How was this patch tested?
- add UTs
---
.../flink/connector/catalog/BaseCatalog.java | 167 +++++++++++-
.../flink/connector/utils/TableUtils.java | 44 ++++
.../gravitino/flink/connector/utils/TypeUtils.java | 4 +
.../flink/connector/catalog/TestBaseCatalog.java | 67 +++++
.../connector/integration/test/FlinkCommonIT.java | 281 ++++++++++++++++++++-
.../flink/connector/utils/TestTypeUtils.java | 3 +
6 files changed, 563 insertions(+), 3 deletions(-)
diff --git
a/flink-connector/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java
b/flink-connector/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java
index 7fdc8f108..6b76e31b8 100644
---
a/flink-connector/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java
+++
b/flink-connector/src/main/java/org/apache/gravitino/flink/connector/catalog/BaseCatalog.java
@@ -20,11 +20,13 @@
package org.apache.gravitino.flink.connector.catalog;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import com.google.common.collect.MapDifference;
import com.google.common.collect.Maps;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
+import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.compress.utils.Lists;
@@ -68,6 +70,7 @@ import
org.apache.gravitino.exceptions.SchemaAlreadyExistsException;
import org.apache.gravitino.exceptions.TableAlreadyExistsException;
import org.apache.gravitino.flink.connector.PartitionConverter;
import org.apache.gravitino.flink.connector.PropertiesConverter;
+import org.apache.gravitino.flink.connector.utils.TableUtils;
import org.apache.gravitino.flink.connector.utils.TypeUtils;
import org.apache.gravitino.rel.Column;
import org.apache.gravitino.rel.Table;
@@ -254,6 +257,8 @@ public abstract class BaseCatalog extends AbstractCatalog {
@Override
public void createTable(ObjectPath tablePath, CatalogBaseTable table,
boolean ignoreIfExists)
throws TableAlreadyExistException, DatabaseNotExistException,
CatalogException {
+ Preconditions.checkArgument(
+ table instanceof ResolvedCatalogBaseTable, "table should be resolved");
NameIdentifier identifier =
NameIdentifier.of(tablePath.getDatabaseName(),
tablePath.getObjectName());
@@ -280,10 +285,72 @@ public abstract class BaseCatalog extends AbstractCatalog
{
}
}
+ /**
+ * The method only is used to change the comments. To alter columns, use the
other alterTable API
+ * and provide a list of TableChanges.
+ *
+ * @param tablePath path of the table or view to be modified
+ * @param newTable the new table definition
+ * @param ignoreIfNotExists flag to specify behavior when the table or view
does not exist: if set
+ * to false, throw an exception, if set to true, do nothing.
+ * @throws TableNotExistException if the table not exists.
+ * @throws CatalogException in case of any runtime exception.
+ */
@Override
- public void alterTable(ObjectPath objectPath, CatalogBaseTable
catalogBaseTable, boolean b)
+ public void alterTable(ObjectPath tablePath, CatalogBaseTable newTable,
boolean ignoreIfNotExists)
throws TableNotExistException, CatalogException {
- throw new UnsupportedOperationException();
+ CatalogBaseTable existingTable;
+
+ try {
+ existingTable = this.getTable(tablePath);
+ } catch (TableNotExistException e) {
+ if (!ignoreIfNotExists) {
+ throw e;
+ }
+ return;
+ }
+
+ if (existingTable.getTableKind() != newTable.getTableKind()) {
+ throw new CatalogException(
+ String.format(
+ "Table types don't match. Existing table is '%s' and new table
is '%s'.",
+ existingTable.getTableKind(), newTable.getTableKind()));
+ }
+
+ NameIdentifier identifier =
+ NameIdentifier.of(tablePath.getDatabaseName(),
tablePath.getObjectName());
+ catalog()
+ .asTableCatalog()
+ .alterTable(identifier, getGravitinoTableChanges(existingTable,
newTable));
+ }
+
+ @Override
+ public void alterTable(
+ ObjectPath tablePath,
+ CatalogBaseTable newTable,
+ List<org.apache.flink.table.catalog.TableChange> tableChanges,
+ boolean ignoreIfNotExists)
+ throws TableNotExistException, CatalogException {
+ CatalogBaseTable existingTable;
+ try {
+ existingTable = this.getTable(tablePath);
+ } catch (TableNotExistException e) {
+ if (!ignoreIfNotExists) {
+ throw e;
+ }
+ return;
+ }
+
+ if (existingTable.getTableKind() != newTable.getTableKind()) {
+ throw new CatalogException(
+ String.format(
+ "Table types don't match. Existing table is '%s' and new table
is '%s'.",
+ existingTable.getTableKind(), newTable.getTableKind()));
+ }
+
+ NameIdentifier identifier =
+ NameIdentifier.of(tablePath.getDatabaseName(),
tablePath.getObjectName());
+ catalog().asTableCatalog().alterTable(identifier,
getGravitinoTableChanges(tableChanges));
}
@Override
@@ -470,6 +537,102 @@ public abstract class BaseCatalog extends AbstractCatalog
{
null);
}
+ private static void removeProperty(
+ org.apache.flink.table.catalog.TableChange.ResetOption change,
List<TableChange> changes) {
+ changes.add(TableChange.removeProperty(change.getKey()));
+ }
+
+ private static void setProperty(
+ org.apache.flink.table.catalog.TableChange.SetOption change,
List<TableChange> changes) {
+ changes.add(TableChange.setProperty(change.getKey(), change.getValue()));
+ }
+
+ private static void dropColumn(
+ org.apache.flink.table.catalog.TableChange.DropColumn change,
List<TableChange> changes) {
+ changes.add(TableChange.deleteColumn(new String[]
{change.getColumnName()}, true));
+ }
+
+ private static void addColumn(
+ org.apache.flink.table.catalog.TableChange.AddColumn change,
List<TableChange> changes) {
+ changes.add(
+ TableChange.addColumn(
+ new String[] {change.getColumn().getName()},
+
TypeUtils.toGravitinoType(change.getColumn().getDataType().getLogicalType()),
+ change.getColumn().getComment().orElse(null),
+ TableUtils.toGravitinoColumnPosition(change.getPosition())));
+ }
+
+ private static void modifyColumn(
+ org.apache.flink.table.catalog.TableChange change, List<TableChange>
changes) {
+ if (change instanceof
org.apache.flink.table.catalog.TableChange.ModifyColumnName) {
+ org.apache.flink.table.catalog.TableChange.ModifyColumnName
modifyColumnName =
+ (org.apache.flink.table.catalog.TableChange.ModifyColumnName) change;
+ changes.add(
+ TableChange.renameColumn(
+ new String[] {modifyColumnName.getOldColumnName()},
+ modifyColumnName.getNewColumnName()));
+ } else if (change
+ instanceof
org.apache.flink.table.catalog.TableChange.ModifyPhysicalColumnType) {
+ org.apache.flink.table.catalog.TableChange.ModifyPhysicalColumnType
modifyColumnType =
+
(org.apache.flink.table.catalog.TableChange.ModifyPhysicalColumnType) change;
+ changes.add(
+ TableChange.updateColumnType(
+ new String[] {modifyColumnType.getOldColumn().getName()},
+
TypeUtils.toGravitinoType(modifyColumnType.getNewType().getLogicalType())));
+ } else if (change instanceof
org.apache.flink.table.catalog.TableChange.ModifyColumnPosition) {
+ org.apache.flink.table.catalog.TableChange.ModifyColumnPosition
modifyColumnPosition =
+ (org.apache.flink.table.catalog.TableChange.ModifyColumnPosition)
change;
+ changes.add(
+ TableChange.updateColumnPosition(
+ new String[] {modifyColumnPosition.getOldColumn().getName()},
+
TableUtils.toGravitinoColumnPosition(modifyColumnPosition.getNewPosition())));
+ } else if (change instanceof
org.apache.flink.table.catalog.TableChange.ModifyColumnComment) {
+ org.apache.flink.table.catalog.TableChange.ModifyColumnComment
modifyColumnComment =
+ (org.apache.flink.table.catalog.TableChange.ModifyColumnComment)
change;
+ changes.add(
+ TableChange.updateColumnComment(
+ new String[] {modifyColumnComment.getOldColumn().getName()},
+ modifyColumnComment.getNewComment()));
+ } else {
+ throw new IllegalArgumentException(
+ String.format("Not support ModifyColumn : %s", change.getClass()));
+ }
+ }
+
+ @VisibleForTesting
+ static TableChange[] getGravitinoTableChanges(
+ CatalogBaseTable existingTable, CatalogBaseTable newTable) {
+ Preconditions.checkNotNull(newTable.getComment(), "The new comment should
not be null");
+ List<TableChange> changes = Lists.newArrayList();
+ if (!Objects.equals(newTable.getComment(), existingTable.getComment())) {
+ changes.add(TableChange.updateComment(newTable.getComment()));
+ }
+ return changes.toArray(new TableChange[0]);
+ }
+
+ @VisibleForTesting
+ static TableChange[] getGravitinoTableChanges(
+ List<org.apache.flink.table.catalog.TableChange> tableChanges) {
+ List<TableChange> changes = Lists.newArrayList();
+ for (org.apache.flink.table.catalog.TableChange change : tableChanges) {
+ if (change instanceof
org.apache.flink.table.catalog.TableChange.AddColumn) {
+ addColumn((org.apache.flink.table.catalog.TableChange.AddColumn)
change, changes);
+ } else if (change instanceof
org.apache.flink.table.catalog.TableChange.DropColumn) {
+ dropColumn((org.apache.flink.table.catalog.TableChange.DropColumn)
change, changes);
+ } else if (change instanceof
org.apache.flink.table.catalog.TableChange.ModifyColumn) {
+ modifyColumn(change, changes);
+ } else if (change instanceof
org.apache.flink.table.catalog.TableChange.SetOption) {
+ setProperty((org.apache.flink.table.catalog.TableChange.SetOption)
change, changes);
+ } else if (change instanceof
org.apache.flink.table.catalog.TableChange.ResetOption) {
+
removeProperty((org.apache.flink.table.catalog.TableChange.ResetOption) change,
changes);
+ } else {
+ throw new UnsupportedOperationException(
+ String.format("Not supported change : %s", change.getClass()));
+ }
+ }
+ return changes.toArray(new TableChange[0]);
+ }
+
@VisibleForTesting
static SchemaChange[] getSchemaChange(CatalogDatabase current,
CatalogDatabase updated) {
Map<String, String> currentProperties = current.getProperties();
diff --git
a/flink-connector/src/main/java/org/apache/gravitino/flink/connector/utils/TableUtils.java
b/flink-connector/src/main/java/org/apache/gravitino/flink/connector/utils/TableUtils.java
new file mode 100644
index 000000000..68bae4180
--- /dev/null
+++
b/flink-connector/src/main/java/org/apache/gravitino/flink/connector/utils/TableUtils.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.gravitino.flink.connector.utils;
+
+import org.apache.gravitino.rel.TableChange;
+
+public class TableUtils {
+ private TableUtils() {}
+
+ public static TableChange.ColumnPosition toGravitinoColumnPosition(
+ org.apache.flink.table.catalog.TableChange.ColumnPosition
columnPosition) {
+ if (columnPosition == null) {
+ return null;
+ }
+
+ if (columnPosition instanceof
org.apache.flink.table.catalog.TableChange.First) {
+ return TableChange.ColumnPosition.first();
+ } else if (columnPosition instanceof
org.apache.flink.table.catalog.TableChange.After) {
+ org.apache.flink.table.catalog.TableChange.After after =
+ (org.apache.flink.table.catalog.TableChange.After) columnPosition;
+ return TableChange.ColumnPosition.after(after.column());
+ } else {
+ throw new IllegalArgumentException(
+ String.format("Not support column position : %s",
columnPosition.getClass()));
+ }
+ }
+}
diff --git
a/flink-connector/src/main/java/org/apache/gravitino/flink/connector/utils/TypeUtils.java
b/flink-connector/src/main/java/org/apache/gravitino/flink/connector/utils/TypeUtils.java
index a11e20cb3..7c5635037 100644
---
a/flink-connector/src/main/java/org/apache/gravitino/flink/connector/utils/TypeUtils.java
+++
b/flink-connector/src/main/java/org/apache/gravitino/flink/connector/utils/TypeUtils.java
@@ -37,6 +37,8 @@ public class TypeUtils {
return Types.DoubleType.get();
case INTEGER:
return Types.IntegerType.get();
+ case BIGINT:
+ return Types.LongType.get();
default:
throw new UnsupportedOperationException(
"Not support type: " + logicalType.asSummaryString());
@@ -51,6 +53,8 @@ public class TypeUtils {
return DataTypes.STRING();
case INTEGER:
return DataTypes.INT();
+ case LONG:
+ return DataTypes.BIGINT();
default:
throw new UnsupportedOperationException("Not support " +
gravitinoType.toString());
}
diff --git
a/flink-connector/src/test/java/org/apache/gravitino/flink/connector/catalog/TestBaseCatalog.java
b/flink-connector/src/test/java/org/apache/gravitino/flink/connector/catalog/TestBaseCatalog.java
index a016a9603..89d25f8bd 100644
---
a/flink-connector/src/test/java/org/apache/gravitino/flink/connector/catalog/TestBaseCatalog.java
+++
b/flink-connector/src/test/java/org/apache/gravitino/flink/connector/catalog/TestBaseCatalog.java
@@ -18,11 +18,20 @@
*/
package org.apache.gravitino.flink.connector.catalog;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import java.util.List;
import java.util.Map;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.Schema;
+import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogDatabase;
import org.apache.flink.table.catalog.CatalogDatabaseImpl;
+import org.apache.flink.table.catalog.CatalogTable;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.TableChange;
import org.apache.gravitino.SchemaChange;
+import org.apache.gravitino.rel.types.Types;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -49,4 +58,62 @@ public class TestBaseCatalog {
Assertions.assertEquals("key2", ((SchemaChange.SetProperty)
schemaChange[2]).getProperty());
Assertions.assertEquals("new-value2", ((SchemaChange.SetProperty)
schemaChange[2]).getValue());
}
+
+ @Test
+ public void testTableChanges() {
+ List<TableChange> tableChanges =
+ ImmutableList.of(
+ TableChange.add(Column.physical("test", DataTypes.INT())),
+ TableChange.modifyPhysicalColumnType(
+ Column.physical("test", DataTypes.INT()), DataTypes.DOUBLE()),
+ TableChange.modifyColumnName(Column.physical("test",
DataTypes.INT()), "test2"),
+ TableChange.dropColumn("aaa"),
+ TableChange.modifyColumnComment(
+ Column.physical("test", DataTypes.INT()), "new comment"),
+ TableChange.modifyColumnPosition(
+ Column.physical("test", DataTypes.INT()),
+ TableChange.ColumnPosition.after("test2")),
+ TableChange.modifyColumnPosition(
+ Column.physical("test", DataTypes.INT()),
TableChange.ColumnPosition.first()),
+ TableChange.set("key", "value"),
+ TableChange.reset("key"));
+
+ List<org.apache.gravitino.rel.TableChange> expected =
+ ImmutableList.of(
+ org.apache.gravitino.rel.TableChange.addColumn(
+ new String[] {"test"}, Types.IntegerType.get()),
+ org.apache.gravitino.rel.TableChange.updateColumnType(
+ new String[] {"test"}, Types.DoubleType.get()),
+ org.apache.gravitino.rel.TableChange.renameColumn(new String[]
{"test"}, "test2"),
+ org.apache.gravitino.rel.TableChange.deleteColumn(new String[]
{"aaa"}, true),
+ org.apache.gravitino.rel.TableChange.updateColumnComment(
+ new String[] {"test"}, "new comment"),
+ org.apache.gravitino.rel.TableChange.updateColumnPosition(
+ new String[] {"test"},
+
org.apache.gravitino.rel.TableChange.ColumnPosition.after("test2")),
+ org.apache.gravitino.rel.TableChange.updateColumnPosition(
+ new String[] {"test"},
org.apache.gravitino.rel.TableChange.ColumnPosition.first()),
+ org.apache.gravitino.rel.TableChange.setProperty("key", "value"),
+ org.apache.gravitino.rel.TableChange.removeProperty("key"));
+
+ org.apache.gravitino.rel.TableChange[] gravitinoTableChanges =
+ BaseCatalog.getGravitinoTableChanges(tableChanges);
+ Assertions.assertArrayEquals(expected.toArray(), gravitinoTableChanges);
+ }
+
+ @Test
+ public void testTableChangesWithoutColumnChange() {
+ Schema schema = Schema.newBuilder().column("test", "INT").build();
+ CatalogBaseTable table =
+ CatalogTable.of(
+ schema, "test", ImmutableList.of(), ImmutableMap.of("key",
"value", "key2", "value2"));
+ CatalogBaseTable newTable =
+ CatalogTable.of(
+ schema, "new comment", ImmutableList.of(), ImmutableMap.of("key",
"new value"));
+ org.apache.gravitino.rel.TableChange[] tableChanges =
+ BaseCatalog.getGravitinoTableChanges(table, newTable);
+ List<org.apache.gravitino.rel.TableChange> expected =
+
ImmutableList.of(org.apache.gravitino.rel.TableChange.updateComment("new
comment"));
+ Assertions.assertArrayEquals(expected.toArray(), tableChanges);
+ }
}
diff --git
a/flink-connector/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java
b/flink-connector/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java
index 06905fff9..46b1d1d23 100644
---
a/flink-connector/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java
+++
b/flink-connector/src/test/java/org/apache/gravitino/flink/connector/integration/test/FlinkCommonIT.java
@@ -22,15 +22,25 @@ package
org.apache.gravitino.flink.connector.integration.test;
import static
org.apache.gravitino.flink.connector.integration.test.utils.TestUtils.assertColumns;
import static
org.apache.gravitino.flink.connector.integration.test.utils.TestUtils.toFlinkPhysicalColumn;
import static
org.apache.gravitino.rel.expressions.transforms.Transforms.EMPTY_TRANSFORM;
+import static org.junit.jupiter.api.Assertions.fail;
+import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
import java.util.Optional;
+import org.apache.commons.compress.utils.Lists;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.api.ResultKind;
import org.apache.flink.table.api.TableResult;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogTable;
import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.catalog.ResolvedCatalogTable;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.exceptions.DatabaseNotExistException;
+import org.apache.flink.table.catalog.exceptions.TableAlreadyExistException;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.types.Row;
import org.apache.gravitino.Catalog;
@@ -325,9 +335,278 @@ public abstract class FlinkCommonIT extends FlinkEnvIT {
CatalogTable catalogTable = (CatalogTable) table;
Assertions.assertFalse(catalogTable.isPartitioned());
} catch (TableNotExistException e) {
- Assertions.fail(e);
+ fail(e);
}
},
true);
}
+
+ @Test
+ public void testRenameColumn() {
+ String databaseName = "test_renam_column_db";
+ String tableName = "test_rename_column";
+ doWithSchema(
+ currentCatalog(),
+ databaseName,
+ catalog -> {
+ TableResult result =
+ sql(
+ "CREATE TABLE %s "
+ + "(user_id INT COMMENT 'USER_ID', "
+ + " order_amount DOUBLE COMMENT 'ORDER_AMOUNT')"
+ + " COMMENT 'test comment'",
+ tableName);
+ TestUtils.assertTableResult(result, ResultKind.SUCCESS);
+
+ result = sql("ALTER TABLE %s RENAME user_id TO user_id_new",
tableName);
+ TestUtils.assertTableResult(result, ResultKind.SUCCESS);
+
+ Column[] actual =
+ catalog
+ .asTableCatalog()
+ .loadTable(NameIdentifier.of(databaseName, tableName))
+ .columns();
+ Column[] expected =
+ new Column[] {
+ Column.of("user_id_new", Types.IntegerType.get(), "USER_ID"),
+ Column.of("order_amount", Types.DoubleType.get(),
"ORDER_AMOUNT")
+ };
+ assertColumns(expected, actual);
+ },
+ true);
+ }
+
+ @Test
+ public void testAlterTableComment() {
+ String databaseName = "test_alter_table_comment_database";
+ String tableName = "test_alter_table_comment";
+ String newComment = "new_table_comment";
+ doWithSchema(
+ currentCatalog(),
+ databaseName,
+ catalog -> {
+ Optional<org.apache.flink.table.catalog.Catalog> flinkCatalog =
+ tableEnv.getCatalog(currentCatalog().name());
+ if (flinkCatalog.isPresent()) {
+ org.apache.flink.table.catalog.Catalog currentFlinkCatalog =
flinkCatalog.get();
+ ObjectPath currentTablePath = new ObjectPath(databaseName,
tableName);
+ try {
+ // use java api to create a new table
+ org.apache.flink.table.api.Schema schema =
+ org.apache.flink.table.api.Schema.newBuilder()
+ .column("test", DataTypes.INT())
+ .build();
+ CatalogTable newTable =
+ CatalogTable.of(schema, "test comment", ImmutableList.of(),
ImmutableMap.of());
+ List<org.apache.flink.table.catalog.Column> columns =
Lists.newArrayList();
+
columns.add(org.apache.flink.table.catalog.Column.physical("test",
DataTypes.INT()));
+ ResolvedSchema resolvedSchema = new ResolvedSchema(columns, new
ArrayList<>(), null);
+ currentFlinkCatalog.createTable(
+ currentTablePath, new ResolvedCatalogTable(newTable,
resolvedSchema), false);
+ CatalogTable table = (CatalogTable)
currentFlinkCatalog.getTable(currentTablePath);
+
+ // alter table comment
+ currentFlinkCatalog.alterTable(
+ currentTablePath,
+ CatalogTable.of(
+ table.getUnresolvedSchema(),
+ newComment,
+ table.getPartitionKeys(),
+ table.getOptions()),
+ false);
+
+ CatalogTable loadedTable =
+ (CatalogTable)
currentFlinkCatalog.getTable(currentTablePath);
+ Assertions.assertEquals(newComment, loadedTable.getComment());
+ Table gravitinoTable =
+ currentCatalog()
+ .asTableCatalog()
+ .loadTable(NameIdentifier.of(databaseName, tableName));
+ Assertions.assertEquals(newComment, gravitinoTable.comment());
+ } catch (DatabaseNotExistException
+ | TableAlreadyExistException
+ | TableNotExistException e) {
+ fail(e);
+ }
+ } else {
+ fail("Catalog doesn't exist");
+ }
+ },
+ true);
+ }
+
+ @Test
+ public void testAlterTableAddColumn() {
+ String databaseName = "test_alter_table_add_column_db";
+ String tableName = "test_alter_table_add_column";
+ doWithSchema(
+ currentCatalog(),
+ databaseName,
+ catalog -> {
+ TableResult result =
+ sql(
+ "CREATE TABLE %s "
+ + "(user_id INT COMMENT 'USER_ID', "
+ + " order_amount INT COMMENT 'ORDER_AMOUNT')"
+ + " COMMENT 'test comment'",
+ tableName);
+ TestUtils.assertTableResult(result, ResultKind.SUCCESS);
+ result = sql("ALTER TABLE %s ADD new_column_2 INT AFTER
order_amount", tableName);
+ TestUtils.assertTableResult(result, ResultKind.SUCCESS);
+
+ Column[] actual =
+ catalog
+ .asTableCatalog()
+ .loadTable(NameIdentifier.of(databaseName, tableName))
+ .columns();
+ Column[] expected =
+ new Column[] {
+ Column.of("user_id", Types.IntegerType.get(), "USER_ID"),
+ Column.of("order_amount", Types.IntegerType.get(),
"ORDER_AMOUNT"),
+ Column.of("new_column_2", Types.IntegerType.get(), null),
+ };
+ assertColumns(expected, actual);
+ },
+ true);
+ }
+
+ @Test
+ public void testAlterTableDropColumn() {
+ String databaseName = "test_alter_table_drop_column_db";
+ String tableName = "test_alter_table_drop_column";
+ doWithSchema(
+ currentCatalog(),
+ databaseName,
+ catalog -> {
+ TableResult result =
+ sql(
+ "CREATE TABLE %s "
+ + "(user_id INT COMMENT 'USER_ID', "
+ + " order_amount INT COMMENT 'ORDER_AMOUNT')"
+ + " COMMENT 'test comment'",
+ tableName);
+ TestUtils.assertTableResult(result, ResultKind.SUCCESS);
+ result = sql("ALTER TABLE %s DROP user_id", tableName);
+ TestUtils.assertTableResult(result, ResultKind.SUCCESS);
+ Column[] actual =
+ catalog
+ .asTableCatalog()
+ .loadTable(NameIdentifier.of(databaseName, tableName))
+ .columns();
+ Column[] expected =
+ new Column[] {Column.of("order_amount", Types.IntegerType.get(),
"ORDER_AMOUNT")};
+ assertColumns(expected, actual);
+ },
+ true);
+ }
+
+ @Test
+ public void testAlterColumnTypeAndChangeOrder() {
+ String databaseName = "test_alter_table_alter_column_db";
+ String tableName = "test_alter_table_rename_column";
+ doWithSchema(
+ currentCatalog(),
+ databaseName,
+ catalog -> {
+ TableResult result =
+ sql(
+ "CREATE TABLE %s "
+ + "(user_id BIGINT COMMENT 'USER_ID', "
+ + " order_amount INT COMMENT 'ORDER_AMOUNT')"
+ + " COMMENT 'test comment'"
+ + " WITH ("
+ + "'%s' = '%s')",
+ tableName, "test key", "test value");
+ TestUtils.assertTableResult(result, ResultKind.SUCCESS);
+ result =
+ sql("ALTER TABLE %s MODIFY order_amount BIGINT COMMENT 'new
comment2'", tableName);
+ TestUtils.assertTableResult(result, ResultKind.SUCCESS);
+ result =
+ sql(
+ "ALTER TABLE %s MODIFY user_id BIGINT COMMENT 'new comment'
AFTER order_amount",
+ tableName);
+ TestUtils.assertTableResult(result, ResultKind.SUCCESS);
+ Column[] actual =
+ catalog
+ .asTableCatalog()
+ .loadTable(NameIdentifier.of(databaseName, tableName))
+ .columns();
+ Column[] expected =
+ new Column[] {
+ Column.of("order_amount", Types.LongType.get(), "new
comment2"),
+ Column.of("user_id", Types.LongType.get(), "new comment")
+ };
+ assertColumns(expected, actual);
+ },
+ true);
+ }
+
+ @Test
+ public void testRenameTable() {
+ String databaseName = "test_rename_table_db";
+ String tableName = "test_rename_table";
+ doWithSchema(
+ currentCatalog(),
+ databaseName,
+ catalog -> {
+ TableResult result =
+ sql(
+ "CREATE TABLE %s "
+ + "(user_id INT COMMENT 'USER_ID', "
+ + " order_amount INT COMMENT 'ORDER_AMOUNT')"
+ + " COMMENT 'test comment'",
+ tableName);
+ TestUtils.assertTableResult(result, ResultKind.SUCCESS);
+ String newTableName = "new_rename_table_name";
+ result = sql("ALTER TABLE %s RENAME TO %s", tableName, newTableName);
+ TestUtils.assertTableResult(result, ResultKind.SUCCESS);
+ Assertions.assertFalse(
+
catalog.asTableCatalog().tableExists(NameIdentifier.of(databaseName,
tableName)));
+ Assertions.assertTrue(
+
catalog.asTableCatalog().tableExists(NameIdentifier.of(databaseName,
newTableName)));
+ },
+ true);
+ }
+
+ @Test
+ public void testAlterTableProperties() {
+ String databaseName = "test_alter_table_properties_db";
+ String tableName = "test_alter_table_properties";
+ doWithSchema(
+ currentCatalog(),
+ databaseName,
+ catalog -> {
+ TableResult result =
+ sql(
+ "CREATE TABLE %s "
+ + "(user_id INT COMMENT 'USER_ID', "
+ + " order_amount INT COMMENT 'ORDER_AMOUNT')"
+ + " COMMENT 'test comment'"
+ + " WITH ("
+ + "'%s' = '%s')",
+ tableName, "key", "value");
+ TestUtils.assertTableResult(result, ResultKind.SUCCESS);
+ result = sql("ALTER TABLE %s SET ('key2' = 'value2', 'key' =
'value1')", tableName);
+ TestUtils.assertTableResult(result, ResultKind.SUCCESS);
+ Map<String, String> properties =
+ catalog
+ .asTableCatalog()
+ .loadTable(NameIdentifier.of(databaseName, tableName))
+ .properties();
+
+ Assertions.assertEquals("value1", properties.get("key"));
+ Assertions.assertEquals("value2", properties.get("key2"));
+ result = sql("ALTER TABLE %s RESET ('key2')", tableName);
+ TestUtils.assertTableResult(result, ResultKind.SUCCESS);
+
+ properties =
+ catalog
+ .asTableCatalog()
+ .loadTable(NameIdentifier.of(databaseName, tableName))
+ .properties();
+ Assertions.assertEquals("value1", properties.get("key"));
+ Assertions.assertNull(properties.get("key2"));
+ },
+ true);
+ }
}
diff --git
a/flink-connector/src/test/java/org/apache/gravitino/flink/connector/utils/TestTypeUtils.java
b/flink-connector/src/test/java/org/apache/gravitino/flink/connector/utils/TestTypeUtils.java
index e6d78f114..e9b1f8343 100644
---
a/flink-connector/src/test/java/org/apache/gravitino/flink/connector/utils/TestTypeUtils.java
+++
b/flink-connector/src/test/java/org/apache/gravitino/flink/connector/utils/TestTypeUtils.java
@@ -21,6 +21,7 @@ package org.apache.gravitino.flink.connector.utils;
import org.apache.flink.table.api.DataTypes;
import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.types.logical.BigIntType;
import org.apache.flink.table.types.logical.DoubleType;
import org.apache.flink.table.types.logical.IntType;
import org.apache.flink.table.types.logical.UnresolvedUserDefinedType;
@@ -37,6 +38,7 @@ public class TestTypeUtils {
Types.StringType.get(), TypeUtils.toGravitinoType(new
VarCharType(Integer.MAX_VALUE)));
Assertions.assertEquals(Types.DoubleType.get(),
TypeUtils.toGravitinoType(new DoubleType()));
Assertions.assertEquals(Types.IntegerType.get(),
TypeUtils.toGravitinoType(new IntType()));
+ Assertions.assertEquals(Types.LongType.get(),
TypeUtils.toGravitinoType(new BigIntType()));
Assertions.assertThrows(
UnsupportedOperationException.class,
() ->
@@ -49,6 +51,7 @@ public class TestTypeUtils {
Assertions.assertEquals(DataTypes.DOUBLE(),
TypeUtils.toFlinkType(Types.DoubleType.get()));
Assertions.assertEquals(DataTypes.STRING(),
TypeUtils.toFlinkType(Types.StringType.get()));
Assertions.assertEquals(DataTypes.INT(),
TypeUtils.toFlinkType(Types.IntegerType.get()));
+ Assertions.assertEquals(DataTypes.BIGINT(),
TypeUtils.toFlinkType(Types.LongType.get()));
Assertions.assertThrows(
UnsupportedOperationException.class,
() -> TypeUtils.toFlinkType(Types.UnparsedType.of("unknown")));