This is an automated email from the ASF dual-hosted git repository.
fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 9da78118bee [FLINK-39201] Skip null values during length enforcement
9da78118bee is described below
commit 9da78118bee862729cf68ce1a1e2a992a30c78a5
Author: Fabian Paul <[email protected]>
AuthorDate: Tue Mar 3 15:20:52 2026 +0100
[FLINK-39201] Skip null values during length enforcement
---
.../stream/ConstraintEnforcerSemanticTests.java | 4 +-
.../stream/ConstraintEnforcerTestPrograms.java | 64 ++++++++++++++++++++++
.../sink/constraint/BinaryLengthConstraint.java | 3 +
.../sink/constraint/CharLengthConstraint.java | 3 +
4 files changed, 73 insertions(+), 1 deletion(-)
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ConstraintEnforcerSemanticTests.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ConstraintEnforcerSemanticTests.java
index 499770c480b..1ea7ad01382 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ConstraintEnforcerSemanticTests.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ConstraintEnforcerSemanticTests.java
@@ -40,6 +40,8 @@ public class ConstraintEnforcerSemanticTests extends SemanticTestBase {
ConstraintEnforcerTestPrograms.NOT_NULL_ERROR_NESTED_MAPS,
ConstraintEnforcerTestPrograms.LENGTH_TRIM_PAD_WITH_NESTED_COLLECTIONS,
ConstraintEnforcerTestPrograms.LENGTH_ERROR_WITH_NESTED_ARRAYS,
- ConstraintEnforcerTestPrograms.LENGTH_ERROR_WITH_NESTED_MAPS);
+ ConstraintEnforcerTestPrograms.LENGTH_ERROR_WITH_NESTED_MAPS,
+ ConstraintEnforcerTestPrograms.CHAR_LENGTH_TRIM_PAD_WITH_NULLABLE_COLUMNS,
+ ConstraintEnforcerTestPrograms.BINARY_LENGTH_TRIM_PAD_WITH_NULLABLE_COLUMNS);
}
}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ConstraintEnforcerTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ConstraintEnforcerTestPrograms.java
index 9609a3f3d01..527d59015c1 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ConstraintEnforcerTestPrograms.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ConstraintEnforcerTestPrograms.java
@@ -815,6 +815,70 @@ public class ConstraintEnforcerTestPrograms {
+ " control this behaviour")
.build();
+ public static final String SCHEMA_CHAR_LENGTH_NULLABLE =
+ "a INT NOT NULL, b CHAR(8), c CHAR(6), d INT NOT NULL, e INT NOT NULL, f VARCHAR(6)";
+
+ static final TableTestProgram CHAR_LENGTH_TRIM_PAD_WITH_NULLABLE_COLUMNS =
+ TableTestProgram.of(
+ "constraint-enforcer-char-length-trim-pad-nullable",
+ "validates constraint enforcer handles null values in nullable"
+ + " CHAR/VARCHAR columns with TRIM_PAD enforcement")
+ .setupConfig(TABLE_EXEC_SINK_TYPE_LENGTH_ENFORCER, TypeLengthEnforcer.TRIM_PAD)
+ .setupTableSource(
+ SourceTestStep.newBuilder("source_t")
+ .addSchema(SCHEMA_CHAR_LENGTH_NULLABLE)
+ .producedValues(
+ Row.of(1, "Apache Flink", "SQL RuleZ", 11, 111, "SQL"),
+ Row.of(2, null, null, 22, 222, null))
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink_t")
+ .addSchema(SCHEMA_CHAR_LENGTH_NULLABLE)
+ .consumedValues(
+ Row.of(1, "Apache F", "SQL Ru", 11, 111, "SQL"),
+ Row.of(2, null, null, 22, 222, null))
+ .build())
+ .runSql("INSERT INTO sink_t SELECT * FROM source_t")
+ .build();
+
+ public static final String SCHEMA_BINARY_LENGTH_NULLABLE =
+ "a INT NOT NULL, b BINARY(8), c BINARY(6), d INT NOT NULL, e INT NOT NULL, f VARBINARY(6)";
+
+ static final TableTestProgram BINARY_LENGTH_TRIM_PAD_WITH_NULLABLE_COLUMNS =
+ TableTestProgram.of(
+ "constraint-enforcer-binary-length-trim-pad-nullable",
+ "validates constraint enforcer handles null values in nullable"
+ + " BINARY/VARBINARY columns with TRIM_PAD enforcement")
+ .setupConfig(TABLE_EXEC_SINK_TYPE_LENGTH_ENFORCER, TypeLengthEnforcer.TRIM_PAD)
+ .setupTableSource(
+ SourceTestStep.newBuilder("source_t")
+ .addSchema(SCHEMA_BINARY_LENGTH_NULLABLE)
+ .producedValues(
+ Row.of(
+ 1,
+ new byte[] {1, 2, 3, 4, 5, 6, 7, 8, 9, 10},
+ new byte[] {1, 2, 3, 4, 5, 6, 7, 8},
+ 11,
+ 111,
+ new byte[] {1, 2, 3}),
+ Row.of(2, null, null, 22, 222, null))
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink_t")
+ .addSchema(SCHEMA_BINARY_LENGTH_NULLABLE)
+ .consumedValues(
+ Row.of(
+ 1,
+ new byte[] {1, 2, 3, 4, 5, 6, 7, 8},
+ new byte[] {1, 2, 3, 4, 5, 6},
+ 11,
+ 111,
+ new byte[] {1, 2, 3}),
+ Row.of(2, null, null, 22, 222, null))
+ .build())
+ .runSql("INSERT INTO sink_t SELECT * FROM source_t")
+ .build();
+
private static Map<Long, Long> mapOfNullable(@Nullable Long key, @Nullable Long value) {
final Map<Long, Long> map = new HashMap<>();
map.put(key, value);
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/constraint/BinaryLengthConstraint.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/constraint/BinaryLengthConstraint.java
index 692462dd773..f994c523fbf 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/constraint/BinaryLengthConstraint.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/constraint/BinaryLengthConstraint.java
@@ -59,6 +59,9 @@ final class BinaryLengthConstraint implements Constraint {
for (int i = 0; i < fieldLengths.length; i++) {
final int fieldIdx = fieldIndices[i];
+ if (rowData.isNullAt(fieldIdx)) {
+ continue;
+ }
final int expectedLength = fieldLengths[i];
final byte[] binaryData = rowData.getBinary(fieldIdx);
final int actualLength = binaryData.length;
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/constraint/CharLengthConstraint.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/constraint/CharLengthConstraint.java
index 641be2f4ebe..4c24080e3c4 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/constraint/CharLengthConstraint.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/constraint/CharLengthConstraint.java
@@ -61,6 +61,9 @@ final class CharLengthConstraint implements Constraint {
for (int i = 0; i < fieldIndices.length; i++) {
final int fieldIdx = fieldIndices[i];
+ if (rowData.isNullAt(fieldIdx)) {
+ continue;
+ }
final int expectedLength = fieldLengths[i];
final BinaryStringData stringData = (BinaryStringData) rowData.getString(fieldIdx);
final int actualLength = stringData.numChars();