This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch release-0.3
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/release-0.3 by this push:
     new ed96a7ca [FLINK-30595] support create table like
ed96a7ca is described below

commit ed96a7ca247e0b3bf7d02599f6bdca30bf7c31b3
Author: JunZhang <[email protected]>
AuthorDate: Mon Jan 9 10:37:25 2023 +0800

    [FLINK-30595] support create table like
    
    This closes #467
---
 docs/content/docs/how-to/creating-tables.md        | 24 ++++++++++++++++++++++
 .../flink/table/store/connector/FlinkCatalog.java  |  5 -----
 .../table/store/connector/CatalogTableITCase.java  | 10 +++++++++
 3 files changed, 34 insertions(+), 5 deletions(-)

diff --git a/docs/content/docs/how-to/creating-tables.md 
b/docs/content/docs/how-to/creating-tables.md
index 8e3a656c..eba5d91d 100644
--- a/docs/content/docs/how-to/creating-tables.md
+++ b/docs/content/docs/how-to/creating-tables.md
@@ -112,6 +112,30 @@ Partition keys must be a subset of primary keys if primary 
keys are defined.
 
 {{< /hint >}}
 
+### Create Table Like
+
+{{< tabs "create-table-like" >}}
+
+{{< tab "Flink" >}}
+
+To create a table with the same schema, partition, and table properties as 
another table, use CREATE TABLE LIKE.
+
+```sql
+CREATE TABLE MyTable (
+    user_id BIGINT,
+    item_id BIGINT,
+    behavior STRING,
+    dt STRING,
+    hh STRING,
+    PRIMARY KEY (dt, hh, user_id) NOT ENFORCED
+) ;
+
+CREATE TABLE MyTableLike LIKE MyTable;
+```
+
+{{< /tab >}}
+
+
 ### Table Properties
 
 Users can specify table properties to enable features or improve performance 
of Table Store. For a complete list of such properties, see 
[configurations]({{< ref "docs/maintenance/configurations" >}}).
diff --git 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java
 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java
index 253714db..03091912 100644
--- 
a/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java
+++ 
b/flink-table-store-connector/src/main/java/org/apache/flink/table/store/connector/FlinkCatalog.java
@@ -19,7 +19,6 @@
 package org.apache.flink.table.store.connector;
 
 import org.apache.flink.annotation.VisibleForTesting;
-import org.apache.flink.core.fs.Path;
 import org.apache.flink.table.catalog.AbstractCatalog;
 import org.apache.flink.table.catalog.CatalogBaseTable;
 import org.apache.flink.table.catalog.CatalogDatabase;
@@ -203,10 +202,6 @@ public class FlinkCatalog extends AbstractCatalog {
         // remove table path
         String specific = options.remove(PATH.key());
         if (specific != null) {
-            if (!catalog.getTableLocation(tablePath).equals(new 
Path(specific))) {
-                throw new IllegalArgumentException(
-                        "Illegal table path in table options: " + specific);
-            }
             catalogTable = catalogTable.copy(options);
         }
 
diff --git 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CatalogTableITCase.java
 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CatalogTableITCase.java
index f2f652d5..7516fe30 100644
--- 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CatalogTableITCase.java
+++ 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/CatalogTableITCase.java
@@ -123,4 +123,14 @@ public class CatalogTableITCase extends CatalogITCaseBase {
                         "+I[3, 1, 
[{\"id\":0,\"name\":\"a\",\"type\":\"INT\"},{\"id\":1,\"name\":\"b\",\"type\":\"INT\"}]]",
                         "+I[4, 1, 
[{\"id\":0,\"name\":\"a\",\"type\":\"INT\"},{\"id\":1,\"name\":\"b\",\"type\":\"INT\"}]]");
     }
+
+    @Test
+    public void testCreateTableLike() throws Exception {
+        sql("CREATE TABLE T (a INT)");
+        sql("CREATE TABLE T1 LIKE T");
+        List<Row> result = sql("SELECT * FROM T1$schemas s");
+        System.out.println(result);
+        assertThat(result.toString())
+                .isEqualTo("[+I[0, 
[{\"id\":0,\"name\":\"a\",\"type\":\"INT\"}], [], [], {}, ]]");
+    }
 }

Reply via email to