This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 9da78118bee [FLINK-39201] Skip null values during length enforcement
9da78118bee is described below

commit 9da78118bee862729cf68ce1a1e2a992a30c78a5
Author: Fabian Paul <[email protected]>
AuthorDate: Tue Mar 3 15:20:52 2026 +0100

    [FLINK-39201] Skip null values during length enforcement
---
 .../stream/ConstraintEnforcerSemanticTests.java    |  4 +-
 .../stream/ConstraintEnforcerTestPrograms.java     | 64 ++++++++++++++++++++++
 .../sink/constraint/BinaryLengthConstraint.java    |  3 +
 .../sink/constraint/CharLengthConstraint.java      |  3 +
 4 files changed, 73 insertions(+), 1 deletion(-)

diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ConstraintEnforcerSemanticTests.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ConstraintEnforcerSemanticTests.java
index 499770c480b..1ea7ad01382 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ConstraintEnforcerSemanticTests.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ConstraintEnforcerSemanticTests.java
@@ -40,6 +40,8 @@ public class ConstraintEnforcerSemanticTests extends SemanticTestBase {
                 ConstraintEnforcerTestPrograms.NOT_NULL_ERROR_NESTED_MAPS,
                 ConstraintEnforcerTestPrograms.LENGTH_TRIM_PAD_WITH_NESTED_COLLECTIONS,
                 ConstraintEnforcerTestPrograms.LENGTH_ERROR_WITH_NESTED_ARRAYS,
-                ConstraintEnforcerTestPrograms.LENGTH_ERROR_WITH_NESTED_MAPS);
+                ConstraintEnforcerTestPrograms.LENGTH_ERROR_WITH_NESTED_MAPS,
+                ConstraintEnforcerTestPrograms.CHAR_LENGTH_TRIM_PAD_WITH_NULLABLE_COLUMNS,
+                ConstraintEnforcerTestPrograms.BINARY_LENGTH_TRIM_PAD_WITH_NULLABLE_COLUMNS);
     }
 }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ConstraintEnforcerTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ConstraintEnforcerTestPrograms.java
index 9609a3f3d01..527d59015c1 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ConstraintEnforcerTestPrograms.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ConstraintEnforcerTestPrograms.java
@@ -815,6 +815,70 @@ public class ConstraintEnforcerTestPrograms {
                                     + " control this behaviour")
                     .build();
 
+    public static final String SCHEMA_CHAR_LENGTH_NULLABLE =
+            "a INT NOT NULL, b CHAR(8), c CHAR(6), d INT NOT NULL, e INT NOT NULL, f VARCHAR(6)";
+
+    static final TableTestProgram CHAR_LENGTH_TRIM_PAD_WITH_NULLABLE_COLUMNS =
+            TableTestProgram.of(
+                            "constraint-enforcer-char-length-trim-pad-nullable",
+                            "validates constraint enforcer handles null values in nullable"
+                                    + " CHAR/VARCHAR columns with TRIM_PAD enforcement")
+                    .setupConfig(TABLE_EXEC_SINK_TYPE_LENGTH_ENFORCER, TypeLengthEnforcer.TRIM_PAD)
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t")
+                                    .addSchema(SCHEMA_CHAR_LENGTH_NULLABLE)
+                                    .producedValues(
+                                            Row.of(1, "Apache Flink", "SQL RuleZ", 11, 111, "SQL"),
+                                            Row.of(2, null, null, 22, 222, null))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema(SCHEMA_CHAR_LENGTH_NULLABLE)
+                                    .consumedValues(
+                                            Row.of(1, "Apache F", "SQL Ru", 11, 111, "SQL"),
+                                            Row.of(2, null, null, 22, 222, null))
+                                    .build())
+                    .runSql("INSERT INTO sink_t SELECT * FROM source_t")
+                    .build();
+
+    public static final String SCHEMA_BINARY_LENGTH_NULLABLE =
+            "a INT NOT NULL, b BINARY(8), c BINARY(6), d INT NOT NULL, e INT NOT NULL, f VARBINARY(6)";
+
+    static final TableTestProgram BINARY_LENGTH_TRIM_PAD_WITH_NULLABLE_COLUMNS =
+            TableTestProgram.of(
+                            "constraint-enforcer-binary-length-trim-pad-nullable",
+                            "validates constraint enforcer handles null values in nullable"
+                                    + " BINARY/VARBINARY columns with TRIM_PAD enforcement")
+                    .setupConfig(TABLE_EXEC_SINK_TYPE_LENGTH_ENFORCER, TypeLengthEnforcer.TRIM_PAD)
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t")
+                                    .addSchema(SCHEMA_BINARY_LENGTH_NULLABLE)
+                                    .producedValues(
+                                            Row.of(
+                                                    1,
+                                                    new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
+                                                    new byte[] {1, 2, 3, 4, 5, 6, 7, 8},
+                                                    11,
+                                                    111,
+                                                    new byte[] {1, 2, 3}),
+                                            Row.of(2, null, null, 22, 222, null))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema(SCHEMA_BINARY_LENGTH_NULLABLE)
+                                    .consumedValues(
+                                            Row.of(
+                                                    1,
+                                                    new byte[] {1, 2, 3, 4, 5, 6, 7, 8},
+                                                    new byte[] {1, 2, 3, 4, 5, 6},
+                                                    11,
+                                                    111,
+                                                    new byte[] {1, 2, 3}),
+                                            Row.of(2, null, null, 22, 222, null))
+                                    .build())
+                    .runSql("INSERT INTO sink_t SELECT * FROM source_t")
+                    .build();
+
     private static Map<Long, Long> mapOfNullable(@Nullable Long key, @Nullable Long value) {
         final Map<Long, Long> map = new HashMap<>();
         map.put(key, value);
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/constraint/BinaryLengthConstraint.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/constraint/BinaryLengthConstraint.java
index 692462dd773..f994c523fbf 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/constraint/BinaryLengthConstraint.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/constraint/BinaryLengthConstraint.java
@@ -59,6 +59,9 @@ final class BinaryLengthConstraint implements Constraint {
 
         for (int i = 0; i < fieldLengths.length; i++) {
             final int fieldIdx = fieldIndices[i];
+            if (rowData.isNullAt(fieldIdx)) {
+                continue;
+            }
             final int expectedLength = fieldLengths[i];
             final byte[] binaryData = rowData.getBinary(fieldIdx);
             final int actualLength = binaryData.length;
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/constraint/CharLengthConstraint.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/constraint/CharLengthConstraint.java
index 641be2f4ebe..4c24080e3c4 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/constraint/CharLengthConstraint.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/constraint/CharLengthConstraint.java
@@ -61,6 +61,9 @@ final class CharLengthConstraint implements Constraint {
 
         for (int i = 0; i < fieldIndices.length; i++) {
             final int fieldIdx = fieldIndices[i];
+            if (rowData.isNullAt(fieldIdx)) {
+                continue;
+            }
             final int expectedLength = fieldLengths[i];
             final BinaryStringData stringData = (BinaryStringData) rowData.getString(fieldIdx);
             final int actualLength = stringData.numChars();

Reply via email to