nsivabalan commented on code in PR #5208:
URL: https://github.com/apache/hudi/pull/5208#discussion_r843462425


##########
hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java:
##########
@@ -44,97 +33,117 @@
 import org.apache.hadoop.hbase.io.hfile.HFileInfo;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.nio.ByteBuff;
-import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.BloomFilterFactory;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.ClosableIterator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.io.ByteBufferBackedInputStream;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.util.LazyRef;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashMap;
+import java.util.Collections;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.common.util.CollectionUtils.toStream;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+
+/**
+ * NOTE: PLEASE READ DOCS & COMMENTS CAREFULLY BEFORE MAKING CHANGES
+ * <p>
+ * {@link HoodieFileReader} implementation allowing to read from {@link HFile}.
+ */
 public class HoodieHFileReader<R extends IndexedRecord> implements 
HoodieFileReader<R> {
-  public static final String KEY_FIELD_NAME = "key";
-  public static final String KEY_SCHEMA = "schema";
+
+  // TODO HoodieHFileReader right now tightly coupled to MT, we should break 
that coupling
+  public static final String SCHEMA_KEY = "schema";
   public static final String KEY_BLOOM_FILTER_META_BLOCK = "bloomFilter";
   public static final String KEY_BLOOM_FILTER_TYPE_CODE = 
"bloomFilterTypeCode";
+
+  public static final String KEY_FIELD_NAME = "key";
   public static final String KEY_MIN_RECORD = "minRecordKey";
   public static final String KEY_MAX_RECORD = "maxRecordKey";
 
   private static final Logger LOG = 
LogManager.getLogger(HoodieHFileReader.class);
 
-  private Path path;
-  private Configuration conf;
-  private HFile.Reader reader;
-  private FSDataInputStream fsDataInputStream;
-  private Schema schema;
-  // Scanner used to read individual keys. This is cached to prevent the 
overhead of opening the scanner for each
-  // key retrieval.
-  private HFileScanner keyScanner;
-
-  public HoodieHFileReader(Configuration configuration, Path path, CacheConfig 
cacheConfig) throws IOException {
-    this.conf = configuration;
-    this.path = path;
-    this.reader = 
HoodieHFileUtils.createHFileReader(FSUtils.getFs(path.toString(), 
configuration), path, cacheConfig, conf);
+  private final Path path;
+
+  private final LazyRef<Schema> schema;
+
+  // NOTE: Reader is ONLY THREAD-SAFE for {@code Scanner} operating in 
Positional Read ("pread")
+  //       mode (ie created w/ "pread = true")
+  private final HFile.Reader reader;
+  // NOTE: Scanner caches read blocks, therefore it's important to re-use 
scanner

Review Comment:
   Can we also add a line calling out which flows use the cached scanner and 
   which flows use their own scanner? If I am not wrong, getAllRecords uses the 
   cached scanner, whereas getRecordsByKeys (point lookups) and prefix-based 
   lookups use their own scanner.



##########
hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java:
##########
@@ -44,97 +33,117 @@
 import org.apache.hadoop.hbase.io.hfile.HFileInfo;
 import org.apache.hadoop.hbase.io.hfile.HFileScanner;
 import org.apache.hadoop.hbase.nio.ByteBuff;
-import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.BloomFilterFactory;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.ClosableIterator;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.io.ByteBufferBackedInputStream;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.util.LazyRef;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
 import java.io.IOException;
-import java.util.ArrayList;
 import java.util.Arrays;
-import java.util.HashMap;
+import java.util.Collections;
 import java.util.Iterator;
-import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
+import java.util.Objects;
 import java.util.Set;
 import java.util.TreeSet;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.common.util.CollectionUtils.toStream;
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+
+/**
+ * NOTE: PLEASE READ DOCS & COMMENTS CAREFULLY BEFORE MAKING CHANGES
+ * <p>
+ * {@link HoodieFileReader} implementation allowing to read from {@link HFile}.
+ */
 public class HoodieHFileReader<R extends IndexedRecord> implements 
HoodieFileReader<R> {
-  public static final String KEY_FIELD_NAME = "key";
-  public static final String KEY_SCHEMA = "schema";
+
+  // TODO HoodieHFileReader right now tightly coupled to MT, we should break 
that coupling
+  public static final String SCHEMA_KEY = "schema";
   public static final String KEY_BLOOM_FILTER_META_BLOCK = "bloomFilter";
   public static final String KEY_BLOOM_FILTER_TYPE_CODE = 
"bloomFilterTypeCode";
+
+  public static final String KEY_FIELD_NAME = "key";
   public static final String KEY_MIN_RECORD = "minRecordKey";
   public static final String KEY_MAX_RECORD = "maxRecordKey";
 
   private static final Logger LOG = 
LogManager.getLogger(HoodieHFileReader.class);
 
-  private Path path;
-  private Configuration conf;
-  private HFile.Reader reader;
-  private FSDataInputStream fsDataInputStream;
-  private Schema schema;
-  // Scanner used to read individual keys. This is cached to prevent the 
overhead of opening the scanner for each
-  // key retrieval.
-  private HFileScanner keyScanner;
-
-  public HoodieHFileReader(Configuration configuration, Path path, CacheConfig 
cacheConfig) throws IOException {
-    this.conf = configuration;
-    this.path = path;
-    this.reader = 
HoodieHFileUtils.createHFileReader(FSUtils.getFs(path.toString(), 
configuration), path, cacheConfig, conf);
+  private final Path path;
+
+  private final LazyRef<Schema> schema;
+
+  // NOTE: Reader is ONLY THREAD-SAFE for {@code Scanner} operating in 
Positional Read ("pread")
+  //       mode (ie created w/ "pread = true")
+  private final HFile.Reader reader;
+  // NOTE: Scanner caches read blocks, therefore it's important to re-use 
scanner
+  //       wherever possible
+  private final HFileScanner sharedScanner;
+
+  private final Object sharedScannerLock = new Object();
+
+  public HoodieHFileReader(Configuration hadoopConf, Path path, CacheConfig 
cacheConfig) throws IOException {
+    this(path,
+        HoodieHFileUtils.createHFileReader(FSUtils.getFs(path.toString(), 
hadoopConf), path, cacheConfig, hadoopConf),
+        Option.empty());
   }
 
-  public HoodieHFileReader(Configuration configuration, Path path, CacheConfig 
cacheConfig, FileSystem fs) throws IOException {
-    this.conf = configuration;
-    this.path = path;
-    this.fsDataInputStream = fs.open(path);
-    this.reader = HoodieHFileUtils.createHFileReader(fs, path, cacheConfig, 
configuration);
+  public HoodieHFileReader(Configuration hadoopConf, Path path, CacheConfig 
cacheConfig, FileSystem fs) throws IOException {
+    this(path, HoodieHFileUtils.createHFileReader(fs, path, cacheConfig, 
hadoopConf), Option.empty());
   }
 
-  public HoodieHFileReader(FileSystem fs, Path dummyPath, byte[] content) 
throws IOException {
-    this.reader = HoodieHFileUtils.createHFileReader(fs, dummyPath, content);
+  public HoodieHFileReader(FileSystem fs, Path dummyPath, byte[] content, 
Option<Schema> schemaOpt) throws IOException {
+    this(null, HoodieHFileUtils.createHFileReader(fs, dummyPath, content), 
schemaOpt);
+  }
+
+  public HoodieHFileReader(Path path, HFile.Reader reader, Option<Schema> 
schemaOpt) throws IOException {
+    this.path = path;
+    this.reader = reader;
+    // For shared scanner, which is primarily used for point-lookups, we're 
caching blocks
+    // by default, to minimize amount of traffic to the underlying storage
+    this.sharedScanner = getHFileScanner(reader, true);

Review Comment:
   Do you think we should also instantiate the scanner lazily, similar to the 
   schema?



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -133,28 +142,79 @@ private void initIfNeeded() {
   }
 
   @Override
-  protected List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> 
getRecordsByKeys(List<String> keys,
-                                                                               
              String partitionName) {
+  public HoodieData<HoodieRecord<HoodieMetadataPayload>> 
getRecordsByKeyPrefixes(List<String> keyPrefixes,
+                                                                               
  String partitionName) {
+    // NOTE: Since we partition records to a particular file-group by full 
key, we will have
+    //       to scan all file-groups for all key-prefixes as each of these 
might contain some
+    //       records matching the key-prefix
+    List<FileSlice> partitionFileSlices =
+        
HoodieTableMetadataUtil.getPartitionLatestMergedFileSlices(metadataMetaClient, 
partitionName);
+
+    return engineContext.parallelize(partitionFileSlices)
+        .flatMap(
+            (SerializableFunction<FileSlice, Iterator<Pair<String, 
Option<HoodieRecord<HoodieMetadataPayload>>>>>) fileSlice -> {
+              // NOTE: Since this will be executed by executors, we can't 
access previously cached
+              //       readers, and therefore have to always open new ones
+              Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> 
readers =
+                  openReaders(partitionName, fileSlice);
+              try {
+                List<Long> timings = new ArrayList<>();
+
+                HoodieFileReader baseFileReader = readers.getKey();
+                HoodieMetadataMergedLogRecordReader logRecordScanner = 
readers.getRight();
+
+                if (baseFileReader == null && logRecordScanner == null) {
+                  // TODO: what do we do if both does not exist? should we 
throw an exception and let caller do the fallback ?
+                  return Collections.emptyIterator();
+                }
+
+                boolean fullKeys = false;
+
+                Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> 
logRecords =
+                    readLogRecords(logRecordScanner, keyPrefixes, fullKeys, 
timings);
+
+                List<Pair<String, 
Option<HoodieRecord<HoodieMetadataPayload>>>> mergedRecords =
+                    readFromBaseAndMergeWithLogRecords(baseFileReader, 
keyPrefixes, fullKeys, logRecords, timings, partitionName);
+
+                LOG.debug(String.format("Metadata read for %s keys took 
[baseFileRead, logMerge] %s ms",
+                    keyPrefixes.size(), timings));
+
+                return mergedRecords.iterator();
+              } catch (IOException ioe) {
+                throw new HoodieIOException("Error merging records from 
metadata table for  " + keyPrefixes.size() + " key : ", ioe);
+              } finally {
+                close(Pair.of(partitionName, fileSlice.getFileId()));

Review Comment:
   I guess we are failing to close the readers here. The readers obtained within 
   this method are not added to the hashmap maintained at class level; they are a 
   local copy. So we have to clean them up here.



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataPayload.java:
##########
@@ -390,8 +390,12 @@ private HoodieMetadataColumnStats 
combineColumnStatsMetadata(HoodieMetadataPaylo
     return combineAndGetUpdateValue(oldRecord, schema, new Properties());
   }
 
+  public Option<IndexedRecord> getInsertValue() throws IOException {
+    return getInsertValue(null, null);

Review Comment:
   What's the purpose of this? Payload impls are not supposed to have any 
   additional public methods, so I'm trying to understand the use-case for this.



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordReader.java:
##########
@@ -118,24 +95,37 @@ protected void processNextDeletedRecord(DeleteRecord 
deleteRecord) {
    * @return {@code HoodieRecord} if key was found else {@code Option.empty()}
    */
   public synchronized List<Pair<String, 
Option<HoodieRecord<HoodieMetadataPayload>>>> getRecordByKey(String key) {
+    checkState(forceFullScan, "Record reader has to be in full-scan mode to 
use this API");
     return Collections.singletonList(Pair.of(key, 
Option.ofNullable((HoodieRecord) records.get(key))));
   }
 
+  @SuppressWarnings("unchecked")
+  public List<HoodieRecord<HoodieMetadataPayload>> 
getRecordsByKeyPrefixes(List<String> keyPrefixes) {
+    // Following operations have to be atomic, otherwise concurrent

Review Comment:
   Can we add a checkState here that forceFullScan has to be false?



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordReader.java:
##########
@@ -53,38 +52,16 @@
 
   private static final Logger LOG = 
LogManager.getLogger(HoodieMetadataMergedLogRecordReader.class);
 
-  // Set of all record keys that are to be read in memory
-  private Set<String> mergeKeyFilter;

Review Comment:
   @prashantwason: we are removing mergeKeyFilter as we don't see any usage of 
   it. Wanted to confirm that it's OK to remove it?



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -388,6 +504,21 @@ private void initIfNeeded() {
     return Pair.of(logRecordScanner, logScannerOpenMs);
   }
 
+  // NOTE: We're allowing eager full-scan of the log-files only for "files" 
partition.
+  //       Other partitions (like "column_stats", "bloom_filters") will have 
to be fetched
+  //       t/h point-lookups
+  private boolean isFullScanAllowedForPartition(String partitionName) {

Review Comment:
   For FILES, can we honor the scan log files config in HoodieMetadataConfig?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to