This is an automated email from the ASF dual-hosted git repository.
dianfu pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new e85c3038d90 [FLINK-28508][table][python] Support SPLIT_INDEX and
STR_TO_MAP built-in function in Table API (#20250)
e85c3038d90 is described below
commit e85c3038d901db3696112c1add3babcce0b0bcbc
Author: Luning (Lucas) Wang <[email protected]>
AuthorDate: Fri Jul 15 09:41:38 2022 +0800
[FLINK-28508][table][python] Support SPLIT_INDEX and STR_TO_MAP built-in
function in Table API (#20250)
---
docs/data/sql_functions.yml | 2 +
docs/data/sql_functions_zh.yml | 4 +-
flink-python/pyflink/table/expression.py | 23 ++++++++++-
.../pyflink/table/tests/test_expression.py | 3 ++
.../flink/table/api/internal/BaseExpressions.java | 48 ++++++++++++++++++++++
.../functions/BuiltInFunctionDefinitions.java | 29 +++++++++++++
.../expressions/converter/DirectConvertRule.java | 4 ++
.../planner/expressions/ScalarFunctionsTest.scala | 9 ++--
8 files changed, 116 insertions(+), 6 deletions(-)
diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml
index c3d0f08a9ed..35ab90cec49 100644
--- a/docs/data/sql_functions.yml
+++ b/docs/data/sql_functions.yml
@@ -366,8 +366,10 @@ string:
- sql: REVERSE(string)
description: Returns the reversed string. Returns NULL if string is NULL.
- sql: SPLIT_INDEX(string1, string2, integer1)
+ table: STRING1.splitIndex(STRING2, INTEGER1)
description: Splits string1 by the delimiter string2, returns the
integerth (zero-based) string of the split strings. Returns NULL if integer is
negative. Returns NULL if any of arguments is NULL.
- sql: STR_TO_MAP(string1[, string2, string3])
+ table: STRING1.strToMap([STRING2, STRING3])
description: |
Returns a map after splitting the string1 into key/value pairs using
delimiters. string2 is the pair delimiter, default is ','. And string3 is the
key-value delimiter, default is '='.
Both pair delimiter and key-value delimiter are treated as regular
expressions. So special characters (e.g. `<([{\^-=$!|]})?*+.>`) need to be
properly escaped before using as a delimiter literally.
diff --git a/docs/data/sql_functions_zh.yml b/docs/data/sql_functions_zh.yml
index 2f689977248..ce0f9549459 100644
--- a/docs/data/sql_functions_zh.yml
+++ b/docs/data/sql_functions_zh.yml
@@ -464,10 +464,12 @@ string:
- sql: REVERSE(string)
description: 返回反转的字符串。如果字符串为 `NULL`,则返回 `NULL`。
- sql: SPLIT_INDEX(string1, string2, integer1)
+ table: STRING1.splitIndex(STRING2, INTEGER1)
description: |
- 通过分隔符 string2 拆分 string1,返回拆分字符串的第 integer(从零开始)个字符串。如果整数为负,则返回 `NULL`。
+ 通过分隔符 string2 拆分 string1,返回分隔后这组字符串的第 integer(从零开始)个字符串。如果整数为负,则返回
`NULL`。
如果有任一参数为 `NULL`,则返回 `NULL`。
- sql: STR_TO_MAP(string1[, string2, string3])
+ table: STRING1.strToMap([STRING2, STRING3])
description: |
使用分隔符将 string1 拆分为键值对后返回一个 map。string2 是 pair 分隔符,默认为 ','。string3
是键值分隔符,默认为 '='。
pair 分隔符与键值分隔符均为正则表达式,当使用特殊字符作为分隔符时请提前进行转义,例如 `<([{\^-=$!|]})?*+.>`。
diff --git a/flink-python/pyflink/table/expression.py
b/flink-python/pyflink/table/expression.py
index 8c3b3aa1211..fe48674f5b0 100644
--- a/flink-python/pyflink/table/expression.py
+++ b/flink-python/pyflink/table/expression.py
@@ -34,7 +34,6 @@ __all__ = [
'JsonQueryOnEmptyOrError'
]
-
_aggregation_doc = """
{op_desc}
@@ -161,7 +160,7 @@ def _make_aggregation_doc():
Expression.sum: "Returns the sum of the numeric field across all input
values. "
"If all values are null, null is returned.",
Expression.sum0: "Returns the sum of the numeric field across all
input values. "
- "If all values are null, 0 is returned.",
+ "If all values are null, 0 is returned.",
Expression.min: "Returns the minimum value of field across all input
values.",
Expression.max: "Returns the maximum value of field across all input
values.",
Expression.count: "Returns the number of input rows for which the
field is not null.",
@@ -1308,6 +1307,26 @@ class Expression(Generic[T]):
"""
return _binary_op("over")(self, alias)
+ def split_index(self, separator: Union[str, 'Expression[str]'],
+ index: Union[int, 'Expression[int]']) -> 'Expression[str]':
+ """
+ Split target string with custom separator and pick the index-th (starting
with 0) result.
+ """
+ return _ternary_op("splitIndex")(self, separator, index)
+
+ def str_to_map(self, list_delimiter: Union[str, 'Expression[str]'] = None,
+ key_value_delimiter: Union[str, 'Expression[str]'] = None)
-> 'Expression[dict]':
+ """
+ Creates a map by parsing text. Split text into key-value pairs using
two delimiters. The
+ first delimiter separates pairs, and the second delimiter separates
key and value. Both
+ list_delimiter and key_value_delimiter are treated as regular
expressions.
+ Default delimiters are used: ',' as list_delimiter and '=' as
key_value_delimiter.
+ """
+ if list_delimiter is None or key_value_delimiter is None:
+ return _unary_op("strToMap")(self)
+ else:
+ return _ternary_op("strToMap")(self, list_delimiter,
key_value_delimiter)
+
# ---------------------------- temporal functions
----------------------------------
@property
diff --git a/flink-python/pyflink/table/tests/test_expression.py
b/flink-python/pyflink/table/tests/test_expression.py
index 4e2ab748365..bb201dcf22c 100644
--- a/flink-python/pyflink/table/tests/test_expression.py
+++ b/flink-python/pyflink/table/tests/test_expression.py
@@ -162,6 +162,9 @@ class PyFlinkBatchExpressionTests(PyFlinkTestCase):
self.assertEqual('rtrim(a)', str(expr1.rtrim))
self.assertEqual('repeat(a, 3)', str(expr1.repeat(3)))
self.assertEqual("over(a, 'w')", str(expr1.over('w')))
+ self.assertEqual("splitIndex(a, ',', 3)", str(expr1.split_index(',',
3)))
+ self.assertEqual("strToMap(a)", str(expr1.str_to_map()))
+ self.assertEqual("strToMap(a, ';', ':')", str(expr1.str_to_map(';',
':')))
# temporal functions
self.assertEqual('cast(a, DATE)', str(expr1.to_date))
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
index c2df6850017..bcd69ba1ce5 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
@@ -155,9 +155,11 @@ import static
org.apache.flink.table.functions.BuiltInFunctionDefinitions.SIGN;
import static
org.apache.flink.table.functions.BuiltInFunctionDefinitions.SIMILAR;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.SIN;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.SINH;
+import static
org.apache.flink.table.functions.BuiltInFunctionDefinitions.SPLIT_INDEX;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.SQRT;
import static
org.apache.flink.table.functions.BuiltInFunctionDefinitions.STDDEV_POP;
import static
org.apache.flink.table.functions.BuiltInFunctionDefinitions.STDDEV_SAMP;
+import static
org.apache.flink.table.functions.BuiltInFunctionDefinitions.STR_TO_MAP;
import static
org.apache.flink.table.functions.BuiltInFunctionDefinitions.SUBSTR;
import static
org.apache.flink.table.functions.BuiltInFunctionDefinitions.SUBSTRING;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.SUM;
@@ -1149,6 +1151,52 @@ public abstract class BaseExpressions<InType, OutType> {
return toApiSpecificExpression(unresolvedCall(REPEAT, toExpr(),
objectToExpression(n)));
}
+ /**
+ * Split target string with custom separator and pick the index-th (starting
with 0) result.
+ *
+ * @param separator custom separator.
+ * @param index index of the result which you want.
+ * @return the string at the index of split results.
+ */
+ public OutType splitIndex(InType separator, InType index) {
+ return toApiSpecificExpression(
+ unresolvedCall(
+ SPLIT_INDEX,
+ toExpr(),
+ objectToExpression(separator),
+ objectToExpression(index)));
+ }
+
+ /**
+ * Creates a map by parsing text. Split text into key-value pairs using
two delimiters. The
+ * first delimiter separates pairs, and the second delimiter separates key
and value. If only
+ * one parameter is given, default delimiters are used: ',' as delimiter1
and '=' as delimiter2.
+ * Both delimiters are treated as regular expressions.
+ *
+ * @return the map
+ */
+ public OutType strToMap() {
+ return toApiSpecificExpression(unresolvedCall(STR_TO_MAP, toExpr()));
+ }
+
+ /**
+ * Creates a map by parsing text. Split text into key-value pairs using
two delimiters. The
+ * first delimiter separates pairs, and the second delimiter separates key
and value. Both
+ * {@code listDelimiter} and {@code keyValueDelimiter} are treated as
regular expressions.
+ *
+ * @param listDelimiter the delimiter that separates pairs
+ * @param keyValueDelimiter the delimiter that separates key and value
+ * @return the map
+ */
+ public OutType strToMap(InType listDelimiter, InType keyValueDelimiter) {
+ return toApiSpecificExpression(
+ unresolvedCall(
+ STR_TO_MAP,
+ toExpr(),
+ objectToExpression(listDelimiter),
+ objectToExpression(keyValueDelimiter)));
+ }
+
// Temporal operations
/** Parses a date string in the form "yyyy-MM-dd" to a SQL Date. */
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index 7f5810dc2f3..b2c00f432c5 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -910,6 +910,35 @@ public final class BuiltInFunctionDefinitions {
.outputTypeStrategy(nullableIfArgs(explicit(DataTypes.STRING())))
.build();
+ public static final BuiltInFunctionDefinition SPLIT_INDEX =
+ BuiltInFunctionDefinition.newBuilder()
+ .name("splitIndex")
+ .kind(SCALAR)
+ .inputTypeStrategy(
+ sequence(
+
logical(LogicalTypeFamily.CHARACTER_STRING),
+
logical(LogicalTypeFamily.CHARACTER_STRING),
+ logical(LogicalTypeRoot.INTEGER)))
+
.outputTypeStrategy(nullableIfArgs(explicit(DataTypes.STRING())))
+ .build();
+
+ public static final BuiltInFunctionDefinition STR_TO_MAP =
+ BuiltInFunctionDefinition.newBuilder()
+ .name("strToMap")
+ .kind(SCALAR)
+ .inputTypeStrategy(
+ or(
+
sequence(logical(LogicalTypeFamily.CHARACTER_STRING)),
+ sequence(
+
logical(LogicalTypeFamily.CHARACTER_STRING),
+
logical(LogicalTypeFamily.CHARACTER_STRING),
+
logical(LogicalTypeFamily.CHARACTER_STRING))))
+ .outputTypeStrategy(
+ nullableIfArgs(
+ explicit(
+ DataTypes.MAP(DataTypes.STRING(),
DataTypes.STRING()))))
+ .build();
+
//
--------------------------------------------------------------------------------------------
// Math functions
//
--------------------------------------------------------------------------------------------
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/DirectConvertRule.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/DirectConvertRule.java
index 85449ea2485..ba62cca0261 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/DirectConvertRule.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/expressions/converter/DirectConvertRule.java
@@ -134,6 +134,10 @@ public class DirectConvertRule implements
CallExpressionConvertRule {
BuiltInFunctionDefinitions.REGEXP,
FlinkSqlOperatorTable.REGEXP);
DEFINITION_OPERATOR_MAP.put(
BuiltInFunctionDefinitions.REGEXP_REPLACE,
FlinkSqlOperatorTable.REGEXP_REPLACE);
+ DEFINITION_OPERATOR_MAP.put(
+ BuiltInFunctionDefinitions.SPLIT_INDEX,
FlinkSqlOperatorTable.SPLIT_INDEX);
+ DEFINITION_OPERATOR_MAP.put(
+ BuiltInFunctionDefinitions.STR_TO_MAP,
FlinkSqlOperatorTable.STR_TO_MAP);
// math functions
DEFINITION_OPERATOR_MAP.put(BuiltInFunctionDefinitions.MINUS,
FlinkSqlOperatorTable.MINUS);
diff --git
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
index 6d61ba668a1..761996663a9 100644
---
a/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
+++
b/flink-table/flink-table-planner/src/test/scala/org/apache/flink/table/planner/expressions/ScalarFunctionsTest.scala
@@ -859,7 +859,7 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
@Test
def testSplitIndex(): Unit = {
- testSqlApi("split_index(f38, 'I', 0)", "AQ")
+ testAllApis('f38.splitIndex("I", 0), "split_index(f38, 'I', 0)", "AQ")
testSqlApi("split_index(f38, 'I', 2)", "NULL")
testSqlApi("split_index(f38, 'I', -1)", "NULL")
testSqlApi("split_index(f38, CAST(null as VARCHAR), 0)", "NULL")
@@ -2589,8 +2589,11 @@ class ScalarFunctionsTest extends ScalarTypesTestBase {
@Test
def testStringToMap(): Unit = {
- testSqlApi("STR_TO_MAP('k1=v1,k2=v2')", "{k1=v1, k2=v2}")
- testSqlApi("STR_TO_MAP('k1:v1;k2: v2', ';', ':')", "{k1=v1, k2= v2}")
+ testAllApis("k1=v1,k2=v2".strToMap(), "STR_TO_MAP('k1=v1,k2=v2')",
"{k1=v1, k2=v2}")
+ testAllApis(
+ "k1:v1;k2: v2".strToMap(";", ":"),
+ "STR_TO_MAP('k1:v1;k2: v2', ';', ':')",
+ "{k1=v1, k2= v2}")
testSqlApi("STR_TO_MAP('k1$$v1|k2$$ v2', '\\|', '\\$\\$')", "{k1=v1, k2=
v2}")
// test empty