This is an automated email from the ASF dual-hosted git repository. gustavodemorais pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit d763453e045ad445c7d5a8382559d34a9429a1e9 Author: Ramin Gharib <[email protected]> AuthorDate: Tue May 26 10:56:29 2026 +0200 [FLINK-39735][table] Expose input upsert key on TableSemantics Adds upsertKeyColumns() to TableSemantics so ProcessTableFunctions can read the planner-derived upsert key candidates of each table input at specialization time. The planner exposes all candidates; picking one is the function's responsibility. UpsertKeyUtils provides a stable smallestKey helper for that choice. Co-Authored-By: Jubin Soni <[email protected]> --- .../resolver/rules/ResolveCallByArgumentsRule.java | 5 ++ .../flink/table/functions/TableSemantics.java | 24 ++++++++ .../apache/flink/table/utils/UpsertKeyUtils.java | 67 ++++++++++++++++++++++ .../types/inference/utils/TableSemanticsMock.java | 25 ++++++++ .../functions/bridging/BridgingSqlFunction.java | 19 +++++- .../inference/CallBindingCallContext.java | 6 ++ .../inference/OperatorBindingCallContext.java | 47 +++++++++++++-- .../stream/StreamExecProcessTableFunction.java | 32 +++++++++-- .../stream/StreamPhysicalProcessTableFunction.java | 19 +++++- .../table/planner/plan/utils/UpsertKeyUtil.java | 23 +++----- .../codegen/ProcessTableRunnerGenerator.scala | 10 +++- .../plan/to-changelog-retract-restore.json | 3 +- .../operators/process/RuntimeTableSemantics.java | 11 +++- ...essSetTableOperatorInterruptibleTimersTest.java | 3 +- .../functions/TestHarnessTableSemantics.java | 14 +++++ 15 files changed, 273 insertions(+), 35 deletions(-) diff --git a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java index be8e578deea..5e4a02cb9fb 100644 --- a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java +++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/expressions/resolver/rules/ResolveCallByArgumentsRule.java @@ -781,6 +781,11 @@ final class ResolveCallByArgumentsRule implements ResolverRule { return Optional.empty(); } + @Override + public List<int[]> upsertKeyColumns() { + return Collections.emptyList(); + } + private PartitionQueryOperation findPartitionOperation(QueryOperation op) { if (op instanceof PartitionQueryOperation) { return (PartitionQueryOperation) op; diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableSemantics.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableSemantics.java index f63566befce..3d17401c5ef 100644 --- a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableSemantics.java +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/functions/TableSemantics.java @@ -24,6 +24,7 @@ import org.apache.flink.table.api.DataTypes; import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.types.DataType; +import java.util.List; import java.util.Optional; /** @@ -128,6 +129,29 @@ public interface TableSemantics { */ Optional<ChangelogMode> changelogMode(); + /** + * Upsert key candidates derived from the passed table's metadata. + * + * <p>Returns a list of 0-based column index arrays that uniquely identify a row for upsert + * semantics. This is distinct from {@link #partitionByColumns()}: partition keys describe + * distribution and co-location, upsert keys describe row identity. Useful for functions that + * need to emit key-only deletes, match UPDATE_BEFORE / UPDATE_AFTER pairs, or want to have a + * unique identifier to interact with state. + * + * <p>Returns an empty list when no upsert key is derivable, or when the planner has not yet + * computed metadata (during type inference). 
+ * + * <p>When the planner derives multiple candidate upsert keys for the same input (e.g., when several + * column sets each uniquely identify a row), all of them are returned. Picking which candidate to + * use is the function's responsibility, and the choice must be stable across releases to keep + * PTF state consistent after job restarts and upgrades. The order of the returned list is not + * part of the contract; PTF authors should not depend on it. A typical choice is the smallest + * candidate by cardinality, with ties broken by comparing the column indices lexicographically. + * + * @return Candidate upsert keys of the passed table, or an empty list if none. + */ + List<int[]> upsertKeyColumns(); + /** The sort direction for ORDER BY columns in table arguments with set semantics. */ @PublicEvolving enum SortDirection { diff --git a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/UpsertKeyUtils.java b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/UpsertKeyUtils.java new file mode 100644 index 00000000000..528a977d037 --- /dev/null +++ b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/utils/UpsertKeyUtils.java @@ -0,0 +1,67 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.table.utils; + +import org.apache.flink.annotation.Internal; + +import java.util.Comparator; +import java.util.List; + +/** Helpers for working with upsert key candidates. */ +@Internal +public final class UpsertKeyUtils { + + /** + * Comparator that orders upsert-key candidates deterministically and stably across releases: + * candidates with fewer columns come first; ties between equal-cardinality candidates are + * broken by the column indices in ascending order, leading column first. + */ + private static final Comparator<int[]> SMALLEST_FIRST = + Comparator.<int[]>comparingInt(a -> a.length) + .thenComparing( + (a, b) -> { + for (int i = 0; i < a.length; i++) { + final int cmp = Integer.compare(a[i], b[i]); + if (cmp != 0) { + return cmp; + } + } + return 0; + }); + + /** + * Picks the smallest upsert key from the given candidates using {@link #SMALLEST_FIRST}. + * Returns an empty array when the candidate list is empty. The returned reference is one of the + * input arrays; callers must not mutate it. 
+ */ + public static int[] smallestKey(final List<int[]> candidates) { + if (candidates.isEmpty()) { + return new int[0]; + } + int[] smallest = candidates.get(0); + for (int i = 1; i < candidates.size(); i++) { + if (SMALLEST_FIRST.compare(candidates.get(i), smallest) < 0) { + smallest = candidates.get(i); + } + } + return smallest; + } + + private UpsertKeyUtils() {} +} diff --git a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/utils/TableSemanticsMock.java b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/utils/TableSemanticsMock.java index fe881f8fd1f..9b9830870fd 100644 --- a/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/utils/TableSemanticsMock.java +++ b/flink-table/flink-table-common/src/test/java/org/apache/flink/table/types/inference/utils/TableSemanticsMock.java @@ -24,6 +24,8 @@ import org.apache.flink.table.types.DataType; import javax.annotation.Nullable; +import java.util.Collections; +import java.util.List; import java.util.Optional; /** Mock implementation of {@link TableSemantics} for testing purposes. 
*/ @@ -35,6 +37,7 @@ public class TableSemanticsMock implements TableSemantics { private final SortDirection[] orderByDirections; private final int timeColumn; private final ChangelogMode changelogMode; + private final List<int[]> upsertKeyColumns; public TableSemanticsMock(DataType dataType) { this(dataType, new int[0], new int[0], -1, null); @@ -46,6 +49,22 @@ public class TableSemanticsMock implements TableSemantics { int[] orderByColumns, int timeColumn, @Nullable ChangelogMode changelogMode) { + this( + dataType, + partitionByColumns, + orderByColumns, + timeColumn, + changelogMode, + Collections.emptyList()); + } + + public TableSemanticsMock( + DataType dataType, + int[] partitionByColumns, + int[] orderByColumns, + int timeColumn, + @Nullable ChangelogMode changelogMode, + List<int[]> upsertKeyColumns) { this.dataType = dataType; this.partitionByColumns = partitionByColumns; this.orderByColumns = orderByColumns; @@ -55,6 +74,7 @@ public class TableSemanticsMock implements TableSemantics { } this.timeColumn = timeColumn; this.changelogMode = changelogMode; + this.upsertKeyColumns = upsertKeyColumns; } @Override @@ -86,4 +106,9 @@ public class TableSemanticsMock implements TableSemantics { public Optional<ChangelogMode> changelogMode() { return Optional.ofNullable(changelogMode); } + + @Override + public List<int[]> upsertKeyColumns() { + return upsertKeyColumns; + } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java index 2c393adf875..900d8b6a0bd 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/bridging/BridgingSqlFunction.java @@ -335,7 +335,7 @@ public class BridgingSqlFunction 
extends SqlFunction { * scalar arguments through the same coercion path as validation. */ public CallContext toCallContext(RexCall call) { - return toCallContext(call, null, null, null); + return toCallContext(call, null, null, null, null); } /** @@ -348,6 +348,20 @@ public class BridgingSqlFunction extends SqlFunction { @Nullable List<Integer> inputTimeColumns, @Nullable List<ChangelogMode> inputChangelogModes, @Nullable ChangelogMode outputChangelogMode) { + return toCallContext( + call, inputTimeColumns, inputChangelogModes, outputChangelogMode, null); + } + + /** + * Variant that additionally exposes the call's input upsert keys. Used by the streaming codegen + * path so PTFs can specialize themselves on the input's row-identity information. + */ + public CallContext toCallContext( + RexCall call, + @Nullable List<Integer> inputTimeColumns, + @Nullable List<ChangelogMode> inputChangelogModes, + @Nullable ChangelogMode outputChangelogMode, + @Nullable List<List<int[]>> inputUpsertKeys) { return new OperatorBindingCallContext( dataTypeFactory, getDefinition(), @@ -355,7 +369,8 @@ public class BridgingSqlFunction extends SqlFunction { call.getType(), inputTimeColumns, inputChangelogModes, - outputChangelogMode); + outputChangelogMode, + inputUpsertKeys); } /** diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/CallBindingCallContext.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/CallBindingCallContext.java index 065a3033545..b86fe9b4175 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/CallBindingCallContext.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/CallBindingCallContext.java @@ -50,6 +50,7 @@ import org.apache.calcite.sql.type.SqlTypeName; import javax.annotation.Nullable; import java.util.AbstractList; +import 
java.util.Collections; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; @@ -312,6 +313,11 @@ public final class CallBindingCallContext extends AbstractSqlCallContext { public Optional<ChangelogMode> changelogMode() { return Optional.empty(); } + + @Override + public List<int[]> upsertKeyColumns() { + return Collections.emptyList(); + } } // -------------------------------------------------------------------------------------------- diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/OperatorBindingCallContext.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/OperatorBindingCallContext.java index f31406dad19..7d1d3940899 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/OperatorBindingCallContext.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/functions/inference/OperatorBindingCallContext.java @@ -64,13 +64,14 @@ public final class OperatorBindingCallContext extends AbstractSqlCallContext { private final @Nullable List<Integer> inputTimeColumns; private final @Nullable List<ChangelogMode> inputChangelogModes; private final @Nullable ChangelogMode outputChangelogMode; + private final @Nullable List<List<int[]>> inputUpsertKeys; public OperatorBindingCallContext( DataTypeFactory dataTypeFactory, FunctionDefinition definition, SqlOperatorBinding binding, RelDataType returnRelDataType) { - this(dataTypeFactory, definition, binding, returnRelDataType, null, null, null); + this(dataTypeFactory, definition, binding, returnRelDataType, null, null, null, null); } public OperatorBindingCallContext( @@ -81,6 +82,26 @@ public final class OperatorBindingCallContext extends AbstractSqlCallContext { @Nullable List<Integer> inputTimeColumns, @Nullable List<ChangelogMode> inputChangelogModes, @Nullable ChangelogMode outputChangelogMode) { + this( 
+ dataTypeFactory, + definition, + binding, + returnRelDataType, + inputTimeColumns, + inputChangelogModes, + outputChangelogMode, + null); + } + + public OperatorBindingCallContext( + DataTypeFactory dataTypeFactory, + FunctionDefinition definition, + SqlOperatorBinding binding, + RelDataType returnRelDataType, + @Nullable List<Integer> inputTimeColumns, + @Nullable List<ChangelogMode> inputChangelogModes, + @Nullable ChangelogMode outputChangelogMode, + @Nullable List<List<int[]>> inputUpsertKeys) { super( dataTypeFactory, definition, @@ -109,6 +130,7 @@ public final class OperatorBindingCallContext extends AbstractSqlCallContext { this.inputTimeColumns = inputTimeColumns; this.inputChangelogModes = inputChangelogModes; this.outputChangelogMode = outputChangelogMode; + this.inputUpsertKeys = inputUpsertKeys; } @Override @@ -173,13 +195,18 @@ public final class OperatorBindingCallContext extends AbstractSqlCallContext { Optional.ofNullable(inputChangelogModes) .map(m -> m.get(tableArgCall.getInputIndex())) .orElse(null); + final List<int[]> upsertKeys = + Optional.ofNullable(inputUpsertKeys) + .map(m -> m.get(tableArgCall.getInputIndex())) + .orElse(List.of()); return Optional.of( OperatorBindingTableSemantics.create( argumentDataTypes.get(pos), staticArg, tableArgCall, timeColumn, - changelogMode)); + changelogMode, + upsertKeys)); } @Override @@ -283,20 +310,23 @@ public final class OperatorBindingCallContext extends AbstractSqlCallContext { private final SortDirection[] orderByDirections; private final int timeColumn; private final @Nullable ChangelogMode changelogMode; + private final List<int[]> upsertKeyColumns; public static OperatorBindingTableSemantics create( DataType tableDataType, StaticArgument staticArg, RexTableArgCall tableArgCall, int timeColumn, - @Nullable ChangelogMode changelogMode) { + @Nullable ChangelogMode changelogMode, + List<int[]> upsertKeyColumns) { return new OperatorBindingTableSemantics( createDataType(tableDataType, staticArg), 
tableArgCall.getPartitionKeys(), tableArgCall.getOrderKeys(), RexTableArgCall.toSortDirections(tableArgCall.getSortOrder()), timeColumn, - changelogMode); + changelogMode, + upsertKeyColumns); } private OperatorBindingTableSemantics( @@ -305,13 +335,15 @@ public final class OperatorBindingCallContext extends AbstractSqlCallContext { int[] orderByColumns, SortDirection[] orderByDirections, int timeColumn, - @Nullable ChangelogMode changelogMode) { + @Nullable ChangelogMode changelogMode, + List<int[]> upsertKeyColumns) { this.dataType = dataType; this.partitionByColumns = partitionByColumns; this.orderByColumns = orderByColumns; this.orderByDirections = orderByDirections; this.timeColumn = timeColumn; this.changelogMode = changelogMode; + this.upsertKeyColumns = upsertKeyColumns; } private static DataType createDataType(DataType tableDataType, StaticArgument staticArg) { @@ -353,5 +385,10 @@ public final class OperatorBindingCallContext extends AbstractSqlCallContext { public Optional<ChangelogMode> changelogMode() { return Optional.ofNullable(changelogMode); } + + @Override + public List<int[]> upsertKeyColumns() { + return upsertKeyColumns; + } } } diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecProcessTableFunction.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecProcessTableFunction.java index 3973329af74..e0cf5f09187 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecProcessTableFunction.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/stream/StreamExecProcessTableFunction.java @@ -69,6 +69,7 @@ import org.apache.flink.table.types.inference.TypeInferenceUtil.StateInfo; import org.apache.flink.table.types.logical.RowType; import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator; +import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonInclude; import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty; import org.apache.calcite.linq4j.Ord; @@ -108,6 +109,7 @@ public class StreamExecProcessTableFunction extends ExecNodeBase<RowData> public static final String FIELD_NAME_FUNCTION_CALL = "functionCall"; public static final String FIELD_NAME_INPUT_CHANGELOG_MODES = "inputChangelogModes"; public static final String FIELD_NAME_OUTPUT_CHANGELOG_MODE = "outputChangelogMode"; + public static final String FIELD_NAME_INPUT_UPSERT_KEYS = "inputUpsertKeys"; @JsonProperty(FIELD_NAME_UID) private final @Nullable String uid; @@ -121,6 +123,10 @@ public class StreamExecProcessTableFunction extends ExecNodeBase<RowData> @JsonProperty(FIELD_NAME_OUTPUT_CHANGELOG_MODE) private final ChangelogMode outputChangelogMode; + @JsonProperty(FIELD_NAME_INPUT_UPSERT_KEYS) + @JsonInclude(JsonInclude.Include.NON_EMPTY) + private final List<List<int[]>> inputUpsertKeys; + public StreamExecProcessTableFunction( ReadableConfig tableConfig, List<InputProperty> inputProperties, @@ -129,7 +135,8 @@ public class StreamExecProcessTableFunction extends ExecNodeBase<RowData> @Nullable String uid, RexCall invocation, List<ChangelogMode> inputChangelogModes, - ChangelogMode outputChangelogMode) { + ChangelogMode outputChangelogMode, + List<List<int[]>> inputUpsertKeys) { this( ExecNodeContext.newNodeId(), ExecNodeContext.newContext(StreamExecProcessTableFunction.class), @@ -141,7 +148,8 @@ public class StreamExecProcessTableFunction extends ExecNodeBase<RowData> uid, invocation, inputChangelogModes, - outputChangelogMode); + outputChangelogMode, + inputUpsertKeys); } @JsonCreator @@ -155,7 +163,9 @@ public class StreamExecProcessTableFunction extends ExecNodeBase<RowData> @JsonProperty(FIELD_NAME_UID) @Nullable String uid, 
@JsonProperty(FIELD_NAME_FUNCTION_CALL) RexNode invocation, @JsonProperty(FIELD_NAME_INPUT_CHANGELOG_MODES) List<ChangelogMode> inputChangelogModes, - @JsonProperty(FIELD_NAME_OUTPUT_CHANGELOG_MODE) ChangelogMode outputChangelogMode) { + @JsonProperty(FIELD_NAME_OUTPUT_CHANGELOG_MODE) ChangelogMode outputChangelogMode, + @JsonProperty(FIELD_NAME_INPUT_UPSERT_KEYS) + @Nullable List<List<int[]>> inputUpsertKeys) { super(id, context, persistedConfig, inputProperties, outputType, description); this.uid = uid; // Mirror the FlinkLogicalTableFunctionScan converter for the compiled-plan restore path: @@ -164,6 +174,7 @@ public class StreamExecProcessTableFunction extends ExecNodeBase<RowData> this.invocation = BridgingSqlFunction.resolveCallTraits((RexCall) invocation); this.inputChangelogModes = inputChangelogModes; this.outputChangelogMode = outputChangelogMode; + this.inputUpsertKeys = inputUpsertKeys != null ? inputUpsertKeys : List.of(); } public @Nullable String getUid() { @@ -202,7 +213,12 @@ public class StreamExecProcessTableFunction extends ExecNodeBase<RowData> final RexCall udfCall = StreamPhysicalProcessTableFunction.toUdfCall(invocation); final GeneratedRunnerResult generated = ProcessTableRunnerGenerator.generate( - ctx, udfCall, inputTimeColumns, inputChangelogModes, outputChangelogMode); + ctx, + udfCall, + inputTimeColumns, + inputChangelogModes, + outputChangelogMode, + inputUpsertKeys); final GeneratedProcessTableRunner generatedRunner = generated.runner(); final LinkedHashMap<String, StateInfo> stateInfos = generated.stateInfos(); @@ -310,9 +326,12 @@ public class StreamExecProcessTableFunction extends ExecNodeBase<RowData> final int timeColumn = inputTimeColumns.get(tableArgCall.getInputIndex()); + final int inputIndex = tableArgCall.getInputIndex(); + final List<int[]> upsertKeys = + inputIndex < inputUpsertKeys.size() ? 
inputUpsertKeys.get(inputIndex) : List.of(); return new RuntimeTableSemantics( tableArg.getName(), - tableArgCall.getInputIndex(), + inputIndex, dataType, tableArgCall.getPartitionKeys(), tableArgCall.getOrderKeys(), @@ -320,7 +339,8 @@ public class StreamExecProcessTableFunction extends ExecNodeBase<RowData> consumedChangelogMode, tableArg.is(StaticArgumentTrait.PASS_COLUMNS_THROUGH), tableArg.is(StaticArgumentTrait.SET_SEMANTIC_TABLE), - timeColumn); + timeColumn, + upsertKeys); } private Transformation<RowData> createKeyedTransformation( diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunction.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunction.java index 5ccecf18e71..a337c250860 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunction.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalProcessTableFunction.java @@ -27,6 +27,7 @@ import org.apache.flink.table.functions.ProcessTableFunction; import org.apache.flink.table.planner.calcite.FlinkTypeFactory; import org.apache.flink.table.planner.calcite.RexTableArgCall; import org.apache.flink.table.planner.functions.bridging.BridgingSqlFunction; +import org.apache.flink.table.planner.plan.metadata.FlinkRelMetadataQuery; import org.apache.flink.table.planner.plan.nodes.exec.ExecNode; import org.apache.flink.table.planner.plan.nodes.exec.InputProperty; import org.apache.flink.table.planner.plan.nodes.exec.stream.StreamExecProcessTableFunction; @@ -62,6 +63,7 @@ import org.checkerframework.checker.nullness.qual.Nullable; import java.util.ArrayList; import java.util.Arrays; +import java.util.Collections; import java.util.HashSet; import java.util.List; import 
java.util.Objects; @@ -165,6 +167,20 @@ public class StreamPhysicalProcessTableFunction extends AbstractRelNode verifyTimeAttributes(getInputs(), call, inputChangelogModes, outputChangelogMode); final List<Ord<StaticArgument>> providedInputArgs = getProvidedInputArgs(call); verifyPassThroughColumnsForUpdates(providedInputArgs, outputChangelogMode); + final FlinkRelMetadataQuery fmq = + FlinkRelMetadataQuery.reuseOrCreate(getCluster().getMetadataQuery()); + final List<List<int[]>> inputUpsertKeys = + getInputs().stream() + .map( + input -> { + final Set<ImmutableBitSet> keys = fmq.getUpsertKeys(input); + return keys == null + ? Collections.<int[]>emptyList() + : keys.stream() + .map(ImmutableBitSet::toArray) + .collect(Collectors.toList()); + }) + .collect(Collectors.toList()); return new StreamExecProcessTableFunction( unwrapTableConfig(this), getInputs().stream().map(i -> InputProperty.DEFAULT).collect(Collectors.toList()), @@ -173,7 +189,8 @@ public class StreamPhysicalProcessTableFunction extends AbstractRelNode uid, call, inputChangelogModes, - outputChangelogMode); + outputChangelogMode, + inputUpsertKeys); } @Override diff --git a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/UpsertKeyUtil.java b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/UpsertKeyUtil.java index cc301118b29..af99d92e9ed 100644 --- a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/UpsertKeyUtil.java +++ b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/utils/UpsertKeyUtil.java @@ -18,13 +18,17 @@ package org.apache.flink.table.planner.plan.utils; +import org.apache.flink.table.utils.UpsertKeyUtils; + import org.apache.calcite.util.ImmutableBitSet; import javax.annotation.Nonnull; import javax.annotation.Nullable; +import java.util.List; import java.util.Optional; import java.util.Set; +import java.util.stream.Collectors; /** * Utility 
for upsertKey which represented as a Set of {@link @@ -55,21 +59,8 @@ public class UpsertKeyUtil { if (null == upsertKeys || upsertKeys.isEmpty()) { return Optional.empty(); } - return upsertKeys.stream() - .map(ImmutableBitSet::toArray) - .reduce( - (k1, k2) -> { - if (k1.length < k2.length) { - return k1; - } - if (k1.length == k2.length) { - for (int index = 0; index < k1.length; index++) { - if (k1[index] < k2[index]) { - return k1; - } - } - } - return k2; - }); + final List<int[]> asArrays = + upsertKeys.stream().map(ImmutableBitSet::toArray).collect(Collectors.toList()); + return Optional.of(UpsertKeyUtils.smallestKey(asArrays)); } } diff --git a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProcessTableRunnerGenerator.scala b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProcessTableRunnerGenerator.scala index 52df803d5c8..ea9f5ca4c77 100644 --- a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProcessTableRunnerGenerator.scala +++ b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/codegen/ProcessTableRunnerGenerator.scala @@ -65,7 +65,8 @@ object ProcessTableRunnerGenerator { udfCall: RexCall, inputTimeColumns: java.util.List[Integer], inputChangelogModes: java.util.List[ChangelogMode], - outputChangelogMode: ChangelogMode): GeneratedRunnerResult = { + outputChangelogMode: ChangelogMode, + inputUpsertKeys: java.util.List[java.util.List[Array[Int]]]): GeneratedRunnerResult = { val function: BridgingSqlFunction = udfCall.getOperator.asInstanceOf[BridgingSqlFunction] val definition: FunctionDefinition = function.getDefinition val dataTypeFactory = function.getDataTypeFactory @@ -77,7 +78,12 @@ object ProcessTableRunnerGenerator { // Thus, functions can reconfigure themselves for the exact use case. // Including updating their state layout. 
val callContext = - function.toCallContext(udfCall, inputTimeColumns, inputChangelogModes, outputChangelogMode) + function.toCallContext( + udfCall, + inputTimeColumns, + inputChangelogModes, + outputChangelogMode, + inputUpsertKeys) // Create the final UDF for runtime val udf = UserDefinedFunctionHelper.createSpecializedFunction( diff --git a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-retract-restore/plan/to-changelog-retract-restore.json b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-retract-restore/plan/to-changelog-retract-restore.json index ea961c665ca..324327cad4b 100644 --- a/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-retract-restore/plan/to-changelog-retract-restore.json +++ b/flink-table/flink-table-planner/src/test/resources/restore-tests/stream-exec-process-table-function_1/to-changelog-retract-restore/plan/to-changelog-retract-restore.json @@ -78,7 +78,8 @@ "type" : "ROW<`op` VARCHAR(2147483647), `name` VARCHAR(2147483647) NOT NULL, `score` BIGINT> NOT NULL" }, "inputChangelogModes" : [ [ "INSERT", "UPDATE_BEFORE", "UPDATE_AFTER", "DELETE" ] ], - "outputChangelogMode" : [ "INSERT" ] + "outputChangelogMode" : [ "INSERT" ], + "inputUpsertKeys" : [ [ [ 0 ] ] ] }, { "id" : 3, "type" : "stream-exec-sink_2", diff --git a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/RuntimeTableSemantics.java b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/RuntimeTableSemantics.java index cabab4c6131..7f0fbf0c3d3 100644 --- a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/RuntimeTableSemantics.java +++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/process/RuntimeTableSemantics.java @@ -24,6 +24,7 @@ import org.apache.flink.table.functions.TableSemantics; import org.apache.flink.table.types.DataType; import java.io.Serializable; +import java.util.List; import java.util.Optional; /** @@ -44,6 +45,7 @@ public class RuntimeTableSemantics implements TableSemantics, Serializable { private final boolean passColumnsThrough; private final boolean hasSetSemantics; private final int timeColumn; + private final List<int[]> upsertKeyColumns; private transient ChangelogMode changelogMode; @@ -57,7 +59,8 @@ public class RuntimeTableSemantics implements TableSemantics, Serializable { RuntimeChangelogMode consumedChangelogMode, boolean passColumnsThrough, boolean hasSetSemantics, - int timeColumn) { + int timeColumn, + List<int[]> upsertKeyColumns) { this.argName = argName; this.inputIndex = inputIndex; this.dataType = dataType; @@ -68,6 +71,7 @@ public class RuntimeTableSemantics implements TableSemantics, Serializable { this.passColumnsThrough = passColumnsThrough; this.hasSetSemantics = hasSetSemantics; this.timeColumn = timeColumn; + this.upsertKeyColumns = upsertKeyColumns; } public String getArgName() { @@ -122,4 +126,9 @@ public class RuntimeTableSemantics implements TableSemantics, Serializable { public Optional<ChangelogMode> changelogMode() { return Optional.of(getChangelogMode()); } + + @Override + public List<int[]> upsertKeyColumns() { + return upsertKeyColumns; + } } diff --git a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/process/ProcessSetTableOperatorInterruptibleTimersTest.java b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/process/ProcessSetTableOperatorInterruptibleTimersTest.java index be390ab5f55..15258ff24cb 100644 --- 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/process/ProcessSetTableOperatorInterruptibleTimersTest.java +++ b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/process/ProcessSetTableOperatorInterruptibleTimersTest.java @@ -246,7 +246,8 @@ class ProcessSetTableOperatorInterruptibleTimersTest { RuntimeChangelogMode.serialize(ChangelogMode.insertOnly()), /* passColumnsThrough */ false, /* hasSetSemantics */ true, - /* timeColumn */ 1); + /* timeColumn */ 1, + /* upsertKeyColumns */ List.of()); } // -------------------------------------------------------------------------------------------- diff --git a/flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/TestHarnessTableSemantics.java b/flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/TestHarnessTableSemantics.java index fadf21d7dd9..91edd9a059d 100644 --- a/flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/TestHarnessTableSemantics.java +++ b/flink-table/flink-table-test-utils/src/main/java/org/apache/flink/table/runtime/functions/TestHarnessTableSemantics.java @@ -23,6 +23,8 @@ import org.apache.flink.table.connector.ChangelogMode; import org.apache.flink.table.functions.TableSemantics; import org.apache.flink.table.types.DataType; +import java.util.Collections; +import java.util.List; import java.util.Optional; /** {@link TableSemantics} implementation for {@link ProcessTableFunctionTestHarness}. 
*/ @@ -30,10 +32,17 @@ import java.util.Optional; class TestHarnessTableSemantics implements TableSemantics { private final DataType dataType; private final int[] partitionByColumns; + private final List<int[]> upsertKeyColumns; TestHarnessTableSemantics(DataType dataType, int[] partitionByColumns) { + this(dataType, partitionByColumns, Collections.emptyList()); + } + + TestHarnessTableSemantics( + DataType dataType, int[] partitionByColumns, List<int[]> upsertKeyColumns) { this.dataType = dataType; this.partitionByColumns = partitionByColumns; + this.upsertKeyColumns = upsertKeyColumns; } @Override @@ -65,4 +74,9 @@ class TestHarnessTableSemantics implements TableSemantics { public Optional<ChangelogMode> changelogMode() { return Optional.empty(); } + + @Override + public List<int[]> upsertKeyColumns() { + return upsertKeyColumns; + } }
