This is an automated email from the ASF dual-hosted git repository.

czweng pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new c9d6f47c4 [core] Filter on Manifest Cache (#929)
c9d6f47c4 is described below

commit c9d6f47c4beaf30c679529fc3cf96ced4683b303
Author: Jingsong Lee <[email protected]>
AuthorDate: Tue Apr 18 19:41:54 2023 +0800

    [core] Filter on Manifest Cache (#929)
---
 docs/content/maintenance/write-performance.md      |   2 +-
 .../shortcodes/generated/core_configuration.html   |  12 +--
 .../org/apache/paimon/reader/RecordReader.java     |  47 +++++++++
 .../main/java/org/apache/paimon/utils/Filter.java  |   7 ++
 .../java/org/apache/paimon/AbstractFileStore.java  |  25 +++--
 .../org/apache/paimon/AppendOnlyFileStore.java     |  17 +++-
 .../main/java/org/apache/paimon/CoreOptions.java   |  17 ++--
 .../src/main/java/org/apache/paimon/FileStore.java |   3 +
 .../java/org/apache/paimon/KeyValueFileStore.java  |  16 ++-
 .../paimon/manifest/ManifestCacheFilter.java       |  23 +++--
 .../paimon/manifest/ManifestEntrySerializer.java   |  15 +++
 .../paimon/operation/AbstractFileStoreScan.java    |  97 +++++++++++++-----
 .../paimon/operation/AbstractFileStoreWrite.java   |   4 +-
 .../paimon/operation/AppendOnlyFileStoreScan.java  |   1 -
 .../org/apache/paimon/operation/FileStoreScan.java |   5 +
 .../paimon/operation/KeyValueFileStoreScan.java    |   1 -
 .../paimon/table/AppendOnlyFileStoreTable.java     |   9 +-
 .../table/ChangelogValueCountFileStoreTable.java   |   9 +-
 .../table/ChangelogWithKeyFileStoreTable.java      |   9 +-
 .../org/apache/paimon/table/FileStoreTable.java    |   3 +
 .../apache/paimon/table/system/BucketsTable.java   |  37 +++----
 .../java/org/apache/paimon/utils/ObjectsCache.java |  26 +++--
 .../java/org/apache/paimon/utils/ObjectsFile.java  |  19 +++-
 .../paimon/table/FileStoreTableTestBase.java       |   2 +-
 .../ContinuousCompactorFollowUpScannerTest.java    |   3 +-
 .../org/apache/paimon/utils/ObjectsCacheTest.java  | 108 +++++++++++++++++++++
 .../flink/sink/BucketsRowChannelComputer.java      |  49 ++++++++++
 .../apache/paimon/flink/sink/ChannelComputer.java  |  11 +++
 .../paimon/flink/sink/CompactorSinkBuilder.java    |   8 +-
 .../sink/OffsetRowDataHashStreamPartitioner.java   |  83 ----------------
 .../paimon/flink/sink/RowDataChannelComputer.java  |  14 +--
 .../paimon/flink/sink/StoreCompactOperator.java    |  26 ++---
 .../apache/paimon/flink/sink/StoreSinkWrite.java   |   3 +-
 .../paimon/flink/sink/StoreSinkWriteImpl.java      |  22 +++--
 .../paimon/flink/sink/StoreSinkWriteState.java     |   7 ++
 .../flink/sink/cdc/CdcRecordChannelComputer.java   |   3 +-
 .../sink/cdc/CdcRecordStoreWriteOperator.java      |   2 +-
 .../paimon/flink/source/CompactorSourceITCase.java |   9 +-
 38 files changed, 512 insertions(+), 242 deletions(-)

diff --git a/docs/content/maintenance/write-performance.md 
b/docs/content/maintenance/write-performance.md
index 1f01d75b6..c2dde5d20 100644
--- a/docs/content/maintenance/write-performance.md
+++ b/docs/content/maintenance/write-performance.md
@@ -56,7 +56,7 @@ It is recommended that the parallelism of sink should be less 
than or equal to t
 ## Write Initialize
 
 In the initialization of write, the writer of the bucket needs to read all 
historical files. If there is a bottleneck
-here (For example, writing a large number of partitions simultaneously), you 
can use `manifest.cache-size` to cache
+here (For example, writing a large number of partitions simultaneously), you 
can use `write-manifest-cache` to cache
 the read manifest data to accelerate initialization.
 
 ## Compaction
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index 62a9f6537..b6b91d807 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -158,12 +158,6 @@
             <td>Float</td>
             <td>The index load factor for lookup.</td>
         </tr>
-        <tr>
-            <td><h5>manifest.cache-size</h5></td>
-            <td style="word-wrap: break-word;">0 bytes</td>
-            <td>MemorySize</td>
-            <td>Cache size for reading manifest files.</td>
-        </tr>
         <tr>
             <td><h5>manifest.format</h5></td>
             <td style="word-wrap: break-word;">avro</td>
@@ -374,6 +368,12 @@
             <td>Boolean</td>
             <td>Whether the write buffer can be spillable. Enabled by default 
when using object storage.</td>
         </tr>
+        <tr>
+            <td><h5>write-manifest-cache</h5></td>
+            <td style="word-wrap: break-word;">0 bytes</td>
+            <td>MemorySize</td>
+            <td>Cache size for reading manifest files for write 
initialization.</td>
+        </tr>
         <tr>
             <td><h5>write-mode</h5></td>
             <td style="word-wrap: break-word;">change-log</td>
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/reader/RecordReader.java 
b/paimon-common/src/main/java/org/apache/paimon/reader/RecordReader.java
index b02075b91..474e33b4d 100644
--- a/paimon-common/src/main/java/org/apache/paimon/reader/RecordReader.java
+++ b/paimon-common/src/main/java/org/apache/paimon/reader/RecordReader.java
@@ -20,6 +20,7 @@ package org.apache.paimon.reader;
 
 import org.apache.paimon.annotation.Public;
 import org.apache.paimon.utils.CloseableIterator;
+import org.apache.paimon.utils.Filter;
 
 import javax.annotation.Nullable;
 
@@ -89,6 +90,31 @@ public interface RecordReader<T> extends Closeable {
                 }
             };
         }
+
+        /** Filters a {@link RecordIterator}. */
+        default RecordIterator<T> filter(Filter<T> filter) {
+            RecordIterator<T> thisIterator = this;
+            return new RecordIterator<T>() {
+                @Nullable
+                @Override
+                public T next() throws IOException {
+                    while (true) {
+                        T next = thisIterator.next();
+                        if (next == null) {
+                            return null;
+                        }
+                        if (filter.test(next)) {
+                            return next;
+                        }
+                    }
+                }
+
+                @Override
+                public void releaseBatch() {
+                    thisIterator.releaseBatch();
+                }
+            };
+        }
     }
 
     // 
-------------------------------------------------------------------------
@@ -136,6 +162,27 @@ public interface RecordReader<T> extends Closeable {
         };
     }
 
+    /** Filters a {@link RecordReader}. */
+    default RecordReader<T> filter(Filter<T> filter) {
+        RecordReader<T> thisReader = this;
+        return new RecordReader<T>() {
+            @Nullable
+            @Override
+            public RecordIterator<T> readBatch() throws IOException {
+                RecordIterator<T> iterator = thisReader.readBatch();
+                if (iterator == null) {
+                    return null;
+                }
+                return iterator.filter(filter);
+            }
+
+            @Override
+            public void close() throws IOException {
+                thisReader.close();
+            }
+        };
+    }
+
     /** Convert this reader to a {@link CloseableIterator}. */
     default CloseableIterator<T> toCloseableIterator() {
         return new RecordReaderIterator<>(this);
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/Filter.java 
b/paimon-common/src/main/java/org/apache/paimon/utils/Filter.java
index 39ff64ea6..2764bc773 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/Filter.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/Filter.java
@@ -27,6 +27,8 @@ import org.apache.paimon.predicate.Predicate;
 @FunctionalInterface
 public interface Filter<T> {
 
+    Filter<?> ALWAYS_TRUE = t -> true;
+
     /**
      * Evaluates this predicate on the given argument.
      *
@@ -34,4 +36,9 @@ public interface Filter<T> {
      * @return {@code true} if the input argument matches the predicate, 
otherwise {@code false}
      */
     boolean test(T t);
+
+    @SuppressWarnings({"unchecked", "rawtypes"})
+    static <T> Filter<T> alwaysTrue() {
+        return (Filter) ALWAYS_TRUE;
+    }
 }
diff --git a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
index 5f8676012..de939bdd9 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AbstractFileStore.java
@@ -51,7 +51,7 @@ public abstract class AbstractFileStore<T> implements 
FileStore<T> {
     protected final CoreOptions options;
     protected final RowType partitionType;
 
-    @Nullable private final SegmentsCache<String> manifestCache;
+    @Nullable private final SegmentsCache<String> writeManifestCache;
 
     public AbstractFileStore(
             FileIO fileIO,
@@ -64,11 +64,11 @@ public abstract class AbstractFileStore<T> implements 
FileStore<T> {
         this.schemaId = schemaId;
         this.options = options;
         this.partitionType = partitionType;
-        MemorySize manifestCacheSize = options.manifestCacheSize();
-        this.manifestCache =
-                manifestCacheSize.getBytes() == 0
+        MemorySize writeManifestCache = options.writeManifestCache();
+        this.writeManifestCache =
+                writeManifestCache.getBytes() == 0
                         ? null
-                        : new SegmentsCache<>(options.pageSize(), 
manifestCacheSize);
+                        : new SegmentsCache<>(options.pageSize(), 
writeManifestCache);
     }
 
     public FileStorePathFactory pathFactory() {
@@ -86,6 +86,10 @@ public abstract class AbstractFileStore<T> implements 
FileStore<T> {
 
     @VisibleForTesting
     public ManifestFile.Factory manifestFileFactory() {
+        return manifestFileFactory(false);
+    }
+
+    protected ManifestFile.Factory manifestFileFactory(boolean forWrite) {
         return new ManifestFile.Factory(
                 fileIO,
                 schemaManager,
@@ -93,13 +97,20 @@ public abstract class AbstractFileStore<T> implements 
FileStore<T> {
                 options.manifestFormat(),
                 pathFactory(),
                 options.manifestTargetSize().getBytes(),
-                manifestCache);
+                forWrite ? writeManifestCache : null);
     }
 
     @VisibleForTesting
     public ManifestList.Factory manifestListFactory() {
+        return manifestListFactory(false);
+    }
+
+    protected ManifestList.Factory manifestListFactory(boolean forWrite) {
         return new ManifestList.Factory(
-                fileIO, options.manifestFormat(), pathFactory(), 
manifestCache);
+                fileIO,
+                options.manifestFormat(),
+                pathFactory(),
+                forWrite ? writeManifestCache : null);
     }
 
     @Override
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
index 893039c23..8e28b5b2d 100644
--- a/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/AppendOnlyFileStore.java
@@ -21,6 +21,7 @@ package org.apache.paimon;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.format.FileFormatDiscover;
 import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.manifest.ManifestCacheFilter;
 import org.apache.paimon.operation.AppendOnlyFileStoreRead;
 import org.apache.paimon.operation.AppendOnlyFileStoreScan;
 import org.apache.paimon.operation.AppendOnlyFileStoreWrite;
@@ -66,6 +67,12 @@ public class AppendOnlyFileStore extends 
AbstractFileStore<InternalRow> {
 
     @Override
     public AppendOnlyFileStoreWrite newWrite(String commitUser) {
+        return newWrite(commitUser, null);
+    }
+
+    @Override
+    public AppendOnlyFileStoreWrite newWrite(
+            String commitUser, ManifestCacheFilter manifestFilter) {
         return new AppendOnlyFileStoreWrite(
                 fileIO,
                 newRead(),
@@ -74,11 +81,11 @@ public class AppendOnlyFileStore extends 
AbstractFileStore<InternalRow> {
                 rowType,
                 pathFactory(),
                 snapshotManager(),
-                newScan(true),
+                newScan(true).withManifestCacheFilter(manifestFilter),
                 options);
     }
 
-    private AppendOnlyFileStoreScan newScan(boolean checkNumOfBuckets) {
+    private AppendOnlyFileStoreScan newScan(boolean forWrite) {
         return new AppendOnlyFileStoreScan(
                 partitionType,
                 bucketKeyType.getFieldCount() == 0 ? rowType : bucketKeyType,
@@ -86,10 +93,10 @@ public class AppendOnlyFileStore extends 
AbstractFileStore<InternalRow> {
                 snapshotManager(),
                 schemaManager,
                 schemaId,
-                manifestFileFactory(),
-                manifestListFactory(),
+                manifestFileFactory(forWrite),
+                manifestListFactory(forWrite),
                 options.bucket(),
-                checkNumOfBuckets);
+                forWrite);
     }
 
     @Override
diff --git a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
index c27d8b26a..ce8ccca63 100644
--- a/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-core/src/main/java/org/apache/paimon/CoreOptions.java
@@ -133,12 +133,6 @@ public class CoreOptions implements Serializable {
                             "To avoid frequent manifest merges, this parameter 
specifies the minimum number "
                                     + "of ManifestFileMeta to merge.");
 
-    public static final ConfigOption<MemorySize> MANIFEST_CACHE_SIZE =
-            key("manifest.cache-size")
-                    .memoryType()
-                    .defaultValue(MemorySize.ofMebiBytes(0))
-                    .withDescription("Cache size for reading manifest files.");
-
     public static final ConfigOption<String> PARTITION_DEFAULT_NAME =
             key("partition.default-name")
                     .stringType()
@@ -228,6 +222,13 @@ public class CoreOptions implements Serializable {
                     .withDescription(
                             "Whether the write buffer can be spillable. 
Enabled by default when using object storage.");
 
+    public static final ConfigOption<MemorySize> WRITE_MANIFEST_CACHE =
+            key("write-manifest-cache")
+                    .memoryType()
+                    .defaultValue(MemorySize.ofMebiBytes(0))
+                    .withDescription(
+                            "Cache size for reading manifest files for write 
initialization.");
+
     public static final ConfigOption<Integer> LOCAL_SORT_MAX_NUM_FILE_HANDLES =
             key("local-sort.max-num-file-handles")
                     .intType()
@@ -650,8 +651,8 @@ public class CoreOptions implements Serializable {
         return options.get(MANIFEST_TARGET_FILE_SIZE);
     }
 
-    public MemorySize manifestCacheSize() {
-        return options.get(MANIFEST_CACHE_SIZE);
+    public MemorySize writeManifestCache() {
+        return options.get(WRITE_MANIFEST_CACHE);
     }
 
     public String partitionDefaultName() {
diff --git a/paimon-core/src/main/java/org/apache/paimon/FileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
index 57fb4d23c..f0a60d905 100644
--- a/paimon-core/src/main/java/org/apache/paimon/FileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/FileStore.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon;
 
+import org.apache.paimon.manifest.ManifestCacheFilter;
 import org.apache.paimon.operation.FileStoreCommit;
 import org.apache.paimon.operation.FileStoreExpire;
 import org.apache.paimon.operation.FileStoreRead;
@@ -50,6 +51,8 @@ public interface FileStore<T> extends Serializable {
 
     FileStoreWrite<T> newWrite(String commitUser);
 
+    FileStoreWrite<T> newWrite(String commitUser, ManifestCacheFilter 
manifestFilter);
+
     FileStoreCommit newCommit(String commitUser);
 
     FileStoreExpire newExpire();
diff --git a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java 
b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
index 1201c8e96..77fbd5d76 100644
--- a/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
+++ b/paimon-core/src/main/java/org/apache/paimon/KeyValueFileStore.java
@@ -21,6 +21,7 @@ package org.apache.paimon;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.format.FileFormatDiscover;
 import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.manifest.ManifestCacheFilter;
 import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
 import org.apache.paimon.operation.KeyValueFileStoreRead;
 import org.apache.paimon.operation.KeyValueFileStoreScan;
@@ -87,6 +88,11 @@ public class KeyValueFileStore extends 
AbstractFileStore<KeyValue> {
 
     @Override
     public KeyValueFileStoreWrite newWrite(String commitUser) {
+        return newWrite(commitUser, null);
+    }
+
+    @Override
+    public KeyValueFileStoreWrite newWrite(String commitUser, 
ManifestCacheFilter manifestFilter) {
         return new KeyValueFileStoreWrite(
                 fileIO,
                 schemaManager,
@@ -98,12 +104,12 @@ public class KeyValueFileStore extends 
AbstractFileStore<KeyValue> {
                 mfFactory,
                 pathFactory(),
                 snapshotManager(),
-                newScan(true),
+                newScan(true).withManifestCacheFilter(manifestFilter),
                 options,
                 keyValueFieldsExtractor);
     }
 
-    private KeyValueFileStoreScan newScan(boolean checkNumOfBuckets) {
+    private KeyValueFileStoreScan newScan(boolean forWrite) {
         return new KeyValueFileStoreScan(
                 partitionType,
                 bucketKeyType,
@@ -112,10 +118,10 @@ public class KeyValueFileStore extends 
AbstractFileStore<KeyValue> {
                 schemaManager,
                 schemaId,
                 keyValueFieldsExtractor,
-                manifestFileFactory(),
-                manifestListFactory(),
+                manifestFileFactory(forWrite),
+                manifestListFactory(forWrite),
                 options.bucket(),
-                checkNumOfBuckets);
+                forWrite);
     }
 
     @Override
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/Filter.java 
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCacheFilter.java
similarity index 60%
copy from paimon-common/src/main/java/org/apache/paimon/utils/Filter.java
copy to 
paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCacheFilter.java
index 39ff64ea6..32ef757b5 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/Filter.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestCacheFilter.java
@@ -16,22 +16,21 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.utils;
+package org.apache.paimon.manifest;
 
-import org.apache.paimon.predicate.Predicate;
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.operation.AbstractFileStoreScan;
+
+import javax.annotation.concurrent.ThreadSafe;
 
 /**
- * Represents a filter (boolean-valued function) of one argument. This class 
is for avoiding name
- * conflicting to {@link Predicate}.
+ * Filter for manifest cache, this is used in {@link AbstractFileStoreScan} 
for improving cache
+ * utilization. NOTE: Please use this interface with caution and make sure 
that only filtered data
+ * is required, otherwise it will cause correctness issues.
  */
+@ThreadSafe
 @FunctionalInterface
-public interface Filter<T> {
+public interface ManifestCacheFilter {
 
-    /**
-     * Evaluates this predicate on the given argument.
-     *
-     * @param t the input argument
-     * @return {@code true} if the input argument matches the predicate, 
otherwise {@code false}
-     */
-    boolean test(T t);
+    boolean test(BinaryRow partition, int bucket);
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntrySerializer.java
 
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntrySerializer.java
index ad398e2ac..a733ba9e1 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntrySerializer.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/manifest/ManifestEntrySerializer.java
@@ -18,11 +18,14 @@
 
 package org.apache.paimon.manifest;
 
+import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.io.DataFileMetaSerializer;
 import org.apache.paimon.utils.VersionedObjectSerializer;
 
+import java.util.function.Function;
+
 import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
 import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;
 
@@ -72,4 +75,16 @@ public class ManifestEntrySerializer extends 
VersionedObjectSerializer<ManifestE
                 row.getInt(3),
                 dataFileMetaSerializer.fromRow(row.getRow(4, 
dataFileMetaSerializer.numFields())));
     }
+
+    public static Function<InternalRow, BinaryRow> partitionGetter() {
+        return row -> deserializeBinaryRow(row.getBinary(2));
+    }
+
+    public static Function<InternalRow, Integer> bucketGetter() {
+        return row -> row.getInt(3);
+    }
+
+    public static Function<InternalRow, Integer> totalBucketGetter() {
+        return row -> row.getInt(4);
+    }
 }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
index 044355c07..0650d9a55 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreScan.java
@@ -20,7 +20,10 @@ package org.apache.paimon.operation;
 
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.manifest.ManifestCacheFilter;
 import org.apache.paimon.manifest.ManifestEntry;
+import org.apache.paimon.manifest.ManifestEntrySerializer;
 import org.apache.paimon.manifest.ManifestFile;
 import org.apache.paimon.manifest.ManifestFileMeta;
 import org.apache.paimon.manifest.ManifestList;
@@ -34,7 +37,6 @@ import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.FileStorePathFactory;
 import org.apache.paimon.utils.FileUtils;
 import org.apache.paimon.utils.Filter;
-import org.apache.paimon.utils.Preconditions;
 import org.apache.paimon.utils.RowDataToObjectArrayConverter;
 import org.apache.paimon.utils.SnapshotManager;
 
@@ -49,6 +51,8 @@ import java.util.concurrent.ExecutionException;
 import java.util.function.Function;
 import java.util.stream.Collectors;
 
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
 /** Default implementation of {@link FileStoreScan}. */
 public abstract class AbstractFileStoreScan implements FileStoreScan {
 
@@ -63,7 +67,6 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
 
     private final ConcurrentMap<Long, TableSchema> tableSchemas;
     private final SchemaManager schemaManager;
-    private final long schemaId;
 
     private Predicate partitionFilter;
     private BucketSelector bucketSelector;
@@ -74,24 +77,23 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
     private ScanKind scanKind = ScanKind.ALL;
     private Filter<Integer> levelFilter = null;
 
+    private ManifestCacheFilter manifestCacheFilter = null;
+
     public AbstractFileStoreScan(
             RowType partitionType,
             RowType bucketKeyType,
             SnapshotManager snapshotManager,
             SchemaManager schemaManager,
-            long schemaId,
             ManifestFile.Factory manifestFileFactory,
             ManifestList.Factory manifestListFactory,
             int numOfBuckets,
             boolean checkNumOfBuckets) {
         this.partitionStatsConverter = new 
FieldStatsArraySerializer(partitionType);
         this.partitionConverter = new 
RowDataToObjectArrayConverter(partitionType);
-        Preconditions.checkArgument(
-                bucketKeyType.getFieldCount() > 0, "The bucket keys should not 
be empty.");
+        checkArgument(bucketKeyType.getFieldCount() > 0, "The bucket keys 
should not be empty.");
         this.bucketKeyType = bucketKeyType;
         this.snapshotManager = snapshotManager;
         this.schemaManager = schemaManager;
-        this.schemaId = schemaId;
         this.manifestFileFactory = manifestFileFactory;
         this.manifestList = manifestListFactory.create();
         this.numOfBuckets = numOfBuckets;
@@ -141,6 +143,20 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
         return this;
     }
 
+    @Override
+    public FileStoreScan withPartitionBucket(BinaryRow partition, int bucket) {
+        if (manifestCacheFilter != null) {
+            checkArgument(
+                    manifestCacheFilter.test(partition, bucket),
+                    String.format(
+                            "This is a bug! The partition %s and bucket %s is 
filtered!",
+                            partition, bucket));
+        }
+        withPartitionFilter(Collections.singletonList(partition));
+        withBucket(bucket);
+        return this;
+    }
+
     @Override
     public FileStoreScan withSnapshot(long snapshotId) {
         this.specifiedSnapshotId = snapshotId;
@@ -171,6 +187,12 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
         return this;
     }
 
+    @Override
+    public FileStoreScan withManifestCacheFilter(ManifestCacheFilter 
manifestFilter) {
+        this.manifestCacheFilter = manifestFilter;
+        return this;
+    }
+
     @Override
     public Plan plan() {
         List<ManifestFileMeta> manifests = specifiedManifests;
@@ -200,7 +222,7 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
                                                     .parallelStream()
                                                     
.filter(this::filterManifestFileMeta)
                                                     .flatMap(m -> 
readManifestFileMeta(m).stream())
-                                                    
.filter(this::filterManifestEntry)
+                                                    
.filter(this::filterByStats)
                                                     
.collect(Collectors.toList()))
                             .get();
         } catch (InterruptedException | ExecutionException e) {
@@ -280,10 +302,6 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
     // called by multiple threads
     // ------------------------------------------------------------------------
 
-    protected long tableSchemaId() {
-        return schemaId;
-    }
-
     /** Note: Keep this thread-safe. */
     protected TableSchema scanTableSchema(long id) {
         return tableSchemas.computeIfAbsent(id, key -> 
schemaManager.schema(id));
@@ -297,17 +315,6 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
                         
manifest.partitionStats().fields(partitionStatsConverter));
     }
 
-    /** Note: Keep this thread-safe. */
-    private boolean filterManifestEntry(ManifestEntry entry) {
-        return filterByPartition(entry) && filterByStats(entry);
-    }
-
-    /** Note: Keep this thread-safe. */
-    private boolean filterByPartition(ManifestEntry entry) {
-        return (partitionFilter == null
-                || 
partitionFilter.test(partitionConverter.convert(entry.partition())));
-    }
-
     /** Note: Keep this thread-safe. */
     private boolean filterByBucket(ManifestEntry entry) {
         return (specifiedBucket == null || entry.bucket() == specifiedBucket);
@@ -329,7 +336,51 @@ public abstract class AbstractFileStoreScan implements 
FileStoreScan {
 
     /** Note: Keep this thread-safe. */
     private List<ManifestEntry> readManifestFileMeta(ManifestFileMeta 
manifest) {
-        return manifestFileFactory.create().read(manifest.fileName());
+        return manifestFileFactory
+                .create()
+                .read(manifest.fileName(), manifestCacheRowFilter(), 
manifestEntryRowFilter());
+    }
+
+    /** Note: Keep this thread-safe. */
+    private Filter<InternalRow> manifestEntryRowFilter() {
+        Function<InternalRow, BinaryRow> partitionGetter =
+                ManifestEntrySerializer.partitionGetter();
+        Function<InternalRow, Integer> bucketGetter = 
ManifestEntrySerializer.bucketGetter();
+        Function<InternalRow, Integer> totalBucketGetter =
+                ManifestEntrySerializer.totalBucketGetter();
+        return row -> {
+            if ((partitionFilter != null
+                    && !partitionFilter.test(
+                            
partitionConverter.convert(partitionGetter.apply(row))))) {
+                return false;
+            }
+
+            if (specifiedBucket != null && numOfBuckets == 
totalBucketGetter.apply(row)) {
+                return specifiedBucket.intValue() == bucketGetter.apply(row);
+            }
+
+            return true;
+        };
+    }
+
+    /** Note: Keep this thread-safe. */
+    private Filter<InternalRow> manifestCacheRowFilter() {
+        if (manifestCacheFilter == null) {
+            return Filter.alwaysTrue();
+        }
+
+        Function<InternalRow, BinaryRow> partitionGetter =
+                ManifestEntrySerializer.partitionGetter();
+        Function<InternalRow, Integer> bucketGetter = 
ManifestEntrySerializer.bucketGetter();
+        Function<InternalRow, Integer> totalBucketGetter =
+                ManifestEntrySerializer.totalBucketGetter();
+        return row -> {
+            if (numOfBuckets != totalBucketGetter.apply(row)) {
+                return true;
+            }
+
+            return manifestCacheFilter.test(partitionGetter.apply(row), 
bucketGetter.apply(row));
+        };
     }
 
     // ------------------------------------------------------------------------
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
index 148083129..a761aefbf 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/AbstractFileStoreWrite.java
@@ -318,8 +318,8 @@ public abstract class AbstractFileStoreWrite<T>
         List<DataFileMeta> existingFileMetas = new ArrayList<>();
         if (snapshotId != null) {
             // Concat all the DataFileMeta of existing files into 
existingFileMetas.
-            
scan.withSnapshot(snapshotId).withPartitionFilter(Collections.singletonList(partition))
-                    .withBucket(bucket).plan().files().stream()
+            scan.withSnapshot(snapshotId).withPartitionBucket(partition, 
bucket).plan().files()
+                    .stream()
                     .map(ManifestEntry::file)
                     .forEach(existingFileMetas::add);
         }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
index f2382fb5b..fdab17423 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/AppendOnlyFileStoreScan.java
@@ -58,7 +58,6 @@ public class AppendOnlyFileStoreScan extends 
AbstractFileStoreScan {
                 bucketKeyType,
                 snapshotManager,
                 schemaManager,
-                schemaId,
                 manifestFileFactory,
                 manifestListFactory,
                 numOfBuckets,
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java 
b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
index 93f5e6196..9d2a37d62 100644
--- a/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
+++ b/paimon-core/src/main/java/org/apache/paimon/operation/FileStoreScan.java
@@ -21,6 +21,7 @@ package org.apache.paimon.operation;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.manifest.FileKind;
+import org.apache.paimon.manifest.ManifestCacheFilter;
 import org.apache.paimon.manifest.ManifestEntry;
 import org.apache.paimon.manifest.ManifestFileMeta;
 import org.apache.paimon.predicate.Predicate;
@@ -43,6 +44,8 @@ public interface FileStoreScan {
 
     FileStoreScan withBucket(int bucket);
 
+    FileStoreScan withPartitionBucket(BinaryRow partition, int bucket);
+
     FileStoreScan withSnapshot(long snapshotId);
 
     FileStoreScan withManifestList(List<ManifestFileMeta> manifests);
@@ -51,6 +54,8 @@ public interface FileStoreScan {
 
     FileStoreScan withLevelFilter(Filter<Integer> levelFilter);
 
+    FileStoreScan withManifestCacheFilter(ManifestCacheFilter manifestFilter);
+
     /** Produce a {@link Plan}. */
     Plan plan();
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
index 7011c74c0..a35177b00 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreScan.java
@@ -60,7 +60,6 @@ public class KeyValueFileStoreScan extends 
AbstractFileStoreScan {
                 bucketKeyType,
                 snapshotManager,
                 schemaManager,
-                schemaId,
                 manifestFileFactory,
                 manifestListFactory,
                 numOfBuckets,
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
index 8dcf5c757..c9f3b6006 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/AppendOnlyFileStoreTable.java
@@ -24,6 +24,7 @@ import org.apache.paimon.WriteMode;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.manifest.ManifestCacheFilter;
 import org.apache.paimon.operation.AppendOnlyFileStoreRead;
 import org.apache.paimon.operation.AppendOnlyFileStoreScan;
 import org.apache.paimon.operation.FileStoreScan;
@@ -123,8 +124,14 @@ public class AppendOnlyFileStoreTable extends 
AbstractFileStoreTable {
 
     @Override
     public TableWriteImpl<InternalRow> newWrite(String commitUser) {
+        return newWrite(commitUser, null);
+    }
+
+    @Override
+    public TableWriteImpl<InternalRow> newWrite(
+            String commitUser, ManifestCacheFilter manifestFilter) {
         return new TableWriteImpl<>(
-                store().newWrite(commitUser),
+                store().newWrite(commitUser, manifestFilter),
                 new InternalRowKeyAndBucketExtractor(tableSchema),
                 record -> {
                     Preconditions.checkState(
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogValueCountFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogValueCountFileStoreTable.java
index cab4f57c5..b91397348 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogValueCountFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogValueCountFileStoreTable.java
@@ -27,6 +27,7 @@ import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.ManifestCacheFilter;
 import org.apache.paimon.mergetree.compact.ValueCountMergeFunction;
 import org.apache.paimon.operation.FileStoreScan;
 import org.apache.paimon.operation.KeyValueFileStoreScan;
@@ -141,9 +142,15 @@ public class ChangelogValueCountFileStoreTable extends 
AbstractFileStoreTable {
 
     @Override
     public TableWriteImpl<KeyValue> newWrite(String commitUser) {
+        return newWrite(commitUser, null);
+    }
+
+    @Override
+    public TableWriteImpl<KeyValue> newWrite(
+            String commitUser, ManifestCacheFilter manifestFilter) {
         final KeyValue kv = new KeyValue();
         return new TableWriteImpl<>(
-                store().newWrite(commitUser),
+                store().newWrite(commitUser, manifestFilter),
                 new InternalRowKeyAndBucketExtractor(tableSchema),
                 record -> {
                     switch (record.row().getRowKind()) {
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
 
b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
index 934eaa9e2..74edb4a01 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/ChangelogWithKeyFileStoreTable.java
@@ -26,6 +26,7 @@ import org.apache.paimon.WriteMode;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
+import org.apache.paimon.manifest.ManifestCacheFilter;
 import org.apache.paimon.mergetree.compact.DeduplicateMergeFunction;
 import org.apache.paimon.mergetree.compact.LookupMergeFunction;
 import org.apache.paimon.mergetree.compact.MergeFunctionFactory;
@@ -214,6 +215,12 @@ public class ChangelogWithKeyFileStoreTable extends 
AbstractFileStoreTable {
 
     @Override
     public TableWriteImpl<KeyValue> newWrite(String commitUser) {
+        return newWrite(commitUser, null);
+    }
+
+    @Override
+    public TableWriteImpl<KeyValue> newWrite(
+            String commitUser, ManifestCacheFilter manifestFilter) {
         final SequenceGenerator sequenceGenerator =
                 store().options()
                         .sequenceField()
@@ -221,7 +228,7 @@ public class ChangelogWithKeyFileStoreTable extends 
AbstractFileStoreTable {
                         .orElse(null);
         final KeyValue kv = new KeyValue();
         return new TableWriteImpl<>(
-                store().newWrite(commitUser),
+                store().newWrite(commitUser, manifestFilter),
                 new InternalRowKeyAndBucketExtractor(tableSchema),
                 record -> {
                     long sequenceNumber =
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
index 71902147b..664d400b3 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/FileStoreTable.java
@@ -21,6 +21,7 @@ package org.apache.paimon.table;
 import org.apache.paimon.FileStore;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.io.DataFileMeta;
+import org.apache.paimon.manifest.ManifestCacheFilter;
 import org.apache.paimon.schema.TableSchema;
 import org.apache.paimon.stats.BinaryTableStats;
 import org.apache.paimon.table.sink.TableCommitImpl;
@@ -80,6 +81,8 @@ public interface FileStoreTable extends DataTable {
     @Override
     TableWriteImpl<?> newWrite(String commitUser);
 
+    TableWriteImpl<?> newWrite(String commitUser, ManifestCacheFilter 
manifestFilter);
+
     @Override
     TableCommitImpl newCommit(String commitUser);
 
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java 
b/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
index d1ed08ab4..9e4ead3cb 100644
--- a/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
+++ b/paimon-core/src/main/java/org/apache/paimon/table/system/BucketsTable.java
@@ -21,7 +21,6 @@ package org.apache.paimon.table.system;
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.GenericRow;
 import org.apache.paimon.data.InternalRow;
-import org.apache.paimon.data.JoinedRow;
 import org.apache.paimon.fs.FileIO;
 import org.apache.paimon.fs.Path;
 import org.apache.paimon.io.DataFileMeta;
@@ -41,7 +40,6 @@ import org.apache.paimon.types.BigIntType;
 import org.apache.paimon.types.DataField;
 import org.apache.paimon.types.IntType;
 import org.apache.paimon.types.RowType;
-import org.apache.paimon.types.VarBinaryType;
 import org.apache.paimon.utils.IteratorRecordReader;
 import org.apache.paimon.utils.SnapshotManager;
 
@@ -51,6 +49,9 @@ import java.util.Collections;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.paimon.utils.SerializationUtils.newBytesType;
+import static org.apache.paimon.utils.SerializationUtils.serializeBinaryRow;
+
 /**
  * A table to produce modified partitions and buckets for each snapshot.
  *
@@ -85,21 +86,11 @@ public class BucketsTable implements DataTable, 
ReadonlyTable {
 
     @Override
     public RowType rowType() {
-        RowType partitionType = wrapped.schema().logicalPartitionType();
-
         List<DataField> fields = new ArrayList<>();
-        fields.add(new DataField(0, "_SNAPSHOT_ID", new BigIntType()));
-        fields.addAll(partitionType.getFields());
-        // same with ManifestEntry.schema
-        fields.add(new DataField(1, "_BUCKET", new IntType()));
-        fields.add(new DataField(2, "_FILES", new VarBinaryType()));
-        return new RowType(fields);
-    }
-
-    public static RowType partitionWithBucketRowType(RowType partitionType) {
-        List<DataField> fields = new ArrayList<>(partitionType.getFields());
-        // same with ManifestEntry.schema
-        fields.add(new DataField(3, "_BUCKET", new IntType()));
+        fields.add(new DataField(0, "_SNAPSHOT_ID", new BigIntType(false)));
+        fields.add(new DataField(1, "_PARTITION", newBytesType(false)));
+        fields.add(new DataField(2, "_BUCKET", new IntType(false)));
+        fields.add(new DataField(3, "_FILES", newBytesType(false)));
         return new RowType(fields);
     }
 
@@ -171,10 +162,6 @@ public class BucketsTable implements DataTable, 
ReadonlyTable {
 
             DataSplit dataSplit = (DataSplit) split;
 
-            InternalRow row =
-                    new JoinedRow(GenericRow.of(dataSplit.snapshotId()), 
dataSplit.partition());
-            row = new JoinedRow(row, GenericRow.of(dataSplit.bucket()));
-
             List<DataFileMeta> files = Collections.emptyList();
             if (isContinuous) {
                 // Serialized files are only useful in streaming jobs.
@@ -182,10 +169,12 @@ public class BucketsTable implements DataTable, 
ReadonlyTable {
                 // be compacted and don't need to concern incremental new 
files.
                 files = dataSplit.files();
             }
-            row =
-                    new JoinedRow(
-                            row,
-                            GenericRow.of((Object) 
dataFileMetaSerializer.serializeList(files)));
+            InternalRow row =
+                    GenericRow.of(
+                            dataSplit.snapshotId(),
+                            serializeBinaryRow(dataSplit.partition()),
+                            dataSplit.bucket(),
+                            dataFileMetaSerializer.serializeList(files));
 
             return new 
IteratorRecordReader<>(Collections.singletonList(row).iterator());
         }
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java
index 06ed61320..14254ae95 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsCache.java
@@ -18,14 +18,14 @@
 
 package org.apache.paimon.utils;
 
+import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.RandomAccessInputView;
 import org.apache.paimon.data.Segments;
 import org.apache.paimon.data.SimpleCollectingOutputView;
-import org.apache.paimon.data.serializer.RowCompactedSerializer;
+import org.apache.paimon.data.serializer.InternalRowSerializer;
 import org.apache.paimon.memory.MemorySegment;
 import org.apache.paimon.memory.MemorySegmentSource;
-import org.apache.paimon.types.RowType;
 
 import java.io.EOFException;
 import java.io.IOException;
@@ -38,7 +38,7 @@ public class ObjectsCache<K, V> {
 
     private final SegmentsCache<K> cache;
     private final ObjectSerializer<V> serializer;
-    private final RowCompactedSerializer compactedSerializer;
+    private final InternalRowSerializer rowSerializer;
     private final Function<K, CloseableIterator<InternalRow>> reader;
 
     public ObjectsCache(
@@ -47,26 +47,31 @@ public class ObjectsCache<K, V> {
             Function<K, CloseableIterator<InternalRow>> reader) {
         this.cache = cache;
         this.serializer = serializer;
-        this.compactedSerializer = new 
RowCompactedSerializer(RowType.of(serializer.fieldTypes()));
+        this.rowSerializer = new 
InternalRowSerializer(serializer.fieldTypes());
         this.reader = reader;
     }
 
-    public List<V> read(K key) throws IOException {
-        Segments segments = cache.getSegments(key, this::readSegments);
+    public List<V> read(K key, Filter<InternalRow> loadFilter, 
Filter<InternalRow> readFilter)
+            throws IOException {
+        Segments segments = cache.getSegments(key, k -> readSegments(k, 
loadFilter));
         List<V> entries = new ArrayList<>();
         RandomAccessInputView view =
                 new RandomAccessInputView(
                         segments.segments(), cache.pageSize(), 
segments.limitInLastSegment());
+        BinaryRow binaryRow = new BinaryRow(rowSerializer.getArity());
         while (true) {
             try {
-                
entries.add(serializer.fromRow(compactedSerializer.deserialize(view)));
+                rowSerializer.mapFromPages(binaryRow, view);
+                if (readFilter.test(binaryRow)) {
+                    entries.add(serializer.fromRow(binaryRow));
+                }
             } catch (EOFException e) {
                 return entries;
             }
         }
     }
 
-    private Segments readSegments(K key) {
+    private Segments readSegments(K key, Filter<InternalRow> loadFilter) {
         try (CloseableIterator<InternalRow> iterator = reader.apply(key)) {
             ArrayList<MemorySegment> segments = new ArrayList<>();
             MemorySegmentSource segmentSource =
@@ -74,7 +79,10 @@ public class ObjectsCache<K, V> {
             SimpleCollectingOutputView output =
                     new SimpleCollectingOutputView(segments, segmentSource, 
cache.pageSize());
             while (iterator.hasNext()) {
-                compactedSerializer.serialize(iterator.next(), output);
+                InternalRow row = iterator.next();
+                if (loadFilter.test(row)) {
+                    rowSerializer.serializeToPages(row, output);
+                }
             }
             return new Segments(segments, 
output.getCurrentPositionInSegment());
         } catch (Exception e) {
diff --git a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java 
b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java
index 10ac5ee2c..8e9e6bf32 100644
--- a/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java
+++ b/paimon-core/src/main/java/org/apache/paimon/utils/ObjectsFile.java
@@ -21,10 +21,12 @@ package org.apache.paimon.utils;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.format.FormatReaderFactory;
 import org.apache.paimon.fs.FileIO;
+import org.apache.paimon.reader.RecordReader;
 
 import javax.annotation.Nullable;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.List;
 
 import static org.apache.paimon.utils.FileUtils.createFormatReader;
@@ -54,13 +56,24 @@ public abstract class ObjectsFile<T> {
     }
 
     public List<T> read(String fileName) {
+        return read(fileName, Filter.alwaysTrue(), Filter.alwaysTrue());
+    }
+
+    public List<T> read(
+            String fileName, Filter<InternalRow> loadFilter, 
Filter<InternalRow> readFilter) {
         try {
             if (cache != null) {
-                return cache.read(fileName);
+                return cache.read(fileName, loadFilter, readFilter);
             }
 
-            return FileUtils.readListFromFile(
-                    fileIO, pathFactory.toPath(fileName), serializer, 
readerFactory);
+            RecordReader<InternalRow> reader =
+                    createFormatReader(fileIO, readerFactory, 
pathFactory.toPath(fileName));
+            if (readFilter != Filter.ALWAYS_TRUE) {
+                reader = reader.filter(readFilter);
+            }
+            List<T> result = new ArrayList<>();
+            reader.forEachRemaining(row -> 
result.add(serializer.fromRow(row)));
+            return result;
         } catch (IOException e) {
             throw new RuntimeException("Failed to read manifest list " + 
fileName, e);
         }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java 
b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
index edf58e0d5..bc27cef46 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/FileStoreTableTestBase.java
@@ -427,7 +427,7 @@ public abstract class FileStoreTableTestBase {
                 createFileStoreTable(
                         conf ->
                                 conf.set(
-                                        CoreOptions.MANIFEST_CACHE_SIZE,
+                                        CoreOptions.WRITE_MANIFEST_CACHE,
                                         MemorySize.ofMebiBytes(1)));
         StreamTableWrite write = table.newWrite(commitUser);
         StreamTableCommit commit = table.newCommit(commitUser);
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorFollowUpScannerTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorFollowUpScannerTest.java
index 72c0e62f8..28c769645 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorFollowUpScannerTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/source/snapshot/ContinuousCompactorFollowUpScannerTest.java
@@ -38,6 +38,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
+import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** Tests for {@link ContinuousCompactorFollowUpScanner}. */
@@ -117,7 +118,7 @@ public class ContinuousCompactorFollowUpScannerTest extends 
ScannerTestBase {
                 "%s %d|%d|%d|%d",
                 rowData.getRowKind().shortString(),
                 rowData.getLong(0),
-                rowData.getInt(1),
+                deserializeBinaryRow(rowData.getBinary(1)).getInt(0),
                 rowData.getInt(2),
                 numFiles);
     }
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/utils/ObjectsCacheTest.java 
b/paimon-core/src/test/java/org/apache/paimon/utils/ObjectsCacheTest.java
new file mode 100644
index 000000000..672ca209e
--- /dev/null
+++ b/paimon-core/src/test/java/org/apache/paimon/utils/ObjectsCacheTest.java
@@ -0,0 +1,108 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.utils;
+
+import org.apache.paimon.data.BinaryString;
+import org.apache.paimon.data.GenericRow;
+import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.types.DataTypes;
+import org.apache.paimon.types.RowType;
+
+import org.junit.jupiter.api.Test;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link ObjectsCache}. */
+public class ObjectsCacheTest {
+
+    @Test
+    public void test() throws IOException {
+        Map<String, List<String>> map = new HashMap<>();
+        ObjectsCache<String, String> cache =
+                new ObjectsCache<>(
+                        new SegmentsCache<>(1024, MemorySize.ofKibiBytes(5)),
+                        new StringSerializer(),
+                        k ->
+                                CloseableIterator.adapterForIterator(
+                                        map.get(k).stream()
+                                                .map(BinaryString::fromString)
+                                                .map(GenericRow::of)
+                                                .map(r -> (InternalRow) r)
+                                                .iterator()));
+
+        // test empty
+        map.put("k1", Collections.emptyList());
+        List<String> values = cache.read("k1", Filter.alwaysTrue(), 
Filter.alwaysTrue());
+        assertThat(values).isEmpty();
+
+        // test values
+        List<String> expect = Arrays.asList("v1", "v2", "v3");
+        map.put("k2", expect);
+        values = cache.read("k2", Filter.alwaysTrue(), Filter.alwaysTrue());
+        assertThat(values).containsExactlyElementsOf(expect);
+
+        // test cache
+        values = cache.read("k2", Filter.alwaysTrue(), Filter.alwaysTrue());
+        assertThat(values).containsExactlyElementsOf(expect);
+
+        // test filter
+        values =
+                cache.read("k2", Filter.alwaysTrue(), r -> 
r.getString(0).toString().endsWith("2"));
+        assertThat(values).containsExactly("v2");
+
+        // test load filter
+        expect = Arrays.asList("v1", "v2", "v3");
+        map.put("k3", expect);
+        values =
+                cache.read("k3", r -> r.getString(0).toString().endsWith("2"), 
Filter.alwaysTrue());
+        assertThat(values).containsExactly("v2");
+
+        // test load filter empty
+        expect = Arrays.asList("v1", "v2", "v3");
+        map.put("k4", expect);
+        values =
+                cache.read("k4", r -> r.getString(0).toString().endsWith("5"), 
Filter.alwaysTrue());
+        assertThat(values).isEmpty();
+    }
+
+    private static class StringSerializer extends ObjectSerializer<String> {
+
+        public StringSerializer() {
+            super(RowType.of(DataTypes.STRING()));
+        }
+
+        @Override
+        public InternalRow toRow(String record) {
+            return GenericRow.of(BinaryString.fromString(record));
+        }
+
+        @Override
+        public String fromRow(InternalRow rowData) {
+            return rowData.getString(0).toString();
+        }
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BucketsRowChannelComputer.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BucketsRowChannelComputer.java
new file mode 100644
index 000000000..665e0dc84
--- /dev/null
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/BucketsRowChannelComputer.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.flink.sink;
+
+import org.apache.paimon.data.BinaryRow;
+import org.apache.paimon.table.system.BucketsTable;
+
+import org.apache.flink.table.data.RowData;
+
+import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
+
+/** {@link ChannelComputer} to partition {@link RowData} from {@link 
BucketsTable}. */
+public class BucketsRowChannelComputer implements ChannelComputer<RowData> {
+
+    private transient int numberOfChannels;
+
+    @Override
+    public void setup(int numberOfChannels) {
+        this.numberOfChannels = numberOfChannels;
+    }
+
+    @Override
+    public int channel(RowData rowData) {
+        BinaryRow partition = deserializeBinaryRow(rowData.getBinary(1));
+        int bucket = rowData.getInt(2);
+        return ChannelComputer.select(partition, bucket, numberOfChannels);
+    }
+
+    @Override
+    public String toString() {
+        return "compactor-partitioner";
+    }
+}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/ChannelComputer.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/ChannelComputer.java
index fdef4c141..648143d92 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/ChannelComputer.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/ChannelComputer.java
@@ -18,6 +18,8 @@
 
 package org.apache.paimon.flink.sink;
 
+import org.apache.paimon.data.BinaryRow;
+
 import java.io.Serializable;
 
 /**
@@ -30,4 +32,13 @@ public interface ChannelComputer<T> extends Serializable {
     void setup(int numChannels);
 
     int channel(T record);
+
+    static int select(BinaryRow partition, int bucket, int numChannels) {
+        int startChannel = Math.abs(partition.hashCode()) % numChannels;
+        return (startChannel + bucket) % numChannels;
+    }
+
+    static int select(int bucket, int numChannels) {
+        return bucket % numChannels;
+    }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSinkBuilder.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSinkBuilder.java
index d293e4ec7..1abc68256 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSinkBuilder.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/CompactorSinkBuilder.java
@@ -20,7 +20,6 @@ package org.apache.paimon.flink.sink;
 
 import org.apache.paimon.operation.Lock;
 import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.table.system.BucketsTable;
 
 import org.apache.flink.streaming.api.datastream.DataStream;
 import org.apache.flink.streaming.api.datastream.DataStreamSink;
@@ -51,11 +50,8 @@ public class CompactorSinkBuilder {
     }
 
     public DataStreamSink<?> build() {
-        OffsetRowDataHashStreamPartitioner partitioner =
-                new OffsetRowDataHashStreamPartitioner(
-                        BucketsTable.partitionWithBucketRowType(
-                                table.schema().logicalPartitionType()),
-                        1);
+        BucketingStreamPartitioner<RowData> partitioner =
+                new BucketingStreamPartitioner<>(new 
BucketsRowChannelComputer());
         PartitionTransformation<RowData> partitioned =
                 new PartitionTransformation<>(input.getTransformation(), 
partitioner);
 
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/OffsetRowDataHashStreamPartitioner.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/OffsetRowDataHashStreamPartitioner.java
deleted file mode 100644
index fe0216358..000000000
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/OffsetRowDataHashStreamPartitioner.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.paimon.flink.sink;
-
-import org.apache.paimon.data.serializer.InternalRowSerializer;
-import org.apache.paimon.flink.FlinkRowWrapper;
-import org.apache.paimon.types.RowType;
-import org.apache.paimon.utils.OffsetRow;
-
-import org.apache.flink.runtime.io.network.api.writer.SubtaskStateMapper;
-import org.apache.flink.runtime.plugable.SerializationDelegate;
-import org.apache.flink.streaming.runtime.partitioner.StreamPartitioner;
-import org.apache.flink.streaming.runtime.streamrecord.StreamRecord;
-import org.apache.flink.table.data.RowData;
-
-/**
- * {@link StreamPartitioner} to partition {@link RowData} according to its 
hash value from an {@link
- * OffsetRow}.
- */
-public class OffsetRowDataHashStreamPartitioner extends 
StreamPartitioner<RowData> {
-
-    private final RowType offsetRowType;
-    private final int offset;
-
-    private transient OffsetRow offsetRow;
-    private transient InternalRowSerializer serializer;
-
-    public OffsetRowDataHashStreamPartitioner(RowType offsetRowType, int 
offset) {
-        this.offsetRowType = offsetRowType;
-        this.offset = offset;
-    }
-
-    @Override
-    public void setup(int numberOfChannels) {
-        super.setup(numberOfChannels);
-        this.offsetRow = new OffsetRow(offsetRowType.getFieldCount(), offset);
-        serializer = new InternalRowSerializer(offsetRowType);
-    }
-
-    @Override
-    public int selectChannel(SerializationDelegate<StreamRecord<RowData>> 
record) {
-        RowData rowData = record.getInstance().getValue();
-        int hash =
-                serializer.toBinaryRow(offsetRow.replace(new 
FlinkRowWrapper(rowData))).hashCode();
-        return Math.abs(hash) % numberOfChannels;
-    }
-
-    @Override
-    public StreamPartitioner<RowData> copy() {
-        return this;
-    }
-
-    @Override
-    public SubtaskStateMapper getDownstreamSubtaskStateMapper() {
-        return SubtaskStateMapper.FULL;
-    }
-
-    @Override
-    public boolean isPointwise() {
-        return false;
-    }
-
-    @Override
-    public String toString() {
-        return "compactor-stream-partitioner";
-    }
-}
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataChannelComputer.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataChannelComputer.java
index 85e7ebce7..19e1f9d5d 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataChannelComputer.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/RowDataChannelComputer.java
@@ -53,15 +53,11 @@ public class RowDataChannelComputer implements 
ChannelComputer<RowData> {
     }
 
     public int channel(BinaryRow partition, int bucket) {
-        int startChannel;
-        if (hasLogSink) {
-            // log sinks like Kafka only consider bucket and don't care about 
partition
-            // so same bucket, even from different partition, must go to the 
same channel
-            startChannel = 0;
-        } else {
-            startChannel = Math.abs(partition.hashCode()) % numChannels;
-        }
-        return (startChannel + bucket) % numChannels;
+        // log sinks like Kafka only consider bucket and don't care about 
partition
+        // so same bucket, even from different partition, must go to the same 
channel
+        return hasLogSink
+                ? ChannelComputer.select(bucket, numChannels)
+                : ChannelComputer.select(partition, bucket, numChannels);
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
index 156b3a798..f1b77cf35 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreCompactOperator.java
@@ -20,12 +20,9 @@ package org.apache.paimon.flink.sink;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.data.BinaryRow;
-import org.apache.paimon.data.serializer.InternalRowSerializer;
-import org.apache.paimon.flink.FlinkRowWrapper;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.DataFileMetaSerializer;
 import org.apache.paimon.table.FileStoreTable;
-import org.apache.paimon.utils.OffsetRow;
 import org.apache.paimon.utils.Preconditions;
 
 import org.apache.flink.runtime.state.StateInitializationContext;
@@ -36,6 +33,8 @@ import org.apache.flink.table.data.RowData;
 import java.io.IOException;
 import java.util.List;
 
+import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
+
 /**
  * A dedicated operator for manual triggered compaction.
  *
@@ -52,8 +51,6 @@ public class StoreCompactOperator extends 
PrepareCommitOperator<RowData> {
 
     private transient StoreSinkWriteState state;
     private transient StoreSinkWrite write;
-    private transient InternalRowSerializer partitionSerializer;
-    private transient OffsetRow reusedPartition;
     private transient DataFileMetaSerializer dataFileMetaSerializer;
 
     public StoreCompactOperator(
@@ -81,13 +78,14 @@ public class StoreCompactOperator extends 
PrepareCommitOperator<RowData> {
                 StateUtils.getSingleValueFromState(
                         context, "commit_user_state", String.class, 
initialCommitUser);
 
-        RowDataChannelComputer channelComputer = new 
RowDataChannelComputer(table.schema(), false);
-        
channelComputer.setup(getRuntimeContext().getNumberOfParallelSubtasks());
         state =
                 new StoreSinkWriteState(
                         context,
                         (tableName, partition, bucket) ->
-                                channelComputer.channel(partition, bucket)
+                                ChannelComputer.select(
+                                                partition,
+                                                bucket,
+                                                
getRuntimeContext().getNumberOfParallelSubtasks())
                                         == 
getRuntimeContext().getIndexOfThisSubtask());
 
         write =
@@ -101,8 +99,6 @@ public class StoreCompactOperator extends 
PrepareCommitOperator<RowData> {
     @Override
     public void open() throws Exception {
         super.open();
-        partitionSerializer = new 
InternalRowSerializer(table.schema().logicalPartitionType());
-        reusedPartition = new OffsetRow(partitionSerializer.getArity(), 1);
         dataFileMetaSerializer = new DataFileMetaSerializer();
     }
 
@@ -111,13 +107,9 @@ public class StoreCompactOperator extends 
PrepareCommitOperator<RowData> {
         RowData record = element.getValue();
 
         long snapshotId = record.getLong(0);
-
-        reusedPartition.replace(new FlinkRowWrapper(record));
-        BinaryRow partition = 
partitionSerializer.toBinaryRow(reusedPartition).copy();
-
-        int bucket = record.getInt(partitionSerializer.getArity() + 1);
-
-        byte[] serializedFiles = 
record.getBinary(partitionSerializer.getArity() + 2);
+        BinaryRow partition = deserializeBinaryRow(record.getBinary(1));
+        int bucket = record.getInt(2);
+        byte[] serializedFiles = record.getBinary(3);
         List<DataFileMeta> files = 
dataFileMetaSerializer.deserializeList(serializedFiles);
 
         if (isStreaming) {
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
index 0eae74e00..d3e22efe5 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWrite.java
@@ -30,7 +30,6 @@ import org.apache.flink.runtime.io.disk.iomanager.IOManager;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.List;
-import java.util.function.Function;
 
 /** Helper class of {@link PrepareCommitOperator} for different types of 
paimon sinks. */
 public interface StoreSinkWrite {
@@ -59,7 +58,7 @@ public interface StoreSinkWrite {
      * changes. {@link TableWriteImpl} with the new schema will be provided by 
{@code
      * newWriteProvider}.
      */
-    void replace(Function<String, TableWriteImpl<?>> newWriteProvider) throws 
Exception;
+    void replace(FileStoreTable newTable) throws Exception;
 
     /** Provider of {@link StoreSinkWrite}. */
     @FunctionalInterface
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
index 67f6ddab1..18ecc44a0 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteImpl.java
@@ -35,7 +35,6 @@ import org.slf4j.LoggerFactory;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
-import java.util.function.Function;
 
 /** Default implementation of {@link StoreSinkWrite}. This writer does not 
have states. */
 public class StoreSinkWriteImpl implements StoreSinkWrite {
@@ -44,6 +43,8 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
 
     protected final String commitUser;
     protected final StoreSinkWriteState state;
+    private final IOManager ioManager;
+    private final boolean isOverwrite;
     private final boolean waitCompaction;
 
     protected TableWriteImpl<?> write;
@@ -57,12 +58,19 @@ public class StoreSinkWriteImpl implements StoreSinkWrite {
             boolean waitCompaction) {
         this.commitUser = commitUser;
         this.state = state;
+        this.ioManager = ioManager;
+        this.isOverwrite = isOverwrite;
         this.waitCompaction = waitCompaction;
+        this.write = newTableWrite(table);
+    }
 
-        write =
-                table.newWrite(commitUser)
-                        .withIOManager(new 
IOManagerImpl(ioManager.getSpillingDirectoriesPaths()))
-                        .withOverwrite(isOverwrite);
+    private TableWriteImpl<?> newTableWrite(FileStoreTable table) {
+        return table.newWrite(
+                        commitUser,
+                        (part, bucket) ->
+                                state.stateValueFilter().filter(table.name(), 
part, bucket))
+                .withIOManager(new 
IOManagerImpl(ioManager.getSpillingDirectoriesPaths()))
+                .withOverwrite(isOverwrite);
     }
 
     @Override
@@ -125,14 +133,14 @@ public class StoreSinkWriteImpl implements StoreSinkWrite 
{
     }
 
     @Override
-    public void replace(Function<String, TableWriteImpl<?>> newWriteProvider) 
throws Exception {
+    public void replace(FileStoreTable newTable) throws Exception {
         if (commitUser == null) {
             return;
         }
 
         List<AbstractFileStoreWrite.State> states = write.checkpoint();
         write.close();
-        write = newWriteProvider.apply(commitUser);
+        write = newTableWrite(newTable);
         write.restore(states);
     }
 }
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteState.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteState.java
index feff9e9dd..072d6a1b9 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteState.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/StoreSinkWriteState.java
@@ -46,6 +46,8 @@ import java.util.Map;
  */
 public class StoreSinkWriteState {
 
+    private final StateValueFilter stateValueFilter;
+
     private final ListState<Tuple5<String, String, byte[], Integer, byte[]>> 
listState;
     private final Map<String, Map<String, List<StateValue>>> map;
 
@@ -53,6 +55,7 @@ public class StoreSinkWriteState {
     public StoreSinkWriteState(
             StateInitializationContext context, StateValueFilter 
stateValueFilter)
             throws Exception {
+        this.stateValueFilter = stateValueFilter;
         TupleSerializer<Tuple5<String, String, byte[], Integer, byte[]>> 
listStateSerializer =
                 new TupleSerializer<>(
                         (Class<Tuple5<String, String, byte[], Integer, 
byte[]>>)
@@ -81,6 +84,10 @@ public class StoreSinkWriteState {
         }
     }
 
+    public StateValueFilter stateValueFilter() {
+        return stateValueFilter;
+    }
+
     public @Nullable List<StateValue> get(String tableName, String key) {
         Map<String, List<StateValue>> innerMap = map.get(tableName);
         return innerMap == null ? null : innerMap.get(key);
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordChannelComputer.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordChannelComputer.java
index c15340a74..43eb14a6a 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordChannelComputer.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordChannelComputer.java
@@ -50,8 +50,7 @@ public class CdcRecordChannelComputer implements 
ChannelComputer<CdcRecord> {
     }
 
     public int channel(BinaryRow partition, int bucket) {
-        int startChannel = Math.abs(partition.hashCode()) % numChannels;
-        return (startChannel + bucket) % numChannels;
+        return ChannelComputer.select(partition, bucket, numChannels);
     }
 
     @Override
diff --git 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperator.java
 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperator.java
index df45d8281..3e9cafeb0 100644
--- 
a/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperator.java
+++ 
b/paimon-flink/paimon-flink-common/src/main/java/org/apache/paimon/flink/sink/cdc/CdcRecordStoreWriteOperator.java
@@ -111,7 +111,7 @@ public class CdcRecordStoreWriteOperator extends 
PrepareCommitOperator<CdcRecord
                 }
                 Thread.sleep(retrySleepMillis);
             }
-            write.replace(commitUser -> table.newWrite(commitUser));
+            write.replace(table);
         }
 
         try {
diff --git 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/CompactorSourceITCase.java
 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/CompactorSourceITCase.java
index 05237690e..2e8ab836f 100644
--- 
a/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/CompactorSourceITCase.java
+++ 
b/paimon-flink/paimon-flink-common/src/test/java/org/apache/paimon/flink/source/CompactorSourceITCase.java
@@ -57,6 +57,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.UUID;
 
+import static org.apache.paimon.utils.SerializationUtils.deserializeBinaryRow;
 import static org.assertj.core.api.Assertions.assertThat;
 
 /** IT cases for {@link CompactorSourceBuilder}. */
@@ -272,18 +273,20 @@ public class CompactorSourceITCase extends 
AbstractTestBase {
     private String toString(RowData rowData) {
         int numFiles;
         try {
-            numFiles = 
dataFileMetaSerializer.deserializeList(rowData.getBinary(4)).size();
+            numFiles = 
dataFileMetaSerializer.deserializeList(rowData.getBinary(3)).size();
         } catch (IOException e) {
             throw new UncheckedIOException(e);
         }
 
+        BinaryRow partition = deserializeBinaryRow(rowData.getBinary(1));
+
         return String.format(
                 "%s %d|%s|%d|%d|%d",
                 rowData.getRowKind().shortString(),
                 rowData.getLong(0),
-                rowData.getString(1).toString(),
+                partition.getString(0),
+                partition.getInt(1),
                 rowData.getInt(2),
-                rowData.getInt(3),
                 numFiles);
     }
 

Reply via email to