raminqaf commented on code in PR #28235:
URL: https://github.com/apache/flink/pull/28235#discussion_r3311143540


##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java:
##########
@@ -598,4 +599,220 @@ public class ToChangelogTestPrograms {
                             ValidationException.class,
                             "Duplicate change operation: 'DELETE'")
                     .build();
+
+    public static final TableTestProgram 
PRODUCES_FULL_DELETES_ON_APPEND_ONLY_INPUT =
+            TableTestProgram.of(
+                            
"to-changelog-produces-full-deletes-on-append-only-input",
+                            "fails when produces_full_deletes=true on an input that never emits DELETE rows")
+                    .setupTableSource(SIMPLE_SOURCE)
+                    .runFailingSql(
+                            "SELECT * FROM TO_CHANGELOG("
+                                    + "input => TABLE t, "
+                                    + "produces_full_deletes => true)",
+                            ValidationException.class,
+                            "the input table only produces [INSERT] and never emits DELETE rows")
+                    .build();
+
+    // --------------------------------------------------------------------------------------------
+    // Row semantics x delete handling matrix
+    // --------------------------------------------------------------------------------------------
+
+    public static final TableTestProgram ROW_SEM_PARTIAL_DELETES =
+            TableTestProgram.of(
+                            "to-changelog-row-sem-partial-deletes",
+                            "row semantics: produces_full_deletes=false skips ChangelogNormalize and a partial DELETE row from the input passes through unchanged")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("t")
+                                    .addSchema(
+                                            "name STRING PRIMARY KEY NOT 
ENFORCED", "score BIGINT")
+                                    .addMode(ChangelogMode.all())
+                                    .producedValues(
+                                            Row.ofKind(RowKind.INSERT, 
"Alice", 10L),
+                                            Row.ofKind(RowKind.INSERT, "Bob", 
20L),
+                                            Row.ofKind(RowKind.UPDATE_BEFORE, 
"Alice", 10L),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"Alice", 30L),
+                                            Row.ofKind(RowKind.DELETE, "Bob", 
null))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("op STRING", "name STRING", 
"score BIGINT")
+                                    .consumedValues(
+                                            "+I[INSERT, Alice, 10]",
+                                            "+I[INSERT, Bob, 20]",
+                                            "+I[UPDATE_BEFORE, Alice, 10]",
+                                            "+I[UPDATE_AFTER, Alice, 30]",
+                                            "+I[DELETE, Bob, null]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink SELECT * FROM TO_CHANGELOG("
+                                    + "input => TABLE t, "
+                                    + "produces_full_deletes => false)")
+                    .build();
+
+    public static final TableTestProgram ROW_SEM_FORCE_FULL_DELETES =
+            TableTestProgram.of(
+                            "to-changelog-row-sem-force-full-deletes",
+                            "row semantics: produces_full_deletes=true forces ChangelogNormalize to materialize the full DELETE row from an upsert source emitting key-only deletes")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("t")
+                                    .addSchema(
+                                            "name STRING PRIMARY KEY NOT 
ENFORCED", "score BIGINT")
+                                    .addMode(ChangelogMode.upsert())
+                                    .producedValues(
+                                            Row.ofKind(RowKind.INSERT, 
"Alice", 10L),
+                                            // Key-only delete: ChangelogNormalize fills the row.
+                                            Row.ofKind(RowKind.DELETE, 
"Alice", null))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("op STRING", "name STRING", 
"score BIGINT")
+                                    .consumedValues(
+                                            "+I[INSERT, Alice, 10]", 
"+I[DELETE, Alice, 10]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink SELECT * FROM TO_CHANGELOG("
+                                    + "input => TABLE t, "
+                                    + "produces_full_deletes => true)")
+                    .build();
+
+    public static final TableTestProgram 
ROW_SEM_PARTIAL_DELETES_VIA_UPSERT_KEY =
+            TableTestProgram.of(
+                            
"to-changelog-row-sem-partial-deletes-via-upsert-key",
+                            "row semantics with single-column upsert key + "
+                                    + "produces_full_deletes=false: DELETE preserves the key "
+                                    + "column and nulls the rest without requiring PARTITION BY")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("t")
+                                    .addSchema(
+                                            "name STRING PRIMARY KEY NOT 
ENFORCED", "score BIGINT")
+                                    .addMode(ChangelogMode.upsert())
+                                    .producedValues(
+                                            Row.ofKind(RowKind.INSERT, 
"Alice", 10L),
+                                            Row.ofKind(RowKind.DELETE, 
"Alice", 10L))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("op STRING", "name STRING", 
"score BIGINT")
+                                    .consumedValues(
+                                            "+I[INSERT, Alice, 10]", 
"+I[DELETE, Alice, null]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink SELECT * FROM TO_CHANGELOG("
+                                    + "input => TABLE t, "
+                                    + "produces_full_deletes => false)")
+                    .build();
+
+    // --------------------------------------------------------------------------------------------
+    // Set semantics x delete handling matrix
+    // --------------------------------------------------------------------------------------------
+
+    public static final TableTestProgram SET_SEM_FORCE_PARTIAL_DELETES =
+            TableTestProgram.of(
+                            "to-changelog-set-sem-force-partial-deletes",
+                            "set semantics: produces_full_deletes=false nulls non-partition-key columns on DELETE even when the input row is fully populated")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("t")
+                                    .addSchema(
+                                            "name STRING PRIMARY KEY NOT 
ENFORCED", "score BIGINT")
+                                    .addMode(ChangelogMode.all())
+                                    .producedValues(
+                                            Row.ofKind(RowKind.INSERT, 
"Alice", 10L),
+                                            Row.ofKind(RowKind.INSERT, "Bob", 
20L),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"Alice", 30L),
+                                            Row.ofKind(RowKind.DELETE, "Bob", 
20L))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("name STRING", "op STRING", 
"score BIGINT")
+                                    .consumedValues(
+                                            "+I[Alice, INSERT, 10]",
+                                            "+I[Bob, INSERT, 20]",
+                                            "+I[Alice, UPDATE_AFTER, 30]",
+                                            "+I[Bob, DELETE, null]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink SELECT * FROM TO_CHANGELOG("
+                                    + "input => TABLE t PARTITION BY name,"
+                                    + "produces_full_deletes => false)")
+                    .build();
+
+    public static final TableTestProgram SET_SEM_PARTIAL_DELETES =
+            TableTestProgram.of(
+                            "to-changelog-set-sem-partial-deletes",
+                            "set semantics: produces_full_deletes=false (default) lets a partial DELETE row from the input pass through with non-partition-key columns null")

Review Comment:
   Corrected



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java:
##########
@@ -598,4 +599,220 @@ public class ToChangelogTestPrograms {
                             ValidationException.class,
                             "Duplicate change operation: 'DELETE'")
                     .build();
+
+    public static final TableTestProgram 
PRODUCES_FULL_DELETES_ON_APPEND_ONLY_INPUT =
+            TableTestProgram.of(
+                            
"to-changelog-produces-full-deletes-on-append-only-input",
+                            "fails when produces_full_deletes=true on an input that never emits DELETE rows")
+                    .setupTableSource(SIMPLE_SOURCE)
+                    .runFailingSql(
+                            "SELECT * FROM TO_CHANGELOG("
+                                    + "input => TABLE t, "
+                                    + "produces_full_deletes => true)",
+                            ValidationException.class,
+                            "the input table only produces [INSERT] and never emits DELETE rows")
+                    .build();
+
+    // --------------------------------------------------------------------------------------------
+    // Row semantics x delete handling matrix
+    // --------------------------------------------------------------------------------------------
+
+    public static final TableTestProgram ROW_SEM_PARTIAL_DELETES =

Review Comment:
   Renamed



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to