alexeykudinkin commented on code in PR #5208:
URL: https://github.com/apache/hudi/pull/5208#discussion_r844105557
##########
hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java:
##########
@@ -44,97 +33,117 @@
import org.apache.hadoop.hbase.io.hfile.HFileInfo;
import org.apache.hadoop.hbase.io.hfile.HFileScanner;
import org.apache.hadoop.hbase.nio.ByteBuff;
-import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.BloomFilterFactory;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.ClosableIterator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.io.ByteBufferBackedInputStream;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.util.LazyRef;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import java.io.IOException;
-import java.util.ArrayList;
import java.util.Arrays;
-import java.util.HashMap;
+import java.util.Collections;
import java.util.Iterator;
-import java.util.LinkedList;
import java.util.List;
-import java.util.Map;
+import java.util.Objects;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;
+import static org.apache.hudi.common.util.CollectionUtils.toStream;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+
+/**
+ * NOTE: PLEASE READ DOCS & COMMENTS CAREFULLY BEFORE MAKING CHANGES
+ * <p>
+ * {@link HoodieFileReader} implementation allowing to read from {@link HFile}.
+ */
public class HoodieHFileReader<R extends IndexedRecord> implements HoodieFileReader<R> {
- public static final String KEY_FIELD_NAME = "key";
- public static final String KEY_SCHEMA = "schema";
+
+ // TODO HoodieHFileReader right now tightly coupled to MT, we should break
that coupling
+ public static final String SCHEMA_KEY = "schema";
public static final String KEY_BLOOM_FILTER_META_BLOCK = "bloomFilter";
public static final String KEY_BLOOM_FILTER_TYPE_CODE = "bloomFilterTypeCode";
+
+ public static final String KEY_FIELD_NAME = "key";
public static final String KEY_MIN_RECORD = "minRecordKey";
public static final String KEY_MAX_RECORD = "maxRecordKey";
private static final Logger LOG = LogManager.getLogger(HoodieHFileReader.class);
- private Path path;
- private Configuration conf;
- private HFile.Reader reader;
- private FSDataInputStream fsDataInputStream;
- private Schema schema;
-  // Scanner used to read individual keys. This is cached to prevent the overhead of opening the scanner for each
-  // key retrieval.
- private HFileScanner keyScanner;
-
-  public HoodieHFileReader(Configuration configuration, Path path, CacheConfig cacheConfig) throws IOException {
-    this.conf = configuration;
-    this.path = path;
-    this.reader = HoodieHFileUtils.createHFileReader(FSUtils.getFs(path.toString(), configuration), path, cacheConfig, conf);
+ private final Path path;
+
+ private final LazyRef<Schema> schema;
+
+  // NOTE: Reader is ONLY THREAD-SAFE for {@code Scanner} operating in Positional Read ("pread")
+  //       mode (ie created w/ "pread = true")
+ private final HFile.Reader reader;
+  // NOTE: Scanner caches read blocks, therefore it's important to re-use scanner
+  //       wherever possible
+ private final HFileScanner sharedScanner;
+
+ private final Object sharedScannerLock = new Object();
+
+  public HoodieHFileReader(Configuration hadoopConf, Path path, CacheConfig cacheConfig) throws IOException {
+    this(path,
+        HoodieHFileUtils.createHFileReader(FSUtils.getFs(path.toString(), hadoopConf), path, cacheConfig, hadoopConf),
+        Option.empty());
}
-  public HoodieHFileReader(Configuration configuration, Path path, CacheConfig cacheConfig, FileSystem fs) throws IOException {
-    this.conf = configuration;
-    this.path = path;
-    this.fsDataInputStream = fs.open(path);
-    this.reader = HoodieHFileUtils.createHFileReader(fs, path, cacheConfig, configuration);
+  public HoodieHFileReader(Configuration hadoopConf, Path path, CacheConfig cacheConfig, FileSystem fs) throws IOException {
+    this(path, HoodieHFileUtils.createHFileReader(fs, path, cacheConfig, hadoopConf), Option.empty());
}
-  public HoodieHFileReader(FileSystem fs, Path dummyPath, byte[] content) throws IOException {
-    this.reader = HoodieHFileUtils.createHFileReader(fs, dummyPath, content);
+  public HoodieHFileReader(FileSystem fs, Path dummyPath, byte[] content, Option<Schema> schemaOpt) throws IOException {
+    this(null, HoodieHFileUtils.createHFileReader(fs, dummyPath, content), schemaOpt);
+ }
+
+  public HoodieHFileReader(Path path, HFile.Reader reader, Option<Schema> schemaOpt) throws IOException {
+    this.path = path;
+    this.reader = reader;
+    // For shared scanner, which is primarily used for point-lookups, we're caching blocks
+    // by default, to minimize amount of traffic to the underlying storage
+    this.sharedScanner = getHFileScanner(reader, true);
Review Comment:
No point -- init is very lightweight
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]