This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 214a27ffeb [flink] Cast Predicate to PartitionPredicate to avoid stack overflow (#5880)
214a27ffeb is described below

commit 214a27ffeba03a1a37aae364466c17855dbd239e
Author: YeJunHao <[email protected]>
AuthorDate: Mon Jul 14 14:19:01 2025 +0800

    [flink] Cast Predicate to PartitionPredicate to avoid stack overflow (#5880)
---
 .../main/java/org/apache/paimon/types/RowType.java |  5 ++
 .../paimon/append/AppendCompactCoordinator.java    | 16 +++--
 .../paimon/partition/PartitionPredicate.java       | 15 ++++-
 .../paimon/table/FallbackReadFileStoreTable.java   |  8 +++
 .../paimon/table/source/AbstractDataTableScan.java |  7 ++
 .../apache/paimon/table/source/InnerTableScan.java |  5 ++
 .../apache/paimon/table/source/ReadBuilder.java    |  4 ++
 .../paimon/table/source/ReadBuilderImpl.java       | 30 ++++++++-
 .../table/source/snapshot/SnapshotReader.java      |  3 +
 .../table/source/snapshot/SnapshotReaderImpl.java  | 13 +++-
 .../apache/paimon/table/system/AuditLogTable.java  | 13 ++++
 .../apache/paimon/flink/action/CompactAction.java  | 55 ++++++++++++----
 .../apache/paimon/flink/action/RescaleAction.java  | 19 ++----
 .../paimon/flink/action/SortCompactAction.java     |  2 +-
 .../flink/compact/AppendTableCompactBuilder.java   |  8 +--
 .../flink/source/AppendTableCompactSource.java     | 17 +++--
 .../paimon/flink/source/BaseDataTableSource.java   |  6 +-
 .../flink/source/CompactorSourceBuilder.java       | 13 ++--
 .../paimon/flink/source/FlinkSourceBuilder.java    | 15 ++++-
 .../paimon/flink/source/FlinkTableSource.java      | 41 +++++++-----
 .../flink/source/LogHybridSourceFactory.java       | 10 ++-
 .../paimon/flink/source/SystemTableSource.java     |  5 +-
 .../paimon/flink/action/CompactActionITCase.java   | 53 +++++++++++++++-
 .../paimon/flink/sink/CompactorSinkITCase.java     | 25 ++++----
 .../paimon/flink/source/CompactorSourceITCase.java | 14 ++--
 .../flink/source/FileStoreSourceMetricsTest.java   |  2 +-
 .../paimon/spark/procedure/CompactProcedure.java   | 26 +++++---
 .../paimon/spark/sql/DeletionVectorTest.scala      | 74 ++++++++++++++++++++++
 28 files changed, 394 insertions(+), 110 deletions(-)
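
Why the cast avoids a stack overflow (a hedged reading based on this diff and
the new testLotsOfPartitionsCompact below; the commit message does not spell it
out): the old path built one Predicate per requested partition and OR-ed them
together, which composes predicates pairwise, so the tree depth grows with the
number of partitions and recursive Predicate visitors can overflow the stack
for thousands of partitions. The new path pushes a flat PartitionPredicate
instead. A minimal sketch of the contrast, using only calls that appear in this
diff (partitions, rowType, partitionType and defaultPartName stand in for the
values CompactAction derives from the table):

    // Old: pairwise OR over N per-partition predicates -> tree of depth ~N.
    Predicate orTree =
            PredicateBuilder.or(
                    partitions.stream()
                            .map(p -> createPartitionPredicate(p, rowType, defaultPartName))
                            .toArray(Predicate[]::new));

    // New: when every spec names all partition fields, match against a flat
    // collection of BinaryRow values instead of walking a predicate tree.
    PartitionPredicate flat =
            PartitionPredicate.fromMultiple(
                    partitionType,
                    createBinaryPartitions(partitions, partitionType, defaultPartName));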

diff --git a/paimon-api/src/main/java/org/apache/paimon/types/RowType.java b/paimon-api/src/main/java/org/apache/paimon/types/RowType.java
index e93ef23136..e886c97ca6 100644
--- a/paimon-api/src/main/java/org/apache/paimon/types/RowType.java
+++ b/paimon-api/src/main/java/org/apache/paimon/types/RowType.java
@@ -323,6 +323,11 @@ public final class RowType extends DataType {
                 .copy(isNullable());
     }
 
+    public int[] projectNames(List<String> names) {
+        List<String> fieldNames = fields.stream().map(DataField::name).collect(Collectors.toList());
+        return names.stream().mapToInt(fieldNames::indexOf).toArray();
+    }
+
     public RowType project(String... names) {
         return project(Arrays.asList(names));
     }
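
The new RowType#projectNames maps field names to positional indices via
List#indexOf, so a name absent from the row type yields -1. A hedged usage
sketch (RowType.builder() and DataTypes are assumed from Paimon's existing API;
they are not part of this diff):

    RowType rowType =
            RowType.builder()
                    .field("a", DataTypes.INT())
                    .field("b", DataTypes.STRING())
                    .field("c", DataTypes.BIGINT())
                    .build();
    int[] indices = rowType.projectNames(Arrays.asList("c", "a")); // [2, 0]
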
diff --git a/paimon-core/src/main/java/org/apache/paimon/append/AppendCompactCoordinator.java b/paimon-core/src/main/java/org/apache/paimon/append/AppendCompactCoordinator.java
index 1fead5e612..07374831e7 100644
--- a/paimon-core/src/main/java/org/apache/paimon/append/AppendCompactCoordinator.java
+++ b/paimon-core/src/main/java/org/apache/paimon/append/AppendCompactCoordinator.java
@@ -28,7 +28,7 @@ import org.apache.paimon.index.IndexFileHandler;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.manifest.FileKind;
 import org.apache.paimon.manifest.ManifestEntry;
-import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.source.DeletionFile;
 import org.apache.paimon.table.source.EndOfScanException;
@@ -90,7 +90,9 @@ public class AppendCompactCoordinator {
     }
 
     public AppendCompactCoordinator(
-            FileStoreTable table, boolean isStreaming, @Nullable Predicate filter) {
+            FileStoreTable table,
+            boolean isStreaming,
+            @Nullable PartitionPredicate partitionPredicate) {
         checkArgument(table.primaryKeys().isEmpty());
         this.snapshotManager = table.snapshotManager();
         CoreOptions options = table.coreOptions();
@@ -103,7 +105,7 @@ public class AppendCompactCoordinator {
                 options.deletionVectorsEnabled()
                        ? new DvMaintainerCache(table.store().newIndexFileHandler())
                         : null;
-        this.filesIterator = new FilesIterator(table, isStreaming, filter);
+        this.filesIterator = new FilesIterator(table, isStreaming, partitionPredicate);
     }
 
     public List<AppendCompactTask> run() {
@@ -389,10 +391,12 @@ public class AppendCompactCoordinator {
         @Nullable private Iterator<ManifestEntry> currentIterator;
 
         public FilesIterator(
-                FileStoreTable table, boolean isStreaming, @Nullable Predicate filter) {
+                FileStoreTable table,
+                boolean isStreaming,
+                @Nullable PartitionPredicate partitionPredicate) {
             this.snapshotReader = table.newSnapshotReader();
-            if (filter != null) {
-                snapshotReader.withFilter(filter);
+            if (partitionPredicate != null) {
+                snapshotReader.withPartitionFilter(partitionPredicate);
             }
             // drop stats to reduce memory
             if (table.coreOptions().manifestDeleteFileDropStats()) {
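
A hedged sketch of driving the changed coordinator directly, mirroring
CompactSourceReader later in this diff (table and partitionFilter are assumed;
the table must be append-only, and partitionFilter may be null to compact all
partitions):

    AppendCompactCoordinator coordinator =
            new AppendCompactCoordinator(table, /* isStreaming */ false, partitionFilter);
    List<AppendCompactTask> tasks = coordinator.run();
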
diff --git a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
index bfc241d62f..4a9b7d646a 100644
--- a/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
+++ b/paimon-core/src/main/java/org/apache/paimon/partition/PartitionPredicate.java
@@ -35,6 +35,7 @@ import org.apache.paimon.utils.RowDataToObjectArrayConverter;
 
 import javax.annotation.Nullable;
 
+import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -48,7 +49,7 @@ import static org.apache.paimon.utils.Preconditions.checkArgument;
 import static org.apache.paimon.utils.Preconditions.checkNotNull;
 
 /** A special predicate to filter partition only, just like {@link Predicate}. */
-public interface PartitionPredicate {
+public interface PartitionPredicate extends Serializable {
 
     boolean test(BinaryRow part);
 
@@ -282,4 +283,16 @@ public interface PartitionPredicate {
         }
         return result;
     }
+
+    static PartitionPredicate fromMap(
+            RowType partitionType, Map<String, String> values, String defaultPartValue) {
+        return fromPredicate(
+                partitionType, createPartitionPredicate(values, partitionType, defaultPartValue));
+    }
+
+    static PartitionPredicate fromMaps(
+            RowType partitionType, List<Map<String, String>> values, String defaultPartValue) {
+        return fromMultiple(
+                partitionType, createBinaryPartitions(values, partitionType, defaultPartValue));
+    }
 }
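
A hedged sketch of the two new factory methods; partitionType and the default
partition name would come from the table, as in the callers below, and
"__DEFAULT_PARTITION__" is only a placeholder value here:

    Map<String, String> spec = new HashMap<>();
    spec.put("dt", "20221208");
    PartitionPredicate one =
            PartitionPredicate.fromMap(partitionType, spec, "__DEFAULT_PARTITION__");
    PartitionPredicate many =
            PartitionPredicate.fromMaps(
                    partitionType, Collections.singletonList(spec), "__DEFAULT_PARTITION__");
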
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
index 2f8b2a1f0f..c438b93fe6 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FallbackReadFileStoreTable.java
@@ -32,6 +32,7 @@ import org.apache.paimon.io.DataOutputViewStreamWrapper;
 import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.metrics.MetricRegistry;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.reader.RecordReader;
 import org.apache.paimon.schema.TableSchema;
@@ -337,6 +338,13 @@ public class FallbackReadFileStoreTable extends DelegatedFileStoreTable {
             return this;
         }
 
+        @Override
+        public InnerTableScan withPartitionFilter(PartitionPredicate partitionPredicate) {
+            mainScan.withPartitionFilter(partitionPredicate);
+            fallbackScan.withPartitionFilter(partitionPredicate);
+            return this;
+        }
+
         @Override
        public FallbackReadScan withBucketFilter(Filter<Integer> bucketFilter) {
             mainScan.withBucketFilter(bucketFilter);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
index 7f8714522d..76b348691e 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/AbstractDataTableScan.java
@@ -27,6 +27,7 @@ import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.metrics.MetricRegistry;
 import org.apache.paimon.operation.FileStoreScan;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.source.snapshot.CompactedStartingScanner;
@@ -138,6 +139,12 @@ abstract class AbstractDataTableScan implements DataTableScan {
         return this;
     }
 
+    @Override
+    public AbstractDataTableScan withPartitionFilter(PartitionPredicate partitionPredicate) {
+        snapshotReader.withPartitionFilter(partitionPredicate);
+        return this;
+    }
+
     @Override
     public AbstractDataTableScan withLevelFilter(Filter<Integer> levelFilter) {
         snapshotReader.withLevelFilter(levelFilter);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
index 647520a3bc..89f15055d5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/InnerTableScan.java
@@ -20,6 +20,7 @@ package org.apache.paimon.table.source;
 
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.metrics.MetricRegistry;
+import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.Filter;
@@ -54,6 +55,10 @@ public interface InnerTableScan extends TableScan {
         return this;
     }
 
+    default InnerTableScan withPartitionFilter(PartitionPredicate partitionPredicate) {
+        return this;
+    }
+
     default InnerTableScan withBucket(int bucket) {
         return this;
     }
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java
index 212d08d907..1f3d52b63c 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilder.java
@@ -20,6 +20,7 @@ package org.apache.paimon.table.source;
 
 import org.apache.paimon.annotation.Public;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.table.Table;
@@ -100,6 +101,9 @@ public interface ReadBuilder extends Serializable {
     /** Push partition filter. */
     ReadBuilder withPartitionFilter(Map<String, String> partitionSpec);
 
+    /** Push partition filters. */
+    ReadBuilder withPartitionFilter(PartitionPredicate partitionPredicate);
+
     ReadBuilder withBucket(int bucket);
 
     /**
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
index 386425ce79..f12c4fe4be 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/ReadBuilderImpl.java
@@ -18,6 +18,8 @@
 
 package org.apache.paimon.table.source;
 
+import org.apache.paimon.CoreOptions;
+import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.table.InnerTable;
@@ -29,6 +31,8 @@ import javax.annotation.Nullable;
 import java.util.Map;
 import java.util.Objects;
 
+import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
+import static org.apache.paimon.partition.PartitionPredicate.fromPredicate;
 import static org.apache.paimon.utils.Preconditions.checkState;
 
 /** Implementation for {@link ReadBuilder}. */
@@ -37,6 +41,8 @@ public class ReadBuilderImpl implements ReadBuilder {
     private static final long serialVersionUID = 1L;
 
     private final InnerTable table;
+    private final RowType partitionType;
+    private final String defaultPartitionName;
 
     private Predicate filter;
 
@@ -45,7 +51,7 @@ public class ReadBuilderImpl implements ReadBuilder {
     private Integer shardIndexOfThisSubtask;
     private Integer shardNumberOfParallelSubtasks;
 
-    private Map<String, String> partitionSpec;
+    private @Nullable PartitionPredicate partitionFilter;
 
     private @Nullable Integer specifiedBucket = null;
     private Filter<Integer> bucketFilter;
@@ -56,6 +62,8 @@ public class ReadBuilderImpl implements ReadBuilder {
 
     public ReadBuilderImpl(InnerTable table) {
         this.table = table;
+        this.partitionType = table.rowType().project(table.partitionKeys());
+        this.defaultPartitionName = new CoreOptions(table.options()).partitionDefaultName();
     }
 
     @Override
@@ -84,7 +92,19 @@ public class ReadBuilderImpl implements ReadBuilder {
 
     @Override
     public ReadBuilder withPartitionFilter(Map<String, String> partitionSpec) {
-        this.partitionSpec = partitionSpec;
+        if (partitionSpec != null) {
+            this.partitionFilter =
+                    fromPredicate(
+                            partitionType,
+                            createPartitionPredicate(
+                                    partitionSpec, partitionType, defaultPartitionName));
+        }
+        return this;
+    }
+
+    @Override
+    public ReadBuilder withPartitionFilter(@Nullable PartitionPredicate partitions) {
+        this.partitionFilter = partitions;
         return this;
     }
 
@@ -154,7 +174,10 @@ public class ReadBuilderImpl implements ReadBuilder {
     }
 
     private InnerTableScan configureScan(InnerTableScan scan) {
-        scan.withFilter(filter).withReadType(readType).withPartitionFilter(partitionSpec);
+        // `filter` may contain partition-related predicates, but `partitionFilter` will
+        // overwrite them when it is not null. So we must avoid putting part of the
+        // partition filter in `filter` and another part in `partitionFilter`.
+        scan.withFilter(filter).withReadType(readType).withPartitionFilter(partitionFilter);
         checkState(
                 bucketFilter == null || shardIndexOfThisSubtask == null,
                 "Bucket filter and shard configuration cannot be used 
together. "
@@ -200,6 +223,7 @@ public class ReadBuilderImpl implements ReadBuilder {
         ReadBuilderImpl that = (ReadBuilderImpl) o;
         return Objects.equals(table.name(), that.table.name())
                 && Objects.equals(filter, that.filter)
+                && Objects.equals(partitionFilter, that.partitionFilter)
                 && Objects.equals(readType, that.readType);
     }
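
With the new overload, callers keep row predicates and partition predicates
separate, as the comment in configureScan above requires. A hedged sketch
modeled on the call sites later in this diff (table, rowPredicate and
partitionFilter are assumed):

    List<Split> splits =
            table.newReadBuilder()
                    .withFilter(rowPredicate)
                    .withPartitionFilter(partitionFilter)
                    .newScan()
                    .plan()
                    .splits();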
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
index 12600b7ddc..79a1c94c32 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReader.java
@@ -27,6 +27,7 @@ import org.apache.paimon.manifest.ManifestFileMeta;
 import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.metrics.MetricRegistry;
 import org.apache.paimon.operation.ManifestsReader;
+import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.ScanMode;
@@ -76,6 +77,8 @@ public interface SnapshotReader {
 
     SnapshotReader withPartitionFilter(List<BinaryRow> partitions);
 
+    SnapshotReader withPartitionFilter(PartitionPredicate partitionPredicate);
+
     SnapshotReader withPartitionsFilter(List<Map<String, String>> partitions);
 
     SnapshotReader withMode(ScanMode scanMode);
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index 5855e88ab1..8239bcabba 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -37,6 +37,7 @@ import org.apache.paimon.metrics.MetricRegistry;
 import org.apache.paimon.operation.FileStoreScan;
 import org.apache.paimon.operation.ManifestsReader;
 import org.apache.paimon.operation.metrics.ScanMetrics;
+import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
 import org.apache.paimon.schema.TableSchema;
@@ -198,9 +199,19 @@ public class SnapshotReaderImpl implements SnapshotReader {
         return this;
     }
 
+    @Override
+    public SnapshotReader withPartitionFilter(PartitionPredicate partitionPredicate) {
+        if (partitionPredicate != null) {
+            scan.withPartitionFilter(partitionPredicate);
+        }
+        return this;
+    }
+
     @Override
    public SnapshotReader withPartitionsFilter(List<Map<String, String>> partitions) {
-        scan.withPartitionsFilter(partitions);
+        if (partitions != null) {
+            scan.withPartitionsFilter(partitions);
+        }
         return this;
     }
 
diff --git a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
index 3325c7ce11..b9afacec2b 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/AuditLogTable.java
@@ -34,6 +34,7 @@ import org.apache.paimon.manifest.ManifestFileMeta;
 import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.metrics.MetricRegistry;
 import org.apache.paimon.operation.ManifestsReader;
+import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.predicate.LeafPredicate;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
@@ -328,6 +329,12 @@ public class AuditLogTable implements DataTable, ReadonlyTable {
             return this;
         }
 
+        @Override
+        public SnapshotReader withPartitionFilter(PartitionPredicate partitionPredicate) {
+            wrapped.withPartitionFilter(partitionPredicate);
+            return this;
+        }
+
         @Override
        public SnapshotReader withPartitionsFilter(List<Map<String, String>> partitions) {
             wrapped.withPartitionsFilter(partitions);
@@ -485,6 +492,12 @@ public class AuditLogTable implements DataTable, ReadonlyTable {
             return this;
         }
 
+        @Override
+        public InnerTableScan withPartitionFilter(PartitionPredicate partitionPredicate) {
+            batchScan.withPartitionFilter(partitionPredicate);
+            return this;
+        }
+
         @Override
         public InnerTableScan withBucketFilter(Filter<Integer> bucketFilter) {
             batchScan.withBucketFilter(bucketFilter);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
index 93cfc4b0d3..b0368082aa 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/CompactAction.java
@@ -36,11 +36,14 @@ import org.apache.paimon.flink.sink.RowDataChannelComputer;
 import org.apache.paimon.flink.source.CompactorSourceBuilder;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.predicate.PartitionPredicateVisitor;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.predicate.PredicateProjectionConverter;
 import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.InternalRowPartitionComputer;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.Preconditions;
@@ -66,6 +69,7 @@ import java.util.LinkedHashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.paimon.partition.PartitionPredicate.createBinaryPartitions;
 import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
 
 /** Table compact action for Flink. */
@@ -191,24 +195,38 @@ public class CompactAction extends TableActionBase {
         builder.build();
     }
 
-    protected Predicate getPredicate() throws Exception {
+    protected PartitionPredicate getPredicate() throws Exception {
         Preconditions.checkArgument(
                 partitions == null || whereSql == null,
                 "partitions and where cannot be used together.");
         Predicate predicate = null;
+        RowType partitionType = table.rowType().project(table.partitionKeys());
+        String partitionDefaultName = ((FileStoreTable) table).coreOptions().partitionDefaultName();
         if (partitions != null) {
-            predicate =
-                    PredicateBuilder.or(
-                            partitions.stream()
-                                    .map(
-                                            p ->
-                                                    createPartitionPredicate(
-                                                            p,
-                                                            table.rowType(),
-                                                            ((FileStoreTable) table)
-                                                                    .coreOptions()
-                                                                    .partitionDefaultName()))
-                                    .toArray(Predicate[]::new));
+            boolean fullMode =
+                    partitions.stream()
+                            .allMatch(part -> part.size() == partitionType.getFieldCount());
+            PartitionPredicate partitionFilter;
+            if (fullMode) {
+                List<BinaryRow> binaryPartitions =
+                        createBinaryPartitions(partitions, partitionType, partitionDefaultName);
+                return PartitionPredicate.fromMultiple(partitionType, binaryPartitions);
+            } else {
+                // partitions may name only some partition fields, so we must use predicates here.
+                predicate =
+                        partitions.stream()
+                                .map(
+                                        partition ->
+                                                createPartitionPredicate(
+                                                        partition,
+                                                        table.rowType(),
+                                                        partitionDefaultName))
+                                .reduce(PredicateBuilder::or)
+                                .orElseThrow(
+                                        () ->
+                                                new RuntimeException(
+                                                        "Failed to get partition filter."));
+            }
         } else if (whereSql != null) {
             SimpleSqlPredicateConvertor simpleSqlPredicateConvertor =
                     new SimpleSqlPredicateConvertor(table.rowType());
@@ -223,9 +241,18 @@ public class CompactAction extends TableActionBase {
             Preconditions.checkArgument(
                     predicate.visit(partitionPredicateVisitor),
                     "Only partition key can be specialized in compaction 
action.");
+            predicate =
+                    predicate
+                            .visit(
+                                    new PredicateProjectionConverter(
+                                            table.rowType().projectNames(table.partitionKeys())))
+                            .orElseThrow(
+                                    () ->
+                                            new RuntimeException(
+                                                    "Failed to convert partition predicate."));
         }
 
-        return predicate;
+        return PartitionPredicate.fromPredicate(partitionType, predicate);
     }
 
     private boolean buildForPostponeBucketCompaction(
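
A hedged illustration of the full/partial branch in getPredicate above,
assuming a table partitioned by (dt, hh):

    Map<String, String> full = new HashMap<>();
    full.put("dt", "20221208");
    full.put("hh", "15");          // names every partition field
    // -> createBinaryPartitions + fromMultiple: exact BinaryRow matching,
    //    no OR tree regardless of how many specs are given.

    Map<String, String> partial = new HashMap<>();
    partial.put("dt", "20221208"); // names only part of the fields
    // -> per-spec predicates reduced with PredicateBuilder::or, then projected
    //    onto the partition row type via PredicateProjectionConverter.
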
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RescaleAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RescaleAction.java
index 7e83abc3df..fb3d068ed1 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RescaleAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/RescaleAction.java
@@ -24,11 +24,8 @@ import org.apache.paimon.flink.sink.FlinkSinkBuilder;
 import org.apache.paimon.flink.source.FlinkSourceBuilder;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.partition.PartitionPredicate;
-import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.InternalRowPartitionComputer;
 import org.apache.paimon.utils.Preconditions;
 
 import org.apache.flink.api.common.RuntimeExecutionMode;
@@ -98,14 +95,12 @@ public class RescaleAction extends TableActionBase {
                 String.valueOf(snapshot.id()));
         fileStoreTable = fileStoreTable.copy(dynamicOptions);
 
-        RowType partitionType = fileStoreTable.schema().logicalPartitionType();
-        Predicate partitionPredicate =
-                PartitionPredicate.createPartitionPredicate(
-                        partitionType,
-                        InternalRowPartitionComputer.convertSpecToInternal(
-                                partition,
-                                partitionType,
-                                fileStoreTable.coreOptions().partitionDefaultName()));
+        PartitionPredicate predicate =
+                PartitionPredicate.fromMap(
+                        fileStoreTable.schema().logicalPartitionType(),
+                        partition,
+                        fileStoreTable.coreOptions().partitionDefaultName());
+
         DataStream<RowData> source =
                 new FlinkSourceBuilder(fileStoreTable)
                         .env(env)
@@ -114,7 +109,7 @@ public class RescaleAction extends TableActionBase {
                                 scanParallelism == null
                                         ? currentBucketNum(snapshot)
                                         : scanParallelism)
-                        .predicate(partitionPredicate)
+                        .partitionPredicate(predicate)
                         .build();
 
        Map<String, String> bucketOptions = new HashMap<>(fileStoreTable.options());
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
index 2724a5da3a..535cdfc7a9 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/action/SortCompactAction.java
@@ -91,7 +91,7 @@ public class SortCompactAction extends CompactAction {
                                                 identifier.getObjectName())
                                         .asSummaryString());
 
-        sourceBuilder.predicate(getPredicate());
+        sourceBuilder.partitionPredicate(getPredicate());
 
        String scanParallelism = tableConfig.get(FlinkConnectorOptions.SCAN_PARALLELISM.key());
         if (scanParallelism != null) {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendTableCompactBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendTableCompactBuilder.java
index 007e625afd..8cd9bca856 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendTableCompactBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/compact/AppendTableCompactBuilder.java
@@ -25,7 +25,7 @@ import org.apache.paimon.flink.sink.AppendTableCompactSink;
 import org.apache.paimon.flink.source.AppendTableCompactSource;
 import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.options.Options;
-import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.utils.Preconditions;
 
@@ -62,7 +62,7 @@ public class AppendTableCompactBuilder {
 
     private boolean isContinuous = false;
 
-    @Nullable private Predicate partitionPredicate;
+    @Nullable private PartitionPredicate partitionPredicate;
     @Nullable private Duration partitionIdleTime = null;
 
     public AppendTableCompactBuilder(
@@ -76,8 +76,8 @@ public class AppendTableCompactBuilder {
         this.isContinuous = isContinuous;
     }
 
-    public void withPartitionPredicate(Predicate predicate) {
-        this.partitionPredicate = predicate;
+    public void withPartitionPredicate(PartitionPredicate partitionPredicate) {
+        this.partitionPredicate = partitionPredicate;
     }
 
     public void withPartitionIdleTime(@Nullable Duration partitionIdleTime) {
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/AppendTableCompactSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/AppendTableCompactSource.java
index 7c707e850f..dae05ff60f 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/AppendTableCompactSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/AppendTableCompactSource.java
@@ -21,7 +21,7 @@ package org.apache.paimon.flink.source;
 import org.apache.paimon.append.AppendCompactCoordinator;
 import org.apache.paimon.append.AppendCompactTask;
 import org.apache.paimon.flink.sink.CompactionTaskTypeInfo;
-import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.source.EndOfScanException;
 
@@ -58,17 +58,17 @@ public class AppendTableCompactSource extends AbstractNonCoordinatedSource<Appen
     private final FileStoreTable table;
     private final boolean streaming;
     private final long scanInterval;
-    private final Predicate filter;
+    private final PartitionPredicate partitionFilter;
 
     public AppendTableCompactSource(
             FileStoreTable table,
             boolean isStreaming,
             long scanInterval,
-            @Nullable Predicate filter) {
+            @Nullable PartitionPredicate partitionFilter) {
         this.table = table;
         this.streaming = isStreaming;
         this.scanInterval = scanInterval;
-        this.filter = filter;
+        this.partitionFilter = partitionFilter;
     }
 
     @Override
@@ -82,7 +82,7 @@ public class AppendTableCompactSource extends AbstractNonCoordinatedSource<Appen
         Preconditions.checkArgument(
                 readerContext.currentParallelism() == 1,
                 "Compaction Operator parallelism in paimon MUST be one.");
-        return new CompactSourceReader(table, streaming, filter, scanInterval);
+        return new CompactSourceReader(table, streaming, partitionFilter, scanInterval);
     }
 
     /** BucketUnawareCompactSourceReader. */
@@ -92,9 +92,12 @@ public class AppendTableCompactSource extends AbstractNonCoordinatedSource<Appen
         private final long scanInterval;
 
         public CompactSourceReader(
-                FileStoreTable table, boolean streaming, Predicate filter, long scanInterval) {
+                FileStoreTable table,
+                boolean streaming,
+                PartitionPredicate partitions,
+                long scanInterval) {
             this.scanInterval = scanInterval;
-            compactionCoordinator = new AppendCompactCoordinator(table, streaming, filter);
+            compactionCoordinator = new AppendCompactCoordinator(table, streaming, partitions);
         }
 
         @Override
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java
index 66d903c72b..73ccbf7fb1 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/BaseDataTableSource.java
@@ -218,7 +218,8 @@ public abstract class BaseDataTableSource extends FlinkTableSource
                         .sourceBounded(!unbounded)
                         .logSourceProvider(logSourceProvider)
                         .projection(projectFields)
-                        .predicate(getPredicateWithScanPartitions())
+                        .predicate(predicate)
+                        .partitionPredicate(partitionPredicate)
                         .limit(limit)
                        .watermarkStrategy(watermarkStrategy)
                        .dynamicPartitionFilteringFields(dynamicPartitionFilteringFields());
@@ -373,7 +374,8 @@ public abstract class BaseDataTableSource extends FlinkTableSource
                 table.newReadBuilder()
                         .dropStats()
                         .withProjection(new int[0])
-                        .withFilter(getPredicateWithScanPartitions())
+                        .withFilter(predicate)
+                        .withPartitionFilter(partitionPredicate)
                         .newScan()
                         .plan()
                         .splits();
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
index 47e1997fb6..ada68c05c0 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/CompactorSourceBuilder.java
@@ -24,7 +24,7 @@ import org.apache.paimon.flink.FlinkConnectorOptions;
 import org.apache.paimon.flink.LogicalTypeConversion;
 import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.options.Options;
-import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.table.FileStoreTable;
 import org.apache.paimon.table.source.ReadBuilder;
 import org.apache.paimon.table.system.CompactBucketsTable;
@@ -62,7 +62,7 @@ public class CompactorSourceBuilder {
 
     private boolean isContinuous = false;
     private StreamExecutionEnvironment env;
-    @Nullable private Predicate partitionPredicate = null;
+    @Nullable private PartitionPredicate partitionPredicate = null;
     @Nullable private Duration partitionIdleTime = null;
 
    public CompactorSourceBuilder(String tableIdentifier, FileStoreTable table) {
@@ -89,8 +89,10 @@ public class CompactorSourceBuilder {
         compactBucketsTable =
                 compactBucketsTable.copy(
                        isContinuous ? streamingCompactOptions() : batchCompactOptions());
-        ReadBuilder readBuilder =
-                compactBucketsTable.newReadBuilder().withFilter(partitionPredicate);
+        ReadBuilder readBuilder = compactBucketsTable.newReadBuilder();
+        if (partitionPredicate != null) {
+            readBuilder.withPartitionFilter(partitionPredicate);
+        }
        if (CoreOptions.fromMap(table.options()).manifestDeleteFileDropStats()) {
             readBuilder = readBuilder.dropStats();
         }
@@ -174,7 +176,8 @@ public class CompactorSourceBuilder {
         };
     }
 
-    public CompactorSourceBuilder withPartitionPredicate(@Nullable Predicate partitionPredicate) {
+    public CompactorSourceBuilder withPartitionPredicate(
+            @Nullable PartitionPredicate partitionPredicate) {
         this.partitionPredicate = partitionPredicate;
         return this;
     }
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
index b5ef9c469f..5f9f01f1cc 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkSourceBuilder.java
@@ -31,6 +31,7 @@ import org.apache.paimon.flink.source.align.AlignedContinuousFileStoreSource;
 import org.apache.paimon.flink.source.operator.MonitorSource;
 import org.apache.paimon.flink.utils.TableScanUtils;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.table.FileStoreTable;
@@ -90,6 +91,7 @@ public class FlinkSourceBuilder {
     private StreamExecutionEnvironment env;
     @Nullable private int[][] projectedFields;
     @Nullable private Predicate predicate;
+    @Nullable private PartitionPredicate partitionPredicate;
     @Nullable private LogSourceProvider logSourceProvider;
     @Nullable private Integer parallelism;
     @Nullable private Long limit;
@@ -154,6 +156,11 @@ public class FlinkSourceBuilder {
         return this;
     }
 
+    public FlinkSourceBuilder partitionPredicate(PartitionPredicate partitionPredicate) {
+        this.partitionPredicate = partitionPredicate;
+        return this;
+    }
+
     public FlinkSourceBuilder limit(@Nullable Long limit) {
         this.limit = limit;
         return this;
@@ -197,7 +204,12 @@ public class FlinkSourceBuilder {
         if (readType != null) {
             readBuilder.withReadType(readType);
         }
-        readBuilder.withFilter(predicate);
+        if (predicate != null) {
+            readBuilder.withFilter(predicate);
+        }
+        if (partitionPredicate != null) {
+            readBuilder.withPartitionFilter(partitionPredicate);
+        }
         if (limit != null) {
             readBuilder.withLimit(limit.intValue());
         }
@@ -340,6 +352,7 @@ public class FlinkSourceBuilder {
                                                 table,
                                                 projectedRowType(),
                                                 predicate,
+                                                partitionPredicate,
                                                 outerProject()))
                                 .addSource(
                                        new LogHybridSourceFactory(logSourceProvider),
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
index 5af489ed5c..621ceb7357 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/FlinkTableSource.java
@@ -71,6 +71,12 @@ public abstract class FlinkTableSource
     protected final Options options;
 
     @Nullable protected Predicate predicate;
+    /**
+     * This field is only used for normal source (not lookup source). Specified partitions in lookup
+     * sources are handled in {@link org.apache.paimon.flink.lookup.PartitionLoader}.
+     */
+    @Nullable protected PartitionPredicate partitionPredicate;
+
     @Nullable protected int[][] projectFields;
     @Nullable protected Long limit;
     protected SplitStatistics splitStatistics;
@@ -86,6 +92,7 @@ public abstract class FlinkTableSource
             @Nullable Long limit) {
         this.table = table;
         this.options = Options.fromMap(table.options());
+        this.partitionPredicate = getPartitionPredicateWithOptions();
 
         this.predicate = predicate;
         this.projectFields = projectFields;
@@ -130,32 +137,30 @@ public abstract class FlinkTableSource
      * This method is only used for normal source (not lookup source). 
Specified partitions in
      * lookup sources are handled in {@link 
org.apache.paimon.flink.lookup.PartitionLoader}.
      */
-    protected Predicate getPredicateWithScanPartitions() {
+    private PartitionPredicate getPartitionPredicateWithOptions() {
         if (options.contains(FlinkConnectorOptions.SCAN_PARTITIONS)) {
-            Predicate partitionPredicate;
+            PartitionPredicate partitionPredicate;
             try {
                 partitionPredicate =
-                        PartitionPredicate.createPartitionPredicate(
-                                ParameterUtils.getPartitions(
-                                        options.get(FlinkConnectorOptions.SCAN_PARTITIONS)
-                                                .split(";")),
-                                table.rowType(),
-                                options.get(CoreOptions.PARTITION_DEFAULT_NAME));
+                        PartitionPredicate.fromPredicate(
+                                table.rowType().project(table.partitionKeys()),
+                                PartitionPredicate.createPartitionPredicate(
+                                        ParameterUtils.getPartitions(
+                                                options.get(FlinkConnectorOptions.SCAN_PARTITIONS)
+                                                        .split(";")),
+                                        table.rowType(),
+                                        options.get(CoreOptions.PARTITION_DEFAULT_NAME)));
+                return partitionPredicate;
             } catch (IllegalArgumentException e) {
                // In older versions of Flink, however, lookup sources will first be treated as
                // normal sources. So this method will also be visited by lookup tables, whose
                // option value might be max_pt() or max_two_pt(). In this case we ignore the
                 // filters.
-                return predicate;
+                return null;
             }
 
-            if (predicate == null) {
-                return partitionPredicate;
-            } else {
-                return PredicateBuilder.and(predicate, partitionPredicate);
-            }
         } else {
-            return predicate;
+            return null;
         }
     }
 
@@ -222,7 +227,8 @@ public abstract class FlinkTableSource
                 List<PartitionEntry> partitionEntries =
                         table.newReadBuilder()
                                 .dropStats()
-                                .withFilter(getPredicateWithScanPartitions())
+                                .withFilter(predicate)
+                                .withPartitionFilter(partitionPredicate)
                                 .newScan()
                                 .listPartitionEntries();
                 long totalSize = 0;
@@ -238,7 +244,8 @@ public abstract class FlinkTableSource
                 List<Split> splits =
                         table.newReadBuilder()
                                 .dropStats()
-                                .withFilter(getPredicateWithScanPartitions())
+                                .withFilter(predicate)
+                                .withPartitionFilter(partitionPredicate)
                                 .withProjection(new int[0])
                                 .newScan()
                                 .plan()
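
A hedged note on where this class's partition filter comes from:
getPartitionPredicateWithOptions reads FlinkConnectorOptions.SCAN_PARTITIONS
and splits the value on ";", so several partition specs can be supplied at
once; the key=value spec syntax below is an assumption about what
ParameterUtils.getPartitions accepts, shown for illustration only:

    // Illustrative dynamic table option (value format assumed):
    //   'scan.partitions' = 'dt=20221208;dt=20221209'
    // Lookup sources may instead carry max_pt()/max_two_pt(); the catch block
    // above deliberately ignores the filter in that case.
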
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/LogHybridSourceFactory.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/LogHybridSourceFactory.java
index afdcae2fff..d3dd232f12 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/LogHybridSourceFactory.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/LogHybridSourceFactory.java
@@ -24,6 +24,7 @@ import org.apache.paimon.flink.NestedProjectedRowData;
 import org.apache.paimon.flink.log.LogSourceProvider;
 import org.apache.paimon.flink.metrics.FlinkMetricRegistry;
 import org.apache.paimon.options.Options;
+import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.table.DataTable;
 import org.apache.paimon.table.Table;
@@ -74,6 +75,7 @@ public class LogHybridSourceFactory
             Table table,
             @Nullable RowType readType,
             @Nullable Predicate predicate,
+            @Nullable PartitionPredicate partitionPredicate,
             @Nullable NestedProjectedRowData rowData) {
         if (!(table instanceof DataTable)) {
             throw new UnsupportedOperationException(
@@ -88,9 +90,15 @@ public class LogHybridSourceFactory
         if (readType != null) {
             readBuilder.withReadType(readType);
         }
+        if (predicate != null) {
+            readBuilder.withFilter(predicate);
+        }
+        if (partitionPredicate != null) {
+            readBuilder.withPartitionFilter(partitionPredicate);
+        }
 
         return new FlinkHybridFirstSource(
-                readBuilder.withFilter(predicate),
+                readBuilder,
                 dataTable.snapshotManager(),
                 dataTable.coreOptions().toConfiguration(),
                 rowData);
diff --git a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
index e9331414f5..05563fdeac 100644
--- a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
+++ b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/source/SystemTableSource.java
@@ -92,7 +92,10 @@ public class SystemTableSource extends FlinkTableSource {
         if (readType != null) {
             readBuilder.withReadType(readType);
         }
-        readBuilder.withFilter(getPredicateWithScanPartitions());
+        if (predicate != null) {
+            readBuilder.withFilter(predicate);
+        }
+        readBuilder.withPartitionFilter(partitionPredicate);
 
         if (unbounded && table instanceof DataTable) {
             source =
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
index 419eb6a521..edc8b469e7 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/action/CompactActionITCase.java
@@ -358,6 +358,45 @@ public class CompactActionITCase extends CompactActionITCaseBase {
         checkFileAndRowSize(table, 3L, 0L, 1, 6);
     }
 
+    @Test
+    public void testLotsOfPartitionsCompact() throws Exception {
+        Map<String, String> tableOptions = new HashMap<>();
+        tableOptions.put(CoreOptions.BUCKET.key(), "-1");
+        tableOptions.put(CoreOptions.COMPACTION_MIN_FILE_NUM.key(), "2");
+
+        FileStoreTable table =
+                prepareTable(
+                        Collections.singletonList("k"),
+                        Collections.emptyList(),
+                        Collections.emptyList(),
+                        tableOptions);
+
+        // base records
+        writeData(
+                rowData(1, 100, 15, BinaryString.fromString("20221208")),
+                rowData(1, 100, 16, BinaryString.fromString("20221208")),
+                rowData(1, 100, 15, BinaryString.fromString("20221209")));
+
+        writeData(
+                rowData(1, 100, 15, BinaryString.fromString("20221208")),
+                rowData(1, 100, 16, BinaryString.fromString("20221208")),
+                rowData(1, 100, 15, BinaryString.fromString("20221209")));
+
+        checkLatestSnapshot(table, 2, Snapshot.CommitKind.APPEND);
+
+        List<String> partitions = new ArrayList<>();
+        for (int i = 0; i < 1000; i++) {
+            partitions.add("--partition");
+            partitions.add("k=" + i);
+        }
+
+        // note that the specified partitions need not match the real partitions of the table
+        runActionForUnawareTable(false, partitions);
+
+        // first compaction, snapshot will be 3.
+        checkFileAndRowSize(table, 3L, 0L, 1, 6);
+    }
+
     @Test
     public void testTableConf() throws Exception {
         prepareTable(
@@ -440,14 +479,20 @@ public class CompactActionITCase extends CompactActionITCaseBase {
     }
 
     private void runAction(boolean isStreaming) throws Exception {
-        runAction(isStreaming, false);
+        runAction(isStreaming, false, Collections.emptyList());
+    }
+
+    private void runActionForUnawareTable(boolean isStreaming, List<String> extra)
+            throws Exception {
+        runAction(isStreaming, true, extra);
     }
 
    private void runActionForUnawareTable(boolean isStreaming) throws Exception {
-        runAction(isStreaming, true);
+        runAction(isStreaming, true, Collections.emptyList());
     }
 
-    private void runAction(boolean isStreaming, boolean unawareBucket) throws Exception {
+    private void runAction(boolean isStreaming, boolean unawareBucket, List<String> extra)
+            throws Exception {
         StreamExecutionEnvironment env;
         if (isStreaming) {
             env = streamExecutionEnvironmentBuilder().streamingMode().build();
@@ -486,6 +531,8 @@ public class CompactActionITCase extends CompactActionITCaseBase {
             }
         }
 
+        baseArgs.addAll(extra);
+
        CompactAction action = createAction(CompactAction.class, baseArgs.toArray(new String[0]));
 
         action.withStreamExecutionEnvironment(env).build();
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
index 0e85f559d9..39664c3aa3 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/sink/CompactorSinkITCase.java
@@ -32,7 +32,7 @@ import org.apache.paimon.flink.util.AbstractTestBase;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.options.Options;
-import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
@@ -70,7 +70,6 @@ import java.util.Map;
 import java.util.Random;
 import java.util.UUID;
 
-import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
 import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -122,16 +121,15 @@ public class CompactorSinkITCase extends AbstractTestBase {
        StreamExecutionEnvironment env = streamExecutionEnvironmentBuilder().batchMode().build();
         CompactorSourceBuilder sourceBuilder =
                 new CompactorSourceBuilder(tablePath.toString(), table);
-        Predicate predicate =
-                createPartitionPredicate(
-                        getSpecifiedPartitions(),
-                        table.rowType(),
-                        table.coreOptions().partitionDefaultName());
         DataStreamSource<RowData> source =
                 sourceBuilder
                         .withEnv(env)
                         .withContinuousMode(false)
-                        .withPartitionPredicate(predicate)
+                        .withPartitionPredicate(
+                                PartitionPredicate.fromMaps(
+                                        table.schema().logicalPartitionType(),
+                                        getSpecifiedPartitions(),
+                                        table.coreOptions().partitionDefaultName()))
                         .build();
         new CompactorSinkBuilder(table, true).withInput(source).build();
         env.execute();
@@ -162,16 +160,15 @@ public class CompactorSinkITCase extends AbstractTestBase {
                 streamExecutionEnvironmentBuilder().streamingMode().build();
         CompactorSourceBuilder sourceBuilder =
                 new CompactorSourceBuilder(tablePath.toString(), table);
-        Predicate predicate =
-                createPartitionPredicate(
-                        getSpecifiedPartitions(),
-                        table.rowType(),
-                        table.coreOptions().partitionDefaultName());
         DataStreamSource<RowData> source =
                 sourceBuilder
                         .withEnv(env)
                         .withContinuousMode(false)
-                        .withPartitionPredicate(predicate)
+                        .withPartitionPredicate(
+                                PartitionPredicate.fromMaps(
+                                        table.schema().logicalPartitionType(),
+                                        getSpecifiedPartitions(),
+                                        table.coreOptions().partitionDefaultName()))
                         .build();
         Integer sinkParalellism = new Random().nextInt(100) + 1;
         new CompactorSinkBuilder(
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/CompactorSourceITCase.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/CompactorSourceITCase.java
index 9afd04c8d0..d5224670d4 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/CompactorSourceITCase.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/CompactorSourceITCase.java
@@ -27,7 +27,7 @@ import org.apache.paimon.flink.util.AbstractTestBase;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.fs.local.LocalFileIO;
 import org.apache.paimon.io.DataFileMetaSerializer;
-import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.schema.Schema;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
@@ -60,7 +60,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
-import static org.apache.paimon.partition.PartitionPredicate.createPartitionPredicate;
 import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -264,16 +263,15 @@ public class CompactorSourceITCase extends AbstractTestBase {
 
         StreamExecutionEnvironment env =
                 streamExecutionEnvironmentBuilder().streamingMode().build();
-        Predicate partitionPredicate =
-                createPartitionPredicate(
-                        specifiedPartitions,
-                        table.rowType(),
-                        table.coreOptions().partitionDefaultName());
         DataStreamSource<RowData> compactorSource =
                 new CompactorSourceBuilder("test", table)
                         .withContinuousMode(isStreaming)
                         .withEnv(env)
-                        .withPartitionPredicate(partitionPredicate)
+                        .withPartitionPredicate(
+                                PartitionPredicate.fromMaps(
+                                        table.schema().logicalPartitionType(),
+                                        specifiedPartitions,
+                                        table.coreOptions().partitionDefaultName()))
                         .build();
         CloseableIterator<RowData> it = compactorSource.executeAndCollect();
 
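For readers tracking the API migration: both ITCases above stop building a row-level Predicate through the removed createPartitionPredicate import and instead construct a PartitionPredicate directly. A minimal sketch of the new construction, using only calls visible in the hunks above; assume `table` is a FileStoreTable and `specifiedPartitions` is a List<Map<String, String>> of partition specs (both names are illustrative here):

    // Build a PartitionPredicate straight from partition spec maps; this replaces
    // the old createPartitionPredicate(...) -> withPartitionPredicate(Predicate) path.
    PartitionPredicate partitionPredicate =
            PartitionPredicate.fromMaps(
                    table.schema().logicalPartitionType(),       // RowType of the partition columns
                    specifiedPartitions,                         // e.g. [{pt=2024}, {pt=2025}]
                    table.coreOptions().partitionDefaultName()); // default partition value
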
diff --git a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceMetricsTest.java b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceMetricsTest.java
index 24f35cdfdb..89ac2c947a 100644
--- a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceMetricsTest.java
+++ b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/FileStoreSourceMetricsTest.java
@@ -129,7 +129,7 @@ public class FileStoreSourceMetricsTest {
     public void logHybridFileStoreSourceScanMetricsTest() throws Exception {
         writeOnce();
         FlinkSource logHybridFileStoreSource =
-                LogHybridSourceFactory.buildHybridFirstSource(table, null, null, null);
+                LogHybridSourceFactory.buildHybridFirstSource(table, null, null, null, null);
         logHybridFileStoreSource.restoreEnumerator(context, null);
         assertThat(TestingMetricUtils.getGauge(scanMetricGroup, "lastScannedManifests").getValue())
                 .isEqualTo(1L);
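The only change in this metrics test is the widened buildHybridFirstSource signature, which goes from four arguments to five. Judging from the rest of this patch, the new slot carries the PartitionPredicate. A hypothetical call with an actual filter might look like the sketch below; every name other than `table` is a placeholder, since this diff shows only null arguments and does not reveal the parameter names or order:

    // Hypothetical: pass a PartitionPredicate where this test passes the extra null.
    FlinkSource source =
            LogHybridSourceFactory.buildHybridFirstSource(
                    table, projection, predicate, partitionPredicate, limit);
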
diff --git a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
index 196f682cd5..8458941b91 100644
--- a/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
+++ b/paimon-spark/paimon-spark-common/src/main/java/org/apache/paimon/spark/procedure/CompactProcedure.java
@@ -27,6 +27,7 @@ import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.manifest.PartitionEntry;
 import org.apache.paimon.operation.BaseAppendFileStoreWrite;
+import org.apache.paimon.partition.PartitionPredicate;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.spark.PaimonSplitScan;
 import org.apache.paimon.spark.SparkUtils;
@@ -46,6 +47,7 @@ import org.apache.paimon.table.sink.TableCommitImpl;
 import org.apache.paimon.table.source.DataSplit;
 import org.apache.paimon.table.source.EndOfScanException;
 import org.apache.paimon.table.source.snapshot.SnapshotReader;
+import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.ParameterUtils;
 import org.apache.paimon.utils.ProcedureUtils;
@@ -237,25 +239,33 @@ public class CompactProcedure extends BaseProcedure {
         BucketMode bucketMode = table.bucketMode();
         OrderType orderType = OrderType.of(sortType);
         boolean fullCompact = compactStrategy.equalsIgnoreCase(FULL);
+        RowType partitionType = table.schema().logicalPartitionType();
         Predicate filter =
                 condition == null
                         ? null
                         : ExpressionUtils.convertConditionToPaimonPredicate(
                                         condition,
                                         ((LogicalPlan) relation).output(),
-                                        table.rowType(),
+                                        partitionType,
                                         false)
                                 .getOrElse(null);
+        PartitionPredicate partitionPredicate =
+                PartitionPredicate.fromPredicate(partitionType, filter);
         if (orderType.equals(OrderType.NONE)) {
             JavaSparkContext javaSparkContext = new JavaSparkContext(spark().sparkContext());
             switch (bucketMode) {
                 case HASH_FIXED:
                 case HASH_DYNAMIC:
                     compactAwareBucketTable(
-                            table, fullCompact, filter, partitionIdleTime, javaSparkContext);
+                            table,
+                            fullCompact,
+                            partitionPredicate,
+                            partitionIdleTime,
+                            javaSparkContext);
                     break;
                 case BUCKET_UNAWARE:
-                    compactUnAwareBucketTable(table, filter, partitionIdleTime, javaSparkContext);
+                    compactUnAwareBucketTable(
+                            table, partitionPredicate, partitionIdleTime, javaSparkContext);
                     break;
                 default:
                     throw new UnsupportedOperationException(
@@ -279,12 +289,12 @@ public class CompactProcedure extends BaseProcedure {
     private void compactAwareBucketTable(
             FileStoreTable table,
             boolean fullCompact,
-            @Nullable Predicate filter,
+            @Nullable PartitionPredicate partitionPredicate,
             @Nullable Duration partitionIdleTime,
             JavaSparkContext javaSparkContext) {
         SnapshotReader snapshotReader = table.newSnapshotReader();
-        if (filter != null) {
-            snapshotReader.withFilter(filter);
+        if (partitionPredicate != null) {
+            snapshotReader.withPartitionFilter(partitionPredicate);
         }
         Set<BinaryRow> partitionToBeCompacted =
                 getHistoryPartition(snapshotReader, partitionIdleTime);
@@ -358,12 +368,12 @@ public class CompactProcedure extends BaseProcedure {
 
     private void compactUnAwareBucketTable(
             FileStoreTable table,
-            @Nullable Predicate filter,
+            @Nullable PartitionPredicate partitionPredicate,
             @Nullable Duration partitionIdleTime,
             JavaSparkContext javaSparkContext) {
         List<AppendCompactTask> compactionTasks;
         try {
-            compactionTasks = new AppendCompactCoordinator(table, false, filter).run();
+            compactionTasks = new AppendCompactCoordinator(table, false, partitionPredicate).run();
         } catch (EndOfScanException e) {
             compactionTasks = new ArrayList<>();
         }
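To summarize the CompactProcedure change: the optional condition is now converted against the partition row type, wrapped into a PartitionPredicate once, and threaded through both the bucket-aware and bucket-unaware paths, so scan planning receives a PartitionPredicate directly rather than a row-level Predicate. A minimal sketch of the resulting shape, using only calls that appear in the hunks above (org.apache.paimon imports elided; the helper method name is illustrative):

    // Turn a nullable row-level Predicate over partition columns into a
    // PartitionPredicate and install it on a fresh snapshot reader.
    private static SnapshotReader filteredReader(FileStoreTable table, @Nullable Predicate filter) {
        RowType partitionType = table.schema().logicalPartitionType();
        PartitionPredicate partitionPredicate =
                PartitionPredicate.fromPredicate(partitionType, filter);
        SnapshotReader reader = table.newSnapshotReader();
        if (partitionPredicate != null) {
            // withPartitionFilter supersedes the old withFilter(filter) call here.
            reader.withPartitionFilter(partitionPredicate);
        }
        return reader;
    }
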
diff --git a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
index f85213da7c..ffc31bb46f 100644
--- a/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
+++ b/paimon-spark/paimon-spark-ut/src/test/scala/org/apache/paimon/spark/sql/DeletionVectorTest.scala
@@ -443,6 +443,80 @@ class DeletionVectorTest extends PaimonSparkTestBase with AdaptiveSparkPlanHelper
       }
   }
 
+  test(s"Paimon DeletionVector: delete for append partitioned table with bucket = -1") {
+    val bucket = -1
+    withTable("T") {
+      val bucketKey = if (bucket > 1) {
+        ", 'bucket-key' = 'id'"
+      } else {
+        ""
+      }
+      spark.sql(
+        s"""
+           |CREATE TABLE T (id INT, name STRING, pt STRING)
+           |PARTITIONED BY(pt)
+           |TBLPROPERTIES ('deletion-vectors.enabled' = 'true', 'deletion-vectors.bitmap64' = '${Random
+            .nextBoolean()}', 'bucket' = '$bucket' $bucketKey)
+           |""".stripMargin)
+
+      val table = loadTable("T")
+      val dvMaintainerFactory =
+        new DeletionVectorsMaintainer.Factory(table.store().newIndexFileHandler())
+
+      def getDeletionVectors(ptValues: Seq[String]): Map[String, DeletionVector] = {
+        getLatestDeletionVectors(table, dvMaintainerFactory, ptValues.map(BinaryRow.singleColumn))
+      }
+
+      spark.sql(
+        "INSERT INTO T VALUES (1, 'a', '2024'), (2, 'b', '2024'), (3, 'c', '2025'), (4, 'd', '2025')")
+      val deletionVectors1 = getAllLatestDeletionVectors(table, dvMaintainerFactory)
+      Assertions.assertEquals(0, deletionVectors1.size)
+
+      val cond1 = "id = 2"
+      val rowMetaInfo1 = getFilePathAndRowIndex(cond1)
+      runAndCheckSplitScan(s"DELETE FROM T WHERE $cond1")
+      checkAnswer(
+        spark.sql(s"SELECT * from T ORDER BY id"),
+        Row(1, "a", "2024") :: Row(3, "c", "2025") :: Row(4, "d", "2025") :: Nil)
+      val deletionVectors2 = getDeletionVectors(Seq("2024", "2025"))
+      Assertions.assertEquals(1, deletionVectors2.size)
+      deletionVectors2
+        .foreach {
+          case (filePath, dv) =>
+            rowMetaInfo1(filePath).foreach(index => Assertions.assertTrue(dv.isDeleted(index)))
+        }
+
+      val cond2 = "id = 3"
+      val rowMetaInfo2 = rowMetaInfo1 ++ getFilePathAndRowIndex(cond2)
+      runAndCheckSplitScan(s"DELETE FROM T WHERE $cond2")
+      checkAnswer(
+        spark.sql(s"SELECT * from T ORDER BY id"),
+        Row(1, "a", "2024") :: Row(4, "d", "2025") :: Nil)
+      val deletionVectors3 = getDeletionVectors(Seq("2024"))
+      Assertions.assertTrue(deletionVectors2 == deletionVectors3)
+      val deletionVectors4 = getDeletionVectors(Seq("2024", "2025"))
+      deletionVectors4
+        .foreach {
+          case (filePath, dv) =>
+            rowMetaInfo2(filePath).foreach(index => Assertions.assertTrue(dv.isDeleted(index)))
+        }
+
+      spark.sql("""CALL sys.compact(table => 'T', partitions => "pt = '2024'")""")
+      Assertions.assertTrue(getDeletionVectors(Seq("2024")).isEmpty)
+      Assertions.assertTrue(getDeletionVectors(Seq("2025")).nonEmpty)
+      checkAnswer(
+        spark.sql(s"SELECT * from T ORDER BY id"),
+        Row(1, "a", "2024") :: Row(4, "d", "2025") :: Nil)
+
+      spark.sql("""CALL sys.compact(table => 'T', where => "pt = '2025'")""")
+      Assertions.assertTrue(getDeletionVectors(Seq("2025")).isEmpty)
+      checkAnswer(
+        spark.sql(s"SELECT * from T ORDER BY id"),
+        Row(1, "a", "2024") :: Row(4, "d", "2025") :: Nil)
+    }
+  }
+
   test("Paimon deletionVector: deletion vector write verification") {
     withTable("T") {
       spark.sql(s"""

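Taken together, the new Scala test drives both procedure forms that now feed a PartitionPredicate: partitions => "pt = '2024'" (explicit partition specs) and where => "pt = '2025'" (a filter condition). A hedged sketch of how these two inputs plausibly map onto the two constructors used elsewhere in this patch; `partitionType`, `specs`, and `whereFilter` are illustrative names, not identifiers from the commit:

    // Form 1: explicit partition specs -> PartitionPredicate.fromMaps(...)
    List<Map<String, String>> specs =
            Collections.singletonList(Collections.singletonMap("pt", "2024"));
    PartitionPredicate fromSpecs =
            PartitionPredicate.fromMaps(
                    partitionType, specs, table.coreOptions().partitionDefaultName());

    // Form 2: a Predicate over partition columns -> PartitionPredicate.fromPredicate(...)
    PartitionPredicate fromWhere = PartitionPredicate.fromPredicate(partitionType, whereFilter);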