This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new a645e5a479c [FLINK-39601][table] Add UTF-8 validation utilities and
StringData.fromUtf8Bytes connector API
a645e5a479c is described below
commit a645e5a479cd685cf26421d7ce34c5f8f3cf68fd
Author: Gustavo de Morais <[email protected]>
AuthorDate: Tue May 5 18:32:55 2026 +0200
[FLINK-39601][table] Add UTF-8 validation utilities and
StringData.fromUtf8Bytes connector API
This closes #28110.
---
.../org/apache/flink/table/data/StringData.java | 37 +++++-
.../flink/table/data/binary/BinaryStringData.java | 46 +++++++-
.../flink/table/data/binary/StringUtf8Utils.java | 127 ++++++++++++++++++++-
.../table/data/binary/StringUtf8UtilsTest.java | 96 ++++++++++++++++
4 files changed, 299 insertions(+), 7 deletions(-)
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/StringData.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/StringData.java
index ac8445425aa..1cb4f619efc 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/StringData.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/StringData.java
@@ -46,16 +46,47 @@ public interface StringData extends Comparable<StringData> {
return BinaryStringData.fromString(str);
}
- /** Creates an instance of {@link StringData} from the given UTF-8 byte
array. */
+ /**
+ * Creates an instance of {@link StringData} by wrapping the given UTF-8
byte array in O(1)
+ * without copying or validating it. The caller is responsible for
ensuring the bytes are
+ * well-formed UTF-8; use {@link #fromUtf8Bytes(byte[])} if validation is
required.
+ */
static StringData fromBytes(byte[] bytes) {
return BinaryStringData.fromBytes(bytes);
}
/**
 * Wraps the range {@code [offset, offset + numBytes)} of {@code bytes} as a {@link StringData}
 * in O(1). Nothing is copied and the range is not checked, so it must already be well-formed
 * UTF-8; call {@link #fromUtf8Bytes(byte[], int, int)} instead when that is not guaranteed.
 *
 * @param bytes array holding the UTF-8 encoded text
 * @param offset index of the first byte of the text
 * @param numBytes length of the text in bytes
 * @return a {@link StringData} backed by the given range
 */
static StringData fromBytes(byte[] bytes, int offset, int numBytes) {
    final StringData wrapped = BinaryStringData.fromBytes(bytes, offset, numBytes);
    return wrapped;
}
+
+ /**
+ * Creates an instance of {@link StringData} from the given UTF-8 byte
array, walking the input
+ * once in O(n) to verify it is well-formed UTF-8. Returns {@code null} if
the input is {@code
+ * null}. Throws {@link org.apache.flink.table.api.TableRuntimeException}
on invalid UTF-8.
+ *
+ * <p>Connector authors should prefer this method over {@link
#fromBytes(byte[])} when ingesting
+ * data from external systems whose UTF-8 conformance is not guaranteed:
the strict variant
+ * surfaces the error at the source rather than letting silent {@code
U+FFFD} substitution
+ * propagate downstream. Use {@link #fromBytes(byte[])} when the bytes are
already known to be
+ * valid and the O(n) check can be skipped.
+ */
+ static StringData fromUtf8Bytes(byte[] bytes) {
+ return BinaryStringData.fromUtf8Bytes(bytes);
+ }
+
+ /**
+ * Creates an instance of {@link StringData} from the given UTF-8 byte
range, walking it once in
+ * O(n) to verify it is well-formed UTF-8. Returns {@code null} if the
input is {@code null}.
+ * Throws {@link org.apache.flink.table.api.TableRuntimeException} on
invalid UTF-8.
+ *
+ * @see #fromUtf8Bytes(byte[])
+ */
+ static StringData fromUtf8Bytes(byte[] bytes, int offset, int numBytes) {
+ return BinaryStringData.fromUtf8Bytes(bytes, offset, numBytes);
+ }
}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryStringData.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryStringData.java
index 3962587073e..9408332c903 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryStringData.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/BinaryStringData.java
@@ -22,6 +22,7 @@ import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.core.memory.MemorySegment;
import org.apache.flink.core.memory.MemorySegmentFactory;
+import org.apache.flink.table.api.TableRuntimeException;
import org.apache.flink.table.data.StringData;
import javax.annotation.Nonnull;
@@ -81,20 +82,59 @@ public final class BinaryStringData extends
LazyBinaryFormat<String> implements
}
}
- /** Creates a {@link BinaryStringData} instance from the given UTF-8
bytes. */
+ /**
+ * Creates a {@link BinaryStringData} instance by wrapping the given UTF-8
bytes in O(1) without
+ * copying or validating them. The caller is responsible for ensuring the
bytes are well-formed
+ * UTF-8; use {@link #fromUtf8Bytes(byte[])} if validation is required.
+ */
public static BinaryStringData fromBytes(byte[] bytes) {
return fromBytes(bytes, 0, bytes.length);
}
/**
 * Wraps the range {@code [offset, offset + numBytes)} of {@code bytes} as a {@link
 * BinaryStringData} in O(1). The array is wrapped in a single memory segment rather than
 * copied, and the range is not validated. The caller must guarantee well-formed UTF-8; use
 * {@link #fromUtf8Bytes(byte[], int, int)} when that is not guaranteed.
 *
 * @param bytes array holding the UTF-8 encoded text
 * @param offset index of the first byte of the text
 * @param numBytes length of the text in bytes
 * @return a string backed by the given range
 */
public static BinaryStringData fromBytes(byte[] bytes, int offset, int numBytes) {
    final MemorySegment segment = MemorySegmentFactory.wrap(bytes);
    return new BinaryStringData(new MemorySegment[] {segment}, offset, numBytes);
}
+ /**
+ * Creates a {@link BinaryStringData} instance from the given UTF-8 bytes,
walking the input
+ * once in O(n) to verify it is well-formed UTF-8. Returns {@code null} if
the input is {@code
+ * null}. Throws {@link TableRuntimeException} on invalid UTF-8. Use
{@link #fromBytes(byte[])}
+ * when the bytes are already known to be valid and the O(n) check can be
skipped; otherwise
+ * invalid UTF-8 propagates and is later silently substituted with {@code
U+FFFD}.
+ *
+ * @see StringData#fromUtf8Bytes(byte[])
+ */
+ public static BinaryStringData fromUtf8Bytes(final byte[] bytes) {
+ return bytes == null ? null : fromUtf8Bytes(bytes, 0, bytes.length);
+ }
+
+ /**
+ * Creates a {@link BinaryStringData} instance from the given UTF-8 byte
range, walking it once
+ * in O(n) to verify it is well-formed UTF-8. Returns {@code null} if the
input is {@code null}.
+ * Throws {@link TableRuntimeException} on invalid UTF-8.
+ *
+ * @see StringData#fromUtf8Bytes(byte[], int, int)
+ */
+ public static BinaryStringData fromUtf8Bytes(
+ final byte[] bytes, final int offset, final int numBytes) {
+ if (bytes == null) {
+ return null;
+ }
+ final int badIndex = StringUtf8Utils.firstInvalidUtf8ByteIndex(bytes,
offset, numBytes);
+ if (badIndex >= 0) {
+ throw new TableRuntimeException(
+ String.format(
+ "Invalid UTF-8 byte at index %d of %d.", badIndex
- offset, numBytes));
+ }
+ return fromBytes(bytes, offset, numBytes);
+ }
+
/** Creates a {@link BinaryStringData} instance that contains `length`
spaces. */
public static BinaryStringData blankString(int length) {
byte[] spaces = new byte[length];
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/StringUtf8Utils.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/StringUtf8Utils.java
index bb8eacc18f5..a099b371973 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/StringUtf8Utils.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/data/binary/StringUtf8Utils.java
@@ -29,7 +29,7 @@ import static
org.apache.flink.table.data.binary.BinarySegmentUtils.allocateReus
/** Utilities for String UTF-8. */
@Internal
-final class StringUtf8Utils {
+public final class StringUtf8Utils {
private static final int MAX_BYTES_PER_CHAR = 3;
@@ -131,6 +131,131 @@ final class StringUtf8Utils {
return new String(chars, 0, len);
}
+ // Bit-pattern predicates for UTF-8 byte categorization. The JIT inlines
these so they cost
+ // nothing at runtime, but they make {@link #firstInvalidUtf8ByteIndex}
read like prose.
+ private static boolean isAsciiByte(int b) {
+ return b >= 0;
+ }
+
+ private static boolean is2ByteLead(int b) {
+ // 110xxxxx; (b & 0x1e) != 0 rejects the overlong leads 0xC0 and 0xC1
+ return (b >> 5) == -2 && (b & 0x1e) != 0;
+ }
+
+ private static boolean is3ByteLead(int b) {
+ return (b >> 4) == -2; // 1110xxxx
+ }
+
+ private static boolean is4ByteLead(int b) {
+ return (b >> 3) == -2; // 11110xxx
+ }
+
+ private static boolean isContinuation(int b) {
+ return (b & 0xc0) == 0x80; // 10xxxxxx
+ }
+
+ private static boolean isOverlong3(int b1, int b2) {
+ // 0xE0 followed by 0x80-0x9F encodes a code point already
representable in 2 bytes
+ return b1 == (byte) 0xe0 && (b2 & 0xe0) == 0x80;
+ }
+
+ private static char decode3ByteSequence(int b1, int b2, int b3) {
+ return (char)
+ ((b1 << 12)
+ ^ (b2 << 6)
+ ^ (b3 ^ (((byte) 0xE0 << 12) ^ ((byte) 0x80 << 6) ^
((byte) 0x80))));
+ }
+
+ private static int decode4ByteSequence(int b1, int b2, int b3, int b4) {
+ return (b1 << 18)
+ ^ (b2 << 12)
+ ^ (b3 << 6)
+ ^ (b4
+ ^ (((byte) 0xF0 << 18)
+ ^ ((byte) 0x80 << 12)
+ ^ ((byte) 0x80 << 6)
+ ^ ((byte) 0x80)));
+ }
+
+ /**
+ * Returns the absolute index (into {@code bytes}) of the first byte that
breaks UTF-8
+ * well-formedness, or {@code -1} if the range is valid. For a truncated
trailing sequence the
+ * returned index is {@code offset + numBytes} (one past the last byte)
since the failure is the
+ * absence of an expected continuation byte. Same byte-level checks as
{@link
+ * #decodeUTF8Strict(byte[], int, int, char[])} but without the
char-buffer write side effect.
+ *
+ * <p>This is a hot per-record path; it trusts its inputs and does not
validate them. A non-null
+ * {@code bytes} with non-negative {@code offset} / {@code numBytes} that
fits within the array
+ * is required; misuse may throw {@link NullPointerException} or {@link
+ * ArrayIndexOutOfBoundsException}.
+ */
+ public static int firstInvalidUtf8ByteIndex(
+ final byte[] bytes, final int offset, final int numBytes) {
+ int sp = offset;
+ final int sl = sp + numBytes;
+
+ // ASCII fast-path
+ while (sp < sl && isAsciiByte(bytes[sp])) {
+ sp++;
+ }
+
+ while (sp < sl) {
+ final int start = sp;
+ final int b1 = bytes[sp++];
+
+ if (isAsciiByte(b1)) {
+ continue;
+ }
+
+ if (is2ByteLead(b1)) {
+ if (sp >= sl) {
+ return sl;
+ }
+ if (!isContinuation(bytes[sp++])) {
+ return start;
+ }
+ continue;
+ }
+
+ if (is3ByteLead(b1)) {
+ if (sp + 1 >= sl) {
+ return sl;
+ }
+ final int b2 = bytes[sp++];
+ final int b3 = bytes[sp++];
+ if (isOverlong3(b1, b2) || !isContinuation(b2) ||
!isContinuation(b3)) {
+ return start;
+ }
+ if (Character.isSurrogate(decode3ByteSequence(b1, b2, b3))) {
+ return start;
+ }
+ continue;
+ }
+
+ if (is4ByteLead(b1)) {
+ if (sp + 2 >= sl) {
+ return sl;
+ }
+ final int b2 = bytes[sp++];
+ final int b3 = bytes[sp++];
+ final int b4 = bytes[sp++];
+ if (!isContinuation(b2) || !isContinuation(b3) ||
!isContinuation(b4)) {
+ return start;
+ }
+ // Shortest-form check catches both overlong 4-byte forms and
code points
+ // above U+10FFFF (anything not in the supplementary plane is
invalid here).
+ if
(!Character.isSupplementaryCodePoint(decode4ByteSequence(b1, b2, b3, b4))) {
+ return start;
+ }
+ continue;
+ }
+
+ // Continuation byte without a lead, or a 5+-byte lead (RFC 3629
forbids).
+ return start;
+ }
+ return -1;
+ }
+
public static int decodeUTF8Strict(byte[] sa, int sp, int len, char[] da) {
final int sl = sp + len;
int dp = 0;
diff --git
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/data/binary/StringUtf8UtilsTest.java
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/data/binary/StringUtf8UtilsTest.java
new file mode 100644
index 00000000000..bb3af471491
--- /dev/null
+++
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/data/binary/StringUtf8UtilsTest.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.data.binary;
+
+import org.apache.flink.table.api.TableRuntimeException;
+import org.apache.flink.table.data.StringData;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.Arguments;
+import org.junit.jupiter.params.provider.MethodSource;
+
+import java.nio.charset.StandardCharsets;
+import java.util.stream.Stream;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+/**
+ * Tests for the UTF-8 validator and the connector-facing {@link
BinaryStringData#fromUtf8Bytes}
+ * factory that delegates to it.
+ */
+class StringUtf8UtilsTest {
+
+ @ParameterizedTest(name = "{0}")
+ @MethodSource("utf8Inputs")
+ void testFirstInvalidUtf8ByteIndex(
+ String name, byte[] bytes, int offset, int numBytes, int expected)
{
+ assertThat(StringUtf8Utils.firstInvalidUtf8ByteIndex(bytes, offset,
numBytes))
+ .isEqualTo(expected);
+ }
+
+ static Stream<Arguments> utf8Inputs() {
+ return Stream.of(
+ // Valid - one boundary per width hits each DFA-equivalent
transition.
+ Arguments.of("U+0080 smallest 2-byte", bytes(0xC2, 0x80), 0,
2, -1),
+ Arguments.of("U+0800 smallest 3-byte", bytes(0xE0, 0xA0,
0x80), 0, 3, -1),
+ Arguments.of("U+10FFFF largest valid", bytes(0xF4, 0x8F, 0xBF,
0xBF), 0, 4, -1),
+ // Invalid - one per malformed class.
+ Arguments.of("stray continuation", bytes(0x80), 0, 1, 0),
+ Arguments.of("ASCII run then bad byte", bytes('A', 'B', 'C',
0x80), 0, 4, 3),
+ Arguments.of("U+110000 above max", bytes(0xF4, 0x90, 0x80,
0x80), 0, 4, 0),
+ // F5 is a forbidden lead per RFC 3629; padded with
continuations so the
+ // shortest-form check fires (a bare F5 reports as truncation
at index 1).
+ Arguments.of("F5 forbidden lead (padded)", bytes(0xF5, 0x80,
0x80, 0x80), 0, 4, 0),
+ Arguments.of("U+D800 surrogate", bytes(0xED, 0xA0, 0x80), 0,
3, 0),
+ Arguments.of("overlong '/'", bytes(0xC0, 0xAF), 0, 2, 0),
+ // Truncated -> index points one past the input.
+ Arguments.of("truncated 3-byte", bytes(0xE2), 0, 1, 1),
+ // Offset/length variant validates only the inner range.
+ Arguments.of("inner range valid", bytes(0x80, 'O', 'K', 0x80),
1, 2, -1),
+ Arguments.of("outer bytes invalid", bytes(0x80, 'O', 'K',
0x80), 0, 4, 0));
+ }
+
+ @Test
+ void testFromUtf8Bytes() {
+ final byte[] hello = "Hello".getBytes(StandardCharsets.UTF_8);
+
+ // Valid input wraps the bytes.
+ assertThat(BinaryStringData.fromUtf8Bytes(hello))
+ .isEqualTo(BinaryStringData.fromBytes(hello));
+
+ // null in -> null out, mirroring fromString.
+ assertThat(BinaryStringData.fromUtf8Bytes((byte[]) null)).isNull();
+ assertThat(StringData.fromUtf8Bytes((byte[]) null)).isNull();
+
+ // Invalid UTF-8 throws TableRuntimeException with the relative byte
index.
+ assertThatThrownBy(() -> BinaryStringData.fromUtf8Bytes(bytes('A',
'B', 0x80)))
+ .isInstanceOf(TableRuntimeException.class)
+ .hasMessageContaining("Invalid UTF-8 byte at index 2 of 3");
+ }
+
+ private static byte[] bytes(int... values) {
+ final byte[] result = new byte[values.length];
+ for (int i = 0; i < values.length; i++) {
+ result[i] = (byte) values[i];
+ }
+ return result;
+ }
+}