This is an automated email from the ASF dual-hosted git repository.

fhueske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new e4b8e035fa0 [FLINK-39632][table] Add fromChangelog() / toChangelog() convenience methods to PartitionedTable (#28128)
e4b8e035fa0 is described below

commit e4b8e035fa069b66d1fd939742c5620015d85e2d
Author: Ramin Gharib <[email protected]>
AuthorDate: Tue May 12 18:00:52 2026 +0200

    [FLINK-39632][table] Add fromChangelog() / toChangelog() convenience methods to PartitionedTable (#28128)
---
 .../docs/sql/reference/queries/changelog.md        | 14 +++-
 .../apache/flink/table/api/PartitionedTable.java   | 92 ++++++++++++++++++++++
 .../java/org/apache/flink/table/api/Table.java     | 17 +++-
 .../apache/flink/table/api/internal/TableImpl.java | 11 +++
 .../exec/stream/FromChangelogSemanticTests.java    |  1 +
 .../exec/stream/FromChangelogTestPrograms.java     | 31 ++++++++
 .../exec/stream/ToChangelogSemanticTests.java      |  1 +
 .../nodes/exec/stream/ToChangelogTestPrograms.java | 30 +++++++
 8 files changed, 191 insertions(+), 6 deletions(-)
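
For quick orientation, a minimal sketch of the two new convenience methods, based on the Javadoc examples added by this commit. The table name "cdc_stream", the partition column "id", and the environment setup are illustrative assumptions, not part of the change:

    import static org.apache.flink.table.api.Expressions.$;

    import org.apache.flink.table.api.EnvironmentSettings;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.TableEnvironment;

    TableEnvironment env = TableEnvironment.create(EnvironmentSettings.inStreamingMode());

    // Interpret an append-only table with a string 'op' column as a changelog.
    // partitionBy() gives set semantics (PARTITION BY in SQL): rows with the
    // same id are co-located in the same parallel operator instance.
    Table changelog = env.from("cdc_stream").partitionBy($("id")).fromChangelog();

    // Reverse direction: emit every change of a dynamic table as an
    // INSERT-only row with an explicit 'op' column.
    Table appendOnly = changelog.partitionBy($("id")).toChangelog();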

diff --git a/docs/content/docs/sql/reference/queries/changelog.md b/docs/content/docs/sql/reference/queries/changelog.md
index b713aebf072..f1718642b9d 100644
--- a/docs/content/docs/sql/reference/queries/changelog.md
+++ b/docs/content/docs/sql/reference/queries/changelog.md
@@ -139,7 +139,7 @@ SELECT * FROM FROM_CHANGELOG(
 )
 ```
 
-When `PARTITION BY` is provided, **the output schema changes**. The partition key columns are moved to the front by the engine, and the function emits the remaining input columns (excluding the op column). The order becomes:
+When `PARTITION BY` is provided, **the output schema changes**. The partition key columns are moved to the front by the engine, and the function emits the remaining input columns (excluding the op column). The output schema becomes:
 
 ```
 [partition_keys, non_partition_input_columns_excluding_op]
@@ -187,6 +187,10 @@ Table result = cdcStream.fromChangelog(
         "ua", "UPDATE_AFTER",
         "d", "DELETE").asArgument("op_mapping")
 );
+
+// Set semantics: co-locate rows with the same key in the same parallel operator instance.
+// Equivalent to PARTITION BY in SQL. The partition keys are prepended to the output columns.
+Table result = cdcStream.partitionBy($("id")).fromChangelog();
 ```
 
 ## TO_CHANGELOG
@@ -226,7 +230,7 @@ When `op_mapping` is omitted, all four change operations are mapped to their sta
 
 ### Output Schema
 
-The output columns are ordered as:
+The output schema is:
 
 ```
 [op_column, all_input_columns]
@@ -301,7 +305,7 @@ SELECT * FROM TO_CHANGELOG(
   input => TABLE my_aggregation PARTITION BY id
 )
 ```
-When `PARTITION BY` is provided, **the output schema changes**. The partition key columns are moved to the front by the engine, and the function emits the remaining input columns. The order becomes:
+When `PARTITION BY` is provided, **the output schema changes**. The partition key columns are moved to the front by the engine, and the function emits the remaining input columns. The output schema becomes:
 
 ```
 [partition_keys, op_column, non_partition_input_columns]
@@ -326,6 +330,10 @@ Table result = myTable.toChangelog(
     descriptor("deleted").asArgument("op"),
     map("INSERT, UPDATE_AFTER", "false", "DELETE", 
"true").asArgument("op_mapping")
 );
+
+// Set semantics: co-locate rows with the same key in the same parallel operator instance.
+// Equivalent to PARTITION BY in SQL. The partition keys are prepended to the output columns.
+Table result = myTable.partitionBy($("id")).toChangelog();
 ```
 
 {{< top >}}
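
To make the PARTITION BY schema reordering above concrete, a short sketch using the column layout from this commit's test programs; the registered table "t" with columns (name, id, score) is an assumed setup:

    // Input columns (name, id, score): partitioning by 'id' moves it to the
    // front, followed by the generated 'op' column and the remaining columns.
    Table result = env.from("t").partitionBy($("id")).toChangelog();
    result.printSchema();  // (id, op, name, score)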
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java
index a83b29fecad..7adfc13fc8d 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java
@@ -169,4 +169,96 @@ public interface PartitionedTable {
      * @see ProcessTableFunction
      */
    Table process(Class<? extends UserDefinedFunction> function, Object... arguments);
+
+    /**
+     * Converts this partitioned dynamic table into an append-only table with an explicit operation
+     * code column using the built-in {@code TO_CHANGELOG} process table function with set
+     * semantics.
+     *
+     * <p>Each input row - regardless of its original change operation - is emitted as an
+     * INSERT-only row with a string {@code "op"} column indicating the original operation (INSERT,
+     * UPDATE_AFTER, DELETE, etc.). With set semantics, rows for the same partition key are
+     * co-located in the same parallel operator instance.
+     *
+     * <p>For row semantics (each row processed independently), use {@link Table#toChangelog} on the
+     * unpartitioned table.
+     *
+     * <p>Examples:
+     *
+     * <pre>{@code
+     * // Default: adds 'op' column and supports all changelog modes
+     * Table result = table.partitionBy($("id")).toChangelog();
+     *
+     * // Custom op column name and mapping
+     * Table result = table
+     *     .partitionBy($("id"))
+     *     .toChangelog(
+     *         descriptor("op_code").asArgument("op"),
+     *         map("INSERT", "I", "UPDATE_AFTER", "U").asArgument("op_mapping")
+     *     );
+     *
+     * // Deletion flag pattern: comma-separated keys map multiple change operations to the same code
+     * Table result = table
+     *     .partitionBy($("id"))
+     *     .toChangelog(
+     *         descriptor("deleted").asArgument("op"),
+     *         map("INSERT, UPDATE_AFTER", "false", "DELETE", 
"true").asArgument("op_mapping")
+     *     );
+     * }</pre>
+     *
+     * @param arguments optional named arguments for {@code op} and {@code op_mapping}
+     * @return an append-only {@link Table} with output schema {@code [partition_keys, op,
+     *     non_partition_input_columns]}
+     * @see Table#toChangelog(Expression...)
+     */
+    Table toChangelog(Expression... arguments);
+
+    /**
+     * Converts this partitioned append-only table with an explicit operation code column into a
+     * (potentially updating) dynamic table using the built-in {@code FROM_CHANGELOG} process table
+     * function with set semantics.
+     *
+     * <p>Each input row is expected to have a string column that indicates the change operation.
+     * The operation column is interpreted by the engine and removed from the output. With set
+     * semantics, rows for the same partition key are co-located in the same parallel operator
+     * instance, which is required when downstream operators are keyed on the partition keys.
+     *
+     * <p>For row semantics (each row processed independently), use {@link Table#fromChangelog} on
+     * the unpartitioned table.
+     *
+     * <p>Examples:
+     *
+     * <pre>{@code
+     * // Default: reads 'op' column with standard change operation names
+     * Table result = cdcStream.partitionBy($("id")).fromChangelog();
+     *
+     * // With custom op column name
+     * Table result = cdcStream
+     *     .partitionBy($("id"))
+     *     .fromChangelog(descriptor("operation").asArgument("op"));
+     *
+     * // With custom op_mapping
+     * Table result = cdcStream
+     *     .partitionBy($("id"))
+     *     .fromChangelog(
+     *         descriptor("op").asArgument("op"),
+     *         map("c, r", "INSERT",
+     *             "ub", "UPDATE_BEFORE",
+     *             "ua", "UPDATE_AFTER",
+     *             "d", "DELETE").asArgument("op_mapping")
+     *     );
+     *
+     * // Silently skip rows with NULL or unmapped op codes instead of failing
+     * Table result = cdcStream
+     *     .partitionBy($("id"))
+     *     .fromChangelog(lit("SKIP").asArgument("error_handling"));
+     * }</pre>
+     *
+     * @param arguments optional named arguments for {@code op}, {@code op_mapping}, and {@code
+     *     error_handling}
+     * @return a dynamic {@link Table} with output schema {@code [partition_keys,
+     *     non_partition_non_op_input_columns]}
+     * @see Table#fromChangelog(Expression...)
+     */
+    Table fromChangelog(Expression... arguments);
 }
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
index aa2417d4531..e7f690263a4 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
@@ -1431,6 +1431,17 @@ public interface Table extends Explainable<Table>, Executable {
     * INSERT-only row with a string {@code "op"} column indicating the original operation (INSERT,
      * UPDATE_AFTER, DELETE, etc.).
      *
+     * <p>By default, the input is processed with row semantics (each row independently). To
+     * co-locate rows with the same key in the same parallel operator instance, partition the input
+     * first via {@link #partitionBy(Expression...)} and invoke {@link
+     * PartitionedTable#toChangelog(Expression...)} with set semantics:
+     *
+     * <pre>{@code
+     * Table result = table
+     *     .partitionBy($("id"))
+     *     .toChangelog();
+     * }</pre>
+     *
      * <p>Optional arguments can be passed using named expressions:
      *
      * <pre>{@code
@@ -1469,13 +1480,13 @@ public interface Table extends Explainable<Table>, Executable {
      *
     * <p>By default, the input is processed with row semantics (each row independently). To
     * co-locate rows with the same key in the same parallel operator instance, partition the input
-     * first via {@link #partitionBy(Expression...)} and invoke the function via {@link
-     * PartitionedTable#process(String, Object...)}:
+     * first via {@link #partitionBy(Expression...)} and invoke {@link
+     * PartitionedTable#fromChangelog(Expression...)} with set semantics:
      *
      * <pre>{@code
      * Table result = cdcStream
      *     .partitionBy($("id"))
-     *     .process("FROM_CHANGELOG");
+     *     .fromChangelog();
      * }</pre>
      *
      * <p>Optional arguments can be passed using named expressions:
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
index a539977532f..8bdce4a023f 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
@@ -922,6 +922,17 @@ public class TableImpl implements Table {
                             createQueryOperation(), table.tableEnvironment, arguments));
         }
 
+        @Override
+        public Table toChangelog(final Expression... arguments) {
+            return process(BuiltInFunctionDefinitions.TO_CHANGELOG.getName(), (Object[]) arguments);
+        }
+
+        @Override
+        public Table fromChangelog(final Expression... arguments) {
+            return process(
+                    BuiltInFunctionDefinitions.FROM_CHANGELOG.getName(), (Object[]) arguments);
+        }
+
         private QueryOperation createQueryOperation() {
             if (orderKeys.isEmpty()) {
                 return table.operationTreeBuilder.partition(partitionKeys, table.operationTree);
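
As the delegation in TableImpl shows, the new methods are thin wrappers around the generic PTF invocation, so the shorthand is interchangeable with the explicit process() call that the previous Javadoc recommended; a sketch, with cdcStream standing in for any Table:

    // Both calls produce the same plan; the first is the new shorthand.
    Table viaShorthand = cdcStream.partitionBy($("id")).fromChangelog();
    Table viaProcess = cdcStream.partitionBy($("id")).process("FROM_CHANGELOG");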
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogSemanticTests.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogSemanticTests.java
index 85efd0d4863..c3922fb28ad 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogSemanticTests.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogSemanticTests.java
@@ -47,6 +47,7 @@ public class FromChangelogSemanticTests extends SemanticTestBase {
                 FromChangelogTestPrograms.SKIP_INVALID_OP_HANDLING,
                 FromChangelogTestPrograms.SKIP_NULL_OP_CODE,
                 FromChangelogTestPrograms.TABLE_API_DEFAULT,
+                FromChangelogTestPrograms.TABLE_API_RETRACT_PARTITION_BY,
                 FromChangelogTestPrograms.ROUND_TRIP,
                 FromChangelogTestPrograms.INVALID_OP_CODE,
                 FromChangelogTestPrograms.NULL_OP_CODE);
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java
index f7264ec3c92..a964a57bdbe 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java
@@ -26,6 +26,8 @@ import org.apache.flink.table.test.program.TableTestProgram;
 import org.apache.flink.types.Row;
 import org.apache.flink.types.RowKind;
 
+import static org.apache.flink.table.api.Expressions.$;
+
 /** {@link TableTestProgram} definitions for testing the built-in FROM_CHANGELOG PTF. */
 public class FromChangelogTestPrograms {
 
@@ -250,6 +252,35 @@ public class FromChangelogTestPrograms {
                    .runTableApi(env -> env.from("cdc_stream").fromChangelog(), "sink")
                     .build();
 
+    public static final TableTestProgram TABLE_API_RETRACT_PARTITION_BY =
+            TableTestProgram.of(
+                            "from-changelog-table-api-retract-partition-by",
+                            "PartitionedTable.fromChangelog() convenience 
method")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("cdc_stream")
+                                    .addSchema(SIMPLE_CDC_SCHEMA)
+                                    .producedValues(
+                                            Row.of(1, "INSERT", "Alice"),
+                                            Row.of(2, "INSERT", "Bob"),
+                                            Row.of(1, "UPDATE_BEFORE", 
"Alice"),
+                                            Row.of(1, "UPDATE_AFTER", 
"Alice2"),
+                                            Row.of(2, "DELETE", "Bob"))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("id INT", "name STRING")
+                                    .consumedValues(
+                                            Row.ofKind(RowKind.INSERT, 1, "Alice"),
+                                            Row.ofKind(RowKind.INSERT, 2, "Bob"),
+                                            Row.ofKind(RowKind.UPDATE_BEFORE, 1, "Alice"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 1, "Alice2"),
+                                            Row.ofKind(RowKind.DELETE, 2, "Bob"))
+                                    .build())
+                    .runTableApi(
+                            env -> env.from("cdc_stream").partitionBy($("id")).fromChangelog(),
+                            "sink")
+                    .build();
+
    // --------------------------------------------------------------------------------------------
     // Round-trip test: FROM_CHANGELOG(TO_CHANGELOG(table))
    // --------------------------------------------------------------------------------------------
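
The round-trip pairing named in the comment above can be expressed with the new shorthand and default arguments; a sketch, with "t" as an illustrative table name:

    // TO_CHANGELOG materializes change operations as a string 'op' column;
    // FROM_CHANGELOG reads the 'op' column by default and reconstructs the
    // original changelog semantics of the table.
    Table roundTrip = env.from("t").toChangelog().fromChangelog();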
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java
index ca34b79974a..e038850fb8d 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java
@@ -46,6 +46,7 @@ public class ToChangelogSemanticTests extends SemanticTestBase {
                 ToChangelogTestPrograms.CUSTOM_OP_MAPPING,
                 ToChangelogTestPrograms.CUSTOM_OP_NAME,
                 ToChangelogTestPrograms.TABLE_API_DEFAULT,
+                ToChangelogTestPrograms.TABLE_API_RETRACT_PARTITION_BY,
                 ToChangelogTestPrograms.LAG_ON_UPSERT_VIA_CHANGELOG,
                 ToChangelogTestPrograms.LAG_ON_RETRACT_VIA_CHANGELOG,
                 ToChangelogTestPrograms.DELETION_FLAG,
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java
index 36f602225dd..7f33cfc1419 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java
@@ -28,6 +28,8 @@ import org.apache.flink.types.RowKind;
 
 import java.time.Instant;
 
+import static org.apache.flink.table.api.Expressions.$;
+
 /** {@link TableTestProgram} definitions for testing the built-in TO_CHANGELOG PTF. */
 public class ToChangelogTestPrograms {
 
@@ -257,6 +259,34 @@ public class ToChangelogTestPrograms {
                     .runTableApi(env -> env.from("t").toChangelog(), "sink")
                     .build();
 
+    public static final TableTestProgram TABLE_API_RETRACT_PARTITION_BY =
+            TableTestProgram.of(
+                            "to-changelog-table-api-retract-partition-by",
+                            "PartitionedTable.toChangelog() convenience 
method")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("t")
+                                    .addSchema(
+                                            "name STRING PRIMARY KEY NOT 
ENFORCED",
+                                            "id STRING",
+                                            "score BIGINT")
+                                    .addMode(ChangelogMode.all())
+                                    .producedValues(
+                                            Row.ofKind(RowKind.INSERT, "Alice", "EU", 10L),
+                                            Row.ofKind(RowKind.UPDATE_BEFORE, "Alice", "EU", 10L),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, "Alice", "EU", 30L))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema(
+                                            "id STRING", "op STRING", "name 
STRING", "score BIGINT")
+                                    .consumedValues(
+                                            "+I[EU, INSERT, Alice, 10]",
+                                            "+I[EU, UPDATE_BEFORE, Alice, 10]",
+                                            "+I[EU, UPDATE_AFTER, Alice, 30]")
+                                    .build())
+                    .runTableApi(env -> env.from("t").partitionBy($("id")).toChangelog(), "sink")
+                    .build();
+
    // --------------------------------------------------------------------------------------------
     // Use case: LAG on updating streams via TO_CHANGELOG
    // --------------------------------------------------------------------------------------------
