This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 42d39eb16b1 [FLINK-39602][table] Add IS_VALID_UTF8 and MAKE_VALID_UTF8 built-in functions
42d39eb16b1 is described below

commit 42d39eb16b1f5d88887173805e46990f77a09f08
Author: Gustavo de Morais <[email protected]>
AuthorDate: Wed May 6 15:24:01 2026 +0200

    [FLINK-39602][table] Add IS_VALID_UTF8 and MAKE_VALID_UTF8 built-in functions
    
    This closes #28111.
---
 docs/data/sql_functions.yml                        |  14 +++
 docs/data/sql_functions_zh.yml                     |  14 +++
 flink-python/pyflink/table/expression.py           |  22 +++++
 .../pyflink/table/tests/test_expression.py         |   4 +
 .../flink/table/api/internal/BaseExpressions.java  |  24 +++++
 .../functions/BuiltInFunctionDefinitions.java      |  20 ++++
 .../planner/functions/Utf8FunctionsITCase.java     | 107 +++++++++++++++++++++
 .../functions/scalar/IsValidUtf8Function.java      |  42 ++++++++
 .../functions/scalar/MakeValidUtf8Function.java    |  49 ++++++++++
 9 files changed, 296 insertions(+)

diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml
index 93baa65e3d1..4c60a96746c 100644
--- a/docs/data/sql_functions.yml
+++ b/docs/data/sql_functions.yml
@@ -805,6 +805,20 @@ conversion:
       call("TYPEOF", input)
       call("TYPEOF", input, force_serializable)
     description: Returns the string representation of the input expression's data type. By default, the returned string is a summary string that might omit certain details for readability. If force_serializable is set to TRUE, the string represents a full data type that could be persisted in a catalog. Note that especially anonymous, inline data types have no serializable string representation. In this case, NULL is returned.
+  - sql: IS_VALID_UTF8(bytes)
+    table: BYTES.isValidUtf8()
+    description: |
+      Returns `TRUE` if the input is well-formed UTF-8, `FALSE` otherwise. Specifically rejects: truncated multi-byte sequences (missing continuation bytes), "overlong" encodings (using more bytes than necessary for the code point), code points above the Unicode maximum U+10FFFF, and UTF-16 surrogate values U+D800-U+DFFF (which have no UTF-8 representation). Returns `NULL` if the input is `NULL`.
+
+      Useful for filtering records with invalid UTF-8: `WHERE IS_VALID_UTF8(payload)` keeps clean rows; `WHERE NOT IS_VALID_UTF8(payload)` selects the rejects.
+
+      E.g., `IS_VALID_UTF8(x'48656C6C6F')` returns `TRUE`; `IS_VALID_UTF8(x'80')` returns `FALSE`.
+  - sql: MAKE_VALID_UTF8(bytes)
+    table: BYTES.makeValidUtf8()
+    description: |
+      Decodes the input as UTF-8, replacing each invalid sequence with the Unicode replacement character `U+FFFD` (rendered as `�`). The substitution is lossy and irreversible. Returns `NULL` if the input is `NULL`.
+
+      E.g., `MAKE_VALID_UTF8(x'48656C6C6F')` returns `'Hello'`; `MAKE_VALID_UTF8(x'80')` returns `'�'` (the `U+FFFD` replacement character).
 
 collection:
   - sql: CARDINALITY(array)
diff --git a/docs/data/sql_functions_zh.yml b/docs/data/sql_functions_zh.yml
index df74194ff90..5cfcac48791 100644
--- a/docs/data/sql_functions_zh.yml
+++ b/docs/data/sql_functions_zh.yml
@@ -935,6 +935,20 @@ conversion:
       返回输入表达式的数据类型的字符串表示。默认情况下返回的字符串是一个摘要字符串,可能会为了可读性而省略某些细节。
       如果 force_serializable 设置为 TRUE,则字符串表示可以持久化保存在 catalog 中的完整数据类型。
       请注意,特别是匿名的内联数据类型没有可序列化的字符串表示。在这种情况下返回 `NULL`。
+  - sql: IS_VALID_UTF8(bytes)
+    table: BYTES.isValidUtf8()
+    description: |
+      Returns `TRUE` if the input is well-formed UTF-8, `FALSE` otherwise. Specifically rejects: truncated multi-byte sequences (missing continuation bytes), "overlong" encodings (using more bytes than necessary for the code point), code points above the Unicode maximum U+10FFFF, and UTF-16 surrogate values U+D800-U+DFFF (which have no UTF-8 representation). Returns `NULL` if the input is `NULL`.
+
+      Useful for filtering records with invalid UTF-8: `WHERE IS_VALID_UTF8(payload)` keeps clean rows; `WHERE NOT IS_VALID_UTF8(payload)` selects the rejects.
+
+      E.g., `IS_VALID_UTF8(x'48656C6C6F')` returns `TRUE`; `IS_VALID_UTF8(x'80')` returns `FALSE`.
+  - sql: MAKE_VALID_UTF8(bytes)
+    table: BYTES.makeValidUtf8()
+    description: |
+      Decodes the input as UTF-8, replacing each invalid sequence with the Unicode replacement character `U+FFFD` (rendered as `�`). The substitution is lossy and irreversible. Returns `NULL` if the input is `NULL`.
+
+      E.g., `MAKE_VALID_UTF8(x'48656C6C6F')` returns `'Hello'`; `MAKE_VALID_UTF8(x'80')` returns `'�'` (the `U+FFFD` replacement character).
 
 collection:
   - sql: CARDINALITY(array)
diff --git a/flink-python/pyflink/table/expression.py b/flink-python/pyflink/table/expression.py
index 9e1711dc455..82c49f2edc6 100644
--- a/flink-python/pyflink/table/expression.py
+++ b/flink-python/pyflink/table/expression.py
@@ -1489,6 +1489,28 @@ class Expression(Generic[T]):
         """
         return _unary_op("inetNtoa")(self)
 
+    @property
+    def is_valid_utf8(self) -> 'Expression[bool]':
+        """
+        Returns true if the input bytes are a well-formed UTF-8 sequence, false otherwise.
+        Returns null if the input is null.
+
+        Specifically rejects: truncated multi-byte sequences (missing continuation bytes),
+        "overlong" encodings (using more bytes than necessary for the code point), code points
+        above the Unicode maximum U+10FFFF, and UTF-16 surrogate values U+D800-U+DFFF (which
+        have no UTF-8 representation).
+        """
+        return _unary_op("isValidUtf8")(self)
+
+    @property
+    def make_valid_utf8(self) -> 'Expression[str]':
+        """
+        Decodes the input bytes as UTF-8, replacing each invalid sequence with the Unicode
+        replacement character U+FFFD. The substitution is lossy and irreversible. Returns null
+        if the input is null.
+        """
+        return _unary_op("makeValidUtf8")(self)
+
     def parse_url(self, part_to_extract: Union[str, 'Expression[str]'],
                   key: Union[str, 'Expression[str]'] = None) -> 
'Expression[str]':
         """
diff --git a/flink-python/pyflink/table/tests/test_expression.py b/flink-python/pyflink/table/tests/test_expression.py
index e7a2078d1db..543566c1e8c 100644
--- a/flink-python/pyflink/table/tests/test_expression.py
+++ b/flink-python/pyflink/table/tests/test_expression.py
@@ -188,6 +188,10 @@ class PyFlinkBatchExpressionTests(PyFlinkTestCase):
         self.assertEqual("INET_ATON(a)", str(expr1.inet_aton()))
         self.assertEqual("INET_NTOA(a)", str(expr1.inet_ntoa()))
 
+        # utf-8 validation functions
+        self.assertEqual("IS_VALID_UTF8(a)", str(expr1.is_valid_utf8))
+        self.assertEqual("MAKE_VALID_UTF8(a)", str(expr1.make_valid_utf8))
+
         # regexp functions
         self.assertEqual("regexp(a, b)", str(expr1.regexp(expr2)))
         self.assertEqual("REGEXP_COUNT(a, b)", str(expr1.regexp_count(expr2)))
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
index 27921d4105d..074224ae696 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/BaseExpressions.java
@@ -138,6 +138,7 @@ import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.IS_NOT
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.IS_NOT_TRUE;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.IS_NULL;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.IS_TRUE;
+import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.IS_VALID_UTF8;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.JSON_EXISTS;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.JSON_QUERY;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.JSON_QUOTE;
@@ -157,6 +158,7 @@ import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.LOG2;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.LOWER;
 import static org.apache.flink.table.functions.BuiltInFunctionDefinitions.LPAD;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.LTRIM;
+import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.MAKE_VALID_UTF8;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.MAP_ENTRIES;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.MAP_KEYS;
 import static 
org.apache.flink.table.functions.BuiltInFunctionDefinitions.MAP_UNION;
@@ -1493,6 +1495,28 @@ public abstract class BaseExpressions<InType, OutType> {
         return toApiSpecificExpression(unresolvedCall(INET_NTOA, toExpr()));
     }
 
+    /**
+     * Returns {@code true} if the input bytes are a well-formed UTF-8 sequence, {@code false}
+     * otherwise. Returns {@code null} if the input is {@code null}.
+     *
+     * <p>Specifically rejects: truncated multi-byte sequences (missing continuation bytes),
+     * "overlong" encodings (using more bytes than necessary for the code point), code points above
+     * the Unicode maximum U+10FFFF, and UTF-16 surrogate values U+D800-U+DFFF (which have no UTF-8
+     * representation).
+     */
+    public OutType isValidUtf8() {
+        return toApiSpecificExpression(unresolvedCall(IS_VALID_UTF8, 
toExpr()));
+    }
+
+    /**
+     * Decodes the input bytes as UTF-8, replacing each invalid sequence with the Unicode
+     * replacement character {@code U+FFFD}. The substitution is lossy and irreversible. Returns
+     * {@code null} if the input is {@code null}.
+     */
+    public OutType makeValidUtf8() {
+        return toApiSpecificExpression(unresolvedCall(MAKE_VALID_UTF8, 
toExpr()));
+    }
+
     /**
      * Parse url and return various parameter of the URL. If accept any null 
arguments, return null.
      */
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index cd25361923c..abe54e6f48e 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -489,6 +489,26 @@ public final class BuiltInFunctionDefinitions {
                             
"org.apache.flink.table.runtime.functions.scalar.InetNtoaFunction")
                     .build();
 
+    public static final BuiltInFunctionDefinition IS_VALID_UTF8 =
+            BuiltInFunctionDefinition.newBuilder()
+                    .name("IS_VALID_UTF8")
+                    .kind(SCALAR)
+                    
.inputTypeStrategy(sequence(logical(LogicalTypeFamily.BINARY_STRING)))
+                    
.outputTypeStrategy(nullableIfArgs(explicit(DataTypes.BOOLEAN())))
+                    .runtimeClass(
+                            
"org.apache.flink.table.runtime.functions.scalar.IsValidUtf8Function")
+                    .build();
+
+    public static final BuiltInFunctionDefinition MAKE_VALID_UTF8 =
+            BuiltInFunctionDefinition.newBuilder()
+                    .name("MAKE_VALID_UTF8")
+                    .kind(SCALAR)
+                    
.inputTypeStrategy(sequence(logical(LogicalTypeFamily.BINARY_STRING)))
+                    
.outputTypeStrategy(nullableIfArgs(explicit(DataTypes.STRING())))
+                    .runtimeClass(
+                            
"org.apache.flink.table.runtime.functions.scalar.MakeValidUtf8Function")
+                    .build();
+
     public static final BuiltInFunctionDefinition INTERNAL_REPLICATE_ROWS =
             BuiltInFunctionDefinition.newBuilder()
                     .name("$REPLICATE_ROWS$1")
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/Utf8FunctionsITCase.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/Utf8FunctionsITCase.java
new file mode 100644
index 00000000000..54543724421
--- /dev/null
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/Utf8FunctionsITCase.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.functions;
+
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+
+import java.nio.charset.StandardCharsets;
+import java.util.stream.Stream;
+
+import static org.apache.flink.table.api.Expressions.$;
+
+/**
+ * Tests for {@link BuiltInFunctionDefinitions#IS_VALID_UTF8} and {@link
+ * BuiltInFunctionDefinitions#MAKE_VALID_UTF8}.
+ */
+public class Utf8FunctionsITCase extends BuiltInFunctionTestBase {
+
+    private static final byte[] HELLO = 
"Hello".getBytes(StandardCharsets.UTF_8);
+    private static final byte[] MULTIBYTE = 
"é€😀".getBytes(StandardCharsets.UTF_8);
+    private static final byte[] INVALID_START = {(byte) 0x80};
+    private static final byte[] TRUNCATED = {(byte) 0xE2, (byte) 0x82};
+    private static final byte[] OVERLONG = {(byte) 0xC0, (byte) 0xAF};
+    private static final byte[] SURROGATE = {(byte) 0xED, (byte) 0xA0, (byte) 
0x80};
+
+    @Override
+    Stream<TestSetSpec> getTestSetSpecs() {
+        return Stream.of(isValidUtf8Cases(), makeValidUtf8Cases());
+    }
+
+    private TestSetSpec isValidUtf8Cases() {
+        return 
TestSetSpec.forFunction(BuiltInFunctionDefinitions.IS_VALID_UTF8)
+                .onFieldsWithData(
+                        null, HELLO, MULTIBYTE, INVALID_START, TRUNCATED, 
OVERLONG, SURROGATE)
+                .andDataTypes(
+                        DataTypes.BYTES(),
+                        DataTypes.BYTES(),
+                        DataTypes.BYTES(),
+                        DataTypes.BYTES(),
+                        DataTypes.BYTES(),
+                        DataTypes.BYTES(),
+                        DataTypes.BYTES())
+                .testSqlResult("IS_VALID_UTF8(f0)", null, 
DataTypes.BOOLEAN().nullable())
+                .testSqlResult("IS_VALID_UTF8(f1)", true, 
DataTypes.BOOLEAN().nullable())
+                .testSqlResult("IS_VALID_UTF8(f2)", true, 
DataTypes.BOOLEAN().nullable())
+                .testSqlResult("IS_VALID_UTF8(f3)", false, 
DataTypes.BOOLEAN().nullable())
+                .testSqlResult("IS_VALID_UTF8(f4)", false, 
DataTypes.BOOLEAN().nullable())
+                .testSqlResult("IS_VALID_UTF8(f5)", false, 
DataTypes.BOOLEAN().nullable())
+                .testSqlResult("IS_VALID_UTF8(f6)", false, 
DataTypes.BOOLEAN().nullable())
+                // Table API method routes to the same definition.
+                .testTableApiResult($("f1").isValidUtf8(), true, 
DataTypes.BOOLEAN().nullable())
+                .testTableApiResult($("f3").isValidUtf8(), false, 
DataTypes.BOOLEAN().nullable());
+    }
+
+    private TestSetSpec makeValidUtf8Cases() {
+        // Lenient decode equivalent to new String(b, UTF_8). Java mostly follows the W3C
+        // maximal-subpart rule (one replacement char per maximal ill-formed subpart); f6 differs.
+        final byte[] mixed = {'A', 'B', (byte) 0x80, 'C', 'D'};
+        return 
TestSetSpec.forFunction(BuiltInFunctionDefinitions.MAKE_VALID_UTF8)
+                .onFieldsWithData(
+                        null,
+                        HELLO,
+                        MULTIBYTE,
+                        INVALID_START,
+                        TRUNCATED,
+                        OVERLONG,
+                        SURROGATE,
+                        mixed)
+                .andDataTypes(
+                        DataTypes.BYTES(),
+                        DataTypes.BYTES(),
+                        DataTypes.BYTES(),
+                        DataTypes.BYTES(),
+                        DataTypes.BYTES(),
+                        DataTypes.BYTES(),
+                        DataTypes.BYTES(),
+                        DataTypes.BYTES())
+                .testSqlResult("MAKE_VALID_UTF8(f0)", null, 
DataTypes.STRING().nullable())
+                .testSqlResult("MAKE_VALID_UTF8(f1)", "Hello", 
DataTypes.STRING().nullable())
+                .testSqlResult("MAKE_VALID_UTF8(f2)", "é€😀", 
DataTypes.STRING().nullable())
+                .testSqlResult("MAKE_VALID_UTF8(f3)", "�", 
DataTypes.STRING().nullable())
+                .testSqlResult("MAKE_VALID_UTF8(f4)", "�", 
DataTypes.STRING().nullable())
+                .testSqlResult("MAKE_VALID_UTF8(f5)", "��", 
DataTypes.STRING().nullable())
+                // JDK's UTF-8 decoder consumes the entire 3-byte surrogate attempt and emits
+                // a single U+FFFD; this differs from the W3C maximal-subpart count of 3.
+                .testSqlResult("MAKE_VALID_UTF8(f6)", "�", 
DataTypes.STRING().nullable())
+                .testSqlResult("MAKE_VALID_UTF8(f7)", "AB�CD", 
DataTypes.STRING().nullable())
+                .testTableApiResult($("f1").makeValidUtf8(), "Hello", 
DataTypes.STRING().nullable())
+                .testTableApiResult($("f3").makeValidUtf8(), "�", 
DataTypes.STRING().nullable());
+    }
+}
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/IsValidUtf8Function.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/IsValidUtf8Function.java
new file mode 100644
index 00000000000..5d6a4c8476b
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/IsValidUtf8Function.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.binary.StringUtf8Utils;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext;
+
+import javax.annotation.Nullable;
+
+/** Implementation of {@link BuiltInFunctionDefinitions#IS_VALID_UTF8}. */
+@Internal
+public final class IsValidUtf8Function extends BuiltInScalarFunction {
+
+    public IsValidUtf8Function(SpecializedContext context) {
+        super(BuiltInFunctionDefinitions.IS_VALID_UTF8, context);
+    }
+
+    public @Nullable Boolean eval(final @Nullable byte[] bytes) {
+        if (bytes == null) {
+            return null;
+        }
+        return StringUtf8Utils.firstInvalidUtf8ByteIndex(bytes, 0, 
bytes.length) < 0;
+    }
+}
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/MakeValidUtf8Function.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/MakeValidUtf8Function.java
new file mode 100644
index 00000000000..cdf0d6b98a1
--- /dev/null
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/scalar/MakeValidUtf8Function.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.functions.scalar;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.data.StringData;
+import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
+import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext;
+
+import javax.annotation.Nullable;
+
+import java.nio.charset.StandardCharsets;
+
+/**
+ * Implementation of {@link BuiltInFunctionDefinitions#MAKE_VALID_UTF8}.
+ *
+ * <p>Decodes UTF-8 bytes leniently, replacing each invalid sequence with the Unicode replacement
+ * character {@code U+FFFD}. The substitution is lossy and irreversible.
+ */
+@Internal
+public final class MakeValidUtf8Function extends BuiltInScalarFunction {
+
+    public MakeValidUtf8Function(SpecializedContext context) {
+        super(BuiltInFunctionDefinitions.MAKE_VALID_UTF8, context);
+    }
+
+    public @Nullable StringData eval(final @Nullable byte[] bytes) {
+        if (bytes == null) {
+            return null;
+        }
+        return StringData.fromString(new String(bytes, 
StandardCharsets.UTF_8));
+    }
+}

Reply via email to