This is an automated email from the ASF dual-hosted git repository.
snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 7402fd1ed34 [FLINK-39726][table] Support `USING CONNECTION` clause in
`CREATE TABLE`
7402fd1ed34 is described below
commit 7402fd1ed34c9b560b4b18b8a3b5a3b7b87a7012
Author: Hao Li <[email protected]>
AuthorDate: Wed May 27 13:58:44 2026 -0700
[FLINK-39726][table] Support `USING CONNECTION` clause in `CREATE TABLE`
---
.../src/main/codegen/includes/parserImpls.ftl | 12 +++
.../apache/flink/sql/parser/SqlUnparseUtils.java | 12 +++
.../flink/sql/parser/ddl/SqlReplaceTableAs.java | 1 +
.../flink/sql/parser/ddl/table/SqlCreateTable.java | 14 ++-
.../sql/parser/ddl/table/SqlCreateTableAs.java | 1 +
.../sql/parser/ddl/table/SqlCreateTableLike.java | 2 +
.../flink/sql/parser/utils/ParserResource.java | 4 +
.../flink/sql/parser/FlinkSqlParserImplTest.java | 116 +++++++++++++++++++++
8 files changed, 161 insertions(+), 1 deletion(-)
diff --git
a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index cf01aee2f7e..b3021950f12 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -1596,6 +1596,7 @@ SqlCreate SqlCreateTable(Span s, boolean replace, boolean
isTemporary) :
SqlNodeList propertyList = SqlNodeList.EMPTY;
SqlDistribution distribution = null;
SqlNodeList partitionColumns = SqlNodeList.EMPTY;
+ SqlIdentifier connection = null;
SqlParserPos pos = startPos;
boolean isColumnsIdentifiersOnly = false;
}
@@ -1629,6 +1630,10 @@ SqlCreate SqlCreateTable(Span s, boolean replace,
boolean isTemporary) :
<PARTITIONED> <BY>
partitionColumns = ParenthesizedSimpleIdentifierList()
]
+ [
+ <USING> <CONNECTION>
+ connection = CompoundIdentifier()
+ ]
[
<WITH>
propertyList = Properties()
@@ -1652,6 +1657,7 @@ SqlCreate SqlCreateTable(Span s, boolean replace, boolean
isTemporary) :
partitionColumns,
watermark,
comment,
+ connection,
tableLike,
isTemporary,
ifNotExists);
@@ -1660,6 +1666,11 @@ SqlCreate SqlCreateTable(Span s, boolean replace,
boolean isTemporary) :
<AS>
asQuery = OrderedQueryOrExpr(ExprContext.ACCEPT_QUERY)
{
+ if (connection != null) {
+ throw SqlUtil.newContextException(
+ pos,
+
ParserResource.RESOURCE.usingConnectionWithAsUnsupported());
+ }
if (replace) {
return new SqlReplaceTableAs(startPos.plus(getPos()),
tableName,
@@ -1706,6 +1717,7 @@ SqlCreate SqlCreateTable(Span s, boolean replace, boolean
isTemporary) :
partitionColumns,
watermark,
comment,
+ connection,
isTemporary,
ifNotExists);
}
diff --git
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlUnparseUtils.java
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlUnparseUtils.java
index 619f7a5e932..c66e2e4f6e0 100644
---
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlUnparseUtils.java
+++
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlUnparseUtils.java
@@ -25,6 +25,7 @@ import
org.apache.flink.sql.parser.ddl.constraint.SqlTableConstraint;
import org.apache.flink.sql.parser.ddl.materializedtable.SqlStartMode;
import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlIntervalLiteral;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
@@ -100,6 +101,17 @@ public class SqlUnparseUtils {
writer.endList(partitionedByFrame);
}
+ public static void unparseUsingConnection(
+ SqlIdentifier connection, SqlWriter writer, int leftPrec, int
rightPrec) {
+ if (connection == null) {
+ return;
+ }
+ writer.newlineAndIndent();
+ writer.keyword("USING");
+ writer.keyword("CONNECTION");
+ connection.unparse(writer, leftPrec, rightPrec);
+ }
+
public static void unparseFreshness(
SqlIntervalLiteral freshness,
boolean withNewLine,
diff --git
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlReplaceTableAs.java
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlReplaceTableAs.java
index 943951f3224..db3cf838119 100644
---
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlReplaceTableAs.java
+++
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlReplaceTableAs.java
@@ -107,6 +107,7 @@ public class SqlReplaceTableAs extends SqlCreateTable
implements ExtendedSqlNode
partitionKeyList,
watermark,
comment,
+ null,
isTemporary,
ifNotExists,
true);
diff --git
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/table/SqlCreateTable.java
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/table/SqlCreateTable.java
index 0da0686b66b..1da49e51324 100644
---
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/table/SqlCreateTable.java
+++
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/table/SqlCreateTable.java
@@ -66,6 +66,8 @@ public class SqlCreateTable extends SqlCreateObject
implements ExtendedSqlNode {
private final SqlWatermark watermark;
+ private final SqlIdentifier connection;
+
public SqlCreateTable(
SqlParserPos pos,
SqlIdentifier tableName,
@@ -76,6 +78,7 @@ public class SqlCreateTable extends SqlCreateObject
implements ExtendedSqlNode {
SqlNodeList partitionKeyList,
@Nullable SqlWatermark watermark,
@Nullable SqlCharStringLiteral comment,
+ @Nullable SqlIdentifier connection,
boolean isTemporary,
boolean ifNotExists) {
this(
@@ -89,6 +92,7 @@ public class SqlCreateTable extends SqlCreateObject
implements ExtendedSqlNode {
partitionKeyList,
watermark,
comment,
+ connection,
isTemporary,
ifNotExists,
false);
@@ -105,6 +109,7 @@ public class SqlCreateTable extends SqlCreateObject
implements ExtendedSqlNode {
SqlNodeList partitionKeyList,
@Nullable SqlWatermark watermark,
@Nullable SqlCharStringLiteral comment,
+ @Nullable SqlIdentifier connection,
boolean isTemporary,
boolean ifNotExists,
boolean replace) {
@@ -117,6 +122,7 @@ public class SqlCreateTable extends SqlCreateObject
implements ExtendedSqlNode {
this.partitionKeyList =
requireNonNull(partitionKeyList, "partitionKeyList should not
be null");
this.watermark = watermark;
+ this.connection = connection;
}
@Override
@@ -128,7 +134,8 @@ public class SqlCreateTable extends SqlCreateObject
implements ExtendedSqlNode {
properties,
partitionKeyList,
watermark,
- comment);
+ comment,
+ connection);
}
public SqlNodeList getColumnList() {
@@ -151,6 +158,10 @@ public class SqlCreateTable extends SqlCreateObject
implements ExtendedSqlNode {
return Optional.ofNullable(watermark);
}
+ public Optional<SqlIdentifier> getConnection() {
+ return Optional.ofNullable(connection);
+ }
+
@Override
protected String getScope() {
return "TABLE";
@@ -214,6 +225,7 @@ public class SqlCreateTable extends SqlCreateObject
implements ExtendedSqlNode {
SqlUnparseUtils.unparseComment(comment, true, writer, leftPrec,
rightPrec);
SqlUnparseUtils.unparseDistribution(distribution, writer, leftPrec,
rightPrec);
SqlUnparseUtils.unparsePartitionKeyList(partitionKeyList, writer,
leftPrec, rightPrec);
+ SqlUnparseUtils.unparseUsingConnection(connection, writer, leftPrec,
rightPrec);
SqlUnparseUtils.unparseProperties(properties, writer, leftPrec,
rightPrec);
}
}
diff --git
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/table/SqlCreateTableAs.java
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/table/SqlCreateTableAs.java
index 3f897199a69..631d37ea717 100644
---
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/table/SqlCreateTableAs.java
+++
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/table/SqlCreateTableAs.java
@@ -99,6 +99,7 @@ public class SqlCreateTableAs extends SqlCreateTable {
partitionKeyList,
watermark,
comment,
+ null,
isTemporary,
ifNotExists,
false);
diff --git
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/table/SqlCreateTableLike.java
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/table/SqlCreateTableLike.java
index b9e99470940..b36b5ee983e 100644
---
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/table/SqlCreateTableLike.java
+++
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/table/SqlCreateTableLike.java
@@ -86,6 +86,7 @@ public class SqlCreateTableLike extends SqlCreateTable {
SqlNodeList partitionKeyList,
@Nullable SqlWatermark watermark,
@Nullable SqlCharStringLiteral comment,
+ @Nullable SqlIdentifier connection,
SqlTableLike tableLike,
boolean isTemporary,
boolean ifNotExists) {
@@ -100,6 +101,7 @@ public class SqlCreateTableLike extends SqlCreateTable {
partitionKeyList,
watermark,
comment,
+ connection,
isTemporary,
ifNotExists,
false);
diff --git
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
index 5e1672865a4..d459ca28dc8 100644
---
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
+++
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
@@ -83,4 +83,8 @@ public interface ParserResource {
@Resources.BaseMessage("DROP TEMPORARY MATERIALIZED TABLE is not
supported.")
Resources.ExInst<ParseException>
dropTemporaryMaterializedTableUnsupported();
+
+ @Resources.BaseMessage(
+ "USING CONNECTION clause is not supported with CREATE TABLE AS
SELECT or REPLACE TABLE AS SELECT statements.")
+ Resources.ExInst<ParseException> usingConnectionWithAsUnsupported();
}
diff --git
a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index 1932f2f4a90..1d1e7d639fa 100644
---
a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -1050,6 +1050,122 @@ class FlinkSqlParserImplTest extends SqlParserTest {
sql(sql).ok(expected);
}
+ @Test
+ void testCreateTableUsingConnection() {
+ final String sql =
+ "CREATE TABLE orders (\n"
+ + " order_id INT,\n"
+ + " customer_id INT,\n"
+ + " amount DECIMAL(10, 2)\n"
+ + ") USING CONNECTION mycat.mydb.mysql_prod\n"
+ + "WITH (\n"
+ + " 'connector' = 'jdbc',\n"
+ + " 'tables' = 'orders'\n"
+ + ")";
+ final String expected =
+ "CREATE TABLE `ORDERS` (\n"
+ + " `ORDER_ID` INTEGER,\n"
+ + " `CUSTOMER_ID` INTEGER,\n"
+ + " `AMOUNT` DECIMAL(10, 2)\n"
+ + ")\n"
+ + "USING CONNECTION `MYCAT`.`MYDB`.`MYSQL_PROD`\n"
+ + "WITH (\n"
+ + " 'connector' = 'jdbc',\n"
+ + " 'tables' = 'orders'\n"
+ + ")";
+ sql(sql).ok(expected);
+ }
+
+ @Test
+ void testCreateTableUsingConnectionWithPartitionAndDistribution() {
+ final String sql =
+ "CREATE TABLE tbl1 (\n"
+ + " a bigint,\n"
+ + " h varchar,\n"
+ + " b varchar\n"
+ + ")\n"
+ + "DISTRIBUTED BY HASH(a) INTO 3 BUCKETS\n"
+ + "PARTITIONED BY (a, h)\n"
+ + "USING CONNECTION cat1.db1.conn1\n"
+ + "WITH (\n"
+ + " 'connector' = 'jdbc'\n"
+ + ")";
+ final String expected =
+ "CREATE TABLE `TBL1` (\n"
+ + " `A` BIGINT,\n"
+ + " `H` VARCHAR,\n"
+ + " `B` VARCHAR\n"
+ + ")\n"
+ + "DISTRIBUTED BY HASH(`A`) INTO 3 BUCKETS\n"
+ + "PARTITIONED BY (`A`, `H`)\n"
+ + "USING CONNECTION `CAT1`.`DB1`.`CONN1`\n"
+ + "WITH (\n"
+ + " 'connector' = 'jdbc'\n"
+ + ")";
+ sql(sql).ok(expected);
+ }
+
+ @Test
+ void testCreateTableAsWithUsingConnectionFails() {
+ // CTAS goes through SqlCreateTable with replace=false; the parser
explicitly rejects
+ // USING CONNECTION + AS via
ParserResource.usingConnectionWithAsUnsupported().
+ final String sql =
+ "^CREATE^ TABLE t1\n"
+ + "USING CONNECTION cat1.db1.conn1\n"
+ + "WITH ('connector' = 'jdbc')\n"
+ + "AS SELECT 1 AS a";
+ sql(sql).fails(
+ "(?s).*USING CONNECTION clause is not supported with "
+ + "CREATE TABLE AS SELECT or REPLACE TABLE AS
SELECT statements\\..*");
+ }
+
+ @Test
+ void testReplaceTableAsWithUsingConnectionFails() {
+ // REPLACE TABLE always has an AS clause, so USING CONNECTION is never
valid;
+ // it's not even accepted by the SqlReplaceTable production.
+ final String sql =
+ "REPLACE TABLE t1\n"
+ + "^USING^ CONNECTION cat1.db1.conn1\n"
+ + "WITH ('connector' = 'jdbc')\n"
+ + "AS SELECT 1 AS a";
+ sql(sql).fails("(?s).*Encountered \"USING\" at line 2, column 1.\n.*");
+ }
+
+ @Test
+ void testCreateOrReplaceTableAsWithUsingConnectionFails() {
+ // CREATE OR REPLACE TABLE AS goes through SqlCreateTable with
replace=true and hits
+ // the same usingConnectionWithAsUnsupported() error path as CTAS.
+ final String sql =
+ "^CREATE^ OR REPLACE TABLE t1\n"
+ + "USING CONNECTION cat1.db1.conn1\n"
+ + "WITH ('connector' = 'jdbc')\n"
+ + "AS SELECT 1 AS a";
+ sql(sql).fails(
+ "(?s).*USING CONNECTION clause is not supported with "
+ + "CREATE TABLE AS SELECT or REPLACE TABLE AS
SELECT statements\\..*");
+ }
+
+ @Test
+ void testCreateTableLikeUsingConnection() {
+ final String sql =
+ "CREATE TABLE t1 (\n"
+ + " a INT\n"
+ + ")\n"
+ + "USING CONNECTION cat1.db1.conn1\n"
+ + "WITH ('connector' = 'jdbc')\n"
+ + "LIKE base_table";
+ final String expected =
+ "CREATE TABLE `T1` (\n"
+ + " `A` INTEGER\n"
+ + ")\n"
+ + "USING CONNECTION `CAT1`.`DB1`.`CONN1`\n"
+ + "WITH (\n"
+ + " 'connector' = 'jdbc'\n"
+ + ")\n"
+ + "LIKE `BASE_TABLE`";
+ sql(sql).ok(expected);
+ }
+
String buildDistributionInput(final String distributionClause) {
return "CREATE TABLE tbl1 (\n"
+ " a bigint,\n"