raminqaf commented on code in PR #28278:
URL: https://github.com/apache/flink/pull/28278#discussion_r3325209548


##########
flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ToChangelogOutputTypeStrategyTest.java:
##########
@@ -48,6 +48,7 @@ class ToChangelogOutputTypeStrategyTest extends TypeStrategiesTestBase {
     @Override
     protected Stream<TestSpec> testData() {
         return Stream.of(

Review Comment:
   How about we make two methods, one for Row semantics and one for Set semantics, and then concat the streams? Then we have a clean separation of the tests.
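A minimal sketch of that split, assuming two hypothetical helpers named `rowSemanticsTestData()` and `setSemanticsTestData()` (neither exists in the PR). Plain strings stand in for `TestSpec` so the snippet compiles on its own. In the real test class each helper would return `Stream<TestSpec>`, and `testData()` would remain the override from `TypeStrategiesTestBase`.

```java
import java.util.stream.Stream;

// Sketch of the suggested split. Strings stand in for TestSpec so this compiles
// on its own; the helper names are hypothetical, not existing methods in the PR.
class ToChangelogTestDataSketch {

    // Hypothetical helper: the row-semantics specs (input table without PARTITION BY).
    static Stream<String> rowSemanticsTestData() {
        return Stream.of("row spec A", "row spec B");
    }

    // Hypothetical helper: the set-semantics specs (input table with PARTITION BY).
    static Stream<String> setSemanticsTestData() {
        return Stream.of("set spec A", "set spec B");
    }

    // Stands in for the overridden testData(): row-semantics specs first, then set-semantics.
    static Stream<String> testData() {
        return Stream.concat(rowSemanticsTestData(), setSemanticsTestData());
    }
}
```

`Stream.concat` preserves encounter order, so every spec still runs exactly once and the row-semantics cases come first. If a third group appears later, `Stream.of(a, b, c).flatMap(s -> s)` avoids nesting `concat` calls.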



##########
flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ToChangelogOutputTypeStrategyTest.java:
##########
@@ -105,6 +106,55 @@ ARG_TABLE, new TableSemanticsMock(TABLE_TYPE_NOT_NULL_SCORE))
                                                 DataTypes.FIELD("op", DataTypes.STRING()),
                                                 DataTypes.FIELD("name", DataTypes.STRING()),
                                                 DataTypes.FIELD("score", DataTypes.BIGINT()))
+                                        .notNull()),
+                // Set semantics: PARTITION BY strips the partition column from the projected
+                // output (the framework prepends it back outside the function), so the strategy
+                // returns only [op, non_partition_input_cols]. With full deletes the row is
+                // passed through unchanged so non-partition NOT NULL columns stay non-nullable.
+                TestSpec.forStrategy(
+                                "produces_full_deletes=true in set semantics preserves NOT NULL on non-partition columns",
+                                TO_CHANGELOG_OUTPUT_TYPE_STRATEGY)
+                        .inputTypes(
+                                TABLE_TYPE_NOT_NULL_SCORE, DESCRIPTOR_TYPE, MAP_TYPE, BOOLEAN_TYPE)
+                        .calledWithTableSemanticsAt(
+                                ARG_TABLE,
+                                new TableSemanticsMock(
+                                        TABLE_TYPE_NOT_NULL_SCORE,
+                                        new int[] {0},
+                                        new int[0],
+                                        -1,
+                                        null,
+                                        List.of(new int[] {0})))
+                        .calledWithLiteralAt(ARG_OP, ColumnList.of("op"))
+                        .calledWithLiteralAt(ARG_OP_MAPPING, null)
+                        .calledWithLiteralAt(ARG_PRODUCES_FULL_DELETES, true)
+                        .expectDataType(
+                                DataTypes.ROW(
+                                                DataTypes.FIELD("op", DataTypes.STRING()),
+                                                DataTypes.FIELD(
+                                                        "score", DataTypes.BIGINT().notNull()))
+                                        .notNull()),
+                TestSpec.forStrategy(
+                                "produces_full_deletes=false in set semantics widens non-partition-key columns",
+                                TO_CHANGELOG_OUTPUT_TYPE_STRATEGY)
+                        .inputTypes(
+                                TABLE_TYPE_NOT_NULL_SCORE, DESCRIPTOR_TYPE, MAP_TYPE, BOOLEAN_TYPE)
+                        .calledWithTableSemanticsAt(
+                                ARG_TABLE,
+                                new TableSemanticsMock(
+                                        TABLE_TYPE_NOT_NULL_SCORE,
+                                        new int[] {0},
+                                        new int[0],
+                                        -1,
+                                        null,
+                                        List.of(new int[] {0})))
+                        .calledWithLiteralAt(ARG_OP, ColumnList.of("op"))
+                        .calledWithLiteralAt(ARG_OP_MAPPING, null)
+                        .calledWithLiteralAt(ARG_PRODUCES_FULL_DELETES, false)
+                        .expectDataType(
+                                DataTypes.ROW(
+                                                DataTypes.FIELD("op", DataTypes.STRING()),

Review Comment:
   What happened to the "name" column? 
   
   ```suggestion
                                                   DataTypes.FIELD("name", DataTypes.STRING()),
   ```
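Some context for the question above. The PR's own inline comment says that with PARTITION BY, the partition column is stripped from the function's projected output and the framework prepends it again. That matches the PARTITION BY restore programs in `ToChangelogTestPrograms`, whose sinks declare `name STRING` before `op STRING`. Under that assumption, the expected row type reduces to `[op, non-partition columns]`. The sketch below models only that rule. `Column` and `projectOutput` are hypothetical stand-ins, not Flink's `DataTypes` or the real `TO_CHANGELOG_OUTPUT_TYPE_STRATEGY`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

// Simplified model of the set-semantics rule described in the PR's inline comment.
// Column and projectOutput are hypothetical; this is not the real type strategy.
class SetSemanticsProjectionSketch {

    // Hypothetical stand-in for DataTypes.FIELD(name, type) with optional NOT NULL.
    static final class Column {
        final String name;
        final String type;
        final boolean nullable;

        Column(String name, String type, boolean nullable) {
            this.name = name;
            this.type = type;
            this.nullable = nullable;
        }

        @Override
        public String toString() {
            return name + " " + type + (nullable ? "" : " NOT NULL");
        }
    }

    // Returns [op, non-partition input columns]. Partition columns are skipped because,
    // per the inline comment, the framework prepends them outside the function.
    static List<Column> projectOutput(
            List<Column> input, Set<Integer> partitionKeys, boolean producesFullDeletes) {
        List<Column> out = new ArrayList<>();
        out.add(new Column("op", "STRING", true));
        for (int i = 0; i < input.size(); i++) {
            if (partitionKeys.contains(i)) {
                continue;
            }
            Column c = input.get(i);
            // Full deletes pass the row through unchanged, so NOT NULL survives.
            // Otherwise the column may be nulled on a partial DELETE, so it is widened.
            boolean nullable = c.nullable || !producesFullDeletes;
            out.add(new Column(c.name, c.type, nullable));
        }
        return out;
    }
}
```

Assume `TABLE_TYPE_NOT_NULL_SCORE` is `(name STRING, score BIGINT NOT NULL)` and the partition key index is 0. Then `produces_full_deletes=true` gives `[op STRING, score BIGINT NOT NULL]`, and `false` gives `[op STRING, score BIGINT]`, consistent with the two new test names. The sketch only shows what the current expectation encodes. It does not settle whether `name` should appear.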



##########
flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ToChangelogOutputTypeStrategyTest.java:
##########
@@ -105,6 +106,55 @@ ARG_TABLE, new TableSemanticsMock(TABLE_TYPE_NOT_NULL_SCORE))
                                                 DataTypes.FIELD("op", DataTypes.STRING()),
                                                 DataTypes.FIELD("name", DataTypes.STRING()),
                                                 DataTypes.FIELD("score", DataTypes.BIGINT()))
+                                        .notNull()),
+                // Set semantics: PARTITION BY strips the partition column from the projected
+                // output (the framework prepends it back outside the function), so the strategy
+                // returns only [op, non_partition_input_cols]. With full deletes the row is
+                // passed through unchanged so non-partition NOT NULL columns stay non-nullable.

Review Comment:
   Let's get rid of the comments here and keep the test aligned with the other tests.



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java:
##########
@@ -123,6 +123,147 @@ public class ToChangelogTestPrograms {
                     .runSql("INSERT INTO sink SELECT * FROM TO_CHANGELOG(input => TABLE t)")
                     .build();
 
+    /**
+     * Also exercises the {@code ChangelogNormalize} state used to materialize full DELETE
+     * pre-images.
+     */
+    public static final TableTestProgram UPSERT_RESTORE =
+            TableTestProgram.of(
+                            "to-changelog-upsert-restore",
+                            "TO_CHANGELOG over an upsert source restores via compiled plan + "
+                                    + "savepoint, including ChangelogNormalize state")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("t")
+                                    .addSchema(
+                                            "name STRING PRIMARY KEY NOT ENFORCED", "score BIGINT")
+                                    .addMode(ChangelogMode.upsert())
+                                    .producedBeforeRestore(
+                                            Row.ofKind(RowKind.INSERT, "Alice", 10L),
+                                            Row.ofKind(RowKind.INSERT, "Bob", 20L))
+                                    .producedAfterRestore(
+                                            Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 30L),
+                                            // Key-only delete: ChangelogNormalize restores
+                                            // Bob's pre-image from state.
+                                            Row.ofKind(RowKind.DELETE, "Bob", null))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("op STRING", "name STRING", "score BIGINT")
+                                    .consumedBeforeRestore(
+                                            "+I[INSERT, Alice, 10]", "+I[INSERT, Bob, 20]")
+                                    .consumedAfterRestore(
+                                            "+I[UPDATE_BEFORE, Alice, 10]",
+                                            "+I[UPDATE_AFTER, Alice, 30]",
+                                            "+I[DELETE, Bob, 20]")
+                                    .build())
+                    .runSql("INSERT INTO sink SELECT * FROM TO_CHANGELOG(input => TABLE t)")
+                    .build();
+
+    public static final TableTestProgram RETRACT_PARTITION_BY_RESTORE =
+            TableTestProgram.of(
+                            "to-changelog-retract-partition-by-restore",
+                            "TO_CHANGELOG over a retract source with PARTITION BY restores via "
+                                    + "compiled plan + savepoint")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("t")
+                                    .addSchema(
+                                            "name STRING PRIMARY KEY NOT ENFORCED", "score BIGINT")
+                                    .addMode(ChangelogMode.all())
+                                    .producedBeforeRestore(
+                                            Row.ofKind(RowKind.INSERT, "Alice", 10L),
+                                            Row.ofKind(RowKind.INSERT, "Bob", 20L))
+                                    .producedAfterRestore(
+                                            Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 10L),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 30L),
+                                            Row.ofKind(RowKind.DELETE, "Bob", 20L))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("name STRING", "op STRING", "score BIGINT")
+                                    .consumedBeforeRestore(
+                                            "+I[Alice, INSERT, 10]", "+I[Bob, INSERT, 20]")
+                                    .consumedAfterRestore(
+                                            "+I[Alice, UPDATE_BEFORE, 10]",
+                                            "+I[Alice, UPDATE_AFTER, 30]",
+                                            "+I[Bob, DELETE, 20]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink SELECT * FROM TO_CHANGELOG("
+                                    + "input => TABLE t PARTITION BY name)")
+                    .build();
+
+    /** Also exercises keyed {@code ChangelogNormalize} state for full DELETE pre-images. */
+    public static final TableTestProgram UPSERT_PARTITION_BY_RESTORE =
+            TableTestProgram.of(
+                            "to-changelog-upsert-partition-by-restore",
+                            "TO_CHANGELOG over an upsert source with PARTITION BY restores via "
+                                    + "compiled plan + savepoint, including ChangelogNormalize "
+                                    + "state")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("t")
+                                    .addSchema(
+                                            "name STRING PRIMARY KEY NOT ENFORCED", "score BIGINT")
+                                    .addMode(ChangelogMode.upsert())
+                                    .producedBeforeRestore(
+                                            Row.ofKind(RowKind.INSERT, "Alice", 10L),
+                                            Row.ofKind(RowKind.INSERT, "Bob", 20L))
+                                    .producedAfterRestore(
+                                            Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 30L),
+                                            // Key-only delete: ChangelogNormalize restores
+                                            // Bob's pre-image from state.
+                                            Row.ofKind(RowKind.DELETE, "Bob", null))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("name STRING", "op STRING", "score BIGINT")
+                                    .consumedBeforeRestore(
+                                            "+I[Alice, INSERT, 10]", "+I[Bob, INSERT, 20]")
+                                    .consumedAfterRestore(
+                                            "+I[Alice, UPDATE_BEFORE, 10]",
+                                            "+I[Alice, UPDATE_AFTER, 30]",
+                                            "+I[Bob, DELETE, 20]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink SELECT * FROM TO_CHANGELOG("
+                                    + "input => TABLE t PARTITION BY name)")
+                    .build();
+
+    /**
+     * Retract source through TO_CHANGELOG with {@code produces_full_deletes=false} and a NOT NULL
+     * input column. The retract source carries full DELETE pre-images natively, so the planner does
+     * not insert {@code ChangelogNormalize}; the pipeline is stateless beyond the source. Verifies
+     * that the planner's nullability widening (non-key columns are nulled on partial DELETE rows,
+     * so the output schema widens them to nullable) round-trips through the compiled plan
+     * correctly.
+     */

Review Comment:
   Let's remove the comments :) 
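About the restore programs themselves: the expected rows in `UPSERT_RESTORE` include an `UPDATE_BEFORE` for Alice and a full `DELETE` pre-image for Bob after the restore. Those rows only appear if per-key state survives the savepoint. Below is a simplified, assumption-laden model of that keyed normalization, followed by the `op`-column flattening. `UpsertNormalizeSketch` and `process` are hypothetical names. The logic is not Flink's `ChangelogNormalize` implementation, and it ignores state TTL and other details of the real operator.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Simplified, hypothetical model of keyed upsert normalization followed by
// TO_CHANGELOG-style flattening into insert-only rows with an "op" column.
// This is NOT Flink's ChangelogNormalize; names and behavior are illustrative.
class UpsertNormalizeSketch {

    // Keyed state: primary key (name) -> last seen score. This is the state a
    // savepoint would need to carry across the restore.
    private final Map<String, Long> lastScoreByName = new HashMap<>();

    // kind is one of INSERT, UPDATE_AFTER, DELETE (the upsert changelog kinds);
    // score may be null for a key-only DELETE.
    List<String> process(String kind, String name, Long score) {
        List<String> out = new ArrayList<>();
        boolean known = lastScoreByName.containsKey(name);
        Long previous = lastScoreByName.get(name);
        if (kind.equals("DELETE")) {
            if (known) {
                // Materialize the full pre-image from state instead of the key-only row.
                out.add(row("DELETE", name, previous));
                lastScoreByName.remove(name);
            }
            return out;
        }
        // INSERT and UPDATE_AFTER are both upserts of the key.
        if (!known) {
            out.add(row("INSERT", name, score));
        } else {
            out.add(row("UPDATE_BEFORE", name, previous));
            out.add(row("UPDATE_AFTER", name, score));
        }
        lastScoreByName.put(name, score);
        return out;
    }

    // Every change becomes an insert-only row whose first column is the op name.
    private static String row(String op, String name, Long score) {
        return "+I[" + op + ", " + name + ", " + score + "]";
    }
}
```

Feeding the `producedBeforeRestore` rows and then the `producedAfterRestore` rows through one instance reproduces `consumedBeforeRestore` and `consumedAfterRestore`. In this model, a fresh instance after the restore would instead emit `+I[INSERT, Alice, 30]` and silently drop Bob's key-only `DELETE`. A restore test is meant to catch that kind of regression.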



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java:
##########
@@ -391,6 +391,73 @@ public class FromChangelogTestPrograms {
                                     + "input => TABLE cdc_stream)")
                     .build();
 
+    public static final TableTestProgram RETRACT_PARTITION_BY_RESTORE =
+            TableTestProgram.of(
+                            "from-changelog-retract-partition-by-restore",
+                            "FROM_CHANGELOG with PARTITION BY producing a retract changelog "
+                                    + "restores via compiled plan + savepoint")

Review Comment:
   Not needed, because we are doing restore tests.
   ```suggestion
   )
   ```



##########
flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ToChangelogOutputTypeStrategyTest.java:
##########
@@ -105,6 +106,55 @@ ARG_TABLE, new TableSemanticsMock(TABLE_TYPE_NOT_NULL_SCORE))
                                                 DataTypes.FIELD("op", DataTypes.STRING()),
                                                 DataTypes.FIELD("name", DataTypes.STRING()),
                                                 DataTypes.FIELD("score", DataTypes.BIGINT()))
+                                        .notNull()),
+                // Set semantics: PARTITION BY strips the partition column from the projected
+                // output (the framework prepends it back outside the function), so the strategy
+                // returns only [op, non_partition_input_cols]. With full deletes the row is
+                // passed through unchanged so non-partition NOT NULL columns stay non-nullable.
+                TestSpec.forStrategy(
+                                "produces_full_deletes=true in set semantics preserves NOT NULL on non-partition columns",
+                                TO_CHANGELOG_OUTPUT_TYPE_STRATEGY)
+                        .inputTypes(
+                                TABLE_TYPE_NOT_NULL_SCORE, DESCRIPTOR_TYPE, MAP_TYPE, BOOLEAN_TYPE)
+                        .calledWithTableSemanticsAt(
+                                ARG_TABLE,
+                                new TableSemanticsMock(
+                                        TABLE_TYPE_NOT_NULL_SCORE,
+                                        new int[] {0},
+                                        new int[0],
+                                        -1,
+                                        null,
+                                        List.of(new int[] {0})))
+                        .calledWithLiteralAt(ARG_OP, ColumnList.of("op"))
+                        .calledWithLiteralAt(ARG_OP_MAPPING, null)
+                        .calledWithLiteralAt(ARG_PRODUCES_FULL_DELETES, true)
+                        .expectDataType(
+                                DataTypes.ROW(
+                                                DataTypes.FIELD("op", DataTypes.STRING()),
+                                                DataTypes.FIELD(

Review Comment:
   What happened to the "name" column?



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java:
##########
@@ -123,6 +123,147 @@ public class ToChangelogTestPrograms {
                     .runSql("INSERT INTO sink SELECT * FROM TO_CHANGELOG(input => TABLE t)")
                     .build();
 
+    /**
+     * Also exercises the {@code ChangelogNormalize} state used to materialize full DELETE
+     * pre-images.
+     */
+    public static final TableTestProgram UPSERT_RESTORE =
+            TableTestProgram.of(
+                            "to-changelog-upsert-restore",
+                            "TO_CHANGELOG over an upsert source restores via compiled plan + "
+                                    + "savepoint, including ChangelogNormalize state")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("t")
+                                    .addSchema(
+                                            "name STRING PRIMARY KEY NOT ENFORCED", "score BIGINT")
+                                    .addMode(ChangelogMode.upsert())
+                                    .producedBeforeRestore(
+                                            Row.ofKind(RowKind.INSERT, "Alice", 10L),
+                                            Row.ofKind(RowKind.INSERT, "Bob", 20L))
+                                    .producedAfterRestore(
+                                            Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 30L),
+                                            // Key-only delete: ChangelogNormalize restores
+                                            // Bob's pre-image from state.
+                                            Row.ofKind(RowKind.DELETE, "Bob", null))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("op STRING", "name STRING", "score BIGINT")
+                                    .consumedBeforeRestore(
+                                            "+I[INSERT, Alice, 10]", "+I[INSERT, Bob, 20]")
+                                    .consumedAfterRestore(
+                                            "+I[UPDATE_BEFORE, Alice, 10]",
+                                            "+I[UPDATE_AFTER, Alice, 30]",
+                                            "+I[DELETE, Bob, 20]")
+                                    .build())
+                    .runSql("INSERT INTO sink SELECT * FROM TO_CHANGELOG(input => TABLE t)")
+                    .build();
+
+    public static final TableTestProgram RETRACT_PARTITION_BY_RESTORE =
+            TableTestProgram.of(
+                            "to-changelog-retract-partition-by-restore",
+                            "TO_CHANGELOG over a retract source with PARTITION BY restores via "
+                                    + "compiled plan + savepoint")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("t")
+                                    .addSchema(
+                                            "name STRING PRIMARY KEY NOT ENFORCED", "score BIGINT")
+                                    .addMode(ChangelogMode.all())
+                                    .producedBeforeRestore(
+                                            Row.ofKind(RowKind.INSERT, "Alice", 10L),
+                                            Row.ofKind(RowKind.INSERT, "Bob", 20L))
+                                    .producedAfterRestore(
+                                            Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 10L),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 30L),
+                                            Row.ofKind(RowKind.DELETE, "Bob", 20L))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("name STRING", "op STRING", "score BIGINT")
+                                    .consumedBeforeRestore(
+                                            "+I[Alice, INSERT, 10]", "+I[Bob, INSERT, 20]")
+                                    .consumedAfterRestore(
+                                            "+I[Alice, UPDATE_BEFORE, 10]",
+                                            "+I[Alice, UPDATE_AFTER, 30]",
+                                            "+I[Bob, DELETE, 20]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink SELECT * FROM TO_CHANGELOG("
+                                    + "input => TABLE t PARTITION BY name)")
+                    .build();
+
+    /** Also exercises keyed {@code ChangelogNormalize} state for full DELETE pre-images. */
+    public static final TableTestProgram UPSERT_PARTITION_BY_RESTORE =
+            TableTestProgram.of(
+                            "to-changelog-upsert-partition-by-restore",
+                            "TO_CHANGELOG over an upsert source with PARTITION BY restores via "
+                                    + "compiled plan + savepoint, including ChangelogNormalize "
+                                    + "state")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("t")
+                                    .addSchema(
+                                            "name STRING PRIMARY KEY NOT ENFORCED", "score BIGINT")
+                                    .addMode(ChangelogMode.upsert())
+                                    .producedBeforeRestore(
+                                            Row.ofKind(RowKind.INSERT, "Alice", 10L),
+                                            Row.ofKind(RowKind.INSERT, "Bob", 20L))
+                                    .producedAfterRestore(
+                                            Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 30L),
+                                            // Key-only delete: ChangelogNormalize restores
+                                            // Bob's pre-image from state.
+                                            Row.ofKind(RowKind.DELETE, "Bob", null))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("name STRING", "op STRING", "score BIGINT")
+                                    .consumedBeforeRestore(
+                                            "+I[Alice, INSERT, 10]", "+I[Bob, INSERT, 20]")
+                                    .consumedAfterRestore(
+                                            "+I[Alice, UPDATE_BEFORE, 10]",
+                                            "+I[Alice, UPDATE_AFTER, 30]",
+                                            "+I[Bob, DELETE, 20]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink SELECT * FROM TO_CHANGELOG("
+                                    + "input => TABLE t PARTITION BY name)")
+                    .build();
+
+    /**
+     * Retract source through TO_CHANGELOG with {@code produces_full_deletes=false} and a NOT NULL
+     * input column. The retract source carries full DELETE pre-images natively, so the planner does
+     * not insert {@code ChangelogNormalize}; the pipeline is stateless beyond the source. Verifies
+     * that the planner's nullability widening (non-key columns are nulled on partial DELETE rows,
+     * so the output schema widens them to nullable) round-trips through the compiled plan
+     * correctly.
+     */
+    public static final TableTestProgram RETRACT_PRODUCES_PARTIAL_DELETES_RESTORE =
+            TableTestProgram.of(
+                            "to-changelog-retract-produces-partial-deletes-restore",
+                            "TO_CHANGELOG with produces_full_deletes=false over a NOT NULL input "
+                                    + "column restores via compiled plan + savepoint; the widened "
+                                    + "(nullable) output schema must round-trip without "
+                                    + "ChangelogNormalize")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("t")
+                                    .addSchema(
+                                            "name STRING PRIMARY KEY NOT ENFORCED",
+                                            "score BIGINT NOT NULL")
+                                    .addMode(ChangelogMode.all())
+                                    .producedBeforeRestore(Row.ofKind(RowKind.INSERT, "Alice", 10L))
+                                    .producedAfterRestore(Row.ofKind(RowKind.DELETE, "Alice", 10L))

Review Comment:
   Can we also have an UPDATE_AFTER in this test?



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java:
##########
@@ -123,6 +123,147 @@ public class ToChangelogTestPrograms {
                     .runSql("INSERT INTO sink SELECT * FROM TO_CHANGELOG(input => TABLE t)")
                     .build();
 
+    /**
+     * Also exercises the {@code ChangelogNormalize} state used to materialize full DELETE
+     * pre-images.
+     */
+    public static final TableTestProgram UPSERT_RESTORE =
+            TableTestProgram.of(
+                            "to-changelog-upsert-restore",
+                            "TO_CHANGELOG over an upsert source restores via compiled plan + "
+                                    + "savepoint, including ChangelogNormalize state")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("t")
+                                    .addSchema(
+                                            "name STRING PRIMARY KEY NOT ENFORCED", "score BIGINT")
+                                    .addMode(ChangelogMode.upsert())
+                                    .producedBeforeRestore(
+                                            Row.ofKind(RowKind.INSERT, "Alice", 10L),
+                                            Row.ofKind(RowKind.INSERT, "Bob", 20L))
+                                    .producedAfterRestore(
+                                            Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 30L),
+                                            // Key-only delete: ChangelogNormalize restores
+                                            // Bob's pre-image from state.
+                                            Row.ofKind(RowKind.DELETE, "Bob", null))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("op STRING", "name STRING", "score BIGINT")
+                                    .consumedBeforeRestore(
+                                            "+I[INSERT, Alice, 10]", "+I[INSERT, Bob, 20]")
+                                    .consumedAfterRestore(
+                                            "+I[UPDATE_BEFORE, Alice, 10]",
+                                            "+I[UPDATE_AFTER, Alice, 30]",
+                                            "+I[DELETE, Bob, 20]")
+                                    .build())
+                    .runSql("INSERT INTO sink SELECT * FROM TO_CHANGELOG(input => TABLE t)")
+                    .build();
+
+    public static final TableTestProgram RETRACT_PARTITION_BY_RESTORE =
+            TableTestProgram.of(
+                            "to-changelog-retract-partition-by-restore",
+                            "TO_CHANGELOG over a retract source with PARTITION BY restores via "
+                                    + "compiled plan + savepoint")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("t")
+                                    .addSchema(
+                                            "name STRING PRIMARY KEY NOT ENFORCED", "score BIGINT")
+                                    .addMode(ChangelogMode.all())
+                                    .producedBeforeRestore(
+                                            Row.ofKind(RowKind.INSERT, "Alice", 10L),
+                                            Row.ofKind(RowKind.INSERT, "Bob", 20L))
+                                    .producedAfterRestore(
+                                            Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 10L),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 30L),
+                                            Row.ofKind(RowKind.DELETE, "Bob", 20L))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("name STRING", "op STRING", "score BIGINT")
+                                    .consumedBeforeRestore(
+                                            "+I[Alice, INSERT, 10]", "+I[Bob, INSERT, 20]")
+                                    .consumedAfterRestore(
+                                            "+I[Alice, UPDATE_BEFORE, 10]",
+                                            "+I[Alice, UPDATE_AFTER, 30]",
+                                            "+I[Bob, DELETE, 20]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink SELECT * FROM TO_CHANGELOG("
+                                    + "input => TABLE t PARTITION BY name)")
+                    .build();
+
+    /** Also exercises keyed {@code ChangelogNormalize} state for full DELETE pre-images. */
+    public static final TableTestProgram UPSERT_PARTITION_BY_RESTORE =
+            TableTestProgram.of(
+                            "to-changelog-upsert-partition-by-restore",
+                            "TO_CHANGELOG over an upsert source with PARTITION BY restores via "
+                                    + "compiled plan + savepoint, including ChangelogNormalize "
+                                    + "state")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("t")
+                                    .addSchema(
+                                            "name STRING PRIMARY KEY NOT ENFORCED", "score BIGINT")
+                                    .addMode(ChangelogMode.upsert())
+                                    .producedBeforeRestore(
+                                            Row.ofKind(RowKind.INSERT, "Alice", 10L),
+                                            Row.ofKind(RowKind.INSERT, "Bob", 20L))
+                                    .producedAfterRestore(
+                                            Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 30L),
+                                            // Key-only delete: ChangelogNormalize restores
+                                            // Bob's pre-image from state.
+                                            Row.ofKind(RowKind.DELETE, "Bob", null))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("name STRING", "op STRING", "score BIGINT")

Review Comment:
   Just wondering why the schema isn't like this
   ```suggestion
                                       .addSchema("name STRING NOT NULL", "op STRING", "score BIGINT")
   ```



##########
flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java:
##########
@@ -123,6 +123,147 @@ public class ToChangelogTestPrograms {
                     .runSql("INSERT INTO sink SELECT * FROM TO_CHANGELOG(input => TABLE t)")
                     .build();
 
+    /**
+     * Also exercises the {@code ChangelogNormalize} state used to materialize full DELETE
+     * pre-images.
+     */
+    public static final TableTestProgram UPSERT_RESTORE =
+            TableTestProgram.of(
+                            "to-changelog-upsert-restore",
+                            "TO_CHANGELOG over an upsert source restores via compiled plan + "
+                                    + "savepoint, including ChangelogNormalize state")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("t")
+                                    .addSchema(
+                                            "name STRING PRIMARY KEY NOT ENFORCED", "score BIGINT")
+                                    .addMode(ChangelogMode.upsert())
+                                    .producedBeforeRestore(
+                                            Row.ofKind(RowKind.INSERT, "Alice", 10L),
+                                            Row.ofKind(RowKind.INSERT, "Bob", 20L))
+                                    .producedAfterRestore(
+                                            Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 30L),
+                                            // Key-only delete: ChangelogNormalize restores
+                                            // Bob's pre-image from state.
+                                            Row.ofKind(RowKind.DELETE, "Bob", null))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("op STRING", "name STRING", "score BIGINT")
+                                    .consumedBeforeRestore(
+                                            "+I[INSERT, Alice, 10]", "+I[INSERT, Bob, 20]")
+                                    .consumedAfterRestore(
+                                            "+I[UPDATE_BEFORE, Alice, 10]",
+                                            "+I[UPDATE_AFTER, Alice, 30]",
+                                            "+I[DELETE, Bob, 20]")
+                                    .build())
+                    .runSql("INSERT INTO sink SELECT * FROM TO_CHANGELOG(input => TABLE t)")
+                    .build();
+
+    public static final TableTestProgram RETRACT_PARTITION_BY_RESTORE =
+            TableTestProgram.of(
+                            "to-changelog-retract-partition-by-restore",
+                            "TO_CHANGELOG over a retract source with PARTITION BY restores via "
+                                    + "compiled plan + savepoint")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("t")
+                                    .addSchema(
+                                            "name STRING PRIMARY KEY NOT ENFORCED", "score BIGINT")
+                                    .addMode(ChangelogMode.all())
+                                    .producedBeforeRestore(
+                                            Row.ofKind(RowKind.INSERT, "Alice", 10L),
+                                            Row.ofKind(RowKind.INSERT, "Bob", 20L))
+                                    .producedAfterRestore(
+                                            Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", 10L),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 30L),
+                                            Row.ofKind(RowKind.DELETE, "Bob", 20L))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("name STRING", "op STRING", "score BIGINT")
+                                    .consumedBeforeRestore(
+                                            "+I[Alice, INSERT, 10]", "+I[Bob, INSERT, 20]")
+                                    .consumedAfterRestore(
+                                            "+I[Alice, UPDATE_BEFORE, 10]",
+                                            "+I[Alice, UPDATE_AFTER, 30]",
+                                            "+I[Bob, DELETE, 20]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink SELECT * FROM TO_CHANGELOG("
+                                    + "input => TABLE t PARTITION BY name)")
+                    .build();
+
+    /** Also exercises keyed {@code ChangelogNormalize} state for full DELETE pre-images. */
+    public static final TableTestProgram UPSERT_PARTITION_BY_RESTORE =
+            TableTestProgram.of(
+                            "to-changelog-upsert-partition-by-restore",
+                            "TO_CHANGELOG over an upsert source with PARTITION BY restores via "
+                                    + "compiled plan + savepoint, including ChangelogNormalize "
+                                    + "state")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("t")
+                                    .addSchema(
+                                            "name STRING PRIMARY KEY NOT ENFORCED", "score BIGINT")
+                                    .addMode(ChangelogMode.upsert())
+                                    .producedBeforeRestore(
+                                            Row.ofKind(RowKind.INSERT, "Alice", 10L),
+                                            Row.ofKind(RowKind.INSERT, "Bob", 20L))
+                                    .producedAfterRestore(
+                                            Row.ofKind(RowKind.UPDATE_AFTER, "Alice", 30L),
+                                            // Key-only delete: ChangelogNormalize restores
+                                            // Bob's pre-image from state.
+                                            Row.ofKind(RowKind.DELETE, "Bob", null))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("name STRING", "op STRING", "score BIGINT")
+                                    .consumedBeforeRestore(
+                                            "+I[Alice, INSERT, 10]", "+I[Bob, INSERT, 20]")
+                                    .consumedAfterRestore(
+                                            "+I[Alice, UPDATE_BEFORE, 10]",
+                                            "+I[Alice, UPDATE_AFTER, 30]",
+                                            "+I[Bob, DELETE, 20]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink SELECT * FROM TO_CHANGELOG("
+                                    + "input => TABLE t PARTITION BY name)")
+                    .build();
+
+    /**
+     * Retract source through TO_CHANGELOG with {@code produces_full_deletes=false} and a NOT NULL
+     * input column. The retract source carries full DELETE pre-images natively, so the planner does
+     * not insert {@code ChangelogNormalize}; the pipeline is stateless beyond the source. Verifies
+     * that the planner's nullability widening (non-key columns are nulled on partial DELETE rows,
+     * so the output schema widens them to nullable) round-trips through the compiled plan
+     * correctly.
+     */
+    public static final TableTestProgram RETRACT_PRODUCES_PARTIAL_DELETES_RESTORE =
+            TableTestProgram.of(
+                            "to-changelog-retract-produces-partial-deletes-restore",
+                            "TO_CHANGELOG with produces_full_deletes=false over a NOT NULL input "
+                                    + "column restores via compiled plan + savepoint; the widened "
+                                    + "(nullable) output schema must round-trip without "
+                                    + "ChangelogNormalize")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("t")
+                                    .addSchema(
+                                            "name STRING PRIMARY KEY NOT ENFORCED",
+                                            "score BIGINT NOT NULL")
+                                    .addMode(ChangelogMode.all())
+                                    .producedBeforeRestore(Row.ofKind(RowKind.INSERT, "Alice", 10L))
+                                    .producedAfterRestore(Row.ofKind(RowKind.DELETE, "Alice", 10L))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("op STRING", "name STRING", "score BIGINT")

Review Comment:
   Shouldn't name here be NOT NULL?
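
A minimal sketch of the nullability rule described in the `RETRACT_PRODUCES_PARTIAL_DELETES_RESTORE` Javadoc, assuming that key columns (here the primary key `name`) are always populated on DELETE rows while non-key columns are nulled when `produces_full_deletes=false`. `NullabilityWidening` and `outputNullability` are hypothetical names, not Flink API; the sketch only illustrates why `name` could keep `NOT NULL` while `score` is widened to nullable.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical model (not Flink API) of the nullability widening described in the Javadoc.
 * Assumption: key columns are always populated on DELETE rows; with
 * produces_full_deletes=false every other column is nulled on DELETE, so it must be nullable.
 */
public final class NullabilityWidening {

    private NullabilityWidening() {}

    /**
     * Returns column name to output nullability, in input column order.
     *
     * @param declaredNullable input column name to whether it is declared nullable
     * @param keyColumns columns assumed to be populated even on partial DELETE rows
     * @param producesFullDeletes whether DELETE rows carry the full pre-image
     */
    public static Map<String, Boolean> outputNullability(
            Map<String, Boolean> declaredNullable,
            Set<String> keyColumns,
            boolean producesFullDeletes) {
        Map<String, Boolean> result = new LinkedHashMap<>();
        for (Map.Entry<String, Boolean> column : declaredNullable.entrySet()) {
            boolean nullable = column.getValue();
            if (!producesFullDeletes && !keyColumns.contains(column.getKey())) {
                // Partial DELETE rows null out non-key columns, so widen to nullable.
                nullable = true;
            }
            result.put(column.getKey(), nullable);
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Boolean> input = new LinkedHashMap<>();
        input.put("name", false); // assumed NOT NULL as the primary-key column
        input.put("score", false); // declared BIGINT NOT NULL
        System.out.println(outputNullability(input, Set.of("name"), false));
        // prints {name=false, score=true}
    }
}
```

With `producesFullDeletes=true` the declared nullability passes through unchanged, which matches the `produces_full_deletes=true` case in `ToChangelogOutputTypeStrategyTest`, where non-partition NOT NULL columns stay non-nullable.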



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
