This is an automated email from the ASF dual-hosted git repository.

ron pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 72ebee2eb45 [FLINK-35910][table] Add the built-in function BTRIM
72ebee2eb45 is described below

commit 72ebee2eb45f598ebd88422f716ae6da93ddee5d
Author: dylanhz <[email protected]>
AuthorDate: Thu Aug 8 10:32:24 2024 +0800

    [FLINK-35910][table] Add the built-in function BTRIM
    
    This closes #25127
    
    ---------
    
    Co-authored-by: Ron <[email protected]>
---
 docs/data/sql_functions.yml                        |  6 ++
 docs/data/sql_functions_zh.yml                     |  6 ++
 .../docs/reference/pyflink.table/expressions.rst   |  1 +
 flink-python/pyflink/table/expression.py           | 10 +++
 .../flink/table/api/internal/BaseExpressions.java  | 12 ++++
 .../functions/BuiltInFunctionDefinitions.java      | 19 ++++++
 .../planner/functions/StringFunctionsITCase.java   | 74 +++++++++++++++++++++-
 .../runtime/functions/scalar/BTrimFunction.java    | 51 +++++++++++++++
 8 files changed, 178 insertions(+), 1 deletion(-)

diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml
index 2cd2a3c415f..8dca117fa7b 100644
--- a/docs/data/sql_functions.yml
+++ b/docs/data/sql_functions.yml
@@ -274,6 +274,12 @@ string:
   - sql: RTRIM(string)
     table: STRING.rtrim()
     description: Returns a string that removes the right whitespaces from 
STRING. E.g., 'This is a test String. '.rtrim() returns "This is a test 
String.".
+  - sql: BTRIM(str[, trimStr])
+    table: str.btrim([trimStr])
+    description: |
+      Removes any leading and trailing characters within trimStr from str. 
trimStr is set to whitespace by default.
+      str <CHAR | VARCHAR>, trimStr <CHAR | VARCHAR>
+      Returns a STRING representation of the trimmed str. `NULL` if any of the 
arguments are `NULL`.
   - sql: REPEAT(string, int)
     table: STRING.repeat(INT)
     description: Returns a string that repeats the base string integer times. 
E.g., REPEAT('This is a test String.', 2) returns "This is a test String.This 
is a test String.".
diff --git a/docs/data/sql_functions_zh.yml b/docs/data/sql_functions_zh.yml
index 192a06c302a..ad8451b13f2 100644
--- a/docs/data/sql_functions_zh.yml
+++ b/docs/data/sql_functions_zh.yml
@@ -340,6 +340,12 @@ string:
     description: |
       返回从 STRING 中删除右边空格的字符串。
       例如 `'This is a test String. '.rtrim()` 返回 `'This is a test String.'`。
+  - sql: BTRIM(str[, trimStr])
+    table: str.btrim([trimStr])
+    description: |
+      从 str 的左边和右边删除 trimStr 中的字符。trimStr 默认设置为空格。      
+      str <CHAR | VARCHAR>, trimStr <CHAR | VARCHAR>
+      返回一个 STRING 格式的裁剪后的 str。如果任何参数为 `NULL`,则返回 `NULL`。
   - sql: REPEAT(string, int)
     table: STRING.repeat(INT)
     description: |
diff --git a/flink-python/docs/reference/pyflink.table/expressions.rst 
b/flink-python/docs/reference/pyflink.table/expressions.rst
index 3a41b16df26..57c8ed314c8 100644
--- a/flink-python/docs/reference/pyflink.table/expressions.rst
+++ b/flink-python/docs/reference/pyflink.table/expressions.rst
@@ -194,6 +194,7 @@ string functions
     Expression.parse_url
     Expression.ltrim
     Expression.rtrim
+    Expression.btrim
     Expression.repeat
     Expression.over
     Expression.reverse
diff --git a/flink-python/pyflink/table/expression.py 
b/flink-python/pyflink/table/expression.py
index ebaed656d23..a8500836514 100644
--- a/flink-python/pyflink/table/expression.py
+++ b/flink-python/pyflink/table/expression.py
@@ -1322,6 +1322,16 @@ class Expression(Generic[T]):
         """
         return _unary_op("rtrim")(self)
 
+    def btrim(self, trim_str=None) -> 'Expression':
+        """
+        Removes any leading and trailing characters within trim_str from str.
+        trim_str is set to whitespace by default.
+        """
+        if trim_str is None:
+            return _unary_op("btrim")(self)
+        else:
+            return _binary_op("btrim")(self, trim_str)
+
     def repeat(self, n: Union[int, 'Expression[int]']) -> 'Expression[str]':
         """
         Returns a string that repeats the base string n times.
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
index 6dfcaf93f0d..1a201d99547 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
@@ -76,6 +76,7 @@ import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.ATAN;
 import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.AVG;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.BETWEEN;
 import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BIN;
+import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.BTRIM;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.CARDINALITY;
 import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.CAST;
 import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.CEIL;
@@ -1256,6 +1257,17 @@ public abstract class BaseExpressions<InType, OutType> {
         return toApiSpecificExpression(unresolvedCall(RTRIM, toExpr()));
     }
 
+    /** Returns a string that removes the left and right whitespaces from the 
given string. */
+    public OutType btrim() {
+        return toApiSpecificExpression(unresolvedCall(BTRIM, toExpr()));
+    }
+
+    /** Returns a string that removes the left and right chars in trimStr from 
the given string. */
+    public OutType btrim(InType trimStr) {
+        return toApiSpecificExpression(
+                unresolvedCall(BTRIM, toExpr(), objectToExpression(trimStr)));
+    }
+
     /** Returns a string that repeats the base string n times. */
     public OutType repeat(InType n) {
         return toApiSpecificExpression(unresolvedCall(REPEAT, toExpr(), 
objectToExpression(n)));
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index acbad06ef10..f3c95330cbc 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -1279,6 +1279,25 @@ public final class BuiltInFunctionDefinitions {
                     
.outputTypeStrategy(nullableIfArgs(varyingString(argument(0))))
                     .build();
 
+    public static final BuiltInFunctionDefinition BTRIM =
+            BuiltInFunctionDefinition.newBuilder()
+                    .name("BTRIM")
+                    .kind(SCALAR)
+                    .inputTypeStrategy(
+                            or(
+                                    sequence(
+                                            Arrays.asList("str"),
+                                            Arrays.asList(
+                                                    
logical(LogicalTypeFamily.CHARACTER_STRING))),
+                                    sequence(
+                                            Arrays.asList("str", "trimStr"),
+                                            Arrays.asList(
+                                                    
logical(LogicalTypeFamily.CHARACTER_STRING),
+                                                    
logical(LogicalTypeFamily.CHARACTER_STRING)))))
+                    
.outputTypeStrategy(nullableIfArgs(explicit(DataTypes.STRING())))
+                    
.runtimeClass("org.apache.flink.table.runtime.functions.scalar.BTrimFunction")
+                    .build();
+
     public static final BuiltInFunctionDefinition REPEAT =
             BuiltInFunctionDefinition.newBuilder()
                     .name("repeat")
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/StringFunctionsITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/StringFunctionsITCase.java
index e21763e29ff..ade65260777 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/StringFunctionsITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/StringFunctionsITCase.java
@@ -32,7 +32,79 @@ class StringFunctionsITCase extends BuiltInFunctionTestBase {
 
     @Override
     Stream<TestSetSpec> getTestSetSpecs() {
-        return Stream.concat(regexpExtractTestCases(), translateTestCases());
+        return Stream.of(bTrimTestCases(), regexpExtractTestCases(), 
translateTestCases())
+                .flatMap(s -> s);
+    }
+
+    private Stream<TestSetSpec> bTrimTestCases() {
+        return Stream.of(
+                TestSetSpec.forFunction(BuiltInFunctionDefinitions.BTRIM)
+                        .onFieldsWithData(null, "  \uD83D\uDE00www.apache.org  
\f   ")
+                        .andDataTypes(DataTypes.STRING(), DataTypes.STRING())
+                        // null input
+                        .testResult($("f0").btrim(), "BTRIM(f0)", null, 
DataTypes.STRING())
+                        .testResult(
+                                $("f0").btrim($("f1")), "BTRIM(f0, f1)", null, 
DataTypes.STRING())
+                        .testResult(
+                                $("f1").btrim($("f0")), "BTRIM(f1, f0)", null, 
DataTypes.STRING())
+                        // special chars
+                        .testResult(
+                                $("f1").btrim(" \f\uD83D\uDE00"),
+                                "BTRIM(f1, ' \f\uD83D\uDE00')",
+                                "www.apache.org",
+                                DataTypes.STRING())
+                        .testResult(
+                                $("f1").btrim("\uD83D\uDE00 "),
+                                "BTRIM(f1, '\uD83D\uDE00 ')",
+                                "www.apache.org  \f",
+                                DataTypes.STRING())
+                        // return type
+                        .testResult(
+                                lit("  www.apache.org  ").btrim(),
+                                "BTRIM('  www.apache.org  ')",
+                                "www.apache.org",
+                                DataTypes.STRING().notNull())
+                        // normal cases
+                        .testResult(
+                                lit("www.apache.org").btrim("a"),
+                                "BTRIM('www.apache.org', 'a')",
+                                "www.apache.org",
+                                DataTypes.STRING().notNull())
+                        .testResult(
+                                $("f1").btrim(),
+                                "BTRIM(f1)",
+                                "\uD83D\uDE00www.apache.org  \f",
+                                DataTypes.STRING())
+                        .testResult(
+                                $("f1").btrim("\f"),
+                                "BTRIM(f1, '\f')",
+                                "  \uD83D\uDE00www.apache.org  \f   ",
+                                DataTypes.STRING())
+                        .testResult(
+                                $("f1").btrim($("f1")), "BTRIM(f1, f1)", "", 
DataTypes.STRING()),
+                TestSetSpec.forFunction(BuiltInFunctionDefinitions.BTRIM, 
"Validation Error")
+                        .onFieldsWithData(100, "123")
+                        .andDataTypes(DataTypes.INT(), DataTypes.STRING())
+                        .testTableApiValidationError(
+                                $("f0").btrim(),
+                                "Invalid input arguments. Expected signatures 
are:\n"
+                                        + "BTRIM(str <CHARACTER_STRING>)\n"
+                                        + "BTRIM(str <CHARACTER_STRING>, 
trimStr <CHARACTER_STRING>)")
+                        .testSqlValidationError(
+                                "BTRIM(f0)",
+                                "Invalid input arguments. Expected signatures 
are:\n"
+                                        + "BTRIM(str <CHARACTER_STRING>)\n"
+                                        + "BTRIM(str <CHARACTER_STRING>, 
trimStr <CHARACTER_STRING>)")
+                        .testTableApiValidationError(
+                                $("f1").btrim($("f0")),
+                                "Invalid input arguments. Expected signatures 
are:\n"
+                                        + "BTRIM(str <CHARACTER_STRING>)\n"
+                                        + "BTRIM(str <CHARACTER_STRING>, 
trimStr <CHARACTER_STRING>)")
+                        .testSqlValidationError(
+                                "BTRIM(f1, f0)",
+                                "Invalid input arguments. Expected signatures 
are:\n"
+                                        + "BTRIM(str <CHARACTER_STRING>)\n"
+                                        + "BTRIM(str <CHARACTER_STRING>, 
trimStr <CHARACTER_STRING>)"));
     }
 
     private Stream<TestSetSpec> regexpExtractTestCases() {
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/BTrimFunction.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/BTrimFunction.java
new file mode 100644
index 00000000000..8d5d3da8f7c
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/BTrimFunction.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.table.data.binary.BinaryStringDataUtil;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext;
+
+import javax.annotation.Nullable;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#BTRIM}. */
+@Internal
+public class BTrimFunction extends BuiltInScalarFunction {
+
+    public BTrimFunction(SpecializedContext context) {
+        super(BuiltInFunctionDefinitions.BTRIM, context);
+    }
+
+    public @Nullable StringData eval(@Nullable StringData str) {
+        if (str == null) {
+            return null;
+        }
+        return BinaryStringDataUtil.trim((BinaryStringData) str, 
BinaryStringData.fromString(" "));
+    }
+
+    public @Nullable StringData eval(@Nullable StringData str, @Nullable 
StringData trimStr) {
+        if (str == null || trimStr == null) {
+            return null;
+        }
+        return BinaryStringDataUtil.trim((BinaryStringData) str, 
(BinaryStringData) trimStr);
+    }
+}

Reply via email to