This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new d4784c42d8d [HUDI-7928] Remove shared HFile reader in HoodieNativeAvroHFileReader (#11935)
d4784c42d8d is described below
commit d4784c42d8d3f80484bccad86b5aa4107a6d8136
Author: Y Ethan Guo <[email protected]>
AuthorDate: Thu Sep 12 20:54:32 2024 -0700
[HUDI-7928] Remove shared HFile reader in HoodieNativeAvroHFileReader (#11935)
---
.../hudi/common/config/HoodieReaderConfig.java | 2 +-
.../io/storage/HoodieNativeAvroHFileReader.java | 84 ++++++++++++++--------
2 files changed, 56 insertions(+), 30 deletions(-)
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java
index a7e41098d66..bb29e090ec3 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieReaderConfig.java
@@ -31,7 +31,7 @@ import javax.annotation.concurrent.Immutable;
public class HoodieReaderConfig extends HoodieConfig {
public static final ConfigProperty<Boolean> USE_NATIVE_HFILE_READER =
ConfigProperty
.key("_hoodie.hfile.use.native.reader")
- .defaultValue(false)
+ .defaultValue(true)
.markAdvanced()
.sinceVersion("1.0.0")
.withDocumentation("When enabled, the native HFile reader is used to
read HFiles. This is an internal config.");
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieNativeAvroHFileReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieNativeAvroHFileReader.java
index 23fc8dba327..37845c9e47e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieNativeAvroHFileReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieNativeAvroHFileReader.java
@@ -25,6 +25,7 @@ import org.apache.hudi.common.model.HoodieAvroIndexedRecord;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordLocation;
import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ValidationUtils;
import org.apache.hudi.common.util.collection.ClosableIterator;
import org.apache.hudi.common.util.collection.CloseableMappingIterator;
import org.apache.hudi.common.util.collection.Pair;
@@ -49,9 +50,13 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.nio.ByteBuffer;
+import java.util.Arrays;
import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
@@ -66,29 +71,33 @@ import static org.apache.hudi.io.hfile.HFileUtils.isPrefixOfKey;
*/
public class HoodieNativeAvroHFileReader extends HoodieAvroHFileReaderImplBase
{
private static final Logger LOG =
LoggerFactory.getLogger(HoodieNativeAvroHFileReader.class);
+ // Keys of the meta info that should be preloaded on demand from the HFile
+ private static final Set<String> PRELOADED_META_INFO_KEYS = new HashSet<>(
+ Arrays.asList(KEY_MIN_RECORD, KEY_MAX_RECORD, SCHEMA_KEY));
private final HoodieStorage storage;
private final Option<StoragePath> path;
private final Option<byte[]> bytesContent;
- private Option<HFileReader> sharedHFileReader;
+ // In-memory cache for meta info
+ private final Map<String, byte[]> metaInfoMap;
private final Lazy<Schema> schema;
+ private boolean isMetaInfoLoaded = false;
+ private long numKeyValueEntries = -1L;
public HoodieNativeAvroHFileReader(HoodieStorage storage, StoragePath path,
Option<Schema> schemaOption) {
this.storage = storage;
this.path = Option.of(path);
this.bytesContent = Option.empty();
- this.sharedHFileReader = Option.empty();
- this.schema = schemaOption.map(Lazy::eagerly)
- .orElseGet(() -> Lazy.lazily(() ->
fetchSchema(getSharedHFileReader())));
+ this.metaInfoMap = new HashMap<>();
+ this.schema = schemaOption.map(Lazy::eagerly).orElseGet(() ->
Lazy.lazily(this::fetchSchema));
}
public HoodieNativeAvroHFileReader(HoodieStorage storage, byte[] content,
Option<Schema> schemaOption) {
this.storage = storage;
this.path = Option.empty();
this.bytesContent = Option.of(content);
- this.sharedHFileReader = Option.empty();
- this.schema = schemaOption.map(Lazy::eagerly)
- .orElseGet(() -> Lazy.lazily(() ->
fetchSchema(getSharedHFileReader())));
+ this.metaInfoMap = new HashMap<>();
+ this.schema = schemaOption.map(Lazy::eagerly).orElseGet(() ->
Lazy.lazily(this::fetchSchema));
}
@Override
@@ -106,11 +115,10 @@ public class HoodieNativeAvroHFileReader extends HoodieAvroHFileReaderImplBase {
@Override
public String[] readMinMaxRecordKeys() {
- HFileReader reader = getSharedHFileReader();
try {
return new String[] {
- fromUTF8Bytes(reader.getMetaInfo(new
UTF8StringKey(KEY_MIN_RECORD)).get()),
- fromUTF8Bytes(reader.getMetaInfo(new
UTF8StringKey(KEY_MAX_RECORD)).get())};
+ fromUTF8Bytes(getHFileMetaInfoFromCache(KEY_MIN_RECORD)),
+ fromUTF8Bytes(getHFileMetaInfoFromCache(KEY_MAX_RECORD))};
} catch (IOException e) {
throw new HoodieIOException("Cannot read min and max record keys from
HFile.", e);
}
@@ -118,8 +126,7 @@ public class HoodieNativeAvroHFileReader extends HoodieAvroHFileReaderImplBase {
@Override
public BloomFilter readBloomFilter() {
- try {
- HFileReader reader = getSharedHFileReader();
+ try (HFileReader reader = newHFileReader()) {
ByteBuffer byteBuffer =
reader.getMetaBlock(KEY_BLOOM_FILTER_META_BLOCK).get();
return BloomFilterFactory.fromByteBuffer(byteBuffer,
fromUTF8Bytes(reader.getMetaInfo(new
UTF8StringKey(KEY_BLOOM_FILTER_TYPE_CODE)).get()));
@@ -190,18 +197,20 @@ public class HoodieNativeAvroHFileReader extends HoodieAvroHFileReaderImplBase {
@Override
public void close() {
- try {
- if (sharedHFileReader.isPresent()) {
- sharedHFileReader.get().close();
- }
- } catch (IOException e) {
- throw new HoodieIOException("Error closing the HFile reader", e);
- }
+ isMetaInfoLoaded = false;
+ metaInfoMap.clear();
}
@Override
public long getTotalRecords() {
- return getSharedHFileReader().getNumKeyValueEntries();
+ try {
+ loadAllMetaInfoIntoCacheIfNeeded();
+ } catch (IOException e) {
+ throw new HoodieIOException("Cannot get the number of entries from
HFile", e);
+ }
+ ValidationUtils.checkArgument(
+ numKeyValueEntries >= 0, "Number of entries in HFile must be >= 0");
+ return numKeyValueEntries;
}
@Override
@@ -224,10 +233,10 @@ public class HoodieNativeAvroHFileReader extends HoodieAvroHFileReaderImplBase {
iterator, data -> unsafeCast(new HoodieAvroIndexedRecord(data)));
}
- private static Schema fetchSchema(HFileReader reader) {
+ private Schema fetchSchema() {
try {
return new Schema.Parser().parse(
- fromUTF8Bytes(reader.getMetaInfo(new
UTF8StringKey(SCHEMA_KEY)).get()));
+ fromUTF8Bytes(getHFileMetaInfoFromCache(SCHEMA_KEY)));
} catch (IOException e) {
throw new HoodieIOException("Unable to read schema from HFile", e);
}
@@ -244,14 +253,31 @@ public class HoodieNativeAvroHFileReader extends HoodieAvroHFileReaderImplBase {
readerSchema);
}
- private synchronized HFileReader getSharedHFileReader() {
- try {
- if (!sharedHFileReader.isPresent()) {
- sharedHFileReader = Option.of(newHFileReader());
+ private byte[] getHFileMetaInfoFromCache(String key) throws IOException {
+ if (!PRELOADED_META_INFO_KEYS.contains(key)) {
+ throw new
IllegalStateException("HoodieNativeAvroHFileReader#getHFileMetaInfoFromCache"
+ + " should only be called on supported meta info keys; this key is
not supported: "
+ + key);
+ }
+ loadAllMetaInfoIntoCacheIfNeeded();
+ return metaInfoMap.get(key);
+ }
+
+ private synchronized void loadAllMetaInfoIntoCacheIfNeeded() throws
IOException {
+ if (!isMetaInfoLoaded) {
+ // Load all meta info that are small into cache
+ try (HFileReader reader = newHFileReader()) {
+ this.numKeyValueEntries = reader.getNumKeyValueEntries();
+ for (String metaInfoKey : PRELOADED_META_INFO_KEYS) {
+ Option<byte[]> metaInfo = reader.getMetaInfo(new
UTF8StringKey(metaInfoKey));
+ if (metaInfo.isPresent()) {
+ metaInfoMap.put(metaInfoKey, metaInfo.get());
+ }
+ }
+ isMetaInfoLoaded = true;
+ } catch (Exception e) {
+ throw new IOException("Unable to construct HFile reader", e);
}
- return sharedHFileReader.get();
- } catch (IOException e) {
- throw new HoodieIOException("Unable to construct HFile reader", e);
}
}