This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 7402fd1ed34 [FLINK-39726][table] Support `USING CONNECTION` clause in 
`CREATE TABLE`
7402fd1ed34 is described below

commit 7402fd1ed34c9b560b4b18b8a3b5a3b7b87a7012
Author: Hao Li <[email protected]>
AuthorDate: Wed May 27 13:58:44 2026 -0700

    [FLINK-39726][table] Support `USING CONNECTION` clause in `CREATE TABLE`
---
 .../src/main/codegen/includes/parserImpls.ftl      |  12 +++
 .../apache/flink/sql/parser/SqlUnparseUtils.java   |  12 +++
 .../flink/sql/parser/ddl/SqlReplaceTableAs.java    |   1 +
 .../flink/sql/parser/ddl/table/SqlCreateTable.java |  14 ++-
 .../sql/parser/ddl/table/SqlCreateTableAs.java     |   1 +
 .../sql/parser/ddl/table/SqlCreateTableLike.java   |   2 +
 .../flink/sql/parser/utils/ParserResource.java     |   4 +
 .../flink/sql/parser/FlinkSqlParserImplTest.java   | 116 +++++++++++++++++++++
 8 files changed, 161 insertions(+), 1 deletion(-)

diff --git 
a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl 
b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index cf01aee2f7e..b3021950f12 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -1596,6 +1596,7 @@ SqlCreate SqlCreateTable(Span s, boolean replace, boolean 
isTemporary) :
     SqlNodeList propertyList = SqlNodeList.EMPTY;
     SqlDistribution distribution = null;
     SqlNodeList partitionColumns = SqlNodeList.EMPTY;
+    SqlIdentifier connection = null;
     SqlParserPos pos = startPos;
     boolean isColumnsIdentifiersOnly = false;
 }
@@ -1629,6 +1630,10 @@ SqlCreate SqlCreateTable(Span s, boolean replace, 
boolean isTemporary) :
         <PARTITIONED> <BY>
         partitionColumns = ParenthesizedSimpleIdentifierList()
     ]
+    [
+        <USING> <CONNECTION>
+        connection = CompoundIdentifier()
+    ]
     [
         <WITH>
         propertyList = Properties()
@@ -1652,6 +1657,7 @@ SqlCreate SqlCreateTable(Span s, boolean replace, boolean 
isTemporary) :
                 partitionColumns,
                 watermark,
                 comment,
+                connection,
                 tableLike,
                 isTemporary,
                 ifNotExists);
@@ -1660,6 +1666,11 @@ SqlCreate SqlCreateTable(Span s, boolean replace, 
boolean isTemporary) :
         <AS>
         asQuery = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY)
         {
+            if (connection != null) {
+                throw SqlUtil.newContextException(
+                    pos,
+                    
ParserResource.RESOURCE.usingConnectionWithAsUnsupported());
+            }
             if (replace) {
                 return new SqlReplaceTableAs(startPos.plus(getPos()),
                     tableName,
@@ -1706,6 +1717,7 @@ SqlCreate SqlCreateTable(Span s, boolean replace, boolean 
isTemporary) :
             partitionColumns,
             watermark,
             comment,
+            connection,
             isTemporary,
             ifNotExists);
     }
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlUnparseUtils.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlUnparseUtils.java
index 619f7a5e932..c66e2e4f6e0 100644
--- 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlUnparseUtils.java
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlUnparseUtils.java
@@ -25,6 +25,7 @@ import 
org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
 import org.apache.flink.sql.parser.ddl.materializedtable.SqlStartMode;
 
 import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlIntervalLiteral;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlNodeList;
@@ -100,6 +101,17 @@ public class SqlUnparseUtils {
         writer.endList(partitionedByFrame);
     }
 
+    public static void unparseUsingConnection(
+            SqlIdentifier connection, SqlWriter writer, int leftPrec, int 
rightPrec) {
+        if (connection == null) {
+            return;
+        }
+        writer.newlineAndIndent();
+        writer.keyword("USING");
+        writer.keyword("CONNECTION");
+        connection.unparse(writer, leftPrec, rightPrec);
+    }
+
     public static void unparseFreshness(
             SqlIntervalLiteral freshness,
             boolean withNewLine,
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlReplaceTableAs.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlReplaceTableAs.java
index 943951f3224..db3cf838119 100644
--- 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlReplaceTableAs.java
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlReplaceTableAs.java
@@ -107,6 +107,7 @@ public class SqlReplaceTableAs extends SqlCreateTable 
implements ExtendedSqlNode
                 partitionKeyList,
                 watermark,
                 comment,
+                null,
                 isTemporary,
                 ifNotExists,
                 true);
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/table/SqlCreateTable.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/table/SqlCreateTable.java
index 0da0686b66b..1da49e51324 100644
--- 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/table/SqlCreateTable.java
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/table/SqlCreateTable.java
@@ -66,6 +66,8 @@ public class SqlCreateTable extends SqlCreateObject 
implements ExtendedSqlNode {
 
     private final SqlWatermark watermark;
 
+    private final SqlIdentifier connection;
+
     public SqlCreateTable(
             SqlParserPos pos,
             SqlIdentifier tableName,
@@ -76,6 +78,7 @@ public class SqlCreateTable extends SqlCreateObject 
implements ExtendedSqlNode {
             SqlNodeList partitionKeyList,
             @Nullable SqlWatermark watermark,
             @Nullable SqlCharStringLiteral comment,
+            @Nullable SqlIdentifier connection,
             boolean isTemporary,
             boolean ifNotExists) {
         this(
@@ -89,6 +92,7 @@ public class SqlCreateTable extends SqlCreateObject 
implements ExtendedSqlNode {
                 partitionKeyList,
                 watermark,
                 comment,
+                connection,
                 isTemporary,
                 ifNotExists,
                 false);
@@ -105,6 +109,7 @@ public class SqlCreateTable extends SqlCreateObject 
implements ExtendedSqlNode {
             SqlNodeList partitionKeyList,
             @Nullable SqlWatermark watermark,
             @Nullable SqlCharStringLiteral comment,
+            @Nullable SqlIdentifier connection,
             boolean isTemporary,
             boolean ifNotExists,
             boolean replace) {
@@ -117,6 +122,7 @@ public class SqlCreateTable extends SqlCreateObject 
implements ExtendedSqlNode {
         this.partitionKeyList =
                 requireNonNull(partitionKeyList, "partitionKeyList should not 
be null");
         this.watermark = watermark;
+        this.connection = connection;
     }
 
     @Override
@@ -128,7 +134,8 @@ public class SqlCreateTable extends SqlCreateObject 
implements ExtendedSqlNode {
                 properties,
                 partitionKeyList,
                 watermark,
-                comment);
+                comment,
+                connection);
     }
 
     public SqlNodeList getColumnList() {
@@ -151,6 +158,10 @@ public class SqlCreateTable extends SqlCreateObject 
implements ExtendedSqlNode {
         return Optional.ofNullable(watermark);
     }
 
+    public Optional<SqlIdentifier> getConnection() {
+        return Optional.ofNullable(connection);
+    }
+
     @Override
     protected String getScope() {
         return "TABLE";
@@ -214,6 +225,7 @@ public class SqlCreateTable extends SqlCreateObject 
implements ExtendedSqlNode {
         SqlUnparseUtils.unparseComment(comment, true, writer, leftPrec, 
rightPrec);
         SqlUnparseUtils.unparseDistribution(distribution, writer, leftPrec, 
rightPrec);
         SqlUnparseUtils.unparsePartitionKeyList(partitionKeyList, writer, 
leftPrec, rightPrec);
+        SqlUnparseUtils.unparseUsingConnection(connection, writer, leftPrec, 
rightPrec);
         SqlUnparseUtils.unparseProperties(properties, writer, leftPrec, 
rightPrec);
     }
 }
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/table/SqlCreateTableAs.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/table/SqlCreateTableAs.java
index 3f897199a69..631d37ea717 100644
--- 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/table/SqlCreateTableAs.java
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/table/SqlCreateTableAs.java
@@ -99,6 +99,7 @@ public class SqlCreateTableAs extends SqlCreateTable {
                 partitionKeyList,
                 watermark,
                 comment,
+                null,
                 isTemporary,
                 ifNotExists,
                 false);
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/table/SqlCreateTableLike.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/table/SqlCreateTableLike.java
index b9e99470940..b36b5ee983e 100644
--- 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/table/SqlCreateTableLike.java
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/table/SqlCreateTableLike.java
@@ -86,6 +86,7 @@ public class SqlCreateTableLike extends SqlCreateTable {
             SqlNodeList partitionKeyList,
             @Nullable SqlWatermark watermark,
             @Nullable SqlCharStringLiteral comment,
+            @Nullable SqlIdentifier connection,
             SqlTableLike tableLike,
             boolean isTemporary,
             boolean ifNotExists) {
@@ -100,6 +101,7 @@ public class SqlCreateTableLike extends SqlCreateTable {
                 partitionKeyList,
                 watermark,
                 comment,
+                connection,
                 isTemporary,
                 ifNotExists,
                 false);
diff --git 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
index 5e1672865a4..d459ca28dc8 100644
--- 
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
+++ 
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
@@ -83,4 +83,8 @@ public interface ParserResource {
 
     @Resources.BaseMessage("DROP TEMPORARY MATERIALIZED TABLE is not 
supported.")
     Resources.ExInst<ParseException> 
dropTemporaryMaterializedTableUnsupported();
+
+    @Resources.BaseMessage(
+            "USING CONNECTION clause is not supported with CREATE TABLE AS 
SELECT or REPLACE TABLE AS SELECT statements.")
+    Resources.ExInst<ParseException> usingConnectionWithAsUnsupported();
 }
diff --git 
a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
 
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index 1932f2f4a90..1d1e7d639fa 100644
--- 
a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++ 
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -1050,6 +1050,122 @@ class FlinkSqlParserImplTest extends SqlParserTest {
         sql(sql).ok(expected);
     }
 
+    @Test
+    void testCreateTableUsingConnection() {
+        final String sql =
+                "CREATE TABLE orders (\n"
+                        + "  order_id INT,\n"
+                        + "  customer_id INT,\n"
+                        + "  amount DECIMAL(10, 2)\n"
+                        + ") USING CONNECTION mycat.mydb.mysql_prod\n"
+                        + "WITH (\n"
+                        + "  'connector' = 'jdbc',\n"
+                        + "  'tables' = 'orders'\n"
+                        + ")";
+        final String expected =
+                "CREATE TABLE `ORDERS` (\n"
+                        + "  `ORDER_ID` INTEGER,\n"
+                        + "  `CUSTOMER_ID` INTEGER,\n"
+                        + "  `AMOUNT` DECIMAL(10, 2)\n"
+                        + ")\n"
+                        + "USING CONNECTION `MYCAT`.`MYDB`.`MYSQL_PROD`\n"
+                        + "WITH (\n"
+                        + "  'connector' = 'jdbc',\n"
+                        + "  'tables' = 'orders'\n"
+                        + ")";
+        sql(sql).ok(expected);
+    }
+
+    @Test
+    void testCreateTableUsingConnectionWithPartitionAndDistribution() {
+        final String sql =
+                "CREATE TABLE tbl1 (\n"
+                        + "  a bigint,\n"
+                        + "  h varchar,\n"
+                        + "  b varchar\n"
+                        + ")\n"
+                        + "DISTRIBUTED BY HASH(a) INTO 3 BUCKETS\n"
+                        + "PARTITIONED BY (a, h)\n"
+                        + "USING CONNECTION cat1.db1.conn1\n"
+                        + "WITH (\n"
+                        + "  'connector' = 'jdbc'\n"
+                        + ")";
+        final String expected =
+                "CREATE TABLE `TBL1` (\n"
+                        + "  `A` BIGINT,\n"
+                        + "  `H` VARCHAR,\n"
+                        + "  `B` VARCHAR\n"
+                        + ")\n"
+                        + "DISTRIBUTED BY HASH(`A`) INTO 3 BUCKETS\n"
+                        + "PARTITIONED BY (`A`, `H`)\n"
+                        + "USING CONNECTION `CAT1`.`DB1`.`CONN1`\n"
+                        + "WITH (\n"
+                        + "  'connector' = 'jdbc'\n"
+                        + ")";
+        sql(sql).ok(expected);
+    }
+
+    @Test
+    void testCreateTableAsWithUsingConnectionFails() {
+        // CTAS goes through SqlCreateTable with replace=false; the parser 
explicitly rejects
+        // USING CONNECTION + AS via 
ParserResource.usingConnectionWithAsUnsupported().
+        final String sql =
+                "^CREATE^ TABLE t1\n"
+                        + "USING CONNECTION cat1.db1.conn1\n"
+                        + "WITH ('connector' = 'jdbc')\n"
+                        + "AS SELECT 1 AS a";
+        sql(sql).fails(
+                        "(?s).*USING CONNECTION clause is not supported with "
+                                + "CREATE TABLE AS SELECT or REPLACE TABLE AS 
SELECT statements\\..*");
+    }
+
+    @Test
+    void testReplaceTableAsWithUsingConnectionFails() {
+        // REPLACE TABLE always has an AS clause, so USING CONNECTION is never 
valid;
+        // it's not even accepted by the SqlReplaceTable production.
+        final String sql =
+                "REPLACE TABLE t1\n"
+                        + "^USING^ CONNECTION cat1.db1.conn1\n"
+                        + "WITH ('connector' = 'jdbc')\n"
+                        + "AS SELECT 1 AS a";
+        sql(sql).fails("(?s).*Encountered \"USING\" at line 2, column 1.\n.*");
+    }
+
+    @Test
+    void testCreateOrReplaceTableAsWithUsingConnectionFails() {
+        // CREATE OR REPLACE TABLE AS goes through SqlCreateTable with 
replace=true and hits
+        // the same usingConnectionWithAsUnsupported() error path as CTAS.
+        final String sql =
+                "^CREATE^ OR REPLACE TABLE t1\n"
+                        + "USING CONNECTION cat1.db1.conn1\n"
+                        + "WITH ('connector' = 'jdbc')\n"
+                        + "AS SELECT 1 AS a";
+        sql(sql).fails(
+                        "(?s).*USING CONNECTION clause is not supported with "
+                                + "CREATE TABLE AS SELECT or REPLACE TABLE AS 
SELECT statements\\..*");
+    }
+
+    @Test
+    void testCreateTableLikeUsingConnection() {
+        final String sql =
+                "CREATE TABLE t1 (\n"
+                        + "  a INT\n"
+                        + ")\n"
+                        + "USING CONNECTION cat1.db1.conn1\n"
+                        + "WITH ('connector' = 'jdbc')\n"
+                        + "LIKE base_table";
+        final String expected =
+                "CREATE TABLE `T1` (\n"
+                        + "  `A` INTEGER\n"
+                        + ")\n"
+                        + "USING CONNECTION `CAT1`.`DB1`.`CONN1`\n"
+                        + "WITH (\n"
+                        + "  'connector' = 'jdbc'\n"
+                        + ")\n"
+                        + "LIKE `BASE_TABLE`";
+        sql(sql).ok(expected);
+    }
+
     String buildDistributionInput(final String distributionClause) {
         return "CREATE TABLE tbl1 (\n"
                 + "  a bigint,\n"

Reply via email to