This is an automated email from the ASF dual-hosted git repository.

guoweijie pushed a commit to branch release-1.17
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/release-1.17 by this push:
     new 0a04422f283 [FLINK-32001][table] Row-level update should support 
returning partial columns
0a04422f283 is described below

commit 0a04422f2838a000f36c80b940d9a04c35b46948
Author: luoyuxia <[email protected]>
AuthorDate: Fri May 5 18:56:32 2023 +0800

    [FLINK-32001][table] Row-level update should support returning partial 
columns
---
 .../src/test/resources/sql/delete.q                |   2 +-
 .../table/planner/connectors/DynamicSinkUtils.java | 134 +++++++++++++--------
 .../plan/abilities/sink/RowLevelDeleteSpec.java    |  27 ++++-
 .../plan/abilities/sink/RowLevelUpdateSpec.java    |  25 +++-
 .../plan/nodes/exec/batch/BatchExecSink.java       |  66 ++++++++++
 .../plan/nodes/exec/common/CommonExecSink.java     |   4 +-
 .../factories/TestUpdateDeleteTableFactory.java    | 125 ++++++++++++++++---
 .../runtime/batch/sql/DeleteTableITCase.java       |  47 +++++++-
 .../runtime/batch/sql/UpdateTableITCase.java       |  40 ++++++
 9 files changed, 385 insertions(+), 85 deletions(-)

diff --git a/flink-table/flink-sql-client/src/test/resources/sql/delete.q 
b/flink-table/flink-sql-client/src/test/resources/sql/delete.q
index 1604f27305d..7fcd56d3c28 100644
--- a/flink-table/flink-sql-client/src/test/resources/sql/delete.q
+++ b/flink-table/flink-sql-client/src/test/resources/sql/delete.q
@@ -29,7 +29,7 @@ SET 'table.dml-sync' = 'true';
 !info
 
 # create a table first
-CREATE TABLE t (a int, b string, c double)
+CREATE TABLE t (a int PRIMARY KEY NOT ENFORCED, b string, c double)
 WITH (
   'connector' = 'test-update-delete',
   'data-id' = '$VAR_DELETE_TABLE_DATA_ID',
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
index 441ddcca2b1..42e4f95c2cc 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
@@ -386,36 +386,37 @@ public final class DynamicSinkUtils {
         RowLevelModificationScanContext context = 
RowLevelModificationContextUtils.getScanContext();
         SupportsRowLevelDelete.RowLevelDeleteInfo rowLevelDeleteInfo =
                 supportsRowLevelDelete.applyRowLevelDelete(context);
-        sinkAbilitySpecs.add(
-                new 
RowLevelDeleteSpec(rowLevelDeleteInfo.getRowLevelDeleteMode(), context));
 
         if (rowLevelDeleteInfo.getRowLevelDeleteMode()
-                == SupportsRowLevelDelete.RowLevelDeleteMode.DELETED_ROWS) {
-            // convert the LogicalTableModify node to a rel node representing 
row-level delete
-            return convertToRowLevelDelete(
-                    tableModify,
-                    contextResolvedTable,
-                    rowLevelDeleteInfo,
-                    tableDebugName,
-                    dataTypeFactory,
-                    typeFactory);
-        } else if (rowLevelDeleteInfo.getRowLevelDeleteMode()
+                        != 
SupportsRowLevelDelete.RowLevelDeleteMode.DELETED_ROWS
+                && rowLevelDeleteInfo.getRowLevelDeleteMode()
+                        != 
SupportsRowLevelDelete.RowLevelDeleteMode.REMAINING_ROWS) {
+            throw new TableException(
+                    "Unknown delete mode: " + 
rowLevelDeleteInfo.getRowLevelDeleteMode());
+        }
+
+        if (rowLevelDeleteInfo.getRowLevelDeleteMode()
                 == SupportsRowLevelDelete.RowLevelDeleteMode.REMAINING_ROWS) {
             // if it's for remaining row, convert the predicate in where clause
             // to the negative predicate
             convertPredicateToNegative(tableModify);
-            // convert the LogicalTableModify node to a rel node representing 
row-level delete
-            return convertToRowLevelDelete(
-                    tableModify,
-                    contextResolvedTable,
-                    rowLevelDeleteInfo,
-                    tableDebugName,
-                    dataTypeFactory,
-                    typeFactory);
-        } else {
-            throw new TableException(
-                    "Unknown delete mode: " + 
rowLevelDeleteInfo.getRowLevelDeleteMode());
         }
+
+        // convert the LogicalTableModify node to a RelNode representing 
row-level delete
+        Tuple2<RelNode, int[]> deleteRelNodeAndRequireIndices =
+                convertToRowLevelDelete(
+                        tableModify,
+                        contextResolvedTable,
+                        rowLevelDeleteInfo,
+                        tableDebugName,
+                        dataTypeFactory,
+                        typeFactory);
+        sinkAbilitySpecs.add(
+                new RowLevelDeleteSpec(
+                        rowLevelDeleteInfo.getRowLevelDeleteMode(),
+                        context,
+                        deleteRelNodeAndRequireIndices.f1));
+        return deleteRelNodeAndRequireIndices.f0;
     }
 
     private static RelNode convertUpdate(
@@ -445,16 +446,21 @@ public final class DynamicSinkUtils {
             throw new IllegalArgumentException(
                     "Unknown update mode:" + 
updateInfo.getRowLevelUpdateMode());
         }
+        Tuple2<RelNode, int[]> updateRelNodeAndRequireIndices =
+                convertToRowLevelUpdate(
+                        tableModify,
+                        contextResolvedTable,
+                        updateInfo,
+                        tableDebugName,
+                        dataTypeFactory,
+                        typeFactory);
         sinkAbilitySpecs.add(
                 new RowLevelUpdateSpec(
-                        updatedColumns, updateInfo.getRowLevelUpdateMode(), 
context));
-        return convertToRowLevelUpdate(
-                tableModify,
-                contextResolvedTable,
-                updateInfo,
-                tableDebugName,
-                dataTypeFactory,
-                typeFactory);
+                        updatedColumns,
+                        updateInfo.getRowLevelUpdateMode(),
+                        context,
+                        updateRelNodeAndRequireIndices.f1));
+        return updateRelNodeAndRequireIndices.f0;
     }
 
     private static List<Column> getUpdatedColumns(
@@ -469,8 +475,13 @@ public final class DynamicSinkUtils {
         return updatedColumns;
     }
 
-    /** Convert tableModify node to a rel node representing for row-level 
delete. */
-    private static RelNode convertToRowLevelDelete(
+    /**
+     * Convert tableModify node to a RelNode representing for row-level delete.
+     *
+     * @return a tuple contains the RelNode and the index for the required 
physical columns for
+     *     row-level delete.
+     */
+    private static Tuple2<RelNode, int[]> convertToRowLevelDelete(
             LogicalTableModify tableModify,
             ContextResolvedTable contextResolvedTable,
             SupportsRowLevelDelete.RowLevelDeleteInfo rowLevelDeleteInfo,
@@ -495,14 +506,25 @@ public final class DynamicSinkUtils {
                     addExtraMetaCols(
                             tableModify, tableScan, tableDebugName, 
metadataColumns, typeFactory);
         }
+
         // create a project only select the required columns for delete
-        return projectColumnsForDelete(
-                tableModify,
-                resolvedSchema,
-                colIndexes,
-                tableDebugName,
-                dataTypeFactory,
-                typeFactory);
+        return Tuple2.of(
+                projectColumnsForDelete(
+                        tableModify,
+                        resolvedSchema,
+                        colIndexes,
+                        tableDebugName,
+                        dataTypeFactory,
+                        typeFactory),
+                getPhysicalColumnIndices(colIndexes, resolvedSchema));
+    }
+
+    /** Return the indices from {@param colIndexes} that belong to physical 
column. */
+    private static int[] getPhysicalColumnIndices(List<Integer> colIndexes, 
ResolvedSchema schema) {
+        return colIndexes.stream()
+                .filter(i -> schema.getColumns().get(i).isPhysical())
+                .mapToInt(i -> i)
+                .toArray();
     }
 
     /** Convert the predicate in WHERE clause to the negative predicate. */
@@ -606,8 +628,13 @@ public final class DynamicSinkUtils {
         return (LogicalTableScan) relNode;
     }
 
-    /** Convert tableModify node to a RelNode representing for row-level 
update. */
-    private static RelNode convertToRowLevelUpdate(
+    /**
+     * Convert tableModify node to a RelNode representing for row-level update.
+     *
+     * @return a tuple contains the RelNode and the index for the required 
physical columns for
+     *     row-level update.
+     */
+    private static Tuple2<RelNode, int[]> convertToRowLevelUpdate(
             LogicalTableModify tableModify,
             ContextResolvedTable contextResolvedTable,
             SupportsRowLevelUpdate.RowLevelUpdateInfo rowLevelUpdateInfo,
@@ -622,7 +649,7 @@ public final class DynamicSinkUtils {
         LogicalTableScan tableScan = getSourceTableScan(tableModify);
         Tuple2<List<Integer>, List<MetadataColumn>> colsIndexAndExtraMetaCols =
                 getRequireColumnsIndexAndExtraMetaCols(tableScan, 
requiredColumns, resolvedSchema);
-        List<Integer> updatedIndexes = colsIndexAndExtraMetaCols.f0;
+        List<Integer> colIndexes = colsIndexAndExtraMetaCols.f0;
         List<MetadataColumn> metadataColumns = colsIndexAndExtraMetaCols.f1;
         // if meta columns size is greater than 0, we need to modify the 
underlying
         // LogicalTableScan to make it can read meta column
@@ -632,16 +659,17 @@ public final class DynamicSinkUtils {
                     addExtraMetaCols(
                             tableModify, tableScan, tableDebugName, 
metadataColumns, typeFactory);
         }
-
-        return projectColumnsForUpdate(
-                tableModify,
-                originColsCount,
-                resolvedSchema,
-                updatedIndexes,
-                rowLevelUpdateInfo.getRowLevelUpdateMode(),
-                tableDebugName,
-                dataTypeFactory,
-                typeFactory);
+        return Tuple2.of(
+                projectColumnsForUpdate(
+                        tableModify,
+                        originColsCount,
+                        resolvedSchema,
+                        colIndexes,
+                        rowLevelUpdateInfo.getRowLevelUpdateMode(),
+                        tableDebugName,
+                        dataTypeFactory,
+                        typeFactory),
+                getPhysicalColumnIndices(colIndexes, resolvedSchema));
     }
 
     // create a project only select the required column or expression for 
update
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/RowLevelDeleteSpec.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/RowLevelDeleteSpec.java
index cd26a7b3fd4..29f87dfdd00 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/RowLevelDeleteSpec.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/RowLevelDeleteSpec.java
@@ -33,17 +33,20 @@ import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTyp
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import java.util.Arrays;
 import java.util.Objects;
 
 /**
  * A sub-class of {@link SinkAbilitySpec} that can not only 
serialize/deserialize the row-level
- * delete mode to/from JSON, but also can delete existing data for {@link
- * org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete}.
+ * delete mode & required physical column indices to/from JSON, but also can 
delete existing data
+ * for {@link 
org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete}.
  */
 @JsonIgnoreProperties(ignoreUnknown = true)
 @JsonTypeName("RowLevelDelete")
 public class RowLevelDeleteSpec implements SinkAbilitySpec {
     public static final String FIELD_NAME_ROW_LEVEL_DELETE_MODE = 
"rowLevelDeleteMode";
+    public static final String FIELD_NAME_REQUIRED_PHYSICAL_COLUMN_INDICES =
+            "requiredPhysicalColumnIndices";
 
     @JsonProperty(FIELD_NAME_ROW_LEVEL_DELETE_MODE)
     @Nonnull
@@ -51,13 +54,20 @@ public class RowLevelDeleteSpec implements SinkAbilitySpec {
 
     @JsonIgnore @Nullable private final RowLevelModificationScanContext 
scanContext;
 
+    @JsonProperty(FIELD_NAME_REQUIRED_PHYSICAL_COLUMN_INDICES)
+    @Nonnull
+    private final int[] requiredPhysicalColumnIndices;
+
     @JsonCreator
     public RowLevelDeleteSpec(
             @JsonProperty(FIELD_NAME_ROW_LEVEL_DELETE_MODE) @Nonnull
                     SupportsRowLevelDelete.RowLevelDeleteMode 
rowLevelDeleteMode,
-            @Nullable RowLevelModificationScanContext scanContext) {
+            @Nullable RowLevelModificationScanContext scanContext,
+            @JsonProperty(FIELD_NAME_REQUIRED_PHYSICAL_COLUMN_INDICES) @Nonnull
+                    int[] requiredPhysicalColumnIndices) {
         this.rowLevelDeleteMode = 
Preconditions.checkNotNull(rowLevelDeleteMode);
         this.scanContext = scanContext;
+        this.requiredPhysicalColumnIndices = requiredPhysicalColumnIndices;
     }
 
     @Override
@@ -77,6 +87,10 @@ public class RowLevelDeleteSpec implements SinkAbilitySpec {
         return rowLevelDeleteMode;
     }
 
+    public int[] getRequiredPhysicalColumnIndices() {
+        return requiredPhysicalColumnIndices;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -87,11 +101,14 @@ public class RowLevelDeleteSpec implements SinkAbilitySpec 
{
         }
         RowLevelDeleteSpec that = (RowLevelDeleteSpec) o;
         return rowLevelDeleteMode == that.rowLevelDeleteMode
-                && Objects.equals(scanContext, that.scanContext);
+                && Objects.equals(scanContext, that.scanContext)
+                && Arrays.equals(requiredPhysicalColumnIndices, 
that.requiredPhysicalColumnIndices);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(rowLevelDeleteMode, scanContext);
+        int result = Objects.hash(rowLevelDeleteMode, scanContext);
+        result = 31 * result + Arrays.hashCode(requiredPhysicalColumnIndices);
+        return result;
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/RowLevelUpdateSpec.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/RowLevelUpdateSpec.java
index d046ab002bb..17c9176dc74 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/RowLevelUpdateSpec.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/RowLevelUpdateSpec.java
@@ -33,19 +33,22 @@ import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTyp
 import javax.annotation.Nonnull;
 import javax.annotation.Nullable;
 
+import java.util.Arrays;
 import java.util.List;
 import java.util.Objects;
 
 /**
  * A sub-class of {@link SinkAbilitySpec} that can not only 
serialize/deserialize the row-level
- * update mode & columns to/from JSON, but also can update existing data for 
{@link
- * org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate}.
+ * update mode, columns & required physical column indices to/from JSON, but 
also can update
+ * existing data for {@link 
org.apache.flink.table.connector.sink.abilities.SupportsRowLevelUpdate}.
  */
 @JsonIgnoreProperties(ignoreUnknown = true)
 @JsonTypeName("RowLevelUpdate")
 public class RowLevelUpdateSpec implements SinkAbilitySpec {
     public static final String FIELD_NAME_UPDATED_COLUMNS = "updatedColumns";
     public static final String FIELD_NAME_ROW_LEVEL_UPDATE_MODE = 
"rowLevelUpdateMode";
+    public static final String FIELD_NAME_REQUIRED_PHYSICAL_COLUMN_INDICES =
+            "requiredPhysicalColumnIndices";
 
     @JsonProperty(FIELD_NAME_UPDATED_COLUMNS)
     @Nonnull
@@ -55,6 +58,10 @@ public class RowLevelUpdateSpec implements SinkAbilitySpec {
     @Nonnull
     private final SupportsRowLevelUpdate.RowLevelUpdateMode rowLevelUpdateMode;
 
+    @JsonProperty(FIELD_NAME_REQUIRED_PHYSICAL_COLUMN_INDICES)
+    @Nonnull
+    private final int[] requiredPhysicalColumnIndices;
+
     @JsonIgnore @Nullable private final RowLevelModificationScanContext 
scanContext;
 
     @JsonCreator
@@ -62,10 +69,13 @@ public class RowLevelUpdateSpec implements SinkAbilitySpec {
             @JsonProperty(FIELD_NAME_UPDATED_COLUMNS) @Nonnull List<Column> 
updatedColumns,
             @JsonProperty(FIELD_NAME_ROW_LEVEL_UPDATE_MODE) @Nonnull
                     SupportsRowLevelUpdate.RowLevelUpdateMode 
rowLevelUpdateMode,
-            @Nullable RowLevelModificationScanContext scanContext) {
+            @Nullable RowLevelModificationScanContext scanContext,
+            @JsonProperty(FIELD_NAME_REQUIRED_PHYSICAL_COLUMN_INDICES) @Nonnull
+                    int[] requiredPhysicalColumnIndices) {
         this.updatedColumns = updatedColumns;
         this.rowLevelUpdateMode = rowLevelUpdateMode;
         this.scanContext = scanContext;
+        this.requiredPhysicalColumnIndices = requiredPhysicalColumnIndices;
     }
 
     @Override
@@ -85,6 +95,10 @@ public class RowLevelUpdateSpec implements SinkAbilitySpec {
         return rowLevelUpdateMode;
     }
 
+    public int[] getRequiredPhysicalColumnIndices() {
+        return requiredPhysicalColumnIndices;
+    }
+
     @Override
     public boolean equals(Object o) {
         if (this == o) {
@@ -96,11 +110,14 @@ public class RowLevelUpdateSpec implements SinkAbilitySpec 
{
         RowLevelUpdateSpec that = (RowLevelUpdateSpec) o;
         return Objects.equals(updatedColumns, that.updatedColumns)
                 && rowLevelUpdateMode == that.rowLevelUpdateMode
+                && Arrays.equals(requiredPhysicalColumnIndices, 
that.requiredPhysicalColumnIndices)
                 && Objects.equals(scanContext, that.scanContext);
     }
 
     @Override
     public int hashCode() {
-        return Objects.hash(updatedColumns, rowLevelUpdateMode, scanContext);
+        int result = Objects.hash(updatedColumns, rowLevelUpdateMode, 
scanContext);
+        result = 31 * result + Arrays.hashCode(requiredPhysicalColumnIndices);
+        return result;
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java
index c9068656a51..520fe12bfc0 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/batch/BatchExecSink.java
@@ -20,10 +20,16 @@ package 
org.apache.flink.table.planner.plan.nodes.exec.batch;
 
 import org.apache.flink.api.dag.Transformation;
 import org.apache.flink.configuration.ReadableConfig;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UniqueConstraint;
 import org.apache.flink.table.connector.ChangelogMode;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.planner.delegation.PlannerBase;
+import org.apache.flink.table.planner.plan.abilities.sink.RowLevelDeleteSpec;
+import org.apache.flink.table.planner.plan.abilities.sink.RowLevelUpdateSpec;
+import org.apache.flink.table.planner.plan.abilities.sink.SinkAbilitySpec;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeContext;
@@ -31,8 +37,11 @@ import 
org.apache.flink.table.planner.plan.nodes.exec.InputProperty;
 import org.apache.flink.table.planner.plan.nodes.exec.common.CommonExecSink;
 import 
org.apache.flink.table.planner.plan.nodes.exec.spec.DynamicTableSinkSpec;
 import org.apache.flink.table.types.logical.LogicalType;
+import org.apache.flink.table.types.logical.RowType;
 
+import java.util.ArrayList;
 import java.util.Collections;
+import java.util.List;
 
 /**
  * Batch {@link ExecNode} to to write data into an external sink defined by a 
{@link
@@ -74,4 +83,61 @@ public class BatchExecSink extends CommonExecSink implements 
BatchExecNode<Objec
                 false,
                 null);
     }
+
+    @Override
+    protected RowType getPhysicalRowType(ResolvedSchema schema) {
+        // row-level modification may only write partial columns,
+        // so we try to prune the RowType to get the real RowType containing
+        // the physical columns to be written
+        if (tableSinkSpec.getSinkAbilities() != null) {
+            for (SinkAbilitySpec sinkAbilitySpec : 
tableSinkSpec.getSinkAbilities()) {
+                if (sinkAbilitySpec instanceof RowLevelUpdateSpec) {
+                    RowLevelUpdateSpec rowLevelUpdateSpec = 
(RowLevelUpdateSpec) sinkAbilitySpec;
+                    return getPhysicalRowType(
+                            schema, 
rowLevelUpdateSpec.getRequiredPhysicalColumnIndices());
+                } else if (sinkAbilitySpec instanceof RowLevelDeleteSpec) {
+                    RowLevelDeleteSpec rowLevelDeleteSpec = 
(RowLevelDeleteSpec) sinkAbilitySpec;
+                    return getPhysicalRowType(
+                            schema, 
rowLevelDeleteSpec.getRequiredPhysicalColumnIndices());
+                }
+            }
+        }
+        return (RowType) schema.toPhysicalRowDataType().getLogicalType();
+    }
+
+    @Override
+    protected int[] getPrimaryKeyIndices(RowType sinkRowType, ResolvedSchema 
schema) {
+        if (schema.getPrimaryKey().isPresent()) {
+            UniqueConstraint uniqueConstraint = schema.getPrimaryKey().get();
+            int[] primaryKeyIndices = new 
int[uniqueConstraint.getColumns().size()];
+            // SinkRowType may not contain full primary keys in case of 
row-level update or delete.
+            // In such case, return an empty array since the primary keys are 
not completed and
+            // we consider such case as no primary keys
+            // Note: this may happen if the required columns returned by
+            // connector don't fully contain the primary keys. But it's not 
recommended to only
+            // return partial primary keys
+            // For example, a table has primary keys: a, b, c; but the 
connector only return a, b
+            // in method SupportsRowLevelUpdate#applyRowLevelUpdate.
+            for (int i = 0; i < uniqueConstraint.getColumns().size(); i++) {
+                int fieldIndex = 
sinkRowType.getFieldIndex(uniqueConstraint.getColumns().get(i));
+                if (fieldIndex == -1) {
+                    return new int[0];
+                }
+                primaryKeyIndices[i] = fieldIndex;
+            }
+            return primaryKeyIndices;
+        } else {
+            return new int[0];
+        }
+    }
+
+    /** Get the physical row type with given column indices. */
+    private RowType getPhysicalRowType(ResolvedSchema schema, int[] 
columnIndices) {
+        List<Column> columns = schema.getColumns();
+        List<Column> requireColumns = new ArrayList<>();
+        for (int columnIndex : columnIndices) {
+            requireColumns.add(columns.get(columnIndex));
+        }
+        return (RowType) 
ResolvedSchema.of(requireColumns).toPhysicalRowDataType().getLogicalType();
+    }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
index 9cde92de23a..c3debde7751 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
@@ -644,13 +644,13 @@ public abstract class CommonExecSink extends 
ExecNodeBase<Object>
         return InternalTypeInfo.of(getInputEdges().get(0).getOutputType());
     }
 
-    private int[] getPrimaryKeyIndices(RowType sinkRowType, ResolvedSchema 
schema) {
+    protected int[] getPrimaryKeyIndices(RowType sinkRowType, ResolvedSchema 
schema) {
         return schema.getPrimaryKey()
                 .map(k -> 
k.getColumns().stream().mapToInt(sinkRowType::getFieldIndex).toArray())
                 .orElse(new int[0]);
     }
 
-    private RowType getPhysicalRowType(ResolvedSchema schema) {
+    protected RowType getPhysicalRowType(ResolvedSchema schema) {
         return (RowType) schema.toPhysicalRowDataType().getLogicalType();
     }
 
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestUpdateDeleteTableFactory.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestUpdateDeleteTableFactory.java
index fa38811ed08..2e9c9141d54 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestUpdateDeleteTableFactory.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestUpdateDeleteTableFactory.java
@@ -75,6 +75,7 @@ import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicInteger;
+import java.util.stream.IntStream;
 
 import static org.apache.flink.table.data.RowData.createFieldGetter;
 
@@ -331,6 +332,7 @@ public class TestUpdateDeleteTableFactory
         protected final String dataId;
 
         protected boolean isUpdate;
+        protected int[] requiredColumnIndices;
 
         public SupportsRowLevelUpdateSink(
                 ObjectIdentifier tableIdentifier,
@@ -383,10 +385,15 @@ public class TestUpdateDeleteTableFactory
                                     new UpdateDataSinkFunction(
                                             dataId,
                                             getPrimaryKeyFieldGetter(
-                                                    
resolvedCatalogTable.getResolvedSchema()),
+                                                    
resolvedCatalogTable.getResolvedSchema(),
+                                                    requiredColumnIndices),
                                             getAllFieldGetter(
                                                     
resolvedCatalogTable.getResolvedSchema()),
-                                            updateMode))
+                                            getPartialFieldGetter(
+                                                    
resolvedCatalogTable.getResolvedSchema(),
+                                                    requiredColumnIndices),
+                                            updateMode,
+                                            requiredColumnIndices))
                             .setParallelism(1);
                 }
             };
@@ -428,6 +435,8 @@ public class TestUpdateDeleteTableFactory
                                         requireColumnsForUpdate,
                                         
resolvedCatalogTable.getResolvedSchema());
                     }
+                    requiredColumnIndices =
+                            getRequiredColumnIndexes(resolvedCatalogTable, 
requiredCols);
                     return Optional.ofNullable(requiredCols);
                 }
 
@@ -450,6 +459,7 @@ public class TestUpdateDeleteTableFactory
         private final List<String> requireColumnsForDelete;
 
         private boolean isDelete;
+        protected int[] requiredColumnIndices;
 
         public SupportsRowLevelModificationSink(
                 ObjectIdentifier tableIdentifier,
@@ -519,6 +529,10 @@ public class TestUpdateDeleteTableFactory
                                     .addSink(
                                             new DeleteDataSinkFunction(
                                                     dataId,
+                                                    getPrimaryKeyFieldGetter(
+                                                            
resolvedCatalogTable
+                                                                    
.getResolvedSchema(),
+                                                            
requiredColumnIndices),
                                                     getAllFieldGetter(
                                                             
resolvedCatalogTable
                                                                     
.getResolvedSchema()),
@@ -568,6 +582,8 @@ public class TestUpdateDeleteTableFactory
                                         requireColumnsForDelete,
                                         
resolvedCatalogTable.getResolvedSchema());
                     }
+                    requiredColumnIndices =
+                            getRequiredColumnIndexes(resolvedCatalogTable, 
requiredCols);
                     return Optional.ofNullable(requiredCols);
                 }
 
@@ -582,7 +598,8 @@ public class TestUpdateDeleteTableFactory
     /** The sink for delete existing data. */
     private static class DeleteDataSinkFunction extends 
RichSinkFunction<RowData> {
         private final String dataId;
-        private final RowData.FieldGetter[] fieldGetters;
+        private final RowData.FieldGetter[] primaryKeyFieldGetters;
+        private final RowData.FieldGetter[] allFieldGetters;
         private final SupportsRowLevelDelete.RowLevelDeleteMode deleteMode;
 
         private transient Collection<RowData> data;
@@ -590,10 +607,12 @@ public class TestUpdateDeleteTableFactory
 
         DeleteDataSinkFunction(
                 String dataId,
-                RowData.FieldGetter[] fieldGetters,
+                RowData.FieldGetter[] primaryKeyFieldGetters,
+                RowData.FieldGetter[] allFieldGetters,
                 SupportsRowLevelDelete.RowLevelDeleteMode deleteMode) {
             this.dataId = dataId;
-            this.fieldGetters = fieldGetters;
+            this.primaryKeyFieldGetters = primaryKeyFieldGetters;
+            this.allFieldGetters = allFieldGetters;
             this.deleteMode = deleteMode;
         }
 
@@ -620,7 +639,7 @@ public class TestUpdateDeleteTableFactory
                     String.format(
                             "The RowKind for the coming rows should be %s in 
delete mode %s.",
                             RowKind.DELETE, DELETE_MODE));
-            data.removeIf(rowData -> equal(rowData, deletedRow, fieldGetters));
+            data.removeIf(rowData -> equal(rowData, deletedRow, 
primaryKeyFieldGetters));
         }
 
         private void consumeRemainingRows(RowData remainingRow) {
@@ -629,7 +648,12 @@ public class TestUpdateDeleteTableFactory
                     String.format(
                             "The RowKind for the coming rows should be %s in 
delete mode %s.",
                             RowKind.INSERT, DELETE_MODE));
-            newData.add(copyRowData(remainingRow, fieldGetters));
+            // find the row that match the remaining row
+            for (RowData oldRow : data) {
+                if (equal(oldRow, remainingRow, primaryKeyFieldGetters)) {
+                    newData.add(copyRowData(oldRow, allFieldGetters));
+                }
+            }
         }
 
         @Override
@@ -720,6 +744,24 @@ public class TestUpdateDeleteTableFactory
         }
     }
 
+    private static int[] getRequiredColumnIndexes(
+            ResolvedCatalogTable resolvedCatalogTable, @Nullable List<Column> 
columns) {
+        if (columns == null) {
+            return IntStream.range(0, 
resolvedCatalogTable.getResolvedSchema().getColumnCount())
+                    .toArray();
+        } else {
+            List<Column> allColumns = 
resolvedCatalogTable.getResolvedSchema().getColumns();
+            int[] columnIndexes = new int[columns.size()];
+            for (int i = 0; i < columns.size(); i++) {
+                int colIndex = allColumns.indexOf(columns.get(i));
+                if (colIndex != -1) {
+                    columnIndexes[i] = colIndex;
+                }
+            }
+            return columnIndexes;
+        }
+    }
+
     /**
      * Get a list of equal predicate from a list of filter, each contains 
[column, value]. Return
      * Optional.empty() if it contains any non-equal predicate.
@@ -817,7 +859,9 @@ public class TestUpdateDeleteTableFactory
         private final String dataId;
         private final RowData.FieldGetter[] primaryKeyFieldGetters;
         private final RowData.FieldGetter[] allFieldGetters;
+        private final RowData.FieldGetter[] requireColumnFieldGetters;
         private final SupportsRowLevelUpdate.RowLevelUpdateMode updateMode;
+        private final int[] requiredColumnIndexes;
         private transient RowData[] oldRows;
         private transient List<Tuple2<Integer, RowData>> updatedRows;
         private transient List<RowData> allNewRows;
@@ -826,11 +870,15 @@ public class TestUpdateDeleteTableFactory
                 String dataId,
                 RowData.FieldGetter[] primaryKeyFieldGetters,
                 RowData.FieldGetter[] allFieldGetters,
-                SupportsRowLevelUpdate.RowLevelUpdateMode updateMode) {
+                RowData.FieldGetter[] requireColumnFieldGetters,
+                SupportsRowLevelUpdate.RowLevelUpdateMode updateMode,
+                int[] requiredColumnIndexes) {
             this.dataId = dataId;
             this.primaryKeyFieldGetters = primaryKeyFieldGetters;
             this.updateMode = updateMode;
             this.allFieldGetters = allFieldGetters;
+            this.requireColumnFieldGetters = requireColumnFieldGetters;
+            this.requiredColumnIndexes = requiredColumnIndexes;
         }
 
         @Override
@@ -858,7 +906,8 @@ public class TestUpdateDeleteTableFactory
 
             for (int i = 0; i < oldRows.length; i++) {
                 if (equal(oldRows[i], updatedRow, primaryKeyFieldGetters)) {
-                    updatedRows.add(new Tuple2<>(i, copyRowData(updatedRow, 
allFieldGetters)));
+                    updatedRows.add(
+                            new Tuple2<>(i, 
getUpdatedAfterRowDataWithAllFields(updatedRow)));
                 }
             }
         }
@@ -867,7 +916,25 @@ public class TestUpdateDeleteTableFactory
             Preconditions.checkArgument(
                     rowData.getRowKind() == RowKind.INSERT,
                     "The RowKind for the updated rows should be " + 
RowKind.INSERT);
-            allNewRows.add(copyRowData(rowData, allFieldGetters));
+            allNewRows.add(getUpdatedAfterRowDataWithAllFields(rowData));
+        }
+
+        private RowData getUpdatedAfterRowDataWithAllFields(RowData 
updateAfterRowData) {
+            GenericRowData newRowData = null;
+            // first find the old row to be updated and copy the old values
+            for (RowData oldRow : oldRows) {
+                if (equal(oldRow, updateAfterRowData, primaryKeyFieldGetters)) 
{
+                    newRowData = copyRowData(oldRow, allFieldGetters);
+                }
+            }
+            Preconditions.checkNotNull(newRowData);
+            // then set the new value after updated
+            for (int i = 0; i < requiredColumnIndexes.length; i++) {
+                newRowData.setField(
+                        requiredColumnIndexes[i],
+                        
requireColumnFieldGetters[i].getFieldOrNull(updateAfterRowData));
+            }
+            return newRowData;
         }
 
         @Override
@@ -904,15 +971,25 @@ public class TestUpdateDeleteTableFactory
                 "The scan context should contains the object identifier for 
row-level modification.");
     }
 
-    private static RowData.FieldGetter[] 
getPrimaryKeyFieldGetter(ResolvedSchema resolvedSchema) {
-        int[] indexes = resolvedSchema.getPrimaryKeyIndexes();
-        RowData.FieldGetter[] fieldGetters = new 
RowData.FieldGetter[indexes.length];
+    /**
+     * Get the an array of FieldGetter for the primary keys which are also in 
{@param
+     * requiredColumnIndices}.
+     */
+    private static RowData.FieldGetter[] getPrimaryKeyFieldGetter(
+            ResolvedSchema resolvedSchema, int[] requiredColumnIndices) {
+        List<RowData.FieldGetter> fieldGetters = new ArrayList<>();
+        int[] primaryKeyIndices = resolvedSchema.getPrimaryKeyIndexes();
         List<DataType> dataTypes = resolvedSchema.getColumnDataTypes();
-        for (int i = 0; i < fieldGetters.length; i++) {
-            int colIndex = indexes[i];
-            fieldGetters[i] = 
createFieldGetter(dataTypes.get(colIndex).getLogicalType(), colIndex);
+        for (final int primaryKeyIndex : primaryKeyIndices) {
+            // find the primaryKeyIndex in requiredColumnIndices
+            for (int i = 0; i < requiredColumnIndices.length; i++) {
+                if (requiredColumnIndices[i] == primaryKeyIndex) {
+                    fieldGetters.add(
+                            
createFieldGetter(dataTypes.get(primaryKeyIndex).getLogicalType(), i));
+                }
+            }
         }
-        return fieldGetters;
+        return fieldGetters.toArray(new RowData.FieldGetter[0]);
     }
 
     private static RowData.FieldGetter[] getAllFieldGetter(ResolvedSchema 
resolvedSchema) {
@@ -924,6 +1001,18 @@ public class TestUpdateDeleteTableFactory
         return fieldGetters;
     }
 
+    private static RowData.FieldGetter[] getPartialFieldGetter(
+            ResolvedSchema resolvedSchema, int[] partialColumIndexes) {
+        List<Column> columns = resolvedSchema.getColumns();
+        RowData.FieldGetter[] fieldGetters = new 
RowData.FieldGetter[partialColumIndexes.length];
+        for (int i = 0; i < fieldGetters.length; i++) {
+            fieldGetters[i] =
+                    createFieldGetter(
+                            
columns.get(partialColumIndexes[i]).getDataType().getLogicalType(), i);
+        }
+        return fieldGetters;
+    }
+
     private static boolean equal(
             RowData value1, RowData value2, RowData.FieldGetter[] 
fieldGetters) {
         for (RowData.FieldGetter fieldGetter : fieldGetters) {
@@ -935,7 +1024,7 @@ public class TestUpdateDeleteTableFactory
         return true;
     }
 
-    private static RowData copyRowData(RowData rowData, RowData.FieldGetter[] 
fieldGetters) {
+    private static GenericRowData copyRowData(RowData rowData, 
RowData.FieldGetter[] fieldGetters) {
         Object[] values = new Object[fieldGetters.length];
         for (int i = 0; i < fieldGetters.length; i++) {
             values[i] = fieldGetters[i].getFieldOrNull(rowData);
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/DeleteTableITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/DeleteTableITCase.java
index b918b7af3a0..21e2907004a 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/DeleteTableITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/DeleteTableITCase.java
@@ -95,7 +95,8 @@ public class DeleteTableITCase extends BatchTestBase {
         String dataId = registerData();
         tEnv().executeSql(
                         String.format(
-                                "CREATE TABLE t (a int, b string, c double) 
WITH"
+                                "CREATE TABLE t (a int PRIMARY KEY NOT 
ENFORCED,"
+                                        + " b string, c double) WITH"
                                         + " ('connector' = 
'test-update-delete',"
                                         + " 'data-id' = '%s',"
                                         + " 'delete-mode' = '%s',"
@@ -111,13 +112,55 @@ public class DeleteTableITCase extends BatchTestBase {
         assertThat(rows.toString()).isEqualTo("[+I[0, b_0, 0.0]]");
     }
 
+    @Test
+    public void testRowLevelDeleteWithPartitionColumn() throws Exception {
+        String dataId = registerData();
+        tEnv().executeSql(
+                        String.format(
+                                "CREATE TABLE t"
+                                        + " (a int PRIMARY KEY NOT ENFORCED,"
+                                        + " b string not null,"
+                                        + " c double not null) WITH"
+                                        + " ('connector' = 
'test-update-delete',"
+                                        + " 'data-id' = '%s',"
+                                        + " 'delete-mode' = '%s',"
+                                        + " 'required-columns-for-delete' = 
'a;c',"
+                                        + " 'support-delete-push-down' = 
'false'"
+                                        + ")",
+                                dataId, deleteMode));
+        tEnv().executeSql("DELETE FROM t WHERE a > 1").await();
+        List<Row> rows = toRows(tEnv().executeSql("SELECT * FROM t"));
+        assertThat(rows.toString()).isEqualTo("[+I[0, b_0, 0.0], +I[1, b_1, 
2.0]]");
+
+        // test delete with requiring partial primary keys
+        dataId = registerData();
+        tEnv().executeSql(
+                        String.format(
+                                "CREATE TABLE t1"
+                                        + " (a int,"
+                                        + " b string not null,"
+                                        + " c double not null,"
+                                        + " PRIMARY KEY (a, c) NOT ENFORCED) 
WITH"
+                                        + " ('connector' = 
'test-update-delete',"
+                                        + " 'data-id' = '%s',"
+                                        + " 'delete-mode' = '%s',"
+                                        + " 'required-columns-for-delete' = 
'a;b',"
+                                        + " 'support-delete-push-down' = 
'false'"
+                                        + ")",
+                                dataId, deleteMode));
+        tEnv().executeSql("DELETE FROM t1 WHERE a > 1").await();
+        rows = toRows(tEnv().executeSql("SELECT * FROM t1"));
+        assertThat(rows.toString()).isEqualTo("[+I[0, b_0, 0.0], +I[1, b_1, 
2.0]]");
+    }
+
     @Test
     public void testMixDelete() throws Exception {
         // test mix delete push down and row-level delete
         String dataId = registerData();
         tEnv().executeSql(
                         String.format(
-                                "CREATE TABLE t (a int, b string, c double) 
WITH"
+                                "CREATE TABLE t (a int PRIMARY KEY NOT 
ENFORCED,"
+                                        + " b string, c double) WITH"
                                         + " ('connector' = 
'test-update-delete',"
                                         + " 'data-id' = '%s',"
                                         + " 'mix-delete' = 'true')",
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/UpdateTableITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/UpdateTableITCase.java
index 793a67eaa59..b7f55011f21 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/UpdateTableITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/UpdateTableITCase.java
@@ -85,6 +85,46 @@ public class UpdateTableITCase extends BatchTestBase {
                 .isEqualTo("[+I[0, b_0, 0.0], +I[1, uaa, 4.0], +I[2, uab, 
16.0]]");
     }
 
+    @Test
+    public void testPartialUpdate() throws Exception {
+        String dataId = registerData();
+        tEnv().executeSql(
+                        String.format(
+                                "CREATE TABLE t ("
+                                        + " a int PRIMARY KEY NOT ENFORCED,"
+                                        + " b string not null,"
+                                        + " c double not null) WITH"
+                                        + " ('connector' = 
'test-update-delete', "
+                                        + "'data-id' = '%s',"
+                                        + " 'required-columns-for-update' = 
'a;b', "
+                                        + " 'update-mode' = '%s')",
+                                dataId, updateMode));
+        tEnv().executeSql("UPDATE t SET b = 'uaa' WHERE a >= 1").await();
+        List<String> rows = toSortedResults(tEnv().executeSql("SELECT * FROM 
t"));
+        assertThat(rows.toString())
+                .isEqualTo("[+I[0, b_0, 0.0], +I[1, uaa, 2.0], +I[2, uaa, 
4.0]]");
+
+        // test partial update with requiring partial primary keys
+        dataId = registerData();
+        tEnv().executeSql(
+                        String.format(
+                                "CREATE TABLE t1 ("
+                                        + " a int,"
+                                        + " b string not null,"
+                                        + " c double not null,"
+                                        + " PRIMARY KEY (a, c) NOT ENFORCED"
+                                        + ") WITH"
+                                        + " ('connector' = 
'test-update-delete', "
+                                        + "'data-id' = '%s',"
+                                        + " 'required-columns-for-update' = 
'a;b', "
+                                        + " 'update-mode' = '%s')",
+                                dataId, updateMode));
+        tEnv().executeSql("UPDATE t1 SET b = 'uaa' WHERE a >= 1").await();
+        rows = toSortedResults(tEnv().executeSql("SELECT * FROM t1"));
+        assertThat(rows.toString())
+                .isEqualTo("[+I[0, b_0, 0.0], +I[1, uaa, 2.0], +I[2, uaa, 
4.0]]");
+    }
+
     @Test
     public void testStatementSetContainUpdateAndInsert() throws Exception {
         tEnv().executeSql(


Reply via email to