This is an automated email from the ASF dual-hosted git repository.

fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new c0479c74f2b [FLINK-39201] Add test coverage for null values and error constraint
c0479c74f2b is described below

commit c0479c74f2b736aaed00b4d9fb1b14ad296e562d
Author: Fabian Paul <[email protected]>
AuthorDate: Wed Mar 4 13:48:52 2026 +0100

    [FLINK-39201] Add test coverage for null values and error constraint
---
 .../stream/ConstraintEnforcerSemanticTests.java    |  4 +-
 .../stream/ConstraintEnforcerTestPrograms.java     | 58 ++++++++++++++++++++++
 2 files changed, 61 insertions(+), 1 deletion(-)

diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ConstraintEnforcerSemanticTests.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ConstraintEnforcerSemanticTests.java
index 1ea7ad01382..5f08ddea816 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ConstraintEnforcerSemanticTests.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ConstraintEnforcerSemanticTests.java
@@ -42,6 +42,8 @@ public class ConstraintEnforcerSemanticTests extends SemanticTestBase {
                 ConstraintEnforcerTestPrograms.LENGTH_ERROR_WITH_NESTED_ARRAYS,
                 ConstraintEnforcerTestPrograms.LENGTH_ERROR_WITH_NESTED_MAPS,
                 
ConstraintEnforcerTestPrograms.CHAR_LENGTH_TRIM_PAD_WITH_NULLABLE_COLUMNS,
-                
ConstraintEnforcerTestPrograms.BINARY_LENGTH_TRIM_PAD_WITH_NULLABLE_COLUMNS);
+                
ConstraintEnforcerTestPrograms.BINARY_LENGTH_TRIM_PAD_WITH_NULLABLE_COLUMNS,
+                
ConstraintEnforcerTestPrograms.CHAR_LENGTH_ERROR_WITH_NULLABLE_COLUMNS,
+                
ConstraintEnforcerTestPrograms.BINARY_LENGTH_ERROR_WITH_NULLABLE_COLUMNS);
     }
 }
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ConstraintEnforcerTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ConstraintEnforcerTestPrograms.java
index 527d59015c1..f777486c221 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ConstraintEnforcerTestPrograms.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ConstraintEnforcerTestPrograms.java
@@ -879,6 +879,64 @@ public class ConstraintEnforcerTestPrograms {
                     .runSql("INSERT INTO sink_t SELECT * FROM source_t")
                     .build();
 
+    static final TableTestProgram CHAR_LENGTH_ERROR_WITH_NULLABLE_COLUMNS =
+            TableTestProgram.of(
+                            "constraint-enforcer-char-length-error-nullable",
+                            "validates constraint enforcer handles null values 
in nullable"
+                                    + " CHAR/VARCHAR columns with ERROR 
enforcement")
+                    .setupConfig(TABLE_EXEC_SINK_TYPE_LENGTH_ENFORCER, 
TypeLengthEnforcer.ERROR)
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t")
+                                    .addSchema(SCHEMA_CHAR_LENGTH_NULLABLE)
+                                    .producedValues(
+                                            Row.of(1, "ApacheFl", "SQL Ru", 
11, 111, "SQL"),
+                                            Row.of(2, null, null, 22, 222, 
null))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema(SCHEMA_CHAR_LENGTH_NULLABLE)
+                                    .consumedValues(
+                                            Row.of(1, "ApacheFl", "SQL Ru", 
11, 111, "SQL"),
+                                            Row.of(2, null, null, 22, 222, 
null))
+                                    .build())
+                    .runSql("INSERT INTO sink_t SELECT * FROM source_t")
+                    .build();
+
+    static final TableTestProgram BINARY_LENGTH_ERROR_WITH_NULLABLE_COLUMNS =
+            TableTestProgram.of(
+                            "constraint-enforcer-binary-length-error-nullable",
+                            "validates constraint enforcer handles null values 
in nullable"
+                                    + " BINARY/VARBINARY columns with ERROR 
enforcement")
+                    .setupConfig(TABLE_EXEC_SINK_TYPE_LENGTH_ENFORCER, 
TypeLengthEnforcer.ERROR)
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("source_t")
+                                    .addSchema(SCHEMA_BINARY_LENGTH_NULLABLE)
+                                    .producedValues(
+                                            Row.of(
+                                                    1,
+                                                    new byte[] {1, 2, 3, 4, 5, 
6, 7, 8},
+                                                    new byte[] {1, 2, 3, 4, 5, 
6},
+                                                    11,
+                                                    111,
+                                                    new byte[] {1, 2, 3}),
+                                            Row.of(2, null, null, 22, 222, 
null))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink_t")
+                                    .addSchema(SCHEMA_BINARY_LENGTH_NULLABLE)
+                                    .consumedValues(
+                                            Row.of(
+                                                    1,
+                                                    new byte[] {1, 2, 3, 4, 5, 
6, 7, 8},
+                                                    new byte[] {1, 2, 3, 4, 5, 
6},
+                                                    11,
+                                                    111,
+                                                    new byte[] {1, 2, 3}),
+                                            Row.of(2, null, null, 22, 222, 
null))
+                                    .build())
+                    .runSql("INSERT INTO sink_t SELECT * FROM source_t")
+                    .build();
+
     private static Map<Long, Long> mapOfNullable(@Nullable Long key, @Nullable 
Long value) {
         final Map<Long, Long> map = new HashMap<>();
         map.put(key, value);

Reply via email to