This is an automated email from the ASF dual-hosted git repository.
xuyangzhong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push:
new f1d0ab659f2 [FLINK-37541][table-planner] Deprecate getTargetColumns in DynamicTableSink.Context (#26381)
f1d0ab659f2 is described below
commit f1d0ab659f28fede23c2ec3adae0c08ceec84196
Author: xiangyu0xf <[email protected]>
AuthorDate: Mon Oct 13 17:34:48 2025 +0800
[FLINK-37541][table-planner] Deprecate getTargetColumns in DynamicTableSink.Context (#26381)
---
.../apache/flink/table/connector/sink/DynamicTableSink.java | 3 +++
.../planner/plan/nodes/exec/spec/DynamicTableSinkSpec.java | 6 ++++++
.../apache/flink/table/planner/plan/reuse/SinkReuser.java | 11 ++---------
.../table/planner/plan/nodes/calcite/LogicalSink.scala | 12 ++++++++++--
.../flink/table/planner/plan/nodes/calcite/Sink.scala | 11 +++++++++--
.../table/planner/plan/nodes/logical/FlinkLogicalSink.scala | 12 ++++++++++--
.../plan/nodes/physical/batch/BatchPhysicalSink.scala | 12 ++++++++++--
.../plan/nodes/physical/stream/StreamPhysicalSink.scala | 12 ++++++++++--
.../table/planner/factories/TestValuesTableFactory.java | 13 ++++++-------
.../runtime/connector/sink/SinkRuntimeProviderContext.java | 1 +
10 files changed, 67 insertions(+), 26 deletions(-)
diff --git
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/DynamicTableSink.java
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/DynamicTableSink.java
index 8cd344ca98a..7ddabb31a4c 100644
---
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/DynamicTableSink.java
+++
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/DynamicTableSink.java
@@ -190,7 +190,10 @@ public interface DynamicTableSink {
* </ul>
*
* <p>Note: will always return empty for the delete statement because it has no column list.
+ *
+ * @deprecated use {@link SupportsTargetColumnWriting} instead.
*/
+ @Deprecated(since = "2.2")
Optional<int[][]> getTargetColumns();
}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/DynamicTableSinkSpec.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/DynamicTableSinkSpec.java
index 6fd2914f4d9..38486dd2883 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/DynamicTableSinkSpec.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/DynamicTableSinkSpec.java
@@ -25,6 +25,7 @@ import org.apache.flink.table.factories.FactoryUtil;
import org.apache.flink.table.module.Module;
import org.apache.flink.table.planner.calcite.FlinkContext;
import org.apache.flink.table.planner.plan.abilities.sink.SinkAbilitySpec;
+import
org.apache.flink.table.planner.plan.abilities.sink.TargetColumnWritingSpec;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
import
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
@@ -52,6 +53,7 @@ public class DynamicTableSinkSpec extends
DynamicTableSpecBase {
private final ContextResolvedTable contextResolvedTable;
private final @Nullable List<SinkAbilitySpec> sinkAbilities;
+ @Deprecated(since = "2.2")
private final @Nullable int[][] targetColumns;
private DynamicTableSink tableSink;
@@ -99,9 +101,13 @@ public class DynamicTableSinkSpec extends
DynamicTableSpecBase {
return tableSink;
}
+ /**
+ * @deprecated use {@link TargetColumnWritingSpec} instead.
+ */
@JsonGetter(FIELD_NAME_TARGET_COLUMNS)
@JsonInclude(JsonInclude.Include.NON_EMPTY)
@Nullable
+ @Deprecated(since = "2.2")
public int[][] getTargetColumns() {
return targetColumns;
}
diff --git
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/SinkReuser.java
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/SinkReuser.java
index 052004e4ac4..d02aba0d64c 100644
---
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/SinkReuser.java
+++
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/SinkReuser.java
@@ -20,7 +20,6 @@ package org.apache.flink.table.planner.plan.reuse;
import org.apache.flink.table.planner.plan.abilities.sink.SinkAbilitySpec;
import org.apache.flink.table.planner.plan.nodes.calcite.Sink;
-import
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalSink;
import
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalUnion;
import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalSink;
import
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalUnion;
@@ -217,18 +216,12 @@ public class SinkReuser {
this.originalSinks.add(sink);
this.inputTraitSet = sink.getInput().getTraitSet();
this.digest = getDigest(sink);
- this.sinkAbilitySpecs =
- isStreamingMode
- ? ((StreamPhysicalSink) sink).abilitySpecs()
- : ((BatchPhysicalSink) sink).abilitySpecs();
+ this.sinkAbilitySpecs = sink.abilitySpecs();
}
public boolean canBeReused(Sink sinkNode) {
String currentSinkDigest = getDigest(sinkNode);
- SinkAbilitySpec[] currentSinkSpecs =
- isStreamingMode
- ? ((StreamPhysicalSink) sinkNode).abilitySpecs()
- : ((BatchPhysicalSink) sinkNode).abilitySpecs();
+ SinkAbilitySpec[] currentSinkSpecs = sinkNode.abilitySpecs();
RelTraitSet currentInputTraitSet =
sinkNode.getInput().getTraitSet();
// Only table sink with the same digest, specs and input trait set can be reused
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/LogicalSink.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/LogicalSink.scala
index 17c58473bd1..4d9e3008376 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/LogicalSink.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/LogicalSink.scala
@@ -42,8 +42,16 @@ final class LogicalSink(
tableSink: DynamicTableSink,
targetColumns: Array[Array[Int]],
val staticPartitions: Map[String, String],
- val abilitySpecs: Array[SinkAbilitySpec])
- extends Sink(cluster, traitSet, input, hints, targetColumns,
contextResolvedTable, tableSink) {
+ abilitySpecs: Array[SinkAbilitySpec])
+ extends Sink(
+ cluster,
+ traitSet,
+ input,
+ hints,
+ targetColumns,
+ contextResolvedTable,
+ tableSink,
+ abilitySpecs) {
override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]):
RelNode = {
new LogicalSink(
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/Sink.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/Sink.scala
index 48356801b3d..8279be977ca 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/Sink.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/Sink.scala
@@ -19,6 +19,8 @@ package org.apache.flink.table.planner.plan.nodes.calcite
import org.apache.flink.table.catalog.ContextResolvedTable
import org.apache.flink.table.connector.sink.DynamicTableSink
+import org.apache.flink.table.planner.plan.abilities.sink.SinkAbilitySpec
+import
org.apache.flink.table.planner.plan.abilities.sink.TargetColumnWritingSpec
import org.apache.flink.table.planner.plan.utils.RelExplainUtil
import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
@@ -42,20 +44,25 @@ import scala.collection.JavaConversions._
* @param hints
* the hints
* @param targetColumns
- * the specified target columns.
+ * the specified target columns. @Deprecated(since = "2.2"), use [[TargetColumnWritingSpec]]
+ * instead.
* @param contextResolvedTable
* the table definition.
* @param tableSink
* the [[DynamicTableSink]] for which to write into
+ * @param abilitySpecs
+ * the [[SinkAbilitySpec]]s of this sink
*/
abstract class Sink(
cluster: RelOptCluster,
traitSet: RelTraitSet,
input: RelNode,
val hints: util.List[RelHint],
+ @deprecated(since = "2.2")
val targetColumns: Array[Array[Int]],
val contextResolvedTable: ContextResolvedTable,
- val tableSink: DynamicTableSink)
+ val tableSink: DynamicTableSink,
+ val abilitySpecs: Array[SinkAbilitySpec])
extends SingleRel(cluster, traitSet, input) {
override def deriveRowType(): RelDataType = {
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalSink.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalSink.scala
index 5814209fa21..d00f5289558 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalSink.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalSink.scala
@@ -46,8 +46,16 @@ class FlinkLogicalSink(
tableSink: DynamicTableSink,
targetColumns: Array[Array[Int]],
val staticPartitions: Map[String, String],
- val abilitySpecs: Array[SinkAbilitySpec])
- extends Sink(cluster, traitSet, input, hints, targetColumns,
contextResolvedTable, tableSink)
+ abilitySpecs: Array[SinkAbilitySpec])
+ extends Sink(
+ cluster,
+ traitSet,
+ input,
+ hints,
+ targetColumns,
+ contextResolvedTable,
+ tableSink,
+ abilitySpecs)
with FlinkLogicalRel {
override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]):
RelNode = {
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalSink.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalSink.scala
index 9645c15f1d2..b61d51eeb17 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalSink.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalSink.scala
@@ -42,8 +42,16 @@ class BatchPhysicalSink(
contextResolvedTable: ContextResolvedTable,
tableSink: DynamicTableSink,
targetColumns: Array[Array[Int]],
- val abilitySpecs: Array[SinkAbilitySpec])
- extends Sink(cluster, traitSet, inputRel, hints, targetColumns,
contextResolvedTable, tableSink)
+ abilitySpecs: Array[SinkAbilitySpec])
+ extends Sink(
+ cluster,
+ traitSet,
+ inputRel,
+ hints,
+ targetColumns,
+ contextResolvedTable,
+ tableSink,
+ abilitySpecs)
with BatchPhysicalRel {
override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]):
RelNode = {
diff --git
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSink.scala
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSink.scala
index 1ad00e2b397..40eea54b991 100644
---
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSink.scala
+++
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSink.scala
@@ -46,9 +46,17 @@ class StreamPhysicalSink(
contextResolvedTable: ContextResolvedTable,
tableSink: DynamicTableSink,
targetColumns: Array[Array[Int]],
- val abilitySpecs: Array[SinkAbilitySpec],
+ abilitySpecs: Array[SinkAbilitySpec],
val upsertMaterialize: Boolean = false)
- extends Sink(cluster, traitSet, inputRel, hints, targetColumns,
contextResolvedTable, tableSink)
+ extends Sink(
+ cluster,
+ traitSet,
+ inputRel,
+ hints,
+ targetColumns,
+ contextResolvedTable,
+ tableSink,
+ abilitySpecs)
with StreamPhysicalRel {
override def requireWatermark: Boolean = false
diff --git
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
index 7de96b601f6..0d592e26e0d 100644
---
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
+++
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
@@ -2371,16 +2371,15 @@ public final class TestValuesTableFactory
} else {
// we don't support OutputFormat for updating query in the TestValues connector
assertThat(runtimeSink.equals("SinkFunction")).isTrue();
- // check the contract of the context.getTargetColumns method returns the expected
- // empty Option or non-empty Option with a non-empty array
- assertThat(
- !context.getTargetColumns().isPresent()
- ||
context.getTargetColumns().get().length > 0)
- .isTrue();
+ // check the contract that targetColumns should be null for empty array and should
+ // only be applied with a non-empty array
+ assertThat(this.targetColumns == null ||
this.targetColumns.length > 0).isTrue();
SinkFunction<RowData> sinkFunction;
if (primaryKeyIndices.length > 0) {
// TODO FLINK-31301 currently partial-insert composite columns are not supported
- int[][] targetColumns =
context.getTargetColumns().orElse(new int[0][]);
+ int[][] targetColumns =
+ this.targetColumns != null ? this.targetColumns :
new int[0][];
+
checkArgument(
Arrays.stream(targetColumns).allMatch(subArr ->
subArr.length <= 1),
"partial-insert composite columns are not
supported yet!");
diff --git
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/connector/sink/SinkRuntimeProviderContext.java
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/connector/sink/SinkRuntimeProviderContext.java
index 509430a9a4b..6296c2757be 100644
---
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/connector/sink/SinkRuntimeProviderContext.java
+++
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/connector/sink/SinkRuntimeProviderContext.java
@@ -74,6 +74,7 @@ public final class SinkRuntimeProviderContext implements
DynamicTableSink.Contex
}
@Override
+ @Deprecated(since = "2.2")
public Optional<int[][]> getTargetColumns() {
return Optional.ofNullable(targetColumns);
}