This is an automated email from the ASF dual-hosted git repository. lincoln pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 5bbdbb2889268e879b6aa06d2cdab02fd6154cb6 Author: dylanhz <[email protected]> AuthorDate: Tue Aug 6 10:33:01 2024 +0800 [FLINK-35965][table] Add the built-in function ENDSWITH --- docs/data/sql_functions.yml | 12 ++ docs/data/sql_functions_zh.yml | 12 ++ .../docs/reference/pyflink.table/expressions.rst | 1 + flink-python/pyflink/table/expression.py | 10 ++ .../pyflink/table/tests/test_expression.py | 1 + .../flink/table/api/internal/BaseExpressions.java | 14 +++ .../functions/BuiltInFunctionDefinitions.java | 21 ++++ .../planner/functions/StringFunctionsITCase.java | 137 +++++++++++++++++++++ .../runtime/functions/scalar/EndsWithFunction.java | 70 +++++++++++ 9 files changed, 278 insertions(+) diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml index 355f8c532d2..32a092dbcf1 100644 --- a/docs/data/sql_functions.yml +++ b/docs/data/sql_functions.yml @@ -340,6 +340,18 @@ string: `expr <BINARY | VARBINARY>, startExpr <BINARY | VARBINARY>` + Returns a `BOOLEAN`. `NULL` if any of the arguments are `NULL`. + - sql: ENDSWITH(expr, endExpr) + table: expr.endsWith(endExpr) + description: | + Returns whether expr ends with endExpr. If endExpr is empty, the result is true. + + expr and endExpr should have the same type. + + `expr <CHAR | VARCHAR>, endExpr <CHAR | VARCHAR>` + + `expr <BINARY | VARBINARY>, endExpr <BINARY | VARBINARY>` + Returns a `BOOLEAN`. `NULL` if any of the arguments are `NULL`. 
- sql: SUBSTRING(string FROM integer1 [ FOR integer2 ]) table: | diff --git a/docs/data/sql_functions_zh.yml b/docs/data/sql_functions_zh.yml index 2f199613c56..b269b12f156 100644 --- a/docs/data/sql_functions_zh.yml +++ b/docs/data/sql_functions_zh.yml @@ -409,6 +409,18 @@ string: `expr <BINARY | VARBINARY>, startExpr <BINARY | VARBINARY>` + 返回一个 `BOOLEAN`。如果任意参数为 `NULL`,则返回 `NULL`。 + - sql: ENDSWITH(expr, endExpr) + table: expr.endsWith(endExpr) + description: | + 判断 expr 是否以 endExpr 结尾。如果 endExpr 为空,则结果为 true。 + + expr 和 endExpr 应具有相同的类型。 + + `expr <CHAR | VARCHAR>, endExpr <CHAR | VARCHAR>` + + `expr <BINARY | VARBINARY>, endExpr <BINARY | VARBINARY>` + 返回一个 `BOOLEAN`。如果任意参数为 `NULL`,则返回 `NULL`。 - sql: SUBSTRING(string FROM integer1 [ FOR integer2 ]) table: | diff --git a/flink-python/docs/reference/pyflink.table/expressions.rst b/flink-python/docs/reference/pyflink.table/expressions.rst index 6f498eb1936..e2761cd488c 100644 --- a/flink-python/docs/reference/pyflink.table/expressions.rst +++ b/flink-python/docs/reference/pyflink.table/expressions.rst @@ -161,6 +161,7 @@ string functions :toctree: api/ Expression.starts_with + Expression.ends_with Expression.substring Expression.substr Expression.trim_leading diff --git a/flink-python/pyflink/table/expression.py b/flink-python/pyflink/table/expression.py index 474c60a3041..092fba6cd9c 100644 --- a/flink-python/pyflink/table/expression.py +++ b/flink-python/pyflink/table/expression.py @@ -1038,6 +1038,16 @@ class Expression(Generic[T]): """ return _binary_op("startsWith")(self, start_expr) + def ends_with(self, end_expr) -> 'Expression': + """ + Returns whether expr ends with end_expr. If end_expr is empty, the result is true. + expr and end_expr should have the same type. + + :param end_expr: A STRING or BINARY expression. + :return: A BOOLEAN. 
+ """ + return _binary_op("endsWith")(self, end_expr) + def substring(self, begin_index: Union[int, 'Expression[int]'], length: Union[int, 'Expression[int]'] = None) -> 'Expression[str]': diff --git a/flink-python/pyflink/table/tests/test_expression.py b/flink-python/pyflink/table/tests/test_expression.py index f4e60be1812..77cdfc457d1 100644 --- a/flink-python/pyflink/table/tests/test_expression.py +++ b/flink-python/pyflink/table/tests/test_expression.py @@ -174,6 +174,7 @@ class PyFlinkBatchExpressionTests(PyFlinkTestCase): self.assertEqual('ELT(3, a, b, c)', str(lit(3).elt(expr1, expr2, expr3))) self.assertEqual("PRINTF('%d %s', a, b)", str(lit("%d %s").printf(expr1, expr2))) self.assertEqual("STARTSWITH(a, b)", str(expr1.starts_with(expr2))) + self.assertEqual("ENDSWITH(a, b)", str(expr1.ends_with(expr2))) # regexp functions self.assertEqual("regexp(a, b)", str(expr1.regexp(expr2))) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java index 4df1657a496..96749905e6a 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java +++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java @@ -93,6 +93,7 @@ import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.DISTIN import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.DIVIDE; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.ELT; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.ENCODE; +import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.ENDS_WITH; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.EQUALS; import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.EXP; import static
org.apache.flink.table.functions.BuiltInFunctionDefinitions.EXTRACT; @@ -864,6 +865,19 @@ public abstract class BaseExpressions<InType, OutType> { unresolvedCall(STARTS_WITH, toExpr(), objectToExpression(startExpr))); } + /** + * Returns whether {@code expr} ends with {@code endExpr}. If {@code endExpr} is empty, the + * result is true. <br> + * {@code expr} and {@code endExpr} should have the same type. + * + * @param endExpr A STRING or BINARY expression. + * @return A BOOLEAN. + */ + public OutType endsWith(InType endExpr) { + return toApiSpecificExpression( + unresolvedCall(ENDS_WITH, toExpr(), objectToExpression(endExpr))); + } + /** * Creates a substring of the given string at given index for a given length. * diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java index 6970e22e5a3..bdc3a8ad6af 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java @@ -952,6 +952,27 @@ public final class BuiltInFunctionDefinitions { "org.apache.flink.table.runtime.functions.scalar.StartsWithFunction") .build(); + public static final BuiltInFunctionDefinition ENDS_WITH = + BuiltInFunctionDefinition.newBuilder() + .name("ENDSWITH") + .kind(SCALAR) + .inputTypeStrategy( + or( + sequence( + Arrays.asList("expr", "endExpr"), + Arrays.asList( + logical(LogicalTypeFamily.CHARACTER_STRING), + logical(LogicalTypeFamily.CHARACTER_STRING))), + sequence( + Arrays.asList("expr", "endExpr"), + Arrays.asList( + logical(LogicalTypeFamily.BINARY_STRING), + logical(LogicalTypeFamily.BINARY_STRING))))) + .outputTypeStrategy(nullableIfArgs(explicit(DataTypes.BOOLEAN()))) + .runtimeClass( + 
"org.apache.flink.table.runtime.functions.scalar.EndsWithFunction") + .build(); + public static final BuiltInFunctionDefinition SUBSTRING = BuiltInFunctionDefinition.newBuilder() .name("substring") diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/StringFunctionsITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/StringFunctionsITCase.java index 0f6ec6e8360..a6d1190a0a6 100644 --- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/StringFunctionsITCase.java +++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/StringFunctionsITCase.java @@ -41,6 +41,7 @@ class StringFunctionsITCase extends BuiltInFunctionTestBase { return Stream.of( bTrimTestCases(), eltTestCases(), + endsWithTestCases(), printfTestCases(), startsWithTestCases(), translateTestCases()) @@ -196,6 +197,142 @@ class StringFunctionsITCase extends BuiltInFunctionTestBase { "Index must be an integer starting from '0', but was '-1'.")); } + private Stream<TestSetSpec> endsWithTestCases() { + return Stream.of( + TestSetSpec.forFunction(BuiltInFunctionDefinitions.ENDS_WITH, "StringData") + .onFieldsWithData(null, "www.apache.org", "", "in中文", "\uD83D\uDE00") + .andDataTypes( + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING(), + DataTypes.STRING()) + // null input + .testResult( + $("f0").endsWith("abc"), + "ENDSWITH(f0, 'abc')", + null, + DataTypes.BOOLEAN()) + .testResult( + $("f1").endsWith($("f0")), + "ENDSWITH(f1, f0)", + null, + DataTypes.BOOLEAN()) + // empty input + .testResult( + $("f2").endsWith("abc"), + "ENDSWITH(f2, 'abc')", + Boolean.FALSE, + DataTypes.BOOLEAN()) + .testResult( + $("f1").endsWith($("f2")), + "ENDSWITH(f1, f2)", + Boolean.TRUE, + DataTypes.BOOLEAN()) + .testResult( + lit("").endsWith(""), + "ENDSWITH('', '')", + Boolean.TRUE, + DataTypes.BOOLEAN().notNull()) + // normal 
cases + .testResult( + $("f1").endsWith("org"), + "ENDSWITH(f1, 'org')", + Boolean.TRUE, + DataTypes.BOOLEAN()) + .testResult( + $("f1").endsWith("."), + "ENDSWITH(f1, '.')", + Boolean.FALSE, + DataTypes.BOOLEAN()) + .testResult( + $("f3").endsWith("n中文"), + "ENDSWITH(f3, 'n中文')", + Boolean.TRUE, + DataTypes.BOOLEAN()) + .testResult( + $("f3").endsWith("中"), + "ENDSWITH(f3, '中')", + Boolean.FALSE, + DataTypes.BOOLEAN()) + .testResult( + $("f4").endsWith($("f4")), + "ENDSWITH(f4, f4)", + Boolean.TRUE, + DataTypes.BOOLEAN()) + .testResult( + $("f4").endsWith("\uDE00"), + "ENDSWITH(f4, '\uDE00')", + Boolean.FALSE, + DataTypes.BOOLEAN()), + TestSetSpec.forFunction(BuiltInFunctionDefinitions.ENDS_WITH, "byte[]") + .onFieldsWithData( + null, + new byte[] {1, 2, 3}, + new byte[0], + new byte[0], + new byte[] {2, 3}, + new byte[] {1}) + .andDataTypes( + DataTypes.BYTES(), + DataTypes.BYTES(), + DataTypes.BYTES(), + DataTypes.BYTES().notNull(), + DataTypes.BYTES(), + DataTypes.BYTES()) + // null input + .testResult( + $("f0").endsWith($("f1")), + "ENDSWITH(f0, f1)", + null, + DataTypes.BOOLEAN()) + .testResult( + $("f1").endsWith($("f0")), + "ENDSWITH(f1, f0)", + null, + DataTypes.BOOLEAN()) + // empty input + .testResult( + $("f2").endsWith($("f1")), + "ENDSWITH(f2, f1)", + Boolean.FALSE, + DataTypes.BOOLEAN()) + .testResult( + $("f1").endsWith($("f2")), + "ENDSWITH(f1, f2)", + Boolean.TRUE, + DataTypes.BOOLEAN()) + .testResult( + $("f3").endsWith($("f3")), + "ENDSWITH(f3, f3)", + Boolean.TRUE, + DataTypes.BOOLEAN().notNull()) + // normal cases + .testResult( + $("f1").endsWith($("f4")), + "ENDSWITH(f1, f4)", + Boolean.TRUE, + DataTypes.BOOLEAN()) + .testResult( + $("f1").endsWith($("f5")), + "ENDSWITH(f1, f5)", + Boolean.FALSE, + DataTypes.BOOLEAN()), + TestSetSpec.forFunction(BuiltInFunctionDefinitions.ENDS_WITH, "Validation Error") + .onFieldsWithData("12345", "123".getBytes()) + .andDataTypes(DataTypes.STRING(), DataTypes.BYTES()) + .testTableApiValidationError( + 
$("f0").endsWith($("f1")), + "Invalid input arguments. Expected signatures are:\n" + + "ENDSWITH(expr <CHARACTER_STRING>, endExpr <CHARACTER_STRING>)\n" + + "ENDSWITH(expr <BINARY_STRING>, endExpr <BINARY_STRING>)") + .testSqlValidationError( + "ENDSWITH(f0, f1)", + "Invalid input arguments. Expected signatures are:\n" + + "ENDSWITH(expr <CHARACTER_STRING>, endExpr <CHARACTER_STRING>)\n" + + "ENDSWITH(expr <BINARY_STRING>, endExpr <BINARY_STRING>)")); + } + private Stream<TestSetSpec> printfTestCases() { return Stream.of( TestSetSpec.forFunction(BuiltInFunctionDefinitions.PRINTF) diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/EndsWithFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/EndsWithFunction.java new file mode 100644 index 00000000000..f7ac2415c23 --- /dev/null +++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/EndsWithFunction.java @@ -0,0 +1,70 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + + package org.apache.flink.table.runtime.functions.scalar; + + import org.apache.flink.annotation.Internal; + import org.apache.flink.table.data.StringData; + import org.apache.flink.table.data.binary.BinaryStringData; + import org.apache.flink.table.data.binary.BinaryStringDataUtil; + import org.apache.flink.table.functions.BuiltInFunctionDefinitions; + import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext; + + import javax.annotation.Nullable; + + /** Implementation of {@link BuiltInFunctionDefinitions#ENDS_WITH}. */ + @Internal + public class EndsWithFunction extends BuiltInScalarFunction { + + /** Binds this implementation to {@link BuiltInFunctionDefinitions#ENDS_WITH} for the given specialization context. */ public EndsWithFunction(SpecializedContext context) { + super(BuiltInFunctionDefinitions.ENDS_WITH, context); + } + + /** Returns whether {@code expr} ends with {@code endExpr}: {@code null} if either argument is {@code null}, {@code true} if {@code endExpr} is empty. NOTE(review): both arguments are cast to {@link BinaryStringData} unconditionally, so this assumes the runtime never passes another {@link StringData} implementation - confirm. */ public @Nullable Boolean eval(@Nullable StringData expr, @Nullable StringData endExpr) { + if (expr == null || endExpr == null) { + return null; + } + /* an empty suffix always matches */ if (BinaryStringDataUtil.isEmpty((BinaryStringData) endExpr)) { + return true; + } + return ((BinaryStringData) expr).endsWith((BinaryStringData) endExpr); + } + + /** Returns whether the byte sequence {@code expr} ends with {@code endExpr}: {@code null} if either argument is {@code null}, {@code true} if {@code endExpr} is empty. */ public @Nullable Boolean eval(@Nullable byte[] expr, @Nullable byte[] endExpr) { + if (expr == null || endExpr == null) { + return null; + } + /* an empty suffix always matches */ if (endExpr.length == 0) { + return true; + } + return matchAtEnd(expr, endExpr); + } + + /** Returns whether the last {@code target.length} bytes of {@code source} equal {@code target}; {@code false} when {@code target} is longer than {@code source}. */ private static boolean matchAtEnd(byte[] source, byte[] target) { + /* index in source at which target would have to begin */ int start = source.length - target.length; + if (start < 0) { + return false; + } + for (int i = start; i < source.length; i++) { + if (source[i] != target[i - start]) { + return false; + } + } + return true; + } +}
