This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink-table-store.git


The following commit(s) were added to refs/heads/master by this push:
     new 19f2973d [FLINK-28159] Table Store: Bucket pruning based on bucket key filter
19f2973d is described below

commit 19f2973d6bc59b0d0310c48eae8bb97741fb31d9
Author: Jingsong Lee <[email protected]>
AuthorDate: Wed Jul 6 17:54:57 2022 +0800

    [FLINK-28159] Table Store: Bucket pruning based on bucket key filter
    
    This closes #199
---
 .../table/store/connector/PredicateITCase.java     |  82 +++++++++++
 .../table/store/file/AppendOnlyFileStore.java      |   4 +
 .../flink/table/store/file/KeyValueFileStore.java  |   4 +
 .../file/operation/AbstractFileStoreScan.java      |  19 ++-
 .../file/operation/AppendOnlyFileStoreScan.java    |  19 +++
 .../file/operation/KeyValueFileStoreScan.java      |  19 +++
 .../flink/table/store/file/predicate/And.java      |   2 +-
 .../table/store/file/predicate/BucketSelector.java | 160 +++++++++++++++++++++
 .../store/file/predicate/CompoundPredicate.java    |  22 ++-
 .../flink/table/store/file/predicate/In.java       |   2 +-
 .../table/store/file/predicate/LeafFunction.java   |  22 ++-
 .../store/file/predicate/LeafUnaryFunction.java    |   2 +-
 .../flink/table/store/file/predicate/NotIn.java    |   2 +-
 .../predicate/NullFalseLeafBinaryFunction.java     |   2 +-
 .../flink/table/store/file/predicate/Or.java       |   2 +-
 .../store/file/predicate/PredicateBuilder.java     |  62 +++++++-
 .../flink/table/store/file/schema/TableSchema.java |  20 ++-
 .../store/table/AppendOnlyFileStoreTable.java      |   1 +
 .../table/ChangelogValueCountFileStoreTable.java   |   1 +
 .../table/ChangelogWithKeyFileStoreTable.java      |  52 ++++---
 .../store/table/sink/SinkRecordConverter.java      |   2 +-
 .../flink/table/store/table/source/TableScan.java  |  35 +----
 .../flink/table/store/file/TestFileStore.java      |   1 +
 .../store/file/predicate/BucketSelectorTest.java   | 147 +++++++++++++++++++
 .../store/table/AppendOnlyFileStoreTableTest.java  |   5 +-
 .../ChangelogValueCountFileStoreTableTest.java     |   5 +-
 .../table/ChangelogWithKeyFileStoreTableTest.java  |   6 +-
 .../table/store/table/FileStoreTableTestBase.java  |  39 ++++-
 .../table/store/table/WritePreemptMemoryTest.java  |   5 +-
 .../store/table/sink/SinkRecordConverterTest.java  |  11 +-
 30 files changed, 665 insertions(+), 90 deletions(-)

diff --git 
a/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/PredicateITCase.java
 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/PredicateITCase.java
new file mode 100644
index 00000000..4e56fec8
--- /dev/null
+++ 
b/flink-table-store-connector/src/test/java/org/apache/flink/table/store/connector/PredicateITCase.java
@@ -0,0 +1,82 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.connector;
+
+import org.apache.flink.table.api.EnvironmentSettings;
+import org.apache.flink.table.api.TableEnvironment;
+import org.apache.flink.test.util.AbstractTestBase;
+import org.apache.flink.types.Row;
+import org.apache.flink.util.CloseableIterator;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableList;
+
+import org.junit.Before;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** IT cases checking that filters on the bucket key return correct rows when buckets are pruned. */
+public class PredicateITCase extends AbstractTestBase {
+
+    private TableEnvironment tEnv;
+
+    @Before
+    public void before() throws IOException {
+        tEnv = TableEnvironment.create(EnvironmentSettings.newInstance().inBatchMode().build());
+        tEnv.executeSql(
+                String.format(
+                        "CREATE CATALOG TABLE_STORE WITH ("
+                                + "'type'='table-store', 'warehouse'='%s')",
+                        TEMPORARY_FOLDER.newFolder().toURI()));
+        tEnv.useCatalog("TABLE_STORE");
+    }
+
+    @Test
+    public void testPkFilterBucket() throws Exception {
+        sql("CREATE TABLE T (a INT PRIMARY KEY NOT ENFORCED, b INT) WITH ('bucket' = '5')");
+        innerTest();
+    }
+
+    @Test
+    public void testNoPkFilterBucket() throws Exception {
+        sql("CREATE TABLE T (a INT, b INT) WITH ('bucket' = '5', 'bucket-key'='a')");
+        innerTest();
+    }
+
+    @Test
+    public void testAppendFilterBucket() throws Exception {
+        sql(
+                "CREATE TABLE T (a INT, b INT) WITH ('bucket' = '5', 'bucket-key'='a', 'write-mode'='append-only')");
+        innerTest();
+    }
+
+    /**
+     * Inserts five rows into {@code T} and checks that single-value and multi-value filters on the
+     * bucket key column {@code a} return exactly the matching rows.
+     */
+    private void innerTest() throws Exception {
+        sql("INSERT INTO T VALUES (1, 2), (3, 4), (5, 6), (7, 8), (9, 10)");
+        assertThat(sql("SELECT * FROM T WHERE a = 5")).containsExactlyInAnyOrder(Row.of(5, 6));
+        assertThat(sql("SELECT * FROM T WHERE a IN (1, 9)"))
+                .containsExactlyInAnyOrder(Row.of(1, 2), Row.of(9, 10));
+    }
+
+    /** Runs {@code query} (formatted with {@code args}) and collects every result row. */
+    private List<Row> sql(String query, Object... args) throws Exception {
+        try (CloseableIterator<Row> iter = tEnv.executeSql(String.format(query, args)).collect()) {
+            return ImmutableList.copyOf(iter);
+        }
+    }
+}
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java
index 40d4203e..8a803a74 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/AppendOnlyFileStore.java
@@ -29,6 +29,7 @@ import org.apache.flink.table.types.logical.RowType;
 /** {@link FileStore} for reading and writing {@link RowData}. */
 public class AppendOnlyFileStore extends AbstractFileStore<RowData> {
 
+    private final RowType bucketKeyType;
     private final RowType rowType;
 
     public AppendOnlyFileStore(
@@ -36,8 +37,10 @@ public class AppendOnlyFileStore extends 
AbstractFileStore<RowData> {
             long schemaId,
             CoreOptions options,
             RowType partitionType,
+            RowType bucketKeyType,
             RowType rowType) {
         super(schemaManager, schemaId, options, partitionType);
+        this.bucketKeyType = bucketKeyType;
         this.rowType = rowType;
     }
 
@@ -67,6 +70,7 @@ public class AppendOnlyFileStore extends 
AbstractFileStore<RowData> {
     private AppendOnlyFileStoreScan newScan(boolean checkNumOfBuckets) {
         return new AppendOnlyFileStoreScan(
                 partitionType,
+                bucketKeyType.getFieldCount() == 0 ? rowType : bucketKeyType,
                 rowType,
                 snapshotManager(),
                 manifestFileFactory(),
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
index 58b43685..9c7ae86e 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/KeyValueFileStore.java
@@ -36,6 +36,7 @@ public class KeyValueFileStore extends 
AbstractFileStore<KeyValue> {
 
     private static final long serialVersionUID = 1L;
 
+    private final RowType bucketKeyType;
     private final RowType keyType;
     private final RowType valueType;
     private final Supplier<Comparator<RowData>> keyComparatorSupplier;
@@ -46,10 +47,12 @@ public class KeyValueFileStore extends 
AbstractFileStore<KeyValue> {
             long schemaId,
             CoreOptions options,
             RowType partitionType,
+            RowType bucketKeyType,
             RowType keyType,
             RowType valueType,
             MergeFunction mergeFunction) {
         super(schemaManager, schemaId, options, partitionType);
+        this.bucketKeyType = bucketKeyType;
         this.keyType = keyType;
         this.valueType = valueType;
         this.mergeFunction = mergeFunction;
@@ -93,6 +96,7 @@ public class KeyValueFileStore extends 
AbstractFileStore<KeyValue> {
     private KeyValueFileStoreScan newScan(boolean checkNumOfBuckets) {
         return new KeyValueFileStoreScan(
                 partitionType,
+                bucketKeyType,
                 keyType,
                 snapshotManager(),
                 manifestFileFactory(),
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
index 345ed23c..6494c31a 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AbstractFileStoreScan.java
@@ -25,6 +25,7 @@ import 
org.apache.flink.table.store.file.manifest.ManifestEntry;
 import org.apache.flink.table.store.file.manifest.ManifestFile;
 import org.apache.flink.table.store.file.manifest.ManifestFileMeta;
 import org.apache.flink.table.store.file.manifest.ManifestList;
+import org.apache.flink.table.store.file.predicate.BucketSelector;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.predicate.PredicateBuilder;
 import org.apache.flink.table.store.file.stats.FieldStatsArraySerializer;
@@ -50,6 +51,7 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
 
     private final FieldStatsArraySerializer partitionStatsConverter;
     private final RowDataToObjectArrayConverter partitionConverter;
+    protected final RowType bucketKeyType;
     private final SnapshotManager snapshotManager;
     private final ManifestFile.Factory manifestFileFactory;
     private final ManifestList manifestList;
@@ -57,6 +59,7 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
     private final boolean checkNumOfBuckets;
 
     private Predicate partitionFilter;
+    private BucketSelector bucketSelector;
 
     private Long specifiedSnapshotId = null;
     private Integer specifiedBucket = null;
@@ -65,6 +68,7 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
 
     public AbstractFileStoreScan(
             RowType partitionType,
+            RowType bucketKeyType,
             SnapshotManager snapshotManager,
             ManifestFile.Factory manifestFileFactory,
             ManifestList.Factory manifestListFactory,
@@ -72,6 +76,9 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
             boolean checkNumOfBuckets) {
         this.partitionStatsConverter = new 
FieldStatsArraySerializer(partitionType);
         this.partitionConverter = new 
RowDataToObjectArrayConverter(partitionType);
+        Preconditions.checkArgument(
+                bucketKeyType.getFieldCount() > 0, "The bucket keys should not 
be empty.");
+        this.bucketKeyType = bucketKeyType;
         this.snapshotManager = snapshotManager;
         this.manifestFileFactory = manifestFileFactory;
         this.manifestList = manifestListFactory.create();
@@ -85,6 +92,11 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
         return this;
     }
 
+    /**
+     * Installs a bucket selector derived from {@code predicate}, whose leaf indices refer to the
+     * bucket-key fields. When no selector can be derived, bucket pruning is switched off.
+     *
+     * @return this scan, for chaining
+     */
+    protected FileStoreScan withBucketKeyFilter(Predicate predicate) {
+        bucketSelector = BucketSelector.create(predicate, bucketKeyType).orElse(null);
+        return this;
+    }
+
     @Override
     public FileStoreScan withPartitionFilter(List<BinaryRowData> partitions) {
         PredicateBuilder builder = new 
PredicateBuilder(partitionConverter.rowType());
@@ -228,7 +240,7 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
             // however entry.bucket() was computed against the old numOfBuckets
             // and thus the filtered manifest entries might be empty
             // which renders the bucket check invalid
-            if (filterByBucket(file)) {
+            if (filterByBucket(file) && filterByBucketSelector(file)) {
                 files.add(file);
             }
         }
@@ -267,6 +279,11 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
         return (specifiedBucket == null || entry.bucket() == specifiedBucket);
     }
 
+    /** Keeps {@code entry} unless a bucket selector is installed and rejects the entry's bucket. */
+    private boolean filterByBucketSelector(ManifestEntry entry) {
+        if (bucketSelector == null) {
+            return true;
+        }
+        return bucketSelector.select(entry.bucket(), entry.totalBuckets());
+    }
+
     protected abstract boolean filterByStats(ManifestEntry entry);
 
     private List<ManifestEntry> readManifestFileMeta(ManifestFileMeta 
manifest) {
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreScan.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreScan.java
index 24ca55d8..746bf16f 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreScan.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/AppendOnlyFileStoreScan.java
@@ -26,15 +26,23 @@ import 
org.apache.flink.table.store.file.stats.FieldStatsArraySerializer;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
 import org.apache.flink.table.types.logical.RowType;
 
+import java.util.List;
+
+import static org.apache.flink.table.store.file.predicate.PredicateBuilder.and;
+import static 
org.apache.flink.table.store.file.predicate.PredicateBuilder.pickTransformFieldMapping;
+import static 
org.apache.flink.table.store.file.predicate.PredicateBuilder.splitAnd;
+
 /** {@link FileStoreScan} for {@link 
org.apache.flink.table.store.file.AppendOnlyFileStore}. */
 public class AppendOnlyFileStoreScan extends AbstractFileStoreScan {
 
     private final FieldStatsArraySerializer rowStatsConverter;
+    private final RowType rowType;
 
     private Predicate filter;
 
     public AppendOnlyFileStoreScan(
             RowType partitionType,
+            RowType bucketKeyType,
             RowType rowType,
             SnapshotManager snapshotManager,
             ManifestFile.Factory manifestFileFactory,
@@ -43,16 +51,27 @@ public class AppendOnlyFileStoreScan extends 
AbstractFileStoreScan {
             boolean checkNumOfBuckets) {
         super(
                 partitionType,
+                bucketKeyType,
                 snapshotManager,
                 manifestFileFactory,
                 manifestListFactory,
                 numOfBuckets,
                 checkNumOfBuckets);
         this.rowStatsConverter = new FieldStatsArraySerializer(rowType);
+        this.rowType = rowType;
     }
 
+    /**
+     * Sets the row filter used for stats-based file skipping and, when possible, derives a bucket
+     * selector from the conjuncts that concern the bucket-key fields.
+     */
     public AppendOnlyFileStoreScan withFilter(Predicate predicate) {
         this.filter = predicate;
+
+        // Re-express the top-level conjuncts against the bucket-key schema.
+        // NOTE(review): presumably pickTransformFieldMapping drops conjuncts that reference
+        // non-bucket-key fields and remaps field indices; confirm against PredicateBuilder.
+        List<Predicate> bucketFilters =
+                pickTransformFieldMapping(
+                        splitAnd(predicate),
+                        rowType.getFieldNames(),
+                        bucketKeyType.getFieldNames());
+        // NOTE(review): when no bucket filter is derived, a selector installed by an earlier
+        // call is left in place.
+        if (bucketFilters.size() > 0) {
+            withBucketKeyFilter(and(bucketFilters));
+        }
         return this;
     }
 
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScan.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScan.java
index aa6bc701..0ac3e28f 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScan.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/operation/KeyValueFileStoreScan.java
@@ -26,15 +26,23 @@ import 
org.apache.flink.table.store.file.stats.FieldStatsArraySerializer;
 import org.apache.flink.table.store.file.utils.SnapshotManager;
 import org.apache.flink.table.types.logical.RowType;
 
+import java.util.List;
+
+import static org.apache.flink.table.store.file.predicate.PredicateBuilder.and;
+import static 
org.apache.flink.table.store.file.predicate.PredicateBuilder.pickTransformFieldMapping;
+import static 
org.apache.flink.table.store.file.predicate.PredicateBuilder.splitAnd;
+
 /** {@link FileStoreScan} for {@link 
org.apache.flink.table.store.file.KeyValueFileStore}. */
 public class KeyValueFileStoreScan extends AbstractFileStoreScan {
 
     private final FieldStatsArraySerializer keyStatsConverter;
+    private final RowType keyType;
 
     private Predicate keyFilter;
 
     public KeyValueFileStoreScan(
             RowType partitionType,
+            RowType bucketKeyType,
             RowType keyType,
             SnapshotManager snapshotManager,
             ManifestFile.Factory manifestFileFactory,
@@ -43,16 +51,27 @@ public class KeyValueFileStoreScan extends 
AbstractFileStoreScan {
             boolean checkNumOfBuckets) {
         super(
                 partitionType,
+                bucketKeyType,
                 snapshotManager,
                 manifestFileFactory,
                 manifestListFactory,
                 numOfBuckets,
                 checkNumOfBuckets);
         this.keyStatsConverter = new FieldStatsArraySerializer(keyType);
+        this.keyType = keyType;
     }
 
+    /**
+     * Sets the key filter used for stats-based file skipping and, when possible, derives a bucket
+     * selector from the conjuncts that concern the bucket-key fields.
+     */
     public KeyValueFileStoreScan withKeyFilter(Predicate predicate) {
         this.keyFilter = predicate;
+
+        // Re-express the top-level conjuncts of the key filter against the bucket-key schema.
+        // NOTE(review): presumably pickTransformFieldMapping drops conjuncts that reference
+        // non-bucket-key fields and remaps field indices; confirm against PredicateBuilder.
+        List<Predicate> bucketFilters =
+                pickTransformFieldMapping(
+                        splitAnd(predicate),
+                        keyType.getFieldNames(),
+                        bucketKeyType.getFieldNames());
+        // NOTE(review): when no bucket filter is derived, a selector installed by an earlier
+        // call is left in place.
+        if (bucketFilters.size() > 0) {
+            withBucketKeyFilter(and(bucketFilters));
+        }
         return this;
     }
 
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/And.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/And.java
index dc256767..339b459e 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/And.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/And.java
@@ -25,7 +25,7 @@ import java.util.List;
 import java.util.Optional;
 
 /** A {@link CompoundPredicate.Function} to eval and. */
-public class And implements CompoundPredicate.Function {
+public class And extends CompoundPredicate.Function {
 
     private static final long serialVersionUID = 1L;
 
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/BucketSelector.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/BucketSelector.java
new file mode 100644
index 00000000..43753a26
--- /dev/null
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/BucketSelector.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.predicate;
+
+import org.apache.flink.annotation.VisibleForTesting;
+import org.apache.flink.table.data.GenericRowData;
+import org.apache.flink.table.runtime.typeutils.RowDataSerializer;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.apache.flink.shaded.guava30.com.google.common.collect.ImmutableSet;
+
+import javax.annotation.concurrent.ThreadSafe;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.function.Consumer;
+import java.util.stream.Collectors;
+
+import static 
org.apache.flink.table.store.file.predicate.PredicateBuilder.splitAnd;
+import static 
org.apache.flink.table.store.file.predicate.PredicateBuilder.splitOr;
+
+/**
+ * Selects the buckets that may contain rows matching a bucket-key {@link Predicate}.
+ *
+ * <p>The selector precomputes the hash codes of every bucket-key row admitted by the predicate
+ * (the cartesian product of the per-field {@code =} / {@code IN} literals). For each total bucket
+ * count it lazily caches the set of bucket ids those hash codes map to.
+ */
+@ThreadSafe
+public class BucketSelector implements Serializable {
+
+    /** Upper bound on candidate bucket-key rows; above it no selector is created. */
+    public static final int MAX_VALUES = 1000;
+
+    private static final long serialVersionUID = 1L;
+
+    /** Binary-row hash codes of the candidate bucket-key rows. */
+    private final int[] hashCodes;
+
+    /** Cached selected bucket ids, keyed by total number of buckets. */
+    private final Map<Integer, Set<Integer>> buckets = new ConcurrentHashMap<>();
+
+    public BucketSelector(int[] hashCodes) {
+        this.hashCodes = hashCodes;
+    }
+
+    /**
+     * Returns whether {@code bucket} may hold matching rows when the data was written with {@code
+     * numBucket} buckets.
+     */
+    public boolean select(int bucket, int numBucket) {
+        return buckets.computeIfAbsent(numBucket, this::createBucketSet).contains(bucket);
+    }
+
+    /** Maps every candidate hash code to its bucket id for the given bucket count. */
+    private Set<Integer> createBucketSet(int numBucket) {
+        ImmutableSet.Builder<Integer> builder = new ImmutableSet.Builder<>();
+        for (int hash : hashCodes) {
+            builder.add(bucket(hash, numBucket));
+        }
+        return builder.build();
+    }
+
+    /**
+     * Maps a hash code to a bucket id in {@code [0, numBucket)}. A bare {@code hash % numBucket}
+     * is negative for negative hash codes. Such a value never equals a non-negative bucket id, so
+     * buckets holding matching rows would be wrongly pruned.
+     *
+     * <p>NOTE(review): assumes writers assign buckets as {@code Math.abs(hash % numBucket)} (as
+     * SinkRecordConverter does). Keep the two in sync.
+     */
+    private static int bucket(int hash, int numBucket) {
+        return Math.abs(hash % numBucket);
+    }
+
+    @VisibleForTesting
+    int[] hashCodes() {
+        return hashCodes;
+    }
+
+    /**
+     * Builds a selector from {@code bucketPredicate}, whose leaf indices refer to fields of {@code
+     * bucketKeyType}.
+     *
+     * <p>A top-level conjunct contributes candidate values only if it is a disjunction of {@code =}
+     * / {@code IN} leaves on a single field. Other conjuncts are ignored, which only widens the
+     * selection.
+     *
+     * <p>A selector is returned only if all of the following hold:
+     *
+     * <ul>
+     *   <li>every bucket-key field receives candidate values;
+     *   <li>no field is constrained by two conjuncts;
+     *   <li>the number of candidate rows does not exceed {@link #MAX_VALUES}.
+     * </ul>
+     *
+     * @return the selector, or {@link Optional#empty()} when buckets cannot be pruned
+     */
+    public static Optional<BucketSelector> create(
+            Predicate bucketPredicate, RowType bucketKeyType) {
+        int fieldCount = bucketKeyType.getFieldCount();
+        if (fieldCount == 0) {
+            // Nothing to enumerate; assembleRows would otherwise index an empty array.
+            return Optional.empty();
+        }
+
+        @SuppressWarnings("unchecked")
+        List<Object>[] bucketValues = new List[fieldCount];
+
+        nextAnd:
+        for (Predicate andPredicate : splitAnd(bucketPredicate)) {
+            Integer reference = null;
+            List<Object> values = new ArrayList<>();
+            for (Predicate orPredicate : splitOr(andPredicate)) {
+                if (orPredicate instanceof LeafPredicate) {
+                    LeafPredicate leaf = (LeafPredicate) orPredicate;
+                    if (reference == null || reference == leaf.index()) {
+                        reference = leaf.index();
+                        if (leaf.function().equals(Equal.INSTANCE)
+                                || leaf.function().equals(In.INSTANCE)) {
+                            // Null literals are dropped: they add no candidate rows.
+                            values.addAll(
+                                    leaf.literals().stream()
+                                            .filter(Objects::nonNull)
+                                            .collect(Collectors.toList()));
+                            continue;
+                        }
+                    }
+                }
+
+                // Not expressible as candidate values of one field: ignore this conjunct.
+                continue nextAnd;
+            }
+            if (reference != null) {
+                if (bucketValues[reference] != null) {
+                    // The same field is constrained by two conjuncts; do not prune.
+                    return Optional.empty();
+                }
+
+                bucketValues[reference] = values;
+            }
+        }
+
+        // long: an int product could overflow past the MAX_VALUES check for a huge IN list.
+        long rowCount = 1;
+        for (List<Object> values : bucketValues) {
+            if (values == null) {
+                // Some bucket-key field is unconstrained, so any bucket may match.
+                return Optional.empty();
+            }
+
+            rowCount *= values.size();
+            if (rowCount > MAX_VALUES) {
+                return Optional.empty();
+            }
+        }
+
+        // If some field has no candidate values, no rows are assembled and nothing is selected.
+        RowDataSerializer serializer = new RowDataSerializer(bucketKeyType);
+        List<Integer> hashCodes = new ArrayList<>();
+        assembleRows(
+                bucketValues,
+                columns -> hashCodes.add(hash(columns, serializer)),
+                new ArrayList<>(),
+                0);
+
+        return Optional.of(new BucketSelector(hashCodes.stream().mapToInt(i -> i).toArray()));
+    }
+
+    /** Hash code of the binary row built from {@code columns}. */
+    private static int hash(List<Object> columns, RowDataSerializer serializer) {
+        return serializer.toBinaryRow(GenericRowData.of(columns.toArray())).hashCode();
+    }
+
+    /**
+     * Depth-first enumeration of the cartesian product of {@code rowValues}. Each complete row is
+     * passed to {@code consumer} via the shared, mutable {@code stack} list.
+     */
+    private static void assembleRows(
+            List<Object>[] rowValues,
+            Consumer<List<Object>> consumer,
+            List<Object> stack,
+            int columnIndex) {
+        List<Object> columnValues = rowValues[columnIndex];
+        for (Object value : columnValues) {
+            stack.add(value);
+            if (columnIndex == rowValues.length - 1) {
+                // last column, consume row
+                consumer.accept(stack);
+            } else {
+                assembleRows(rowValues, consumer, stack, columnIndex + 1);
+            }
+            stack.remove(stack.size() - 1);
+        }
+    }
+}
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/CompoundPredicate.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/CompoundPredicate.java
index d823da0d..042b269b 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/CompoundPredicate.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/CompoundPredicate.java
@@ -72,12 +72,26 @@ public class CompoundPredicate implements Predicate {
     }
 
     /** Evaluate the predicate result based on multiple {@link Predicate}s. */
-    public interface Function extends Serializable {
+    public abstract static class Function implements Serializable {
 
-        boolean test(Object[] values, List<Predicate> children);
+        public abstract boolean test(Object[] values, List<Predicate> 
children);
 
-        boolean test(long rowCount, FieldStats[] fieldStats, List<Predicate> 
children);
+        public abstract boolean test(
+                long rowCount, FieldStats[] fieldStats, List<Predicate> 
children);
 
-        Optional<Predicate> negate(List<Predicate> children);
+        public abstract Optional<Predicate> negate(List<Predicate> children);
+
+        // Equality and hash code depend only on the concrete class, so any two instances of the
+        // same Function subclass compare equal (e.g. against a shared INSTANCE).
+        // NOTE(review): this assumes subclasses are stateless; confirm for each implementation.
+        @Override
+        public int hashCode() {
+            return this.getClass().getName().hashCode();
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            return o != null && getClass() == o.getClass();
+        }
     }
 }
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/In.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/In.java
index aa338945..a43e5598 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/In.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/In.java
@@ -27,7 +27,7 @@ import java.util.Optional;
 import static 
org.apache.flink.table.store.file.predicate.CompareUtils.compareLiteral;
 
 /** A {@link LeafFunction} to eval in. */
-public class In implements LeafFunction {
+public class In extends LeafFunction {
 
     private static final long serialVersionUID = 1L;
 
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafFunction.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafFunction.java
index 00c852cc..62ef23bb 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafFunction.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafFunction.java
@@ -26,11 +26,25 @@ import java.util.List;
 import java.util.Optional;
 
 /** Function to test a field with literals. */
-public interface LeafFunction extends Serializable {
+public abstract class LeafFunction implements Serializable {
 
-    boolean test(LogicalType type, Object field, List<Object> literals);
+    public abstract boolean test(LogicalType type, Object field, List<Object> 
literals);
 
-    boolean test(LogicalType type, long rowCount, FieldStats fieldStats, 
List<Object> literals);
+    public abstract boolean test(
+            LogicalType type, long rowCount, FieldStats fieldStats, 
List<Object> literals);
 
-    Optional<LeafFunction> negate();
+    public abstract Optional<LeafFunction> negate();
+
+    // Equality and hash code depend only on the concrete class. This lets callers test the kind
+    // of a leaf with equals(), e.g. BucketSelector's leaf.function().equals(Equal.INSTANCE).
+    // NOTE(review): this assumes subclasses are stateless; confirm for each implementation.
+    @Override
+    public int hashCode() {
+        return this.getClass().getName().hashCode();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        return o != null && getClass() == o.getClass();
+    }
 }
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafUnaryFunction.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafUnaryFunction.java
index 72dcfbb1..034c55c8 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafUnaryFunction.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/LeafUnaryFunction.java
@@ -24,7 +24,7 @@ import org.apache.flink.table.types.logical.LogicalType;
 import java.util.List;
 
 /** Function to test a field. */
-public abstract class LeafUnaryFunction implements LeafFunction {
+public abstract class LeafUnaryFunction extends LeafFunction {
 
     private static final long serialVersionUID = 1L;
 
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/NotIn.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/NotIn.java
index d710da0f..f93bbf70 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/NotIn.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/NotIn.java
@@ -27,7 +27,7 @@ import java.util.Optional;
 import static 
org.apache.flink.table.store.file.predicate.CompareUtils.compareLiteral;
 
 /** A {@link LeafFunction} to eval not in. */
-public class NotIn implements LeafFunction {
+public class NotIn extends LeafFunction {
 
     private static final long serialVersionUID = 1L;
 
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/NullFalseLeafBinaryFunction.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/NullFalseLeafBinaryFunction.java
index dec41e26..e184d931 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/NullFalseLeafBinaryFunction.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/NullFalseLeafBinaryFunction.java
@@ -24,7 +24,7 @@ import org.apache.flink.table.types.logical.LogicalType;
 import java.util.List;
 
 /** Function to test a field with a literal. */
-public abstract class NullFalseLeafBinaryFunction implements LeafFunction {
+public abstract class NullFalseLeafBinaryFunction extends LeafFunction {
 
     private static final long serialVersionUID = 1L;
 
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Or.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Or.java
index 9ba91286..802000a7 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Or.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/Or.java
@@ -25,7 +25,7 @@ import java.util.List;
 import java.util.Optional;
 
 /** A {@link CompoundPredicate.Function} to eval or. */
-public class Or implements CompoundPredicate.Function {
+public class Or extends CompoundPredicate.Function {
 
     private static final long serialVersionUID = 1L;
 
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateBuilder.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateBuilder.java
index 93921a50..035809d9 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateBuilder.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/predicate/PredicateBuilder.java
@@ -38,6 +38,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.Optional;
 
 import static java.util.Collections.singletonList;
 
@@ -141,15 +142,22 @@ public class PredicateBuilder {
 
     public static List<Predicate> splitAnd(Predicate predicate) {
         List<Predicate> result = new ArrayList<>();
-        splitAnd(predicate, result);
+        splitCompound(And.INSTANCE, predicate, result);
         return result;
     }
 
-    private static void splitAnd(Predicate predicate, List<Predicate> result) {
+    public static List<Predicate> splitOr(Predicate predicate) {
+        List<Predicate> result = new ArrayList<>();
+        splitCompound(Or.INSTANCE, predicate, result);
+        return result;
+    }
+
+    private static void splitCompound(
+            CompoundPredicate.Function function, Predicate predicate, 
List<Predicate> result) {
         if (predicate instanceof CompoundPredicate
-                && ((CompoundPredicate) 
predicate).function().equals(And.INSTANCE)) {
+                && ((CompoundPredicate) 
predicate).function().equals(function)) {
             for (Predicate child : ((CompoundPredicate) predicate).children()) 
{
-                splitAnd(child, result);
+                splitCompound(function, child, result);
             }
         } else {
             result.add(predicate);
@@ -220,4 +228,50 @@ public class PredicateBuilder {
                         "Unsupported predicate leaf type " + 
literalType.getTypeRoot().name());
         }
     }
+
+    public static List<Predicate> pickTransformFieldMapping(
+            List<Predicate> predicates, List<String> inputFields, List<String> 
pickedFields) {
+        return pickTransformFieldMapping(
+                predicates, 
inputFields.stream().mapToInt(pickedFields::indexOf).toArray());
+    }
+
+    public static List<Predicate> pickTransformFieldMapping(
+            List<Predicate> predicates, int[] fieldIdxMapping) {
+        List<Predicate> pick = new ArrayList<>();
+        for (Predicate p : predicates) {
+            Optional<Predicate> mapped = transformFieldMapping(p, 
fieldIdxMapping);
+            mapped.ifPresent(pick::add);
+        }
+        return pick;
+    }
+
+    public static Optional<Predicate> transformFieldMapping(
+            Predicate predicate, int[] fieldIdxMapping) {
+        if (predicate instanceof CompoundPredicate) {
+            CompoundPredicate compoundPredicate = (CompoundPredicate) 
predicate;
+            List<Predicate> children = new ArrayList<>();
+            for (Predicate child : compoundPredicate.children()) {
+                Optional<Predicate> mapped = transformFieldMapping(child, 
fieldIdxMapping);
+                if (mapped.isPresent()) {
+                    children.add(mapped.get());
+                } else {
+                    return Optional.empty();
+                }
+            }
+            return Optional.of(new 
CompoundPredicate(compoundPredicate.function(), children));
+        } else {
+            LeafPredicate leafPredicate = (LeafPredicate) predicate;
+            int mapped = fieldIdxMapping[leafPredicate.index()];
+            if (mapped >= 0) {
+                return Optional.of(
+                        new LeafPredicate(
+                                leafPredicate.function(),
+                                leafPredicate.type(),
+                                mapped,
+                                leafPredicate.literals()));
+            } else {
+                return Optional.empty();
+            }
+        }
+    }
 }
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/TableSchema.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/TableSchema.java
index 2a6c6b34..bf7d0068 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/TableSchema.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/file/schema/TableSchema.java
@@ -134,7 +134,8 @@ public class TableSchema implements Serializable {
         return options;
     }
 
-    public List<String> bucketKeys() {
+    /** Original bucket keys, maybe empty. */
+    public List<String> originalBucketKeys() {
         String key = options.get(BUCKET_KEY.key());
         if (StringUtils.isNullOrWhitespaceOnly(key)) {
             return Collections.emptyList();
@@ -146,6 +147,12 @@ public class TableSchema implements Serializable {
                             "Field names %s should contains all bucket keys 
%s.",
                             fieldNames(), bucketKeys));
         }
+        if (bucketKeys.stream().anyMatch(partitionKeys::contains)) {
+            throw new RuntimeException(
+                    String.format(
+                            "Bucket keys %s should not in partition keys %s.",
+                            bucketKeys, partitionKeys));
+        }
         if (primaryKeys.size() > 0) {
             if (!containsAll(primaryKeys, bucketKeys)) {
                 throw new RuntimeException(
@@ -173,6 +180,17 @@ public class TableSchema implements Serializable {
         return projectedLogicalRowType(partitionKeys);
     }
 
+    public RowType logicalBucketKeyType() {
+        List<String> bucketKeys = originalBucketKeys();
+        if (bucketKeys.isEmpty()) {
+            bucketKeys = trimmedPrimaryKeys();
+        }
+        if (bucketKeys.isEmpty()) {
+            bucketKeys = fieldNames();
+        }
+        return projectedLogicalRowType(bucketKeys);
+    }
+
     public RowType logicalTrimmedPrimaryKeysType() {
         return projectedLogicalRowType(trimmedPrimaryKeys());
     }
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
index 4d343810..16f4be28 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTable.java
@@ -60,6 +60,7 @@ public class AppendOnlyFileStoreTable extends 
AbstractFileStoreTable {
                         tableSchema.id(),
                         new CoreOptions(tableSchema.options()),
                         tableSchema.logicalPartitionType(),
+                        tableSchema.logicalBucketKeyType(),
                         tableSchema.logicalRowType());
     }
 
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
index 855602f5..ad90ede0 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTable.java
@@ -69,6 +69,7 @@ public class ChangelogValueCountFileStoreTable extends 
AbstractFileStoreTable {
                         tableSchema.id(),
                         new CoreOptions(tableSchema.options()),
                         tableSchema.logicalPartitionType(),
+                        tableSchema.logicalBucketKeyType(),
                         tableSchema.logicalRowType(),
                         countType,
                         mergeFunction);
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
index 424a1dfb..37a29062 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTable.java
@@ -50,14 +50,17 @@ import 
org.apache.flink.table.store.table.source.ValueContentRowDataRecordIterat
 import org.apache.flink.table.types.logical.LogicalType;
 import org.apache.flink.table.types.logical.RowType;
 
-import java.util.ArrayList;
 import java.util.List;
-import java.util.Optional;
 import java.util.stream.Collectors;
 
+import static 
org.apache.flink.table.store.file.predicate.PredicateBuilder.pickTransformFieldMapping;
+import static 
org.apache.flink.table.store.file.predicate.PredicateBuilder.splitAnd;
+
 /** {@link FileStoreTable} for {@link WriteMode#CHANGE_LOG} write mode with 
primary keys. */
 public class ChangelogWithKeyFileStoreTable extends AbstractFileStoreTable {
 
+    private static final String KEY_FIELD_PREFIX = "_KEY_";
+
     private static final long serialVersionUID = 1L;
 
     private final KeyValueFileStore store;
@@ -66,21 +69,7 @@ public class ChangelogWithKeyFileStoreTable extends 
AbstractFileStoreTable {
             Path path, SchemaManager schemaManager, TableSchema tableSchema) {
         super(path, tableSchema);
         RowType rowType = tableSchema.logicalRowType();
-
-        // add _KEY_ prefix to avoid conflict with value
-        RowType keyType =
-                new RowType(
-                        
tableSchema.logicalTrimmedPrimaryKeysType().getFields().stream()
-                                .map(
-                                        f ->
-                                                new RowType.RowField(
-                                                        "_KEY_" + f.getName(),
-                                                        f.getType(),
-                                                        
f.getDescription().orElse(null)))
-                                .collect(Collectors.toList()));
-
         Configuration conf = Configuration.fromMap(tableSchema.options());
-
         CoreOptions.MergeEngine mergeEngine = 
conf.get(CoreOptions.MERGE_ENGINE);
         MergeFunction mergeFunction;
         switch (mergeEngine) {
@@ -105,11 +94,25 @@ public class ChangelogWithKeyFileStoreTable extends 
AbstractFileStoreTable {
                         tableSchema.id(),
                         new CoreOptions(conf),
                         tableSchema.logicalPartitionType(),
-                        keyType,
+                        addKeyNamePrefix(tableSchema.logicalBucketKeyType()),
+                        
addKeyNamePrefix(tableSchema.logicalTrimmedPrimaryKeysType()),
                         rowType,
                         mergeFunction);
     }
 
+    private RowType addKeyNamePrefix(RowType type) {
+        // add prefix to avoid conflict with value
+        return new RowType(
+                type.getFields().stream()
+                        .map(
+                                f ->
+                                        new RowType.RowField(
+                                                KEY_FIELD_PREFIX + f.getName(),
+                                                f.getType(),
+                                                
f.getDescription().orElse(null)))
+                        .collect(Collectors.toList()));
+    }
+
     @Override
     public TableScan newScan() {
         KeyValueFileStoreScan scan = store.newScan();
@@ -133,16 +136,11 @@ public class ChangelogWithKeyFileStoreTable extends 
AbstractFileStoreTable {
                 // file 2 will be ignored, and the final result will be key = 
a, value = 1 while the
                 // correct result is an empty set
                 // TODO support value filter
-                List<String> trimmedPrimaryKeys = 
tableSchema.trimmedPrimaryKeys();
-                int[] fieldIdxToKeyIdx =
-                        tableSchema.fields().stream()
-                                .mapToInt(f -> 
trimmedPrimaryKeys.indexOf(f.name()))
-                                .toArray();
-                List<Predicate> keyFilters = new ArrayList<>();
-                for (Predicate p : PredicateBuilder.splitAnd(predicate)) {
-                    Optional<Predicate> mapped = mapFilterFields(p, 
fieldIdxToKeyIdx);
-                    mapped.ifPresent(keyFilters::add);
-                }
+                List<Predicate> keyFilters =
+                        pickTransformFieldMapping(
+                                splitAnd(predicate),
+                                tableSchema.fieldNames(),
+                                tableSchema.trimmedPrimaryKeys());
                 if (keyFilters.size() > 0) {
                     scan.withKeyFilter(PredicateBuilder.and(keyFilters));
                 }
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/SinkRecordConverter.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/SinkRecordConverter.java
index 9d926ba5..394964ad 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/SinkRecordConverter.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/sink/SinkRecordConverter.java
@@ -51,7 +51,7 @@ public class SinkRecordConverter {
                 numBucket,
                 tableSchema.logicalRowType(),
                 tableSchema.projection(tableSchema.partitionKeys()),
-                tableSchema.projection(tableSchema.bucketKeys()),
+                tableSchema.projection(tableSchema.originalBucketKeys()),
                 tableSchema.projection(tableSchema.trimmedPrimaryKeys()),
                 tableSchema.projection(tableSchema.primaryKeys()));
     }
diff --git 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java
 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java
index f69ae473..5a9fc05b 100644
--- 
a/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java
+++ 
b/flink-table-store-core/src/main/java/org/apache/flink/table/store/table/source/TableScan.java
@@ -22,8 +22,6 @@ import org.apache.flink.annotation.VisibleForTesting;
 import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.store.file.data.DataFileMeta;
 import org.apache.flink.table.store.file.operation.FileStoreScan;
-import org.apache.flink.table.store.file.predicate.CompoundPredicate;
-import org.apache.flink.table.store.file.predicate.LeafPredicate;
 import org.apache.flink.table.store.file.predicate.Predicate;
 import org.apache.flink.table.store.file.predicate.PredicateBuilder;
 import org.apache.flink.table.store.file.schema.TableSchema;
@@ -36,6 +34,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Optional;
 
+import static 
org.apache.flink.table.store.file.predicate.PredicateBuilder.transformFieldMapping;
+
 /** An abstraction layer above {@link FileStoreScan} to provide input split 
generation. */
 public abstract class TableScan {
 
@@ -74,7 +74,7 @@ public abstract class TableScan {
         List<Predicate> partitionFilters = new ArrayList<>();
         List<Predicate> nonPartitionFilters = new ArrayList<>();
         for (Predicate p : PredicateBuilder.splitAnd(predicate)) {
-            Optional<Predicate> mapped = mapFilterFields(p, 
fieldIdxToPartitionIdx);
+            Optional<Predicate> mapped = transformFieldMapping(p, 
fieldIdxToPartitionIdx);
             if (mapped.isPresent()) {
                 partitionFilters.add(mapped.get());
             } else {
@@ -136,35 +136,6 @@ public abstract class TableScan {
 
     protected abstract void withNonPartitionFilter(Predicate predicate);
 
-    protected Optional<Predicate> mapFilterFields(Predicate predicate, int[] 
fieldIdxMapping) {
-        if (predicate instanceof CompoundPredicate) {
-            CompoundPredicate compoundPredicate = (CompoundPredicate) 
predicate;
-            List<Predicate> children = new ArrayList<>();
-            for (Predicate child : compoundPredicate.children()) {
-                Optional<Predicate> mapped = mapFilterFields(child, 
fieldIdxMapping);
-                if (mapped.isPresent()) {
-                    children.add(mapped.get());
-                } else {
-                    return Optional.empty();
-                }
-            }
-            return Optional.of(new 
CompoundPredicate(compoundPredicate.function(), children));
-        } else {
-            LeafPredicate leafPredicate = (LeafPredicate) predicate;
-            int mapped = fieldIdxMapping[leafPredicate.index()];
-            if (mapped >= 0) {
-                return Optional.of(
-                        new LeafPredicate(
-                                leafPredicate.function(),
-                                leafPredicate.type(),
-                                mapped,
-                                leafPredicate.literals()));
-            } else {
-                return Optional.empty();
-            }
-        }
-    }
-
     /** Scanning plan containing snapshot ID and input splits. */
     public static class Plan {
 
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
index 21ed64a7..3a6e39ca 100644
--- 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/TestFileStore.java
@@ -124,6 +124,7 @@ public class TestFileStore extends KeyValueFileStore {
                 options,
                 partitionType,
                 keyType,
+                keyType,
                 valueType,
                 mergeFunction);
         this.root = root;
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/BucketSelectorTest.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/BucketSelectorTest.java
new file mode 100644
index 00000000..aea241b5
--- /dev/null
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/file/predicate/BucketSelectorTest.java
@@ -0,0 +1,147 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.table.store.file.predicate;
+
+import org.apache.flink.table.types.logical.IntType;
+import org.apache.flink.table.types.logical.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Optional;
+
+import static org.apache.flink.table.store.file.predicate.PredicateBuilder.and;
+import static org.apache.flink.table.store.file.predicate.PredicateBuilder.or;
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link BucketSelector}. */
+public class BucketSelectorTest {
+
+    private final RowType rowType = RowType.of(new IntType(), new IntType(), 
new IntType());
+
+    private final PredicateBuilder builder = new PredicateBuilder(rowType);
+
+    @Test
+    public void testRepeatEqual() {
+        assertThat(newSelector(and(builder.equal(0, 0), builder.equal(0, 
1)))).isEmpty();
+    }
+
+    @Test
+    public void testNotFull() {
+        assertThat(newSelector(and(builder.equal(0, 0)))).isEmpty();
+    }
+
+    @Test
+    public void testOtherPredicate() {
+        assertThat(newSelector(and(builder.notEqual(0, 0)))).isEmpty();
+    }
+
+    @Test
+    public void testOrIllegal() {
+        assertThat(
+                        newSelector(
+                                and(
+                                        or(builder.equal(0, 5), 
builder.equal(1, 6)),
+                                        builder.equal(1, 1),
+                                        builder.equal(2, 2))))
+                .isEmpty();
+    }
+
+    @Test
+    public void testNormal() {
+        BucketSelector selector =
+                newSelector(and(builder.equal(0, 0), builder.equal(1, 1), 
builder.equal(2, 2)))
+                        .get();
+        assertThat(selector.hashCodes()).containsExactly(1141287431);
+    }
+
+    @Test
+    public void testIn() {
+        BucketSelector selector =
+                newSelector(
+                                and(
+                                        builder.in(0, Arrays.asList(5, 6, 7)),
+                                        builder.equal(1, 1),
+                                        builder.equal(2, 2)))
+                        .get();
+        assertThat(selector.hashCodes()).containsExactly(-1272936927, 
582056914, -1234868890);
+    }
+
+    @Test
+    public void testOr() {
+        BucketSelector selector =
+                newSelector(
+                                and(
+                                        or(
+                                                builder.equal(0, 5),
+                                                builder.equal(0, 6),
+                                                builder.equal(0, 7)),
+                                        builder.equal(1, 1),
+                                        builder.equal(2, 2)))
+                        .get();
+        assertThat(selector.hashCodes()).containsExactly(-1272936927, 
582056914, -1234868890);
+    }
+
+    @Test
+    public void testInNull() {
+        BucketSelector selector =
+                newSelector(
+                                and(
+                                        builder.in(0, Arrays.asList(5, 6, 7, 
null)),
+                                        builder.equal(1, 1),
+                                        builder.equal(2, 2)))
+                        .get();
+        assertThat(selector.hashCodes()).containsExactly(-1272936927, 
582056914, -1234868890);
+    }
+
+    @Test
+    public void testMultipleIn() {
+        BucketSelector selector =
+                newSelector(
+                                and(
+                                        builder.in(0, Arrays.asList(5, 6, 7)),
+                                        builder.in(1, Arrays.asList(1, 8)),
+                                        builder.equal(2, 2)))
+                        .get();
+        assertThat(selector.hashCodes())
+                .containsExactly(
+                        -1272936927, -1567268077, 582056914, 2124429589, 
-1234868890, 1063796399);
+    }
+
+    @Test
+    public void testMultipleOr() {
+        BucketSelector selector =
+                newSelector(
+                                and(
+                                        or(
+                                                builder.equal(0, 5),
+                                                builder.equal(0, 6),
+                                                builder.equal(0, 7)),
+                                        or(builder.equal(1, 1), 
builder.equal(1, 8)),
+                                        builder.equal(2, 2)))
+                        .get();
+        assertThat(selector.hashCodes())
+                .containsExactly(
+                        -1272936927, -1567268077, 582056914, 2124429589, 
-1234868890, 1063796399);
+    }
+
+    private Optional<BucketSelector> newSelector(Predicate predicate) {
+        return BucketSelector.create(predicate, rowType);
+    }
+}
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
index b6bfd6bf..78cd1c8f 100644
--- 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/AppendOnlyFileStoreTableTest.java
@@ -39,6 +39,7 @@ import org.junit.jupiter.api.io.TempDir;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.function.Consumer;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -162,12 +163,14 @@ public class AppendOnlyFileStoreTableTest extends 
FileStoreTableTestBase {
     }
 
     @Override
-    protected FileStoreTable createFileStoreTable() throws Exception {
+    protected FileStoreTable createFileStoreTable(Consumer<Configuration> 
configure)
+            throws Exception {
         Path tablePath = new Path(tempDir.toString());
         Configuration conf = new Configuration();
         conf.set(CoreOptions.PATH, tablePath.toString());
         conf.set(CoreOptions.FILE_FORMAT, "avro");
         conf.set(CoreOptions.WRITE_MODE, WriteMode.APPEND_ONLY);
+        configure.accept(conf);
         SchemaManager schemaManager = new SchemaManager(tablePath);
         TableSchema tableSchema =
                 schemaManager.commitNewVersion(
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
index e5169555..e39ac3dd 100644
--- 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogValueCountFileStoreTableTest.java
@@ -40,6 +40,7 @@ import org.junit.jupiter.api.io.TempDir;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
+import java.util.function.Consumer;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -165,12 +166,14 @@ public class ChangelogValueCountFileStoreTableTest 
extends FileStoreTableTestBas
     }
 
     @Override
-    protected FileStoreTable createFileStoreTable() throws Exception {
+    protected FileStoreTable createFileStoreTable(Consumer<Configuration> 
configure)
+            throws Exception {
         Path tablePath = new Path(tempDir.toString());
         Configuration conf = new Configuration();
         conf.set(CoreOptions.PATH, tablePath.toString());
         conf.set(CoreOptions.FILE_FORMAT, "avro");
         conf.set(CoreOptions.WRITE_MODE, WriteMode.CHANGE_LOG);
+        configure.accept(conf);
         SchemaManager schemaManager = new SchemaManager(tablePath);
         TableSchema tableSchema =
                 schemaManager.commitNewVersion(
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
index 4c3b999d..0f3c31a1 100644
--- 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/ChangelogWithKeyFileStoreTableTest.java
@@ -209,15 +209,11 @@ public class ChangelogWithKeyFileStoreTableTest extends 
FileStoreTableTestBase {
         write.close();
     }
 
-    @Override
-    protected FileStoreTable createFileStoreTable() throws Exception {
-        return createFileStoreTable(false);
-    }
-
     protected FileStoreTable createFileStoreTable(boolean changelogFile) 
throws Exception {
         return createFileStoreTable(conf -> 
conf.set(CoreOptions.CHANGELOG_FILE, changelogFile));
     }
 
+    @Override
     protected FileStoreTable createFileStoreTable(Consumer<Configuration> 
configure)
             throws Exception {
         Path tablePath = new Path(tempDir.toString());
diff --git 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
index f41de18b..86e28662 100644
--- 
a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
+++ 
b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/FileStoreTableTestBase.java
@@ -18,6 +18,7 @@
 
 package org.apache.flink.table.store.table;
 
+import org.apache.flink.configuration.Configuration;
 import org.apache.flink.table.api.DataTypes;
 import org.apache.flink.table.data.GenericRowData;
 import org.apache.flink.table.data.RowData;
@@ -25,6 +26,7 @@ import org.apache.flink.table.data.binary.BinaryRowData;
 import org.apache.flink.table.data.writer.BinaryRowWriter;
 import org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader;
 import 
org.apache.flink.table.store.file.mergetree.compact.ConcatRecordReader.ReaderSupplier;
+import org.apache.flink.table.store.file.predicate.PredicateBuilder;
 import org.apache.flink.table.store.file.utils.RecordReader;
 import org.apache.flink.table.store.file.utils.RecordReaderIterator;
 import org.apache.flink.table.store.table.sink.TableCommit;
@@ -42,8 +44,11 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.function.Consumer;
 import java.util.function.Function;
 
+import static org.apache.flink.table.store.CoreOptions.BUCKET;
+import static org.apache.flink.table.store.CoreOptions.BUCKET_KEY;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Base test class for {@link FileStoreTable}. */
@@ -101,6 +106,33 @@ public abstract class FileStoreTableTestBase {
                 .hasSameElementsAs(Collections.singletonList("2|21|201"));
     }
 
+    @Test
+    public void testBucketFilter() throws Exception {
+        FileStoreTable table =
+                createFileStoreTable(
+                        conf -> {
+                            conf.set(BUCKET, 5);
+                            conf.set(BUCKET_KEY, "a");
+                        });
+
+        TableWrite write = table.newWrite();
+        write.write(GenericRowData.of(1, 1, 2L));
+        write.write(GenericRowData.of(1, 3, 4L));
+        write.write(GenericRowData.of(1, 5, 6L));
+        write.write(GenericRowData.of(1, 7, 8L));
+        write.write(GenericRowData.of(1, 9, 10L));
+        table.newCommit("user").commit("0", write.prepareCommit());
+        write.close();
+
+        List<Split> splits =
+                table.newScan()
+                        .withFilter(new PredicateBuilder(ROW_TYPE).equal(1, 5))
+                        .plan()
+                        .splits;
+        assertThat(splits.size()).isEqualTo(1);
+        assertThat(splits.get(0).bucket()).isEqualTo(1);
+    }
+
     protected List<String> getResult(
             TableRead read,
             List<Split> splits,
@@ -141,5 +173,10 @@ public abstract class FileStoreTableTestBase {
         return b;
     }
 
-    protected abstract FileStoreTable createFileStoreTable() throws Exception;
+    protected FileStoreTable createFileStoreTable() throws Exception {
+        return createFileStoreTable(conf -> {});
+    }
+
+    protected abstract FileStoreTable createFileStoreTable(Consumer<Configuration> configure)
+            throws Exception;
 }
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java
index 85c65757..aa96da9e 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/WritePreemptMemoryTest.java
@@ -41,6 +41,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Random;
+import java.util.function.Consumer;
 
 import static org.assertj.core.api.Assertions.assertThat;
 
@@ -87,7 +88,8 @@ public class WritePreemptMemoryTest extends FileStoreTableTestBase {
     }
 
     @Override
-    protected FileStoreTable createFileStoreTable() throws Exception {
+    protected FileStoreTable createFileStoreTable(Consumer<Configuration> configure)
+            throws Exception {
         Path tablePath = new Path(tempDir.toString());
         Configuration conf = new Configuration();
         conf.set(CoreOptions.PATH, tablePath.toString());
@@ -95,6 +97,7 @@ public class WritePreemptMemoryTest extends FileStoreTableTestBase {
         conf.set(CoreOptions.WRITE_MODE, WriteMode.CHANGE_LOG);
         conf.set(CoreOptions.WRITE_BUFFER_SIZE, new MemorySize(30 * 1024));
         conf.set(CoreOptions.PAGE_SIZE, new MemorySize(1024));
+        configure.accept(conf);
         SchemaManager schemaManager = new SchemaManager(tablePath);
         TableSchema schema =
                 schemaManager.commitNewVersion(
diff --git a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/sink/SinkRecordConverterTest.java b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/sink/SinkRecordConverterTest.java
index eea054d4..827ef906 100644
--- a/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/sink/SinkRecordConverterTest.java
+++ b/flink-table-store-core/src/test/java/org/apache/flink/table/store/table/sink/SinkRecordConverterTest.java
@@ -47,6 +47,9 @@ public class SinkRecordConverterTest {
 
         assertThatThrownBy(() -> converter("a", "b"))
                 .hasMessageContaining("Primary keys [b] should contains all bucket keys [a].");
+
+        assertThatThrownBy(() -> converter("a", "a", "a,b"))
+                .hasMessageContaining("Bucket keys [a] should not in partition keys [a].");
     }
 
     @Test
@@ -69,6 +72,10 @@ public class SinkRecordConverterTest {
     }
 
     private SinkRecordConverter converter(String bk, String pk) {
+        return converter("", bk, pk);
+    }
+
+    private SinkRecordConverter converter(String partK, String bk, String pk) {
         RowType rowType =
                 new RowType(
                         Arrays.asList(
@@ -83,7 +90,9 @@ public class SinkRecordConverterTest {
                         0,
                         fields,
                         TableSchema.currentHighestFieldId(fields),
-                        Collections.emptyList(),
+                        "".equals(partK)
+                                ? Collections.emptyList()
+                                : Arrays.asList(partK.split(",")),
                         "".equals(pk) ? Collections.emptyList() : Arrays.asList(pk.split(",")),
                         options,
                         "");

Reply via email to