This is an automated email from the ASF dual-hosted git repository.
lzljs3620320 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/paimon.git
The following commit(s) were added to refs/heads/master by this push:
new 982e7b1db [core] Introduce a basic SortLookupStoreFactory (#3770)
982e7b1db is described below
commit 982e7b1dbf5c23999105e603b5748b8e586c2202
Author: Jingsong Lee <[email protected]>
AuthorDate: Thu Jul 18 11:56:33 2024 +0800
[core] Introduce a basic SortLookupStoreFactory (#3770)
---
.../shortcodes/generated/core_configuration.html | 6 +
.../main/java/org/apache/paimon/CoreOptions.java | 36 ++++
.../paimon/compression/AirCompressorFactory.java | 12 +-
.../compression/BlockCompressionFactory.java | 25 ++-
...ssionFactory.java => BlockCompressionType.java} | 31 ++-
.../compression/Lz4BlockCompressionFactory.java | 6 +
.../compression/ZstdBlockCompressionFactory.java | 5 +
.../data/serializer/RowCompactedSerializer.java | 63 +++++-
.../apache/paimon/lookup/LookupStoreFactory.java | 25 +++
.../apache/paimon/lookup/LookupStoreReader.java | 3 +
.../sort/BlockAlignedType.java} | 29 ++-
.../org/apache/paimon/lookup/sort/BlockEntry.java | 78 ++++++++
.../org/apache/paimon/lookup/sort/BlockHandle.java | 98 ++++++++++
.../apache/paimon/lookup/sort/BlockIterator.java | 112 +++++++++++
.../org/apache/paimon/lookup/sort/BlockReader.java | 87 +++++++++
.../apache/paimon/lookup/sort/BlockTrailer.java | 100 ++++++++++
.../org/apache/paimon/lookup/sort/BlockWriter.java | 99 ++++++++++
.../paimon/lookup/sort/BloomFilterHandle.java | 68 +++++++
.../java/org/apache/paimon/lookup/sort/Footer.java | 101 ++++++++++
.../SortContext.java} | 20 +-
.../paimon/lookup/sort/SortLookupStoreFactory.java | 62 ++++++
.../paimon/lookup/sort/SortLookupStoreReader.java | 157 +++++++++++++++
.../paimon/lookup/sort/SortLookupStoreWriter.java | 212 +++++++++++++++++++++
.../java/org/apache/paimon/memory/MemorySlice.java | 130 +++++++++++++
.../org/apache/paimon/memory/MemorySliceInput.java | 102 ++++++++++
.../apache/paimon/memory/MemorySliceOutput.java | 110 +++++++++++
.../java/org/apache/paimon/utils/IntArrayList.java | 4 +
.../lookup/sort/SortLookupStoreFactoryTest.java | 85 +++++++++
.../paimon/operation/KeyValueFileStoreWrite.java | 14 +-
.../apache/paimon/table/query/LocalTableQuery.java | 18 +-
.../paimon/table/PrimaryKeyFileStoreTableTest.java | 13 ++
31 files changed, 1866 insertions(+), 45 deletions(-)
diff --git a/docs/layouts/shortcodes/generated/core_configuration.html
b/docs/layouts/shortcodes/generated/core_configuration.html
index 19a73cc5d..93bff9692 100644
--- a/docs/layouts/shortcodes/generated/core_configuration.html
+++ b/docs/layouts/shortcodes/generated/core_configuration.html
@@ -387,6 +387,12 @@ Mainly to resolve data skew on primary keys. We recommend
starting with 64 mb wh
<td>Float</td>
<td>The index load factor for lookup.</td>
</tr>
+ <tr>
+ <td><h5>lookup.local-file-type</h5></td>
+ <td style="word-wrap: break-word;">hash</td>
+ <td><p>Enum</p></td>
+ <td>The local file type for lookup.<br /><br />Possible
values:<ul><li>"sort": Construct a sorted file for lookup.</li><li>"hash":
Construct a hash file for lookup.</li></ul></td>
+ </tr>
<tr>
<td><h5>manifest.compression</h5></td>
<td style="word-wrap: break-word;">"zstd"</td>
diff --git a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
index 9bb6ab91f..5b2f8aa77 100644
--- a/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
+++ b/paimon-common/src/main/java/org/apache/paimon/CoreOptions.java
@@ -809,6 +809,12 @@ public class CoreOptions implements Serializable {
.withDescription(
"Define partition by table options, cannot define
partition on DDL and table options at the same time.");
+ public static final ConfigOption<LookupLocalFileType>
LOOKUP_LOCAL_FILE_TYPE =
+ key("lookup.local-file-type")
+ .enumType(LookupLocalFileType.class)
+ .defaultValue(LookupLocalFileType.HASH)
+ .withDescription("The local file type for lookup.");
+
public static final ConfigOption<Float> LOOKUP_HASH_LOAD_FACTOR =
key("lookup.hash-load-factor")
.floatType()
@@ -1624,6 +1630,10 @@ public class CoreOptions implements Serializable {
return (int) options.get(CACHE_PAGE_SIZE).getBytes();
}
+ public LookupLocalFileType lookupLocalFileType() {
+ return options.get(LOOKUP_LOCAL_FILE_TYPE);
+ }
+
public MemorySize lookupCacheMaxMemory() {
return options.get(LOOKUP_CACHE_MAX_MEMORY_SIZE);
}
@@ -2616,4 +2626,30 @@ public class CoreOptions implements Serializable {
return text(description);
}
}
+
+ /** Specifies the local file type for lookup. */
+ public enum LookupLocalFileType implements DescribedEnum {
+ SORT("sort", "Construct a sorted file for lookup."),
+
+ HASH("hash", "Construct a hash file for lookup.");
+
+ private final String value;
+
+ private final String description;
+
+ LookupLocalFileType(String value, String description) {
+ this.value = value;
+ this.description = description;
+ }
+
+ @Override
+ public String toString() {
+ return value;
+ }
+
+ @Override
+ public InlineElement getDescription() {
+ return text(description);
+ }
+ }
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/compression/AirCompressorFactory.java
b/paimon-common/src/main/java/org/apache/paimon/compression/AirCompressorFactory.java
index f22c9aad8..5caa78a12 100644
---
a/paimon-common/src/main/java/org/apache/paimon/compression/AirCompressorFactory.java
+++
b/paimon-common/src/main/java/org/apache/paimon/compression/AirCompressorFactory.java
@@ -24,14 +24,24 @@ import io.airlift.compress.Decompressor;
/** Implementation of {@link BlockCompressionFactory} for airlift compressors.
*/
public class AirCompressorFactory implements BlockCompressionFactory {
+ private final BlockCompressionType type;
private final Compressor internalCompressor;
private final Decompressor internalDecompressor;
- public AirCompressorFactory(Compressor internalCompressor, Decompressor
internalDecompressor) {
+ public AirCompressorFactory(
+ BlockCompressionType type,
+ Compressor internalCompressor,
+ Decompressor internalDecompressor) {
+ this.type = type;
this.internalCompressor = internalCompressor;
this.internalDecompressor = internalDecompressor;
}
+ @Override
+ public BlockCompressionType getCompressionType() {
+ return type;
+ }
+
@Override
public BlockCompressor getCompressor() {
return new AirBlockCompressor(internalCompressor);
diff --git
a/paimon-common/src/main/java/org/apache/paimon/compression/BlockCompressionFactory.java
b/paimon-common/src/main/java/org/apache/paimon/compression/BlockCompressionFactory.java
index 90ef2002d..3bf81a6c9 100644
---
a/paimon-common/src/main/java/org/apache/paimon/compression/BlockCompressionFactory.java
+++
b/paimon-common/src/main/java/org/apache/paimon/compression/BlockCompressionFactory.java
@@ -29,6 +29,8 @@ import javax.annotation.Nullable;
*/
public interface BlockCompressionFactory {
+ BlockCompressionType getCompressionType();
+
BlockCompressor getCompressor();
BlockDecompressor getDecompressor();
@@ -39,12 +41,31 @@ public interface BlockCompressionFactory {
switch (compression.toUpperCase()) {
case "NONE":
return null;
+ case "ZSTD":
+ return new ZstdBlockCompressionFactory();
case "LZ4":
return new Lz4BlockCompressionFactory();
case "LZO":
- return new AirCompressorFactory(new LzoCompressor(), new
LzoDecompressor());
- case "ZSTD":
+ return new AirCompressorFactory(
+ BlockCompressionType.LZO, new LzoCompressor(), new
LzoDecompressor());
+ default:
+ throw new IllegalStateException("Unknown CompressionMethod " +
compression);
+ }
+ }
+
+ /** Creates {@link BlockCompressionFactory} according to the {@link
BlockCompressionType}. */
+ @Nullable
+ static BlockCompressionFactory create(BlockCompressionType compression) {
+ switch (compression) {
+ case NONE:
+ return null;
+ case ZSTD:
return new ZstdBlockCompressionFactory();
+ case LZ4:
+ return new Lz4BlockCompressionFactory();
+ case LZO:
+ return new AirCompressorFactory(
+ BlockCompressionType.LZO, new LzoCompressor(), new
LzoDecompressor());
default:
throw new IllegalStateException("Unknown CompressionMethod " +
compression);
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/compression/ZstdBlockCompressionFactory.java
b/paimon-common/src/main/java/org/apache/paimon/compression/BlockCompressionType.java
similarity index 55%
copy from
paimon-common/src/main/java/org/apache/paimon/compression/ZstdBlockCompressionFactory.java
copy to
paimon-common/src/main/java/org/apache/paimon/compression/BlockCompressionType.java
index 5e97cb3e5..51e9f8340 100644
---
a/paimon-common/src/main/java/org/apache/paimon/compression/ZstdBlockCompressionFactory.java
+++
b/paimon-common/src/main/java/org/apache/paimon/compression/BlockCompressionType.java
@@ -18,16 +18,31 @@
package org.apache.paimon.compression;
-/** Implementation of {@link BlockCompressionFactory} for zstd codec. */
-public class ZstdBlockCompressionFactory implements BlockCompressionFactory {
+/** Block Compression type. */
+public enum BlockCompressionType {
+ NONE(0),
+ ZSTD(1),
+ LZ4(2),
+ LZO(3);
- @Override
- public BlockCompressor getCompressor() {
- return new ZstdBlockCompressor();
+ private final int persistentId;
+
+ BlockCompressionType(int persistentId) {
+ this.persistentId = persistentId;
+ }
+
+ public int persistentId() {
+ return this.persistentId;
}
- @Override
- public BlockDecompressor getDecompressor() {
- return new ZstdBlockDecompressor();
+ public static BlockCompressionType getCompressionTypeByPersistentId(int
persistentId) {
+ BlockCompressionType[] types = values();
+ for (BlockCompressionType type : types) {
+ if (type.persistentId == persistentId) {
+ return type;
+ }
+ }
+
+ throw new IllegalArgumentException("Unknown persistentId " +
persistentId);
}
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/compression/Lz4BlockCompressionFactory.java
b/paimon-common/src/main/java/org/apache/paimon/compression/Lz4BlockCompressionFactory.java
index 91be9e916..1e2c42d66 100644
---
a/paimon-common/src/main/java/org/apache/paimon/compression/Lz4BlockCompressionFactory.java
+++
b/paimon-common/src/main/java/org/apache/paimon/compression/Lz4BlockCompressionFactory.java
@@ -20,6 +20,12 @@ package org.apache.paimon.compression;
/** Implementation of {@link BlockCompressionFactory} for Lz4 codec. */
public class Lz4BlockCompressionFactory implements BlockCompressionFactory {
+
+ @Override
+ public BlockCompressionType getCompressionType() {
+ return BlockCompressionType.LZ4;
+ }
+
@Override
public BlockCompressor getCompressor() {
return new Lz4BlockCompressor();
diff --git
a/paimon-common/src/main/java/org/apache/paimon/compression/ZstdBlockCompressionFactory.java
b/paimon-common/src/main/java/org/apache/paimon/compression/ZstdBlockCompressionFactory.java
index 5e97cb3e5..13745e6ee 100644
---
a/paimon-common/src/main/java/org/apache/paimon/compression/ZstdBlockCompressionFactory.java
+++
b/paimon-common/src/main/java/org/apache/paimon/compression/ZstdBlockCompressionFactory.java
@@ -21,6 +21,11 @@ package org.apache.paimon.compression;
/** Implementation of {@link BlockCompressionFactory} for zstd codec. */
public class ZstdBlockCompressionFactory implements BlockCompressionFactory {
+ @Override
+ public BlockCompressionType getCompressionType() {
+ return BlockCompressionType.ZSTD;
+ }
+
@Override
public BlockCompressor getCompressor() {
return new ZstdBlockCompressor();
diff --git
a/paimon-common/src/main/java/org/apache/paimon/data/serializer/RowCompactedSerializer.java
b/paimon-common/src/main/java/org/apache/paimon/data/serializer/RowCompactedSerializer.java
index d39dc8c95..91c1c1c7c 100644
---
a/paimon-common/src/main/java/org/apache/paimon/data/serializer/RowCompactedSerializer.java
+++
b/paimon-common/src/main/java/org/apache/paimon/data/serializer/RowCompactedSerializer.java
@@ -32,6 +32,7 @@ import org.apache.paimon.data.Timestamp;
import org.apache.paimon.io.DataInputView;
import org.apache.paimon.io.DataOutputView;
import org.apache.paimon.memory.MemorySegment;
+import org.apache.paimon.memory.MemorySlice;
import org.apache.paimon.types.DataType;
import org.apache.paimon.types.RowKind;
import org.apache.paimon.types.RowType;
@@ -42,6 +43,7 @@ import javax.annotation.Nullable;
import java.io.IOException;
import java.io.Serializable;
import java.util.Arrays;
+import java.util.Comparator;
import java.util.Objects;
import static org.apache.paimon.data.BinaryRow.HEADER_SIZE_IN_BITS;
@@ -160,6 +162,10 @@ public class RowCompactedSerializer implements
Serializer<InternalRow> {
return row;
}
+ public Comparator<MemorySlice> createSliceComparator() {
+ return new SliceComparator(rowType);
+ }
+
private static FieldWriter createFieldWriter(DataType fieldType) {
final FieldWriter fieldWriter;
switch (fieldType.getTypeRoot()) {
@@ -515,6 +521,7 @@ public class RowCompactedSerializer implements
Serializer<InternalRow> {
private MemorySegment segment;
private MemorySegment[] segments;
+ private int offset;
private int position;
private RowReader(int headerSizeInBytes) {
@@ -522,17 +529,22 @@ public class RowCompactedSerializer implements
Serializer<InternalRow> {
}
private void pointTo(byte[] bytes) {
- this.segment = MemorySegment.wrap(bytes);
+ pointTo(MemorySegment.wrap(bytes), 0);
+ }
+
+ private void pointTo(MemorySegment segment, int offset) {
+ this.segment = segment;
this.segments = new MemorySegment[] {segment};
- this.position = headerSizeInBytes;
+ this.offset = offset;
+ this.position = offset + headerSizeInBytes;
}
private RowKind readRowKind() {
- return RowKind.fromByteValue(segment.get(0));
+ return RowKind.fromByteValue(segment.get(offset));
}
private boolean isNullAt(int pos) {
- return bitGet(segment, 0, pos + HEADER_SIZE_IN_BITS);
+ return bitGet(segment, offset, pos + HEADER_SIZE_IN_BITS);
}
private boolean readBoolean() {
@@ -635,4 +647,47 @@ public class RowCompactedSerializer implements
Serializer<InternalRow> {
return serializer.deserialize(bytes);
}
}
+
+ private static class SliceComparator implements Comparator<MemorySlice> {
+
+ private final RowReader reader1;
+ private final RowReader reader2;
+ private final FieldReader[] fieldReaders;
+
+ public SliceComparator(RowType rowType) {
+ this.reader1 = new RowReader(rowType.getFieldCount());
+ this.reader2 = new RowReader(rowType.getFieldCount());
+ this.fieldReaders = new FieldReader[rowType.getFieldCount()];
+ for (int i = 0; i < rowType.getFieldCount(); i++) {
+ fieldReaders[i] = createFieldReader(rowType.getTypeAt(i));
+ }
+ }
+
+ @Override
+ public int compare(MemorySlice slice1, MemorySlice slice2) {
+ reader1.pointTo(slice1.segment(), slice1.offset());
+ reader2.pointTo(slice2.segment(), slice2.offset());
+ for (int i = 0; i < fieldReaders.length; i++) {
+ boolean isNull1 = reader1.isNullAt(i);
+ boolean isNull2 = reader2.isNullAt(i);
+ if (!isNull1 || !isNull2) {
+ if (isNull1) {
+ return -1;
+ } else if (isNull2) {
+ return 1;
+ } else {
+ FieldReader fieldReader = fieldReaders[i];
+ Object o1 = fieldReader.readField(reader1, i);
+ Object o2 = fieldReader.readField(reader2, i);
+ @SuppressWarnings({"unchecked", "rawtypes"})
+ int comp = ((Comparable) o1).compareTo(o2);
+ if (comp != 0) {
+ return comp;
+ }
+ }
+ }
+ }
+ return 0;
+ }
+ }
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/lookup/LookupStoreFactory.java
b/paimon-common/src/main/java/org/apache/paimon/lookup/LookupStoreFactory.java
index c22b8605a..2d57cf192 100644
---
a/paimon-common/src/main/java/org/apache/paimon/lookup/LookupStoreFactory.java
+++
b/paimon-common/src/main/java/org/apache/paimon/lookup/LookupStoreFactory.java
@@ -19,6 +19,10 @@
package org.apache.paimon.lookup;
import org.apache.paimon.CoreOptions;
+import org.apache.paimon.io.cache.CacheManager;
+import org.apache.paimon.lookup.hash.HashLookupStoreFactory;
+import org.apache.paimon.lookup.sort.SortLookupStoreFactory;
+import org.apache.paimon.memory.MemorySlice;
import org.apache.paimon.options.Options;
import org.apache.paimon.utils.BloomFilter;
@@ -26,6 +30,7 @@ import javax.annotation.Nullable;
import java.io.File;
import java.io.IOException;
+import java.util.Comparator;
import java.util.function.Function;
/**
@@ -59,6 +64,26 @@ public interface LookupStoreFactory {
return bfGenerator;
}
+ static LookupStoreFactory create(
+ CoreOptions options, CacheManager cacheManager,
Comparator<MemorySlice> keyComparator) {
+ String compression =
+
options.toConfiguration().get(CoreOptions.LOOKUP_CACHE_SPILL_COMPRESSION);
+ switch (options.lookupLocalFileType()) {
+ case SORT:
+ return new SortLookupStoreFactory(
+ keyComparator, cacheManager, options.cachePageSize(),
compression);
+ case HASH:
+ return new HashLookupStoreFactory(
+ cacheManager,
+ options.cachePageSize(),
+
options.toConfiguration().get(CoreOptions.LOOKUP_HASH_LOAD_FACTOR),
+ compression);
+ default:
+ throw new IllegalArgumentException(
+ "Unsupported lookup local file type: " +
options.lookupLocalFileType());
+ }
+ }
+
/** Context between writer and reader. */
interface Context {}
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/lookup/LookupStoreReader.java
b/paimon-common/src/main/java/org/apache/paimon/lookup/LookupStoreReader.java
index c867e2053..6b4ca0159 100644
---
a/paimon-common/src/main/java/org/apache/paimon/lookup/LookupStoreReader.java
+++
b/paimon-common/src/main/java/org/apache/paimon/lookup/LookupStoreReader.java
@@ -18,6 +18,8 @@
package org.apache.paimon.lookup;
+import javax.annotation.Nullable;
+
import java.io.Closeable;
import java.io.IOException;
@@ -25,5 +27,6 @@ import java.io.IOException;
public interface LookupStoreReader extends Closeable {
/** Lookup value by key. */
+ @Nullable
byte[] lookup(byte[] key) throws IOException;
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/compression/ZstdBlockCompressionFactory.java
b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockAlignedType.java
similarity index 59%
copy from
paimon-common/src/main/java/org/apache/paimon/compression/ZstdBlockCompressionFactory.java
copy to
paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockAlignedType.java
index 5e97cb3e5..e5849d9f7 100644
---
a/paimon-common/src/main/java/org/apache/paimon/compression/ZstdBlockCompressionFactory.java
+++
b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockAlignedType.java
@@ -16,18 +16,29 @@
* limitations under the License.
*/
-package org.apache.paimon.compression;
+package org.apache.paimon.lookup.sort;
-/** Implementation of {@link BlockCompressionFactory} for zstd codec. */
-public class ZstdBlockCompressionFactory implements BlockCompressionFactory {
+/** Aligned type for block. */
+public enum BlockAlignedType {
+ ALIGNED((byte) 0),
+ UNALIGNED((byte) 1);
- @Override
- public BlockCompressor getCompressor() {
- return new ZstdBlockCompressor();
+ private final byte b;
+
+ BlockAlignedType(byte b) {
+ this.b = b;
+ }
+
+ public byte toByte() {
+ return b;
}
- @Override
- public BlockDecompressor getDecompressor() {
- return new ZstdBlockDecompressor();
+ public static BlockAlignedType fromByte(byte b) {
+ for (BlockAlignedType type : BlockAlignedType.values()) {
+ if (type.toByte() == b) {
+ return type;
+ }
+ }
+ throw new IllegalStateException("Illegal block aligned type: " + b);
}
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockEntry.java
b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockEntry.java
new file mode 100644
index 000000000..4473886e3
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockEntry.java
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lookup.sort;
+
+import org.apache.paimon.memory.MemorySlice;
+
+import java.util.Map.Entry;
+
+import static java.util.Objects.requireNonNull;
+
+/** Entry represents a key value. */
+public class BlockEntry implements Entry<MemorySlice, MemorySlice> {
+
+ private final MemorySlice key;
+ private final MemorySlice value;
+
+ public BlockEntry(MemorySlice key, MemorySlice value) {
+ requireNonNull(key, "key is null");
+ requireNonNull(value, "value is null");
+ this.key = key;
+ this.value = value;
+ }
+
+ @Override
+ public MemorySlice getKey() {
+ return key;
+ }
+
+ @Override
+ public MemorySlice getValue() {
+ return value;
+ }
+
+ @Override
+ public final MemorySlice setValue(MemorySlice value) {
+ throw new UnsupportedOperationException();
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ BlockEntry entry = (BlockEntry) o;
+
+ if (!key.equals(entry.key)) {
+ return false;
+ }
+ return value.equals(entry.value);
+ }
+
+ @Override
+ public int hashCode() {
+ int result = key.hashCode();
+ result = 31 * result + value.hashCode();
+ return result;
+ }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockHandle.java
b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockHandle.java
new file mode 100644
index 000000000..737f57a8b
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockHandle.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lookup.sort;
+
+import org.apache.paimon.memory.MemorySlice;
+import org.apache.paimon.memory.MemorySliceInput;
+import org.apache.paimon.memory.MemorySliceOutput;
+
+/** Handle for a block. */
+public class BlockHandle {
+ public static final int MAX_ENCODED_LENGTH = 9 + 5;
+
+ private final long offset;
+ private final int size;
+
+ BlockHandle(long offset, int size) {
+ this.offset = offset;
+ this.size = size;
+ }
+
+ public long offset() {
+ return offset;
+ }
+
+ public int size() {
+ return size;
+ }
+
+ public int getFullBlockSize() {
+ return size + BlockTrailer.ENCODED_LENGTH;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ BlockHandle that = (BlockHandle) o;
+
+ if (size != that.size) {
+ return false;
+ }
+ if (offset != that.offset) {
+ return false;
+ }
+
+ return true;
+ }
+
+ @Override
+ public int hashCode() {
+ int result = Long.hashCode(offset);
+ result = 31 * result + size;
+ return result;
+ }
+
+ @Override
+ public String toString() {
+ return "BlockHandle" + "{offset=" + offset + ", size=" + size + '}';
+ }
+
+ public static BlockHandle readBlockHandle(MemorySliceInput sliceInput) {
+ long offset = sliceInput.readVarLenLong();
+ int size = sliceInput.readVarLenInt();
+ return new BlockHandle(offset, size);
+ }
+
+ public static MemorySlice writeBlockHandle(BlockHandle blockHandle) {
+ MemorySliceOutput sliceOutput = new
MemorySliceOutput(MAX_ENCODED_LENGTH);
+ writeBlockHandleTo(blockHandle, sliceOutput);
+ return sliceOutput.toSlice();
+ }
+
+ public static void writeBlockHandleTo(BlockHandle blockHandle,
MemorySliceOutput sliceOutput) {
+ sliceOutput.writeVarLenLong(blockHandle.offset);
+ sliceOutput.writeVarLenInt(blockHandle.size);
+ }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockIterator.java
b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockIterator.java
new file mode 100644
index 000000000..c5647dc50
--- /dev/null
+++
b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockIterator.java
@@ -0,0 +1,112 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lookup.sort;
+
+import org.apache.paimon.memory.MemorySlice;
+import org.apache.paimon.memory.MemorySliceInput;
+
+import java.util.Comparator;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.NoSuchElementException;
+
+import static java.util.Objects.requireNonNull;
+
+/** An {@link Iterator} for a block. */
+public abstract class BlockIterator implements Iterator<Map.Entry<MemorySlice,
MemorySlice>> {
+
+ protected final MemorySliceInput data;
+
+ private final int recordCount;
+ private final Comparator<MemorySlice> comparator;
+
+ private BlockEntry polled;
+
+ public BlockIterator(
+ MemorySliceInput data, int recordCount, Comparator<MemorySlice>
comparator) {
+ this.data = data;
+ this.recordCount = recordCount;
+ this.comparator = comparator;
+ }
+
+ @Override
+ public boolean hasNext() {
+ return polled != null || data.isReadable();
+ }
+
+ @Override
+ public BlockEntry next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ }
+
+ if (polled != null) {
+ BlockEntry result = polled;
+ polled = null;
+ return result;
+ }
+
+ return readEntry();
+ }
+
+ @Override
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+
+ public boolean seekTo(MemorySlice targetKey) {
+ int left = 0;
+ int right = recordCount - 1;
+
+ while (left <= right) {
+ int mid = left + (right - left) / 2;
+
+ seekTo(mid);
+ BlockEntry midEntry = readEntry();
+ int compare = comparator.compare(midEntry.getKey(), targetKey);
+
+ if (compare == 0) {
+ polled = midEntry;
+ return true;
+ } else if (compare > 0) {
+ polled = midEntry;
+ right = mid - 1;
+ } else {
+ left = mid + 1;
+ }
+ }
+
+ return false;
+ }
+
+ public abstract void seekTo(int record);
+
+ private BlockEntry readEntry() {
+ requireNonNull(data, "data is null");
+
+ int keyLength;
+ keyLength = data.readVarLenInt();
+ MemorySlice key = data.readSlice(keyLength);
+
+ int valueLength = data.readVarLenInt();
+ MemorySlice value = data.readSlice(valueLength);
+
+ return new BlockEntry(key, value);
+ }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockReader.java
b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockReader.java
new file mode 100644
index 000000000..544a3d42b
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockReader.java
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lookup.sort;
+
+import org.apache.paimon.memory.MemorySlice;
+
+import java.util.Comparator;
+
+import static org.apache.paimon.lookup.sort.BlockAlignedType.ALIGNED;
+
+/** Reader for a block. */
+public class BlockReader {
+ private final MemorySlice block;
+ private final Comparator<MemorySlice> comparator;
+
+ public BlockReader(MemorySlice block, Comparator<MemorySlice> comparator) {
+ this.block = block;
+ this.comparator = comparator;
+ }
+
+ public long size() {
+ return block.length();
+ }
+
+ public BlockIterator iterator() {
+ BlockAlignedType alignedType =
+ BlockAlignedType.fromByte(block.readByte(block.length() - 1));
+ int intValue = block.readInt(block.length() - 5);
+ if (alignedType == ALIGNED) {
+ return new AlignedIterator(block.slice(0, block.length() - 5),
intValue, comparator);
+ } else {
+ int indexLength = intValue * 4;
+ int indexOffset = block.length() - 5 - indexLength;
+ MemorySlice data = block.slice(0, indexOffset);
+ MemorySlice index = block.slice(indexOffset, indexLength);
+ return new UnalignedIterator(data, index, comparator);
+ }
+ }
+
+ private static class AlignedIterator extends BlockIterator {
+
+ private final int recordSize;
+
+ public AlignedIterator(
+ MemorySlice data, int recordSize, Comparator<MemorySlice>
comparator) {
+ super(data.toInput(), data.length() / recordSize, comparator);
+ this.recordSize = recordSize;
+ }
+
+ @Override
+ public void seekTo(int record) {
+ data.setPosition(record * recordSize);
+ }
+ }
+
+ private static class UnalignedIterator extends BlockIterator {
+
+ private final MemorySlice index;
+
+ public UnalignedIterator(
+ MemorySlice data, MemorySlice index, Comparator<MemorySlice>
comparator) {
+ super(data.toInput(), index.length() / 4, comparator);
+ this.index = index;
+ }
+
+ @Override
+ public void seekTo(int record) {
+ data.setPosition(index.readInt(record * 4));
+ }
+ }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockTrailer.java
b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockTrailer.java
new file mode 100644
index 000000000..6d49bd9cc
--- /dev/null
+++
b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockTrailer.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lookup.sort;
+
+import org.apache.paimon.compression.BlockCompressionType;
+import org.apache.paimon.memory.MemorySlice;
+import org.apache.paimon.memory.MemorySliceInput;
+import org.apache.paimon.memory.MemorySliceOutput;
+
+import static java.util.Objects.requireNonNull;
+
+/**
+ * Trailer of a block: one byte holding the persistent id of the compression type, followed by a
+ * four byte checksum value.
+ */
+public class BlockTrailer {
+    public static final int ENCODED_LENGTH = 5;
+
+    private final BlockCompressionType compressionType;
+    private final int crc32c;
+
+    public BlockTrailer(BlockCompressionType compressionType, int crc32c) {
+        this.compressionType = requireNonNull(compressionType, "compressionType is null");
+        this.crc32c = crc32c;
+    }
+
+    public BlockCompressionType getCompressionType() {
+        return compressionType;
+    }
+
+    public int getCrc32c() {
+        return crc32c;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+
+        BlockTrailer other = (BlockTrailer) o;
+        return crc32c == other.crc32c && compressionType == other.compressionType;
+    }
+
+    @Override
+    public int hashCode() {
+        return 31 * compressionType.hashCode() + crc32c;
+    }
+
+    @Override
+    public String toString() {
+        String checksumHex = Integer.toHexString(crc32c);
+        return "BlockTrailer{compressionType=" + compressionType + ", crc32c=0x" + checksumHex + '}';
+    }
+
+    /** Decodes a trailer from the input: compression type id first, then the checksum. */
+    public static BlockTrailer readBlockTrailer(MemorySliceInput input) {
+        int persistentId = input.readUnsignedByte();
+        BlockCompressionType type =
+                BlockCompressionType.getCompressionTypeByPersistentId(persistentId);
+        int checksum = input.readInt();
+        return new BlockTrailer(type, checksum);
+    }
+
+    /** Encodes the trailer into a freshly allocated slice. */
+    public static MemorySlice writeBlockTrailer(BlockTrailer blockTrailer) {
+        MemorySliceOutput encoded = new MemorySliceOutput(ENCODED_LENGTH);
+        writeBlockTrailer(blockTrailer, encoded);
+        return encoded.toSlice();
+    }
+
+    /** Appends the encoded trailer to the given output. */
+    public static void writeBlockTrailer(BlockTrailer blockTrailer, MemorySliceOutput sliceOutput) {
+        BlockCompressionType type = blockTrailer.getCompressionType();
+        sliceOutput.writeByte(type.persistentId());
+        sliceOutput.writeInt(blockTrailer.getCrc32c());
+    }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockWriter.java
b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockWriter.java
new file mode 100644
index 000000000..d4f48e9c7
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BlockWriter.java
@@ -0,0 +1,99 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lookup.sort;
+
+import org.apache.paimon.memory.MemorySlice;
+import org.apache.paimon.memory.MemorySliceOutput;
+import org.apache.paimon.utils.IntArrayList;
+
+import java.io.IOException;
+
+import static org.apache.paimon.lookup.sort.BlockAlignedType.ALIGNED;
+import static org.apache.paimon.lookup.sort.BlockAlignedType.UNALIGNED;
+
+/**
+ * Writer to build a Block.
+ *
+ * <p>Records are appended as length-prefixed key and value. While every record has the same
+ * encoded size the block stays "aligned" and only that size is stored in the tail; otherwise the
+ * start position of every record is appended as an offset index.
+ */
+public class BlockWriter {
+
+    private final IntArrayList positions;
+    private final MemorySliceOutput block;
+
+    private int alignedSize;
+    private boolean aligned;
+
+    public BlockWriter(int blockSize) {
+        this.positions = new IntArrayList(32);
+        this.block = new MemorySliceOutput(blockSize + 128);
+        this.alignedSize = 0;
+        this.aligned = true;
+    }
+
+    /** Clears all buffered records so the writer can be reused for the next block. */
+    public void reset() {
+        this.positions.clear();
+        this.block.reset();
+        this.alignedSize = 0;
+        this.aligned = true;
+    }
+
+    /** Appends one record and keeps track of whether all records still share one size. */
+    public void add(byte[] key, byte[] value) {
+        int recordStart = block.size();
+        block.writeVarLenInt(key.length);
+        block.writeBytes(key);
+        block.writeVarLenInt(value.length);
+        block.writeBytes(value);
+        int recordLength = block.size() - recordStart;
+
+        positions.add(recordStart);
+        if (!aligned) {
+            return;
+        }
+        if (alignedSize == 0) {
+            // first record of the block defines the candidate record size
+            alignedSize = recordLength;
+        } else if (alignedSize != recordLength) {
+            aligned = false;
+        }
+    }
+
+    /** Returns the number of records added since the last reset. */
+    public int size() {
+        return positions.size();
+    }
+
+    /** Returns the number of bytes the finished block would occupy. */
+    public int memory() {
+        // 5 = 4-byte int + 1-byte aligned marker written by finish()
+        int indexBytes = aligned ? 0 : positions.size() * 4;
+        return block.size() + 5 + indexBytes;
+    }
+
+    /**
+     * Writes the block tail and returns the complete block.
+     *
+     * @throws IllegalStateException if no record has been added
+     */
+    public MemorySlice finish() throws IOException {
+        if (positions.isEmpty()) {
+            throw new IllegalStateException();
+        }
+        if (aligned) {
+            block.writeInt(alignedSize);
+            block.writeByte(ALIGNED.toByte());
+        } else {
+            int count = positions.size();
+            for (int i = 0; i < count; i++) {
+                block.writeInt(positions.get(i));
+            }
+            block.writeInt(count);
+            block.writeByte(UNALIGNED.toByte());
+        }
+        return block.toSlice();
+    }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BloomFilterHandle.java
b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BloomFilterHandle.java
new file mode 100644
index 000000000..7ec6a845c
--- /dev/null
+++
b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/BloomFilterHandle.java
@@ -0,0 +1,68 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lookup.sort;
+
+import java.util.Objects;
+
+/** Handle for bloom filter: where it starts, how many bytes it has and its expected entries. */
+public class BloomFilterHandle {
+
+    // NOTE(review): presumably the maximum var-length sizes of long + int + long - confirm
+    public static final int MAX_ENCODED_LENGTH = 9 + 5 + 9;
+
+    private final long offset;
+    private final int size;
+    private final long expectedEntries;
+
+    BloomFilterHandle(long offset, int size, long expectedEntries) {
+        this.offset = offset;
+        this.size = size;
+        this.expectedEntries = expectedEntries;
+    }
+
+    public long offset() {
+        return offset;
+    }
+
+    public int size() {
+        return size;
+    }
+
+    public long expectedEntries() {
+        return expectedEntries;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        if (o == null || getClass() != o.getClass()) {
+            return false;
+        }
+        BloomFilterHandle other = (BloomFilterHandle) o;
+        if (offset != other.offset) {
+            return false;
+        }
+        if (size != other.size) {
+            return false;
+        }
+        return expectedEntries == other.expectedEntries;
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(offset, size, expectedEntries);
+    }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/Footer.java
b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/Footer.java
new file mode 100644
index 000000000..b8ae51789
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/Footer.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lookup.sort;
+
+import org.apache.paimon.memory.MemorySlice;
+import org.apache.paimon.memory.MemorySliceInput;
+import org.apache.paimon.memory.MemorySliceOutput;
+
+import javax.annotation.Nullable;
+
+import java.io.IOException;
+
+import static org.apache.paimon.lookup.sort.SortLookupStoreWriter.MAGIC_NUMBER;
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/**
+ * Footer for a sorted file.
+ *
+ * <p>Encoded as: bloom filter handle (long offset, int size, long expected entries; all zero when
+ * absent), index block handle (long offset, int size) and finally the magic number.
+ */
+public class Footer {
+
+    public static final int ENCODED_LENGTH = 36;
+
+    @Nullable private final BloomFilterHandle bloomFilterHandle;
+    private final BlockHandle indexBlockHandle;
+
+    Footer(@Nullable BloomFilterHandle bloomFilterHandle, BlockHandle indexBlockHandle) {
+        this.bloomFilterHandle = bloomFilterHandle;
+        this.indexBlockHandle = indexBlockHandle;
+    }
+
+    @Nullable
+    public BloomFilterHandle getBloomFilterHandle() {
+        return bloomFilterHandle;
+    }
+
+    public BlockHandle getIndexBlockHandle() {
+        return indexBlockHandle;
+    }
+
+    /** Decodes a footer and checks that it ends with the expected magic number. */
+    public static Footer readFooter(MemorySliceInput sliceInput) throws IOException {
+        // bloom filter handle; three zeros mean that no bloom filter was written
+        long bloomOffset = sliceInput.readLong();
+        int bloomSize = sliceInput.readInt();
+        long bloomExpectedEntries = sliceInput.readLong();
+        boolean bloomAbsent = bloomOffset == 0 && bloomSize == 0 && bloomExpectedEntries == 0;
+        BloomFilterHandle bloomFilterHandle =
+                bloomAbsent
+                        ? null
+                        : new BloomFilterHandle(bloomOffset, bloomSize, bloomExpectedEntries);
+
+        // index block handle
+        long indexOffset = sliceInput.readLong();
+        int indexSize = sliceInput.readInt();
+        BlockHandle indexBlockHandle = new BlockHandle(indexOffset, indexSize);
+
+        // jump to the magic number stored in the last four bytes
+        sliceInput.setPosition(ENCODED_LENGTH - 4);
+        int magicNumber = sliceInput.readInt();
+        checkArgument(magicNumber == MAGIC_NUMBER, "File is not a table (bad magic number)");
+
+        return new Footer(bloomFilterHandle, indexBlockHandle);
+    }
+
+    /** Encodes the footer into a freshly allocated slice. */
+    public static MemorySlice writeFooter(Footer footer) {
+        MemorySliceOutput encoded = new MemorySliceOutput(ENCODED_LENGTH);
+        writeFooter(footer, encoded);
+        return encoded.toSlice();
+    }
+
+    /** Appends the encoded footer to the given output. */
+    public static void writeFooter(Footer footer, MemorySliceOutput sliceOutput) {
+        // an absent bloom filter is encoded as all zeros
+        BloomFilterHandle bloom = footer.bloomFilterHandle;
+        long bloomOffset = bloom == null ? 0 : bloom.offset();
+        int bloomSize = bloom == null ? 0 : bloom.size();
+        long bloomExpectedEntries = bloom == null ? 0 : bloom.expectedEntries();
+        sliceOutput.writeLong(bloomOffset);
+        sliceOutput.writeInt(bloomSize);
+        sliceOutput.writeLong(bloomExpectedEntries);
+
+        BlockHandle index = footer.indexBlockHandle;
+        sliceOutput.writeLong(index.offset());
+        sliceOutput.writeInt(index.size());
+
+        sliceOutput.writeInt(MAGIC_NUMBER);
+    }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/lookup/LookupStoreReader.java
b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortContext.java
similarity index 69%
copy from
paimon-common/src/main/java/org/apache/paimon/lookup/LookupStoreReader.java
copy to
paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortContext.java
index c867e2053..5aacb56bb 100644
---
a/paimon-common/src/main/java/org/apache/paimon/lookup/LookupStoreReader.java
+++ b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortContext.java
@@ -16,14 +16,20 @@
* limitations under the License.
*/
-package org.apache.paimon.lookup;
+package org.apache.paimon.lookup.sort;
-import java.io.Closeable;
-import java.io.IOException;
+import org.apache.paimon.lookup.LookupStoreFactory.Context;
-/** Reader, lookup value by key bytes. */
-public interface LookupStoreReader extends Closeable {
+/** A {@link Context} for sort store. */
+public class SortContext implements Context {
- /** Lookup value by key. */
- byte[] lookup(byte[] key) throws IOException;
+ private final long fileSize;
+
+ public SortContext(long fileSize) {
+ this.fileSize = fileSize;
+ }
+
+ public long fileSize() {
+ return fileSize;
+ }
}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreFactory.java
b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreFactory.java
new file mode 100644
index 000000000..50b784688
--- /dev/null
+++
b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreFactory.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lookup.sort;
+
+import org.apache.paimon.compression.BlockCompressionFactory;
+import org.apache.paimon.io.cache.CacheManager;
+import org.apache.paimon.lookup.LookupStoreFactory;
+import org.apache.paimon.memory.MemorySlice;
+import org.apache.paimon.utils.BloomFilter;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.Comparator;
+
+/**
+ * A {@link LookupStoreFactory} for the sort store: it creates {@link SortLookupStoreWriter}s and
+ * {@link SortLookupStoreReader}s that share one comparator, block size and compression setting.
+ */
+public class SortLookupStoreFactory implements LookupStoreFactory {
+
+    private final Comparator<MemorySlice> comparator;
+    private final CacheManager cacheManager;
+    private final int blockSize;
+    @Nullable private final BlockCompressionFactory compressionFactory;
+
+    public SortLookupStoreFactory(
+            Comparator<MemorySlice> comparator,
+            CacheManager cacheManager,
+            int blockSize,
+            String compression) {
+        this.comparator = comparator;
+        this.cacheManager = cacheManager;
+        this.blockSize = blockSize;
+        this.compressionFactory = BlockCompressionFactory.create(compression);
+    }
+
+    /** Opens a reader on the file; the context must be the {@link SortContext} of its writer. */
+    @Override
+    public SortLookupStoreReader createReader(File file, Context context) throws IOException {
+        SortContext sortContext = (SortContext) context;
+        return new SortLookupStoreReader(comparator, file, sortContext, cacheManager);
+    }
+
+    /** Creates a writer for the file, optionally feeding the given bloom filter builder. */
+    @Override
+    public SortLookupStoreWriter createWriter(File file, @Nullable BloomFilter.Builder bloomFilter)
+            throws IOException {
+        return new SortLookupStoreWriter(file, blockSize, bloomFilter, compressionFactory);
+    }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java
b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java
new file mode 100644
index 000000000..9b8c0f79a
--- /dev/null
+++
b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java
@@ -0,0 +1,157 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lookup.sort;
+
+import org.apache.paimon.compression.BlockCompressionFactory;
+import org.apache.paimon.compression.BlockDecompressor;
+import org.apache.paimon.io.cache.CacheManager;
+import org.apache.paimon.lookup.LookupStoreReader;
+import org.apache.paimon.memory.MemorySegment;
+import org.apache.paimon.memory.MemorySlice;
+import org.apache.paimon.memory.MemorySliceInput;
+
+import javax.annotation.Nullable;
+
+import java.io.File;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.FileChannel;
+import java.util.Comparator;
+
+import static org.apache.paimon.utils.Preconditions.checkArgument;
+
+/**
+ * A {@link LookupStoreReader} for sort store.
+ *
+ * <p>The footer at the end of the file points to the index block. A lookup seeks the index block
+ * to find the handle of the data block that may contain the key, reads that block and seeks the
+ * key inside it.
+ *
+ * <p>TODO add block cache support.
+ *
+ * <p>TODO separate index cache and block cache.
+ */
+public class SortLookupStoreReader implements LookupStoreReader {
+
+    private final Comparator<MemorySlice> comparator;
+    private final FileChannel fileChannel;
+    private final long fileSize;
+
+    private final BlockIterator indexBlockIterator;
+
+    /**
+     * Opens the file and eagerly loads the footer and the index block.
+     *
+     * @throws IOException if the file cannot be opened or read; the file channel is closed again
+     *     before the exception is propagated
+     */
+    public SortLookupStoreReader(
+            Comparator<MemorySlice> comparator,
+            File file,
+            SortContext context,
+            CacheManager cacheManager)
+            throws IOException {
+        this.comparator = comparator;
+        //noinspection resource
+        this.fileChannel = new FileInputStream(file).getChannel();
+        this.fileSize = context.fileSize();
+
+        BlockIterator indexIterator;
+        try {
+            Footer footer = readFooter();
+            indexIterator = readBlock(footer.getIndexBlockHandle()).iterator();
+        } catch (IOException | RuntimeException e) {
+            // nobody can call close() on a half-constructed reader, so release the channel here
+            try {
+                this.fileChannel.close();
+            } catch (IOException closeFailure) {
+                e.addSuppressed(closeFailure);
+            }
+            throw e;
+        }
+        this.indexBlockIterator = indexIterator;
+        // TODO read bloom filter block
+    }
+
+    private Footer readFooter() throws IOException {
+        MemorySegment footerData = read(fileSize - Footer.ENCODED_LENGTH, Footer.ENCODED_LENGTH);
+        return Footer.readFooter(MemorySlice.wrap(footerData).toInput());
+    }
+
+    /**
+     * Looks up the value stored for the key.
+     *
+     * @return a copy of the value bytes, or null if the key is not present
+     */
+    @Nullable
+    @Override
+    public byte[] lookup(byte[] key) throws IOException {
+        MemorySlice keySlice = MemorySlice.wrap(key);
+        // seek the index to the block containing the key
+        indexBlockIterator.seekTo(keySlice);
+
+        // if indexIterator does not have a next, it means the key does not exist in this iterator
+        if (indexBlockIterator.hasNext()) {
+            // seek the current iterator to the key
+            BlockIterator current = getNextBlock();
+            if (current.seekTo(keySlice)) {
+                return current.next().getValue().copyBytes();
+            }
+        }
+        return null;
+    }
+
+    private BlockIterator getNextBlock() throws IOException {
+        MemorySlice blockHandle = indexBlockIterator.next().getValue();
+        BlockReader dataBlock = openBlock(blockHandle);
+        return dataBlock.iterator();
+    }
+
+    private BlockReader openBlock(MemorySlice blockEntry) throws IOException {
+        BlockHandle blockHandle = BlockHandle.readBlockHandle(blockEntry.toInput());
+        return readBlock(blockHandle);
+    }
+
+    /**
+     * Reads exactly {@code length} bytes starting at {@code offset}.
+     *
+     * @throws IOException if the end of the file is reached before all bytes are read
+     */
+    private MemorySegment read(long offset, int length) throws IOException {
+        // TODO use cache
+        // TODO cache uncompressed block
+        // TODO separate index and data cache
+        byte[] bytes = new byte[length];
+        ByteBuffer buffer = ByteBuffer.wrap(bytes);
+        long readPosition = offset;
+        // a single positional read may return fewer bytes than requested, so keep reading
+        while (buffer.hasRemaining()) {
+            int read = fileChannel.read(buffer, readPosition);
+            if (read < 0) {
+                throw new IOException("Could not read all the data");
+            }
+            readPosition += read;
+        }
+        return MemorySegment.wrap(bytes);
+    }
+
+    /** Reads the block addressed by the handle and decompresses it if its trailer says so. */
+    private BlockReader readBlock(BlockHandle blockHandle) throws IOException {
+        // read block trailer, it is stored right behind the block data
+        MemorySegment trailerData =
+                read(blockHandle.offset() + blockHandle.size(), BlockTrailer.ENCODED_LENGTH);
+        BlockTrailer blockTrailer =
+                BlockTrailer.readBlockTrailer(MemorySlice.wrap(trailerData).toInput());
+
+        // TODO validate checksum
+
+        // decompress data
+
+        MemorySegment block = read(blockHandle.offset(), blockHandle.size());
+        MemorySlice uncompressedData;
+        BlockCompressionFactory compressionFactory =
+                BlockCompressionFactory.create(blockTrailer.getCompressionType());
+        if (compressionFactory == null) {
+            uncompressedData = MemorySlice.wrap(block);
+        } else {
+            // a compressed block starts with the var-length encoded uncompressed size
+            MemorySliceInput compressedInput = MemorySlice.wrap(block).toInput();
+            byte[] uncompressed = new byte[compressedInput.readVarLenInt()];
+            BlockDecompressor decompressor = compressionFactory.getDecompressor();
+            int uncompressedLength =
+                    decompressor.decompress(
+                            block.getHeapMemory(),
+                            compressedInput.position(),
+                            compressedInput.available(),
+                            uncompressed,
+                            0);
+            checkArgument(uncompressedLength == uncompressed.length);
+            uncompressedData = MemorySlice.wrap(uncompressed);
+        }
+
+        return new BlockReader(uncompressedData, comparator);
+    }
+
+    @Override
+    public void close() throws IOException {
+        this.fileChannel.close();
+        // TODO clear cache too
+    }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreWriter.java
b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreWriter.java
new file mode 100644
index 000000000..1d2436e94
--- /dev/null
+++
b/paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreWriter.java
@@ -0,0 +1,212 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lookup.sort;
+
+import org.apache.paimon.compression.BlockCompressionFactory;
+import org.apache.paimon.compression.BlockCompressionType;
+import org.apache.paimon.compression.BlockCompressor;
+import org.apache.paimon.lookup.LookupStoreFactory;
+import org.apache.paimon.lookup.LookupStoreWriter;
+import org.apache.paimon.memory.MemorySegment;
+import org.apache.paimon.memory.MemorySlice;
+import org.apache.paimon.options.MemorySize;
+import org.apache.paimon.utils.BloomFilter;
+import org.apache.paimon.utils.MurmurHashUtils;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.annotation.Nullable;
+
+import java.io.BufferedOutputStream;
+import java.io.File;
+import java.io.IOException;
+import java.nio.file.Files;
+import java.util.zip.CRC32;
+
+import static org.apache.paimon.lookup.sort.BlockHandle.writeBlockHandle;
+import static org.apache.paimon.memory.MemorySegmentUtils.allocateReuseBytes;
+import static org.apache.paimon.utils.VarLengthIntUtils.encodeInt;
+
+/**
+ * A {@link LookupStoreWriter} for sorting.
+ *
+ * <p>Records are collected into data blocks. Whenever a data block grows beyond the block size it
+ * is written to the file and an entry (last key of the block, handle of the block) is added to the
+ * index block. On close the optional bloom filter, the index block and the footer are appended.
+ */
+public class SortLookupStoreWriter implements LookupStoreWriter {
+
+    private static final Logger LOG =
+            LoggerFactory.getLogger(SortLookupStoreWriter.class.getName());
+
+    public static final int MAGIC_NUMBER = 1481571681;
+
+    private final BufferedOutputStream fileOutputStream;
+    private final int blockSize;
+    private final BlockWriter dataBlockWriter;
+    private final BlockWriter indexBlockWriter;
+    @Nullable private final BloomFilter.Builder bloomFilter;
+    private final BlockCompressionType compressionType;
+    @Nullable private final BlockCompressor blockCompressor;
+
+    private byte[] lastKey;
+    private long position;
+
+    private long recordCount;
+    private long totalUncompressedSize;
+    private long totalCompressedSize;
+
+    SortLookupStoreWriter(
+            File file,
+            int blockSize,
+            @Nullable BloomFilter.Builder bloomFilter,
+            @Nullable BlockCompressionFactory compressionFactory)
+            throws IOException {
+        this.fileOutputStream = new BufferedOutputStream(Files.newOutputStream(file.toPath()));
+        this.blockSize = blockSize;
+        this.dataBlockWriter = new BlockWriter((int) (blockSize * 1.1));
+        int expectedNumberOfBlocks = 1024;
+        this.indexBlockWriter =
+                new BlockWriter(BlockHandle.MAX_ENCODED_LENGTH * expectedNumberOfBlocks);
+        this.bloomFilter = bloomFilter;
+        if (compressionFactory == null) {
+            this.compressionType = BlockCompressionType.NONE;
+            this.blockCompressor = null;
+        } else {
+            this.compressionType = compressionFactory.getCompressionType();
+            this.blockCompressor = compressionFactory.getCompressor();
+        }
+    }
+
+    /** Adds a record to the current data block and flushes the block once it is large enough. */
+    @Override
+    public void put(byte[] key, byte[] value) throws IOException {
+        dataBlockWriter.add(key, value);
+        if (bloomFilter != null) {
+            bloomFilter.addHash(MurmurHashUtils.hashBytes(key));
+        }
+
+        // remembered so that flush() can index the block by its last key
+        lastKey = key;
+
+        if (dataBlockWriter.memory() > blockSize) {
+            flush();
+        }
+
+        recordCount++;
+    }
+
+    /** Writes the current data block, if it has records, and adds its handle to the index. */
+    private void flush() throws IOException {
+        if (dataBlockWriter.size() == 0) {
+            return;
+        }
+
+        BlockHandle blockHandle = writeBlock(dataBlockWriter);
+        MemorySlice handleEncoding = writeBlockHandle(blockHandle);
+        indexBlockWriter.add(lastKey, handleEncoding.copyBytes());
+    }
+
+    /**
+     * Finishes the block, compresses it when that pays off, and writes block plus trailer.
+     *
+     * @return handle holding the file position and the written length of the block (without
+     *     trailer)
+     */
+    private BlockHandle writeBlock(BlockWriter blockWriter) throws IOException {
+        // close the block
+        MemorySlice block = blockWriter.finish();
+
+        totalUncompressedSize += block.length();
+
+        // attempt to compress the block
+        BlockCompressionType blockCompressionType = BlockCompressionType.NONE;
+        if (blockCompressor != null) {
+            int maxCompressedSize = blockCompressor.getMaxCompressedSize(block.length());
+            // 5 extra bytes for the var-length encoded uncompressed size written in front
+            byte[] compressed = allocateReuseBytes(maxCompressedSize + 5);
+            int offset = encodeInt(compressed, 0, block.length());
+            int compressedSize =
+                    offset
+                            + blockCompressor.compress(
+                                    block.getHeapMemory(),
+                                    block.offset(),
+                                    block.length(),
+                                    compressed,
+                                    offset);
+
+            // Don't use the compressed data if compressed less than 12.5%,
+            if (compressedSize < block.length() - (block.length() / 8)) {
+                block = new MemorySlice(MemorySegment.wrap(compressed), 0, compressedSize);
+                blockCompressionType = this.compressionType;
+            }
+        }
+
+        totalCompressedSize += block.length();
+
+        // create block trailer
+        BlockTrailer blockTrailer =
+                new BlockTrailer(blockCompressionType, crc32c(block, blockCompressionType));
+        MemorySlice trailer = BlockTrailer.writeBlockTrailer(blockTrailer);
+
+        // create a handle to this block
+        BlockHandle blockHandle = new BlockHandle(position, block.length());
+
+        // write data
+        writeSlice(block);
+
+        // write trailer: 5 bytes
+        writeSlice(trailer);
+
+        // clean up state
+        blockWriter.reset();
+
+        return blockHandle;
+    }
+
+    /** Checksum over the block bytes followed by the compression type id. */
+    private static int crc32c(MemorySlice data, BlockCompressionType type) {
+        CRC32 crc = new CRC32();
+        crc.update(data.getHeapMemory(), data.offset(), data.length());
+        crc.update(type.persistentId() & 0xFF);
+        return (int) crc.getValue();
+    }
+
+    /**
+     * Flushes the pending data block, appends bloom filter, index block and footer, and closes the
+     * file.
+     *
+     * <p>The output stream is closed even when one of these steps fails.
+     *
+     * @return a {@link SortContext} carrying the number of bytes written to the file
+     */
+    @Override
+    public LookupStoreFactory.Context close() throws IOException {
+        // try-with-resources so that a failure while writing does not leak the file handle
+        try (BufferedOutputStream ignored = fileOutputStream) {
+            // flush current data block
+            flush();
+
+            LOG.info("Number of record: {}", recordCount);
+
+            // write bloom filter
+            @Nullable BloomFilterHandle bloomFilterHandle = null;
+            if (bloomFilter != null) {
+                MemorySegment buffer = bloomFilter.getBuffer();
+                bloomFilterHandle =
+                        new BloomFilterHandle(
+                                position, buffer.size(), bloomFilter.expectedEntries());
+                writeSlice(MemorySlice.wrap(buffer));
+                LOG.info("Bloom filter size: {} bytes", bloomFilter.getBuffer().size());
+            }
+
+            // write index block
+            BlockHandle indexBlockHandle = writeBlock(indexBlockWriter);
+
+            // write footer
+            Footer footer = new Footer(bloomFilterHandle, indexBlockHandle);
+            MemorySlice footerEncoding = Footer.writeFooter(footer);
+            writeSlice(footerEncoding);
+        }
+
+        LOG.info("totalUncompressedSize: {}", MemorySize.ofBytes(totalUncompressedSize));
+        LOG.info("totalCompressedSize: {}", MemorySize.ofBytes(totalCompressedSize));
+        return new SortContext(position);
+    }
+
+    /** Writes the slice to the file and advances the tracked file position. */
+    private void writeSlice(MemorySlice slice) throws IOException {
+        fileOutputStream.write(slice.getHeapMemory(), slice.offset(), slice.length());
+        position += slice.length();
+    }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/memory/MemorySlice.java
b/paimon-common/src/main/java/org/apache/paimon/memory/MemorySlice.java
new file mode 100644
index 000000000..589e967d4
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/memory/MemorySlice.java
@@ -0,0 +1,130 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.memory;
+
+import org.apache.paimon.utils.MurmurHashUtils;
+
+/** Slice of a {@link MemorySegment}: a segment together with a start offset and a length. */
+public final class MemorySlice implements Comparable<MemorySlice> {
+
+    private final MemorySegment segment;
+    private final int offset;
+    private final int length;
+
+    public MemorySlice(MemorySegment segment, int offset, int length) {
+        this.segment = segment;
+        this.offset = offset;
+        this.length = length;
+    }
+
+    public MemorySegment segment() {
+        return segment;
+    }
+
+    public int offset() {
+        return offset;
+    }
+
+    public int length() {
+        return length;
+    }
+
+    /** Returns a sub-slice on the same segment; {@code index} is relative to this slice. */
+    public MemorySlice slice(int index, int length) {
+        boolean wholeSlice = index == 0 && length == this.length;
+        return wholeSlice ? this : new MemorySlice(segment, offset + index, length);
+    }
+
+    /** Reads a byte at the position relative to the start of this slice. */
+    public byte readByte(int position) {
+        return segment.get(offset + position);
+    }
+
+    /** Reads an int at the position relative to the start of this slice. */
+    public int readInt(int position) {
+        return segment.getInt(offset + position);
+    }
+
+    /** Reads a long at the position relative to the start of this slice. */
+    public long readLong(int position) {
+        return segment.getLong(offset + position);
+    }
+
+    /** Returns the heap memory of the underlying segment, not only the bytes of this slice. */
+    public byte[] getHeapMemory() {
+        return segment.getHeapMemory();
+    }
+
+    /** Copies the bytes covered by this slice into a new array. */
+    public byte[] copyBytes() {
+        byte[] copy = new byte[length];
+        segment.get(offset, copy, 0, length);
+        return copy;
+    }
+
+    public MemorySliceInput toInput() {
+        return new MemorySliceInput(this);
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (this == o) {
+            return true;
+        }
+        // the class is final, so instanceof is an exact class check
+        if (!(o instanceof MemorySlice)) {
+            return false;
+        }
+
+        MemorySlice that = (MemorySlice) o;
+        if (length != that.length) {
+            return false;
+        }
+
+        // same region of the same segment: no need to compare bytes
+        boolean sameRegion = offset == that.offset && segment == that.segment;
+        return sameRegion || segment.equalTo(that.segment, offset, that.offset, length);
+    }
+
+    @Override
+    public int hashCode() {
+        return MurmurHashUtils.hashBytes(segment, offset, length);
+    }
+
+    public static MemorySlice wrap(byte[] bytes) {
+        MemorySegment wrapped = MemorySegment.wrap(bytes);
+        return new MemorySlice(wrapped, 0, bytes.length);
+    }
+
+    public static MemorySlice wrap(MemorySegment segment) {
+        return new MemorySlice(segment, 0, segment.size());
+    }
+
+    /** Compares byte by byte as unsigned values; on a common prefix the shorter slice is less. */
+    @Override
+    public int compareTo(MemorySlice other) {
+        int common = Math.min(length, other.length);
+        int i = 0;
+        while (i < common) {
+            int mine = segment.get(offset + i) & 0xFF;
+            int theirs = other.segment.get(other.offset + i) & 0xFF;
+            if (mine != theirs) {
+                return mine - theirs;
+            }
+            i++;
+        }
+        return length - other.length;
+    }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/memory/MemorySliceInput.java
b/paimon-common/src/main/java/org/apache/paimon/memory/MemorySliceInput.java
new file mode 100644
index 000000000..4b9cd8c04
--- /dev/null
+++ b/paimon-common/src/main/java/org/apache/paimon/memory/MemorySliceInput.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.memory;
+
+/** Input for {@link MemorySlice}. */
+public class MemorySliceInput {
+
+ private final MemorySlice slice;
+
+ private int position;
+
+ public MemorySliceInput(MemorySlice slice) {
+ this.slice = slice;
+ }
+
+ public int position() {
+ return position;
+ }
+
+ public void setPosition(int position) {
+ if (position < 0 || position > slice.length()) {
+ throw new IndexOutOfBoundsException();
+ }
+ this.position = position;
+ }
+
+ public boolean isReadable() {
+ return available() > 0;
+ }
+
+ public int available() {
+ return slice.length() - position;
+ }
+
+ public byte readByte() {
+ if (position == slice.length()) {
+ throw new IndexOutOfBoundsException();
+ }
+ return slice.readByte(position++);
+ }
+
+ public int readUnsignedByte() {
+ return (short) (readByte() & 0xFF);
+ }
+
+ public int readInt() {
+ int v = slice.readInt(position);
+ position += 4;
+ return v;
+ }
+
+ public int readVarLenInt() {
+ for (int offset = 0, result = 0; offset < 32; offset += 7) {
+ int b = readUnsignedByte();
+ result |= (b & 0x7F) << offset;
+ if ((b & 0x80) == 0) {
+ return result;
+ }
+ }
+ throw new Error("Malformed integer.");
+ }
+
+ public long readLong() {
+ long v = slice.readLong(position);
+ position += 8;
+ return v;
+ }
+
+ public long readVarLenLong() {
+ long result = 0;
+ for (int offset = 0; offset < 64; offset += 7) {
+ long b = readUnsignedByte();
+ result |= (b & 0x7F) << offset;
+ if ((b & 0x80) == 0) {
+ return result;
+ }
+ }
+ throw new Error("Malformed long.");
+ }
+
+ public MemorySlice readSlice(int length) {
+ MemorySlice newSlice = slice.slice(position, length);
+ position += length;
+ return newSlice;
+ }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/memory/MemorySliceOutput.java
b/paimon-common/src/main/java/org/apache/paimon/memory/MemorySliceOutput.java
new file mode 100644
index 000000000..90e8e5490
--- /dev/null
+++
b/paimon-common/src/main/java/org/apache/paimon/memory/MemorySliceOutput.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.memory;
+
+/**
+ * Append-only, growable byte buffer backed by a heap {@link MemorySegment}.
+ *
+ * <p>Bytes are appended with the {@code write*} methods; the written prefix can be viewed as a
+ * {@link MemorySlice} through {@link #toSlice()}. The buffer grows by doubling when a write
+ * does not fit.
+ */
+public class MemorySliceOutput {
+
+     /** Current backing storage; replaced by a larger segment when the buffer grows. */
+     private MemorySegment segment;
+
+     /** Number of bytes written so far, which is also the index of the next write. */
+     private int size;
+
+     /**
+      * Creates an output with the given initial capacity.
+      *
+      * @param estimatedSize initial capacity in bytes; {@code 0} is allowed, the buffer then
+      *     grows on the first write
+      */
+     public MemorySliceOutput(int estimatedSize) {
+         this.segment = MemorySegment.wrap(new byte[estimatedSize]);
+     }
+
+     /** Returns the number of bytes written since creation or the last {@link #reset()}. */
+     public int size() {
+         return size;
+     }
+
+     /**
+      * Returns a slice covering the bytes written so far. The slice refers to the current
+      * backing segment; nothing is copied here.
+      */
+     public MemorySlice toSlice() {
+         return new MemorySlice(segment, 0, size);
+     }
+
+     /** Discards the written content by rewinding to position zero; capacity is kept. */
+     public void reset() {
+         size = 0;
+     }
+
+     /** Appends the low eight bits of {@code value}. */
+     public void writeByte(int value) {
+         ensureSize(size + 1);
+         segment.put(size++, (byte) value);
+     }
+
+     /** Appends a fixed-width {@code int} (four bytes). */
+     public void writeInt(int value) {
+         ensureSize(size + 4);
+         segment.putInt(size, value);
+         size += 4;
+     }
+
+     /**
+      * Appends a non-negative {@code int} in variable-length form: seven payload bits per
+      * byte, least significant group first, high bit set on every byte except the last.
+      *
+      * @throws IllegalArgumentException if {@code value} is negative
+      */
+     public void writeVarLenInt(int value) {
+         if (value < 0) {
+             throw new IllegalArgumentException("negative value: v=" + value);
+         }
+
+         while ((value & ~0x7F) != 0) {
+             writeByte(((value & 0x7F) | 0x80));
+             value >>>= 7;
+         }
+
+         writeByte((byte) value);
+     }
+
+     /** Appends a fixed-width {@code long} (eight bytes). */
+     public void writeLong(long value) {
+         ensureSize(size + 8);
+         segment.putLong(size, value);
+         size += 8;
+     }
+
+     /**
+      * Appends a non-negative {@code long} using the same variable-length form as {@link
+      * #writeVarLenInt(int)}.
+      *
+      * @throws IllegalArgumentException if {@code value} is negative
+      */
+     public void writeVarLenLong(long value) {
+         if (value < 0) {
+             throw new IllegalArgumentException("negative value: v=" + value);
+         }
+
+         while ((value & ~0x7FL) != 0) {
+             writeByte((((int) value & 0x7F) | 0x80));
+             value >>>= 7;
+         }
+         writeByte((byte) value);
+     }
+
+     /** Appends the whole array. */
+     public void writeBytes(byte[] source) {
+         writeBytes(source, 0, source.length);
+     }
+
+     /** Appends {@code length} bytes of {@code source}, starting at {@code sourceIndex}. */
+     public void writeBytes(byte[] source, int sourceIndex, int length) {
+         ensureSize(size + length);
+         segment.put(size, source, sourceIndex, length);
+         size += length;
+     }
+
+     /**
+      * Makes sure the backing segment can hold at least {@code minWritableBytes} bytes in
+      * total, replacing it with a larger one if necessary.
+      *
+      * @param minWritableBytes required total capacity, i.e. current size plus the number of
+      *     bytes about to be written
+      * @throws IllegalStateException if the required capacity overflowed {@code int}
+      */
+     private void ensureSize(int minWritableBytes) {
+         if (minWritableBytes < 0) {
+             // size + n wrapped around: the buffer cannot address that many bytes.
+             throw new IllegalStateException(
+                     "Required capacity exceeds Integer.MAX_VALUE, current size: " + size);
+         }
+
+         final int capacity = segment.size();
+         if (minWritableBytes <= capacity) {
+             return;
+         }
+
+         // Start from at least 1 so an empty buffer can grow (0 << 1 stays 0), and double in
+         // long arithmetic so the shift cannot wrap to a non-positive int.
+         long newCapacity = Math.max(capacity, 1);
+         while (newCapacity < minWritableBytes) {
+             newCapacity <<= 1;
+         }
+         newCapacity = Math.min(newCapacity, Integer.MAX_VALUE);
+
+         MemorySegment newSegment = MemorySegment.wrap(new byte[(int) newCapacity]);
+         // Only the first 'size' bytes carry written data.
+         segment.copyTo(0, newSegment, 0, size);
+         segment = newSegment;
+     }
+}
diff --git
a/paimon-common/src/main/java/org/apache/paimon/utils/IntArrayList.java
b/paimon-common/src/main/java/org/apache/paimon/utils/IntArrayList.java
index 575fae02a..ec6e1a362 100644
--- a/paimon-common/src/main/java/org/apache/paimon/utils/IntArrayList.java
+++ b/paimon-common/src/main/java/org/apache/paimon/utils/IntArrayList.java
@@ -73,6 +73,10 @@ public class IntArrayList {
return Arrays.copyOf(array, size);
}
+ /**
+  * Returns the element stored at index {@code i}.
+  *
+  * @param i zero-based index, must be smaller than the number of stored elements
+  * @return the element at that index
+  * @throws ArrayIndexOutOfBoundsException if {@code i} is negative or not smaller than
+  *     {@code size}
+  */
+ public int get(int i) {
+     // The backing array may be longer than the stored elements; without this check an
+     // index in [size, array.length) would silently return an unused slot.
+     if (i < 0 || i >= size) {
+         throw new ArrayIndexOutOfBoundsException("Index: " + i + ", Size: " + size);
+     }
+     return array[i];
+ }
+
public static final IntArrayList EMPTY =
new IntArrayList(0) {
diff --git
a/paimon-common/src/test/java/org/apache/paimon/lookup/sort/SortLookupStoreFactoryTest.java
b/paimon-common/src/test/java/org/apache/paimon/lookup/sort/SortLookupStoreFactoryTest.java
new file mode 100644
index 000000000..804b8ed09
--- /dev/null
+++
b/paimon-common/src/test/java/org/apache/paimon/lookup/sort/SortLookupStoreFactoryTest.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.paimon.lookup.sort;
+
+import org.apache.paimon.io.cache.CacheManager;
+import org.apache.paimon.lookup.LookupStoreFactory.Context;
+import org.apache.paimon.options.MemorySize;
+
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+
+import java.io.File;
+import java.io.IOException;
+import java.nio.charset.StandardCharsets;
+import java.nio.file.Path;
+import java.util.Comparator;
+import java.util.UUID;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Test for {@link SortLookupStoreFactory}: writes a batch of key/value pairs into a store file
+ * and checks that every key can be looked up again.
+ */
+public class SortLookupStoreFactoryTest {
+
+     /** Number of entries written and then looked up. */
+     private static final int VALUE_COUNT = 10_000;
+
+     /**
+      * Added to every value before it is rendered as text. For the indexes used here the
+      * result is always a five-digit number, so byte-wise comparison of the encoded keys
+      * agrees with numeric order.
+      */
+     private static final int KEY_BASE = 10_000;
+
+     @TempDir Path tempDir;
+
+     private File file;
+
+     /** Creates a fresh, empty store file for each test. */
+     @BeforeEach
+     public void before() throws Exception {
+         file = new File(tempDir.toFile(), UUID.randomUUID().toString());
+         if (!file.createNewFile()) {
+             throw new IOException("Can not create file: " + file);
+         }
+     }
+
+     @Test
+     public void testNormal() throws IOException {
+         SortLookupStoreFactory factory =
+                 new SortLookupStoreFactory(
+                         Comparator.naturalOrder(),
+                         new CacheManager(MemorySize.ofMebiBytes(1)),
+                         1024,
+                         "zstd");
+
+         // Keys are put in ascending index order; each key doubles as its own value.
+         SortLookupStoreWriter writer = factory.createWriter(file, null);
+         for (int i = 0; i < VALUE_COUNT; i++) {
+             byte[] bytes = toBytes(i);
+             writer.put(bytes, bytes);
+         }
+         Context context = writer.close();
+
+         SortLookupStoreReader reader = factory.createReader(file, context);
+         try {
+             for (int i = 0; i < VALUE_COUNT; i++) {
+                 byte[] bytes = toBytes(i);
+                 assertThat(fromBytes(reader.lookup(bytes))).isEqualTo(i);
+             }
+         } finally {
+             // Close the reader even when an assertion fails.
+             reader.close();
+         }
+     }
+
+     /** Encodes {@code i} as the UTF-8 text of {@code KEY_BASE + i}. */
+     private byte[] toBytes(int i) {
+         return String.valueOf(KEY_BASE + i).getBytes(StandardCharsets.UTF_8);
+     }
+
+     /** Inverse of {@link #toBytes(int)}. */
+     private int fromBytes(byte[] bytes) {
+         return Integer.parseInt(new String(bytes, StandardCharsets.UTF_8)) - KEY_BASE;
+     }
+}
diff --git
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
index 133cfe540..df3242141 100644
---
a/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
+++
b/paimon-core/src/main/java/org/apache/paimon/operation/KeyValueFileStoreWrite.java
@@ -29,6 +29,7 @@ import org.apache.paimon.compact.CompactManager;
import org.apache.paimon.compact.NoopCompactManager;
import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
+import org.apache.paimon.data.serializer.RowCompactedSerializer;
import org.apache.paimon.deletionvectors.DeletionVector;
import org.apache.paimon.deletionvectors.DeletionVectorsMaintainer;
import org.apache.paimon.format.FileFormatDiscover;
@@ -39,8 +40,8 @@ import org.apache.paimon.io.FileReaderFactory;
import org.apache.paimon.io.KeyValueFileReaderFactory;
import org.apache.paimon.io.KeyValueFileWriterFactory;
import org.apache.paimon.io.RecordLevelExpire;
+import org.apache.paimon.lookup.LookupStoreFactory;
import org.apache.paimon.lookup.LookupStrategy;
-import org.apache.paimon.lookup.hash.HashLookupStoreFactory;
import org.apache.paimon.mergetree.Levels;
import org.apache.paimon.mergetree.LookupLevels;
import org.apache.paimon.mergetree.LookupLevels.ContainsValueProcessor;
@@ -349,6 +350,11 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
throw new RuntimeException(
"Can not use lookup, there is no temp disk directory to
use.");
}
+ LookupStoreFactory lookupStoreFactory =
+ LookupStoreFactory.create(
+ options,
+ cacheManager,
+ new
RowCompactedSerializer(keyType).createSliceComparator());
Options options = this.options.toConfiguration();
return new LookupLevels<>(
levels,
@@ -357,11 +363,7 @@ public class KeyValueFileStoreWrite extends
MemoryFileStoreWrite<KeyValue> {
valueProcessor,
readerFactory::createRecordReader,
() -> ioManager.createChannel().getPathFile(),
- new HashLookupStoreFactory(
- cacheManager,
- this.options.cachePageSize(),
- options.get(CoreOptions.LOOKUP_HASH_LOAD_FACTOR),
-
options.get(CoreOptions.LOOKUP_CACHE_SPILL_COMPRESSION)),
+ lookupStoreFactory,
options.get(CoreOptions.LOOKUP_CACHE_FILE_RETENTION),
options.get(CoreOptions.LOOKUP_CACHE_MAX_DISK_SIZE),
bfGenerator(options));
diff --git
a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
index 8489ac509..9181fdb16 100644
---
a/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
+++
b/paimon-core/src/main/java/org/apache/paimon/table/query/LocalTableQuery.java
@@ -26,16 +26,18 @@ import org.apache.paimon.data.BinaryRow;
import org.apache.paimon.data.InternalRow;
import org.apache.paimon.data.serializer.InternalRowSerializer;
import org.apache.paimon.data.serializer.InternalSerializers;
+import org.apache.paimon.data.serializer.RowCompactedSerializer;
import org.apache.paimon.deletionvectors.DeletionVector;
import org.apache.paimon.disk.IOManager;
import org.apache.paimon.io.DataFileMeta;
import org.apache.paimon.io.KeyValueFileReaderFactory;
import org.apache.paimon.io.cache.CacheManager;
-import org.apache.paimon.lookup.hash.HashLookupStoreFactory;
+import org.apache.paimon.lookup.LookupStoreFactory;
import org.apache.paimon.mergetree.Levels;
import org.apache.paimon.mergetree.LookupLevels;
import org.apache.paimon.options.Options;
import org.apache.paimon.table.FileStoreTable;
+import org.apache.paimon.types.RowType;
import org.apache.paimon.utils.KeyComparatorSupplier;
import org.apache.paimon.utils.Preconditions;
@@ -62,7 +64,7 @@ public class LocalTableQuery implements TableQuery {
private final KeyValueFileReaderFactory.Builder readerFactoryBuilder;
- private final HashLookupStoreFactory hashLookupStoreFactory;
+ private final LookupStoreFactory lookupStoreFactory;
private final int startLevel;
@@ -79,13 +81,13 @@ public class LocalTableQuery implements TableQuery {
KeyValueFileStore store = (KeyValueFileStore) tableStore;
this.readerFactoryBuilder = store.newReaderFactoryBuilder();
+ RowType keyType = readerFactoryBuilder.keyType();
this.keyComparatorSupplier = new
KeyComparatorSupplier(readerFactoryBuilder.keyType());
- this.hashLookupStoreFactory =
- new HashLookupStoreFactory(
+ this.lookupStoreFactory =
+ LookupStoreFactory.create(
+ options,
new CacheManager(options.lookupCacheMaxMemory()),
- options.cachePageSize(),
-
options.toConfiguration().get(CoreOptions.LOOKUP_HASH_LOAD_FACTOR),
-
options.toConfiguration().get(CoreOptions.LOOKUP_CACHE_SPILL_COMPRESSION));
+ new
RowCompactedSerializer(keyType).createSliceComparator());
if (options.needLookup()) {
startLevel = 1;
@@ -145,7 +147,7 @@ public class LocalTableQuery implements TableQuery {
Preconditions.checkNotNull(ioManager,
"IOManager is required.")
.createChannel()
.getPathFile(),
- hashLookupStoreFactory,
+ lookupStoreFactory,
options.get(CoreOptions.LOOKUP_CACHE_FILE_RETENTION),
options.get(CoreOptions.LOOKUP_CACHE_MAX_DISK_SIZE),
bfGenerator(options));
diff --git
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
index b74f242e1..09bd6eea1 100644
---
a/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
+++
b/paimon-core/src/test/java/org/apache/paimon/table/PrimaryKeyFileStoreTableTest.java
@@ -20,6 +20,7 @@ package org.apache.paimon.table;
import org.apache.paimon.CoreOptions;
import org.apache.paimon.CoreOptions.ChangelogProducer;
+import org.apache.paimon.CoreOptions.LookupLocalFileType;
import org.apache.paimon.Snapshot;
import org.apache.paimon.data.BinaryString;
import org.apache.paimon.data.GenericRow;
@@ -92,6 +93,7 @@ import static
org.apache.paimon.CoreOptions.CHANGELOG_PRODUCER;
import static org.apache.paimon.CoreOptions.ChangelogProducer.LOOKUP;
import static org.apache.paimon.CoreOptions.DELETION_VECTORS_ENABLED;
import static org.apache.paimon.CoreOptions.FILE_FORMAT;
+import static org.apache.paimon.CoreOptions.LOOKUP_LOCAL_FILE_TYPE;
import static org.apache.paimon.CoreOptions.MERGE_ENGINE;
import static org.apache.paimon.CoreOptions.MergeEngine.FIRST_ROW;
import static org.apache.paimon.CoreOptions.SNAPSHOT_EXPIRE_LIMIT;
@@ -1565,6 +1567,17 @@ public class PrimaryKeyFileStoreTableTest extends
FileStoreTableTestBase {
innerTestTableQuery(table);
}
+ /**
+  * Runs the shared table-query checks on a table configured with the LOOKUP changelog
+  * producer and the SORT lookup local file type.
+  */
+ @Test
+ public void testTableQueryForLookupLocalSortFile() throws Exception {
+     FileStoreTable sortFileTable =
+             createFileStoreTable(
+                     conf -> {
+                         conf.set(CHANGELOG_PRODUCER, LOOKUP);
+                         conf.set(LOOKUP_LOCAL_FILE_TYPE, LookupLocalFileType.SORT);
+                     });
+     innerTestTableQuery(sortFileTable);
+ }
+
@Test
public void testTableQueryForNormal() throws Exception {
FileStoreTable table = createFileStoreTable();