This is an automated email from the ASF dual-hosted git repository. jark pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 78499853bff8eccc2b174a883a7cfdc4125eb499 Author: yuzhao.cyz <[email protected]> AuthorDate: Tue Mar 17 19:00:55 2020 +0800 [FLINK-14338][sql-parser] Bump sql parser Calcite dependency to 1.22.0 * Add Junit5 supports for tests * Move classes under package calcite.sql to flink.sql.parser.type * Remove ExtendedSqlBasicTypeNameSpec, use SqlAlienSystemTypeNameSpec instead * In Parser.tdd, re-organize the imports order to alphabetical order, remove the useless tailing commas, extends nonReservedKeywordsToAdd * Fix the JavaCC compile warnings --- .../src/main/codegen/data/Parser.tdd | 201 ++++++----- .../src/main/codegen/includes/parserImpls.ftl | 19 +- .../java/org/apache/calcite/sql/package-info.java | 27 -- .../flink/sql/parser/ddl/SqlCreateTable.java | 18 +- .../parser/type/ExtendedSqlBasicTypeNameSpec.java | 57 --- .../parser/type}/ExtendedSqlRowTypeNameSpec.java | 8 +- .../sql/parser/type}/SqlMapTypeNameSpec.java | 6 +- .../sql/parser/validate/FlinkSqlConformance.java | 10 + .../flink/sql/parser/FlinkDDLDataTypeTest.java | 4 +- .../flink/sql/parser/FlinkSqlParserImplTest.java | 392 +++++++++++---------- 10 files changed, 365 insertions(+), 377 deletions(-) diff --git a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd index 1783a5e..14158c7 100644 --- a/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd +++ b/flink-table/flink-sql-parser/src/main/codegen/data/Parser.tdd @@ -21,69 +21,70 @@ # List of additional classes and packages to import. # Example. "org.apache.calcite.sql.*", "java.util.List". + # Please keep the import classes in alphabetical order if new class is added. imports: [ - "org.apache.calcite.sql.ExtendedSqlRowTypeNameSpec", - "org.apache.calcite.sql.SqlMapTypeNameSpec", - "org.apache.flink.sql.parser.ddl.SqlCreateTable", + "org.apache.flink.sql.parser.ddl.SqlAlterDatabase" + "org.apache.flink.sql.parser.ddl.SqlAlterFunction" + "org.apache.flink.sql.parser.ddl.SqlAlterTable" + "org.apache.flink.sql.parser.ddl.SqlAlterTableProperties" + "org.apache.flink.sql.parser.ddl.SqlAlterTableRename" + "org.apache.flink.sql.parser.ddl.SqlCreateCatalog" + "org.apache.flink.sql.parser.ddl.SqlCreateDatabase" + "org.apache.flink.sql.parser.ddl.SqlCreateFunction" + "org.apache.flink.sql.parser.ddl.SqlCreateTable" + "org.apache.flink.sql.parser.ddl.SqlCreateTable.TableCreationContext" + "org.apache.flink.sql.parser.ddl.SqlCreateView" + "org.apache.flink.sql.parser.ddl.SqlDropDatabase" + "org.apache.flink.sql.parser.ddl.SqlDropFunction" "org.apache.flink.sql.parser.ddl.SqlDropTable" - "org.apache.flink.sql.parser.ddl.SqlCreateTable.TableCreationContext", - "org.apache.flink.sql.parser.ddl.SqlCreateView", - "org.apache.flink.sql.parser.ddl.SqlDropView", - "org.apache.flink.sql.parser.ddl.SqlTableColumn", - "org.apache.flink.sql.parser.ddl.SqlTableOption", - "org.apache.flink.sql.parser.ddl.SqlWatermark", - "org.apache.flink.sql.parser.ddl.SqlUseCatalog", - "org.apache.flink.sql.parser.ddl.SqlUseDatabase", - "org.apache.flink.sql.parser.ddl.SqlCreateDatabase", - "org.apache.flink.sql.parser.ddl.SqlDropDatabase", - "org.apache.flink.sql.parser.ddl.SqlAlterDatabase", - "org.apache.flink.sql.parser.ddl.SqlAlterFunction", - "org.apache.flink.sql.parser.ddl.SqlCreateFunction", - "org.apache.flink.sql.parser.ddl.SqlDropFunction", - "org.apache.flink.sql.parser.ddl.SqlAlterTable", - "org.apache.flink.sql.parser.ddl.SqlAlterTableRename", - "org.apache.flink.sql.parser.ddl.SqlAlterTableProperties", - "org.apache.flink.sql.parser.ddl.SqlCreateCatalog", - "org.apache.flink.sql.parser.dml.RichSqlInsert", - "org.apache.flink.sql.parser.dml.RichSqlInsertKeyword", - "org.apache.flink.sql.parser.dql.SqlShowCatalogs", - "org.apache.flink.sql.parser.dql.SqlDescribeCatalog", - "org.apache.flink.sql.parser.dql.SqlShowDatabases", - "org.apache.flink.sql.parser.dql.SqlShowFunctions", - "org.apache.flink.sql.parser.dql.SqlDescribeDatabase", - "org.apache.flink.sql.parser.dql.SqlShowTables", - "org.apache.flink.sql.parser.dql.SqlRichDescribeTable", - "org.apache.flink.sql.parser.type.ExtendedSqlBasicTypeNameSpec", - "org.apache.flink.sql.parser.type.ExtendedSqlCollectionTypeNameSpec", - "org.apache.flink.sql.parser.utils.SqlTimeUnit", - "org.apache.flink.sql.parser.utils.ParserResource", - "org.apache.flink.sql.parser.validate.FlinkSqlConformance", - "org.apache.flink.sql.parser.SqlProperty", - "org.apache.calcite.sql.SqlDrop", - "org.apache.calcite.sql.SqlCreate", - "java.util.List", + "org.apache.flink.sql.parser.ddl.SqlDropView" + "org.apache.flink.sql.parser.ddl.SqlTableColumn" + "org.apache.flink.sql.parser.ddl.SqlTableOption" + "org.apache.flink.sql.parser.ddl.SqlUseCatalog" + "org.apache.flink.sql.parser.ddl.SqlUseDatabase" + "org.apache.flink.sql.parser.ddl.SqlWatermark" + "org.apache.flink.sql.parser.dml.RichSqlInsert" + "org.apache.flink.sql.parser.dml.RichSqlInsertKeyword" + "org.apache.flink.sql.parser.dql.SqlDescribeCatalog" + "org.apache.flink.sql.parser.dql.SqlDescribeDatabase" + "org.apache.flink.sql.parser.dql.SqlShowCatalogs" + "org.apache.flink.sql.parser.dql.SqlShowDatabases" + "org.apache.flink.sql.parser.dql.SqlShowFunctions" + "org.apache.flink.sql.parser.dql.SqlShowTables" + "org.apache.flink.sql.parser.dql.SqlRichDescribeTable" + "org.apache.flink.sql.parser.type.ExtendedSqlCollectionTypeNameSpec" + "org.apache.flink.sql.parser.type.ExtendedSqlRowTypeNameSpec" + "org.apache.flink.sql.parser.type.SqlMapTypeNameSpec" + "org.apache.flink.sql.parser.utils.SqlTimeUnit" + "org.apache.flink.sql.parser.utils.ParserResource" + "org.apache.flink.sql.parser.validate.FlinkSqlConformance" + "org.apache.flink.sql.parser.SqlProperty" + "org.apache.calcite.sql.SqlAlienSystemTypeNameSpec" + "org.apache.calcite.sql.SqlCreate" + "org.apache.calcite.sql.SqlDrop" + "java.util.List" "java.util.ArrayList" ] # List of new keywords. Example: "DATABASES", "TABLES". If the keyword is not a reserved # keyword, please also add it to 'nonReservedKeywords' section. keywords: [ - "COMMENT", - "PARTITIONED", - "IF", - "WATERMARK", - "OVERWRITE", - "STRING", - "BYTES", - "RAW", - "CATALOGS", - "USE", - "DATABASES", - "FUNCTIONS", - "EXTENDED", - "SCALA", - "TABLES", + "BYTES" + "CATALOGS" + "COMMENT" + "DATABASES" + "EXTENDED" + "FUNCTIONS" + "IF" + "OVERWRITE" + "PARTITIONED" + "RAW" "RENAME" + "SCALA" + "STRING" + "TABLES" + "USE" + "WATERMARK" ] # List of keywords from "keywords" section that are not reserved. @@ -109,15 +110,14 @@ "C" "CASCADE" "CATALOG" - "CATALOGS" "CATALOG_NAME" "CENTURY" "CHAIN" + "CHARACTERISTICS" + "CHARACTERS" "CHARACTER_SET_CATALOG" "CHARACTER_SET_NAME" "CHARACTER_SET_SCHEMA" - "CHARACTERISTICS" - "CHARACTERS" "CLASS_ORIGIN" "COBOL" "COLLATION" @@ -128,22 +128,22 @@ "COMMAND_FUNCTION" "COMMAND_FUNCTION_CODE" "COMMITTED" - "CONDITION_NUMBER" "CONDITIONAL" + "CONDITION_NUMBER" "CONNECTION" "CONNECTION_NAME" "CONSTRAINT_CATALOG" "CONSTRAINT_NAME" - "CONSTRAINT_SCHEMA" "CONSTRAINTS" + "CONSTRAINT_SCHEMA" "CONSTRUCTOR" "CONTINUE" "CURSOR_NAME" "DATA" "DATABASE" - "DATABASES" "DATETIME_INTERVAL_CODE" "DATETIME_INTERVAL_PRECISION" + "DAYS" "DECADE" "DEFAULTS" "DEFERRABLE" @@ -169,7 +169,6 @@ "EXCEPTION" "EXCLUDE" "EXCLUDING" - "EXTENDED" "FINAL" "FIRST" "FOLLOWING" @@ -177,7 +176,6 @@ "FORTRAN" "FOUND" "FRAC_SECOND" - "FUNCTIONS" "G" "GENERAL" "GENERATED" @@ -186,6 +184,7 @@ "GOTO" "GRANTED" "HIERARCHY" + "HOURS" "IGNORE" "IMMEDIATE" "IMMEDIATELY" @@ -198,8 +197,8 @@ "INSTANTIABLE" "INVOKER" "ISODOW" - "ISOYEAR" "ISOLATION" + "ISOYEAR" "JAVA" "JSON" "K" @@ -213,15 +212,18 @@ "LIBRARY" "LOCATOR" "M" + "MAP" "MATCHED" "MAXVALUE" - "MICROSECOND" "MESSAGE_LENGTH" "MESSAGE_OCTET_LENGTH" "MESSAGE_TEXT" - "MILLISECOND" + "MICROSECOND" "MILLENNIUM" + "MILLISECOND" + "MINUTES" "MINVALUE" + "MONTHS" "MORE_" "MUMPS" "NAME" @@ -265,7 +267,6 @@ "QUARTER" "READ" "RELATIVE" - "RENAME" "REPEATABLE" "REPLACE" "RESPECT" @@ -289,6 +290,7 @@ "SCOPE_CATALOGS" "SCOPE_NAME" "SCOPE_SCHEMA" + "SECONDS" "SECTION" "SECURITY" "SELF" @@ -329,8 +331,8 @@ "SQL_INTERVAL_YEAR" "SQL_INTERVAL_YEAR_TO_MONTH" "SQL_LONGVARBINARY" - "SQL_LONGVARNCHAR" "SQL_LONGVARCHAR" + "SQL_LONGVARNCHAR" "SQL_NCHAR" "SQL_NCLOB" "SQL_NUMERIC" @@ -358,7 +360,6 @@ "STYLE" "SUBCLASS_ORIGIN" "SUBSTITUTE" - "TABLES" "TABLE_NAME" "TEMPORARY" "TIES" @@ -374,6 +375,7 @@ "TRIGGER_CATALOG" "TRIGGER_NAME" "TRIGGER_SCHEMA" + "TUMBLE" "TYPE" "UNBOUNDED" "UNCOMMITTED" @@ -381,45 +383,54 @@ "UNDER" "UNNAMED" "USAGE" - "USE" "USER_DEFINED_TYPE_CATALOG" "USER_DEFINED_TYPE_CODE" "USER_DEFINED_TYPE_NAME" "USER_DEFINED_TYPE_SCHEMA" - "UTF8" "UTF16" "UTF32" + "UTF8" "VERSION" "VIEW" "WEEK" - "WRAPPER" "WORK" + "WRAPPER" "WRITE" "XML" - "ZONE", + "YEARS" + "ZONE" + ] + # List of non-reserved keywords to add; + # items in this list become non-reserved + nonReservedKeywordsToAdd: [ # not in core, added in Flink - "PARTITIONED", - "IF", + "IF" "OVERWRITE" + "PARTITIONED" + ] + + # List of non-reserved keywords to remove; + # items in this list become reserved + nonReservedKeywordsToRemove: [ ] # List of methods for parsing custom SQL statements. # Return type of method implementation should be 'SqlNode'. # Example: SqlShowDatabases(), SqlShowTables(). statementParserMethods: [ - "RichSqlInsert()", - "SqlShowCatalogs()", - "SqlDescribeCatalog()", - "SqlUseCatalog()", - "SqlShowDatabases()", - "SqlUseDatabase()", - "SqlAlterDatabase()", - "SqlDescribeDatabase()", - "SqlAlterFunction()", - "SqlShowFunctions()", - "SqlShowTables()", - "SqlRichDescribeTable()", + "RichSqlInsert()" + "SqlShowCatalogs()" + "SqlDescribeCatalog()" + "SqlUseCatalog()" + "SqlShowDatabases()" + "SqlUseDatabase()" + "SqlAlterDatabase()" + "SqlDescribeDatabase()" + "SqlAlterFunction()" + "SqlShowFunctions()" + "SqlShowTables()" + "SqlRichDescribeTable()" "SqlAlterTable()" ] @@ -433,9 +444,9 @@ # Return type of method implementation should be "SqlTypeNameSpec". # Example: SqlParseTimeStampZ(). dataTypeParserMethods: [ - "ExtendedSqlBasicTypeName()", - "CustomizedCollectionsTypeName()", - "SqlMapTypeName()", + "ExtendedSqlBasicTypeName()" + "CustomizedCollectionsTypeName()" + "SqlMapTypeName()" "ExtendedSqlRowTypeName()" ] @@ -454,18 +465,18 @@ # List of methods for parsing extensions to "CREATE [OR REPLACE]" calls. # Each must accept arguments "(SqlParserPos pos, boolean replace)". createStatementParserMethods: [ - "SqlCreateCatalog", - "SqlCreateTable", - "SqlCreateView", - "SqlCreateDatabase", + "SqlCreateCatalog" + "SqlCreateTable" + "SqlCreateView" + "SqlCreateDatabase" "SqlCreateFunction" ] # List of methods for parsing extensions to "DROP" calls. # Each must accept arguments "(Span s)". dropStatementParserMethods: [ - "SqlDropTable", - "SqlDropView", + "SqlDropTable" + "SqlDropView" "SqlDropDatabase" "SqlDropFunction" ] diff --git a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl index a93ef66..96a25a4 100644 --- a/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl +++ b/flink-table/flink-sql-parser/src/main/codegen/includes/parserImpls.ftl @@ -119,7 +119,10 @@ SqlCreate SqlCreateDatabase(Span s, boolean replace) : } { <DATABASE> { startPos = getPos(); } - [ <IF> <NOT> <EXISTS> { ifNotExists = true; } ] + [ + LOOKAHEAD(3) + <IF> <NOT> <EXISTS> { ifNotExists = true; } + ] databaseName = CompoundIdentifier() [ <COMMENT> <QUOTED_STRING> { @@ -217,7 +220,10 @@ SqlCreate SqlCreateFunction(Span s, boolean replace) : <FUNCTION> - [ <IF> <NOT> <EXISTS> { ifNotExists = true; } ] + [ + LOOKAHEAD(3) + <IF> <NOT> <EXISTS> { ifNotExists = true; } + ] functionIdentifier = CompoundIdentifier() @@ -248,12 +254,13 @@ SqlDrop SqlDropFunction(Span s, boolean replace) : boolean isSystemFunction = false; } { - [ <TEMPORARY> {isTemporary = true;} + [ + <TEMPORARY> {isTemporary = true;} [ <SYSTEM> { isSystemFunction = true; } ] ] <FUNCTION> - [ <IF> <EXISTS> { ifExists = true; } ] + [ LOOKAHEAD(2) <IF> <EXISTS> { ifExists = true; } ] functionIdentifier = CompoundIdentifier() @@ -281,7 +288,7 @@ SqlAlterFunction SqlAlterFunction() : <FUNCTION> { startPos = getPos(); } - [ <IF> <EXISTS> { ifExists = true; } ] + [ LOOKAHEAD(2) <IF> <EXISTS> { ifExists = true; } ] functionIdentifier = CompoundIdentifier() @@ -806,7 +813,7 @@ SqlTypeNameSpec ExtendedSqlBasicTypeName() : } ) { - return new ExtendedSqlBasicTypeNameSpec(typeAlias, typeName, precision, getPos()); + return new SqlAlienSystemTypeNameSpec(typeAlias, typeName, precision, getPos()); } } diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/calcite/sql/package-info.java b/flink-table/flink-sql-parser/src/main/java/org/apache/calcite/sql/package-info.java deleted file mode 100644 index 439912b..0000000 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/calcite/sql/package-info.java +++ /dev/null @@ -1,27 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -/** - * This package is needed because the constructor of SqlTypeNameSpec is package scope, - * we should merge this package into org.apache.flink.sql.parser.type when CALCITE-3360 - * is resolved. - */ -@PackageMarker -package org.apache.calcite.sql; - -import org.apache.calcite.avatica.util.PackageMarker; diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java index c4c7f11..19839a0 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/ddl/SqlCreateTable.java @@ -20,8 +20,8 @@ package org.apache.flink.sql.parser.ddl; import org.apache.flink.sql.parser.ExtendedSqlNode; import org.apache.flink.sql.parser.error.SqlValidateException; +import org.apache.flink.sql.parser.type.ExtendedSqlRowTypeNameSpec; -import org.apache.calcite.sql.ExtendedSqlRowTypeNameSpec; import org.apache.calcite.sql.SqlBasicCall; import org.apache.calcite.sql.SqlCall; import org.apache.calcite.sql.SqlCharStringLiteral; @@ -69,10 +69,8 @@ public class SqlCreateTable extends SqlCreate implements ExtendedSqlNode { private final SqlNodeList partitionKeyList; - @Nullable private final SqlWatermark watermark; - @Nullable private final SqlCharStringLiteral comment; public SqlCreateTable( @@ -83,8 +81,8 @@ public class SqlCreateTable extends SqlCreate implements ExtendedSqlNode { List<SqlNodeList> uniqueKeysList, SqlNodeList propertyList, SqlNodeList partitionKeyList, - SqlWatermark watermark, - SqlCharStringLiteral comment) { + @Nullable SqlWatermark watermark, + @Nullable SqlCharStringLiteral comment) { super(OPERATOR, pos, false, false); this.tableName = requireNonNull(tableName, "tableName should not be null"); this.columnList = requireNonNull(columnList, "columnList should not be null"); @@ -219,10 +217,12 @@ public class SqlCreateTable extends SqlCreate implements ExtendedSqlNode { * have been reversed. */ public String getColumnSqlString() { - SqlPrettyWriter writer = new SqlPrettyWriter(AnsiSqlDialect.DEFAULT); - writer.setAlwaysUseParentheses(true); - writer.setSelectListItemsOnSeparateLines(false); - writer.setIndentation(0); + SqlPrettyWriter writer = new SqlPrettyWriter( + SqlPrettyWriter.config() + .withDialect(AnsiSqlDialect.DEFAULT) + .withAlwaysUseParentheses(true) + .withSelectListItemsOnSeparateLines(false) + .withIndentation(0)); writer.startList("", ""); for (SqlNode column : columnList) { writer.sep(","); diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/ExtendedSqlBasicTypeNameSpec.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/ExtendedSqlBasicTypeNameSpec.java deleted file mode 100644 index ec81a0d..0000000 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/ExtendedSqlBasicTypeNameSpec.java +++ /dev/null @@ -1,57 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.sql.parser.type; - -import org.apache.calcite.sql.SqlBasicTypeNameSpec; -import org.apache.calcite.sql.SqlWriter; -import org.apache.calcite.sql.parser.SqlParserPos; -import org.apache.calcite.sql.type.SqlTypeName; - -/** - * A sql type name specification of extended basic data type, it has a counterpart basic - * sql type name but always represents as a special alias in Flink. - * - * <p>For example, STRING is synonym of VARCHAR(INT_MAX) - * and BYTES is synonym of VARBINARY(INT_MAX). - */ -public class ExtendedSqlBasicTypeNameSpec extends SqlBasicTypeNameSpec { - // Type alias used for unparsing. - private final String typeAlias; - - /** - * Creates a {@code ExtendedSqlBuiltinTypeNameSpec} instance. - * - * @param typeName type name - * @param precision type precision - * @param pos parser position - */ - public ExtendedSqlBasicTypeNameSpec( - String typeAlias, - SqlTypeName typeName, - int precision, - SqlParserPos pos) { - super(typeName, precision, pos); - this.typeAlias = typeAlias; - } - - @Override - public void unparse(SqlWriter writer, int leftPrec, int rightPrec) { - writer.keyword(typeAlias); - } -} diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/calcite/sql/ExtendedSqlRowTypeNameSpec.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/ExtendedSqlRowTypeNameSpec.java similarity index 93% rename from flink-table/flink-sql-parser/src/main/java/org/apache/calcite/sql/ExtendedSqlRowTypeNameSpec.java rename to flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/ExtendedSqlRowTypeNameSpec.java index 42828d8..37e841b 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/calcite/sql/ExtendedSqlRowTypeNameSpec.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/ExtendedSqlRowTypeNameSpec.java @@ -16,10 +16,16 @@ * limitations under the License. */ -package org.apache.calcite.sql; +package org.apache.flink.sql.parser.type; import org.apache.calcite.rel.type.RelDataType; import org.apache.calcite.rel.type.RelDataTypeFactory; +import org.apache.calcite.sql.SqlCharStringLiteral; +import org.apache.calcite.sql.SqlDataTypeSpec; +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlRowTypeNameSpec; +import org.apache.calcite.sql.SqlTypeNameSpec; +import org.apache.calcite.sql.SqlWriter; import org.apache.calcite.sql.parser.SqlParserPos; import org.apache.calcite.sql.type.SqlTypeName; import org.apache.calcite.sql.validate.SqlValidator; diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/calcite/sql/SqlMapTypeNameSpec.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMapTypeNameSpec.java similarity index 93% rename from flink-table/flink-sql-parser/src/main/java/org/apache/calcite/sql/SqlMapTypeNameSpec.java rename to flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMapTypeNameSpec.java index 0e22dbf..5725456 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/calcite/sql/SqlMapTypeNameSpec.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/type/SqlMapTypeNameSpec.java @@ -16,9 +16,13 @@ * limitations under the License. */ -package org.apache.calcite.sql; +package org.apache.flink.sql.parser.type; import org.apache.calcite.rel.type.RelDataType; +import org.apache.calcite.sql.SqlDataTypeSpec; +import org.apache.calcite.sql.SqlIdentifier; +import org.apache.calcite.sql.SqlTypeNameSpec; +import org.apache.calcite.sql.SqlWriter; import org.apache.calcite.sql.parser.SqlParserPos; import org.apache.calcite.sql.type.SqlTypeName; import org.apache.calcite.sql.validate.SqlValidator; diff --git a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/validate/FlinkSqlConformance.java b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/validate/FlinkSqlConformance.java index d53b050..cf32026 100644 --- a/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/validate/FlinkSqlConformance.java +++ b/flink-table/flink-sql-parser/src/main/java/org/apache/flink/sql/parser/validate/FlinkSqlConformance.java @@ -146,6 +146,16 @@ public enum FlinkSqlConformance implements SqlConformance { return false; } + @Override + public boolean allowPluralTimeUnits() { + return false; + } + + @Override + public boolean allowQualifyingCommonColumn() { + return true; + } + /** * Whether to allow "create table T(i int, j int) partitioned by (i)" grammar. */ diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkDDLDataTypeTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkDDLDataTypeTest.java index f5edc4f..5e0e0e5 100644 --- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkDDLDataTypeTest.java +++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkDDLDataTypeTest.java @@ -44,6 +44,7 @@ import org.apache.calcite.sql.parser.SqlParserTest; import org.apache.calcite.sql.parser.SqlParserUtil; import org.apache.calcite.sql.pretty.SqlPrettyWriter; import org.apache.calcite.sql.test.SqlTestFactory; +import org.apache.calcite.sql.test.SqlTests; import org.apache.calcite.sql.type.SqlTypeName; import org.apache.calcite.sql.validate.SqlConformance; import org.apache.calcite.sql.validate.SqlConformanceEnum; @@ -51,7 +52,6 @@ import org.apache.calcite.sql.validate.SqlValidator; import org.apache.calcite.sql.validate.SqlValidatorCatalogReader; import org.apache.calcite.sql.validate.SqlValidatorUtil; import org.apache.calcite.test.MockSqlOperatorTable; -import org.apache.calcite.test.SqlValidatorTestCase; import org.apache.calcite.test.catalog.MockCatalogReaderSimple; import org.apache.calcite.util.SourceStringReader; import org.apache.calcite.util.Util; @@ -439,7 +439,7 @@ public class FlinkDDLDataTypeTest { private void checkEx(String expectedMsgPattern, SqlParserUtil.StringAndPos sap, Throwable thrown) { - SqlValidatorTestCase.checkEx(thrown, expectedMsgPattern, sap); + SqlTests.checkEx(thrown, expectedMsgPattern, sap, SqlTests.Stage.VALIDATE); } } diff --git a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java index 3057324..0509e91 100644 --- a/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java +++ b/flink-table/flink-sql-parser/src/test/java/org/apache/flink/sql/parser/FlinkSqlParserImplTest.java @@ -37,6 +37,7 @@ import org.junit.Before; import org.junit.Test; import java.io.Reader; +import java.util.function.UnaryOperator; import static org.junit.Assert.assertEquals; import static org.junit.Assert.fail; @@ -51,9 +52,10 @@ public class FlinkSqlParserImplTest extends SqlParserTest { return FlinkSqlParserImpl.FACTORY; } - protected SqlParser getSqlParser(Reader source) { + protected SqlParser getSqlParser(Reader source, + UnaryOperator<SqlParser.ConfigBuilder> transform) { if (conformance0 == null) { - return super.getSqlParser(source); + return super.getSqlParser(source, transform); } else { // overwrite the default sql conformance. return SqlParser.create(source, @@ -75,12 +77,12 @@ public class FlinkSqlParserImplTest extends SqlParserTest { @Test public void testShowCatalogs() { - check("show catalogs", "SHOW CATALOGS"); + sql("show catalogs").ok("SHOW CATALOGS"); } @Test public void testDescribeCatalog() { - check("describe catalog a", "DESCRIBE CATALOG `A`"); + sql("describe catalog a").ok("DESCRIBE CATALOG `A`"); } /** @@ -92,7 +94,7 @@ public class FlinkSqlParserImplTest extends SqlParserTest { @Test public void testUseCatalog() { - check("use catalog a", "USE CATALOG `A`"); + sql("use catalog a").ok("USE CATALOG `A`"); } @Test @@ -112,91 +114,94 @@ public class FlinkSqlParserImplTest extends SqlParserTest { @Test public void testShowDataBases() { - check("show databases", "SHOW DATABASES"); + sql("show databases").ok("SHOW DATABASES"); } @Test public void testUseDataBase() { - check("use default_db", "USE `DEFAULT_DB`"); - check("use defaultCatalog.default_db", "USE `DEFAULTCATALOG`.`DEFAULT_DB`"); + sql("use default_db").ok("USE `DEFAULT_DB`"); + sql("use defaultCatalog.default_db").ok("USE `DEFAULTCATALOG`.`DEFAULT_DB`"); } @Test public void testCreateDatabase() { - check("create database db1", "CREATE DATABASE `DB1`"); - check("create database if not exists db1", "CREATE DATABASE IF NOT EXISTS `DB1`"); - check("create database catalog1.db1", "CREATE DATABASE `CATALOG1`.`DB1`"); - check("create database db1 comment 'test create database'", - "CREATE DATABASE `DB1`\n" + - "COMMENT 'test create database'"); - check("create database db1 comment 'test create database'" + - "with ( 'key1' = 'value1', 'key2.a' = 'value2.a')", - "CREATE DATABASE `DB1`\n" + - "COMMENT 'test create database' WITH (\n" + - " 'key1' = 'value1',\n" + - " 'key2.a' = 'value2.a'\n" + - ")"); + sql("create database db1").ok("CREATE DATABASE `DB1`"); + sql("create database if not exists db1").ok("CREATE DATABASE IF NOT EXISTS `DB1`"); + sql("create database catalog1.db1").ok("CREATE DATABASE `CATALOG1`.`DB1`"); + final String sql = "create database db1 comment 'test create database'"; + final String expected = "CREATE DATABASE `DB1`\n" + + "COMMENT 'test create database'"; + sql(sql).ok(expected); + final String sql1 = "create database db1 comment 'test create database'" + + "with ( 'key1' = 'value1', 'key2.a' = 'value2.a')"; + final String expected1 = "CREATE DATABASE `DB1`\n" + + "COMMENT 'test create database' WITH (\n" + + " 'key1' = 'value1',\n" + + " 'key2.a' = 'value2.a'\n" + + ")"; + sql(sql1).ok(expected1); } @Test public void testDropDatabase() { - check("drop database db1", "DROP DATABASE `DB1` RESTRICT"); - check("drop database catalog1.db1", "DROP DATABASE `CATALOG1`.`DB1` RESTRICT"); - check("drop database db1 RESTRICT", "DROP DATABASE `DB1` RESTRICT"); - check("drop database db1 CASCADE", "DROP DATABASE `DB1` CASCADE"); + sql("drop database db1").ok("DROP DATABASE `DB1` RESTRICT"); + sql("drop database catalog1.db1").ok("DROP DATABASE `CATALOG1`.`DB1` RESTRICT"); + sql("drop database db1 RESTRICT").ok("DROP DATABASE `DB1` RESTRICT"); + sql("drop database db1 CASCADE").ok("DROP DATABASE `DB1` CASCADE"); } @Test public void testAlterDatabase() { - check("alter database db1 set ('key1' = 'value1','key2.a' = 'value2.a')", - "ALTER DATABASE `DB1` SET (\n" + - " 'key1' = 'value1',\n" + - " 'key2.a' = 'value2.a'\n" + - ")"); + final String sql = "alter database db1 set ('key1' = 'value1','key2.a' = 'value2.a')"; + final String expected = "ALTER DATABASE `DB1` SET (\n" + + " 'key1' = 'value1',\n" + + " 'key2.a' = 'value2.a'\n" + + ")"; + sql(sql).ok(expected); } @Test public void testDescribeDatabase() { - check("describe database db1", "DESCRIBE DATABASE `DB1`"); - check("describe database catlog1.db1", "DESCRIBE DATABASE `CATLOG1`.`DB1`"); - check("describe database extended db1", "DESCRIBE DATABASE EXTENDED `DB1`"); + sql("describe database db1").ok("DESCRIBE DATABASE `DB1`"); + sql("describe database catlog1.db1").ok("DESCRIBE DATABASE `CATLOG1`.`DB1`"); + sql("describe database extended db1").ok("DESCRIBE DATABASE EXTENDED `DB1`"); } @Test public void testAlterFunction() { - check("alter function function1 as 'org.apache.fink.function.function1'", - "ALTER FUNCTION `FUNCTION1` AS 'org.apache.fink.function.function1'"); + sql("alter function function1 as 'org.apache.fink.function.function1'") + .ok("ALTER FUNCTION `FUNCTION1` AS 'org.apache.fink.function.function1'"); - check("alter temporary function function1 as 'org.apache.fink.function.function1'", - "ALTER TEMPORARY FUNCTION `FUNCTION1` AS 'org.apache.fink.function.function1'"); + sql("alter temporary function function1 as 'org.apache.fink.function.function1'") + .ok("ALTER TEMPORARY FUNCTION `FUNCTION1` AS 'org.apache.fink.function.function1'"); - check("alter temporary function function1 as 'org.apache.fink.function.function1' language scala", - "ALTER TEMPORARY FUNCTION `FUNCTION1` AS 'org.apache.fink.function.function1' LANGUAGE SCALA"); + sql("alter temporary function function1 as 'org.apache.fink.function.function1' language scala") + .ok("ALTER TEMPORARY FUNCTION `FUNCTION1` AS 'org.apache.fink.function.function1' LANGUAGE SCALA"); - check ("alter temporary system function function1 as 'org.apache.fink.function.function1'", - "ALTER TEMPORARY SYSTEM FUNCTION `FUNCTION1` AS 'org.apache.fink.function.function1'"); + sql("alter temporary system function function1 as 'org.apache.fink.function.function1'") + .ok("ALTER TEMPORARY SYSTEM FUNCTION `FUNCTION1` AS 'org.apache.fink.function.function1'"); - check("alter temporary system function function1 as 'org.apache.fink.function.function1' language java", - "ALTER TEMPORARY SYSTEM FUNCTION `FUNCTION1` AS 'org.apache.fink.function.function1' LANGUAGE JAVA"); + sql("alter temporary system function function1 as 'org.apache.fink.function.function1' language java") + .ok("ALTER TEMPORARY SYSTEM FUNCTION `FUNCTION1` AS 'org.apache.fink.function.function1' LANGUAGE JAVA"); } @Test public void testShowFuntions() { - check("show functions", "SHOW FUNCTIONS"); - check("show functions db1", "SHOW FUNCTIONS `DB1`"); - check("show functions catalog1.db1", "SHOW FUNCTIONS `CATALOG1`.`DB1`"); + sql("show functions").ok("SHOW FUNCTIONS"); + sql("show functions db1").ok("SHOW FUNCTIONS `DB1`"); + sql("show functions catalog1.db1").ok("SHOW FUNCTIONS `CATALOG1`.`DB1`"); } @Test public void testShowTables() { - check("show tables", "SHOW TABLES"); + sql("show tables").ok("SHOW TABLES"); } @Test public void testDescribeTable() { - check("describe tbl", "DESCRIBE `TBL`"); - check("describe catlog1.db1.tbl", "DESCRIBE `CATLOG1`.`DB1`.`TBL`"); - check("describe extended db1", "DESCRIBE EXTENDED `DB1`"); + sql("describe tbl").ok("DESCRIBE `TBL`"); + sql("describe catlog1.db1.tbl").ok("DESCRIBE `CATLOG1`.`DB1`.`TBL`"); + sql("describe extended db1").ok("DESCRIBE EXTENDED `DB1`"); } /** @@ -208,18 +213,19 @@ public class FlinkSqlParserImplTest extends SqlParserTest { @Test public void testAlterTable() { - check("alter table t1 rename to t2", "ALTER TABLE `T1` RENAME TO `T2`"); - check("alter table c1.d1.t1 rename to t2", "ALTER TABLE `C1`.`D1`.`T1` RENAME TO `T2`"); - check("alter table t1 set ('key1'='value1')", - "ALTER TABLE `T1` SET (\n" + - " 'key1' = 'value1'\n" + - ")"); + sql("alter table t1 rename to t2").ok("ALTER TABLE `T1` RENAME TO `T2`"); + sql("alter table c1.d1.t1 rename to t2").ok("ALTER TABLE `C1`.`D1`.`T1` RENAME TO `T2`"); + final String sql = "alter table t1 set ('key1'='value1')"; + final String expected = "ALTER TABLE `T1` SET (\n" + + " 'key1' = 'value1'\n" + + ")"; + sql(sql).ok(expected); } @Test public void testCreateTable() { conformance0 = FlinkSqlConformance.HIVE; - check("CREATE TABLE tbl1 (\n" + + final String sql = "CREATE TABLE tbl1 (\n" + " a bigint,\n" + " h varchar, \n" + " g as 2 * (a + 1), \n" + @@ -232,8 +238,8 @@ public class FlinkSqlParserImplTest extends SqlParserTest { " with (\n" + " 'connector' = 'kafka', \n" + " 'kafka.topic' = 'log.test'\n" + - ")\n", - "CREATE TABLE `TBL1` (\n" + + ")\n"; + final String expected = "CREATE TABLE `TBL1` (\n" + " `A` BIGINT,\n" + " `H` VARCHAR,\n" + " `G` AS (2 * (`A` + 1)),\n" + @@ -246,13 +252,14 @@ public class FlinkSqlParserImplTest extends SqlParserTest { "WITH (\n" + " 'connector' = 'kafka',\n" + " 'kafka.topic' = 'log.test'\n" + - ")"); + ")"; + sql(sql).ok(expected); } @Test public void testCreateTableWithComment() { conformance0 = FlinkSqlConformance.HIVE; - check("CREATE TABLE tbl1 (\n" + + final String sql = "CREATE TABLE tbl1 (\n" + " a bigint comment 'test column comment AAA.',\n" + " h varchar, \n" + " g as 2 * (a + 1), \n" + @@ -266,8 +273,8 @@ public class FlinkSqlParserImplTest extends SqlParserTest { " with (\n" + " 'connector' = 'kafka', \n" + " 'kafka.topic' = 'log.test'\n" + - ")\n", - "CREATE TABLE `TBL1` (\n" + + ")\n"; + final String expected = "CREATE TABLE `TBL1` (\n" + " `A` BIGINT COMMENT 'test column comment AAA.',\n" + " `H` VARCHAR,\n" + " `G` AS (2 * (`A` + 1)),\n" + @@ -281,13 +288,14 @@ public class FlinkSqlParserImplTest extends SqlParserTest { "WITH (\n" + " 'connector' = 'kafka',\n" + " 'kafka.topic' = 'log.test'\n" + - ")"); + ")"; + sql(sql).ok(expected); } @Test public void testCreateTableWithPrimaryKeyAndUniqueKey() { conformance0 = FlinkSqlConformance.HIVE; - check("CREATE TABLE tbl1 (\n" + + final String sql = "CREATE TABLE tbl1 (\n" + " a bigint comment 'test column comment AAA.',\n" + " h varchar, \n" + " g as 2 * (a + 1), \n" + @@ -302,8 +310,8 @@ public class FlinkSqlParserImplTest extends SqlParserTest { " with (\n" + " 'connector' = 'kafka', \n" + " 'kafka.topic' = 'log.test'\n" + - ")\n", - "CREATE TABLE `TBL1` (\n" + + ")\n"; + final String expected = "CREATE TABLE `TBL1` (\n" + " `A` BIGINT COMMENT 'test column comment AAA.',\n" + " `H` VARCHAR,\n" + " `G` AS (2 * (`A` + 1)),\n" + @@ -318,12 +326,13 @@ public class FlinkSqlParserImplTest extends SqlParserTest { "WITH (\n" + " 'connector' = 'kafka',\n" + " 'kafka.topic' = 'log.test'\n" + - ")"); + ")"; + sql(sql).ok(expected); } @Test public void testCreateTableWithWatermark() { - String sql = "CREATE TABLE tbl1 (\n" + + final String sql = "CREATE TABLE tbl1 (\n" + " ts timestamp(3),\n" + " id varchar, \n" + " watermark FOR ts AS ts - interval '3' second\n" + @@ -332,20 +341,20 @@ public class FlinkSqlParserImplTest extends SqlParserTest { " 'connector' = 'kafka', \n" + " 'kafka.topic' = 'log.test'\n" + ")\n"; - check(sql, - "CREATE TABLE `TBL1` (\n" + + final String expected = "CREATE TABLE `TBL1` (\n" + " `TS` TIMESTAMP(3),\n" + " `ID` VARCHAR,\n" + " WATERMARK FOR `TS` AS (`TS` - INTERVAL '3' SECOND)\n" + ") WITH (\n" + " 'connector' = 'kafka',\n" + " 'kafka.topic' = 'log.test'\n" + - ")"); + ")"; + sql(sql).ok(expected); } @Test public void testCreateTableWithWatermarkOnComputedColumn() { - String sql = "CREATE TABLE tbl1 (\n" + + final String sql = "CREATE TABLE tbl1 (\n" + " log_ts varchar,\n" + " ts as to_timestamp(log_ts), \n" + " WATERMARK FOR ts AS ts + interval '1' second\n" + @@ -354,34 +363,35 @@ public class FlinkSqlParserImplTest extends SqlParserTest { " 'connector' = 'kafka', \n" + " 'kafka.topic' = 'log.test'\n" + ")\n"; - check(sql, - "CREATE TABLE `TBL1` (\n" + + final String expected = "CREATE TABLE `TBL1` (\n" + " `LOG_TS` VARCHAR,\n" + " `TS` AS `TO_TIMESTAMP`(`LOG_TS`),\n" + " WATERMARK FOR `TS` AS (`TS` + INTERVAL '1' SECOND)\n" + ") WITH (\n" + " 'connector' = 'kafka',\n" + " 'kafka.topic' = 'log.test'\n" + - ")"); + ")"; + sql(sql).ok(expected); } @Test public void testCreateTableWithWatermarkOnNestedField() { - check("CREATE TABLE tbl1 (\n" + + final String sql = "CREATE TABLE tbl1 (\n" + " f1 row<q1 bigint, q2 row<t1 timestamp, t2 varchar>, q3 boolean>,\n" + " WATERMARK FOR f1.q2.t1 AS NOW()\n" + ")\n" + " with (\n" + " 'connector' = 'kafka', \n" + " 'kafka.topic' = 'log.test'\n" + - ")\n", - "CREATE TABLE `TBL1` (\n" + + ")\n"; + final String expected = "CREATE TABLE `TBL1` (\n" + " `F1` ROW< `Q1` BIGINT, `Q2` ROW< `T1` TIMESTAMP, `T2` VARCHAR >, `Q3` BOOLEAN >,\n" + " WATERMARK FOR `F1`.`Q2`.`T1` AS `NOW`()\n" + ") WITH (\n" + " 'connector' = 'kafka',\n" + " 'kafka.topic' = 'log.test'\n" + - ")"); + ")"; + sql(sql).ok(expected); } @Test @@ -434,48 +444,52 @@ public class FlinkSqlParserImplTest extends SqlParserTest { @Test public void testCreateTableWithComplexType() { - check("CREATE TABLE tbl1 (\n" + - " a ARRAY<bigint>, \n" + - " b MAP<int, varchar>,\n" + - " c ROW<cc0 int, cc1 float, cc2 varchar>,\n" + - " d MULTISET<varchar>,\n" + - " PRIMARY KEY (a, b) \n" + - ") with (\n" + - " 'x' = 'y', \n" + - " 'asd' = 'data'\n" + - ")\n", "CREATE TABLE `TBL1` (\n" + - " `A` ARRAY< BIGINT >,\n" + - " `B` MAP< INTEGER, VARCHAR >,\n" + - " `C` ROW< `CC0` INTEGER, `CC1` FLOAT, `CC2` VARCHAR >,\n" + - " `D` MULTISET< VARCHAR >,\n" + - " PRIMARY KEY (`A`, `B`)\n" + - ") WITH (\n" + - " 'x' = 'y',\n" + - " 'asd' = 'data'\n" + - ")"); + final String sql = "CREATE TABLE tbl1 (\n" + + " a ARRAY<bigint>, \n" + + " b MAP<int, varchar>,\n" + + " c ROW<cc0 int, cc1 float, cc2 varchar>,\n" + + " d MULTISET<varchar>,\n" + + " PRIMARY KEY (a, b) \n" + + ") with (\n" + + " 'x' = 'y', \n" + + " 'asd' = 'data'\n" + + ")\n"; + final String expected = "CREATE TABLE `TBL1` (\n" + + " `A` ARRAY< BIGINT >,\n" + + " `B` MAP< INTEGER, VARCHAR >,\n" + + " `C` ROW< `CC0` INTEGER, `CC1` FLOAT, `CC2` VARCHAR >,\n" + + " `D` MULTISET< VARCHAR >,\n" + + " PRIMARY KEY (`A`, `B`)\n" + + ") WITH (\n" + + " 'x' = 'y',\n" + + " 'asd' = 'data'\n" + + ")"; + sql(sql).ok(expected); } @Test public void testCreateTableWithNestedComplexType() { - check("CREATE TABLE tbl1 (\n" + - " a ARRAY<ARRAY<bigint>>, \n" + - " b MAP<MAP<int, varchar>, ARRAY<varchar>>,\n" + - " c ROW<cc0 ARRAY<int>, cc1 float, cc2 varchar>,\n" + - " d MULTISET<ARRAY<int>>,\n" + - " PRIMARY KEY (a, b) \n" + - ") with (\n" + - " 'x' = 'y', \n" + - " 'asd' = 'data'\n" + - ")\n", "CREATE TABLE `TBL1` (\n" + - " `A` ARRAY< ARRAY< BIGINT > >,\n" + - " `B` MAP< MAP< INTEGER, VARCHAR >, ARRAY< VARCHAR > >,\n" + - " `C` ROW< `CC0` ARRAY< INTEGER >, `CC1` FLOAT, `CC2` VARCHAR >,\n" + - " `D` MULTISET< ARRAY< INTEGER > >,\n" + - " PRIMARY KEY (`A`, `B`)\n" + - ") WITH (\n" + - " 'x' = 'y',\n" + - " 'asd' = 'data'\n" + - ")"); + final String sql = "CREATE TABLE tbl1 (\n" + + " a ARRAY<ARRAY<bigint>>, \n" + + " b MAP<MAP<int, varchar>, ARRAY<varchar>>,\n" + + " c ROW<cc0 ARRAY<int>, cc1 float, cc2 varchar>,\n" + + " d MULTISET<ARRAY<int>>,\n" + + " PRIMARY KEY (a, b) \n" + + ") with (\n" + + " 'x' = 'y', \n" + + " 'asd' = 'data'\n" + + ")\n"; + final String expected = "CREATE TABLE `TBL1` (\n" + + " `A` ARRAY< ARRAY< BIGINT > >,\n" + + " `B` MAP< MAP< INTEGER, VARCHAR >, ARRAY< VARCHAR > >,\n" + + " `C` ROW< `CC0` ARRAY< INTEGER >, `CC1` FLOAT, `CC2` VARCHAR >,\n" + + " `D` MULTISET< ARRAY< INTEGER > >,\n" + + " PRIMARY KEY (`A`, `B`)\n" + + ") WITH (\n" + + " 'x' = 'y',\n" + + " 'asd' = 'data'\n" + + ")"; + sql(sql).ok(expected); } @Test @@ -494,7 +508,7 @@ public class FlinkSqlParserImplTest extends SqlParserTest { " 'k1' = 'v1',\n" + " 'k2' = 'v2'\n" + ")"; - check(sql, expected); + sql(sql).ok(expected); } @Test @@ -530,7 +544,7 @@ public class FlinkSqlParserImplTest extends SqlParserTest { @Test public void testColumnSqlString() { - String sql = "CREATE TABLE sls_stream (\n" + + final String sql = "CREATE TABLE sls_stream (\n" + " a bigint, \n" + " f as a + 1, \n" + " b varchar,\n" + @@ -542,7 +556,7 @@ public class FlinkSqlParserImplTest extends SqlParserTest { " 'x' = 'y', \n" + " 'asd' = 'data'\n" + ")\n"; - String expected = "`A`, (`A` + 1) AS `F`, `B`, " + final String expected = "`A`, (`A` + 1) AS `F`, `B`, " + "`TOTIMESTAMP`(`B`, 'yyyy-MM-dd HH:mm:ss') AS `TS`, " + "`PROCTIME`() AS `PROC`, `C`"; sql(sql).node(new ValidationMatcher() @@ -552,7 +566,7 @@ public class FlinkSqlParserImplTest extends SqlParserTest { @Test public void testCreateInvalidPartitionedTable() { conformance0 = FlinkSqlConformance.HIVE; - String sql = "create table sls_stream1(\n" + + final String sql = "create table sls_stream1(\n" + " a bigint,\n" + " b VARCHAR,\n" + " PRIMARY KEY(a, b)\n" + @@ -567,7 +581,7 @@ public class FlinkSqlParserImplTest extends SqlParserTest { @Test public void testNotAllowedCreatePartition() { conformance0 = FlinkSqlConformance.DEFAULT; - String sql = "create table sls_stream1(\n" + + final String sql = "create table sls_stream1(\n" + " a bigint,\n" + " b VARCHAR\n" + ") PARTITIONED BY (a^)^ with ( 'x' = 'y', 'asd' = 'dada')"; @@ -576,7 +590,7 @@ public class FlinkSqlParserImplTest extends SqlParserTest { @Test public void testCreateTableWithMinusInOptionKey() { - String sql = "create table source_table(\n" + + final String sql = "create table source_table(\n" + " a int,\n" + " b bigint,\n" + " c string\n" + @@ -586,7 +600,7 @@ public class FlinkSqlParserImplTest extends SqlParserTest { " 'a.b-c-connector.e-f.g' = 'ada',\n" + " 'a.b-c-d.e-1231.g' = 'ada',\n" + " 'a.b-c-d.*' = 'adad')\n"; - String expected = "CREATE TABLE `SOURCE_TABLE` (\n" + + final String expected = "CREATE TABLE `SOURCE_TABLE` (\n" + " `A` INTEGER,\n" + " `B` BIGINT,\n" + " `C` STRING\n" + @@ -597,12 +611,12 @@ public class FlinkSqlParserImplTest extends SqlParserTest { " 'a.b-c-d.e-1231.g' = 'ada',\n" + " 'a.b-c-d.*' = 'adad'\n" + ")"; - check(sql, expected); + sql(sql).ok(expected); } @Test public void testCreateTableWithOptionKeyAsIdentifier() { - String sql = "create table source_table(\n" + + final String sql = "create table source_table(\n" + " a int,\n" + " b bigint,\n" + " c string\n" + @@ -614,14 +628,16 @@ public class FlinkSqlParserImplTest extends SqlParserTest { @Test public void testDropTable() { - String sql = "DROP table catalog1.db1.tbl1"; - check(sql, "DROP TABLE `CATALOG1`.`DB1`.`TBL1`"); + final String sql = "DROP table catalog1.db1.tbl1"; + final String expected = "DROP TABLE `CATALOG1`.`DB1`.`TBL1`"; + sql(sql).ok(expected); } @Test public void testDropIfExists() { - String sql = "DROP table IF EXISTS catalog1.db1.tbl1"; - check(sql, "DROP TABLE IF EXISTS `CATALOG1`.`DB1`.`TBL1`"); + final String sql = "DROP table IF EXISTS catalog1.db1.tbl1"; + final String expected = "DROP TABLE IF EXISTS `CATALOG1`.`DB1`.`TBL1`"; + sql(sql).ok(expected); } @Test @@ -658,7 +674,7 @@ public class FlinkSqlParserImplTest extends SqlParserTest { @Test public void testInsertExtendedColumnAsStaticPartition1() { - String expected = "INSERT INTO `EMPS` EXTEND (`Z` BOOLEAN) (`X`, `Y`)\n" + final String expected = "INSERT INTO `EMPS` EXTEND (`Z` BOOLEAN) (`X`, `Y`)\n" + "PARTITION (`Z` = 'ab')\n" + "(SELECT *\n" + "FROM `EMPS`)"; @@ -676,17 +692,19 @@ public class FlinkSqlParserImplTest extends SqlParserTest { @Test public void testInsertOverwrite() { // non-partitioned - check("INSERT OVERWRITE myDB.myTbl SELECT * FROM src", - "INSERT OVERWRITE `MYDB`.`MYTBL`\n" + final String sql = "INSERT OVERWRITE myDB.myTbl SELECT * FROM src"; + final String expected = "INSERT OVERWRITE `MYDB`.`MYTBL`\n" + "(SELECT *\n" - + "FROM `SRC`)"); + + "FROM `SRC`)"; + sql(sql).ok(expected); // partitioned - check("INSERT OVERWRITE myTbl PARTITION (p1='v1',p2='v2') SELECT * FROM src", - "INSERT OVERWRITE `MYTBL`\n" + final String sql1 = "INSERT OVERWRITE myTbl PARTITION (p1='v1',p2='v2') SELECT * FROM src"; + final String expected1 = "INSERT OVERWRITE `MYTBL`\n" + "PARTITION (`P1` = 'v1', `P2` = 'v2')\n" + "(SELECT *\n" - + "FROM `SRC`)"); + + "FROM `SRC`)"; + sql(sql1).ok(expected1); } @Test @@ -702,7 +720,7 @@ public class FlinkSqlParserImplTest extends SqlParserTest { "AS\n" + "SELECT `COL1`\n" + "FROM `TBL`"; - check(sql, expected); + sql(sql).ok(expected); } @Test @@ -713,7 +731,7 @@ public class FlinkSqlParserImplTest extends SqlParserTest { "AS\n" + "SELECT `COL1`\n" + "FROM `TBL`"; - check(sql, expected); + sql(sql).ok(expected); } @Test @@ -723,7 +741,7 @@ public class FlinkSqlParserImplTest extends SqlParserTest { "AS\n" + "SELECT `COL3`, `COL4`\n" + "FROM `TBL`"; - check(sql, expected); + sql(sql).ok(expected); } @Test @@ -731,31 +749,41 @@ public class FlinkSqlParserImplTest extends SqlParserTest { final String sql = "create view v(^*^) COMMENT 'this is a view' as select col1 from tbl"; final String expected = "(?s).*Encountered \"\\*\" at line 1, column 15.*"; - checkFails(sql, expected); + sql(sql).fails(expected); } @Test public void testDropView() { final String sql = "DROP VIEW IF EXISTS view_name"; - check(sql, "DROP VIEW IF EXISTS `VIEW_NAME`"); + final String expected = "DROP VIEW IF EXISTS `VIEW_NAME`"; + sql(sql).ok(expected); } // Override the test because our ROW field type default is nullable, // which is different with Calcite. @Override public void testCastAsRowType() { - checkExp("cast(a as row(f0 int, f1 varchar))", - "CAST(`A` AS ROW(`F0` INTEGER, `F1` VARCHAR))"); - checkExp("cast(a as row(f0 int not null, f1 varchar null))", - "CAST(`A` AS ROW(`F0` INTEGER NOT NULL, `F1` VARCHAR))"); - checkExp("cast(a as row(f0 row(ff0 int not null, ff1 varchar null) null," + - " f1 timestamp not null))", - "CAST(`A` AS ROW(`F0` ROW(`FF0` INTEGER NOT NULL, `FF1` VARCHAR)," + - " `F1` TIMESTAMP NOT NULL))"); - checkExp("cast(a as row(f0 bigint not null, f1 decimal null) array)", - "CAST(`A` AS ROW(`F0` BIGINT NOT NULL, `F1` DECIMAL) ARRAY)"); - checkExp("cast(a as row(f0 varchar not null, f1 timestamp null) multiset)", - "CAST(`A` AS ROW(`F0` VARCHAR NOT NULL, `F1` TIMESTAMP) MULTISET)"); + final String expr = "cast(a as row(f0 int, f1 varchar))"; + final String expected = "CAST(`A` AS ROW(`F0` INTEGER, `F1` VARCHAR))"; + expr(expr).ok(expected); + + final String expr1 = "cast(a as row(f0 int not null, f1 varchar null))"; + final String expected1 = "CAST(`A` AS ROW(`F0` INTEGER NOT NULL, `F1` VARCHAR))"; + expr(expr1).ok(expected1); + + final String expr2 = "cast(a as row(f0 row(ff0 int not null, ff1 varchar null) null," + + " f1 timestamp not null))"; + final String expected2 = "CAST(`A` AS ROW(`F0` ROW(`FF0` INTEGER NOT NULL, `FF1` VARCHAR)," + + " `F1` TIMESTAMP NOT NULL))"; + expr(expr2).ok(expected2); + + final String expr3 = "cast(a as row(f0 bigint not null, f1 decimal null) array)"; + final String expected3 = "CAST(`A` AS ROW(`F0` BIGINT NOT NULL, `F1` DECIMAL) ARRAY)"; + expr(expr3).ok(expected3); + + final String expr4 = "cast(a as row(f0 varchar not null, f1 timestamp null) multiset)"; + final String expected4 = "CAST(`A` AS ROW(`F0` VARCHAR NOT NULL, `F1` TIMESTAMP) MULTISET)"; + expr(expr4).ok(expected4); } @Test @@ -772,44 +800,50 @@ public class FlinkSqlParserImplTest extends SqlParserTest { @Test public void testCreateFunction() { - check("create function catalog1.db1.function1 as 'org.apache.fink.function.function1'", - "CREATE FUNCTION `CATALOG1`.`DB1`.`FUNCTION1` AS 'org.apache.fink.function.function1'"); + sql("create function catalog1.db1.function1 as 'org.apache.fink.function.function1'") + .ok("CREATE FUNCTION `CATALOG1`.`DB1`.`FUNCTION1` AS 'org.apache.fink.function.function1'"); - check("create temporary function catalog1.db1.function1 as 'org.apache.fink.function.function1'", - "CREATE TEMPORARY FUNCTION `CATALOG1`.`DB1`.`FUNCTION1` AS 'org.apache.fink.function.function1'"); + sql("create temporary function catalog1.db1.function1 as 'org.apache.fink.function.function1'") + .ok("CREATE TEMPORARY FUNCTION `CATALOG1`.`DB1`.`FUNCTION1` AS 'org.apache.fink.function.function1'"); - check("create temporary system function catalog1.db1.function1 as 'org.apache.fink.function.function1'", - "CREATE TEMPORARY SYSTEM FUNCTION `CATALOG1`.`DB1`.`FUNCTION1` AS 'org.apache.fink.function.function1'"); + sql("create temporary system function catalog1.db1.function1 as 'org.apache.fink.function.function1'") + .ok("CREATE TEMPORARY SYSTEM FUNCTION `CATALOG1`.`DB1`.`FUNCTION1` AS 'org.apache.fink.function.function1'"); - check("create temporary function db1.function1 as 'org.apache.fink.function.function1'", - "CREATE TEMPORARY FUNCTION `DB1`.`FUNCTION1` AS 'org.apache.fink.function.function1'"); + sql("create temporary function db1.function1 as 'org.apache.fink.function.function1'") + .ok("CREATE TEMPORARY FUNCTION `DB1`.`FUNCTION1` AS 'org.apache.fink.function.function1'"); - check("create temporary function function1 as 'org.apache.fink.function.function1'", - "CREATE TEMPORARY FUNCTION `FUNCTION1` AS 'org.apache.fink.function.function1'"); + sql("create temporary function function1 as 'org.apache.fink.function.function1'") + .ok("CREATE TEMPORARY FUNCTION `FUNCTION1` AS 'org.apache.fink.function.function1'"); - check("create temporary function if not exists catalog1.db1.function1 as 'org.apache.fink.function.function1'", - "CREATE TEMPORARY FUNCTION IF NOT EXISTS `CATALOG1`.`DB1`.`FUNCTION1` AS 'org.apache.fink.function.function1'"); + sql("create temporary function if not exists catalog1.db1.function1 as 'org.apache.fink.function.function1'") + .ok("CREATE TEMPORARY FUNCTION IF NOT EXISTS `CATALOG1`.`DB1`.`FUNCTION1` AS 'org.apache.fink.function.function1'"); - check("create temporary function function1 as 'org.apache.fink.function.function1' language java", - "CREATE TEMPORARY FUNCTION `FUNCTION1` AS 'org.apache.fink.function.function1' LANGUAGE JAVA"); + sql("create temporary function function1 as 'org.apache.fink.function.function1' language java") + .ok("CREATE TEMPORARY FUNCTION `FUNCTION1` AS 'org.apache.fink.function.function1' LANGUAGE JAVA"); - check("create temporary system function function1 as 'org.apache.fink.function.function1' language scala", - "CREATE TEMPORARY SYSTEM FUNCTION `FUNCTION1` AS 'org.apache.fink.function.function1' LANGUAGE SCALA"); + sql("create temporary system function function1 as 'org.apache.fink.function.function1' language scala") + .ok("CREATE TEMPORARY SYSTEM FUNCTION `FUNCTION1` AS 'org.apache.fink.function.function1' LANGUAGE SCALA"); } @Test public void testDropTemporaryFunction() { - check("drop temporary function catalog1.db1.function1", - "DROP TEMPORARY FUNCTION `CATALOG1`.`DB1`.`FUNCTION1`"); + sql("drop temporary function catalog1.db1.function1") + .ok("DROP TEMPORARY FUNCTION `CATALOG1`.`DB1`.`FUNCTION1`"); - check("drop temporary system function catalog1.db1.function1", - "DROP TEMPORARY SYSTEM FUNCTION `CATALOG1`.`DB1`.`FUNCTION1`"); + sql("drop temporary system function catalog1.db1.function1") + .ok("DROP TEMPORARY SYSTEM FUNCTION `CATALOG1`.`DB1`.`FUNCTION1`"); - check("drop temporary function if exists catalog1.db1.function1", - "DROP TEMPORARY FUNCTION IF EXISTS `CATALOG1`.`DB1`.`FUNCTION1`"); + sql("drop temporary function if exists catalog1.db1.function1") + .ok("DROP TEMPORARY FUNCTION IF EXISTS `CATALOG1`.`DB1`.`FUNCTION1`"); - check("drop temporary system function if exists catalog1.db1.function1", - "DROP TEMPORARY SYSTEM FUNCTION IF EXISTS `CATALOG1`.`DB1`.`FUNCTION1`"); + sql("drop temporary system function if exists catalog1.db1.function1") + .ok("DROP TEMPORARY SYSTEM FUNCTION IF EXISTS `CATALOG1`.`DB1`.`FUNCTION1`"); + } + + @Override + public void testTableHintsInInsert() { + // Override the superclass tests because Flink insert parse block + // is totally customized, and the hints are not supported yet. } /** Matcher that invokes the #validate() of the {@link ExtendedSqlNode} instance. **/
