This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 974b725f63 [core] Introduce BucketSelector based on partition values 
to achieve bucket level predicate push down (#7486)
974b725f63 is described below

commit 974b725f636fc6bb31c0efb72b2ae3ad83d174aa
Author: Jingsong Lee <[email protected]>
AuthorDate: Fri Mar 20 13:32:15 2026 +0800

    [core] Introduce BucketSelector based on partition values to achieve bucket 
level predicate push down (#7486)
    
    Introducing BucketSelector based on partition values to achieve bucket
    level predicate push down optimization.
    
    Case 1: bucket filtering with compound predicates on a single-field
    bucket key.
    
    Table schema:
    - Partition key: column 'a' (INT)
    - Bucket key: column 'b' (INT)
    - Bucket count: 10
    
    Data distribution: 5 partitions (a=1 to 5) × 20 b-values (b=1 to 20) =
    100 rows.
    
    Test scenarios:
    - Predicate: (a < 3 AND b = 5) OR (a = 3 AND b = 7) - Tests partition
    range filter with bucket equality, combined with OR. Expected: the b=5
    bucket for partitions 1 and 2, and the b=7 bucket for partition 3.
    - Predicate: (a < 3 AND b = 5) OR (a = 3 AND b < 100) - Tests partition
    range with bucket equality, OR partition equality with bucket range.
    Expected: mixed buckets from partition 3 and specific buckets from
    partitions 1,2.
    - Predicate: (a = 2 AND b = 5) OR (a = 3 AND b = 7) - Tests partition
    equality with bucket equality in both OR branches. Expected: exact
    bucket matching for each partition-b combination.
    
    Case 2: bucket filtering with compound predicates on a composite
    (multi-field) bucket key.
    
    Table schema:
    - Partition key: column 'a' (INT)
    - Bucket key: columns 'b' and 'c' (composite, INT)
    - Bucket count: 10
    
    Data distribution: 5 partitions (a=1 to 5) × 20 b-values (b=1 to 20) ×
    10 c-values (c=0 to 9) = 1000 rows.
    
    Test scenarios:
    
    - Predicate: ((a < 3 AND b = 5) OR (a = 3 AND b = 7)) AND c = 5 - Tests
    nested OR within AND, with partition range, bucket field equality, and
    additional bucket field filter. The 'c = 5' condition is part of the
    composite bucket key, affecting bucket selection.
    - Predicate: ((a < 3 AND b = 5) OR (a = 3 AND b < 100)) AND c = 5 -
    Tests range predicate on one bucket field (b) combined with equality on
    another (c). Validates handling of multiple bucket key fields with
    different predicate types.
    - Predicate: ((a = 2 AND b = 5) OR (a = 3 AND b = 7)) AND c = 5 - Tests
    exact matching on both partition and bucket fields. The composite bucket
    key (b,c) ensures precise bucket targeting.
---
 .../predicate/PartitionValuePredicateVisitor.java  |  91 +++++
 .../paimon/predicate/PredicateReplaceVisitor.java  |   6 +-
 .../TriFilter.java}                                |  29 +-
 .../PartitionValuePredicateVisitorTest.java        | 375 +++++++++++++++++++++
 .../org/apache/paimon/AppendOnlyFileStore.java     |  33 +-
 .../java/org/apache/paimon/KeyValueFileStore.java  |  33 +-
 .../org/apache/paimon/manifest/BucketFilter.java   |  14 +-
 .../apache/paimon/manifest/ManifestEntryCache.java |  12 +-
 .../paimon/operation/AbstractFileStoreScan.java    |  18 +-
 .../paimon/operation/AppendOnlyFileStoreScan.java  |   5 +
 .../paimon/operation/BucketSelectConverter.java    | 171 ++--------
 ...ketSelectConverter.java => BucketSelector.java} |  92 +++--
 .../org/apache/paimon/operation/FileStoreScan.java |   5 +-
 .../paimon/operation/KeyValueFileStoreScan.java    |   5 +
 .../paimon/table/PrimaryKeyFileStoreTable.java     |   2 -
 .../table/source/snapshot/SnapshotReaderImpl.java  |   1 +
 .../test/java/org/apache/paimon/TestFileStore.java |   1 -
 .../apache/paimon/manifest/BucketFilterTest.java   | 163 ---------
 .../operation/BucketSelectConverterTest.java       | 148 --------
 .../paimon/operation/BucketSelectorTest.java       | 258 ++++++++++++++
 .../apache/paimon/table/BucketFilterScanTest.java  | 250 ++++++++++++++
 21 files changed, 1149 insertions(+), 563 deletions(-)

diff --git 
a/paimon-common/src/main/java/org/apache/paimon/predicate/PartitionValuePredicateVisitor.java
 
b/paimon-common/src/main/java/org/apache/paimon/predicate/PartitionValuePredicateVisitor.java
new file mode 100644
index 0000000000..943b751870
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/predicate/PartitionValuePredicateVisitor.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.predicate;
+
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.types.RowType;
+
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Optional;
+import java.util.Set;
+
+/**
+ * A {@link PredicateReplaceVisitor} that evaluates partition predicates 
against a known partition
+ * value, replacing them with {@link AlwaysTrue} or {@link AlwaysFalse}.
+ *
+ * <p>For leaf predicates that only reference partition fields, the predicate 
is evaluated against
+ * the given partition row (with field indices remapped from table schema to 
partition schema). If
+ * the evaluation returns true, the predicate is replaced with AlwaysTrue; 
otherwise with
+ * AlwaysFalse.
+ *
+ * <p>For leaf predicates that reference any non-partition field, the 
predicate is kept as-is.
+ *
+ * <p>For compound predicates (AND/OR), children are recursively visited and 
the result is
+ * simplified via {@link PredicateBuilder#and} / {@link PredicateBuilder#or}.
+ */
+public class PartitionValuePredicateVisitor implements PredicateReplaceVisitor 
{
+
+    private final Set<String> partitionFields;
+
+    /** Mapping from table field index to partition field index. -1 if not a 
partition field. */
+    private final int[] tableToPartitionMapping;
+
+    private final InternalRow partitionRow;
+
+    public PartitionValuePredicateVisitor(
+            RowType tableType, RowType partitionType, InternalRow 
partitionRow) {
+        this.partitionRow = partitionRow;
+        this.partitionFields = new HashSet<>(partitionType.getFieldNames());
+
+        List<String> tableFieldNames = tableType.getFieldNames();
+        List<String> partitionFieldNames = partitionType.getFieldNames();
+
+        this.tableToPartitionMapping = new int[tableFieldNames.size()];
+        for (int i = 0; i < tableFieldNames.size(); i++) {
+            tableToPartitionMapping[i] = 
partitionFieldNames.indexOf(tableFieldNames.get(i));
+        }
+    }
+
+    @Override
+    public Optional<Predicate> visit(LeafPredicate predicate) {
+        Set<String> refFields = PredicateVisitor.collectFieldNames(predicate);
+        if (!partitionFields.containsAll(refFields)) {
+            return Optional.of(predicate);
+        }
+
+        // Remap field indices from table schema to partition schema
+        List<Object> remappedInputs = new ArrayList<>();
+        for (Object input : predicate.transform().inputs()) {
+            if (input instanceof FieldRef) {
+                FieldRef ref = (FieldRef) input;
+                int partIdx = tableToPartitionMapping[ref.index()];
+                remappedInputs.add(new FieldRef(partIdx, ref.name(), 
ref.type()));
+            } else {
+                remappedInputs.add(input);
+            }
+        }
+
+        // Evaluate the remapped predicate against the known partition row
+        LeafPredicate remapped = predicate.copyWithNewInputs(remappedInputs);
+        boolean result = remapped.test(partitionRow);
+        return Optional.of(result ? PredicateBuilder.alwaysTrue() : 
PredicateBuilder.alwaysFalse());
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateReplaceVisitor.java
 
b/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateReplaceVisitor.java
index aeaa5d5aee..b71941c0cc 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateReplaceVisitor.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateReplaceVisitor.java
@@ -37,6 +37,10 @@ public interface PredicateReplaceVisitor extends 
PredicateVisitor<Optional<Predi
                 return Optional.empty();
             }
         }
-        return Optional.of(new CompoundPredicate(predicate.function(), 
converted));
+        if (predicate.function() instanceof And) {
+            return Optional.of(PredicateBuilder.and(converted));
+        } else {
+            return Optional.of(PredicateBuilder.or(converted));
+        }
     }
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateReplaceVisitor.java
 b/paimon-common/src/main/java/org/apache/paimon/utils/TriFilter.java
similarity index 51%
copy from 
paimon-common/src/main/java/org/apache/paimon/predicate/PredicateReplaceVisitor.java
copy to paimon-common/src/main/java/org/apache/paimon/utils/TriFilter.java
index aeaa5d5aee..8a5de1e631 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/predicate/PredicateReplaceVisitor.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/TriFilter.java
@@ -16,27 +16,18 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.predicate;
+package org.apache.paimon.utils;
 
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Optional;
-
-/** A {@link PredicateVisitor} to replace {@link Predicate}. */
+/** Represents a filter (boolean-valued function) of three arguments. */
 @FunctionalInterface
-public interface PredicateReplaceVisitor extends 
PredicateVisitor<Optional<Predicate>> {
+public interface TriFilter<T, U, R> {
+
+    TriFilter<?, ?, ?> ALWAYS_TRUE = (t, u, r) -> true;
+
+    boolean test(T t, U u, R r);
 
-    @Override
-    default Optional<Predicate> visit(CompoundPredicate predicate) {
-        List<Predicate> converted = new ArrayList<>();
-        for (Predicate child : predicate.children()) {
-            Optional<Predicate> optional = child.visit(this);
-            if (optional.isPresent()) {
-                converted.add(optional.get());
-            } else {
-                return Optional.empty();
-            }
-        }
-        return Optional.of(new CompoundPredicate(predicate.function(), 
converted));
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    static <T, U, R> TriFilter<T, U, R> alwaysTrue() {
+        return (TriFilter) ALWAYS_TRUE;
     }
 }
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/predicate/PartitionValuePredicateVisitorTest.java
 
b/paimon-common/src/test/java/org/apache/paimon/predicate/PartitionValuePredicateVisitorTest.java
new file mode 100644
index 0000000000..7486d00372
--- /dev/null
+++ 
b/paimon-common/src/test/java/org/apache/paimon/predicate/PartitionValuePredicateVisitorTest.java
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.predicate;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Optional;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link PartitionValuePredicateVisitor}. */
+public class PartitionValuePredicateVisitorTest {
+
+    // Table schema: (pt INT, a INT, b INT), partition key: pt
+    private static final RowType TABLE_TYPE =
+            DataTypes.ROW(
+                    DataTypes.FIELD(0, "pt", DataTypes.INT()),
+                    DataTypes.FIELD(1, "a", DataTypes.INT()),
+                    DataTypes.FIELD(2, "b", DataTypes.INT()));
+
+    private static final RowType PARTITION_TYPE =
+            DataTypes.ROW(DataTypes.FIELD(0, "pt", DataTypes.INT()));
+
+    private static final PredicateBuilder BUILDER = new 
PredicateBuilder(TABLE_TYPE);
+
+    // ========================== Leaf: partition field 
==========================
+
+    @Test
+    public void testPartitionEqualMatch() {
+        // pt = 1, partition value is pt=1 => AlwaysTrue
+        PartitionValuePredicateVisitor visitor =
+                new PartitionValuePredicateVisitor(TABLE_TYPE, PARTITION_TYPE, 
GenericRow.of(1));
+
+        Optional<Predicate> result = BUILDER.equal(0, 1).visit(visitor);
+        assertThat(result).isPresent();
+        assertAlwaysTrue(result.get());
+    }
+
+    @Test
+    public void testPartitionEqualNoMatch() {
+        // pt = 2, partition value is pt=1 => AlwaysFalse
+        PartitionValuePredicateVisitor visitor =
+                new PartitionValuePredicateVisitor(TABLE_TYPE, PARTITION_TYPE, 
GenericRow.of(1));
+
+        Optional<Predicate> result = BUILDER.equal(0, 2).visit(visitor);
+        assertThat(result).isPresent();
+        assertAlwaysFalse(result.get());
+    }
+
+    @Test
+    public void testPartitionGreaterThanMatch() {
+        // pt > 0, partition value is pt=1 => AlwaysTrue
+        PartitionValuePredicateVisitor visitor =
+                new PartitionValuePredicateVisitor(TABLE_TYPE, PARTITION_TYPE, 
GenericRow.of(1));
+
+        Optional<Predicate> result = BUILDER.greaterThan(0, 0).visit(visitor);
+        assertThat(result).isPresent();
+        assertAlwaysTrue(result.get());
+    }
+
+    @Test
+    public void testPartitionGreaterThanNoMatch() {
+        // pt > 5, partition value is pt=1 => AlwaysFalse
+        PartitionValuePredicateVisitor visitor =
+                new PartitionValuePredicateVisitor(TABLE_TYPE, PARTITION_TYPE, 
GenericRow.of(1));
+
+        Optional<Predicate> result = BUILDER.greaterThan(0, 5).visit(visitor);
+        assertThat(result).isPresent();
+        assertAlwaysFalse(result.get());
+    }
+
+    @Test
+    public void testPartitionLessOrEqualMatch() {
+        // pt <= 1, partition value is pt=1 => AlwaysTrue
+        PartitionValuePredicateVisitor visitor =
+                new PartitionValuePredicateVisitor(TABLE_TYPE, PARTITION_TYPE, 
GenericRow.of(1));
+
+        Optional<Predicate> result = BUILDER.lessOrEqual(0, 1).visit(visitor);
+        assertThat(result).isPresent();
+        assertAlwaysTrue(result.get());
+    }
+
+    @Test
+    public void testPartitionBetweenMatch() {
+        // pt BETWEEN 0 AND 5, partition value is pt=3 => AlwaysTrue
+        PartitionValuePredicateVisitor visitor =
+                new PartitionValuePredicateVisitor(TABLE_TYPE, PARTITION_TYPE, 
GenericRow.of(3));
+
+        Optional<Predicate> result = BUILDER.between(0, 0, 5).visit(visitor);
+        assertThat(result).isPresent();
+        assertAlwaysTrue(result.get());
+    }
+
+    @Test
+    public void testPartitionBetweenNoMatch() {
+        // pt BETWEEN 5 AND 10, partition value is pt=3 => AlwaysFalse
+        PartitionValuePredicateVisitor visitor =
+                new PartitionValuePredicateVisitor(TABLE_TYPE, PARTITION_TYPE, 
GenericRow.of(3));
+
+        Optional<Predicate> result = BUILDER.between(0, 5, 10).visit(visitor);
+        assertThat(result).isPresent();
+        assertAlwaysFalse(result.get());
+    }
+
+    @Test
+    public void testPartitionIsNullNoMatch() {
+        // pt IS NULL, partition value is pt=1 => AlwaysFalse
+        PartitionValuePredicateVisitor visitor =
+                new PartitionValuePredicateVisitor(TABLE_TYPE, PARTITION_TYPE, 
GenericRow.of(1));
+
+        Optional<Predicate> result = BUILDER.isNull(0).visit(visitor);
+        assertThat(result).isPresent();
+        assertAlwaysFalse(result.get());
+    }
+
+    @Test
+    public void testPartitionIsNotNullMatch() {
+        // pt IS NOT NULL, partition value is pt=1 => AlwaysTrue
+        PartitionValuePredicateVisitor visitor =
+                new PartitionValuePredicateVisitor(TABLE_TYPE, PARTITION_TYPE, 
GenericRow.of(1));
+
+        Optional<Predicate> result = BUILDER.isNotNull(0).visit(visitor);
+        assertThat(result).isPresent();
+        assertAlwaysTrue(result.get());
+    }
+
+    // ========================== Leaf: non-partition field 
==========================
+
+    @Test
+    public void testNonPartitionFieldKeptAsIs() {
+        // a = 5 is not a partition predicate => kept unchanged
+        PartitionValuePredicateVisitor visitor =
+                new PartitionValuePredicateVisitor(TABLE_TYPE, PARTITION_TYPE, 
GenericRow.of(1));
+
+        Predicate original = BUILDER.equal(1, 5);
+        Optional<Predicate> result = original.visit(visitor);
+        assertThat(result).isPresent();
+        assertThat(result.get()).isEqualTo(original);
+    }
+
+    // ========================== Compound: AND ==========================
+
+    @Test
+    public void testAndPartitionMatchAndNonPartition() {
+        // (pt = 1 AND a = 5), partition value is pt=1
+        // => AlwaysTrue simplified away by PredicateBuilder.and(), leaving 
just a=5
+        PartitionValuePredicateVisitor visitor =
+                new PartitionValuePredicateVisitor(TABLE_TYPE, PARTITION_TYPE, 
GenericRow.of(1));
+
+        Predicate and = PredicateBuilder.and(BUILDER.equal(0, 1), 
BUILDER.equal(1, 5));
+        Optional<Predicate> result = and.visit(visitor);
+        assertThat(result).isPresent();
+        assertThat(result.get()).isEqualTo(BUILDER.equal(1, 5));
+    }
+
+    @Test
+    public void testAndPartitionNoMatchAndNonPartition() {
+        // (pt = 2 AND a = 5), partition value is pt=1
+        // => AlwaysFalse short-circuits the entire AND
+        PartitionValuePredicateVisitor visitor =
+                new PartitionValuePredicateVisitor(TABLE_TYPE, PARTITION_TYPE, 
GenericRow.of(1));
+
+        Predicate and = PredicateBuilder.and(BUILDER.equal(0, 2), 
BUILDER.equal(1, 5));
+        Optional<Predicate> result = and.visit(visitor);
+        assertThat(result).isPresent();
+        assertAlwaysFalse(result.get());
+    }
+
+    @Test
+    public void testAndAllPartitionMatch() {
+        // (pt = 1 AND pt > 0), partition value is pt=1
+        // => both AlwaysTrue, simplified to AlwaysTrue
+        PartitionValuePredicateVisitor visitor =
+                new PartitionValuePredicateVisitor(TABLE_TYPE, PARTITION_TYPE, 
GenericRow.of(1));
+
+        Predicate and = PredicateBuilder.and(BUILDER.equal(0, 1), 
BUILDER.greaterThan(0, 0));
+        Optional<Predicate> result = and.visit(visitor);
+        assertThat(result).isPresent();
+        assertAlwaysTrue(result.get());
+    }
+
+    // ========================== Compound: OR ==========================
+
+    @Test
+    public void testOrPartitionPredicates() {
+        // (pt = 1 OR pt = 2), partition value is pt=1
+        // => AlwaysTrue short-circuits the entire OR
+        PartitionValuePredicateVisitor visitor =
+                new PartitionValuePredicateVisitor(TABLE_TYPE, PARTITION_TYPE, 
GenericRow.of(1));
+
+        Predicate or = PredicateBuilder.or(BUILDER.equal(0, 1), 
BUILDER.equal(0, 2));
+        Optional<Predicate> result = or.visit(visitor);
+        assertThat(result).isPresent();
+        assertAlwaysTrue(result.get());
+    }
+
+    @Test
+    public void testOrPartitionMatchAndNonPartition() {
+        // (pt = 1 OR a = 5), partition value is pt=1
+        // => AlwaysTrue short-circuits the entire OR
+        PartitionValuePredicateVisitor visitor =
+                new PartitionValuePredicateVisitor(TABLE_TYPE, PARTITION_TYPE, 
GenericRow.of(1));
+
+        Predicate or = PredicateBuilder.or(BUILDER.equal(0, 1), 
BUILDER.equal(1, 5));
+        Optional<Predicate> result = or.visit(visitor);
+        assertThat(result).isPresent();
+        assertAlwaysTrue(result.get());
+    }
+
+    @Test
+    public void testOrPartitionNoMatchAndNonPartition() {
+        // (pt = 2 OR a = 5), partition value is pt=1
+        // => AlwaysFalse filtered out, leaving just a=5
+        PartitionValuePredicateVisitor visitor =
+                new PartitionValuePredicateVisitor(TABLE_TYPE, PARTITION_TYPE, 
GenericRow.of(1));
+
+        Predicate or = PredicateBuilder.or(BUILDER.equal(0, 2), 
BUILDER.equal(1, 5));
+        Optional<Predicate> result = or.visit(visitor);
+        assertThat(result).isPresent();
+        assertThat(result.get()).isEqualTo(BUILDER.equal(1, 5));
+    }
+
+    // ========================== Multiple partition keys 
==========================
+
+    @Test
+    public void testMultiplePartitionFieldsBothMatch() {
+        // Table: (pt1 INT, pt2 STRING, a INT), partition keys: (pt1, pt2)
+        RowType tableType =
+                DataTypes.ROW(
+                        DataTypes.FIELD(0, "pt1", DataTypes.INT()),
+                        DataTypes.FIELD(1, "pt2", DataTypes.STRING()),
+                        DataTypes.FIELD(2, "a", DataTypes.INT()));
+        RowType partitionType =
+                DataTypes.ROW(
+                        DataTypes.FIELD(0, "pt1", DataTypes.INT()),
+                        DataTypes.FIELD(1, "pt2", DataTypes.STRING()));
+        PredicateBuilder builder = new PredicateBuilder(tableType);
+
+        // pt1 = 1 AND pt2 = 'x', partition value is (1, 'x') => both match => 
AlwaysTrue
+        GenericRow partitionRow = GenericRow.of(1, 
BinaryString.fromString("x"));
+        PartitionValuePredicateVisitor visitor =
+                new PartitionValuePredicateVisitor(tableType, partitionType, 
partitionRow);
+
+        Predicate and =
+                PredicateBuilder.and(
+                        builder.equal(0, 1), builder.equal(1, 
BinaryString.fromString("x")));
+        Optional<Predicate> result = and.visit(visitor);
+        assertThat(result).isPresent();
+        assertAlwaysTrue(result.get());
+    }
+
+    @Test
+    public void testMultiplePartitionFieldsPartialMatch() {
+        // pt1 = 1 AND pt2 = 'y', partition value is (1, 'x') => pt1 matches, 
pt2 doesn't
+        RowType tableType =
+                DataTypes.ROW(
+                        DataTypes.FIELD(0, "pt1", DataTypes.INT()),
+                        DataTypes.FIELD(1, "pt2", DataTypes.STRING()),
+                        DataTypes.FIELD(2, "a", DataTypes.INT()));
+        RowType partitionType =
+                DataTypes.ROW(
+                        DataTypes.FIELD(0, "pt1", DataTypes.INT()),
+                        DataTypes.FIELD(1, "pt2", DataTypes.STRING()));
+        PredicateBuilder builder = new PredicateBuilder(tableType);
+
+        GenericRow partitionRow = GenericRow.of(1, 
BinaryString.fromString("x"));
+        PartitionValuePredicateVisitor visitor =
+                new PartitionValuePredicateVisitor(tableType, partitionType, 
partitionRow);
+
+        Predicate and =
+                PredicateBuilder.and(
+                        builder.equal(0, 1), builder.equal(1, 
BinaryString.fromString("y")));
+        Optional<Predicate> result = and.visit(visitor);
+        assertThat(result).isPresent();
+        // pt2 doesn't match => AlwaysFalse short-circuits the entire AND
+        assertAlwaysFalse(result.get());
+    }
+
+    // ========================== Partition field not at index 0 
==========================
+
+    @Test
+    public void testPartitionFieldNotFirstInTable() {
+        // Table: (a INT, pt INT, b INT), partition key: pt (index 1 in table, 
index 0 in partition)
+        RowType tableType =
+                DataTypes.ROW(
+                        DataTypes.FIELD(0, "a", DataTypes.INT()),
+                        DataTypes.FIELD(1, "pt", DataTypes.INT()),
+                        DataTypes.FIELD(2, "b", DataTypes.INT()));
+        RowType partitionType = DataTypes.ROW(DataTypes.FIELD(0, "pt", 
DataTypes.INT()));
+        PredicateBuilder builder = new PredicateBuilder(tableType);
+
+        // pt = 3, partition value is pt=3
+        PartitionValuePredicateVisitor visitor =
+                new PartitionValuePredicateVisitor(tableType, partitionType, 
GenericRow.of(3));
+
+        Optional<Predicate> result = builder.equal(1, 3).visit(visitor);
+        assertThat(result).isPresent();
+        assertAlwaysTrue(result.get());
+
+        // pt = 5, partition value is pt=3 => AlwaysFalse
+        result = builder.equal(1, 5).visit(visitor);
+        assertThat(result).isPresent();
+        assertAlwaysFalse(result.get());
+
+        // a = 10 => kept as-is (non-partition field)
+        Predicate original = builder.equal(0, 10);
+        result = original.visit(visitor);
+        assertThat(result).isPresent();
+        assertThat(result.get()).isEqualTo(original);
+    }
+
+    // ========================== Nested compound ==========================
+
+    @Test
+    public void testNestedAndOr() {
+        // ((pt = 1 OR pt = 2) AND a = 5), partition value is pt=1
+        // Inner OR: AlwaysTrue short-circuits to AlwaysTrue
+        // Outer AND: AlwaysTrue simplified away, leaving just a=5
+        PartitionValuePredicateVisitor visitor =
+                new PartitionValuePredicateVisitor(TABLE_TYPE, PARTITION_TYPE, 
GenericRow.of(1));
+
+        Predicate or = PredicateBuilder.or(BUILDER.equal(0, 1), 
BUILDER.equal(0, 2));
+        Predicate and = PredicateBuilder.and(or, BUILDER.equal(1, 5));
+        Optional<Predicate> result = and.visit(visitor);
+        assertThat(result).isPresent();
+        assertThat(result.get()).isEqualTo(BUILDER.equal(1, 5));
+    }
+
+    // ========================== Only non-partition predicates 
==========================
+
+    @Test
+    public void testAllNonPartitionPredicatesUnchanged() {
+        // (a = 5 AND b = 10), partition value is pt=1 => kept unchanged
+        PartitionValuePredicateVisitor visitor =
+                new PartitionValuePredicateVisitor(TABLE_TYPE, PARTITION_TYPE, 
GenericRow.of(1));
+
+        Predicate original = PredicateBuilder.and(BUILDER.equal(1, 5), 
BUILDER.equal(2, 10));
+        Optional<Predicate> result = original.visit(visitor);
+        assertThat(result).isPresent();
+        assertThat(result.get()).isEqualTo(original);
+    }
+
+    // ========================== Helpers ==========================
+
+    private static void assertAlwaysTrue(Predicate predicate) {
+        assertThat(predicate).isInstanceOf(LeafPredicate.class);
+        assertThat(((LeafPredicate) 
predicate).function()).isEqualTo(AlwaysTrue.INSTANCE);
+    }
+
+    private static void assertAlwaysFalse(Predicate predicate) {
+        assertThat(predicate).isInstanceOf(LeafPredicate.class);
+        assertThat(((LeafPredicate) 
predicate).function()).isEqualTo(AlwaysFalse.INSTANCE);
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
index ad71a1d595..f0f5e1c6ff 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
@@ -30,7 +30,6 @@ import 
org.apache.paimon.operation.BucketedAppendFileStoreWrite;
 import org.apache.paimon.operation.DataEvolutionFileStoreScan;
 import org.apache.paimon.operation.DataEvolutionSplitRead;
 import org.apache.paimon.operation.RawFileSplitRead;
-import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.table.BucketMode;
@@ -41,12 +40,6 @@ import org.apache.paimon.types.RowType;
 import javax.annotation.Nullable;
 
 import java.util.Comparator;
-import java.util.List;
-import java.util.Optional;
-
-import static org.apache.paimon.predicate.PredicateBuilder.and;
-import static 
org.apache.paimon.predicate.PredicateBuilder.pickTransformFieldMapping;
-import static org.apache.paimon.predicate.PredicateBuilder.splitAnd;
 
 /** {@link FileStore} for reading and writing {@link InternalRow}. */
 public class AppendOnlyFileStore extends AbstractFileStore<InternalRow> {
@@ -143,26 +136,12 @@ public class AppendOnlyFileStore extends 
AbstractFileStore<InternalRow> {
     @Override
     public AppendOnlyFileStoreScan newScan() {
         BucketSelectConverter bucketSelectConverter =
-                predicate -> {
-                    if (bucketMode() != BucketMode.HASH_FIXED) {
-                        return Optional.empty();
-                    }
-
-                    if (bucketKeyType.getFieldCount() == 0) {
-                        return Optional.empty();
-                    }
-
-                    List<Predicate> bucketFilters =
-                            pickTransformFieldMapping(
-                                    splitAnd(predicate),
-                                    rowType.getFieldNames(),
-                                    bucketKeyType.getFieldNames());
-                    if (!bucketFilters.isEmpty()) {
-                        return BucketSelectConverter.create(
-                                and(bucketFilters), bucketKeyType, 
options.bucketFunctionType());
-                    }
-                    return Optional.empty();
-                };
+                new BucketSelectConverter(
+                        bucketMode(),
+                        options.bucketFunctionType(),
+                        rowType,
+                        partitionType,
+                        bucketKeyType);
 
         if (options().dataEvolutionEnabled()) {
             return new DataEvolutionFileStoreScan(
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
index 710e2dc3a5..a1ae650b43 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -33,7 +33,6 @@ import org.apache.paimon.operation.KeyValueFileStoreWrite;
 import org.apache.paimon.operation.MergeFileSplitRead;
 import org.apache.paimon.operation.RawFileSplitRead;
 import org.apache.paimon.postpone.PostponeBucketFileStoreWrite;
-import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.schema.KeyValueFieldsExtractor;
 import org.apache.paimon.schema.SchemaManager;
 import org.apache.paimon.schema.TableSchema;
@@ -48,18 +47,12 @@ import javax.annotation.Nullable;
 
 import java.util.Comparator;
 import java.util.List;
-import java.util.Optional;
 import java.util.function.Supplier;
 
-import static org.apache.paimon.predicate.PredicateBuilder.and;
-import static 
org.apache.paimon.predicate.PredicateBuilder.pickTransformFieldMapping;
-import static org.apache.paimon.predicate.PredicateBuilder.splitAnd;
-
 /** {@link FileStore} for querying and updating {@link KeyValue}s. */
 public class KeyValueFileStore extends AbstractFileStore<KeyValue> {
 
     private final boolean crossPartitionUpdate;
-    private final RowType bucketKeyType;
     private final RowType keyType;
     private final RowType valueType;
     private final KeyValueFieldsExtractor keyValueFieldsExtractor;
@@ -74,7 +67,6 @@ public class KeyValueFileStore extends 
AbstractFileStore<KeyValue> {
             boolean crossPartitionUpdate,
             CoreOptions options,
             RowType partitionType,
-            RowType bucketKeyType,
             RowType keyType,
             RowType valueType,
             KeyValueFieldsExtractor keyValueFieldsExtractor,
@@ -83,7 +75,6 @@ public class KeyValueFileStore extends 
AbstractFileStore<KeyValue> {
             CatalogEnvironment catalogEnvironment) {
         super(fileIO, schemaManager, schema, tableName, options, 
partitionType, catalogEnvironment);
         this.crossPartitionUpdate = crossPartitionUpdate;
-        this.bucketKeyType = bucketKeyType;
         this.keyType = keyType;
         this.valueType = valueType;
         this.keyValueFieldsExtractor = keyValueFieldsExtractor;
@@ -203,25 +194,13 @@ public class KeyValueFileStore extends 
AbstractFileStore<KeyValue> {
 
     @Override
     public KeyValueFileStoreScan newScan() {
-        BucketMode bucketMode = bucketMode();
         BucketSelectConverter bucketSelectConverter =
-                keyFilter -> {
-                    if (bucketMode != BucketMode.HASH_FIXED
-                            && bucketMode != BucketMode.POSTPONE_MODE) {
-                        return Optional.empty();
-                    }
-
-                    List<Predicate> bucketFilters =
-                            pickTransformFieldMapping(
-                                    splitAnd(keyFilter),
-                                    keyType.getFieldNames(),
-                                    bucketKeyType.getFieldNames());
-                    if (!bucketFilters.isEmpty()) {
-                        return BucketSelectConverter.create(
-                                and(bucketFilters), bucketKeyType, 
options.bucketFunctionType());
-                    }
-                    return Optional.empty();
-                };
+                new BucketSelectConverter(
+                        bucketMode(),
+                        options.bucketFunctionType(),
+                        schema.logicalRowType(),
+                        partitionType,
+                        schema.logicalBucketKeyType());
 
         return new KeyValueFileStoreScan(
                 newManifestsReader(),
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/manifest/BucketFilter.java 
b/paimon-core/src/main/java/org/apache/paimon/manifest/BucketFilter.java
index 45cd074a5a..4662d7dab5 100644
--- a/paimon-core/src/main/java/org/apache/paimon/manifest/BucketFilter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/manifest/BucketFilter.java
@@ -18,8 +18,9 @@
 
 package org.apache.paimon.manifest;
 
-import org.apache.paimon.utils.BiFilter;
+import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.utils.Filter;
+import org.apache.paimon.utils.TriFilter;
 
 import javax.annotation.Nullable;
 
@@ -29,13 +30,13 @@ public class BucketFilter {
     private final boolean onlyReadRealBuckets;
     private final @Nullable Integer specifiedBucket;
     private final @Nullable Filter<Integer> bucketFilter;
-    private final @Nullable BiFilter<Integer, Integer> totalAwareBucketFilter;
+    private final @Nullable TriFilter<BinaryRow, Integer, Integer> 
totalAwareBucketFilter;
 
     public BucketFilter(
             boolean onlyReadRealBuckets,
             @Nullable Integer specifiedBucket,
             @Nullable Filter<Integer> bucketFilter,
-            @Nullable BiFilter<Integer, Integer> totalAwareBucketFilter) {
+            @Nullable TriFilter<BinaryRow, Integer, Integer> 
totalAwareBucketFilter) {
         this.onlyReadRealBuckets = onlyReadRealBuckets;
         this.specifiedBucket = specifiedBucket;
         this.bucketFilter = bucketFilter;
@@ -46,7 +47,7 @@ public class BucketFilter {
             boolean onlyReadRealBuckets,
             @Nullable Integer specifiedBucket,
             @Nullable Filter<Integer> bucketFilter,
-            @Nullable BiFilter<Integer, Integer> totalAwareBucketFilter) {
+            @Nullable TriFilter<BinaryRow, Integer, Integer> 
totalAwareBucketFilter) {
         if (!onlyReadRealBuckets
                 && specifiedBucket == null
                 && bucketFilter == null
@@ -63,7 +64,7 @@ public class BucketFilter {
         return specifiedBucket;
     }
 
-    public boolean test(int bucket, int totalBucket) {
+    public boolean test(BinaryRow partition, int bucket, int totalBucket) {
         if (onlyReadRealBuckets && bucket < 0) {
             return false;
         }
@@ -73,6 +74,7 @@ public class BucketFilter {
         if (bucketFilter != null && !bucketFilter.test(bucket)) {
             return false;
         }
-        return totalAwareBucketFilter == null || 
totalAwareBucketFilter.test(bucket, totalBucket);
+        return totalAwareBucketFilter == null
+                || totalAwareBucketFilter.test(partition, bucket, totalBucket);
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntryCache.java 
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntryCache.java
index 59b2a34650..09afdcc3ac 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntryCache.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntryCache.java
@@ -125,10 +125,10 @@ public class ManifestEntryCache extends 
ObjectsCache<Path, ManifestEntry, Manife
         List<RichSegments> segments = manifestSegments.segments();
 
         // try to do fast filter first
-        Optional<BinaryRow> partition = 
extractSinglePartition(partitionFilter);
-        if (partition.isPresent()) {
+        Optional<BinaryRow> singlePartition = 
extractSinglePartition(partitionFilter);
+        if (singlePartition.isPresent()) {
             Map<Integer, List<RichSegments>> segMap =
-                    manifestSegments.indexedSegments().get(partition.get());
+                    
manifestSegments.indexedSegments().get(singlePartition.get());
             if (segMap == null) {
                 return Collections.emptyList();
             }
@@ -147,11 +147,13 @@ public class ManifestEntryCache extends 
ObjectsCache<Path, ManifestEntry, Manife
         // do force loop filter
         List<Segments> segmentsList = new ArrayList<>();
         for (RichSegments richSegments : segments) {
-            if (partitionFilter != null && 
!partitionFilter.test(richSegments.partition())) {
+            BinaryRow partition = richSegments.partition();
+            if (partitionFilter != null && !partitionFilter.test(partition)) {
                 continue;
             }
             if (bucketFilter != null
-                    && !bucketFilter.test(richSegments.bucket(), 
richSegments.totalBucket())) {
+                    && !bucketFilter.test(
+                            partition, richSegments.bucket(), 
richSegments.totalBucket())) {
                 continue;
             }
             segmentsList.add(richSegments.segments());
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index 68ebacaa80..30908cce3e 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -46,6 +46,7 @@ import org.apache.paimon.utils.Pair;
 import org.apache.paimon.utils.Range;
 import org.apache.paimon.utils.RowRangeIndex;
 import org.apache.paimon.utils.SnapshotManager;
+import org.apache.paimon.utils.TriFilter;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -87,7 +88,7 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
     private boolean onlyReadRealBuckets = false;
     private Integer specifiedBucket = null;
     private Filter<Integer> bucketFilter = null;
-    private BiFilter<Integer, Integer> totalAwareBucketFilter = null;
+    private TriFilter<BinaryRow, Integer, Integer> totalAwareBucketFilter = 
null;
     protected ScanMode scanMode = ScanMode.ALL;
     private Integer specifiedLevel = null;
     private Filter<Integer> levelFilter = null;
@@ -162,7 +163,7 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
 
     @Override
     public FileStoreScan withTotalAwareBucketFilter(
-            BiFilter<Integer, Integer> totalAwareBucketFilter) {
+            TriFilter<BinaryRow, Integer, Integer> totalAwareBucketFilter) {
         this.totalAwareBucketFilter = totalAwareBucketFilter;
         return this;
     }
@@ -533,14 +534,21 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
         Function<InternalRow, Integer> levelGetter = 
ManifestEntrySerializer.levelGetter();
         BucketFilter bucketFilter = createBucketFilter();
         return row -> {
-            if ((partitionFilter != null && 
!partitionFilter.test(partitionGetter.apply(row)))) {
-                return false;
+            BinaryRow partition = null;
+            if (partitionFilter != null) {
+                partition = partitionGetter.apply(row);
+                if (!partitionFilter.test(partition)) {
+                    return false;
+                }
             }
 
             if (bucketFilter != null) {
                 int bucket = bucketGetter.apply(row);
                 int totalBucket = totalBucketGetter.apply(row);
-                if (!bucketFilter.test(bucket, totalBucket)) {
+                if (partition == null) {
+                    partition = partitionGetter.apply(row);
+                }
+                if (!bucketFilter.test(partition, bucket, totalBucket)) {
                     return false;
                 }
             }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
index 3546a1fcae..2714e773a2 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
@@ -86,6 +86,11 @@ public class AppendOnlyFileStoreScan extends 
AbstractFileStoreScan {
 
     public AppendOnlyFileStoreScan withFilter(Predicate predicate) {
         this.inputFilter = predicate;
+        return this;
+    }
+
+    @Override
+    public FileStoreScan withCompleteFilter(Predicate predicate) {
         
this.bucketSelectConverter.convert(predicate).ifPresent(this::withTotalAwareBucketFilter);
         return this;
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/BucketSelectConverter.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/BucketSelectConverter.java
index 6577099aa8..e441641f7e 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/BucketSelectConverter.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/BucketSelectConverter.java
@@ -19,160 +19,55 @@
 package org.apache.paimon.operation;
 
 import org.apache.paimon.CoreOptions.BucketFunctionType;
-import org.apache.paimon.annotation.VisibleForTesting;
-import org.apache.paimon.bucket.BucketFunction;
 import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.data.GenericRow;
-import org.apache.paimon.data.serializer.InternalRowSerializer;
-import org.apache.paimon.predicate.Equal;
-import org.apache.paimon.predicate.FieldRef;
-import org.apache.paimon.predicate.In;
-import org.apache.paimon.predicate.LeafPredicate;
 import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.table.BucketMode;
 import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.BiFilter;
+import org.apache.paimon.utils.TriFilter;
 
-import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableSet;
-
-import javax.annotation.concurrent.ThreadSafe;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Map;
-import java.util.Objects;
 import java.util.Optional;
 import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.function.Consumer;
-import java.util.stream.Collectors;
 
-import static org.apache.paimon.predicate.PredicateBuilder.splitAnd;
-import static org.apache.paimon.predicate.PredicateBuilder.splitOr;
+import static org.apache.paimon.predicate.PredicateVisitor.collectFieldNames;
 
 /** Bucket filter push down in scan to skip files. */
-public interface BucketSelectConverter {
-
-    int MAX_VALUES = 1000;
-
-    Optional<BiFilter<Integer, Integer>> convert(Predicate predicate);
-
-    static Optional<BiFilter<Integer, Integer>> create(
-            Predicate bucketPredicate,
-            RowType bucketKeyType,
-            BucketFunctionType bucketFunctionType) {
-        @SuppressWarnings("unchecked")
-        List<Object>[] bucketValues = new List[bucketKeyType.getFieldCount()];
-
-        BucketFunction bucketFunction = 
BucketFunction.create(bucketFunctionType, bucketKeyType);
-
-        nextAnd:
-        for (Predicate andPredicate : splitAnd(bucketPredicate)) {
-            Integer reference = null;
-            List<Object> values = new ArrayList<>();
-            for (Predicate orPredicate : splitOr(andPredicate)) {
-                if (orPredicate instanceof LeafPredicate) {
-                    LeafPredicate leaf = (LeafPredicate) orPredicate;
-                    Optional<FieldRef> fieldRefOptional = 
leaf.fieldRefOptional();
-                    if (fieldRefOptional.isPresent()) {
-                        FieldRef fieldRef = fieldRefOptional.get();
-                        if (reference == null || reference == 
fieldRef.index()) {
-                            reference = fieldRef.index();
-                            if (leaf.function().equals(Equal.INSTANCE)
-                                    || leaf.function().equals(In.INSTANCE)) {
-                                values.addAll(
-                                        leaf.literals().stream()
-                                                .filter(Objects::nonNull)
-                                                .collect(Collectors.toList()));
-                                continue;
-                            }
-                        }
-                    }
-                }
-
-                // failed, go to next predicate
-                continue nextAnd;
-            }
-            if (reference != null) {
-                if (bucketValues[reference] != null) {
-                    // Repeated equals in And?
-                    return Optional.empty();
-                }
-
-                bucketValues[reference] = values;
-            }
-        }
-
-        int rowCount = 1;
-        for (List<Object> values : bucketValues) {
-            if (values == null) {
-                return Optional.empty();
-            }
-
-            rowCount *= values.size();
-            if (rowCount > MAX_VALUES) {
-                return Optional.empty();
-            }
-        }
-
-        InternalRowSerializer serializer = new 
InternalRowSerializer(bucketKeyType);
-        List<BinaryRow> bucketKeys = new ArrayList<>();
-        assembleRows(
-                bucketValues,
-                columns ->
-                        bucketKeys.add(
-                                
serializer.toBinaryRow(GenericRow.of(columns.toArray())).copy()),
-                new ArrayList<>(),
-                0);
-
-        return Optional.of(new Selector(bucketKeys, bucketFunction));
+public class BucketSelectConverter {
+
+    private final BucketMode bucketMode;
+    private final BucketFunctionType bucketFunctionType;
+    private final RowType rowType;
+    private final RowType partitionType;
+    private final RowType bucketKeyType;
+
+    public BucketSelectConverter(
+            BucketMode bucketMode,
+            BucketFunctionType bucketFunctionType,
+            RowType rowType,
+            RowType partitionType,
+            RowType bucketKeyType) {
+        this.bucketMode = bucketMode;
+        this.bucketFunctionType = bucketFunctionType;
+        this.rowType = rowType;
+        this.partitionType = partitionType;
+        this.bucketKeyType = bucketKeyType;
     }
 
-    static void assembleRows(
-            List<Object>[] rowValues,
-            Consumer<List<Object>> consumer,
-            List<Object> stack,
-            int columnIndex) {
-        List<Object> columnValues = rowValues[columnIndex];
-        for (Object value : columnValues) {
-            stack.add(value);
-            if (columnIndex == rowValues.length - 1) {
-                // last column, consume row
-                consumer.accept(stack);
-            } else {
-                assembleRows(rowValues, consumer, stack, columnIndex + 1);
-            }
-            stack.remove(stack.size() - 1);
+    public Optional<TriFilter<BinaryRow, Integer, Integer>> convert(Predicate 
predicate) {
+        if (bucketMode != BucketMode.HASH_FIXED && bucketMode != 
BucketMode.POSTPONE_MODE) {
+            return Optional.empty();
         }
-    }
 
-    /** Selector to select bucket from {@link Predicate}. */
-    @ThreadSafe
-    class Selector implements BiFilter<Integer, Integer> {
-
-        private final List<BinaryRow> bucketKeys;
-
-        private final BucketFunction bucketFunction;
-
-        private final Map<Integer, Set<Integer>> buckets = new 
ConcurrentHashMap<>();
-
-        public Selector(List<BinaryRow> bucketKeys, BucketFunction 
bucketFunction) {
-            this.bucketKeys = bucketKeys;
-            this.bucketFunction = bucketFunction;
+        if (bucketKeyType.getFieldCount() == 0) {
+            return Optional.empty();
         }
 
-        @Override
-        public boolean test(Integer bucket, Integer numBucket) {
-            return buckets.computeIfAbsent(numBucket, k -> 
createBucketSet(numBucket))
-                    .contains(bucket);
+        Set<String> predicateFields = collectFieldNames(predicate);
+        if (!predicateFields.containsAll(bucketKeyType.getFieldNames())) {
+            return Optional.empty();
         }
 
-        @VisibleForTesting
-        Set<Integer> createBucketSet(int numBucket) {
-            ImmutableSet.Builder<Integer> builder = new 
ImmutableSet.Builder<>();
-            for (BinaryRow key : bucketKeys) {
-                builder.add(bucketFunction.bucket(key, numBucket));
-            }
-            return builder.build();
-        }
+        return Optional.of(
+                new BucketSelector(
+                        predicate, bucketFunctionType, rowType, partitionType, 
bucketKeyType));
     }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/BucketSelectConverter.java
 b/paimon-core/src/main/java/org/apache/paimon/operation/BucketSelector.java
similarity index 63%
copy from 
paimon-core/src/main/java/org/apache/paimon/operation/BucketSelectConverter.java
copy to 
paimon-core/src/main/java/org/apache/paimon/operation/BucketSelector.java
index 6577099aa8..8c28c3c0c6 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/BucketSelectConverter.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/BucketSelector.java
@@ -19,7 +19,6 @@
 package org.apache.paimon.operation;
 
 import org.apache.paimon.CoreOptions.BucketFunctionType;
-import org.apache.paimon.annotation.VisibleForTesting;
 import org.apache.paimon.bucket.BucketFunction;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.GenericRow;
@@ -28,15 +27,18 @@ import org.apache.paimon.predicate.Equal;
 import org.apache.paimon.predicate.FieldRef;
 import org.apache.paimon.predicate.In;
 import org.apache.paimon.predicate.LeafPredicate;
+import org.apache.paimon.predicate.PartitionValuePredicateVisitor;
 import org.apache.paimon.predicate.Predicate;
 import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.BiFilter;
+import org.apache.paimon.utils.TriFilter;
 
 import org.apache.paimon.shade.guava30.com.google.common.collect.ImmutableSet;
 
 import javax.annotation.concurrent.ThreadSafe;
 
 import java.util.ArrayList;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Objects;
@@ -46,20 +48,67 @@ import java.util.concurrent.ConcurrentHashMap;
 import java.util.function.Consumer;
 import java.util.stream.Collectors;
 
+import static org.apache.paimon.predicate.PredicateBuilder.and;
+import static 
org.apache.paimon.predicate.PredicateBuilder.pickTransformFieldMapping;
 import static org.apache.paimon.predicate.PredicateBuilder.splitAnd;
 import static org.apache.paimon.predicate.PredicateBuilder.splitOr;
 
-/** Bucket filter push down in scan to skip files. */
-public interface BucketSelectConverter {
+/** Selector to select bucket from {@link Predicate}. */
+@ThreadSafe
+public class BucketSelector implements TriFilter<BinaryRow, Integer, Integer> {
+
+    public static final int MAX_VALUES = 1000;
+
+    private final BucketFunctionType bucketFunctionType;
+    private final RowType rowType;
+    private final RowType partitionType;
+    private final RowType bucketKeyType;
+    private final Predicate predicate;
+    private final Map<BinaryRow, Optional<PartitionSelector>> 
partitionSelectors;
+
+    public BucketSelector(
+            Predicate predicate,
+            BucketFunctionType bucketFunctionType,
+            RowType rowType,
+            RowType partitionType,
+            RowType bucketKeyType) {
+        this.predicate = predicate;
+        this.bucketFunctionType = bucketFunctionType;
+        this.rowType = rowType;
+        this.partitionType = partitionType;
+        this.bucketKeyType = bucketKeyType;
+        this.partitionSelectors = new ConcurrentHashMap<>();
+    }
 
-    int MAX_VALUES = 1000;
+    @Override
+    public boolean test(BinaryRow partition, Integer bucket, Integer 
numBucket) {
+        return partitionSelectors
+                .computeIfAbsent(partition, this::createPartitionSelector)
+                .map(selector -> selector.test(bucket, numBucket))
+                .orElse(true);
+    }
 
-    Optional<BiFilter<Integer, Integer>> convert(Predicate predicate);
+    private Optional<PartitionSelector> createPartitionSelector(BinaryRow 
partition) {
+        Optional<Predicate> partRemoved =
+                predicate.visit(
+                        new PartitionValuePredicateVisitor(rowType, 
partitionType, partition));
+        if (!partRemoved.isPresent()) {
+            return Optional.empty();
+        }
+
+        List<Predicate> bucketFilters =
+                pickTransformFieldMapping(
+                        splitAnd(partRemoved.get()),
+                        rowType.getFieldNames(),
+                        bucketKeyType.getFieldNames());
+        if (bucketFilters.isEmpty()) {
+            return Optional.empty();
+        }
 
-    static Optional<BiFilter<Integer, Integer>> create(
-            Predicate bucketPredicate,
-            RowType bucketKeyType,
-            BucketFunctionType bucketFunctionType) {
+        return createPartitionSelector(and(bucketFilters));
+    }
+
+    private Optional<PartitionSelector> createPartitionSelector(Predicate 
bucketPredicate) {
         @SuppressWarnings("unchecked")
         List<Object>[] bucketValues = new List[bucketKeyType.getFieldCount()];
 
@@ -94,11 +143,16 @@ public interface BucketSelectConverter {
             }
             if (reference != null) {
                 if (bucketValues[reference] != null) {
-                    // Repeated equals in And?
-                    return Optional.empty();
+                    // Same field appears in multiple AND branches,
+                    // compute intersection to narrow down possible values
+                    bucketValues[reference].retainAll(new HashSet<>(values));
+                    if (bucketValues[reference].isEmpty()) {
+                        // Empty intersection (contradictory): no pruning, keep all buckets
+                        return Optional.empty();
+                    }
+                } else {
+                    bucketValues[reference] = values;
                 }
-
-                bucketValues[reference] = values;
             }
         }
 
@@ -124,10 +178,10 @@ public interface BucketSelectConverter {
                 new ArrayList<>(),
                 0);
 
-        return Optional.of(new Selector(bucketKeys, bucketFunction));
+        return Optional.of(new PartitionSelector(bucketKeys, bucketFunction));
     }
 
-    static void assembleRows(
+    private static void assembleRows(
             List<Object>[] rowValues,
             Consumer<List<Object>> consumer,
             List<Object> stack,
@@ -145,9 +199,8 @@ public interface BucketSelectConverter {
         }
     }
 
-    /** Selector to select bucket from {@link Predicate}. */
     @ThreadSafe
-    class Selector implements BiFilter<Integer, Integer> {
+    private static class PartitionSelector implements BiFilter<Integer, 
Integer> {
 
         private final List<BinaryRow> bucketKeys;
 
@@ -155,7 +208,7 @@ public interface BucketSelectConverter {
 
         private final Map<Integer, Set<Integer>> buckets = new 
ConcurrentHashMap<>();
 
-        public Selector(List<BinaryRow> bucketKeys, BucketFunction 
bucketFunction) {
+        public PartitionSelector(List<BinaryRow> bucketKeys, BucketFunction 
bucketFunction) {
             this.bucketKeys = bucketKeys;
             this.bucketFunction = bucketFunction;
         }
@@ -166,8 +219,7 @@ public interface BucketSelectConverter {
                     .contains(bucket);
         }
 
-        @VisibleForTesting
-        Set<Integer> createBucketSet(int numBucket) {
+        private Set<Integer> createBucketSet(int numBucket) {
             ImmutableSet.Builder<Integer> builder = new 
ImmutableSet.Builder<>();
             for (BinaryRow key : bucketKeys) {
                 builder.add(bucketFunction.bucket(key, numBucket));
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
index 1e044f810f..8f543ca7d3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
@@ -35,6 +35,7 @@ import org.apache.paimon.utils.BiFilter;
 import org.apache.paimon.utils.Filter;
 import org.apache.paimon.utils.Range;
 import org.apache.paimon.utils.RowRangeIndex;
+import org.apache.paimon.utils.TriFilter;
 
 import javax.annotation.Nullable;
 
@@ -56,13 +57,15 @@ public interface FileStoreScan {
 
     FileStoreScan withPartitionFilter(PartitionPredicate predicate);
 
+    FileStoreScan withCompleteFilter(Predicate predicate);
+
     FileStoreScan withBucket(int bucket);
 
     FileStoreScan onlyReadRealBuckets();
 
     FileStoreScan withBucketFilter(Filter<Integer> bucketFilter);
 
-    FileStoreScan withTotalAwareBucketFilter(BiFilter<Integer, Integer> 
bucketFilter);
+    FileStoreScan withTotalAwareBucketFilter(TriFilter<BinaryRow, Integer, 
Integer> bucketFilter);
 
     FileStoreScan withPartitionBucket(BinaryRow partition, int bucket);
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
index a09fb25ee6..8595753cf0 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
@@ -116,6 +116,11 @@ public class KeyValueFileStoreScan extends 
AbstractFileStoreScan {
 
     public KeyValueFileStoreScan withKeyFilter(Predicate predicate) {
         this.keyFilter = predicate;
+        return this;
+    }
+
+    @Override
+    public FileStoreScan withCompleteFilter(Predicate predicate) {
         
this.bucketSelectConverter.convert(predicate).ifPresent(this::withTotalAwareBucketFilter);
         return this;
     }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
index 5e4cffcbf6..a2fee49bfb 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/PrimaryKeyFileStoreTable.java
@@ -93,8 +93,6 @@ public class PrimaryKeyFileStoreTable extends 
AbstractFileStoreTable {
                             tableSchema.crossPartitionUpdate(),
                             options,
                             tableSchema.logicalPartitionType(),
-                            PrimaryKeyTableUtils.addKeyNamePrefix(
-                                    tableSchema.logicalBucketKeyType()),
                             keyType,
                             rowType,
                             extractor,
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
index a6710ff008..67d0b9a566 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/source/snapshot/SnapshotReaderImpl.java
@@ -241,6 +241,7 @@ public class SnapshotReaderImpl implements SnapshotReader {
         if (!pair.getRight().isEmpty()) {
             nonPartitionFilterConsumer.accept(scan, 
PredicateBuilder.and(pair.getRight()));
         }
+        scan.withCompleteFilter(predicate);
         return this;
     }
 
diff --git a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java 
b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
index 8142d44acf..09595d9188 100644
--- a/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
+++ b/paimon-core/src/test/java/org/apache/paimon/TestFileStore.java
@@ -134,7 +134,6 @@ public class TestFileStore extends KeyValueFileStore {
                 options,
                 partitionType,
                 keyType,
-                keyType,
                 valueType,
                 keyValueFieldsExtractor,
                 mfFactory,
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/manifest/BucketFilterTest.java 
b/paimon-core/src/test/java/org/apache/paimon/manifest/BucketFilterTest.java
deleted file mode 100644
index eeec69d202..0000000000
--- a/paimon-core/src/test/java/org/apache/paimon/manifest/BucketFilterTest.java
+++ /dev/null
@@ -1,163 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.manifest;
-
-import org.apache.paimon.utils.BiFilter;
-import org.apache.paimon.utils.Filter;
-
-import org.junit.jupiter.api.Test;
-
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** Test for {@link BucketFilter}. */
-public class BucketFilterTest {
-
-    @Test
-    public void testCreateWithAllNullParameters() {
-        // Test that create method returns null when all parameters are 
null/false
-        assertThat(BucketFilter.create(false, null, null, null)).isNull();
-    }
-
-    @Test
-    public void testCreateWithOnlyReadRealBuckets() {
-        // Test that create method returns a BucketFilter when 
onlyReadRealBuckets is true
-        BucketFilter filter = BucketFilter.create(true, null, null, null);
-        assertThat(filter).isNotNull();
-        assertThat(filter.specifiedBucket()).isNull();
-    }
-
-    @Test
-    public void testCreateWithSpecifiedBucket() {
-        // Test that create method returns a BucketFilter when specifiedBucket 
is not null
-        BucketFilter filter = BucketFilter.create(false, 1, null, null);
-        assertThat(filter).isNotNull();
-        assertThat(filter.specifiedBucket()).isEqualTo(1);
-    }
-
-    @Test
-    public void testCreateWithBucketFilter() {
-        // Test that create method returns a BucketFilter when bucketFilter is 
not null
-        Filter<Integer> bucketFilter = value -> value > 0;
-        BucketFilter filter = BucketFilter.create(false, null, bucketFilter, 
null);
-        assertThat(filter).isNotNull();
-        assertThat(filter.specifiedBucket()).isNull();
-    }
-
-    @Test
-    public void testCreateWithTotalAwareBucketFilter() {
-        // Test that create method returns a BucketFilter when 
totalAwareBucketFilter is not null
-        BiFilter<Integer, Integer> totalAwareBucketFilter =
-                (bucket, totalBucket) -> bucket < totalBucket;
-        BucketFilter filter = BucketFilter.create(false, null, null, 
totalAwareBucketFilter);
-        assertThat(filter).isNotNull();
-        assertThat(filter.specifiedBucket()).isNull();
-    }
-
-    @Test
-    public void testTestWithOnlyReadRealBuckets() {
-        // Test the test method with onlyReadRealBuckets parameter
-        BucketFilter filter = BucketFilter.create(true, null, null, null);
-
-        // Real buckets (non-negative) should pass
-        assertThat(filter.test(0, 1)).isTrue();
-        assertThat(filter.test(1, 2)).isTrue();
-
-        // Virtual buckets (negative) should not pass
-        assertThat(filter.test(-1, 1)).isFalse();
-        assertThat(filter.test(-2, 2)).isFalse();
-    }
-
-    @Test
-    public void testTestWithSpecifiedBucket() {
-        // Test the test method with specifiedBucket parameter
-        BucketFilter filter = BucketFilter.create(false, 1, null, null);
-
-        // Only the specified bucket should pass
-        assertThat(filter.test(1, 2)).isTrue();
-
-        // Other buckets should not pass
-        assertThat(filter.test(0, 2)).isFalse();
-        assertThat(filter.test(2, 3)).isFalse();
-    }
-
-    @Test
-    public void testTestWithBucketFilter() {
-        // Test the test method with bucketFilter parameter
-        Filter<Integer> bucketFilter = value -> value % 2 == 0; // Even 
buckets only
-        BucketFilter filter = BucketFilter.create(false, null, bucketFilter, 
null);
-
-        // Even buckets should pass
-        assertThat(filter.test(0, 1)).isTrue();
-        assertThat(filter.test(2, 3)).isTrue();
-        assertThat(filter.test(4, 5)).isTrue();
-
-        // Odd buckets should not pass
-        assertThat(filter.test(1, 2)).isFalse();
-        assertThat(filter.test(3, 4)).isFalse();
-        assertThat(filter.test(5, 6)).isFalse();
-    }
-
-    @Test
-    public void testTestWithTotalAwareBucketFilter() {
-        // Test the test method with totalAwareBucketFilter parameter
-        BiFilter<Integer, Integer> totalAwareBucketFilter =
-                (bucket, totalBucket) -> bucket < totalBucket / 2;
-        BucketFilter filter = BucketFilter.create(false, null, null, 
totalAwareBucketFilter);
-
-        // Buckets less than half of totalBucket should pass
-        assertThat(filter.test(0, 4)).isTrue();
-        assertThat(filter.test(1, 4)).isTrue();
-
-        // Buckets greater than or equal to half of totalBucket should not pass
-        assertThat(filter.test(2, 4)).isFalse();
-        assertThat(filter.test(3, 4)).isFalse();
-    }
-
-    @Test
-    public void testTestWithMultipleFilters() {
-        // Test the test method with multiple filters combined
-        Filter<Integer> bucketFilter = value -> value > 0; // Positive buckets 
only
-        BiFilter<Integer, Integer> totalAwareBucketFilter =
-                (bucket, totalBucket) -> bucket < totalBucket - 1;
-        BucketFilter filter = BucketFilter.create(true, 1, bucketFilter, 
totalAwareBucketFilter);
-
-        // Bucket 1 is positive, is the specified bucket, and is less than 
totalBucket-1 for
-        // totalBucket=3
-        assertThat(filter.test(1, 3)).isTrue();
-
-        // Bucket 0 is not positive, so it should not pass
-        assertThat(filter.test(0, 3)).isFalse();
-
-        // Bucket 2 is not the specified bucket, so it should not pass
-        assertThat(filter.test(2, 3)).isFalse();
-
-        // Bucket 1 with totalBucket=2 should not pass because 1 >= 2-1
-        assertThat(filter.test(1, 2)).isFalse();
-
-        // Negative bucket should not pass because onlyReadRealBuckets is true
-        assertThat(filter.test(-1, 3)).isFalse();
-    }
-
-    @Test
-    public void testSpecifiedBucket() {
-        // Test the specifiedBucket method
-        BucketFilter filterWithSpecifiedBucket = BucketFilter.create(false, 2, 
null, null);
-        assertThat(filterWithSpecifiedBucket.specifiedBucket()).isEqualTo(2);
-    }
-}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/BucketSelectConverterTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/operation/BucketSelectConverterTest.java
deleted file mode 100644
index dc64bdb596..0000000000
--- 
a/paimon-core/src/test/java/org/apache/paimon/operation/BucketSelectConverterTest.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.operation;
-
-import org.apache.paimon.CoreOptions.BucketFunctionType;
-import org.apache.paimon.operation.BucketSelectConverter.Selector;
-import org.apache.paimon.predicate.Predicate;
-import org.apache.paimon.predicate.PredicateBuilder;
-import org.apache.paimon.types.IntType;
-import org.apache.paimon.types.RowType;
-
-import org.junit.jupiter.api.Test;
-
-import java.util.Arrays;
-import java.util.Optional;
-
-import static org.apache.paimon.predicate.PredicateBuilder.and;
-import static org.apache.paimon.predicate.PredicateBuilder.or;
-import static org.assertj.core.api.Assertions.assertThat;
-
-/** Test for {@link BucketSelectConverter}. */
-public class BucketSelectConverterTest {
-
-    private final RowType rowType = RowType.of(new IntType(), new IntType(), 
new IntType());
-
-    private final PredicateBuilder builder = new PredicateBuilder(rowType);
-
-    @Test
-    public void testRepeatEqual() {
-        assertThat(newSelector(and(builder.equal(0, 0), builder.equal(0, 
1)))).isEmpty();
-    }
-
-    @Test
-    public void testNotFull() {
-        assertThat(newSelector(and(builder.equal(0, 0)))).isEmpty();
-    }
-
-    @Test
-    public void testOtherPredicate() {
-        assertThat(newSelector(and(builder.notEqual(0, 0)))).isEmpty();
-    }
-
-    @Test
-    public void testOrIllegal() {
-        assertThat(
-                        newSelector(
-                                and(
-                                        or(builder.equal(0, 5), 
builder.equal(1, 6)),
-                                        builder.equal(1, 1),
-                                        builder.equal(2, 2))))
-                .isEmpty();
-    }
-
-    @Test
-    public void testNormal() {
-        Selector selector =
-                newSelector(and(builder.equal(0, 0), builder.equal(1, 1), 
builder.equal(2, 2)))
-                        .get();
-        assertThat(selector.createBucketSet(20)).containsExactly(11);
-    }
-
-    @Test
-    public void testIn() {
-        Selector selector =
-                newSelector(
-                                and(
-                                        builder.in(0, Arrays.asList(5, 6, 7)),
-                                        builder.equal(1, 1),
-                                        builder.equal(2, 2)))
-                        .get();
-        assertThat(selector.createBucketSet(20)).containsExactly(7, 14, 10);
-    }
-
-    @Test
-    public void testOr() {
-        Selector selector =
-                newSelector(
-                                and(
-                                        or(
-                                                builder.equal(0, 5),
-                                                builder.equal(0, 6),
-                                                builder.equal(0, 7)),
-                                        builder.equal(1, 1),
-                                        builder.equal(2, 2)))
-                        .get();
-        assertThat(selector.createBucketSet(20)).containsExactly(7, 14, 10);
-    }
-
-    @Test
-    public void testInNull() {
-        Selector selector =
-                newSelector(
-                                and(
-                                        builder.in(0, Arrays.asList(5, 6, 7, 
null)),
-                                        builder.equal(1, 1),
-                                        builder.equal(2, 2)))
-                        .get();
-        assertThat(selector.createBucketSet(20)).containsExactly(7, 14, 10);
-    }
-
-    @Test
-    public void testMultipleIn() {
-        Selector selector =
-                newSelector(
-                                and(
-                                        builder.in(0, Arrays.asList(5, 6, 7)),
-                                        builder.in(1, Arrays.asList(1, 8)),
-                                        builder.equal(2, 2)))
-                        .get();
-        assertThat(selector.createBucketSet(20)).containsExactly(7, 17, 14, 9, 
10, 19);
-    }
-
-    @Test
-    public void testMultipleOr() {
-        Selector selector =
-                newSelector(
-                                and(
-                                        or(
-                                                builder.equal(0, 5),
-                                                builder.equal(0, 6),
-                                                builder.equal(0, 7)),
-                                        or(builder.equal(1, 1), 
builder.equal(1, 8)),
-                                        builder.equal(2, 2)))
-                        .get();
-        assertThat(selector.createBucketSet(20)).containsExactly(7, 17, 14, 9, 
10, 19);
-    }
-
-    private Optional<Selector> newSelector(Predicate predicate) {
-        return (Optional)
-                BucketSelectConverter.create(predicate, rowType, 
BucketFunctionType.DEFAULT);
-    }
-}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/operation/BucketSelectorTest.java 
b/paimon-core/src/test/java/org/apache/paimon/operation/BucketSelectorTest.java
new file mode 100644
index 0000000000..3128ff20db
--- /dev/null
+++ 
b/paimon-core/src/test/java/org/apache/paimon/operation/BucketSelectorTest.java
@@ -0,0 +1,258 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.operation;
+
+import org.apache.paimon.CoreOptions.BucketFunctionType;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Set;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Tests for {@link BucketSelector}. */
+public class BucketSelectorTest {
+
+    private static final int NUM_BUCKETS = 10;
+
+    // ========================== Single bucket key, non-partitioned 
==========================
+
+    @Test
+    public void testEqualPredicate() {
+        // k = 5 => should select exactly one bucket
+        RowType rowType = DataTypes.ROW(DataTypes.FIELD(0, "k", 
DataTypes.INT()));
+        RowType partType = RowType.of();
+        RowType bucketKeyType = DataTypes.ROW(DataTypes.FIELD(0, "k", 
DataTypes.INT()));
+        PredicateBuilder pb = new PredicateBuilder(rowType);
+
+        Predicate predicate = pb.equal(0, 5);
+        BucketSelector selector =
+                new BucketSelector(
+                        predicate, BucketFunctionType.DEFAULT, rowType, 
partType, bucketKeyType);
+
+        Set<Integer> selected = selectedBuckets(selector, BinaryRow.EMPTY_ROW, 
NUM_BUCKETS);
+        assertThat(selected).hasSize(1);
+    }
+
+    @Test
+    public void testEqualAndRangePredicate() {
+        // k = 5 AND k < 100 => should still select bucket for k=5
+        RowType rowType = DataTypes.ROW(DataTypes.FIELD(0, "k", 
DataTypes.INT()));
+        RowType partType = RowType.of();
+        RowType bucketKeyType = DataTypes.ROW(DataTypes.FIELD(0, "k", 
DataTypes.INT()));
+        PredicateBuilder pb = new PredicateBuilder(rowType);
+
+        Predicate predicate = PredicateBuilder.and(pb.equal(0, 5), 
pb.lessThan(0, 100));
+        BucketSelector selector =
+                new BucketSelector(
+                        predicate, BucketFunctionType.DEFAULT, rowType, 
partType, bucketKeyType);
+
+        Set<Integer> selected = selectedBuckets(selector, BinaryRow.EMPTY_ROW, 
NUM_BUCKETS);
+        assertThat(selected).hasSize(1);
+    }
+
+    @Test
+    public void testEqualAndInWithOverlap() {
+        // k = 5 AND k IN (5, 10) => intersection is {5}, should select one 
bucket
+        RowType rowType = DataTypes.ROW(DataTypes.FIELD(0, "k", 
DataTypes.INT()));
+        RowType partType = RowType.of();
+        RowType bucketKeyType = DataTypes.ROW(DataTypes.FIELD(0, "k", 
DataTypes.INT()));
+        PredicateBuilder pb = new PredicateBuilder(rowType);
+
+        Predicate predicate = PredicateBuilder.and(pb.equal(0, 5), pb.in(0, 
Arrays.asList(5, 10)));
+        BucketSelector selector =
+                new BucketSelector(
+                        predicate, BucketFunctionType.DEFAULT, rowType, 
partType, bucketKeyType);
+
+        Set<Integer> selected = selectedBuckets(selector, BinaryRow.EMPTY_ROW, 
NUM_BUCKETS);
+        assertThat(selected).hasSize(1);
+    }
+
+    @Test
+    public void testInAndInWithOverlap() {
+        // k IN (1, 5) AND k IN (5, 10) => intersection is {5}, should select 
one bucket
+        RowType rowType = DataTypes.ROW(DataTypes.FIELD(0, "k", 
DataTypes.INT()));
+        RowType partType = RowType.of();
+        RowType bucketKeyType = DataTypes.ROW(DataTypes.FIELD(0, "k", 
DataTypes.INT()));
+        PredicateBuilder pb = new PredicateBuilder(rowType);
+
+        Predicate predicate =
+                PredicateBuilder.and(pb.in(0, Arrays.asList(1, 5)), pb.in(0, 
Arrays.asList(5, 10)));
+        BucketSelector selector =
+                new BucketSelector(
+                        predicate, BucketFunctionType.DEFAULT, rowType, 
partType, bucketKeyType);
+
+        Set<Integer> selected = selectedBuckets(selector, BinaryRow.EMPTY_ROW, 
NUM_BUCKETS);
+        assertThat(selected).hasSize(1);
+    }
+
+    @Test
+    public void testRedundantEquals() {
+        // k = 5 AND k = 5 => redundant, should still select one bucket
+        RowType rowType = DataTypes.ROW(DataTypes.FIELD(0, "k", 
DataTypes.INT()));
+        RowType partType = RowType.of();
+        RowType bucketKeyType = DataTypes.ROW(DataTypes.FIELD(0, "k", 
DataTypes.INT()));
+        PredicateBuilder pb = new PredicateBuilder(rowType);
+
+        Predicate predicate = PredicateBuilder.and(pb.equal(0, 5), pb.equal(0, 
5));
+        BucketSelector selector =
+                new BucketSelector(
+                        predicate, BucketFunctionType.DEFAULT, rowType, 
partType, bucketKeyType);
+
+        Set<Integer> selected = selectedBuckets(selector, BinaryRow.EMPTY_ROW, 
NUM_BUCKETS);
+        assertThat(selected).hasSize(1);
+    }
+
+    @Test
+    public void testContradictoryEquals() {
+        // k = 5 AND k = 10 => empty intersection, all buckets are kept
+        RowType rowType = DataTypes.ROW(DataTypes.FIELD(0, "k", 
DataTypes.INT()));
+        RowType partType = RowType.of();
+        RowType bucketKeyType = DataTypes.ROW(DataTypes.FIELD(0, "k", 
DataTypes.INT()));
+        PredicateBuilder pb = new PredicateBuilder(rowType);
+
+        Predicate predicate = PredicateBuilder.and(pb.equal(0, 5), pb.equal(0, 
10));
+        BucketSelector selector =
+                new BucketSelector(
+                        predicate, BucketFunctionType.DEFAULT, rowType, 
partType, bucketKeyType);
+
+        // Empty intersection => Optional.empty() => orElse(true) => all 
buckets pass
+        // (conservative: no filtering when we can't determine)
+        Set<Integer> selected = selectedBuckets(selector, BinaryRow.EMPTY_ROW, 
NUM_BUCKETS);
+        assertThat(selected).hasSize(NUM_BUCKETS);
+    }
+
+    @Test
+    public void testRangeOnlyFallsBackToFullScan() {
+        // k < 100 => no Equal/In to extract, full scan
+        RowType rowType = DataTypes.ROW(DataTypes.FIELD(0, "k", 
DataTypes.INT()));
+        RowType partType = RowType.of();
+        RowType bucketKeyType = DataTypes.ROW(DataTypes.FIELD(0, "k", 
DataTypes.INT()));
+        PredicateBuilder pb = new PredicateBuilder(rowType);
+
+        Predicate predicate = pb.lessThan(0, 100);
+        BucketSelector selector =
+                new BucketSelector(
+                        predicate, BucketFunctionType.DEFAULT, rowType, 
partType, bucketKeyType);
+
+        Set<Integer> selected = selectedBuckets(selector, BinaryRow.EMPTY_ROW, 
NUM_BUCKETS);
+        assertThat(selected).hasSize(NUM_BUCKETS);
+    }
+
+    // ========================== Multi-field bucket key 
==========================
+
+    @Test
+    public void testMultiFieldBucketKey() {
+        // k1 = 5 AND k2 = 10 => one combination, one bucket
+        RowType rowType =
+                DataTypes.ROW(
+                        DataTypes.FIELD(0, "k1", DataTypes.INT()),
+                        DataTypes.FIELD(1, "k2", DataTypes.INT()));
+        RowType partType = RowType.of();
+        PredicateBuilder pb = new PredicateBuilder(rowType);
+
+        Predicate predicate = PredicateBuilder.and(pb.equal(0, 5), pb.equal(1, 
10));
+        BucketSelector selector =
+                new BucketSelector(
+                        predicate, BucketFunctionType.DEFAULT, rowType, 
partType, rowType);
+
+        Set<Integer> selected = selectedBuckets(selector, BinaryRow.EMPTY_ROW, 
NUM_BUCKETS);
+        assertThat(selected).hasSize(1);
+    }
+
+    @Test
+    public void testMultiFieldWithIntersection() {
+        // k1 IN (1, 5) AND k1 IN (5, 10) AND k2 = 3 => k1={5}, k2={3} => one 
combination
+        RowType rowType =
+                DataTypes.ROW(
+                        DataTypes.FIELD(0, "k1", DataTypes.INT()),
+                        DataTypes.FIELD(1, "k2", DataTypes.INT()));
+        RowType partType = RowType.of();
+        PredicateBuilder pb = new PredicateBuilder(rowType);
+
+        Predicate predicate =
+                PredicateBuilder.and(
+                        pb.in(0, Arrays.asList(1, 5)),
+                        pb.in(0, Arrays.asList(5, 10)),
+                        pb.equal(1, 3));
+        BucketSelector selector =
+                new BucketSelector(
+                        predicate, BucketFunctionType.DEFAULT, rowType, 
partType, rowType);
+
+        Set<Integer> selected = selectedBuckets(selector, BinaryRow.EMPTY_ROW, 
NUM_BUCKETS);
+        assertThat(selected).hasSize(1);
+    }
+
+    // ========================== Partitioned table ==========================
+
+    @Test
+    public void testPartitionedTableWithBucketFilter() {
+        // Table: (pt INT, k INT), partition: (pt), bucket key: (k)
+        // Predicate: pt = 1 AND k = 5
+        RowType rowType =
+                DataTypes.ROW(
+                        DataTypes.FIELD(0, "pt", DataTypes.INT()),
+                        DataTypes.FIELD(1, "k", DataTypes.INT()));
+        RowType partType = DataTypes.ROW(DataTypes.FIELD(0, "pt", 
DataTypes.INT()));
+        RowType bucketKeyType = DataTypes.ROW(DataTypes.FIELD(0, "k", 
DataTypes.INT()));
+        PredicateBuilder pb = new PredicateBuilder(rowType);
+
+        Predicate predicate = PredicateBuilder.and(pb.equal(0, 1), pb.equal(1, 
5));
+        BucketSelector selector =
+                new BucketSelector(
+                        predicate, BucketFunctionType.DEFAULT, rowType, 
partType, bucketKeyType);
+
+        // For partition pt=1 (matching), bucket filtering should work
+        BinaryRow partition1 =
+                new 
org.apache.paimon.data.serializer.InternalRowSerializer(partType)
+                        .toBinaryRow(org.apache.paimon.data.GenericRow.of(1))
+                        .copy();
+        Set<Integer> selected1 = selectedBuckets(selector, partition1, 
NUM_BUCKETS);
+        assertThat(selected1).hasSize(1);
+
+        // For partition pt=2 (not matching), predicate becomes AlwaysFalse
+        // => no bucket key values extracted => full scan (conservative)
+        BinaryRow partition2 =
+                new 
org.apache.paimon.data.serializer.InternalRowSerializer(partType)
+                        .toBinaryRow(org.apache.paimon.data.GenericRow.of(2))
+                        .copy();
+        Set<Integer> selected2 = selectedBuckets(selector, partition2, 
NUM_BUCKETS);
+        assertThat(selected2).hasSize(NUM_BUCKETS);
+    }
+
+    // ========================== Helpers ==========================
+
+    private static Set<Integer> selectedBuckets(
+            BucketSelector selector, BinaryRow partition, int numBuckets) {
+        Set<Integer> selected = new HashSet<>();
+        for (int b = 0; b < numBuckets; b++) {
+            if (selector.test(partition, b, numBuckets)) {
+                selected.add(b);
+            }
+        }
+        return selected;
+    }
+}
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/BucketFilterScanTest.java 
b/paimon-core/src/test/java/org/apache/paimon/table/BucketFilterScanTest.java
new file mode 100644
index 0000000000..94494e0406
--- /dev/null
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/BucketFilterScanTest.java
@@ -0,0 +1,250 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.table;
+
+import org.apache.paimon.catalog.Identifier;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.predicate.PredicateBuilder;
+import org.apache.paimon.schema.Schema;
+import org.apache.paimon.table.source.DataSplit;
+import org.apache.paimon.types.DataTypes;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Set;
+import java.util.stream.Collectors;
+
+import static org.apache.paimon.CoreOptions.BUCKET;
+import static org.apache.paimon.CoreOptions.BUCKET_KEY;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Tests bucket filtering with compound predicates on partitioned, 
fixed-bucket tables, covering
+ * both append-only and primary-key tables.
+ */
+public class BucketFilterScanTest extends TableTestBase {
+
+    @Test
+    public void testBucketFilterWithCompoundPredicateOnAppendTable() throws 
Exception {
+        testBucketFilterWithCompoundPredicate(false);
+    }
+
+    @Test
+    public void testBucketFilterWithCompoundPredicateOnPkTable() throws 
Exception {
+        testBucketFilterWithCompoundPredicate(true);
+    }
+
+    @Test
+    public void testCompositeBucketFilterWithCompoundPredicateOnAppendTable() 
throws Exception {
+        testCompositeBucketFilterWithCompoundPredicate(false);
+    }
+
+    @Test
+    public void testCompositeBucketFilterWithCompoundPredicateOnPkTable() 
throws Exception {
+        testCompositeBucketFilterWithCompoundPredicate(true);
+    }
+
+    /**
+     * Tests bucket filtering with compound predicates on a single-field 
bucket key.
+     *
+     * <p>Table schema:
+     *
+     * <ul>
+     *   <li>Partition key: column 'a' (INT)
+     *   <li>Bucket key: column 'b' (INT)
+     *   <li>Bucket count: 10
+     * </ul>
+     *
+     * <p>Data distribution: 5 partitions (a=1 to 5) × 20 b-values (b=1 to 20) 
= 100 rows.
+     *
+     * <p>Test scenarios:
+     *
+     * <ol>
+     *   <li>Predicate: (a &lt; 3 AND b = 5) OR (a = 3 AND b = 7) - Tests 
partition range filter
+     *       with bucket equality, combined with OR. Expected: buckets for 
partition 1,2 with b=5
+     *       and partition 3 with b=7.
+     *   <li>Predicate: (a &lt; 3 AND b = 5) OR (a = 3 AND b &lt; 100) - Tests 
partition range with
+     *       bucket equality, OR partition equality with bucket range. 
Expected: mixed buckets from
+     *       partition 3 and specific buckets from partitions 1,2.
+     *   <li>Predicate: (a = 2 AND b = 5) OR (a = 3 AND b = 7) - Tests 
partition equality with
+     *       bucket equality in both OR branches. Expected: exact bucket 
matching for each
+     *       partition-b combination.
+     * </ol>
+     */
+    private void testBucketFilterWithCompoundPredicate(boolean pk) throws 
Exception {
+        // ---- schema & table ----
+        Schema.Builder builder =
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT())
+                        .column("b", DataTypes.INT())
+                        .column("c", DataTypes.INT())
+                        .partitionKeys("a")
+                        .option(BUCKET.key(), "10");
+        if (pk) {
+            builder.primaryKey("a", "b");
+        } else {
+            builder.option(BUCKET_KEY.key(), "b");
+        }
+        Schema schema = builder.build();
+
+        Identifier tableId = identifier("test_bucket_filter");
+        catalog.createTable(tableId, schema, false);
+        Table table = catalog.getTable(tableId);
+
+        // ---- write data: 5 partitions × 20 b-values = 100 rows ----
+        GenericRow[] rows = new GenericRow[100];
+        int idx = 0;
+        for (int a = 1; a <= 5; a++) {
+            for (int b = 1; b <= 20; b++) {
+                rows[idx++] = GenericRow.of(a, b, a * 100 + b);
+            }
+        }
+        write(table, rows);
+        PredicateBuilder pb = new PredicateBuilder(table.rowType());
+
+        // ---- build predicate: (a < 3 AND b = 5) OR (a = 3 AND b = 7) ----
+        Predicate predicate1 =
+                PredicateBuilder.or(
+                        PredicateBuilder.and(pb.lessThan(0, 3), pb.equal(1, 
5)),
+                        PredicateBuilder.and(pb.equal(0, 3), pb.equal(1, 7)));
+        assertThat(plan(table, predicate1)).containsExactlyInAnyOrder("3,1", 
"1,6", "2,6");
+
+        // ---- build predicate: (a < 3 AND b = 5) OR (a = 3 AND b < 100) ----
+        Predicate predicate2 =
+                PredicateBuilder.or(
+                        PredicateBuilder.and(pb.lessThan(0, 3), pb.equal(1, 
5)),
+                        PredicateBuilder.and(pb.equal(0, 3), pb.lessThan(1, 
100)));
+        assertThat(plan(table, predicate2))
+                .containsExactlyInAnyOrder(
+                        "3,0", "3,1", "1,6", "3,4", "3,5", "2,6", "3,6", 
"3,7", "3,8");
+
+        // ---- build predicate: (a = 2 AND b = 5) OR (a = 3 AND b = 7) ----
+        Predicate predicate3 =
+                PredicateBuilder.or(
+                        PredicateBuilder.and(pb.equal(0, 2), pb.equal(1, 5)),
+                        PredicateBuilder.and(pb.equal(0, 3), pb.equal(1, 7)));
+        assertThat(plan(table, predicate3)).containsExactlyInAnyOrder("3,1", 
"2,6");
+    }
+
+    /**
+     * Tests bucket filtering with compound predicates on a composite (multi-field) bucket key.
+     *
+     * <p>Table schema:
+     *
+     * <ul>
+     *   <li>Partition key: column 'a' (INT)
+     *   <li>Bucket key: columns 'b' and 'c' (composite, INT)
+     *   <li>Bucket count: 10
+     * </ul>
+     *
+     * <p>Data distribution: 5 partitions (a=1 to 5) × 20 b-values (b=1 to 20) × 10 c-values (c=0 to
+     * 9) = 1000 rows.
+     *
+     * <p>Test scenarios:
+     *
+     * <ol>
+     *   <li>Predicate: ((a &lt; 3 AND b = 5) OR (a = 3 AND b = 7)) AND c = 5 - Tests nested OR
+     *       within AND, with partition range, bucket field equality, and additional bucket field
+     *       filter. The 'c = 5' condition is part of the composite bucket key, affecting bucket
+     *       selection.
+     *   <li>Predicate: ((a &lt; 3 AND b = 5) OR (a = 3 AND b &lt; 100)) AND c = 5 - Tests range
+     *       predicate on one bucket field (b) combined with equality on another (c). Validates
+     *       handling of multiple bucket key fields with different predicate types.
+     *   <li>Predicate: ((a = 2 AND b = 5) OR (a = 3 AND b = 7)) AND c = 5 - Tests exact matching on
+     *       both partition and bucket fields. The composite bucket key (b,c) ensures precise bucket
+     *       targeting.
+     * </ol>
+     *
+     * @param pk if {@code true}, the table declares primary key (a, b, c) and no explicit bucket
+     *     key option; otherwise the bucket-key option is set to "b,c"
+     * @throws Exception if any step of the table setup, write or scan fails
+     */
+    private void testCompositeBucketFilterWithCompoundPredicate(boolean pk) throws Exception {
+        // ---- schema & table ----
+        Schema.Builder builder =
+                Schema.newBuilder()
+                        .column("a", DataTypes.INT())
+                        .column("b", DataTypes.INT())
+                        .column("c", DataTypes.INT())
+                        .partitionKeys("a")
+                        .option(BUCKET.key(), "10");
+        if (pk) {
+            // NOTE(review): no bucket-key option here; presumably the bucket key is then derived
+            // from the primary key minus the partition column, i.e. (b, c) - confirm.
+            builder.primaryKey("a", "b", "c");
+        } else {
+            builder.option(BUCKET_KEY.key(), "b,c");
+        }
+        Schema schema = builder.build();
+
+        Identifier tableId = identifier("test_composite_bucket_filter");
+        catalog.createTable(tableId, schema, false);
+        Table table = catalog.getTable(tableId);
+
+        // ---- write data: 5 partitions × 20 b-values x 10 c-values = 1000 rows ----
+        GenericRow[] rows = new GenericRow[1000];
+        int idx = 0;
+        for (int a = 1; a <= 5; a++) {
+            for (int b = 1; b <= 20; b++) {
+                for (int c = 0; c < 10; c++) {
+                    rows[idx++] = GenericRow.of(a, b, c);
+                }
+            }
+        }
+        write(table, rows);
+        // Field indexes used with pb below: 0 = a, 1 = b, 2 = c.
+        PredicateBuilder pb = new PredicateBuilder(table.rowType());
+
+        // ---- build predicate: ((a < 3 AND b = 5) OR (a = 3 AND b = 7)) AND c = 5 ----
+        Predicate predicate1 =
+                PredicateBuilder.and(
+                        PredicateBuilder.or(
+                                PredicateBuilder.and(pb.lessThan(0, 3), pb.equal(1, 5)),
+                                PredicateBuilder.and(pb.equal(0, 3), pb.equal(1, 7))),
+                        pb.equal(2, 5));
+        assertThat(plan(table, predicate1)).containsExactlyInAnyOrder("1,0", "2,0", "3,5");
+
+        // ---- build predicate: ((a < 3 AND b = 5) OR (a = 3 AND b < 100)) AND c = 5 ----
+        Predicate predicate2 =
+                PredicateBuilder.and(
+                        PredicateBuilder.or(
+                                PredicateBuilder.and(pb.lessThan(0, 3), pb.equal(1, 5)),
+                                PredicateBuilder.and(pb.equal(0, 3), pb.lessThan(1, 100))),
+                        pb.equal(2, 5));
+        assertThat(plan(table, predicate2))
+                .containsExactlyInAnyOrder(
+                        "3,9", "1,0", "2,0", "3,0", "3,1", "3,2", "3,3", "3,4", "3,5", "3,6", "3,7",
+                        "3,8");
+
+        // ---- build predicate: ((a = 2 AND b = 5) OR (a = 3 AND b = 7)) AND c = 5 ----
+        Predicate predicate3 =
+                PredicateBuilder.and(
+                        PredicateBuilder.or(
+                                PredicateBuilder.and(pb.equal(0, 2), pb.equal(1, 5)),
+                                PredicateBuilder.and(pb.equal(0, 3), pb.equal(1, 7))),
+                        pb.equal(2, 5));
+        assertThat(plan(table, predicate3)).containsExactlyInAnyOrder("2,0", "3,5");
+    }
+
+    /**
+     * Plans a scan of {@code table} with {@code predicate} pushed down as the read filter and
+     * summarizes which (partition, bucket) pairs survive.
+     *
+     * <p>Every planned split is cast to {@link DataSplit} and rendered as {@code
+     * "<partition field 0>,<bucket>"}; collecting into a set collapses pairs that appear in more
+     * than one split.
+     *
+     * @param table table to scan
+     * @param predicate filter handed to the read builder
+     * @return the distinct {@code "partition,bucket"} strings of the planned splits
+     */
+    private Set<String> plan(Table table, Predicate predicate) {
+        return table.newReadBuilder().withFilter(predicate).newScan().plan().splits().stream()
+                .map(
+                        split -> {
+                            DataSplit dataSplit = (DataSplit) split;
+                            int partitionA = dataSplit.partition().getInt(0);
+                            int bucket = dataSplit.bucket();
+                            return partitionA + "," + bucket;
+                        })
+                .collect(Collectors.toSet());
+    }
+}

Reply via email to