This is an automated email from the ASF dual-hosted git repository. lincoln pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 8189bba195326d899a9830d3a19b50cafa092666 Author: dylanhz <[email protected]> AuthorDate: Thu Aug 1 17:20:52 2024 +0800 [FLINK-35932][table] Add the built-in function REGEXP_COUNT --- docs/data/sql_functions.yml | 22 ++++-- docs/data/sql_functions_zh.yml | 22 ++++-- .../docs/reference/pyflink.table/expressions.rst | 1 + flink-python/pyflink/table/expression.py | 11 +++ .../pyflink/table/tests/test_expression.py | 1 + .../flink/table/api/internal/BaseExpressions.java | 14 ++++ .../functions/BuiltInFunctionDefinitions.java | 15 ++++ .../planner/functions/RegexpFunctionsITCase.java | 83 +++++++++++++++++++++- .../functions/scalar/RegexpCountFunction.java | 60 ++++++++++++++++ 9 files changed, 214 insertions(+), 15 deletions(-) diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml index 70823fc582c..29b0c27ebcd 100644 --- a/docs/data/sql_functions.yml +++ b/docs/data/sql_functions.yml @@ -337,16 +337,14 @@ string: - sql: REPLACE(string1, string2, string3) table: STRING1.replace(STRING2, STRING3) description: Returns a new string which replaces all the occurrences of STRING2 with STRING3 (non-overlapping) from STRING1. E.g., 'hello world'.replace('world', 'flink') returns 'hello flink'; 'ababab'.replace('abab', 'z') returns 'zab'. - - sql: TRANSLATE(expr, fromStr, toStr) - table: expr.translate(fromStr, toStr) + - sql: REGEXP_COUNT(str, regex) + table: str.regexpCount(regex) description: | - Translate an expr where all characters in fromStr have been replaced with those in toStr. + Returns the number of times str matches the regex pattern. regex must be a Java regular expression. - If toStr has a shorter length than fromStr, unmatched characters are removed. + `str <CHAR | VARCHAR>, regex <CHAR | VARCHAR>` - expr <CHAR | VARCHAR>, fromStr <CHAR | VARCHAR>, toStr <CHAR | VARCHAR> - - Returns a STRING of translated expr. + Returns an `INTEGER` representation of the number of matches. `NULL` if any of the arguments are `NULL` or regex is invalid. - sql: REGEXP_EXTRACT(string1, string2[, integer]) table: STRING1.regexpExtract(STRING2[, INTEGER1]) description: | @@ -368,6 +366,16 @@ string: `str <CHAR | VARCHAR>, regex <CHAR | VARCHAR>, extractIndex <TINYINT | SMALLINT | INTEGER | BIGINT>` Returns an `ARRAY<STRING>` representation of all the matched substrings. `NULL` if any of the arguments are `NULL` or invalid. + - sql: TRANSLATE(expr, fromStr, toStr) + table: expr.translate(fromStr, toStr) + description: | + Translate an expr where all characters in fromStr have been replaced with those in toStr. + + If toStr has a shorter length than fromStr, unmatched characters are removed. + + `expr <CHAR | VARCHAR>, fromStr <CHAR | VARCHAR>, toStr <CHAR | VARCHAR>` + + Returns a `STRING` of translated expr. - sql: INITCAP(string) table: STRING.initCap() description: Returns a new form of STRING with the first character of each word converted to uppercase and the rest characters to lowercase. Here a word means a sequences of alphanumeric characters. diff --git a/docs/data/sql_functions_zh.yml b/docs/data/sql_functions_zh.yml index f2629ca1096..76607687f17 100644 --- a/docs/data/sql_functions_zh.yml +++ b/docs/data/sql_functions_zh.yml @@ -409,16 +409,14 @@ string: 返回一个新字符串,它用 STRING3(非重叠)替换 STRING1 中所有出现的 STRING2。 例如 `'hello world'.replace('world', 'flink')` 返回 `'hello flink'`; `'ababab'.replace('abab', 'z')` 返回 `'zab'`。 - - sql: TRANSLATE(expr, fromStr, toStr) - table: expr.translate(fromStr, toStr) + - sql: REGEXP_COUNT(str, regex) + table: str.regexpCount(regex) description: | - 将 expr 中所有出现在 fromStr 之中的字符替换为 toStr 中的相应字符。 + 返回字符串 str 匹配正则表达式模式 regex 的次数。regex 必须是一个 Java 正则表达式。 - 如果 toStr 的长度短于 fromStr,则未匹配的字符将被移除。 + `str <CHAR | VARCHAR>, regex <CHAR | VARCHAR>` - expr <CHAR | VARCHAR>, fromStr <CHAR | VARCHAR>, toStr <CHAR | VARCHAR> - - 返回 STRING 格式的转换结果。 + 返回一个 `INTEGER` 表示匹配成功的次数。如果任何参数为 `NULL` 或 regex 非法,则返回 `NULL`。 - sql: REGEXP_EXTRACT(string1, string2[, integer]) table: STRING1.regexpExtract(STRING2[, INTEGER1]) description: | @@ -435,6 +433,16 @@ string: `str <CHAR | VARCHAR>, regex <CHAR | VARCHAR>, extractIndex <TINYINT | SMALLINT | INTEGER | BIGINT>` 返回一个 `ARRAY<STRING>`,表示所有匹配的子串。如果任何参数为 `NULL`或非法,则返回 `NULL`。 + - sql: TRANSLATE(expr, fromStr, toStr) + table: expr.translate(fromStr, toStr) + description: | + 将 expr 中所有出现在 fromStr 之中的字符替换为 toStr 中的相应字符。 + + 如果 toStr 的长度短于 fromStr,则未匹配的字符将被移除。 + + `expr <CHAR | VARCHAR>, fromStr <CHAR | VARCHAR>, toStr <CHAR | VARCHAR>` + + 返回 `STRING` 格式的转换结果。 - sql: INITCAP(string) table: STRING.initCap() description: | diff --git a/flink-python/docs/reference/pyflink.table/expressions.rst b/flink-python/docs/reference/pyflink.table/expressions.rst index 60a5631e854..8698e884e1a 100644 --- a/flink-python/docs/reference/pyflink.table/expressions.rst +++ b/flink-python/docs/reference/pyflink.table/expressions.rst @@ -178,6 +178,7 @@ string functions Expression.rpad Expression.overlay Expression.regexp + Expression.regexp_count Expression.regexp_replace Expression.regexp_extract Expression.regexp_extract_all diff --git a/flink-python/pyflink/table/expression.py b/flink-python/pyflink/table/expression.py index 1e924c508e1..5975a79502b 100644 --- a/flink-python/pyflink/table/expression.py +++ b/flink-python/pyflink/table/expression.py @@ -1204,6 +1204,17 @@ class Expression(Generic[T]): """ return _binary_op("regexp")(self, regex) + def regexp_count(self, regex) -> 'Expression': + """ + Returns the number of times str matches the regex pattern. + regex must be a Java regular expression. + null if any of the arguments are null or regex is invalid. + + :param regex: A STRING expression with a matching pattern. + :return: An INTEGER representation of the number of matches. + """ + return _binary_op("regexpCount")(self, regex) + def regexp_replace(self, regex: Union[str, 'Expression[str]'], replacement: Union[str, 'Expression[str]']) -> 'Expression[str]': diff --git a/flink-python/pyflink/table/tests/test_expression.py b/flink-python/pyflink/table/tests/test_expression.py index 3af9c27732d..fb94a02ec7d 100644 --- a/flink-python/pyflink/table/tests/test_expression.py +++ b/flink-python/pyflink/table/tests/test_expression.py @@ -175,6 +175,7 @@ class PyFlinkBatchExpressionTests(PyFlinkTestCase): # regexp functions self.assertEqual("regexp(a, b)", str(expr1.regexp(expr2))) + self.assertEqual("REGEXP_COUNT(a, b)", str(expr1.regexp_count(expr2))) self.assertEqual('regexpExtract(a, b, 3)', str(expr1.regexp_extract(expr2, 3))) self.assertEqual('REGEXP_EXTRACT_ALL(a, b)', str(expr1.regexp_extract_all(expr2))) self.assertEqual('REGEXP_EXTRACT_ALL(a, b, 3)', str(expr1.regexp_extract_all(expr2, 3))) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java index 7ea2848e348..5092937e7df 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java @@ -160,6 +160,7 @@ import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.PRINTF import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.PROCTIME; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.RADIANS; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.REGEXP; +import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.REGEXP_COUNT; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.REGEXP_EXTRACT; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.REGEXP_EXTRACT_ALL; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.REGEXP_REPLACE; @@ -1136,6 +1137,19 @@ public abstract class BaseExpressions<InType, OutType> { return toApiSpecificExpression(unresolvedCall(REGEXP, toExpr(), objectToExpression(regex))); } + /** + * Returns the number of times {@code str} matches the {@code regex} pattern. {@code regex} must + * be a Java regular expression. + * + * @param regex A STRING expression with a matching pattern. + * @return An INTEGER representation of the number of matches. <br> + * null if any of the arguments are null or {@code regex} is invalid. + */ + public OutType regexpCount(InType regex) { + return toApiSpecificExpression( + unresolvedCall(REGEXP_COUNT, toExpr(), objectToExpression(regex))); + } + /** * Returns a string with all substrings that match the regular expression consecutively being * replaced. diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java index a005aa938fb..e726051a4c4 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java @@ -1110,6 +1110,21 @@ public final class BuiltInFunctionDefinitions { .outputTypeStrategy(nullableIfArgs(explicit(DataTypes.STRING()))) .build(); + public static final BuiltInFunctionDefinition REGEXP_COUNT = + BuiltInFunctionDefinition.newBuilder() + .name("REGEXP_COUNT") + .kind(SCALAR) + .inputTypeStrategy( + sequence( + Arrays.asList("str", "regex"), + Arrays.asList( + logical(LogicalTypeFamily.CHARACTER_STRING), + logical(LogicalTypeFamily.CHARACTER_STRING)))) + .outputTypeStrategy(explicit(DataTypes.INT())) + .runtimeClass( + "org.apache.flink.table.runtime.functions.scalar.RegexpCountFunction") + .build(); + public static final BuiltInFunctionDefinition REGEXP_EXTRACT = BuiltInFunctionDefinition.newBuilder() .name("regexpExtract") diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/RegexpFunctionsITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/RegexpFunctionsITCase.java index 3c1efb5c0c2..3809f98637f 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/RegexpFunctionsITCase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/RegexpFunctionsITCase.java @@ -25,13 +25,94 @@ import java.util.stream.Stream; import static org.apache.flink.table.api.Expressions.$; import static org.apache.flink.table.api.Expressions.call; +import static org.apache.flink.table.api.Expressions.lit; /** Test Regexp functions correct behaviour. */ class RegexpFunctionsITCase extends BuiltInFunctionTestBase { @Override Stream<TestSetSpec> getTestSetSpecs() { - return Stream.of(regexpExtractTestCases(), regexpExtractAllTestCases()).flatMap(s -> s); + return Stream.of( + regexpCountTestCases(), + regexpExtractTestCases(), + regexpExtractAllTestCases()) + .flatMap(s -> s); + } + + private Stream<TestSetSpec> regexpCountTestCases() { + return Stream.of( + TestSetSpec.forFunction(BuiltInFunctionDefinitions.REGEXP_COUNT) + .onFieldsWithData(null, "abcdeabde") + .andDataTypes(DataTypes.STRING(), DataTypes.STRING()) + // null input + .testResult( + $("f0").regexpCount($("f1")), + "REGEXP_COUNT(f0, f1)", + null, + DataTypes.INT()) + .testResult( + $("f1").regexpCount($("f0")), + "REGEXP_COUNT(f1, f0)", + null, + DataTypes.INT()) + // invalid regexp + .testResult( + $("f1").regexpCount("("), + "REGEXP_COUNT(f1, '(')", + null, + DataTypes.INT()) + // normal cases + .testResult( + lit("hello world! Hello everyone!").regexpCount("Hello"), + "REGEXP_COUNT('hello world! Hello everyone!', 'Hello')", + 1, + DataTypes.INT()) + .testResult( + lit("abcabcabc").regexpCount("abcab"), + "REGEXP_COUNT('abcabcabc', 'abcab')", + 1, + DataTypes.INT()) + .testResult( + lit("abcd").regexpCount("z"), + "REGEXP_COUNT('abcd', 'z')", + 0, + DataTypes.INT()) + .testResult( + lit("^abc").regexpCount("\\^abc"), + "REGEXP_COUNT('^abc', '\\^abc')", + 1, + DataTypes.INT()) + .testResult( + lit("a.b.c.d").regexpCount("\\."), + "REGEXP_COUNT('a.b.c.d', '\\.')", + 3, + DataTypes.INT()) + .testResult( + lit("a*b*c*d").regexpCount("\\*"), + "REGEXP_COUNT('a*b*c*d', '\\*')", + 3, + DataTypes.INT()) + .testResult( + lit("abc123xyz456").regexpCount("\\d"), + "REGEXP_COUNT('abc123xyz456', '\\d')", + 6, + DataTypes.INT()) + .testResult( + lit("Helloworld! Hello everyone!").regexpCount("\\bHello\\b"), + "REGEXP_COUNT('Helloworld! Hello everyone!', '\\bHello\\b')", + 1, + DataTypes.INT()), + TestSetSpec.forFunction(BuiltInFunctionDefinitions.REGEXP_COUNT, "Validation Error") + .onFieldsWithData(1024) + .andDataTypes(DataTypes.INT()) + .testTableApiValidationError( + $("f0").regexpCount("1024"), + "Invalid input arguments. Expected signatures are:\n" + + "REGEXP_COUNT(str <CHARACTER_STRING>, regex <CHARACTER_STRING>)") + .testSqlValidationError( + "REGEXP_COUNT(f0, '1024')", + "Invalid input arguments. Expected signatures are:\n" + + "REGEXP_COUNT(str <CHARACTER_STRING>, regex <CHARACTER_STRING>)")); } private Stream<TestSetSpec> regexpExtractTestCases() { diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/RegexpCountFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/RegexpCountFunction.java new file mode 100644 index 00000000000..48f9cdb731e --- /dev/null +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/RegexpCountFunction.java @@ -0,0 +1,60 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.runtime.functions.scalar; + +import org.apache.flink.annotation.Internal; +import org.apache.flink.table.data.StringData; +import org.apache.flink.table.functions.BuiltInFunctionDefinitions; +import org.apache.flink.table.functions.SpecializedFunction; + +import javax.annotation.Nullable; + +import java.util.regex.Matcher; +import java.util.regex.PatternSyntaxException; + +import static org.apache.flink.table.runtime.functions.SqlFunctionUtils.REGEXP_PATTERN_CACHE; + +/** Implementation of {@link BuiltInFunctionDefinitions#REGEXP_COUNT}. */ +@Internal +public class RegexpCountFunction extends BuiltInScalarFunction { + + public RegexpCountFunction(SpecializedFunction.SpecializedContext context) { + super(BuiltInFunctionDefinitions.REGEXP_COUNT, context); + } + + public @Nullable Integer eval(@Nullable StringData str, @Nullable StringData regex) { + if (str == null || regex == null) { + return null; + } + + Matcher matcher; + try { + matcher = REGEXP_PATTERN_CACHE.get(regex.toString()).matcher(str.toString()); + } catch (PatternSyntaxException e) { + return null; + } + + int count = 0; + while (matcher.find()) { + count++; + } + + return count; + } +}
