This is an automated email from the ASF dual-hosted git repository. gustavodemorais pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit c033b17fae27117fbfc6b007348e9499d7830023 Author: Ramin Gharib <[email protected]> AuthorDate: Thu May 28 16:00:22 2026 +0200 [FLINK-39636][table] Widen output column nullability for partial deletes When produces_full_deletes=false, the operator nulls out columns that are not part of the upsert key or PARTITION BY on DELETE rows. The output schema previously declared those columns as NOT NULL, mirroring the input, which conflicts with the values the operator emits at runtime. Read produces_full_deletes in OUTPUT_TYPE_STRATEGY and widen non-preserved columns to nullable. Preserved columns (upsert key plus partition keys) keep their declared nullability. The new ChangelogTypeStrategyUtils#computePreservedColumnIndices helper centralizes the index computation so the runtime and the type strategy stay in sync. Add ToChangelogOutputTypeStrategyTest covering all three cases: full deletes preserves NOT NULL, partial deletes with an upsert key widens only the non-key columns, and partial deletes without an upsert key widens everything. --- .../docs/sql/reference/queries/changelog.md | 4 + .../strategies/ChangelogTypeStrategyUtils.java | 24 +++++ .../strategies/ToChangelogTypeStrategy.java | 39 +++++++- .../ToChangelogOutputTypeStrategyTest.java | 110 +++++++++++++++++++++ 4 files changed, 174 insertions(+), 3 deletions(-) diff --git a/docs/content/docs/sql/reference/queries/changelog.md b/docs/content/docs/sql/reference/queries/changelog.md index 641bb7245f7..71539a9b8cb 100644 --- a/docs/content/docs/sql/reference/queries/changelog.md +++ b/docs/content/docs/sql/reference/queries/changelog.md @@ -449,6 +449,10 @@ SELECT * FROM TO_CHANGELOG(input => TABLE retract_source PARTITION BY id) The planner skips `ChangelogNormalize` and the function emits partial DELETE rows. This avoids the stateful normalization operator for upsert sources (e.g. Kafka compacted topics) where the full pre-image is not needed downstream. 
This requires an [upsert key](#upsert-key) to be present for the input table (row semantics) or `PARTITION BY` (set semantics); otherwise the call is rejected with a validation error. +{{< hint warning >}} +**Output nullability changes when `produces_full_deletes => false`.** Columns that are not part of the upsert key (or `PARTITION BY`) are nulled on DELETE rows at runtime, so the type system widens them to nullable in the output schema. Columns declared `NOT NULL` on the input therefore appear as nullable in the output of a `TO_CHANGELOG(..., produces_full_deletes => false)` call. The upsert-key (or partition-key) columns keep their declared nullability. Use the default `produces_full_deletes => true` to keep the declared `NOT NULL` constraints of all input columns in the output schema. +{{< /hint >}} + **Row semantics** (no `PARTITION BY`): the function preserves the planner-derived upsert key columns on DELETE rows and nulls the rest. The upsert key is typically a declared `PRIMARY KEY` when directly reading from a source or the key provided in a `GROUP BY <key>`. ```sql diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ChangelogTypeStrategyUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ChangelogTypeStrategyUtils.java index d4e20c30210..6adc885dd78 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ChangelogTypeStrategyUtils.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ChangelogTypeStrategyUtils.java @@ -22,9 +22,11 @@ import org.apache.flink.annotation.Internal; import org.apache.flink.table.functions.TableSemantics; import org.apache.flink.table.types.DataType; import org.apache.flink.table.types.inference.CallContext; +import org.apache.flink.table.utils.UpsertKeyUtils; import org.apache.flink.types.ColumnList; import java.util.Arrays; +import java.util.HashSet; import java.util.List; import java.util.OptionalInt; 
import java.util.Set; @@ -80,6 +82,28 @@ public final class ChangelogTypeStrategyUtils { return computeOutputIndices(tableSemantics, opIndex); } + /** + * Returns the set of input column indices whose values the function preserves on partial DELETE + * rows (i.e. those that {@code TO_CHANGELOG} keeps when {@code produces_full_deletes=false}). + * The set is the union of {@code PARTITION BY} columns (the framework prepends them outside the + * projected output) and the smallest upsert-key candidate exposed by {@link + * TableSemantics#upsertKeyColumns()}. + * + * <p>Returns only the partition keys when no upsert key candidate is exposed yet, for example + * during type inference where planner-derived metadata is not yet populated. + */ + public static Set<Integer> computePreservedColumnIndices(final TableSemantics tableSemantics) { + final Set<Integer> preserved = new HashSet<>(collectPartitionKeyIndices(tableSemantics)); + final List<int[]> upsertKeys = tableSemantics.upsertKeyColumns(); + if (upsertKeys == null || upsertKeys.isEmpty()) { + return preserved; + } + for (final int column : UpsertKeyUtils.smallestKey(upsertKeys)) { + preserved.add(column); + } + return preserved; + } + private static int[] computeOutputIndices( final TableSemantics tableSemantics, final int extraExcludedIndex) { final Set<Integer> excluded = collectPartitionKeyIndices(tableSemantics); diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java index 1434b221077..a2b8e13681b 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/types/inference/strategies/ToChangelogTypeStrategy.java @@ -32,6 +32,7 @@ import 
org.apache.flink.types.RowKind; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.List; import java.util.Map; @@ -84,8 +85,13 @@ public final class ToChangelogTypeStrategy { final String opColumnName = ChangelogTypeStrategyUtils.resolveOpColumnName(callContext); + final boolean producesFullDeletes = + callContext + .getArgumentValue(ARG_PRODUCES_FULL_DELETES, Boolean.class) + .orElse(true); - final List<Field> outputFields = buildOutputFields(semantics, opColumnName); + final List<Field> outputFields = + buildOutputFields(semantics, opColumnName, producesFullDeletes); return Optional.of(DataTypes.ROW(outputFields).notNull()); }; @@ -208,15 +214,42 @@ public final class ToChangelogTypeStrategy { return false; } + /** + * Builds the output {@link Field}s for the {@code TO_CHANGELOG} call. + * + * <p>When the function emits partial DELETE rows ({@code produces_full_deletes=false}), columns + * that are not part of the upsert key (or {@code PARTITION BY}) are nulled out at runtime. + * Those columns are widened to nullable here so the output schema matches what the operator can + * emit; preserved columns keep their declared nullability. + * + * <p>At type inference time the upsert key may not be exposed yet; in that case the helper + * returns only the partition keys and all projected columns are widened conservatively. 
+ */ private static List<Field> buildOutputFields( - final TableSemantics semantics, final String opColumnName) { + final TableSemantics semantics, + final String opColumnName, + final boolean producesFullDeletes) { final List<Field> inputFields = DataType.getFields(semantics.dataType()); final int[] outputIndices = ChangelogTypeStrategyUtils.computeOutputIndices(semantics); final List<Field> outputFields = new ArrayList<>(); outputFields.add(DataTypes.FIELD(opColumnName, DataTypes.STRING())); - Arrays.stream(outputIndices).mapToObj(inputFields::get).forEach(outputFields::add); + final Set<Integer> preserved = + producesFullDeletes + ? Collections.emptySet() + : ChangelogTypeStrategyUtils.computePreservedColumnIndices(semantics); + Arrays.stream(outputIndices) + .mapToObj( + idx -> + producesFullDeletes || preserved.contains(idx) + ? inputFields.get(idx) + : asNullable(inputFields.get(idx))) + .forEach(outputFields::add); return outputFields; } + private static Field asNullable(final Field field) { + return DataTypes.FIELD(field.getName(), field.getDataType().nullable()); + } + private ToChangelogTypeStrategy() {} } diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ToChangelogOutputTypeStrategyTest.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ToChangelogOutputTypeStrategyTest.java new file mode 100644 index 00000000000..e912370ef00 --- /dev/null +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/strategies/ToChangelogOutputTypeStrategyTest.java @@ -0,0 +1,110 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.types.inference.strategies; + +import org.apache.flink.table.api.DataTypes; +import org.apache.flink.table.types.DataType; +import org.apache.flink.table.types.inference.TypeStrategiesTestBase; +import org.apache.flink.table.types.inference.utils.TableSemanticsMock; +import org.apache.flink.types.ColumnList; + +import java.util.List; +import java.util.stream.Stream; + +import static org.apache.flink.table.types.inference.strategies.SpecificTypeStrategies.TO_CHANGELOG_OUTPUT_TYPE_STRATEGY; +import static org.apache.flink.table.types.inference.strategies.ToChangelogTypeStrategy.ARG_OP; +import static org.apache.flink.table.types.inference.strategies.ToChangelogTypeStrategy.ARG_OP_MAPPING; +import static org.apache.flink.table.types.inference.strategies.ToChangelogTypeStrategy.ARG_PRODUCES_FULL_DELETES; +import static org.apache.flink.table.types.inference.strategies.ToChangelogTypeStrategy.ARG_TABLE; + +/** Tests for {@link ToChangelogTypeStrategy#OUTPUT_TYPE_STRATEGY}. 
*/ +class ToChangelogOutputTypeStrategyTest extends TypeStrategiesTestBase { + + private static final DataType TABLE_TYPE_NOT_NULL_SCORE = + DataTypes.ROW( + DataTypes.FIELD("name", DataTypes.STRING().notNull()), + DataTypes.FIELD("score", DataTypes.BIGINT().notNull())); + + private static final DataType DESCRIPTOR_TYPE = DataTypes.DESCRIPTOR(); + private static final DataType MAP_TYPE = DataTypes.MAP(DataTypes.STRING(), DataTypes.STRING()); + private static final DataType BOOLEAN_TYPE = DataTypes.BOOLEAN(); + + @Override + protected Stream<TestSpec> testData() { + return Stream.of( + TestSpec.forStrategy( + "produces_full_deletes=true preserves NOT NULL on input columns", + TO_CHANGELOG_OUTPUT_TYPE_STRATEGY) + .inputTypes( + TABLE_TYPE_NOT_NULL_SCORE, DESCRIPTOR_TYPE, MAP_TYPE, BOOLEAN_TYPE) + .calledWithTableSemanticsAt( + ARG_TABLE, new TableSemanticsMock(TABLE_TYPE_NOT_NULL_SCORE)) + .calledWithLiteralAt(ARG_OP, ColumnList.of("op")) + .calledWithLiteralAt(ARG_OP_MAPPING, null) + .calledWithLiteralAt(ARG_PRODUCES_FULL_DELETES, true) + .expectDataType( + DataTypes.ROW( + DataTypes.FIELD("op", DataTypes.STRING()), + DataTypes.FIELD( + "name", DataTypes.STRING().notNull()), + DataTypes.FIELD( + "score", DataTypes.BIGINT().notNull())) + .notNull()), + TestSpec.forStrategy( + "produces_full_deletes=false widens non-upsert-key columns to nullable", + TO_CHANGELOG_OUTPUT_TYPE_STRATEGY) + .inputTypes( + TABLE_TYPE_NOT_NULL_SCORE, DESCRIPTOR_TYPE, MAP_TYPE, BOOLEAN_TYPE) + .calledWithTableSemanticsAt( + ARG_TABLE, + new TableSemanticsMock( + TABLE_TYPE_NOT_NULL_SCORE, + new int[0], + new int[0], + -1, + null, + List.of(new int[] {0}))) + .calledWithLiteralAt(ARG_OP, ColumnList.of("op")) + .calledWithLiteralAt(ARG_OP_MAPPING, null) + .calledWithLiteralAt(ARG_PRODUCES_FULL_DELETES, false) + .expectDataType( + DataTypes.ROW( + DataTypes.FIELD("op", DataTypes.STRING()), + DataTypes.FIELD( + "name", DataTypes.STRING().notNull()), + DataTypes.FIELD("score", 
DataTypes.BIGINT())) + .notNull()), + TestSpec.forStrategy( + "produces_full_deletes=false without upsert key widens all columns", + TO_CHANGELOG_OUTPUT_TYPE_STRATEGY) + .inputTypes( + TABLE_TYPE_NOT_NULL_SCORE, DESCRIPTOR_TYPE, MAP_TYPE, BOOLEAN_TYPE) + .calledWithTableSemanticsAt( + ARG_TABLE, new TableSemanticsMock(TABLE_TYPE_NOT_NULL_SCORE)) + .calledWithLiteralAt(ARG_OP, ColumnList.of("op")) + .calledWithLiteralAt(ARG_OP_MAPPING, null) + .calledWithLiteralAt(ARG_PRODUCES_FULL_DELETES, false) + .expectDataType( + DataTypes.ROW( + DataTypes.FIELD("op", DataTypes.STRING()), + DataTypes.FIELD("name", DataTypes.STRING()), + DataTypes.FIELD("score", DataTypes.BIGINT())) + .notNull())); + } +}
