This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new a645e5a479c [FLINK-39601][table] Add UTF-8 validation utilities and StringData.fromUtf8Bytes connector API
a645e5a479c is described below

commit a645e5a479cd685cf26421d7ce34c5f8f3cf68fd
Author: Gustavo de Morais <[email protected]>
AuthorDate: Tue May 5 18:32:55 2026 +0200

    [FLINK-39601][table] Add UTF-8 validation utilities and StringData.fromUtf8Bytes connector API
    
    This closes #28110.
---
 .../org/apache/flink/table/data/StringData.java    |  37 +++++-
 .../flink/table/data/binary/BinaryStringData.java  |  46 +++++++-
 .../flink/table/data/binary/StringUtf8Utils.java   | 127 ++++++++++++++++++++-
 .../table/data/binary/StringUtf8UtilsTest.java     |  96 ++++++++++++++++
 4 files changed, 299 insertions(+), 7 deletions(-)

diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/StringData.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/StringData.java
index ac8445425aa..1cb4f619efc 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/StringData.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/StringData.java
@@ -46,16 +46,47 @@ public interface StringData extends Comparable<StringData> {
         return BinaryStringData.fromString(str);
     }
 
-    /** Creates an instance of {@link StringData} from the given UTF-8 byte array. */
+    /**
+     * Creates an instance of {@link StringData} by wrapping the given UTF-8 byte array in O(1)
+     * without copying or validating it. The caller is responsible for ensuring the bytes are
+     * well-formed UTF-8; use {@link #fromUtf8Bytes(byte[])} if validation is required.
+     */
     static StringData fromBytes(byte[] bytes) {
         return BinaryStringData.fromBytes(bytes);
     }
 
     /**
-     * Creates an instance of {@link StringData} from the given UTF-8 byte array with offset and
-     * number of bytes.
+     * Creates an instance of {@link StringData} by wrapping the given UTF-8 byte range in O(1)
+     * without copying or validating it. The caller is responsible for ensuring the bytes are
+     * well-formed UTF-8; use {@link #fromUtf8Bytes(byte[], int, int)} if validation is required.
      */
     static StringData fromBytes(byte[] bytes, int offset, int numBytes) {
         return BinaryStringData.fromBytes(bytes, offset, numBytes);
     }
+
+    /**
+     * Creates an instance of {@link StringData} from the given UTF-8 byte array, walking the input
+     * once in O(n) to verify it is well-formed UTF-8. Returns {@code null} if the input is {@code
+     * null}. Throws {@link org.apache.flink.table.api.TableRuntimeException} on invalid UTF-8.
+     *
+     * <p>Connector authors should prefer this method over {@link #fromBytes(byte[])} when ingesting
+     * data from external systems whose UTF-8 conformance is not guaranteed: the strict variant
+     * surfaces the error at the source rather than letting silent {@code U+FFFD} substitution
+     * propagate downstream. Use {@link #fromBytes(byte[])} when the bytes are already known to be
+     * valid and the O(n) check can be skipped.
+     */
+    static StringData fromUtf8Bytes(byte[] bytes) {
+        return BinaryStringData.fromUtf8Bytes(bytes);
+    }
+
+    /**
+     * Creates an instance of {@link StringData} from the given UTF-8 byte range, walking it once in
+     * O(n) to verify it is well-formed UTF-8. Returns {@code null} if the input is {@code null}.
+     * Throws {@link org.apache.flink.table.api.TableRuntimeException} on invalid UTF-8.
+     *
+     * @see #fromUtf8Bytes(byte[])
+     */
+    static StringData fromUtf8Bytes(byte[] bytes, int offset, int numBytes) {
+        return BinaryStringData.fromUtf8Bytes(bytes, offset, numBytes);
+    }
 }
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryStringData.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryStringData.java
index 3962587073e..9408332c903 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryStringData.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryStringData.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
 import org.apache.flink.api.common.typeutils.TypeSerializer;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.table.api.TableRuntimeException;
 import org.apache.flink.table.data.StringData;
 
 import javax.annotation.Nonnull;
@@ -81,20 +82,59 @@ public final class BinaryStringData extends LazyBinaryFormat<String> implements
         }
     }
 
-    /** Creates a {@link BinaryStringData} instance from the given UTF-8 bytes. */
+    /**
+     * Creates a {@link BinaryStringData} instance by wrapping the given UTF-8 bytes in O(1) without
+     * copying or validating them. The caller is responsible for ensuring the bytes are well-formed
+     * UTF-8; use {@link #fromUtf8Bytes(byte[])} if validation is required.
+     */
     public static BinaryStringData fromBytes(byte[] bytes) {
         return fromBytes(bytes, 0, bytes.length);
     }
 
     /**
-     * Creates a {@link BinaryStringData} instance from the given UTF-8 bytes with offset and number
-     * of bytes.
+     * Creates a {@link BinaryStringData} instance by wrapping the given UTF-8 byte range in O(1)
+     * without copying or validating it. The caller is responsible for ensuring the bytes are
+     * well-formed UTF-8; use {@link #fromUtf8Bytes(byte[], int, int)} if validation is required.
      */
     public static BinaryStringData fromBytes(byte[] bytes, int offset, int numBytes) {
         return new BinaryStringData(
                 new MemorySegment[] {MemorySegmentFactory.wrap(bytes)}, offset, numBytes);
     }
 
+    /**
+     * Creates a {@link BinaryStringData} instance from the given UTF-8 bytes, walking the input
+     * once in O(n) to verify it is well-formed UTF-8. Returns {@code null} if the input is {@code
+     * null}. Throws {@link TableRuntimeException} on invalid UTF-8. Use {@link #fromBytes(byte[])}
+     * when the bytes are already known to be valid and the O(n) check can be skipped; otherwise
+     * invalid UTF-8 propagates and is later silently substituted with {@code U+FFFD}.
+     *
+     * @see StringData#fromUtf8Bytes(byte[])
+     */
+    public static BinaryStringData fromUtf8Bytes(final byte[] bytes) {
+        return bytes == null ? null : fromUtf8Bytes(bytes, 0, bytes.length);
+    }
+
+    /**
+     * Creates a {@link BinaryStringData} instance from the given UTF-8 byte range, walking it once
+     * in O(n) to verify it is well-formed UTF-8. Returns {@code null} if the input is {@code null}.
+     * Throws {@link TableRuntimeException} on invalid UTF-8.
+     *
+     * @see StringData#fromUtf8Bytes(byte[], int, int)
+     */
+    public static BinaryStringData fromUtf8Bytes(
+            final byte[] bytes, final int offset, final int numBytes) {
+        if (bytes == null) {
+            return null;
+        }
+        final int badIndex = StringUtf8Utils.firstInvalidUtf8ByteIndex(bytes, offset, numBytes);
+        if (badIndex >= 0) {
+            throw new TableRuntimeException(
+                    String.format(
+                            "Invalid UTF-8 byte at index %d of %d.", badIndex - offset, numBytes));
+        }
+        return fromBytes(bytes, offset, numBytes);
+    }
+
     /** Creates a {@link BinaryStringData} instance that contains `length` spaces. */
     public static BinaryStringData blankString(int length) {
         byte[] spaces = new byte[length];
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/StringUtf8Utils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/StringUtf8Utils.java
index bb8eacc18f5..a099b371973 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/StringUtf8Utils.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/StringUtf8Utils.java
@@ -29,7 +29,7 @@ import static org.apache.flink.table.data.binary.BinarySegmentUtils.allocateReus
 
 /** Utilities for String UTF-8. */
 @Internal
-final class StringUtf8Utils {
+public final class StringUtf8Utils {
 
     private static final int MAX_BYTES_PER_CHAR = 3;
 
@@ -131,6 +131,131 @@ final class StringUtf8Utils {
         return new String(chars, 0, len);
     }
 
+    // Bit-pattern predicates for UTF-8 byte categorization. The JIT inlines these so they cost
+    // nothing at runtime, but they make {@link #firstInvalidUtf8ByteIndex} read like prose.
+    private static boolean isAsciiByte(int b) {
+        return b >= 0;
+    }
+
+    private static boolean is2ByteLead(int b) {
+        // 110xxxxx; (b & 0x1e) != 0 rejects the overlong leads 0xC0 and 0xC1
+        return (b >> 5) == -2 && (b & 0x1e) != 0;
+    }
+
+    private static boolean is3ByteLead(int b) {
+        return (b >> 4) == -2; // 1110xxxx
+    }
+
+    private static boolean is4ByteLead(int b) {
+        return (b >> 3) == -2; // 11110xxx
+    }
+
+    private static boolean isContinuation(int b) {
+        return (b & 0xc0) == 0x80; // 10xxxxxx
+    }
+
+    private static boolean isOverlong3(int b1, int b2) {
+        // 0xE0 followed by 0x80-0x9F encodes a code point already representable in 2 bytes
+        return b1 == (byte) 0xe0 && (b2 & 0xe0) == 0x80;
+    }
+
+    private static char decode3ByteSequence(int b1, int b2, int b3) {
+        return (char)
+                ((b1 << 12)
+                        ^ (b2 << 6)
+                        ^ (b3 ^ (((byte) 0xE0 << 12) ^ ((byte) 0x80 << 6) ^ ((byte) 0x80))));
+    }
+
+    private static int decode4ByteSequence(int b1, int b2, int b3, int b4) {
+        return (b1 << 18)
+                ^ (b2 << 12)
+                ^ (b3 << 6)
+                ^ (b4
+                        ^ (((byte) 0xF0 << 18)
+                                ^ ((byte) 0x80 << 12)
+                                ^ ((byte) 0x80 << 6)
+                                ^ ((byte) 0x80)));
+    }
+
+    /**
+     * Returns the absolute index (into {@code bytes}) of the first byte that breaks UTF-8
+     * well-formedness, or {@code -1} if the range is valid. For a truncated trailing sequence the
+     * returned index is {@code offset + numBytes} (one past the last byte) since the failure is the
+     * absence of an expected continuation byte. Same byte-level checks as {@link
+     * #decodeUTF8Strict(byte[], int, int, char[])} but without the char-buffer write side effect.
+     *
+     * <p>This is a hot per-record path; it trusts its inputs and does not validate them. A non-null
+     * {@code bytes} with non-negative {@code offset} / {@code numBytes} that fits within the array
+     * is required; misuse may throw {@link NullPointerException} or {@link
+     * ArrayIndexOutOfBoundsException}.
+     */
+    public static int firstInvalidUtf8ByteIndex(
+            final byte[] bytes, final int offset, final int numBytes) {
+        int sp = offset;
+        final int sl = sp + numBytes;
+
+        // ASCII fast-path
+        while (sp < sl && isAsciiByte(bytes[sp])) {
+            sp++;
+        }
+
+        while (sp < sl) {
+            final int start = sp;
+            final int b1 = bytes[sp++];
+
+            if (isAsciiByte(b1)) {
+                continue;
+            }
+
+            if (is2ByteLead(b1)) {
+                if (sp >= sl) {
+                    return sl;
+                }
+                if (!isContinuation(bytes[sp++])) {
+                    return start;
+                }
+                continue;
+            }
+
+            if (is3ByteLead(b1)) {
+                if (sp + 1 >= sl) {
+                    return sl;
+                }
+                final int b2 = bytes[sp++];
+                final int b3 = bytes[sp++];
+                if (isOverlong3(b1, b2) || !isContinuation(b2) || !isContinuation(b3)) {
+                    return start;
+                }
+                if (Character.isSurrogate(decode3ByteSequence(b1, b2, b3))) {
+                    return start;
+                }
+                continue;
+            }
+
+            if (is4ByteLead(b1)) {
+                if (sp + 2 >= sl) {
+                    return sl;
+                }
+                final int b2 = bytes[sp++];
+                final int b3 = bytes[sp++];
+                final int b4 = bytes[sp++];
+                if (!isContinuation(b2) || !isContinuation(b3) || !isContinuation(b4)) {
+                    return start;
+                }
+                // Shortest-form check catches both overlong 4-byte forms and code points
+                // above U+10FFFF (anything not in the supplementary plane is invalid here).
+                if (!Character.isSupplementaryCodePoint(decode4ByteSequence(b1, b2, b3, b4))) {
+                    return start;
+                }
+                continue;
+            }
+
+            // Continuation byte without a lead, or a 5+-byte lead (RFC 3629 forbids).
+            return start;
+        }
+        return -1;
+    }
+
     public static int decodeUTF8Strict(byte[] sa, int sp, int len, char[] da) {
         final int sl = sp + len;
         int dp = 0;
diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/data/binary/StringUtf8UtilsTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/data/binary/StringUtf8UtilsTest.java
new file mode 100644
index 00000000000..bb3af471491
--- /dev/null
+++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/data/binary/StringUtf8UtilsTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.binary;
+
+import org.apache.flink.table.api.TableRuntimeException;
+import org.apache.flink.table.data.StringData;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.nio.charset.StandardCharsets;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Tests for the UTF-8 validator and the connector-facing {@link BinaryStringData#fromUtf8Bytes}
+ * factory that delegates to it.
+ */
+class StringUtf8UtilsTest {
+
+    @ParameterizedTest(name = "{0}")
+    @MethodSource("utf8Inputs")
+    void testFirstInvalidUtf8ByteIndex(
+            String name, byte[] bytes, int offset, int numBytes, int expected) {
+        assertThat(StringUtf8Utils.firstInvalidUtf8ByteIndex(bytes, offset, numBytes))
+                .isEqualTo(expected);
+    }
+
+    static Stream<Arguments> utf8Inputs() {
+        return Stream.of(
+                // Valid - one boundary per width hits each DFA-equivalent transition.
+                Arguments.of("U+0080 smallest 2-byte", bytes(0xC2, 0x80), 0, 2, -1),
+                Arguments.of("U+0800 smallest 3-byte", bytes(0xE0, 0xA0, 0x80), 0, 3, -1),
+                Arguments.of("U+10FFFF largest valid", bytes(0xF4, 0x8F, 0xBF, 0xBF), 0, 4, -1),
+                // Invalid - one per malformed class.
+                Arguments.of("stray continuation", bytes(0x80), 0, 1, 0),
+                Arguments.of("ASCII run then bad byte", bytes('A', 'B', 'C', 0x80), 0, 4, 3),
+                Arguments.of("U+110000 above max", bytes(0xF4, 0x90, 0x80, 0x80), 0, 4, 0),
+                // F5 is a forbidden lead per RFC 3629; padded with continuations so the
+                // shortest-form check fires (a bare F5 reports as truncation at index 1).
+                Arguments.of("F5 forbidden lead (padded)", bytes(0xF5, 0x80, 0x80, 0x80), 0, 4, 0),
+                Arguments.of("U+D800 surrogate", bytes(0xED, 0xA0, 0x80), 0, 3, 0),
+                Arguments.of("overlong '/'", bytes(0xC0, 0xAF), 0, 2, 0),
+                // Truncated -> index points one past the input.
+                Arguments.of("truncated 3-byte", bytes(0xE2), 0, 1, 1),
+                // Offset/length variant validates only the inner range.
+                Arguments.of("inner range valid", bytes(0x80, 'O', 'K', 0x80), 1, 2, -1),
+                Arguments.of("outer bytes invalid", bytes(0x80, 'O', 'K', 0x80), 0, 4, 0));
+    }
+
+    @Test
+    void testFromUtf8Bytes() {
+        final byte[] hello = "Hello".getBytes(StandardCharsets.UTF_8);
+
+        // Valid input wraps the bytes.
+        assertThat(BinaryStringData.fromUtf8Bytes(hello))
+                .isEqualTo(BinaryStringData.fromBytes(hello));
+
+        // null in -> null out, mirroring fromString.
+        assertThat(BinaryStringData.fromUtf8Bytes((byte[]) null)).isNull();
+        assertThat(StringData.fromUtf8Bytes((byte[]) null)).isNull();
+
+        // Invalid UTF-8 throws TableRuntimeException with the relative byte index.
+        assertThatThrownBy(() -> BinaryStringData.fromUtf8Bytes(bytes('A', 'B', 0x80)))
+                .isInstanceOf(TableRuntimeException.class)
+                .hasMessageContaining("Invalid UTF-8 byte at index 2 of 3");
+    }
+
+    private static byte[] bytes(int... values) {
+        final byte[] result = new byte[values.length];
+        for (int i = 0; i < values.length; i++) {
+            result[i] = (byte) values[i];
+        }
+        return result;
+    }
+}

Reply via email to