This is an automated email from the ASF dual-hosted git repository.

dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new e85c3038d90 [FLINK-28508][table][python] Support SPLIT_INDEX and 
STR_TO_MAP built-in functions in Table API (#20250)
e85c3038d90 is described below

commit e85c3038d901db3696112c1add3babcce0b0bcbc
Author: Luning (Lucas) Wang <[email protected]>
AuthorDate: Fri Jul 15 09:41:38 2022 +0800

    [FLINK-28508][table][python] Support SPLIT_INDEX and STR_TO_MAP built-in 
functions in Table API (#20250)
---
 docs/data/sql_functions.yml                        |  2 +
 docs/data/sql_functions_zh.yml                     |  4 +-
 flink-python/pyflink/table/expression.py           | 23 ++++++++++-
 .../pyflink/table/tests/test_expression.py         |  3 ++
 .../flink/table/api/internal/BaseExpressions.java  | 48 ++++++++++++++++++++++
 .../functions/BuiltInFunctionDefinitions.java      | 29 +++++++++++++
 .../expressions/converter/DirectConvertRule.java   |  4 ++
 .../planner/expressions/ScalarFunctionsTest.scala  |  9 ++--
 8 files changed, 116 insertions(+), 6 deletions(-)

diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml
index c3d0f08a9ed..35ab90cec49 100644
--- a/docs/data/sql_functions.yml
+++ b/docs/data/sql_functions.yml
@@ -366,8 +366,10 @@ string:
   - sql: REVERSE(string)
     description: Returns the reversed string. Returns NULL if string is NULL.
   - sql: SPLIT_INDEX(string1, string2, integer1)
+    table: STRING1.splitIndex(STRING2, INTEGER1)
     description: Splits string1 by the delimiter string2, returns the 
integerth (zero-based) string of the split strings. Returns NULL if integer is 
negative. Returns NULL if any of arguments is NULL.
   - sql: STR_TO_MAP(string1[, string2, string3])
+    table: STRING1.strToMap([STRING2, STRING3])
     description: |
       Returns a map after splitting the string1 into key/value pairs using 
delimiters. string2 is the pair delimiter, default is ','. And string3 is the 
key-value delimiter, default is '='.
       Both pair delimiter and key-value delimiter are treated as regular 
expressions. So special characters (e.g. `<([{\^-=$!|]})?*+.>`) need to be 
properly escaped before using as a delimiter literally.
diff --git a/docs/data/sql_functions_zh.yml b/docs/data/sql_functions_zh.yml
index 2f689977248..ce0f9549459 100644
--- a/docs/data/sql_functions_zh.yml
+++ b/docs/data/sql_functions_zh.yml
@@ -464,10 +464,12 @@ string:
   - sql: REVERSE(string)
     description: 返回反转的字符串。如果字符串为 `NULL`,则返回 `NULL`。
   - sql: SPLIT_INDEX(string1, string2, integer1)
+    table: STRING1.splitIndex(STRING2, INTEGER1)
     description: |
-      通过分隔符 string2 拆分 string1,返回拆分字符串的第 integer(从零开始)个字符串。如果整数为负,则返回 `NULL`。
+      通过分隔符 string2 拆分 string1,返回分隔后这组字符串的第 integer(从零开始)个字符串。如果整数为负,则返回 
`NULL`。
       如果有任一参数为 `NULL`,则返回 `NULL`。
   - sql: STR_TO_MAP(string1[, string2, string3])
+    table: STRING1.strToMap([STRING2, STRING3])
     description: |
       使用分隔符将 string1 拆分为键值对后返回一个 map。string2 是 pair 分隔符,默认为 ','。string3 
是键值分隔符,默认为 '='。
       pair 分隔符与键值分隔符均为正则表达式,当使用特殊字符作为分隔符时请提前进行转义,例如 `<([{\^-=$!|]})?*+.>`。
diff --git a/flink-python/pyflink/table/expression.py 
b/flink-python/pyflink/table/expression.py
index 8c3b3aa1211..fe48674f5b0 100644
--- a/flink-python/pyflink/table/expression.py
+++ b/flink-python/pyflink/table/expression.py
@@ -34,7 +34,6 @@ __all__ = [
     'JsonQueryOnEmptyOrError'
 ]
 
-
 _aggregation_doc = """
 {op_desc}
 
@@ -161,7 +160,7 @@ def _make_aggregation_doc():
         Expression.sum: "Returns the sum of the numeric field across all input 
values. "
                         "If all values are null, null is returned.",
         Expression.sum0: "Returns the sum of the numeric field across all 
input values. "
-                        "If all values are null, 0 is returned.",
+                         "If all values are null, 0 is returned.",
         Expression.min: "Returns the minimum value of field across all input 
values.",
         Expression.max: "Returns the maximum value of field across all input 
values.",
         Expression.count: "Returns the number of input rows for which the 
field is not null.",
@@ -1308,6 +1307,26 @@ class Expression(Generic[T]):
         """
         return _binary_op("over")(self, alias)
 
+    def split_index(self, separator: Union[str, 'Expression[str]'],
+                    index: Union[int, 'Expression[int]']) -> 'Expression[str]':
+        """
+        Split target string with custom separator and pick the index-th(start 
with 0) result.
+        """
+        return _ternary_op("splitIndex")(self, separator, index)
+
+    def str_to_map(self, list_delimiter: Union[str, 'Expression[str]'] = None,
+                   key_value_delimiter: Union[str, 'Expression[str]'] = None) 
-> 'Expression[dict]':
+        """
+        Creates a map by parsing text. Split text into key-value pairs using 
two delimiters. The
+        first delimiter separates pairs, and the second delimiter separates 
key and value. Both
+        list_delimiter and key_value_delimiter are treated as regular 
expressions.
+        Default delimiters are used: ',' as list_delimiter and '=' as 
key_value_delimiter.
+        """
+        if list_delimiter is None or key_value_delimiter is None:
+            return _unary_op("strToMap")(self)
+        else:
+            return _ternary_op("strToMap")(self, list_delimiter, 
key_value_delimiter)
+
     # ---------------------------- temporal functions 
----------------------------------
 
     @property
diff --git a/flink-python/pyflink/table/tests/test_expression.py 
b/flink-python/pyflink/table/tests/test_expression.py
index 4e2ab748365..bb201dcf22c 100644
--- a/flink-python/pyflink/table/tests/test_expression.py
+++ b/flink-python/pyflink/table/tests/test_expression.py
@@ -162,6 +162,9 @@ class PyFlinkBatchExpressionTests(PyFlinkTestCase):
         self.assertEqual('rtrim(a)', str(expr1.rtrim))
         self.assertEqual('repeat(a, 3)', str(expr1.repeat(3)))
         self.assertEqual("over(a, 'w')", str(expr1.over('w')))
+        self.assertEqual("splitIndex(a, ',', 3)", str(expr1.split_index(',', 
3)))
+        self.assertEqual("strToMap(a)", str(expr1.str_to_map()))
+        self.assertEqual("strToMap(a, ';', ':')", str(expr1.str_to_map(';', 
':')))
 
         # temporal functions
         self.assertEqual('cast(a, DATE)', str(expr1.to_date))
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
index c2df6850017..bcd69ba1ce5 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
@@ -155,9 +155,11 @@ import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.SIGN;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.SIMILAR;
 import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.SIN;
 import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.SINH;
+import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.SPLIT_INDEX;
 import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.SQRT;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.STDDEV_POP;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.STDDEV_SAMP;
+import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.STR_TO_MAP;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.SUBSTR;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.SUBSTRING;
 import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.SUM;
@@ -1149,6 +1151,52 @@ public abstract class BaseExpressions<InType, OutType> {
         return toApiSpecificExpression(unresolvedCall(REPEAT, toExpr(), 
objectToExpression(n)));
     }
 
+    /**
+     * Split target string with custom separator and pick the index-th(start 
with 0) result.
+     *
+     * @param separator custom separator.
+     * @param index index of the result which you want.
+     * @return the string at the index of split results.
+     */
+    public OutType splitIndex(InType separator, InType index) {
+        return toApiSpecificExpression(
+                unresolvedCall(
+                        SPLIT_INDEX,
+                        toExpr(),
+                        objectToExpression(separator),
+                        objectToExpression(index)));
+    }
+
+    /**
+     * Creates a map by parsing text. Split text into key-value pairs using 
two delimiters. The
+     * first delimiter separates pairs, and the second delimiter separates key 
and value. If only
+     * one parameter is given, default delimiters are used: ',' as delimiter1 
and '=' as delimiter2.
+     * Both delimiters are treated as regular expressions.
+     *
+     * @return the map
+     */
+    public OutType strToMap() {
+        return toApiSpecificExpression(unresolvedCall(STR_TO_MAP, toExpr()));
+    }
+
+    /**
+     * Creates a map by parsing text. Split text into key-value pairs using 
two delimiters. The
+     * first delimiter separates pairs, and the second delimiter separates key 
and value. Both
+     * {@code listDelimiter} and {@code keyValueDelimiter} are treated as 
regular expressions.
+     *
+     * @param listDelimiter the delimiter to separate pairs
+     * @param keyValueDelimiter the delimiter to separate key and value
+     * @return the map
+     */
+    public OutType strToMap(InType listDelimiter, InType keyValueDelimiter) {
+        return toApiSpecificExpression(
+                unresolvedCall(
+                        STR_TO_MAP,
+                        toExpr(),
+                        objectToExpression(listDelimiter),
+                        objectToExpression(keyValueDelimiter)));
+    }
+
     // Temporal operations
 
     /** Parses a date string in the form "yyyy-MM-dd" to a SQL Date. */
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index 7f5810dc2f3..b2c00f432c5 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -910,6 +910,35 @@ public final class BuiltInFunctionDefinitions {
                     
.outputTypeStrategy(nullableIfArgs(explicit(DataTypes.STRING())))
                     .build();
 
+    public static final BuiltInFunctionDefinition SPLIT_INDEX =
+            BuiltInFunctionDefinition.newBuilder()
+                    .name("splitIndex")
+                    .kind(SCALAR)
+                    .inputTypeStrategy(
+                            sequence(
+                                    
logical(LogicalTypeFamily.CHARACTER_STRING),
+                                    
logical(LogicalTypeFamily.CHARACTER_STRING),
+                                    logical(LogicalTypeRoot.INTEGER)))
+                    
.outputTypeStrategy(nullableIfArgs(explicit(DataTypes.STRING())))
+                    .build();
+
+    public static final BuiltInFunctionDefinition STR_TO_MAP =
+            BuiltInFunctionDefinition.newBuilder()
+                    .name("strToMap")
+                    .kind(SCALAR)
+                    .inputTypeStrategy(
+                            or(
+                                    
sequence(logical(LogicalTypeFamily.CHARACTER_STRING)),
+                                    sequence(
+                                            
logical(LogicalTypeFamily.CHARACTER_STRING),
+                                            
logical(LogicalTypeFamily.CHARACTER_STRING),
+                                            
logical(LogicalTypeFamily.CHARACTER_STRING))))
+                    .outputTypeStrategy(
+                            nullableIfArgs(
+                                    explicit(
+                                            DataTypes.MAP(DataTypes.STRING(), 
DataTypes.STRING()))))
+                    .build();
+
     // 
--------------------------------------------------------------------------------------------
     // Math functions
     // 
--------------------------------------------------------------------------------------------
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/DirectConvertRule.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/DirectConvertRule.java
index 85449ea2485..ba62cca0261 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/DirectConvertRule.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/DirectConvertRule.java
@@ -134,6 +134,10 @@ public class DirectConvertRule implements 
CallExpressionConvertRule {
                 BuiltInFunctionDefinitions.REGEXP, 
FlinkSqlOperatorTable.REGEXP);
         DEFINITION_OPERATOR_MAP.put(
                 BuiltInFunctionDefinitions.REGEXP_REPLACE, 
FlinkSqlOperatorTable.REGEXP_REPLACE);
+        DEFINITION_OPERATOR_MAP.put(
+                BuiltInFunctionDefinitions.SPLIT_INDEX, 
FlinkSqlOperatorTable.SPLIT_INDEX);
+        DEFINITION_OPERATOR_MAP.put(
+                BuiltInFunctionDefinitions.STR_TO_MAP, 
FlinkSqlOperatorTable.STR_TO_MAP);
 
         // math functions
         DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.MINUS, 
FlinkSqlOperatorTable.MINUS);
diff --git 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
index 6d61ba668a1..761996663a9 100644
--- 
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
+++ 
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
@@ -859,7 +859,7 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
 
   @Test
   def testSplitIndex(): Unit = {
-    testSqlApi("split_index(f38, 'I', 0)", "AQ")
+    testAllApis('f38.splitIndex("I", 0), "split_index(f38, 'I', 0)", "AQ")
     testSqlApi("split_index(f38, 'I', 2)", "NULL")
     testSqlApi("split_index(f38, 'I', -1)", "NULL")
     testSqlApi("split_index(f38, CAST(null as VARCHAR), 0)", "NULL")
@@ -2589,8 +2589,11 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
 
   @Test
   def testStringToMap(): Unit = {
-    testSqlApi("STR_TO_MAP('k1=v1,k2=v2')", "{k1=v1, k2=v2}")
-    testSqlApi("STR_TO_MAP('k1:v1;k2: v2', ';', ':')", "{k1=v1, k2= v2}")
+    testAllApis("k1=v1,k2=v2".strToMap(), "STR_TO_MAP('k1=v1,k2=v2')", 
"{k1=v1, k2=v2}")
+    testAllApis(
+      "k1:v1;k2: v2".strToMap(";", ":"),
+      "STR_TO_MAP('k1:v1;k2: v2', ';', ':')",
+      "{k1=v1, k2= v2}")
     testSqlApi("STR_TO_MAP('k1$$v1|k2$$ v2', '\\|', '\\$\\$')", "{k1=v1, k2= 
v2}")
 
     // test empty

Reply via email to