This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new de569e4d930 [FLINK-39088][table] Support injective casts from
CHAR/VARCHAR to BINARY/VARBINARY
de569e4d930 is described below
commit de569e4d9305ed5aeb569d53a7f36ae054a12e87
Author: Gustavo de Morais <[email protected]>
AuthorDate: Mon Feb 23 10:43:17 2026 +0100
[FLINK-39088][table] Support injective casts from CHAR/VARCHAR to
BINARY/VARBINARY
This closes #27640.
---
.../types/logical/utils/LogicalTypeCasts.java | 17 ++++++++++++
.../types/logical/utils/LogicalTypeChecks.java | 4 +++
.../flink/table/types/LogicalTypeCastsTest.java | 30 +++++++++++++++++++++-
3 files changed, 50 insertions(+), 1 deletion(-)
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java
index 307f47f76d3..97cd40b8e6c 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeCasts.java
@@ -84,6 +84,7 @@ import static
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getLe
import static
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getPrecision;
import static
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getScale;
import static
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.getYearPrecision;
+import static
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.hasMaxLength;
import static
org.apache.flink.table.types.logical.utils.LogicalTypeChecks.isSingleFieldInterval;
/**
@@ -151,6 +152,20 @@ public final class LogicalTypeCasts {
getPrecision(source) == getPrecision(target)
&& getScale(source) == getScale(target);
+ /**
+ * Injective when the target binary length can hold worst-case UTF-8
encoding (4 bytes/char).
+ */
+ private static final BiPredicate<LogicalType, LogicalType>
WHEN_BINARY_LENGTH_FITS_UTF8 =
+ (source, target) -> {
+ if (hasMaxLength(target)) {
+ return true;
+ }
+ if (hasMaxLength(source)) {
+ return false;
+ }
+ return (long) getLength(target) >= (long) getLength(source) *
4;
+ };
+
static {
implicitCastingRules = new HashMap<>();
explicitCastingRules = new HashMap<>();
@@ -191,6 +206,7 @@ public final class LogicalTypeCasts {
.explicitFromFamily(CHARACTER_STRING)
.explicitFrom(VARBINARY, RAW)
.injectiveFrom(WHEN_LENGTH_FITS, BINARY)
+ .injectiveFrom(WHEN_BINARY_LENGTH_FITS_UTF8, CHAR, VARCHAR)
.build();
castTo(VARBINARY)
@@ -198,6 +214,7 @@ public final class LogicalTypeCasts {
.explicitFromFamily(CHARACTER_STRING)
.explicitFrom(BINARY, RAW)
.injectiveFrom(WHEN_LENGTH_FITS, BINARY, VARBINARY)
+ .injectiveFrom(WHEN_BINARY_LENGTH_FITS_UTF8, CHAR, VARCHAR)
.build();
//
-----------------------------------------------------------------------------------------
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeChecks.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeChecks.java
index 229d344fe60..9c1a791472a 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeChecks.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/logical/utils/LogicalTypeChecks.java
@@ -151,6 +151,10 @@ public final class LogicalTypeChecks {
return getLength(logicalType) == length;
}
+ public static boolean hasMaxLength(LogicalType logicalType) {
+ return getLength(logicalType) == Integer.MAX_VALUE;
+ }
+
/** Returns the precision of all types that define a precision implicitly
or explicitly. */
public static int getPrecision(LogicalType logicalType) {
return logicalType.accept(PRECISION_EXTRACTOR);
diff --git
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeCastsTest.java
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeCastsTest.java
index 7428e5adc10..e6ac53f79bd 100644
---
a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeCastsTest.java
+++
b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/LogicalTypeCastsTest.java
@@ -479,7 +479,35 @@ class LogicalTypeCastsTest {
Arguments.of(new TimestampType(3), new VarCharType(5), false),
// too short
// DOUBLE identity
- Arguments.of(new DoubleType(), new DoubleType(), true));
+ Arguments.of(new DoubleType(), new DoubleType(), true),
+
+ // ---- String to binary injective casts (UTF-8: up to 4 bytes
per char) ----
+
+ // VARCHAR(MAX) → VARBINARY(MAX): both unbounded
+ Arguments.of(
+ VarCharType.STRING_TYPE, new
VarBinaryType(VarBinaryType.MAX_LENGTH), true),
+ // CHAR(MAX) → VARBINARY(MAX): both unbounded
+ Arguments.of(
+ new CharType(CharType.MAX_LENGTH),
+ new VarBinaryType(VarBinaryType.MAX_LENGTH),
+ true),
+ // VARCHAR(10) → VARBINARY(40): exact fit (10 * 4 = 40)
+ Arguments.of(new VarCharType(10), new VarBinaryType(40), true),
+ // VARCHAR(10) → VARBINARY(39): one byte short
+ Arguments.of(new VarCharType(10), new VarBinaryType(39),
false),
+ // VARCHAR(10) → VARBINARY(MAX): target unbounded
+ Arguments.of(
+ new VarCharType(10), new
VarBinaryType(VarBinaryType.MAX_LENGTH), true),
+ // VARCHAR(MAX) → VARBINARY(100): source unbounded, target
bounded
+ Arguments.of(VarCharType.STRING_TYPE, new VarBinaryType(100),
false),
+ // CHAR(5) → BINARY(20): exact fit (5 * 4 = 20)
+ Arguments.of(new CharType(5), new BinaryType(20), true),
+ // CHAR(5) → BINARY(19): one byte short
+ Arguments.of(new CharType(5), new BinaryType(19), false),
+ // CHAR(10) → VARBINARY(40): fixed char to var binary
+ Arguments.of(new CharType(10), new VarBinaryType(40), true),
+ // VARCHAR(10) → BINARY(40): var char to fixed binary
+ Arguments.of(new VarCharType(10), new BinaryType(40), true));
}
@ParameterizedTest(name = "{index}: [From: {0}, To: {1}, Injective: {2}]")