This is an automated email from the ASF dual-hosted git repository.
snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 6d65a4f69c7 [FLINK-39614][table] Support `TO_CHANGELOG`: retract
stream -> retract with set semantics
6d65a4f69c7 is described below
commit 6d65a4f69c7231c818912132cb53db81eba4efbb
Author: Gustavo de Morais <[email protected]>
AuthorDate: Thu May 7 18:46:04 2026 +0200
[FLINK-39614][table] Support `TO_CHANGELOG`: retract stream -> retract with
set semantics
---
.../docs/sql/reference/queries/changelog.md | 19 ++++++
.../strategies/ChangelogTypeStrategyUtils.java | 71 ++++++++++++++++++++++
.../strategies/ToChangelogTypeStrategy.java | 5 +-
.../exec/stream/ToChangelogSemanticTests.java | 7 ++-
.../nodes/exec/stream/ToChangelogTestPrograms.java | 37 ++++++++++-
.../planner/plan/stream/sql/ToChangelogTest.java | 4 +-
.../planner/plan/stream/sql/ToChangelogTest.xml | 28 ++++-----
.../runtime/functions/ptf/ToChangelogFunction.java | 11 +++-
8 files changed, 158 insertions(+), 24 deletions(-)
diff --git a/docs/content/docs/sql/reference/queries/changelog.md
b/docs/content/docs/sql/reference/queries/changelog.md
index 494dcc60be8..34945439f92 100644
--- a/docs/content/docs/sql/reference/queries/changelog.md
+++ b/docs/content/docs/sql/reference/queries/changelog.md
@@ -268,6 +268,25 @@ SELECT * FROM TO_CHANGELOG(
-- UPDATE_BEFORE is dropped (not in the mapping)
```
+#### Partitioning by a key
+
+```sql
+-- Input table 'my_aggregation' with columns (name, id, cnt)
+-- Default output schema: [op, name, id, cnt]
+-- Output schema with PARTITION BY: [id, op, name, cnt]
+
+SELECT * FROM TO_CHANGELOG(
+ input => TABLE my_aggregation PARTITION BY id
+)
+```
+When `PARTITION BY` is provided, **the output schema changes**. The partition
key columns are moved to the front by the engine, and the function emits the
remaining input columns. The order becomes:
+
+```
+[partition_keys, op_column, non_partition_input_columns]
+```
+
+Prefer row semantics when possible. `PARTITION BY` is only necessary when
downstream operators are keyed on that column and you want to co-locate rows
for the same key in the same parallel operator instance.
+
#### Table API
```java
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ChangelogTypeStrategyUtils.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ChangelogTypeStrategyUtils.java
new file mode 100644
index 00000000000..e9608ee7795
--- /dev/null
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ChangelogTypeStrategyUtils.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.functions.TableSemantics;
+import org.apache.flink.table.types.DataType;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/** Shared helpers for changelog-style PTFs ({@code TO_CHANGELOG}, {@code
FROM_CHANGELOG}). */
+@Internal
+public final class ChangelogTypeStrategyUtils {
+
+ /**
+ * Returns the input column indices that pass through to the function's
output, excluding the
+ * partition key columns (the PTF framework prepends them when the input
has set semantics).
+ */
+ public static int[] computeOutputIndices(final TableSemantics
tableSemantics) {
+ return computeOutputIndices(tableSemantics, -1);
+ }
+
+ /**
+ * Returns the input column indices that pass through to the function's
output, excluding the
+ * partition key columns and the operation column matching {@code
opColumnName}.
+ */
+ public static int[] computeOutputIndices(
+ final TableSemantics tableSemantics, final String opColumnName) {
+ final int opIndex =
DataType.getFieldNames(tableSemantics.dataType()).indexOf(opColumnName);
+ return computeOutputIndices(tableSemantics, opIndex);
+ }
+
+ private static int[] computeOutputIndices(
+ final TableSemantics tableSemantics, final int extraExcludedIndex)
{
+ final Set<Integer> excluded =
collectPartitionKeyIndices(tableSemantics);
+ if (extraExcludedIndex >= 0) {
+ excluded.add(extraExcludedIndex);
+ }
+ final int inputFieldCount =
DataType.getFieldCount(tableSemantics.dataType());
+ return IntStream.range(0, inputFieldCount).filter(i ->
!excluded.contains(i)).toArray();
+ }
+
+ private static Set<Integer> collectPartitionKeyIndices(final
TableSemantics tableSemantics) {
+ return new HashSet<>(
+ Arrays.stream(tableSemantics.partitionByColumns())
+ .boxed()
+ .collect(Collectors.toSet()));
+ }
+
+ private ChangelogTypeStrategyUtils() {}
+}
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java
index c910885d69e..d977deab4a9 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java
@@ -35,6 +35,7 @@ import org.apache.flink.table.types.inference.TypeStrategy;
import org.apache.flink.types.ColumnList;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
@@ -96,10 +97,12 @@ public final class ToChangelogTypeStrategy {
final String opColumnName = resolveOpColumnName(callContext);
final List<Field> inputFields =
DataType.getFields(semantics.dataType());
+ final int[] outputIndices =
+
ChangelogTypeStrategyUtils.computeOutputIndices(semantics);
final List<Field> outputFields = new ArrayList<>();
outputFields.add(DataTypes.FIELD(opColumnName,
DataTypes.STRING()));
- outputFields.addAll(inputFields);
+
Arrays.stream(outputIndices).mapToObj(inputFields::get).forEach(outputFields::add);
return Optional.of(DataTypes.ROW(outputFields).notNull());
};
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java
index da3c9270eba..ca34b79974a 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java
@@ -39,9 +39,10 @@ public class ToChangelogSemanticTests extends
SemanticTestBase {
@Override
public List<TableTestProgram> programs() {
return List.of(
- ToChangelogTestPrograms.INSERT_ONLY_INPUT,
- ToChangelogTestPrograms.UPDATING_INPUT,
- ToChangelogTestPrograms.UPSERT_INPUT,
+ ToChangelogTestPrograms.INSERT,
+ ToChangelogTestPrograms.RETRACT,
+ ToChangelogTestPrograms.UPSERT,
+ ToChangelogTestPrograms.RETRACT_PARTITION_BY,
ToChangelogTestPrograms.CUSTOM_OP_MAPPING,
ToChangelogTestPrograms.CUSTOM_OP_NAME,
ToChangelogTestPrograms.TABLE_API_DEFAULT,
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java
index d6da85f8c0c..36f602225dd 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java
@@ -42,7 +42,7 @@ public class ToChangelogTestPrograms {
// SQL tests
//
--------------------------------------------------------------------------------------------
- public static final TableTestProgram INSERT_ONLY_INPUT =
+ public static final TableTestProgram INSERT =
TableTestProgram.of("to-changelog-insert-only", "insert-only input
produces op=INSERT")
.setupTableSource(
SourceTestStep.newBuilder("t")
@@ -60,7 +60,7 @@ public class ToChangelogTestPrograms {
.runSql("INSERT INTO sink SELECT * FROM TO_CHANGELOG(input
=> TABLE t)")
.build();
- public static final TableTestProgram UPDATING_INPUT =
+ public static final TableTestProgram RETRACT =
TableTestProgram.of(
"to-changelog-updating-input",
"retract input produces all op codes including
UPDATE_BEFORE")
@@ -121,7 +121,38 @@ public class ToChangelogTestPrograms {
.runSql("INSERT INTO sink SELECT * FROM TO_CHANGELOG(input
=> TABLE t)")
.build();
- public static final TableTestProgram UPSERT_INPUT =
+ /** Partitions by a non-leading column ({@code id}, the middle column of
three). */
+ public static final TableTestProgram RETRACT_PARTITION_BY =
+ TableTestProgram.of(
+ "to-changelog-retract-partition-by-middle-column",
+ "PARTITION BY a non-leading column drops it from
the function output "
+ + "without disturbing the order of the
remaining columns")
+ .setupTableSource(
+ SourceTestStep.newBuilder("t")
+ .addSchema(
+ "name STRING PRIMARY KEY NOT
ENFORCED",
+ "id STRING",
+ "score BIGINT")
+ .addMode(ChangelogMode.all())
+ .producedValues(
+ Row.ofKind(RowKind.INSERT,
"Alice", "EU", 10L),
+ Row.ofKind(RowKind.UPDATE_BEFORE,
"Alice", "EU", 10L),
+ Row.ofKind(RowKind.UPDATE_AFTER,
"Alice", "EU", 30L))
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink")
+ .addSchema(
+ "id STRING", "op STRING", "name
STRING", "score BIGINT")
+ .consumedValues(
+ "+I[EU, INSERT, Alice, 10]",
+ "+I[EU, UPDATE_BEFORE, Alice, 10]",
+ "+I[EU, UPDATE_AFTER, Alice, 30]")
+ .build())
+ .runSql(
+ "INSERT INTO sink SELECT * FROM TO_CHANGELOG(input
=> TABLE t PARTITION BY id)")
+ .build();
+
+ public static final TableTestProgram UPSERT =
TableTestProgram.of(
"to-changelog-upsert-input",
"upsert input gets ChangelogNormalize for
UPDATE_BEFORE and full deletes")
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.java
index e98e28fc53a..12790318688 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.java
@@ -77,7 +77,7 @@ public class ToChangelogTest extends TableTestBase {
}
@Test
- void testInsertOnlySource() {
+ void testInsertSource() {
util.tableEnv()
.executeSql(
"CREATE TABLE insert_only_source ("
@@ -89,7 +89,7 @@ public class ToChangelogTest extends TableTestBase {
}
@Test
- void testSetSemanticsWithPartitionBy() {
+ void testRetractPartitionBy() {
util.tableEnv()
.executeSql(
"CREATE TABLE retract_source ("
diff --git
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.xml
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.xml
index 77133f4fe41..b477039f2da 100644
---
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.xml
+++
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.xml
@@ -16,7 +16,7 @@ See the License for the specific language governing
permissions and
limitations under the License.
-->
<Root>
- <TestCase name="testInsertOnlySource">
+ <TestCase name="testInsertSource">
<Resource name="sql">
<![CDATA[SELECT * FROM TO_CHANGELOG(input => TABLE insert_only_source)]]>
</Resource>
@@ -35,42 +35,42 @@ ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0),
DEFAULT(), DEFAULT(), D
]]>
</Resource>
</TestCase>
- <TestCase name="testRetractSource">
+ <TestCase name="testRetractPartitionBy">
<Resource name="sql">
- <![CDATA[SELECT * FROM TO_CHANGELOG(input => TABLE retract_source)]]>
+ <![CDATA[SELECT * FROM TO_CHANGELOG(input => TABLE retract_source
PARTITION BY id)]]>
</Resource>
<Resource name="ast">
<![CDATA[
-LogicalProject(op=[$0], id=[$1], name=[$2])
-+- LogicalTableFunctionScan(invocation=[TO_CHANGELOG(TABLE(#0), DEFAULT(),
DEFAULT(), DEFAULT(), DEFAULT())], rowType=[RecordType(VARCHAR(2147483647) op,
INTEGER id, VARCHAR(2147483647) name)])
+LogicalProject(id=[$0], op=[$1], name=[$2])
++- LogicalTableFunctionScan(invocation=[TO_CHANGELOG(TABLE(#0) PARTITION
BY($0), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())],
rowType=[RecordType(INTEGER id, VARCHAR(2147483647) op, VARCHAR(2147483647)
name)])
+- LogicalProject(id=[$0], name=[$1])
+- LogicalTableScan(table=[[default_catalog, default_database,
retract_source]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0), DEFAULT(), DEFAULT(),
DEFAULT(), DEFAULT())], uid=[null], select=[op,id,name],
rowType=[RecordType(VARCHAR(2147483647) op, INTEGER id, VARCHAR(2147483647)
name)], changelogMode=[I])
-+- TableSourceScan(table=[[default_catalog, default_database,
retract_source]], fields=[id, name], changelogMode=[I,UB,UA,D])
+ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0) PARTITION BY($0),
DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], uid=[TO_CHANGELOG],
select=[id,op,name], rowType=[RecordType(INTEGER id, VARCHAR(2147483647) op,
VARCHAR(2147483647) name)], changelogMode=[I])
++- Exchange(distribution=[hash[id]], changelogMode=[I,UB,UA,D])
+ +- TableSourceScan(table=[[default_catalog, default_database,
retract_source]], fields=[id, name], changelogMode=[I,UB,UA,D])
]]>
</Resource>
</TestCase>
- <TestCase name="testSetSemanticsWithPartitionBy">
+ <TestCase name="testRetractSource">
<Resource name="sql">
- <![CDATA[SELECT * FROM TO_CHANGELOG(input => TABLE retract_source
PARTITION BY id)]]>
+ <![CDATA[SELECT * FROM TO_CHANGELOG(input => TABLE retract_source)]]>
</Resource>
<Resource name="ast">
<![CDATA[
-LogicalProject(id=[$0], op=[$1], id0=[$2], name=[$3])
-+- LogicalTableFunctionScan(invocation=[TO_CHANGELOG(TABLE(#0) PARTITION
BY($0), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())],
rowType=[RecordType(INTEGER id, VARCHAR(2147483647) op, INTEGER id0,
VARCHAR(2147483647) name)])
+LogicalProject(op=[$0], id=[$1], name=[$2])
++- LogicalTableFunctionScan(invocation=[TO_CHANGELOG(TABLE(#0), DEFAULT(),
DEFAULT(), DEFAULT(), DEFAULT())], rowType=[RecordType(VARCHAR(2147483647) op,
INTEGER id, VARCHAR(2147483647) name)])
+- LogicalProject(id=[$0], name=[$1])
+- LogicalTableScan(table=[[default_catalog, default_database,
retract_source]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0) PARTITION BY($0),
DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], uid=[TO_CHANGELOG],
select=[id,op,id0,name], rowType=[RecordType(INTEGER id, VARCHAR(2147483647)
op, INTEGER id0, VARCHAR(2147483647) name)], changelogMode=[I])
-+- Exchange(distribution=[hash[id]], changelogMode=[I,UB,UA,D])
- +- TableSourceScan(table=[[default_catalog, default_database,
retract_source]], fields=[id, name], changelogMode=[I,UB,UA,D])
+ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0), DEFAULT(), DEFAULT(),
DEFAULT(), DEFAULT())], uid=[null], select=[op,id,name],
rowType=[RecordType(VARCHAR(2147483647) op, INTEGER id, VARCHAR(2147483647)
name)], changelogMode=[I])
++- TableSourceScan(table=[[default_catalog, default_database,
retract_source]], fields=[id, name], changelogMode=[I,UB,UA,D])
]]>
</Resource>
</TestCase>
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java
index 247f565228b..36ed1a615bb 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java
@@ -24,10 +24,13 @@ import org.apache.flink.table.data.MapData;
import org.apache.flink.table.data.RowData;
import org.apache.flink.table.data.StringData;
import org.apache.flink.table.data.utils.JoinedRowData;
+import org.apache.flink.table.data.utils.ProjectedRowData;
import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext;
+import org.apache.flink.table.functions.TableSemantics;
import org.apache.flink.table.types.inference.CallContext;
+import
org.apache.flink.table.types.inference.strategies.ChangelogTypeStrategyUtils;
import org.apache.flink.types.ColumnList;
import org.apache.flink.types.RowKind;
@@ -57,19 +60,24 @@ public class ToChangelogFunction extends
BuiltInProcessTableFunction<RowData> {
RowKind.DELETE, "DELETE");
private final Map<RowKind, String> rawOpMap;
+ private final int[] outputIndices;
private transient Map<RowKind, StringData> opMap;
private transient GenericRowData opRow;
private transient JoinedRowData output;
+ private transient ProjectedRowData projectedOutput;
@SuppressWarnings("unchecked")
public ToChangelogFunction(final SpecializedContext context) {
super(BuiltInFunctionDefinitions.TO_CHANGELOG, context);
final CallContext callContext = context.getCallContext();
+ // Table argument is guaranteed by the type strategy's validation
phase.
+ final TableSemantics tableSemantics =
callContext.getTableSemantics(0).get();
final Map<String, String> opMapping =
callContext.getArgumentValue(2, Map.class).orElse(null);
this.rawOpMap = buildOpMap(opMapping);
+ this.outputIndices =
ChangelogTypeStrategyUtils.computeOutputIndices(tableSemantics);
}
@Override
@@ -79,6 +87,7 @@ public class ToChangelogFunction extends
BuiltInProcessTableFunction<RowData> {
rawOpMap.forEach((kind, code) -> opMap.put(kind,
StringData.fromString(code)));
opRow = new GenericRowData(1);
output = new JoinedRowData();
+ projectedOutput = ProjectedRowData.from(outputIndices);
}
/**
@@ -110,6 +119,6 @@ public class ToChangelogFunction extends
BuiltInProcessTableFunction<RowData> {
}
opRow.setField(0, opCode);
- collect(output.replace(opRow, input));
+ collect(output.replace(opRow, projectedOutput.replaceRow(input)));
}
}