nsivabalan commented on code in PR #5208:
URL: https://github.com/apache/hudi/pull/5208#discussion_r841319780


##########
hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReader.java:
##########
@@ -26,33 +26,41 @@
 
 import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+
 import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.util.Option;
 
 public interface HoodieFileReader<R extends IndexedRecord> extends AutoCloseable {
 
-  public String[] readMinMaxRecordKeys();
+  String[] readMinMaxRecordKeys();
 
-  public BloomFilter readBloomFilter();
+  BloomFilter readBloomFilter();
 
-  public Set<String> filterRowKeys(Set<String> candidateRowKeys);
+  Set<String> filterRowKeys(Set<String> candidateRowKeys);
 
   default Map<String, R> getRecordsByKeys(List<String> rowKeys) throws IOException {
     throw new UnsupportedOperationException();
   }
 
-  public Iterator<R> getRecordIterator(Schema readerSchema) throws IOException;
+  default Map<String, R> getRecordsByKeyPrefixes(List<String> keyPrefixes, HFileScanner hFileScanner, Schema readerSchema, Option<Schema.Field> keyFieldSchema) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  Map<String, R> getRecordsByKeyPrefixes(List<String> keyPrefixes) throws IOException;
+
+  Iterator<R> getRecordIterator(Schema readerSchema) throws IOException;
 
   default Iterator<R> getRecordIterator() throws IOException {
     return getRecordIterator(getSchema());
   }
 
-  default Option<R> getRecordByKey(String key, Schema readerSchema) throws IOException {
+  default Option<R> getRecordByKey(String key, Schema readerSchema, HFileScanner hFileScanner, Option<Schema.Field> keyFieldSchema) throws IOException {

Review Comment:
   Had to change these APIs so that each caller uses its own HFileScanner. We have removed the class-instance HFileScanner so that concurrent readers don't overstep each other, and hence we could remove the synchronized block within the actual read methods.
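
   To illustrate the revised contract, here is a minimal caller-side sketch (the class and helper names are hypothetical and not part of this PR; it assumes the HBase HFile.Reader#getScanner(boolean, boolean) API that the PR itself uses, and assumes the key field is named "key"). Each lookup creates and owns its scanner, so concurrent callers never share scanner state and need no synchronization around the read:

   ```java
   import org.apache.avro.Schema;
   import org.apache.avro.generic.IndexedRecord;
   import org.apache.hadoop.hbase.io.hfile.HFile;
   import org.apache.hadoop.hbase.io.hfile.HFileScanner;
   import org.apache.hudi.common.util.Option;
   import org.apache.hudi.io.storage.HoodieFileReader;

   import java.io.IOException;
   import java.util.List;
   import java.util.Map;

   // Hypothetical helper, not part of the PR: every call obtains its own scanner,
   // mirroring what the single-argument getRecordsByKeyPrefixes overload does internally.
   public class PerCallerScannerExample {
     public static <R extends IndexedRecord> Map<String, R> prefixLookup(
         HoodieFileReader<R> fileReader,  // the interface changed in this PR
         HFile.Reader hfile,              // assumed to be reachable from the caller
         List<String> keyPrefixes) throws IOException {
       Schema readerSchema = fileReader.getSchema();
       Option<Schema.Field> keyFieldSchema = Option.ofNullable(readerSchema.getField("key")); // field name assumed
       HFileScanner scanner = hfile.getScanner(false, false); // per-call scanner, never a shared field
       scanner.seekTo();                                      // position at the start of the file
       return fileReader.getRecordsByKeyPrefixes(keyPrefixes, scanner, readerSchema, keyFieldSchema);
     }
   }
   ```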



##########
hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java:
##########
@@ -329,39 +410,85 @@ public R next() {
           throw new HoodieIOException("unable to read next record from parquet file ", io);
         }
       }
+
+      @Override
+      public void close() {
+        hFileScanner.close();
+      }
     };
   }
 
-  private boolean isKeyAvailable(String key) throws IOException {
+  private boolean isKeyAvailable(String key, HFileScanner keyScanner) throws IOException {
     final KeyValue kv = new KeyValue(key.getBytes(), null, null, null);
+    return keyScanner.seekTo(kv) == 0;
+  }
+
+  @Override
+  public Map<String, R> getRecordsByKeyPrefixes(List<String> keyPrefixes) throws IOException {
+    Schema readerSchema = getSchema();
+    Option<Schema.Field> keyFieldSchema = Option.ofNullable(readerSchema.getField(KEY_FIELD_NAME));
     synchronized (this) {
-      if (keyScanner == null) {
-        keyScanner = reader.getScanner(false, false);
-      }
-      if (keyScanner.seekTo(kv) == 0) {
-        return true;
+      HFileScanner hFileScanner = reader.getScanner(false, false);
+      hFileScanner.seekTo(); // seek to beginning of file.
+      return getRecordsByKeyPrefixes(keyPrefixes, hFileScanner, readerSchema, keyFieldSchema);
+    }
+  }
+
+  @Override
+  public Map<String, R> getRecordsByKeyPrefixes(List<String> keyPrefixes, HFileScanner hFileScanner, Schema readerSchema, Option<Schema.Field> keyFieldSchema) throws IOException {
+    // NOTE: It's always beneficial to sort keys being sought to by HFile reader
+    //       to avoid seeking back and forth
+    Collections.sort(keyPrefixes);
+    List<Pair<byte[], byte[]>> keyRecordsBytes = new ArrayList<>(keyPrefixes.size());
+    for (String keyPrefix : keyPrefixes) {
+      KeyValue kv = new KeyValue(keyPrefix.getBytes(), null, null, null);
+      int val = hFileScanner.seekTo(kv);
+      // What seekTo() does:
+      // e.g. entries in file: [key01, key02, key03, key04, ..., key20]
+      // when keyPrefix is "key", seekTo will return -1 and place the cursor just before key01. getCell() will return the key01 entry
+      // when keyPrefix is "key03", seekTo will return 0 and place the cursor at key03. getCell() will return the key03 entry
+      // when keyPrefix is "key1", seekTo will return 1 and place the cursor just before key10 (i.e. at key09). Call next() and then getCell() to see the key10 entry
+      // when keyPrefix is "key99", seekTo will return 1 and place the cursor at the last entry, i.e. key20. getCell() will return the key20 entry
+
+      if (val == 1) { // move to next entry if return value is 1
+        if (!hFileScanner.next()) {
+          // we have reached the end of file. we can skip proceeding further
+          break;
+        }
       }
+      do {
+        Cell c = hFileScanner.getCell();
+        byte[] keyBytes = Arrays.copyOfRange(c.getRowArray(), c.getRowOffset(), c.getRowOffset() + c.getRowLength());
+        String key = new String(keyBytes);
+        // Check whether we're still reading records corresponding to the key-prefix
+        if (!key.startsWith(keyPrefix)) {
+          break;
+        }
+
+        // Extract the byte value before releasing the lock since we cannot hold on to the returned cell afterwards
+        byte[] valueBytes = Arrays.copyOfRange(c.getValueArray(), c.getValueOffset(), c.getValueOffset() + c.getValueLength());
+        keyRecordsBytes.add(Pair.newPair(keyBytes, valueBytes));
+      } while (hFileScanner.next());
     }
-    return false;
+
+    // Use a TreeMap so that entries in the returned map are sorted.
+    Map<String, R> values = new TreeMap<String, R>();
+    for (Pair<byte[], byte[]> kv : keyRecordsBytes) {
+      R record = deserialize(kv.getFirst(), kv.getSecond(), readerSchema, readerSchema, keyFieldSchema);
+      values.put(new String(kv.getFirst()), record);
+    }
+    return values;
   }
 
   @Override
-  public Option getRecordByKey(String key, Schema readerSchema) throws IOException {
+  public Option<R> getRecordByKey(String key, Schema readerSchema, HFileScanner hFileScanner, Option<Schema.Field> keyFieldSchema) throws IOException {
     byte[] value = null;
-    final Option<Schema.Field> keyFieldSchema = Option.ofNullable(readerSchema.getField(KEY_FIELD_NAME));
-    ValidationUtils.checkState(keyFieldSchema != null);
     KeyValue kv = new KeyValue(key.getBytes(), null, null, null);
 
-    synchronized (this) {
-      if (keyScanner == null) {

Review Comment:
   Removed the class-instance HFileScanner so that concurrent readers can use their own HFileScanner, and hence no synchronization is required.
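
   For reference, a simplified, self-contained sketch of the same prefix-scan loop for a single prefix (class and method names are hypothetical; it only assumes the HBase HFileScanner/KeyValue APIs already used in the diff). It shows how each seekTo(Cell) return value is handled before iterating with next()/getCell():

   ```java
   import org.apache.hadoop.hbase.Cell;
   import org.apache.hadoop.hbase.KeyValue;
   import org.apache.hadoop.hbase.io.hfile.HFileScanner;

   import java.io.IOException;
   import java.util.ArrayList;
   import java.util.Arrays;
   import java.util.List;

   // Hypothetical, simplified version of the prefix scan for a single prefix.
   public final class PrefixSeekSketch {
     static List<String> keysForPrefix(HFileScanner scanner, String prefix) throws IOException {
       List<String> keys = new ArrayList<>();
       if (!scanner.seekTo()) {   // empty file: nothing to scan
         return keys;
       }
       int pos = scanner.seekTo(new KeyValue(prefix.getBytes(), null, null, null));
       // pos == -1: prefix sorts before the first key; cursor stays at the first key
       // pos ==  0: exact match; cursor is at the matching key
       // pos ==  1: cursor is at the last key <= prefix, so advance once before reading
       if (pos == 1 && !scanner.next()) {
         return keys;             // reached end of file; no key >= prefix exists
       }
       do {
         Cell c = scanner.getCell();
         String key = new String(Arrays.copyOfRange(
             c.getRowArray(), c.getRowOffset(), c.getRowOffset() + c.getRowLength()));
         if (!key.startsWith(prefix)) {
           break;                 // moved past the prefix range
         }
         keys.add(key);
       } while (scanner.next());
       return keys;
     }
   }
   ```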



##########
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java:
##########
@@ -239,6 +322,43 @@ private void initIfNeeded() {
     return result;
   }
 
+  private List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> readFromBaseAndMergeWithLogRecordsForKeyPrefixes(HoodieFileReader baseFileReader,

Review Comment:
   @alexeykudinkin: I tried to unify this and the other method, but there are quite a few places where I had to do if-else branching, so I have left it as is. If you don't mind, can you take a stab at unifying? I have addressed all the other feedback we discussed.
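
   Purely as a strawman (none of these names exist in the PR), a unified entry point would end up parameterized on the lookup mode, with the flag-driven branching below repeated wherever base-file records are read and merged, which is the if/else noise referred to above:

   ```java
   import org.apache.avro.generic.IndexedRecord;
   import org.apache.hudi.io.storage.HoodieFileReader;

   import java.io.IOException;
   import java.util.List;
   import java.util.Map;

   // Hypothetical only: a single lookup entry point parameterized on the mode.
   public class UnifiedLookupSketch {
     enum LookupMode { FULL_KEYS, KEY_PREFIXES }

     static <R extends IndexedRecord> Map<String, R> readFromBase(
         HoodieFileReader<R> baseFileReader,
         List<String> keysOrPrefixes,
         LookupMode mode) throws IOException {
       return mode == LookupMode.FULL_KEYS
           ? baseFileReader.getRecordsByKeys(keysOrPrefixes)          // exact-key lookups
           : baseFileReader.getRecordsByKeyPrefixes(keysOrPrefixes);  // prefix lookups
     }
   }
   ```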


