This is an automated email from the ASF dual-hosted git repository.
fhueske pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new e4b8e035fa0 [FLINK-39632][table] Add fromChangelog() / toChangelog()
convenience methods to PartitionedTable (#28128)
e4b8e035fa0 is described below
commit e4b8e035fa069b66d1fd939742c5620015d85e2d
Author: Ramin Gharib <[email protected]>
AuthorDate: Tue May 12 18:00:52 2026 +0200
[FLINK-39632][table] Add fromChangelog() / toChangelog() convenience
methods to PartitionedTable (#28128)
---
.../docs/sql/reference/queries/changelog.md | 14 +++-
.../apache/flink/table/api/PartitionedTable.java | 92 ++++++++++++++++++++++
.../java/org/apache/flink/table/api/Table.java | 17 +++-
.../apache/flink/table/api/internal/TableImpl.java | 11 +++
.../exec/stream/FromChangelogSemanticTests.java | 1 +
.../exec/stream/FromChangelogTestPrograms.java | 31 ++++++++
.../exec/stream/ToChangelogSemanticTests.java | 1 +
.../nodes/exec/stream/ToChangelogTestPrograms.java | 30 +++++++
8 files changed, 191 insertions(+), 6 deletions(-)
diff --git a/docs/content/docs/sql/reference/queries/changelog.md
b/docs/content/docs/sql/reference/queries/changelog.md
index b713aebf072..f1718642b9d 100644
--- a/docs/content/docs/sql/reference/queries/changelog.md
+++ b/docs/content/docs/sql/reference/queries/changelog.md
@@ -139,7 +139,7 @@ SELECT * FROM FROM_CHANGELOG(
)
```
-When `PARTITION BY` is provided, **the output schema changes**. The partition
key columns are moved to the front by the engine, and the function emits the
remaining input columns (excluding the op column). The order becomes:
+When `PARTITION BY` is provided, **the output schema changes**. The partition
key columns are moved to the front by the engine, and the function emits the
remaining input columns (excluding the op column). The output schema becomes:
```
[partition_keys, non_partition_input_columns_excluding_op]
@@ -187,6 +187,10 @@ Table result = cdcStream.fromChangelog(
"ua", "UPDATE_AFTER",
"d", "DELETE").asArgument("op_mapping")
);
+
+// Set semantics: co-locate rows with the same key in the same parallel
operator instance.
+// Equivalent to PARTITION BY in SQL. The partition keys are prepended to the
output columns.
+Table result = cdcStream.partitionBy($("id")).fromChangelog();
```
## TO_CHANGELOG
@@ -226,7 +230,7 @@ When `op_mapping` is omitted, all four change operations
are mapped to their sta
### Output Schema
-The output columns are ordered as:
+The output schema is:
```
[op_column, all_input_columns]
@@ -301,7 +305,7 @@ SELECT * FROM TO_CHANGELOG(
input => TABLE my_aggregation PARTITION BY id
)
```
-When `PARTITION BY` is provided, **the output schema changes**. The partition
key columns are moved to the front by the engine, and the function emits the
remaining input columns. The order becomes:
+When `PARTITION BY` is provided, **the output schema changes**. The partition
key columns are moved to the front by the engine, and the function emits the
remaining input columns. The output schema becomes:
```
[partition_keys, op_column, non_partition_input_columns]
@@ -326,6 +330,10 @@ Table result = myTable.toChangelog(
descriptor("deleted").asArgument("op"),
map("INSERT, UPDATE_AFTER", "false", "DELETE",
"true").asArgument("op_mapping")
);
+
+// Set semantics: co-locate rows with the same key in the same parallel
operator instance.
+// Equivalent to PARTITION BY in SQL. The partition keys are prepended to the
output columns.
+Table result = myTable.partitionBy($("id")).toChangelog();
```
{{< top >}}
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java
index a83b29fecad..7adfc13fc8d 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java
@@ -169,4 +169,96 @@ public interface PartitionedTable {
* @see ProcessTableFunction
*/
Table process(Class<? extends UserDefinedFunction> function, Object...
arguments);
+
+ /**
+ * Converts this partitioned dynamic table into an append-only table with
an explicit operation
+ * code column using the built-in {@code TO_CHANGELOG} process table
function with set
+ * semantics.
+ *
+ * <p>Each input row - regardless of its original change operation - is
emitted as an
+ * INSERT-only row with a string {@code "op"} column indicating the
original operation (INSERT,
+ * UPDATE_AFTER, DELETE, etc.). With set semantics, rows for the same
partition key are
+ * co-located in the same parallel operator instance.
+ *
+ * <p>For row semantics (each row processed independently), use {@link
Table#toChangelog} on the
+ * unpartitioned table.
+ *
+ * <p>Examples:
+ *
+ * <pre>{@code
+ * // Default: adds 'op' column and supports all changelog modes
+ * Table result = table.partitionBy($("id")).toChangelog();
+ *
+ * // Custom op column name and mapping
+ * Table result = table
+ * .partitionBy($("id"))
+ * .toChangelog(
+ * descriptor("op_code").asArgument("op"),
+ * map("INSERT", "I", "UPDATE_AFTER", "U").asArgument("op_mapping")
+ * );
+ *
+ * // Deletion flag pattern: comma-separated keys map multiple change
operations to the same code
+ * Table result = table
+ * .partitionBy($("id"))
+ * .toChangelog(
+ * descriptor("deleted").asArgument("op"),
+ * map("INSERT, UPDATE_AFTER", "false", "DELETE",
"true").asArgument("op_mapping")
+ * );
+ * }</pre>
+ *
+ * @param arguments optional named arguments for {@code op} and {@code
op_mapping}
+ * @return an append-only {@link Table} with output schema {@code
[partition_keys, op,
+ * non_partition_input_columns]}
+ * @see Table#toChangelog(Expression...)
+ */
+ Table toChangelog(Expression... arguments);
+
+ /**
+ * Converts this partitioned append-only table with an explicit operation
code column into a
+ * (potentially updating) dynamic table using the built-in {@code
FROM_CHANGELOG} process table
+ * function with set semantics.
+ *
+ * <p>Each input row is expected to have a string column that indicates
the change operation.
+ * The operation column is interpreted by the engine and removed from the
output. With set
+ * semantics, rows for the same partition key are co-located in the same
parallel operator
+ * instance, which is required when downstream operators are keyed on those
partition keys.
+ *
+ * <p>For row semantics (each row processed independently), use {@link
Table#fromChangelog} on
+ * the unpartitioned table.
+ *
+ * <p>Examples:
+ *
+ * <pre>{@code
+ * // Default: reads 'op' column with standard change operation names
+ * Table result = cdcStream.partitionBy($("id")).fromChangelog();
+ *
+ * // With custom op column name
+ * Table result = cdcStream
+ * .partitionBy($("id"))
+ * .fromChangelog(descriptor("operation").asArgument("op"));
+ *
+ * // With custom op_mapping
+ * Table result = cdcStream
+ * .partitionBy($("id"))
+ * .fromChangelog(
+ * descriptor("op").asArgument("op"),
+ * map("c, r", "INSERT",
+ * "ub", "UPDATE_BEFORE",
+ * "ua", "UPDATE_AFTER",
+ * "d", "DELETE").asArgument("op_mapping")
+ * );
+ *
+ * // Silently skip rows with NULL or unmapped op codes instead of failing
+ * Table result = cdcStream
+ * .partitionBy($("id"))
+ * .fromChangelog(lit("SKIP").asArgument("error_handling"));
+ * }</pre>
+ *
+ * @param arguments optional named arguments for {@code op}, {@code
op_mapping}, and {@code
+ * error_handling}
+ * @return a dynamic {@link Table} with output schema {@code
[partition_keys,
+ * non_partition_non_op_input_columns]}
+ * @see Table#fromChangelog(Expression...)
+ */
+ Table fromChangelog(Expression... arguments);
}
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
index aa2417d4531..e7f690263a4 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
@@ -1431,6 +1431,17 @@ public interface Table extends Explainable<Table>,
Executable {
* INSERT-only row with a string {@code "op"} column indicating the
original operation (INSERT,
* UPDATE_AFTER, DELETE, etc.).
*
+ * <p>By default, the input is processed with row semantics (each row
independently). To
+ * co-locate rows with the same key in the same parallel operator
instance, partition the input
+ * first via {@link #partitionBy(Expression...)} and invoke {@link
+ * PartitionedTable#toChangelog(Expression...)} with set semantics:
+ *
+ * <pre>{@code
+ * Table result = table
+ * .partitionBy($("id"))
+ * .toChangelog();
+ * }</pre>
+ *
* <p>Optional arguments can be passed using named expressions:
*
* <pre>{@code
@@ -1469,13 +1480,13 @@ public interface Table extends Explainable<Table>,
Executable {
*
* <p>By default, the input is processed with row semantics (each row
independently). To
* co-locate rows with the same key in the same parallel operator
instance, partition the input
- * first via {@link #partitionBy(Expression...)} and invoke the function
via {@link
- * PartitionedTable#process(String, Object...)}:
+ * first via {@link #partitionBy(Expression...)} and invoke {@link
+ * PartitionedTable#fromChangelog(Expression...)} with set semantics:
*
* <pre>{@code
* Table result = cdcStream
* .partitionBy($("id"))
- * .process("FROM_CHANGELOG");
+ * .fromChangelog();
* }</pre>
*
* <p>Optional arguments can be passed using named expressions:
diff --git
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
index a539977532f..8bdce4a023f 100644
---
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
+++
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableImpl.java
@@ -922,6 +922,17 @@ public class TableImpl implements Table {
createQueryOperation(), table.tableEnvironment,
arguments));
}
+ @Override
+ public Table toChangelog(final Expression... arguments) {
+ return process(BuiltInFunctionDefinitions.TO_CHANGELOG.getName(),
(Object[]) arguments);
+ }
+
+ @Override
+ public Table fromChangelog(final Expression... arguments) {
+ return process(
+ BuiltInFunctionDefinitions.FROM_CHANGELOG.getName(),
(Object[]) arguments);
+ }
+
private QueryOperation createQueryOperation() {
if (orderKeys.isEmpty()) {
return table.operationTreeBuilder.partition(partitionKeys,
table.operationTree);
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogSemanticTests.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogSemanticTests.java
index 85efd0d4863..c3922fb28ad 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogSemanticTests.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogSemanticTests.java
@@ -47,6 +47,7 @@ public class FromChangelogSemanticTests extends
SemanticTestBase {
FromChangelogTestPrograms.SKIP_INVALID_OP_HANDLING,
FromChangelogTestPrograms.SKIP_NULL_OP_CODE,
FromChangelogTestPrograms.TABLE_API_DEFAULT,
+ FromChangelogTestPrograms.TABLE_API_RETRACT_PARTITION_BY,
FromChangelogTestPrograms.ROUND_TRIP,
FromChangelogTestPrograms.INVALID_OP_CODE,
FromChangelogTestPrograms.NULL_OP_CODE);
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java
index f7264ec3c92..a964a57bdbe 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java
@@ -26,6 +26,8 @@ import org.apache.flink.table.test.program.TableTestProgram;
import org.apache.flink.types.Row;
import org.apache.flink.types.RowKind;
+import static org.apache.flink.table.api.Expressions.$;
+
/** {@link TableTestProgram} definitions for testing the built-in
FROM_CHANGELOG PTF. */
public class FromChangelogTestPrograms {
@@ -250,6 +252,35 @@ public class FromChangelogTestPrograms {
.runTableApi(env ->
env.from("cdc_stream").fromChangelog(), "sink")
.build();
+ public static final TableTestProgram TABLE_API_RETRACT_PARTITION_BY =
+ TableTestProgram.of(
+ "from-changelog-table-api-retract-partition-by",
+ "PartitionedTable.fromChangelog() convenience
method")
+ .setupTableSource(
+ SourceTestStep.newBuilder("cdc_stream")
+ .addSchema(SIMPLE_CDC_SCHEMA)
+ .producedValues(
+ Row.of(1, "INSERT", "Alice"),
+ Row.of(2, "INSERT", "Bob"),
+ Row.of(1, "UPDATE_BEFORE",
"Alice"),
+ Row.of(1, "UPDATE_AFTER",
"Alice2"),
+ Row.of(2, "DELETE", "Bob"))
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink")
+ .addSchema("id INT", "name STRING")
+ .consumedValues(
+ Row.ofKind(RowKind.INSERT, 1,
"Alice"),
+ Row.ofKind(RowKind.INSERT, 2,
"Bob"),
+ Row.ofKind(RowKind.UPDATE_BEFORE,
1, "Alice"),
+ Row.ofKind(RowKind.UPDATE_AFTER,
1, "Alice2"),
+ Row.ofKind(RowKind.DELETE, 2,
"Bob"))
+ .build())
+ .runTableApi(
+ env ->
env.from("cdc_stream").partitionBy($("id")).fromChangelog(),
+ "sink")
+ .build();
+
//
--------------------------------------------------------------------------------------------
// Round-trip test: FROM_CHANGELOG(TO_CHANGELOG(table))
//
--------------------------------------------------------------------------------------------
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java
index ca34b79974a..e038850fb8d 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java
@@ -46,6 +46,7 @@ public class ToChangelogSemanticTests extends
SemanticTestBase {
ToChangelogTestPrograms.CUSTOM_OP_MAPPING,
ToChangelogTestPrograms.CUSTOM_OP_NAME,
ToChangelogTestPrograms.TABLE_API_DEFAULT,
+ ToChangelogTestPrograms.TABLE_API_RETRACT_PARTITION_BY,
ToChangelogTestPrograms.LAG_ON_UPSERT_VIA_CHANGELOG,
ToChangelogTestPrograms.LAG_ON_RETRACT_VIA_CHANGELOG,
ToChangelogTestPrograms.DELETION_FLAG,
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java
index 36f602225dd..7f33cfc1419 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java
@@ -28,6 +28,8 @@ import org.apache.flink.types.RowKind;
import java.time.Instant;
+import static org.apache.flink.table.api.Expressions.$;
+
/** {@link TableTestProgram} definitions for testing the built-in TO_CHANGELOG
PTF. */
public class ToChangelogTestPrograms {
@@ -257,6 +259,34 @@ public class ToChangelogTestPrograms {
.runTableApi(env -> env.from("t").toChangelog(), "sink")
.build();
+ public static final TableTestProgram TABLE_API_RETRACT_PARTITION_BY =
+ TableTestProgram.of(
+ "to-changelog-table-api-retract-partition-by",
+ "PartitionedTable.toChangelog() convenience
method")
+ .setupTableSource(
+ SourceTestStep.newBuilder("t")
+ .addSchema(
+ "name STRING PRIMARY KEY NOT
ENFORCED",
+ "id STRING",
+ "score BIGINT")
+ .addMode(ChangelogMode.all())
+ .producedValues(
+ Row.ofKind(RowKind.INSERT,
"Alice", "EU", 10L),
+ Row.ofKind(RowKind.UPDATE_BEFORE,
"Alice", "EU", 10L),
+ Row.ofKind(RowKind.UPDATE_AFTER,
"Alice", "EU", 30L))
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink")
+ .addSchema(
+ "id STRING", "op STRING", "name
STRING", "score BIGINT")
+ .consumedValues(
+ "+I[EU, INSERT, Alice, 10]",
+ "+I[EU, UPDATE_BEFORE, Alice, 10]",
+ "+I[EU, UPDATE_AFTER, Alice, 30]")
+ .build())
+ .runTableApi(env ->
env.from("t").partitionBy($("id")).toChangelog(), "sink")
+ .build();
+
//
--------------------------------------------------------------------------------------------
// Use case: LAG on updating streams via TO_CHANGELOG
//
--------------------------------------------------------------------------------------------