This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 4cf2124d71a [FLINK-32466] Replace STRING with CHARACTER for functions 
that don't support binary strings
4cf2124d71a is described below

commit 4cf2124d71a8dd0595e40f07c2dbcc4c85883b82
Author: Timo Walther <twal...@apache.org>
AuthorDate: Wed Jun 28 13:42:47 2023 +0200

    [FLINK-32466] Replace STRING with CHARACTER for functions that don't 
support binary strings
    
    This closes #22892.
---
 .../functions/sql/FlinkSqlOperatorTable.java       | 116 ++++++++++++---------
 1 file changed, 66 insertions(+), 50 deletions(-)

diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
index cb2ff52c6e8..15f0d52ca9c 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/sql/FlinkSqlOperatorTable.java
@@ -228,7 +228,8 @@ public class FlinkSqlOperatorTable extends 
ReflectiveSqlOperatorTable {
                                     ReturnTypes.explicit(SqlTypeName.VARCHAR),
                                     SqlTypeTransforms.TO_NULLABLE))
                     .operandTypeChecker(
-                            OperandTypes.repeat(SqlOperandCountRanges.from(1), 
OperandTypes.STRING))
+                            OperandTypes.repeat(
+                                    SqlOperandCountRanges.from(1), 
OperandTypes.CHARACTER))
                     .build();
 
     /** Function for concat strings with a separator. */
@@ -240,7 +241,7 @@ public class FlinkSqlOperatorTable extends 
ReflectiveSqlOperatorTable {
                             ReturnTypes.explicit(SqlTypeName.VARCHAR),
                             SqlTypeTransforms.TO_NULLABLE),
                     null,
-                    OperandTypes.repeat(SqlOperandCountRanges.from(1), 
OperandTypes.STRING),
+                    OperandTypes.repeat(SqlOperandCountRanges.from(1), 
OperandTypes.CHARACTER),
                     SqlFunctionCategory.STRING);
 
     public static final SqlFunction LOG =
@@ -311,7 +312,7 @@ public class FlinkSqlOperatorTable extends 
ReflectiveSqlOperatorTable {
                     InferTypes.RETURN_TYPE,
                     OperandTypes.or(
                             OperandTypes.family(SqlTypeFamily.INTEGER),
-                            OperandTypes.family(SqlTypeFamily.STRING)),
+                            OperandTypes.family(SqlTypeFamily.CHARACTER)),
                     SqlFunctionCategory.NUMERIC);
 
     public static final SqlFunction STR_TO_MAP =
@@ -321,11 +322,11 @@ public class FlinkSqlOperatorTable extends 
ReflectiveSqlOperatorTable {
                     STR_MAP_NULLABLE,
                     null,
                     OperandTypes.or(
-                            OperandTypes.family(SqlTypeFamily.STRING),
+                            OperandTypes.family(SqlTypeFamily.CHARACTER),
                             OperandTypes.family(
-                                    SqlTypeFamily.STRING,
-                                    SqlTypeFamily.STRING,
-                                    SqlTypeFamily.STRING)),
+                                    SqlTypeFamily.CHARACTER,
+                                    SqlTypeFamily.CHARACTER,
+                                    SqlTypeFamily.CHARACTER)),
                     SqlFunctionCategory.STRING);
 
     public static final SqlFunction IS_DECIMAL =
@@ -391,7 +392,9 @@ public class FlinkSqlOperatorTable extends 
ReflectiveSqlOperatorTable {
                             SqlTypeTransforms.TO_NULLABLE),
                     null,
                     OperandTypes.family(
-                            SqlTypeFamily.STRING, SqlTypeFamily.INTEGER, 
SqlTypeFamily.STRING),
+                            SqlTypeFamily.CHARACTER,
+                            SqlTypeFamily.INTEGER,
+                            SqlTypeFamily.CHARACTER),
                     SqlFunctionCategory.STRING);
 
     public static final SqlFunction RPAD =
@@ -403,7 +406,9 @@ public class FlinkSqlOperatorTable extends 
ReflectiveSqlOperatorTable {
                             SqlTypeTransforms.TO_NULLABLE),
                     null,
                     OperandTypes.family(
-                            SqlTypeFamily.STRING, SqlTypeFamily.INTEGER, 
SqlTypeFamily.STRING),
+                            SqlTypeFamily.CHARACTER,
+                            SqlTypeFamily.INTEGER,
+                            SqlTypeFamily.CHARACTER),
                     SqlFunctionCategory.STRING);
 
     public static final SqlFunction REPEAT =
@@ -414,7 +419,7 @@ public class FlinkSqlOperatorTable extends 
ReflectiveSqlOperatorTable {
                             ReturnTypes.explicit(SqlTypeName.VARCHAR),
                             SqlTypeTransforms.TO_NULLABLE),
                     null,
-                    OperandTypes.family(SqlTypeFamily.STRING, 
SqlTypeFamily.INTEGER),
+                    OperandTypes.family(SqlTypeFamily.CHARACTER, 
SqlTypeFamily.INTEGER),
                     SqlFunctionCategory.STRING);
 
     public static final SqlFunction REVERSE =
@@ -423,7 +428,7 @@ public class FlinkSqlOperatorTable extends 
ReflectiveSqlOperatorTable {
                     SqlKind.OTHER_FUNCTION,
                     VARCHAR_FORCE_NULLABLE,
                     null,
-                    OperandTypes.family(SqlTypeFamily.STRING),
+                    OperandTypes.family(SqlTypeFamily.CHARACTER),
                     SqlFunctionCategory.STRING);
 
     public static final SqlFunction REPLACE =
@@ -435,7 +440,9 @@ public class FlinkSqlOperatorTable extends 
ReflectiveSqlOperatorTable {
                             SqlTypeTransforms.TO_NULLABLE),
                     null,
                     OperandTypes.family(
-                            SqlTypeFamily.STRING, SqlTypeFamily.STRING, 
SqlTypeFamily.STRING),
+                            SqlTypeFamily.CHARACTER,
+                            SqlTypeFamily.CHARACTER,
+                            SqlTypeFamily.CHARACTER),
                     SqlFunctionCategory.STRING);
 
     public static final SqlFunction SPLIT_INDEX =
@@ -446,12 +453,12 @@ public class FlinkSqlOperatorTable extends 
ReflectiveSqlOperatorTable {
                     null,
                     OperandTypes.or(
                             OperandTypes.family(
-                                    SqlTypeFamily.STRING,
+                                    SqlTypeFamily.CHARACTER,
                                     SqlTypeFamily.INTEGER,
                                     SqlTypeFamily.INTEGER),
                             OperandTypes.family(
-                                    SqlTypeFamily.STRING,
-                                    SqlTypeFamily.STRING,
+                                    SqlTypeFamily.CHARACTER,
+                                    SqlTypeFamily.CHARACTER,
                                     SqlTypeFamily.INTEGER)),
                     SqlFunctionCategory.STRING);
 
@@ -464,7 +471,9 @@ public class FlinkSqlOperatorTable extends 
ReflectiveSqlOperatorTable {
                             SqlTypeTransforms.TO_NULLABLE),
                     null,
                     OperandTypes.family(
-                            SqlTypeFamily.STRING, SqlTypeFamily.STRING, 
SqlTypeFamily.STRING),
+                            SqlTypeFamily.CHARACTER,
+                            SqlTypeFamily.CHARACTER,
+                            SqlTypeFamily.CHARACTER),
                     SqlFunctionCategory.STRING);
 
     public static final SqlFunction REGEXP_EXTRACT =
@@ -473,7 +482,12 @@ public class FlinkSqlOperatorTable extends 
ReflectiveSqlOperatorTable {
                     SqlKind.OTHER_FUNCTION,
                     VARCHAR_FORCE_NULLABLE,
                     null,
-                    OperandTypes.or(OperandTypes.STRING_STRING_INTEGER, 
OperandTypes.STRING_STRING),
+                    OperandTypes.or(
+                            OperandTypes.family(
+                                    SqlTypeFamily.CHARACTER,
+                                    SqlTypeFamily.CHARACTER,
+                                    SqlTypeFamily.INTEGER),
+                            OperandTypes.family(SqlTypeFamily.CHARACTER, 
SqlTypeFamily.CHARACTER)),
                     SqlFunctionCategory.STRING);
 
     public static final SqlFunction HASH_CODE =
@@ -485,7 +499,7 @@ public class FlinkSqlOperatorTable extends 
ReflectiveSqlOperatorTable {
                     OperandTypes.or(
                             OperandTypes.family(SqlTypeFamily.BOOLEAN),
                             OperandTypes.family(SqlTypeFamily.NUMERIC),
-                            OperandTypes.family(SqlTypeFamily.STRING),
+                            OperandTypes.family(SqlTypeFamily.CHARACTER),
                             OperandTypes.family(SqlTypeFamily.TIMESTAMP),
                             OperandTypes.family(SqlTypeFamily.TIME),
                             OperandTypes.family(SqlTypeFamily.DATE)),
@@ -578,8 +592,8 @@ public class FlinkSqlOperatorTable extends 
ReflectiveSqlOperatorTable {
                     VARCHAR_FORCE_NULLABLE,
                     InferTypes.RETURN_TYPE,
                     OperandTypes.or(
-                            OperandTypes.family(SqlTypeFamily.TIMESTAMP, 
SqlTypeFamily.STRING),
-                            OperandTypes.family(SqlTypeFamily.STRING, 
SqlTypeFamily.STRING)),
+                            OperandTypes.family(SqlTypeFamily.TIMESTAMP, 
SqlTypeFamily.CHARACTER),
+                            OperandTypes.family(SqlTypeFamily.CHARACTER, 
SqlTypeFamily.CHARACTER)),
                     SqlFunctionCategory.TIMEDATE);
 
     public static final SqlFunction REGEXP =
@@ -588,7 +602,7 @@ public class FlinkSqlOperatorTable extends 
ReflectiveSqlOperatorTable {
                     SqlKind.OTHER_FUNCTION,
                     ReturnTypes.BOOLEAN_NULLABLE,
                     null,
-                    OperandTypes.family(SqlTypeFamily.STRING, 
SqlTypeFamily.STRING),
+                    OperandTypes.family(SqlTypeFamily.CHARACTER, 
SqlTypeFamily.CHARACTER),
                     SqlFunctionCategory.STRING);
 
     public static final SqlFunction PARSE_URL =
@@ -598,11 +612,11 @@ public class FlinkSqlOperatorTable extends 
ReflectiveSqlOperatorTable {
                     VARCHAR_FORCE_NULLABLE,
                     null,
                     OperandTypes.or(
-                            OperandTypes.family(SqlTypeFamily.STRING, 
SqlTypeFamily.STRING),
+                            OperandTypes.family(SqlTypeFamily.CHARACTER, 
SqlTypeFamily.CHARACTER),
                             OperandTypes.family(
-                                    SqlTypeFamily.STRING,
-                                    SqlTypeFamily.STRING,
-                                    SqlTypeFamily.STRING)),
+                                    SqlTypeFamily.CHARACTER,
+                                    SqlTypeFamily.CHARACTER,
+                                    SqlTypeFamily.CHARACTER)),
                     SqlFunctionCategory.STRING);
 
     public static final SqlFunction PRINT =
@@ -611,7 +625,7 @@ public class FlinkSqlOperatorTable extends 
ReflectiveSqlOperatorTable {
                     SqlKind.OTHER_FUNCTION,
                     ReturnTypes.ARG1_NULLABLE,
                     null,
-                    OperandTypes.family(SqlTypeFamily.STRING, 
SqlTypeFamily.ANY),
+                    OperandTypes.family(SqlTypeFamily.CHARACTER, 
SqlTypeFamily.ANY),
                     SqlFunctionCategory.STRING);
 
     // Flink timestamp functions
@@ -632,9 +646,9 @@ public class FlinkSqlOperatorTable extends 
ReflectiveSqlOperatorTable {
                     .operandTypeChecker(
                             OperandTypes.or(
                                     OperandTypes.NILADIC,
-                                    OperandTypes.family(SqlTypeFamily.STRING),
+                                    
OperandTypes.family(SqlTypeFamily.CHARACTER),
                                     OperandTypes.family(
-                                            SqlTypeFamily.STRING, 
SqlTypeFamily.STRING)))
+                                            SqlTypeFamily.CHARACTER, 
SqlTypeFamily.CHARACTER)))
                     .notDeterministic()
                     .monotonicity(
                             call -> {
@@ -654,7 +668,7 @@ public class FlinkSqlOperatorTable extends 
ReflectiveSqlOperatorTable {
                     null,
                     OperandTypes.or(
                             OperandTypes.family(SqlTypeFamily.INTEGER),
-                            OperandTypes.family(SqlTypeFamily.INTEGER, 
SqlTypeFamily.STRING)),
+                            OperandTypes.family(SqlTypeFamily.INTEGER, 
SqlTypeFamily.CHARACTER)),
                     SqlFunctionCategory.TIMEDATE);
 
     public static final SqlFunction IF =
@@ -676,8 +690,8 @@ public class FlinkSqlOperatorTable extends 
ReflectiveSqlOperatorTable {
                             // used for a more explicit exception message
                             OperandTypes.family(
                                     SqlTypeFamily.BOOLEAN,
-                                    SqlTypeFamily.STRING,
-                                    SqlTypeFamily.STRING),
+                                    SqlTypeFamily.CHARACTER,
+                                    SqlTypeFamily.CHARACTER),
                             OperandTypes.family(
                                     SqlTypeFamily.BOOLEAN,
                                     SqlTypeFamily.BOOLEAN,
@@ -764,7 +778,7 @@ public class FlinkSqlOperatorTable extends 
ReflectiveSqlOperatorTable {
                     SqlKind.OTHER_FUNCTION,
                     ARG0_VARCHAR_FORCE_NULLABLE,
                     null,
-                    OperandTypes.family(SqlTypeFamily.STRING, 
SqlTypeFamily.INTEGER),
+                    OperandTypes.family(SqlTypeFamily.CHARACTER, 
SqlTypeFamily.INTEGER),
                     SqlFunctionCategory.STRING);
 
     public static final SqlFunction RIGHT =
@@ -773,7 +787,7 @@ public class FlinkSqlOperatorTable extends 
ReflectiveSqlOperatorTable {
                     SqlKind.OTHER_FUNCTION,
                     ARG0_VARCHAR_FORCE_NULLABLE,
                     null,
-                    OperandTypes.family(SqlTypeFamily.STRING, 
SqlTypeFamily.INTEGER),
+                    OperandTypes.family(SqlTypeFamily.CHARACTER, 
SqlTypeFamily.INTEGER),
                     SqlFunctionCategory.STRING);
 
     // TODO: the return type of TO_TIMESTAMP should be TIMESTAMP(9),
@@ -812,8 +826,8 @@ public class FlinkSqlOperatorTable extends 
ReflectiveSqlOperatorTable {
                             SqlTypeTransforms.FORCE_NULLABLE),
                     null,
                     OperandTypes.or(
-                            OperandTypes.family(SqlTypeFamily.STRING),
-                            OperandTypes.family(SqlTypeFamily.STRING, 
SqlTypeFamily.STRING)),
+                            OperandTypes.family(SqlTypeFamily.CHARACTER),
+                            OperandTypes.family(SqlTypeFamily.CHARACTER, 
SqlTypeFamily.CHARACTER)),
                     SqlFunctionCategory.TIMEDATE);
 
     public static final SqlFunction CONVERT_TZ =
@@ -823,7 +837,9 @@ public class FlinkSqlOperatorTable extends 
ReflectiveSqlOperatorTable {
                     VARCHAR_FORCE_NULLABLE,
                     null,
                     OperandTypes.family(
-                            SqlTypeFamily.STRING, SqlTypeFamily.STRING, 
SqlTypeFamily.STRING),
+                            SqlTypeFamily.CHARACTER,
+                            SqlTypeFamily.CHARACTER,
+                            SqlTypeFamily.CHARACTER),
                     SqlFunctionCategory.TIMEDATE);
 
     public static final SqlFunction LOCATE =
@@ -833,10 +849,10 @@ public class FlinkSqlOperatorTable extends 
ReflectiveSqlOperatorTable {
                     ReturnTypes.INTEGER_NULLABLE,
                     null,
                     OperandTypes.or(
-                            OperandTypes.family(SqlTypeFamily.STRING, 
SqlTypeFamily.STRING),
+                            OperandTypes.family(SqlTypeFamily.CHARACTER, 
SqlTypeFamily.CHARACTER),
                             OperandTypes.family(
-                                    SqlTypeFamily.STRING,
-                                    SqlTypeFamily.STRING,
+                                    SqlTypeFamily.CHARACTER,
+                                    SqlTypeFamily.CHARACTER,
                                     SqlTypeFamily.INTEGER)),
                     SqlFunctionCategory.NUMERIC);
 
@@ -857,7 +873,7 @@ public class FlinkSqlOperatorTable extends 
ReflectiveSqlOperatorTable {
                             ReturnTypes.explicit(SqlTypeName.BINARY),
                             SqlTypeTransforms.FORCE_NULLABLE),
                     null,
-                    OperandTypes.family(SqlTypeFamily.STRING, 
SqlTypeFamily.STRING),
+                    OperandTypes.family(SqlTypeFamily.CHARACTER, 
SqlTypeFamily.CHARACTER),
                     SqlFunctionCategory.STRING);
 
     public static final SqlFunction DECODE =
@@ -866,7 +882,7 @@ public class FlinkSqlOperatorTable extends 
ReflectiveSqlOperatorTable {
                     SqlKind.OTHER_FUNCTION,
                     VARCHAR_FORCE_NULLABLE,
                     null,
-                    OperandTypes.family(SqlTypeFamily.BINARY, 
SqlTypeFamily.STRING),
+                    OperandTypes.family(SqlTypeFamily.BINARY, 
SqlTypeFamily.CHARACTER),
                     SqlFunctionCategory.STRING);
 
     public static final SqlFunction INSTR =
@@ -876,14 +892,14 @@ public class FlinkSqlOperatorTable extends 
ReflectiveSqlOperatorTable {
                     ReturnTypes.INTEGER_NULLABLE,
                     null,
                     OperandTypes.or(
-                            OperandTypes.family(SqlTypeFamily.STRING, 
SqlTypeFamily.STRING),
+                            OperandTypes.family(SqlTypeFamily.CHARACTER, 
SqlTypeFamily.CHARACTER),
                             OperandTypes.family(
-                                    SqlTypeFamily.STRING,
-                                    SqlTypeFamily.STRING,
+                                    SqlTypeFamily.CHARACTER,
+                                    SqlTypeFamily.CHARACTER,
                                     SqlTypeFamily.INTEGER),
                             OperandTypes.family(
-                                    SqlTypeFamily.STRING,
-                                    SqlTypeFamily.STRING,
+                                    SqlTypeFamily.CHARACTER,
+                                    SqlTypeFamily.CHARACTER,
                                     SqlTypeFamily.INTEGER,
                                     SqlTypeFamily.INTEGER)),
                     SqlFunctionCategory.NUMERIC);
@@ -898,8 +914,8 @@ public class FlinkSqlOperatorTable extends 
ReflectiveSqlOperatorTable {
                             SqlTypeTransforms.TO_VARYING),
                     null,
                     OperandTypes.or(
-                            OperandTypes.family(SqlTypeFamily.STRING),
-                            OperandTypes.family(SqlTypeFamily.STRING, 
SqlTypeFamily.STRING)),
+                            OperandTypes.family(SqlTypeFamily.CHARACTER),
+                            OperandTypes.family(SqlTypeFamily.CHARACTER, 
SqlTypeFamily.CHARACTER)),
                     SqlFunctionCategory.STRING);
 
     public static final SqlFunction RTRIM =
@@ -912,8 +928,8 @@ public class FlinkSqlOperatorTable extends 
ReflectiveSqlOperatorTable {
                             SqlTypeTransforms.TO_VARYING),
                     null,
                     OperandTypes.or(
-                            OperandTypes.family(SqlTypeFamily.STRING),
-                            OperandTypes.family(SqlTypeFamily.STRING, 
SqlTypeFamily.STRING)),
+                            OperandTypes.family(SqlTypeFamily.CHARACTER),
+                            OperandTypes.family(SqlTypeFamily.CHARACTER, 
SqlTypeFamily.CHARACTER)),
                     SqlFunctionCategory.STRING);
 
     public static final SqlFunction TRY_CAST = new SqlTryCastFunction();

Reply via email to