This is an automated email from the ASF dual-hosted git repository.
snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 321b17f4d15 [FLINK-38260][table] Add parser changes for connection sql
321b17f4d15 is described below
commit 321b17f4d15be28c8d6295c383fba9c63bc1cdb8
Author: Hao Li <[email protected]>
AuthorDate: Mon Dec 1 15:35:37 2025 -0800
[FLINK-38260][table] Add parser changes for connection sql
---
.../src/main/codegen/data/Parser.tdd | 13 ++
.../src/main/codegen/includes/parserImpls.ftl | 216 ++++++++++++++++-
.../org/apache/flink/sql/parser/SqlParseUtils.java | 18 +-
.../{SqlDropModel.java => SqlAlterConnection.java} | 38 +--
...delReset.java => SqlAlterConnectionRename.java} | 42 ++--
...odelReset.java => SqlAlterConnectionReset.java} | 18 +-
...rModelReset.java => SqlAlterConnectionSet.java} | 38 +--
.../flink/sql/parser/ddl/SqlAlterModelReset.java | 5 +-
.../flink/sql/parser/ddl/SqlAlterTableReset.java | 5 +-
.../flink/sql/parser/ddl/SqlAnalyzeTable.java | 2 +-
.../flink/sql/parser/ddl/SqlCreateConnection.java | 110 +++++++++
.../sql/parser/ddl/SqlCreateMaterializedTable.java | 2 +-
.../flink/sql/parser/ddl/SqlCreateTable.java | 2 +-
.../{SqlDropModel.java => SqlDropConnection.java} | 37 ++-
.../apache/flink/sql/parser/ddl/SqlDropModel.java | 7 +-
.../sql/parser/dql/SqlRichDescribeConnection.java | 79 +++++++
.../SqlShowConnections.java} | 48 ++--
.../SqlShowCreateConnection.java} | 45 ++--
.../flink/sql/parser/utils/ParserResource.java | 10 +-
.../flink/sql/parser/FlinkSqlParserImplTest.java | 256 ++++++++++++++++++++-
.../converters/SqlDropModelConverter.java | 2 +-
.../planner/utils/OperationConverterUtils.java | 4 +-
22 files changed, 844 insertions(+), 153 deletions(-)
diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
index 49ffde6d757..1585a8b1e04 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
+++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd
@@ -39,6 +39,10 @@
"org.apache.flink.sql.parser.ddl.SqlAlterCatalogOptions"
"org.apache.flink.sql.parser.ddl.SqlAlterCatalogReset"
"org.apache.flink.sql.parser.ddl.SqlAlterCatalogComment"
+ "org.apache.flink.sql.parser.ddl.SqlAlterConnection"
+ "org.apache.flink.sql.parser.ddl.SqlAlterConnectionRename"
+ "org.apache.flink.sql.parser.ddl.SqlAlterConnectionReset"
+ "org.apache.flink.sql.parser.ddl.SqlAlterConnectionSet"
"org.apache.flink.sql.parser.ddl.SqlAlterDatabase"
"org.apache.flink.sql.parser.ddl.SqlAlterFunction"
"org.apache.flink.sql.parser.ddl.SqlAlterMaterializedTable"
@@ -77,6 +81,7 @@
"org.apache.flink.sql.parser.ddl.SqlAlterViewRename"
"org.apache.flink.sql.parser.ddl.SqlCompilePlan"
"org.apache.flink.sql.parser.ddl.SqlCreateCatalog"
+ "org.apache.flink.sql.parser.ddl.SqlCreateConnection"
"org.apache.flink.sql.parser.ddl.SqlCreateDatabase"
"org.apache.flink.sql.parser.ddl.SqlCreateFunction"
"org.apache.flink.sql.parser.ddl.SqlCreateModel"
@@ -88,6 +93,7 @@
"org.apache.flink.sql.parser.ddl.SqlCreateView"
"org.apache.flink.sql.parser.ddl.SqlDistribution"
"org.apache.flink.sql.parser.ddl.SqlDropCatalog"
+ "org.apache.flink.sql.parser.ddl.SqlDropConnection"
"org.apache.flink.sql.parser.ddl.SqlDropDatabase"
"org.apache.flink.sql.parser.ddl.SqlDropFunction"
"org.apache.flink.sql.parser.ddl.SqlDropMaterializedTable"
@@ -138,12 +144,15 @@
"org.apache.flink.sql.parser.dql.SqlShowTables"
"org.apache.flink.sql.parser.dql.SqlShowTables.SqlTableKind"
"org.apache.flink.sql.parser.dql.SqlShowColumns"
+ "org.apache.flink.sql.parser.dql.SqlShowConnections"
"org.apache.flink.sql.parser.dql.SqlShowCreate"
+ "org.apache.flink.sql.parser.dql.SqlShowCreateConnection"
"org.apache.flink.sql.parser.dql.SqlShowCreateMaterializedTable"
"org.apache.flink.sql.parser.dql.SqlShowCreateModel"
"org.apache.flink.sql.parser.dql.SqlShowCreateTable"
"org.apache.flink.sql.parser.dql.SqlShowCreateView"
"org.apache.flink.sql.parser.dql.SqlShowCreateCatalog"
+ "org.apache.flink.sql.parser.dql.SqlRichDescribeConnection"
"org.apache.flink.sql.parser.dql.SqlRichDescribeFunction"
"org.apache.flink.sql.parser.dql.SqlRichDescribeModel"
"org.apache.flink.sql.parser.dql.SqlRichDescribeTable"
@@ -186,6 +195,7 @@
"COMMENT"
"COMPILE"
"COMPUTE"
+ "CONNECTIONS",
"CONTINUOUS"
"DATABASES"
"DISTRIBUTED"
@@ -619,12 +629,14 @@
"SqlAlterFunction()"
"SqlShowFunctions()"
"SqlShowModels()"
+ "SqlShowConnections()"
"SqlShowTables()"
"SqlShowColumns()"
"SqlShowCreate()"
"SqlReplaceTable()"
"SqlAlterMaterializedTable()"
"SqlAlterModel()"
+ "SqlAlterConnection()"
"SqlAlterTable()"
"SqlAlterView()"
"SqlShowModules()"
@@ -649,6 +661,7 @@
"SqlDescribeJob()"
"SqlRichDescribeFunction()"
"SqlRichDescribeModel()"
+ "SqlRichDescribeConnection()"
"SqlRichDescribeTable()"
]
diff --git
a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
index 14bcdfd6f19..d4e0d198d91 100644
--- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
+++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl
@@ -776,6 +776,13 @@ SqlShowCreate SqlShowCreate() :
{
return new SqlShowCreateModel(pos, sqlIdentifier);
}
+ |
+ <CONNECTION>
+ { pos = getPos(); }
+ sqlIdentifier = CompoundIdentifier()
+ {
+ return new SqlShowCreateConnection(pos, sqlIdentifier);
+ }
|
<MATERIALIZED> <TABLE>
{ pos = getPos(); }
@@ -787,7 +794,7 @@ SqlShowCreate SqlShowCreate() :
}
/**
- * DESCRIBE | DESC FUNCTION [ EXTENDED] [[catalogName.]
dataBasesName].functionName sql call.
+ * (DESCRIBE | DESC) FUNCTION [ EXTENDED] [[catalogName.]
dataBasesName].functionName sql call.
* Here we add Rich in className to match the naming of SqlRichDescribeTable.
*/
SqlRichDescribeFunction SqlRichDescribeFunction() :
@@ -806,7 +813,7 @@ SqlRichDescribeFunction SqlRichDescribeFunction() :
}
/**
- * DESCRIBE | DESC MODEL [ EXTENDED] [[catalogName.] dataBasesName].modelName
sql call.
+ * (DESCRIBE | DESC) MODEL [ EXTENDED] [[catalogName.]
dataBasesName].modelName sql call.
* Here we add Rich in className to match the naming of SqlRichDescribeTable.
*/
SqlRichDescribeModel SqlRichDescribeModel() :
@@ -825,7 +832,26 @@ SqlRichDescribeModel SqlRichDescribeModel() :
}
/**
- * DESCRIBE | DESC [ EXTENDED] [[catalogName.] dataBasesName].tableName sql
call.
+ * (DESCRIBE | DESC) CONNECTION [ EXTENDED] [[catalogName.]
dataBasesName].connectionName sql call.
+ * Here we add Rich in className to match the naming of SqlRichDescribeTable.
+ */
+SqlRichDescribeConnection SqlRichDescribeConnection() :
+{
+ SqlIdentifier connectionName;
+ SqlParserPos pos;
+ boolean isExtended = false;
+}
+{
+ ( <DESCRIBE> | <DESC> ) <CONNECTION> { pos = getPos();}
+ [ <EXTENDED> { isExtended = true;} ]
+ connectionName = CompoundIdentifier()
+ {
+ return new SqlRichDescribeConnection(pos, connectionName, isExtended);
+ }
+}
+
+/**
+ * (DESCRIBE | DESC) [ EXTENDED] [[catalogName.] dataBasesName].tableName sql
call.
* Here we add Rich in className to distinguish from calcite's original
SqlDescribeTable.
*/
SqlRichDescribeTable SqlRichDescribeTable() :
@@ -2683,9 +2709,13 @@ SqlCreate SqlCreateExtended(Span s, boolean replace) :
|
create = SqlCreateDatabase(s, replace)
|
+ create = SqlCreateModel(s, isTemporary)
+ |
+ // Lookahead to distinguish <SYSTEM> FUNCTION and <SYSTEM> <CONNECTION>
+ LOOKAHEAD(2)
create = SqlCreateFunction(s, replace, isTemporary)
|
- create = SqlCreateModel(s, isTemporary)
+ create = SqlCreateConnection(s, isTemporary)
)
{
return create;
@@ -2712,9 +2742,13 @@ SqlDrop SqlDropExtended(Span s, boolean replace) :
|
drop = SqlDropDatabase(s, replace)
|
+ drop = SqlDropModel(s, isTemporary)
+ |
+ // Lookahead to distinguish <SYSTEM> FUNCTION and <SYSTEM> <CONNECTION>
+ LOOKAHEAD(2)
drop = SqlDropFunction(s, replace, isTemporary)
|
- drop = SqlDropModel(s, isTemporary)
+ drop = SqlDropConnection(s, isTemporary)
)
{
return drop;
@@ -3345,7 +3379,7 @@ SqlTruncateTable SqlTruncateTable() :
}
/**
-* SHOW MODELS [FROM [catalog.] database] [[NOT] LIKE pattern]; sql call.
+* SHOW MODELS [FROM [catalog.] database] [[NOT] LIKE pattern];
*/
SqlShowModels SqlShowModels() :
{
@@ -3381,6 +3415,43 @@ SqlShowModels SqlShowModels() :
}
}
+/**
+* SHOW CONNECTIONS [(FROM | IN) [catalog_name.]db_name] [[NOT] LIKE 'pattern'];
+*/
+SqlShowConnections SqlShowConnections() :
+{
+ SqlIdentifier databaseName = null;
+ SqlCharStringLiteral likeLiteral = null;
+ String prep = null;
+ boolean notLike = false;
+ SqlParserPos pos;
+}
+{
+ <SHOW> <CONNECTIONS>
+ { pos = getPos(); }
+ [
+ ( <FROM> { prep = "FROM"; } | <IN> { prep = "IN"; } )
+ { pos = getPos(); }
+ databaseName = CompoundIdentifier()
+ ]
+ [
+ [
+ <NOT>
+ {
+ notLike = true;
+ }
+ ]
+ <LIKE> <QUOTED_STRING>
+ {
+ String likeCondition = SqlParserUtil.parseString(token.image);
+ likeLiteral = SqlLiteral.createCharString(likeCondition, getPos());
+ }
+ ]
+ {
+ return new SqlShowConnections(pos, prep, databaseName, notLike,
likeLiteral);
+ }
+}
+
/**
* ALTER MODEL [IF EXISTS] modelName SET (property_key = property_val, ...)
* ALTER MODEL [IF EXISTS] modelName RENAME TO newModelName
@@ -3433,6 +3504,59 @@ SqlAlterModel SqlAlterModel() :
)
}
+/**
+* ALTER CONNECTION [IF EXISTS] connectionName SET (property_key =
property_val, ...)
+* ALTER CONNECTION [IF EXISTS] connectionName RENAME TO newConnectionName
+* ALTER CONNECTION [IF EXISTS] connectionName RESET (property_key, ...)
+* Altering temporary or system connections is not supported.
+*/
+SqlAlterConnection SqlAlterConnection() :
+{
+ SqlParserPos startPos;
+ boolean ifExists = false;
+ SqlIdentifier connectionIdentifier;
+ SqlIdentifier newConnectionIdentifier = null;
+ SqlNodeList propertyList = SqlNodeList.EMPTY;
+ SqlNodeList propertyKeyList = SqlNodeList.EMPTY;
+}
+{
+ <ALTER> <CONNECTION> { startPos = getPos(); }
+ ifExists = IfExistsOpt()
+ connectionIdentifier = CompoundIdentifier()
+ (
+ LOOKAHEAD(2)
+ <RENAME> <TO>
+ newConnectionIdentifier = CompoundIdentifier()
+ {
+ return new SqlAlterConnectionRename(
+ startPos.plus(getPos()),
+ connectionIdentifier,
+ newConnectionIdentifier,
+ ifExists);
+ }
+ |
+ <SET>
+ propertyList = Properties()
+ {
+ return new SqlAlterConnectionSet(
+ startPos.plus(getPos()),
+ connectionIdentifier,
+ ifExists,
+ propertyList);
+ }
+ |
+ <RESET>
+ propertyKeyList = PropertyKeys()
+ {
+ return new SqlAlterConnectionReset(
+ startPos.plus(getPos()),
+ connectionIdentifier,
+ ifExists,
+ propertyKeyList);
+ }
+ )
+}
+
/**
* DROP MODEL [IF EXIST] modelName
*/
@@ -3453,6 +3577,38 @@ SqlDrop SqlDropModel(Span s, boolean isTemporary) :
}
}
+/**
+* DROP [TEMPORARY] [SYSTEM] CONNECTION [IF EXISTS] connectionName
+*/
+SqlDrop SqlDropConnection(Span s, boolean isTemporary) :
+{
+ SqlIdentifier connectionIdentifier = null;
+ boolean ifExists = false;
+ boolean isSystemConnection = false;
+}
+{
+ [
+ <SYSTEM>
+ {
+ if (!isTemporary){
+ throw SqlUtil.newContextException(getPos(),
+
ParserResource.RESOURCE.dropSystemConnectionOnlySupportTemporary());
+ }
+ isSystemConnection = true;
+ }
+ ]
+
+ <CONNECTION>
+
+ ifExists = IfExistsOpt()
+
+ connectionIdentifier = CompoundIdentifier()
+
+ {
+ return new SqlDropConnection(s.pos(), connectionIdentifier, ifExists,
isTemporary, isSystemConnection);
+ }
+}
+
/**
* CREATE MODEL [IF NOT EXIST] modelName
* [INPUT(col1 type1, col2 type2, ...)]
@@ -3539,6 +3695,54 @@ SqlCreate SqlCreateModel(Span s, boolean isTemporary) :
}
}
+/**
+* CREATE [TEMPORARY] [SYSTEM] CONNECTION [IF NOT EXISTS]
[catalog_name.][db_name.]connection_name
+* [COMMENT connection_comment]
+* WITH (property_key = property_val, ...)
+*/
+SqlCreate SqlCreateConnection(Span s, boolean isTemporary) :
+{
+ final SqlParserPos startPos = s.pos();
+ boolean ifNotExists = false;
+ boolean isSystem = false;
+ SqlIdentifier connectionIdentifier;
+ SqlCharStringLiteral comment = null;
+ SqlNodeList propertyList = SqlNodeList.EMPTY;
+}
+{
+ [
+ <SYSTEM>
+ {
+ if (!isTemporary){
+ throw SqlUtil.newContextException(getPos(),
+
ParserResource.RESOURCE.createSystemConnectionOnlySupportTemporary());
+ }
+ isSystem = true;
+ }
+ ]
+ <CONNECTION>
+
+ ifNotExists = IfNotExistsOpt()
+
+ connectionIdentifier = CompoundIdentifier()
+ [ <COMMENT> <QUOTED_STRING>
+ {
+ comment = Comment();
+ }
+ ]
+ <WITH>
+ propertyList = Properties()
+ {
+ return new SqlCreateConnection(startPos.plus(getPos()),
+ connectionIdentifier,
+ comment,
+ propertyList,
+ isTemporary,
+ isSystem,
+ ifNotExists);
+ }
+}
+
SqlCharStringLiteral Comment() :
{
}
diff --git
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlParseUtils.java
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlParseUtils.java
index 8f3a3831aec..2a289d09bd8 100644
---
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlParseUtils.java
+++
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/SqlParseUtils.java
@@ -20,7 +20,6 @@ package org.apache.flink.sql.parser;
import org.apache.flink.sql.parser.ddl.SqlTableOption;
-import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlLiteral;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
@@ -31,6 +30,8 @@ import javax.annotation.Nullable;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
+import java.util.Set;
+import java.util.function.Function;
import java.util.stream.Collectors;
/** Utils methods for parsing DDLs. */
@@ -89,12 +90,19 @@ public class SqlParseUtils {
.collect(Collectors.toMap(k -> k.getKeyString(),
SqlTableOption::getValueString));
}
- public static List<String> extractList(@Nullable SqlNodeList sqlNodeList) {
+ public static List<String> extractList(
+ @Nullable SqlNodeList sqlNodeList, Function<SqlNode, String>
mapper) {
if (sqlNodeList == null) {
return List.of();
}
- return sqlNodeList.getList().stream()
- .map(p -> ((SqlIdentifier) p).getSimple())
- .collect(Collectors.toList());
+ return
sqlNodeList.getList().stream().map(mapper).collect(Collectors.toList());
+ }
+
+ public static Set<String> extractSet(
+ @Nullable SqlNodeList sqlNodeList, Function<SqlNode, String>
mapper) {
+ if (sqlNodeList == null) {
+ return Set.of();
+ }
+ return
sqlNodeList.getList().stream().map(mapper).collect(Collectors.toSet());
}
}
diff --git
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropModel.java
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnection.java
similarity index 56%
copy from
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropModel.java
copy to
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnection.java
index 53b6417f7f5..d8c490c547a 100644
---
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropModel.java
+++
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnection.java
@@ -20,37 +20,39 @@ package org.apache.flink.sql.parser.ddl;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlKind;
-import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.SqlSpecialOperator;
import org.apache.calcite.sql.SqlWriter;
import org.apache.calcite.sql.parser.SqlParserPos;
/**
- * {@link SqlNode} to describe the DROP MODEL [IF EXISTS] [[catalogName.]
dataBasesName].modelName
- * syntax.
+ * Abstract class to describe statements like ALTER CONNECTION [IF EXISTS]
[[catalogName.]
+ * dataBasesName.]connectionName ...
*/
-public class SqlDropModel extends SqlDropObject {
- private static final SqlOperator OPERATOR =
- new SqlSpecialOperator("DROP MODEL", SqlKind.OTHER_DDL);
+public abstract class SqlAlterConnection extends SqlAlterObject {
- private final boolean isTemporary;
+ private static final SqlSpecialOperator OPERATOR =
+ new SqlSpecialOperator("ALTER CONNECTION", SqlKind.OTHER_DDL);
- public SqlDropModel(
- SqlParserPos pos, SqlIdentifier modelName, boolean ifExists,
boolean isTemporary) {
- super(OPERATOR, pos, modelName, ifExists);
- this.isTemporary = isTemporary;
+ protected final boolean ifConnectionExists;
+
+ public SqlAlterConnection(
+ SqlParserPos pos, SqlIdentifier connectionName, boolean
ifConnectionExists) {
+ super(OPERATOR, pos, "CONNECTION", connectionName);
+ this.ifConnectionExists = ifConnectionExists;
}
- public boolean getIsTemporary() {
- return this.isTemporary;
+ /**
+ * Whether to ignore the error if the connection doesn't exist.
+ *
+ * @return true when IF EXISTS is specified.
+ */
+ public boolean ifConnectionExists() {
+ return ifConnectionExists;
}
@Override
- public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
- writer.keyword("DROP");
- writer.keyword("MODEL");
- if (ifExists) {
+ public void unparseAlterOperation(SqlWriter writer, int leftPrec, int
rightPrec) {
+ if (ifConnectionExists) {
writer.keyword("IF EXISTS");
}
name.unparse(writer, leftPrec, rightPrec);
diff --git
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterModelReset.java
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnectionRename.java
similarity index 58%
copy from
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterModelReset.java
copy to
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnectionRename.java
index e9392daaa1a..ce50651c4c6 100644
---
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterModelReset.java
+++
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnectionRename.java
@@ -18,50 +18,46 @@
package org.apache.flink.sql.parser.ddl;
-import org.apache.flink.sql.parser.SqlParseUtils;
-import org.apache.flink.sql.parser.SqlUnparseUtils;
-
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlNode;
-import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.SqlWriter;
import org.apache.calcite.sql.parser.SqlParserPos;
import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
import static java.util.Objects.requireNonNull;
/**
- * ALTER MODEL [IF EXISTS] [[catalogName.] dataBasesName.]modelName RESET (
'key1' [, 'key2']...).
+ * ALTER CONNECTION [IF EXISTS] [[catalogName.] dataBasesName.]connectionName
RENAME TO
+ * newConnectionName.
*/
-public class SqlAlterModelReset extends SqlAlterModel {
- private final SqlNodeList optionKeyList;
+public class SqlAlterConnectionRename extends SqlAlterConnection {
+
+ private final SqlIdentifier newConnectionName;
- public SqlAlterModelReset(
+ public SqlAlterConnectionRename(
SqlParserPos pos,
- SqlIdentifier modelName,
- boolean ifModelExists,
- SqlNodeList optionKeyList) {
- super(pos, modelName, ifModelExists);
- this.optionKeyList = requireNonNull(optionKeyList, "optionKeyList
should not be null");
+ SqlIdentifier connectionName,
+ SqlIdentifier newConnectionName,
+ boolean ifConnectionExists) {
+ super(pos, connectionName, ifConnectionExists);
+ this.newConnectionName =
+ requireNonNull(newConnectionName, "newConnectionName should
not be null");
}
- @Override
- public List<SqlNode> getOperandList() {
- return List.of(name, optionKeyList);
+ public SqlIdentifier getNewConnectionName() {
+ return newConnectionName;
}
- public Set<String> getResetKeys() {
- return optionKeyList.getList().stream()
- .map(SqlParseUtils::extractString)
- .collect(Collectors.toSet());
+ @Override
+ public List<SqlNode> getOperandList() {
+ return List.of(name, newConnectionName);
}
@Override
public void unparseAlterOperation(SqlWriter writer, int leftPrec, int
rightPrec) {
super.unparseAlterOperation(writer, leftPrec, rightPrec);
- SqlUnparseUtils.unparseResetOptions(optionKeyList, writer, leftPrec,
rightPrec);
+ writer.keyword("RENAME TO");
+ newConnectionName.unparse(writer, leftPrec, rightPrec);
}
}
diff --git
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterModelReset.java
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnectionReset.java
similarity index 79%
copy from
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterModelReset.java
copy to
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnectionReset.java
index e9392daaa1a..23501f75529 100644
---
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterModelReset.java
+++
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnectionReset.java
@@ -29,22 +29,22 @@ import org.apache.calcite.sql.parser.SqlParserPos;
import java.util.List;
import java.util.Set;
-import java.util.stream.Collectors;
import static java.util.Objects.requireNonNull;
/**
- * ALTER MODEL [IF EXISTS] [[catalogName.] dataBasesName.]modelName RESET (
'key1' [, 'key2']...).
+ * ALTER CONNECTION [IF EXISTS] [[catalogName.] dataBasesName.]connectionName
RESET ( 'key1' [,
+ * 'key2']...).
*/
-public class SqlAlterModelReset extends SqlAlterModel {
+public class SqlAlterConnectionReset extends SqlAlterConnection {
private final SqlNodeList optionKeyList;
- public SqlAlterModelReset(
+ public SqlAlterConnectionReset(
SqlParserPos pos,
- SqlIdentifier modelName,
- boolean ifModelExists,
+ SqlIdentifier connectionName,
+ boolean ifConnectionExists,
SqlNodeList optionKeyList) {
- super(pos, modelName, ifModelExists);
+ super(pos, connectionName, ifConnectionExists);
this.optionKeyList = requireNonNull(optionKeyList, "optionKeyList
should not be null");
}
@@ -54,9 +54,7 @@ public class SqlAlterModelReset extends SqlAlterModel {
}
public Set<String> getResetKeys() {
- return optionKeyList.getList().stream()
- .map(SqlParseUtils::extractString)
- .collect(Collectors.toSet());
+ return SqlParseUtils.extractSet(optionKeyList,
SqlParseUtils::extractString);
}
@Override
diff --git
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterModelReset.java
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnectionSet.java
similarity index 63%
copy from
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterModelReset.java
copy to
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnectionSet.java
index e9392daaa1a..c738d9a7530 100644
---
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterModelReset.java
+++
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterConnectionSet.java
@@ -28,40 +28,40 @@ import org.apache.calcite.sql.SqlWriter;
import org.apache.calcite.sql.parser.SqlParserPos;
import java.util.List;
-import java.util.Set;
-import java.util.stream.Collectors;
+import java.util.Map;
import static java.util.Objects.requireNonNull;
/**
- * ALTER MODEL [IF EXISTS] [[catalogName.] dataBasesName.]modelName RESET (
'key1' [, 'key2']...).
+ * ALTER CONNECTION [IF EXISTS] [[catalogName.] dataBasesName.]connectionName
SET ( name=value [,
+ * name=value]*).
*/
-public class SqlAlterModelReset extends SqlAlterModel {
- private final SqlNodeList optionKeyList;
+public class SqlAlterConnectionSet extends SqlAlterConnection {
- public SqlAlterModelReset(
+ private final SqlNodeList connectionOptionList;
+
+ public SqlAlterConnectionSet(
SqlParserPos pos,
- SqlIdentifier modelName,
- boolean ifModelExists,
- SqlNodeList optionKeyList) {
- super(pos, modelName, ifModelExists);
- this.optionKeyList = requireNonNull(optionKeyList, "optionKeyList
should not be null");
+ SqlIdentifier connectionName,
+ boolean ifConnectionExists,
+ SqlNodeList connectionOptionList) {
+ super(pos, connectionName, ifConnectionExists);
+ this.connectionOptionList =
+ requireNonNull(connectionOptionList, "connectionOptionList
should not be null");
}
- @Override
- public List<SqlNode> getOperandList() {
- return List.of(name, optionKeyList);
+ public Map<String, String> getProperties() {
+ return SqlParseUtils.extractMap(connectionOptionList);
}
- public Set<String> getResetKeys() {
- return optionKeyList.getList().stream()
- .map(SqlParseUtils::extractString)
- .collect(Collectors.toSet());
+ @Override
+ public List<SqlNode> getOperandList() {
+ return List.of(name, connectionOptionList);
}
@Override
public void unparseAlterOperation(SqlWriter writer, int leftPrec, int
rightPrec) {
super.unparseAlterOperation(writer, leftPrec, rightPrec);
- SqlUnparseUtils.unparseResetOptions(optionKeyList, writer, leftPrec,
rightPrec);
+ SqlUnparseUtils.unparseSetOptions(connectionOptionList, writer,
leftPrec, rightPrec);
}
}
diff --git
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterModelReset.java
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterModelReset.java
index e9392daaa1a..00bfbc1079e 100644
---
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterModelReset.java
+++
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterModelReset.java
@@ -29,7 +29,6 @@ import org.apache.calcite.sql.parser.SqlParserPos;
import java.util.List;
import java.util.Set;
-import java.util.stream.Collectors;
import static java.util.Objects.requireNonNull;
@@ -54,9 +53,7 @@ public class SqlAlterModelReset extends SqlAlterModel {
}
public Set<String> getResetKeys() {
- return optionKeyList.getList().stream()
- .map(SqlParseUtils::extractString)
- .collect(Collectors.toSet());
+ return SqlParseUtils.extractSet(optionKeyList,
SqlParseUtils::extractString);
}
@Override
diff --git
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableReset.java
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableReset.java
index a76163b999e..b6581a5f419 100644
---
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableReset.java
+++
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAlterTableReset.java
@@ -30,7 +30,6 @@ import org.apache.calcite.util.ImmutableNullableList;
import java.util.List;
import java.util.Set;
-import java.util.stream.Collectors;
import static java.util.Objects.requireNonNull;
@@ -58,9 +57,7 @@ public class SqlAlterTableReset extends SqlAlterTable {
}
public Set<String> getResetKeys() {
- return propertyKeyList.getList().stream()
- .map(SqlParseUtils::extractString)
- .collect(Collectors.toSet());
+ return SqlParseUtils.extractSet(propertyKeyList,
SqlParseUtils::extractString);
}
@Override
diff --git
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAnalyzeTable.java
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAnalyzeTable.java
index 4ab91890c19..c90c56a7b8c 100644
---
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAnalyzeTable.java
+++
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlAnalyzeTable.java
@@ -81,7 +81,7 @@ public class SqlAnalyzeTable extends SqlCall {
}
public List<String> getColumnNames() {
- return SqlParseUtils.extractList(columns);
+ return SqlParseUtils.extractList(columns, p -> ((SqlIdentifier)
p).getSimple());
}
public boolean isAllColumns() {
diff --git
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateConnection.java
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateConnection.java
new file mode 100644
index 00000000000..c9936893a14
--- /dev/null
+++
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateConnection.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.ddl;
+
+import org.apache.flink.sql.parser.ExtendedSqlNode;
+import org.apache.flink.sql.parser.SqlUnparseUtils;
+import org.apache.flink.sql.parser.error.SqlValidateException;
+
+import org.apache.calcite.sql.SqlCharStringLiteral;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+import org.apache.calcite.util.ImmutableNullableList;
+
+import javax.annotation.Nonnull;
+
+import java.util.List;
+
+/**
+ * {@link SqlNode} to describe the CREATE CONNECTION syntax. CREATE
[TEMPORARY] [SYSTEM] CONNECTION
+ * [IF NOT EXISTS] [[catalogName.] dataBasesName.]connectionName [COMMENT
connection_comment] WITH
+ * (name=value [, name=value]*).
+ */
+public class SqlCreateConnection extends SqlCreateObject implements
ExtendedSqlNode {
+
+ private static final SqlSpecialOperator OPERATOR =
+ new SqlSpecialOperator("CREATE CONNECTION", SqlKind.OTHER_DDL);
+
+ private final boolean isSystem;
+
+ public SqlCreateConnection(
+ SqlParserPos pos,
+ SqlIdentifier connectionName,
+ SqlCharStringLiteral comment,
+ SqlNodeList propertyList,
+ boolean isTemporary,
+ boolean isSystem,
+ boolean ifNotExists) {
+ super(
+ OPERATOR,
+ pos,
+ connectionName,
+ isTemporary,
+ false,
+ ifNotExists,
+ propertyList,
+ comment);
+ this.isSystem = isSystem;
+ }
+
+ @Override
+ public @Nonnull List<SqlNode> getOperandList() {
+ return ImmutableNullableList.of(name, comment, properties);
+ }
+
+ public boolean isSystem() {
+ return isSystem;
+ }
+
+ @Override
+ public void validate() throws SqlValidateException {
+ if (properties == null || properties.isEmpty()) {
+ throw new SqlValidateException(
+ getParserPosition(), "Connection property list can not be
empty.");
+ }
+ }
+
+ @Override
+ protected String getScope() {
+ return "CONNECTION";
+ }
+
+ @Override
+ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
+ writer.keyword("CREATE");
+ if (isTemporary()) {
+ writer.keyword("TEMPORARY");
+ }
+ if (isSystem) {
+ writer.keyword("SYSTEM");
+ }
+ writer.keyword("CONNECTION");
+ if (isIfNotExists()) {
+ writer.keyword("IF NOT EXISTS");
+ }
+ name.unparse(writer, leftPrec, rightPrec);
+ SqlUnparseUtils.unparseComment(comment, true, writer, leftPrec,
rightPrec);
+ SqlUnparseUtils.unparseProperties(properties, writer, leftPrec,
rightPrec);
+ }
+}
diff --git
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateMaterializedTable.java
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateMaterializedTable.java
index 628571fdcb4..8039e282968 100644
---
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateMaterializedTable.java
+++
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateMaterializedTable.java
@@ -119,7 +119,7 @@ public class SqlCreateMaterializedTable extends
SqlCreateObject implements Exten
}
public List<String> getPartitionKeyList() {
- return SqlParseUtils.extractList(partitionKeyList);
+ return SqlParseUtils.extractList(partitionKeyList, p ->
((SqlIdentifier) p).getSimple());
}
@Nullable
diff --git
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
index 2d6769b2252..8fa0a4c7ca5 100644
---
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
+++
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java
@@ -136,7 +136,7 @@ public class SqlCreateTable extends SqlCreateObject
implements ExtendedSqlNode {
}
public List<String> getPartitionKeyList() {
- return SqlParseUtils.extractList(partitionKeyList);
+ return SqlParseUtils.extractList(partitionKeyList, p ->
((SqlIdentifier) p).getSimple());
}
public List<SqlTableConstraint> getTableConstraints() {
diff --git
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropModel.java
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropConnection.java
similarity index 60%
copy from
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropModel.java
copy to
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropConnection.java
index 53b6417f7f5..3af65a4ba11 100644
---
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropModel.java
+++
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropConnection.java
@@ -20,36 +20,51 @@ package org.apache.flink.sql.parser.ddl;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlKind;
-import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.SqlSpecialOperator;
import org.apache.calcite.sql.SqlWriter;
import org.apache.calcite.sql.parser.SqlParserPos;
/**
- * {@link SqlNode} to describe the DROP MODEL [IF EXISTS] [[catalogName.]
dataBasesName].modelName
- * syntax.
+ * {@link org.apache.calcite.sql.SqlNode} to describe the DROP CONNECTION [IF
EXISTS]
+ * [[catalogName.] dataBasesName].connectionName syntax.
*/
-public class SqlDropModel extends SqlDropObject {
+public class SqlDropConnection extends SqlDropObject {
private static final SqlOperator OPERATOR =
- new SqlSpecialOperator("DROP MODEL", SqlKind.OTHER_DDL);
+ new SqlSpecialOperator("DROP CONNECTION", SqlKind.OTHER_DDL);
private final boolean isTemporary;
+ private final boolean isSystemConnection;
- public SqlDropModel(
- SqlParserPos pos, SqlIdentifier modelName, boolean ifExists,
boolean isTemporary) {
- super(OPERATOR, pos, modelName, ifExists);
+ public SqlDropConnection(
+ SqlParserPos pos,
+ SqlIdentifier connectionName,
+ boolean ifExists,
+ boolean isTemporary,
+ boolean isSystemConnection) {
+ super(OPERATOR, pos, connectionName, ifExists);
this.isTemporary = isTemporary;
+ this.isSystemConnection = isSystemConnection;
}
- public boolean getIsTemporary() {
- return this.isTemporary;
+ public boolean isTemporary() {
+ return isTemporary;
+ }
+
+ public boolean isSystemConnection() {
+ return isSystemConnection;
}
@Override
public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
writer.keyword("DROP");
- writer.keyword("MODEL");
+ if (isTemporary) {
+ writer.keyword("TEMPORARY");
+ }
+ if (isSystemConnection) {
+ writer.keyword("SYSTEM");
+ }
+ writer.keyword("CONNECTION");
if (ifExists) {
writer.keyword("IF EXISTS");
}
diff --git
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropModel.java
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropModel.java
index 53b6417f7f5..8aeb082a54f 100644
---
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropModel.java
+++
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropModel.java
@@ -42,13 +42,16 @@ public class SqlDropModel extends SqlDropObject {
this.isTemporary = isTemporary;
}
- public boolean getIsTemporary() {
- return this.isTemporary;
+ public boolean isTemporary() {
+ return isTemporary;
}
@Override
public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
writer.keyword("DROP");
+ if (isTemporary) {
+ writer.keyword("TEMPORARY");
+ }
writer.keyword("MODEL");
if (ifExists) {
writer.keyword("IF EXISTS");
diff --git
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlRichDescribeConnection.java
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlRichDescribeConnection.java
new file mode 100644
index 00000000000..9fb9bb9b79c
--- /dev/null
+++
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlRichDescribeConnection.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.sql.parser.dql;
+
+import org.apache.calcite.sql.SqlCall;
+import org.apache.calcite.sql.SqlIdentifier;
+import org.apache.calcite.sql.SqlKind;
+import org.apache.calcite.sql.SqlNode;
+import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
+import org.apache.calcite.sql.SqlWriter;
+import org.apache.calcite.sql.parser.SqlParserPos;
+
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * DESCRIBE CONNECTION [EXTENDED] [[catalogName.] dataBasesName].sqlIdentifier sql call. The class
+ * name carries the "Rich" prefix only for consistency with the describe-table call, where the
+ * prefix distinguishes it from Calcite's {@link org.apache.calcite.sql.SqlDescribeTable}; Calcite
+ * itself does not have a SqlDescribeConnection.
+ */
+public class SqlRichDescribeConnection extends SqlCall {
+
+ public static final SqlSpecialOperator OPERATOR =
+ new SqlSpecialOperator("DESCRIBE CONNECTION", SqlKind.OTHER); // returned by getOperator()
+ protected final SqlIdentifier connectionNameIdentifier; // name of the connection to describe
+ private final boolean isExtended; // when true, unparse() writes the EXTENDED keyword
+
+ public SqlRichDescribeConnection(
+ SqlParserPos pos, SqlIdentifier connectionNameIdentifier, boolean
isExtended) {
+ super(pos);
+ this.connectionNameIdentifier = connectionNameIdentifier;
+ this.isExtended = isExtended;
+ }
+
+ @Override
+ public SqlOperator getOperator() {
+ return OPERATOR;
+ }
+
+ @Override
+ public List<SqlNode> getOperandList() { // the identifier is the only operand; isExtended is not one
+ return Collections.singletonList(connectionNameIdentifier);
+ }
+
+ public boolean isExtended() {
+ return isExtended;
+ }
+
+ public String[] fullConnectionName() { // the identifier's name parts copied into a new array
+ return connectionNameIdentifier.names.toArray(new String[0]);
+ }
+
+ @Override
+ public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { // DESCRIBE CONNECTION [EXTENDED] name
+ writer.keyword("DESCRIBE CONNECTION"); // always the long form; the tests expect it for "desc" input too
+ if (isExtended) {
+ writer.keyword("EXTENDED");
+ }
+ connectionNameIdentifier.unparse(writer, leftPrec, rightPrec);
+ }
+}
diff --git
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropModel.java
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowConnections.java
similarity index 53%
copy from
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropModel.java
copy to
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowConnections.java
index 53b6417f7f5..a0dd89f99e6 100644
---
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropModel.java
+++
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowConnections.java
@@ -16,43 +16,45 @@
* limitations under the License.
*/
-package org.apache.flink.sql.parser.ddl;
+package org.apache.flink.sql.parser.dql;
+import org.apache.calcite.sql.SqlCharStringLiteral;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlKind;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlOperator;
import org.apache.calcite.sql.SqlSpecialOperator;
-import org.apache.calcite.sql.SqlWriter;
import org.apache.calcite.sql.parser.SqlParserPos;
-/**
- * {@link SqlNode} to describe the DROP MODEL [IF EXISTS] [[catalogName.]
dataBasesName].modelName
- * syntax.
- */
-public class SqlDropModel extends SqlDropObject {
- private static final SqlOperator OPERATOR =
- new SqlSpecialOperator("DROP MODEL", SqlKind.OTHER_DDL);
+/** {@link SqlNode} to describe the SHOW CONNECTIONS syntax. */
+public class SqlShowConnections extends SqlShowCall {
- private final boolean isTemporary;
+ public static final SqlSpecialOperator OPERATOR =
+ new SqlSpecialOperator("SHOW CONNECTIONS", SqlKind.OTHER);
- public SqlDropModel(
- SqlParserPos pos, SqlIdentifier modelName, boolean ifExists,
boolean isTemporary) {
- super(OPERATOR, pos, modelName, ifExists);
- this.isTemporary = isTemporary;
+ public SqlShowConnections(
+ SqlParserPos pos,
+ String preposition,
+ SqlIdentifier databaseName,
+ boolean notLike,
+ SqlCharStringLiteral likeLiteral) {
+ // only LIKE currently supported for SHOW CONNECTIONS
+ super(
+ pos,
+ preposition,
+ databaseName,
+ likeLiteral == null ? null : "LIKE",
+ likeLiteral,
+ notLike);
}
- public boolean getIsTemporary() {
- return this.isTemporary;
+ @Override
+ public SqlOperator getOperator() {
+ return OPERATOR;
}
@Override
- public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
- writer.keyword("DROP");
- writer.keyword("MODEL");
- if (ifExists) {
- writer.keyword("IF EXISTS");
- }
- name.unparse(writer, leftPrec, rightPrec);
+ String getOperationName() {
+ return "SHOW CONNECTIONS";
}
}
diff --git
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropModel.java
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowCreateConnection.java
similarity index 59%
copy from
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropModel.java
copy to
flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowCreateConnection.java
index 53b6417f7f5..0434bbb6dc1 100644
---
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlDropModel.java
+++
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/dql/SqlShowCreateConnection.java
@@ -16,7 +16,7 @@
* limitations under the License.
*/
-package org.apache.flink.sql.parser.ddl;
+package org.apache.flink.sql.parser.dql;
import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlKind;
@@ -26,33 +26,36 @@ import org.apache.calcite.sql.SqlSpecialOperator;
import org.apache.calcite.sql.SqlWriter;
import org.apache.calcite.sql.parser.SqlParserPos;
-/**
- * {@link SqlNode} to describe the DROP MODEL [IF EXISTS] [[catalogName.]
dataBasesName].modelName
- * syntax.
- */
-public class SqlDropModel extends SqlDropObject {
- private static final SqlOperator OPERATOR =
- new SqlSpecialOperator("DROP MODEL", SqlKind.OTHER_DDL);
+import java.util.Collections;
+import java.util.List;
+
+/** SHOW CREATE CONNECTION sql call. */
+public class SqlShowCreateConnection extends SqlShowCreate {
+
+ public static final SqlSpecialOperator OPERATOR =
+ new SqlSpecialOperator("SHOW CREATE CONNECTION",
SqlKind.OTHER_DDL);
+
+ public SqlShowCreateConnection(SqlParserPos pos, SqlIdentifier
connectionName) {
+ super(pos, connectionName);
+ }
- private final boolean isTemporary;
+ public SqlIdentifier getConnectionName() {
+ return sqlIdentifier;
+ }
- public SqlDropModel(
- SqlParserPos pos, SqlIdentifier modelName, boolean ifExists,
boolean isTemporary) {
- super(OPERATOR, pos, modelName, ifExists);
- this.isTemporary = isTemporary;
+ @Override
+ public SqlOperator getOperator() {
+ return OPERATOR;
}
- public boolean getIsTemporary() {
- return this.isTemporary;
+ @Override
+ public List<SqlNode> getOperandList() {
+ return Collections.singletonList(sqlIdentifier);
}
@Override
public void unparse(SqlWriter writer, int leftPrec, int rightPrec) {
- writer.keyword("DROP");
- writer.keyword("MODEL");
- if (ifExists) {
- writer.keyword("IF EXISTS");
- }
- name.unparse(writer, leftPrec, rightPrec);
+ writer.keyword("SHOW CREATE CONNECTION");
+ sqlIdentifier.unparse(writer, leftPrec, rightPrec);
}
}
diff --git
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
index 9c29119ba9e..57e2e55c17d 100644
---
a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
+++
b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/utils/ParserResource.java
@@ -35,7 +35,15 @@ public interface ParserResource {
Resources.ExInst<ParseException> overwriteIsOnlyUsedWithInsert();
@Resources.BaseMessage(
- "CREATE SYSTEM FUNCTION is not supported, system functions can
only be registered as temporary function, you can use CREATE TEMPORARY SYSTEM
FUNCTION instead.")
+ "CREATE SYSTEM CONNECTION is not supported, system connections can
only be registered as temporary connections, you can use CREATE TEMPORARY
SYSTEM CONNECTION instead.")
+ Resources.ExInst<ParseException>
createSystemConnectionOnlySupportTemporary();
+
+ @Resources.BaseMessage(
+ "DROP SYSTEM CONNECTION is not supported, system connections can
only be dropped as temporary connections, you can use DROP TEMPORARY SYSTEM
CONNECTION instead.")
+ Resources.ExInst<ParseException>
dropSystemConnectionOnlySupportTemporary();
+
+ @Resources.BaseMessage(
+ "CREATE SYSTEM FUNCTION is not supported, system functions can
only be registered as temporary functions, you can use CREATE TEMPORARY SYSTEM
FUNCTION instead.")
Resources.ExInst<ParseException>
createSystemFunctionOnlySupportTemporary();
@Resources.BaseMessage("Duplicate EXPLAIN DETAIL is not allowed.")
diff --git
a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
index 8a6b3abe8a6..64ea1fa26e7 100644
---
a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
+++
b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java
@@ -2479,7 +2479,7 @@ class FlinkSqlParserImplTest extends SqlParserTest {
.fails(
"CREATE SYSTEM FUNCTION is not supported, "
+ "system functions can only be registered as
temporary "
- + "function, you can use CREATE TEMPORARY
SYSTEM FUNCTION instead.");
+ + "functions, you can use CREATE TEMPORARY
SYSTEM FUNCTION instead.");
// test create function using jar
sql("create temporary function function1 as
'org.apache.flink.function.function1' language java using jar
'file:///path/to/test.jar'")
@@ -3262,6 +3262,12 @@ class FlinkSqlParserImplTest extends SqlParserTest {
sql("drop model catalog1.db1.m1").ok("DROP MODEL
`CATALOG1`.`DB1`.`M1`");
}
+ @Test
+ void testDropTemporaryModel() { // DROP TEMPORARY MODEL must unparse with TEMPORARY, with and without IF EXISTS
+ sql("drop temporary model m1").ok("DROP TEMPORARY MODEL `M1`");
+ sql("drop temporary model if exists m1").ok("DROP TEMPORARY MODEL IF
EXISTS `M1`");
+ }
+
@Test
void testDropModelIfExists() {
sql("drop model if exists catalog1.db1.m1")
@@ -3480,6 +3486,254 @@ class FlinkSqlParserImplTest extends SqlParserTest {
+ "FROM TABLE(`ML_PREDICT`(`INPUT` => (TABLE
`MY_TABLE`), `MODEL` => (MODEL `MY_MODEL`)))");
}
+ // =====================================================================================
+ // Connection DDL/DQL Tests
+ // =====================================================================================
+
+ @Test
+ void testCreateConnection() { // round-trips CREATE CONNECTION with a COMMENT and four properties
+ sql("create connection conn1\n"
+ + " COMMENT 'connection_comment'\n"
+ + " WITH (\n"
+ + " 'type'='basic',\n"
+ + " 'url'='http://example.com',\n"
+ + " 'username'='user1',\n"
+ + " 'password'='pass1'\n"
+ + " )\n")
+ .ok(
+ "CREATE CONNECTION `CONN1`\n"
+ + "COMMENT 'connection_comment'\n"
+ + "WITH (\n"
+ + " 'type' = 'basic',\n"
+ + " 'url' = 'http://example.com',\n"
+ + " 'username' = 'user1',\n"
+ + " 'password' = 'pass1'\n"
+ + ")");
+ }
+
+ @Test
+ void testCreateConnectionIfNotExists() { // IF NOT EXISTS is kept; no COMMENT clause in this statement
+ sql("create connection if not exists conn1\n"
+ + " WITH (\n"
+ + " 'type'='bearer',\n"
+ + " 'token'='my_token'\n"
+ + " )\n")
+ .ok(
+ "CREATE CONNECTION IF NOT EXISTS `CONN1`\n"
+ + "WITH (\n"
+ + " 'type' = 'bearer',\n"
+ + " 'token' = 'my_token'\n"
+ + ")");
+ }
+
+ @Test
+ void testCreateTemporaryConnection() { // TEMPORARY is unparsed between CREATE and CONNECTION
+ sql("create temporary connection conn1\n"
+ + " WITH (\n"
+ + " 'type'='oauth',\n"
+ + " 'client_id'='client1'\n"
+ + " )\n")
+ .ok(
+ "CREATE TEMPORARY CONNECTION `CONN1`\n"
+ + "WITH (\n"
+ + " 'type' = 'oauth',\n"
+ + " 'client_id' = 'client1'\n"
+ + ")");
+ }
+
+ @Test
+ void testCreateSystemConnection() { // SYSTEM without TEMPORARY must fail with the dedicated message
+ sql("create ^system^ connection conn1\n" // carets presumably mark the expected error span - confirm
+ + " WITH (\n"
+ + " 'type'='basic',\n"
+ + " 'url'='http://example.com'\n"
+ + " )\n")
+ .fails(
+ "(?s)CREATE SYSTEM CONNECTION is not supported, "
+ + "system connections can only be registered
as temporary "
+ + "connections, you can use CREATE TEMPORARY
SYSTEM CONNECTION "
+ + "instead\\..*");
+ }
+
+ @Test
+ void testCreateTemporarySystemConnection() { // TEMPORARY SYSTEM is accepted and unparsed in that order
+ sql("create temporary system connection conn1\n"
+ + " WITH (\n"
+ + " 'type'='custom_type',\n"
+ + " 'api_key'='key123'\n"
+ + " )\n")
+ .ok(
+ "CREATE TEMPORARY SYSTEM CONNECTION `CONN1`\n"
+ + "WITH (\n"
+ + " 'type' = 'custom_type',\n"
+ + " 'api_key' = 'key123'\n"
+ + ")");
+ }
+
+ @Test
+ void testCreateConnectionWithQualifiedName() { // a three-part name keeps all parts when unparsed
+ sql("create connection catalog1.db1.conn1\n"
+ + " WITH ('type'='basic',
'url'='http://example.com')\n")
+ .ok(
+ "CREATE CONNECTION `CATALOG1`.`DB1`.`CONN1`\n"
+ + "WITH (\n"
+ + " 'type' = 'basic',\n"
+ + " 'url' = 'http://example.com'\n"
+ + ")");
+ }
+
+ @Test
+ void testDropConnection() { // DROP CONNECTION with one-, two- and three-part names
+ sql("drop connection conn1").ok("DROP CONNECTION `CONN1`");
+ sql("drop connection db1.conn1").ok("DROP CONNECTION `DB1`.`CONN1`");
+ sql("drop connection catalog1.db1.conn1").ok("DROP CONNECTION
`CATALOG1`.`DB1`.`CONN1`");
+ }
+
+ @Test
+ void testDropConnectionIfExists() { // IF EXISTS is unparsed after the CONNECTION keyword
+ sql("drop connection if exists catalog1.db1.conn1")
+ .ok("DROP CONNECTION IF EXISTS `CATALOG1`.`DB1`.`CONN1`");
+ }
+
+ @Test
+ void testDropTemporaryConnection() { // TEMPORARY variant, with and without IF EXISTS
+ sql("drop temporary connection conn1").ok("DROP TEMPORARY CONNECTION
`CONN1`");
+ sql("drop temporary connection if exists conn1")
+ .ok("DROP TEMPORARY CONNECTION IF EXISTS `CONN1`");
+ }
+
+ @Test
+ void testDropTemporarySystemConnection() { // TEMPORARY SYSTEM variant, with and without IF EXISTS
+ sql("drop temporary system connection conn1")
+ .ok("DROP TEMPORARY SYSTEM CONNECTION `CONN1`");
+ sql("drop temporary system connection if exists conn1")
+ .ok("DROP TEMPORARY SYSTEM CONNECTION IF EXISTS `CONN1`");
+ }
+
+ @Test
+ void testDropSystemConnection() { // SYSTEM without TEMPORARY must fail with the dedicated message
+ sql("drop ^system^ connection conn1") // carets presumably mark the expected error span - confirm
+ .fails(
+ "(?s)DROP SYSTEM CONNECTION is not supported, "
+ + "system connections can only be dropped as
temporary "
+ + "connections, you can use DROP TEMPORARY
SYSTEM CONNECTION "
+ + "instead\\..*");
+ }
+
+ @Test
+ void testAlterConnectionSet() { // SET with two options; each option is expected on its own line
+ final String sql =
+ "alter connection conn1 set ('password' = 'new_password','url'
= 'http://new.com')";
+ final String expected =
+ "ALTER CONNECTION `CONN1` SET (\n"
+ + " 'password' = 'new_password',\n"
+ + " 'url' = 'http://new.com'\n"
+ + ")";
+ sql(sql).ok(expected);
+ }
+
+ @Test
+ void testAlterConnectionSetWithQualifiedName() { // SET on a three-part connection name
+ final String sql = "alter connection catalog1.db1.conn1 set ('token' =
'new_token')";
+ final String expected =
+ "ALTER CONNECTION `CATALOG1`.`DB1`.`CONN1` SET (\n"
+ + " 'token' = 'new_token'\n"
+ + ")";
+ sql(sql).ok(expected);
+ }
+
+ @Test
+ void testAlterConnectionRename() { // RENAME TO between two simple names
+ final String sql = "alter connection conn1 rename to conn2";
+ final String expected = "ALTER CONNECTION `CONN1` RENAME TO `CONN2`";
+ sql(sql).ok(expected);
+ }
+
+ @Test
+ void testAlterConnectionRenameWithQualifiedName() { // qualified source name, simple target name
+ final String sql = "alter connection catalog1.db1.conn1 rename to
conn2";
+ final String expected = "ALTER CONNECTION `CATALOG1`.`DB1`.`CONN1`
RENAME TO `CONN2`";
+ sql(sql).ok(expected);
+ }
+
+ @Test
+ void testAlterConnectionReset() { // RESET takes bare option keys, expected one per line
+ final String sql = "alter connection conn1 reset ('password', 'url')";
+ final String expected = "ALTER CONNECTION `CONN1` RESET (\n
'password',\n 'url'\n)";
+ sql(sql).ok(expected);
+ }
+
+ @Test
+ void testAlterConnectionResetWithQualifiedName() { // RESET of one key on a three-part name
+ final String sql = "alter connection catalog1.db1.conn1 reset
('token')";
+ final String expected = "ALTER CONNECTION `CATALOG1`.`DB1`.`CONN1`
RESET (\n 'token'\n)";
+ sql(sql).ok(expected);
+ }
+
+ @Test
+ void testAlterConnectionIfExists() { // IF EXISTS combined with SET
+ final String sql =
+ "alter connection if exists conn1 set ('password' =
'new_password','url' = 'http://new.com')";
+ final String expected =
+ "ALTER CONNECTION IF EXISTS `CONN1` SET (\n"
+ + " 'password' = 'new_password',\n"
+ + " 'url' = 'http://new.com'\n"
+ + ")";
+ sql(sql).ok(expected);
+ }
+
+ @Test
+ void testAlterConnectionRenameIfExists() { // IF EXISTS combined with RENAME TO
+ final String sql = "alter connection if exists conn1 rename to conn2";
+ final String expected = "ALTER CONNECTION IF EXISTS `CONN1` RENAME TO
`CONN2`";
+ sql(sql).ok(expected);
+ }
+
+ @Test
+ void testAlterConnectionResetIfExists() { // IF EXISTS combined with RESET
+ final String sql = "alter connection if exists conn1 reset
('password', 'url')";
+ final String expected =
+ "ALTER CONNECTION IF EXISTS `CONN1` RESET (\n 'password',\n
'url'\n)";
+ sql(sql).ok(expected);
+ }
+
+ @Test
+ void testShowConnections() { // bare form plus FROM / IN; the preposition written is the one unparsed
+ sql("show connections").ok("SHOW CONNECTIONS");
+ sql("show connections from db1").ok("SHOW CONNECTIONS FROM `DB1`");
+ sql("show connections from catalog1.db1").ok("SHOW CONNECTIONS FROM
`CATALOG1`.`DB1`");
+ sql("show connections in db1").ok("SHOW CONNECTIONS IN `DB1`");
+ sql("show connections in catalog1.db1").ok("SHOW CONNECTIONS IN
`CATALOG1`.`DB1`");
+ }
+
+ @Test
+ void testShowConnectionsLike() { // LIKE / NOT LIKE; note the expected pattern text is upper-cased
+ sql("show connections like '%conn%'").ok("SHOW CONNECTIONS LIKE
'%CONN%'");
+ sql("show connections from db1 like 'my_%'").ok("SHOW CONNECTIONS FROM
`DB1` LIKE 'MY_%'");
+ sql("show connections not like 'temp_%'").ok("SHOW CONNECTIONS NOT
LIKE 'TEMP_%'");
+ }
+
+ @Test
+ void testShowCreateConnection() { // simple and three-part names
+ sql("show create connection conn1").ok("SHOW CREATE CONNECTION
`CONN1`");
+ sql("show create connection catalog1.db1.conn1")
+ .ok("SHOW CREATE CONNECTION `CATALOG1`.`DB1`.`CONN1`");
+ }
+
+ @Test
+ void testDescribeConnection() { // DESCRIBE and DESC inputs are both expected to unparse as DESCRIBE
+ sql("describe connection conn1").ok("DESCRIBE CONNECTION `CONN1`");
+ sql("describe connection catalog1.db1.conn1")
+ .ok("DESCRIBE CONNECTION `CATALOG1`.`DB1`.`CONN1`");
+ sql("describe connection extended conn1").ok("DESCRIBE CONNECTION
EXTENDED `CONN1`");
+
+ sql("desc connection conn1").ok("DESCRIBE CONNECTION `CONN1`");
+ sql("desc connection catalog1.db1.conn1")
+ .ok("DESCRIBE CONNECTION `CATALOG1`.`DB1`.`CONN1`");
+ sql("desc connection extended catalog1.db1.conn1")
+ .ok("DESCRIBE CONNECTION EXTENDED `CATALOG1`.`DB1`.`CONN1`");
+ }
+
/*
* This test was backported from Calcite 1.38 (CALCITE-6266).
* Remove it together with upgrade to Calcite 1.38.
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlDropModelConverter.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlDropModelConverter.java
index 90a45c5a425..4e0d85c6dbc 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlDropModelConverter.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/converters/SqlDropModelConverter.java
@@ -35,6 +35,6 @@ public class SqlDropModelConverter implements
SqlNodeConverter<SqlDropModel> {
context.getCatalogManager().qualifyIdentifier(unresolvedIdentifier);
return new DropModelOperation(
- identifier, sqlDropModel.getIfExists(),
sqlDropModel.getIsTemporary());
+ identifier, sqlDropModel.getIfExists(),
sqlDropModel.isTemporary());
}
}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java
index b9992b43dd8..ff25ff6b0e0 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/OperationConverterUtils.java
@@ -24,6 +24,7 @@ import org.apache.flink.table.catalog.TableDistribution;
import org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
import org.apache.calcite.sql.SqlDialect;
+import org.apache.calcite.sql.SqlIdentifier;
import org.apache.calcite.sql.SqlNode;
import org.apache.calcite.sql.SqlNodeList;
import org.apache.calcite.sql.SqlNumericLiteral;
@@ -52,7 +53,8 @@ public class OperationConverterUtils {
}
SqlNodeList columns = distribution.getBucketColumns();
- List<String> bucketColumns = SqlParseUtils.extractList(columns);
+ List<String> bucketColumns =
+ SqlParseUtils.extractList(columns, p -> ((SqlIdentifier)
p).getSimple());
return TableDistribution.of(kind, bucketCount, bucketColumns);
}