This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git

commit 2a64fa43f270affef2070c36ea48ad73b0a0498e
Author: Jingsong <[email protected]>
AuthorDate: Sat May 4 21:08:53 2024 +0800

    [flink] Delete in flink should only pushdown partition or table dropping
    
    1. Primary key filter should use job to delete too, for dynamic bucket 
table or look changelog producer table, we should use job to delete.
    2. delete.force-produce-changelog should work on changelog-producer none 
too, delta files should be streaming read too.
---
 .../shortcodes/generated/core_configuration.html   |   2 +-
 .../main/java/org/apache/paimon/CoreOptions.java   |   6 +-
 .../predicate/AllPrimaryKeyEqualVisitor.java       | 110 ---------------
 .../predicate/DeletePushDownVisitorTest.java       | 148 ---------------------
 .../java/org/apache/paimon/table/TableUtils.java   |  77 -----------
 .../SupportsRowLevelOperationFlinkTableSink.java   |  25 +---
 .../apache/paimon/flink/BatchFileStoreITCase.java  |   2 +-
 .../apache/paimon/flink/ReadWriteTableITCase.java  |   2 +-
 8 files changed, 12 insertions(+), 360 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index bdc6371b3..4c493cf7c 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -180,7 +180,7 @@ under the License.
             <td><h5>delete.force-produce-changelog</h5></td>
             <td style="word-wrap: break-word;">false</td>
             <td>Boolean</td>
-            <td>Force produce changelog in delete sql no matter what if 
changelog producer is not NONE.</td>
+            <td>Force produce changelog in delete sql, or you can use 
'streaming-read-overwrite' to read changelog from overwrite commit.</td>
         </tr>
         <tr>
             <td><h5>deletion-vectors.enabled</h5></td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 9c0d3bf3b..e1dec5cd4 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -1110,7 +1110,8 @@ public class CoreOptions implements Serializable {
                     .booleanType()
                     .defaultValue(false)
                     .withDescription(
-                            "Force produce changelog in delete sql no matter 
what if changelog producer is not NONE.");
+                            "Force produce changelog in delete sql, "
+                                    + "or you can use 
'streaming-read-overwrite' to read changelog from overwrite commit.");
 
     public static final ConfigOption<RangeStrategy> SORT_RANG_STRATEGY =
             key("sort-compaction.range-strategy")
@@ -1758,8 +1759,7 @@ public class CoreOptions implements Serializable {
     }
 
     public boolean deleteForceProduceChangelog() {
-        return options.get(DELETION_FORCE_PRODUCE_CHANGELOG)
-                && changelogProducer() != CoreOptions.ChangelogProducer.NONE;
+        return options.get(DELETION_FORCE_PRODUCE_CHANGELOG);
     }
 
     /** Specifies the merge engine for table with primary key. */
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/predicate/AllPrimaryKeyEqualVisitor.java
 
b/paimon-common/src/main/java/org/apache/paimon/predicate/AllPrimaryKeyEqualVisitor.java
deleted file mode 100644
index b4bbddf30..000000000
--- 
a/paimon-common/src/main/java/org/apache/paimon/predicate/AllPrimaryKeyEqualVisitor.java
+++ /dev/null
@@ -1,110 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.predicate;
-
-import org.apache.paimon.shade.guava30.com.google.common.collect.Sets;
-
-import java.util.Collections;
-import java.util.List;
-import java.util.Set;
-
-/**
- * Visit the predicate and check if it only contains primary keys and can be 
push down.
- *
- * <p>We will check: all the primary keys are in the predicate with equal 
operator.
- *
- * <p>TODO: support IN.
- */
-public class AllPrimaryKeyEqualVisitor implements FunctionVisitor<Set<String>> 
{
-
-    private final List<String> primaryKeys;
-
-    public AllPrimaryKeyEqualVisitor(List<String> primaryKeys) {
-        this.primaryKeys = primaryKeys;
-    }
-
-    @Override
-    public Set<String> visitIsNotNull(FieldRef fieldRef) {
-        return Collections.emptySet();
-    }
-
-    @Override
-    public Set<String> visitIsNull(FieldRef fieldRef) {
-        return Collections.emptySet();
-    }
-
-    @Override
-    public Set<String> visitStartsWith(FieldRef fieldRef, Object literal) {
-        return Collections.emptySet();
-    }
-
-    @Override
-    public Set<String> visitLessThan(FieldRef fieldRef, Object literal) {
-        return Collections.emptySet();
-    }
-
-    @Override
-    public Set<String> visitGreaterOrEqual(FieldRef fieldRef, Object literal) {
-        return Collections.emptySet();
-    }
-
-    @Override
-    public Set<String> visitNotEqual(FieldRef fieldRef, Object literal) {
-        return Collections.emptySet();
-    }
-
-    @Override
-    public Set<String> visitLessOrEqual(FieldRef fieldRef, Object literal) {
-        return Collections.emptySet();
-    }
-
-    @Override
-    public Set<String> visitEqual(FieldRef fieldRef, Object literal) {
-        if (primaryKeys.contains(fieldRef.name())) {
-            return Collections.singleton(fieldRef.name());
-        } else {
-            return Collections.emptySet();
-        }
-    }
-
-    @Override
-    public Set<String> visitGreaterThan(FieldRef fieldRef, Object literal) {
-        return Collections.emptySet();
-    }
-
-    @Override
-    public Set<String> visitIn(FieldRef fieldRef, List<Object> literals) {
-        return Collections.emptySet();
-    }
-
-    @Override
-    public Set<String> visitNotIn(FieldRef fieldRef, List<Object> literals) {
-        return Collections.emptySet();
-    }
-
-    @Override
-    public Set<String> visitAnd(List<Set<String>> children) {
-        return children.stream().reduce(Sets::union).get();
-    }
-
-    @Override
-    public Set<String> visitOr(List<Set<String>> children) {
-        return Collections.emptySet();
-    }
-}
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/predicate/DeletePushDownVisitorTest.java
 
b/paimon-common/src/test/java/org/apache/paimon/predicate/DeletePushDownVisitorTest.java
index 9c767355c..0d92dbfd0 100644
--- 
a/paimon-common/src/test/java/org/apache/paimon/predicate/DeletePushDownVisitorTest.java
+++ 
b/paimon-common/src/test/java/org/apache/paimon/predicate/DeletePushDownVisitorTest.java
@@ -24,161 +24,13 @@ import org.junit.jupiter.api.Test;
 
 import java.util.Arrays;
 import java.util.Collections;
-import java.util.HashSet;
 import java.util.List;
-import java.util.Set;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
-/* test cases for DeleteFilter Push Down, suppose primaryKeys = (a,b,c,d), 
partitionKeys=(b,c,d), other fields = (e,f),
- * the following cases are tested:
- *
- * 1. where a=1 and b=2 and c=3 and d=4, push down
- * 2. where a=1 and b=2 and c=3 and d=4 and e is not null, push down
- * 3. where a=1 and b=2 and c=3 and d=4 and f=6, push down
- * 4. where a=1 and b=2 and c=3 and d=4 and e is not null and f=6, push down
- * 5. where a in (1,2) and b=2 and c=3 and d=4, push down
- * 6. where a=1 and b=1 and c is not null and d=4, do not push down
- * 7. where a=1, do not push down
- * 8. where a=1 and b=2 and d=4, do not push down
- * 9. where a=1 and c=3 and d=4, do not push down
- * 10. where b=2 and c=3 and d=4 and f=6, not push down
- *
- * 11. where b=2 and c=3 and d=4, push down
- * 12. where b=2 and c=3, push down
- * 13. where b=2 and d=4, push down
- * 14. where b=2 and c=3 and d=4 and e=5, do not push down
- * 15. where b=2 and c=3 or d=4, do not push down
- * 16. where b=2 and c=3 and d>5, do not push down
- */
 /** DeletePushDownVisitorTest tests the DeletePushDownVisitor. */
 public class DeletePushDownVisitorTest {
 
-    @Test
-    public void testPrimaryKeyPushDown() {
-        List<String> primaryKeys = Arrays.asList("a", "b", "c", "d");
-
-        Predicate predicateA =
-                new LeafPredicate(
-                        Equal.INSTANCE, DataTypes.INT(), 1, "a", 
Collections.singletonList(1));
-
-        Predicate predicateA1 =
-                new LeafPredicate(
-                        Equal.INSTANCE, DataTypes.INT(), 1, "a", 
Collections.singletonList(2));
-
-        Predicate predicateB =
-                new LeafPredicate(
-                        Equal.INSTANCE, DataTypes.INT(), 2, "b", 
Collections.singletonList(2));
-
-        Predicate predicateC =
-                new LeafPredicate(
-                        Equal.INSTANCE, DataTypes.INT(), 3, "c", 
Collections.singletonList(3));
-
-        Predicate predicateCIsNotNull =
-                new LeafPredicate(
-                        IsNotNull.INSTANCE, DataTypes.INT(), 3, "c", 
Collections.singletonList(3));
-
-        Predicate predicateD =
-                new LeafPredicate(
-                        Equal.INSTANCE, DataTypes.INT(), 4, "d", 
Collections.singletonList(4));
-
-        // non-primary key's isNotNull filter
-        Predicate predicateE =
-                new LeafPredicate(
-                        IsNotNull.INSTANCE, DataTypes.INT(), 5, "e", 
Collections.singletonList(5));
-
-        Predicate predicateF =
-                new LeafPredicate(
-                        IsNotNull.INSTANCE, DataTypes.INT(), 6, "f", 
Collections.singletonList(6));
-
-        /* filters contain all the primary keys with AND of Equal *********** 
*/
-
-        // where a=1 and b=2 and c=3 and d=4, push down
-        AllPrimaryKeyEqualVisitor visitor = new 
AllPrimaryKeyEqualVisitor(primaryKeys);
-        Predicate compoundPredicate =
-                PredicateBuilder.and(Arrays.asList(predicateA, predicateB, 
predicateC, predicateD));
-        Set<String> visitResult = compoundPredicate.visit(visitor);
-        assertThat(visitResult).containsAnyElementsOf(primaryKeys);
-
-        // where a=1 and b=2 and c=3 and d=4 and e is not null, push down
-        AllPrimaryKeyEqualVisitor visitor1 = new 
AllPrimaryKeyEqualVisitor(primaryKeys);
-        Predicate predicateABCDE =
-                PredicateBuilder.and(
-                        Arrays.asList(predicateA, predicateB, predicateC, 
predicateD, predicateE));
-        
assertThat(predicateABCDE.visit(visitor1)).containsAnyElementsOf(primaryKeys);
-
-        // where a=1 and b=2 and c=3 and d=4 and f=6, push down
-        AllPrimaryKeyEqualVisitor visitor2 = new 
AllPrimaryKeyEqualVisitor(primaryKeys);
-        Predicate predicateABCDF =
-                PredicateBuilder.and(
-                        Arrays.asList(predicateA, predicateB, predicateC, 
predicateD, predicateF));
-        
assertThat(predicateABCDF.visit(visitor2)).containsAnyElementsOf(primaryKeys);
-
-        // where a=1 and b=2 and c=3 and d=4 and e is not null and f=6, push 
down
-        AllPrimaryKeyEqualVisitor visitor3 = new 
AllPrimaryKeyEqualVisitor(primaryKeys);
-        Predicate predicateABCDEF =
-                PredicateBuilder.and(
-                        Arrays.asList(
-                                predicateA,
-                                predicateB,
-                                predicateC,
-                                predicateD,
-                                predicateE,
-                                predicateF));
-        
assertThat(predicateABCDEF.visit(visitor3)).containsAnyElementsOf(primaryKeys);
-
-        // where a in (1,2) and b=2 and c=3 and d=4, push down
-        AllPrimaryKeyEqualVisitor visitor4 = new 
AllPrimaryKeyEqualVisitor(primaryKeys);
-        Predicate predicateAABCD =
-                PredicateBuilder.and(
-                        Arrays.asList(
-                                PredicateBuilder.or(predicateA, predicateA1),
-                                predicateB,
-                                predicateC,
-                                predicateD,
-                                predicateE));
-        
assertThat(predicateAABCD.visit(visitor4)).containsAnyElementsOf(primaryKeys);
-
-        /* not all the primary keys filters are of Equal func ************* */
-
-        // where a=1 and b=1 and c is not null and d=4, do not push down
-        AllPrimaryKeyEqualVisitor visitor5 = new 
AllPrimaryKeyEqualVisitor(primaryKeys);
-        Predicate predicateABCNotNull =
-                PredicateBuilder.and(
-                        Arrays.asList(predicateA, predicateB, 
predicateCIsNotNull, predicateD));
-        assertThat(predicateABCNotNull.visit(visitor5)).isNotEqualTo(new 
HashSet<>(primaryKeys));
-
-        /* filters not contain all the primary keys ****************** */
-
-        // where a=1, do not push down
-        AllPrimaryKeyEqualVisitor visitor6 = new 
AllPrimaryKeyEqualVisitor(primaryKeys);
-        
assertThat(PredicateBuilder.and(Collections.singletonList(predicateA)).visit(visitor6))
-                .isNotEqualTo(new HashSet<>(primaryKeys));
-
-        // where a=1 and b=2 and d=4, do not push down
-        AllPrimaryKeyEqualVisitor visitor7 = new 
AllPrimaryKeyEqualVisitor(primaryKeys);
-        assertThat(
-                        PredicateBuilder.and(Arrays.asList(predicateA, 
predicateB, predicateD))
-                                .visit(visitor7))
-                .isNotEqualTo(new HashSet<>(primaryKeys));
-
-        // where a=1 and c=3 and d=4, do not push down
-        AllPrimaryKeyEqualVisitor visitor8 = new 
AllPrimaryKeyEqualVisitor(primaryKeys);
-        assertThat(
-                        PredicateBuilder.and(Arrays.asList(predicateA, 
predicateC, predicateD))
-                                .visit(visitor8))
-                .isNotEqualTo(new HashSet<>(primaryKeys));
-
-        // where b=2 and c=3 and d=4 and f=6, not push down
-        AllPrimaryKeyEqualVisitor visitor9 = new 
AllPrimaryKeyEqualVisitor(primaryKeys);
-        assertThat(
-                        PredicateBuilder.and(
-                                        Arrays.asList(
-                                                predicateB, predicateC, 
predicateD, predicateF))
-                                .visit(visitor9))
-                .isNotEqualTo(new HashSet<>(primaryKeys));
-    }
-
     @Test
     public void testPartitionKeyNotPushDown() {
         List<String> partitionKeys = Arrays.asList("b", "c", "d");
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/TableUtils.java 
b/paimon-core/src/main/java/org/apache/paimon/table/TableUtils.java
deleted file mode 100644
index 9b9db719a..000000000
--- a/paimon-core/src/main/java/org/apache/paimon/table/TableUtils.java
+++ /dev/null
@@ -1,77 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.table;
-
-import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.disk.IOManager;
-import org.apache.paimon.disk.IOManagerImpl;
-import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.predicate.PredicateBuilder;
-import org.apache.paimon.reader.RecordReader;
-import org.apache.paimon.table.sink.BatchTableCommit;
-import org.apache.paimon.table.sink.BatchTableWrite;
-import org.apache.paimon.table.sink.BatchWriteBuilder;
-import org.apache.paimon.table.source.ReadBuilder;
-import org.apache.paimon.table.source.Split;
-import org.apache.paimon.types.RowKind;
-import org.apache.paimon.utils.CloseableIterator;
-
-import java.util.List;
-
-/** Utils for Table. TODO we can introduce LocalAction maybe? */
-public class TableUtils {
-
-    private static final String TEMP_DIR = 
System.getProperty("java.io.tmpdir");
-
-    /**
-     * Delete according to filters.
-     *
-     * <p>NOTE: This method is only suitable for deletion of small amount of 
data.
-     *
-     * @return the number of deleted records
-     */
-    public static long deleteWhere(Table table, List<Predicate> filters) {
-        ReadBuilder readBuilder = table.newReadBuilder().withFilter(filters);
-        BatchWriteBuilder writeBuilder = table.newBatchWriteBuilder();
-        List<Split> splits = readBuilder.newScan().plan().splits();
-        long hit = 0;
-        try (RecordReader<InternalRow> reader = 
readBuilder.newRead().createReader(splits);
-                BatchTableWrite write = writeBuilder.newWrite();
-                // we create temp io manager for writer
-                IOManager ioManager = new IOManagerImpl(TEMP_DIR);
-                BatchTableCommit commit = writeBuilder.newCommit()) {
-            write.withIOManager(ioManager);
-            CloseableIterator<InternalRow> iterator = 
reader.toCloseableIterator();
-            Predicate filter = PredicateBuilder.and(filters);
-            while (iterator.hasNext()) {
-                InternalRow row = iterator.next();
-                if (filter.test(row)) {
-                    hit++;
-                    row.setRowKind(RowKind.DELETE);
-                    write.write(row);
-                }
-            }
-
-            commit.commit(write.prepareCommit());
-            return hit;
-        } catch (Exception e) {
-            throw new RuntimeException(e);
-        }
-    }
-}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
index 8c490fb9d..2870abcbc 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/SupportsRowLevelOperationFlinkTableSink.java
@@ -25,13 +25,11 @@ import org.apache.paimon.flink.PredicateConverter;
 import org.apache.paimon.flink.log.LogStoreTableFactory;
 import org.apache.paimon.operation.FileStoreCommit;
 import org.apache.paimon.options.Options;
-import org.apache.paimon.predicate.AllPrimaryKeyEqualVisitor;
 import org.apache.paimon.predicate.OnlyPartitionKeyEqualVisitor;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.Table;
-import org.apache.paimon.table.TableUtils;
 import org.apache.paimon.table.sink.BatchWriteBuilder;
 
 import org.apache.flink.table.catalog.Column;
@@ -61,6 +59,7 @@ import java.util.stream.Collectors;
 import static org.apache.paimon.CoreOptions.MERGE_ENGINE;
 import static org.apache.paimon.CoreOptions.MergeEngine.DEDUPLICATE;
 import static org.apache.paimon.CoreOptions.MergeEngine.PARTIAL_UPDATE;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Flink table sink that supports row level update and delete. */
 public abstract class SupportsRowLevelOperationFlinkTableSink extends 
FlinkTableSinkBase
@@ -167,12 +166,10 @@ public abstract class 
SupportsRowLevelOperationFlinkTableSink extends FlinkTable
         if (deletePredicate == null) {
             commit.truncateTable(identifier);
             return Optional.empty();
-        } else if (deleteIsDropPartition()) {
+        } else {
+            checkArgument(deleteIsDropPartition());
             
commit.dropPartitions(Collections.singletonList(deletePartitions()), 
identifier);
             return Optional.empty();
-        } else {
-            return Optional.of(
-                    TableUtils.deleteWhere(table, 
Collections.singletonList(deletePredicate)));
         }
     }
 
@@ -192,10 +189,9 @@ public abstract class 
SupportsRowLevelOperationFlinkTableSink extends FlinkTable
     }
 
     private boolean canPushDownDeleteFilter() {
-        CoreOptions coreOptions = CoreOptions.fromMap(table.options());
-        return -1 != coreOptions.bucket()
-                && !coreOptions.deleteForceProduceChangelog()
-                && (deletePredicate == null || deleteIsDropPartition() || 
deleteInSingleNode());
+        CoreOptions options = CoreOptions.fromMap(table.options());
+        return (deletePredicate == null || deleteIsDropPartition())
+                && !options.deleteForceProduceChangelog();
     }
 
     private boolean deleteIsDropPartition() {
@@ -214,13 +210,4 @@ public abstract class 
SupportsRowLevelOperationFlinkTableSink extends FlinkTable
         deletePredicate.visit(visitor);
         return visitor.partitions();
     }
-
-    private boolean deleteInSingleNode() {
-        if (deletePredicate == null) {
-            return false;
-        }
-        return deletePredicate
-                .visit(new AllPrimaryKeyEqualVisitor(table.primaryKeys()))
-                .containsAll(table.primaryKeys());
-    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
index 83b8c80fd..2758b7352 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/BatchFileStoreITCase.java
@@ -440,7 +440,7 @@ public class BatchFileStoreITCase extends CatalogITCaseBase 
{
     }
 
     @ParameterizedTest
-    @ValueSource(strings = {"lookup", "input"})
+    @ValueSource(strings = {"none", "lookup", "input"})
     public void testDeletePartitionWithChangelog(String producer) throws 
Exception {
         sql(
                 "CREATE TABLE delete_table (pt INT, pk INT, v STRING, PRIMARY 
KEY(pt, pk) NOT ENFORCED) PARTITIONED BY (pt)   "
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
index 589854c78..4b28bbc40 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/ReadWriteTableITCase.java
@@ -1552,7 +1552,7 @@ public class ReadWriteTableITCase extends 
AbstractTestBase {
     }
 
     @Test
-    public void testDeletePushDownWithPrimaryKey() throws Exception {
+    public void testDeleteWithPrimaryKeyFilter() throws Exception {
         // Step1: define table schema
         String table =
                 createTable(

Reply via email to