This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 8189bba195326d899a9830d3a19b50cafa092666
Author: dylanhz <[email protected]>
AuthorDate: Thu Aug 1 17:20:52 2024 +0800

    [FLINK-35932][table] Add the built-in function REGEXP_COUNT
---
 docs/data/sql_functions.yml                        | 22 ++++--
 docs/data/sql_functions_zh.yml                     | 22 ++++--
 .../docs/reference/pyflink.table/expressions.rst   |  1 +
 flink-python/pyflink/table/expression.py           | 11 +++
 .../pyflink/table/tests/test_expression.py         |  1 +
 .../flink/table/api/internal/BaseExpressions.java  | 14 ++++
 .../functions/BuiltInFunctionDefinitions.java      | 15 ++++
 .../planner/functions/RegexpFunctionsITCase.java   | 83 +++++++++++++++++++++-
 .../functions/scalar/RegexpCountFunction.java      | 60 ++++++++++++++++
 9 files changed, 214 insertions(+), 15 deletions(-)

diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml
index 70823fc582c..29b0c27ebcd 100644
--- a/docs/data/sql_functions.yml
+++ b/docs/data/sql_functions.yml
@@ -337,16 +337,14 @@ string:
   - sql: REPLACE(string1, string2, string3)
     table: STRING1.replace(STRING2, STRING3)
     description: Returns a new string which replaces all the occurrences of STRING2 with STRING3 (non-overlapping) from STRING1. E.g., 'hello world'.replace('world', 'flink') returns 'hello flink'; 'ababab'.replace('abab', 'z') returns 'zab'.
-  - sql: TRANSLATE(expr, fromStr, toStr)
-    table: expr.translate(fromStr, toStr)
+  - sql: REGEXP_COUNT(str, regex)
+    table: str.regexpCount(regex)
     description: |
-      Translate an expr where all characters in fromStr have been replaced with those in toStr.
+      Returns the number of times str matches the regex pattern. regex must be a Java regular expression.
       
-      If toStr has a shorter length than fromStr, unmatched characters are removed.
+      `str <CHAR | VARCHAR>, regex <CHAR | VARCHAR>`
       
-      expr <CHAR | VARCHAR>, fromStr <CHAR | VARCHAR>, toStr <CHAR | VARCHAR>
-      
-      Returns a STRING of translated expr.
+      Returns an `INTEGER` representation of the number of matches. `NULL` if any of the arguments are `NULL` or regex is invalid.
   - sql: REGEXP_EXTRACT(string1, string2[, integer])
     table: STRING1.regexpExtract(STRING2[, INTEGER1])
     description: |
@@ -368,6 +366,16 @@ string:
       `str <CHAR | VARCHAR>, regex <CHAR | VARCHAR>, extractIndex <TINYINT | SMALLINT | INTEGER | BIGINT>`
       
       Returns an `ARRAY<STRING>` representation of all the matched substrings. `NULL` if any of the arguments are `NULL` or invalid.
+  - sql: TRANSLATE(expr, fromStr, toStr)
+    table: expr.translate(fromStr, toStr)
+    description: |
+      Translate an expr where all characters in fromStr have been replaced with those in toStr.
+
+      If toStr has a shorter length than fromStr, unmatched characters are removed.
+
+      `expr <CHAR | VARCHAR>, fromStr <CHAR | VARCHAR>, toStr <CHAR | VARCHAR>`
+
+      Returns a `STRING` of translated expr.
   - sql: INITCAP(string)
     table: STRING.initCap()
     description: Returns a new form of STRING with the first character of each word converted to uppercase and the rest characters to lowercase. Here a word means a sequences of alphanumeric characters.
diff --git a/docs/data/sql_functions_zh.yml b/docs/data/sql_functions_zh.yml
index f2629ca1096..76607687f17 100644
--- a/docs/data/sql_functions_zh.yml
+++ b/docs/data/sql_functions_zh.yml
@@ -409,16 +409,14 @@ string:
       返回一个新字符串,它用 STRING3(非重叠)替换 STRING1 中所有出现的 STRING2。
       例如 `'hello world'.replace('world', 'flink')` 返回 `'hello flink'`;
       `'ababab'.replace('abab', 'z')` 返回 `'zab'`。
-  - sql: TRANSLATE(expr, fromStr, toStr)
-    table: expr.translate(fromStr, toStr)
+  - sql: REGEXP_COUNT(str, regex)
+    table: str.regexpCount(regex)
     description: |
-      将 expr 中所有出现在 fromStr 之中的字符替换为 toStr 中的相应字符。
+      返回字符串 str 匹配正则表达式模式 regex 的次数。regex 必须是一个 Java 正则表达式。
       
-      如果 toStr 的长度短于 fromStr,则未匹配的字符将被移除。
+      `str <CHAR | VARCHAR>, regex <CHAR | VARCHAR>`
       
-      expr <CHAR | VARCHAR>, fromStr <CHAR | VARCHAR>, toStr <CHAR | VARCHAR>
-      
-      返回 STRING 格式的转换结果。
+      返回一个 `INTEGER` 表示匹配成功的次数。如果任何参数为 `NULL` 或 regex 非法,则返回 `NULL`。
   - sql: REGEXP_EXTRACT(string1, string2[, integer])
     table: STRING1.regexpExtract(STRING2[, INTEGER1])
     description: |
@@ -435,6 +433,16 @@ string:
       `str <CHAR | VARCHAR>, regex <CHAR | VARCHAR>, extractIndex <TINYINT | SMALLINT | INTEGER | BIGINT>`
       
       返回一个 `ARRAY<STRING>`,表示所有匹配的子串。如果任何参数为 `NULL`或非法,则返回 `NULL`。
+  - sql: TRANSLATE(expr, fromStr, toStr)
+    table: expr.translate(fromStr, toStr)
+    description: |
+      将 expr 中所有出现在 fromStr 之中的字符替换为 toStr 中的相应字符。
+
+      如果 toStr 的长度短于 fromStr,则未匹配的字符将被移除。
+
+      `expr <CHAR | VARCHAR>, fromStr <CHAR | VARCHAR>, toStr <CHAR | VARCHAR>`
+
+      返回 `STRING` 格式的转换结果。
   - sql: INITCAP(string)
     table: STRING.initCap()
     description: |
diff --git a/flink-python/docs/reference/pyflink.table/expressions.rst b/flink-python/docs/reference/pyflink.table/expressions.rst
index 60a5631e854..8698e884e1a 100644
--- a/flink-python/docs/reference/pyflink.table/expressions.rst
+++ b/flink-python/docs/reference/pyflink.table/expressions.rst
@@ -178,6 +178,7 @@ string functions
     Expression.rpad
     Expression.overlay
     Expression.regexp
+    Expression.regexp_count
     Expression.regexp_replace
     Expression.regexp_extract
     Expression.regexp_extract_all
diff --git a/flink-python/pyflink/table/expression.py b/flink-python/pyflink/table/expression.py
index 1e924c508e1..5975a79502b 100644
--- a/flink-python/pyflink/table/expression.py
+++ b/flink-python/pyflink/table/expression.py
@@ -1204,6 +1204,17 @@ class Expression(Generic[T]):
         """
         return _binary_op("regexp")(self, regex)
 
+    def regexp_count(self, regex) -> 'Expression':
+        """
+        Returns the number of times str matches the regex pattern.
+        regex must be a Java regular expression.
+        null if any of the arguments are null or regex is invalid.
+
+        :param regex: A STRING expression with a matching pattern.
+        :return: An INTEGER representation of the number of matches.
+        """
+        return _binary_op("regexpCount")(self, regex)
+
     def regexp_replace(self,
                        regex: Union[str, 'Expression[str]'],
                        replacement: Union[str, 'Expression[str]']) -> 'Expression[str]':
diff --git a/flink-python/pyflink/table/tests/test_expression.py b/flink-python/pyflink/table/tests/test_expression.py
index 3af9c27732d..fb94a02ec7d 100644
--- a/flink-python/pyflink/table/tests/test_expression.py
+++ b/flink-python/pyflink/table/tests/test_expression.py
@@ -175,6 +175,7 @@ class PyFlinkBatchExpressionTests(PyFlinkTestCase):
 
         # regexp functions
         self.assertEqual("regexp(a, b)", str(expr1.regexp(expr2)))
+        self.assertEqual("REGEXP_COUNT(a, b)", str(expr1.regexp_count(expr2)))
         self.assertEqual('regexpExtract(a, b, 3)', str(expr1.regexp_extract(expr2, 3)))
         self.assertEqual('REGEXP_EXTRACT_ALL(a, b)', str(expr1.regexp_extract_all(expr2)))
         self.assertEqual('REGEXP_EXTRACT_ALL(a, b, 3)', str(expr1.regexp_extract_all(expr2, 3)))
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
index 7ea2848e348..5092937e7df 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
@@ -160,6 +160,7 @@ import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.PRINTF
 import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.PROCTIME;
 import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.RADIANS;
 import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.REGEXP;
+import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.REGEXP_COUNT;
 import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.REGEXP_EXTRACT;
 import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.REGEXP_EXTRACT_ALL;
 import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.REGEXP_REPLACE;
@@ -1136,6 +1137,19 @@ public abstract class BaseExpressions<InType, OutType> {
         return toApiSpecificExpression(unresolvedCall(REGEXP, toExpr(), objectToExpression(regex)));
     }
 
+    /**
+     * Returns the number of times {@code str} matches the {@code regex} pattern. {@code regex} must
+     * be a Java regular expression.
+     *
+     * @param regex A STRING expression with a matching pattern.
+     * @return An INTEGER representation of the number of matches. <br>
+     *     null if any of the arguments are null or {@code regex} is invalid.
+     */
+    public OutType regexpCount(InType regex) {
+        return toApiSpecificExpression(
+                unresolvedCall(REGEXP_COUNT, toExpr(), objectToExpression(regex)));
+    }
+
     /**
      * Returns a string with all substrings that match the regular expression consecutively being
      * replaced.
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index a005aa938fb..e726051a4c4 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -1110,6 +1110,21 @@ public final class BuiltInFunctionDefinitions {
                     .outputTypeStrategy(nullableIfArgs(explicit(DataTypes.STRING())))
                     .build();
 
+    public static final BuiltInFunctionDefinition REGEXP_COUNT =
+            BuiltInFunctionDefinition.newBuilder()
+                    .name("REGEXP_COUNT")
+                    .kind(SCALAR)
+                    .inputTypeStrategy(
+                            sequence(
+                                    Arrays.asList("str", "regex"),
+                                    Arrays.asList(
+                                            logical(LogicalTypeFamily.CHARACTER_STRING),
+                                            logical(LogicalTypeFamily.CHARACTER_STRING))))
+                    .outputTypeStrategy(explicit(DataTypes.INT()))
+                    .runtimeClass(
+                            "org.apache.flink.table.runtime.functions.scalar.RegexpCountFunction")
+                    .build();
+
     public static final BuiltInFunctionDefinition REGEXP_EXTRACT =
             BuiltInFunctionDefinition.newBuilder()
                     .name("regexpExtract")
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/RegexpFunctionsITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/RegexpFunctionsITCase.java
index 3c1efb5c0c2..3809f98637f 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/RegexpFunctionsITCase.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/RegexpFunctionsITCase.java
@@ -25,13 +25,94 @@ import java.util.stream.Stream;
 
 import static org.apache.flink.table.api.Expressions.$;
 import static org.apache.flink.table.api.Expressions.call;
+import static org.apache.flink.table.api.Expressions.lit;
 
 /** Test Regexp functions correct behaviour. */
 class RegexpFunctionsITCase extends BuiltInFunctionTestBase {
 
     @Override
     Stream<TestSetSpec> getTestSetSpecs() {
-        return Stream.of(regexpExtractTestCases(), regexpExtractAllTestCases()).flatMap(s -> s);
+        return Stream.of(
+                        regexpCountTestCases(),
+                        regexpExtractTestCases(),
+                        regexpExtractAllTestCases())
+                .flatMap(s -> s);
+    }
+
+    private Stream<TestSetSpec> regexpCountTestCases() {
+        return Stream.of(
+                TestSetSpec.forFunction(BuiltInFunctionDefinitions.REGEXP_COUNT)
+                        .onFieldsWithData(null, "abcdeabde")
+                        .andDataTypes(DataTypes.STRING(), DataTypes.STRING())
+                        // null input
+                        .testResult(
+                                $("f0").regexpCount($("f1")),
+                                "REGEXP_COUNT(f0, f1)",
+                                null,
+                                DataTypes.INT())
+                        .testResult(
+                                $("f1").regexpCount($("f0")),
+                                "REGEXP_COUNT(f1, f0)",
+                                null,
+                                DataTypes.INT())
+                        // invalid regexp
+                        .testResult(
+                                $("f1").regexpCount("("),
+                                "REGEXP_COUNT(f1, '(')",
+                                null,
+                                DataTypes.INT())
+                        // normal cases
+                        .testResult(
+                                lit("hello world! Hello everyone!").regexpCount("Hello"),
+                                "REGEXP_COUNT('hello world! Hello everyone!', 'Hello')",
+                                1,
+                                DataTypes.INT())
+                        .testResult(
+                                lit("abcabcabc").regexpCount("abcab"),
+                                "REGEXP_COUNT('abcabcabc', 'abcab')",
+                                1,
+                                DataTypes.INT())
+                        .testResult(
+                                lit("abcd").regexpCount("z"),
+                                "REGEXP_COUNT('abcd', 'z')",
+                                0,
+                                DataTypes.INT())
+                        .testResult(
+                                lit("^abc").regexpCount("\\^abc"),
+                                "REGEXP_COUNT('^abc', '\\^abc')",
+                                1,
+                                DataTypes.INT())
+                        .testResult(
+                                lit("a.b.c.d").regexpCount("\\."),
+                                "REGEXP_COUNT('a.b.c.d', '\\.')",
+                                3,
+                                DataTypes.INT())
+                        .testResult(
+                                lit("a*b*c*d").regexpCount("\\*"),
+                                "REGEXP_COUNT('a*b*c*d', '\\*')",
+                                3,
+                                DataTypes.INT())
+                        .testResult(
+                                lit("abc123xyz456").regexpCount("\\d"),
+                                "REGEXP_COUNT('abc123xyz456', '\\d')",
+                                6,
+                                DataTypes.INT())
+                        .testResult(
+                                lit("Helloworld! Hello everyone!").regexpCount("\\bHello\\b"),
+                                "REGEXP_COUNT('Helloworld! Hello everyone!', '\\bHello\\b')",
+                                1,
+                                DataTypes.INT()),
+                TestSetSpec.forFunction(BuiltInFunctionDefinitions.REGEXP_COUNT, "Validation Error")
+                        .onFieldsWithData(1024)
+                        .andDataTypes(DataTypes.INT())
+                        .testTableApiValidationError(
+                                $("f0").regexpCount("1024"),
+                                "Invalid input arguments. Expected signatures are:\n"
+                                        + "REGEXP_COUNT(str <CHARACTER_STRING>, regex <CHARACTER_STRING>)")
+                        .testSqlValidationError(
+                                "REGEXP_COUNT(f0, '1024')",
+                                "Invalid input arguments. Expected signatures are:\n"
+                                        + "REGEXP_COUNT(str <CHARACTER_STRING>, regex <CHARACTER_STRING>)"));
     }
 
     private Stream<TestSetSpec> regexpExtractTestCases() {
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/RegexpCountFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/RegexpCountFunction.java
new file mode 100644
index 00000000000..48f9cdb731e
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/RegexpCountFunction.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction;
+
+import javax.annotation.Nullable;
+
+import java.util.regex.Matcher;
+import java.util.regex.PatternSyntaxException;
+
+import static org.apache.flink.table.runtime.functions.SqlFunctionUtils.REGEXP_PATTERN_CACHE;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#REGEXP_COUNT}. */
+@Internal
+public class RegexpCountFunction extends BuiltInScalarFunction {
+
+    public RegexpCountFunction(SpecializedFunction.SpecializedContext context) {
+        super(BuiltInFunctionDefinitions.REGEXP_COUNT, context);
+    }
+
+    public @Nullable Integer eval(@Nullable StringData str, @Nullable StringData regex) {
+        if (str == null || regex == null) {
+            return null;
+        }
+
+        Matcher matcher;
+        try {
+            matcher = REGEXP_PATTERN_CACHE.get(regex.toString()).matcher(str.toString());
+        } catch (PatternSyntaxException e) {
+            return null;
+        }
+
+        int count = 0;
+        while (matcher.find()) {
+            count++;
+        }
+
+        return count;
+    }
+}

Reply via email to