wwj6591812 commented on code in PR #4419:
URL: https://github.com/apache/paimon/pull/4419#discussion_r1824382377
##########
paimon-common/src/main/java/org/apache/paimon/lookup/sort/SortLookupStoreReader.java:
##########
@@ -127,64 +121,61 @@ private BlockReader openBlock(MemorySlice blockEntry)
throws IOException {
return readBlock(blockHandle);
}
- private MemorySegment read(long offset, int length) throws IOException {
- // TODO use cache
- // TODO cache uncompressed block
- // TODO separate index and data cache
- byte[] buffer = new byte[length];
- int read = fileChannel.read(ByteBuffer.wrap(buffer), offset);
- if (read != length) {
- throw new IOException("Could not read all the data");
- }
- return MemorySegment.wrap(buffer);
- }
-
- private BlockReader readBlock(BlockHandle blockHandle) throws IOException {
+ private BlockReader readBlock(BlockHandle blockHandle) {
// read block trailer
MemorySegment trailerData =
- read(blockHandle.offset() + blockHandle.size(),
BlockTrailer.ENCODED_LENGTH);
+ blockCache.getBlock(
+ blockHandle.offset() + blockHandle.size(),
+ BlockTrailer.ENCODED_LENGTH,
+ b -> b);
BlockTrailer blockTrailer =
BlockTrailer.readBlockTrailer(MemorySlice.wrap(trailerData).toInput());
- MemorySegment block = read(blockHandle.offset(), blockHandle.size());
- int crc32cCode = crc32c(block, blockTrailer.getCompressionType());
- checkArgument(
- blockTrailer.getCrc32c() == crc32cCode,
- String.format(
- "Expected CRC32C(%d) but found CRC32C(%d) for
file(%s)",
- blockTrailer.getCrc32c(), crc32cCode, filePath));
-
- // decompress data
- MemorySlice uncompressedData;
- BlockCompressionFactory compressionFactory =
-
BlockCompressionFactory.create(blockTrailer.getCompressionType());
- if (compressionFactory == null) {
- uncompressedData = MemorySlice.wrap(block);
- } else {
- MemorySliceInput compressedInput =
MemorySlice.wrap(block).toInput();
- byte[] uncompressed = new byte[compressedInput.readVarLenInt()];
- BlockDecompressor decompressor =
compressionFactory.getDecompressor();
- int uncompressedLength =
- decompressor.decompress(
- block.getHeapMemory(),
- compressedInput.position(),
- compressedInput.available(),
- uncompressed,
- 0);
- checkArgument(uncompressedLength == uncompressed.length);
- uncompressedData = MemorySlice.wrap(uncompressed);
- }
-
- return new BlockReader(uncompressedData, comparator);
+ MemorySegment unCompressedBlock =
+ blockCache.getBlock(
+ blockHandle.offset(),
+ blockHandle.size(),
+ bytes -> {
+ MemorySegment block = MemorySegment.wrap(bytes);
+ int crc32cCode = crc32c(block,
blockTrailer.getCompressionType());
+ checkArgument(
+ blockTrailer.getCrc32c() == crc32cCode,
+ String.format(
+ "Expected CRC32C(%d) but found
CRC32C(%d) for file(%s)",
+ blockTrailer.getCrc32c(),
crc32cCode, filePath));
+
+ // decompress data
+ BlockCompressionFactory compressionFactory =
+ BlockCompressionFactory.create(
+ blockTrailer.getCompressionType());
+ if (compressionFactory == null) {
+ return bytes;
+ } else {
+ MemorySliceInput compressedInput =
+ MemorySlice.wrap(block).toInput();
+ byte[] uncompressed = new
byte[compressedInput.readVarLenInt()];
+ BlockDecompressor decompressor =
+ compressionFactory.getDecompressor();
+ int uncompressedLength =
+ decompressor.decompress(
+ block.getHeapMemory(),
+ compressedInput.position(),
+ compressedInput.available(),
+ uncompressed,
+ 0);
+ checkArgument(uncompressedLength ==
uncompressed.length);
Review Comment:
Maybe `checkState` would be better than `checkArgument` here?
##########
paimon-common/src/main/java/org/apache/paimon/io/CompressedPageFileInput.java:
##########
@@ -67,7 +67,7 @@ public RandomAccessFile file() {
@Override
public long uncompressBytes() {
Review Comment:
The method name should be `uncompressedBytes` rather than `uncompressBytes`.
##########
paimon-common/src/main/java/org/apache/paimon/io/cache/CacheManager.java:
##########
@@ -76,4 +76,26 @@ public void invalidPage(CacheKey key) {
public int fileReadCount() {
return fileReadCount;
}
+
+ /** The container for the segment. */
+ public static class SegmentContainer {
+
+ private final MemorySegment segment;
+
+ private int accessCount;
Review Comment:
Does `accessCount` need to be an `AtomicInteger`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]