This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-paimon.git

commit 6a6c060fae074d6e7efc981f8c1dc1a46a1775ac
Author: Jingsong <[email protected]>
AuthorDate: Tue Jan 30 16:02:51 2024 +0800

    [core] Extract PageFileInput and PageFileOutput from Hash lookup
---
 .../compression/BlockCompressionFactory.java       |   9 +-
 .../apache/paimon/io/CompressedPageFileInput.java  | 115 +++++++++++++++++++++
 .../apache/paimon/io/CompressedPageFileOutput.java | 109 +++++++++++++++++++
 .../java/org/apache/paimon/io/PageFileInput.java   |  43 ++++++++
 .../java/org/apache/paimon/io/PageFileOutput.java  |  33 ++++++
 .../paimon/io/UncompressedPageFileInput.java       |  79 ++++++++++++++
 .../paimon/io/UncompressedPageFileOutput.java      |  44 ++++++++
 .../org/apache/paimon/io/cache/CacheCallback.java  |  25 +++++
 .../java/org/apache/paimon/io/cache/CacheKey.java  | 104 +++++++++++++++++++
 .../org/apache/paimon/io/cache/CacheManager.java   |  65 ++----------
 .../org/apache/paimon/io/cache/CacheReader.java    |  27 +++++
 .../paimon/io/cache/FileBasedRandomInputView.java  |  60 ++++-------
 .../paimon/lookup/hash/HashLookupStoreReader.java  |   8 +-
 .../paimon/lookup/hash/HashLookupStoreWriter.java  |  21 ++--
 .../apache/paimon/utils/FileBasedBloomFilter.java  |  17 ++-
 .../paimon/io/CompressedInputOutputTest.java       | 112 ++++++++++++++++++++
 .../io/cache/FileBasedRandomInputViewTest.java     |   4 +-
 .../paimon/utils/FileBasedBloomFilterTest.java     |   4 +-
 18 files changed, 760 insertions(+), 119 deletions(-)

diff --git 
a/paimon-common/src/main/java/org/apache/paimon/compression/BlockCompressionFactory.java
 
b/paimon-common/src/main/java/org/apache/paimon/compression/BlockCompressionFactory.java
index 40bb10fc4..f90d4e039 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/compression/BlockCompressionFactory.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/compression/BlockCompressionFactory.java
@@ -23,6 +23,8 @@ import io.airlift.compress.lzo.LzoDecompressor;
 import io.airlift.compress.zstd.ZstdCompressor;
 import io.airlift.compress.zstd.ZstdDecompressor;
 
+import javax.annotation.Nullable;
+
 /**
  * Each compression codec has an implementation of {@link 
BlockCompressionFactory} to create
  * compressors and decompressors.
@@ -34,7 +36,12 @@ public interface BlockCompressionFactory {
     BlockDecompressor getDecompressor();
 
     /** Creates {@link BlockCompressionFactory} according to the 
configuration. */
-    static BlockCompressionFactory create(String compression) {
+    @Nullable
+    static BlockCompressionFactory create(@Nullable String compression) {
+        if (compression == null) {
+            return null;
+        }
+
         switch (compression.toUpperCase()) {
             case "LZ4":
                 return new Lz4BlockCompressionFactory();
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/io/CompressedPageFileInput.java 
b/paimon-common/src/main/java/org/apache/paimon/io/CompressedPageFileInput.java
new file mode 100644
index 000000000..3d1a23892
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/io/CompressedPageFileInput.java
@@ -0,0 +1,115 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.io;
+
+import org.apache.paimon.compression.BlockCompressionFactory;
+import org.apache.paimon.compression.BlockDecompressor;
+import org.apache.paimon.utils.MathUtils;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+/** A class to wrap compressed {@link RandomAccessFile}. */
+public class CompressedPageFileInput implements PageFileInput {
+
+    private final RandomAccessFile file;
+    private final int pageSize;
+    private final long uncompressBytes;
+    private final long[] pagePositions;
+
+    private final BlockDecompressor decompressor;
+    private final byte[] uncompressedBuffer;
+    private final byte[] compressedBuffer;
+
+    private final int pageSizeBits;
+    private final int pageSizeMask;
+
+    public CompressedPageFileInput(
+            RandomAccessFile file,
+            int pageSize,
+            BlockCompressionFactory compressionFactory,
+            long uncompressBytes,
+            long[] pagePositions) {
+        this.file = file;
+        this.pageSize = pageSize;
+        this.uncompressBytes = uncompressBytes;
+        this.pagePositions = pagePositions;
+
+        this.uncompressedBuffer = new byte[pageSize];
+        this.decompressor = compressionFactory.getDecompressor();
+        this.compressedBuffer =
+                new 
byte[compressionFactory.getCompressor().getMaxCompressedSize(pageSize)];
+
+        this.pageSizeBits = MathUtils.log2strict(pageSize);
+        this.pageSizeMask = pageSize - 1;
+    }
+
+    @Override
+    public RandomAccessFile file() {
+        return file;
+    }
+
+    @Override
+    public long uncompressBytes() {
+        return uncompressBytes;
+    }
+
+    @Override
+    public int pageSize() {
+        return pageSize;
+    }
+
+    @Override
+    public byte[] readPage(int pageIndex) throws IOException {
+        long position = pagePositions[pageIndex];
+        file.seek(position);
+        int compressLength = file.readInt();
+        file.readFully(compressedBuffer, 0, compressLength);
+
+        int uncompressedLength =
+                decompressor.decompress(compressedBuffer, 0, compressLength, 
uncompressedBuffer, 0);
+
+        byte[] result = new byte[uncompressedLength];
+        System.arraycopy(uncompressedBuffer, 0, result, 0, uncompressedLength);
+        return result;
+    }
+
+    @Override
+    public byte[] readPosition(long position, int length) throws IOException {
+        int offset = (int) (position & this.pageSizeMask);
+        int pageIndex = (int) (position >>> this.pageSizeBits);
+
+        byte[] result = new byte[length];
+        int n = 0;
+        do {
+            byte[] page = readPage(pageIndex);
+            int currentLength = Math.min(page.length - offset, length - n);
+            System.arraycopy(page, offset, result, n, currentLength);
+            offset = 0;
+            n += currentLength;
+            pageIndex++;
+        } while (n < length);
+        return result;
+    }
+
+    @Override
+    public void close() throws IOException {
+        this.file.close();
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/io/CompressedPageFileOutput.java
 
b/paimon-common/src/main/java/org/apache/paimon/io/CompressedPageFileOutput.java
new file mode 100644
index 000000000..6e1571589
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/io/CompressedPageFileOutput.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.io;
+
+import org.apache.paimon.compression.BlockCompressionFactory;
+import org.apache.paimon.compression.BlockCompressor;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.util.ArrayList;
+import java.util.List;
+
+/** A class to output bytes with compression. */
+public class CompressedPageFileOutput implements PageFileOutput {
+
+    private final FileOutputStream out;
+    private final byte[] page;
+    private final BlockCompressor compressor;
+    private final byte[] compressedPage;
+    private final List<Long> pages;
+
+    private long uncompressBytes;
+    private long position;
+    private int count;
+
+    public CompressedPageFileOutput(
+            File file, int pageSize, BlockCompressionFactory 
compressionFactory)
+            throws FileNotFoundException {
+        this.out = new FileOutputStream(file);
+        this.page = new byte[pageSize];
+        this.compressor = compressionFactory.getCompressor();
+        this.compressedPage = new 
byte[compressor.getMaxCompressedSize(pageSize)];
+        this.pages = new ArrayList<>();
+
+        this.uncompressBytes = 0;
+        this.position = 0;
+        this.count = 0;
+    }
+
+    private void flushBuffer() throws IOException {
+        if (count > 0) {
+            pages.add(position);
+            int len = compressor.compress(page, 0, count, compressedPage, 0);
+            // write length
+            out.write((len >>> 24) & 0xFF);
+            out.write((len >>> 16) & 0xFF);
+            out.write((len >>> 8) & 0xFF);
+            out.write(len & 0xFF);
+            // write page
+            out.write(compressedPage, 0, len);
+            count = 0;
+            position += (len + 4);
+        }
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        uncompressBytes += len;
+
+        while (len > 0) {
+            if (count >= page.length) {
+                flushBuffer();
+            }
+            int toWrite = Math.min(len, page.length - count);
+            System.arraycopy(b, off, page, count, toWrite);
+            off += toWrite;
+            len -= toWrite;
+            count += toWrite;
+        }
+    }
+
+    @Override
+    public void close() throws IOException {
+        try (OutputStream ignored = out) {
+            flushBuffer();
+        }
+    }
+
+    public long uncompressBytes() {
+        return uncompressBytes;
+    }
+
+    public long[] pages() {
+        long[] pages = new long[this.pages.size()];
+        for (int i = 0; i < pages.length; i++) {
+            pages[i] = this.pages.get(i);
+        }
+        return pages;
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/io/PageFileInput.java 
b/paimon-common/src/main/java/org/apache/paimon/io/PageFileInput.java
new file mode 100644
index 000000000..da699304f
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/io/PageFileInput.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.io;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+/** An interface to read pages from file. */
+public interface PageFileInput extends Closeable {
+
+    RandomAccessFile file();
+
+    long uncompressBytes();
+
+    int pageSize();
+
+    byte[] readPage(int pageIndex) throws IOException;
+
+    byte[] readPosition(long position, int length) throws IOException;
+
+    static PageFileInput create(File file, int pageSize) throws IOException {
+        RandomAccessFile accessFile = new RandomAccessFile(file, "r");
+        return new UncompressedPageFileInput(accessFile, pageSize);
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/io/PageFileOutput.java 
b/paimon-common/src/main/java/org/apache/paimon/io/PageFileOutput.java
new file mode 100644
index 000000000..1a623ad90
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/io/PageFileOutput.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.io;
+
+import java.io.Closeable;
+import java.io.File;
+import java.io.IOException;
+
+/** An interface to write bytes with pages into file. */
+public interface PageFileOutput extends Closeable {
+
+    void write(byte[] bytes, int off, int len) throws IOException;
+
+    static PageFileOutput create(File file) throws IOException {
+        return new UncompressedPageFileOutput(file);
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/io/UncompressedPageFileInput.java
 
b/paimon-common/src/main/java/org/apache/paimon/io/UncompressedPageFileInput.java
new file mode 100644
index 000000000..dd7922393
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/io/UncompressedPageFileInput.java
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.io;
+
+import org.apache.paimon.utils.MathUtils;
+
+import java.io.IOException;
+import java.io.RandomAccessFile;
+
+/** A class to wrap uncompressed {@link RandomAccessFile}. */
+public class UncompressedPageFileInput implements PageFileInput {
+
+    private final RandomAccessFile file;
+    private final long fileLength;
+    private final int pageSize;
+    private final int pageSizeBits;
+
+    public UncompressedPageFileInput(RandomAccessFile file, int pageSize) 
throws IOException {
+        this.file = file;
+        this.fileLength = file.length();
+        this.pageSize = pageSize;
+        this.pageSizeBits = MathUtils.log2strict(pageSize);
+    }
+
+    @Override
+    public RandomAccessFile file() {
+        return file;
+    }
+
+    @Override
+    public long uncompressBytes() {
+        return fileLength;
+    }
+
+    @Override
+    public int pageSize() {
+        return pageSize;
+    }
+
+    @Override
+    public byte[] readPage(int pageIndex) throws IOException {
+        long position = (long) pageIndex << pageSizeBits;
+        file.seek(position);
+
+        int length = (int) Math.min(pageSize, fileLength - position);
+        byte[] result = new byte[length];
+        file.readFully(result);
+        return result;
+    }
+
+    @Override
+    public byte[] readPosition(long position, int length) throws IOException {
+        file.seek(position);
+        byte[] result = new byte[length];
+        file.readFully(result);
+        return result;
+    }
+
+    @Override
+    public void close() throws IOException {
+        this.file.close();
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/io/UncompressedPageFileOutput.java
 
b/paimon-common/src/main/java/org/apache/paimon/io/UncompressedPageFileOutput.java
new file mode 100644
index 000000000..799395f0b
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/io/UncompressedPageFileOutput.java
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.io;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.IOException;
+
+/** A class to wrap uncompressed {@link FileOutputStream}. */
+public class UncompressedPageFileOutput implements PageFileOutput {
+
+    private final FileOutputStream out;
+
+    public UncompressedPageFileOutput(File file) throws FileNotFoundException {
+        this.out = new FileOutputStream(file);
+    }
+
+    @Override
+    public void write(byte[] bytes, int off, int len) throws IOException {
+        this.out.write(bytes, off, len);
+    }
+
+    @Override
+    public void close() throws IOException {
+        this.out.close();
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheCallback.java 
b/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheCallback.java
new file mode 100644
index 000000000..523e34b43
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheCallback.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.io.cache;
+
+/** Callback for cache removal. */
+public interface CacheCallback {
+
+    void onRemoval(CacheKey key);
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheKey.java 
b/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheKey.java
new file mode 100644
index 000000000..b313018d3
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheKey.java
@@ -0,0 +1,104 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.io.cache;
+
+import java.io.RandomAccessFile;
+import java.util.Objects;
+
+/** Key for cache manager. */
+public interface CacheKey {
+
+    static CacheKey forPosition(RandomAccessFile file, long position, int 
length) {
+        return new PositionCacheKey(file, position, length);
+    }
+
+    static CacheKey forPageIndex(RandomAccessFile file, int pageSize, int 
pageIndex) {
+        return new PageIndexCacheKey(file, pageSize, pageIndex);
+    }
+
+    /** Key for file position and length. */
+    class PositionCacheKey implements CacheKey {
+
+        private final RandomAccessFile file;
+        private final long position;
+        private final int length;
+
+        private PositionCacheKey(RandomAccessFile file, long position, int 
length) {
+            this.file = file;
+            this.position = position;
+            this.length = length;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            PositionCacheKey that = (PositionCacheKey) o;
+            return position == that.position
+                    && length == that.length
+                    && Objects.equals(file, that.file);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(file, position, length);
+        }
+    }
+
+    /** Key for file page index. */
+    class PageIndexCacheKey implements CacheKey {
+
+        private final RandomAccessFile file;
+        private final int pageSize;
+        private final int pageIndex;
+
+        private PageIndexCacheKey(RandomAccessFile file, int pageSize, int 
pageIndex) {
+            this.file = file;
+            this.pageSize = pageSize;
+            this.pageIndex = pageIndex;
+        }
+
+        public int pageIndex() {
+            return pageIndex;
+        }
+
+        @Override
+        public boolean equals(Object o) {
+            if (this == o) {
+                return true;
+            }
+            if (o == null || getClass() != o.getClass()) {
+                return false;
+            }
+            PageIndexCacheKey that = (PageIndexCacheKey) o;
+            return pageSize == that.pageSize
+                    && pageIndex == that.pageIndex
+                    && Objects.equals(file, that.file);
+        }
+
+        @Override
+        public int hashCode() {
+            return Objects.hash(file, pageSize, pageIndex);
+        }
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheManager.java 
b/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheManager.java
index 0634e1c07..3f09fa5f4 100644
--- a/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheManager.java
+++ b/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheManager.java
@@ -28,9 +28,6 @@ import 
org.apache.paimon.shade.caffeine2.com.github.benmanes.caffeine.cache.Remo
 import 
org.apache.paimon.shade.guava30.com.google.common.util.concurrent.MoreExecutors;
 
 import java.io.IOException;
-import java.io.RandomAccessFile;
-import java.util.Objects;
-import java.util.function.BiConsumer;
 
 /** Cache manager to cache bytes to paged {@link MemorySegment}s. */
 public class CacheManager {
@@ -61,16 +58,12 @@ public class CacheManager {
         return cache;
     }
 
-    public MemorySegment getPage(
-            RandomAccessFile file,
-            long readOffset,
-            int readLength,
-            BiConsumer<Long, Integer> cleanCallback) {
-        CacheKey key = new CacheKey(file, readOffset, readLength);
+    public MemorySegment getPage(CacheKey key, CacheReader reader, 
CacheCallback callback) {
         CacheValue value = cache.getIfPresent(key);
         while (value == null || value.isClosed) {
             try {
-                value = new CacheValue(key.read(), cleanCallback);
+                this.fileReadCount++;
+                value = new CacheValue(MemorySegment.wrap(reader.read(key)), 
callback);
             } catch (IOException e) {
                 throw new RuntimeException(e);
             }
@@ -79,8 +72,8 @@ public class CacheManager {
         return value.segment;
     }
 
-    public void invalidPage(RandomAccessFile file, long readOffset, int 
readLength) {
-        cache.invalidate(new CacheKey(file, readOffset, readLength));
+    public void invalidPage(CacheKey key) {
+        cache.invalidate(key);
     }
 
     private int weigh(CacheKey cacheKey, CacheValue cacheValue) {
@@ -89,63 +82,23 @@ public class CacheManager {
 
     private void onRemoval(CacheKey key, CacheValue value, RemovalCause cause) 
{
         value.isClosed = true;
-        value.cleanCallback.accept(key.offset, key.length);
+        value.callback.onRemoval(key);
     }
 
     public int fileReadCount() {
         return fileReadCount;
     }
 
-    private class CacheKey {
-
-        private final RandomAccessFile file;
-        private final long offset;
-        private final int length;
-
-        private CacheKey(RandomAccessFile file, long offset, int length) {
-            this.file = file;
-            this.offset = offset;
-            this.length = length;
-        }
-
-        private MemorySegment read() throws IOException {
-            byte[] bytes = new byte[length];
-            file.seek(offset);
-            file.readFully(bytes);
-            fileReadCount++;
-            return MemorySegment.wrap(bytes);
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) {
-                return true;
-            }
-            if (o == null || getClass() != o.getClass()) {
-                return false;
-            }
-            CacheKey cacheKey = (CacheKey) o;
-            return Objects.equals(file, cacheKey.file)
-                    && offset == cacheKey.offset
-                    && length == cacheKey.length;
-        }
-
-        @Override
-        public int hashCode() {
-            return Objects.hash(file, offset, length);
-        }
-    }
-
     private static class CacheValue {
 
         private final MemorySegment segment;
-        private final BiConsumer<Long, Integer> cleanCallback;
+        private final CacheCallback callback;
 
         private boolean isClosed = false;
 
-        private CacheValue(MemorySegment segment, BiConsumer<Long, Integer> 
cleanCallback) {
+        private CacheValue(MemorySegment segment, CacheCallback callback) {
             this.segment = segment;
-            this.cleanCallback = cleanCallback;
+            this.callback = callback;
         }
     }
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheReader.java 
b/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheReader.java
new file mode 100644
index 000000000..2067cafce
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/io/cache/CacheReader.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.io.cache;
+
+import java.io.IOException;
+
+/** Reader to read byte[]. */
+public interface CacheReader {
+
+    byte[] read(CacheKey key) throws IOException;
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/io/cache/FileBasedRandomInputView.java
 
b/paimon-common/src/main/java/org/apache/paimon/io/cache/FileBasedRandomInputView.java
index ef7594516..b6fa559d9 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/io/cache/FileBasedRandomInputView.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/io/cache/FileBasedRandomInputView.java
@@ -19,14 +19,14 @@
 package org.apache.paimon.io.cache;
 
 import org.apache.paimon.data.AbstractPagedInputView;
+import org.apache.paimon.io.PageFileInput;
 import org.apache.paimon.io.SeekableDataInputView;
+import org.apache.paimon.io.cache.CacheKey.PageIndexCacheKey;
 import org.apache.paimon.memory.MemorySegment;
 import org.apache.paimon.utils.MathUtils;
 
 import java.io.Closeable;
 import java.io.EOFException;
-import java.io.File;
-import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.io.RandomAccessFile;
 import java.util.ArrayList;
@@ -43,23 +43,19 @@ import static 
org.apache.paimon.io.cache.CacheManager.REFRESH_COUNT;
 public class FileBasedRandomInputView extends AbstractPagedInputView
         implements SeekableDataInputView, Closeable {
 
-    private final RandomAccessFile file;
-    private final long fileLength;
+    private final PageFileInput input;
     private final CacheManager cacheManager;
     private final Map<Integer, SegmentContainer> segments;
-    private final int segmentSize;
     private final int segmentSizeBits;
     private final int segmentSizeMask;
 
     private int currentSegmentIndex;
 
-    public FileBasedRandomInputView(File file, CacheManager cacheManager, int 
segmentSize)
-            throws FileNotFoundException {
-        this.file = new RandomAccessFile(file, "r");
-        this.fileLength = file.length();
+    public FileBasedRandomInputView(PageFileInput input, CacheManager 
cacheManager) {
+        this.input = input;
         this.cacheManager = cacheManager;
         this.segments = new HashMap<>();
-        this.segmentSize = segmentSize;
+        int segmentSize = input.pageSize();
         this.segmentSizeBits = MathUtils.log2strict(segmentSize);
         this.segmentSizeMask = segmentSize - 1;
 
@@ -68,8 +64,8 @@ public class FileBasedRandomInputView extends 
AbstractPagedInputView
 
     @Override
     public void setReadPosition(long position) {
-        final int offset = (int) (position & this.segmentSizeMask);
-        this.currentSegmentIndex = positionToSegmentIndex(position);
+        int offset = (int) (position & this.segmentSizeMask);
+        this.currentSegmentIndex = (int) (position >>> this.segmentSizeBits);
         MemorySegment segment = getCurrentPage();
         seekInput(segment, offset, getLimitForSegment(segment));
     }
@@ -77,11 +73,13 @@ public class FileBasedRandomInputView extends 
AbstractPagedInputView
     private MemorySegment getCurrentPage() {
         SegmentContainer container = segments.get(currentSegmentIndex);
         if (container == null || container.accessCount == REFRESH_COUNT) {
-            long offset = segmentIndexToPosition(currentSegmentIndex);
-            int length = (int) Math.min(segmentSize, fileLength - offset);
-            container =
-                    new SegmentContainer(
-                            cacheManager.getPage(file, offset, length, 
this::invalidPage));
+            int pageIndex = currentSegmentIndex;
+            MemorySegment segment =
+                    cacheManager.getPage(
+                            CacheKey.forPageIndex(input.file(), 
input.pageSize(), pageIndex),
+                            key -> input.readPage(pageIndex),
+                            this::invalidPage);
+            container = new SegmentContainer(segment);
             segments.put(currentSegmentIndex, container);
         }
         return container.access();
@@ -90,7 +88,7 @@ public class FileBasedRandomInputView extends 
AbstractPagedInputView
     @Override
     protected MemorySegment nextSegment(MemorySegment current) throws 
EOFException {
         currentSegmentIndex++;
-        if (segmentIndexToPosition(currentSegmentIndex) >= fileLength) {
+        if ((long) currentSegmentIndex << segmentSizeBits >= 
input.uncompressBytes()) {
             throw new EOFException();
         }
 
@@ -102,8 +100,8 @@ public class FileBasedRandomInputView extends 
AbstractPagedInputView
         return segment.size();
     }
 
-    private void invalidPage(long position, int length) {
-        segments.remove(positionToSegmentIndex(position));
+    private void invalidPage(CacheKey key) {
+        segments.remove(((PageIndexCacheKey) key).pageIndex());
     }
 
     @Override
@@ -111,25 +109,11 @@ public class FileBasedRandomInputView extends 
AbstractPagedInputView
         // copy out to avoid ConcurrentModificationException
         List<Integer> pages = new ArrayList<>(segments.keySet());
         pages.forEach(
-                page -> {
-                    long offset = segmentIndexToPosition(page);
-                    int length = (int) Math.min(segmentSize, fileLength - 
offset);
-                    cacheManager.invalidPage(file, offset, length);
-                });
+                page ->
+                        cacheManager.invalidPage(
+                                CacheKey.forPageIndex(input.file(), 
input.pageSize(), page)));
 
-        file.close();
-    }
-
-    private int positionToSegmentIndex(long position) {
-        return (int) (position >>> this.segmentSizeBits);
-    }
-
-    private long segmentIndexToPosition(int segmentIndex) {
-        return (long) segmentIndex << segmentSizeBits;
-    }
-
-    public RandomAccessFile file() {
-        return file;
+        input.close();
     }
 
     private static class SegmentContainer {
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashLookupStoreReader.java
 
b/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashLookupStoreReader.java
index 19849953a..853cd4049 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashLookupStoreReader.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashLookupStoreReader.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.lookup.hash;
 
+import org.apache.paimon.io.PageFileInput;
 import org.apache.paimon.io.cache.CacheManager;
 import org.apache.paimon.io.cache.FileBasedRandomInputView;
 import org.apache.paimon.lookup.LookupStoreReader;
@@ -81,13 +82,14 @@ public class HashLookupStoreReader
         dataOffsets = context.dataOffsets;
 
         LOG.info("Opening file {}", file.getName());
-        // Create Mapped file in read-only mode
-        inputView = new FileBasedRandomInputView(file, cacheManager, 
cachePageSize);
+
+        PageFileInput fileInput = PageFileInput.create(file, cachePageSize);
+        inputView = new FileBasedRandomInputView(fileInput, cacheManager);
 
         if (context.bloomFilterEnabled) {
             bloomFilter =
                     new FileBasedBloomFilter(
-                            inputView.file(),
+                            fileInput,
                             cacheManager,
                             context.bloomFilterExpectedEntries,
                             0,
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashLookupStoreWriter.java
 
b/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashLookupStoreWriter.java
index d0d00ba0e..8853ab827 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashLookupStoreWriter.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/lookup/hash/HashLookupStoreWriter.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.lookup.hash;
 
+import org.apache.paimon.io.PageFileOutput;
 import org.apache.paimon.lookup.LookupStoreFactory.Context;
 import org.apache.paimon.lookup.LookupStoreWriter;
 import org.apache.paimon.utils.BloomFilter;
@@ -37,7 +38,6 @@ import java.io.File;
 import java.io.FileInputStream;
 import java.io.FileOutputStream;
 import java.io.IOException;
-import java.io.OutputStream;
 import java.io.RandomAccessFile;
 import java.nio.MappedByteBuffer;
 import java.nio.channels.FileChannel;
@@ -57,7 +57,7 @@ public class HashLookupStoreWriter implements 
LookupStoreWriter {
     private final double loadFactor;
     // Output
     private final File tempFolder;
-    private final OutputStream outputStream;
+    private final File outputFile;
     // Index stream
     private File[] indexFiles;
     private DataOutputStream[] indexStreams;
@@ -84,6 +84,7 @@ public class HashLookupStoreWriter implements 
LookupStoreWriter {
     HashLookupStoreWriter(double loadFactor, File file, @Nullable 
BloomFilter.Builder bloomFilter)
             throws IOException {
         this.loadFactor = loadFactor;
+        this.outputFile = file;
         if (loadFactor <= 0.0 || loadFactor >= 1.0) {
             throw new IllegalArgumentException(
                     "Illegal load factor = " + loadFactor + ", should be 
between 0.0 and 1.0.");
@@ -93,7 +94,6 @@ public class HashLookupStoreWriter implements 
LookupStoreWriter {
         if (!tempFolder.mkdir()) {
             throw new IOException("Can not create temp folder: " + tempFolder);
         }
-        this.outputStream = new BufferedOutputStream(new 
FileOutputStream(file));
         this.indexStreams = new DataOutputStream[0];
         this.dataStreams = new DataOutputStream[0];
         this.indexFiles = new File[0];
@@ -223,7 +223,7 @@ public class HashLookupStoreWriter implements 
LookupStoreWriter {
             context.dataOffsets[i] = indexesLength + context.dataOffsets[i];
         }
 
-        try {
+        try (PageFileOutput output = PageFileOutput.create(outputFile)) {
             // Write bloom filter file
             if (bloomFilter != null) {
                 File bloomFilterFile = new File(tempFolder, "bloomfilter.dat");
@@ -254,12 +254,15 @@ public class HashLookupStoreWriter implements 
LookupStoreWriter {
 
             // Merge and write to output
             checkFreeDiskSpace(filesToMerge);
-            mergeFiles(filesToMerge, outputStream);
-            return context;
+            mergeFiles(filesToMerge, output);
         } finally {
-            outputStream.close();
             cleanup(filesToMerge);
         }
+
+        LOG.info(
+                "Compressed Total store size: {} Mb",
+                new DecimalFormat("#,##0.0").format(outputFile.length() / 
(1024 * 1024)));
+        return context;
     }
 
     private File buildIndex(int keyLength) throws IOException {
@@ -377,7 +380,7 @@ public class HashLookupStoreWriter implements 
LookupStoreWriter {
     }
 
     // Merge files to the provided fileChannel
-    private void mergeFiles(List<File> inputFiles, OutputStream outputStream) 
throws IOException {
+    private void mergeFiles(List<File> inputFiles, PageFileOutput output) 
throws IOException {
         long startTime = System.nanoTime();
 
         // Merge files
@@ -391,7 +394,7 @@ public class HashLookupStoreWriter implements 
LookupStoreWriter {
                     byte[] buffer = new byte[8192];
                     int length;
                     while ((length = bufferedInputStream.read(buffer)) > 0) {
-                        outputStream.write(buffer, 0, length);
+                        output.write(buffer, 0, length);
                     }
                 } finally {
                     bufferedInputStream.close();
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/utils/FileBasedBloomFilter.java 
b/paimon-common/src/main/java/org/apache/paimon/utils/FileBasedBloomFilter.java
index d6ce54bf3..e2abc361d 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/utils/FileBasedBloomFilter.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/utils/FileBasedBloomFilter.java
@@ -21,18 +21,18 @@
 package org.apache.paimon.utils;
 
 import org.apache.paimon.annotation.VisibleForTesting;
+import org.apache.paimon.io.PageFileInput;
+import org.apache.paimon.io.cache.CacheKey;
 import org.apache.paimon.io.cache.CacheManager;
 import org.apache.paimon.memory.MemorySegment;
 
-import java.io.RandomAccessFile;
-
 import static org.apache.paimon.io.cache.CacheManager.REFRESH_COUNT;
 import static org.apache.paimon.utils.Preconditions.checkArgument;
 
 /** Util to apply a built bloom filter . */
 public class FileBasedBloomFilter {
 
-    private final RandomAccessFile file;
+    private final PageFileInput input;
     private final CacheManager cacheManager;
     private final BloomFilter filter;
     private final long readOffset;
@@ -41,12 +41,12 @@ public class FileBasedBloomFilter {
     private int accessCount;
 
     public FileBasedBloomFilter(
-            RandomAccessFile file,
+            PageFileInput input,
             CacheManager cacheManager,
             long expectedEntries,
             long readOffset,
             int readLength) {
-        this.file = file;
+        this.input = input;
         this.cacheManager = cacheManager;
         checkArgument(expectedEntries >= 0);
         this.filter = new BloomFilter(expectedEntries, readLength);
@@ -62,10 +62,9 @@ public class FileBasedBloomFilter {
         if (accessCount == REFRESH_COUNT || filter.getMemorySegment() == null) 
{
             MemorySegment segment =
                     cacheManager.getPage(
-                            file,
-                            readOffset,
-                            readLength,
-                            (position, length) -> filter.unsetMemorySegment());
+                            CacheKey.forPosition(input.file(), readOffset, 
readLength),
+                            key -> input.readPosition(readOffset, readLength),
+                            key -> filter.unsetMemorySegment());
             filter.setMemorySegment(segment, 0);
             accessCount = 0;
         }
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/io/CompressedInputOutputTest.java
 
b/paimon-common/src/test/java/org/apache/paimon/io/CompressedInputOutputTest.java
new file mode 100644
index 000000000..42666a878
--- /dev/null
+++ 
b/paimon-common/src/test/java/org/apache/paimon/io/CompressedInputOutputTest.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.io;
+
+import org.apache.paimon.compression.Lz4BlockCompressionFactory;
+import org.apache.paimon.utils.MathUtils;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.RandomAccessFile;
+import java.nio.file.Path;
+import java.util.concurrent.ThreadLocalRandom;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link CompressedPageFileOutput} and {@link 
CompressedPageFileInput}. */
+public class CompressedInputOutputTest {
+
+    @TempDir Path tempDir;
+
+    @Test
+    public void testRandom() throws IOException {
+        for (int i = 0; i < 100; i++) {
+            innerTestRandom();
+        }
+    }
+
+    @SuppressWarnings("ResultOfMethodCallIgnored")
+    private void innerTestRandom() throws IOException {
+        ThreadLocalRandom rnd = ThreadLocalRandom.current();
+        byte[] bytes = new byte[rnd.nextInt(1_000) + 1];
+        rnd.nextBytes(bytes);
+        int pageSize = MathUtils.roundDownToPowerOf2(rnd.nextInt(2_000) + 2);
+
+        // prepare compressed file
+        File compressed = new File(tempDir.toFile(), "compressed");
+        compressed.delete();
+        Lz4BlockCompressionFactory compressionFactory = new 
Lz4BlockCompressionFactory();
+        CompressedPageFileOutput output1 =
+                new CompressedPageFileOutput(compressed, pageSize, 
compressionFactory);
+        long uncompressBytes;
+        long[] pagePositions;
+        try {
+            output1.write(bytes, 0, bytes.length);
+        } finally {
+            output1.close();
+            uncompressBytes = output1.uncompressBytes();
+            pagePositions = output1.pages();
+        }
+        CompressedPageFileInput input1 =
+                new CompressedPageFileInput(
+                        new RandomAccessFile(compressed, "r"),
+                        pageSize,
+                        compressionFactory,
+                        uncompressBytes,
+                        pagePositions);
+
+        // prepare uncompressed file
+        File uncompressed = new File(tempDir.toFile(), "uncompressed");
+        uncompressed.delete();
+        try (UncompressedPageFileOutput output2 = new 
UncompressedPageFileOutput(uncompressed)) {
+            output2.write(bytes, 0, bytes.length);
+        }
+        UncompressedPageFileInput input2 =
+                new UncompressedPageFileInput(new 
RandomAccessFile(uncompressed, "r"), pageSize);
+
+        // test uncompressBytes
+        
assertThat(input1.uncompressBytes()).isEqualTo(input2.uncompressBytes());
+
+        // test readPage
+        for (int i = 0; i < pagePositions.length; i++) {
+            assertThat(input1.readPage(i)).isEqualTo(input2.readPage(i));
+        }
+
+        // test readPosition
+        for (int i = 0; i < 10; i++) {
+            long position;
+            int length;
+            if (uncompressBytes == 1) {
+                position = 0;
+                length = 1;
+            } else {
+                position = rnd.nextLong(uncompressBytes - 1);
+                length = rnd.nextInt((int) (uncompressBytes - position)) + 1;
+            }
+            assertThat(input1.readPosition(position, length))
+                    .isEqualTo(input2.readPosition(position, length));
+        }
+
+        input1.close();
+        input2.close();
+    }
+}
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/io/cache/FileBasedRandomInputViewTest.java
 
b/paimon-common/src/test/java/org/apache/paimon/io/cache/FileBasedRandomInputViewTest.java
index e1bfe16cf..a0be24d8f 100644
--- 
a/paimon-common/src/test/java/org/apache/paimon/io/cache/FileBasedRandomInputViewTest.java
+++ 
b/paimon-common/src/test/java/org/apache/paimon/io/cache/FileBasedRandomInputViewTest.java
@@ -18,6 +18,7 @@
 
 package org.apache.paimon.io.cache;
 
+import org.apache.paimon.io.PageFileInput;
 import org.apache.paimon.memory.MemorySegment;
 import org.apache.paimon.options.MemorySize;
 
@@ -66,7 +67,8 @@ public class FileBasedRandomInputViewTest {
 
         File file = writeFile(bytes);
         CacheManager cacheManager = new 
CacheManager(MemorySize.ofKibiBytes(128));
-        FileBasedRandomInputView view = new FileBasedRandomInputView(file, 
cacheManager, 1024);
+        FileBasedRandomInputView view =
+                new FileBasedRandomInputView(PageFileInput.create(file, 1024), 
cacheManager);
 
         // read first one
         // this assertThatCode check the ConcurrentModificationException is 
not threw.
diff --git 
a/paimon-common/src/test/java/org/apache/paimon/utils/FileBasedBloomFilterTest.java
 
b/paimon-common/src/test/java/org/apache/paimon/utils/FileBasedBloomFilterTest.java
index 9a71544c0..bf43680c7 100644
--- 
a/paimon-common/src/test/java/org/apache/paimon/utils/FileBasedBloomFilterTest.java
+++ 
b/paimon-common/src/test/java/org/apache/paimon/utils/FileBasedBloomFilterTest.java
@@ -20,6 +20,7 @@
 
 package org.apache.paimon.utils;
 
+import org.apache.paimon.io.PageFileInput;
 import org.apache.paimon.io.cache.CacheManager;
 import org.apache.paimon.memory.MemorySegment;
 import org.apache.paimon.options.MemorySize;
@@ -30,7 +31,6 @@ import org.junit.jupiter.api.io.TempDir;
 
 import java.io.File;
 import java.io.IOException;
-import java.io.RandomAccessFile;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.nio.file.StandardOpenOption;
@@ -53,7 +53,7 @@ public class FileBasedBloomFilterTest {
         CacheManager cacheManager = new 
CacheManager(MemorySize.ofMebiBytes(1));
         FileBasedBloomFilter filter =
                 new FileBasedBloomFilter(
-                        new RandomAccessFile(file, "r"), cacheManager, 100, 0, 
1000);
+                        PageFileInput.create(file, 1000), cacheManager, 100, 
0, 1000);
 
         Arrays.stream(inputs)
                 .forEach(i -> 
Assertions.assertThat(filter.testHash(Integer.hashCode(i))).isTrue());

Reply via email to