This is an automated email from the ASF dual-hosted git repository.
czweng pushed a commit to branch release-0.3
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git
The following commit(s) were added to refs/heads/release-0.3 by this push:
new 8bcec69b [FLINK-30608] support rename table
8bcec69b is described below
commit 8bcec69b5f9d0bd6a270889e8e7947270c0e3c44
Author: tsreaper <[email protected]>
AuthorDate: Mon Mar 6 14:45:57 2023 +0800
[FLINK-30608] support rename table
This closes #572.
---
docs/content/docs/how-to/altering-tables.md | 30 +++++++++++++
.../flink/table/store/connector/FlinkCatalog.java | 11 ++++-
.../store/connector/FileSystemCatalogITCase.java | 52 +++++++++++++++++++++-
.../flink/table/store/file/catalog/Catalog.java | 13 ++++++
.../store/file/catalog/FileSystemCatalog.java | 20 +++++++++
.../apache/flink/table/store/hive/HiveCatalog.java | 27 +++++++++++
.../flink/table/store/hive/HiveCatalogITCase.java | 35 +++++++++++++++
.../flink/table/store/spark/SparkCatalog.java | 11 ++++-
.../flink/table/store/spark/SparkReadTestBase.java | 15 +++++++
.../store/spark/SparkSchemaEvolutionITCase.java | 34 ++++++++++++--
10 files changed, 239 insertions(+), 9 deletions(-)
diff --git a/docs/content/docs/how-to/altering-tables.md
b/docs/content/docs/how-to/altering-tables.md
index 48048872..090860c0 100644
--- a/docs/content/docs/how-to/altering-tables.md
+++ b/docs/content/docs/how-to/altering-tables.md
@@ -54,6 +54,36 @@ ALTER TABLE my_table SET TBLPROPERTIES (
{{< /tabs >}}
+## Rename Table Name
+
+The following SQL renames the table to a new name.
+
+{{< tabs "rename-table-name" >}}
+
+{{< tab "Flink" >}}
+
+```sql
+ALTER TABLE my_table RENAME TO my_table_new;
+```
+
+{{< /tab >}}
+
+{{< tab "Spark3" >}}
+
+```sql
+ALTER TABLE my_table RENAME TO my_table_new;
+```
+
+{{< /tab >}}
+
+{{< /tabs >}}
+
+{{< hint info >}}
+
+If you use object storage, such as S3 or OSS, please use this syntax
carefully: renaming on object storage is not atomic, so a failure may
leave only some of the files moved.
+
+{{< /hint >}}
+
## Removing Table Properties
The following SQL removes `write-buffer-size` table property.
diff --git
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java
index 55e5f30e..9769e868 100644
---
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java
+++
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java
@@ -314,8 +314,15 @@ public class FlinkCatalog extends AbstractCatalog {
@Override
public final void renameTable(
ObjectPath tablePath, String newTableName, boolean
ignoreIfNotExists)
- throws CatalogException {
- throw new UnsupportedOperationException();
+ throws CatalogException, TableNotExistException,
TableAlreadyExistException {
+ ObjectPath toTable = new ObjectPath(tablePath.getDatabaseName(),
newTableName);
+ try {
+ catalog.renameTable(tablePath, toTable, ignoreIfNotExists);
+ } catch (Catalog.TableNotExistException e) {
+ throw new TableNotExistException(getName(), tablePath);
+ } catch (Catalog.TableAlreadyExistException e) {
+ throw new TableAlreadyExistException(getName(), toTable);
+ }
}
@Override
diff --git
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileSystemCatalogITCase.java
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileSystemCatalogITCase.java
index 69ee10c2..932511e9 100644
---
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileSystemCatalogITCase.java
+++
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/FileSystemCatalogITCase.java
@@ -18,25 +18,37 @@
package org.apache.flink.table.store.connector;
+import org.apache.flink.core.fs.Path;
+import org.apache.flink.table.catalog.ObjectPath;
+import org.apache.flink.table.store.file.catalog.Catalog;
import org.apache.flink.table.store.file.utils.BlockingIterator;
import org.apache.flink.table.store.kafka.KafkaTableTestBase;
import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
+import java.io.File;
import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.util.UUID;
import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.AssertionsForClassTypes.assertThatThrownBy;
/** ITCase for {@link FlinkCatalog}. */
public class FileSystemCatalogITCase extends KafkaTableTestBase {
+ private String path;
+ private static final String DB_NAME = "default";
+
@Before
public void before() throws IOException {
- String path = TEMPORARY_FOLDER.newFolder().toURI().toString();
+ path = TEMPORARY_FOLDER.newFolder().toURI().toString();
tEnv.executeSql(
String.format(
"CREATE CATALOG fs WITH ('type'='table-store',
'warehouse'='%s')", path));
@@ -50,6 +62,34 @@ public class FileSystemCatalogITCase extends
KafkaTableTestBase {
innerTestWriteRead();
}
+ @Test
+ public void testRenameTable() throws Exception {
+ tEnv.executeSql("CREATE TABLE t1 (a INT)").await();
+ tEnv.executeSql("CREATE TABLE t2 (a INT)").await();
+ tEnv.executeSql("INSERT INTO t1 VALUES(1),(2)").await();
+ // the source table does not exist.
+ assertThatThrownBy(() -> tEnv.executeSql("ALTER TABLE t3 RENAME TO
t4"))
+ .hasMessage("Table `fs`.`default`.`t3` doesn't exist or is a
temporary table.");
+
+ // the target table already exists.
+ assertThatThrownBy(() -> tEnv.executeSql("ALTER TABLE t1 RENAME TO
t2"))
+ .hasMessage("Could not execute ALTER TABLE fs.default.t1
RENAME TO fs.default.t2");
+
+ tEnv.executeSql("ALTER TABLE t1 RENAME TO t3").await();
+ Assert.assertEquals(Arrays.asList(Row.of("t2"), Row.of("t3")),
collect("SHOW TABLES"));
+
+ ObjectPath objectPath = new ObjectPath(DB_NAME, "t3");
+ Catalog catalog =
+ ((FlinkCatalog)
tEnv.getCatalog(tEnv.getCurrentCatalog()).get()).catalog();
+ Path tablePath = catalog.getTableLocation(objectPath);
+ Assert.assertEquals(tablePath.toString(), path + DB_NAME + ".db" +
File.separator + "t3");
+
+ BlockingIterator<Row, Row> iterator =
+ BlockingIterator.of(tEnv.from("t3").execute().collect());
+ List<Row> result = iterator.collectAndClose(2);
+ assertThat(result).containsExactlyInAnyOrder(Row.of(1), Row.of(2));
+ }
+
@Test
public void testLogWriteRead() throws Exception {
String topic = UUID.randomUUID().toString();
@@ -107,4 +147,14 @@ public class FileSystemCatalogITCase extends
KafkaTableTestBase {
List<Row> result = iterator.collectAndClose(2);
assertThat(result).containsExactlyInAnyOrder(Row.of("1", "2", "3"),
Row.of("4", "5", "6"));
}
+
+ private List<Row> collect(String sql) throws Exception {
+ List<Row> result = new ArrayList<>();
+ try (CloseableIterator<Row> it = tEnv.executeSql(sql).collect()) {
+ while (it.hasNext()) {
+ result.add(it.next());
+ }
+ }
+ return result;
+ }
}
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java
index 919f6067..3789187a 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/Catalog.java
@@ -150,6 +150,19 @@ public interface Catalog extends AutoCloseable {
void createTable(ObjectPath tablePath, UpdateSchema tableSchema, boolean
ignoreIfExists)
throws TableAlreadyExistException, DatabaseNotExistException;
+ /**
+ * Rename a table.
+ *
+ * @param fromTable the name of the table which needs to be renamed
+ * @param toTable the new table
+ * @param ignoreIfNotExists Flag to specify behavior when the table does
not exist: if set to
+ * false, throw an exception, if set to true, do nothing.
+ * @throws TableNotExistException if the from table does not exist
+ * @throws TableAlreadyExistException if the to table already exists
+ */
+ void renameTable(ObjectPath fromTable, ObjectPath toTable, boolean
ignoreIfNotExists)
+ throws TableNotExistException, TableAlreadyExistException;
+
/**
* Modify an existing table from {@link SchemaChange}s.
*
diff --git
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalog.java
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalog.java
index 6dae077e..c5a9a80a 100644
---
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalog.java
+++
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/catalog/FileSystemCatalog.java
@@ -163,6 +163,26 @@ public class FileSystemCatalog extends AbstractCatalog {
uncheck(() -> new SchemaManager(path).commitNewVersion(table));
}
+ @Override
+ public void renameTable(ObjectPath fromTable, ObjectPath toTable, boolean
ignoreIfNotExists)
+ throws TableNotExistException, TableAlreadyExistException {
+ Path fromPath = getTableLocation(fromTable);
+ if (!tableExists(fromPath)) {
+ if (ignoreIfNotExists) {
+ return;
+ }
+
+ throw new TableNotExistException(fromTable);
+ }
+
+ Path toPath = getTableLocation(toTable);
+ if (tableExists(toPath)) {
+ throw new TableAlreadyExistException(toTable);
+ }
+
+ uncheck(() -> fs.rename(fromPath, toPath));
+ }
+
@Override
public void alterTable(
ObjectPath tablePath, List<SchemaChange> changes, boolean
ignoreIfNotExists)
diff --git
a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java
b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java
index 90269ee4..e9db107c 100644
---
a/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java
+++
b/flink-table-store-hive/flink-table-store-hive-catalog/src/main/java/org/apache/flink/table/store/hive/HiveCatalog.java
@@ -239,6 +239,33 @@ public class HiveCatalog extends AbstractCatalog {
}
}
+ @Override
+ public void renameTable(ObjectPath fromTable, ObjectPath toTable, boolean
ignoreIfNotExists)
+ throws TableNotExistException, TableAlreadyExistException {
+ if (!tableStoreTableExists(fromTable)) {
+ if (ignoreIfNotExists) {
+ return;
+ } else {
+ throw new TableNotExistException(fromTable);
+ }
+ }
+
+ if (tableExists(toTable)) {
+ throw new TableAlreadyExistException(toTable);
+ }
+
+ try {
+ String fromDB = fromTable.getDatabaseName();
+ String fromTableName = fromTable.getObjectName();
+ Table table = client.getTable(fromDB, fromTableName);
+ table.setDbName(toTable.getDatabaseName());
+ table.setTableName(toTable.getObjectName());
+ client.alter_table(fromDB, fromTableName, table);
+ } catch (TException e) {
+ throw new RuntimeException("Failed to rename table " +
fromTable.getFullName(), e);
+ }
+ }
+
@Override
public void alterTable(
ObjectPath tablePath, List<SchemaChange> changes, boolean
ignoreIfNotExists)
diff --git
a/flink-table-store-hive/flink-table-store-hive-catalog/src/test/java/org/apache/flink/table/store/hive/HiveCatalogITCase.java
b/flink-table-store-hive/flink-table-store-hive-catalog/src/test/java/org/apache/flink/table/store/hive/HiveCatalogITCase.java
index 3ac047b4..0cde2fe7 100644
---
a/flink-table-store-hive/flink-table-store-hive-catalog/src/test/java/org/apache/flink/table/store/hive/HiveCatalogITCase.java
+++
b/flink-table-store-hive/flink-table-store-hive-catalog/src/test/java/org/apache/flink/table/store/hive/HiveCatalogITCase.java
@@ -23,7 +23,9 @@ import org.apache.flink.core.fs.Path;
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;
import org.apache.flink.table.api.internal.TableEnvironmentImpl;
+import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.store.connector.FlinkCatalog;
+import org.apache.flink.table.store.file.catalog.Catalog;
import org.apache.flink.table.store.file.catalog.CatalogLock;
import org.apache.flink.types.Row;
import org.apache.flink.util.CloseableIterator;
@@ -42,6 +44,7 @@ import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
+import java.io.File;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
@@ -276,6 +279,38 @@ public class HiveCatalogITCase {
}
}
+ @Test
+ public void testRenameTable() throws Exception {
+ tEnv.executeSql("CREATE TABLE t1 (a INT)").await();
+ tEnv.executeSql("CREATE TABLE t2 (a INT)").await();
+ tEnv.executeSql("INSERT INTO t1 SELECT 1");
+ // the source table does not exist.
+ assertThatThrownBy(() -> tEnv.executeSql("ALTER TABLE t3 RENAME TO
t4"))
+ .hasMessage(
+ "Table `my_hive`.`test_db`.`t3` doesn't exist or is a
temporary table.");
+
+ // the target table already exists.
+ assertThatThrownBy(() -> tEnv.executeSql("ALTER TABLE t1 RENAME TO
t2"))
+ .hasMessage(
+ "Could not execute ALTER TABLE my_hive.test_db.t1
RENAME TO my_hive.test_db.t2");
+
+ tEnv.executeSql("ALTER TABLE t1 RENAME TO t3").await();
+ List<String> tables = hiveShell.executeQuery("SHOW TABLES");
+ Assert.assertTrue(tables.contains("t3"));
+ Assert.assertFalse(tables.contains("t1"));
+
+ ObjectPath objectPath = new ObjectPath("test_db", "t3");
+ Catalog catalog =
+ ((FlinkCatalog)
tEnv.getCatalog(tEnv.getCurrentCatalog()).get()).catalog();
+ org.apache.flink.core.fs.Path tablePath =
catalog.getTableLocation(objectPath);
+ Assert.assertEquals(tablePath.toString(), path + "test_db.db" +
File.separator + "t3");
+
+ // TODO: HiveRunner (4.0) has a bug: it cannot rename the table
path correctly.
+ // We should upgrade it to 6.0 later, and update the test case
for the query.
+ assertThatThrownBy(() -> tEnv.executeSql("SELECT * FROM t3"))
+ .hasMessageContaining("SQL validation failed. There is no
table stored in");
+ }
+
@Test
public void testHiveLock() throws InterruptedException {
tEnv.executeSql("CREATE TABLE t (a INT)");
diff --git
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
index 8c925f4f..98d4e49b 100644
---
a/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
+++
b/flink-table-store-spark/src/main/java/org/apache/flink/table/store/spark/SparkCatalog.java
@@ -357,7 +357,14 @@ public class SparkCatalog implements TableCatalog,
SupportsNamespaces {
}
@Override
- public void renameTable(Identifier oldIdent, Identifier newIdent) {
- throw new UnsupportedOperationException();
+ public void renameTable(Identifier oldIdent, Identifier newIdent)
+ throws NoSuchTableException, TableAlreadyExistsException {
+ try {
+ catalog.renameTable(objectPath(oldIdent), objectPath(newIdent),
false);
+ } catch (Catalog.TableNotExistException e) {
+ throw new NoSuchTableException(oldIdent);
+ } catch (Catalog.TableAlreadyExistException e) {
+ throw new TableAlreadyExistsException(newIdent);
+ }
}
}
diff --git
a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadTestBase.java
b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadTestBase.java
index 15ec0c6f..99e1af9d 100644
---
a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadTestBase.java
+++
b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkReadTestBase.java
@@ -40,7 +40,9 @@ import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
import java.io.File;
import java.io.IOException;
@@ -71,7 +73,11 @@ public abstract class SparkReadTestBase {
spark = SparkSession.builder().master("local[2]").getOrCreate();
spark.conf().set("spark.sql.catalog.tablestore",
SparkCatalog.class.getName());
spark.conf().set("spark.sql.catalog.tablestore.warehouse",
warehousePath.toString());
+ spark.sql("USE tablestore");
+ }
+ @BeforeEach
+ public void beforeEach() throws Exception {
// flink sink
tablePath1 = new Path(warehousePath, "default.db/t1");
SimpleTableTestHelper testHelper1 = new
SimpleTableTestHelper(tablePath1, rowType1());
@@ -129,6 +135,15 @@ public abstract class SparkReadTestBase {
testHelper2.commit();
}
+ @AfterEach
+ public void afterEach() {
+ List<Row> tables = spark.sql("show tables").collectAsList();
+ tables.forEach(
+ table -> {
+ spark.sql("DROP TABLE " + table.getString(0) + "." +
table.getString(1));
+ });
+ }
+
protected static SimpleTableTestHelper createTestHelper(Path tablePath)
throws Exception {
RowType rowType =
new RowType(
diff --git
a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkSchemaEvolutionITCase.java
b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkSchemaEvolutionITCase.java
index 22a096dd..d75956a5 100644
---
a/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkSchemaEvolutionITCase.java
+++
b/flink-table-store-spark/src/test/java/org/apache/flink/table/store/spark/SparkSchemaEvolutionITCase.java
@@ -105,6 +105,36 @@ public class SparkSchemaEvolutionITCase extends
SparkReadTestBase {
"java.lang.IllegalArgumentException: ADD COLUMN cannot
specify NOT NULL.");
}
+ @Test
+ public void testRenameTable() {
+ assertThatThrownBy(() -> spark.sql("ALTER TABLE t3 RENAME TO t4"))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining("Table or view not found: t3");
+
+ assertThatThrownBy(() -> spark.sql("ALTER TABLE t1 RENAME TO t2"))
+ .isInstanceOf(AnalysisException.class)
+ .hasMessageContaining("Table default.t2 already exists");
+
+ spark.sql("ALTER TABLE t1 RENAME TO t3");
+ List<Row> tables = spark.sql("SHOW TABLES").collectAsList();
+ assertThat(tables.stream().map(Row::toString))
+ .containsExactlyInAnyOrder("[default,t2,false]",
"[default,t3,false]");
+
+ List<Row> afterRename =
+ spark.sql("SHOW CREATE TABLE
tablestore.default.t3").collectAsList();
+ assertThat(afterRename.toString())
+ .isEqualTo(
+ "[[CREATE TABLE t3 (\n"
+ + " `a` INT NOT NULL,\n"
+ + " `b` BIGINT,\n"
+ + " `c` STRING)\n"
+ + buildTableProperties("default.db/t3")
+ + "]]");
+
+ List<Row> data = spark.sql("SELECT * FROM t3").collectAsList();
+ assertThat(data.toString()).isEqualTo("[[1,2,1], [5,6,3]]");
+ }
+
@Test
public void testRenameColumn() throws Exception {
Path tablePath = new Path(warehousePath,
"default.db/testRenameColumn");
@@ -149,7 +179,6 @@ public class SparkSchemaEvolutionITCase extends
SparkReadTestBase {
@Test
public void testRenamePartitionKey() {
- spark.sql("USE tablestore");
spark.sql(
"CREATE TABLE default.testRenamePartitionKey (\n"
+ "a BIGINT,\n"
@@ -255,7 +284,6 @@ public class SparkSchemaEvolutionITCase extends
SparkReadTestBase {
@Test
public void testDropPartitionKey() {
- spark.sql("USE tablestore");
spark.sql(
"CREATE TABLE default.testDropPartitionKey (\n"
+ "a BIGINT,\n"
@@ -295,7 +323,6 @@ public class SparkSchemaEvolutionITCase extends
SparkReadTestBase {
@Test
public void testDropPrimaryKey() {
- spark.sql("USE tablestore");
spark.sql(
"CREATE TABLE default.testDropPrimaryKey (\n"
+ "a BIGINT,\n"
@@ -385,7 +412,6 @@ public class SparkSchemaEvolutionITCase extends
SparkReadTestBase {
@Test
public void testAlterPrimaryKeyNullability() {
- spark.sql("USE tablestore");
spark.sql(
"CREATE TABLE default.testAlterPkNullability (\n"
+ "a BIGINT,\n"