the-other-tim-brown commented on code in PR #13724:
URL: https://github.com/apache/hudi/pull/13724#discussion_r2283696411
##########
hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileReaderImpl.java:
##########
@@ -20,379 +20,67 @@
package org.apache.hudi.io.hfile;
import org.apache.hudi.common.util.Option;
-import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.io.SeekableDataInputStream;
-import org.apache.logging.log4j.util.Strings;
-
-import java.io.ByteArrayInputStream;
-import java.io.DataInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Queue;
-import java.util.TreeMap;
-
-import static org.apache.hudi.io.hfile.HFileBlock.HFILEBLOCK_HEADER_SIZE;
-import static org.apache.hudi.io.hfile.HFileUtils.readMajorVersion;
/**
 * An implementation of {@link HFileReader}.
*/
-public class HFileReaderImpl implements HFileReader {
- private final SeekableDataInputStream stream;
- private final long fileSize;
-
- private final HFileCursor cursor;
- private boolean isMetadataInitialized = false;
- private HFileTrailer trailer;
- private HFileContext context;
- private TreeMap<Key, BlockIndexEntry> dataBlockIndexEntryMap;
- private TreeMap<Key, BlockIndexEntry> metaBlockIndexEntryMap;
- private HFileInfo fileInfo;
- private Option<BlockIndexEntry> currentDataBlockEntry;
- private Option<HFileDataBlock> currentDataBlock;
+public class HFileReaderImpl extends BaseHFileReaderImpl {
public HFileReaderImpl(SeekableDataInputStream stream, long fileSize) {
- this.stream = stream;
- this.fileSize = fileSize;
- this.cursor = new HFileCursor();
- this.currentDataBlockEntry = Option.empty();
- this.currentDataBlock = Option.empty();
+ super(stream, fileSize);
}
@Override
public synchronized void initializeMetadata() throws IOException {
- if (this.isMetadataInitialized) {
- return;
- }
-
- // Read Trailer (serialized in Proto)
- this.trailer = readTrailer(stream, fileSize);
- this.context = HFileContext.builder()
- .compressionCodec(trailer.getCompressionCodec())
- .build();
- HFileBlockReader blockReader = new HFileBlockReader(
- context, stream, trailer.getLoadOnOpenDataOffset(),
- fileSize - HFileTrailer.getTrailerSize());
- this.dataBlockIndexEntryMap = readDataBlockIndex(
- blockReader, trailer.getDataIndexCount(),
trailer.getNumDataIndexLevels());
- HFileRootIndexBlock metaIndexBlock =
- (HFileRootIndexBlock) blockReader.nextBlock(HFileBlockType.ROOT_INDEX);
- this.metaBlockIndexEntryMap =
metaIndexBlock.readBlockIndex(trailer.getMetaIndexCount(), true);
- HFileFileInfoBlock fileInfoBlock =
- (HFileFileInfoBlock) blockReader.nextBlock(HFileBlockType.FILE_INFO);
- this.fileInfo = fileInfoBlock.readFileInfo();
- this.isMetadataInitialized = true;
+ super.initializeMetadata();
Review Comment:
For these methods, do we need to override if we're just calling `super`?
##########
hudi-io/src/main/java/org/apache/hudi/io/hfile/CachingHFileReaderImpl.java:
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.hfile;
+
+import org.apache.hudi.io.SeekableDataInputStream;
+
+import java.io.IOException;
+
+/**
+ * HFile reader implementation with integrated caching functionality. This
extends BaseHFileReaderImpl and overrides the block instantiation method to add
caching capabilities.
+ * <p>
+ * Uses a shared static cache across all instances to maximize cache hits when
multiple readers access the same file.
+ */
+public class CachingHFileReaderImpl extends BaseHFileReaderImpl {
+
+ private static volatile HFileBlockCache GLOBAL_BLOCK_CACHE;
+ private static final Object CACHE_LOCK = new Object();
+
+ private final String filePath;
+
+ public CachingHFileReaderImpl(SeekableDataInputStream stream, long fileSize,
String filePath) {
+ this(stream, fileSize, filePath,
HFileReaderConfig.DEFAULT_BLOCK_CACHE_SIZE);
+ }
+
+ public CachingHFileReaderImpl(SeekableDataInputStream stream, long fileSize,
String filePath, int cacheSize) {
+ this(stream, fileSize, filePath, new HFileReaderConfig(cacheSize));
+ }
+
+ public CachingHFileReaderImpl(SeekableDataInputStream stream, long fileSize,
String filePath, HFileReaderConfig config) {
+ super(stream, fileSize);
+ this.filePath = filePath;
+ // Initialize global cache with provided config (ignored if already
initialized)
+ getGlobalCache(config);
+ }
+
+ /**
+ * Gets or creates the global cache shared by all CachingHFileReaderImpl
instances.
+ * Thread-safe singleton pattern with double-checked locking.
+ */
+ private static HFileBlockCache getGlobalCache(HFileReaderConfig config) {
+ HFileBlockCache result = GLOBAL_BLOCK_CACHE;
+ if (result == null) {
+ synchronized (CACHE_LOCK) {
+ result = GLOBAL_BLOCK_CACHE;
+ if (result == null) {
+ GLOBAL_BLOCK_CACHE = result = new HFileBlockCache(
Review Comment:
Is it possible that a different config will be used across the lifecycle of
this cache? If so, would it be good to log a warning that the cache is
already initialized with a different config?
##########
hudi-io/src/main/java/org/apache/hudi/io/hfile/CachingHFileReaderImpl.java:
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.hfile;
+
+import org.apache.hudi.io.SeekableDataInputStream;
+
+import java.io.IOException;
+
+/**
+ * HFile reader implementation with integrated caching functionality. This
extends BaseHFileReaderImpl and overrides the block instantiation method to add
caching capabilities.
+ * <p>
+ * Uses a shared static cache across all instances to maximize cache hits when
multiple readers access the same file.
+ */
+public class CachingHFileReaderImpl extends BaseHFileReaderImpl {
+
+ private static volatile HFileBlockCache GLOBAL_BLOCK_CACHE;
+ private static final Object CACHE_LOCK = new Object();
+
+ private final String filePath;
+
+ public CachingHFileReaderImpl(SeekableDataInputStream stream, long fileSize,
String filePath) {
+ this(stream, fileSize, filePath,
HFileReaderConfig.DEFAULT_BLOCK_CACHE_SIZE);
+ }
+
+ public CachingHFileReaderImpl(SeekableDataInputStream stream, long fileSize,
String filePath, int cacheSize) {
+ this(stream, fileSize, filePath, new HFileReaderConfig(cacheSize));
+ }
+
+ public CachingHFileReaderImpl(SeekableDataInputStream stream, long fileSize,
String filePath, HFileReaderConfig config) {
+ super(stream, fileSize);
+ this.filePath = filePath;
+ // Initialize global cache with provided config (ignored if already
initialized)
+ getGlobalCache(config);
+ }
+
+ /**
+ * Gets or creates the global cache shared by all CachingHFileReaderImpl
instances.
+ * Thread-safe singleton pattern with double-checked locking.
+ */
+ private static HFileBlockCache getGlobalCache(HFileReaderConfig config) {
+ HFileBlockCache result = GLOBAL_BLOCK_CACHE;
+ if (result == null) {
+ synchronized (CACHE_LOCK) {
+ result = GLOBAL_BLOCK_CACHE;
+ if (result == null) {
+ GLOBAL_BLOCK_CACHE = result = new HFileBlockCache(
+ config.getBlockCacheSize(),
+ config.getCacheTtlMinutes(),
+ java.util.concurrent.TimeUnit.MINUTES);
Review Comment:
Nitpick: can we import TimeUnit so we don't need the fully qualified name?
##########
hudi-io/src/test/java/org/apache/hudi/io/hfile/TestHFileBlockCache.java:
##########
@@ -0,0 +1,143 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.hfile;
+
+import org.junit.jupiter.api.Test;
+
+import java.nio.ByteBuffer;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertNotEquals;
+import static org.junit.jupiter.api.Assertions.assertNull;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Tests for HFile block caching functionality.
+ */
+public class TestHFileBlockCache {
Review Comment:
Can you add a test case for `getOrCompute` and validate repeated calls
result in the same object returned?
##########
hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileBlockCache.java:
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.hfile;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Least Frequently Used (LFU) cache for HFile blocks to improve read
performance by avoiding repeated block reads.
+ * Uses Caffeine cache with configurable size and TTL. Thread-safe for
concurrent access.
+ */
+public class HFileBlockCache {
+
+ private final Cache<BlockCacheKey, HFileBlock> cache;
+
+ public HFileBlockCache(int maxCacheSize) {
+ this(maxCacheSize, 30, TimeUnit.MINUTES);
+ }
+
+ public HFileBlockCache(int maxCacheSize, long expireAfterWrite, TimeUnit
timeUnit) {
+ this.cache = Caffeine.newBuilder()
+ .maximumSize(maxCacheSize)
+
.expireAfterWrite(Duration.ofMillis(timeUnit.toMillis(expireAfterWrite)))
+ .build();
+ }
+
+ /**
+ * Gets a block from cache.
+ *
+ * @param key the cache key
+ * @return cached block or null if not found
+ */
+ public HFileBlock getBlock(BlockCacheKey key) {
+ return cache.getIfPresent(key);
+ }
+
+ /**
+ * Puts a block into cache.
+ *
+ * @param key the cache key
+ * @param block the block to cache
+ */
+ public void putBlock(BlockCacheKey key, HFileBlock block) {
+ cache.put(key, block);
+ }
+
+ /**
+ * Gets a block from cache, or computes and caches it if not present.
+ * This method is thread-safe and prevents the "cache stampede" problem
+ * where multiple threads try to load the same value simultaneously.
+ *
+ * @param key the cache key
+ * @param loader callable to load the block if not in cache
+ * @return cached or newly computed block
+ * @throws Exception if the loader throws an exception
+ */
+ public HFileBlock getOrCompute(BlockCacheKey key,
java.util.concurrent.Callable<HFileBlock> loader) throws Exception {
+ // Caffeine uses Function instead of Callable, so we need to wrap the
Callable
+ return cache.get(key, (k) -> {
+ try {
+ return loader.call();
+ } catch (Exception e) {
+ throw new RuntimeException(e);
+ }
+ });
+ }
+
+ /**
+ * Clears all cached blocks.
+ */
+ public void clear() {
+ cache.invalidateAll();
+ }
+
+ /**
+ * Gets current cache size.
+ *
+ * @return number of cached blocks
+ */
+ public long size() {
+ return cache.estimatedSize();
+ }
+
+ /**
+ * Forces cache maintenance operations like eviction.
+ * This is useful for testing to ensure consistent behavior.
+ */
+ public void cleanUp() {
+ cache.cleanUp();
+ }
+
+ /**
+ * Cache key for identifying blocks uniquely.
+ */
+ public static class BlockCacheKey {
+
+ private final String fileIdentity;
+ private final long offset;
+ private final int size;
+
+ public BlockCacheKey(long offset, int size) {
+ this(null, offset, size);
+ }
+
+ public BlockCacheKey(String fileIdentity, long offset, int size) {
+ this.fileIdentity = fileIdentity;
+ this.offset = offset;
+ this.size = size;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ BlockCacheKey that = (BlockCacheKey) o;
+ return offset == that.offset
+ && size == that.size
+ && java.util.Objects.equals(fileIdentity, that.fileIdentity);
Review Comment:
Similarly we can import `Objects`
##########
hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileBlockCache.java:
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.hfile;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Least Frequently Used (LFU) cache for HFile blocks to improve read
performance by avoiding repeated block reads.
+ * Uses Caffeine cache with configurable size and TTL. Thread-safe for
concurrent access.
+ */
+public class HFileBlockCache {
+
+ private final Cache<BlockCacheKey, HFileBlock> cache;
Review Comment:
Will we ever cache something other than an HFileDataBlock? I see that at the
cache's call site we need to cast the returned value.
##########
hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileBlockCache.java:
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.hfile;
+
+import com.github.benmanes.caffeine.cache.Cache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+
+import java.time.Duration;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * Least Frequently Used (LFU) cache for HFile blocks to improve read
performance by avoiding repeated block reads.
+ * Uses Caffeine cache with configurable size and TTL. Thread-safe for
concurrent access.
+ */
+public class HFileBlockCache {
+
+ private final Cache<BlockCacheKey, HFileBlock> cache;
+
+ public HFileBlockCache(int maxCacheSize) {
+ this(maxCacheSize, 30, TimeUnit.MINUTES);
+ }
+
+ public HFileBlockCache(int maxCacheSize, long expireAfterWrite, TimeUnit
timeUnit) {
+ this.cache = Caffeine.newBuilder()
+ .maximumSize(maxCacheSize)
+
.expireAfterWrite(Duration.ofMillis(timeUnit.toMillis(expireAfterWrite)))
+ .build();
+ }
+
+ /**
+ * Gets a block from cache.
+ *
+ * @param key the cache key
+ * @return cached block or null if not found
+ */
+ public HFileBlock getBlock(BlockCacheKey key) {
+ return cache.getIfPresent(key);
+ }
+
+ /**
+ * Puts a block into cache.
+ *
+ * @param key the cache key
+ * @param block the block to cache
+ */
+ public void putBlock(BlockCacheKey key, HFileBlock block) {
+ cache.put(key, block);
+ }
+
+ /**
+ * Gets a block from cache, or computes and caches it if not present.
+ * This method is thread-safe and prevents the "cache stampede" problem
+ * where multiple threads try to load the same value simultaneously.
+ *
+ * @param key the cache key
+ * @param loader callable to load the block if not in cache
+ * @return cached or newly computed block
+ * @throws Exception if the loader throws an exception
+ */
+ public HFileBlock getOrCompute(BlockCacheKey key,
java.util.concurrent.Callable<HFileBlock> loader) throws Exception {
Review Comment:
Nitpick: can we also import `Callable` here?
##########
hudi-io/src/main/java/org/apache/hudi/io/hfile/CachingHFileReaderImpl.java:
##########
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.hfile;
+
+import org.apache.hudi.io.SeekableDataInputStream;
+
+import java.io.IOException;
+
+/**
+ * HFile reader implementation with integrated caching functionality. This
extends BaseHFileReaderImpl and overrides the block instantiation method to add
caching capabilities.
+ * <p>
+ * Uses a shared static cache across all instances to maximize cache hits when
multiple readers access the same file.
+ */
+public class CachingHFileReaderImpl extends BaseHFileReaderImpl {
+
+ private static volatile HFileBlockCache GLOBAL_BLOCK_CACHE;
+ private static final Object CACHE_LOCK = new Object();
+
+ private final String filePath;
+
+ public CachingHFileReaderImpl(SeekableDataInputStream stream, long fileSize,
String filePath) {
+ this(stream, fileSize, filePath,
HFileReaderConfig.DEFAULT_BLOCK_CACHE_SIZE);
+ }
+
+ public CachingHFileReaderImpl(SeekableDataInputStream stream, long fileSize,
String filePath, int cacheSize) {
+ this(stream, fileSize, filePath, new HFileReaderConfig(cacheSize));
+ }
+
+ public CachingHFileReaderImpl(SeekableDataInputStream stream, long fileSize,
String filePath, HFileReaderConfig config) {
+ super(stream, fileSize);
+ this.filePath = filePath;
+ // Initialize global cache with provided config (ignored if already
initialized)
+ getGlobalCache(config);
+ }
+
+ /**
+ * Gets or creates the global cache shared by all CachingHFileReaderImpl
instances.
+ * Thread-safe singleton pattern with double-checked locking.
+ */
+ private static HFileBlockCache getGlobalCache(HFileReaderConfig config) {
+ HFileBlockCache result = GLOBAL_BLOCK_CACHE;
+ if (result == null) {
+ synchronized (CACHE_LOCK) {
+ result = GLOBAL_BLOCK_CACHE;
+ if (result == null) {
+ GLOBAL_BLOCK_CACHE = result = new HFileBlockCache(
+ config.getBlockCacheSize(),
+ config.getCacheTtlMinutes(),
+ java.util.concurrent.TimeUnit.MINUTES);
+ }
+ }
+ }
+ return result;
+ }
+
+ @Override
+ public HFileDataBlock instantiateHFileDataBlock(BlockIndexEntry blockToRead)
throws IOException {
+ HFileBlockCache.BlockCacheKey cacheKey = new HFileBlockCache.BlockCacheKey(
+ filePath, blockToRead.getOffset(), blockToRead.getSize());
+
+ try {
+ HFileBlock block = GLOBAL_BLOCK_CACHE.getOrCompute(cacheKey, () ->
super.instantiateHFileDataBlock(blockToRead));
+ return (HFileDataBlock) block;
+ } catch (Exception e) {
+ if (e instanceof IOException) {
+ throw (IOException) e;
+ } else if (e instanceof RuntimeException) {
+ throw (RuntimeException) e;
+ } else {
+ throw new IOException("Failed to load HFile block", e);
+ }
Review Comment:
```suggestion
} catch (IOException | RuntimeException e) {
throw e;
} catch (Exception e) {
throw new IOException("Failed to load HFile block", e);
}
```
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]