This is an automated email from the ASF dual-hosted git repository. leonard pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink-cdc.git
commit 2885d12f576271452c62947a7b7084e66d9be98e Author: yuxiqian <[email protected]> AuthorDate: Wed Aug 7 12:08:46 2024 +0800 [FLINK-35991][cdc-runtime] Resolve operator conflicts in transform SQL operator tables --- .../flink/cdc/runtime/parser/TransformParser.java | 10 +- .../parser/metadata/TransformSqlOperatorTable.java | 229 +++++++++++++-------- .../cdc/runtime/parser/TransformParserTest.java | 8 +- 3 files changed, 150 insertions(+), 97 deletions(-) diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java index ae598b7da..ea7bf4c5c 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java @@ -51,8 +51,8 @@ import org.apache.calcite.sql.SqlIdentifier; import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.SqlNodeList; +import org.apache.calcite.sql.SqlOperatorTable; import org.apache.calcite.sql.SqlSelect; -import org.apache.calcite.sql.fun.SqlStdOperatorTable; import org.apache.calcite.sql.parser.SqlParseException; import org.apache.calcite.sql.parser.SqlParser; import org.apache.calcite.sql.parser.SqlParserPos; @@ -60,7 +60,6 @@ import org.apache.calcite.sql.type.InferTypes; import org.apache.calcite.sql.type.OperandTypes; import org.apache.calcite.sql.type.SqlReturnTypeInference; import org.apache.calcite.sql.type.SqlTypeFactoryImpl; -import org.apache.calcite.sql.util.ListSqlOperatorTable; import org.apache.calcite.sql.util.SqlOperatorTables; import org.apache.calcite.sql.validate.SqlConformanceEnum; import org.apache.calcite.sql.validate.SqlValidator; @@ -156,13 +155,10 @@ public class TransformParser { factory, new CalciteConnectionConfigImpl(new Properties())); TransformSqlOperatorTable transformSqlOperatorTable = TransformSqlOperatorTable.instance(); - SqlStdOperatorTable sqlStdOperatorTable = SqlStdOperatorTable.instance(); - ListSqlOperatorTable udfOperatorTable = new ListSqlOperatorTable(); - udfFunctions.forEach(udfOperatorTable::add); + SqlOperatorTable udfOperatorTable = SqlOperatorTables.of(udfFunctions); SqlValidator validator = SqlValidatorUtil.newValidator( - SqlOperatorTables.chain( - sqlStdOperatorTable, transformSqlOperatorTable, udfOperatorTable), + SqlOperatorTables.chain(transformSqlOperatorTable, udfOperatorTable), calciteCatalogReader, factory, SqlValidator.Config.DEFAULT.withIdentifierExpansion(true)); diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java index 35253f279..658550da0 100644 --- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java +++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java @@ -20,13 +20,18 @@ package org.apache.flink.cdc.runtime.parser.metadata; import org.apache.flink.cdc.runtime.functions.BuiltInScalarFunction; import org.apache.flink.cdc.runtime.functions.BuiltInTimestampFunction; +import org.apache.calcite.sql.SqlBinaryOperator; import org.apache.calcite.sql.SqlFunction; import org.apache.calcite.sql.SqlFunctionCategory; import org.apache.calcite.sql.SqlIdentifier; import org.apache.calcite.sql.SqlKind; import org.apache.calcite.sql.SqlOperator; +import org.apache.calcite.sql.SqlPostfixOperator; +import org.apache.calcite.sql.SqlPrefixOperator; +import org.apache.calcite.sql.SqlSpecialOperator; import org.apache.calcite.sql.SqlSyntax; -import org.apache.calcite.sql.fun.SqlCurrentDateFunction; +import org.apache.calcite.sql.fun.SqlBetweenOperator; +import org.apache.calcite.sql.fun.SqlCaseOperator; import org.apache.calcite.sql.fun.SqlStdOperatorTable; import org.apache.calcite.sql.type.InferTypes; import org.apache.calcite.sql.type.OperandTypes; @@ -75,6 +80,78 @@ public class TransformSqlOperatorTable extends ReflectiveSqlOperatorTable { SqlNameMatchers.withCaseSensitive(false)); } + // The following binary functions are sorted in documentation definitions. See + // https://nightlies.apache.org/flink/flink-cdc-docs-master/docs/core-concept/transform/ for a + // full list of CDC supported built-in functions. + + // -------------------- + // Comparison Functions + // -------------------- + public static final SqlBinaryOperator EQUALS = SqlStdOperatorTable.EQUALS; + public static final SqlBinaryOperator NOT_EQUALS = SqlStdOperatorTable.NOT_EQUALS; + public static final SqlBinaryOperator GREATER_THAN = SqlStdOperatorTable.GREATER_THAN; + public static final SqlBinaryOperator GREATER_THAN_OR_EQUAL = + SqlStdOperatorTable.GREATER_THAN_OR_EQUAL; + public static final SqlBinaryOperator LESS_THAN = SqlStdOperatorTable.LESS_THAN; + public static final SqlBinaryOperator LESS_THAN_OR_EQUAL = + SqlStdOperatorTable.LESS_THAN_OR_EQUAL; + + public static final SqlPostfixOperator IS_NULL = SqlStdOperatorTable.IS_NULL; + public static final SqlPostfixOperator IS_NOT_NULL = SqlStdOperatorTable.IS_NOT_NULL; + + public static final SqlBetweenOperator BETWEEN = SqlStdOperatorTable.BETWEEN; + public static final SqlBetweenOperator NOT_BETWEEN = SqlStdOperatorTable.NOT_BETWEEN; + + public static final SqlSpecialOperator LIKE = SqlStdOperatorTable.LIKE; + public static final SqlSpecialOperator NOT_LIKE = SqlStdOperatorTable.NOT_LIKE; + + public static final SqlBinaryOperator IN = SqlStdOperatorTable.IN; + public static final SqlBinaryOperator NOT_IN = SqlStdOperatorTable.NOT_IN; + + // ----------------- + // Logical Functions + // ----------------- + public static final SqlBinaryOperator OR = SqlStdOperatorTable.OR; + public static final SqlBinaryOperator AND = SqlStdOperatorTable.AND; + public static final SqlPrefixOperator NOT = SqlStdOperatorTable.NOT; + + public static final SqlPostfixOperator IS_FALSE = SqlStdOperatorTable.IS_FALSE; + public static final SqlPostfixOperator IS_NOT_FALSE = SqlStdOperatorTable.IS_NOT_FALSE; + public static final SqlPostfixOperator IS_TRUE = SqlStdOperatorTable.IS_TRUE; + public static final SqlPostfixOperator IS_NOT_TRUE = SqlStdOperatorTable.IS_NOT_TRUE; + + // -------------------- + // Arithmetic Functions + // -------------------- + public static final SqlBinaryOperator PLUS = SqlStdOperatorTable.PLUS; + public static final SqlBinaryOperator MINUS = SqlStdOperatorTable.MINUS; + public static final SqlBinaryOperator MULTIPLY = SqlStdOperatorTable.MULTIPLY; + public static final SqlBinaryOperator DIVIDE = SqlStdOperatorTable.DIVIDE; + public static final SqlBinaryOperator PERCENT_REMAINDER = SqlStdOperatorTable.PERCENT_REMAINDER; + + public static final SqlFunction ABS = SqlStdOperatorTable.ABS; + public static final SqlFunction CEIL = SqlStdOperatorTable.CEIL; + public static final SqlFunction FLOOR = SqlStdOperatorTable.FLOOR; + public static final SqlFunction ROUND = + new SqlFunction( + "ROUND", + SqlKind.OTHER_FUNCTION, + TransformSqlReturnTypes.ROUND_FUNCTION_NULLABLE, + null, + OperandTypes.or(OperandTypes.NUMERIC_INTEGER, OperandTypes.NUMERIC), + SqlFunctionCategory.NUMERIC); + public static final SqlFunction UUID = + BuiltInScalarFunction.newBuilder() + .name("UUID") + .returnType(ReturnTypes.explicit(SqlTypeName.CHAR, 36)) + .operandTypeChecker(OperandTypes.NILADIC) + .notDeterministic() + .build(); + + // ---------------- + // String Functions + // ---------------- + public static final SqlBinaryOperator CONCAT = SqlStdOperatorTable.CONCAT; public static final SqlFunction CONCAT_FUNCTION = BuiltInScalarFunction.newBuilder() .name("CONCAT") @@ -85,14 +162,48 @@ public class TransformSqlOperatorTable extends ReflectiveSqlOperatorTable { .operandTypeChecker( OperandTypes.repeat(SqlOperandCountRanges.from(1), OperandTypes.STRING)) .build(); + + public static final SqlFunction CHAR_LENGTH = SqlStdOperatorTable.CHAR_LENGTH; + public static final SqlFunction UPPER = SqlStdOperatorTable.UPPER; + public static final SqlFunction LOWER = SqlStdOperatorTable.LOWER; + public static final SqlFunction TRIM = SqlStdOperatorTable.TRIM; + public static final SqlFunction REGEXP_REPLACE = + new SqlFunction( + "REGEXP_REPLACE", + SqlKind.OTHER_FUNCTION, + ReturnTypes.cascade( + ReturnTypes.explicit(SqlTypeName.VARCHAR), + SqlTypeTransforms.TO_NULLABLE), + null, + OperandTypes.family( + SqlTypeFamily.STRING, SqlTypeFamily.STRING, SqlTypeFamily.STRING), + SqlFunctionCategory.STRING); + public static final SqlFunction SUBSTR = + new SqlFunction( + "SUBSTR", + SqlKind.OTHER_FUNCTION, + TransformSqlReturnTypes.ARG0_VARCHAR_FORCE_NULLABLE, + null, + OperandTypes.or( + OperandTypes.family(SqlTypeFamily.CHARACTER, SqlTypeFamily.INTEGER), + OperandTypes.family( + SqlTypeFamily.CHARACTER, + SqlTypeFamily.INTEGER, + SqlTypeFamily.INTEGER)), + SqlFunctionCategory.STRING); + + // ------------------ + // Temporal Functions + // ------------------ + public static final SqlFunction LOCALTIME = SqlStdOperatorTable.LOCALTIME; public static final SqlFunction LOCALTIMESTAMP = new BuiltInTimestampFunction("LOCALTIMESTAMP", SqlTypeName.TIMESTAMP, 3); public static final SqlFunction CURRENT_TIME = new BuiltInTimestampFunction("CURRENT_TIME", SqlTypeName.TIME, 0); + public static final SqlFunction CURRENT_DATE = SqlStdOperatorTable.CURRENT_DATE; public static final SqlFunction CURRENT_TIMESTAMP = new BuiltInTimestampFunction( "CURRENT_TIMESTAMP", SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE, 3); - public static final SqlFunction CURRENT_DATE = new SqlCurrentDateFunction(); public static final SqlFunction NOW = new BuiltInTimestampFunction("NOW", SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE, 3) { @Override @@ -100,30 +211,16 @@ public class TransformSqlOperatorTable extends ReflectiveSqlOperatorTable { return SqlSyntax.FUNCTION; } }; - public static final SqlFunction TO_DATE = + public static final SqlFunction DATE_FORMAT = new SqlFunction( - "TO_DATE", + "DATE_FORMAT", SqlKind.OTHER_FUNCTION, - ReturnTypes.cascade( - ReturnTypes.explicit(SqlTypeName.DATE), - SqlTypeTransforms.FORCE_NULLABLE), - null, + TransformSqlReturnTypes.VARCHAR_FORCE_NULLABLE, + InferTypes.RETURN_TYPE, OperandTypes.or( - OperandTypes.family(SqlTypeFamily.STRING), + OperandTypes.family(SqlTypeFamily.TIMESTAMP, SqlTypeFamily.STRING), OperandTypes.family(SqlTypeFamily.STRING, SqlTypeFamily.STRING)), SqlFunctionCategory.TIMEDATE); - public static final SqlFunction TO_TIMESTAMP = - new SqlFunction( - "TO_TIMESTAMP", - SqlKind.OTHER_FUNCTION, - ReturnTypes.cascade( - ReturnTypes.explicit(SqlTypeName.TIMESTAMP, 3), - SqlTypeTransforms.FORCE_NULLABLE), - null, - OperandTypes.or( - OperandTypes.family(SqlTypeFamily.CHARACTER), - OperandTypes.family(SqlTypeFamily.CHARACTER, SqlTypeFamily.CHARACTER)), - SqlFunctionCategory.TIMEDATE); public static final SqlFunction TIMESTAMP_DIFF = new SqlFunction( "TIMESTAMP_DIFF", @@ -135,64 +232,36 @@ public class TransformSqlOperatorTable extends ReflectiveSqlOperatorTable { OperandTypes.family( SqlTypeFamily.ANY, SqlTypeFamily.TIMESTAMP, SqlTypeFamily.TIMESTAMP), SqlFunctionCategory.TIMEDATE); - public static final SqlFunction REGEXP_REPLACE = + public static final SqlFunction TO_DATE = new SqlFunction( - "REGEXP_REPLACE", + "TO_DATE", SqlKind.OTHER_FUNCTION, ReturnTypes.cascade( - ReturnTypes.explicit(SqlTypeName.VARCHAR), - SqlTypeTransforms.TO_NULLABLE), - null, - OperandTypes.family( - SqlTypeFamily.STRING, SqlTypeFamily.STRING, SqlTypeFamily.STRING), - SqlFunctionCategory.STRING); - public static final SqlFunction SUBSTR = - new SqlFunction( - "SUBSTR", - SqlKind.OTHER_FUNCTION, - TransformSqlReturnTypes.ARG0_VARCHAR_FORCE_NULLABLE, + ReturnTypes.explicit(SqlTypeName.DATE), + SqlTypeTransforms.FORCE_NULLABLE), null, OperandTypes.or( - OperandTypes.family(SqlTypeFamily.CHARACTER, SqlTypeFamily.INTEGER), - OperandTypes.family( - SqlTypeFamily.CHARACTER, - SqlTypeFamily.INTEGER, - SqlTypeFamily.INTEGER)), - SqlFunctionCategory.STRING); - public static final SqlFunction ROUND = + OperandTypes.family(SqlTypeFamily.STRING), + OperandTypes.family(SqlTypeFamily.STRING, SqlTypeFamily.STRING)), + SqlFunctionCategory.TIMEDATE); + public static final SqlFunction TO_TIMESTAMP = new SqlFunction( - "ROUND", + "TO_TIMESTAMP", SqlKind.OTHER_FUNCTION, - TransformSqlReturnTypes.ROUND_FUNCTION_NULLABLE, + ReturnTypes.cascade( + ReturnTypes.explicit(SqlTypeName.TIMESTAMP, 3), + SqlTypeTransforms.FORCE_NULLABLE), null, - OperandTypes.or(OperandTypes.NUMERIC_INTEGER, OperandTypes.NUMERIC), - SqlFunctionCategory.NUMERIC); - public static final SqlFunction UUID = - BuiltInScalarFunction.newBuilder() - .name("UUID") - .returnType(ReturnTypes.explicit(SqlTypeName.CHAR, 36)) - .operandTypeChecker(OperandTypes.NILADIC) - .notDeterministic() - .build(); - public static final SqlFunction MOD = SqlStdOperatorTable.MOD; - public static final SqlFunction LOCALTIME = SqlStdOperatorTable.LOCALTIME; - public static final SqlFunction YEAR = SqlStdOperatorTable.YEAR; - public static final SqlFunction QUARTER = SqlStdOperatorTable.QUARTER; - public static final SqlFunction MONTH = SqlStdOperatorTable.MONTH; - public static final SqlFunction WEEK = SqlStdOperatorTable.WEEK; - public static final SqlFunction TIMESTAMP_ADD = SqlStdOperatorTable.TIMESTAMP_ADD; - public static final SqlOperator BETWEEN = SqlStdOperatorTable.BETWEEN; - public static final SqlOperator SYMMETRIC_BETWEEN = SqlStdOperatorTable.SYMMETRIC_BETWEEN; - public static final SqlOperator NOT_BETWEEN = SqlStdOperatorTable.NOT_BETWEEN; - public static final SqlOperator IN = SqlStdOperatorTable.IN; - public static final SqlOperator NOT_IN = SqlStdOperatorTable.NOT_IN; - public static final SqlFunction CHAR_LENGTH = SqlStdOperatorTable.CHAR_LENGTH; - public static final SqlFunction TRIM = SqlStdOperatorTable.TRIM; - public static final SqlOperator NOT_LIKE = SqlStdOperatorTable.NOT_LIKE; - public static final SqlOperator LIKE = SqlStdOperatorTable.LIKE; - public static final SqlFunction UPPER = SqlStdOperatorTable.UPPER; - public static final SqlFunction LOWER = SqlStdOperatorTable.LOWER; - public static final SqlFunction ABS = SqlStdOperatorTable.ABS; + OperandTypes.or( + OperandTypes.family(SqlTypeFamily.CHARACTER), + OperandTypes.family(SqlTypeFamily.CHARACTER, SqlTypeFamily.CHARACTER)), + SqlFunctionCategory.TIMEDATE); + + // --------------------- + // Conditional Functions + // --------------------- + public static final SqlCaseOperator CASE = SqlStdOperatorTable.CASE; + public static final SqlFunction COALESCE = SqlStdOperatorTable.COALESCE; public static final SqlFunction IF = new SqlFunction( "IF", @@ -235,17 +304,9 @@ public class TransformSqlOperatorTable extends ReflectiveSqlOperatorTable { OperandTypes.family( SqlTypeFamily.BOOLEAN, SqlTypeFamily.TIME, SqlTypeFamily.TIME)), SqlFunctionCategory.NUMERIC); - public static final SqlFunction NULLIF = SqlStdOperatorTable.NULLIF; - public static final SqlFunction FLOOR = SqlStdOperatorTable.FLOOR; - public static final SqlFunction CEIL = SqlStdOperatorTable.CEIL; - public static final SqlFunction DATE_FORMAT = - new SqlFunction( - "DATE_FORMAT", - SqlKind.OTHER_FUNCTION, - TransformSqlReturnTypes.VARCHAR_FORCE_NULLABLE, - InferTypes.RETURN_TYPE, - OperandTypes.or( - OperandTypes.family(SqlTypeFamily.TIMESTAMP, SqlTypeFamily.STRING), - OperandTypes.family(SqlTypeFamily.STRING, SqlTypeFamily.STRING)), - SqlFunctionCategory.TIMEDATE); + + // -------------- + // Cast Functions + // -------------- + public static final SqlFunction CAST = SqlStdOperatorTable.CAST; } diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java index 5eee2663e..86ebefedc 100644 --- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java +++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java @@ -38,9 +38,7 @@ import org.apache.calcite.rel.type.RelDataTypeSystem; import org.apache.calcite.rex.RexBuilder; import org.apache.calcite.sql.SqlNode; import org.apache.calcite.sql.SqlSelect; -import org.apache.calcite.sql.fun.SqlStdOperatorTable; import org.apache.calcite.sql.type.SqlTypeFactoryImpl; -import org.apache.calcite.sql.util.SqlOperatorTables; import org.apache.calcite.sql.validate.SqlValidator; import org.apache.calcite.sql.validate.SqlValidatorUtil; import org.apache.calcite.sql2rel.RelDecorrelator; @@ -101,10 +99,9 @@ public class TransformParserTest { factory, new CalciteConnectionConfigImpl(new Properties())); TransformSqlOperatorTable transformSqlOperatorTable = TransformSqlOperatorTable.instance(); - SqlStdOperatorTable sqlStdOperatorTable = SqlStdOperatorTable.instance(); SqlValidator validator = SqlValidatorUtil.newValidator( - SqlOperatorTables.chain(sqlStdOperatorTable, transformSqlOperatorTable), + transformSqlOperatorTable, calciteCatalogReader, factory, SqlValidator.Config.DEFAULT.withIdentifierExpansion(true)); @@ -144,10 +141,9 @@ public class TransformParserTest { factory, new CalciteConnectionConfigImpl(new Properties())); TransformSqlOperatorTable transformSqlOperatorTable = TransformSqlOperatorTable.instance(); - SqlStdOperatorTable sqlStdOperatorTable = SqlStdOperatorTable.instance(); SqlValidator validator = SqlValidatorUtil.newValidator( - SqlOperatorTables.chain(sqlStdOperatorTable, transformSqlOperatorTable), + transformSqlOperatorTable, calciteCatalogReader, factory, SqlValidator.Config.DEFAULT.withIdentifierExpansion(true));
