gustavodemorais commented on code in PR #28278:
URL: https://github.com/apache/flink/pull/28278#discussion_r3334701315


##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java:
##########
@@ -123,6 +123,147 @@ public class ToChangelogTestPrograms {
                     .runSql("INSERT INTO sink SELECT * FROM TO_CHANGELOG(input 
=> TABLE t)")
                     .build();
 
+    /**
+     * Also exercises the {@code ChangelogNormalize} state used to materialize 
full DELETE
+     * pre-images.
+     */
+    public static final TableTestProgram UPSERT_RESTORE =
+            TableTestProgram.of(
+                            "to-changelog-upsert-restore",
+                            "TO_CHANGELOG over an upsert source restores via 
compiled plan + "
+                                    + "savepoint, including ChangelogNormalize 
state")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("t")
+                                    .addSchema(
+                                            "name STRING PRIMARY KEY NOT 
ENFORCED", "score BIGINT")
+                                    .addMode(ChangelogMode.upsert())
+                                    .producedBeforeRestore(
+                                            Row.ofKind(RowKind.INSERT, 
"Alice", 10L),
+                                            Row.ofKind(RowKind.INSERT, "Bob", 
20L))
+                                    .producedAfterRestore(
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"Alice", 30L),
+                                            // Key-only delete: 
ChangelogNormalize restores
+                                            // Bob's pre-image from state.
+                                            Row.ofKind(RowKind.DELETE, "Bob", 
null))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("op STRING", "name STRING", 
"score BIGINT")
+                                    .consumedBeforeRestore(
+                                            "+I[INSERT, Alice, 10]", 
"+I[INSERT, Bob, 20]")
+                                    .consumedAfterRestore(
+                                            "+I[UPDATE_BEFORE, Alice, 10]",
+                                            "+I[UPDATE_AFTER, Alice, 30]",
+                                            "+I[DELETE, Bob, 20]")
+                                    .build())
+                    .runSql("INSERT INTO sink SELECT * FROM TO_CHANGELOG(input 
=> TABLE t)")
+                    .build();
+
+    public static final TableTestProgram RETRACT_PARTITION_BY_RESTORE =
+            TableTestProgram.of(
+                            "to-changelog-retract-partition-by-restore",
+                            "TO_CHANGELOG over a retract source with PARTITION 
BY restores via "
+                                    + "compiled plan + savepoint")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("t")
+                                    .addSchema(
+                                            "name STRING PRIMARY KEY NOT 
ENFORCED", "score BIGINT")
+                                    .addMode(ChangelogMode.all())
+                                    .producedBeforeRestore(
+                                            Row.ofKind(RowKind.INSERT, 
"Alice", 10L),
+                                            Row.ofKind(RowKind.INSERT, "Bob", 
20L))
+                                    .producedAfterRestore(
+                                            Row.ofKind(RowKind.UPDATE_BEFORE, 
"Alice", 10L),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"Alice", 30L),
+                                            Row.ofKind(RowKind.DELETE, "Bob", 
20L))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("name STRING", "op STRING", 
"score BIGINT")
+                                    .consumedBeforeRestore(
+                                            "+I[Alice, INSERT, 10]", "+I[Bob, 
INSERT, 20]")
+                                    .consumedAfterRestore(
+                                            "+I[Alice, UPDATE_BEFORE, 10]",
+                                            "+I[Alice, UPDATE_AFTER, 30]",
+                                            "+I[Bob, DELETE, 20]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink SELECT * FROM TO_CHANGELOG("
+                                    + "input => TABLE t PARTITION BY name)")
+                    .build();
+
+    /** Also exercises keyed {@code ChangelogNormalize} state for full DELETE 
pre-images. */
+    public static final TableTestProgram UPSERT_PARTITION_BY_RESTORE =
+            TableTestProgram.of(
+                            "to-changelog-upsert-partition-by-restore",
+                            "TO_CHANGELOG over an upsert source with PARTITION 
BY restores via "
+                                    + "compiled plan + savepoint, including 
ChangelogNormalize "
+                                    + "state")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("t")
+                                    .addSchema(
+                                            "name STRING PRIMARY KEY NOT 
ENFORCED", "score BIGINT")
+                                    .addMode(ChangelogMode.upsert())
+                                    .producedBeforeRestore(
+                                            Row.ofKind(RowKind.INSERT, 
"Alice", 10L),
+                                            Row.ofKind(RowKind.INSERT, "Bob", 
20L))
+                                    .producedAfterRestore(
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"Alice", 30L),
+                                            // Key-only delete: 
ChangelogNormalize restores
+                                            // Bob's pre-image from state.
+                                            Row.ofKind(RowKind.DELETE, "Bob", 
null))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("name STRING", "op STRING", 
"score BIGINT")

Review Comment:
   For set semantics we can define NOT NULL 👍 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to