This is an automated email from the ASF dual-hosted git repository.

lincoln pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit b7d32069c0971fba1c33950aa373624bb6018505
Author: luoyuxia <[email protected]>
AuthorDate: Sun Jan 15 20:14:33 2023 +0800

    [FLINK-30662][table] Planner supports row-level delete
    
    This closes #21676
---
 .../table/api/internal/TableEnvironmentImpl.java   |   7 +-
 .../flink/table/catalog/ContextResolvedTable.java  |   9 +
 .../table/planner/connectors/DynamicSinkUtils.java | 379 ++++++++++-
 .../planner/connectors/DynamicSourceUtils.java     |  26 +-
 .../operations/SqlToOperationConverter.java        |  21 +-
 .../plan/abilities/sink/RowLevelDeleteSpec.java    |  97 +++
 .../plan/abilities/sink/SinkAbilitySpec.java       |   3 +-
 .../plan/nodes/exec/common/CommonExecSink.java     |  44 ++
 .../utils/RowLevelModificationContextUtils.java    |  57 ++
 .../table/planner/delegation/PlannerBase.scala     |  14 +
 .../planner/plan/schema/TableSourceTable.scala     |  30 +
 .../factories/TestUpdateDeleteTableFactory.java    | 384 ++++++++++-
 .../operations/SqlToOperationConverterTest.java    |  24 +-
 .../planner/plan/batch/sql/RowLevelDeleteTest.java | 129 ++++
 .../runtime/batch/sql/DeleteTableITCase.java       | 132 +++-
 .../runtime/stream/sql/DeleteTableITCase.java      |   7 +
 .../planner/plan/batch/sql/RowLevelDeleteTest.xml  | 714 +++++++++++++++++++++
 .../runtime/operators/sink/RowKindSetter.java      |  46 ++
 .../runtime/operators/sink/RowKindSetterTest.java  |  70 ++
 19 files changed, 2129 insertions(+), 64 deletions(-)

diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
index 3578e1845cb..536db5c430a 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/api/internal/TableEnvironmentImpl.java
@@ -721,7 +721,8 @@ public class TableEnvironmentImpl implements 
TableEnvironmentInternal {
         } else {
             if (operations.size() > 1
                     && 
operations.stream().anyMatch(this::isRowLevelModification)) {
-                throw new TableException("Only single UPDATE/DELETE statement 
is supported.");
+                throw new TableException(
+                        "Unsupported SQL query! Only accept a single SQL 
statement of type DELETE, UPDATE.");
             }
             return planner.explain(operations, format, extraDetails);
         }
@@ -784,6 +785,7 @@ public class TableEnvironmentImpl implements 
TableEnvironmentInternal {
 
         if (operations.size() != 1
                 || !(operations.get(0) instanceof ModifyOperation)
+                || isRowLevelModification(operations.get(0))
                 || operations.get(0) instanceof CreateTableASOperation) {
             throw new 
TableException(UNSUPPORTED_QUERY_IN_COMPILE_PLAN_SQL_MSG);
         }
@@ -859,7 +861,8 @@ public class TableEnvironmentImpl implements 
TableEnvironmentInternal {
                     if (operations.size() > 1) {
                         throw new TableException(
                                 String.format(
-                                        "Only single %s statement is 
supported.", modifyType));
+                                        "Unsupported SQL query! Only accept a 
single SQL statement of type %s.",
+                                        modifyType));
                     }
                     if (isStreamingMode) {
                         throw new TableException(
diff --git 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ContextResolvedTable.java
 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ContextResolvedTable.java
index 52c84cfcccf..e7b9e5f0835 100644
--- 
a/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ContextResolvedTable.java
+++ 
b/flink-table/flink-table-api-java/src/main/java/org/apache/flink/table/catalog/ContextResolvedTable.java
@@ -157,6 +157,15 @@ public final class ContextResolvedTable {
                 false);
     }
 
+    /** Copy the {@link ContextResolvedTable}, replacing the underlying {@link 
ResolvedSchema}. */
+    public ContextResolvedTable copy(ResolvedSchema newSchema) {
+        return new ContextResolvedTable(
+                objectIdentifier,
+                catalog,
+                new ResolvedCatalogTable((CatalogTable) 
resolvedTable.getOrigin(), newSchema),
+                false);
+    }
+
     /**
      * This method tries to return the connector name of the table, trying to 
provide a bit more
      * helpful toString for anonymous tables. It's only to help users to 
debug, and its return value
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
index b810e71c615..69574fabd74 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSinkUtils.java
@@ -19,9 +19,11 @@
 package org.apache.flink.table.planner.connectors;
 
 import org.apache.flink.annotation.Internal;
+import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ReadableConfig;
 import org.apache.flink.streaming.api.datastream.DataStream;
 import 
org.apache.flink.streaming.api.operators.collect.CollectSinkOperatorFactory;
+import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.api.Schema;
 import org.apache.flink.table.api.TableException;
 import org.apache.flink.table.api.TableResult;
@@ -35,19 +37,28 @@ import org.apache.flink.table.catalog.DataTypeFactory;
 import org.apache.flink.table.catalog.ExternalCatalogTable;
 import org.apache.flink.table.catalog.ResolvedCatalogTable;
 import org.apache.flink.table.catalog.ResolvedSchema;
+import org.apache.flink.table.catalog.UnresolvedIdentifier;
+import org.apache.flink.table.connector.RowLevelModificationScanContext;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.sink.abilities.SupportsOverwrite;
 import org.apache.flink.table.connector.sink.abilities.SupportsPartitioning;
+import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete;
 import org.apache.flink.table.connector.sink.abilities.SupportsWritingMetadata;
+import org.apache.flink.table.connector.source.DynamicTableSource;
+import 
org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
 import org.apache.flink.table.operations.CollectModifyOperation;
 import org.apache.flink.table.operations.ExternalModifyOperation;
 import org.apache.flink.table.operations.SinkModifyOperation;
 import org.apache.flink.table.planner.calcite.FlinkRelBuilder;
 import org.apache.flink.table.planner.calcite.FlinkTypeFactory;
+import org.apache.flink.table.planner.functions.sql.FlinkSqlOperatorTable;
 import org.apache.flink.table.planner.plan.abilities.sink.OverwriteSpec;
+import org.apache.flink.table.planner.plan.abilities.sink.RowLevelDeleteSpec;
 import org.apache.flink.table.planner.plan.abilities.sink.SinkAbilitySpec;
 import org.apache.flink.table.planner.plan.abilities.sink.WritingMetadataSpec;
 import org.apache.flink.table.planner.plan.nodes.calcite.LogicalSink;
+import org.apache.flink.table.planner.plan.schema.TableSourceTable;
+import org.apache.flink.table.planner.utils.RowLevelModificationContextUtils;
 import org.apache.flink.table.types.DataType;
 import org.apache.flink.table.types.inference.TypeTransformations;
 import org.apache.flink.table.types.logical.LogicalType;
@@ -58,10 +69,20 @@ import org.apache.flink.table.types.utils.TypeConversions;
 
 import org.apache.calcite.plan.RelOptUtil;
 import org.apache.calcite.rel.RelNode;
+import org.apache.calcite.rel.core.Project;
+import org.apache.calcite.rel.core.TableModify;
 import org.apache.calcite.rel.hint.RelHint;
+import org.apache.calcite.rel.logical.LogicalFilter;
+import org.apache.calcite.rel.logical.LogicalTableModify;
+import org.apache.calcite.rel.logical.LogicalTableScan;
 import org.apache.calcite.rel.type.RelDataType;
+import org.apache.calcite.rel.type.RelDataTypeField;
+import org.apache.calcite.rel.type.RelDataTypeFieldImpl;
+import org.apache.calcite.rel.type.RelRecordType;
+import org.apache.calcite.rel.type.StructKind;
 import org.apache.calcite.rex.RexBuilder;
 import org.apache.calcite.rex.RexNode;
+import org.apache.calcite.rex.RexUtil;
 
 import java.time.ZoneId;
 import java.util.ArrayList;
@@ -69,6 +90,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
@@ -193,6 +215,12 @@ public final class DynamicSinkUtils {
 
         List<SinkAbilitySpec> sinkAbilitySpecs = new ArrayList<>();
 
+        boolean isDelete = false;
+        if (input instanceof LogicalTableModify) {
+            LogicalTableModify tableModify = (LogicalTableModify) input;
+            isDelete = tableModify.getOperation() == 
TableModify.Operation.DELETE;
+        }
+
         // 1. prepare table sink
         prepareDynamicSink(
                 tableDebugName,
@@ -201,12 +229,31 @@ public final class DynamicSinkUtils {
                 sink,
                 contextResolvedTable.getResolvedTable(),
                 sinkAbilitySpecs);
+
+        // rewrite rel node for delete
+        if (isDelete) {
+            input =
+                    convertDelete(
+                            (LogicalTableModify) input,
+                            sink,
+                            contextResolvedTable,
+                            tableDebugName,
+                            dataTypeFactory,
+                            typeFactory,
+                            sinkAbilitySpecs);
+        }
+
         sinkAbilitySpecs.forEach(spec -> spec.apply(sink));
 
         // 2. validate the query schema to the sink's table schema and apply 
cast if possible
-        final RelNode query =
-                validateSchemaAndApplyImplicitCast(
-                        input, schema, tableDebugName, dataTypeFactory, 
typeFactory);
+        RelNode query = input;
+        // skip validation and implicit cast when it's a delete, as it has been done 
before
+        if (!isDelete) {
+            query =
+                    validateSchemaAndApplyImplicitCast(
+                            input, schema, tableDebugName, dataTypeFactory, 
typeFactory);
+        }
+
         relBuilder.push(query);
 
         // 3. convert the sink's table schema to the consumed data type of the 
sink
@@ -230,26 +277,48 @@ public final class DynamicSinkUtils {
                 sinkAbilitySpecs.toArray(new SinkAbilitySpec[0]));
     }
 
-    /**
-     * Checks if the given query can be written into the given sink's table 
schema.
-     *
-     * <p>It checks whether field types are compatible (types should be equal 
including precisions).
-     * If types are not compatible, but can be implicitly cast, a cast 
projection will be applied.
-     * Otherwise, an exception will be thrown.
-     */
+    /** Checks if the given query can be written into the given sink's table 
schema. */
     public static RelNode validateSchemaAndApplyImplicitCast(
             RelNode query,
             ResolvedSchema sinkSchema,
             String tableDebugName,
             DataTypeFactory dataTypeFactory,
             FlinkTypeFactory typeFactory) {
-        final RowType queryType = 
FlinkTypeFactory.toLogicalRowType(query.getRowType());
-        final List<RowField> queryFields = queryType.getFields();
-
         final RowType sinkType =
                 (RowType)
                         fixSinkDataType(dataTypeFactory, 
sinkSchema.toSinkRowDataType())
                                 .getLogicalType();
+
+        return validateSchemaAndApplyImplicitCast(query, sinkType, 
tableDebugName, typeFactory);
+    }
+
+    /** Checks if the given query can be written into the given target types. 
*/
+    public static RelNode validateSchemaAndApplyImplicitCast(
+            RelNode query,
+            List<DataType> targetTypes,
+            String tableDebugName,
+            DataTypeFactory dataTypeFactory,
+            FlinkTypeFactory typeFactory) {
+        final RowType sinkType =
+                (RowType)
+                        fixSinkDataType(
+                                        dataTypeFactory,
+                                        DataTypes.ROW(targetTypes.toArray(new 
DataType[0])))
+                                .getLogicalType();
+        return validateSchemaAndApplyImplicitCast(query, sinkType, 
tableDebugName, typeFactory);
+    }
+
+    /**
+     * Checks if the given query can be written into the given sink type.
+     *
+     * <p>It checks whether field types are compatible (types should be equal 
including precisions).
+     * If types are not compatible, but can be implicitly cast, a cast 
projection will be applied.
+     * Otherwise, an exception will be thrown.
+     */
+    private static RelNode validateSchemaAndApplyImplicitCast(
+            RelNode query, RowType sinkType, String tableDebugName, 
FlinkTypeFactory typeFactory) {
+        final RowType queryType = 
FlinkTypeFactory.toLogicalRowType(query.getRowType());
+        final List<RowField> queryFields = queryType.getFields();
         final List<RowField> sinkFields = sinkType.getFields();
 
         if (queryFields.size() != sinkFields.size()) {
@@ -282,6 +351,290 @@ public final class DynamicSinkUtils {
         return query;
     }
 
+    private static RelNode convertDelete(
+            LogicalTableModify tableModify,
+            DynamicTableSink sink,
+            ContextResolvedTable contextResolvedTable,
+            String tableDebugName,
+            DataTypeFactory dataTypeFactory,
+            FlinkTypeFactory typeFactory,
+            List<SinkAbilitySpec> sinkAbilitySpecs) {
+        if (!(sink instanceof SupportsRowLevelDelete)) {
+            throw new UnsupportedOperationException(
+                    String.format(
+                            "Can't perform delete operation of the table %s 
because the corresponding dynamic table sink has not yet implemented %s.",
+                            tableDebugName, 
SupportsRowLevelDelete.class.getName()));
+        }
+
+        // get the row-level delete info
+        SupportsRowLevelDelete supportsRowLevelDelete = 
(SupportsRowLevelDelete) sink;
+        RowLevelModificationScanContext context = 
RowLevelModificationContextUtils.getScanContext();
+        SupportsRowLevelDelete.RowLevelDeleteInfo rowLevelDeleteInfo =
+                supportsRowLevelDelete.applyRowLevelDelete(context);
+        sinkAbilitySpecs.add(
+                new 
RowLevelDeleteSpec(rowLevelDeleteInfo.getRowLevelDeleteMode(), context));
+
+        if (rowLevelDeleteInfo.getRowLevelDeleteMode()
+                == SupportsRowLevelDelete.RowLevelDeleteMode.DELETED_ROWS) {
+            // convert the LogicalTableModify node to a rel node representing 
row-level delete
+            return convertToRowLevelDelete(
+                    tableModify,
+                    contextResolvedTable,
+                    rowLevelDeleteInfo,
+                    tableDebugName,
+                    dataTypeFactory,
+                    typeFactory);
+        } else if (rowLevelDeleteInfo.getRowLevelDeleteMode()
+                == SupportsRowLevelDelete.RowLevelDeleteMode.REMAINING_ROWS) {
+            // if it's for remaining rows, convert the predicate in the where clause
+            // to the negative predicate
+            convertPredicateToNegative(tableModify);
+            // convert the LogicalTableModify node to a rel node representing 
row-level delete
+            return convertToRowLevelDelete(
+                    tableModify,
+                    contextResolvedTable,
+                    rowLevelDeleteInfo,
+                    tableDebugName,
+                    dataTypeFactory,
+                    typeFactory);
+        } else {
+            throw new TableException(
+                    "Unknown delete mode: " + 
rowLevelDeleteInfo.getRowLevelDeleteMode());
+        }
+    }
+
+    /** Convert the tableModify node to a rel node representing row-level 
delete. */
+    private static RelNode convertToRowLevelDelete(
+            LogicalTableModify tableModify,
+            ContextResolvedTable contextResolvedTable,
+            SupportsRowLevelDelete.RowLevelDeleteInfo rowLevelDeleteInfo,
+            String tableDebugName,
+            DataTypeFactory dataTypeFactory,
+            FlinkTypeFactory typeFactory) {
+        // get the required columns
+        ResolvedSchema resolvedSchema = 
contextResolvedTable.getResolvedSchema();
+        Optional<List<Column>> optionalColumns = 
rowLevelDeleteInfo.requiredColumns();
+        List<Column> requiredColumns = 
optionalColumns.orElse(resolvedSchema.getColumns());
+        // get the root table scan which we may need rewrite it
+        LogicalTableScan tableScan = getSourceTableScan(tableModify);
+        // get the index for the required columns and extra meta cols if 
necessary
+        Tuple2<List<Integer>, List<MetadataColumn>> colsIndexAndExtraMetaCols =
+                getRequireColumnsIndexAndExtraMetaCols(tableScan, 
requiredColumns, resolvedSchema);
+        List<Integer> colIndexes = colsIndexAndExtraMetaCols.f0;
+        List<MetadataColumn> metadataColumns = colsIndexAndExtraMetaCols.f1;
+        // if the meta columns size is greater than 0, we need to modify the 
underlying
+        // LogicalTableScan to make it able to read the meta columns
+        if (metadataColumns.size() > 0) {
+            resolvedSchema =
+                    addExtraMetaCols(
+                            tableModify, tableScan, tableDebugName, 
metadataColumns, typeFactory);
+        }
+        // create a project that only selects the required columns for delete
+        return projectColumnsForDelete(
+                tableModify,
+                resolvedSchema,
+                colIndexes,
+                tableDebugName,
+                dataTypeFactory,
+                typeFactory);
+    }
+
+    /** Convert the predicate in WHERE clause to the negative predicate. */
+    private static void convertPredicateToNegative(LogicalTableModify 
tableModify) {
+        RexBuilder rexBuilder = tableModify.getCluster().getRexBuilder();
+        RelNode input = tableModify.getInput();
+        LogicalFilter newFilter;
+        // if the input is a table scan, there's no predicate which means it's 
always true
+        // the negative predicate should be false
+        if (input.getInput(0) instanceof LogicalTableScan) {
+            newFilter = LogicalFilter.create(input.getInput(0), 
rexBuilder.makeLiteral(false));
+        } else {
+            LogicalFilter filter = (LogicalFilter) input.getInput(0);
+            // create a filter with negative predicate
+            RexNode complementFilter =
+                    rexBuilder.makeCall(
+                            filter.getCondition().getType(),
+                            FlinkSqlOperatorTable.NOT,
+                            Collections.singletonList(filter.getCondition()));
+            newFilter = filter.copy(filter.getTraitSet(), filter.getInput(), 
complementFilter);
+        }
+        // replace with the new filter
+        input.replaceInput(0, newFilter);
+    }
+
+    /** Get the index for the required columns and extra meta cols if 
necessary. */
+    private static Tuple2<List<Integer>, List<MetadataColumn>>
+            getRequireColumnsIndexAndExtraMetaCols(
+                    LogicalTableScan tableScan,
+                    List<Column> requiredColumns,
+                    ResolvedSchema resolvedSchema) {
+        // index list for the required columns
+        List<Integer> columnIndexList = new ArrayList<>();
+        // extra meta cols
+        List<MetadataColumn> extraMetadataColumns = new ArrayList<>();
+        List<String> fieldNames = resolvedSchema.getColumnNames();
+        final TableSourceTable sourceTable = 
tableScan.getTable().unwrap(TableSourceTable.class);
+        DynamicTableSource dynamicTableSource = sourceTable.tableSource();
+        int additionCols = 0;
+        // iterate over each required column
+        for (Column column : requiredColumns) {
+            int index = fieldNames.indexOf(column.getName());
+            // if we can't find the column, we may need to add an extra column
+            if (index <= -1) {
+                // we only consider adding metadata columns
+                if (column instanceof Column.MetadataColumn) {
+                    // need to add meta column
+                    columnIndexList.add(fieldNames.size() + additionCols);
+                    if (!(dynamicTableSource instanceof 
SupportsReadingMetadata)) {
+                        throw new UnsupportedOperationException(
+                                String.format(
+                                        "The table source don't support 
reading metadata, but the require columns contains the meta columns: %s.",
+                                        column));
+                    }
+                    // list the metadata columns the source supports reading
+                    SupportsReadingMetadata supportsReadingMetadata =
+                            (SupportsReadingMetadata) dynamicTableSource;
+                    Map<String, DataType> readableMetadata =
+                            supportsReadingMetadata.listReadableMetadata();
+                    // check the source can read the meta column
+                    String metaCol =
+                            ((MetadataColumn) 
column).getMetadataKey().orElse(column.getName());
+                    if (!readableMetadata.containsKey(metaCol)) {
+                        throw new IllegalArgumentException(
+                                String.format(
+                                        "Expect to read the meta column %s, 
but the table source for table %s doesn't support read the metadata column."
+                                                + "Please make sure the 
readable metadata for the source contains %s.",
+                                        column,
+                                        UnresolvedIdentifier.of(
+                                                
tableScan.getTable().getQualifiedName()),
+                                        metaCol));
+                    }
+                    // mark it as extra col
+                    additionCols += 1;
+                    DataType dataType = readableMetadata.get(metaCol);
+                    if (!dataType.equals(column.getDataType())) {
+                        throw new IllegalArgumentException(
+                                String.format(
+                                        "Un-matched data type: the required 
column %s has datatype %s, but the data type in readable metadata for the table 
%s has data type %s. ",
+                                        column,
+                                        column.getDataType(),
+                                        UnresolvedIdentifier.of(
+                                                
tableScan.getTable().getQualifiedName()),
+                                        dataType));
+                    }
+                    extraMetadataColumns.add((MetadataColumn) column);
+                } else {
+                    throw new IllegalArgumentException("Unknown required 
column " + column);
+                }
+            } else {
+                columnIndexList.add(index);
+            }
+        }
+        return Tuple2.of(columnIndexList, extraMetadataColumns);
+    }
+
+    private static LogicalTableScan getSourceTableScan(RelNode relNode) {
+        while (!(relNode instanceof LogicalTableScan)) {
+            relNode = relNode.getInput(0);
+        }
+        return (LogicalTableScan) relNode;
+    }
+
+    /**
+     * Add extra meta columns for the underlying table scan, return a new resolved 
schema after adding
+     * extra meta columns.
+     */
+    private static ResolvedSchema addExtraMetaCols(
+            LogicalTableModify tableModify,
+            LogicalTableScan tableScan,
+            String tableDebugName,
+            List<MetadataColumn> metadataColumns,
+            FlinkTypeFactory typeFactory) {
+        final TableSourceTable sourceTable = 
tableScan.getTable().unwrap(TableSourceTable.class);
+        DynamicTableSource dynamicTableSource = sourceTable.tableSource();
+        // get old schema and new schema after add some cols
+        ResolvedSchema oldSchema = 
sourceTable.contextResolvedTable().getResolvedSchema();
+        List<Column> newColumns = new ArrayList<>(oldSchema.getColumns());
+        newColumns.addAll(metadataColumns);
+        // get the new resolved schema after adding extra meta columns
+        ResolvedSchema resolvedSchema = ResolvedSchema.of(newColumns);
+
+        List<RelDataTypeField> oldFields = 
sourceTable.getRowType().getFieldList();
+        List<RelDataTypeField> newFields = new 
ArrayList<>(sourceTable.getRowType().getFieldList());
+        for (int i = 0; i < metadataColumns.size(); i++) {
+            MetadataColumn column = metadataColumns.get(i);
+            // add a new field
+            newFields.add(
+                    new RelDataTypeFieldImpl(
+                            column.getName(),
+                            oldFields.size() + i,
+                            typeFactory.createFieldTypeFromLogicalType(
+                                    column.getDataType().getLogicalType())));
+        }
+        // create a copy for TableSourceTable with new resolved schema
+        TableSourceTable newTableSourceTab =
+                sourceTable.copy(
+                        dynamicTableSource,
+                        
sourceTable.contextResolvedTable().copy(resolvedSchema),
+                        new RelRecordType(StructKind.FULLY_QUALIFIED, 
newFields, false),
+                        sourceTable.abilitySpecs());
+
+        // create a copy for table scan with new TableSourceTable
+        LogicalTableScan newTableScan =
+                new LogicalTableScan(
+                        tableScan.getCluster(),
+                        tableScan.getTraitSet(),
+                        tableScan.getHints(),
+                        newTableSourceTab);
+        Project project = (Project) tableModify.getInput();
+        // replace with the new table scan
+        if (project.getInput() instanceof LogicalFilter) {
+            LogicalFilter logicalFilter = (LogicalFilter) project.getInput();
+            project.replaceInput(
+                    0,
+                    logicalFilter.copy(
+                            logicalFilter.getTraitSet(),
+                            newTableScan,
+                            logicalFilter.getCondition()));
+        } else {
+            project.replaceInput(0, newTableScan);
+        }
+        // validate and apply metadata
+        DynamicSourceUtils.validateAndApplyMetadata(
+                tableDebugName, resolvedSchema, 
newTableSourceTab.tableSource());
+        return resolvedSchema;
+    }
+
+    private static RelNode projectColumnsForDelete(
+            LogicalTableModify tableModify,
+            ResolvedSchema resolvedSchema,
+            List<Integer> colIndexes,
+            String tableDebugName,
+            DataTypeFactory dataTypeFactory,
+            FlinkTypeFactory typeFactory) {
+        // now we know which columns we may need
+        List<RexNode> newRexNodeList = new ArrayList<>();
+        List<String> newFieldNames = new ArrayList<>();
+        List<DataType> deleteTargetDataTypes = new ArrayList<>();
+        Project project = (Project) (tableModify.getInput());
+        RexBuilder rexBuilder = tableModify.getCluster().getRexBuilder();
+        // iterate over each column index and create an input ref node for it.
+        for (int index : colIndexes) {
+            newRexNodeList.add(rexBuilder.makeInputRef(project.getInput(), 
index));
+            newFieldNames.add(resolvedSchema.getColumnNames().get(index));
+            
deleteTargetDataTypes.add(resolvedSchema.getColumnDataTypes().get(index));
+        }
+        // a project to only get specific columns
+        project =
+                project.copy(
+                        project.getTraitSet(),
+                        project.getInput(),
+                        newRexNodeList,
+                        RexUtil.createStructType(typeFactory, newRexNodeList, 
newFieldNames, null));
+        return validateSchemaAndApplyImplicitCast(
+                project, deleteTargetDataTypes, tableDebugName, 
dataTypeFactory, typeFactory);
+    }
+
     // 
--------------------------------------------------------------------------------------------
 
     /** Temporary solution until we drop legacy types. */
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java
index 989a20c6670..db5b4090877 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/connectors/DynamicSourceUtils.java
@@ -33,15 +33,18 @@ import org.apache.flink.table.catalog.ResolvedCatalogTable;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.catalog.WatermarkSpec;
 import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.RowLevelModificationScanContext;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.connector.source.ScanTableSource;
 import 
org.apache.flink.table.connector.source.ScanTableSource.ScanRuntimeProvider;
 import 
org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
+import 
org.apache.flink.table.connector.source.abilities.SupportsRowLevelModificationScan;
 import org.apache.flink.table.planner.calcite.FlinkRelBuilder;
 import 
org.apache.flink.table.planner.expressions.converter.ExpressionConverter;
 import org.apache.flink.table.planner.plan.abilities.source.SourceAbilitySpec;
 import org.apache.flink.table.planner.plan.schema.TableSourceTable;
 import org.apache.flink.table.planner.plan.stats.FlinkStatistic;
+import org.apache.flink.table.planner.utils.RowLevelModificationContextUtils;
 import org.apache.flink.table.planner.utils.ShortcutUtils;
 import 
org.apache.flink.table.runtime.connector.source.ScanRuntimeProviderContext;
 import org.apache.flink.table.types.DataType;
@@ -157,6 +160,7 @@ public final class DynamicSourceUtils {
         if (source instanceof ScanTableSource) {
             validateScanSource(
                     tableDebugName, schema, (ScanTableSource) source, 
isBatchMode, config);
+            prepareRowLevelModificationScan(source);
         }
 
         // lookup table source is validated in LookupJoin node
@@ -378,7 +382,7 @@ public final class DynamicSourceUtils {
                 .collect(Collectors.toList());
     }
 
-    private static void validateAndApplyMetadata(
+    public static void validateAndApplyMetadata(
             String tableDebugName, ResolvedSchema schema, DynamicTableSource 
source) {
         final List<MetadataColumn> metadataColumns = 
extractMetadataColumns(schema);
 
@@ -555,6 +559,26 @@ public final class DynamicSourceUtils {
         }
     }
 
+    private static void prepareRowLevelModificationScan(DynamicTableSource 
dynamicTableSource) {
+        // if the modification type has been set and the dynamic source 
supports row-level
+        // modification scan
+        if (RowLevelModificationContextUtils.getModificationType() != null
+                && dynamicTableSource instanceof 
SupportsRowLevelModificationScan) {
+            SupportsRowLevelModificationScan modificationScan =
+                    (SupportsRowLevelModificationScan) dynamicTableSource;
+            // get the previous scan context
+            RowLevelModificationScanContext scanContext =
+                    RowLevelModificationContextUtils.getScanContext();
+            // pass the previous scan context to the current table source and
+            // get a new scan context
+            RowLevelModificationScanContext newScanContext =
+                    modificationScan.applyRowLevelModificationScan(
+                            
RowLevelModificationContextUtils.getModificationType(), scanContext);
+            // set the scan context
+            RowLevelModificationContextUtils.setScanContext(newScanContext);
+        }
+    }
+
     private DynamicSourceUtils() {
         // no instantiation
     }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
index 1851cce8c11..d07f45a26a6 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/operations/SqlToOperationConverter.java
@@ -120,6 +120,7 @@ import 
org.apache.flink.table.catalog.exceptions.TableNotExistException;
 import org.apache.flink.table.catalog.exceptions.TableNotPartitionedException;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.sink.abilities.SupportsDeletePushDown;
+import 
org.apache.flink.table.connector.source.abilities.SupportsRowLevelModificationScan;
 import org.apache.flink.table.expressions.CallExpression;
 import org.apache.flink.table.expressions.Expression;
 import org.apache.flink.table.expressions.FieldReferenceExpression;
@@ -194,6 +195,7 @@ import 
org.apache.flink.table.planner.calcite.FlinkPlannerImpl;
 import org.apache.flink.table.planner.hint.FlinkHints;
 import org.apache.flink.table.planner.utils.Expander;
 import org.apache.flink.table.planner.utils.OperationConverterUtils;
+import org.apache.flink.table.planner.utils.RowLevelModificationContextUtils;
 import org.apache.flink.table.resource.ResourceType;
 import org.apache.flink.table.resource.ResourceUri;
 import org.apache.flink.table.types.DataType;
@@ -285,6 +287,7 @@ public class SqlToOperationConverter {
     /** Convert a validated sql node to Operation. */
     private static Optional<Operation> convertValidatedSqlNode(
             FlinkPlannerImpl flinkPlanner, CatalogManager catalogManager, 
SqlNode validated) {
+        beforeConversion();
         SqlToOperationConverter converter =
                 new SqlToOperationConverter(flinkPlanner, catalogManager);
         if (validated instanceof SqlCreateCatalog) {
@@ -417,6 +420,11 @@ public class SqlToOperationConverter {
 
     // ~ Tools 
------------------------------------------------------------------
 
+    private static void beforeConversion() {
+        // clear row-level modification context
+        RowLevelModificationContextUtils.clearContext();
+    }
+
     /** Convert DROP TABLE statement. */
     private Operation convertDropTable(SqlDropTable sqlDropTable) {
         UnresolvedIdentifier unresolvedIdentifier =
@@ -1501,6 +1509,9 @@ public class SqlToOperationConverter {
     }
 
     private Operation convertDelete(SqlDelete sqlDelete) {
+        // mark the modification type as DELETE
+        RowLevelModificationContextUtils.setModificationType(
+                
SupportsRowLevelModificationScan.RowLevelModificationType.DELETE);
         RelRoot updateRelational = flinkPlanner.rel(sqlDelete);
         LogicalTableModify tableModify = (LogicalTableModify) 
updateRelational.rel;
         UnresolvedIdentifier unresolvedTableIdentifier =
@@ -1528,12 +1539,10 @@ public class SqlToOperationConverter {
                 }
             }
         }
-
-        // otherwise, delete push down is not applicable, throw unsupported 
exception
-        throw new UnsupportedOperationException(
-                String.format(
-                        "Only delete push down is supported currently, but the 
delete statement can't pushed to table sink %s.",
-                        unresolvedTableIdentifier.asSummaryString()));
+        // delete push down is not applicable, use row-level delete
+        PlannerQueryOperation queryOperation = new 
PlannerQueryOperation(tableModify);
+        return new SinkModifyOperation(
+                contextResolvedTable, queryOperation, 
SinkModifyOperation.ModifyType.DELETE);
     }
 
     private void validateTableConstraint(SqlTableConstraint constraint) {
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/RowLevelDeleteSpec.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/RowLevelDeleteSpec.java
new file mode 100644
index 00000000000..04d9ca5bf12
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/RowLevelDeleteSpec.java
@@ -0,0 +1,97 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.abilities.sink;
+
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.connector.RowLevelModificationScanContext;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete;
+import org.apache.flink.util.Preconditions;
+
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonCreator;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnore;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonProperty;
+import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTypeName;
+
+import org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+
+import javax.annotation.Nonnull;
+import javax.annotation.Nullable;
+
+import java.util.Objects;
+
+/**
+ * A sub-class of {@link SinkAbilitySpec} that can not only 
serialize/deserialize the row-level
+ * delete mode to/from JSON, but also can delete existing data for {@link
+ * org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete}.
+ */
+@JsonIgnoreProperties(ignoreUnknown = true)
+@JsonTypeName("RowLevelDelete")
+public class RowLevelDeleteSpec implements SinkAbilitySpec {
+    public static final String FIELD_NAME_ROW_LEVEL_DELETE_MODE = 
"rowLevelDeleteMode";
+
+    @JsonProperty(FIELD_NAME_ROW_LEVEL_DELETE_MODE)
+    @Nonnull
+    private final SupportsRowLevelDelete.RowLevelDeleteMode rowLevelDeleteMode;
+
+    @JsonIgnore @Nullable private final RowLevelModificationScanContext 
scanContext;
+
+    @JsonCreator
+    public RowLevelDeleteSpec(
+            @JsonProperty(FIELD_NAME_ROW_LEVEL_DELETE_MODE) @Nonnull
+                    SupportsRowLevelDelete.RowLevelDeleteMode 
rowLevelDeleteMode,
+            @Nullable RowLevelModificationScanContext scanContext) {
+        this.rowLevelDeleteMode = 
Preconditions.checkNotNull(rowLevelDeleteMode);
+        this.scanContext = scanContext;
+    }
+
+    @Override
+    public void apply(DynamicTableSink tableSink) {
+        if (tableSink instanceof SupportsRowLevelDelete) {
+            ((SupportsRowLevelDelete) 
tableSink).applyRowLevelDelete(scanContext);
+        } else {
+            throw new TableException(
+                    String.format(
+                            "%s does not support SupportsRowLevelDelete.",
+                            tableSink.getClass().getName()));
+        }
+    }
+
+    public SupportsRowLevelDelete.RowLevelDeleteMode getRowLevelDeleteMode() {
+        return rowLevelDeleteMode;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        RowLevelDeleteSpec that = (RowLevelDeleteSpec) o;
+        return rowLevelDeleteMode == that.rowLevelDeleteMode
+                && Objects.equals(scanContext, that.scanContext);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(rowLevelDeleteMode, scanContext);
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/SinkAbilitySpec.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/SinkAbilitySpec.java
index 25d2d78eda1..fcfb6078433 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/SinkAbilitySpec.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/abilities/sink/SinkAbilitySpec.java
@@ -34,7 +34,8 @@ import 
org.apache.flink.shaded.jackson2.com.fasterxml.jackson.annotation.JsonTyp
 @JsonSubTypes({
     @JsonSubTypes.Type(value = OverwriteSpec.class),
     @JsonSubTypes.Type(value = PartitioningSpec.class),
-    @JsonSubTypes.Type(value = WritingMetadataSpec.class)
+    @JsonSubTypes.Type(value = WritingMetadataSpec.class),
+    @JsonSubTypes.Type(value = RowLevelDeleteSpec.class)
 })
 @Internal
 public interface SinkAbilitySpec {
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
index 7f978565060..e97791a9b1f 100644
--- 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/plan/nodes/exec/common/CommonExecSink.java
@@ -47,9 +47,12 @@ import 
org.apache.flink.table.connector.sink.OutputFormatProvider;
 import org.apache.flink.table.connector.sink.SinkFunctionProvider;
 import org.apache.flink.table.connector.sink.SinkProvider;
 import org.apache.flink.table.connector.sink.SinkV2Provider;
+import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.planner.codegen.EqualiserCodeGenerator;
 import org.apache.flink.table.planner.connectors.TransformationSinkProvider;
+import org.apache.flink.table.planner.plan.abilities.sink.RowLevelDeleteSpec;
+import org.apache.flink.table.planner.plan.abilities.sink.SinkAbilitySpec;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNode;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeBase;
 import org.apache.flink.table.planner.plan.nodes.exec.ExecNodeConfig;
@@ -66,6 +69,7 @@ import 
org.apache.flink.table.runtime.connector.sink.SinkRuntimeProviderContext;
 import org.apache.flink.table.runtime.generated.GeneratedRecordEqualiser;
 import org.apache.flink.table.runtime.keyselector.RowDataKeySelector;
 import org.apache.flink.table.runtime.operators.sink.ConstraintEnforcer;
+import org.apache.flink.table.runtime.operators.sink.RowKindSetter;
 import org.apache.flink.table.runtime.operators.sink.SinkOperator;
 import org.apache.flink.table.runtime.operators.sink.SinkUpsertMaterializer;
 import 
org.apache.flink.table.runtime.operators.sink.StreamRecordTimestampInserter;
@@ -99,6 +103,7 @@ public abstract class CommonExecSink extends 
ExecNodeBase<Object>
     public static final String PARTITIONER_TRANSFORMATION = "partitioner";
     public static final String UPSERT_MATERIALIZE_TRANSFORMATION = 
"upsert-materialize";
     public static final String TIMESTAMP_INSERTER_TRANSFORMATION = 
"timestamp-inserter";
+    public static final String ROW_KIND_SETTER = "row-kind-setter";
     public static final String SINK_TRANSFORMATION = "sink";
 
     public static final String FIELD_NAME_DYNAMIC_TABLE_SINK = 
"dynamicTableSink";
@@ -199,6 +204,11 @@ public abstract class CommonExecSink extends 
ExecNodeBase<Object>
                             inputUpsertKey);
         }
 
+        Optional<RowKind> targetRowKind = getTargetRowKind();
+        if (targetRowKind.isPresent()) {
+            sinkTransform = applyRowKindSetter(sinkTransform, 
targetRowKind.get(), config);
+        }
+
         return (Transformation<Object>)
                 applySinkProvider(
                         sinkTransform,
@@ -454,6 +464,20 @@ public abstract class CommonExecSink extends 
ExecNodeBase<Object>
         return materializeTransform;
     }
 
+    private Transformation<RowData> applyRowKindSetter(
+            Transformation<RowData> inputTransform, RowKind rowKind, 
ExecNodeConfig config) {
+        return ExecNodeUtil.createOneInputTransformation(
+                inputTransform,
+                createTransformationMeta(
+                        ROW_KIND_SETTER,
+                        String.format("RowKindSetter(TargetRowKind=[%s])", 
rowKind),
+                        "RowKindSetter",
+                        config),
+                new RowKindSetter(rowKind),
+                inputTransform.getOutputType(),
+                inputTransform.getParallelism());
+    }
+
     private Transformation<?> applySinkProvider(
             Transformation<RowData> inputTransform,
             StreamExecutionEnvironment env,
@@ -618,4 +642,24 @@ public abstract class CommonExecSink extends 
ExecNodeBase<Object>
         CHAR,
         BINARY
     }
+
+    /**
+     * Get the target row-kind that the row data should change to, assuming 
the current row kind is
+     * RowKind.INSERT. Return Optional.empty() if it doesn't need to change. 
Currently, it'll only
+     * consider row-level delete.
+     */
+    private Optional<RowKind> getTargetRowKind() {
+        if (tableSinkSpec.getSinkAbilities() != null) {
+            for (SinkAbilitySpec sinkAbilitySpec : 
tableSinkSpec.getSinkAbilities()) {
+                if (sinkAbilitySpec instanceof RowLevelDeleteSpec) {
+                    RowLevelDeleteSpec deleteSpec = (RowLevelDeleteSpec) 
sinkAbilitySpec;
+                    if (deleteSpec.getRowLevelDeleteMode()
+                            == 
SupportsRowLevelDelete.RowLevelDeleteMode.DELETED_ROWS) {
+                        return Optional.of(RowKind.DELETE);
+                    }
+                }
+            }
+        }
+        return Optional.empty();
+    }
 }
diff --git 
a/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/RowLevelModificationContextUtils.java
 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/RowLevelModificationContextUtils.java
new file mode 100644
index 00000000000..71b7baf341e
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/main/java/org/apache/flink/table/planner/utils/RowLevelModificationContextUtils.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.utils;
+
+import org.apache.flink.table.connector.RowLevelModificationScanContext;
+import 
org.apache.flink.table.connector.source.abilities.SupportsRowLevelModificationScan;
+
+/**
+ * Utils for putting/getting {@link RowLevelModificationScanContext}/{@link
+ * SupportsRowLevelModificationScan.RowLevelModificationType} within the current thread,
+ * enabling them to be set and retrieved across different methods.
+ */
+public class RowLevelModificationContextUtils {
+    private static final 
ThreadLocal<SupportsRowLevelModificationScan.RowLevelModificationType>
+            modificationTypeThreadLocal = new ThreadLocal<>();
+
+    private static final ThreadLocal<RowLevelModificationScanContext> 
scanContextThreadLocal =
+            new ThreadLocal<>();
+
+    public static void setModificationType(
+            SupportsRowLevelModificationScan.RowLevelModificationType 
rowLevelModificationType) {
+        modificationTypeThreadLocal.set(rowLevelModificationType);
+    }
+
+    public static SupportsRowLevelModificationScan.RowLevelModificationType 
getModificationType() {
+        return modificationTypeThreadLocal.get();
+    }
+
+    public static void setScanContext(RowLevelModificationScanContext 
scanContext) {
+        scanContextThreadLocal.set(scanContext);
+    }
+
+    public static RowLevelModificationScanContext getScanContext() {
+        return scanContextThreadLocal.get();
+    }
+
+    public static void clearContext() {
+        modificationTypeThreadLocal.remove();
+        scanContextThreadLocal.remove();
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
index fab1ee0db84..cf9f302164e 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/delegation/PlannerBase.scala
@@ -237,6 +237,20 @@ abstract class PlannerBase(
           case (table, sink: TableSink[_]) =>
             // Legacy tables can't be anonymous
             val identifier = catalogSink.getContextResolvedTable.getIdentifier
+
+            // check it's not for UPDATE/DELETE because they're not supported 
for Legacy table
+            if (catalogSink.isDelete || catalogSink.isUpdate) {
+              throw new TableException(
+                String.format(
+                  "Can't perform %s operation of the table %s" +
+                    " because the corresponding table sink is the legacy TableSink." +
+                    " Please implement %s for it.",
+                  if (catalogSink.isDelete) "delete" else "update",
+                  identifier,
+                  classOf[DynamicTableSink].getName
+                ))
+            }
+
             // check the logical field type and physical field type are 
compatible
             val queryLogicalType = 
FlinkTypeFactory.toLogicalRowType(input.getRowType)
             // validate logical schema and physical schema are compatible
diff --git 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/schema/TableSourceTable.scala
 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/schema/TableSourceTable.scala
index c7f8ae95422..c5d1ea41004 100644
--- 
a/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/schema/TableSourceTable.scala
+++ 
b/flink-table/flink-table-planner/src/main/scala/org/apache/flink/table/planner/plan/schema/TableSourceTable.scala
@@ -115,6 +115,36 @@ class TableSourceTable(
     )
   }
 
+  /**
+   * Creates a copy of this table with the specified table source, resolved table, row type and ability specs
+   *
+   * @param newTableSource
+   *   tableSource to replace
+   * @param newResolveTable
+   *   resolved table to replace
+   * @param newRowType
+   *   new row type
+   * @return
+   *   a new TableSourceTable instance with the given replacements applied
+   */
+  def copy(
+      newTableSource: DynamicTableSource,
+      newResolveTable: ContextResolvedTable,
+      newRowType: RelDataType,
+      newAbilitySpecs: Array[SourceAbilitySpec]): TableSourceTable = {
+    new TableSourceTable(
+      relOptSchema,
+      newRowType,
+      statistic,
+      newTableSource,
+      isStreamingMode,
+      newResolveTable,
+      flinkContext,
+      flinkTypeFactory,
+      abilitySpecs ++ newAbilitySpecs
+    )
+  }
+
   /**
    * Creates a copy of this table with specified digest and statistic.
    *
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestUpdateDeleteTableFactory.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestUpdateDeleteTableFactory.java
index df114031ecf..b7c0ce53d11 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestUpdateDeleteTableFactory.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/factories/TestUpdateDeleteTableFactory.java
@@ -21,16 +21,31 @@ package org.apache.flink.table.planner.factories;
 import org.apache.flink.api.java.tuple.Tuple2;
 import org.apache.flink.configuration.ConfigOption;
 import org.apache.flink.configuration.ConfigOptions;
+import org.apache.flink.configuration.Configuration;
+import org.apache.flink.streaming.api.datastream.DataStream;
+import org.apache.flink.streaming.api.datastream.DataStreamSink;
+import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;
 import org.apache.flink.streaming.api.functions.source.FromElementsFunction;
 import org.apache.flink.streaming.api.functions.source.SourceFunction;
+import org.apache.flink.table.api.DataTypes;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.catalog.Column;
+import org.apache.flink.table.catalog.ObjectIdentifier;
 import org.apache.flink.table.catalog.ResolvedCatalogTable;
 import org.apache.flink.table.catalog.ResolvedSchema;
 import org.apache.flink.table.connector.ChangelogMode;
+import org.apache.flink.table.connector.ProviderContext;
+import org.apache.flink.table.connector.RowLevelModificationScanContext;
+import org.apache.flink.table.connector.sink.DataStreamSinkProvider;
 import org.apache.flink.table.connector.sink.DynamicTableSink;
 import org.apache.flink.table.connector.sink.abilities.SupportsDeletePushDown;
+import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete;
 import org.apache.flink.table.connector.source.DynamicTableSource;
 import org.apache.flink.table.connector.source.ScanTableSource;
 import org.apache.flink.table.connector.source.SourceFunctionProvider;
+import 
org.apache.flink.table.connector.source.abilities.SupportsReadingMetadata;
+import 
org.apache.flink.table.connector.source.abilities.SupportsRowLevelModificationScan;
+import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.expressions.CallExpression;
 import org.apache.flink.table.expressions.FieldReferenceExpression;
@@ -41,6 +56,10 @@ import 
org.apache.flink.table.factories.DynamicTableSourceFactory;
 import org.apache.flink.table.factories.FactoryUtil;
 import org.apache.flink.table.functions.BuiltInFunctionDefinitions;
 import org.apache.flink.table.types.DataType;
+import org.apache.flink.types.RowKind;
+import org.apache.flink.util.Preconditions;
+
+import javax.annotation.Nullable;
 
 import java.util.ArrayList;
 import java.util.Arrays;
@@ -82,6 +101,34 @@ public class TestUpdateDeleteTableFactory
                     .defaultValue(true)
                     .withDescription("Whether the table supports delete push 
down.");
 
+    private static final ConfigOption<Boolean> MIX_DELETE =
+            ConfigOptions.key("mix-delete")
+                    .booleanType()
+                    .defaultValue(false)
+                    .withDescription(
+                            "Whether the table supports both delete push down and row-level delete. "
+                                    + "Note: for delete push down, the filter can only be accepted when the pushed filter is empty.");
+
+    private static final 
ConfigOption<SupportsRowLevelDelete.RowLevelDeleteMode> DELETE_MODE =
+            ConfigOptions.key("delete-mode")
+                    .enumType(SupportsRowLevelDelete.RowLevelDeleteMode.class)
+                    
.defaultValue(SupportsRowLevelDelete.RowLevelDeleteMode.DELETED_ROWS)
+                    .withDescription("The delete mode for row level delete.");
+
+    private static final ConfigOption<List<String>> 
REQUIRED_COLUMNS_FOR_DELETE =
+            ConfigOptions.key("required-columns-for-delete")
+                    .stringType()
+                    .asList()
+                    .noDefaultValue()
+                    .withDescription(
+                            "The names of the required columns for row-level delete.");
+
+    private static final List<Column.MetadataColumn> META_COLUMNS =
+            Arrays.asList(
+                    Column.metadata("g", DataTypes.STRING(), null, true),
+                    Column.metadata("meta_f1", DataTypes.INT().notNull(), 
null, false),
+                    Column.metadata("meta_f2", DataTypes.STRING().notNull(), 
"meta_k2", false));
+
     private static final AtomicInteger idCounter = new AtomicInteger(0);
     private static final Map<String, Collection<RowData>> registeredRowData = 
new HashMap<>();
 
@@ -97,13 +144,29 @@ public class TestUpdateDeleteTableFactory
         helper.validate();
         String dataId =
                 
helper.getOptions().getOptional(DATA_ID).orElse(String.valueOf(idCounter.get()));
-        if (helper.getOptions().get(SUPPORT_DELETE_PUSH_DOWN)) {
-            return new SupportsDeletePushDownSink(
+        SupportsRowLevelDelete.RowLevelDeleteMode deleteMode = 
helper.getOptions().get(DELETE_MODE);
+        List<String> requireCols = 
helper.getOptions().get(REQUIRED_COLUMNS_FOR_DELETE);
+        if (helper.getOptions().get(MIX_DELETE)) {
+            return new SupportsDeleteSink(
+                    context.getObjectIdentifier(),
+                    context.getCatalogTable(),
+                    deleteMode,
                     dataId,
-                    helper.getOptions().get(ONLY_ACCEPT_EQUAL_PREDICATE),
-                    context.getCatalogTable());
+                    requireCols);
         } else {
-            return new TestSink();
+            if (helper.getOptions().get(SUPPORT_DELETE_PUSH_DOWN)) {
+                return new SupportsDeletePushDownSink(
+                        dataId,
+                        helper.getOptions().get(ONLY_ACCEPT_EQUAL_PREDICATE),
+                        context.getCatalogTable());
+            } else {
+                return new SupportsRowLevelDeleteSink(
+                        context.getObjectIdentifier(),
+                        context.getCatalogTable(),
+                        deleteMode,
+                        dataId,
+                        requireCols);
+            }
         }
     }
 
@@ -114,7 +177,7 @@ public class TestUpdateDeleteTableFactory
 
         String dataId =
                 
helper.getOptions().getOptional(DATA_ID).orElse(String.valueOf(idCounter.get()));
-        return new TestTableSource(dataId);
+        return new TestTableSource(dataId, context.getObjectIdentifier());
     }
 
     @Override
@@ -130,20 +193,29 @@ public class TestUpdateDeleteTableFactory
     @Override
     public Set<ConfigOption<?>> optionalOptions() {
         return new HashSet<>(
-                Arrays.asList(DATA_ID, ONLY_ACCEPT_EQUAL_PREDICATE, 
SUPPORT_DELETE_PUSH_DOWN));
+                Arrays.asList(
+                        DATA_ID,
+                        ONLY_ACCEPT_EQUAL_PREDICATE,
+                        SUPPORT_DELETE_PUSH_DOWN,
+                        MIX_DELETE,
+                        DELETE_MODE,
+                        REQUIRED_COLUMNS_FOR_DELETE));
     }
 
     /** A test table source which supports reading metadata. */
-    private static class TestTableSource implements ScanTableSource {
+    private static class TestTableSource
+            implements ScanTableSource, SupportsReadingMetadata, 
SupportsRowLevelModificationScan {
         private final String dataId;
+        private final ObjectIdentifier tableIdentifier;
 
-        public TestTableSource(String dataId) {
+        public TestTableSource(String dataId, ObjectIdentifier 
tableIdentifier) {
             this.dataId = dataId;
+            this.tableIdentifier = tableIdentifier;
         }
 
         @Override
         public DynamicTableSource copy() {
-            return new TestTableSource(dataId);
+            return new TestTableSource(dataId, tableIdentifier);
         }
 
         @Override
@@ -175,6 +247,37 @@ public class TestUpdateDeleteTableFactory
                 }
             };
         }
+
+        @Override
+        public Map<String, DataType> listReadableMetadata() {
+            Map<String, DataType> metaData = new HashMap<>();
+            META_COLUMNS.forEach(
+                    column ->
+                            metaData.put(
+                                    
column.getMetadataKey().orElse(column.getName()),
+                                    column.getDataType()));
+            return metaData;
+        }
+
+        @Override
+        public void applyReadableMetadata(List<String> metadataKeys, DataType 
producedDataType) {}
+
+        @Override
+        public RowLevelModificationScanContext applyRowLevelModificationScan(
+                RowLevelModificationType rowLevelModificationType,
+                @Nullable RowLevelModificationScanContext previousContext) {
+            TestScanContext scanContext =
+                    previousContext == null
+                            ? new TestScanContext()
+                            : (TestScanContext) previousContext;
+            scanContext.scanTables.add(tableIdentifier);
+            return scanContext;
+        }
+    }
+
+    /** A test scan context for row-level modification scan. */
+    private static class TestScanContext implements 
RowLevelModificationScanContext {
+        private final Set<ObjectIdentifier> scanTables = new HashSet<>();
     }
 
     /** A common test sink. */
@@ -192,12 +295,188 @@ public class TestUpdateDeleteTableFactory
 
         @Override
         public DynamicTableSink copy() {
-            return null;
+            return new TestSink();
         }
 
         @Override
         public String asSummaryString() {
-            return null;
+            return "Test Sink";
+        }
+    }
+
+    /** A sink that supports row-level delete. */
+    private static class SupportsRowLevelDeleteSink extends TestSink
+            implements SupportsRowLevelDelete {
+
+        private final ObjectIdentifier tableIdentifier;
+        private final ResolvedCatalogTable resolvedCatalogTable;
+        private final RowLevelDeleteMode deleteMode;
+        protected final String dataId;
+        private final List<String> requireColumnsForDelete;
+
+        private boolean isDelete;
+
+        public SupportsRowLevelDeleteSink(
+                ObjectIdentifier tableIdentifier,
+                ResolvedCatalogTable resolvedCatalogTable,
+                RowLevelDeleteMode deleteMode,
+                String dataId,
+                List<String> requireColumnsForDelete) {
+            this(
+                    tableIdentifier,
+                    resolvedCatalogTable,
+                    deleteMode,
+                    dataId,
+                    requireColumnsForDelete,
+                    false);
+        }
+
+        public SupportsRowLevelDeleteSink(
+                ObjectIdentifier tableIdentifier,
+                ResolvedCatalogTable resolvedCatalogTable,
+                RowLevelDeleteMode deleteMode,
+                String dataId,
+                List<String> requireColumnsForDelete,
+                boolean isDelete) {
+            this.tableIdentifier = tableIdentifier;
+            this.resolvedCatalogTable = resolvedCatalogTable;
+            this.deleteMode = deleteMode;
+            this.dataId = dataId;
+            this.requireColumnsForDelete = requireColumnsForDelete;
+            this.isDelete = isDelete;
+        }
+
+        @Override
+        public ChangelogMode getChangelogMode(ChangelogMode requestedMode) {
+            return ChangelogMode.all();
+        }
+
+        @Override
+        public SinkRuntimeProvider getSinkRuntimeProvider(Context context) {
+            return new DataStreamSinkProvider() {
+                @Override
+                public DataStreamSink<?> consumeDataStream(
+                        ProviderContext providerContext, DataStream<RowData> 
dataStream) {
+                    return dataStream
+                            .addSink(
+                                    new DeleteDataSinkFunction(
+                                            dataId,
+                                            getAllFieldGetter(
+                                                    
resolvedCatalogTable.getResolvedSchema()),
+                                            deleteMode))
+                            .setParallelism(1);
+                }
+            };
+        }
+
+        @Override
+        public DynamicTableSink copy() {
+            return new SupportsRowLevelDeleteSink(
+                    tableIdentifier,
+                    resolvedCatalogTable,
+                    deleteMode,
+                    dataId,
+                    requireColumnsForDelete,
+                    isDelete);
+        }
+
+        @Override
+        public String asSummaryString() {
+            return "support row-level delete sink";
+        }
+
+        @Override
+        public RowLevelDeleteInfo applyRowLevelDelete(
+                @Nullable RowLevelModificationScanContext context) {
+            // the context should contain the object identifier of the table 
to be written
+            Preconditions.checkArgument(context instanceof TestScanContext);
+            TestScanContext scanContext = (TestScanContext) context;
+            Preconditions.checkArgument(
+                    scanContext.scanTables.contains(tableIdentifier),
+                    String.format(
+                            "The scan context should contains the object 
identifier for table %s in row-level delete.",
+                            tableIdentifier));
+
+            this.isDelete = true;
+            return new RowLevelDeleteInfo() {
+                @Override
+                public Optional<List<Column>> requiredColumns() {
+                    List<Column> requiredCols = null;
+                    if (requireColumnsForDelete != null) {
+                        requiredCols =
+                                getRequiredColumns(
+                                        requireColumnsForDelete,
+                                        
resolvedCatalogTable.getResolvedSchema());
+                    }
+                    return Optional.ofNullable(requiredCols);
+                }
+
+                @Override
+                public RowLevelDeleteMode getRowLevelDeleteMode() {
+                    return deleteMode;
+                }
+            };
+        }
+    }
+
+    /** The sink function for deleting existing data. */
+    private static class DeleteDataSinkFunction extends 
RichSinkFunction<RowData> {
+        private final String dataId;
+        private final RowData.FieldGetter[] fieldGetters;
+        private final SupportsRowLevelDelete.RowLevelDeleteMode deleteMode;
+
+        private transient Collection<RowData> data;
+        private transient List<RowData> newData;
+
+        DeleteDataSinkFunction(
+                String dataId,
+                RowData.FieldGetter[] fieldGetters,
+                SupportsRowLevelDelete.RowLevelDeleteMode deleteMode) {
+            this.dataId = dataId;
+            this.fieldGetters = fieldGetters;
+            this.deleteMode = deleteMode;
+        }
+
+        @Override
+        public void open(Configuration parameters) {
+            data = registeredRowData.get(dataId);
+            newData = new ArrayList<>();
+        }
+
+        @Override
+        public void invoke(RowData value, Context context) {
+            if (deleteMode == 
SupportsRowLevelDelete.RowLevelDeleteMode.DELETED_ROWS) {
+                consumeDeletedRows(value);
+            } else if (deleteMode == 
SupportsRowLevelDelete.RowLevelDeleteMode.REMAINING_ROWS) {
+                consumeRemainingRows(value);
+            } else {
+                throw new TableException(String.format("Unknown delete mode: 
%s.", deleteMode));
+            }
+        }
+
+        private void consumeDeletedRows(RowData deletedRow) {
+            Preconditions.checkState(
+                    deletedRow.getRowKind() == RowKind.DELETE,
+                    String.format(
+                            "The RowKind for the coming rows should be %s in 
delete mode %s.",
+                            RowKind.DELETE, DELETE_MODE));
+            data.removeIf(rowData -> equal(rowData, deletedRow, fieldGetters));
+        }
+
+        private void consumeRemainingRows(RowData remainingRow) {
+            Preconditions.checkState(
+                    remainingRow.getRowKind() == RowKind.INSERT,
+                    String.format(
+                            "The RowKind for the coming rows should be %s in 
delete mode %s.",
+                            RowKind.INSERT, DELETE_MODE));
+            newData.add(copyRowData(remainingRow, fieldGetters));
+        }
+
+        @Override
+        public void finish() {
+            if (deleteMode == 
SupportsRowLevelDelete.RowLevelDeleteMode.REMAINING_ROWS) {
+                registeredRowData.put(dataId, newData);
+            }
         }
     }
 
@@ -315,6 +594,41 @@ public class TestUpdateDeleteTableFactory
         return true;
     }
 
+    /** A sink that supports both delete push down and row-level delete. */
+    private static class SupportsDeleteSink extends SupportsRowLevelDeleteSink
+            implements SupportsDeletePushDown {
+
+        public SupportsDeleteSink(
+                ObjectIdentifier tableIdentifier,
+                ResolvedCatalogTable resolvedCatalogTable,
+                SupportsRowLevelDelete.RowLevelDeleteMode deleteMode,
+                String dataId,
+                List<String> requireColumnsForDelete) {
+            super(
+                    tableIdentifier,
+                    resolvedCatalogTable,
+                    deleteMode,
+                    dataId,
+                    requireColumnsForDelete);
+        }
+
+        @Override
+        public boolean applyDeleteFilters(List<ResolvedExpression> filters) {
+            // only accept when the filters are empty
+            return filters.isEmpty();
+        }
+
+        @Override
+        public Optional<Long> executeDeletion() {
+            Collection<RowData> oldRows = registeredRowData.get(dataId);
+            if (oldRows != null) {
+                registeredRowData.put(dataId, new ArrayList<>());
+                return Optional.of((long) oldRows.size());
+            }
+            return Optional.empty();
+        }
+    }
+
     private static RowData.FieldGetter[] getAllFieldGetter(ResolvedSchema 
resolvedSchema) {
         List<DataType> dataTypes = resolvedSchema.getColumnDataTypes();
         RowData.FieldGetter[] fieldGetters = new 
RowData.FieldGetter[dataTypes.size()];
@@ -323,4 +637,50 @@ public class TestUpdateDeleteTableFactory
         }
         return fieldGetters;
     }
+
+    private static boolean equal(
+            RowData value1, RowData value2, RowData.FieldGetter[] 
fieldGetters) {
+        for (RowData.FieldGetter fieldGetter : fieldGetters) {
+            if (!Objects.equals(
+                    fieldGetter.getFieldOrNull(value1), 
fieldGetter.getFieldOrNull(value2))) {
+                return false;
+            }
+        }
+        return true;
+    }
+
+    private static RowData copyRowData(RowData rowData, RowData.FieldGetter[] 
fieldGetters) {
+        Object[] values = new Object[fieldGetters.length];
+        for (int i = 0; i < fieldGetters.length; i++) {
+            values[i] = fieldGetters[i].getFieldOrNull(rowData);
+        }
+        return GenericRowData.of(values);
+    }
+
+    private static List<Column> getRequiredColumns(
+            List<String> requiredColName, ResolvedSchema schema) {
+        List<Column> requiredCols = new ArrayList<>();
+        for (String colName : requiredColName) {
+            Optional<Column> optionalColumn = schema.getColumn(colName);
+            if (optionalColumn.isPresent()) {
+                requiredCols.add(optionalColumn.get());
+            } else {
+                Column metaCol = null;
+                for (Column.MetadataColumn metadataColumn : META_COLUMNS) {
+                    String metaColName =
+                            
metadataColumn.getMetadataKey().orElse(metadataColumn.getName());
+                    if (metaColName.equals(colName)) {
+                        metaCol = metadataColumn;
+                        break;
+                    }
+                }
+                if (metaCol == null) {
+                    throw new TableException(
+                            String.format("Can't find the required column: 
`%s`.", colName));
+                }
+                requiredCols.add(metaCol);
+            }
+        }
+        return requiredCols;
+    }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
index 129b969b9a0..86d09ee4be5 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/operations/SqlToOperationConverterTest.java
@@ -2710,7 +2710,7 @@ public class SqlToOperationConverterTest {
     }
 
     @Test
-    public void testDeletePushDown() throws Exception {
+    public void testDelete() throws Exception {
         Map<String, String> options = new HashMap<>();
         options.put("connector", TestUpdateDeleteTableFactory.IDENTIFIER);
         CatalogTable catalogTable =
@@ -2722,31 +2722,25 @@ public class SqlToOperationConverterTest {
                         null,
                         Collections.emptyList(),
                         options);
-        ObjectIdentifier tableIdentifier =
-                ObjectIdentifier.of("builtin", "default", "test_push_down");
+        ObjectIdentifier tableIdentifier = ObjectIdentifier.of("builtin", 
"default", "test_delete");
         catalogManager.createTable(catalogTable, tableIdentifier, false);
 
         // no filter in delete statement
-        Operation operation = parse("DELETE FROM test_push_down");
+        Operation operation = parse("DELETE FROM test_delete");
         checkDeleteFromFilterOperation(operation, "[]");
 
         // with filters in delete statement
-        operation = parse("DELETE FROM test_push_down where a = 1 and c = 
'123'");
+        operation = parse("DELETE FROM test_delete where a = 1 and c = '123'");
         checkDeleteFromFilterOperation(operation, "[equals(a, 1), equals(c, 
'123')]");
 
         // with filter = false after reduced in delete statement
-        operation = parse("DELETE FROM test_push_down where a = 1 + 6 and a = 
2");
+        operation = parse("DELETE FROM test_delete where a = 1 + 6 and a = 2");
         checkDeleteFromFilterOperation(operation, "[false]");
 
-        assertThatThrownBy(
-                        () ->
-                                parse(
-                                        "DELETE FROM test_push_down where a = 
(select count(*) from test_push_down)"))
-                .isInstanceOf(UnsupportedOperationException.class)
-                .hasMessage(
-                        String.format(
-                                "Only delete push down is supported currently, 
but the delete statement can't pushed to table sink %s.",
-                                tableIdentifier.asSerializableString()));
+        operation = parse("DELETE FROM test_delete where a = (select count(*) 
from test_delete)");
+        assertThat(operation).isInstanceOf(SinkModifyOperation.class);
+        SinkModifyOperation modifyOperation = (SinkModifyOperation) operation;
+        assertThat(modifyOperation.isDelete()).isTrue();
     }
 
     // ~ Tool Methods 
----------------------------------------------------------
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/RowLevelDeleteTest.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/RowLevelDeleteTest.java
new file mode 100644
index 00000000000..7bc295ce6fd
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/plan/batch/sql/RowLevelDeleteTest.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.planner.plan.batch.sql;
+
+import org.apache.flink.table.api.ExplainDetail;
+import org.apache.flink.table.api.TableConfig;
+import org.apache.flink.table.api.config.ExecutionConfigOptions;
+import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete;
+import org.apache.flink.table.planner.utils.BatchTableTestUtil;
+import org.apache.flink.table.planner.utils.JavaScalaConversionUtil;
+import org.apache.flink.table.planner.utils.TableTestBase;
+
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Collections;
+
+import scala.collection.Seq;
+
+/** Test for row-level delete. */
+@RunWith(Parameterized.class)
+public class RowLevelDeleteTest extends TableTestBase {
+
+    private final SupportsRowLevelDelete.RowLevelDeleteMode deleteMode;
+    private final Seq<ExplainDetail> explainDetails =
+            JavaScalaConversionUtil.toScala(
+                    
Collections.singletonList(ExplainDetail.JSON_EXECUTION_PLAN));
+
+    private BatchTableTestUtil util;
+
+    @Parameterized.Parameters(name = "deleteMode = {0}")
+    public static Collection<SupportsRowLevelDelete.RowLevelDeleteMode> data() 
{
+        return Arrays.asList(
+                SupportsRowLevelDelete.RowLevelDeleteMode.DELETED_ROWS,
+                SupportsRowLevelDelete.RowLevelDeleteMode.REMAINING_ROWS);
+    }
+
+    public RowLevelDeleteTest(SupportsRowLevelDelete.RowLevelDeleteMode 
deleteMode) {
+        this.deleteMode = deleteMode;
+    }
+
+    @Before
+    public void before() {
+        util = batchTestUtil(TableConfig.getDefault());
+        util.tableEnv()
+                .getConfig()
+                .getConfiguration()
+                
.set(ExecutionConfigOptions.TABLE_EXEC_RESOURCE_DEFAULT_PARALLELISM, 12);
+    }
+
+    @Test
+    public void testDeleteWithoutFilter() {
+        createTableForDelete();
+        util.verifyExplainInsert("DELETE FROM t", explainDetails);
+    }
+
+    @Test
+    public void testDeleteWithFilter() {
+        createTableForDelete();
+        util.verifyExplainInsert("DELETE FROM t where a = 1 and b = '123'", 
explainDetails);
+    }
+
+    @Test
+    public void testDeleteWithSubQuery() {
+        createTableForDelete();
+        util.verifyExplainInsert(
+                "DELETE FROM t where b = '123' and a = (select count(*) from 
t)", explainDetails);
+    }
+
+    @Test
+    public void testDeleteWithCustomColumns() {
+        util.tableEnv()
+                .executeSql(
+                        String.format(
+                                "CREATE TABLE t (a int, b string, c double) 
WITH"
+                                        + " ("
+                                        + "'connector' = 'test-update-delete', 
"
+                                        + "'required-columns-for-delete' = 
'b;c', "
+                                        + "'delete-mode' = '%s', 
'support-delete-push-down' = 'false'"
+                                        + ") ",
+                                deleteMode));
+        util.verifyExplainInsert("DELETE FROM t where b = '123'", 
explainDetails);
+    }
+
+    @Test
+    public void testDeleteWithMetaColumns() {
+        util.tableEnv()
+                .executeSql(
+                        String.format(
+                                "CREATE TABLE t (a int, b string, c double) 
WITH"
+                                        + " ("
+                                        + "'connector' = 'test-update-delete', 
"
+                                        + "'required-columns-for-delete' = 
'meta_f1;meta_k2;b', "
+                                        + "'delete-mode' = '%s', 
'support-delete-push-down' = 'false'"
+                                        + ") ",
+                                deleteMode));
+        util.verifyExplainInsert("DELETE FROM t where b = '123'", 
explainDetails);
+    }
+
+    private void createTableForDelete() {
+        util.tableEnv()
+                .executeSql(
+                        String.format(
+                                "CREATE TABLE t (a int, b string) WITH "
+                                        + "('connector' = 
'test-update-delete',"
+                                        + " 'delete-mode' = '%s', 
'support-delete-push-down' = 'false') ",
+                                deleteMode));
+    }
+}
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/DeleteTableITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/DeleteTableITCase.java
index 8a60d24dfbe..ad706fc9466 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/DeleteTableITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/batch/sql/DeleteTableITCase.java
@@ -18,6 +18,11 @@
 
 package org.apache.flink.table.planner.runtime.batch.sql;
 
+import org.apache.flink.table.api.StatementSet;
+import org.apache.flink.table.api.TableException;
+import org.apache.flink.table.api.TableResult;
+import org.apache.flink.table.connector.sink.DynamicTableSink;
+import org.apache.flink.table.connector.sink.abilities.SupportsRowLevelDelete;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
 import org.apache.flink.table.data.StringData;
@@ -27,17 +32,35 @@ import org.apache.flink.types.Row;
 import org.apache.flink.util.CollectionUtil;
 
 import org.junit.Test;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
 
 import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
 import java.util.List;
 
 import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 
 /** The IT case for DELETE statement in batch mode. */
+@RunWith(Parameterized.class)
 public class DeleteTableITCase extends BatchTestBase {
     private static final int ROW_NUM = 5;
 
+    private final SupportsRowLevelDelete.RowLevelDeleteMode deleteMode;
+
+    @Parameterized.Parameters(name = "deleteMode = {0}")
+    public static Collection<SupportsRowLevelDelete.RowLevelDeleteMode> data() 
{
+        return Arrays.asList(
+                SupportsRowLevelDelete.RowLevelDeleteMode.DELETED_ROWS,
+                SupportsRowLevelDelete.RowLevelDeleteMode.REMAINING_ROWS);
+    }
+
+    public DeleteTableITCase(SupportsRowLevelDelete.RowLevelDeleteMode 
deleteMode) {
+        this.deleteMode = deleteMode;
+    }
+
     @Test
     public void testDeletePushDown() throws Exception {
         String dataId = registerData();
@@ -50,11 +73,9 @@ public class DeleteTableITCase extends BatchTestBase {
                                         + ")",
                                 dataId));
         // it only contains equal expression, should be pushed down
-        List<Row> rows =
-                CollectionUtil.iteratorToList(
-                        tEnv().executeSql("DELETE FROM t WHERE a = 
1").collect());
+        List<Row> rows = toRows(tEnv().executeSql("DELETE FROM t where a = 
1"));
         assertThat(rows.toString()).isEqualTo("[+I[1], +I[OK]]");
-        rows = CollectionUtil.iteratorToList(tEnv().executeSql("SELECT * FROM 
t").collect());
+        rows = toRows(tEnv().executeSql("SELECT * FROM t"));
         assertThat(rows.toString())
                 .isEqualTo("[+I[0, b_0, 0.0], +I[2, b_2, 4.0], +I[3, b_3, 
6.0], +I[4, b_4, 8.0]]");
 
@@ -63,19 +84,98 @@ public class DeleteTableITCase extends BatchTestBase {
         assertThatThrownBy(() -> tEnv().executeSql("DELETE FROM t where a > 
1"))
                 .isInstanceOf(UnsupportedOperationException.class)
                 .hasMessage(
-                        "Only delete push down is supported currently, "
-                                + "but the delete statement can't pushed to 
table sink `default_catalog`.`default_database`.`t`.");
+                        String.format(
+                                "Can't perform delete operation of the table 
%s because the corresponding dynamic table sink has not yet implemented %s.",
+                                "default_catalog.default_database.t",
+                                SupportsRowLevelDelete.class.getName()));
+    }
 
+    @Test
+    public void testRowLevelDelete() throws Exception {
+        String dataId = registerData();
         tEnv().executeSql(
-                        "CREATE TABLE t1 (a int) WITH"
-                                + " ('connector' = 'test-update-delete',"
-                                + " 'support-delete-push-down' = 'false')");
-        // should throw exception for sink that doesn't implement 
SupportsDeletePushDown interface
-        assertThatThrownBy(() -> tEnv().executeSql("DELETE FROM t1"))
-                .isInstanceOf(UnsupportedOperationException.class)
+                        String.format(
+                                "CREATE TABLE t (a int, b string, c double) 
WITH"
+                                        + " ('connector' = 
'test-update-delete',"
+                                        + " 'data-id' = '%s',"
+                                        + " 'delete-mode' = '%s',"
+                                        + " 'support-delete-push-down' = 
'false'"
+                                        + ")",
+                                dataId, deleteMode));
+        tEnv().executeSql("DELETE FROM t WHERE a > 1").await();
+        List<Row> rows = toRows(tEnv().executeSql("SELECT * FROM t"));
+        assertThat(rows.toString()).isEqualTo("[+I[0, b_0, 0.0], +I[1, b_1, 
2.0]]");
+
+        tEnv().executeSql("DELETE FROM t WHERE a >= (select count(1) from t 
where c > 1)").await();
+        rows = toRows(tEnv().executeSql("SELECT * FROM t"));
+        assertThat(rows.toString()).isEqualTo("[+I[0, b_0, 0.0]]");
+    }
+
+    @Test
+    public void testMixDelete() throws Exception {
+        // test mix delete push down and row-level delete
+        String dataId = registerData();
+        tEnv().executeSql(
+                        String.format(
+                                "CREATE TABLE t (a int, b string, c double) 
WITH"
+                                        + " ('connector' = 
'test-update-delete',"
+                                        + " 'data-id' = '%s',"
+                                        + " 'mix-delete' = 'true')",
+                                dataId));
+        // the deletion can't be pushed down, but the deletion should still 
success.
+        tEnv().executeSql("DELETE FROM t WHERE a >= (select count(1) from t 
where c > 2)").await();
+        List<Row> rows = toRows(tEnv().executeSql("SELECT * FROM t"));
+        assertThat(rows.toString())
+                .isEqualTo("[+I[0, b_0, 0.0], +I[1, b_1, 2.0], +I[2, b_2, 
4.0]]");
+
+        // should fall back to delete push down
+        rows = toRows(tEnv().executeSql("DELETE FROM t"));
+        assertThat(rows.toString()).isEqualTo("[+I[3], +I[OK]]");
+        rows = toRows(tEnv().executeSql("SELECT * FROM t"));
+        assertThat(rows).isEmpty();
+    }
+
+    @Test
+    public void testStatementSetContainDeleteAndInsert() throws Exception {
+        tEnv().executeSql(
+                        "CREATE TABLE t (a int, b string, c double) WITH"
+                                + " ('connector' = 'test-update-delete')");
+        StatementSet statementSet = tEnv().createStatementSet();
+        // should throw exception when statement set contains insert and 
delete statement
+        statementSet.addInsertSql("INSERT INTO t VALUES (1, 'v1', 1)");
+        statementSet.addInsertSql("DELETE FROM t");
+        assertThatThrownBy(statementSet::execute)
+                .isInstanceOf(TableException.class)
+                .hasMessage(
+                        "Unsupported SQL query! Only accept a single SQL 
statement of type DELETE.");
+    }
+
+    @Test
+    public void testCompilePlanSql() throws Exception {
+        tEnv().executeSql(
+                        "CREATE TABLE t (a int, b string, c double) WITH"
+                                + " ('connector' = 'test-update-delete')");
+        // should throw exception when compile sql for delete statement
+        assertThatThrownBy(() -> tEnv().compilePlanSql("DELETE FROM t"))
+                .isInstanceOf(TableException.class)
                 .hasMessage(
-                        "Only delete push down is supported currently, "
-                                + "but the delete statement can't pushed to 
table sink `default_catalog`.`default_database`.`t1`.");
+                        "Unsupported SQL query! compilePlanSql() only accepts 
a single SQL statement of type INSERT");
+    }
+
+    @Test
+    public void testDeleteWithLegacyTableSink() {
+        tEnv().executeSql(
+                        "CREATE TABLE t (a int, b string, c double) WITH"
+                                + " ('connector' = 'COLLECTION')");
+        assertThatThrownBy(() -> tEnv().executeSql("DELETE FROM t"))
+                .isInstanceOf(TableException.class)
+                .hasMessage(
+                        String.format(
+                                "Can't perform delete operation of the table 
%s "
+                                        + " because the corresponding table 
sink is the legacy TableSink,"
+                                        + " Please implement %s for it.",
+                                "`default_catalog`.`default_database`.`t`",
+                                DynamicTableSink.class.getName()));
     }
 
     private String registerData() {
@@ -90,4 +190,8 @@ public class DeleteTableITCase extends BatchTestBase {
         }
         return values;
     }
+
+    private List<Row> toRows(TableResult result) {
+        return CollectionUtil.iteratorToList(result.collect());
+    }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DeleteTableITCase.java
 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DeleteTableITCase.java
index e6ba79ef2b4..fe96af95d52 100644
--- 
a/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DeleteTableITCase.java
+++ 
b/flink-table/flink-table-planner/src/test/java/org/apache/flink/table/planner/runtime/stream/sql/DeleteTableITCase.java
@@ -34,5 +34,12 @@ public class DeleteTableITCase extends StreamingTestBase {
         assertThatThrownBy(() -> tEnv().executeSql("DELETE FROM t"))
                 .isInstanceOf(TableException.class)
                 .hasMessage("DELETE statement is not supported for streaming 
mode now.");
+
+        tEnv().executeSql(
+                        "CREATE TABLE t1 (a int) WITH ('connector' = 
'test-update-delete', "
+                                + "'support-delete-push-down' = 'false')");
+        assertThatThrownBy(() -> tEnv().executeSql("DELETE FROM t1"))
+                .isInstanceOf(TableException.class)
+                .hasMessage("DELETE statement is not supported for streaming 
mode now.");
     }
 }
diff --git 
a/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RowLevelDeleteTest.xml
 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RowLevelDeleteTest.xml
new file mode 100644
index 00000000000..b835e944ea3
--- /dev/null
+++ 
b/flink-table/flink-table-planner/src/test/resources/org/apache/flink/table/planner/plan/batch/sql/RowLevelDeleteTest.xml
@@ -0,0 +1,714 @@
+<?xml version="1.0" ?>
+<!--
+Licensed to the Apache Software Foundation (ASF) under one or more
+contributor license agreements.  See the NOTICE file distributed with
+this work for additional information regarding copyright ownership.
+The ASF licenses this file to you under the Apache License, Version 2.0
+(the "License"); you may not use this file except in compliance with
+the License.  You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+-->
+<Root>
+  <TestCase name="testDeleteWithoutFilter[deleteMode = DELETED_ROWS]">
+    <Resource name="explain">
+      <![CDATA[== Abstract Syntax Tree ==
+LogicalSink(table=[default_catalog.default_database.t], fields=[a, b])
++- LogicalProject(a=[$0], b=[$1])
+   +- LogicalTableScan(table=[[default_catalog, default_database, t]])
+
+== Optimized Physical Plan ==
+Sink(table=[default_catalog.default_database.t], fields=[a, b])
++- TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, 
b])
+
+== Optimized Execution Plan ==
+Sink(table=[default_catalog.default_database.t], fields=[a, b])
++- TableSourceScan(table=[[default_catalog, default_database, t]], fields=[a, 
b])
+
+== Physical Execution Plan ==
+{
+  "nodes" : [ {
+    "id" : ,
+    "type" : "Source: t[]",
+    "pact" : "Data Source",
+    "contents" : "[]:TableSourceScan(table=[[default_catalog, 
default_database, t]], fields=[a, b])",
+    "parallelism" : 1
+  }, {
+    "id" : ,
+    "type" : "RowKindSetter[]",
+    "pact" : "Operator",
+    "contents" : "[]:RowKindSetter(TargetRowKind=[DELETE])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Sink: Unnamed",
+    "pact" : "Data Sink",
+    "contents" : "Sink: Unnamed",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  } ]
+}]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testDeleteWithFilter[deleteMode = DELETED_ROWS]">
+       <Resource name="explain">
+      <![CDATA[== Abstract Syntax Tree ==
+LogicalSink(table=[default_catalog.default_database.t], fields=[a, b])
++- LogicalProject(a=[$0], b=[$1])
+   +- LogicalFilter(condition=[AND(=($0, 1), =($1, _UTF-16LE'123'))])
+      +- LogicalTableScan(table=[[default_catalog, default_database, t]])
+
+== Optimized Physical Plan ==
+Sink(table=[default_catalog.default_database.t], fields=[a, b])
++- Calc(select=[CAST(1 AS INTEGER) AS a, CAST('123' AS VARCHAR(2147483647)) AS 
b], where=[AND(=(a, 1), =(b, '123'))])
+   +- TableSourceScan(table=[[default_catalog, default_database, t]], 
fields=[a, b])
+
+== Optimized Execution Plan ==
+Sink(table=[default_catalog.default_database.t], fields=[a, b])
++- Calc(select=[CAST(1 AS INTEGER) AS a, CAST('123' AS VARCHAR(2147483647)) AS 
b], where=[((a = 1) AND (b = '123'))])
+   +- TableSourceScan(table=[[default_catalog, default_database, t]], 
fields=[a, b])
+
+== Physical Execution Plan ==
+{
+  "nodes" : [ {
+    "id" : ,
+    "type" : "Source: t[]",
+    "pact" : "Data Source",
+    "contents" : "[]:TableSourceScan(table=[[default_catalog, 
default_database, t]], fields=[a, b])",
+    "parallelism" : 1
+  }, {
+    "id" : ,
+    "type" : "Calc[]",
+    "pact" : "Operator",
+    "contents" : "[]:Calc(select=[CAST(1 AS INTEGER) AS a, CAST('123' AS 
VARCHAR(2147483647)) AS b], where=[((a = 1) AND (b = '123'))])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "RowKindSetter[]",
+    "pact" : "Operator",
+    "contents" : "[]:RowKindSetter(TargetRowKind=[DELETE])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Sink: Unnamed",
+    "pact" : "Data Sink",
+    "contents" : "Sink: Unnamed",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  } ]
+}]]>
+       </Resource>
+  </TestCase>
+  <TestCase name="testDeleteWithSubQuery[deleteMode = DELETED_ROWS]">
+    <Resource name="explain">
+         <![CDATA[== Abstract Syntax Tree ==
+LogicalSink(table=[default_catalog.default_database.t], fields=[a, b])
++- LogicalProject(a=[$0], b=[$1])
+   +- LogicalFilter(condition=[AND(=($1, _UTF-16LE'123'), =(CAST($0):BIGINT, 
$SCALAR_QUERY({
+LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
+  LogicalProject($f0=[0])
+    LogicalTableScan(table=[[default_catalog, default_database, t]])
+})))])
+      +- LogicalTableScan(table=[[default_catalog, default_database, t]])
+
+== Optimized Physical Plan ==
+Sink(table=[default_catalog.default_database.t], fields=[a, b])
++- Calc(select=[a, CAST('123' AS VARCHAR(2147483647)) AS b])
+   +- NestedLoopJoin(joinType=[InnerJoin], where=[=(a0, EXPR$0)], select=[a, 
a0, EXPR$0], build=[right], singleRowJoin=[true])
+      :- Calc(select=[a, CAST(a AS BIGINT) AS a0], where=[=(b, '123')])
+      :  +- TableSourceScan(table=[[default_catalog, default_database, t]], 
fields=[a, b])
+      +- Exchange(distribution=[broadcast])
+         +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS 
EXPR$0])
+            +- Exchange(distribution=[single])
+               +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
+                  +- Calc(select=[0 AS $f0])
+                     +- TableSourceScan(table=[[default_catalog, 
default_database, t]], fields=[a, b])
+
+== Optimized Execution Plan ==
+Sink(table=[default_catalog.default_database.t], fields=[a, b])
++- Calc(select=[a, CAST('123' AS VARCHAR(2147483647)) AS b])
+   +- NestedLoopJoin(joinType=[InnerJoin], where=[(a0 = EXPR$0)], select=[a, 
a0, EXPR$0], build=[right], singleRowJoin=[true])
+      :- Exchange(distribution=[any], shuffle_mode=[BATCH])
+      :  +- Calc(select=[a, CAST(a AS BIGINT) AS a0], where=[(b = '123')])
+      :     +- TableSourceScan(table=[[default_catalog, default_database, t]], 
fields=[a, b])(reuse_id=[1])
+      +- Exchange(distribution=[broadcast])
+         +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS 
EXPR$0])
+            +- Exchange(distribution=[single])
+               +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
+                  +- Calc(select=[0 AS $f0])
+                     +- Reused(reference_id=[1])
+
+== Physical Execution Plan ==
+{
+  "nodes" : [ {
+    "id" : ,
+    "type" : "Source: t[]",
+    "pact" : "Data Source",
+    "contents" : "[]:TableSourceScan(table=[[default_catalog, 
default_database, t]], fields=[a, b])",
+    "parallelism" : 1
+  }, {
+    "id" : ,
+    "type" : "Calc[]",
+    "pact" : "Operator",
+    "contents" : "[]:Calc(select=[a, CAST(a AS BIGINT) AS a0], where=[(b = 
'123')])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Calc[]",
+    "pact" : "Operator",
+    "contents" : "[]:Calc(select=[0 AS $f0])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "HashAggregate[]",
+    "pact" : "Operator",
+    "contents" : "[]:LocalHashAggregate(select=[Partial_COUNT(*) AS 
count1$0])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "HashAggregate[]",
+    "pact" : "Operator",
+    "contents" : "[]:HashAggregate(isMerge=[true], 
select=[Final_COUNT(count1$0) AS EXPR$0])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "GLOBAL",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "NestedLoopJoin[]",
+    "pact" : "Operator",
+    "contents" : "[]:NestedLoopJoin(joinType=[InnerJoin], where=[(a0 = 
EXPR$0)], select=[a, a0, EXPR$0], build=[right], singleRowJoin=[true])",
+    "parallelism" : 12,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "REBALANCE",
+      "side" : "second"
+    }, {
+      "id" : ,
+      "ship_strategy" : "BROADCAST",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Calc[]",
+    "pact" : "Operator",
+    "contents" : "[]:Calc(select=[a, CAST('123' AS VARCHAR(2147483647)) AS 
b])",
+    "parallelism" : 12,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "RowKindSetter[]",
+    "pact" : "Operator",
+    "contents" : "[]:RowKindSetter(TargetRowKind=[DELETE])",
+    "parallelism" : 12,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Sink: Unnamed",
+    "pact" : "Data Sink",
+    "contents" : "Sink: Unnamed",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "REBALANCE",
+      "side" : "second"
+    } ]
+  } ]
+}]]>
+       </Resource>
+  </TestCase>
+  <TestCase name="testDeleteWithCustomColumns[deleteMode = DELETED_ROWS]">
+    <Resource name="explain">
+      <![CDATA[== Abstract Syntax Tree ==
+LogicalSink(table=[default_catalog.default_database.t], fields=[b, c])
++- LogicalProject(b=[$1], c=[$2])
+   +- LogicalFilter(condition=[=($1, _UTF-16LE'123')])
+      +- LogicalTableScan(table=[[default_catalog, default_database, t]])
+
+== Optimized Physical Plan ==
+Sink(table=[default_catalog.default_database.t], fields=[b, c])
++- Calc(select=[CAST('123' AS VARCHAR(2147483647)) AS b, c], where=[=(b, 
'123')])
+   +- TableSourceScan(table=[[default_catalog, default_database, t]], 
fields=[a, b, c])
+
+== Optimized Execution Plan ==
+Sink(table=[default_catalog.default_database.t], fields=[b, c])
++- Calc(select=[CAST('123' AS VARCHAR(2147483647)) AS b, c], where=[(b = 
'123')])
+   +- TableSourceScan(table=[[default_catalog, default_database, t]], 
fields=[a, b, c])
+
+== Physical Execution Plan ==
+{
+  "nodes" : [ {
+    "id" : ,
+    "type" : "Source: t[]",
+    "pact" : "Data Source",
+    "contents" : "[]:TableSourceScan(table=[[default_catalog, 
default_database, t]], fields=[a, b, c])",
+    "parallelism" : 1
+  }, {
+    "id" : ,
+    "type" : "Calc[]",
+    "pact" : "Operator",
+    "contents" : "[]:Calc(select=[CAST('123' AS VARCHAR(2147483647)) AS b, c], 
where=[(b = '123')])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "RowKindSetter[]",
+    "pact" : "Operator",
+    "contents" : "[]:RowKindSetter(TargetRowKind=[DELETE])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Sink: Unnamed",
+    "pact" : "Data Sink",
+    "contents" : "Sink: Unnamed",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  } ]
+}]]>
+       </Resource>
+  </TestCase>
+  <TestCase name="testDeleteWithMetaColumns[deleteMode = DELETED_ROWS]">
+    <Resource name="explain">
+      <![CDATA[== Abstract Syntax Tree ==
+LogicalSink(table=[default_catalog.default_database.t], fields=[meta_f1, 
meta_f2, b])
++- LogicalProject(meta_f1=[$3], meta_f2=[$4], b=[$1])
+   +- LogicalFilter(condition=[=($1, _UTF-16LE'123')])
+      +- LogicalTableScan(table=[[default_catalog, default_database, t]])
+
+== Optimized Physical Plan ==
+Sink(table=[default_catalog.default_database.t], fields=[meta_f1, meta_f2, b])
++- Calc(select=[meta_f1, meta_f2, CAST('123' AS VARCHAR(2147483647)) AS b], 
where=[=(b, '123')])
+   +- TableSourceScan(table=[[default_catalog, default_database, t]], 
fields=[a, b, c, meta_f1, meta_f2])
+
+== Optimized Execution Plan ==
+Sink(table=[default_catalog.default_database.t], fields=[meta_f1, meta_f2, b])
++- Calc(select=[meta_f1, meta_f2, CAST('123' AS VARCHAR(2147483647)) AS b], 
where=[(b = '123')])
+   +- TableSourceScan(table=[[default_catalog, default_database, t]], 
fields=[a, b, c, meta_f1, meta_f2])
+
+== Physical Execution Plan ==
+{
+  "nodes" : [ {
+    "id" : ,
+    "type" : "Source: t[]",
+    "pact" : "Data Source",
+    "contents" : "[]:TableSourceScan(table=[[default_catalog, 
default_database, t]], fields=[a, b, c, meta_f1, meta_f2])",
+    "parallelism" : 1
+  }, {
+    "id" : ,
+    "type" : "Calc[]",
+    "pact" : "Operator",
+    "contents" : "[]:Calc(select=[meta_f1, meta_f2, CAST('123' AS 
VARCHAR(2147483647)) AS b], where=[(b = '123')])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "RowKindSetter[]",
+    "pact" : "Operator",
+    "contents" : "[]:RowKindSetter(TargetRowKind=[DELETE])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Sink: Unnamed",
+    "pact" : "Data Sink",
+    "contents" : "Sink: Unnamed",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  } ]
+}]]>
+       </Resource>
+  </TestCase>
+  <TestCase name="testDeleteWithoutFilter[deleteMode = REMAINING_ROWS]">
+    <Resource name="explain">
+      <![CDATA[== Abstract Syntax Tree ==
+LogicalSink(table=[default_catalog.default_database.t], fields=[a, b])
++- LogicalProject(a=[$0], b=[$1])
+   +- LogicalFilter(condition=[false])
+      +- LogicalTableScan(table=[[default_catalog, default_database, t]])
+
+== Optimized Physical Plan ==
+Sink(table=[default_catalog.default_database.t], fields=[a, b])
++- Values(tuples=[[]], values=[a, b])
+
+== Optimized Execution Plan ==
+Sink(table=[default_catalog.default_database.t], fields=[a, b])
++- Values(tuples=[[]], values=[a, b])
+
+== Physical Execution Plan ==
+{
+  "nodes" : [ {
+    "id" : ,
+    "type" : "Source: Values[]",
+    "pact" : "Data Source",
+    "contents" : "[]:Values(tuples=[[]], values=[a, b])",
+    "parallelism" : 1
+  }, {
+    "id" : ,
+    "type" : "Sink: Unnamed",
+    "pact" : "Data Sink",
+    "contents" : "Sink: Unnamed",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  } ]
+}]]>
+    </Resource>
+  </TestCase>
+  <TestCase name="testDeleteWithFilter[deleteMode = REMAINING_ROWS]">
+    <Resource name="explain">
+      <![CDATA[== Abstract Syntax Tree ==
+LogicalSink(table=[default_catalog.default_database.t], fields=[a, b])
++- LogicalProject(a=[$0], b=[$1])
+   +- LogicalFilter(condition=[NOT(AND(=($0, 1), =($1, _UTF-16LE'123')))])
+      +- LogicalTableScan(table=[[default_catalog, default_database, t]])
+
+== Optimized Physical Plan ==
+Sink(table=[default_catalog.default_database.t], fields=[a, b])
++- Calc(select=[a, b], where=[OR(<>(a, 1), <>(b, '123'))])
+   +- TableSourceScan(table=[[default_catalog, default_database, t]], 
fields=[a, b])
+
+== Optimized Execution Plan ==
+Sink(table=[default_catalog.default_database.t], fields=[a, b])
++- Calc(select=[a, b], where=[((a <> 1) OR (b <> '123'))])
+   +- TableSourceScan(table=[[default_catalog, default_database, t]], 
fields=[a, b])
+
+== Physical Execution Plan ==
+{
+  "nodes" : [ {
+    "id" : ,
+    "type" : "Source: t[]",
+    "pact" : "Data Source",
+    "contents" : "[]:TableSourceScan(table=[[default_catalog, 
default_database, t]], fields=[a, b])",
+    "parallelism" : 1
+  }, {
+    "id" : ,
+    "type" : "Calc[]",
+    "pact" : "Operator",
+    "contents" : "[]:Calc(select=[a, b], where=[((a <> 1) OR (b <> '123'))])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Sink: Unnamed",
+    "pact" : "Data Sink",
+    "contents" : "Sink: Unnamed",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  } ]
+}]]>
+       </Resource>
+  </TestCase>
+  <TestCase name="testDeleteWithSubQuery[deleteMode = REMAINING_ROWS]">
+    <Resource name="explain">
+               <![CDATA[== Abstract Syntax Tree ==
+LogicalSink(table=[default_catalog.default_database.t], fields=[a, b])
++- LogicalProject(a=[$0], b=[$1])
+   +- LogicalFilter(condition=[NOT(AND(=($1, _UTF-16LE'123'), 
=(CAST($0):BIGINT, $SCALAR_QUERY({
+LogicalAggregate(group=[{}], EXPR$0=[COUNT()])
+  LogicalProject($f0=[0])
+    LogicalTableScan(table=[[default_catalog, default_database, t]])
+}))))])
+      +- LogicalTableScan(table=[[default_catalog, default_database, t]])
+
+== Optimized Physical Plan ==
+Sink(table=[default_catalog.default_database.t], fields=[a, b])
++- Calc(select=[a, b], where=[OR(<>(b, '123'), <>(CAST(a AS BIGINT), EXPR$0))])
+   +- NestedLoopJoin(joinType=[LeftOuterJoin], where=[true], select=[a, b, 
EXPR$0], build=[right], singleRowJoin=[true])
+      :- TableSourceScan(table=[[default_catalog, default_database, t]], 
fields=[a, b])
+      +- Exchange(distribution=[broadcast])
+         +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS 
EXPR$0])
+            +- Exchange(distribution=[single])
+               +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
+                  +- Calc(select=[0 AS $f0])
+                     +- TableSourceScan(table=[[default_catalog, 
default_database, t]], fields=[a, b])
+
+== Optimized Execution Plan ==
+Sink(table=[default_catalog.default_database.t], fields=[a, b])
++- Calc(select=[a, b], where=[((b <> '123') OR (CAST(a AS BIGINT) <> EXPR$0))])
+   +- NestedLoopJoin(joinType=[LeftOuterJoin], where=[true], select=[a, b, 
EXPR$0], build=[right], singleRowJoin=[true])
+      :- Exchange(distribution=[any], shuffle_mode=[BATCH])
+      :  +- TableSourceScan(table=[[default_catalog, default_database, t]], 
fields=[a, b])(reuse_id=[1])
+      +- Exchange(distribution=[broadcast])
+         +- HashAggregate(isMerge=[true], select=[Final_COUNT(count1$0) AS 
EXPR$0])
+            +- Exchange(distribution=[single])
+               +- LocalHashAggregate(select=[Partial_COUNT(*) AS count1$0])
+                  +- Calc(select=[0 AS $f0])
+                     +- Reused(reference_id=[1])
+
+== Physical Execution Plan ==
+{
+  "nodes" : [ {
+    "id" : ,
+    "type" : "Source: t[]",
+    "pact" : "Data Source",
+    "contents" : "[]:TableSourceScan(table=[[default_catalog, 
default_database, t]], fields=[a, b])",
+    "parallelism" : 1
+  }, {
+    "id" : ,
+    "type" : "Calc[]",
+    "pact" : "Operator",
+    "contents" : "[]:Calc(select=[0 AS $f0])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "HashAggregate[]",
+    "pact" : "Operator",
+    "contents" : "[]:LocalHashAggregate(select=[Partial_COUNT(*) AS 
count1$0])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "HashAggregate[]",
+    "pact" : "Operator",
+    "contents" : "[]:HashAggregate(isMerge=[true], 
select=[Final_COUNT(count1$0) AS EXPR$0])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "GLOBAL",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "NestedLoopJoin[]",
+    "pact" : "Operator",
+    "contents" : "[]:NestedLoopJoin(joinType=[LeftOuterJoin], where=[true], 
select=[a, b, EXPR$0], build=[right], singleRowJoin=[true])",
+    "parallelism" : 12,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "REBALANCE",
+      "side" : "second"
+    }, {
+      "id" : ,
+      "ship_strategy" : "BROADCAST",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Calc[]",
+    "pact" : "Operator",
+    "contents" : "[]:Calc(select=[a, b], where=[((b <> '123') OR (CAST(a AS 
BIGINT) <> EXPR$0))])",
+    "parallelism" : 12,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Sink: Unnamed",
+    "pact" : "Data Sink",
+    "contents" : "Sink: Unnamed",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "REBALANCE",
+      "side" : "second"
+    } ]
+  } ]
+}]]>
+       </Resource>
+  </TestCase>
+  <TestCase name="testDeleteWithCustomColumns[deleteMode = REMAINING_ROWS]">
+    <Resource name="explain">
+      <![CDATA[== Abstract Syntax Tree ==
+LogicalSink(table=[default_catalog.default_database.t], fields=[b, c])
++- LogicalProject(b=[$1], c=[$2])
+   +- LogicalFilter(condition=[NOT(=($1, _UTF-16LE'123'))])
+      +- LogicalTableScan(table=[[default_catalog, default_database, t]])
+
+== Optimized Physical Plan ==
+Sink(table=[default_catalog.default_database.t], fields=[b, c])
++- Calc(select=[b, c], where=[<>(b, '123')])
+   +- TableSourceScan(table=[[default_catalog, default_database, t]], 
fields=[a, b, c])
+
+== Optimized Execution Plan ==
+Sink(table=[default_catalog.default_database.t], fields=[b, c])
++- Calc(select=[b, c], where=[(b <> '123')])
+   +- TableSourceScan(table=[[default_catalog, default_database, t]], 
fields=[a, b, c])
+
+== Physical Execution Plan ==
+{
+  "nodes" : [ {
+    "id" : ,
+    "type" : "Source: t[]",
+    "pact" : "Data Source",
+    "contents" : "[]:TableSourceScan(table=[[default_catalog, 
default_database, t]], fields=[a, b, c])",
+    "parallelism" : 1
+  }, {
+    "id" : ,
+    "type" : "Calc[]",
+    "pact" : "Operator",
+    "contents" : "[]:Calc(select=[b, c], where=[(b <> '123')])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Sink: Unnamed",
+    "pact" : "Data Sink",
+    "contents" : "Sink: Unnamed",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  } ]
+}]]>
+       </Resource>
+  </TestCase>
+  <TestCase name="testDeleteWithMetaColumns[deleteMode = REMAINING_ROWS]">
+    <Resource name="explain">
+         <![CDATA[== Abstract Syntax Tree ==
+LogicalSink(table=[default_catalog.default_database.t], fields=[meta_f1, 
meta_f2, b])
++- LogicalProject(meta_f1=[$3], meta_f2=[$4], b=[$1])
+   +- LogicalFilter(condition=[NOT(=($1, _UTF-16LE'123'))])
+      +- LogicalTableScan(table=[[default_catalog, default_database, t]])
+
+== Optimized Physical Plan ==
+Sink(table=[default_catalog.default_database.t], fields=[meta_f1, meta_f2, b])
++- Calc(select=[meta_f1, meta_f2, b], where=[<>(b, '123')])
+   +- TableSourceScan(table=[[default_catalog, default_database, t]], 
fields=[a, b, c, meta_f1, meta_f2])
+
+== Optimized Execution Plan ==
+Sink(table=[default_catalog.default_database.t], fields=[meta_f1, meta_f2, b])
++- Calc(select=[meta_f1, meta_f2, b], where=[(b <> '123')])
+   +- TableSourceScan(table=[[default_catalog, default_database, t]], 
fields=[a, b, c, meta_f1, meta_f2])
+
+== Physical Execution Plan ==
+{
+  "nodes" : [ {
+    "id" : ,
+    "type" : "Source: t[]",
+    "pact" : "Data Source",
+    "contents" : "[]:TableSourceScan(table=[[default_catalog, 
default_database, t]], fields=[a, b, c, meta_f1, meta_f2])",
+    "parallelism" : 1
+  }, {
+    "id" : ,
+    "type" : "Calc[]",
+    "pact" : "Operator",
+    "contents" : "[]:Calc(select=[meta_f1, meta_f2, b], where=[(b <> '123')])",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  }, {
+    "id" : ,
+    "type" : "Sink: Unnamed",
+    "pact" : "Data Sink",
+    "contents" : "Sink: Unnamed",
+    "parallelism" : 1,
+    "predecessors" : [ {
+      "id" : ,
+      "ship_strategy" : "FORWARD",
+      "side" : "second"
+    } ]
+  } ]
+}]]>
+       </Resource>
+  </TestCase>
+</Root>
diff --git 
a/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/RowKindSetter.java
 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/RowKindSetter.java
new file mode 100644
index 00000000000..8944f6605f5
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/main/java/org/apache/flink/table/runtime/operators/sink/RowKindSetter.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.sink;
+
+import org.apache.flink.annotation.Internal;
+import org.apache.flink.streaming.api.operators.OneInputStreamOperator;
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.table.runtime.operators.TableStreamOperator;
+import org.apache.flink.types.RowKind;
+
+/**
+ * An operator that sets the row kind of the incoming records to a specific row kind.
+ *
+ * <p>For example, the planner places it in front of a sink for row-level DELETE plans, so every
+ * produced record is emitted with {@link RowKind#DELETE} (see the expected plans containing
+ * {@code RowKindSetter(TargetRowKind=[DELETE])}).
+ */
+@Internal
+public class RowKindSetter extends TableStreamOperator<RowData>
+        implements OneInputStreamOperator<RowData, RowData> {
+
+    private static final long serialVersionUID = 1L;
+
+    /** The row kind that every incoming record is rewritten to. */
+    private final RowKind targetRowKind;
+
+    public RowKindSetter(RowKind targetRowKind) {
+        this.targetRowKind = targetRowKind;
+    }
+
+    @Override
+    public void processElement(StreamRecord<RowData> element) throws Exception {
+        // Overwrite the kind in place on the incoming record and forward it unchanged otherwise.
+        element.getValue().setRowKind(targetRowKind);
+        output.collect(element);
+    }
+}
diff --git 
a/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/RowKindSetterTest.java
 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/RowKindSetterTest.java
new file mode 100644
index 00000000000..0127cdc53dd
--- /dev/null
+++ 
b/flink-table/flink-table-runtime/src/test/java/org/apache/flink/table/runtime/operators/sink/RowKindSetterTest.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.runtime.operators.sink;
+
+import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
+import org.apache.flink.streaming.util.OneInputStreamOperatorTestHarness;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.data.RowData;
+import org.apache.flink.types.RowKind;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link RowKindSetter}. */
+public class RowKindSetterTest {
+
+    @Test
+    public void testSetRowKind() throws Exception {
+        // For every possible target row kind, feed one record of each existing row kind
+        // through the operator and verify all outputs carry the target kind.
+        for (RowKind targetRowKind : RowKind.values()) {
+            RowKindSetter rowKindSetter = new RowKindSetter(targetRowKind);
+            try (OneInputStreamOperatorTestHarness<RowData, RowData> operatorTestHarness =
+                    new OneInputStreamOperatorTestHarness<>(rowKindSetter)) {
+                operatorTestHarness.open();
+
+                // get the rows with all row kind
+                List<RowData> rows = getRowsWithAllRowKind();
+                for (RowData row : rows) {
+                    operatorTestHarness.processElement(new StreamRecord<>(row));
+                }
+                // verify the row kind of output
+                verifyRowKind(operatorTestHarness.extractOutputValues(), targetRowKind);
+            }
+        }
+    }
+
+    /** Returns one single-field row per {@link RowKind}, with that kind actually set on the row. */
+    private List<RowData> getRowsWithAllRowKind() {
+        List<RowData> rows = new ArrayList<>();
+        for (RowKind rowKind : RowKind.values()) {
+            // Use ofKind(...) so the row's kind is set. GenericRowData.of(rowKind, 1) would
+            // instead create a TWO-field row (the RowKind object as field 0) whose kind
+            // defaults to INSERT, so the input rows would not cover all row kinds.
+            rows.add(GenericRowData.ofKind(rowKind, 1));
+        }
+        return rows;
+    }
+
+    /** Asserts that every row in the list carries the expected row kind. */
+    private void verifyRowKind(List<RowData> rows, RowKind rowKind) {
+        for (RowData row : rows) {
+            assertThat(row.getRowKind()).isEqualTo(rowKind);
+        }
+    }
+}

Reply via email to