gustavodemorais commented on code in PR #27901:
URL: https://github.com/apache/flink/pull/27901#discussion_r3046212761
##########
flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/PartitionedTable.java:
##########
@@ -172,4 +172,30 @@ public interface PartitionedTable {
* columns
*/
Table toChangelog(Expression... arguments);
+
+ /**
+ * Converts this append-only table with an explicit operation code column into a dynamic table
+ * using the built-in {@code FROM_CHANGELOG} process table function.
+ *
+ * <p>Each input row is expected to have a string operation code column (default: {@code "op"})
+ * that indicates the change operation (e.g., INSERT, UPDATE_AFTER, DELETE). The output table is
Review Comment:
```suggestion
* that indicates the change operation (e.g., INSERT, UPDATE_AFTER, UPDATE_BEFORE, DELETE). The output table is
```
##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -30,9 +30,108 @@ Flink SQL provides built-in process table functions (PTFs)
for working with chan
| Function | Description |
|:---------|:------------|
+| [FROM_CHANGELOG](#from_changelog) | Converts an append-only table with
operation codes into a dynamic table |
| [TO_CHANGELOG](#to_changelog) | Converts a dynamic table into an append-only
table with explicit operation codes |
-<!-- Placeholder for future FROM_CHANGELOG function -->
+## FROM_CHANGELOG
+
+The `FROM_CHANGELOG` PTF converts an append-only table with an explicit
operation code column into a dynamic table (i.e. an updating table). Each input
row is expected to have a string column that indicates the change operation.
The op column is removed from the output and the row is emitted with the
corresponding `RowKind`.
Review Comment:
`RowKind` is an internal concept that isn't meant to appear in docs written
for the SQL persona. We're using "change operation" in the docs for now. Please
update it in all relevant places so the wording is consistent.
##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -30,9 +30,108 @@ Flink SQL provides built-in process table functions (PTFs)
for working with chan
| Function | Description |
|:---------|:------------|
+| [FROM_CHANGELOG](#from_changelog) | Converts an append-only table with
operation codes into a dynamic table |
| [TO_CHANGELOG](#to_changelog) | Converts a dynamic table into an append-only
table with explicit operation codes |
-<!-- Placeholder for future FROM_CHANGELOG function -->
+## FROM_CHANGELOG
+
+The `FROM_CHANGELOG` PTF converts an append-only table with an explicit
operation code column into a dynamic table (i.e. an updating table). Each input
row is expected to have a string column that indicates the change operation.
The op column is removed from the output and the row is emitted with the
corresponding `RowKind`.
+
+This is useful when consuming Change Data Capture (CDC) streams from systems
like Debezium, Maxwell, or Canal, where events arrive as flat append-only
records with an explicit operation field.
+
+### Syntax
+
+```sql
+SELECT * FROM FROM_CHANGELOG(
+ input => TABLE source_table PARTITION BY key_col,
+ [op => DESCRIPTOR(op_column_name),]
+ [op_mapping => MAP['c, r', 'INSERT', 'u', 'UPDATE_AFTER', 'd', 'DELETE']]
+)
+```
+
+### Parameters
+
+| Parameter | Required | Description |
+|:-------------|:---------|:------------|
+| `input` | Yes | The input table. Must be append-only and include
`PARTITION BY` for parallel execution. |
+| `op` | No | A `DESCRIPTOR` with a single column name for the
operation code column. Defaults to `op`. The column must exist in the input
table and be of type STRING. |
+| `op_mapping` | No | A `MAP<STRING, STRING>` mapping user-defined
operation codes to change operation names. Keys are user codes (e.g., `'c'`,
`'u'`, `'d'`), values are change operation names (`INSERT`, `UPDATE_AFTER`,
`DELETE`). Keys can contain comma-separated codes to map multiple codes to the
same operation (e.g., `'c, r'`). When provided, only mapped codes are forwarded
- unmapped codes are dropped. Each change operation may appear at most once
across all entries. |
+
+#### Default op_mapping
+
+When `op_mapping` is omitted, the following standard names are used:
+
+| Input code | Change operation |
+|:-----------------|:------------------|
+| `'INSERT'` | INSERT |
+| `'UPDATE_AFTER'` | UPDATE_AFTER |
+| `'DELETE'` | DELETE |
+
+### Output Schema
+
+The output columns are ordered as:
+
+```
+[partition_key_columns, remaining_columns_without_op]
+```
+
+The op column is removed from the output. Output rows carry the appropriate
`RowKind` (INSERT, UPDATE_AFTER, or DELETE).
+
+### Examples
+
+#### Basic usage with standard op names
+
+```sql
+-- Input (append-only):
+-- +I[id:1, op:'INSERT', name:'Alice']
+-- +I[id:1, op:'UPDATE_AFTER', name:'Alice2']
+-- +I[id:2, op:'DELETE', name:'Bob']
+
+SELECT * FROM FROM_CHANGELOG(
+ input => TABLE cdc_stream PARTITION BY id
+)
+
+-- Output (upsert stream):
Review Comment:
I think it'd be good to add the table output here. Same for the
TO_CHANGELOG docs. In this case:
Table
id:1 name:'Alice2'
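To show how the three example input rows collapse into that table, here is a minimal plain-Java sketch with no Flink dependency. `ChangeEvent` and `materialize` are hypothetical names made up for this illustration and are not part of the PR. Keying by `id` is an assumption that mirrors `PARTITION BY id`.

```java
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class MaterializeSketch {

    // Hypothetical stand-in for one append-only input row of the example.
    static final class ChangeEvent {
        final int id;
        final String op;
        final String name;

        ChangeEvent(int id, String op, String name) {
            this.id = id;
            this.op = op;
            this.name = name;
        }
    }

    // Applies the events in arrival order and returns the table keyed by id.
    static Map<Integer, String> materialize(List<ChangeEvent> events) {
        Map<Integer, String> table = new LinkedHashMap<>();
        for (ChangeEvent e : events) {
            if ("INSERT".equals(e.op) || "UPDATE_AFTER".equals(e.op)) {
                table.put(e.id, e.name); // insert or overwrite the row for this key
            } else if ("DELETE".equals(e.op)) {
                table.remove(e.id); // no-op when the key was never inserted
            }
            // Any other code is ignored in this sketch.
        }
        return table;
    }

    public static void main(String[] args) {
        List<ChangeEvent> input = Arrays.asList(
                new ChangeEvent(1, "INSERT", "Alice"),
                new ChangeEvent(1, "UPDATE_AFTER", "Alice2"),
                new ChangeEvent(2, "DELETE", "Bob"));
        System.out.println(materialize(input)); // prints {1=Alice2}
    }
}
```

The delete for `id:2` leaves nothing behind because that key was never inserted, which is why only the `id:1` row shows up in the table.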
##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -30,9 +30,108 @@ Flink SQL provides built-in process table functions (PTFs)
for working with chan
| Function | Description |
|:---------|:------------|
+| [FROM_CHANGELOG](#from_changelog) | Converts an append-only table with
operation codes into a dynamic table |
| [TO_CHANGELOG](#to_changelog) | Converts a dynamic table into an append-only
table with explicit operation codes |
-<!-- Placeholder for future FROM_CHANGELOG function -->
+## FROM_CHANGELOG
+
+The `FROM_CHANGELOG` PTF converts an append-only table with an explicit
operation code column into a dynamic table (i.e. an updating table). Each input
row is expected to have a string column that indicates the change operation.
The op column is removed from the output and the row is emitted with the
corresponding `RowKind`.
+
+This is useful when consuming Change Data Capture (CDC) streams from systems
like Debezium, Maxwell, or Canal, where events arrive as flat append-only
records with an explicit operation field.
+
+### Syntax
+
+```sql
+SELECT * FROM FROM_CHANGELOG(
+ input => TABLE source_table PARTITION BY key_col,
+ [op => DESCRIPTOR(op_column_name),]
+ [op_mapping => MAP['c, r', 'INSERT', 'u', 'UPDATE_AFTER', 'd', 'DELETE']]
Review Comment:
The `invalid_op_handling` param would already be relevant here, but we can
add it in a follow-up PR.
##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -30,9 +30,108 @@ Flink SQL provides built-in process table functions (PTFs)
for working with chan
| Function | Description |
|:---------|:------------|
+| [FROM_CHANGELOG](#from_changelog) | Converts an append-only table with
operation codes into a dynamic table |
| [TO_CHANGELOG](#to_changelog) | Converts a dynamic table into an append-only
table with explicit operation codes |
-<!-- Placeholder for future FROM_CHANGELOG function -->
+## FROM_CHANGELOG
+
+The `FROM_CHANGELOG` PTF converts an append-only table with an explicit
operation code column into a dynamic table (i.e. an updating table). Each input
row is expected to have a string column that indicates the change operation.
The op column is removed from the output and the row is emitted with the
corresponding `RowKind`.
+
+This is useful when consuming Change Data Capture (CDC) streams from systems
like Debezium, Maxwell, or Canal, where events arrive as flat append-only
records with an explicit operation field.
+
+### Syntax
+
+```sql
+SELECT * FROM FROM_CHANGELOG(
+ input => TABLE source_table PARTITION BY key_col,
+ [op => DESCRIPTOR(op_column_name),]
+ [op_mapping => MAP['c, r', 'INSERT', 'u', 'UPDATE_AFTER', 'd', 'DELETE']]
+)
+```
+
+### Parameters
+
+| Parameter | Required | Description |
+|:-------------|:---------|:------------|
+| `input` | Yes | The input table. Must be append-only and include
`PARTITION BY` for parallel execution. |
+| `op` | No | A `DESCRIPTOR` with a single column name for the
operation code column. Defaults to `op`. The column must exist in the input
table and be of type STRING. |
+| `op_mapping` | No | A `MAP<STRING, STRING>` mapping user-defined
operation codes to change operation names. Keys are user codes (e.g., `'c'`,
`'u'`, `'d'`), values are change operation names (`INSERT`, `UPDATE_AFTER`,
`DELETE`). Keys can contain comma-separated codes to map multiple codes to the
same operation (e.g., `'c, r'`). When provided, only mapped codes are forwarded
- unmapped codes are dropped. Each change operation may appear at most once
across all entries. |
+
+#### Default op_mapping
+
+When `op_mapping` is omitted, the following standard names are used:
+
+| Input code | Change operation |
+|:-----------------|:------------------|
+| `'INSERT'` | INSERT |
+| `'UPDATE_AFTER'` | UPDATE_AFTER |
+| `'DELETE'` | DELETE |
+
+### Output Schema
+
+The output columns are ordered as:
+
+```
+[partition_key_columns, remaining_columns_without_op]
+```
+
+The op column is removed from the output. Output rows carry the appropriate
`RowKind` (INSERT, UPDATE_AFTER, or DELETE).
Review Comment:
```suggestion
The op column is removed from the output. Output rows carry the appropriate
`RowKind` (INSERT, UPDATE_AFTER, UPDATE_BEFORE or DELETE).
```
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinition.java:
##########
@@ -84,7 +86,8 @@ private BuiltInFunctionDefinition(
boolean isDeterministic,
boolean isRuntimeProvided,
String runtimeClass,
- boolean isInternal) {
+ boolean isInternal,
+ @Nullable ChangelogFunction changelogFunction) {
Review Comment:
Adding this doesn't feel very clean. We also have to add some dead code
below when creating `new ChangelogFunction()`.
A simple alternative would be to just add a changelogMode to the
BuiltInFunctionDefinition, but this might not be enough for eventual
optimizations where we want to decide between append/upsert/retract stream
output depending on the args inside opMapping.
```
BuiltInFunctionDefinition.newBuilder()
.name("FROM_CHANGELOG")
.outputChangelogMode(ChangelogMode.upsert(false))
.build();
```
I think we have to sync with Timo on this
##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -30,9 +30,108 @@ Flink SQL provides built-in process table functions (PTFs)
for working with chan
| Function | Description |
|:---------|:------------|
+| [FROM_CHANGELOG](#from_changelog) | Converts an append-only table with
operation codes into a dynamic table |
| [TO_CHANGELOG](#to_changelog) | Converts a dynamic table into an append-only
table with explicit operation codes |
-<!-- Placeholder for future FROM_CHANGELOG function -->
+## FROM_CHANGELOG
+
+The `FROM_CHANGELOG` PTF converts an append-only table with an explicit
operation code column into a dynamic table (i.e. an updating table). Each input
row is expected to have a string column that indicates the change operation.
The op column is removed from the output and the row is emitted with the
corresponding `RowKind`.
+
+This is useful when consuming Change Data Capture (CDC) streams from systems
like Debezium, Maxwell, or Canal, where events arrive as flat append-only
records with an explicit operation field.
Review Comment:
```suggestion
This is useful when consuming Change Data Capture (CDC) streams from systems
like Debezium, Maxwell, or Canal, where events arrive as flat append-only
records with an explicit operation field. It's also useful in
combination with the TO_CHANGELOG function, when the user wants to turn the
append-only table back into an updating table after applying a
transformation to the events.
```
##########
docs/content/docs/sql/reference/queries/changelog.md:
##########
@@ -30,9 +30,108 @@ Flink SQL provides built-in process table functions (PTFs)
for working with chan
| Function | Description |
|:---------|:------------|
+| [FROM_CHANGELOG](#from_changelog) | Converts an append-only table with
operation codes into a dynamic table |
| [TO_CHANGELOG](#to_changelog) | Converts a dynamic table into an append-only
table with explicit operation codes |
-<!-- Placeholder for future FROM_CHANGELOG function -->
+## FROM_CHANGELOG
+
+The `FROM_CHANGELOG` PTF converts an append-only table with an explicit
operation code column into a dynamic table (i.e. an updating table). Each input
row is expected to have a string column that indicates the change operation.
The op column is removed from the output and the row is emitted with the
corresponding `RowKind`.
+
+This is useful when consuming Change Data Capture (CDC) streams from systems
like Debezium, Maxwell, or Canal, where events arrive as flat append-only
records with an explicit operation field.
+
+### Syntax
+
+```sql
+SELECT * FROM FROM_CHANGELOG(
+ input => TABLE source_table PARTITION BY key_col,
+ [op => DESCRIPTOR(op_column_name),]
+ [op_mapping => MAP['c, r', 'INSERT', 'u', 'UPDATE_AFTER', 'd', 'DELETE']]
+)
+```
+
+### Parameters
+
+| Parameter | Required | Description |
+|:-------------|:---------|:------------|
+| `input` | Yes | The input table. Must be append-only and include
`PARTITION BY` for parallel execution. |
+| `op` | No | A `DESCRIPTOR` with a single column name for the
operation code column. Defaults to `op`. The column must exist in the input
table and be of type STRING. |
+| `op_mapping` | No | A `MAP<STRING, STRING>` mapping user-defined
operation codes to change operation names. Keys are user codes (e.g., `'c'`,
`'u'`, `'d'`), values are change operation names (`INSERT`, `UPDATE_AFTER`,
`DELETE`). Keys can contain comma-separated codes to map multiple codes to the
same operation (e.g., `'c, r'`). When provided, only mapped codes are forwarded
- unmapped codes are dropped. Each change operation may appear at most once
across all entries. |
+
+#### Default op_mapping
+
+When `op_mapping` is omitted, the following standard names are used:
Review Comment:
We're missing the UPDATE_BEFORE -> UPDATE_BEFORE entry.
We're actually not going to drop UPDATE_BEFORE in the default
implementation. We want a simple flat mapping as the default behavior.
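As a rough illustration of the flat default next to a custom `op_mapping`, the following plain-Java sketch expands the mapping into a per-code lookup. `OpMappingSketch` and `resolve` are hypothetical names, not the PR's implementation. Treating a `null` argument as "mapping omitted", throwing `IllegalArgumentException` on invalid input, and allowing UPDATE_BEFORE as a custom mapping target are assumptions made for the sketch.

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class OpMappingSketch {

    // Change operation names; the flat default maps each name to itself.
    static final List<String> CHANGE_OPS =
            Arrays.asList("INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE");

    // Expands keys such as "c, r" into one lookup entry per code.
    // A null mapping stands for an omitted op_mapping argument (assumption).
    static Map<String, String> resolve(Map<String, String> opMapping) {
        Map<String, String> lookup = new LinkedHashMap<>();
        if (opMapping == null) {
            for (String op : CHANGE_OPS) {
                lookup.put(op, op);
            }
            return lookup;
        }
        Set<String> seenOps = new HashSet<>();
        for (Map.Entry<String, String> entry : opMapping.entrySet()) {
            String changeOp = entry.getValue();
            if (!CHANGE_OPS.contains(changeOp)) {
                throw new IllegalArgumentException("Unknown change operation: " + changeOp);
            }
            // Each change operation may appear at most once across all entries.
            if (!seenOps.add(changeOp)) {
                throw new IllegalArgumentException("Mapped more than once: " + changeOp);
            }
            for (String code : entry.getKey().split(",")) {
                lookup.put(code.trim(), changeOp);
            }
        }
        return lookup;
    }

    public static void main(String[] args) {
        Map<String, String> custom = new LinkedHashMap<>();
        custom.put("c, r", "INSERT");
        custom.put("u", "UPDATE_AFTER");
        custom.put("d", "DELETE");
        // prints {c=INSERT, r=INSERT, u=UPDATE_AFTER, d=DELETE}
        System.out.println(resolve(custom));
        // prints {INSERT=INSERT, UPDATE_BEFORE=UPDATE_BEFORE, UPDATE_AFTER=UPDATE_AFTER, DELETE=DELETE}
        System.out.println(resolve(null));
    }
}
```

A code that is absent from the lookup returns `null`, which corresponds to the documented "unmapped codes are dropped" behavior when a custom mapping is provided.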
##########
flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java:
##########
@@ -802,6 +807,47 @@ ANY, and(logical(LogicalTypeRoot.BOOLEAN), LITERAL)
"org.apache.flink.table.runtime.functions.ptf.ToChangelogFunction")
.build();
+ public static final BuiltInFunctionDefinition FROM_CHANGELOG =
+ BuiltInFunctionDefinition.newBuilder()
+ .name("FROM_CHANGELOG")
+ .kind(PROCESS_TABLE)
+ .staticArguments(
+ StaticArgument.table(
+ "input",
+ Row.class,
+ false,
+ EnumSet.of(
+ StaticArgumentTrait.TABLE,
+ StaticArgumentTrait.SET_SEMANTIC_TABLE)),
+ StaticArgument.scalar("op", DataTypes.DESCRIPTOR(), true),
+ StaticArgument.scalar(
+ "op_mapping",
+ DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()),
+ true))
+ .changelogFunction(
+ new ChangelogFunction() {
+ @Override
+ public ChangelogMode getChangelogMode(
+ ChangelogContext changelogContext) {
+ return ChangelogMode.upsert(false);
Review Comment:
I think we'll go with ChangelogMode.ALL