This is an automated email from the ASF dual-hosted git repository.

leonard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-cdc.git

commit 2885d12f576271452c62947a7b7084e66d9be98e
Author: yuxiqian <[email protected]>
AuthorDate: Wed Aug 7 12:08:46 2024 +0800

    [FLINK-35991][cdc-runtime] Resolve operator conflicts in transform SQL operator tables
---
 .../flink/cdc/runtime/parser/TransformParser.java  |  10 +-
 .../parser/metadata/TransformSqlOperatorTable.java | 229 +++++++++++++--------
 .../cdc/runtime/parser/TransformParserTest.java    |   8 +-
 3 files changed, 150 insertions(+), 97 deletions(-)

diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java
index ae598b7da..ea7bf4c5c 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/TransformParser.java
@@ -51,8 +51,8 @@ import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlNodeList;
+import org.apache.calcite.sql.SqlOperatorTable;
 import org.apache.calcite.sql.SqlSelect;
-import org.apache.calcite.sql.fun.SqlStdOperatorTable;
 import org.apache.calcite.sql.parser.SqlParseException;
 import org.apache.calcite.sql.parser.SqlParser;
 import org.apache.calcite.sql.parser.SqlParserPos;
@@ -60,7 +60,6 @@ import org.apache.calcite.sql.type.InferTypes;
 import org.apache.calcite.sql.type.OperandTypes;
 import org.apache.calcite.sql.type.SqlReturnTypeInference;
 import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
-import org.apache.calcite.sql.util.ListSqlOperatorTable;
 import org.apache.calcite.sql.util.SqlOperatorTables;
 import org.apache.calcite.sql.validate.SqlConformanceEnum;
 import org.apache.calcite.sql.validate.SqlValidator;
@@ -156,13 +155,10 @@ public class TransformParser {
                         factory,
                         new CalciteConnectionConfigImpl(new Properties()));
         TransformSqlOperatorTable transformSqlOperatorTable = 
TransformSqlOperatorTable.instance();
-        SqlStdOperatorTable sqlStdOperatorTable = 
SqlStdOperatorTable.instance();
-        ListSqlOperatorTable udfOperatorTable = new ListSqlOperatorTable();
-        udfFunctions.forEach(udfOperatorTable::add);
+        SqlOperatorTable udfOperatorTable = SqlOperatorTables.of(udfFunctions);
         SqlValidator validator =
                 SqlValidatorUtil.newValidator(
-                        SqlOperatorTables.chain(
-                                sqlStdOperatorTable, 
transformSqlOperatorTable, udfOperatorTable),
+                        SqlOperatorTables.chain(transformSqlOperatorTable, 
udfOperatorTable),
                         calciteCatalogReader,
                         factory,
                         
SqlValidator.Config.DEFAULT.withIdentifierExpansion(true));
diff --git a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java
index 35253f279..658550da0 100644
--- a/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java
+++ b/flink-cdc-runtime/src/main/java/org/apache/flink/cdc/runtime/parser/metadata/TransformSqlOperatorTable.java
@@ -20,13 +20,18 @@ package org.apache.flink.cdc.runtime.parser.metadata;
 import org.apache.flink.cdc.runtime.functions.BuiltInScalarFunction;
 import org.apache.flink.cdc.runtime.functions.BuiltInTimestampFunction;
 
+import org.apache.calcite.sql.SqlBinaryOperator;
 import org.apache.calcite.sql.SqlFunction;
 import org.apache.calcite.sql.SqlFunctionCategory;
 import org.apache.calcite.sql.SqlIdentifier;
 import org.apache.calcite.sql.SqlKind;
 import org.apache.calcite.sql.SqlOperator;
+import org.apache.calcite.sql.SqlPostfixOperator;
+import org.apache.calcite.sql.SqlPrefixOperator;
+import org.apache.calcite.sql.SqlSpecialOperator;
 import org.apache.calcite.sql.SqlSyntax;
-import org.apache.calcite.sql.fun.SqlCurrentDateFunction;
+import org.apache.calcite.sql.fun.SqlBetweenOperator;
+import org.apache.calcite.sql.fun.SqlCaseOperator;
 import org.apache.calcite.sql.fun.SqlStdOperatorTable;
 import org.apache.calcite.sql.type.InferTypes;
 import org.apache.calcite.sql.type.OperandTypes;
@@ -75,6 +80,78 @@ public class TransformSqlOperatorTable extends ReflectiveSqlOperatorTable {
                 SqlNameMatchers.withCaseSensitive(false));
     }
 
+    // The following binary functions are sorted in documentation definitions. See
+    // https://nightlies.apache.org/flink/flink-cdc-docs-master/docs/core-concept/transform/ for a
+    // full list of CDC supported built-in functions.
+
+    // --------------------
+    // Comparison Functions
+    // --------------------
+    public static final SqlBinaryOperator EQUALS = SqlStdOperatorTable.EQUALS;
+    public static final SqlBinaryOperator NOT_EQUALS = 
SqlStdOperatorTable.NOT_EQUALS;
+    public static final SqlBinaryOperator GREATER_THAN = 
SqlStdOperatorTable.GREATER_THAN;
+    public static final SqlBinaryOperator GREATER_THAN_OR_EQUAL =
+            SqlStdOperatorTable.GREATER_THAN_OR_EQUAL;
+    public static final SqlBinaryOperator LESS_THAN = 
SqlStdOperatorTable.LESS_THAN;
+    public static final SqlBinaryOperator LESS_THAN_OR_EQUAL =
+            SqlStdOperatorTable.LESS_THAN_OR_EQUAL;
+
+    public static final SqlPostfixOperator IS_NULL = 
SqlStdOperatorTable.IS_NULL;
+    public static final SqlPostfixOperator IS_NOT_NULL = 
SqlStdOperatorTable.IS_NOT_NULL;
+
+    public static final SqlBetweenOperator BETWEEN = 
SqlStdOperatorTable.BETWEEN;
+    public static final SqlBetweenOperator NOT_BETWEEN = 
SqlStdOperatorTable.NOT_BETWEEN;
+
+    public static final SqlSpecialOperator LIKE = SqlStdOperatorTable.LIKE;
+    public static final SqlSpecialOperator NOT_LIKE = 
SqlStdOperatorTable.NOT_LIKE;
+
+    public static final SqlBinaryOperator IN = SqlStdOperatorTable.IN;
+    public static final SqlBinaryOperator NOT_IN = SqlStdOperatorTable.NOT_IN;
+
+    // -----------------
+    // Logical Functions
+    // -----------------
+    public static final SqlBinaryOperator OR = SqlStdOperatorTable.OR;
+    public static final SqlBinaryOperator AND = SqlStdOperatorTable.AND;
+    public static final SqlPrefixOperator NOT = SqlStdOperatorTable.NOT;
+
+    public static final SqlPostfixOperator IS_FALSE = 
SqlStdOperatorTable.IS_FALSE;
+    public static final SqlPostfixOperator IS_NOT_FALSE = 
SqlStdOperatorTable.IS_NOT_FALSE;
+    public static final SqlPostfixOperator IS_TRUE = 
SqlStdOperatorTable.IS_TRUE;
+    public static final SqlPostfixOperator IS_NOT_TRUE = 
SqlStdOperatorTable.IS_NOT_TRUE;
+
+    // --------------------
+    // Arithmetic Functions
+    // --------------------
+    public static final SqlBinaryOperator PLUS = SqlStdOperatorTable.PLUS;
+    public static final SqlBinaryOperator MINUS = SqlStdOperatorTable.MINUS;
+    public static final SqlBinaryOperator MULTIPLY = 
SqlStdOperatorTable.MULTIPLY;
+    public static final SqlBinaryOperator DIVIDE = SqlStdOperatorTable.DIVIDE;
+    public static final SqlBinaryOperator PERCENT_REMAINDER = 
SqlStdOperatorTable.PERCENT_REMAINDER;
+
+    public static final SqlFunction ABS = SqlStdOperatorTable.ABS;
+    public static final SqlFunction CEIL = SqlStdOperatorTable.CEIL;
+    public static final SqlFunction FLOOR = SqlStdOperatorTable.FLOOR;
+    public static final SqlFunction ROUND =
+            new SqlFunction(
+                    "ROUND",
+                    SqlKind.OTHER_FUNCTION,
+                    TransformSqlReturnTypes.ROUND_FUNCTION_NULLABLE,
+                    null,
+                    OperandTypes.or(OperandTypes.NUMERIC_INTEGER, 
OperandTypes.NUMERIC),
+                    SqlFunctionCategory.NUMERIC);
+    public static final SqlFunction UUID =
+            BuiltInScalarFunction.newBuilder()
+                    .name("UUID")
+                    .returnType(ReturnTypes.explicit(SqlTypeName.CHAR, 36))
+                    .operandTypeChecker(OperandTypes.NILADIC)
+                    .notDeterministic()
+                    .build();
+
+    // ----------------
+    // String Functions
+    // ----------------
+    public static final SqlBinaryOperator CONCAT = SqlStdOperatorTable.CONCAT;
     public static final SqlFunction CONCAT_FUNCTION =
             BuiltInScalarFunction.newBuilder()
                     .name("CONCAT")
@@ -85,14 +162,48 @@ public class TransformSqlOperatorTable extends ReflectiveSqlOperatorTable {
                     .operandTypeChecker(
                             OperandTypes.repeat(SqlOperandCountRanges.from(1), 
OperandTypes.STRING))
                     .build();
+
+    public static final SqlFunction CHAR_LENGTH = 
SqlStdOperatorTable.CHAR_LENGTH;
+    public static final SqlFunction UPPER = SqlStdOperatorTable.UPPER;
+    public static final SqlFunction LOWER = SqlStdOperatorTable.LOWER;
+    public static final SqlFunction TRIM = SqlStdOperatorTable.TRIM;
+    public static final SqlFunction REGEXP_REPLACE =
+            new SqlFunction(
+                    "REGEXP_REPLACE",
+                    SqlKind.OTHER_FUNCTION,
+                    ReturnTypes.cascade(
+                            ReturnTypes.explicit(SqlTypeName.VARCHAR),
+                            SqlTypeTransforms.TO_NULLABLE),
+                    null,
+                    OperandTypes.family(
+                            SqlTypeFamily.STRING, SqlTypeFamily.STRING, 
SqlTypeFamily.STRING),
+                    SqlFunctionCategory.STRING);
+    public static final SqlFunction SUBSTR =
+            new SqlFunction(
+                    "SUBSTR",
+                    SqlKind.OTHER_FUNCTION,
+                    TransformSqlReturnTypes.ARG0_VARCHAR_FORCE_NULLABLE,
+                    null,
+                    OperandTypes.or(
+                            OperandTypes.family(SqlTypeFamily.CHARACTER, 
SqlTypeFamily.INTEGER),
+                            OperandTypes.family(
+                                    SqlTypeFamily.CHARACTER,
+                                    SqlTypeFamily.INTEGER,
+                                    SqlTypeFamily.INTEGER)),
+                    SqlFunctionCategory.STRING);
+
+    // ------------------
+    // Temporal Functions
+    // ------------------
+    public static final SqlFunction LOCALTIME = SqlStdOperatorTable.LOCALTIME;
     public static final SqlFunction LOCALTIMESTAMP =
             new BuiltInTimestampFunction("LOCALTIMESTAMP", 
SqlTypeName.TIMESTAMP, 3);
     public static final SqlFunction CURRENT_TIME =
             new BuiltInTimestampFunction("CURRENT_TIME", SqlTypeName.TIME, 0);
+    public static final SqlFunction CURRENT_DATE = 
SqlStdOperatorTable.CURRENT_DATE;
     public static final SqlFunction CURRENT_TIMESTAMP =
             new BuiltInTimestampFunction(
                     "CURRENT_TIMESTAMP", 
SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE, 3);
-    public static final SqlFunction CURRENT_DATE = new 
SqlCurrentDateFunction();
     public static final SqlFunction NOW =
             new BuiltInTimestampFunction("NOW", 
SqlTypeName.TIMESTAMP_WITH_LOCAL_TIME_ZONE, 3) {
                 @Override
@@ -100,30 +211,16 @@ public class TransformSqlOperatorTable extends ReflectiveSqlOperatorTable {
                     return SqlSyntax.FUNCTION;
                 }
             };
-    public static final SqlFunction TO_DATE =
+    public static final SqlFunction DATE_FORMAT =
             new SqlFunction(
-                    "TO_DATE",
+                    "DATE_FORMAT",
                     SqlKind.OTHER_FUNCTION,
-                    ReturnTypes.cascade(
-                            ReturnTypes.explicit(SqlTypeName.DATE),
-                            SqlTypeTransforms.FORCE_NULLABLE),
-                    null,
+                    TransformSqlReturnTypes.VARCHAR_FORCE_NULLABLE,
+                    InferTypes.RETURN_TYPE,
                     OperandTypes.or(
-                            OperandTypes.family(SqlTypeFamily.STRING),
+                            OperandTypes.family(SqlTypeFamily.TIMESTAMP, 
SqlTypeFamily.STRING),
                             OperandTypes.family(SqlTypeFamily.STRING, 
SqlTypeFamily.STRING)),
                     SqlFunctionCategory.TIMEDATE);
-    public static final SqlFunction TO_TIMESTAMP =
-            new SqlFunction(
-                    "TO_TIMESTAMP",
-                    SqlKind.OTHER_FUNCTION,
-                    ReturnTypes.cascade(
-                            ReturnTypes.explicit(SqlTypeName.TIMESTAMP, 3),
-                            SqlTypeTransforms.FORCE_NULLABLE),
-                    null,
-                    OperandTypes.or(
-                            OperandTypes.family(SqlTypeFamily.CHARACTER),
-                            OperandTypes.family(SqlTypeFamily.CHARACTER, 
SqlTypeFamily.CHARACTER)),
-                    SqlFunctionCategory.TIMEDATE);
     public static final SqlFunction TIMESTAMP_DIFF =
             new SqlFunction(
                     "TIMESTAMP_DIFF",
@@ -135,64 +232,36 @@ public class TransformSqlOperatorTable extends ReflectiveSqlOperatorTable {
                     OperandTypes.family(
                             SqlTypeFamily.ANY, SqlTypeFamily.TIMESTAMP, 
SqlTypeFamily.TIMESTAMP),
                     SqlFunctionCategory.TIMEDATE);
-    public static final SqlFunction REGEXP_REPLACE =
+    public static final SqlFunction TO_DATE =
             new SqlFunction(
-                    "REGEXP_REPLACE",
+                    "TO_DATE",
                     SqlKind.OTHER_FUNCTION,
                     ReturnTypes.cascade(
-                            ReturnTypes.explicit(SqlTypeName.VARCHAR),
-                            SqlTypeTransforms.TO_NULLABLE),
-                    null,
-                    OperandTypes.family(
-                            SqlTypeFamily.STRING, SqlTypeFamily.STRING, 
SqlTypeFamily.STRING),
-                    SqlFunctionCategory.STRING);
-    public static final SqlFunction SUBSTR =
-            new SqlFunction(
-                    "SUBSTR",
-                    SqlKind.OTHER_FUNCTION,
-                    TransformSqlReturnTypes.ARG0_VARCHAR_FORCE_NULLABLE,
+                            ReturnTypes.explicit(SqlTypeName.DATE),
+                            SqlTypeTransforms.FORCE_NULLABLE),
                     null,
                     OperandTypes.or(
-                            OperandTypes.family(SqlTypeFamily.CHARACTER, 
SqlTypeFamily.INTEGER),
-                            OperandTypes.family(
-                                    SqlTypeFamily.CHARACTER,
-                                    SqlTypeFamily.INTEGER,
-                                    SqlTypeFamily.INTEGER)),
-                    SqlFunctionCategory.STRING);
-    public static final SqlFunction ROUND =
+                            OperandTypes.family(SqlTypeFamily.STRING),
+                            OperandTypes.family(SqlTypeFamily.STRING, 
SqlTypeFamily.STRING)),
+                    SqlFunctionCategory.TIMEDATE);
+    public static final SqlFunction TO_TIMESTAMP =
             new SqlFunction(
-                    "ROUND",
+                    "TO_TIMESTAMP",
                     SqlKind.OTHER_FUNCTION,
-                    TransformSqlReturnTypes.ROUND_FUNCTION_NULLABLE,
+                    ReturnTypes.cascade(
+                            ReturnTypes.explicit(SqlTypeName.TIMESTAMP, 3),
+                            SqlTypeTransforms.FORCE_NULLABLE),
                     null,
-                    OperandTypes.or(OperandTypes.NUMERIC_INTEGER, 
OperandTypes.NUMERIC),
-                    SqlFunctionCategory.NUMERIC);
-    public static final SqlFunction UUID =
-            BuiltInScalarFunction.newBuilder()
-                    .name("UUID")
-                    .returnType(ReturnTypes.explicit(SqlTypeName.CHAR, 36))
-                    .operandTypeChecker(OperandTypes.NILADIC)
-                    .notDeterministic()
-                    .build();
-    public static final SqlFunction MOD = SqlStdOperatorTable.MOD;
-    public static final SqlFunction LOCALTIME = SqlStdOperatorTable.LOCALTIME;
-    public static final SqlFunction YEAR = SqlStdOperatorTable.YEAR;
-    public static final SqlFunction QUARTER = SqlStdOperatorTable.QUARTER;
-    public static final SqlFunction MONTH = SqlStdOperatorTable.MONTH;
-    public static final SqlFunction WEEK = SqlStdOperatorTable.WEEK;
-    public static final SqlFunction TIMESTAMP_ADD = 
SqlStdOperatorTable.TIMESTAMP_ADD;
-    public static final SqlOperator BETWEEN = SqlStdOperatorTable.BETWEEN;
-    public static final SqlOperator SYMMETRIC_BETWEEN = 
SqlStdOperatorTable.SYMMETRIC_BETWEEN;
-    public static final SqlOperator NOT_BETWEEN = 
SqlStdOperatorTable.NOT_BETWEEN;
-    public static final SqlOperator IN = SqlStdOperatorTable.IN;
-    public static final SqlOperator NOT_IN = SqlStdOperatorTable.NOT_IN;
-    public static final SqlFunction CHAR_LENGTH = 
SqlStdOperatorTable.CHAR_LENGTH;
-    public static final SqlFunction TRIM = SqlStdOperatorTable.TRIM;
-    public static final SqlOperator NOT_LIKE = SqlStdOperatorTable.NOT_LIKE;
-    public static final SqlOperator LIKE = SqlStdOperatorTable.LIKE;
-    public static final SqlFunction UPPER = SqlStdOperatorTable.UPPER;
-    public static final SqlFunction LOWER = SqlStdOperatorTable.LOWER;
-    public static final SqlFunction ABS = SqlStdOperatorTable.ABS;
+                    OperandTypes.or(
+                            OperandTypes.family(SqlTypeFamily.CHARACTER),
+                            OperandTypes.family(SqlTypeFamily.CHARACTER, 
SqlTypeFamily.CHARACTER)),
+                    SqlFunctionCategory.TIMEDATE);
+
+    // ---------------------
+    // Conditional Functions
+    // ---------------------
+    public static final SqlCaseOperator CASE = SqlStdOperatorTable.CASE;
+    public static final SqlFunction COALESCE = SqlStdOperatorTable.COALESCE;
     public static final SqlFunction IF =
             new SqlFunction(
                     "IF",
@@ -235,17 +304,9 @@ public class TransformSqlOperatorTable extends ReflectiveSqlOperatorTable {
                             OperandTypes.family(
                                     SqlTypeFamily.BOOLEAN, SqlTypeFamily.TIME, 
SqlTypeFamily.TIME)),
                     SqlFunctionCategory.NUMERIC);
-    public static final SqlFunction NULLIF = SqlStdOperatorTable.NULLIF;
-    public static final SqlFunction FLOOR = SqlStdOperatorTable.FLOOR;
-    public static final SqlFunction CEIL = SqlStdOperatorTable.CEIL;
-    public static final SqlFunction DATE_FORMAT =
-            new SqlFunction(
-                    "DATE_FORMAT",
-                    SqlKind.OTHER_FUNCTION,
-                    TransformSqlReturnTypes.VARCHAR_FORCE_NULLABLE,
-                    InferTypes.RETURN_TYPE,
-                    OperandTypes.or(
-                            OperandTypes.family(SqlTypeFamily.TIMESTAMP, 
SqlTypeFamily.STRING),
-                            OperandTypes.family(SqlTypeFamily.STRING, 
SqlTypeFamily.STRING)),
-                    SqlFunctionCategory.TIMEDATE);
+
+    // --------------
+    // Cast Functions
+    // --------------
+    public static final SqlFunction CAST = SqlStdOperatorTable.CAST;
 }
diff --git a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
index 5eee2663e..86ebefedc 100644
--- a/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
+++ b/flink-cdc-runtime/src/test/java/org/apache/flink/cdc/runtime/parser/TransformParserTest.java
@@ -38,9 +38,7 @@ import org.apache.calcite.rel.type.RelDataTypeSystem;
 import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.sql.SqlNode;
 import org.apache.calcite.sql.SqlSelect;
-import org.apache.calcite.sql.fun.SqlStdOperatorTable;
 import org.apache.calcite.sql.type.SqlTypeFactoryImpl;
-import org.apache.calcite.sql.util.SqlOperatorTables;
 import org.apache.calcite.sql.validate.SqlValidator;
 import org.apache.calcite.sql.validate.SqlValidatorUtil;
 import org.apache.calcite.sql2rel.RelDecorrelator;
@@ -101,10 +99,9 @@ public class TransformParserTest {
                         factory,
                         new CalciteConnectionConfigImpl(new Properties()));
         TransformSqlOperatorTable transformSqlOperatorTable = 
TransformSqlOperatorTable.instance();
-        SqlStdOperatorTable sqlStdOperatorTable = 
SqlStdOperatorTable.instance();
         SqlValidator validator =
                 SqlValidatorUtil.newValidator(
-                        SqlOperatorTables.chain(sqlStdOperatorTable, 
transformSqlOperatorTable),
+                        transformSqlOperatorTable,
                         calciteCatalogReader,
                         factory,
                         
SqlValidator.Config.DEFAULT.withIdentifierExpansion(true));
@@ -144,10 +141,9 @@ public class TransformParserTest {
                         factory,
                         new CalciteConnectionConfigImpl(new Properties()));
         TransformSqlOperatorTable transformSqlOperatorTable = 
TransformSqlOperatorTable.instance();
-        SqlStdOperatorTable sqlStdOperatorTable = 
SqlStdOperatorTable.instance();
         SqlValidator validator =
                 SqlValidatorUtil.newValidator(
-                        SqlOperatorTables.chain(sqlStdOperatorTable, 
transformSqlOperatorTable),
+                        transformSqlOperatorTable,
                         calciteCatalogReader,
                         factory,
                         
SqlValidator.Config.DEFAULT.withIdentifierExpansion(true));

Reply via email to