This is an automated email from the ASF dual-hosted git repository.

xuyangzhong pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new f1d0ab659f2 [FLINK-37541][table-planner] Deprecate getTargetColumns in 
DynamicTableSink.Context (#26381)
f1d0ab659f2 is described below

commit f1d0ab659f28fede23c2ec3adae0c08ceec84196
Author: xiangyu0xf <[email protected]>
AuthorDate: Mon Oct 13 17:34:48 2025 +0800

    [FLINK-37541][table-planner] Deprecate getTargetColumns in 
DynamicTableSink.Context (#26381)
---
 .../apache/flink/table/connector/sink/DynamicTableSink.java |  3 +++
 .../planner/plan/nodes/exec/spec/DynamicTableSinkSpec.java  |  6 ++++++
 .../apache/flink/table/planner/plan/reuse/SinkReuser.java   | 11 ++---------
 .../table/planner/plan/nodes/calcite/LogicalSink.scala      | 12 ++++++++++--
 .../flink/table/planner/plan/nodes/calcite/Sink.scala       | 11 +++++++++--
 .../table/planner/plan/nodes/logical/FlinkLogicalSink.scala | 12 ++++++++++--
 .../plan/nodes/physical/batch/BatchPhysicalSink.scala       | 12 ++++++++++--
 .../plan/nodes/physical/stream/StreamPhysicalSink.scala     | 12 ++++++++++--
 .../table/planner/factories/TestValuesTableFactory.java     | 13 ++++++-------
 .../runtime/connector/sink/SinkRuntimeProviderContext.java  |  1 +
 10 files changed, 67 insertions(+), 26 deletions(-)

diff --git 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/DynamicTableSink.java
 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/DynamicTableSink.java
index 8cd344ca98a..7ddabb31a4c 100644
--- 
a/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/DynamicTableSink.java
+++ 
b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/connector/sink/DynamicTableSink.java
@@ -190,7 +190,10 @@ public interface DynamicTableSink {
          * </ul>
          *
          * <p>Note: will always return empty for the delete statement because 
it has no column list.
+         *
+         * @deprecated use {@link SupportsTargetColumnWriting} instead.
          */
+        @Deprecated(since = "2.2")
         Optional<int[][]> getTargetColumns();
     }
 
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/DynamicTableSinkSpec.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/DynamicTableSinkSpec.java
index 6fd2914f4d9..38486dd2883 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/DynamicTableSinkSpec.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/spec/DynamicTableSinkSpec.java
@@ -25,6 +25,7 @@ import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.table.module.Module;
 import org.apache.flink.table.planner.calcite.FlinkContext;
 import org.apache.flink.table.planner.plan.abilities.sink.SinkAbilitySpec;
+import 
org.apache.flink.table.planner.plan.abilities.sink.TargetColumnWritingSpec;
 
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
 import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonGetter;
@@ -52,6 +53,7 @@ public class DynamicTableSinkSpec extends 
DynamicTableSpecBase {
     private final ContextResolvedTable contextResolvedTable;
     private final @Nullable List<SinkAbilitySpec> sinkAbilities;
 
+    @Deprecated(since = "2.2")
     private final @Nullable int[][] targetColumns;
 
     private DynamicTableSink tableSink;
@@ -99,9 +101,13 @@ public class DynamicTableSinkSpec extends 
DynamicTableSpecBase {
         return tableSink;
     }
 
+    /**
+     * @deprecated use {@link TargetColumnWritingSpec} instead.
+     */
     @JsonGetter(FIELD_NAME_TARGET_COLUMNS)
     @JsonInclude(JsonInclude.Include.NON_EMPTY)
     @Nullable
+    @Deprecated(since = "2.2")
     public int[][] getTargetColumns() {
         return targetColumns;
     }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/SinkReuser.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/SinkReuser.java
index 052004e4ac4..d02aba0d64c 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/SinkReuser.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/reuse/SinkReuser.java
@@ -20,7 +20,6 @@ package org.apache.flink.table.planner.plan.reuse;
 
 import org.apache.flink.table.planner.plan.abilities.sink.SinkAbilitySpec;
 import org.apache.flink.table.planner.plan.nodes.calcite.Sink;
-import 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalSink;
 import 
org.apache.flink.table.planner.plan.nodes.physical.batch.BatchPhysicalUnion;
 import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalSink;
 import 
org.apache.flink.table.planner.plan.nodes.physical.stream.StreamPhysicalUnion;
@@ -217,18 +216,12 @@ public class SinkReuser {
             this.originalSinks.add(sink);
             this.inputTraitSet = sink.getInput().getTraitSet();
             this.digest = getDigest(sink);
-            this.sinkAbilitySpecs =
-                    isStreamingMode
-                            ? ((StreamPhysicalSink) sink).abilitySpecs()
-                            : ((BatchPhysicalSink) sink).abilitySpecs();
+            this.sinkAbilitySpecs = sink.abilitySpecs();
         }
 
         public boolean canBeReused(Sink sinkNode) {
             String currentSinkDigest = getDigest(sinkNode);
-            SinkAbilitySpec[] currentSinkSpecs =
-                    isStreamingMode
-                            ? ((StreamPhysicalSink) sinkNode).abilitySpecs()
-                            : ((BatchPhysicalSink) sinkNode).abilitySpecs();
+            SinkAbilitySpec[] currentSinkSpecs = sinkNode.abilitySpecs();
             RelTraitSet currentInputTraitSet = 
sinkNode.getInput().getTraitSet();
 
             // Only table sink with the same digest, specs and input trait set 
can be reused
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/LogicalSink.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/LogicalSink.scala
index 17c58473bd1..4d9e3008376 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/LogicalSink.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/LogicalSink.scala
@@ -42,8 +42,16 @@ final class LogicalSink(
     tableSink: DynamicTableSink,
     targetColumns: Array[Array[Int]],
     val staticPartitions: Map[String, String],
-    val abilitySpecs: Array[SinkAbilitySpec])
-  extends Sink(cluster, traitSet, input, hints, targetColumns, 
contextResolvedTable, tableSink) {
+    abilitySpecs: Array[SinkAbilitySpec])
+  extends Sink(
+    cluster,
+    traitSet,
+    input,
+    hints,
+    targetColumns,
+    contextResolvedTable,
+    tableSink,
+    abilitySpecs) {
 
   override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): 
RelNode = {
     new LogicalSink(
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/Sink.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/Sink.scala
index 48356801b3d..8279be977ca 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/Sink.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/calcite/Sink.scala
@@ -19,6 +19,8 @@ package org.apache.flink.table.planner.plan.nodes.calcite
 
 import org.apache.flink.table.catalog.ContextResolvedTable
 import org.apache.flink.table.connector.sink.DynamicTableSink
+import org.apache.flink.table.planner.plan.abilities.sink.SinkAbilitySpec
+import 
org.apache.flink.table.planner.plan.abilities.sink.TargetColumnWritingSpec
 import org.apache.flink.table.planner.plan.utils.RelExplainUtil
 
 import org.apache.calcite.plan.{RelOptCluster, RelTraitSet}
@@ -42,20 +44,25 @@ import scala.collection.JavaConversions._
  * @param hints
  *   the hints
  * @param targetColumns
- *   the specified target columns.
+ *   the specified target columns. @Deprecated(since = "2.2"), use 
[[TargetColumnWritingSpec]]
+ *   instead.
  * @param contextResolvedTable
  *   the table definition.
  * @param tableSink
  *   the [[DynamicTableSink]] for which to write into
+ * @param abilitySpecs
+ *   the [[SinkAbilitySpec]]s of this sink
  */
 abstract class Sink(
     cluster: RelOptCluster,
     traitSet: RelTraitSet,
     input: RelNode,
     val hints: util.List[RelHint],
+    @deprecated(since = "2.2")
     val targetColumns: Array[Array[Int]],
     val contextResolvedTable: ContextResolvedTable,
-    val tableSink: DynamicTableSink)
+    val tableSink: DynamicTableSink,
+    val abilitySpecs: Array[SinkAbilitySpec])
   extends SingleRel(cluster, traitSet, input) {
 
   override def deriveRowType(): RelDataType = {
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalSink.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalSink.scala
index 5814209fa21..d00f5289558 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalSink.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/logical/FlinkLogicalSink.scala
@@ -46,8 +46,16 @@ class FlinkLogicalSink(
     tableSink: DynamicTableSink,
     targetColumns: Array[Array[Int]],
     val staticPartitions: Map[String, String],
-    val abilitySpecs: Array[SinkAbilitySpec])
-  extends Sink(cluster, traitSet, input, hints, targetColumns, 
contextResolvedTable, tableSink)
+    abilitySpecs: Array[SinkAbilitySpec])
+  extends Sink(
+    cluster,
+    traitSet,
+    input,
+    hints,
+    targetColumns,
+    contextResolvedTable,
+    tableSink,
+    abilitySpecs)
   with FlinkLogicalRel {
 
   override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): 
RelNode = {
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalSink.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalSink.scala
index 9645c15f1d2..b61d51eeb17 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalSink.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/batch/BatchPhysicalSink.scala
@@ -42,8 +42,16 @@ class BatchPhysicalSink(
     contextResolvedTable: ContextResolvedTable,
     tableSink: DynamicTableSink,
     targetColumns: Array[Array[Int]],
-    val abilitySpecs: Array[SinkAbilitySpec])
-  extends Sink(cluster, traitSet, inputRel, hints, targetColumns, 
contextResolvedTable, tableSink)
+    abilitySpecs: Array[SinkAbilitySpec])
+  extends Sink(
+    cluster,
+    traitSet,
+    inputRel,
+    hints,
+    targetColumns,
+    contextResolvedTable,
+    tableSink,
+    abilitySpecs)
   with BatchPhysicalRel {
 
   override def copy(traitSet: RelTraitSet, inputs: util.List[RelNode]): 
RelNode = {
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSink.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSink.scala
index 1ad00e2b397..40eea54b991 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSink.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/nodes/physical/stream/StreamPhysicalSink.scala
@@ -46,9 +46,17 @@ class StreamPhysicalSink(
     contextResolvedTable: ContextResolvedTable,
     tableSink: DynamicTableSink,
     targetColumns: Array[Array[Int]],
-    val abilitySpecs: Array[SinkAbilitySpec],
+    abilitySpecs: Array[SinkAbilitySpec],
     val upsertMaterialize: Boolean = false)
-  extends Sink(cluster, traitSet, inputRel, hints, targetColumns, 
contextResolvedTable, tableSink)
+  extends Sink(
+    cluster,
+    traitSet,
+    inputRel,
+    hints,
+    targetColumns,
+    contextResolvedTable,
+    tableSink,
+    abilitySpecs)
   with StreamPhysicalRel {
 
   override def requireWatermark: Boolean = false
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
index 7de96b601f6..0d592e26e0d 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestValuesTableFactory.java
@@ -2371,16 +2371,15 @@ public final class TestValuesTableFactory
             } else {
                 // we don't support OutputFormat for updating query in the 
TestValues connector
                 assertThat(runtimeSink.equals("SinkFunction")).isTrue();
-                // check the contract of the context.getTargetColumns method 
returns the expected
-                // empty Option or non-empty Option with a non-empty array
-                assertThat(
-                                !context.getTargetColumns().isPresent()
-                                        || 
context.getTargetColumns().get().length > 0)
-                        .isTrue();
+                // check the contract that targetColumns should be null for 
empty array and should
+                // only be applied with a non-empty array
+                assertThat(this.targetColumns == null || 
this.targetColumns.length > 0).isTrue();
                 SinkFunction<RowData> sinkFunction;
                 if (primaryKeyIndices.length > 0) {
                     // TODO FLINK-31301 currently partial-insert composite 
columns are not supported
-                    int[][] targetColumns = 
context.getTargetColumns().orElse(new int[0][]);
+                    int[][] targetColumns =
+                            this.targetColumns != null ? this.targetColumns : 
new int[0][];
+
                     checkArgument(
                             Arrays.stream(targetColumns).allMatch(subArr -> 
subArr.length <= 1),
                             "partial-insert composite columns are not 
supported yet!");
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/connector/sink/SinkRuntimeProviderContext.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/connector/sink/SinkRuntimeProviderContext.java
index 509430a9a4b..6296c2757be 100644
--- 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/connector/sink/SinkRuntimeProviderContext.java
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/connector/sink/SinkRuntimeProviderContext.java
@@ -74,6 +74,7 @@ public final class SinkRuntimeProviderContext implements 
DynamicTableSink.Contex
     }
 
     @Override
+    @Deprecated(since = "2.2")
     public Optional<int[][]> getTargetColumns() {
         return Optional.ofNullable(targetColumns);
     }

Reply via email to