This is an automated email from the ASF dual-hosted git repository.
twalthr pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new 658e30a5484 [FLINK-39537][table] Apply conditional SET_SEMANTIC_TABLE trait to FROM_CHANGELOG
658e30a5484 is described below
commit 658e30a5484393e99fafded250dd3cd0eb900a85
Author: Ramin Gharib <[email protected]>
AuthorDate: Fri May 8 12:22:51 2026 +0200
[FLINK-39537][table] Apply conditional SET_SEMANTIC_TABLE trait to FROM_CHANGELOG
This closes #28025.
---
.../docs/sql/reference/queries/changelog.md | 26 ++++++++-
.../java/org/apache/flink/table/api/Table.java | 11 ++++
.../functions/BuiltInFunctionDefinitions.java | 15 +++---
.../strategies/ChangelogTypeStrategyUtils.java | 39 ++++++++++++--
.../strategies/FromChangelogTypeStrategy.java | 42 +++++++--------
.../exec/stream/FromChangelogSemanticTests.java | 4 +-
.../exec/stream/FromChangelogTestPrograms.java | 63 ++++++++++++++++++++--
.../planner/plan/stream/sql/FromChangelogTest.java | 12 ++---
.../planner/plan/stream/sql/FromChangelogTest.xml | 23 ++++----
.../functions/ptf/FromChangelogFunction.java | 19 ++-----
10 files changed, 177 insertions(+), 77 deletions(-)
diff --git a/docs/content/docs/sql/reference/queries/changelog.md b/docs/content/docs/sql/reference/queries/changelog.md
index 34945439f92..b713aebf072 100644
--- a/docs/content/docs/sql/reference/queries/changelog.md
+++ b/docs/content/docs/sql/reference/queries/changelog.md
@@ -45,7 +45,7 @@ Note: This version requires that your CDC data encodes updates using a full image
```sql
SELECT * FROM FROM_CHANGELOG(
- input => TABLE source_table,
+ input => TABLE source_table [PARTITION BY key_col],
[op => DESCRIPTOR(op_column_name),]
[op_mapping => MAP[
'c, r', 'INSERT',
@@ -61,7 +61,7 @@ SELECT * FROM FROM_CHANGELOG(
| Parameter | Required | Description [...]
|:-------------|:---------|:-------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------- [...]
-| `input` | Yes | The input table. Must be append-only. [...]
+| `input` | Yes | The input table. Must be append-only. Use `PARTITION BY` to ensure rows for the same key are processed together. |
| `op` | No | A `DESCRIPTOR` with a single column name for the operation code column. Defaults to `op`. The column must exist in the input table and be of type STRING. [...]
| `op_mapping` | No | A `MAP<STRING, STRING>` mapping user-defined codes to Flink change operation names. Keys are user-defined codes (e.g., `'c'`, `'u'`, `'d'`), values are Flink change operation names (`INSERT`, `UPDATE_BEFORE`, `UPDATE_AFTER`, `DELETE`). Keys can contain comma-separated codes to map multiple codes to the same operation (e.g., `'c, r'`). Each change operation may appear at most once across all entries. |
| `error_handling` | No | Controls behavior when an input row's operation code is `NULL` or not present in the `op_mapping`. Valid values: `FAIL` (default) — throw a `TableRuntimeException`, `SKIP` — silently drop the row. |
@@ -127,6 +127,28 @@ SELECT * FROM FROM_CHANGELOG(
-- The operation column named 'operation' is used instead of 'op'
```
+#### Partitioning by a key
+
+```sql
+-- Input table 'cdc_stream' with columns (name, id, op, doc)
+-- Default output schema: [name, id, doc]
+-- Output schema with PARTITION BY: [id, name, doc]
+
+SELECT * FROM FROM_CHANGELOG(
+ input => TABLE cdc_stream PARTITION BY id
+)
+```
+
+When `PARTITION BY` is provided, **the output schema changes**. The partition key columns are moved to the front by the engine, and the function emits the remaining input columns (excluding the op column). The order becomes:
+
+```
+[partition_keys, non_partition_input_columns_excluding_op]
+```
+
+Prefer row semantics when possible. `PARTITION BY` is only necessary when downstream operators are keyed on that column and you want to co-locate rows for the same key in the same parallel operator instance.
+
+If you are producing an upsert table — that is, you are emitting `UPDATE_AFTER` but no `UPDATE_BEFORE` from your CDC input stream — the partition key you select here will be considered both the primary key and the upsert key by the engine. Make sure the `PARTITION BY` key matches your primary key exactly.
+
#### Invalid operation code handling
Two `error_handling` modes are supported. The job can either fail upon an invalid or unknown op code, or skip the row and continue processing.
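
As a rough illustration of the upsert scenario described in the new documentation, the sketch below (not part of this commit) maps only insert codes, update after-images, and deletes, and partitions by the key that also serves as the sink's primary key. The table names, the `datagen`/`blackhole` connectors, and the op codes `'c'`, `'u'`, `'d'` are assumptions; the FROM_CHANGELOG arguments follow the documentation above.

```java
// Illustrative sketch only, not part of this commit. Table names, connectors,
// and op codes are assumptions; FROM_CHANGELOG usage follows the docs above.
import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.TableEnvironment;

public class UpsertFromChangelogSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Hypothetical append-only CDC feed that carries inserts ('c'),
        // update after-images ('u'), and deletes ('d') -- no UPDATE_BEFORE rows.
        tEnv.executeSql(
                "CREATE TABLE cdc_stream (id INT, op STRING, name STRING) "
                        + "WITH ('connector' = 'datagen')");

        // Upsert-style sink keyed on 'id'.
        tEnv.executeSql(
                "CREATE TABLE sink (id INT, name STRING, PRIMARY KEY (id) NOT ENFORCED) "
                        + "WITH ('connector' = 'blackhole')");

        // PARTITION BY id matches the sink's primary key, so 'id' acts as both
        // the primary key and the upsert key of the produced changelog.
        // error_handling => 'SKIP' drops synthetic datagen codes that do not
        // match the mapping instead of failing the job.
        tEnv.executeSql(
                "INSERT INTO sink SELECT * FROM FROM_CHANGELOG("
                        + "input => TABLE cdc_stream PARTITION BY id, "
                        + "op => DESCRIPTOR(op), "
                        + "op_mapping => MAP['c', 'INSERT', 'u', 'UPDATE_AFTER', 'd', 'DELETE'], "
                        + "error_handling => 'SKIP')");
    }
}
```

Since the mapping defines no `UPDATE_BEFORE`, the partition key `id` doubles as the upsert key, which is why it must match the sink's primary key.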
diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
index eb61371329c..aa2417d4531 100644
--- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
+++ b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/Table.java
@@ -1467,6 +1467,17 @@ public interface Table extends Explainable<Table>, Executable {
* TableRuntimeException} when an input row's op code is {@code NULL} or not present in the
* mapping; pass {@code error_handling => 'SKIP'} to silently drop those rows instead.
*
+ * <p>By default, the input is processed with row semantics (each row independently). To
+ * co-locate rows with the same key in the same parallel operator instance, partition the input
+ * first via {@link #partitionBy(Expression...)} and invoke the function via {@link
+ * PartitionedTable#process(String, Object...)}:
+ *
+ * <pre>{@code
+ * Table result = cdcStream
+ *     .partitionBy($("id"))
+ *     .process("FROM_CHANGELOG");
+ * }</pre>
+ *
* <p>Optional arguments can be passed using named expressions:
*
* <pre>{@code
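
For context, a minimal end-to-end sketch of the two invocation styles mentioned in the Javadoc above; the environment setup, table name, and `datagen` connector are assumptions, while `partitionBy(...).process("FROM_CHANGELOG")` is the call chain shown in the added Javadoc example.

```java
// Minimal sketch (assumed setup): row-semantics invocation via SQL versus the
// partitioned Table API invocation from the Javadoc example above.
import static org.apache.flink.table.api.Expressions.$;

import org.apache.flink.table.api.EnvironmentSettings;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.TableEnvironment;

public class FromChangelogInvocationSketch {
    public static void main(String[] args) {
        TableEnvironment tEnv =
                TableEnvironment.create(EnvironmentSettings.inStreamingMode());

        // Hypothetical append-only source with the default 'op' column.
        tEnv.executeSql(
                "CREATE TABLE cdc_stream (id INT, op STRING, name STRING) "
                        + "WITH ('connector' = 'datagen')");

        // Row semantics (default): rows are processed independently.
        Table rowSemantics =
                tEnv.sqlQuery("SELECT * FROM FROM_CHANGELOG(input => TABLE cdc_stream)");

        // Set semantics: rows with the same 'id' go to the same parallel instance,
        // and 'id' moves to the front of the output schema.
        Table partitioned =
                tEnv.from("cdc_stream").partitionBy($("id")).process("FROM_CHANGELOG");

        rowSemantics.printSchema();
        partitioned.printSchema();
    }
}
```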
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
index abe54e6f48e..1085afa48b2 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/BuiltInFunctionDefinitions.java
@@ -839,12 +839,15 @@ public final class BuiltInFunctionDefinitions {
.kind(PROCESS_TABLE)
.staticArguments(
StaticArgument.table(
- "input",
- Row.class,
- false,
- EnumSet.of(
- StaticArgumentTrait.TABLE,
- StaticArgumentTrait.ROW_SEMANTIC_TABLE)),
+ "input",
+ Row.class,
+ false,
+ EnumSet.of(
+ StaticArgumentTrait.TABLE,
+ StaticArgumentTrait.ROW_SEMANTIC_TABLE))
+ .withConditionalTrait(
+ StaticArgumentTrait.SET_SEMANTIC_TABLE,
+ TraitCondition.hasPartitionBy()),
StaticArgument.scalar("op", DataTypes.DESCRIPTOR(), true),
StaticArgument.scalar(
"op_mapping",
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ChangelogTypeStrategyUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ChangelogTypeStrategyUtils.java
index e9608ee7795..d4e20c30210 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ChangelogTypeStrategyUtils.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ChangelogTypeStrategyUtils.java
@@ -21,16 +21,46 @@ package org.apache.flink.table.types.inference.strategies;
import org.apache.flink.annotation.Internal;
import org.apache.flink.table.functions.TableSemantics;
import org.apache.flink.table.types.DataType;
+import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.types.ColumnList;
import java.util.Arrays;
-import java.util.HashSet;
+import java.util.List;
+import java.util.OptionalInt;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
+import static org.apache.flink.table.types.inference.strategies.FromChangelogTypeStrategy.ARG_OP;
+
/** Shared helpers for changelog-style PTFs ({@code TO_CHANGELOG}, {@code FROM_CHANGELOG}). */
@Internal
public final class ChangelogTypeStrategyUtils {
+ private static final String DEFAULT_OP_COLUMN_NAME = "op";
+
+ /**
+ * Resolves the op column name from the {@code op} descriptor argument, falling back to {@link
+ * #DEFAULT_OP_COLUMN_NAME} when the argument is omitted or empty.
+ */
+ public static String resolveOpColumnName(final CallContext callContext) {
+ return callContext
+ .getArgumentValue(ARG_OP, ColumnList.class)
+ .filter(cl -> !cl.getNames().isEmpty())
+ .map(cl -> cl.getNames().get(0))
+ .orElse(DEFAULT_OP_COLUMN_NAME);
+ }
+
+ /**
+ * Returns the index of the column matching {@code opColumnName} within the input schema, or
+ * empty if no field matches.
+ */
+ public static OptionalInt resolveOpColumnIndex(
+ final TableSemantics tableSemantics, final String opColumnName) {
+ final List<String> fieldNames = DataType.getFieldNames(tableSemantics.dataType());
+ return IntStream.range(0, fieldNames.size())
+ .filter(i -> fieldNames.get(i).equals(opColumnName))
+ .findFirst();
+ }
/**
* Returns the input column indices that pass through to the function's output, excluding the
@@ -61,10 +91,9 @@ public final class ChangelogTypeStrategyUtils {
}
private static Set<Integer> collectPartitionKeyIndices(final TableSemantics tableSemantics) {
- return new HashSet<>(
- Arrays.stream(tableSemantics.partitionByColumns())
- .boxed()
- .collect(Collectors.toSet()));
+ return Arrays.stream(tableSemantics.partitionByColumns())
+ .boxed()
+ .collect(Collectors.toSet());
}
private ChangelogTypeStrategyUtils() {}
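
To make the column ordering concrete, the standalone sketch below reproduces the `[partition_keys, remaining columns minus op]` order documented in changelog.md. It is an illustration only, not the actual `computeOutputIndices` implementation in `ChangelogTypeStrategyUtils`.

```java
// Standalone illustration of the output-column ordering documented in
// changelog.md: partition keys first (in PARTITION BY order), then the remaining
// input columns, with the op column dropped. Not the planner's actual code.
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public class OutputIndexOrderingSketch {

    static int[] outputIndices(List<String> fieldNames, int[] partitionKeys, String opColumnName) {
        final Set<Integer> keyIndices =
                Arrays.stream(partitionKeys).boxed().collect(Collectors.toSet());
        final List<Integer> result = new ArrayList<>();
        // Partition keys come first, keeping their PARTITION BY order.
        for (int key : partitionKeys) {
            result.add(key);
        }
        // Then every remaining column except the op column.
        for (int i = 0; i < fieldNames.size(); i++) {
            if (!keyIndices.contains(i) && !fieldNames.get(i).equals(opColumnName)) {
                result.add(i);
            }
        }
        return result.stream().mapToInt(Integer::intValue).toArray();
    }

    public static void main(String[] args) {
        // Docs example: input (name, id, op, doc) with PARTITION BY id
        // yields [id, name, doc], i.e. indices [1, 0, 3].
        int[] indices = outputIndices(List.of("name", "id", "op", "doc"), new int[] {1}, "op");
        System.out.println(Arrays.toString(indices));
    }
}
```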
diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java
index d1d3c1c61da..f4d54f276d7 100644
--- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java
+++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/FromChangelogTypeStrategy.java
@@ -27,14 +27,17 @@ import org.apache.flink.table.types.DataType;
import org.apache.flink.table.types.inference.CallContext;
import org.apache.flink.table.types.inference.InputTypeStrategy;
import org.apache.flink.table.types.inference.TypeStrategy;
+import org.apache.flink.table.types.logical.LogicalType;
import org.apache.flink.table.types.logical.LogicalTypeFamily;
import org.apache.flink.types.ColumnList;
import org.apache.flink.types.RowKind;
+import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Optional;
+import java.util.OptionalInt;
import java.util.Set;
import java.util.stream.Collectors;
@@ -50,8 +53,6 @@ public final class FromChangelogTypeStrategy {
public static final int ARG_OP_MAPPING = 2;
public static final int ARG_ERROR_HANDLING = 3;
- public static final String DEFAULT_OP_COLUMN_NAME = "op";
-
private static final Set<String> VALID_ROW_KIND_NAMES =
Set.of("INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE");
@@ -86,7 +87,8 @@ public final class FromChangelogTypeStrategy {
new ValidationException(
"First argument must
be a table for FROM_CHANGELOG."));
- final String opColumnName = resolveOpColumnName(callContext);
+ final String opColumnName =
+ ChangelogTypeStrategyUtils.resolveOpColumnName(callContext);
final List<Field> outputFields = buildOutputFields(tableSemantics, opColumnName);
@@ -97,7 +99,6 @@ public final class FromChangelogTypeStrategy {
// Helpers
// --------------------------------------------------------------------------------------------
- @SuppressWarnings("rawtypes")
private static Optional<List<DataType>> validateInputs(
final CallContext callContext, final boolean throwOnFailure) {
Optional<List<DataType>> error;
@@ -156,23 +157,26 @@ public final class FromChangelogTypeStrategy {
final CallContext callContext, final boolean throwOnFailure) {
final TableSemantics tableSemantics = callContext.getTableSemantics(ARG_TABLE).get();
- final String opColumnName = resolveOpColumnName(callContext);
- final List<Field> inputFields = DataType.getFields(tableSemantics.dataType());
- final Optional<Field> opField =
- inputFields.stream().filter(f -> f.getName().equals(opColumnName)).findFirst();
- if (opField.isEmpty()) {
+ final String opColumnName = ChangelogTypeStrategyUtils.resolveOpColumnName(callContext);
+ final OptionalInt opIndex =
+ ChangelogTypeStrategyUtils.resolveOpColumnIndex(tableSemantics, opColumnName);
+ if (opIndex.isEmpty()) {
return callContext.fail(
throwOnFailure,
String.format(
"The op column '%s' does not exist in the input
schema.",
opColumnName));
}
- if (!opField.get().getDataType().getLogicalType().is(LogicalTypeFamily.CHARACTER_STRING)) {
+ final LogicalType opFieldType =
+ DataType.getFieldDataTypes(tableSemantics.dataType())
+ .get(opIndex.getAsInt())
+ .getLogicalType();
+ if (!opFieldType.is(LogicalTypeFamily.CHARACTER_STRING)) {
return callContext.fail(
throwOnFailure,
String.format(
"The op column '%s' must be of STRING type, but
was '%s'.",
- opColumnName, opField.get().getDataType().getLogicalType()));
+ opColumnName, opFieldType));
}
return Optional.empty();
}
@@ -273,21 +277,13 @@ public final class FromChangelogTypeStrategy {
return Optional.empty();
}
- private static String resolveOpColumnName(final CallContext callContext) {
- return callContext
- .getArgumentValue(ARG_OP, ColumnList.class)
- .filter(cl -> !cl.getNames().isEmpty())
- .map(cl -> cl.getNames().get(0))
- .orElse(DEFAULT_OP_COLUMN_NAME);
- }
-
private static List<Field> buildOutputFields(
final TableSemantics tableSemantics, final String opColumnName) {
final List<Field> inputFields = DataType.getFields(tableSemantics.dataType());
-
- // Exclude the op column (becomes RowKind), keep all other columns
- return inputFields.stream()
- .filter(f -> !f.getName().equals(opColumnName))
+ return Arrays.stream(
+ ChangelogTypeStrategyUtils.computeOutputIndices(
+ tableSemantics, opColumnName))
+ .mapToObj(inputFields::get)
.collect(Collectors.toList());
}
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogSemanticTests.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogSemanticTests.java
index 4643a51be52..85efd0d4863 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogSemanticTests.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogSemanticTests.java
@@ -39,9 +39,11 @@ public class FromChangelogSemanticTests extends SemanticTestBase {
@Override
public List<TableTestProgram> programs() {
return List.of(
- FromChangelogTestPrograms.DEFAULT_OP_MAPPING,
+ FromChangelogTestPrograms.RETRACT,
FromChangelogTestPrograms.CUSTOM_OP_MAPPING,
FromChangelogTestPrograms.CUSTOM_OP_NAME,
+ FromChangelogTestPrograms.RETRACT_PARTITION_BY,
+ FromChangelogTestPrograms.DELETION_FLAG_PARTITION_BY,
FromChangelogTestPrograms.SKIP_INVALID_OP_HANDLING,
FromChangelogTestPrograms.SKIP_NULL_OP_CODE,
FromChangelogTestPrograms.TABLE_API_DEFAULT,
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java
index 1858aabeed0..f7264ec3c92 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/nodes/exec/stream/FromChangelogTestPrograms.java
@@ -35,10 +35,8 @@ public class FromChangelogTestPrograms {
// SQL tests
// --------------------------------------------------------------------------------------------
- public static final TableTestProgram DEFAULT_OP_MAPPING =
- TableTestProgram.of(
- "from-changelog-default-op-mapping",
- "default mapping with standard op names")
+ public static final TableTestProgram RETRACT =
+ TableTestProgram.of("from-changelog-retract", "retract changelog with default mapping")
.setupTableSource(
SourceTestStep.newBuilder("cdc_stream")
.addSchema(SIMPLE_CDC_SCHEMA)
@@ -146,7 +144,6 @@ public class FromChangelogTestPrograms {
+ "error_handling => 'SKIP')")
.build();
- /** Custom op column name via DESCRIPTOR. */
public static final TableTestProgram CUSTOM_OP_NAME =
TableTestProgram.of(
"from-changelog-custom-op-name", "custom op column
name via DESCRIPTOR")
@@ -172,6 +169,62 @@ public class FromChangelogTestPrograms {
+ "op => DESCRIPTOR(operation))")
.build();
+ public static final TableTestProgram RETRACT_PARTITION_BY =
+ TableTestProgram.of(
+ "from-changelog-retract-partition-by",
+ "retract changelog with PARTITION BY")
+ .setupTableSource(
+ SourceTestStep.newBuilder("cdc_stream")
+ .addSchema("name STRING", "id INT", "op STRING")
+ .producedValues(
+ Row.of("Alice", 1, "INSERT"),
+ Row.of("Bob", 2, "INSERT"),
+ Row.of("Alice", 1, "UPDATE_BEFORE"),
+ Row.of("Alice2", 1, "UPDATE_AFTER"),
+ Row.of("Bob", 2, "DELETE"))
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink")
+ .addSchema("id INT", "name STRING")
+ .consumedValues(
+ Row.ofKind(RowKind.INSERT, 1, "Alice"),
+ Row.ofKind(RowKind.INSERT, 2, "Bob"),
+ Row.ofKind(RowKind.UPDATE_BEFORE, 1, "Alice"),
+ Row.ofKind(RowKind.UPDATE_AFTER, 1, "Alice2"),
+ Row.ofKind(RowKind.DELETE, 2, "Bob"))
+ .build())
+ .runSql(
+ "INSERT INTO sink SELECT * FROM FROM_CHANGELOG("
+ + "input => TABLE cdc_stream PARTITION BY id)")
+ .build();
+
+ public static final TableTestProgram DELETION_FLAG_PARTITION_BY =
+ TableTestProgram.of(
+ "from-changelog-deletion-flag-partition-by",
+ "deletion flag mapping with PARTITION BY: 'false'
-> INSERT, 'true' -> DELETE")
+ .setupTableSource(
+ SourceTestStep.newBuilder("cdc_stream")
+ .addSchema("id INT", "deleted STRING",
"name STRING")
+ .producedValues(
+ Row.of(1, "false", "Alice"),
+ Row.of(2, "false", "Bob"),
+ Row.of(2, "true", "Bob"))
+ .build())
+ .setupTableSink(
+ SinkTestStep.newBuilder("sink")
+ .addSchema("id INT", "name STRING")
+ .consumedValues(
+ Row.ofKind(RowKind.INSERT, 1,
"Alice"),
+ Row.ofKind(RowKind.INSERT, 2,
"Bob"),
+ Row.ofKind(RowKind.DELETE, 2,
"Bob"))
+ .build())
+ .runSql(
+ "INSERT INTO sink SELECT * FROM FROM_CHANGELOG("
+ + "input => TABLE cdc_stream PARTITION BY
id, "
+ + "op => DESCRIPTOR(deleted), "
+ + "op_mapping => MAP['false', 'INSERT',
'true', 'DELETE'])")
+ .build();
+
// --------------------------------------------------------------------------------------------
// Table API test
// --------------------------------------------------------------------------------------------
diff --git a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.java b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.java
index ba2c1a5690c..17d7f64fa48 100644
--- a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.java
+++ b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.java
@@ -46,7 +46,7 @@ public class FromChangelogTest extends TableTestBase {
}
@Test
- void testInsertOnlySource() {
+ void testRetract() {
util.tableEnv()
.executeSql(
"CREATE TABLE cdc_stream ("
@@ -59,20 +59,16 @@ public class FromChangelogTest extends TableTestBase {
}
@Test
- void testCustomOpMapping() {
+ void testRetractPartitionBy() {
util.tableEnv()
.executeSql(
"CREATE TABLE cdc_stream ("
+ " id INT,"
- + " __op STRING,"
+ + " op STRING,"
+ " name STRING"
+ ") WITH ('connector' = 'values')");
util.verifyRelPlan(
- "SELECT * FROM FROM_CHANGELOG("
- + "input => TABLE cdc_stream, "
- + "op => DESCRIPTOR(__op), "
- + "op_mapping => MAP['c, r', 'INSERT', 'ub',
'UPDATE_BEFORE', 'ua', 'UPDATE_AFTER', 'd', 'DELETE'], "
- + "error_handling => 'SKIP')",
+ "SELECT * FROM FROM_CHANGELOG(input => TABLE cdc_stream
PARTITION BY id)",
CHANGELOG_MODE);
}
}
diff --git a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.xml b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.xml
index 614eb5456b9..2addfe9f5ff 100644
--- a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.xml
+++ b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/stream/sql/FromChangelogTest.xml
@@ -16,41 +16,42 @@ See the License for the specific language governing permissions and
limitations under the License.
-->
<Root>
- <TestCase name="testCustomOpMapping">
+ <TestCase name="testRetract">
<Resource name="sql">
- <![CDATA[SELECT * FROM FROM_CHANGELOG(input => TABLE cdc_stream, op => DESCRIPTOR(__op), op_mapping => MAP['c, r', 'INSERT', 'ub', 'UPDATE_BEFORE', 'ua', 'UPDATE_AFTER', 'd', 'DELETE'], error_handling => 'SKIP')]]>
+ <![CDATA[SELECT * FROM FROM_CHANGELOG(input => TABLE cdc_stream)]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(id=[$0], name=[$1])
-+- LogicalTableFunctionScan(invocation=[FROM_CHANGELOG(TABLE(#0), DESCRIPTOR(_UTF-16LE'__op'), MAP(_UTF-16LE'c, r':VARCHAR(4) CHARACTER SET "UTF-16LE", _UTF-16LE'INSERT':VARCHAR(13) CHARACTER SET "UTF-16LE", _UTF-16LE'ub':VARCHAR(4) CHARACTER SET "UTF-16LE", _UTF-16LE'UPDATE_BEFORE':VARCHAR(13) CHARACTER SET "UTF-16LE", _UTF-16LE'ua':VARCHAR(4) CHARACTER SET "UTF-16LE", _UTF-16LE'UPDATE_AFTER':VARCHAR(13) CHARACTER SET "UTF-16LE", _UTF-16LE'd':VARCHAR(4) CHARACTER SET "UTF-16LE", _UTF-16 [...]
- +- LogicalProject(id=[$0], __op=[$1], name=[$2])
++- LogicalTableFunctionScan(invocation=[FROM_CHANGELOG(TABLE(#0), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], rowType=[RecordType(INTEGER id, VARCHAR(2147483647) name)])
+ +- LogicalProject(id=[$0], op=[$1], name=[$2])
+- LogicalTableScan(table=[[default_catalog, default_database, cdc_stream]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-ProcessTableFunction(invocation=[FROM_CHANGELOG(TABLE(#0), DESCRIPTOR(_UTF-16LE'__op'), MAP(_UTF-16LE'c, r':VARCHAR(4) CHARACTER SET "UTF-16LE", _UTF-16LE'INSERT':VARCHAR(13) CHARACTER SET "UTF-16LE", _UTF-16LE'ub':VARCHAR(4) CHARACTER SET "UTF-16LE", _UTF-16LE'UPDATE_BEFORE':VARCHAR(13) CHARACTER SET "UTF-16LE", _UTF-16LE'ua':VARCHAR(4) CHARACTER SET "UTF-16LE", _UTF-16LE'UPDATE_AFTER':VARCHAR(13) CHARACTER SET "UTF-16LE", _UTF-16LE'd':VARCHAR(4) CHARACTER SET "UTF-16LE", _UTF-16LE'DELE [...]
-+- TableSourceScan(table=[[default_catalog, default_database, cdc_stream]], fields=[id, __op, name], changelogMode=[I])
+ProcessTableFunction(invocation=[FROM_CHANGELOG(TABLE(#0), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], uid=[null], select=[id,name], rowType=[RecordType(INTEGER id, VARCHAR(2147483647) name)], changelogMode=[I,UB,UA,D])
++- TableSourceScan(table=[[default_catalog, default_database, cdc_stream]], fields=[id, op, name], changelogMode=[I])
]]>
</Resource>
</TestCase>
- <TestCase name="testInsertOnlySource">
+ <TestCase name="testRetractPartitionBy">
<Resource name="sql">
- <![CDATA[SELECT * FROM FROM_CHANGELOG(input => TABLE cdc_stream)]]>
+ <![CDATA[SELECT * FROM FROM_CHANGELOG(input => TABLE cdc_stream PARTITION BY id)]]>
</Resource>
<Resource name="ast">
<![CDATA[
LogicalProject(id=[$0], name=[$1])
-+- LogicalTableFunctionScan(invocation=[FROM_CHANGELOG(TABLE(#0), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], rowType=[RecordType(INTEGER id, VARCHAR(2147483647) name)])
++- LogicalTableFunctionScan(invocation=[FROM_CHANGELOG(TABLE(#0) PARTITION BY($0), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], rowType=[RecordType(INTEGER id, VARCHAR(2147483647) name)])
+- LogicalProject(id=[$0], op=[$1], name=[$2])
+- LogicalTableScan(table=[[default_catalog, default_database, cdc_stream]])
]]>
</Resource>
<Resource name="optimized rel plan">
<![CDATA[
-ProcessTableFunction(invocation=[FROM_CHANGELOG(TABLE(#0), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], uid=[null], select=[id,name], rowType=[RecordType(INTEGER id, VARCHAR(2147483647) name)], changelogMode=[I,UB,UA,D])
-+- TableSourceScan(table=[[default_catalog, default_database, cdc_stream]], fields=[id, op, name], changelogMode=[I])
+ProcessTableFunction(invocation=[FROM_CHANGELOG(TABLE(#0) PARTITION BY($0), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT(), DEFAULT())], uid=[FROM_CHANGELOG], select=[id,name], rowType=[RecordType(INTEGER id, VARCHAR(2147483647) name)], changelogMode=[I,UB,UA,D])
++- Exchange(distribution=[hash[id]], changelogMode=[I])
+ +- TableSourceScan(table=[[default_catalog, default_database, cdc_stream]], fields=[id, op, name], changelogMode=[I])
]]>
</Resource>
</TestCase>
diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/FromChangelogFunction.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/FromChangelogFunction.java
index cc1f9deb21d..9a2607fb01b 100644
--- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/FromChangelogFunction.java
+++ b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/functions/ptf/FromChangelogFunction.java
@@ -29,6 +29,7 @@ import org.apache.flink.table.functions.FunctionContext;
import org.apache.flink.table.functions.SpecializedFunction.SpecializedContext;
import org.apache.flink.table.functions.TableSemantics;
import org.apache.flink.table.types.inference.CallContext;
+import org.apache.flink.table.types.inference.strategies.ChangelogTypeStrategyUtils;
import org.apache.flink.table.types.inference.strategies.ErrorHandlingMode;
import org.apache.flink.table.types.logical.RowType;
import org.apache.flink.types.ColumnList;
@@ -40,13 +41,10 @@ import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;
import java.util.stream.Collectors;
-import java.util.stream.IntStream;
import static org.apache.flink.table.types.inference.strategies.FromChangelogTypeStrategy.ARG_ERROR_HANDLING;
-import static org.apache.flink.table.types.inference.strategies.FromChangelogTypeStrategy.ARG_OP;
import static org.apache.flink.table.types.inference.strategies.FromChangelogTypeStrategy.ARG_OP_MAPPING;
import static org.apache.flink.table.types.inference.strategies.FromChangelogTypeStrategy.ARG_TABLE;
-import static org.apache.flink.table.types.inference.strategies.FromChangelogTypeStrategy.DEFAULT_OP_COLUMN_NAME;
/**
* Runtime implementation of {@link BuiltInFunctionDefinitions#FROM_CHANGELOG}.
@@ -88,14 +86,10 @@ public class FromChangelogFunction extends BuiltInProcessTableFunction<RowData>
.orElseThrow(() -> new IllegalStateException("Table argument expected."));
final RowType inputType = (RowType) tableSemantics.dataType().getLogicalType();
- final String opColumnName = resolveOpColumnName(callContext);
+ final String opColumnName = ChangelogTypeStrategyUtils.resolveOpColumnName(callContext);
this.opColumnIndex = inputType.getFieldNames().indexOf(opColumnName);
-
- // Exclude only the op column from output — all other columns pass through
this.outputIndices =
- IntStream.range(0, inputType.getFieldCount())
- .filter(i -> i != opColumnIndex)
- .toArray();
+ ChangelogTypeStrategyUtils.computeOutputIndices(tableSemantics, opColumnName);
this.rawOpMap = buildOpMap(callContext);
@@ -114,13 +108,6 @@ public class FromChangelogFunction extends BuiltInProcessTableFunction<RowData>
projectedOutput = ProjectedRowData.from(outputIndices);
}
- private static String resolveOpColumnName(final CallContext callContext) {
- return callContext
- .getArgumentValue(ARG_OP, ColumnList.class)
- .map(cl -> cl.getNames().get(0))
- .orElse(DEFAULT_OP_COLUMN_NAME);
- }
-
/**
* Builds a String-to-RowKind map. Keys in the provided mapping may be comma-separated (e.g.,
* "INSERT, UPDATE_AFTER") to map multiple input codes to the same RowKind.