gustavodemorais commented on code in PR #28278:
URL: https://github.com/apache/flink/pull/28278#discussion_r3334353724


##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java:
##########
@@ -123,6 +123,147 @@ public class ToChangelogTestPrograms {
                     .runSql("INSERT INTO sink SELECT * FROM TO_CHANGELOG(input 
=> TABLE t)")
                     .build();
 
+    /**
+     * Also exercises the {@code ChangelogNormalize} state used to materialize full DELETE
+     * pre-images.
+     */
+    public static final TableTestProgram UPSERT_RESTORE =
+            TableTestProgram.of(
+                            "to-changelog-upsert-restore",
+                            "TO_CHANGELOG over an upsert source restores via 
compiled plan + "
+                                    + "savepoint, including ChangelogNormalize 
state")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("t")
+                                    .addSchema(
+                                            "name STRING PRIMARY KEY NOT 
ENFORCED", "score BIGINT")
+                                    .addMode(ChangelogMode.upsert())
+                                    .producedBeforeRestore(
+                                            Row.ofKind(RowKind.INSERT, 
"Alice", 10L),
+                                            Row.ofKind(RowKind.INSERT, "Bob", 
20L))
+                                    .producedAfterRestore(
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"Alice", 30L),
+                                            // Key-only delete: ChangelogNormalize restores
+                                            // Bob's pre-image from state.
+                                            Row.ofKind(RowKind.DELETE, "Bob", 
null))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("op STRING", "name STRING", 
"score BIGINT")
+                                    .consumedBeforeRestore(
+                                            "+I[INSERT, Alice, 10]", 
"+I[INSERT, Bob, 20]")
+                                    .consumedAfterRestore(
+                                            "+I[UPDATE_BEFORE, Alice, 10]",
+                                            "+I[UPDATE_AFTER, Alice, 30]",
+                                            "+I[DELETE, Bob, 20]")
+                                    .build())
+                    .runSql("INSERT INTO sink SELECT * FROM TO_CHANGELOG(input 
=> TABLE t)")
+                    .build();
+
+    public static final TableTestProgram RETRACT_PARTITION_BY_RESTORE =
+            TableTestProgram.of(
+                            "to-changelog-retract-partition-by-restore",
+                            "TO_CHANGELOG over a retract source with PARTITION 
BY restores via "
+                                    + "compiled plan + savepoint")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("t")
+                                    .addSchema(
+                                            "name STRING PRIMARY KEY NOT 
ENFORCED", "score BIGINT")
+                                    .addMode(ChangelogMode.all())
+                                    .producedBeforeRestore(
+                                            Row.ofKind(RowKind.INSERT, 
"Alice", 10L),
+                                            Row.ofKind(RowKind.INSERT, "Bob", 
20L))
+                                    .producedAfterRestore(
+                                            Row.ofKind(RowKind.UPDATE_BEFORE, 
"Alice", 10L),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"Alice", 30L),
+                                            Row.ofKind(RowKind.DELETE, "Bob", 
20L))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("name STRING", "op STRING", 
"score BIGINT")
+                                    .consumedBeforeRestore(
+                                            "+I[Alice, INSERT, 10]", "+I[Bob, 
INSERT, 20]")
+                                    .consumedAfterRestore(
+                                            "+I[Alice, UPDATE_BEFORE, 10]",
+                                            "+I[Alice, UPDATE_AFTER, 30]",
+                                            "+I[Bob, DELETE, 20]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink SELECT * FROM TO_CHANGELOG("
+                                    + "input => TABLE t PARTITION BY name)")
+                    .build();
+
+    /** Also exercises keyed {@code ChangelogNormalize} state for full DELETE pre-images. */
+    public static final TableTestProgram UPSERT_PARTITION_BY_RESTORE =
+            TableTestProgram.of(
+                            "to-changelog-upsert-partition-by-restore",
+                            "TO_CHANGELOG over an upsert source with PARTITION 
BY restores via "
+                                    + "compiled plan + savepoint, including 
ChangelogNormalize "
+                                    + "state")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("t")
+                                    .addSchema(
+                                            "name STRING PRIMARY KEY NOT 
ENFORCED", "score BIGINT")
+                                    .addMode(ChangelogMode.upsert())
+                                    .producedBeforeRestore(
+                                            Row.ofKind(RowKind.INSERT, 
"Alice", 10L),
+                                            Row.ofKind(RowKind.INSERT, "Bob", 
20L))
+                                    .producedAfterRestore(
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"Alice", 30L),
+                                            // Key-only delete: ChangelogNormalize restores
+                                            // Bob's pre-image from state.
+                                            Row.ofKind(RowKind.DELETE, "Bob", 
null))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("name STRING", "op STRING", 
"score BIGINT")
+                                    .consumedBeforeRestore(
+                                            "+I[Alice, INSERT, 10]", "+I[Bob, 
INSERT, 20]")
+                                    .consumedAfterRestore(
+                                            "+I[Alice, UPDATE_BEFORE, 10]",
+                                            "+I[Alice, UPDATE_AFTER, 30]",
+                                            "+I[Bob, DELETE, 20]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink SELECT * FROM TO_CHANGELOG("
+                                    + "input => TABLE t PARTITION BY name)")
+                    .build();
+
+    /**
+     * Retract source through TO_CHANGELOG with {@code produces_full_deletes=false} and a NOT NULL
+     * input column. The retract source carries full DELETE pre-images natively, so the planner does
+     * not insert {@code ChangelogNormalize}; the pipeline is stateless beyond the source. Verifies
+     * that the planner's nullability widening (non-key columns are nulled on partial DELETE rows,
+     * so the output schema widens them to nullable) round-trips through the compiled plan
+     * correctly.
+     */
+    public static final TableTestProgram 
RETRACT_PRODUCES_PARTIAL_DELETES_RESTORE =
+            TableTestProgram.of(
+                            
"to-changelog-retract-produces-partial-deletes-restore",
+                            "TO_CHANGELOG with produces_full_deletes=false 
over a NOT NULL input "
+                                    + "column restores via compiled plan + 
savepoint; the widened "
+                                    + "(nullable) output schema must 
round-trip without "
+                                    + "ChangelogNormalize")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("t")
+                                    .addSchema(
+                                            "name STRING PRIMARY KEY NOT 
ENFORCED",
+                                            "score BIGINT NOT NULL")
+                                    .addMode(ChangelogMode.all())
+                                    
.producedBeforeRestore(Row.ofKind(RowKind.INSERT, "Alice", 10L))
+                                    
.producedAfterRestore(Row.ofKind(RowKind.DELETE, "Alice", 10L))

Review Comment:
   done



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to