gustavodemorais commented on code in PR #28278:
URL: https://github.com/apache/flink/pull/28278#discussion_r3334696499
##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java:
##########
@@ -123,6 +123,147 @@ public class ToChangelogTestPrograms {
.runSql("INSERT INTO sink SELECT * FROM TO_CHANGELOG(input
=> TABLE t)")
.build();
+ /**
+ * TO_CHANGELOG over an upsert source, restored via compiled plan + savepoint.
+ *
+ * <p>Also exercises the {@code ChangelogNormalize} state used to materialize full DELETE
+ * pre-images. After restore, the key-only DELETE for Bob is expected in the sink with the
+ * score he had before the savepoint. Alice's UPDATE_AFTER is expected as an
+ * UPDATE_BEFORE/UPDATE_AFTER pair.
+ */
+ public static final TableTestProgram UPSERT_RESTORE =
+ TableTestProgram.of(
+ "to-changelog-upsert-restore",
+ "TO_CHANGELOG over an upsert source restores via
compiled plan + "
+ + "savepoint, including ChangelogNormalize
state")
+ .setupTableSource(
+ SourceTestStep.newBuilder("t")
+ .addSchema(
+ "name STRING PRIMARY KEY NOT
ENFORCED", "score BIGINT")
+ .addMode(ChangelogMode.upsert())
+ .producedBeforeRestore(
+ Row.ofKind(RowKind.INSERT,
"Alice", 10L),
+ Row.ofKind(RowKind.INSERT, "Bob",
20L))
+ .producedAfterRestore(
+ Row.ofKind(RowKind.UPDATE_AFTER,
"Alice", 30L),
+ // Key-only delete:
ChangelogNormalize restores
+ // Bob's pre-image from state.
+ Row.ofKind(RowKind.DELETE, "Bob",
null))
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink")
+ .addSchema("op STRING", "name STRING",
"score BIGINT")
+ .consumedBeforeRestore(
+ "+I[INSERT, Alice, 10]",
"+I[INSERT, Bob, 20]")
+ .consumedAfterRestore(
+ "+I[UPDATE_BEFORE, Alice, 10]",
+ "+I[UPDATE_AFTER, Alice, 30]",
+ "+I[DELETE, Bob, 20]")
+ .build())
+ .runSql("INSERT INTO sink SELECT * FROM TO_CHANGELOG(input
=> TABLE t)")
+ .build();
+
+ /**
+ * TO_CHANGELOG over a retract source with PARTITION BY, restored via compiled plan + savepoint.
+ *
+ * <p>The source emits UPDATE_BEFORE rows and full DELETE rows itself. The expected output
+ * therefore mirrors the input changelog row-for-row. In the sink schema, the partition column
+ * {@code name} precedes the {@code op} column.
+ */
+ public static final TableTestProgram RETRACT_PARTITION_BY_RESTORE =
+ TableTestProgram.of(
+ "to-changelog-retract-partition-by-restore",
+ "TO_CHANGELOG over a retract source with PARTITION
BY restores via "
+ + "compiled plan + savepoint")
+ .setupTableSource(
+ SourceTestStep.newBuilder("t")
+ .addSchema(
+ "name STRING PRIMARY KEY NOT
ENFORCED", "score BIGINT")
+ .addMode(ChangelogMode.all())
+ .producedBeforeRestore(
+ Row.ofKind(RowKind.INSERT,
"Alice", 10L),
+ Row.ofKind(RowKind.INSERT, "Bob",
20L))
+ .producedAfterRestore(
+ Row.ofKind(RowKind.UPDATE_BEFORE,
"Alice", 10L),
+ Row.ofKind(RowKind.UPDATE_AFTER,
"Alice", 30L),
+ Row.ofKind(RowKind.DELETE, "Bob",
20L))
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink")
+ .addSchema("name STRING", "op STRING",
"score BIGINT")
+ .consumedBeforeRestore(
+ "+I[Alice, INSERT, 10]", "+I[Bob,
INSERT, 20]")
+ .consumedAfterRestore(
+ "+I[Alice, UPDATE_BEFORE, 10]",
+ "+I[Alice, UPDATE_AFTER, 30]",
+ "+I[Bob, DELETE, 20]")
+ .build())
+ .runSql(
+ "INSERT INTO sink SELECT * FROM TO_CHANGELOG("
+ + "input => TABLE t PARTITION BY name)")
+ .build();
+
+ /**
+ * TO_CHANGELOG over an upsert source with PARTITION BY, restored via compiled plan + savepoint.
+ *
+ * <p>Also exercises keyed {@code ChangelogNormalize} state used to materialize full DELETE
+ * pre-images. After restore, the key-only DELETE for Bob is expected in the sink with the
+ * score he had before the savepoint.
+ */
+ public static final TableTestProgram UPSERT_PARTITION_BY_RESTORE =
+ TableTestProgram.of(
+ "to-changelog-upsert-partition-by-restore",
+ "TO_CHANGELOG over an upsert source with PARTITION
BY restores via "
+ + "compiled plan + savepoint, including
ChangelogNormalize "
+ + "state")
+ .setupTableSource(
+ SourceTestStep.newBuilder("t")
+ .addSchema(
+ "name STRING PRIMARY KEY NOT
ENFORCED", "score BIGINT")
+ .addMode(ChangelogMode.upsert())
+ .producedBeforeRestore(
+ Row.ofKind(RowKind.INSERT,
"Alice", 10L),
+ Row.ofKind(RowKind.INSERT, "Bob",
20L))
+ .producedAfterRestore(
+ Row.ofKind(RowKind.UPDATE_AFTER,
"Alice", 30L),
+ // Key-only delete:
ChangelogNormalize restores
+ // Bob's pre-image from state.
+ Row.ofKind(RowKind.DELETE, "Bob",
null))
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink")
+ .addSchema("name STRING", "op STRING",
"score BIGINT")
+ .consumedBeforeRestore(
+ "+I[Alice, INSERT, 10]", "+I[Bob,
INSERT, 20]")
+ .consumedAfterRestore(
+ "+I[Alice, UPDATE_BEFORE, 10]",
+ "+I[Alice, UPDATE_AFTER, 30]",
+ "+I[Bob, DELETE, 20]")
+ .build())
+ .runSql(
+ "INSERT INTO sink SELECT * FROM TO_CHANGELOG("
+ + "input => TABLE t PARTITION BY name)")
+ .build();
+
+ /**
+ * Retract source through TO_CHANGELOG with {@code
produces_full_deletes=false} and a NOT NULL
+ * input column. The retract source carries full DELETE pre-images
natively, so the planner does
+ * not insert {@code ChangelogNormalize}; the pipeline is stateless beyond
the source. Verifies
+ * that the planner's nullability widening (non-key columns are nulled on
partial DELETE rows,
+ * so the output schema widens them to nullable) round-trips through the
compiled plan
+ * correctly.
+ */
+ public static final TableTestProgram
RETRACT_PRODUCES_PARTIAL_DELETES_RESTORE =
+ TableTestProgram.of(
+
"to-changelog-retract-produces-partial-deletes-restore",
+ "TO_CHANGELOG with produces_full_deletes=false
over a NOT NULL input "
+ + "column restores via compiled plan +
savepoint; the widened "
+ + "(nullable) output schema must
round-trip without "
+ + "ChangelogNormalize")
+ .setupTableSource(
+ SourceTestStep.newBuilder("t")
+ .addSchema(
+ "name STRING PRIMARY KEY NOT
ENFORCED",
+ "score BIGINT NOT NULL")
+ .addMode(ChangelogMode.all())
+
.producedBeforeRestore(Row.ofKind(RowKind.INSERT, "Alice", 10L))
+
.producedAfterRestore(Row.ofKind(RowKind.DELETE, "Alice", 10L))
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink")
+ .addSchema("op STRING", "name STRING",
"score BIGINT")
Review Comment:
Currently, for row semantics, the output type strategy widens all
non-partition columns to nullable at validation time, not only the non-key
columns. That's a current restriction: TableSemantics#upsertKeyColumns() is
empty during type inference, so key columns cannot be excluded from the widening.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]