This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 5bbdbb2889268e879b6aa06d2cdab02fd6154cb6
Author: dylanhz <[email protected]>
AuthorDate: Tue Aug 6 10:33:01 2024 +0800

    [FLINK-35965][table] Add the built-in function ENDSWITH
---
 docs/data/sql_functions.yml                        |  12 ++
 docs/data/sql_functions_zh.yml                     |  12 ++
 .../docs/reference/pyflink.table/expressions.rst   |   1 +
 flink-python/pyflink/table/expression.py           |  10 ++
 .../pyflink/table/tests/test_expression.py         |   1 +
 .../flink/table/api/internal/BaseExpressions.java  |  14 +++
 .../functions/BuiltInFunctionDefinitions.java      |  21 ++++
 .../planner/functions/StringFunctionsITCase.java   | 137 +++++++++++++++++++++
 .../runtime/functions/scalar/EndsWithFunction.java |  70 +++++++++++
 9 files changed, 278 insertions(+)

diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml
index 355f8c532d2..32a092dbcf1 100644
--- a/docs/data/sql_functions.yml
+++ b/docs/data/sql_functions.yml
@@ -340,6 +340,18 @@ string:
       
       `expr <BINARY | VARBINARY>, startExpr <BINARY | VARBINARY>`
       
+      Returns a `BOOLEAN`. `NULL` if any of the arguments are `NULL`.
+  - sql: ENDSWITH(expr, endExpr)
+    table: expr.endsWith(endExpr)
+    description: |
+      Returns whether expr ends with endExpr. If endExpr is empty, the result 
is true.
+      
+      expr and endExpr should have same type. 
+      
+      `expr <CHAR | VARCHAR>, endExpr <CHAR | VARCHAR>`
+      
+      `expr <BINARY | VARBINARY>, endExpr <BINARY | VARBINARY>`
+      
       Returns a `BOOLEAN`. `NULL` if any of the arguments are `NULL`.
   - sql: SUBSTRING(string FROM integer1 [ FOR integer2 ])
     table: |
diff --git a/docs/data/sql_functions_zh.yml b/docs/data/sql_functions_zh.yml
index 2f199613c56..b269b12f156 100644
--- a/docs/data/sql_functions_zh.yml
+++ b/docs/data/sql_functions_zh.yml
@@ -409,6 +409,18 @@ string:
       
       `expr <BINARY | VARBINARY>, startExpr <BINARY | VARBINARY>`
       
+      返回一个 `BOOLEAN`。如果任意参数为 `NULL`,则返回 `NULL`。
+  - sql: ENDSWITH(expr, endExpr)
+    table: expr.endsWith(endExpr)
+    description: |
+      判断 expr 是否以 endExpr 结尾。如果 endExpr 为空,则结果为 true。
+      
+      expr 和 endExpr 应具有相同的类型。
+      
+      `expr <CHAR | VARCHAR>, endExpr <CHAR | VARCHAR>`
+      
+      `expr <BINARY | VARBINARY>, endExpr <BINARY | VARBINARY>`
+      
       返回一个 `BOOLEAN`。如果任意参数为 `NULL`,则返回 `NULL`。
   - sql: SUBSTRING(string FROM integer1 [ FOR integer2 ])
     table: |
diff --git a/flink-python/docs/reference/pyflink.table/expressions.rst 
b/flink-python/docs/reference/pyflink.table/expressions.rst
index 6f498eb1936..e2761cd488c 100644
--- a/flink-python/docs/reference/pyflink.table/expressions.rst
+++ b/flink-python/docs/reference/pyflink.table/expressions.rst
@@ -161,6 +161,7 @@ string functions
     :toctree: api/
 
     Expression.starts_with
+    Expression.ends_with
     Expression.substring
     Expression.substr
     Expression.trim_leading
diff --git a/flink-python/pyflink/table/expression.py 
b/flink-python/pyflink/table/expression.py
index 474c60a3041..092fba6cd9c 100644
--- a/flink-python/pyflink/table/expression.py
+++ b/flink-python/pyflink/table/expression.py
@@ -1038,6 +1038,16 @@ class Expression(Generic[T]):
         """
         return _binary_op("startsWith")(self, start_expr)
 
+    def ends_with(self, end_expr) -> 'Expression':
+        """
+        Returns whether expr ends with end_expr. If end_expr is empty, the 
result is true.
+        expr and end_expr should have same type.
+
+        :param end_expr: A STRING or BINARY expression.
+        :return: A BOOLEAN.
+        """
+        return _binary_op("endsWith")(self, end_expr)
+
     def substring(self,
                   begin_index: Union[int, 'Expression[int]'],
                   length: Union[int, 'Expression[int]'] = None) -> 
'Expression[str]':
diff --git a/flink-python/pyflink/table/tests/test_expression.py 
b/flink-python/pyflink/table/tests/test_expression.py
index f4e60be1812..77cdfc457d1 100644
--- a/flink-python/pyflink/table/tests/test_expression.py
+++ b/flink-python/pyflink/table/tests/test_expression.py
@@ -174,6 +174,7 @@ class PyFlinkBatchExpressionTests(PyFlinkTestCase):
         self.assertEqual('ELT(3, a, b, c)', str(lit(3).elt(expr1, expr2, 
expr3)))
         self.assertEqual("PRINTF('%d %s', a, b)", str(lit("%d 
%s").printf(expr1, expr2)))
         self.assertEqual("STARTSWITH(a, b)", str(expr1.starts_with(expr2)))
+        self.assertEqual("ENDSWITH(a, b)", str(expr1.ends_with(expr2)))
 
         # regexp functions
         self.assertEqual("regexp(a, b)", str(expr1.regexp(expr2)))
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
index 4df1657a496..96749905e6a 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
@@ -93,6 +93,7 @@ import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.DISTIN
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.DIVIDE;
 import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.ELT;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.ENCODE;
+import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.ENDS_WITH;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.EQUALS;
 import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.EXP;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.EXTRACT;
@@ -864,6 +865,19 @@ public abstract class BaseExpressions<InType, OutType> {
                 unresolvedCall(STARTS_WITH, toExpr(), 
objectToExpression(startExpr)));
     }
 
+    /**
+     * Returns whether {@code expr} ends with {@code endExpr}. If {@code 
endExpr} is empty, the
+     * result is true. <br>
+     * {@code expr} and {@code endExpr} should have same type.
+     *
+     * @param endExpr A STRING or BINARY expression.
+     * @return A BOOLEAN.
+     */
+    public OutType endsWith(InType endExpr) {
+        return toApiSpecificExpression(
+                unresolvedCall(ENDS_WITH, toExpr(), 
objectToExpression(endExpr)));
+    }
+
     /**
      * Creates a substring of the given string at given index for a given 
length.
      *
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index 6970e22e5a3..bdc3a8ad6af 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -952,6 +952,27 @@ public final class BuiltInFunctionDefinitions {
                             
"org.apache.flink.table.runtime.functions.scalar.StartsWithFunction")
                     .build();
 
+    public static final BuiltInFunctionDefinition ENDS_WITH =
+            BuiltInFunctionDefinition.newBuilder()
+                    .name("ENDSWITH")
+                    .kind(SCALAR)
+                    .inputTypeStrategy(
+                            or(
+                                    sequence(
+                                            Arrays.asList("expr", "endExpr"),
+                                            Arrays.asList(
+                                                    
logical(LogicalTypeFamily.CHARACTER_STRING),
+                                                    
logical(LogicalTypeFamily.CHARACTER_STRING))),
+                                    sequence(
+                                            Arrays.asList("expr", "endExpr"),
+                                            Arrays.asList(
+                                                    
logical(LogicalTypeFamily.BINARY_STRING),
+                                                    
logical(LogicalTypeFamily.BINARY_STRING)))))
+                    
.outputTypeStrategy(nullableIfArgs(explicit(DataTypes.BOOLEAN())))
+                    .runtimeClass(
+                            
"org.apache.flink.table.runtime.functions.scalar.EndsWithFunction")
+                    .build();
+
     public static final BuiltInFunctionDefinition SUBSTRING =
             BuiltInFunctionDefinition.newBuilder()
                     .name("substring")
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/StringFunctionsITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/StringFunctionsITCase.java
index 0f6ec6e8360..a6d1190a0a6 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/StringFunctionsITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/StringFunctionsITCase.java
@@ -41,6 +41,7 @@ class StringFunctionsITCase extends BuiltInFunctionTestBase {
         return Stream.of(
                         bTrimTestCases(),
                         eltTestCases(),
+                        endsWithTestCases(),
                         printfTestCases(),
                         startsWithTestCases(),
                         translateTestCases())
@@ -196,6 +197,142 @@ class StringFunctionsITCase extends 
BuiltInFunctionTestBase {
                                 "Index must be an integer starting from '0', 
but was '-1'."));
     }
 
+    private Stream<TestSetSpec> endsWithTestCases() {
+        return Stream.of(
+                TestSetSpec.forFunction(BuiltInFunctionDefinitions.ENDS_WITH, 
"StringData")
+                        .onFieldsWithData(null, "www.apache.org", "", "in中文", 
"\uD83D\uDE00")
+                        .andDataTypes(
+                                DataTypes.STRING(),
+                                DataTypes.STRING(),
+                                DataTypes.STRING(),
+                                DataTypes.STRING(),
+                                DataTypes.STRING())
+                        // null input
+                        .testResult(
+                                $("f0").endsWith("abc"),
+                                "ENDSWITH(f0, 'abc')",
+                                null,
+                                DataTypes.BOOLEAN())
+                        .testResult(
+                                $("f1").endsWith($("f0")),
+                                "ENDSWITH(f1, f0)",
+                                null,
+                                DataTypes.BOOLEAN())
+                        // empty input
+                        .testResult(
+                                $("f2").endsWith("abc"),
+                                "ENDSWITH(f2, 'abc')",
+                                Boolean.FALSE,
+                                DataTypes.BOOLEAN())
+                        .testResult(
+                                $("f1").endsWith($("f2")),
+                                "ENDSWITH(f1, f2)",
+                                Boolean.TRUE,
+                                DataTypes.BOOLEAN())
+                        .testResult(
+                                lit("").endsWith(""),
+                                "ENDSWITH('', '')",
+                                Boolean.TRUE,
+                                DataTypes.BOOLEAN().notNull())
+                        // normal cases
+                        .testResult(
+                                $("f1").endsWith("org"),
+                                "ENDSWITH(f1, 'org')",
+                                Boolean.TRUE,
+                                DataTypes.BOOLEAN())
+                        .testResult(
+                                $("f1").endsWith("."),
+                                "ENDSWITH(f1, '.')",
+                                Boolean.FALSE,
+                                DataTypes.BOOLEAN())
+                        .testResult(
+                                $("f3").endsWith("n中文"),
+                                "ENDSWITH(f3, 'n中文')",
+                                Boolean.TRUE,
+                                DataTypes.BOOLEAN())
+                        .testResult(
+                                $("f3").endsWith("中"),
+                                "ENDSWITH(f3, '中')",
+                                Boolean.FALSE,
+                                DataTypes.BOOLEAN())
+                        .testResult(
+                                $("f4").endsWith($("f4")),
+                                "ENDSWITH(f4, f4)",
+                                Boolean.TRUE,
+                                DataTypes.BOOLEAN())
+                        .testResult(
+                                $("f4").endsWith("\uDE00"),
+                                "ENDSWITH(f4, '\uDE00')",
+                                Boolean.FALSE,
+                                DataTypes.BOOLEAN()),
+                TestSetSpec.forFunction(BuiltInFunctionDefinitions.ENDS_WITH, 
"byte[]")
+                        .onFieldsWithData(
+                                null,
+                                new byte[] {1, 2, 3},
+                                new byte[0],
+                                new byte[0],
+                                new byte[] {2, 3},
+                                new byte[] {1})
+                        .andDataTypes(
+                                DataTypes.BYTES(),
+                                DataTypes.BYTES(),
+                                DataTypes.BYTES(),
+                                DataTypes.BYTES().notNull(),
+                                DataTypes.BYTES(),
+                                DataTypes.BYTES())
+                        // null input
+                        .testResult(
+                                $("f0").endsWith($("f1")),
+                                "ENDSWITH(f0, f1)",
+                                null,
+                                DataTypes.BOOLEAN())
+                        .testResult(
+                                $("f1").endsWith($("f0")),
+                                "ENDSWITH(f1, f0)",
+                                null,
+                                DataTypes.BOOLEAN())
+                        // empty input
+                        .testResult(
+                                $("f2").endsWith($("f1")),
+                                "ENDSWITH(f2, f1)",
+                                Boolean.FALSE,
+                                DataTypes.BOOLEAN())
+                        .testResult(
+                                $("f1").endsWith($("f2")),
+                                "ENDSWITH(f1, f2)",
+                                Boolean.TRUE,
+                                DataTypes.BOOLEAN())
+                        .testResult(
+                                $("f3").endsWith($("f3")),
+                                "ENDSWITH(f3, f3)",
+                                Boolean.TRUE,
+                                DataTypes.BOOLEAN().notNull())
+                        // normal cases
+                        .testResult(
+                                $("f1").endsWith($("f4")),
+                                "ENDSWITH(f1, f4)",
+                                Boolean.TRUE,
+                                DataTypes.BOOLEAN())
+                        .testResult(
+                                $("f1").endsWith($("f5")),
+                                "ENDSWITH(f1, f5)",
+                                Boolean.FALSE,
+                                DataTypes.BOOLEAN()),
+                TestSetSpec.forFunction(BuiltInFunctionDefinitions.ENDS_WITH, 
"Validation Error")
+                        .onFieldsWithData("12345", "123".getBytes())
+                        .andDataTypes(DataTypes.STRING(), DataTypes.BYTES())
+                        .testTableApiValidationError(
+                                $("f0").endsWith($("f1")),
+                                "Invalid input arguments. Expected signatures 
are:\n"
+                                        + "ENDSWITH(expr <CHARACTER_STRING>, 
endExpr <CHARACTER_STRING>)\n"
+                                        + "ENDSWITH(expr <BINARY_STRING>, 
endExpr <BINARY_STRING>)")
+                        .testSqlValidationError(
+                                "ENDSWITH(f0, f1)",
+                                "Invalid input arguments. Expected signatures 
are:\n"
+                                        + "ENDSWITH(expr <CHARACTER_STRING>, 
endExpr <CHARACTER_STRING>)\n"
+                                        + "ENDSWITH(expr <BINARY_STRING>, 
endExpr <BINARY_STRING>)"));
+    }
+
     private Stream<TestSetSpec> printfTestCases() {
         return Stream.of(
                 TestSetSpec.forFunction(BuiltInFunctionDefinitions.PRINTF)
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/EndsWithFunction.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/EndsWithFunction.java
new file mode 100644
index 00000000000..f7ac2415c23
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/EndsWithFunction.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.table.data.binary.BinaryStringDataUtil;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext;
+
+import javax.annotation.Nullable;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#ENDS_WITH}. */
+@Internal
+public class EndsWithFunction extends BuiltInScalarFunction {
+
+    public EndsWithFunction(SpecializedContext context) {
+        super(BuiltInFunctionDefinitions.ENDS_WITH, context);
+    }
+
+    public @Nullable Boolean eval(@Nullable StringData expr, @Nullable 
StringData endExpr) {
+        if (expr == null || endExpr == null) {
+            return null;
+        }
+        if (BinaryStringDataUtil.isEmpty((BinaryStringData) endExpr)) {
+            return true;
+        }
+        return ((BinaryStringData) expr).endsWith((BinaryStringData) endExpr);
+    }
+
+    public @Nullable Boolean eval(@Nullable byte[] expr, @Nullable byte[] 
endExpr) {
+        if (expr == null || endExpr == null) {
+            return null;
+        }
+        if (endExpr.length == 0) {
+            return true;
+        }
+        return matchAtEnd(expr, endExpr);
+    }
+
+    private static boolean matchAtEnd(byte[] source, byte[] target) {
+        int start = source.length - target.length;
+        if (start < 0) {
+            return false;
+        }
+        for (int i = start; i < source.length; i++) {
+            if (source[i] != target[i - start]) {
+                return false;
+            }
+        }
+        return true;
+    }
+}

Reply via email to