This is an automated email from the ASF dual-hosted git repository.

snuyanzin pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 6d65a4f69c7 [FLINK-39614][table] Support `TO_CHANGELOG`: retract 
stream -> retract with set semantics
6d65a4f69c7 is described below

commit 6d65a4f69c7231c818912132cb53db81eba4efbb
Author: Gustavo de Morais <[email protected]>
AuthorDate: Thu May 7 18:46:04 2026 +0200

    [FLINK-39614][table] Support `TO_CHANGELOG`: retract stream -> retract with 
set semantics
---
 .../docs/sql/reference/queries/changelog.md        | 19 ++++++
 .../strategies/ChangelogTypeStrategyUtils.java     | 71 ++++++++++++++++++++++
 .../strategies/ToChangelogTypeStrategy.java        |  5 +-
 .../exec/stream/ToChangelogSemanticTests.java      |  7 ++-
 .../nodes/exec/stream/ToChangelogTestPrograms.java | 37 ++++++++++-
 .../planner/plan/stream/sql/ToChangelogTest.java   |  4 +-
 .../planner/plan/stream/sql/ToChangelogTest.xml    | 28 ++++-----
 .../runtime/functions/ptf/ToChangelogFunction.java | 11 +++-
 8 files changed, 158 insertions(+), 24 deletions(-)

diff --git a/docs/content/docs/sql/reference/queries/changelog.md 
b/docs/content/docs/sql/reference/queries/changelog.md
index 494dcc60be8..34945439f92 100644
--- a/docs/content/docs/sql/reference/queries/changelog.md
+++ b/docs/content/docs/sql/reference/queries/changelog.md
@@ -268,6 +268,25 @@ SELECT * FROM TO_CHANGELOG(
 -- UPDATE_BEFORE is dropped (not in the mapping)
 ```
 
+#### Partitioning by a key
+
+```sql
+-- Input table 'my_aggregation' with columns (name, id, cnt)
+-- Default output schema:           [op, name, id, cnt]
+-- Output schema with PARTITION BY: [id, op, name, cnt]
+
+SELECT * FROM TO_CHANGELOG(
+  input => TABLE my_aggregation PARTITION BY id
+)
+```
+When `PARTITION BY` is provided, **the output schema changes**. The partition 
key columns are moved to the front by the engine, and the function emits the 
remaining input columns. The order becomes:
+
+```
+[partition_keys, op_column, non_partition_input_columns]
+```
+
+Prefer row semantics when possible. `PARTITION BY` is only necessary when 
downstream operators are keyed on that column and you want to co-locate rows 
for the same key in the same parallel operator instance.
+
 #### Table API
 
 ```java
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ChangelogTypeStrategyUtils.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ChangelogTypeStrategyUtils.java
new file mode 100644
index 00000000000..e9608ee7795
--- /dev/null
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ChangelogTypeStrategyUtils.java
@@ -0,0 +1,71 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.types.inference.strategies;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.table.functions.TableSemantics;
+import org.apache.flink.table.types.DataType;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+/** Shared helpers for changelog-style PTFs ({@code TO_CHANGELOG}, {@code 
FROM_CHANGELOG}). */
+@Internal
+public final class ChangelogTypeStrategyUtils {
+
+    /**
+     * Returns the input column indices that pass through to the function's 
output, excluding the
+     * partition key columns (the PTF framework prepends them when the input 
has set semantics).
+     */
+    public static int[] computeOutputIndices(final TableSemantics 
tableSemantics) {
+        return computeOutputIndices(tableSemantics, -1);
+    }
+
+    /**
+     * Returns the input column indices that pass through to the function's 
output, excluding the
+     * partition key columns and the operation column matching {@code 
opColumnName}.
+     */
+    public static int[] computeOutputIndices(
+            final TableSemantics tableSemantics, final String opColumnName) {
+        final int opIndex = 
DataType.getFieldNames(tableSemantics.dataType()).indexOf(opColumnName);
+        return computeOutputIndices(tableSemantics, opIndex);
+    }
+
+    private static int[] computeOutputIndices(
+            final TableSemantics tableSemantics, final int extraExcludedIndex) 
{
+        final Set<Integer> excluded = 
collectPartitionKeyIndices(tableSemantics);
+        if (extraExcludedIndex >= 0) {
+            excluded.add(extraExcludedIndex);
+        }
+        final int inputFieldCount = 
DataType.getFieldCount(tableSemantics.dataType());
+        return IntStream.range(0, inputFieldCount).filter(i -> 
!excluded.contains(i)).toArray();
+    }
+
+    private static Set<Integer> collectPartitionKeyIndices(final 
TableSemantics tableSemantics) {
+        return new HashSet<>(
+                Arrays.stream(tableSemantics.partitionByColumns())
+                        .boxed()
+                        .collect(Collectors.toSet()));
+    }
+
+    private ChangelogTypeStrategyUtils() {}
+}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java
index c910885d69e..d977deab4a9 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java
@@ -35,6 +35,7 @@ import org.apache.flink.table.types.inference.TypeStrategy;
 import org.apache.flink.types.ColumnList;
 
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
@@ -96,10 +97,12 @@ public final class ToChangelogTypeStrategy {
 
                 final String opColumnName = resolveOpColumnName(callContext);
                 final List<Field> inputFields = 
DataType.getFields(semantics.dataType());
+                final int[] outputIndices =
+                        
ChangelogTypeStrategyUtils.computeOutputIndices(semantics);
 
                 final List<Field> outputFields = new ArrayList<>();
                 outputFields.add(DataTypes.FIELD(opColumnName, 
DataTypes.STRING()));
-                outputFields.addAll(inputFields);
+                
Arrays.stream(outputIndices).mapToObj(inputFields::get).forEach(outputFields::add);
 
                 return Optional.of(DataTypes.ROW(outputFields).notNull());
             };
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java
index da3c9270eba..ca34b79974a 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogSemanticTests.java
@@ -39,9 +39,10 @@ public class ToChangelogSemanticTests extends 
SemanticTestBase {
     @Override
     public List<TableTestProgram> programs() {
         return List.of(
-                ToChangelogTestPrograms.INSERT_ONLY_INPUT,
-                ToChangelogTestPrograms.UPDATING_INPUT,
-                ToChangelogTestPrograms.UPSERT_INPUT,
+                ToChangelogTestPrograms.INSERT,
+                ToChangelogTestPrograms.RETRACT,
+                ToChangelogTestPrograms.UPSERT,
+                ToChangelogTestPrograms.RETRACT_PARTITION_BY,
                 ToChangelogTestPrograms.CUSTOM_OP_MAPPING,
                 ToChangelogTestPrograms.CUSTOM_OP_NAME,
                 ToChangelogTestPrograms.TABLE_API_DEFAULT,
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java
index d6da85f8c0c..36f602225dd 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/ToChangelogTestPrograms.java
@@ -42,7 +42,7 @@ public class ToChangelogTestPrograms {
     // SQL tests
     // 
--------------------------------------------------------------------------------------------
 
-    public static final TableTestProgram INSERT_ONLY_INPUT =
+    public static final TableTestProgram INSERT =
             TableTestProgram.of("to-changelog-insert-only", "insert-only input 
produces op=INSERT")
                     .setupTableSource(
                             SourceTestStep.newBuilder("t")
@@ -60,7 +60,7 @@ public class ToChangelogTestPrograms {
                     .runSql("INSERT INTO sink SELECT * FROM TO_CHANGELOG(input 
=> TABLE t)")
                     .build();
 
-    public static final TableTestProgram UPDATING_INPUT =
+    public static final TableTestProgram RETRACT =
             TableTestProgram.of(
                             "to-changelog-updating-input",
                             "retract input produces all op codes including 
UPDATE_BEFORE")
@@ -121,7 +121,38 @@ public class ToChangelogTestPrograms {
                     .runSql("INSERT INTO sink SELECT * FROM TO_CHANGELOG(input 
=> TABLE t)")
                     .build();
 
-    public static final TableTestProgram UPSERT_INPUT =
+    /** Partitions by a non-leading column ({@code id}, the middle column of 
three). */
+    public static final TableTestProgram RETRACT_PARTITION_BY =
+            TableTestProgram.of(
+                            "to-changelog-retract-partition-by-middle-column",
+                            "PARTITION BY a non-leading column drops it from 
the function output "
+                                    + "without disturbing the order of the 
remaining columns")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("t")
+                                    .addSchema(
+                                            "name STRING PRIMARY KEY NOT 
ENFORCED",
+                                            "id STRING",
+                                            "score BIGINT")
+                                    .addMode(ChangelogMode.all())
+                                    .producedValues(
+                                            Row.ofKind(RowKind.INSERT, 
"Alice", "EU", 10L),
+                                            Row.ofKind(RowKind.UPDATE_BEFORE, 
"Alice", "EU", 10L),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
"Alice", "EU", 30L))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema(
+                                            "id STRING", "op STRING", "name 
STRING", "score BIGINT")
+                                    .consumedValues(
+                                            "+I[EU, INSERT, Alice, 10]",
+                                            "+I[EU, UPDATE_BEFORE, Alice, 10]",
+                                            "+I[EU, UPDATE_AFTER, Alice, 30]")
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink SELECT * FROM TO_CHANGELOG(input 
=> TABLE t PARTITION BY id)")
+                    .build();
+
+    public static final TableTestProgram UPSERT =
             TableTestProgram.of(
                             "to-changelog-upsert-input",
                             "upsert input gets ChangelogNormalize for 
UPDATE_BEFORE and full deletes")
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.java
index e98e28fc53a..12790318688 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.java
@@ -77,7 +77,7 @@ public class ToChangelogTest extends TableTestBase {
     }
 
     @Test
-    void testInsertOnlySource() {
+    void testInsertSource() {
         util.tableEnv()
                 .executeSql(
                         "CREATE TABLE insert_only_source ("
@@ -89,7 +89,7 @@ public class ToChangelogTest extends TableTestBase {
     }
 
     @Test
-    void testSetSemanticsWithPartitionBy() {
+    void testRetractPartitionBy() {
         util.tableEnv()
                 .executeSql(
                         "CREATE TABLE retract_source ("
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.xml
index 77133f4fe41..b477039f2da 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/ToChangelogTest.xml
@@ -16,7 +16,7 @@ See the License for the specific language governing 
permissions and
 limitations under the License.
 -->
 <Root>
-  <TestCase name="testInsertOnlySource">
+  <TestCase name="testInsertSource">
     <Resource name="sql">
       <![CDATA[SELECT * FROM TO_CHANGELOG(input => TABLE insert_only_source)]]>
     </Resource>
@@ -35,42 +35,42 @@ ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0), 
DEFAULT(), DEFAULT(), D
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testRetractSource">
+  <TestCase name="testRetractPartitionBy">
     <Resource name="sql">
-      <![CDATA[SELECT * FROM TO_CHANGELOG(input => TABLE retract_source)]]>
+      <![CDATA[SELECT * FROM TO_CHANGELOG(input => TABLE retract_source 
PARTITION BY id)]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
-LogicalProject(op=[$0], id=[$1], name=[$2])
-+- LogicalTableFunctionScan(invocation=[TO_CHANGELOG(TABLE(#0), DEFAULT(), 
DEFAULT(), DEFAULT(), DEFAULT())], rowType=[RecordType(VARCHAR(2147483647) op, 
INTEGER id, VARCHAR(2147483647) name)])
+LogicalProject(id=[$0], op=[$1], name=[$2])
++- LogicalTableFunctionScan(invocation=[TO_CHANGELOG(TABLE(#0) PARTITION 
BY($0), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], 
rowType=[RecordType(INTEGER id, VARCHAR(2147483647) op, VARCHAR(2147483647) 
name)])
    +- LogicalProject(id=[$0], name=[$1])
       +- LogicalTableScan(table=[[default_catalog, default_database, 
retract_source]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0), DEFAULT(), DEFAULT(), 
DEFAULT(), DEFAULT())], uid=[null], select=[op,id,name], 
rowType=[RecordType(VARCHAR(2147483647) op, INTEGER id, VARCHAR(2147483647) 
name)], changelogMode=[I])
-+- TableSourceScan(table=[[default_catalog, default_database, 
retract_source]], fields=[id, name], changelogMode=[I,UB,UA,D])
+ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0) PARTITION BY($0), 
DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], uid=[TO_CHANGELOG], 
select=[id,op,name], rowType=[RecordType(INTEGER id, VARCHAR(2147483647) op, 
VARCHAR(2147483647) name)], changelogMode=[I])
++- Exchange(distribution=[hash[id]], changelogMode=[I,UB,UA,D])
+   +- TableSourceScan(table=[[default_catalog, default_database, 
retract_source]], fields=[id, name], changelogMode=[I,UB,UA,D])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testSetSemanticsWithPartitionBy">
+  <TestCase name="testRetractSource">
     <Resource name="sql">
-      <![CDATA[SELECT * FROM TO_CHANGELOG(input => TABLE retract_source 
PARTITION BY id)]]>
+      <![CDATA[SELECT * FROM TO_CHANGELOG(input => TABLE retract_source)]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
-LogicalProject(id=[$0], op=[$1], id0=[$2], name=[$3])
-+- LogicalTableFunctionScan(invocation=[TO_CHANGELOG(TABLE(#0) PARTITION 
BY($0), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], 
rowType=[RecordType(INTEGER id, VARCHAR(2147483647) op, INTEGER id0, 
VARCHAR(2147483647) name)])
+LogicalProject(op=[$0], id=[$1], name=[$2])
++- LogicalTableFunctionScan(invocation=[TO_CHANGELOG(TABLE(#0), DEFAULT(), 
DEFAULT(), DEFAULT(), DEFAULT())], rowType=[RecordType(VARCHAR(2147483647) op, 
INTEGER id, VARCHAR(2147483647) name)])
    +- LogicalProject(id=[$0], name=[$1])
       +- LogicalTableScan(table=[[default_catalog, default_database, 
retract_source]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0) PARTITION BY($0), 
DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], uid=[TO_CHANGELOG], 
select=[id,op,id0,name], rowType=[RecordType(INTEGER id, VARCHAR(2147483647) 
op, INTEGER id0, VARCHAR(2147483647) name)], changelogMode=[I])
-+- Exchange(distribution=[hash[id]], changelogMode=[I,UB,UA,D])
-   +- TableSourceScan(table=[[default_catalog, default_database, 
retract_source]], fields=[id, name], changelogMode=[I,UB,UA,D])
+ProcessTableFunction(invocation=[TO_CHANGELOG(TABLE(#0), DEFAULT(), DEFAULT(), 
DEFAULT(), DEFAULT())], uid=[null], select=[op,id,name], 
rowType=[RecordType(VARCHAR(2147483647) op, INTEGER id, VARCHAR(2147483647) 
name)], changelogMode=[I])
++- TableSourceScan(table=[[default_catalog, default_database, 
retract_source]], fields=[id, name], changelogMode=[I,UB,UA,D])
 ]]>
     </Resource>
   </TestCase>
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java
index 247f565228b..36ed1a615bb 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/ToChangelogFunction.java
@@ -24,10 +24,13 @@ import org.apache.flink.table.data.MapData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
 import org.apache.flink.table.data.utils.JoinedRowData;
+import org.apache.flink.table.data.utils.ProjectedRowData;
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
 import org.apache.flink.table.functions.FunctionContext;
 import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext;
+import org.apache.flink.table.functions.TableSemantics;
 import org.apache.flink.table.types.inference.CallContext;
+import 
org.apache.flink.table.types.inference.strategies.ChangelogTypeStrategyUtils;
 import org.apache.flink.types.ColumnList;
 import org.apache.flink.types.RowKind;
 
@@ -57,19 +60,24 @@ public class ToChangelogFunction extends 
BuiltInProcessTableFunction<RowData> {
                     RowKind.DELETE, "DELETE");
 
     private final Map<RowKind, String> rawOpMap;
+    private final int[] outputIndices;
 
     private transient Map<RowKind, StringData> opMap;
     private transient GenericRowData opRow;
     private transient JoinedRowData output;
+    private transient ProjectedRowData projectedOutput;
 
     @SuppressWarnings("unchecked")
     public ToChangelogFunction(final SpecializedContext context) {
         super(BuiltInFunctionDefinitions.TO_CHANGELOG, context);
         final CallContext callContext = context.getCallContext();
+        // Table argument is guaranteed by the type strategy's validation 
phase.
+        final TableSemantics tableSemantics = 
callContext.getTableSemantics(0).get();
 
         final Map<String, String> opMapping =
                 callContext.getArgumentValue(2, Map.class).orElse(null);
         this.rawOpMap = buildOpMap(opMapping);
+        this.outputIndices = 
ChangelogTypeStrategyUtils.computeOutputIndices(tableSemantics);
     }
 
     @Override
@@ -79,6 +87,7 @@ public class ToChangelogFunction extends 
BuiltInProcessTableFunction<RowData> {
         rawOpMap.forEach((kind, code) -> opMap.put(kind, 
StringData.fromString(code)));
         opRow = new GenericRowData(1);
         output = new JoinedRowData();
+        projectedOutput = ProjectedRowData.from(outputIndices);
     }
 
     /**
@@ -110,6 +119,6 @@ public class ToChangelogFunction extends 
BuiltInProcessTableFunction<RowData> {
         }
 
         opRow.setField(0, opCode);
-        collect(output.replace(opRow, input));
+        collect(output.replace(opRow, projectedOutput.replaceRow(input)));
     }
 }

Reply via email to