This is an automated email from the ASF dual-hosted git repository.

twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 658e30a5484 [FLINK-39537][table] Apply conditional SET_SEMANTIC_TABLE 
trait to FROM_CHANGELOG
658e30a5484 is described below

commit 658e30a5484393e99fafded250dd3cd0eb900a85
Author: Ramin Gharib <[email protected]>
AuthorDate: Fri May 8 12:22:51 2026 +0200

    [FLINK-39537][table] Apply conditional SET_SEMANTIC_TABLE trait to 
FROM_CHANGELOG
    
    This closes #28025.
---
 .../docs/sql/reference/queries/changelog.md        | 26 ++++++++-
 .../java/org/apache/flink/table/api/Table.java     | 11 ++++
 .../functions/BuiltInFunctionDefinitions.java      | 15 +++---
 .../strategies/ChangelogTypeStrategyUtils.java     | 39 ++++++++++++--
 .../strategies/FromChangelogTypeStrategy.java      | 42 +++++++--------
 .../exec/stream/FromChangelogSemanticTests.java    |  4 +-
 .../exec/stream/FromChangelogTestPrograms.java     | 63 ++++++++++++++++++++--
 .../planner/plan/stream/sql/FromChangelogTest.java | 12 ++---
 .../planner/plan/stream/sql/FromChangelogTest.xml  | 23 ++++----
 .../functions/ptf/FromChangelogFunction.java       | 19 ++-----
 10 files changed, 177 insertions(+), 77 deletions(-)

diff --git a/docs/content/docs/sql/reference/queries/changelog.md 
b/docs/content/docs/sql/reference/queries/changelog.md
index 34945439f92..b713aebf072 100644
--- a/docs/content/docs/sql/reference/queries/changelog.md
+++ b/docs/content/docs/sql/reference/queries/changelog.md
@@ -45,7 +45,7 @@ Note: This version requires that your CDC data encodes 
updates using a full imag
 
 ```sql
 SELECT * FROM FROM_CHANGELOG(
-  input => TABLE source_table,
+  input => TABLE source_table [PARTITION BY key_col],
   [op => DESCRIPTOR(op_column_name),]
   [op_mapping => MAP[
       'c, r', 'INSERT',
@@ -61,7 +61,7 @@ SELECT * FROM FROM_CHANGELOG(
 
 | Parameter    | Required | Description                                        
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
 
|:-------------|:---------|:--------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------
 [...]
-| `input`      | Yes      | The input table. Must be append-only.              
                                                                                
                                                                                
                                                                                
                                                                                
                                                                                
              [...]
+| `input`      | Yes      | The input table. Must be append-only. Use 
`PARTITION BY` to ensure rows for the same key are processed together.          
                                                                                
                                                                                
                                                                                
                                                |
 | `op`         | No       | A `DESCRIPTOR` with a single column name for the 
operation code column. Defaults to `op`. The column must exist in the input 
table and be of type STRING.                                                    
                                                                                
                                                                                
                                                                                
                    [...]
 | `op_mapping` | No       | A `MAP<STRING, STRING>` mapping user-defined codes 
to Flink change operation names. Keys are user-defined codes (e.g., `'c'`, 
`'u'`, `'d'`), values are Flink change operation names (`INSERT`, 
`UPDATE_BEFORE`, `UPDATE_AFTER`, `DELETE`). Keys can contain comma-separated 
codes to map multiple codes to the same operation (e.g., `'c, r'`). Each change 
operation may appear at most once across all entries. |
 | `error_handling` | No | Controls behavior when an input row's operation code 
is `NULL` or not present in the `op_mapping`. Valid values: `FAIL` (default) — 
throw a `TableRuntimeException`, `SKIP` — silently drop the row. |
@@ -127,6 +127,28 @@ SELECT * FROM FROM_CHANGELOG(
 -- The operation column named 'operation' is used instead of 'op'
 ```
 
+#### Partitioning by a key
+
+```sql
+-- Input table 'cdc_stream' with columns (name, id, op, doc)
+-- Default output schema:           [name, id, doc]
+-- Output schema with PARTITION BY: [id, name, doc]
+
+SELECT * FROM FROM_CHANGELOG(
+  input => TABLE cdc_stream PARTITION BY id
+)
+```
+
+When `PARTITION BY` is provided, **the output schema changes**. The partition 
key columns are moved to the front by the engine, and the function emits the 
remaining input columns (excluding the op column). The order becomes:
+
+```
+[partition_keys, non_partition_input_columns_excluding_op]
+```
+
+Prefer row semantics when possible. `PARTITION BY` is only necessary when 
downstream operators are keyed on that column and you want to co-locate rows 
for the same key in the same parallel operator instance.
+
+If you are producing an upsert table — that is, you are emitting 
`UPDATE_AFTER` but no `UPDATE_BEFORE` from your CDC input stream — the 
partition key you select here will be considered both the primary key and the 
upsert key by the engine. Make sure the `PARTITION BY` key matches your primary 
key exactly.
+
 #### Invalid operation code handling
 
 Two `error_handling` modes are supported. The job can either fail upon an 
invalid or unknown op code, or skip the row and continue processing.
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
index eb61371329c..aa2417d4531 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
@@ -1467,6 +1467,17 @@ public interface Table extends Explainable<Table>, 
Executable {
      * TableRuntimeException} when an input row's op code is {@code NULL} or 
not present in the
      * mapping; pass {@code error_handling => 'SKIP'} to silently drop those 
rows instead.
      *
+     * <p>By default, the input is processed with row semantics (each row 
independently). To
+     * co-locate rows with the same key in the same parallel operator 
instance, partition the input
+     * first via {@link #partitionBy(Expression...)} and invoke the function 
via {@link
+     * PartitionedTable#process(String, Object...)}:
+     *
+     * <pre>{@code
+     * Table result = cdcStream
+     *     .partitionBy($("id"))
+     *     .process("FROM_CHANGELOG");
+     * }</pre>
+     *
      * <p>Optional arguments can be passed using named expressions:
      *
      * <pre>{@code
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index abe54e6f48e..1085afa48b2 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -839,12 +839,15 @@ public final class BuiltInFunctionDefinitions {
                     .kind(PROCESS_TABLE)
                     .staticArguments(
                             StaticArgument.table(
-                                    "input",
-                                    Row.class,
-                                    false,
-                                    EnumSet.of(
-                                            StaticArgumentTrait.TABLE,
-                                            
StaticArgumentTrait.ROW_SEMANTIC_TABLE)),
+                                            "input",
+                                            Row.class,
+                                            false,
+                                            EnumSet.of(
+                                                    StaticArgumentTrait.TABLE,
+                                                    
StaticArgumentTrait.ROW_SEMANTIC_TABLE))
+                                    .withConditionalTrait(
+                                            
StaticArgumentTrait.SET_SEMANTIC_TABLE,
+                                            TraitCondition.hasPartitionBy()),
                             StaticArgument.scalar("op", 
DataTypes.DESCRIPTOR(), true),
                             StaticArgument.scalar(
                                     "op_mapping",
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ChangelogTypeStrategyUtils.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ChangelogTypeStrategyUtils.java
index e9608ee7795..d4e20c30210 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ChangelogTypeStrategyUtils.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ChangelogTypeStrategyUtils.java
@@ -21,16 +21,46 @@ package org.apache.flink.table.types.inference.strategies;
 import org.apache.flink.annotation.Internal;
 import org.apache.flink.table.functions.TableSemantics;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.types.ColumnList;
 
 import java.util.Arrays;
-import java.util.HashSet;
+import java.util.List;
+import java.util.OptionalInt;
 import java.util.Set;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static 
org.apache.flink.table.types.inference.strategies.FromChangelogTypeStrategy.ARG_OP;
+
 /** Shared helpers for changelog-style PTFs ({@code TO_CHANGELOG}, {@code 
FROM_CHANGELOG}). */
 @Internal
 public final class ChangelogTypeStrategyUtils {
+    private static final String DEFAULT_OP_COLUMN_NAME = "op";
+
+    /**
+     * Resolves the op column name from the {@code op} descriptor argument, 
falling back to {@link
+     * #DEFAULT_OP_COLUMN_NAME} when the argument is omitted or empty.
+     */
+    public static String resolveOpColumnName(final CallContext callContext) {
+        return callContext
+                .getArgumentValue(ARG_OP, ColumnList.class)
+                .filter(cl -> !cl.getNames().isEmpty())
+                .map(cl -> cl.getNames().get(0))
+                .orElse(DEFAULT_OP_COLUMN_NAME);
+    }
+
+    /**
+     * Returns the index of the column matching {@code opColumnName} within 
the input schema, or
+     * empty if no field matches.
+     */
+    public static OptionalInt resolveOpColumnIndex(
+            final TableSemantics tableSemantics, final String opColumnName) {
+        final List<String> fieldNames = 
DataType.getFieldNames(tableSemantics.dataType());
+        return IntStream.range(0, fieldNames.size())
+                .filter(i -> fieldNames.get(i).equals(opColumnName))
+                .findFirst();
+    }
 
     /**
      * Returns the input column indices that pass through to the function's 
output, excluding the
@@ -61,10 +91,9 @@ public final class ChangelogTypeStrategyUtils {
     }
 
     private static Set<Integer> collectPartitionKeyIndices(final 
TableSemantics tableSemantics) {
-        return new HashSet<>(
-                Arrays.stream(tableSemantics.partitionByColumns())
-                        .boxed()
-                        .collect(Collectors.toSet()));
+        return Arrays.stream(tableSemantics.partitionByColumns())
+                .boxed()
+                .collect(Collectors.toSet());
     }
 
     private ChangelogTypeStrategyUtils() {}
diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java
index d1d3c1c61da..f4d54f276d7 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java
@@ -27,14 +27,17 @@ import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.inference.CallContext;
 import org.apache.flink.table.types.inference.InputTypeStrategy;
 import org.apache.flink.table.types.inference.TypeStrategy;
+import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.LogicalTypeFamily;
 import org.apache.flink.types.ColumnList;
 import org.apache.flink.types.RowKind;
 
+import java.util.Arrays;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Optional;
+import java.util.OptionalInt;
 import java.util.Set;
 import java.util.stream.Collectors;
 
@@ -50,8 +53,6 @@ public final class FromChangelogTypeStrategy {
     public static final int ARG_OP_MAPPING = 2;
     public static final int ARG_ERROR_HANDLING = 3;
 
-    public static final String DEFAULT_OP_COLUMN_NAME = "op";
-
     private static final Set<String> VALID_ROW_KIND_NAMES =
             Set.of("INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE");
 
@@ -86,7 +87,8 @@ public final class FromChangelogTypeStrategy {
                                                 new ValidationException(
                                                         "First argument must 
be a table for FROM_CHANGELOG."));
 
-                final String opColumnName = resolveOpColumnName(callContext);
+                final String opColumnName =
+                        
ChangelogTypeStrategyUtils.resolveOpColumnName(callContext);
 
                 final List<Field> outputFields = 
buildOutputFields(tableSemantics, opColumnName);
 
@@ -97,7 +99,6 @@ public final class FromChangelogTypeStrategy {
     // Helpers
     // 
--------------------------------------------------------------------------------------------
 
-    @SuppressWarnings("rawtypes")
     private static Optional<List<DataType>> validateInputs(
             final CallContext callContext, final boolean throwOnFailure) {
         Optional<List<DataType>> error;
@@ -156,23 +157,26 @@ public final class FromChangelogTypeStrategy {
             final CallContext callContext, final boolean throwOnFailure) {
 
         final TableSemantics tableSemantics = 
callContext.getTableSemantics(ARG_TABLE).get();
-        final String opColumnName = resolveOpColumnName(callContext);
-        final List<Field> inputFields = 
DataType.getFields(tableSemantics.dataType());
-        final Optional<Field> opField =
-                inputFields.stream().filter(f -> 
f.getName().equals(opColumnName)).findFirst();
-        if (opField.isEmpty()) {
+        final String opColumnName = 
ChangelogTypeStrategyUtils.resolveOpColumnName(callContext);
+        final OptionalInt opIndex =
+                
ChangelogTypeStrategyUtils.resolveOpColumnIndex(tableSemantics, opColumnName);
+        if (opIndex.isEmpty()) {
             return callContext.fail(
                     throwOnFailure,
                     String.format(
                             "The op column '%s' does not exist in the input 
schema.",
                             opColumnName));
         }
-        if 
(!opField.get().getDataType().getLogicalType().is(LogicalTypeFamily.CHARACTER_STRING))
 {
+        final LogicalType opFieldType =
+                DataType.getFieldDataTypes(tableSemantics.dataType())
+                        .get(opIndex.getAsInt())
+                        .getLogicalType();
+        if (!opFieldType.is(LogicalTypeFamily.CHARACTER_STRING)) {
             return callContext.fail(
                     throwOnFailure,
                     String.format(
                             "The op column '%s' must be of STRING type, but 
was '%s'.",
-                            opColumnName, 
opField.get().getDataType().getLogicalType()));
+                            opColumnName, opFieldType));
         }
         return Optional.empty();
     }
@@ -273,21 +277,13 @@ public final class FromChangelogTypeStrategy {
         return Optional.empty();
     }
 
-    private static String resolveOpColumnName(final CallContext callContext) {
-        return callContext
-                .getArgumentValue(ARG_OP, ColumnList.class)
-                .filter(cl -> !cl.getNames().isEmpty())
-                .map(cl -> cl.getNames().get(0))
-                .orElse(DEFAULT_OP_COLUMN_NAME);
-    }
-
     private static List<Field> buildOutputFields(
             final TableSemantics tableSemantics, final String opColumnName) {
         final List<Field> inputFields = 
DataType.getFields(tableSemantics.dataType());
-
-        // Exclude the op column (becomes RowKind), keep all other columns
-        return inputFields.stream()
-                .filter(f -> !f.getName().equals(opColumnName))
+        return Arrays.stream(
+                        ChangelogTypeStrategyUtils.computeOutputIndices(
+                                tableSemantics, opColumnName))
+                .mapToObj(inputFields::get)
                 .collect(Collectors.toList());
     }
 
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogSemanticTests.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogSemanticTests.java
index 4643a51be52..85efd0d4863 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogSemanticTests.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogSemanticTests.java
@@ -39,9 +39,11 @@ public class FromChangelogSemanticTests extends 
SemanticTestBase {
     @Override
     public List<TableTestProgram> programs() {
         return List.of(
-                FromChangelogTestPrograms.DEFAULT_OP_MAPPING,
+                FromChangelogTestPrograms.RETRACT,
                 FromChangelogTestPrograms.CUSTOM_OP_MAPPING,
                 FromChangelogTestPrograms.CUSTOM_OP_NAME,
+                FromChangelogTestPrograms.RETRACT_PARTITION_BY,
+                FromChangelogTestPrograms.DELETION_FLAG_PARTITION_BY,
                 FromChangelogTestPrograms.SKIP_INVALID_OP_HANDLING,
                 FromChangelogTestPrograms.SKIP_NULL_OP_CODE,
                 FromChangelogTestPrograms.TABLE_API_DEFAULT,
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java
index 1858aabeed0..f7264ec3c92 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java
@@ -35,10 +35,8 @@ public class FromChangelogTestPrograms {
     // SQL tests
     // 
--------------------------------------------------------------------------------------------
 
-    public static final TableTestProgram DEFAULT_OP_MAPPING =
-            TableTestProgram.of(
-                            "from-changelog-default-op-mapping",
-                            "default mapping with standard op names")
+    public static final TableTestProgram RETRACT =
+            TableTestProgram.of("from-changelog-retract", "retract changelog 
with default mapping")
                     .setupTableSource(
                             SourceTestStep.newBuilder("cdc_stream")
                                     .addSchema(SIMPLE_CDC_SCHEMA)
@@ -146,7 +144,6 @@ public class FromChangelogTestPrograms {
                                     + "error_handling => 'SKIP')")
                     .build();
 
-    /** Custom op column name via DESCRIPTOR. */
     public static final TableTestProgram CUSTOM_OP_NAME =
             TableTestProgram.of(
                             "from-changelog-custom-op-name", "custom op column 
name via DESCRIPTOR")
@@ -172,6 +169,62 @@ public class FromChangelogTestPrograms {
                                     + "op => DESCRIPTOR(operation))")
                     .build();
 
+    public static final TableTestProgram RETRACT_PARTITION_BY =
+            TableTestProgram.of(
+                            "from-changelog-retract-partition-by",
+                            "retract changelog with PARTITION BY")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("cdc_stream")
+                                    .addSchema("name STRING", "id INT", "op 
STRING")
+                                    .producedValues(
+                                            Row.of("Alice", 1, "INSERT"),
+                                            Row.of("Bob", 2, "INSERT"),
+                                            Row.of("Alice", 1, 
"UPDATE_BEFORE"),
+                                            Row.of("Alice2", 1, 
"UPDATE_AFTER"),
+                                            Row.of("Bob", 2, "DELETE"))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("id INT", "name STRING")
+                                    .consumedValues(
+                                            Row.ofKind(RowKind.INSERT, 1, 
"Alice"),
+                                            Row.ofKind(RowKind.INSERT, 2, 
"Bob"),
+                                            Row.ofKind(RowKind.UPDATE_BEFORE, 
1, "Alice"),
+                                            Row.ofKind(RowKind.UPDATE_AFTER, 
1, "Alice2"),
+                                            Row.ofKind(RowKind.DELETE, 2, 
"Bob"))
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink SELECT * FROM FROM_CHANGELOG("
+                                    + "input => TABLE cdc_stream PARTITION BY 
id)")
+                    .build();
+
+    public static final TableTestProgram DELETION_FLAG_PARTITION_BY =
+            TableTestProgram.of(
+                            "from-changelog-deletion-flag-partition-by",
+                            "deletion flag mapping with PARTITION BY: 'false' 
-> INSERT, 'true' -> DELETE")
+                    .setupTableSource(
+                            SourceTestStep.newBuilder("cdc_stream")
+                                    .addSchema("id INT", "deleted STRING", 
"name STRING")
+                                    .producedValues(
+                                            Row.of(1, "false", "Alice"),
+                                            Row.of(2, "false", "Bob"),
+                                            Row.of(2, "true", "Bob"))
+                                    .build())
+                    .setupTableSink(
+                            SinkTestStep.newBuilder("sink")
+                                    .addSchema("id INT", "name STRING")
+                                    .consumedValues(
+                                            Row.ofKind(RowKind.INSERT, 1, 
"Alice"),
+                                            Row.ofKind(RowKind.INSERT, 2, 
"Bob"),
+                                            Row.ofKind(RowKind.DELETE, 2, 
"Bob"))
+                                    .build())
+                    .runSql(
+                            "INSERT INTO sink SELECT * FROM FROM_CHANGELOG("
+                                    + "input => TABLE cdc_stream PARTITION BY 
id, "
+                                    + "op => DESCRIPTOR(deleted), "
+                                    + "op_mapping => MAP['false', 'INSERT', 
'true', 'DELETE'])")
+                    .build();
+
     // 
--------------------------------------------------------------------------------------------
     // Table API test
     // 
--------------------------------------------------------------------------------------------
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.java
index ba2c1a5690c..17d7f64fa48 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.java
@@ -46,7 +46,7 @@ public class FromChangelogTest extends TableTestBase {
     }
 
     @Test
-    void testInsertOnlySource() {
+    void testRetract() {
         util.tableEnv()
                 .executeSql(
                         "CREATE TABLE cdc_stream ("
@@ -59,20 +59,16 @@ public class FromChangelogTest extends TableTestBase {
     }
 
     @Test
-    void testCustomOpMapping() {
+    void testRetractPartitionBy() {
         util.tableEnv()
                 .executeSql(
                         "CREATE TABLE cdc_stream ("
                                 + "  id INT,"
-                                + "  __op STRING,"
+                                + "  op STRING,"
                                 + "  name STRING"
                                 + ") WITH ('connector' = 'values')");
         util.verifyRelPlan(
-                "SELECT * FROM FROM_CHANGELOG("
-                        + "input => TABLE cdc_stream, "
-                        + "op => DESCRIPTOR(__op), "
-                        + "op_mapping => MAP['c, r', 'INSERT', 'ub', 
'UPDATE_BEFORE', 'ua', 'UPDATE_AFTER', 'd', 'DELETE'], "
-                        + "error_handling => 'SKIP')",
+                "SELECT * FROM FROM_CHANGELOG(input => TABLE cdc_stream 
PARTITION BY id)",
                 CHANGELOG_MODE);
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.xml
index 614eb5456b9..2addfe9f5ff 100644
--- 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.xml
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.xml
@@ -16,41 +16,42 @@ See the License for the specific language governing 
permissions and
 limitations under the License.
 -->
 <Root>
-  <TestCase name="testCustomOpMapping">
+  <TestCase name="testRetract">
     <Resource name="sql">
-      <![CDATA[SELECT * FROM FROM_CHANGELOG(input => TABLE cdc_stream, op => 
DESCRIPTOR(__op), op_mapping => MAP['c, r', 'INSERT', 'ub', 'UPDATE_BEFORE', 
'ua', 'UPDATE_AFTER', 'd', 'DELETE'], error_handling => 'SKIP')]]>
+      <![CDATA[SELECT * FROM FROM_CHANGELOG(input => TABLE cdc_stream)]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
 LogicalProject(id=[$0], name=[$1])
-+- LogicalTableFunctionScan(invocation=[FROM_CHANGELOG(TABLE(#0), 
DESCRIPTOR(_UTF-16LE'__op'), MAP(_UTF-16LE'c, r':VARCHAR(4) CHARACTER SET 
"UTF-16LE", _UTF-16LE'INSERT':VARCHAR(13) CHARACTER SET "UTF-16LE", 
_UTF-16LE'ub':VARCHAR(4) CHARACTER SET "UTF-16LE", 
_UTF-16LE'UPDATE_BEFORE':VARCHAR(13) CHARACTER SET "UTF-16LE", 
_UTF-16LE'ua':VARCHAR(4) CHARACTER SET "UTF-16LE", 
_UTF-16LE'UPDATE_AFTER':VARCHAR(13) CHARACTER SET "UTF-16LE", 
_UTF-16LE'd':VARCHAR(4) CHARACTER SET "UTF-16LE", _UTF-16 [...]
-   +- LogicalProject(id=[$0], __op=[$1], name=[$2])
++- LogicalTableFunctionScan(invocation=[FROM_CHANGELOG(TABLE(#0), DEFAULT(), 
DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], rowType=[RecordType(INTEGER id, 
VARCHAR(2147483647) name)])
+   +- LogicalProject(id=[$0], op=[$1], name=[$2])
       +- LogicalTableScan(table=[[default_catalog, default_database, 
cdc_stream]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-ProcessTableFunction(invocation=[FROM_CHANGELOG(TABLE(#0), 
DESCRIPTOR(_UTF-16LE'__op'), MAP(_UTF-16LE'c, r':VARCHAR(4) CHARACTER SET 
"UTF-16LE", _UTF-16LE'INSERT':VARCHAR(13) CHARACTER SET "UTF-16LE", 
_UTF-16LE'ub':VARCHAR(4) CHARACTER SET "UTF-16LE", 
_UTF-16LE'UPDATE_BEFORE':VARCHAR(13) CHARACTER SET "UTF-16LE", 
_UTF-16LE'ua':VARCHAR(4) CHARACTER SET "UTF-16LE", 
_UTF-16LE'UPDATE_AFTER':VARCHAR(13) CHARACTER SET "UTF-16LE", 
_UTF-16LE'd':VARCHAR(4) CHARACTER SET "UTF-16LE", _UTF-16LE'DELE [...]
-+- TableSourceScan(table=[[default_catalog, default_database, cdc_stream]], 
fields=[id, __op, name], changelogMode=[I])
+ProcessTableFunction(invocation=[FROM_CHANGELOG(TABLE(#0), DEFAULT(), 
DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], uid=[null], select=[id,name], 
rowType=[RecordType(INTEGER id, VARCHAR(2147483647) name)], 
changelogMode=[I,UB,UA,D])
++- TableSourceScan(table=[[default_catalog, default_database, cdc_stream]], 
fields=[id, op, name], changelogMode=[I])
 ]]>
     </Resource>
   </TestCase>
-  <TestCase name="testInsertOnlySource">
+  <TestCase name="testRetractPartitionBy">
     <Resource name="sql">
-      <![CDATA[SELECT * FROM FROM_CHANGELOG(input => TABLE cdc_stream)]]>
+      <![CDATA[SELECT * FROM FROM_CHANGELOG(input => TABLE cdc_stream 
PARTITION BY id)]]>
     </Resource>
     <Resource name="ast">
       <![CDATA[
 LogicalProject(id=[$0], name=[$1])
-+- LogicalTableFunctionScan(invocation=[FROM_CHANGELOG(TABLE(#0), DEFAULT(), 
DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], rowType=[RecordType(INTEGER id, 
VARCHAR(2147483647) name)])
++- LogicalTableFunctionScan(invocation=[FROM_CHANGELOG(TABLE(#0) PARTITION 
BY($0), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], 
rowType=[RecordType(INTEGER id, VARCHAR(2147483647) name)])
    +- LogicalProject(id=[$0], op=[$1], name=[$2])
       +- LogicalTableScan(table=[[default_catalog, default_database, 
cdc_stream]])
 ]]>
     </Resource>
     <Resource name="optimized rel plan">
       <![CDATA[
-ProcessTableFunction(invocation=[FROM_CHANGELOG(TABLE(#0), DEFAULT(), 
DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], uid=[null], select=[id,name], 
rowType=[RecordType(INTEGER id, VARCHAR(2147483647) name)], 
changelogMode=[I,UB,UA,D])
-+- TableSourceScan(table=[[default_catalog, default_database, cdc_stream]], 
fields=[id, op, name], changelogMode=[I])
+ProcessTableFunction(invocation=[FROM_CHANGELOG(TABLE(#0) PARTITION BY($0), 
DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], uid=[FROM_CHANGELOG], 
select=[id,name], rowType=[RecordType(INTEGER id, VARCHAR(2147483647) name)], 
changelogMode=[I,UB,UA,D])
++- Exchange(distribution=[hash[id]], changelogMode=[I])
+   +- TableSourceScan(table=[[default_catalog, default_database, cdc_stream]], 
fields=[id, op, name], changelogMode=[I])
 ]]>
     </Resource>
   </TestCase>
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/FromChangelogFunction.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/FromChangelogFunction.java
index cc1f9deb21d..9a2607fb01b 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/FromChangelogFunction.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/FromChangelogFunction.java
@@ -29,6 +29,7 @@ import org.apache.flink.table.functions.FunctionContext;
 import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext;
 import org.apache.flink.table.functions.TableSemantics;
 import org.apache.flink.table.types.inference.CallContext;
+import 
org.apache.flink.table.types.inference.strategies.ChangelogTypeStrategyUtils;
 import org.apache.flink.table.types.inference.strategies.ErrorHandlingMode;
 import org.apache.flink.table.types.logical.RowType;
 import org.apache.flink.types.ColumnList;
@@ -40,13 +41,10 @@ import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Map;
 import java.util.stream.Collectors;
-import java.util.stream.IntStream;
 
 import static 
org.apache.flink.table.types.inference.strategies.FromChangelogTypeStrategy.ARG_ERROR_HANDLING;
-import static 
org.apache.flink.table.types.inference.strategies.FromChangelogTypeStrategy.ARG_OP;
 import static 
org.apache.flink.table.types.inference.strategies.FromChangelogTypeStrategy.ARG_OP_MAPPING;
 import static 
org.apache.flink.table.types.inference.strategies.FromChangelogTypeStrategy.ARG_TABLE;
-import static 
org.apache.flink.table.types.inference.strategies.FromChangelogTypeStrategy.DEFAULT_OP_COLUMN_NAME;
 
 /**
  * Runtime implementation of {@link BuiltInFunctionDefinitions#FROM_CHANGELOG}.
@@ -88,14 +86,10 @@ public class FromChangelogFunction extends 
BuiltInProcessTableFunction<RowData>
                         .orElseThrow(() -> new IllegalStateException("Table 
argument expected."));
 
         final RowType inputType = (RowType) 
tableSemantics.dataType().getLogicalType();
-        final String opColumnName = resolveOpColumnName(callContext);
+        final String opColumnName = 
ChangelogTypeStrategyUtils.resolveOpColumnName(callContext);
         this.opColumnIndex = inputType.getFieldNames().indexOf(opColumnName);
-
-        // Exclude only the op column from output — all other columns pass 
through
         this.outputIndices =
-                IntStream.range(0, inputType.getFieldCount())
-                        .filter(i -> i != opColumnIndex)
-                        .toArray();
+                
ChangelogTypeStrategyUtils.computeOutputIndices(tableSemantics, opColumnName);
 
         this.rawOpMap = buildOpMap(callContext);
 
@@ -114,13 +108,6 @@ public class FromChangelogFunction extends 
BuiltInProcessTableFunction<RowData>
         projectedOutput = ProjectedRowData.from(outputIndices);
     }
 
-    private static String resolveOpColumnName(final CallContext callContext) {
-        return callContext
-                .getArgumentValue(ARG_OP, ColumnList.class)
-                .map(cl -> cl.getNames().get(0))
-                .orElse(DEFAULT_OP_COLUMN_NAME);
-    }
-
     /**
      * Builds a String-to-RowKind map. Keys in the provided mapping may be 
comma-separated (e.g.,
      * "INSERT, UPDATE_AFTER") to map multiple input codes to the same RowKind.


Reply via email to