nsivabalan commented on code in PR #5208:
URL: https://github.com/apache/hudi/pull/5208#discussion_r841319780
########## hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReader.java: ##########
@@ -26,33 +26,41 @@ import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+
 import org.apache.hudi.common.bloom.BloomFilter;
 import org.apache.hudi.common.util.Option;
 public interface HoodieFileReader<R extends IndexedRecord> extends AutoCloseable {
-  public String[] readMinMaxRecordKeys();
+  String[] readMinMaxRecordKeys();
-  public BloomFilter readBloomFilter();
+  BloomFilter readBloomFilter();
-  public Set<String> filterRowKeys(Set<String> candidateRowKeys);
+  Set<String> filterRowKeys(Set<String> candidateRowKeys);
   default Map<String, R> getRecordsByKeys(List<String> rowKeys) throws IOException {
     throw new UnsupportedOperationException();
   }
-  public Iterator<R> getRecordIterator(Schema readerSchema) throws IOException;
+  default Map<String, R> getRecordsByKeyPrefixes(List<String> keyPrefixes, HFileScanner hFileScanner, Schema readerSchema, Option<Schema.Field> keyFieldSchema) throws IOException {
+    throw new UnsupportedOperationException();
+  }
+
+  Map<String, R> getRecordsByKeyPrefixes(List<String> keyPrefixes) throws IOException;
+
+  Iterator<R> getRecordIterator(Schema readerSchema) throws IOException;
   default Iterator<R> getRecordIterator() throws IOException {
     return getRecordIterator(getSchema());
  }
-  default Option<R> getRecordByKey(String key, Schema readerSchema) throws IOException {
+  default Option<R> getRecordByKey(String key, Schema readerSchema, HFileScanner hFileScanner, Option<Schema.Field> keyFieldSchema) throws IOException {

Review Comment: Had to change these APIs so that each caller uses its own HFileScanner. We removed the HFileScanner class instance so that concurrent readers don't overstep each other, which also let us remove the synchronized block inside the actual read methods.

########## hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java: ##########
@@ -329,39 +410,85 @@ public R next() {
           throw new HoodieIOException("unable to read next record from parquet file ", io);
         }
       }
+
+      @Override
+      public void close() {
+        hFileScanner.close();
+      }
     };
   }
-  private boolean isKeyAvailable(String key) throws IOException {
+  private boolean isKeyAvailable(String key, HFileScanner keyScanner) throws IOException {
     final KeyValue kv = new KeyValue(key.getBytes(), null, null, null);
+    return keyScanner.seekTo(kv) == 0;
+  }
+
+  @Override
+  public Map<String, R> getRecordsByKeyPrefixes(List<String> keyPrefixes) throws IOException {
+    Schema readerSchema = getSchema();
+    Option<Schema.Field> keyFieldSchema = Option.ofNullable(readerSchema.getField(KEY_FIELD_NAME));
     synchronized (this) {
-      if (keyScanner == null) {
-        keyScanner = reader.getScanner(false, false);
-      }
-      if (keyScanner.seekTo(kv) == 0) {
-        return true;
+      HFileScanner hFileScanner = reader.getScanner(false, false);
+      hFileScanner.seekTo(); // seek to the beginning of the file
+      return getRecordsByKeyPrefixes(keyPrefixes, hFileScanner, readerSchema, keyFieldSchema);
+    }
+  }
+
+  @Override
+  public Map<String, R> getRecordsByKeyPrefixes(List<String> keyPrefixes, HFileScanner hFileScanner, Schema readerSchema, Option<Schema.Field> keyFieldSchema) throws IOException {
+    // NOTE: It's always beneficial to sort the keys being sought by the HFile reader
+    // to avoid seeking back and forth
+    Collections.sort(keyPrefixes);
+    List<Pair<byte[], byte[]>> keyRecordsBytes = new ArrayList<>(keyPrefixes.size());
+    for (String keyPrefix : keyPrefixes) {
+      KeyValue kv = new KeyValue(keyPrefix.getBytes(), null, null, null);
+      int val = hFileScanner.seekTo(kv);
+      // What seekTo() does, e.g. for a file with entries [key01, key02, key03, key04, ..., key20]:
+      // when keyPrefix is "key", seekTo will return -1 and place the cursor just before key01; getCell() will return the key01 entry
+      // when keyPrefix is "key03", seekTo will return 0 and place the cursor at key03; getCell() will return the key03 entry
+      // when keyPrefix is "key1", seekTo will return 1 and place the cursor just before key10 (i.e. at key09); call next() and then getCell() to see the key10 entry
+      // when keyPrefix is "key99", seekTo will return 1 and place the cursor at the last entry, i.e. key20; getCell() will return the key20 entry
+
+      if (val == 1) { // move to the next entry if the return value is 1
+        if (!hFileScanner.next()) {
+          // we have reached the end of the file; we can skip proceeding further
+          break;
+        }
+      }
+      do {
+        Cell c = hFileScanner.getCell();
+        byte[] keyBytes = Arrays.copyOfRange(c.getRowArray(), c.getRowOffset(), c.getRowOffset() + c.getRowLength());
+        String key = new String(keyBytes);
+        // Check whether we're still reading records corresponding to the key prefix
+        if (!key.startsWith(keyPrefix)) {
+          break;
+        }
+
+        // Extract the byte value before releasing the lock since we cannot hold on to the returned cell afterwards
+        byte[] valueBytes = Arrays.copyOfRange(c.getValueArray(), c.getValueOffset(), c.getValueOffset() + c.getValueLength());
+        keyRecordsBytes.add(Pair.newPair(keyBytes, valueBytes));
+      } while (hFileScanner.next());
     }
-    return false;
+
+    // Use a tree map so that entries are sorted in the map being returned
+    Map<String, R> values = new TreeMap<String, R>();
+    for (Pair<byte[], byte[]> kv : keyRecordsBytes) {
+      R record = deserialize(kv.getFirst(), kv.getSecond(), readerSchema, readerSchema, keyFieldSchema);
+      values.put(new String(kv.getFirst()), record);
+    }
+    return values;
   }
   @Override
-  public Option getRecordByKey(String key, Schema readerSchema) throws IOException {
+  public Option<R> getRecordByKey(String key, Schema readerSchema, HFileScanner hFileScanner, Option<Schema.Field> keyFieldSchema) throws IOException {
     byte[] value = null;
-    final Option<Schema.Field> keyFieldSchema = Option.ofNullable(readerSchema.getField(KEY_FIELD_NAME));
-    ValidationUtils.checkState(keyFieldSchema != null);
     KeyValue kv = new KeyValue(key.getBytes(), null, null, null);
-    synchronized (this) {
-      if (keyScanner == null) {

Review Comment: Removed the HFileScanner class instance so that concurrent readers can use their own HFileScanner; hence no synchronization is required.
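To make the seekTo()-based prefix scan above easier to follow, here is a minimal, self-contained sketch (not Hudi or HBase API; the class and method names are made up for illustration) that emulates the same contract with a plain TreeMap standing in for the sorted HFile: position at the first key greater than or equal to the prefix, which is where the real scanner ends up after the `val == 1` / `next()` handling, then read forward until a key no longer starts with the prefix.

```java
import java.util.Map;
import java.util.TreeMap;

public class PrefixScanSketch {

  // Emulates the prefix lookup done with HFileScanner.seekTo()/next()/getCell():
  // after seeking, the cursor sits at the first key >= keyPrefix, and we keep reading
  // forward until a key no longer starts with the prefix (the startsWith() check above).
  static Map<String, byte[]> recordsByKeyPrefix(TreeMap<String, byte[]> sortedFile, String keyPrefix) {
    Map<String, byte[]> result = new TreeMap<>(); // tree map, like the diff, keeps results sorted
    for (Map.Entry<String, byte[]> e : sortedFile.tailMap(keyPrefix, true).entrySet()) {
      if (!e.getKey().startsWith(keyPrefix)) {
        break; // past the prefix range; keys are sorted, so we can stop here
      }
      result.put(e.getKey(), e.getValue());
    }
    return result;
  }

  public static void main(String[] args) {
    TreeMap<String, byte[]> file = new TreeMap<>();
    for (int i = 1; i <= 20; i++) {
      file.put(String.format("key%02d", i), new byte[0]);
    }
    System.out.println(recordsByKeyPrefix(file, "key").size());    // 20 entries
    System.out.println(recordsByKeyPrefix(file, "key1").keySet()); // key10 .. key19
    System.out.println(recordsByKeyPrefix(file, "key99").size());  // 0 entries
  }
}
```

Running the main method reproduces the cases in the comment: prefix "key" matches all 20 entries, "key1" matches key10 through key19, and "key99" matches nothing.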
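On the synchronization point raised in the review comments, the sketch below is a generic illustration (again, not the Hudi/HBase types) of why handing each caller its own cursor lets the read path drop its synchronized block, assuming the shared underlying structure is itself safe for concurrent reads: a shared scanner field must be locked because every seek mutates its position, whereas a per-caller cursor keeps that state local to the caller.

```java
import java.util.Iterator;
import java.util.Map;
import java.util.concurrent.ConcurrentSkipListMap;

// Illustrative only: a shared, read-only index handing out independent cursors.
// Because every caller gets its own Iterator (analogous to each caller creating its
// own scanner via reader.getScanner(false, false)), two threads seeking to different
// keys never clobber each other's position, so the read path needs no locking.
public class PerCallerCursorSketch {

  private final ConcurrentSkipListMap<String, byte[]> index = new ConcurrentSkipListMap<>();

  public void put(String key, byte[] value) {
    index.put(key, value);
  }

  // Per-caller cursor: the iteration state lives with the caller, not the shared reader.
  public Iterator<Map.Entry<String, byte[]>> newScanner(String fromKey) {
    return index.tailMap(fromKey, true).entrySet().iterator();
  }

  public static void main(String[] args) throws InterruptedException {
    PerCallerCursorSketch reader = new PerCallerCursorSketch();
    for (int i = 0; i < 100; i++) {
      reader.put(String.format("key%03d", i), new byte[0]);
    }
    // Two concurrent readers, each driving its own cursor; no synchronized block required.
    Thread t1 = new Thread(() -> reader.newScanner("key010").forEachRemaining(e -> { }));
    Thread t2 = new Thread(() -> reader.newScanner("key050").forEachRemaining(e -> { }));
    t1.start();
    t2.start();
    t1.join();
    t2.join();
  }
}
```

This mirrors the change described in the comments: instead of a class-level scanner guarded by synchronized, each call builds its own scanner and passes it down.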
########## hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java: ##########
@@ -239,6 +322,43 @@ private void initIfNeeded() {
     return result;
   }
+  private List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> readFromBaseAndMergeWithLogRecordsForKeyPrefixes(HoodieFileReader baseFileReader,

Review Comment: @alexeykudinkin: I tried to unify this and the other method, but there are quite a few places where I had to add if-else branching, so I have left it as is. If you don't mind, can you take a stab at unifying them? I have addressed all the other feedback we discussed.