This is an automated email from the ASF dual-hosted git repository.
fpaul pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new c0479c74f2b [FLINK-39201] Add test coverage for null values and error constraint
c0479c74f2b is described below
commit c0479c74f2b736aaed00b4d9fb1b14ad296e562d
Author: Fabian Paul <[email protected]>
AuthorDate: Wed Mar 4 13:48:52 2026 +0100
[FLINK-39201] Add test coverage for null values and error constraint
---
.../stream/ConstraintEnforcerSemanticTests.java | 4 +-
.../stream/ConstraintEnforcerTestPrograms.java | 58 ++++++++++++++++++++++
2 files changed, 61 insertions(+), 1 deletion(-)
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ConstraintEnforcerSemanticTests.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ConstraintEnforcerSemanticTests.java
index 1ea7ad01382..5f08ddea816 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ConstraintEnforcerSemanticTests.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ConstraintEnforcerSemanticTests.java
@@ -42,6 +42,8 @@ public class ConstraintEnforcerSemanticTests extends SemanticTestBase {
ConstraintEnforcerTestPrograms.LENGTH_ERROR_WITH_NESTED_ARRAYS,
ConstraintEnforcerTestPrograms.LENGTH_ERROR_WITH_NESTED_MAPS,
ConstraintEnforcerTestPrograms.CHAR_LENGTH_TRIM_PAD_WITH_NULLABLE_COLUMNS,
-
ConstraintEnforcerTestPrograms.BINARY_LENGTH_TRIM_PAD_WITH_NULLABLE_COLUMNS);
+
ConstraintEnforcerTestPrograms.BINARY_LENGTH_TRIM_PAD_WITH_NULLABLE_COLUMNS,
+
ConstraintEnforcerTestPrograms.CHAR_LENGTH_ERROR_WITH_NULLABLE_COLUMNS,
+
ConstraintEnforcerTestPrograms.BINARY_LENGTH_ERROR_WITH_NULLABLE_COLUMNS);
}
}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ConstraintEnforcerTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ConstraintEnforcerTestPrograms.java
index 527d59015c1..f777486c221 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ConstraintEnforcerTestPrograms.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ConstraintEnforcerTestPrograms.java
@@ -879,6 +879,64 @@ public class ConstraintEnforcerTestPrograms {
.runSql("INSERT INTO sink_t SELECT * FROM source_t")
.build();
+ static final TableTestProgram CHAR_LENGTH_ERROR_WITH_NULLABLE_COLUMNS =
+ TableTestProgram.of(
+ "constraint-enforcer-char-length-error-nullable",
+ "validates constraint enforcer handles null values
in nullable"
+ + " CHAR/VARCHAR columns with ERROR
enforcement")
+ .setupConfig(TABLE_EXEC_SINK_TYPE_LENGTH_ENFORCER,
TypeLengthEnforcer.ERROR)
+ .setupTableSource(
+ SourceTestStep.newBuilder("source_t")
+ .addSchema(SCHEMA_CHAR_LENGTH_NULLABLE)
+ .producedValues(
+ Row.of(1, "ApacheFl", "SQL Ru",
11, 111, "SQL"),
+ Row.of(2, null, null, 22, 222,
null))
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink_t")
+ .addSchema(SCHEMA_CHAR_LENGTH_NULLABLE)
+ .consumedValues(
+ Row.of(1, "ApacheFl", "SQL Ru",
11, 111, "SQL"),
+ Row.of(2, null, null, 22, 222,
null))
+ .build())
+ .runSql("INSERT INTO sink_t SELECT * FROM source_t")
+ .build();
+
+ static final TableTestProgram BINARY_LENGTH_ERROR_WITH_NULLABLE_COLUMNS =
+ TableTestProgram.of(
+ "constraint-enforcer-binary-length-error-nullable",
+ "validates constraint enforcer handles null values
in nullable"
+ + " BINARY/VARBINARY columns with ERROR
enforcement")
+ .setupConfig(TABLE_EXEC_SINK_TYPE_LENGTH_ENFORCER,
TypeLengthEnforcer.ERROR)
+ .setupTableSource(
+ SourceTestStep.newBuilder("source_t")
+ .addSchema(SCHEMA_BINARY_LENGTH_NULLABLE)
+ .producedValues(
+ Row.of(
+ 1,
+ new byte[] {1, 2, 3, 4, 5,
6, 7, 8},
+ new byte[] {1, 2, 3, 4, 5,
6},
+ 11,
+ 111,
+ new byte[] {1, 2, 3}),
+ Row.of(2, null, null, 22, 222,
null))
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink_t")
+ .addSchema(SCHEMA_BINARY_LENGTH_NULLABLE)
+ .consumedValues(
+ Row.of(
+ 1,
+ new byte[] {1, 2, 3, 4, 5,
6, 7, 8},
+ new byte[] {1, 2, 3, 4, 5,
6},
+ 11,
+ 111,
+ new byte[] {1, 2, 3}),
+ Row.of(2, null, null, 22, 222,
null))
+ .build())
+ .runSql("INSERT INTO sink_t SELECT * FROM source_t")
+ .build();
+
private static Map<Long, Long> mapOfNullable(@Nullable Long key, @Nullable
Long value) {
final Map<Long, Long> map = new HashMap<>();
map.put(key, value);