This is an automated email from the ASF dual-hosted git repository.

lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git


The following commit(s) were added to refs/heads/master by this push:
     new 982e7b1db [core] Introduce a basic SortLookupStoreFactory (#3770)
982e7b1db is described below

commit 982e7b1dbf5c23999105e603b5748b8e586c2202
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Jul 18 11:56:33 2024 +0800

    [core] Introduce a basic SortLookupStoreFactory (#3770)
---
 .../shortcodes/generated/core_configuration.html   |   6 +
 .../main/java/org/apache/paimon/CoreOptions.java   |  36 ++++
 .../paimon/compression/AirCompressorFactory.java   |  12 +-
 .../compression/BlockCompressionFactory.java       |  25 ++-
 ...ssionFactory.java => BlockCompressionType.java} |  31 ++-
 .../compression/Lz4BlockCompressionFactory.java    |   6 +
 .../compression/ZstdBlockCompressionFactory.java   |   5 +
 .../data/serializer/RowCompactedSerializer.java    |  63 +++++-
 .../apache/paimon/lookup/LookupStoreFactory.java   |  25 +++
 .../apache/paimon/lookup/LookupStoreReader.java    |   3 +
 .../sort/BlockAlignedType.java}                    |  29 ++-
 .../org/apache/paimon/lookup/sort/BlockEntry.java  |  78 ++++++++
 .../org/apache/paimon/lookup/sort/BlockHandle.java |  98 ++++++++++
 .../apache/paimon/lookup/sort/BlockIterator.java   | 112 +++++++++++
 .../org/apache/paimon/lookup/sort/BlockReader.java |  87 +++++++++
 .../apache/paimon/lookup/sort/BlockTrailer.java    | 100 ++++++++++
 .../org/apache/paimon/lookup/sort/BlockWriter.java |  99 ++++++++++
 .../paimon/lookup/sort/BloomFilterHandle.java      |  68 +++++++
 .../java/org/apache/paimon/lookup/sort/Footer.java | 101 ++++++++++
 .../SortContext.java}                              |  20 +-
 .../paimon/lookup/sort/SortLookupStoreFactory.java |  62 ++++++
 .../paimon/lookup/sort/SortLookupStoreReader.java  | 157 +++++++++++++++
 .../paimon/lookup/sort/SortLookupStoreWriter.java  | 212 +++++++++++++++++++++
 .../java/org/apache/paimon/memory/MemorySlice.java | 130 +++++++++++++
 .../org/apache/paimon/memory/MemorySliceInput.java | 102 ++++++++++
 .../apache/paimon/memory/MemorySliceOutput.java    | 110 +++++++++++
 .../java/org/apache/paimon/utils/IntArrayList.java |   4 +
 .../lookup/sort/SortLookupStoreFactoryTest.java    |  85 +++++++++
 .../paimon/operation/KeyValueFileStoreWrite.java   |  14 +-
 .../apache/paimon/table/query/LocalTableQuery.java |  18 +-
 .../paimon/table/PrimaryKeyFileStoreTableTest.java |  13 ++
 31 files changed, 1866 insertions(+), 45 deletions(-)

diff --git a/docs/layouts/shortcodes/generated/core_configuration.html 
b/docs/layouts/shortcodes/generated/core_configuration.html
index 19a73cc5d..93bff9692 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -387,6 +387,12 @@ Mainly to resolve data skew on primary keys. We recommend 
starting with 64 mb wh
             <td>Float</td>
             <td>The index load factor for lookup.</td>
         </tr>
+        <tr>
+            <td><h5>lookup.local-file-type</h5></td>
+            <td style="word-wrap: break-word;">hash</td>
+            <td><p>Enum</p></td>
+            <td>The local file type for lookup.<br /><br />Possible 
values:<ul><li>"sort": Construct a sorted file for lookup.</li><li>"hash": 
Construct a hash file for lookup.</li></ul></td>
+        </tr>
         <tr>
             <td><h5>manifest.compression</h5></td>
             <td style="word-wrap: break-word;">"zstd"</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java 
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 9bb6ab91f..5b2f8aa77 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -809,6 +809,12 @@ public class CoreOptions implements Serializable {
                     .withDescription(
                             "Define partition by table options, cannot define 
partition on DDL and table options at the same time.");
 
+    public static final ConfigOption<LookupLocalFileType> 
LOOKUP_LOCAL_FILE_TYPE =
+            key("lookup.local-file-type")
+                    .enumType(LookupLocalFileType.class)
+                    .defaultValue(LookupLocalFileType.HASH)
+                    .withDescription("The local file type for lookup.");
+
     public static final ConfigOption<Float> LOOKUP_HASH_LOAD_FACTOR =
             key("lookup.hash-load-factor")
                     .floatType()
@@ -1624,6 +1630,10 @@ public class CoreOptions implements Serializable {
         return (int) options.get(CACHE_PAGE_SIZE).getBytes();
     }
 
+    public LookupLocalFileType lookupLocalFileType() {
+        return options.get(LOOKUP_LOCAL_FILE_TYPE); // 'lookup.local-file-type', HASH by default
+    }
+
     public MemorySize lookupCacheMaxMemory() {
         return options.get(LOOKUP_CACHE_MAX_MEMORY_SIZE);
     }
@@ -2616,4 +2626,30 @@ public class CoreOptions implements Serializable {
             return text(description);
         }
     }
+
+    /** Local file layout built for lookup: a sorted file (SORT) or a hash file (HASH). */
+    public enum LookupLocalFileType implements DescribedEnum {
+        SORT("sort", "Construct a sorted file for lookup."),
+
+        HASH("hash", "Construct a hash file for lookup.");
+
+        private final String value; // user-facing option value, returned by toString()
+
+        private final String description; // exposed through getDescription()
+
+        LookupLocalFileType(String value, String description) {
+            this.value = value;
+            this.description = description;
+        }
+
+        @Override
+        public String toString() {
+            return value;
+        }
+
+        @Override
+        public InlineElement getDescription() {
+            return text(description);
+        }
+    }
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/compression/AirCompressorFactory.java
 
b/paimon-common/src/main/java/org/apache/paimon/compression/AirCompressorFactory.java
index f22c9aad8..5caa78a12 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/compression/AirCompressorFactory.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/compression/AirCompressorFactory.java
@@ -24,14 +24,24 @@ import io.airlift.compress.Decompressor;
 /** Implementation of {@link BlockCompressionFactory} for airlift compressors. 
*/
 public class AirCompressorFactory implements BlockCompressionFactory {
 
+    private final BlockCompressionType type;
     private final Compressor internalCompressor;
     private final Decompressor internalDecompressor;
 
-    public AirCompressorFactory(Compressor internalCompressor, Decompressor 
internalDecompressor) {
+    public AirCompressorFactory(
+            BlockCompressionType type,
+            Compressor internalCompressor,
+            Decompressor internalDecompressor) {
+        this.type = type; // reported as-is by getCompressionType()
         this.internalCompressor = internalCompressor;
         this.internalDecompressor = internalDecompressor;
     }
 
+    @Override
+    public BlockCompressionType getCompressionType() {
+        return type; // e.g. LZO when built by BlockCompressionFactory.create
+    }
+
     @Override
     public BlockCompressor getCompressor() {
         return new AirBlockCompressor(internalCompressor);
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/compression/BlockCompressionFactory.java
 
b/paimon-common/src/main/java/org/apache/paimon/compression/BlockCompressionFactory.java
index 90ef2002d..3bf81a6c9 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/compression/BlockCompressionFactory.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/compression/BlockCompressionFactory.java
@@ -29,6 +29,8 @@ import javax.annotation.Nullable;
  */
 public interface BlockCompressionFactory {
 
+    BlockCompressionType getCompressionType(); // codec implemented by this factory
+
     BlockCompressor getCompressor();
 
     BlockDecompressor getDecompressor();
@@ -39,12 +41,31 @@ public interface BlockCompressionFactory {
         switch (compression.toUpperCase()) {
             case "NONE":
                 return null;
+            case "ZSTD":
+                return new ZstdBlockCompressionFactory();
             case "LZ4":
                 return new Lz4BlockCompressionFactory();
             case "LZO":
-                return new AirCompressorFactory(new LzoCompressor(), new 
LzoDecompressor());
-            case "ZSTD":
+                return new AirCompressorFactory(
+                        BlockCompressionType.LZO, new LzoCompressor(), new 
LzoDecompressor());
+            default:
+                throw new IllegalStateException("Unknown CompressionMethod " + 
compression);
+        }
+    }
+
+    /** Creates {@link BlockCompressionFactory} according to the {@link 
BlockCompressionType}. */
+    @Nullable
+    static BlockCompressionFactory create(BlockCompressionType compression) {
+        switch (compression) {
+            case NONE:
+                return null;
+            case ZSTD:
                 return new ZstdBlockCompressionFactory();
+            case LZ4:
+                return new Lz4BlockCompressionFactory();
+            case LZO:
+                return new AirCompressorFactory(
+                        BlockCompressionType.LZO, new LzoCompressor(), new 
LzoDecompressor());
             default:
                 throw new IllegalStateException("Unknown CompressionMethod " + 
compression);
         }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/compression/ZstdBlockCompressionFactory.java
 
b/paimon-common/src/main/java/org/apache/paimon/compression/BlockCompressionType.java
similarity index 55%
copy from 
paimon-common/src/main/java/org/apache/paimon/compression/ZstdBlockCompressionFactory.java
copy to 
paimon-common/src/main/java/org/apache/paimon/compression/BlockCompressionType.java
index 5e97cb3e5..51e9f8340 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/compression/ZstdBlockCompressionFactory.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/compression/BlockCompressionType.java
@@ -18,16 +18,31 @@
 
 package org.apache.paimon.compression;
 
-/** Implementation of {@link BlockCompressionFactory} for zstd codec. */
-public class ZstdBlockCompressionFactory implements BlockCompressionFactory {
+/**
+ * Block compression codecs. Each codec carries an integer id that can be stored instead of the
+ * enum name and resolved back with {@link #getCompressionTypeByPersistentId(int)}.
+ */
+public enum BlockCompressionType {
+    NONE(0),
+    ZSTD(1),
+    LZ4(2),
+    LZO(3);
 
-    @Override
-    public BlockCompressor getCompressor() {
-        return new ZstdBlockCompressor();
+    /** Id used for persistence; presumably written to files, so existing ids must not change. */
+    private final int persistentId;
+
+    BlockCompressionType(int persistentId) {
+        this.persistentId = persistentId;
+    }
+
+    /** Returns the id to store for this codec. */
+    public int persistentId() {
+        return this.persistentId;
     }
 
-    @Override
-    public BlockDecompressor getDecompressor() {
-        return new ZstdBlockDecompressor();
+    /**
+     * Resolves a codec from an id previously returned by {@link #persistentId()}.
+     *
+     * @throws IllegalArgumentException if no codec has the given id
+     */
+    public static BlockCompressionType getCompressionTypeByPersistentId(int persistentId) {
+        BlockCompressionType[] types = values();
+        for (BlockCompressionType type : types) {
+            if (type.persistentId == persistentId) {
+                return type;
+            }
+        }
+
+        throw new IllegalArgumentException("Unknown persistentId " + persistentId);
     }
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/compression/Lz4BlockCompressionFactory.java
 
b/paimon-common/src/main/java/org/apache/paimon/compression/Lz4BlockCompressionFactory.java
index 91be9e916..1e2c42d66 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/compression/Lz4BlockCompressionFactory.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/compression/Lz4BlockCompressionFactory.java
@@ -20,6 +20,12 @@ package org.apache.paimon.compression;
 
 /** Implementation of {@link BlockCompressionFactory} for Lz4 codec. */
 public class Lz4BlockCompressionFactory implements BlockCompressionFactory {
+
+    @Override
+    public BlockCompressionType getCompressionType() {
+        return BlockCompressionType.LZ4; // matches the Lz4BlockCompressor from getCompressor()
+    }
+
     @Override
     public BlockCompressor getCompressor() {
         return new Lz4BlockCompressor();
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/compression/ZstdBlockCompressionFactory.java
 
b/paimon-common/src/main/java/org/apache/paimon/compression/ZstdBlockCompressionFactory.java
index 5e97cb3e5..13745e6ee 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/compression/ZstdBlockCompressionFactory.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/compression/ZstdBlockCompressionFactory.java
@@ -21,6 +21,11 @@ package org.apache.paimon.compression;
 /** Implementation of {@link BlockCompressionFactory} for zstd codec. */
 public class ZstdBlockCompressionFactory implements BlockCompressionFactory {
 
+    @Override
+    public BlockCompressionType getCompressionType() {
+        return BlockCompressionType.ZSTD; // matches the ZstdBlockCompressor from getCompressor()
+    }
+
     @Override
     public BlockCompressor getCompressor() {
         return new ZstdBlockCompressor();
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/data/serializer/RowCompactedSerializer.java
 
b/paimon-common/src/main/java/org/apache/paimon/data/serializer/RowCompactedSerializer.java
index d39dc8c95..91c1c1c7c 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/data/serializer/RowCompactedSerializer.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/data/serializer/RowCompactedSerializer.java
@@ -32,6 +32,7 @@ import org.apache.paimon.data.Timestamp;
 import org.apache.paimon.io.DataInputView;
 import org.apache.paimon.io.DataOutputView;
 import org.apache.paimon.memory.MemorySegment;
+import org.apache.paimon.memory.MemorySlice;
 import org.apache.paimon.types.DataType;
 import org.apache.paimon.types.RowKind;
 import org.apache.paimon.types.RowType;
@@ -42,6 +43,7 @@ import javax.annotation.Nullable;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.Arrays;
+import java.util.Comparator;
 import java.util.Objects;
 
 import static org.apache.paimon.data.BinaryRow.HEADER_SIZE_IN_BITS;
@@ -160,6 +162,10 @@ public class RowCompactedSerializer implements 
Serializer<InternalRow> {
         return row;
     }
 
+    public Comparator<MemorySlice> createSliceComparator() {
+        return new SliceComparator(rowType); // new per call: each instance reuses mutable readers
+    }
+
     private static FieldWriter createFieldWriter(DataType fieldType) {
         final FieldWriter fieldWriter;
         switch (fieldType.getTypeRoot()) {
@@ -515,6 +521,7 @@ public class RowCompactedSerializer implements 
Serializer<InternalRow> {
 
         private MemorySegment segment;
         private MemorySegment[] segments;
+        private int offset;
         private int position;
 
         private RowReader(int headerSizeInBytes) {
@@ -522,17 +529,22 @@ public class RowCompactedSerializer implements 
Serializer<InternalRow> {
         }
 
         private void pointTo(byte[] bytes) {
-            this.segment = MemorySegment.wrap(bytes);
+            pointTo(MemorySegment.wrap(bytes), 0); // the whole array is one row starting at 0
+        }
+
+        private void pointTo(MemorySegment segment, int offset) {
+            this.segment = segment;
             this.segments = new MemorySegment[] {segment};
-            this.position = headerSizeInBytes;
+            this.offset = offset; // start of the row inside segment
+            this.position = offset + headerSizeInBytes; // field data follows the header
         }
 
         private RowKind readRowKind() {
-            return RowKind.fromByteValue(segment.get(0));
+            return RowKind.fromByteValue(segment.get(offset)); // first byte of the row
         }
 
         private boolean isNullAt(int pos) {
-            return bitGet(segment, 0, pos + HEADER_SIZE_IN_BITS);
+            return bitGet(segment, offset, pos + HEADER_SIZE_IN_BITS); // null bits after header
         }
 
         private boolean readBoolean() {
@@ -635,4 +647,47 @@ public class RowCompactedSerializer implements 
Serializer<InternalRow> {
             return serializer.deserialize(bytes);
         }
     }
+
+    /**
+     * Compares two rows in this serializer's compacted format directly from their {@link
+     * MemorySlice}s, field by field in schema order. A null field sorts before a non-null one and
+     * two nulls are equal on that field; non-null values are compared with {@link
+     * Comparable#compareTo}. NOTE(review): a field whose reader returns a non-Comparable value
+     * (e.g. byte[], if binary fields are read that way) would throw ClassCastException — confirm
+     * the supported key types.
+     *
+     * <p>Not thread-safe: both {@link RowReader}s are reused across calls.
+     */
+    private static class SliceComparator implements Comparator<MemorySlice> {
+
+        private final RowReader reader1;
+        private final RowReader reader2;
+        private final FieldReader[] fieldReaders;
+
+        public SliceComparator(RowType rowType) {
+            int fieldCount = rowType.getFieldCount();
+            // RowReader takes the header size in BYTES: HEADER_SIZE_IN_BITS header bits plus one
+            // null bit per field (see RowReader#isNullAt), rounded up to whole bytes. Passing the
+            // raw field count made field values be decoded from the wrong position.
+            int headerSizeInBytes = (fieldCount + 7 + HEADER_SIZE_IN_BITS) / 8;
+            this.reader1 = new RowReader(headerSizeInBytes);
+            this.reader2 = new RowReader(headerSizeInBytes);
+            this.fieldReaders = new FieldReader[fieldCount];
+            for (int i = 0; i < fieldCount; i++) {
+                fieldReaders[i] = createFieldReader(rowType.getTypeAt(i));
+            }
+        }
+
+        @Override
+        public int compare(MemorySlice slice1, MemorySlice slice2) {
+            reader1.pointTo(slice1.segment(), slice1.offset());
+            reader2.pointTo(slice2.segment(), slice2.offset());
+            for (int i = 0; i < fieldReaders.length; i++) {
+                boolean isNull1 = reader1.isNullAt(i);
+                boolean isNull2 = reader2.isNullAt(i);
+                if (isNull1 && isNull2) {
+                    // Both null: equal on this field; no value is read for it.
+                    continue;
+                }
+                if (isNull1) {
+                    return -1;
+                }
+                if (isNull2) {
+                    return 1;
+                }
+                FieldReader fieldReader = fieldReaders[i];
+                Object o1 = fieldReader.readField(reader1, i);
+                Object o2 = fieldReader.readField(reader2, i);
+                @SuppressWarnings({"unchecked", "rawtypes"})
+                int comp = ((Comparable) o1).compareTo(o2);
+                if (comp != 0) {
+                    return comp;
+                }
+            }
+            return 0;
+        }
+    }
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/lookup/LookupStoreFactory.java 
b/paimon-common/src/main/java/org/apache/paimon/lookup/LookupStoreFactory.java
index c22b8605a..2d57cf192 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/lookup/LookupStoreFactory.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/lookup/LookupStoreFactory.java
@@ -19,6 +19,10 @@
 package org.apache.paimon.lookup;
 
 import org.apache.paimon.CoreOptions;
+import org.apache.paimon.io.cache.CacheManager;
+import org.apache.paimon.lookup.hash.HashLookupStoreFactory;
+import org.apache.paimon.lookup.sort.SortLookupStoreFactory;
+import org.apache.paimon.memory.MemorySlice;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.utils.BloomFilter;
 
@@ -26,6 +30,7 @@ import javax.annotation.Nullable;
 
 import java.io.File;
 import java.io.IOException;
+import java.util.Comparator;
 import java.util.function.Function;
 
 /**
@@ -59,6 +64,26 @@ public interface LookupStoreFactory {
         return bfGenerator;
     }
 
+    /**
+     * Creates the {@link LookupStoreFactory} selected by {@link CoreOptions#lookupLocalFileType()}.
+     *
+     * @param options table options; supply the file type, cache page size, spill compression and,
+     *     for HASH, the hash load factor
+     * @param cacheManager cache manager handed to the created factory
+     * @param keyComparator comparator over serialized keys; only used by the SORT file type
+     * @throws IllegalArgumentException if the configured file type is not supported
+     */
+    static LookupStoreFactory create(
+            CoreOptions options, CacheManager cacheManager, Comparator<MemorySlice> keyComparator) {
+        String compression =
+                options.toConfiguration().get(CoreOptions.LOOKUP_CACHE_SPILL_COMPRESSION);
+        switch (options.lookupLocalFileType()) {
+            case SORT:
+                return new SortLookupStoreFactory(
+                        keyComparator, cacheManager, options.cachePageSize(), compression);
+            case HASH:
+                return new HashLookupStoreFactory(
+                        cacheManager,
+                        options.cachePageSize(),
+                        options.toConfiguration().get(CoreOptions.LOOKUP_HASH_LOAD_FACTOR),
+                        compression);
+            default:
+                throw new IllegalArgumentException(
+                        "Unsupported lookup local file type: " + options.lookupLocalFileType());
+        }
+    }
+
     /** Context between writer and reader. */
     interface Context {}
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/lookup/LookupStoreReader.java 
b/paimon-common/src/main/java/org/apache/paimon/lookup/LookupStoreReader.java
index c867e2053..6b4ca0159 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/lookup/LookupStoreReader.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/lookup/LookupStoreReader.java
@@ -18,6 +18,8 @@
 
 package org.apache.paimon.lookup;
 
+import javax.annotation.Nullable;
+
 import java.io.Closeable;
 import java.io.IOException;
 
@@ -25,5 +27,6 @@ import java.io.IOException;
 public interface LookupStoreReader extends Closeable {
 
     /** Lookup value by key. */
+    @Nullable // callers must handle null (presumably returned when the key is absent)
     byte[] lookup(byte[] key) throws IOException;
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/compression/ZstdBlockCompressionFactory.java
 
b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockAlignedType.java
similarity index 59%
copy from 
paimon-common/src/main/java/org/apache/paimon/compression/ZstdBlockCompressionFactory.java
copy to 
paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockAlignedType.java
index 5e97cb3e5..e5849d9f7 100644
--- 
a/paimon-common/src/main/java/org/apache/paimon/compression/ZstdBlockCompressionFactory.java
+++ 
b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockAlignedType.java
@@ -16,18 +16,29 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.compression;
+package org.apache.paimon.lookup.sort;
 
-/** Implementation of {@link BlockCompressionFactory} for zstd codec. */
-public class ZstdBlockCompressionFactory implements BlockCompressionFactory {
+/** Layout of a block: UNALIGNED blocks carry a per-record offset index, ALIGNED ones do not. */
+public enum BlockAlignedType {
+    ALIGNED((byte) 0),
+    UNALIGNED((byte) 1);
 
-    @Override
-    public BlockCompressor getCompressor() {
-        return new ZstdBlockCompressor();
+    private final byte b; // read from the last byte of a block (BlockReader#iterator)
+
+    BlockAlignedType(byte b) {
+        this.b = b;
+    }
+
+    public byte toByte() {
+        return b;
     }
 
-    @Override
-    public BlockDecompressor getDecompressor() {
-        return new ZstdBlockDecompressor();
+    public static BlockAlignedType fromByte(byte b) {
+        for (BlockAlignedType type : BlockAlignedType.values()) {
+            if (type.toByte() == b) {
+                return type;
+            }
+        }
+        throw new IllegalStateException("Illegal block aligned type: " + b);
     }
 }
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockEntry.java 
b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockEntry.java
new file mode 100644
index 000000000..4473886e3
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockEntry.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lookup.sort;
+
+import org.apache.paimon.memory.MemorySlice;
+
+import java.util.Map.Entry;
+
+import static java.util.Objects.requireNonNull;
+
+/** Unmodifiable key/value pair of {@link MemorySlice}s decoded from a block. */
+public class BlockEntry implements Entry<MemorySlice, MemorySlice> {
+
+    private final MemorySlice key;
+    private final MemorySlice value;
+
+    public BlockEntry(MemorySlice key, MemorySlice value) {
+        requireNonNull(key, "key is null");
+        requireNonNull(value, "value is null");
+        this.key = key;
+        this.value = value;
+    }
+
+    @Override
+    public MemorySlice getKey() {
+        return key;
+    }
+
+    @Override
+    public MemorySlice getValue() {
+        return value;
+    }
+
+    @Override
+    public final MemorySlice setValue(MemorySlice value) { // entries are read-only
+        throw new UnsupportedOperationException();
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        BlockEntry entry = (BlockEntry) o;
+
+        if (!key.equals(entry.key)) {
+            return false;
+        }
+        return value.equals(entry.value);
+    }
+
+    @Override
+    public int hashCode() {
+        int result = key.hashCode();
+        result = 31 * result + value.hashCode();
+        return result;
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockHandle.java 
b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockHandle.java
new file mode 100644
index 000000000..737f57a8b
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockHandle.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lookup.sort;
+
+import org.apache.paimon.memory.MemorySlice;
+import org.apache.paimon.memory.MemorySliceInput;
+import org.apache.paimon.memory.MemorySliceOutput;
+
+/**
+ * Location of a block: its starting {@code offset} and its {@code size} in bytes. The size does
+ * not include the block trailer; see {@link #getFullBlockSize()}.
+ */
+public class BlockHandle {
+    /**
+     * Buffer size reserved for an encoded handle: a var-length long offset plus a var-length int
+     * size. NOTE(review): 9 and 5 are presumably the maximum byte lengths produced by
+     * writeVarLenLong / writeVarLenInt — confirm against MemorySliceOutput.
+     */
+    public static final int MAX_ENCODED_LENGTH = 9 + 5;
+
+    private final long offset;
+    private final int size;
+
+    BlockHandle(long offset, int size) {
+        this.offset = offset;
+        this.size = size;
+    }
+
+    public long offset() {
+        return offset;
+    }
+
+    public int size() {
+        return size;
+    }
+
+    /** Returns the size of the block plus its {@link BlockTrailer}. */
+    public int getFullBlockSize() {
+        return size + BlockTrailer.ENCODED_LENGTH;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        BlockHandle that = (BlockHandle) o;
+        return size == that.size && offset == that.offset;
+    }
+
+    @Override
+    public int hashCode() {
+        int result = Long.hashCode(offset);
+        result = 31 * result + size;
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return "BlockHandle" + "{offset=" + offset + ", size=" + size + '}';
+    }
+
+    /** Decodes a handle in the layout written by {@link #writeBlockHandleTo}: offset, then size. */
+    public static BlockHandle readBlockHandle(MemorySliceInput sliceInput) {
+        long offset = sliceInput.readVarLenLong();
+        int size = sliceInput.readVarLenInt();
+        return new BlockHandle(offset, size);
+    }
+
+    /** Encodes {@code blockHandle} into a newly allocated slice. */
+    public static MemorySlice writeBlockHandle(BlockHandle blockHandle) {
+        MemorySliceOutput sliceOutput = new MemorySliceOutput(MAX_ENCODED_LENGTH);
+        writeBlockHandleTo(blockHandle, sliceOutput);
+        return sliceOutput.toSlice();
+    }
+
+    /** Appends the var-length offset followed by the var-length size to {@code sliceOutput}. */
+    public static void writeBlockHandleTo(BlockHandle blockHandle, MemorySliceOutput sliceOutput) {
+        sliceOutput.writeVarLenLong(blockHandle.offset);
+        sliceOutput.writeVarLenInt(blockHandle.size);
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockIterator.java 
b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockIterator.java
new file mode 100644
index 000000000..c5647dc50
--- /dev/null
+++ 
b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockIterator.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lookup.sort;
+
+import org.apache.paimon.memory.MemorySlice;
+import org.apache.paimon.memory.MemorySliceInput;
+
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * An {@link Iterator} over the entries of a block. Each entry is stored as a var-length key
+ * length, the key bytes, a var-length value length and the value bytes. Subclasses implement
+ * {@link #seekTo(int)} to position {@link #data} at the start of a given record.
+ */
+public abstract class BlockIterator implements Iterator<Map.Entry<MemorySlice, MemorySlice>> {
+
+    protected final MemorySliceInput data;
+
+    private final int recordCount;
+    private final Comparator<MemorySlice> comparator;
+
+    /**
+     * Entry already decoded by {@link #seekTo(MemorySlice)} that the next call to {@link #next()}
+     * returns before reading further from {@link #data}; null if there is none.
+     */
+    private BlockEntry polled;
+
+    public BlockIterator(
+            MemorySliceInput data, int recordCount, Comparator<MemorySlice> comparator) {
+        this.data = data;
+        this.recordCount = recordCount;
+        this.comparator = comparator;
+    }
+
+    @Override
+    public boolean hasNext() {
+        return polled != null || data.isReadable();
+    }
+
+    @Override
+    public BlockEntry next() {
+        if (!hasNext()) {
+            throw new NoSuchElementException();
+        }
+
+        if (polled != null) {
+            BlockEntry result = polled;
+            polled = null;
+            return result;
+        }
+
+        return readEntry();
+    }
+
+    @Override
+    public void remove() {
+        throw new UnsupportedOperationException();
+    }
+
+    /**
+     * Binary-searches the block (whose keys must be sorted by the comparator) and positions this
+     * iterator so that {@link #next()} returns the first entry whose key is greater than or equal
+     * to {@code targetKey}, followed by the entries after it. If every key is smaller than {@code
+     * targetKey}, the iterator ends up past the last entry.
+     *
+     * @return true if an entry whose key equals {@code targetKey} exists
+     */
+    public boolean seekTo(MemorySlice targetKey) {
+        // Drop anything left over from an earlier seek so it cannot leak into this result.
+        polled = null;
+        // Index of the smallest probed key greater than targetKey, or -1 if there is none.
+        int ceiling = -1;
+        int left = 0;
+        int right = recordCount - 1;
+
+        while (left <= right) {
+            int mid = left + (right - left) / 2;
+
+            seekTo(mid);
+            BlockEntry midEntry = readEntry();
+            int compare = comparator.compare(midEntry.getKey(), targetKey);
+
+            if (compare == 0) {
+                // data now sits just after midEntry, so later next() calls continue correctly.
+                polled = midEntry;
+                return true;
+            } else if (compare > 0) {
+                ceiling = mid;
+                right = mid - 1;
+            } else {
+                left = mid + 1;
+            }
+        }
+
+        if (ceiling >= 0) {
+            // The last probe may have been the entry just before the ceiling, leaving data AT the
+            // ceiling entry; caching the ceiling in `polled` then made next() return it twice.
+            // Re-seek so next() decodes it exactly once and iteration continues after it.
+            seekTo(ceiling);
+        }
+        return false;
+    }
+
+    /** Positions {@link #data} at the start of the {@code record}-th entry (0-based). */
+    public abstract void seekTo(int record);
+
+    /** Decodes the entry at the current position of {@link #data} and moves past it. */
+    private BlockEntry readEntry() {
+        requireNonNull(data, "data is null");
+
+        int keyLength = data.readVarLenInt();
+        MemorySlice key = data.readSlice(keyLength);
+
+        int valueLength = data.readVarLenInt();
+        MemorySlice value = data.readSlice(valueLength);
+
+        return new BlockEntry(key, value);
+    }
+}
diff --git 
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockReader.java 
b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockReader.java
new file mode 100644
index 000000000..544a3d42b
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockReader.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lookup.sort;
+
+import org.apache.paimon.memory.MemorySlice;
+
+import java.util.Comparator;
+
+import static org.apache.paimon.lookup.sort.BlockAlignedType.ALIGNED;
+
+/**
+ * Reader for a block produced by {@link BlockWriter}.
+ *
+ * <p>Block layouts, selected by the last byte:
+ *
+ * <ul>
+ *   <li>aligned: {@code entries | int entrySize | byte ALIGNED}
+ *   <li>unaligned: {@code entries | int[] entryOffsets | int entryCount | byte UNALIGNED}
+ * </ul>
+ */
+public class BlockReader {
+
+    /** Trailing {@code int} (entry size or entry count) plus the one-byte aligned type. */
+    private static final int TRAILER_LENGTH = 5;
+
+    /** Size of one entry offset in the index of an unaligned block. */
+    private static final int OFFSET_LENGTH = 4;
+
+    private final MemorySlice block;
+    private final Comparator<MemorySlice> comparator;
+
+    public BlockReader(MemorySlice block, Comparator<MemorySlice> comparator) {
+        this.block = block;
+        this.comparator = comparator;
+    }
+
+    /** Returns the length in bytes of the whole block, trailer included. */
+    public long size() {
+        return block.length();
+    }
+
+    /** Creates a new iterator over the entries of this block. */
+    public BlockIterator iterator() {
+        int trailerOffset = block.length() - TRAILER_LENGTH;
+        BlockAlignedType alignedType =
+                BlockAlignedType.fromByte(block.readByte(block.length() - 1));
+        // Entry size for aligned blocks, entry count for unaligned blocks.
+        int intValue = block.readInt(trailerOffset);
+        if (alignedType == ALIGNED) {
+            return new AlignedIterator(block.slice(0, trailerOffset), intValue, comparator);
+        }
+        int indexLength = intValue * OFFSET_LENGTH;
+        int indexOffset = trailerOffset - indexLength;
+        MemorySlice data = block.slice(0, indexOffset);
+        MemorySlice index = block.slice(indexOffset, indexLength);
+        return new UnalignedIterator(data, index, comparator);
+    }
+
+    /** Iterator over a block whose entries all have the same encoded length. */
+    private static class AlignedIterator extends BlockIterator {
+
+        private final int recordSize;
+
+        public AlignedIterator(
+                MemorySlice data, int recordSize, Comparator<MemorySlice> comparator) {
+            super(data.toInput(), data.length() / recordSize, comparator);
+            this.recordSize = recordSize;
+        }
+
+        @Override
+        public void seekTo(int record) {
+            // Every entry has the same length, so entry i starts at i * recordSize.
+            data.setPosition(record * recordSize);
+        }
+    }
+
+    /** Iterator over a block whose entry start offsets are stored in a separate index. */
+    private static class UnalignedIterator extends BlockIterator {
+
+        private final MemorySlice index;
+
+        public UnalignedIterator(
+                MemorySlice data, MemorySlice index, Comparator<MemorySlice> comparator) {
+            super(data.toInput(), index.length() / OFFSET_LENGTH, comparator);
+            this.index = index;
+        }
+
+        @Override
+        public void seekTo(int record) {
+            data.setPosition(index.readInt(record * OFFSET_LENGTH));
+        }
+    }
+}
diff --git a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockTrailer.java b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockTrailer.java
new file mode 100644
index 000000000..6d49bd9cc
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockTrailer.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lookup.sort;
+
+import org.apache.paimon.compression.BlockCompressionType;
+import org.apache.paimon.memory.MemorySlice;
+import org.apache.paimon.memory.MemorySliceInput;
+import org.apache.paimon.memory.MemorySliceOutput;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Trailer of a block, stored directly after the block's (possibly compressed) bytes.
+ *
+ * <p>Encoded as {@link #ENCODED_LENGTH} bytes: the compression type's persistent id as one byte,
+ * followed by the {@code crc32c} checksum field as an {@code int}.
+ */
+public class BlockTrailer {
+
+    /** Encoded size: 1 byte compression type id + 4 bytes checksum. */
+    public static final int ENCODED_LENGTH = 5;
+
+    private final BlockCompressionType compressionType;
+    private final int crc32c;
+
+    public BlockTrailer(BlockCompressionType compressionType, int crc32c) {
+        requireNonNull(compressionType, "compressionType is null");
+
+        this.compressionType = compressionType;
+        this.crc32c = crc32c;
+    }
+
+    /** Returns the compression applied to the block this trailer belongs to. */
+    public BlockCompressionType getCompressionType() {
+        return compressionType;
+    }
+
+    /** Returns the checksum stored for the block. */
+    public int getCrc32c() {
+        return crc32c;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        BlockTrailer that = (BlockTrailer) o;
+        if (crc32c != that.crc32c) {
+            return false;
+        }
+        return compressionType == that.compressionType;
+    }
+
+    @Override
+    public int hashCode() {
+        int result = compressionType.hashCode();
+        result = 31 * result + crc32c;
+        return result;
+    }
+
+    @Override
+    public String toString() {
+        return "BlockTrailer"
+                + "{compressionType="
+                + compressionType
+                + ", crc32c=0x"
+                + Integer.toHexString(crc32c)
+                + '}';
+    }
+
+    /** Decodes a trailer from the next {@link #ENCODED_LENGTH} bytes of {@code input}. */
+    public static BlockTrailer readBlockTrailer(MemorySliceInput input) {
+        BlockCompressionType compressionType =
+                BlockCompressionType.getCompressionTypeByPersistentId(input.readUnsignedByte());
+        int crc32c = input.readInt();
+        return new BlockTrailer(compressionType, crc32c);
+    }
+
+    /** Encodes {@code blockTrailer} into a new slice of {@link #ENCODED_LENGTH} bytes. */
+    public static MemorySlice writeBlockTrailer(BlockTrailer blockTrailer) {
+        MemorySliceOutput output = new MemorySliceOutput(ENCODED_LENGTH);
+        writeBlockTrailer(blockTrailer, output);
+        return output.toSlice();
+    }
+
+    /** Appends the encoding of {@code blockTrailer} to {@code sliceOutput}. */
+    public static void writeBlockTrailer(BlockTrailer blockTrailer, MemorySliceOutput sliceOutput) {
+        sliceOutput.writeByte(blockTrailer.getCompressionType().persistentId());
+        sliceOutput.writeInt(blockTrailer.getCrc32c());
+    }
+}
diff --git a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockWriter.java b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockWriter.java
new file mode 100644
index 000000000..d4f48e9c7
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockWriter.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lookup.sort;
+
+import org.apache.paimon.memory.MemorySlice;
+import org.apache.paimon.memory.MemorySliceOutput;
+import org.apache.paimon.utils.IntArrayList;
+
+import java.io.IOException;
+
+import static org.apache.paimon.lookup.sort.BlockAlignedType.ALIGNED;
+import static org.apache.paimon.lookup.sort.BlockAlignedType.UNALIGNED;
+
+/**
+ * Writer to build a Block.
+ *
+ * <p>Each entry is appended as {@code varint keyLength | key | varint valueLength | value}. If all
+ * entries share the same encoded length the block is aligned and only that length follows the
+ * entries; otherwise the start offset of every entry and the entry count follow. A final byte
+ * records which of the two layouts was written.
+ */
+public class BlockWriter {
+
+    /** Start offset of every entry added since the last reset. */
+    private final IntArrayList entryOffsets;
+
+    private final MemorySliceOutput output;
+
+    /** Encoded length shared by all entries so far; 0 until the first entry is added. */
+    private int entryLength;
+
+    /** Whether every entry added so far has the same encoded length. */
+    private boolean fixedLength;
+
+    public BlockWriter(int blockSize) {
+        this.entryOffsets = new IntArrayList(32);
+        // Pre-size the buffer with some headroom beyond blockSize.
+        this.output = new MemorySliceOutput(blockSize + 128);
+        this.entryLength = 0;
+        this.fixedLength = true;
+    }
+
+    /** Discards all entries so the writer can build another block. */
+    public void reset() {
+        entryOffsets.clear();
+        output.reset();
+        entryLength = 0;
+        fixedLength = true;
+    }
+
+    /** Appends one key/value entry to the block. */
+    public void add(byte[] key, byte[] value) {
+        int entryStart = output.size();
+        output.writeVarLenInt(key.length);
+        output.writeBytes(key);
+        output.writeVarLenInt(value.length);
+        output.writeBytes(value);
+        entryOffsets.add(entryStart);
+
+        if (!fixedLength) {
+            return;
+        }
+        int encodedLength = output.size() - entryStart;
+        if (entryLength == 0) {
+            entryLength = encodedLength;
+        } else if (entryLength != encodedLength) {
+            fixedLength = false;
+        }
+    }
+
+    /** Returns the number of entries added since the last reset. */
+    public int size() {
+        return entryOffsets.size();
+    }
+
+    /** Returns the number of bytes {@link #finish()} would produce for the current entries. */
+    public int memory() {
+        // Trailer: one int (entry length or entry count) plus the layout byte.
+        int trailerBytes = 5;
+        int offsetBytes = fixedLength ? 0 : entryOffsets.size() * 4;
+        return output.size() + offsetBytes + trailerBytes;
+    }
+
+    /**
+     * Appends the layout trailer and returns the finished block.
+     *
+     * @throws IllegalStateException if no entry was added
+     */
+    public MemorySlice finish() throws IOException {
+        if (entryOffsets.isEmpty()) {
+            throw new IllegalStateException();
+        }
+        if (fixedLength) {
+            output.writeInt(entryLength);
+            output.writeByte(ALIGNED.toByte());
+        } else {
+            int count = entryOffsets.size();
+            for (int i = 0; i < count; i++) {
+                output.writeInt(entryOffsets.get(i));
+            }
+            output.writeInt(count);
+            output.writeByte(UNALIGNED.toByte());
+        }
+        return output.toSlice();
+    }
+}
diff --git a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BloomFilterHandle.java b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BloomFilterHandle.java
new file mode 100644
index 000000000..7ec6a845c
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BloomFilterHandle.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lookup.sort;
+
+import java.util.Objects;
+
+/**
+ * Handle for bloom filter: the offset where its bytes start, how many bytes it spans and the
+ * expected number of entries it was built for.
+ */
+public class BloomFilterHandle {
+
+    /**
+     * Upper bound of an encoded handle: 9 + 5 + 9 bytes (presumably the maximum var-length sizes of
+     * the two longs and the int — confirm against the encoder).
+     */
+    public static final int MAX_ENCODED_LENGTH = 9 + 5 + 9;
+
+    private final long offset;
+    private final int size;
+    private final long expectedEntries;
+
+    BloomFilterHandle(long offset, int size, long expectedEntries) {
+        this.offset = offset;
+        this.size = size;
+        this.expectedEntries = expectedEntries;
+    }
+
+    /** Returns the file offset at which the bloom filter bytes start. */
+    public long offset() {
+        return offset;
+    }
+
+    /** Returns the length in bytes of the bloom filter. */
+    public int size() {
+        return size;
+    }
+
+    /** Returns the expected number of entries the bloom filter was built for. */
+    public long expectedEntries() {
+        return expectedEntries;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        BloomFilterHandle that = (BloomFilterHandle) o;
+        return offset == that.offset
+                && size == that.size
+                && expectedEntries == that.expectedEntries;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(offset, size, expectedEntries);
+    }
+
+    @Override
+    public String toString() {
+        return "BloomFilterHandle{offset="
+                + offset
+                + ", size="
+                + size
+                + ", expectedEntries="
+                + expectedEntries
+                + '}';
+    }
+}
diff --git a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/Footer.java b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/Footer.java
new file mode 100644
index 000000000..b8ae51789
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/Footer.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lookup.sort;
+
+import org.apache.paimon.memory.MemorySlice;
+import org.apache.paimon.memory.MemorySliceInput;
+import org.apache.paimon.memory.MemorySliceOutput;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+import static org.apache.paimon.lookup.sort.SortLookupStoreWriter.MAGIC_NUMBER;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/**
+ * Footer for a sorted file.
+ *
+ * <p>Fixed-length layout of {@link #ENCODED_LENGTH} bytes: the bloom filter handle ({@code long
+ * offset, int size, long expectedEntries}, all zeros when there is no bloom filter), the index
+ * block handle ({@code long offset, int size}) and a trailing {@code int} magic number.
+ */
+public class Footer {
+
+    public static final int ENCODED_LENGTH = 36;
+
+    @Nullable private final BloomFilterHandle bloomFilterHandle;
+    private final BlockHandle indexBlockHandle;
+
+    Footer(@Nullable BloomFilterHandle bloomFilterHandle, BlockHandle indexBlockHandle) {
+        this.bloomFilterHandle = bloomFilterHandle;
+        this.indexBlockHandle = indexBlockHandle;
+    }
+
+    /** Returns the bloom filter handle, or {@code null} if the file has no bloom filter. */
+    @Nullable
+    public BloomFilterHandle getBloomFilterHandle() {
+        return bloomFilterHandle;
+    }
+
+    /** Returns the handle of the index block. */
+    public BlockHandle getIndexBlockHandle() {
+        return indexBlockHandle;
+    }
+
+    /**
+     * Decodes a footer starting at the current position of {@code sliceInput}.
+     *
+     * @throws IllegalArgumentException if the magic number does not match
+     */
+    public static Footer readFooter(MemorySliceInput sliceInput) throws IOException {
+        // All offsets below are relative to where the footer starts in the input.
+        int start = sliceInput.position();
+
+        // read bloom filter and index handles
+        @Nullable
+        BloomFilterHandle bloomFilterHandle =
+                new BloomFilterHandle(
+                        sliceInput.readLong(), sliceInput.readInt(), sliceInput.readLong());
+        if (bloomFilterHandle.offset() == 0
+                && bloomFilterHandle.size() == 0
+                && bloomFilterHandle.expectedEntries() == 0) {
+            // An all-zero handle is how writeFooter encodes "no bloom filter".
+            bloomFilterHandle = null;
+        }
+        BlockHandle indexBlockHandle = new BlockHandle(sliceInput.readLong(), sliceInput.readInt());
+
+        // skip padding so the magic number is read from the last 4 bytes of the footer
+        sliceInput.setPosition(start + ENCODED_LENGTH - 4);
+
+        // verify magic number
+        int magicNumber = sliceInput.readInt();
+        checkArgument(magicNumber == MAGIC_NUMBER, "File is not a table (bad magic number)");
+
+        return new Footer(bloomFilterHandle, indexBlockHandle);
+    }
+
+    /** Encodes {@code footer} into a new slice of {@link #ENCODED_LENGTH} bytes. */
+    public static MemorySlice writeFooter(Footer footer) {
+        MemorySliceOutput output = new MemorySliceOutput(ENCODED_LENGTH);
+        writeFooter(footer, output);
+        return output.toSlice();
+    }
+
+    /** Appends the encoding of {@code footer} to {@code sliceOutput}. */
+    public static void writeFooter(Footer footer, MemorySliceOutput sliceOutput) {
+        // write bloom filter and index handles
+        if (footer.bloomFilterHandle == null) {
+            sliceOutput.writeLong(0);
+            sliceOutput.writeInt(0);
+            sliceOutput.writeLong(0);
+        } else {
+            sliceOutput.writeLong(footer.bloomFilterHandle.offset());
+            sliceOutput.writeInt(footer.bloomFilterHandle.size());
+            sliceOutput.writeLong(footer.bloomFilterHandle.expectedEntries());
+        }
+
+        sliceOutput.writeLong(footer.indexBlockHandle.offset());
+        sliceOutput.writeInt(footer.indexBlockHandle.size());
+
+        // write magic number
+        sliceOutput.writeInt(MAGIC_NUMBER);
+    }
+}
diff --git a/paimon-common/src/main/java/org/apache/paimon/lookup/LookupStoreReader.java b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortContext.java
similarity index 69%
copy from paimon-common/src/main/java/org/apache/paimon/lookup/LookupStoreReader.java
copy to paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortContext.java
index c867e2053..5aacb56bb 100644
--- a/paimon-common/src/main/java/org/apache/paimon/lookup/LookupStoreReader.java
+++ b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortContext.java
@@ -16,14 +16,20 @@
  * limitations under the License.
  */
 
-package org.apache.paimon.lookup;
+package org.apache.paimon.lookup.sort;
 
-import java.io.Closeable;
-import java.io.IOException;
+import org.apache.paimon.lookup.LookupStoreFactory.Context;
 
-/** Reader, lookup value by key bytes. */
-public interface LookupStoreReader extends Closeable {
+/**
+ * A {@link Context} for sort store.
+ *
+ * <p>Carries the size of the file to read; {@code SortLookupStoreReader} uses it to read the
+ * footer from the last bytes of the file.
+ */
+public class SortContext implements Context {
 
-    /** Lookup value by key. */
-    byte[] lookup(byte[] key) throws IOException;
+    /** Size of the file in bytes. */
+    private final long fileSize;
+
+    public SortContext(long fileSize) {
+        this.fileSize = fileSize;
+    }
+
+    /** Returns the size of the file in bytes. */
+    public long fileSize() {
+        return fileSize;
+    }
 }
diff --git a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreFactory.java b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreFactory.java
new file mode 100644
index 000000000..50b784688
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreFactory.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lookup.sort;
+
+import org.apache.paimon.compression.BlockCompressionFactory;
+import org.apache.paimon.io.cache.CacheManager;
+import org.apache.paimon.lookup.LookupStoreFactory;
+import org.apache.paimon.memory.MemorySlice;
+import org.apache.paimon.utils.BloomFilter;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Comparator;
+
+/**
+ * A {@link LookupStoreFactory} whose files store records sorted by key in blocks. Lookups
+ * binary-search an index block and then the data block it points to.
+ */
+public class SortLookupStoreFactory implements LookupStoreFactory {
+
+    private final Comparator<MemorySlice> comparator;
+    private final CacheManager cacheManager;
+    private final int blockSize;
+    @Nullable private final BlockCompressionFactory compressionFactory;
+
+    /**
+     * Creates a factory for sorted lookup files.
+     *
+     * @param comparator key order used by readers for binary search; it must match the order in
+     *     which records were written
+     * @param cacheManager passed to every created reader
+     * @param blockSize block size passed to every created writer
+     * @param compression compression name resolved with {@code BlockCompressionFactory.create};
+     *     the resolved factory may be {@code null}
+     */
+    public SortLookupStoreFactory(
+            Comparator<MemorySlice> comparator,
+            CacheManager cacheManager,
+            int blockSize,
+            String compression) {
+        this.comparator = comparator;
+        this.cacheManager = cacheManager;
+        this.blockSize = blockSize;
+        this.compressionFactory = BlockCompressionFactory.create(compression);
+    }
+
+    /**
+     * Opens a reader for {@code file}.
+     *
+     * @param context must be a {@link SortContext} carrying the file size
+     */
+    @Override
+    public SortLookupStoreReader createReader(File file, Context context) throws IOException {
+        return new SortLookupStoreReader(comparator, file, (SortContext) context, cacheManager);
+    }
+
+    /** Creates a writer for {@code file}, optionally building a bloom filter. */
+    @Override
+    public SortLookupStoreWriter createWriter(File file, @Nullable BloomFilter.Builder bloomFilter)
+            throws IOException {
+        return new SortLookupStoreWriter(file, blockSize, bloomFilter, compressionFactory);
+    }
+}
diff --git a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java
new file mode 100644
index 000000000..9b8c0f79a
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lookup.sort;
+
+import org.apache.paimon.compression.BlockCompressionFactory;
+import org.apache.paimon.compression.BlockDecompressor;
+import org.apache.paimon.io.cache.CacheManager;
+import org.apache.paimon.lookup.LookupStoreReader;
+import org.apache.paimon.memory.MemorySegment;
+import org.apache.paimon.memory.MemorySlice;
+import org.apache.paimon.memory.MemorySliceInput;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.Comparator;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/**
+ * A {@link LookupStoreReader} for sort store.
+ *
+ * <p>The file ends with a {@link Footer} that locates an index block. Each index entry's value is
+ * an encoded {@link BlockHandle} of a data block. A lookup binary-searches the index block for the
+ * first entry whose key is not smaller than the target key, opens the data block it points to and
+ * binary-searches that block for an exact match.
+ *
+ * <p>{@link #lookup} repositions a shared index iterator, so an instance must not be used by
+ * several threads concurrently.
+ *
+ * <p>TODO add block cache support.
+ *
+ * <p>TODO separate index cache and block cache.
+ */
+public class SortLookupStoreReader implements LookupStoreReader {
+
+    private final Comparator<MemorySlice> comparator;
+    private final FileChannel fileChannel;
+    private final long fileSize;
+
+    private final BlockIterator indexBlockIterator;
+
+    /**
+     * Opens {@code file} and eagerly loads its footer and index block.
+     *
+     * @param comparator key order the file is sorted by; blocks are binary-searched with it
+     * @param file the file to read
+     * @param context supplies the size of {@code file} in bytes
+     * @param cacheManager currently unused (see the TODOs on this class)
+     * @throws IOException if the file cannot be opened or its footer or index block cannot be read
+     */
+    public SortLookupStoreReader(
+            Comparator<MemorySlice> comparator,
+            File file,
+            SortContext context,
+            CacheManager cacheManager)
+            throws IOException {
+        this.comparator = comparator;
+        //noinspection resource
+        this.fileChannel = new FileInputStream(file).getChannel();
+        this.fileSize = context.fileSize();
+
+        try {
+            Footer footer = readFooter();
+            this.indexBlockIterator = readBlock(footer.getIndexBlockHandle()).iterator();
+            // TODO read bloom filter block
+        } catch (Throwable t) {
+            // Close the channel so a truncated or corrupt file does not leak a file handle; no
+            // caller can close it because construction failed.
+            try {
+                fileChannel.close();
+            } catch (IOException closeError) {
+                t.addSuppressed(closeError);
+            }
+            throw t;
+        }
+    }
+
+    /** Reads the fixed-length footer stored in the last bytes of the file. */
+    private Footer readFooter() throws IOException {
+        MemorySegment footerData = read(fileSize - Footer.ENCODED_LENGTH, Footer.ENCODED_LENGTH);
+        return Footer.readFooter(MemorySlice.wrap(footerData).toInput());
+    }
+
+    /**
+     * Looks up the value stored for {@code key}.
+     *
+     * @return a copy of the value, or {@code null} if the key is not in this file
+     */
+    @Nullable
+    @Override
+    public byte[] lookup(byte[] key) throws IOException {
+        MemorySlice keySlice = MemorySlice.wrap(key);
+        // seek the index to the block containing the key
+        indexBlockIterator.seekTo(keySlice);
+
+        // if indexIterator does not have a next, it means the key does not exist in this iterator
+        if (indexBlockIterator.hasNext()) {
+            // seek the current iterator to the key
+            BlockIterator current = getNextBlock();
+            if (current.seekTo(keySlice)) {
+                return current.next().getValue().copyBytes();
+            }
+        }
+        return null;
+    }
+
+    /** Consumes the next index entry and returns an iterator over the data block it points to. */
+    private BlockIterator getNextBlock() throws IOException {
+        MemorySlice blockHandle = indexBlockIterator.next().getValue();
+        BlockReader dataBlock = openBlock(blockHandle);
+        return dataBlock.iterator();
+    }
+
+    /** Decodes a {@link BlockHandle} from an index entry value and reads that block. */
+    private BlockReader openBlock(MemorySlice blockEntry) throws IOException {
+        BlockHandle blockHandle = BlockHandle.readBlockHandle(blockEntry.toInput());
+        return readBlock(blockHandle);
+    }
+
+    /**
+     * Reads exactly {@code length} bytes starting at file position {@code offset}.
+     *
+     * @throws IOException if the file ends before {@code length} bytes could be read
+     */
+    private MemorySegment read(long offset, int length) throws IOException {
+        // TODO use cache
+        // TODO cache uncompressed block
+        // TODO separate index and data cache
+        byte[] buffer = new byte[length];
+        ByteBuffer target = ByteBuffer.wrap(buffer);
+        long position = offset;
+        // A positional read may return fewer bytes than requested, so keep reading until the
+        // buffer is full; -1 means the end of the file was reached first.
+        while (target.hasRemaining()) {
+            int read = fileChannel.read(target, position);
+            if (read < 0) {
+                throw new IOException(
+                        "Could not read all the data: expected "
+                                + length
+                                + " bytes at offset "
+                                + offset
+                                + ", got "
+                                + (length - target.remaining()));
+            }
+            position += read;
+        }
+        return MemorySegment.wrap(buffer);
+    }
+
+    /**
+     * Reads the block described by {@code blockHandle} and the {@link BlockTrailer} stored right
+     * after it, decompressing the block if the trailer names a compression type.
+     */
+    private BlockReader readBlock(BlockHandle blockHandle) throws IOException {
+        // read block trailer
+        MemorySegment trailerData =
+                read(blockHandle.offset() + blockHandle.size(), BlockTrailer.ENCODED_LENGTH);
+        BlockTrailer blockTrailer =
+                BlockTrailer.readBlockTrailer(MemorySlice.wrap(trailerData).toInput());
+
+        // TODO validate checksum
+
+        // decompress data
+
+        MemorySegment block = read(blockHandle.offset(), blockHandle.size());
+        MemorySlice uncompressedData;
+        BlockCompressionFactory compressionFactory =
+                BlockCompressionFactory.create(blockTrailer.getCompressionType());
+        if (compressionFactory == null) {
+            uncompressedData = MemorySlice.wrap(block);
+        } else {
+            // Compressed layout: var-length uncompressed size followed by the compressed bytes.
+            MemorySliceInput compressedInput = MemorySlice.wrap(block).toInput();
+            byte[] uncompressed = new byte[compressedInput.readVarLenInt()];
+            BlockDecompressor decompressor = compressionFactory.getDecompressor();
+            int uncompressedLength =
+                    decompressor.decompress(
+                            block.getHeapMemory(),
+                            compressedInput.position(),
+                            compressedInput.available(),
+                            uncompressed,
+                            0);
+            checkArgument(
+                    uncompressedLength == uncompressed.length,
+                    "Decompressed block length does not match the recorded length");
+            uncompressedData = MemorySlice.wrap(uncompressed);
+        }
+
+        return new BlockReader(uncompressedData, comparator);
+    }
+
+    @Override
+    public void close() throws IOException {
+        this.fileChannel.close();
+        // TODO clear cache too
+    }
+}
diff --git a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreWriter.java b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreWriter.java
new file mode 100644
index 000000000..1d2436e94
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreWriter.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lookup.sort;
+
+import org.apache.paimon.compression.BlockCompressionFactory;
+import org.apache.paimon.compression.BlockCompressionType;
+import org.apache.paimon.compression.BlockCompressor;
+import org.apache.paimon.lookup.LookupStoreFactory;
+import org.apache.paimon.lookup.LookupStoreWriter;
+import org.apache.paimon.memory.MemorySegment;
+import org.apache.paimon.memory.MemorySlice;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.utils.BloomFilter;
+import org.apache.paimon.utils.MurmurHashUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.zip.CRC32;
+
+import static org.apache.paimon.lookup.sort.BlockHandle.writeBlockHandle;
+import static org.apache.paimon.memory.MemorySegmentUtils.allocateReuseBytes;
+import static org.apache.paimon.utils.VarLengthIntUtils.encodeInt;
+
+/**
+ * A {@link LookupStoreWriter} that writes key/value pairs into a block-based, sorted lookup file.
+ *
+ * <p>File layout, in write order:
+ *
+ * <ol>
+ *   <li>data blocks, each followed by a {@link BlockTrailer};
+ *   <li>the optional bloom filter bytes;
+ *   <li>the index block (the last key of every data block mapped to its encoded {@link
+ *       BlockHandle}) followed by its trailer;
+ *   <li>the {@link Footer}, referencing the bloom filter and the index block.
+ * </ol>
+ *
+ * <p>NOTE(review): since the index stores the last key of each data block, keys are presumably
+ * expected to arrive in ascending order; this writer does not verify the ordering.
+ */
+public class SortLookupStoreWriter implements LookupStoreWriter {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(SortLookupStoreWriter.class.getName());
+
+    public static final int MAGIC_NUMBER = 1481571681;
+
+    private final BufferedOutputStream fileOutputStream;
+    private final int blockSize;
+    private final BlockWriter dataBlockWriter;
+    private final BlockWriter indexBlockWriter;
+    @Nullable private final BloomFilter.Builder bloomFilter;
+    private final BlockCompressionType compressionType;
+    @Nullable private final BlockCompressor blockCompressor;
+
+    /** Key most recently passed to {@link #put}; becomes the index key of the flushed block. */
+    private byte[] lastKey;
+
+    /** Number of bytes written to the file so far, i.e. the offset of the next write. */
+    private long position;
+
+    private long recordCount;
+    private long totalUncompressedSize;
+    private long totalCompressedSize;
+
+    SortLookupStoreWriter(
+            File file,
+            int blockSize,
+            @Nullable BloomFilter.Builder bloomFilter,
+            @Nullable BlockCompressionFactory compressionFactory)
+            throws IOException {
+        this.fileOutputStream = new BufferedOutputStream(Files.newOutputStream(file.toPath()));
+        this.blockSize = blockSize;
+        // A data block is only flushed after it grows past blockSize, so (presumably) the
+        // initial buffer gets ~10% headroom to avoid an immediate resize.
+        this.dataBlockWriter = new BlockWriter((int) (blockSize * 1.1));
+        int expectedNumberOfBlocks = 1024;
+        this.indexBlockWriter =
+                new BlockWriter(BlockHandle.MAX_ENCODED_LENGTH * expectedNumberOfBlocks);
+        this.bloomFilter = bloomFilter;
+        if (compressionFactory == null) {
+            this.compressionType = BlockCompressionType.NONE;
+            this.blockCompressor = null;
+        } else {
+            this.compressionType = compressionFactory.getCompressionType();
+            this.blockCompressor = compressionFactory.getCompressor();
+        }
+    }
+
+    @Override
+    public void put(byte[] key, byte[] value) throws IOException {
+        dataBlockWriter.add(key, value);
+        if (bloomFilter != null) {
+            bloomFilter.addHash(MurmurHashUtils.hashBytes(key));
+        }
+
+        lastKey = key;
+
+        // Close the current data block once it exceeds the target block size.
+        if (dataBlockWriter.memory() > blockSize) {
+            flush();
+        }
+
+        recordCount++;
+    }
+
+    /** Writes the pending data block (if any) and registers it in the index block. */
+    private void flush() throws IOException {
+        if (dataBlockWriter.size() == 0) {
+            return;
+        }
+
+        BlockHandle blockHandle = writeBlock(dataBlockWriter);
+        MemorySlice handleEncoding = writeBlockHandle(blockHandle);
+        indexBlockWriter.add(lastKey, handleEncoding.copyBytes());
+    }
+
+    /**
+     * Finishes the given block, compresses it when that pays off, appends it and its trailer to
+     * the file, and resets the block writer.
+     *
+     * @return handle holding the file offset and on-disk length (trailer excluded) of the block
+     */
+    private BlockHandle writeBlock(BlockWriter blockWriter) throws IOException {
+        // close the block
+        MemorySlice block = blockWriter.finish();
+
+        totalUncompressedSize += block.length();
+
+        // attempt to compress the block
+        BlockCompressionType blockCompressionType = BlockCompressionType.NONE;
+        if (blockCompressor != null) {
+            int maxCompressedSize = blockCompressor.getMaxCompressedSize(block.length());
+            // Compressed layout: var-length uncompressed size (up to 5 bytes) + compressed data.
+            // NOTE(review): allocateReuseBytes presumably returns a shared scratch buffer; the
+            // block is written out below before the buffer can be reused.
+            byte[] compressed = allocateReuseBytes(maxCompressedSize + 5);
+            int offset = encodeInt(compressed, 0, block.length());
+            int compressedSize =
+                    offset
+                            + blockCompressor.compress(
+                                    block.getHeapMemory(),
+                                    block.offset(),
+                                    block.length(),
+                                    compressed,
+                                    offset);
+
+            // Only keep the compressed form if it saves at least 12.5% of the block size.
+            if (compressedSize < block.length() - (block.length() / 8)) {
+                block = new MemorySlice(MemorySegment.wrap(compressed), 0, compressedSize);
+                blockCompressionType = this.compressionType;
+            }
+        }
+
+        totalCompressedSize += block.length();
+
+        // create block trailer
+        BlockTrailer blockTrailer =
+                new BlockTrailer(blockCompressionType, crc32(block, blockCompressionType));
+        MemorySlice trailer = BlockTrailer.writeBlockTrailer(blockTrailer);
+
+        // create a handle to this block
+        BlockHandle blockHandle = new BlockHandle(position, block.length());
+
+        // write data
+        writeSlice(block);
+
+        // write trailer: 5 bytes
+        writeSlice(trailer);
+
+        // clean up state
+        blockWriter.reset();
+
+        return blockHandle;
+    }
+
+    /**
+     * Checksum over the block bytes followed by the compression type id. This is a plain CRC32
+     * ({@link CRC32}), not CRC32C.
+     */
+    private static int crc32(MemorySlice data, BlockCompressionType type) {
+        CRC32 crc = new CRC32();
+        crc.update(data.getHeapMemory(), data.offset(), data.length());
+        crc.update(type.persistentId() & 0xFF);
+        return (int) crc.getValue();
+    }
+
+    /**
+     * Flushes the last data block, writes the bloom filter, index block and footer, then closes
+     * the file. The output stream is closed even if writing one of these sections fails.
+     *
+     * @return a {@link SortContext} built from the final write position (total bytes written)
+     */
+    @Override
+    public LookupStoreFactory.Context close() throws IOException {
+        try {
+            // flush current data block
+            flush();
+
+            LOG.info("Number of record: {}", recordCount);
+
+            // write bloom filter
+            @Nullable BloomFilterHandle bloomFilterHandle = null;
+            if (bloomFilter != null) {
+                MemorySegment buffer = bloomFilter.getBuffer();
+                bloomFilterHandle =
+                        new BloomFilterHandle(
+                                position, buffer.size(), bloomFilter.expectedEntries());
+                writeSlice(MemorySlice.wrap(buffer));
+                LOG.info("Bloom filter size: {} bytes", buffer.size());
+            }
+
+            // write index block
+            BlockHandle indexBlockHandle = writeBlock(indexBlockWriter);
+
+            // write footer
+            Footer footer = new Footer(bloomFilterHandle, indexBlockHandle);
+            MemorySlice footerEncoding = Footer.writeFooter(footer);
+            writeSlice(footerEncoding);
+        } finally {
+            // close file; in finally so a failed write does not leak the file handle
+            fileOutputStream.close();
+        }
+
+        LOG.info("totalUncompressedSize: {}", MemorySize.ofBytes(totalUncompressedSize));
+        LOG.info("totalCompressedSize: {}", MemorySize.ofBytes(totalCompressedSize));
+        return new SortContext(position);
+    }
+
+    /** Appends the bytes of the slice to the file and advances {@link #position}. */
+    private void writeSlice(MemorySlice slice) throws IOException {
+        fileOutputStream.write(slice.getHeapMemory(), slice.offset(), slice.length());
+        position += slice.length();
+    }
+}
diff --git a/paimon-common/src/main/java/org/apache/paimon/memory/MemorySlice.java b/paimon-common/src/main/java/org/apache/paimon/memory/MemorySlice.java
new file mode 100644
index 000000000..589e967d4
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/memory/MemorySlice.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.memory;
+
+import org.apache.paimon.utils.MurmurHashUtils;
+
+/**
+ * A view of {@code length} bytes of a {@link MemorySegment}, starting at {@code offset}.
+ *
+ * <p>This class only exposes read methods; the backing segment itself may still be mutated
+ * elsewhere. Equality, hashing and ordering are all based on the viewed byte content.
+ */
+public final class MemorySlice implements Comparable<MemorySlice> {
+
+    private final MemorySegment segment;
+    private final int offset;
+    private final int length;
+
+    public MemorySlice(MemorySegment segment, int offset, int length) {
+        this.segment = segment;
+        this.offset = offset;
+        this.length = length;
+    }
+
+    public MemorySegment segment() {
+        return segment;
+    }
+
+    public int offset() {
+        return offset;
+    }
+
+    public int length() {
+        return length;
+    }
+
+    /**
+     * Returns a sub-view of this slice.
+     *
+     * @param index start position, relative to this slice
+     * @param length number of bytes in the sub-view
+     * @throws IndexOutOfBoundsException if the requested range is not inside this slice
+     */
+    public MemorySlice slice(int index, int length) {
+        // Without this check an out-of-range request would silently view unrelated bytes of the
+        // backing segment.
+        if (index < 0 || length < 0 || index > this.length - length) {
+            throw new IndexOutOfBoundsException(
+                    "index: " + index + ", length: " + length + ", slice length: " + this.length);
+        }
+        if (index == 0 && length == this.length) {
+            return this;
+        }
+
+        return new MemorySlice(segment, offset + index, length);
+    }
+
+    public byte readByte(int position) {
+        return segment.get(offset + position);
+    }
+
+    public int readInt(int position) {
+        return segment.getInt(offset + position);
+    }
+
+    public long readLong(int position) {
+        return segment.getLong(offset + position);
+    }
+
+    /**
+     * Returns the whole heap array backing the segment; callers must apply {@link #offset()} and
+     * {@link #length()} themselves.
+     */
+    public byte[] getHeapMemory() {
+        return segment.getHeapMemory();
+    }
+
+    /** Copies the viewed bytes into a new array. */
+    public byte[] copyBytes() {
+        byte[] bytes = new byte[length];
+        segment.get(offset, bytes, 0, length);
+        return bytes;
+    }
+
+    public MemorySliceInput toInput() {
+        return new MemorySliceInput(this);
+    }
+
+    /** Two slices are equal when they have the same length and the same byte content. */
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        MemorySlice slice = (MemorySlice) o;
+
+        // do lengths match
+        if (length != slice.length) {
+            return false;
+        }
+
+        // same segment and same start: the content is trivially identical
+        if (offset == slice.offset && segment == slice.segment) {
+            return true;
+        }
+        return segment.equalTo(slice.segment, offset, slice.offset, length);
+    }
+
+    /** Content-based hash, consistent with {@link #equals(Object)}. */
+    @Override
+    public int hashCode() {
+        return MurmurHashUtils.hashBytes(segment, offset, length);
+    }
+
+    public static MemorySlice wrap(byte[] bytes) {
+        return new MemorySlice(MemorySegment.wrap(bytes), 0, bytes.length);
+    }
+
+    public static MemorySlice wrap(MemorySegment segment) {
+        return new MemorySlice(segment, 0, segment.size());
+    }
+
+    /**
+     * Lexicographic comparison of the bytes treated as unsigned values; when one slice is a prefix
+     * of the other, the shorter slice sorts first.
+     */
+    @Override
+    public int compareTo(MemorySlice other) {
+        int len = Math.min(length, other.length);
+        for (int i = 0; i < len; i++) {
+            int res =
+                    (segment.get(offset + i) & 0xFF) - (other.segment.get(other.offset + i) & 0xFF);
+            if (res != 0) {
+                return res;
+            }
+        }
+        return length - other.length;
+    }
+}
diff --git a/paimon-common/src/main/java/org/apache/paimon/memory/MemorySliceInput.java b/paimon-common/src/main/java/org/apache/paimon/memory/MemorySliceInput.java
new file mode 100644
index 000000000..4b9cd8c04
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/memory/MemorySliceInput.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.memory;
+
+/** Input for {@link MemorySlice}. */
+public class MemorySliceInput {
+
+    private final MemorySlice slice;
+
+    private int position;
+
+    public MemorySliceInput(MemorySlice slice) {
+        this.slice = slice;
+    }
+
+    public int position() {
+        return position;
+    }
+
+    public void setPosition(int position) {
+        if (position < 0 || position > slice.length()) {
+            throw new IndexOutOfBoundsException();
+        }
+        this.position = position;
+    }
+
+    public boolean isReadable() {
+        return available() > 0;
+    }
+
+    public int available() {
+        return slice.length() - position;
+    }
+
+    public byte readByte() {
+        if (position == slice.length()) {
+            throw new IndexOutOfBoundsException();
+        }
+        return slice.readByte(position++);
+    }
+
+    public int readUnsignedByte() {
+        return (short) (readByte() & 0xFF);
+    }
+
+    public int readInt() {
+        int v = slice.readInt(position);
+        position += 4;
+        return v;
+    }
+
+    public int readVarLenInt() {
+        for (int offset = 0, result = 0; offset < 32; offset += 7) {
+            int b = readUnsignedByte();
+            result |= (b & 0x7F) << offset;
+            if ((b & 0x80) == 0) {
+                return result;
+            }
+        }
+        throw new Error("Malformed integer.");
+    }
+
+    public long readLong() {
+        long v = slice.readLong(position);
+        position += 8;
+        return v;
+    }
+
+    public long readVarLenLong() {
+        long result = 0;
+        for (int offset = 0; offset < 64; offset += 7) {
+            long b = readUnsignedByte();
+            result |= (b & 0x7F) << offset;
+            if ((b & 0x80) == 0) {
+                return result;
+            }
+        }
+        throw new Error("Malformed long.");
+    }
+
+    public MemorySlice readSlice(int length) {
+        MemorySlice newSlice = slice.slice(position, length);
+        position += length;
+        return newSlice;
+    }
+}
diff --git a/paimon-common/src/main/java/org/apache/paimon/memory/MemorySliceOutput.java b/paimon-common/src/main/java/org/apache/paimon/memory/MemorySliceOutput.java
new file mode 100644
index 000000000..90e8e5490
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/memory/MemorySliceOutput.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.memory;
+
+/**
+ * Growable output buffer backed by a heap {@link MemorySegment}. {@link #toSlice()} exposes the
+ * bytes written so far as a {@link MemorySlice} without copying.
+ */
+public class MemorySliceOutput {
+
+    private MemorySegment segment;
+    private int size;
+
+    public MemorySliceOutput(int estimatedSize) {
+        this.segment = MemorySegment.wrap(new byte[estimatedSize]);
+    }
+
+    public int size() {
+        return size;
+    }
+
+    /** Returns a view (not a copy) of the bytes written so far. */
+    public MemorySlice toSlice() {
+        return new MemorySlice(segment, 0, size);
+    }
+
+    /** Discards the written bytes; the current buffer is kept for reuse. */
+    public void reset() {
+        size = 0;
+    }
+
+    public void writeByte(int value) {
+        ensureSize(size + 1);
+        segment.put(size++, (byte) value);
+    }
+
+    public void writeInt(int value) {
+        ensureSize(size + 4);
+        segment.putInt(size, value);
+        size += 4;
+    }
+
+    /**
+     * Writes a non-negative int 7 bits per byte, low bits first, setting the high bit of each byte
+     * that is followed by another.
+     */
+    public void writeVarLenInt(int value) {
+        if (value < 0) {
+            throw new IllegalArgumentException("negative value: v=" + value);
+        }
+
+        while ((value & ~0x7F) != 0) {
+            writeByte(((value & 0x7F) | 0x80));
+            value >>>= 7;
+        }
+
+        writeByte((byte) value);
+    }
+
+    public void writeLong(long value) {
+        ensureSize(size + 8);
+        segment.putLong(size, value);
+        size += 8;
+    }
+
+    /** Writes a non-negative long with the same encoding as {@link #writeVarLenInt(int)}. */
+    public void writeVarLenLong(long value) {
+        if (value < 0) {
+            throw new IllegalArgumentException("negative value: v=" + value);
+        }
+
+        while ((value & ~0x7FL) != 0) {
+            writeByte((((int) value & 0x7F) | 0x80));
+            value >>>= 7;
+        }
+        writeByte((byte) value);
+    }
+
+    public void writeBytes(byte[] source) {
+        writeBytes(source, 0, source.length);
+    }
+
+    public void writeBytes(byte[] source, int sourceIndex, int length) {
+        ensureSize(size + length);
+        segment.put(size, source, sourceIndex, length);
+        size += length;
+    }
+
+    /**
+     * Grows the buffer so that it can hold at least {@code minWritableBytes} bytes in total.
+     *
+     * @param minWritableBytes required total capacity (not additional bytes)
+     */
+    private void ensureSize(int minWritableBytes) {
+        if (minWritableBytes <= segment.size()) {
+            return;
+        }
+
+        // Start from at least 1: doubling a zero-capacity buffer would loop forever.
+        int newCapacity = Math.max(segment.size(), 1);
+        while (newCapacity < minWritableBytes) {
+            if (newCapacity > (Integer.MAX_VALUE >> 1)) {
+                // Doubling again would overflow; allocate exactly what is required.
+                newCapacity = minWritableBytes;
+                break;
+            }
+            newCapacity <<= 1;
+        }
+
+        MemorySegment newSegment = MemorySegment.wrap(new byte[newCapacity]);
+        // Only the first `size` bytes are meaningful; anything beyond is stale.
+        segment.copyTo(0, newSegment, 0, size);
+        segment = newSegment;
+    }
+}
diff --git a/paimon-common/src/main/java/org/apache/paimon/utils/IntArrayList.java b/paimon-common/src/main/java/org/apache/paimon/utils/IntArrayList.java
index 575fae02a..ec6e1a362 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/IntArrayList.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/IntArrayList.java
@@ -73,6 +73,10 @@ public class IntArrayList {
         return Arrays.copyOf(array, size);
     }
 
+    /**
+     * Returns the element at position {@code i}.
+     *
+     * @throws IndexOutOfBoundsException if {@code i} is negative or not less than the list size
+     */
+    public int get(int i) {
+        // The backing array may be larger than the logical size; slots past it are not elements.
+        if (i < 0 || i >= size) {
+            throw new IndexOutOfBoundsException("Index: " + i + ", Size: " + size);
+        }
+        return array[i];
+    }
+
     public static final IntArrayList EMPTY =
             new IntArrayList(0) {
 
diff --git a/paimon-common/src/test/java/org/apache/paimon/lookup/sort/SortLookupStoreFactoryTest.java b/paimon-common/src/test/java/org/apache/paimon/lookup/sort/SortLookupStoreFactoryTest.java
new file mode 100644
index 000000000..804b8ed09
--- /dev/null
+++ b/paimon-common/src/test/java/org/apache/paimon/lookup/sort/SortLookupStoreFactoryTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lookup.sort;
+
+import org.apache.paimon.io.cache.CacheManager;
+import org.apache.paimon.lookup.LookupStoreFactory.Context;
+import org.apache.paimon.options.MemorySize;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.util.Comparator;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/** Test for {@link SortLookupStoreFactory}. */
+public class SortLookupStoreFactoryTest {
+
+    @TempDir Path tempDir;
+
+    private File file;
+
+    @BeforeEach
+    public void before() throws Exception {
+        file = new File(tempDir.toFile(), UUID.randomUUID().toString());
+        if (!file.createNewFile()) {
+            throw new IOException("Can not create file: " + file);
+        }
+    }
+
+    /** Writes 10k zstd-compressed records with small blocks and reads every key back. */
+    @Test
+    public void testNormal() throws IOException {
+        SortLookupStoreFactory factory =
+                new SortLookupStoreFactory(
+                        Comparator.naturalOrder(),
+                        new CacheManager(MemorySize.ofMebiBytes(1)),
+                        1024,
+                        "zstd");
+
+        // Keys are "10000".."19999": all five digits long, so their byte-wise order matches the
+        // numeric write order.
+        SortLookupStoreWriter writer = factory.createWriter(file, null);
+        int valueCount = 10_000;
+        for (int i = 0; i < valueCount; i++) {
+            byte[] bytes = toBytes(i);
+            writer.put(bytes, bytes);
+        }
+        Context context = writer.close();
+
+        SortLookupStoreReader reader = factory.createReader(file, context);
+        try {
+            for (int i = 0; i < valueCount; i++) {
+                byte[] bytes = toBytes(i);
+                assertThat(fromBytes(reader.lookup(bytes))).isEqualTo(i);
+            }
+        } finally {
+            // release the file channel even when an assertion fails
+            reader.close();
+        }
+    }
+
+    private byte[] toBytes(int i) {
+        return String.valueOf(10_000 + i).getBytes(StandardCharsets.UTF_8);
+    }
+
+    private int fromBytes(byte[] bytes) {
+        return Integer.parseInt(new String(bytes, StandardCharsets.UTF_8)) - 10_000;
+    }
+}
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index 133cfe540..df3242141 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -29,6 +29,7 @@ import org.apache.paimon.compact.CompactManager;
 import org.apache.paimon.compact.NoopCompactManager;
 import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.RowCompactedSerializer;
 import org.apache.paimon.deletionvectors.DeletionVector;
 import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
 import org.apache.paimon.format.FileFormatDiscover;
@@ -39,8 +40,8 @@ import org.apache.paimon.io.FileReaderFactory;
 import org.apache.paimon.io.KeyValueFileReaderFactory;
 import org.apache.paimon.io.KeyValueFileWriterFactory;
 import org.apache.paimon.io.RecordLevelExpire;
+import org.apache.paimon.lookup.LookupStoreFactory;
 import org.apache.paimon.lookup.LookupStrategy;
-import org.apache.paimon.lookup.hash.HashLookupStoreFactory;
 import org.apache.paimon.mergetree.Levels;
 import org.apache.paimon.mergetree.LookupLevels;
 import org.apache.paimon.mergetree.LookupLevels.ContainsValueProcessor;
@@ -349,6 +350,11 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
             throw new RuntimeException(
                     "Can not use lookup, there is no temp disk directory to 
use.");
         }
+        LookupStoreFactory lookupStoreFactory =
+                LookupStoreFactory.create(
+                        options,
+                        cacheManager,
+                        new 
RowCompactedSerializer(keyType).createSliceComparator());
         Options options = this.options.toConfiguration();
         return new LookupLevels<>(
                 levels,
@@ -357,11 +363,7 @@ public class KeyValueFileStoreWrite extends 
MemoryFileStoreWrite<KeyValue> {
                 valueProcessor,
                 readerFactory::createRecordReader,
                 () -> ioManager.createChannel().getPathFile(),
-                new HashLookupStoreFactory(
-                        cacheManager,
-                        this.options.cachePageSize(),
-                        options.get(CoreOptions.LOOKUP_HASH_LOAD_FACTOR),
-                        
options.get(CoreOptions.LOOKUP_CACHE_SPILL_COMPRESSION)),
+                lookupStoreFactory,
                 options.get(CoreOptions.LOOKUP_CACHE_FILE_RETENTION),
                 options.get(CoreOptions.LOOKUP_CACHE_MAX_DISK_SIZE),
                 bfGenerator(options));
diff --git 
a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java 
b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
index 8489ac509..9181fdb16 100644
--- 
a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
+++ 
b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
@@ -26,16 +26,18 @@ import org.apache.paimon.data.BinaryRow;
 import org.apache.paimon.data.InternalRow;
 import org.apache.paimon.data.serializer.InternalRowSerializer;
 import org.apache.paimon.data.serializer.InternalSerializers;
+import org.apache.paimon.data.serializer.RowCompactedSerializer;
 import org.apache.paimon.deletionvectors.DeletionVector;
 import org.apache.paimon.disk.IOManager;
 import org.apache.paimon.io.DataFileMeta;
 import org.apache.paimon.io.KeyValueFileReaderFactory;
 import org.apache.paimon.io.cache.CacheManager;
-import org.apache.paimon.lookup.hash.HashLookupStoreFactory;
+import org.apache.paimon.lookup.LookupStoreFactory;
 import org.apache.paimon.mergetree.Levels;
 import org.apache.paimon.mergetree.LookupLevels;
 import org.apache.paimon.options.Options;
 import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.RowType;
 import org.apache.paimon.utils.KeyComparatorSupplier;
 import org.apache.paimon.utils.Preconditions;
 
@@ -62,7 +64,7 @@ public class LocalTableQuery implements TableQuery {
 
     private final KeyValueFileReaderFactory.Builder readerFactoryBuilder;
 
-    private final HashLookupStoreFactory hashLookupStoreFactory;
+    private final LookupStoreFactory lookupStoreFactory;
 
     private final int startLevel;
 
@@ -79,13 +81,13 @@ public class LocalTableQuery implements TableQuery {
         KeyValueFileStore store = (KeyValueFileStore) tableStore;
 
         this.readerFactoryBuilder = store.newReaderFactoryBuilder();
+        RowType keyType = readerFactoryBuilder.keyType();
         this.keyComparatorSupplier = new 
KeyComparatorSupplier(readerFactoryBuilder.keyType());
-        this.hashLookupStoreFactory =
-                new HashLookupStoreFactory(
+        this.lookupStoreFactory =
+                LookupStoreFactory.create(
+                        options,
                         new CacheManager(options.lookupCacheMaxMemory()),
-                        options.cachePageSize(),
-                        
options.toConfiguration().get(CoreOptions.LOOKUP_HASH_LOAD_FACTOR),
-                        
options.toConfiguration().get(CoreOptions.LOOKUP_CACHE_SPILL_COMPRESSION));
+                        new 
RowCompactedSerializer(keyType).createSliceComparator());
 
         if (options.needLookup()) {
             startLevel = 1;
@@ -145,7 +147,7 @@ public class LocalTableQuery implements TableQuery {
                                 Preconditions.checkNotNull(ioManager, 
"IOManager is required.")
                                         .createChannel()
                                         .getPathFile(),
-                        hashLookupStoreFactory,
+                        lookupStoreFactory,
                         options.get(CoreOptions.LOOKUP_CACHE_FILE_RETENTION),
                         options.get(CoreOptions.LOOKUP_CACHE_MAX_DISK_SIZE),
                         bfGenerator(options));
diff --git 
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
index b74f242e1..09bd6eea1 100644
--- 
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
+++ 
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
@@ -20,6 +20,7 @@ package org.apache.paimon.table;
 
 import org.apache.paimon.CoreOptions;
 import org.apache.paimon.CoreOptions.ChangelogProducer;
+import org.apache.paimon.CoreOptions.LookupLocalFileType;
 import org.apache.paimon.Snapshot;
 import org.apache.paimon.data.BinaryString;
 import org.apache.paimon.data.GenericRow;
@@ -92,6 +93,7 @@ import static 
org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
 import static org.apache.paimon.CoreOptions.ChangelogProducer.LOOKUP;
 import static org.apache.paimon.CoreOptions.DELETION_VECTORS_ENABLED;
 import static org.apache.paimon.CoreOptions.FILE_FORMAT;
+import static org.apache.paimon.CoreOptions.LOOKUP_LOCAL_FILE_TYPE;
 import static org.apache.paimon.CoreOptions.MERGE_ENGINE;
 import static org.apache.paimon.CoreOptions.MergeEngine.FIRST_ROW;
 import static org.apache.paimon.CoreOptions.SNAPSHOT_EXPIRE_LIMIT;
@@ -1565,6 +1567,17 @@ public class PrimaryKeyFileStoreTableTest extends 
FileStoreTableTestBase {
         innerTestTableQuery(table);
     }
 
+    @Test
+    public void testTableQueryForLookupLocalSortFile() throws Exception {
+        // Table query with the lookup changelog producer, using the SORT local lookup file type.
+        FileStoreTable table =
+                createFileStoreTable(
+                        conf -> {
+                            conf.set(CHANGELOG_PRODUCER, LOOKUP);
+                            conf.set(LOOKUP_LOCAL_FILE_TYPE, LookupLocalFileType.SORT);
+                        });
+        innerTestTableQuery(table);
+    }
+
     @Test
     public void testTableQueryForNormal() throws Exception {
         FileStoreTable table = createFileStoreTable();


Reply via email to