This is an automated email from the ASF dual-hosted git repository.
ron pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 72ebee2eb45 [FLINK-35910][table] Add the built-in function BTRIM
72ebee2eb45 is described below
commit 72ebee2eb45f598ebd88422f716ae6da93ddee5d
Author: dylanhz <[email protected]>
AuthorDate: Thu Aug 8 10:32:24 2024 +0800
[FLINK-35910][table] Add the built-in function BTRIM
This closes #25127
---------
Co-authored-by: Ron <[email protected]>
---
docs/data/sql_functions.yml | 6 ++
docs/data/sql_functions_zh.yml | 6 ++
.../docs/reference/pyflink.table/expressions.rst | 1 +
flink-python/pyflink/table/expression.py | 10 +++
.../flink/table/api/internal/BaseExpressions.java | 12 ++++
.../functions/BuiltInFunctionDefinitions.java | 19 ++++++
.../planner/functions/StringFunctionsITCase.java | 74 +++++++++++++++++++++-
.../runtime/functions/scalar/BTrimFunction.java | 51 +++++++++++++++
8 files changed, 178 insertions(+), 1 deletion(-)
diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml
index 2cd2a3c415f..8dca117fa7b 100644
--- a/docs/data/sql_functions.yml
+++ b/docs/data/sql_functions.yml
@@ -274,6 +274,12 @@ string:
- sql: RTRIM(string)
table: STRING.rtrim()
description: Returns a string that removes the right whitespaces from
STRING. E.g., 'This is a test String. '.rtrim() returns "This is a test
String.".
+ - sql: BTRIM(str[, trimStr])
+ table: str.btrim([trimStr])
+ description: |
+ Removes any leading and trailing characters within trimStr from str.
trimStr defaults to a single space character.
+ str <CHAR | VARCHAR>, trimStr <CHAR | VARCHAR>
+ Returns a STRING representation of the trimmed str. `NULL` if any of the
arguments are `NULL`.
- sql: REPEAT(string, int)
table: STRING.repeat(INT)
description: Returns a string that repeats the base string integer times.
E.g., REPEAT('This is a test String.', 2) returns "This is a test String.This
is a test String.".
diff --git a/docs/data/sql_functions_zh.yml b/docs/data/sql_functions_zh.yml
index 192a06c302a..ad8451b13f2 100644
--- a/docs/data/sql_functions_zh.yml
+++ b/docs/data/sql_functions_zh.yml
@@ -340,6 +340,12 @@ string:
description: |
返回从 STRING 中删除右边空格的字符串。
例如 `'This is a test String. '.rtrim()` 返回 `'This is a test String.'`。
+ - sql: BTRIM(str[, trimStr])
+ table: str.btrim([trimStr])
+ description: |
+ 从 str 的左边和右边删除 trimStr 中的字符。trimStr 默认设置为空格。
+ str <CHAR | VARCHAR>, trimStr <CHAR | VARCHAR>
+ 返回一个 STRING 格式的裁剪后的 str。如果任何参数为 `NULL`,则返回 `NULL`。
- sql: REPEAT(string, int)
table: STRING.repeat(INT)
description: |
diff --git a/flink-python/docs/reference/pyflink.table/expressions.rst
b/flink-python/docs/reference/pyflink.table/expressions.rst
index 3a41b16df26..57c8ed314c8 100644
--- a/flink-python/docs/reference/pyflink.table/expressions.rst
+++ b/flink-python/docs/reference/pyflink.table/expressions.rst
@@ -194,6 +194,7 @@ string functions
Expression.parse_url
Expression.ltrim
Expression.rtrim
+ Expression.btrim
Expression.repeat
Expression.over
Expression.reverse
diff --git a/flink-python/pyflink/table/expression.py
b/flink-python/pyflink/table/expression.py
index ebaed656d23..a8500836514 100644
--- a/flink-python/pyflink/table/expression.py
+++ b/flink-python/pyflink/table/expression.py
@@ -1322,6 +1322,16 @@ class Expression(Generic[T]):
"""
return _unary_op("rtrim")(self)
+ def btrim(self, trim_str=None) -> 'Expression':
+ """
+ Removes any leading and trailing characters within trim_str from str.
+ trim_str is set to whitespace by default.
+ """
+ if trim_str is None:
+ return _unary_op("btrim")(self)
+ else:
+ return _binary_op("btrim")(self, trim_str)
+
def repeat(self, n: Union[int, 'Expression[int]']) -> 'Expression[str]':
"""
Returns a string that repeats the base string n times.
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
index 6dfcaf93f0d..1a201d99547 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
@@ -76,6 +76,7 @@ import static
org.apache.flink.table.functions.BuiltInFunctionDefinitions.ATAN;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.AVG;
import static
org.apache.flink.table.functions.BuiltInFunctionDefinitions.BETWEEN;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.BIN;
+import static
org.apache.flink.table.functions.BuiltInFunctionDefinitions.BTRIM;
import static
org.apache.flink.table.functions.BuiltInFunctionDefinitions.CARDINALITY;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.CAST;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.CEIL;
@@ -1256,6 +1257,17 @@ public abstract class BaseExpressions<InType, OutType> {
return toApiSpecificExpression(unresolvedCall(RTRIM, toExpr()));
}
+ /**
+ * Returns a string with the leading and trailing space characters removed from the given
+ * string.
+ *
+ * <p>Builds a one-argument {@code BTRIM} call; use {@code btrim(trimStr)} to strip a custom set
+ * of characters instead.
+ */
+ public OutType btrim() {
+ return toApiSpecificExpression(unresolvedCall(BTRIM, toExpr()));
+ }
+
+ /**
+ * Returns a string with every leading and trailing character that occurs in {@code trimStr}
+ * removed from the given string.
+ *
+ * @param trimStr the characters to strip from both ends; it is passed through
+ *     {@code objectToExpression} before the two-argument {@code BTRIM} call is built
+ */
+ public OutType btrim(InType trimStr) {
+ return toApiSpecificExpression(
+ unresolvedCall(BTRIM, toExpr(), objectToExpression(trimStr)));
+ }
+
/** Returns a string that repeats the base string n times. */
public OutType repeat(InType n) {
return toApiSpecificExpression(unresolvedCall(REPEAT, toExpr(),
objectToExpression(n)));
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index acbad06ef10..f3c95330cbc 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -1279,6 +1279,25 @@ public final class BuiltInFunctionDefinitions {
.outputTypeStrategy(nullableIfArgs(varyingString(argument(0))))
.build();
+ /**
+ * Scalar function {@code BTRIM(str[, trimStr])}.
+ *
+ * <p>Accepts either one or two arguments, each of the CHARACTER_STRING type family. The
+ * result type is STRING, with nullability derived from the arguments via
+ * {@code nullableIfArgs}. Evaluation is delegated to the runtime class {@code BTrimFunction}.
+ */
+ public static final BuiltInFunctionDefinition BTRIM =
+ BuiltInFunctionDefinition.newBuilder()
+ .name("BTRIM")
+ .kind(SCALAR)
+ .inputTypeStrategy(
+ or(
+ sequence(
+ Arrays.asList("str"),
+ Arrays.asList(
+
logical(LogicalTypeFamily.CHARACTER_STRING))),
+ sequence(
+ Arrays.asList("str", "trimStr"),
+ Arrays.asList(
+
logical(LogicalTypeFamily.CHARACTER_STRING),
+
logical(LogicalTypeFamily.CHARACTER_STRING)))))
+
.outputTypeStrategy(nullableIfArgs(explicit(DataTypes.STRING())))
+
.runtimeClass("org.apache.flink.table.runtime.functions.scalar.BTrimFunction")
+ .build();
+
public static final BuiltInFunctionDefinition REPEAT =
BuiltInFunctionDefinition.newBuilder()
.name("repeat")
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/StringFunctionsITCase.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/StringFunctionsITCase.java
index e21763e29ff..ade65260777 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/StringFunctionsITCase.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/StringFunctionsITCase.java
@@ -32,7 +32,79 @@ class StringFunctionsITCase extends BuiltInFunctionTestBase {
@Override
Stream<TestSetSpec> getTestSetSpecs() {
- return Stream.concat(regexpExtractTestCases(), translateTestCases());
+ // Flatten the per-function suites into one stream, in the order listed here.
+ return Stream.of(bTrimTestCases(), regexpExtractTestCases(),
translateTestCases())
+ .flatMap(s -> s);
+ }
+
+ /**
+ * Test specs for {@code BTRIM}: null propagation, trimming of special characters (a
+ * surrogate-pair emoji and a form feed), nullability of the result type, and the validation
+ * errors raised when an argument is not a character string.
+ */
+ private Stream<TestSetSpec> bTrimTestCases() {
+ return Stream.of(
+ TestSetSpec.forFunction(BuiltInFunctionDefinitions.BTRIM)
+ .onFieldsWithData(null, " \uD83D\uDE00www.apache.org
\f ")
+ .andDataTypes(DataTypes.STRING(), DataTypes.STRING())
+ // null input
+ .testResult($("f0").btrim(), "BTRIM(f0)", null,
DataTypes.STRING())
+ .testResult(
+ $("f0").btrim($("f1")), "BTRIM(f0, f1)", null,
DataTypes.STRING())
+ .testResult(
+ $("f1").btrim($("f0")), "BTRIM(f1, f0)", null,
DataTypes.STRING())
+ // special chars
+ .testResult(
+ $("f1").btrim(" \f\uD83D\uDE00"),
+ "BTRIM(f1, ' \f\uD83D\uDE00')",
+ "www.apache.org",
+ DataTypes.STRING())
+ .testResult(
+ $("f1").btrim("\uD83D\uDE00 "),
+ "BTRIM(f1, '\uD83D\uDE00 ')",
+ "www.apache.org \f",
+ DataTypes.STRING())
+ // return type
+ .testResult(
+ lit(" www.apache.org ").btrim(),
+ "BTRIM(' www.apache.org ')",
+ "www.apache.org",
+ DataTypes.STRING().notNull())
+ // normal cases
+ .testResult(
+ lit("www.apache.org").btrim("a"),
+ "BTRIM('www.apache.org', 'a')",
+ "www.apache.org",
+ DataTypes.STRING().notNull())
+ // one-argument form strips spaces only: the emoji and the form feed remain
+ .testResult(
+ $("f1").btrim(),
+ "BTRIM(f1)",
+ "\uD83D\uDE00www.apache.org \f",
+ DataTypes.STRING())
+ .testResult(
+ $("f1").btrim("\f"),
+ "BTRIM(f1, '\f')",
+ " \uD83D\uDE00www.apache.org \f ",
+ DataTypes.STRING())
+ .testResult(
+ $("f1").btrim($("f1")), "BTRIM(f1, f1)", "",
DataTypes.STRING()),
+ TestSetSpec.forFunction(BuiltInFunctionDefinitions.BTRIM,
"Validation Error")
+ .onFieldsWithData(100, "123")
+ .andDataTypes(DataTypes.INT(), DataTypes.STRING())
+ .testTableApiValidationError(
+ $("f0").btrim(),
+ "Invalid input arguments. Expected signatures
are:\n"
+ + "BTRIM(str <CHARACTER_STRING>)\n"
+ + "BTRIM(str <CHARACTER_STRING>,
trimStr <CHARACTER_STRING>)")
+ .testSqlValidationError(
+ "BTRIM(f0)",
+ "Invalid input arguments. Expected signatures
are:\n"
+ + "BTRIM(str <CHARACTER_STRING>)\n"
+ + "BTRIM(str <CHARACTER_STRING>,
trimStr <CHARACTER_STRING>)")
+ .testTableApiValidationError(
+ $("f1").btrim($("f0")),
+ "Invalid input arguments. Expected signatures
are:\n"
+ + "BTRIM(str <CHARACTER_STRING>)\n"
+ + "BTRIM(str <CHARACTER_STRING>,
trimStr <CHARACTER_STRING>)")
+ .testSqlValidationError(
+ "BTRIM(f1, f0)",
+ "Invalid input arguments. Expected signatures
are:\n"
+ + "BTRIM(str <CHARACTER_STRING>)\n"
+ + "BTRIM(str <CHARACTER_STRING>,
trimStr <CHARACTER_STRING>)"));
}
private Stream<TestSetSpec> regexpExtractTestCases() {
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/BTrimFunction.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/BTrimFunction.java
new file mode 100644
index 00000000000..8d5d3da8f7c
--- /dev/null
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/BTrimFunction.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.table.data.binary.BinaryStringDataUtil;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext;
+
+import javax.annotation.Nullable;
+
+/**
+ * Runtime implementation of {@link BuiltInFunctionDefinitions#BTRIM}.
+ *
+ * <p>Strips characters from both ends of a string. Any {@code null} argument yields a
+ * {@code null} result.
+ */
+@Internal
+public class BTrimFunction extends BuiltInScalarFunction {
+
+    public BTrimFunction(SpecializedContext context) {
+        super(BuiltInFunctionDefinitions.BTRIM, context);
+    }
+
+    /** One-argument form: the trim set is a single space character. */
+    public @Nullable StringData eval(@Nullable StringData str) {
+        if (str != null) {
+            final BinaryStringData input = (BinaryStringData) str;
+            final BinaryStringData blank = BinaryStringData.fromString(" ");
+            return BinaryStringDataUtil.trim(input, blank);
+        }
+        return null;
+    }
+
+    /** Two-argument form: the characters of {@code trimStr} make up the trim set. */
+    public @Nullable StringData eval(@Nullable StringData str, @Nullable StringData trimStr) {
+        final boolean anyNull = str == null || trimStr == null;
+        return anyNull
+                ? null
+                : BinaryStringDataUtil.trim((BinaryStringData) str, (BinaryStringData) trimStr);
+    }
+}