This is an automated email from the ASF dual-hosted git repository.
lincoln pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 41b68c298a6 [FLINK-26939][table] Add the built-in function TRANSLATE
41b68c298a6 is described below
commit 41b68c298a6f99b57de04e27e64567f914aab638
Author: dylanhz <[email protected]>
AuthorDate: Sat Aug 3 22:36:28 2024 +0800
[FLINK-26939][table] Add the built-in function TRANSLATE
This closes #25122
---
docs/data/sql_functions.yml | 7 +
docs/data/sql_functions_zh.yml | 7 +
.../docs/reference/pyflink.table/expressions.rst | 1 +
flink-python/pyflink/table/expression.py | 7 +
.../flink/table/api/internal/BaseExpressions.java | 19 +++
.../functions/BuiltInFunctionDefinitions.java | 20 +++
.../planner/functions/StringFunctionsITCase.java | 146 +++++++++++++++++++++
.../table/data/binary/BinaryStringDataUtil.java | 10 ++
.../functions/scalar/TranslateFunction.java | 112 ++++++++++++++++
.../flink/table/data/BinaryStringDataTest.java | 12 ++
10 files changed, 341 insertions(+)
diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml
index 69bfded1797..2cd2a3c415f 100644
--- a/docs/data/sql_functions.yml
+++ b/docs/data/sql_functions.yml
@@ -293,6 +293,13 @@ string:
- sql: REPLACE(string1, string2, string3)
table: STRING1.replace(STRING2, STRING3)
description: Returns a new string which replaces all the occurrences of
STRING2 with STRING3 (non-overlapping) from STRING1. E.g., 'hello
world'.replace('world', 'flink') returns 'hello flink';
'ababab'.replace('abab', 'z') returns 'zab'.
+ - sql: TRANSLATE(expr, fromStr, toStr)
+ table: expr.translate(fromStr, toStr)
+ description: |
+ Translates expr by replacing every character that occurs in fromStr with the character at the same position in toStr.
+ If toStr is shorter than fromStr, the characters of fromStr that have no counterpart in toStr are removed.
+ expr [<CHAR> | <VARCHAR>], fromStr [<CHAR> | <VARCHAR>], toStr [<CHAR> |
<VARCHAR>]
+ Returns a STRING of translated expr.
- sql: REGEXP_EXTRACT(string1, string2[, integer])
table: STRING1.regexpExtract(STRING2[, INTEGER1])
description: |
diff --git a/docs/data/sql_functions_zh.yml b/docs/data/sql_functions_zh.yml
index 4f40783a291..192a06c302a 100644
--- a/docs/data/sql_functions_zh.yml
+++ b/docs/data/sql_functions_zh.yml
@@ -369,6 +369,13 @@ string:
返回一个新字符串,它用 STRING3(非重叠)替换 STRING1 中所有出现的 STRING2。
例如 `'hello world'.replace('world', 'flink')` 返回 `'hello flink'`;
`'ababab'.replace('abab', 'z')` 返回 `'zab'`。
+ - sql: TRANSLATE(expr, fromStr, toStr)
+ table: expr.translate(fromStr, toStr)
+ description: |
+ 将 expr 中所有出现在 fromStr 之中的字符替换为 toStr 中的相应字符。
+ 如果 toStr 的长度短于 fromStr,则未匹配的字符将被移除。
+ expr [<CHAR> | <VARCHAR>], fromStr [<CHAR> | <VARCHAR>], toStr [<CHAR> |
<VARCHAR>]
+ 返回 STRING 格式的转换结果。
- sql: REGEXP_EXTRACT(string1, string2[, integer])
table: STRING1.regexpExtract(STRING2[, INTEGER1])
description: |
diff --git a/flink-python/docs/reference/pyflink.table/expressions.rst
b/flink-python/docs/reference/pyflink.table/expressions.rst
index fbf24e15fa7..3a41b16df26 100644
--- a/flink-python/docs/reference/pyflink.table/expressions.rst
+++ b/flink-python/docs/reference/pyflink.table/expressions.rst
@@ -165,6 +165,7 @@ string functions
Expression.trim_trailing
Expression.trim
Expression.replace
+ Expression.translate
Expression.char_length
Expression.upper_case
Expression.lower_case
diff --git a/flink-python/pyflink/table/expression.py
b/flink-python/pyflink/table/expression.py
index c48a488bfc9..ebaed656d23 100644
--- a/flink-python/pyflink/table/expression.py
+++ b/flink-python/pyflink/table/expression.py
@@ -1084,6 +1084,13 @@ class Expression(Generic[T]):
"""
return _ternary_op("replace")(self, search, replacement)
+ def translate(self, from_str, to_str) -> 'Expression':
+ """
+ Translate an expr where all characters in from_str have been replaced
with those in to_str.
+ If to_str has a shorter length than from_str, unmatched characters are
removed.
+ """
+ return _ternary_op("translate")(self, from_str, to_str)
+
@property
def char_length(self) -> 'Expression[int]':
"""
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
index ff8f2b26501..6dfcaf93f0d 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
@@ -189,6 +189,7 @@ import static
org.apache.flink.table.functions.BuiltInFunctionDefinitions.TAN;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.TANH;
import static
org.apache.flink.table.functions.BuiltInFunctionDefinitions.TIMES;
import static
org.apache.flink.table.functions.BuiltInFunctionDefinitions.TO_BASE64;
+import static
org.apache.flink.table.functions.BuiltInFunctionDefinitions.TRANSLATE;
import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.TRIM;
import static
org.apache.flink.table.functions.BuiltInFunctionDefinitions.TRUNCATE;
import static
org.apache.flink.table.functions.BuiltInFunctionDefinitions.TRY_CAST;
@@ -878,6 +879,24 @@ public abstract class BaseExpressions<InType, OutType> {
unresolvedCall(SUBSTR, toExpr(),
objectToExpression(beginIndex)));
}
+ /**
+ * Translates this string by replacing every character that occurs in {@code fromStr} with the
+ * character at the same position in {@code toStr}. <br>
+ * NOTE: If {@code toStr} is shorter than {@code fromStr}, the characters of {@code fromStr}
+ * that have no counterpart in {@code toStr} are removed.
+ *
+ * @param fromStr a STRING expression listing the characters to replace
+ * @param toStr a STRING expression listing the replacement characters, matched by position
+ * @return an expression representing the TRANSLATE call on this expression
+ */
+ public OutType translate(InType fromStr, InType toStr) {
+ return toApiSpecificExpression(
+ unresolvedCall(
+ TRANSLATE,
+ toExpr(),
+ objectToExpression(fromStr),
+ objectToExpression(toStr)));
+ }
+
/** Removes leading space characters from the given string. */
public OutType trimLeading() {
return toApiSpecificExpression(
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index f64b4a105b8..acbad06ef10 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -970,6 +970,26 @@ public final class BuiltInFunctionDefinitions {
.outputTypeStrategy(nullableIfArgs(explicit(DataTypes.STRING())))
.build();
+ // By default, Calcite parses TRANSLATE as TRANSLATE3, hence the name field is set to
+ // TRANSLATE3 (and the SQL name to TRANSLATE) to ensure the function is called correctly.
+ public static final BuiltInFunctionDefinition TRANSLATE =
+ BuiltInFunctionDefinition.newBuilder()
+ .name("TRANSLATE3")
+ .sqlName("TRANSLATE")
+ .kind(SCALAR)
+ .inputTypeStrategy(
+ sequence(
+ Arrays.asList("expr", "fromStr", "toStr"),
+ Arrays.asList(
+
logical(LogicalTypeFamily.CHARACTER_STRING),
+
logical(LogicalTypeFamily.CHARACTER_STRING),
+
logical(LogicalTypeFamily.CHARACTER_STRING))))
+ .outputTypeStrategy(
+ nullableIfArgs(ConstantArgumentCount.to(0),
explicit(STRING())))
+ .runtimeClass(
+
"org.apache.flink.table.runtime.functions.scalar.TranslateFunction")
+ .build();
+
public static final BuiltInFunctionDefinition TRIM =
BuiltInFunctionDefinition.newBuilder()
.name("trim")
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/StringFunctionsITCase.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/StringFunctionsITCase.java
index 09c86fee0cf..e21763e29ff 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/StringFunctionsITCase.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/StringFunctionsITCase.java
@@ -25,12 +25,17 @@ import java.util.stream.Stream;
import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;
+import static org.apache.flink.table.api.Expressions.lit;
/** Test String functions correct behaviour. */
class StringFunctionsITCase extends BuiltInFunctionTestBase {
@Override
Stream<TestSetSpec> getTestSetSpecs() {
+ return Stream.concat(regexpExtractTestCases(), translateTestCases()); // REGEXP_EXTRACT specs, then TRANSLATE specs
+ }
+
+ private Stream<TestSetSpec> regexpExtractTestCases() {
return Stream.of(
TestSetSpec.forFunction(
BuiltInFunctionDefinitions.REGEXP_EXTRACT,
"Check return type")
@@ -46,4 +51,145 @@ class StringFunctionsITCase extends BuiltInFunctionTestBase
{
"ABC",
DataTypes.STRING().nullable()));
}
+
+ private Stream<TestSetSpec> translateTestCases() { // TRANSLATE result specs plus one validation-error spec
+ return Stream.of(
+ TestSetSpec.forFunction(BuiltInFunctionDefinitions.TRANSLATE)
+ .onFieldsWithData(
+ null, "www.apache.org", "", "翻译test,测试",
"www.\uD83D\uDE00.org")
+ .andDataTypes(
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING(),
+ DataTypes.STRING())
+ // null input: NULL expr -> NULL; NULL fromStr -> expr unchanged; NULL toStr -> matched chars removed
+ .testResult(
+ $("f0").translate("abc", "123"),
+ "TRANSLATE(f0, 'abc', '123')",
+ null,
+ DataTypes.STRING())
+ .testResult(
+ $("f1").translate($("f0"), "123"),
+ "TRANSLATE(f1, f0, '123')",
+ "www.apache.org",
+ DataTypes.STRING())
+ .testResult(
+ $("f1").translate("abc", $("f0")),
+ "TRANSLATE(f1, 'abc', f0)",
+ "www.phe.org",
+ DataTypes.STRING())
+ // empty input: empty expr or fromStr -> expr unchanged; empty toStr -> matched chars removed
+ .testResult(
+ $("f2").translate("abc", "123"),
+ "TRANSLATE(f2, 'abc', '123')",
+ "",
+ DataTypes.STRING())
+ .testResult(
+ $("f1").translate($("f2"), "123"),
+ "TRANSLATE(f1, f2, '123')",
+ "www.apache.org",
+ DataTypes.STRING())
+ .testResult(
+ $("f1").translate("abc", $("f2")),
+ "TRANSLATE(f1, 'abc', f2)",
+ "www.phe.org",
+ DataTypes.STRING())
+ // from longer than to: fromStr characters beyond the length of toStr are removed
+ .testResult(
+ $("f1").translate("abcde", "123"),
+ "TRANSLATE(f1, 'abcde', '123')",
+ "www.1p13h.org",
+ DataTypes.STRING())
+ .testResult(
+ $("f1").translate("abcde.", "123"),
+ "TRANSLATE(f1, 'abcde.', '123')",
+ "www1p13horg",
+ DataTypes.STRING())
+ // to longer than from: surplus toStr characters are ignored
+ .testResult(
+ $("f1").translate("abc", "12345"),
+ "TRANSLATE(f1, 'abc', '12345')",
+ "www.1p13he.org",
+ DataTypes.STRING())
+ // duplicate chars in from: only the first mapping of a repeated character is used
+ .testResult(
+ $("f1").translate("abcae", "12345"),
+ "TRANSLATE(f1, 'abcae', '12345')",
+ "www.1p13h5.org",
+ DataTypes.STRING())
+ .testResult(
+ $("f1").translate("...", "123"),
+ "TRANSLATE(f1, '...', '123')",
+ "www1apache1org",
+ DataTypes.STRING())
+ // case sensitive: upper-case fromStr characters do not match the lower-case input
+ .testResult(
+ $("f1").translate("ABCDE", "12345"),
+ "TRANSLATE(f1, 'ABCDE', '12345')",
+ "www.apache.org",
+ DataTypes.STRING())
+ // Unicode: CJK characters and surrogate pairs are mapped as whole characters
+ .testResult(
+ $("f3").translate("翻译测试test,", "测试翻译tset。"),
+ "TRANSLATE(f3, '翻译测试test,', '测试翻译tset。')",
+ "测试tset。翻译",
+ DataTypes.STRING())
+ .testResult(
+ $("f3").translate("翻译测试test,", "test翻译 "),
+ "TRANSLATE(f3, '翻译测试test,', 'test翻译 ')",
+ "te翻译 翻st",
+ DataTypes.STRING())
+ .testResult(
+ $("f4").translate(".\uD83D\uDE00",
"\uD83D\uDE00."),
+ "TRANSLATE(f4, '.\uD83D\uDE00',
'\uD83D\uDE00.')",
+ "www\uD83D\uDE00.\uD83D\uDE00org",
+ DataTypes.STRING())
+ .testResult(
+ $("f4").translate("\uD83D\uDE00w", "笑α"),
+ "TRANSLATE(f4, '\uD83D\uDE00w', '笑α')",
+ "ααα.笑.org",
+ DataTypes.STRING())
+ // return type: a non-null literal expr gives a NOT NULL result type
+ .testResult(
+ lit("www.apache.org").translate("abc", "123"),
+ "TRANSLATE('www.apache.org', 'abc', '123')",
+ "www.1p13he.org",
+ DataTypes.STRING().notNull())
+ // dict reuse (coverage): the same (fromStr, toStr) pair is applied twice
+ .testResult(
+ lit("www.apache.org")
+ .translate("abc", "123")
+ .translate("abc", "123"),
+ "TRANSLATE(TRANSLATE('www.apache.org', 'abc',
'123'), 'abc', '123')",
+ "www.1p13he.org",
+ DataTypes.STRING().notNull())
+ // normal cases
+ .testResult(
+ $("f1").translate("abc", "123"),
+ "TRANSLATE(f1, 'abc', '123')",
+ "www.1p13he.org",
+ DataTypes.STRING())
+ .testResult(
+ $("f1").translate("abc", "ABC"),
+ "TRANSLATE(f1, 'abc', 'ABC')",
+ "www.ApAChe.org",
+ DataTypes.STRING())
+ .testResult(
+ $("f1").translate("abcworg", "123 "),
+ "TRANSLATE(f1, 'abcworg', '123 ')",
+ " .1p13he.",
+ DataTypes.STRING()),
+ TestSetSpec.forFunction(BuiltInFunctionDefinitions.TRANSLATE,
"Validation Error")
+ .onFieldsWithData(12345)
+ .andDataTypes(DataTypes.INT())
+ .testTableApiValidationError(
+ $("f0").translate("3", "5"),
+ "Invalid input arguments. Expected signatures
are:\n"
+ + "TRANSLATE3(expr <CHARACTER_STRING>,
fromStr <CHARACTER_STRING>, toStr <CHARACTER_STRING>)")
+ .testSqlValidationError(
+ "TRANSLATE(f0, '3', '5')",
+ "Invalid input arguments. Expected signatures
are:\n"
+ + "TRANSLATE3(expr <CHARACTER_STRING>,
fromStr <CHARACTER_STRING>, toStr <CHARACTER_STRING>)"));
+ }
}
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/binary/BinaryStringDataUtil.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/binary/BinaryStringDataUtil.java
index 63fcb60b95e..1cfdf939700 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/binary/BinaryStringDataUtil.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/data/binary/BinaryStringDataUtil.java
@@ -964,6 +964,16 @@ public class BinaryStringDataUtil {
}
}
+    /**
+     * Tells whether the given string has no content.
+     *
+     * @param str the string to inspect
+     * @return {@code true} if the Java object is an empty string or, when there is no Java object,
+     *     if the binary section is missing or has a size of zero bytes
+     */
+    public static boolean isEmpty(BinaryStringData str) {
+        // Look at javaObject or binarySection directly instead of calling
+        // BinaryStringData#getSizeInBytes, to avoid the performance loss caused by materialization.
+        if (str.javaObject == null) {
+            return str.binarySection == null || str.binarySection.getSizeInBytes() == 0;
+        }
+        return str.javaObject.isEmpty();
+    }
+
public static boolean isSpaceString(BinaryStringData str) {
if (str.javaObject != null) {
return str.javaObject.equals(" ");
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/TranslateFunction.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/TranslateFunction.java
new file mode 100644
index 00000000000..50bcb9a1510
--- /dev/null
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/TranslateFunction.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.data.binary.BinaryStringData;
+import org.apache.flink.table.data.binary.BinaryStringDataUtil;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext;
+import org.apache.flink.table.utils.ThreadLocalCache;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import javax.annotation.Nullable;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Runtime implementation of {@link BuiltInFunctionDefinitions#TRANSLATE}.
+ *
+ * <p>Every code point of the input that occurs in {@code fromStr} is replaced by the code point
+ * at the same position in {@code toStr}; code points of {@code fromStr} without a counterpart in
+ * {@code toStr} are dropped from the result. The mapping for a {@code (fromStr, toStr)} pair is
+ * obtained through a {@link ThreadLocalCache}.
+ */
+@Internal
+public class TranslateFunction extends BuiltInScalarFunction {
+
+    /** Cache from a {@code (fromStr, toStr)} pair to its code point replacement mapping. */
+    private static final ThreadLocalCache<Pair<String, String>, Map<Integer, String>> DICT_CACHE =
+            new ThreadLocalCache<Pair<String, String>, Map<Integer, String>>() {
+                @Override
+                public Map<Integer, String> getNewInstance(Pair<String, String> key) {
+                    return buildDict(key.getLeft(), key.getRight());
+                }
+            };
+
+    public TranslateFunction(SpecializedContext context) {
+        super(BuiltInFunctionDefinitions.TRANSLATE, context);
+    }
+
+    /**
+     * Translates {@code expr} according to {@code fromStr} and {@code toStr}.
+     *
+     * @param expr the string to translate
+     * @param fromStr the characters to replace
+     * @param toStr the replacement characters, matched by position; {@code null} is treated as
+     *     the empty string
+     * @return {@code expr} itself if {@code expr} or {@code fromStr} is {@code null} or empty,
+     *     otherwise the translated string
+     */
+    public @Nullable StringData eval(
+            @Nullable StringData expr, @Nullable StringData fromStr, @Nullable StringData toStr) {
+        // Nothing to translate: hand the input back as is (this also propagates a null expr).
+        if (expr == null || fromStr == null) {
+            return expr;
+        }
+        if (BinaryStringDataUtil.isEmpty((BinaryStringData) expr)
+                || BinaryStringDataUtil.isEmpty((BinaryStringData) fromStr)) {
+            return expr;
+        }
+
+        final String input = expr.toString();
+        final String fromChars = fromStr.toString();
+        final String toChars;
+        if (toStr == null) {
+            toChars = "";
+        } else {
+            toChars = toStr.toString();
+        }
+
+        final Map<Integer, String> mapping = DICT_CACHE.get(Pair.of(fromChars, toChars));
+        final String translated = translate(input, mapping);
+        return BinaryStringData.fromString(translated);
+    }
+
+    /** Rewrites {@code expr} code point by code point, copying code points absent from dict. */
+    private String translate(String expr, Map<Integer, String> dict) {
+        final StringBuilder out = new StringBuilder();
+        int pos = 0;
+        while (pos < expr.length()) {
+            final int codePoint = expr.codePointAt(pos);
+            final int next = pos + Character.charCount(codePoint);
+            final String replacement = dict.get(codePoint);
+            if (replacement != null) {
+                out.append(replacement);
+            } else {
+                out.append(expr, pos, next);
+            }
+            pos = next;
+        }
+        return out.toString();
+    }
+
+    /**
+     * Pairs the code points of {@code from} with those of {@code to} by position. A code point of
+     * {@code from} with no counterpart left in {@code to} maps to the empty string.
+     */
+    private static Map<Integer, String> buildDict(String from, String to) {
+        final Map<Integer, String> dict = new HashMap<>();
+
+        int toPos = 0;
+        int fromPos = 0;
+        while (fromPos < from.length()) {
+            // Consume the next code point of `to` if one is left; otherwise map to "".
+            String replacement = "";
+            if (toPos < to.length()) {
+                final int toEnd = toPos + Character.charCount(to.codePointAt(toPos));
+                replacement = to.substring(toPos, toEnd);
+                toPos = toEnd;
+            }
+
+            final int fromCodePoint = from.codePointAt(fromPos);
+            fromPos += Character.charCount(fromCodePoint);
+
+            // ignore duplicate mapping: the first occurrence in `from` wins
+            dict.putIfAbsent(fromCodePoint, replacement);
+        }
+
+        return dict;
+    }
+}
diff --git
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/BinaryStringDataTest.java
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/BinaryStringDataTest.java
index f643cc3fdea..865e5ed41c7 100644
---
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/BinaryStringDataTest.java
+++
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/data/BinaryStringDataTest.java
@@ -42,6 +42,7 @@ import static
org.apache.flink.table.data.binary.BinaryStringData.fromBytes;
import static
org.apache.flink.table.data.binary.BinaryStringDataUtil.EMPTY_STRING_ARRAY;
import static org.apache.flink.table.data.binary.BinaryStringDataUtil.concat;
import static org.apache.flink.table.data.binary.BinaryStringDataUtil.concatWs;
+import static org.apache.flink.table.data.binary.BinaryStringDataUtil.isEmpty;
import static org.apache.flink.table.data.binary.BinaryStringDataUtil.keyValue;
import static org.apache.flink.table.data.binary.BinaryStringDataUtil.reverse;
import static
org.apache.flink.table.data.binary.BinaryStringDataUtil.splitByWholeSeparatorPreserveAllTokens;
@@ -858,4 +859,15 @@ public class BinaryStringDataTest {
// check reference same.
assertThat(javaStr).isSameAs(str.toString());
}
+
+    /**
+     * Checks {@code isEmpty} for strings created via {@code fromString}, via {@code fromBytes}
+     * and via the no-argument constructor.
+     */
+    @Test
+    public void testIsEmpty() {
+        // Explicit charset so the test bytes do not depend on the platform default encoding.
+        final java.nio.charset.Charset utf8 = java.nio.charset.StandardCharsets.UTF_8;
+
+        assertThat(isEmpty(fromString(""))).isTrue();
+        assertThat(isEmpty(BinaryStringData.fromBytes(new byte[] {}))).isTrue();
+        assertThat(isEmpty(fromString("hello"))).isFalse();
+        assertThat(isEmpty(BinaryStringData.fromBytes("hello".getBytes(utf8)))).isFalse();
+        assertThat(isEmpty(fromString("中文"))).isFalse();
+        assertThat(isEmpty(BinaryStringData.fromBytes("中文".getBytes(utf8)))).isFalse();
+        assertThat(isEmpty(new BinaryStringData())).isTrue();
+    }
}