This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new d1e3d13de8a [FLINK-39623][table-planner] Make `CAST(BYTES AS STRING)` strict on invalid `UTF-8`
d1e3d13de8a is described below

commit d1e3d13de8a099f045c7d292a26d6e8d48c53766
Author: Gustavo de Morais <[email protected]>
AuthorDate: Mon May 11 11:51:25 2026 +0200

    [FLINK-39623][table-planner] Make `CAST(BYTES AS STRING)` strict on invalid `UTF-8`
---
 AGENTS.md                                          |  2 +
 docs/data/sql_functions.yml                        |  4 +-
 docs/data/sql_functions_zh.yml                     |  6 +-
 .../generated/execution_config_configuration.html  |  6 ++
 .../table/api/config/ExecutionConfigOptions.java   | 12 ++++
 .../functions/casting/BinaryToStringCastRule.java  | 67 +++++++++++++++++++---
 .../planner/codegen/calls/BuiltInMethods.scala     |  3 +
 .../planner/functions/CastFunctionMiscITCase.java  | 19 +++++-
 .../planner/functions/casting/CastRulesTest.java   | 48 ++++++++++++++++
 9 files changed, 155 insertions(+), 12 deletions(-)

diff --git a/AGENTS.md b/AGENTS.md
index daa34497a66..1c4199d50b8 100644
--- a/AGENTS.md
+++ b/AGENTS.md
@@ -196,6 +196,7 @@ Every module from the root pom.xml, organized by function. 
Flink provides three
 Key separations:
 
 - **Planner vs Runtime:** The table planner generates code and execution 
plans; the runtime executes them. Changes to planning logic live in 
`flink-table-planner`; changes to runtime operators live in 
`flink-table-runtime` or `flink-streaming-java`.
+- **Codegen vs hand-written operators:** Per-record expression logic (casts, 
projections, filters, function calls) is generated at planning time by cast 
rules in `flink-table-planner/.../functions/casting/` and call generators in 
`flink-table-planner/.../codegen/calls/`, then compiled by Janino into the 
surrounding operator class. Operators with fixed structure (joins, 
aggregations, source/sink runtime) are hand-written Java in 
`flink-table-runtime` or `flink-streaming-java`. New scalar  [...]
 - **API vs Implementation:** Public API surfaces (`flink-core-api`, 
`flink-datastream-api`, `flink-table-api-java`) are separate from 
implementation modules. API stability annotations control what users can depend 
on.
 - **ArchUnit enforcement:** `flink-architecture-tests/` contains ArchUnit 
tests that enforce module boundaries. New violations should be avoided; if 
unavoidable, follow the freeze procedure in 
`flink-architecture-tests/README.md`.
 
@@ -294,6 +295,7 @@ This section maps common types of Flink changes to the 
modules they touch and th
 - Ensure `./mvnw clean verify` passes before opening a PR
 - Always push to your fork, not directly to `apache/flink`
 - Rebase onto the latest target branch before submitting
+- For user-visible behaviour changes, breaking changes, new SQL features, or 
new config options: fill in the **Release Notes** field on the JIRA ticket. The 
release manager consolidates these when cutting a release. The next version's 
`docs/content/release-notes/flink-X.Y.md` will be generated based on the JIRA 
tickets, so make sure to fill them in properly.
 
 ### AI-assisted contributions
 
diff --git a/docs/data/sql_functions.yml b/docs/data/sql_functions.yml
index 4c60a96746c..53a1ad5dd18 100644
--- a/docs/data/sql_functions.yml
+++ b/docs/data/sql_functions.yml
@@ -794,7 +794,7 @@ conditional:
 conversion:
   - sql: CAST(value AS type)
     table: ANY.cast(TYPE)
-    description: Returns a new value being cast to type type. A CAST error 
throws an exception and fails the job. When performing a cast operation that 
may fail, like STRING to INT, one should rather use TRY_CAST, in order to 
handle errors. If "table.exec.legacy-cast-behaviour" is enabled, CAST behaves 
like TRY_CAST. E.g., CAST('42' AS INT) returns 42; CAST(NULL AS STRING) returns 
NULL of type STRING; CAST('non-number' AS INT) throws an exception and fails 
the job.
+    description: Returns a new value being cast to type type. A CAST error 
throws an exception and fails the job. When performing a cast operation that 
may fail, like STRING to INT, one should rather use TRY_CAST, in order to 
handle errors. If "table.exec.legacy-cast-behaviour" is enabled, CAST behaves 
like TRY_CAST. E.g., CAST('42' AS INT) returns 42; CAST(NULL AS STRING) returns 
NULL of type STRING; CAST('non-number' AS INT) throws an exception and fails 
the job. Casting BINARY/VARBINA [...]
   - sql: TRY_CAST(value AS type)
     table: ANY.tryCast(TYPE)
     description: Like CAST, but in case of error, returns NULL rather than 
failing the job. E.g., TRY_CAST('42' AS INT) returns 42; TRY_CAST(NULL AS 
STRING) returns NULL of type STRING; TRY_CAST('non-number' AS INT) returns NULL 
of type INT; COALESCE(TRY_CAST('non-number' AS INT), 0) returns 0 of type INT.
@@ -818,6 +818,8 @@ conversion:
     description: |
       Decodes the input as UTF-8, replacing each invalid sequence with the 
Unicode replacement character `U+FFFD` (rendered as `�`). The substitution is 
lossy and irreversible. Returns `NULL` if the input is `NULL`.
 
+      `MAKE_VALID_UTF8()` can fully replace a `CAST(bytes AS STRING)` which 
would error in case of invalid UTF-8.
+
       E.g., `MAKE_VALID_UTF8(x'48656C6C6F')` returns `'Hello'`; 
`MAKE_VALID_UTF8(x'80')` returns `'�'` (the `U+FFFD` replacement character).
 
 collection:
diff --git a/docs/data/sql_functions_zh.yml b/docs/data/sql_functions_zh.yml
index 5cfcac48791..12a952bd968 100644
--- a/docs/data/sql_functions_zh.yml
+++ b/docs/data/sql_functions_zh.yml
@@ -922,8 +922,10 @@ conversion:
     description: |
       返回 value 被转换为类型 type 的新值。CAST错误会抛出异常并导致作业失败。为了处理错误,在使用可能失败的 CAST 操作时,例如 
STRING 转换为 INT,建议使用 TRY_CAST 替代。
       如果开启了 "table.exec.legacy-cast-behaviour",CAST 行为将变得与 TRY_CAST 一致。
-      
+
       例如, CAST('42' AS INT) 返回 42; CAST(NULL AS STRING) 返回字符串类型的 `NULL`; 
CAST('non-number' AS INT) 抛出异常且作业失败。
+
+      Casting BINARY/VARBINARY/BYTES to a CHAR/VARCHAR/STRING type validates 
that the input is well-formed UTF-8 and throws on invalid sequences. Use 
MAKE_VALID_UTF8 to substitute the Unicode replacement character U+FFFD for 
invalid bytes, TRY_CAST to return NULL, or set 
"table.exec.legacy-bytes-to-string-cast" to "true" to restore the prior 
silent-substitution behavior.
   - sql: TRY_CAST(value AS type)
     table: ANY.tryCast(TYPE)
     description: |
@@ -948,6 +950,8 @@ conversion:
     description: |
       Decodes the input as UTF-8, replacing each invalid sequence with the 
Unicode replacement character `U+FFFD` (rendered as `�`). The substitution is 
lossy and irreversible. Returns `NULL` if the input is `NULL`.
 
+      `MAKE_VALID_UTF8()` can fully replace a `CAST(bytes AS STRING)` which 
would error in case of invalid UTF-8.
+
       E.g., `MAKE_VALID_UTF8(x'48656C6C6F')` returns `'Hello'`; 
`MAKE_VALID_UTF8(x'80')` returns `'�'` (the `U+FFFD` replacement character).
 
 collection:
diff --git 
a/docs/layouts/shortcodes/generated/execution_config_configuration.html 
b/docs/layouts/shortcodes/generated/execution_config_configuration.html
index d043932d56e..e71bf3d7673 100644
--- a/docs/layouts/shortcodes/generated/execution_config_configuration.html
+++ b/docs/layouts/shortcodes/generated/execution_config_configuration.html
@@ -182,6 +182,12 @@ By default no operator is disabled.</td>
             <td>Duration</td>
             <td>Specifies a minimum time interval for how long cleanup 
unmatched records in the interval join operator. Before Flink 1.18, the default 
value of this param was the half of interval duration. Note: Set this option 
greater than 0 will cause unmatched records in outer joins to be output later 
than watermark, leading to possible discarding of these records by downstream 
watermark-dependent operators, such as window operators. The default value is 
0, which means it will clean u [...]
         </tr>
+        <tr>
+            <td><h5>table.exec.legacy-bytes-to-string-cast</h5><br> <span 
class="label label-primary">Batch</span> <span class="label 
label-primary">Streaming</span></td>
+            <td style="word-wrap: break-word;">false</td>
+            <td>Boolean</td>
+            <td>When true, CAST(bytes AS STRING) for BINARY/VARBINARY/BYTES 
inputs silently substitutes the Unicode replacement character U+FFFD for 
invalid UTF-8 sequences. When false (the default), invalid input fails the job; 
use MAKE_VALID_UTF8 or TRY_CAST to handle malformed bytes.</td>
+        </tr>
         <tr>
             <td><h5>table.exec.legacy-cast-behaviour</h5><br> <span 
class="label label-primary">Batch</span> <span class="label 
label-primary">Streaming</span></td>
             <td style="word-wrap: break-word;">DISABLED</td>
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
index 047b7b36e99..a1ed3ec97ff 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/config/ExecutionConfigOptions.java
@@ -731,6 +731,18 @@ public class ExecutionConfigOptions {
                             "Determines whether CAST will operate following 
the legacy behaviour "
                                     + "or the new one that introduces various 
fixes and improvements.");
 
+    @Documentation.TableOption(execMode = 
Documentation.ExecMode.BATCH_STREAMING)
+    public static final ConfigOption<Boolean> 
TABLE_EXEC_LEGACY_BYTES_TO_STRING_CAST =
+            key("table.exec.legacy-bytes-to-string-cast")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "When true, CAST(bytes AS STRING) for 
BINARY/VARBINARY/BYTES inputs "
+                                    + "silently substitutes the Unicode 
replacement character "
+                                    + "U+FFFD for invalid UTF-8 sequences. 
When false (the default), "
+                                    + "invalid input fails the job; use 
MAKE_VALID_UTF8 or TRY_CAST to handle "
+                                    + "malformed bytes.");
+
     @Documentation.TableOption(execMode = Documentation.ExecMode.STREAMING)
     public static final ConfigOption<Long> TABLE_EXEC_RANK_TOPN_CACHE_SIZE =
             ConfigOptions.key("table.exec.rank.topn-cache-size")
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/BinaryToStringCastRule.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/BinaryToStringCastRule.java
index 1cd659571a1..b6067710ee4 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/BinaryToStringCastRule.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/casting/BinaryToStringCastRule.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.planner.functions.casting;
 
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.LogicalTypeFamily;
 import org.apache.flink.table.types.logical.utils.LogicalTypeChecks;
@@ -27,12 +28,19 @@ import java.nio.charset.StandardCharsets;
 
 import static org.apache.flink.table.planner.codegen.CodeGenUtils.newName;
 import static 
org.apache.flink.table.planner.codegen.calls.BuiltInMethods.BINARY_STRING_DATA_FROM_STRING;
+import static 
org.apache.flink.table.planner.codegen.calls.BuiltInMethods.BINARY_STRING_DATA_FROM_UTF8_BYTES;
 import static 
org.apache.flink.table.planner.functions.casting.CastRuleUtils.accessStaticField;
 import static 
org.apache.flink.table.planner.functions.casting.CastRuleUtils.constructorCall;
 import static 
org.apache.flink.table.planner.functions.casting.CastRuleUtils.staticCall;
+import static 
org.apache.flink.table.planner.functions.casting.CharVarCharTrimPadCastRule.couldPad;
+import static 
org.apache.flink.table.planner.functions.casting.CharVarCharTrimPadCastRule.couldTrim;
 
 /**
  * {@link LogicalTypeFamily#BINARY_STRING} to {@link 
LogicalTypeFamily#CHARACTER_STRING} cast rule.
+ *
+ * <p>Strict UTF-8 mode is the default: invalid input bytes throw a {@code 
TableRuntimeException}.
+ * Setting {@link 
ExecutionConfigOptions#TABLE_EXEC_LEGACY_BYTES_TO_STRING_CAST} to {@code true}
+ * restores the prior behavior, where invalid sequences are silently replaced 
by {@code U+FFFD}.
  */
 class BinaryToStringCastRule extends 
AbstractNullAwareCodeGeneratorCastRule<byte[], String> {
 
@@ -48,14 +56,26 @@ class BinaryToStringCastRule extends 
AbstractNullAwareCodeGeneratorCastRule<byte
 
     /* Example generated code
 
+    --- Strict UTF-8 mode fast path: STRING / VARCHAR(MAX) target. No String 
allocation, no re-encoding.
     isNull$0 = _myInputIsNull;
     if (!isNull$0) {
-        java.lang.String resultString$435;
-        resultString$435 = new java.lang.String(_myInput, 
java.nio.charset.StandardCharsets.UTF_8);
+        result$1 = 
org.apache.flink.table.data.binary.BinaryStringData.fromUtf8Bytes(_myInput);
+        isNull$0 = result$1 == null;
+    } else {
+        result$1 = 
org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
+    }
+
+    --- Round-trip path: legacy mode (silent U+FFFD substitution) or strict 
UTF-8 mode + CHAR(n)/VARCHAR(n) (trim/pad).
+    --- The decode line below is the legacy variant; in strict UTF-8 mode it 
becomes:
+    ---     resultString$0 = 
org.apache.flink.table.data.binary.BinaryStringData.fromUtf8Bytes(_myInput).toString();
+    isNull$0 = _myInputIsNull;
+    if (!isNull$0) {
+        java.lang.String resultString$0;
+        resultString$0 = new java.lang.String(_myInput, 
java.nio.charset.StandardCharsets.UTF_8);
         java.lang.String resultPadOrTrim$538;
-        resultPadOrTrim$538 = resultString$435.toString();
-        if (resultString$435.length() > 12) {
-            resultPadOrTrim$538 = resultString$435.substring(0, 
java.lang.Math.min(resultString$435.length(), 12));
+        resultPadOrTrim$538 = resultString$0.toString();
+        if (resultString$0.length() > 12) {
+            resultPadOrTrim$538 = resultString$0.substring(0, 
java.lang.Math.min(resultString$0.length(), 12));
         } else {
             if (resultPadOrTrim$538.length() < 12) {
                 int padLength$539;
@@ -68,8 +88,8 @@ class BinaryToStringCastRule extends 
AbstractNullAwareCodeGeneratorCastRule<byte
                 resultPadOrTrim$538 = resultPadOrTrim$538 + 
sbPadding$540.toString();
             }
         }
-        resultString$435 = resultPadOrTrim$538;
-        result$1 = 
org.apache.flink.table.data.binary.BinaryStringData.fromString(resultString$435);
+        resultString$0 = resultPadOrTrim$538;
+        result$1 = 
org.apache.flink.table.data.binary.BinaryStringData.fromString(resultString$0);
         isNull$0 = result$1 == null;
     } else {
         result$1 = 
org.apache.flink.table.data.binary.BinaryStringData.EMPTY_UTF8;
@@ -77,6 +97,12 @@ class BinaryToStringCastRule extends 
AbstractNullAwareCodeGeneratorCastRule<byte
 
      */
 
+    @Override
+    public boolean canFail(LogicalType inputLogicalType, LogicalType 
targetLogicalType) {
+        // Strict UTF-8 mode validates the input and can throw on malformed 
bytes.
+        return true;
+    }
+
     @Override
     protected String generateCodeBlockInternal(
             CodeGeneratorCastRule.Context context,
@@ -84,6 +110,24 @@ class BinaryToStringCastRule extends 
AbstractNullAwareCodeGeneratorCastRule<byte
             String returnVariable,
             LogicalType inputLogicalType,
             LogicalType targetLogicalType) {
+        final boolean legacy =
+                context.getCodeGeneratorContext()
+                        .tableConfig()
+                        
.get(ExecutionConfigOptions.TABLE_EXEC_LEGACY_BYTES_TO_STRING_CAST);
+        final int length = LogicalTypeChecks.getLength(targetLogicalType);
+        final boolean needsTrimOrPad = couldTrim(length) || 
couldPad(targetLogicalType, length);
+
+        // Strict UTF-8 mode fast path: unbounded target. Wrap the input bytes 
directly with no
+        // intermediate String. Legacy mode always needs the round-trip below 
because the JDK
+        // decoder is what substitutes U+FFFD for invalid sequences.
+        if (!context.isPrinting() && !legacy && !needsTrimOrPad) {
+            return new CastRuleUtils.CodeWriter()
+                    .assignStmt(
+                            returnVariable,
+                            staticCall(BINARY_STRING_DATA_FROM_UTF8_BYTES(), 
inputTerm))
+                    .toString();
+        }
+
         final String resultStringTerm = 
newName(context.getCodeGeneratorContext(), "resultString");
         final CastRuleUtils.CodeWriter writer = new CastRuleUtils.CodeWriter();
 
@@ -93,19 +137,24 @@ class BinaryToStringCastRule extends 
AbstractNullAwareCodeGeneratorCastRule<byte
                     .assignPlusStmt(
                             resultStringTerm, staticCall(EncodingUtils.class, 
"hex", inputTerm))
                     .assignPlusStmt(resultStringTerm, "\"'\"");
-        } else {
+        } else if (legacy) {
+            // Legacy mode: lenient JDK decode, invalid sequences become 
U+FFFD.
             writer.assignStmt(
                     resultStringTerm,
                     constructorCall(
                             String.class,
                             inputTerm,
                             accessStaticField(StandardCharsets.class, 
"UTF_8")));
+        } else {
+            // Strict UTF-8 mode: validates, then materializes the String for 
trim/pad below.
+            writer.assignStmt(
+                    resultStringTerm,
+                    staticCall(BINARY_STRING_DATA_FROM_UTF8_BYTES(), 
inputTerm) + ".toString()");
         }
 
         if (!context.legacyBehaviour() && !context.isPrinting()) {
             final String resultPadOrTrim =
                     newName(context.getCodeGeneratorContext(), 
"resultPadOrTrim");
-            final int length = LogicalTypeChecks.getLength(targetLogicalType);
             CharVarCharTrimPadCastRule.padAndTrimStringIfNeeded(
                     writer,
                     targetLogicalType,
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
index a6392ae4a4d..20af0357350 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/calls/BuiltInMethods.scala
@@ -514,6 +514,9 @@ object BuiltInMethods {
   val BINARY_STRING_DATA_FROM_STRING =
     Types.lookupMethod(classOf[BinaryStringData], "fromString", 
classOf[String])
 
+  val BINARY_STRING_DATA_FROM_UTF8_BYTES =
+    Types.lookupMethod(classOf[BinaryStringData], "fromUtf8Bytes", 
classOf[Array[Byte]])
+
   val STRING_DATA_TO_BOOLEAN =
     Types.lookupMethod(classOf[BinaryStringDataUtil], "toBoolean", 
classOf[BinaryStringData])
 
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionMiscITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionMiscITCase.java
index c7066d42a3c..218d911e881 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionMiscITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/CastFunctionMiscITCase.java
@@ -26,6 +26,7 @@ import org.apache.flink.table.functions.ScalarFunction;
 import org.apache.flink.types.Row;
 
 import java.nio.ByteBuffer;
+import java.nio.charset.StandardCharsets;
 import java.time.LocalDateTime;
 import java.time.LocalTime;
 import java.util.Objects;
@@ -344,7 +345,23 @@ class CastFunctionMiscITCase extends 
BuiltInFunctionTestBase {
                                 $("f1").tryCast(MAP(INT(), ARRAY(INT()))),
                                 "TRY_CAST(f1 AS MAP<INT, ARRAY<INT>>)",
                                 null,
-                                MAP(INT(), ARRAY(INT())).nullable()));
+                                MAP(INT(), ARRAY(INT())).nullable()),
+                TestSetSpec.forFunction(
+                                BuiltInFunctionDefinitions.TRY_CAST,
+                                "try cast from BYTES with invalid UTF-8 to 
STRING returns NULL")
+                        .onFieldsWithData(
+                                new byte[] {(byte) 0x80}, 
"Hello".getBytes(StandardCharsets.UTF_8))
+                        .andDataTypes(BYTES(), BYTES())
+                        .testResult(
+                                $("f0").tryCast(STRING()),
+                                "TRY_CAST(f0 AS STRING)",
+                                null,
+                                STRING().nullable())
+                        .testResult(
+                                $("f1").tryCast(STRING()),
+                                "TRY_CAST(f1 AS STRING)",
+                                "Hello",
+                                STRING().nullable()));
     }
 
     // 
--------------------------------------------------------------------------------------------
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java
index 90d04690e78..2aa6e198a98 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/functions/casting/CastRulesTest.java
@@ -23,6 +23,7 @@ import 
org.apache.flink.api.common.typeutils.base.LocalDateTimeSerializer;
 import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.TableRuntimeException;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
 import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.data.DecimalData;
 import org.apache.flink.table.data.GenericArrayData;
@@ -47,6 +48,7 @@ import org.junit.jupiter.api.parallel.Execution;
 import org.junit.jupiter.api.parallel.ExecutionMode;
 
 import java.math.BigDecimal;
+import java.nio.charset.StandardCharsets;
 import java.time.Duration;
 import java.time.LocalDate;
 import java.time.LocalDateTime;
@@ -114,6 +116,14 @@ class CastRulesTest {
             new CodeGeneratorContext(
                     new Configuration(), 
Thread.currentThread().getContextClassLoader());
 
+    private static final CodeGeneratorContext CTX_LEGACY_BYTES_TO_STRING =
+            new CodeGeneratorContext(
+                    new Configuration()
+                            .set(
+                                    
ExecutionConfigOptions.TABLE_EXEC_LEGACY_BYTES_TO_STRING_CAST,
+                                    true),
+                    Thread.currentThread().getContextClassLoader());
+
     private static final CastRule.Context CET_CONTEXT =
             CastRule.Context.create(
                     false, false, CET, 
Thread.currentThread().getContextClassLoader(), CTX);
@@ -693,6 +703,25 @@ class CastRulesTest {
                                 BYTES(),
                                 new byte[] {70, 108, 105, 110, 107},
                                 fromString("x'466c696e6b'"))
+                        // Strict UTF-8 validation across all BINARY_STRING 
family roots.
+                        .fail(BINARY(1), new byte[] {(byte) 0x80}, 
TableRuntimeException.class)
+                        .fail(
+                                VARBINARY(2),
+                                new byte[] {(byte) 0xC0, (byte) 0xAF},
+                                TableRuntimeException.class)
+                        .fail(BYTES(), new byte[] {(byte) 0x80}, 
TableRuntimeException.class)
+                        // table.exec.legacy-bytes-to-string-cast=true 
restores silent substitution.
+                        .fromCaseLegacyBytesToString(
+                                BYTES(), new byte[] {(byte) 0x80}, 
fromString("�"))
+                        .fromCaseLegacyBytesToString(
+                                VARBINARY(2),
+                                new byte[] {(byte) 0xC0, (byte) 0xAF},
+                                fromString("��"))
+                        .fromCase(
+                                BYTES(),
+                                "é€😀".getBytes(StandardCharsets.UTF_8),
+                                fromString("é€😀"))
+                        .fromCasePrinting(BYTES(), new byte[] {(byte) 0x80}, 
fromString("x'80'"))
                         .fromCase(BOOLEAN(), true, 
StringData.fromString("TRUE"))
                         .fromCase(BOOLEAN(), false, 
StringData.fromString("FALSE"))
                         .fromCase(
@@ -871,6 +900,11 @@ class CastRulesTest {
                         .fromCaseLegacy(VARBINARY(1), new byte[] {33}, 
fromString("\u0021"))
                         .fromCase(BYTES(), new byte[] {32}, fromString("      
"))
                         .fromCaseLegacy(BYTES(), new byte[] {32}, fromString(" 
"))
+                        // Strict UTF-8 validation must fire before trim/pad 
on a CHAR(n) target.
+                        .fail(BYTES(), new byte[] {(byte) 0x80}, 
TableRuntimeException.class)
+                        // Legacy-bytes-to-string mode: invalid byte becomes 
U+FFFD then is padded.
+                        .fromCaseLegacyBytesToString(
+                                BYTES(), new byte[] {(byte) 0x80}, 
fromString("�     "))
                         .fromCase(TINYINT(), (byte) -125, fromString("-125  "))
                         .fromCaseLegacy(TINYINT(), (byte) -125, 
fromString("-125"))
                         .fromCase(SMALLINT(), (short) 32767, fromString("32767 
"))
@@ -1661,6 +1695,20 @@ class CastRulesTest {
                     target);
         }
 
+        private CastTestSpecBuilder fromCaseLegacyBytesToString(
+                DataType srcDataType, Object src, Object target) {
+            return fromCase(
+                    srcDataType,
+                    CastRule.Context.create(
+                            false,
+                            false,
+                            DateTimeUtils.UTC_ZONE.toZoneId(),
+                            Thread.currentThread().getContextClassLoader(),
+                            CTX_LEGACY_BYTES_TO_STRING),
+                    src,
+                    target);
+        }
+
         private CastTestSpecBuilder fromCase(
                 DataType srcDataType, CastRule.Context castContext, Object 
src, Object target) {
             this.inputTypes.add(srcDataType);
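
The behavioural difference this commit introduces can be sketched with plain JDK calls. The example below is only an illustration: the class `Utf8DecodeSketch` and its two methods are hypothetical names, and the strict variant uses the JDK's `CharsetDecoder` as a stand-in. It is not the implementation of `BinaryStringData.fromUtf8Bytes`, which this diff does not show, and Flink raises a `TableRuntimeException` rather than the JDK exception used here. The lenient variant is the same `new String(bytes, UTF_8)` call that the legacy code path generates.

```java
import java.nio.ByteBuffer;
import java.nio.charset.CharacterCodingException;
import java.nio.charset.CodingErrorAction;
import java.nio.charset.StandardCharsets;

// Hypothetical helper class, for illustration only; not part of the commit.
final class Utf8DecodeSketch {

    // Lenient decode, as in the legacy path: the JDK substitutes U+FFFD
    // for malformed input instead of failing.
    static String decodeLenient(byte[] bytes) {
        return new String(bytes, StandardCharsets.UTF_8);
    }

    // Strict decode (stand-in for the new default): malformed input is
    // reported as a CharacterCodingException instead of being replaced.
    static String decodeStrict(byte[] bytes) throws CharacterCodingException {
        return StandardCharsets.UTF_8
                .newDecoder()
                .onMalformedInput(CodingErrorAction.REPORT)
                .onUnmappableCharacter(CodingErrorAction.REPORT)
                .decode(ByteBuffer.wrap(bytes))
                .toString();
    }

    public static void main(String[] args) {
        byte[] invalid = {(byte) 0x80}; // lone continuation byte, as in the tests above

        // Lenient: silently yields the replacement character.
        System.out.println(decodeLenient(invalid).equals("\uFFFD"));

        // Strict: the same input is rejected.
        try {
            decodeStrict(invalid);
            System.out.println("accepted");
        } catch (CharacterCodingException e) {
            System.out.println("rejected: " + e.getClass().getSimpleName());
        }
    }
}
```

In SQL terms, the lenient method corresponds to `MAKE_VALID_UTF8` or to `table.exec.legacy-bytes-to-string-cast` set to `true`, while the strict method corresponds to the new default behaviour of `CAST`; `TRY_CAST` turns the failure into `NULL`.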
