This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new de569e4d930 [FLINK-39088][table] Support injective casts from 
CHAR/VARCHAR to BINARY/VARBINARY
de569e4d930 is described below

commit de569e4d9305ed5aeb569d53a7f36ae054a12e87
Author: Gustavo de Morais <[email protected]>
AuthorDate: Mon Feb 23 10:43:17 2026 +0100

    [FLINK-39088][table] Support injective casts from CHAR/VARCHAR to 
BINARY/VARBINARY
    
    This closes #27640.
---
 .../types/logical/utils/LogicalTypeCasts.java      | 17 ++++++++++++
 .../types/logical/utils/LogicalTypeChecks.java     |  4 +++
 .../flink/table/types/LogicalTypeCastsTest.java    | 30 +++++++++++++++++++++-
 3 files changed, 50 insertions(+), 1 deletion(-)

diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java
index 307f47f76d3..97cd40b8e6c 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java
@@ -84,6 +84,7 @@ import static 
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getLe
 import static 
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision;
 import static 
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getScale;
 import static 
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getYearPrecision;
+import static 
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasMaxLength;
 import static 
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.isSingleFieldInterval;
 
 /**
@@ -151,6 +152,20 @@ public final class LogicalTypeCasts {
                     getPrecision(source) == getPrecision(target)
                             && getScale(source) == getScale(target);
 
+    /**
+     * Injective when the target binary length can hold worst-case UTF-8 
encoding (4 bytes/char).
+     */
+    private static final BiPredicate<LogicalType, LogicalType> 
WHEN_BINARY_LENGTH_FITS_UTF8 =
+            (source, target) -> {
+                if (hasMaxLength(target)) {
+                    return true;
+                }
+                if (hasMaxLength(source)) {
+                    return false;
+                }
+                return (long) getLength(target) >= (long) getLength(source) * 
4;
+            };
+
     static {
         implicitCastingRules = new HashMap<>();
         explicitCastingRules = new HashMap<>();
@@ -191,6 +206,7 @@ public final class LogicalTypeCasts {
                 .explicitFromFamily(CHARACTER_STRING)
                 .explicitFrom(VARBINARY, RAW)
                 .injectiveFrom(WHEN_LENGTH_FITS, BINARY)
+                .injectiveFrom(WHEN_BINARY_LENGTH_FITS_UTF8, CHAR, VARCHAR)
                 .build();
 
         castTo(VARBINARY)
@@ -198,6 +214,7 @@ public final class LogicalTypeCasts {
                 .explicitFromFamily(CHARACTER_STRING)
                 .explicitFrom(BINARY, RAW)
                 .injectiveFrom(WHEN_LENGTH_FITS, BINARY, VARBINARY)
+                .injectiveFrom(WHEN_BINARY_LENGTH_FITS_UTF8, CHAR, VARCHAR)
                 .build();
 
         // 
-----------------------------------------------------------------------------------------
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeChecks.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeChecks.java
index 229d344fe60..9c1a791472a 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeChecks.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeChecks.java
@@ -151,6 +151,10 @@ public final class LogicalTypeChecks {
         return getLength(logicalType) == length;
     }
 
+    public static boolean hasMaxLength(LogicalType logicalType) {
+        return getLength(logicalType) == Integer.MAX_VALUE;
+    }
+
     /** Returns the precision of all types that define a precision implicitly 
or explicitly. */
     public static int getPrecision(LogicalType logicalType) {
         return logicalType.accept(PRECISION_EXTRACTOR);
diff --git 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeCastsTest.java
 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeCastsTest.java
index 7428e5adc10..e6ac53f79bd 100644
--- 
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeCastsTest.java
+++ 
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeCastsTest.java
@@ -479,7 +479,35 @@ class LogicalTypeCastsTest {
                 Arguments.of(new TimestampType(3), new VarCharType(5), false), 
// too short
 
                 // DOUBLE identity
-                Arguments.of(new DoubleType(), new DoubleType(), true));
+                Arguments.of(new DoubleType(), new DoubleType(), true),
+
+                // ---- String to binary injective casts (UTF-8: up to 4 bytes 
per char) ----
+
+                // VARCHAR(MAX) → VARBINARY(MAX): both unbounded
+                Arguments.of(
+                        VarCharType.STRING_TYPE, new 
VarBinaryType(VarBinaryType.MAX_LENGTH), true),
+                // CHAR(MAX) → VARBINARY(MAX): both unbounded
+                Arguments.of(
+                        new CharType(CharType.MAX_LENGTH),
+                        new VarBinaryType(VarBinaryType.MAX_LENGTH),
+                        true),
+                // VARCHAR(10) → VARBINARY(40): exact fit (10 * 4 = 40)
+                Arguments.of(new VarCharType(10), new VarBinaryType(40), true),
+                // VARCHAR(10) → VARBINARY(39): one byte short
+                Arguments.of(new VarCharType(10), new VarBinaryType(39), 
false),
+                // VARCHAR(10) → VARBINARY(MAX): target unbounded
+                Arguments.of(
+                        new VarCharType(10), new 
VarBinaryType(VarBinaryType.MAX_LENGTH), true),
+                // VARCHAR(MAX) → VARBINARY(100): source unbounded, target 
bounded
+                Arguments.of(VarCharType.STRING_TYPE, new VarBinaryType(100), 
false),
+                // CHAR(5) → BINARY(20): exact fit (5 * 4 = 20)
+                Arguments.of(new CharType(5), new BinaryType(20), true),
+                // CHAR(5) → BINARY(19): one byte short
+                Arguments.of(new CharType(5), new BinaryType(19), false),
+                // CHAR(10) → VARBINARY(40): fixed char to var binary
+                Arguments.of(new CharType(10), new VarBinaryType(40), true),
+                // VARCHAR(10) → BINARY(40): var char to fixed binary
+                Arguments.of(new VarCharType(10), new BinaryType(40), true));
     }
 
     @ParameterizedTest(name = "{index}: [From: {0}, To: {1}, Injective: {2}]")

Reply via email to