This is an automated email from the ASF dual-hosted git repository.

sivabalan pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 69ee790  [HUDI-1294] Adding inline read and seek based read(batch get) for hfile log blocks in metadata table (#3762)
69ee790 is described below

commit 69ee790a47a5fa90a6acd954a9330cce3ae31c3b
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Fri Oct 29 12:12:44 2021 -0400

    [HUDI-1294] Adding inline read and seek based read(batch get) for hfile log blocks in metadata table (#3762)
---
 .../io/storage/TestHoodieHFileReaderWriter.java    | 134 +++++++++++++++++++++
 .../functional/TestHoodieBackedMetadata.java       |  38 +++---
 .../client/functional/TestHoodieMetadataBase.java  |  16 ++-
 .../hudi/common/config/HoodieMetadataConfig.java   |  23 ++++
 ...ner.java => AbstractHoodieLogRecordReader.java} |  99 +++++++++------
 .../hudi/common/table/log/HoodieLogFileReader.java |   9 +-
 .../common/table/log/HoodieLogFormatReader.java    |   9 +-
 .../table/log/HoodieMergedLogRecordScanner.java    |  11 +-
 .../table/log/HoodieUnMergedLogRecordScanner.java  |   4 +-
 .../common/table/log/block/HoodieDataBlock.java    |  13 +-
 .../table/log/block/HoodieHFileDataBlock.java      |  53 +++++++-
 .../apache/hudi/io/storage/HoodieHFileReader.java  |  34 +++++-
 .../apache/hudi/metadata/BaseTableMetadata.java    |  62 ++++++++--
 .../hudi/metadata/HoodieBackedTableMetadata.java   | 122 +++++++++++++------
 ...va => HoodieMetadataMergedLogRecordReader.java} |  55 +++++++--
 .../realtime/TestHoodieRealtimeRecordReader.java   |  21 +++-
 .../hudi/hadoop/testutils/InputFormatTestUtil.java |  14 ++-
 17 files changed, 583 insertions(+), 134 deletions(-)

diff --git a/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java
new file mode 100644
index 0000000..0492063
--- /dev/null
+++ b/hudi-client/hudi-client-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java
@@ -0,0 +1,134 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.BloomFilterFactory;
+import org.apache.hudi.common.bloom.BloomFilterTypeCode;
+import org.apache.hudi.common.engine.TaskContextSupplier;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.util.Pair;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
+import org.mockito.Mockito;
+
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import static org.apache.hudi.common.testutils.FileSystemTestUtils.RANDOM;
+import static org.apache.hudi.common.testutils.SchemaTestUtil.getSchemaFromResource;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+
+public class TestHoodieHFileReaderWriter {
+  @TempDir File tempDir;
+  private Path filePath;
+
+  @BeforeEach
+  public void setup() throws IOException {
+    filePath = new Path(tempDir.toString() + "tempFile.txt");
+  }
+
+  @AfterEach
+  public void clearTempFile() {
+    File file = new File(filePath.toString());
+    if (file.exists()) {
+      file.delete();
+    }
+  }
+
+  private HoodieHFileWriter createHFileWriter(Schema avroSchema) throws Exception {
+    BloomFilter filter = BloomFilterFactory.createBloomFilter(1000, 0.00001, -1, BloomFilterTypeCode.SIMPLE.name());
+    Configuration conf = new Configuration();
+    TaskContextSupplier mockTaskContextSupplier = Mockito.mock(TaskContextSupplier.class);
+    String instantTime = "000";
+
+    HoodieHFileConfig hoodieHFileConfig = new HoodieHFileConfig(conf, Compression.Algorithm.GZ, 1024 * 1024, 120 * 1024 * 1024,
+        filter);
+    return new HoodieHFileWriter(instantTime, filePath, hoodieHFileConfig, avroSchema, mockTaskContextSupplier);
+  }
+
+  @Test
+  public void testWriteReadHFile() throws Exception {
+    Schema avroSchema = getSchemaFromResource(TestHoodieOrcReaderWriter.class, "/exampleSchema.avsc");
+    HoodieHFileWriter writer = createHFileWriter(avroSchema);
+    List<String> keys = new ArrayList<>();
+    Map<String, GenericRecord> recordMap = new HashMap<>();
+    for (int i = 0; i < 100; i++) {
+      GenericRecord record = new GenericData.Record(avroSchema);
+      String key = String.format("%s%04d", "key", i);
+      record.put("_row_key", key);
+      keys.add(key);
+      record.put("time", Integer.toString(RANDOM.nextInt()));
+      record.put("number", i);
+      writer.writeAvro(key, record);
+      recordMap.put(key, record);
+    }
+    writer.close();
+
+    Configuration conf = new Configuration();
+    CacheConfig cacheConfig = new CacheConfig(conf);
+    HoodieHFileReader hoodieHFileReader = new HoodieHFileReader(conf, filePath, cacheConfig, filePath.getFileSystem(conf));
+    List<Pair<String, IndexedRecord>> records = hoodieHFileReader.readAllRecords();
+    records.forEach(entry -> assertEquals(entry.getSecond(), recordMap.get(entry.getFirst())));
+    hoodieHFileReader.close();
+
+    for (int i = 0; i < 20; i++) {
+      int randomRowstoFetch = 5 + RANDOM.nextInt(50);
+      Set<String> rowsToFetch = getRandomKeys(randomRowstoFetch, keys);
+      List<String> rowsList = new ArrayList<>(rowsToFetch);
+      Collections.sort(rowsList);
+      hoodieHFileReader = new HoodieHFileReader(conf, filePath, cacheConfig, filePath.getFileSystem(conf));
+      List<Pair<String, GenericRecord>> result = hoodieHFileReader.readRecords(rowsList);
+      assertEquals(result.size(), randomRowstoFetch);
+      result.forEach(entry -> {
+        assertEquals(entry.getSecond(), recordMap.get(entry.getFirst()));
+      });
+      hoodieHFileReader.close();
+    }
+  }
+
+  private Set<String> getRandomKeys(int count, List<String> keys) {
+    Set<String> rowKeys = new HashSet<>();
+    int totalKeys = keys.size();
+    while (rowKeys.size() < count) {
+      int index = RANDOM.nextInt(totalKeys);
+      if (!rowKeys.contains(keys.get(index))) {
+        rowKeys.add(keys.get(index));
+      }
+    }
+    return rowKeys;
+  }
+}
\ No newline at end of file
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
index 7ea9766..e0c61e1 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieBackedMetadata.java
@@ -160,9 +160,8 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
       doRollbackAndValidate(testTable, "0000003", "0000004");
     }
 
-    doWriteOperationAndValidate(testTable, "0000005");
-
-    // trigger an upsert and validate
+    // trigger a couple of upserts
+    doWriteOperation(testTable, "0000005");
     doWriteOperation(testTable, "0000006");
     validateMetadata(testTable, true);
   }
@@ -222,9 +221,9 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
    * Test various table operations sync to Metadata Table correctly.
    */
   @ParameterizedTest
-  @EnumSource(HoodieTableType.class)
-  public void testTableOperations(HoodieTableType tableType) throws Exception {
-    init(tableType);
+  @MethodSource("bootstrapAndTableOperationTestArgs")
+  public void testTableOperations(HoodieTableType tableType, boolean enableFullScan) throws Exception {
+    init(tableType, true, enableFullScan);
     doWriteInsertAndUpsert(testTable);
 
     // trigger an upsert
@@ -236,7 +235,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
     }
 
     // trigger an upsert
-    doWriteOperationAndValidate(testTable, "0000005");
+    doWriteOperation(testTable, "0000005");
 
     // trigger clean
     doCleanAndValidate(testTable, "0000006", singletonList("0000001"));
@@ -255,7 +254,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
     doWriteOperation(testTable, "0000002");
     doCleanAndValidate(testTable, "0000003", Arrays.asList("0000001"));
     if (tableType == MERGE_ON_READ) {
-      doCompactionAndValidate(testTable, "0000004");
+      doCompaction(testTable, "0000004");
     }
     doWriteOperation(testTable, "0000005");
     validateMetadata(testTable, emptyList(), true);
@@ -288,7 +287,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
     doWriteOperationAndValidate(testTable, "0000003");
 
     // trigger a commit and rollback
-    doWriteOperationAndValidate(testTable, "0000004");
+    doWriteOperation(testTable, "0000004");
     doRollbackAndValidate(testTable, "0000004", "0000005");
 
     // trigger few upserts and validate
@@ -297,7 +296,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
     }
     validateMetadata(testTable);
 
-    doWriteOperationAndValidate(testTable, "0000010");
+    doWriteOperation(testTable, "0000010");
 
     // rollback last commit. and validate.
     doRollbackAndValidate(testTable, "0000010", "0000011");
@@ -309,7 +308,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
     }
 
     // roll back of delete
-    doWriteOperationAndValidate(testTable, "0000014", DELETE);
+    doWriteOperation(testTable, "0000014", DELETE);
     doRollbackAndValidate(testTable, "0000014", "0000015");
 
     // rollback partial commit
@@ -394,9 +393,9 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
     syncTableMetadata(writeConfig);
     validateMetadata(testTable);
 
-    doWriteOperationAndValidate(testTable, "00000003", INSERT);
-    doWriteOperationAndValidate(testTable, "00000004", UPSERT);
-    doWriteOperationAndValidate(testTable, "00000005", UPSERT);
+    doWriteOperation(testTable, "00000003", INSERT);
+    doWriteOperation(testTable, "00000004", UPSERT);
+    doWriteOperation(testTable, "00000005", UPSERT);
 
     // trigger compaction
     if (MERGE_ON_READ.equals(tableType)) {
@@ -404,13 +403,13 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
     }
 
     // trigger an upsert
-    doWriteOperationAndValidate(testTable, "00000008");
+    doWriteOperation(testTable, "00000008");
     // trigger delete
-    doWriteOperationAndValidate(testTable, "00000009", DELETE);
+    doWriteOperation(testTable, "00000009", DELETE);
     // trigger clean
     doCleanAndValidate(testTable, "00000010", asList("00000003", "00000004"));
     // trigger another upsert
-    doWriteOperationAndValidate(testTable, "00000011");
+    doWriteOperation(testTable, "00000011");
     // trigger clustering
     doClusterAndValidate(testTable, "00000012");
 
@@ -528,7 +527,6 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
       records = dataGen.generateUniqueUpdates(newCommitTime, 10);
       writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
       assertNoWriteErrors(writeStatuses);
-      validateMetadata(client);
 
       // Write 4 (updates and inserts)
       newCommitTime = "0000004";
@@ -552,7 +550,6 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
       records = dataGen.generateUpdates(newCommitTime, 5);
       writeStatuses = client.upsert(jsc.parallelize(records, 1), newCommitTime).collect();
       assertNoWriteErrors(writeStatuses);
-      validateMetadata(client);
 
       // Compaction
       if (metaClient.getTableType() == HoodieTableType.MERGE_ON_READ) {
@@ -568,7 +565,6 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
       JavaRDD<HoodieKey> deleteKeys = jsc.parallelize(records, 1).map(r -> r.getKey());
       client.startCommitWithTime(newCommitTime);
       client.delete(deleteKeys, newCommitTime);
-      validateMetadata(client);
 
       // Clean
       newCommitTime = "0000009";
@@ -1128,7 +1124,7 @@ public class TestHoodieBackedMetadata extends TestHoodieMetadataBase {
         Collections.sort(fsFileNames);
         Collections.sort(metadataFilenames);
 
-        assertEquals(fsStatuses.length, partitionToFilesMap.get(basePath + "/" + partition).length);
+        assertEquals(fsStatuses.length, partitionToFilesMap.get(partitionPath.toString()).length);
 
         // File sizes should be valid
         Arrays.stream(metaStatuses).forEach(s -> assertTrue(s.getLen() > 0));
diff --git a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java
index 85f869f..7a49daf 100644
--- a/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java
+++ b/hudi-client/hudi-spark-client/src/test/java/org/apache/hudi/client/functional/TestHoodieMetadataBase.java
@@ -72,6 +72,10 @@ public class TestHoodieMetadataBase extends HoodieClientTestHarness {
   }
 
   public void init(HoodieTableType tableType, boolean enableMetadataTable) throws IOException {
+    init(tableType, enableMetadataTable, true);
+  }
+
+  public void init(HoodieTableType tableType, boolean enableMetadataTable, boolean enableFullScan) throws IOException {
     this.tableType = tableType;
     initPath();
     initSparkContexts("TestHoodieMetadata");
@@ -80,7 +84,8 @@ public class TestHoodieMetadataBase extends HoodieClientTestHarness {
     initMetaClient(tableType);
     initTestDataGenerator();
     metadataTableBasePath = HoodieTableMetadata.getMetadataTableBasePath(basePath);
-    writeConfig = getWriteConfig(true, enableMetadataTable);
+    writeConfig = getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, true, enableMetadataTable, false,
+        enableFullScan).build();
     initWriteConfigAndMetatableWriter(writeConfig, enableMetadataTable);
   }
 
@@ -256,7 +261,13 @@ public class TestHoodieMetadataBase extends HoodieClientTestHarness {
     return getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy.EAGER, autoCommit, useFileListingMetadata, enableMetrics);
   }
 
-  protected HoodieWriteConfig.Builder getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy policy, boolean autoCommit, boolean useFileListingMetadata, boolean enableMetrics) {
+  protected HoodieWriteConfig.Builder getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy policy, boolean autoCommit, boolean useFileListingMetadata,
+                                                            boolean enableMetrics) {
+    return getWriteConfigBuilder(policy, autoCommit, useFileListingMetadata, enableMetrics, true);
+  }
+
+  protected HoodieWriteConfig.Builder getWriteConfigBuilder(HoodieFailedWritesCleaningPolicy policy, boolean autoCommit, boolean useFileListingMetadata,
+                                                            boolean enableMetrics, boolean enableFullScan) {
     return HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(TRIP_EXAMPLE_SCHEMA)
         .withParallelism(2, 2).withDeleteParallelism(2).withRollbackParallelism(2).withFinalizeWriteParallelism(2)
         .withAutoCommit(autoCommit)
@@ -271,6 +282,7 @@ public class TestHoodieMetadataBase extends HoodieClientTestHarness {
         .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
         .withMetadataConfig(HoodieMetadataConfig.newBuilder()
             .enable(useFileListingMetadata)
+            .enableFullScan(enableFullScan)
             .enableMetrics(enableMetrics).build())
         .withMetricsConfig(HoodieMetricsConfig.newBuilder().on(enableMetrics)
             .withExecutorMetrics(true).build())
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
index b74a17c..d526294 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieMetadataConfig.java
@@ -115,6 +115,20 @@ public final class HoodieMetadataConfig extends HoodieConfig {
       .sinceVersion("0.7.0")
       .withDocumentation("Parallelism to use, when listing the table on lake storage.");
 
+  public static final ConfigProperty<Boolean> ENABLE_INLINE_READING = ConfigProperty
+      .key(METADATA_PREFIX + ".enable.inline.reading")
+      .defaultValue(true)
+      .sinceVersion("0.10.0")
+      .withDocumentation("Enable inline reading of log files. By default, log block contents are read as byte[] using a regular input stream and records "
+          + "are deserialized from it. Enabling this will read each log block as an inline file and read records from it. For instance, "
+          + "for HFileDataBlock, an inline file will be read using HFileReader.");
+
+  public static final ConfigProperty<Boolean> ENABLE_FULL_SCAN_LOG_FILES = ConfigProperty
+      .key(METADATA_PREFIX + ".enable.full.scan.log.files")
+      .defaultValue(true)
+      .sinceVersion("0.10.0")
+      .withDocumentation("Enable full scanning of log files while reading log records. If disabled, Hudi looks up only the entries of interest.");
+
   private HoodieMetadataConfig() {
     super();
   }
@@ -143,6 +157,10 @@ public final class HoodieMetadataConfig extends HoodieConfig {
     return getString(DIR_FILTER_REGEX);
   }
 
+  public boolean enableFullScan() {
+    return getBoolean(ENABLE_FULL_SCAN_LOG_FILES);
+  }
+
   public static class Builder {
 
     private final HoodieMetadataConfig metadataConfig = new HoodieMetadataConfig();
@@ -210,6 +228,11 @@ public final class HoodieMetadataConfig extends HoodieConfig {
       return this;
     }
 
+    public Builder enableFullScan(boolean enableFullScan) {
+      metadataConfig.setValue(ENABLE_FULL_SCAN_LOG_FILES, String.valueOf(enableFullScan));
+      return this;
+    }
+
     public HoodieMetadataConfig build() {
       metadataConfig.setDefaults(HoodieMetadataConfig.class.getName());
       return metadataConfig;
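For context, a minimal, hedged sketch of how the two new options above could be set through this builder; the chosen values are illustrative, not the defaults from this patch:

    // Illustrative only: wiring the new metadata-table read options added in this patch.
    // newBuilder()/enable()/enableFullScan()/build() all appear elsewhere in this diff.
    HoodieMetadataConfig metadataConfig = HoodieMetadataConfig.newBuilder()
        .enable(true)            // metadata-table based file listings
        .enableFullScan(false)   // prefer seek-based (batch get) reads of HFile log blocks over full scans
        .build();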
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
similarity index 83%
rename from hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
rename to hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
index 868c7cb..e2e76ad 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/AbstractHoodieLogRecordReader.java
@@ -47,6 +47,7 @@ import org.apache.log4j.Logger;
 
 import java.io.IOException;
 import java.util.ArrayDeque;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Deque;
 import java.util.HashSet;
@@ -71,9 +72,9 @@ import static org.apache.hudi.common.table.log.block.HoodieLogBlock.HoodieLogBlo
  * <p>
  * This results in two I/O passes over the log file.
  */
-public abstract class AbstractHoodieLogRecordScanner {
+public abstract class AbstractHoodieLogRecordReader {
 
-  private static final Logger LOG = LogManager.getLogger(AbstractHoodieLogRecordScanner.class);
+  private static final Logger LOG = LogManager.getLogger(AbstractHoodieLogRecordReader.class);
 
   // Reader schema for the records
   protected final Schema readerSchema;
@@ -114,12 +115,23 @@ public abstract class AbstractHoodieLogRecordScanner {
   private AtomicLong totalCorruptBlocks = new AtomicLong(0);
   // Store the last instant log blocks (needed to implement rollback)
   private Deque<HoodieLogBlock> currentInstantLogBlocks = new ArrayDeque<>();
+  // Enables full scan of log records
+  protected final boolean enableFullScan;
+  private int totalScannedLogFiles;
   // Progress
   private float progress = 0.0f;
 
-  protected AbstractHoodieLogRecordScanner(FileSystem fs, String basePath, List<String> logFilePaths, Schema readerSchema,
-                                           String latestInstantTime, boolean readBlocksLazily, boolean reverseReader,
-                                           int bufferSize, Option<InstantRange> instantRange, boolean withOperationField) {
+  protected AbstractHoodieLogRecordReader(FileSystem fs, String basePath, List<String> logFilePaths, Schema readerSchema,
+                                          String latestInstantTime, boolean readBlocksLazily, boolean reverseReader,
+                                          int bufferSize, Option<InstantRange> instantRange, boolean withOperationField) {
+    this(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize, instantRange, withOperationField,
+        true);
+  }
+
+  protected AbstractHoodieLogRecordReader(FileSystem fs, String basePath, List<String> logFilePaths, Schema readerSchema,
+                                          String latestInstantTime, boolean readBlocksLazily, boolean reverseReader,
+                                          int bufferSize, Option<InstantRange> instantRange, boolean withOperationField,
+                                          boolean enableFullScan) {
     this.readerSchema = readerSchema;
     this.latestInstantTime = latestInstantTime;
     this.hoodieTableMetaClient = HoodieTableMetaClient.builder().setConf(fs.getConf()).setBasePath(basePath).build();
@@ -132,18 +144,27 @@ public abstract class AbstractHoodieLogRecordScanner {
     }
     this.totalLogFiles.addAndGet(logFilePaths.size());
     this.logFilePaths = logFilePaths;
-    this.readBlocksLazily = readBlocksLazily;
     this.reverseReader = reverseReader;
+    this.readBlocksLazily = readBlocksLazily;
     this.fs = fs;
     this.bufferSize = bufferSize;
     this.instantRange = instantRange;
     this.withOperationField = withOperationField;
+    this.enableFullScan = enableFullScan;
   }
 
-  /**
-   * Scan Log files.
-   */
   public void scan() {
+    scan(Option.empty());
+  }
+
+  public void scan(Option<List<String>> keys) {
+    currentInstantLogBlocks = new ArrayDeque<>();
+    progress = 0.0f;
+    totalLogFiles = new AtomicLong(0);
+    totalRollbacks = new AtomicLong(0);
+    totalCorruptBlocks = new AtomicLong(0);
+    totalLogBlocks = new AtomicLong(0);
+    totalLogRecords = new AtomicLong(0);
     HoodieLogFormatReader logFormatReaderWrapper = null;
     HoodieTimeline commitsTimeline = this.hoodieTableMetaClient.getCommitsTimeline();
     HoodieTimeline completedInstantsTimeline = commitsTimeline.filterCompletedInstants();
@@ -152,7 +173,7 @@ public abstract class AbstractHoodieLogRecordScanner {
       // iterate over the paths
       logFormatReaderWrapper = new HoodieLogFormatReader(fs,
           logFilePaths.stream().map(logFile -> new HoodieLogFile(new Path(logFile))).collect(Collectors.toList()),
-          readerSchema, readBlocksLazily, reverseReader, bufferSize);
+          readerSchema, readBlocksLazily, reverseReader, bufferSize, !enableFullScan);
       Set<HoodieLogFile> scannedLogFiles = new HashSet<>();
       while (logFormatReaderWrapper.hasNext()) {
         HoodieLogFile logFile = logFormatReaderWrapper.getLogFile();
@@ -160,16 +181,16 @@ public abstract class AbstractHoodieLogRecordScanner {
         scannedLogFiles.add(logFile);
         totalLogFiles.set(scannedLogFiles.size());
         // Use the HoodieLogFileReader to iterate through the blocks in the log file
-        HoodieLogBlock r = logFormatReaderWrapper.next();
-        final String instantTime = r.getLogBlockHeader().get(INSTANT_TIME);
+        HoodieLogBlock logBlock = logFormatReaderWrapper.next();
+        final String instantTime = logBlock.getLogBlockHeader().get(INSTANT_TIME);
         totalLogBlocks.incrementAndGet();
-        if (r.getBlockType() != CORRUPT_BLOCK
-            && !HoodieTimeline.compareTimestamps(r.getLogBlockHeader().get(INSTANT_TIME), HoodieTimeline.LESSER_THAN_OR_EQUALS, this.latestInstantTime
+        if (logBlock.getBlockType() != CORRUPT_BLOCK
+            && !HoodieTimeline.compareTimestamps(logBlock.getLogBlockHeader().get(INSTANT_TIME), HoodieTimeline.LESSER_THAN_OR_EQUALS, this.latestInstantTime
         )) {
           // hit a block with instant time greater than should be processed, stop processing further
           break;
         }
-        if (r.getBlockType() != CORRUPT_BLOCK && r.getBlockType() != COMMAND_BLOCK) {
+        if (logBlock.getBlockType() != CORRUPT_BLOCK && logBlock.getBlockType() != COMMAND_BLOCK) {
           if (!completedInstantsTimeline.containsOrBeforeTimelineStarts(instantTime)
               || inflightInstantsTimeline.containsInstant(instantTime)) {
             // hit an uncommitted block possibly from a failed write, move to the next one and skip processing this one
@@ -180,28 +201,28 @@ public abstract class AbstractHoodieLogRecordScanner {
             continue;
           }
         }
-        switch (r.getBlockType()) {
+        switch (logBlock.getBlockType()) {
           case HFILE_DATA_BLOCK:
           case AVRO_DATA_BLOCK:
             LOG.info("Reading a data block from file " + logFile.getPath() + " 
at instant "
-                + r.getLogBlockHeader().get(INSTANT_TIME));
-            if (isNewInstantBlock(r) && !readBlocksLazily) {
+                + logBlock.getLogBlockHeader().get(INSTANT_TIME));
+            if (isNewInstantBlock(logBlock) && !readBlocksLazily) {
               // If this is an avro data block belonging to a different commit/instant,
               // then merge the last blocks and records into the main result
-              processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size());
+              processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keys);
             }
             // store the current block
-            currentInstantLogBlocks.push(r);
+            currentInstantLogBlocks.push(logBlock);
             break;
           case DELETE_BLOCK:
             LOG.info("Reading a delete block from file " + logFile.getPath());
-            if (isNewInstantBlock(r) && !readBlocksLazily) {
+            if (isNewInstantBlock(logBlock) && !readBlocksLazily) {
               // If this is a delete data block belonging to a different commit/instant,
               // then merge the last blocks and records into the main result
-              processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size());
+              processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keys);
             }
             // store deletes so can be rolled back
-            currentInstantLogBlocks.push(r);
+            currentInstantLogBlocks.push(logBlock);
             break;
           case COMMAND_BLOCK:
             // Consider the following scenario
@@ -218,9 +239,9 @@ public abstract class AbstractHoodieLogRecordScanner {
             // both B1 & B2
             LOG.info("Reading a command block from file " + logFile.getPath());
             // This is a command block - take appropriate action based on the command
-            HoodieCommandBlock commandBlock = (HoodieCommandBlock) r;
+            HoodieCommandBlock commandBlock = (HoodieCommandBlock) logBlock;
             String targetInstantForCommandBlock =
-                r.getLogBlockHeader().get(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME);
+                logBlock.getLogBlockHeader().get(HoodieLogBlock.HeaderMetadataType.TARGET_INSTANT_TIME);
             switch (commandBlock.getType()) { // there can be different types of command blocks
               case ROLLBACK_PREVIOUS_BLOCK:
                 // Rollback the last read log block
@@ -264,7 +285,7 @@ public abstract class AbstractHoodieLogRecordScanner {
             LOG.info("Found a corrupt block in " + logFile.getPath());
             totalCorruptBlocks.incrementAndGet();
             // If there is a corrupt block - we will assume that this was the next data block
-            currentInstantLogBlocks.push(r);
+            currentInstantLogBlocks.push(logBlock);
             break;
           default:
             throw new UnsupportedOperationException("Block type not supported yet");
@@ -273,7 +294,7 @@ public abstract class AbstractHoodieLogRecordScanner {
       // merge the last read block when all the blocks are done reading
       if (!currentInstantLogBlocks.isEmpty()) {
         LOG.info("Merging the final data blocks");
-        processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size());
+        processQueuedBlocksForInstant(currentInstantLogBlocks, scannedLogFiles.size(), keys);
       }
       // Done
       progress = 1.0f;
@@ -308,9 +329,14 @@ public abstract class AbstractHoodieLogRecordScanner {
   * Iterate over the GenericRecord in the block, read the hoodie key and partition path and call subclass processors to
    * handle it.
    */
-  private void processDataBlock(HoodieDataBlock dataBlock) throws Exception {
+  private void processDataBlock(HoodieDataBlock dataBlock, Option<List<String>> keys) throws Exception {
     // TODO (NA) - Implement getRecordItr() in HoodieAvroDataBlock and use that here
-    List<IndexedRecord> recs = dataBlock.getRecords();
+    List<IndexedRecord> recs = new ArrayList<>();
+    if (!keys.isPresent()) {
+      recs = dataBlock.getRecords();
+    } else {
+      recs = dataBlock.getRecords(keys.get());
+    }
     totalLogRecords.addAndGet(recs.size());
     for (IndexedRecord rec : recs) {
       processNextRecord(createHoodieRecord(rec));
@@ -342,17 +368,18 @@ public abstract class AbstractHoodieLogRecordScanner {
   /**
   * Process the set of log blocks belonging to the last instant which is read fully.
   */
-  private void processQueuedBlocksForInstant(Deque<HoodieLogBlock> lastBlocks, int numLogFilesSeen) throws Exception {
-    while (!lastBlocks.isEmpty()) {
-      LOG.info("Number of remaining logblocks to merge " + lastBlocks.size());
+  private void processQueuedBlocksForInstant(Deque<HoodieLogBlock> logBlocks, int numLogFilesSeen,
+                                             Option<List<String>> keys) throws Exception {
+    while (!logBlocks.isEmpty()) {
+      LOG.info("Number of remaining logblocks to merge " + logBlocks.size());
       // poll the element at the bottom of the stack since that's the order it was inserted
-      HoodieLogBlock lastBlock = lastBlocks.pollLast();
+      HoodieLogBlock lastBlock = logBlocks.pollLast();
       switch (lastBlock.getBlockType()) {
         case AVRO_DATA_BLOCK:
-          processDataBlock((HoodieAvroDataBlock) lastBlock);
+          processDataBlock((HoodieAvroDataBlock) lastBlock, keys);
           break;
         case HFILE_DATA_BLOCK:
-          processDataBlock((HoodieHFileDataBlock) lastBlock);
+          processDataBlock((HoodieHFileDataBlock) lastBlock, keys);
           break;
         case DELETE_BLOCK:
           Arrays.stream(((HoodieDeleteBlock) lastBlock).getKeysToDelete()).forEach(this::processNextDeletedKey);
@@ -432,6 +459,6 @@ public abstract class AbstractHoodieLogRecordScanner {
       throw new UnsupportedOperationException();
     }
 
-    public abstract AbstractHoodieLogRecordScanner build();
+    public abstract AbstractHoodieLogRecordReader build();
   }
 }
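A hedged sketch of the two scan entry points introduced above; logRecordReader stands in for any concrete subclass and the keys are placeholders:

    // Illustrative only: full scan vs. key-based scan on an AbstractHoodieLogRecordReader subclass.
    logRecordReader.scan();                                                  // materialize records from every data/delete block, as before
    logRecordReader.scan(Option.of(Arrays.asList("key-0001", "key-0042")));  // restrict record materialization to the given keys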
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
index f0f3842..cdf3065 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
@@ -70,17 +70,24 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
   private long reverseLogFilePosition;
   private long lastReverseLogFilePosition;
   private boolean reverseReader;
+  private boolean enableInlineReading;
   private boolean closed = false;
   private transient Thread shutdownThread = null;
 
   public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize,
                              boolean readBlockLazily, boolean reverseReader) throws IOException {
+    this(fs, logFile, readerSchema, bufferSize, readBlockLazily, reverseReader, false);
+  }
+
+  public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema readerSchema, int bufferSize,
+                             boolean readBlockLazily, boolean reverseReader, boolean enableInlineReading) throws IOException {
     FSDataInputStream fsDataInputStream = fs.open(logFile.getPath(), bufferSize);
     this.logFile = logFile;
     this.inputStream = getFSDataInputStream(fsDataInputStream, fs, bufferSize);
     this.readerSchema = readerSchema;
     this.readBlockLazily = readBlockLazily;
     this.reverseReader = reverseReader;
+    this.enableInlineReading = enableInlineReading;
     if (this.reverseReader) {
       this.reverseLogFilePosition = this.lastReverseLogFilePosition = fs.getFileStatus(logFile.getPath()).getLen();
     }
@@ -248,7 +255,7 @@ public class HoodieLogFileReader implements HoodieLogFormat.Reader {
         }
       case HFILE_DATA_BLOCK:
         return new HoodieHFileDataBlock(logFile, inputStream, Option.ofNullable(content), readBlockLazily,
-              contentPosition, contentLength, blockEndPos, readerSchema, header, footer);
+              contentPosition, contentLength, blockEndPos, readerSchema, header, footer, enableInlineReading);
       case DELETE_BLOCK:
         return HoodieDeleteBlock.getBlock(logFile, inputStream, Option.ofNullable(content), readBlockLazily,
             contentPosition, contentLength, blockEndPos, header, footer);
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java
index 7267227..36fa187 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFormatReader.java
@@ -49,7 +49,12 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader {
   private static final Logger LOG = LogManager.getLogger(HoodieLogFormatReader.class);
 
   HoodieLogFormatReader(FileSystem fs, List<HoodieLogFile> logFiles, Schema readerSchema, boolean readBlocksLazily,
-      boolean reverseLogReader, int bufferSize) throws IOException {
+                        boolean reverseLogReader, int bufferSize) throws IOException {
+    this(fs, logFiles, readerSchema, readBlocksLazily, reverseLogReader, bufferSize, false);
+  }
+
+  HoodieLogFormatReader(FileSystem fs, List<HoodieLogFile> logFiles, Schema readerSchema, boolean readBlocksLazily,
+      boolean reverseLogReader, int bufferSize, boolean enableInlineReading) throws IOException {
     this.logFiles = logFiles;
     this.fs = fs;
     this.readerSchema = readerSchema;
@@ -59,7 +64,7 @@ public class HoodieLogFormatReader implements HoodieLogFormat.Reader {
     this.prevReadersInOpenState = new ArrayList<>();
     if (logFiles.size() > 0) {
       HoodieLogFile nextLogFile = logFiles.remove(0);
-      this.currentReader = new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily, false);
+      this.currentReader = new HoodieLogFileReader(fs, nextLogFile, readerSchema, bufferSize, readBlocksLazily, false, enableInlineReading);
     }
   }
 
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
index 18b2672..a8d97ac 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieMergedLogRecordScanner.java
@@ -54,7 +54,7 @@ import java.util.Map;
  * This results in two I/O passes over the log file.
  */
 
-public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner
+public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordReader
     implements Iterable<HoodieRecord<? extends HoodieRecordPayload>> {
 
   private static final Logger LOG = LogManager.getLogger(HoodieMergedLogRecordScanner.class);
@@ -77,8 +77,9 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner
                                          boolean reverseReader, int bufferSize, String spillableMapBasePath,
                                          Option<InstantRange> instantRange, boolean autoScan,
                                          ExternalSpillableMap.DiskMapType diskMapType, boolean isBitCaskDiskMapCompressionEnabled,
-                                         boolean withOperationField) {
-    super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize, instantRange, withOperationField);
+                                         boolean withOperationField, boolean enableFullScan) {
+    super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, readBlocksLazily, reverseReader, bufferSize, instantRange, withOperationField,
+        enableFullScan);
     try {
       // Store merged records for all versions for this log file, set the in-memory footprint to maxInMemoryMapSize
       this.records = new ExternalSpillableMap<>(maxMemorySizeInBytes, spillableMapBasePath, new DefaultSizeEstimator(),
@@ -166,7 +167,7 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner
   /**
    * Builder used to build {@code HoodieUnMergedLogRecordScanner}.
    */
-  public static class Builder extends AbstractHoodieLogRecordScanner.Builder {
+  public static class Builder extends AbstractHoodieLogRecordReader.Builder {
     protected FileSystem fs;
     protected String basePath;
     protected List<String> logFilePaths;
@@ -276,7 +277,7 @@ public class HoodieMergedLogRecordScanner extends AbstractHoodieLogRecordScanner
       return new HoodieMergedLogRecordScanner(fs, basePath, logFilePaths, readerSchema,
           latestInstantTime, maxMemorySizeInBytes, readBlocksLazily, reverseReader,
           bufferSize, spillableMapBasePath, instantRange, autoScan,
-          diskMapType, isBitCaskDiskMapCompressionEnabled, withOperationField);
+          diskMapType, isBitCaskDiskMapCompressionEnabled, withOperationField, true);
     }
   }
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
index 8b26f72..f781a14 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieUnMergedLogRecordScanner.java
@@ -31,7 +31,7 @@ import java.util.List;
 /**
  * A scanner used to scan hoodie unmerged log records.
  */
-public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordScanner {
+public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordReader {
 
   private final LogRecordScannerCallback callback;
 
@@ -72,7 +72,7 @@ public class HoodieUnMergedLogRecordScanner extends AbstractHoodieLogRecordScann
   /**
    * Builder used to build {@code HoodieUnMergedLogRecordScanner}.
    */
-  public static class Builder extends AbstractHoodieLogRecordScanner.Builder {
+  public static class Builder extends AbstractHoodieLogRecordReader.Builder {
     private FileSystem fs;
     private String basePath;
     private List<String> logFilePaths;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
index 8f5b741..2e4338e 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieDataBlock.java
@@ -111,6 +111,17 @@ public abstract class HoodieDataBlock extends HoodieLogBlock {
     return records;
   }
 
+  /**
+   * Batch get of the keys of interest. Implementations can choose either to do a full scan and return the matched entries, or
+   * to do seek-based parsing and return the matched entries.
+   * @param keys keys of interest.
+   * @return list of IndexedRecords for the keys of interest.
+   * @throws IOException
+   */
+  public List<IndexedRecord> getRecords(List<String> keys) throws IOException {
+    throw new UnsupportedOperationException("On demand batch get based on interested keys not supported");
+  }
+
   public Schema getSchema() {
     // if getSchema was invoked before converting byte [] to records
     if (records == null) {
@@ -119,7 +130,7 @@ public abstract class HoodieDataBlock extends HoodieLogBlock {
     return schema;
   }
 
-  private void createRecordsFromContentBytes() throws IOException {
+  protected void createRecordsFromContentBytes() throws IOException {
     if (readBlockLazily && !getContent().isPresent()) {
       // read log block contents from disk
       inflate();
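As a hedged illustration of the contract above: a block that implements it (such as the HFile data block changed below) can be asked for just the matching records, while other block types keep throwing UnsupportedOperationException. The dataBlock variable and keys are placeholders:

    // Illustrative only: batch get on a data block that supports getRecords(List<String>).
    List<IndexedRecord> matching = dataBlock.getRecords(Arrays.asList("key-0001", "key-0042"));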
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
index 6d2682a..a1e0c12 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
@@ -19,12 +19,16 @@
 package org.apache.hudi.common.table.log.block;
 
 import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.fs.inline.InLineFSUtils;
+import org.apache.hudi.common.fs.inline.InLineFileSystem;
 import org.apache.hudi.common.model.HoodieLogFile;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.ValidationUtils;
 import org.apache.hudi.exception.HoodieIOException;
 import org.apache.hudi.io.storage.HoodieHFileReader;
+
+import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
@@ -44,6 +48,7 @@ import org.apache.hadoop.hbase.util.Pair;
 
 import java.io.ByteArrayOutputStream;
 import java.io.IOException;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.Iterator;
 import java.util.List;
@@ -61,6 +66,7 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
   private static final Logger LOG = LogManager.getLogger(HoodieHFileDataBlock.class);
   private static Compression.Algorithm compressionAlgorithm = Compression.Algorithm.GZ;
   private static int blockSize = 1 * 1024 * 1024;
+  private boolean enableInlineReading = false;
 
   public HoodieHFileDataBlock(@Nonnull Map<HeaderMetadataType, String> logBlockHeader,
        @Nonnull Map<HeaderMetadataType, String> logBlockFooter,
@@ -71,10 +77,11 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
 
   public HoodieHFileDataBlock(HoodieLogFile logFile, FSDataInputStream inputStream, Option<byte[]> content,
        boolean readBlockLazily, long position, long blockSize, long blockEndpos, Schema readerSchema,
-       Map<HeaderMetadataType, String> header, Map<HeaderMetadataType, String> footer) {
+       Map<HeaderMetadataType, String> header, Map<HeaderMetadataType, String> footer, boolean enableInlineReading) {
     super(content, inputStream, readBlockLazily,
           Option.of(new HoodieLogBlockContentLocation(logFile, position, blockSize, blockEndpos)), readerSchema, header,
           footer);
+    this.enableInlineReading = enableInlineReading;
   }
 
   public HoodieHFileDataBlock(@Nonnull List<IndexedRecord> records, @Nonnull Map<HeaderMetadataType, String> header) {
@@ -142,6 +149,50 @@ public class HoodieHFileDataBlock extends HoodieDataBlock {
   }
 
   @Override
+  protected void createRecordsFromContentBytes() throws IOException {
+    if (enableInlineReading) {
+      getRecords(Collections.emptyList());
+    } else {
+      super.createRecordsFromContentBytes();
+    }
+  }
+
+  @Override
+  public List<IndexedRecord> getRecords(List<String> keys) throws IOException {
+    readWithInlineFS(keys);
+    return records;
+  }
+
+  private void readWithInlineFS(List<String> keys) throws IOException {
+    boolean enableFullScan = keys.isEmpty();
+    // Get schema from the header
+    Schema writerSchema = new Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
+    // If readerSchema was not present, use writerSchema
+    if (schema == null) {
+      schema = writerSchema;
+    }
+    Configuration conf = new Configuration();
+    CacheConfig cacheConf = new CacheConfig(conf);
+    Configuration inlineConf = new Configuration();
+    inlineConf.set("fs." + InLineFileSystem.SCHEME + ".impl", 
InLineFileSystem.class.getName());
+
+    Path inlinePath = InLineFSUtils.getInlineFilePath(
+        getBlockContentLocation().get().getLogFile().getPath(),
+        getBlockContentLocation().get().getLogFile().getPath().getFileSystem(conf).getScheme(),
+        getBlockContentLocation().get().getContentPositionInLogFile(),
+        getBlockContentLocation().get().getBlockSize());
+    if (!enableFullScan) {
+      // HFile read will be efficient if keys are sorted, since on storage, records are sorted by key. This will avoid unnecessary seeks.
+      Collections.sort(keys);
+    }
+    HoodieHFileReader reader = new HoodieHFileReader(inlineConf, inlinePath, cacheConf, inlinePath.getFileSystem(inlineConf));
+    List<org.apache.hadoop.hbase.util.Pair<String, IndexedRecord>> logRecords = enableFullScan ? reader.readAllRecords(writerSchema, schema) :
+        reader.readRecords(keys, schema);
+    reader.close();
+    this.records = logRecords.stream().map(t -> t.getSecond()).collect(Collectors.toList());
+  }
+
+  @Override
   protected void deserializeRecords() throws IOException {
     // Get schema from the header
     Schema writerSchema = new Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java
index b954e57..7b80d1a 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java
@@ -21,6 +21,7 @@ package org.apache.hudi.io.storage;
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.nio.ByteBuffer;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashSet;
 import java.util.Iterator;
@@ -33,6 +34,7 @@ import org.apache.avro.Schema;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.PositionedReadable;
 import org.apache.hadoop.fs.Seekable;
@@ -55,6 +57,7 @@ public class HoodieHFileReader<R extends IndexedRecord> implements HoodieFileRea
   private Path path;
   private Configuration conf;
   private HFile.Reader reader;
+  private FSDataInputStream fsDataInputStream;
   private Schema schema;
   // Scanner used to read individual keys. This is cached to prevent the overhead of opening the scanner for each
   // key retrieval.
@@ -72,6 +75,13 @@ public class HoodieHFileReader<R extends IndexedRecord> implements HoodieFileRea
     this.reader = HFile.createReader(FSUtils.getFs(path.toString(), configuration), path, cacheConfig, conf);
   }
 
+  public HoodieHFileReader(Configuration configuration, Path path, CacheConfig cacheConfig, FileSystem inlineFs) throws IOException {
+    this.conf = configuration;
+    this.path = path;
+    this.fsDataInputStream = inlineFs.open(path);
+    this.reader = HFile.createReader(inlineFs, path, cacheConfig, configuration);
+  }
+
   public HoodieHFileReader(byte[] content) throws IOException {
     Configuration conf = new Configuration();
     Path path = new Path("hoodie");
@@ -164,6 +174,25 @@ public class HoodieHFileReader<R extends IndexedRecord> implements HoodieFileRea
     return readAllRecords(schema, schema);
   }
 
+  public List<Pair<String, R>> readRecords(List<String> keys) throws IOException {
+    reader.loadFileInfo();
+    Schema schema = new Schema.Parser().parse(new String(reader.loadFileInfo().get(KEY_SCHEMA.getBytes())));
+    return readRecords(keys, schema);
+  }
+
+  public List<Pair<String, R>> readRecords(List<String> keys, Schema schema) throws IOException {
+    this.schema = schema;
+    reader.loadFileInfo();
+    List<Pair<String, R>> records = new ArrayList<>();
+    for (String key: keys) {
+      Option<R> value = getRecordByKey(key, schema);
+      if (value.isPresent()) {
+        records.add(new Pair(key, value.get()));
+      }
+    }
+    return records;
+  }
+
   @Override
   public Iterator getRecordIterator(Schema readerSchema) throws IOException {
     final HFileScanner scanner = reader.getScanner(false, false);
@@ -217,7 +246,7 @@ public class HoodieHFileReader<R extends IndexedRecord> implements HoodieFileRea
 
     synchronized (this) {
       if (keyScanner == null) {
-        keyScanner = reader.getScanner(true, true);
+        keyScanner = reader.getScanner(false, true);
       }
 
       if (keyScanner.seekTo(kv) == 0) {
@@ -250,6 +279,9 @@ public class HoodieHFileReader<R extends IndexedRecord> implements HoodieFileRea
     try {
       reader.close();
       reader = null;
+      if (fsDataInputStream != null) {
+        fsDataInputStream.close();
+      }
       keyScanner = null;
     } catch (IOException e) {
       throw new HoodieIOException("Error closing the hfile reader", e);
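A hedged usage sketch of the seek-based read path above, mirroring the test added at the top of this patch; hfilePath and the keys are placeholders:

    // Illustrative only: point lookups against an HFile using the new readRecords(keys) API.
    Configuration conf = new Configuration();
    CacheConfig cacheConfig = new CacheConfig(conf);
    HoodieHFileReader<GenericRecord> reader = new HoodieHFileReader<>(conf, hfilePath, cacheConfig, hfilePath.getFileSystem(conf));
    List<Pair<String, GenericRecord>> hits = reader.readRecords(Arrays.asList("key-0003", "key-0017"));
    reader.close();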
diff --git a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
index 1690c9a..b560b76 100644
--- a/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
+++ b/hudi-common/src/main/java/org/apache/hudi/metadata/BaseTableMetadata.java
@@ -30,6 +30,7 @@ import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.util.HoodieTimer;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.collection.Pair;
 import org.apache.hudi.exception.HoodieMetadataException;
 
 import org.apache.hadoop.fs.FileStatus;
@@ -38,10 +39,13 @@ import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 
 import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
 public abstract class BaseTableMetadata implements HoodieTableMetadata {
 
@@ -126,15 +130,12 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
   }
 
   @Override
-  public Map<String, FileStatus[]> getAllFilesInPartitions(List<String> partitionPaths)
+  public Map<String, FileStatus[]> getAllFilesInPartitions(List<String> partitions)
       throws IOException {
     if (enabled) {
-      Map<String, FileStatus[]> partitionsFilesMap = new HashMap<>();
-
       try {
-        for (String partitionPath : partitionPaths) {
-          partitionsFilesMap.put(partitionPath, fetchAllFilesInPartition(new Path(partitionPath)));
-        }
+        List<Path> partitionPaths = partitions.stream().map(entry -> new Path(entry)).collect(Collectors.toList());
+        Map<String, FileStatus[]> partitionsFilesMap = fetchAllFilesInPartitionPaths(partitionPaths);
         return partitionsFilesMap;
       } catch (Exception e) {
         throw new HoodieMetadataException("Failed to retrieve files in 
partition from metadata", e);
@@ -142,7 +143,7 @@ public abstract class BaseTableMetadata implements 
HoodieTableMetadata {
     }
 
     return new FileSystemBackedTableMetadata(getEngineContext(), hadoopConf, dataBasePath, metadataConfig.shouldAssumeDatePartitioning())
-        .getAllFilesInPartitions(partitionPaths);
+        .getAllFilesInPartitions(partitions);
   }
 
   /**
@@ -150,7 +151,7 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
    */
   protected List<String> fetchAllPartitionPaths() throws IOException {
     HoodieTimer timer = new HoodieTimer().startTimer();
-    Option<HoodieRecord<HoodieMetadataPayload>> hoodieRecord = getRecordByKeyFromMetadata(RECORDKEY_PARTITION_LIST, MetadataPartitionType.FILES.partitionPath());
+    Option<HoodieRecord<HoodieMetadataPayload>> hoodieRecord = getRecordByKey(RECORDKEY_PARTITION_LIST, MetadataPartitionType.FILES.partitionPath());
     metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.LOOKUP_PARTITIONS_STR, timer.endTimer()));
 
     List<String> partitions = Collections.emptyList();
@@ -184,7 +185,7 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
     }
 
     HoodieTimer timer = new HoodieTimer().startTimer();
-    Option<HoodieRecord<HoodieMetadataPayload>> hoodieRecord = getRecordByKeyFromMetadata(partitionName, MetadataPartitionType.FILES.partitionPath());
+    Option<HoodieRecord<HoodieMetadataPayload>> hoodieRecord = getRecordByKey(partitionName, MetadataPartitionType.FILES.partitionPath());
     metrics.ifPresent(m -> m.updateMetrics(HoodieMetadataMetrics.LOOKUP_FILES_STR, timer.endTimer()));
 
     FileStatus[] statuses = {};
@@ -200,7 +201,48 @@ public abstract class BaseTableMetadata implements HoodieTableMetadata {
     return statuses;
   }
 
-  protected abstract Option<HoodieRecord<HoodieMetadataPayload>> 
getRecordByKeyFromMetadata(String key, String partitionName);
+  Map<String, FileStatus[]> fetchAllFilesInPartitionPaths(List<Path> 
partitionPaths) throws IOException {
+    Map<String, Path> partitionInfo = new HashMap<>();
+    boolean foundNonPartitionedPath = false;
+    for (Path partitionPath: partitionPaths) {
+      String partitionName = FSUtils.getRelativePartitionPath(new 
Path(dataBasePath), partitionPath);
+      if (partitionName.isEmpty()) {
+        if (partitionInfo.size() > 1) {
+          throw new HoodieMetadataException("Found mix of partitioned and non 
partitioned paths while fetching data from metadata table");
+        }
+        partitionInfo.put(NON_PARTITIONED_NAME, partitionPath);
+        foundNonPartitionedPath = true;
+      } else {
+        if (foundNonPartitionedPath) {
+          throw new HoodieMetadataException("Found mix of partitioned and non 
partitioned paths while fetching data from metadata table");
+        }
+        partitionInfo.put(partitionName, partitionPath);
+      }
+    }
+
+    HoodieTimer timer = new HoodieTimer().startTimer();
+    List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> 
partitionsFileStatus =
+        getRecordsByKeys(new ArrayList<>(partitionInfo.keySet()), 
MetadataPartitionType.FILES.partitionPath());
+    metrics.ifPresent(m -> 
m.updateMetrics(HoodieMetadataMetrics.LOOKUP_FILES_STR, timer.endTimer()));
+    Map<String, FileStatus[]> result = new HashMap<>();
+
+    for (Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>> entry: 
partitionsFileStatus) {
+      if (entry.getValue().isPresent()) {
+        if (!entry.getValue().get().getData().getDeletions().isEmpty()) {
+          throw new HoodieMetadataException("Metadata record for partition " + 
entry.getKey() + " is inconsistent: "
+              + entry.getValue().get().getData());
+        }
+        result.put(partitionInfo.get(entry.getKey()).toString(), 
entry.getValue().get().getData().getFileStatuses(hadoopConf.get(), 
partitionInfo.get(entry.getKey())));
+      }
+    }
+
+    LOG.info("Listed files in partitions from metadata: partition list =" + 
Arrays.toString(partitionPaths.toArray()));
+    return result;
+  }
+
+  protected abstract Option<HoodieRecord<HoodieMetadataPayload>> 
getRecordByKey(String key, String partitionName);
+
+  protected abstract List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> getRecordsByKeys(List<String> keys, String partitionName);
 
   protected HoodieEngineContext getEngineContext() {
     return engineContext != null ? engineContext : new 
HoodieLocalEngineContext(hadoopConf.get());
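
The listing change above boils down to deriving a relative partition name for every requested path and resolving all of them through one batched key lookup against the metadata table, instead of issuing one read per partition. Below is a dependency-free sketch of that shape using plain JDK types; MetadataStore, getFilesByPartitionNames and the path handling are simplified stand-ins, not Hudi APIs.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class BatchedPartitionListingSketch {

  // Stand-in for one batched read against the metadata table's "files" partition.
  interface MetadataStore {
    Map<String, List<String>> getFilesByPartitionNames(List<String> partitionNames);
  }

  static Map<String, List<String>> listFilesInPartitions(MetadataStore store, String basePath,
                                                         List<String> partitionPaths) {
    // Map relative partition name -> absolute partition path, mirroring fetchAllFilesInPartitionPaths above.
    Map<String, String> nameToPath = new HashMap<>();
    for (String partitionPath : partitionPaths) {
      String relative = partitionPath.startsWith(basePath)
          ? partitionPath.substring(basePath.length()).replaceAll("^/+", "")
          : partitionPath;
      nameToPath.put(relative.isEmpty() ? "." : relative, partitionPath);
    }
    // One batched lookup for every partition name instead of N point reads.
    Map<String, List<String>> filesByName =
        store.getFilesByPartitionNames(new ArrayList<>(nameToPath.keySet()));
    Map<String, List<String>> filesByPath = new HashMap<>();
    filesByName.forEach((name, files) -> filesByPath.put(nameToPath.get(name), files));
    return filesByPath;
  }

  public static void main(String[] args) {
    MetadataStore store = names -> {
      Map<String, List<String>> out = new HashMap<>();
      names.forEach(n -> out.put(n, Arrays.asList(n + "/part-0001.parquet")));
      return out;
    };
    System.out.println(listFilesInPartitions(store, "/tmp/table",
        Arrays.asList("/tmp/table/2021/10/29", "/tmp/table/2021/10/30")));
  }
}
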
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
index b0940a7..bf0cf92 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieBackedTableMetadata.java
@@ -81,7 +81,7 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
   private final boolean reuse;
 
   // Readers for latest file slice corresponding to file groups in the 
metadata partition of interest
-  private Map<String, Pair<HoodieFileReader, 
HoodieMetadataMergedLogRecordScanner>> partitionReaders = new 
ConcurrentHashMap<>();
+  private Map<String, Pair<HoodieFileReader, 
HoodieMetadataMergedLogRecordReader>> partitionReaders = new 
ConcurrentHashMap<>();
 
   public HoodieBackedTableMetadata(HoodieEngineContext engineContext, 
HoodieMetadataConfig metadataConfig,
                                    String datasetBasePath, String 
spillableMapDirectory) {
@@ -120,18 +120,68 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
   }
 
   @Override
-  protected Option<HoodieRecord<HoodieMetadataPayload>> 
getRecordByKeyFromMetadata(String key, String partitionName) {
-    Pair<HoodieFileReader, HoodieMetadataMergedLogRecordScanner> readers = 
openReadersIfNeeded(key, partitionName);
+  protected Option<HoodieRecord<HoodieMetadataPayload>> getRecordByKey(String 
key, String partitionName) {
+    return getRecordsByKeys(Collections.singletonList(key), 
partitionName).get(0).getValue();
+  }
+
+  protected List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> 
getRecordsByKeys(List<String> keys, String partitionName) {
+    Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> readers = 
openReadersIfNeeded(keys.get(0), partitionName);
     try {
       List<Long> timings = new ArrayList<>();
-      HoodieTimer timer = new HoodieTimer().startTimer();
       HoodieFileReader baseFileReader = readers.getKey();
-      HoodieMetadataMergedLogRecordScanner logRecordScanner = 
readers.getRight();
+      HoodieMetadataMergedLogRecordReader logRecordScanner = 
readers.getRight();
+
+      // local map to assist in merging with base file records
+      Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords = 
readLogRecords(logRecordScanner, keys, timings);
+      List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> result = 
readFromBaseAndMergeWithLogRecords(baseFileReader,
+          keys, logRecords, timings);
+      LOG.info(String.format("Metadata read for %s keys took [baseFileRead, 
logMerge] %s ms", keys.size(), timings));
+      return result;
+    } catch (IOException ioe) {
+      throw new HoodieIOException("Error merging records from metadata table 
for  " + keys.size() + " key : ", ioe);
+    } finally {
+      if (!reuse) {
+        close(partitionName);
+      }
+    }
+  }
+
+  private Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> 
readLogRecords(HoodieMetadataMergedLogRecordReader logRecordScanner,
+                                                                               
   List<String> keys, List<Long> timings) {
+    HoodieTimer timer = new HoodieTimer().startTimer();
+    Map<String, Option<HoodieRecord<HoodieMetadataPayload>>> logRecords = new 
HashMap<>();
+    // Retrieve records from log file
+    timer.startTimer();
+    if (logRecordScanner != null) {
+      if (metadataConfig.enableFullScan()) {
+        // path which does full scan of log files
+        for (String key : keys) {
+          logRecords.put(key, 
logRecordScanner.getRecordByKey(key).get(0).getValue());
+        }
+      } else {
+        // this path will do seeks pertaining to the keys passed in
+        List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> 
logRecordsList = logRecordScanner.getRecordsByKeys(keys);
+        for (Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>> entry : 
logRecordsList) {
+          logRecords.put(entry.getKey(), entry.getValue());
+        }
+      }
+    }
+    timings.add(timer.endTimer());
+    return logRecords;
+  }
 
-      // Retrieve record from base file
-      HoodieRecord<HoodieMetadataPayload> hoodieRecord = null;
-      if (baseFileReader != null) {
-        HoodieTimer readTimer = new HoodieTimer().startTimer();
+  private List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> 
readFromBaseAndMergeWithLogRecords(HoodieFileReader baseFileReader,
+                                                                               
                              List<String> keys, Map<String, 
Option<HoodieRecord<HoodieMetadataPayload>>> logRecords,
+                                                                               
                              List<Long> timings) throws IOException {
+    List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> result = 
new ArrayList<>();
+    // merge with base records
+    HoodieTimer timer = new HoodieTimer().startTimer();
+    timer.startTimer();
+    HoodieRecord<HoodieMetadataPayload> hoodieRecord = null;
+    // Retrieve record from base file
+    if (baseFileReader != null) {
+      for (String key : keys) {
+        HoodieTimer readTimer = new HoodieTimer().startTimer();
         Option<GenericRecord> baseRecord = baseFileReader.getRecordByKey(key);
         if (baseRecord.isPresent()) {
           hoodieRecord = metadataTableConfig.populateMetaFields()
@@ -139,46 +189,45 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
               : 
SpillableMapUtils.convertToHoodieRecordPayload(baseRecord.get(), 
metadataTableConfig.getPayloadClass(), metadataTableConfig.getPreCombineField(),
               Pair.of(metadataTableConfig.getRecordKeyFieldProp(), 
metadataTableConfig.getPartitionFieldProp()), false);
           metrics.ifPresent(m -> 
m.updateMetrics(HoodieMetadataMetrics.BASEFILE_READ_STR, readTimer.endTimer()));
-        }
-      }
-      timings.add(timer.endTimer());
-
-      // Retrieve record from log file
-      timer.startTimer();
-      if (logRecordScanner != null) {
-        Option<HoodieRecord<HoodieMetadataPayload>> logHoodieRecord = 
logRecordScanner.getRecordByKey(key);
-        if (logHoodieRecord.isPresent()) {
-          if (hoodieRecord != null) {
-            // Merge the payloads
-            HoodieRecordPayload mergedPayload = 
logHoodieRecord.get().getData().preCombine(hoodieRecord.getData());
-            hoodieRecord = new HoodieRecord(hoodieRecord.getKey(), 
mergedPayload);
+          // merge base file record w/ log record if present
+          if (logRecords.containsKey(key) && logRecords.get(key).isPresent()) {
+            HoodieRecordPayload mergedPayload = 
logRecords.get(key).get().getData().preCombine(hoodieRecord.getData());
+            result.add(Pair.of(key, Option.of(new 
HoodieRecord(hoodieRecord.getKey(), mergedPayload))));
           } else {
-            hoodieRecord = logHoodieRecord.get();
+            // only base record
+            result.add(Pair.of(key, Option.of(hoodieRecord)));
+          }
+        } else {
+          // only log record (no matching entry in the base file)
+          if (logRecords.containsKey(key) && logRecords.get(key).isPresent()) {
+            result.add(Pair.of(key, logRecords.get(key)));
+          } else { // not found in either the base file or the log files
+            result.add(Pair.of(key, Option.empty()));
           }
         }
       }
       timings.add(timer.endTimer());
-      LOG.info(String.format("Metadata read for key %s took [baseFileRead, 
logMerge] %s ms", key, timings));
-      return Option.ofNullable(hoodieRecord);
-    } catch (IOException ioe) {
-      throw new HoodieIOException("Error merging records from metadata table 
for key :" + key, ioe);
-    } finally {
-      if (!reuse) {
-        close(partitionName);
+    } else {
+      // no base file at all
+      timings.add(timer.endTimer());
+      for (Map.Entry<String, Option<HoodieRecord<HoodieMetadataPayload>>> 
entry : logRecords.entrySet()) {
+        result.add(Pair.of(entry.getKey(), entry.getValue()));
       }
     }
+    return result;
   }
 
   /**
    * Returns a new pair of readers to the base and log files.
    */
-  private Pair<HoodieFileReader, HoodieMetadataMergedLogRecordScanner> 
openReadersIfNeeded(String key, String partitionName) {
+  private Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> 
openReadersIfNeeded(String key, String partitionName) {
     return partitionReaders.computeIfAbsent(partitionName, k -> {
       try {
         final long baseFileOpenMs;
         final long logScannerOpenMs;
         HoodieFileReader baseFileReader = null;
-        HoodieMetadataMergedLogRecordScanner logRecordScanner = null;
+        HoodieMetadataMergedLogRecordReader logRecordScanner = null;
 
         // Metadata is in sync till the latest completed instant on the dataset
         HoodieTimer timer = new HoodieTimer().startTimer();
@@ -192,7 +241,7 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
         baseFileOpenMs = baseFileReaderOpenTimePair.getValue();
 
         // Open the log record scanner using the log files from the latest 
file slice
-        Pair<HoodieMetadataMergedLogRecordScanner, Long> 
logRecordScannerOpenTimePair = getLogRecordScanner(slice);
+        Pair<HoodieMetadataMergedLogRecordReader, Long> 
logRecordScannerOpenTimePair = getLogRecordScanner(slice);
         logRecordScanner = logRecordScannerOpenTimePair.getKey();
         logScannerOpenMs = logRecordScannerOpenTimePair.getValue();
 
@@ -244,7 +293,7 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
     return validInstantTimestamps;
   }
 
-  private Pair<HoodieMetadataMergedLogRecordScanner, Long> 
getLogRecordScanner(FileSlice slice) {
+  private Pair<HoodieMetadataMergedLogRecordReader, Long> 
getLogRecordScanner(FileSlice slice) {
     HoodieTimer timer = new HoodieTimer().startTimer();
     List<String> logFilePaths = slice.getLogFiles()
         .sorted(HoodieLogFile.getLogFileComparator())
@@ -261,7 +310,7 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
     // Load the schema
     Schema schema = 
HoodieAvroUtils.addMetadataFields(HoodieMetadataRecord.getClassSchema());
     HoodieCommonConfig commonConfig = 
HoodieCommonConfig.newBuilder().fromProperties(metadataConfig.getProps()).build();
-    HoodieMetadataMergedLogRecordScanner logRecordScanner = 
HoodieMetadataMergedLogRecordScanner.newBuilder()
+    HoodieMetadataMergedLogRecordReader logRecordScanner = 
HoodieMetadataMergedLogRecordReader.newBuilder()
         .withFileSystem(metadataMetaClient.getFs())
         .withBasePath(metadataBasePath)
         .withLogFilePaths(logFilePaths)
@@ -273,6 +322,7 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
         .withDiskMapType(commonConfig.getSpillableDiskMapType())
         
.withBitCaskDiskMapCompressionEnabled(commonConfig.isBitCaskDiskMapCompressionEnabled())
         .withLogBlockTimestamps(validInstantTimestamps)
+        .enableFullScan(metadataConfig.enableFullScan())
         .build();
 
     Long logScannerOpenMs = timer.endTimer();
@@ -319,7 +369,7 @@ public class HoodieBackedTableMetadata extends 
BaseTableMetadata {
   }
 
   private synchronized void close(String partitionName) {
-    Pair<HoodieFileReader, HoodieMetadataMergedLogRecordScanner> readers = 
partitionReaders.remove(partitionName);
+    Pair<HoodieFileReader, HoodieMetadataMergedLogRecordReader> readers = 
partitionReaders.remove(partitionName);
     if (readers != null) {
       try {
         if (readers.getKey() != null) {
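
The per-key merge in readFromBaseAndMergeWithLogRecords follows one rule: a key present in both the base file and the log files gets the log payload pre-combined on top of the base record; a key present on only one side takes that record; a key present on neither resolves to empty. Below is a minimal, dependency-free sketch of that rule, where Payload stands in for HoodieRecordPayload and the timing/metrics bookkeeping is omitted.

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;

public class BaseLogMergeSketch {

  // Stand-in for HoodieRecordPayload#preCombine: combine this (newer) payload
  // with an older one and return the winner.
  interface Payload {
    Payload preCombine(Payload older);
  }

  static Map<String, Optional<Payload>> merge(List<String> keys,
                                              Map<String, Payload> baseRecords,
                                              Map<String, Payload> logRecords) {
    Map<String, Optional<Payload>> result = new HashMap<>();
    for (String key : keys) {
      Payload base = baseRecords.get(key);
      Payload log = logRecords.get(key);
      if (base != null && log != null) {
        result.put(key, Optional.of(log.preCombine(base)));  // log record merged on top of base
      } else if (base != null) {
        result.put(key, Optional.of(base));                  // only present in the base file
      } else if (log != null) {
        result.put(key, Optional.of(log));                   // only present in the log files
      } else {
        result.put(key, Optional.empty());                   // not found on either side
      }
    }
    return result;
  }
}
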
diff --git 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordScanner.java
 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordReader.java
similarity index 76%
rename from 
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordScanner.java
rename to 
hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordReader.java
index 3132ea6..131ca3b 100644
--- 
a/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordScanner.java
+++ 
b/hudi-common/src/main/java/org/apache/hudi/metadata/HoodieMetadataMergedLogRecordReader.java
@@ -19,12 +19,16 @@
 package org.apache.hudi.metadata;
 
 import java.io.IOException;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Set;
 
 import org.apache.avro.Schema;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
 import org.apache.hudi.common.model.HoodieKey;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
@@ -32,26 +36,30 @@ import 
org.apache.hudi.common.table.log.HoodieMergedLogRecordScanner;
 import org.apache.hudi.common.table.log.InstantRange;
 import org.apache.hudi.common.util.Option;
 import org.apache.hudi.common.util.collection.ExternalSpillableMap;
+import org.apache.hudi.common.util.collection.Pair;
 
 /**
  * A {@code HoodieMergedLogRecordScanner} implementation which only merges records matching the provided keys. This is
  * useful for limiting memory usage when only a small subset of update records is to be read.
  */
-public class HoodieMetadataMergedLogRecordScanner extends 
HoodieMergedLogRecordScanner {
+public class HoodieMetadataMergedLogRecordReader extends 
HoodieMergedLogRecordScanner {
+
+  private static final Logger LOG = 
LogManager.getLogger(HoodieMetadataMergedLogRecordReader.class);
 
   // Set of all record keys that are to be read in memory
   private Set<String> mergeKeyFilter;
 
-  private HoodieMetadataMergedLogRecordScanner(FileSystem fs, String basePath, 
List<String> logFilePaths,
+  private HoodieMetadataMergedLogRecordReader(FileSystem fs, String basePath, 
List<String> logFilePaths,
                                               Schema readerSchema, String 
latestInstantTime, Long maxMemorySizeInBytes, int bufferSize,
                                               String spillableMapBasePath, 
Set<String> mergeKeyFilter,
                                               ExternalSpillableMap.DiskMapType 
diskMapType, boolean isBitCaskDiskMapCompressionEnabled,
-                                              Option<InstantRange> 
instantRange) {
+                                              Option<InstantRange> 
instantRange, boolean enableFullScan) {
     super(fs, basePath, logFilePaths, readerSchema, latestInstantTime, 
maxMemorySizeInBytes, false, false, bufferSize,
-        spillableMapBasePath, instantRange, false, diskMapType, 
isBitCaskDiskMapCompressionEnabled, false);
+        spillableMapBasePath, instantRange, false, diskMapType, 
isBitCaskDiskMapCompressionEnabled, false, enableFullScan);
     this.mergeKeyFilter = mergeKeyFilter;
-
-    performScan();
+    if (enableFullScan) {
+      performScan();
+    }
   }
 
   @Override
@@ -71,8 +79,8 @@ public class HoodieMetadataMergedLogRecordScanner extends 
HoodieMergedLogRecordS
   /**
    * Returns the builder for {@code HoodieMetadataMergedLogRecordReader}.
    */
-  public static HoodieMetadataMergedLogRecordScanner.Builder newBuilder() {
-    return new HoodieMetadataMergedLogRecordScanner.Builder();
+  public static HoodieMetadataMergedLogRecordReader.Builder newBuilder() {
+    return new HoodieMetadataMergedLogRecordReader.Builder();
   }
 
   /**
@@ -81,8 +89,22 @@ public class HoodieMetadataMergedLogRecordScanner extends 
HoodieMergedLogRecordS
    * @param key Key of the record to retrieve
    * @return a singleton list pairing the key with the matching {@code HoodieRecord}, or {@code Option.empty()} if the key was not found
    */
-  public Option<HoodieRecord<HoodieMetadataPayload>> getRecordByKey(String 
key) {
-    return Option.ofNullable((HoodieRecord) records.get(key));
+  public List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> 
getRecordByKey(String key) {
+    return Collections.singletonList(Pair.of(key, 
Option.ofNullable((HoodieRecord) records.get(key))));
+  }
+
+  public List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> 
getRecordsByKeys(List<String> keys) {
+    records.clear();
+    scan(Option.of(keys));
+    List<Pair<String, Option<HoodieRecord<HoodieMetadataPayload>>>> 
metadataRecords = new ArrayList<>();
+    keys.forEach(entry -> {
+      if (records.containsKey(entry)) {
+        metadataRecords.add(Pair.of(entry, Option.ofNullable((HoodieRecord) 
records.get(entry))));
+      } else {
+        metadataRecords.add(Pair.of(entry, Option.empty()));
+      }
+    });
+    return metadataRecords;
   }
 
   /**
@@ -90,6 +112,8 @@ public class HoodieMetadataMergedLogRecordScanner extends 
HoodieMergedLogRecordS
    */
   public static class Builder extends HoodieMergedLogRecordScanner.Builder {
     private Set<String> mergeKeyFilter = Collections.emptySet();
+    private boolean enableFullScan;
+    private boolean enableInlineReading;
 
     @Override
     public Builder withFileSystem(FileSystem fs) {
@@ -171,11 +195,16 @@ public class HoodieMetadataMergedLogRecordScanner extends 
HoodieMergedLogRecordS
       return this;
     }
 
+    public Builder enableFullScan(boolean enableFullScan) {
+      this.enableFullScan = enableFullScan;
+      return this;
+    }
+
     @Override
-    public HoodieMetadataMergedLogRecordScanner build() {
-      return new HoodieMetadataMergedLogRecordScanner(fs, basePath, 
logFilePaths, readerSchema,
+    public HoodieMetadataMergedLogRecordReader build() {
+      return new HoodieMetadataMergedLogRecordReader(fs, basePath, 
logFilePaths, readerSchema,
           latestInstantTime, maxMemorySizeInBytes, bufferSize, 
spillableMapBasePath, mergeKeyFilter,
-          diskMapType, isBitCaskDiskMapCompressionEnabled, instantRange);
+          diskMapType, isBitCaskDiskMapCompressionEnabled, instantRange, 
enableFullScan);
     }
   }
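
The renamed reader supports two read paths: with full scan enabled, all log records are materialized once at construction and lookups are answered from memory; with full scan disabled, getRecordsByKeys re-scans only for the requested keys (the seek-based batch get). A rough sketch of that control flow follows, with a scan callback standing in for the actual log-block scanner; none of the names below are Hudi APIs.

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.function.BiConsumer;

public class SeekVsFullScanSketch {

  private final Map<String, String> records = new HashMap<>();
  private final boolean enableFullScan;
  // Stand-in for scanning the log blocks: fills the target map with records,
  // optionally restricted to the given keys.
  private final BiConsumer<Optional<List<String>>, Map<String, String>> scan;

  SeekVsFullScanSketch(boolean enableFullScan,
                       BiConsumer<Optional<List<String>>, Map<String, String>> scan) {
    this.enableFullScan = enableFullScan;
    this.scan = scan;
    if (enableFullScan) {
      scan.accept(Optional.empty(), records);       // eager: read all log blocks once
    }
  }

  List<Optional<String>> getRecordsByKeys(List<String> keys) {
    if (!enableFullScan) {
      records.clear();
      scan.accept(Optional.of(keys), records);      // lazy: seek only the requested keys
    }
    List<Optional<String>> result = new ArrayList<>();
    keys.forEach(key -> result.add(Optional.ofNullable(records.get(key))));
    return result;
  }
}
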
 
diff --git 
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
 
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
index a647da9..1771db0 100644
--- 
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
+++ 
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/realtime/TestHoodieRealtimeRecordReader.java
@@ -27,6 +27,7 @@ import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableConfig;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
 import org.apache.hudi.common.table.log.HoodieLogFormat.Writer;
+import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
 import org.apache.hudi.common.testutils.FileCreateUtils;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
@@ -137,6 +138,24 @@ public class TestHoodieRealtimeRecordReader {
   public void testReader(ExternalSpillableMap.DiskMapType diskMapType,
                          boolean isCompressionEnabled,
                          boolean partitioned) throws Exception {
+    testReaderInternal(diskMapType, isCompressionEnabled, partitioned);
+  }
+
+  @Test
+  public void testHFileInlineReader() throws Exception {
+    testReaderInternal(ExternalSpillableMap.DiskMapType.BITCASK, false, false,
+        HoodieLogBlock.HoodieLogBlockType.HFILE_DATA_BLOCK);
+  }
+
+  private void testReaderInternal(ExternalSpillableMap.DiskMapType diskMapType,
+                                  boolean isCompressionEnabled,
+                                  boolean partitioned) throws Exception {
+    testReaderInternal(diskMapType, isCompressionEnabled, partitioned, 
HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK);
+  }
+
+  private void testReaderInternal(ExternalSpillableMap.DiskMapType diskMapType,
+                         boolean isCompressionEnabled,
+                         boolean partitioned, 
HoodieLogBlock.HoodieLogBlockType logBlockType) throws Exception {
     // initial commit
     Schema schema = 
HoodieAvroUtils.addMetadataFields(SchemaTestUtil.getEvolvedSchema());
     HoodieTestUtils.init(hadoopConf, basePath.toString(), 
HoodieTableType.MERGE_ON_READ);
@@ -175,7 +194,7 @@ public class TestHoodieRealtimeRecordReader {
         } else {
           writer =
               InputFormatTestUtil.writeDataBlockToLogFile(partitionDir, fs, 
schema, "fileid0", baseInstant,
-                  instantTime, 120, 0, logVersion);
+                  instantTime, 120, 0, logVersion, logBlockType);
         }
         long size = writer.getCurrentSize();
         writer.close();
diff --git 
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
 
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
index d10ccfc..13d9219 100644
--- 
a/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
+++ 
b/hudi-hadoop-mr/src/test/java/org/apache/hudi/hadoop/testutils/InputFormatTestUtil.java
@@ -27,6 +27,8 @@ import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.log.HoodieLogFormat;
 import org.apache.hudi.common.table.log.block.HoodieAvroDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieCommandBlock;
+import org.apache.hudi.common.table.log.block.HoodieDataBlock;
+import org.apache.hudi.common.table.log.block.HoodieHFileDataBlock;
 import org.apache.hudi.common.table.log.block.HoodieLogBlock;
 import org.apache.hudi.common.testutils.HoodieTestUtils;
 import org.apache.hudi.common.testutils.SchemaTestUtil;
@@ -301,7 +303,14 @@ public class InputFormatTestUtil {
 
   public static HoodieLogFormat.Writer writeDataBlockToLogFile(File 
partitionDir, FileSystem fs, Schema schema, String
       fileId,
-                                                               String 
baseCommit, String newCommit, int numberOfRecords, int offset, int logVersion)
+                                                               String 
baseCommit, String newCommit, int numberOfRecords, int offset, int logVersion) 
throws IOException, InterruptedException {
+    return writeDataBlockToLogFile(partitionDir, fs, schema, fileId, 
baseCommit, newCommit, numberOfRecords, offset, logVersion, 
HoodieLogBlock.HoodieLogBlockType.AVRO_DATA_BLOCK);
+  }
+
+  public static HoodieLogFormat.Writer writeDataBlockToLogFile(File 
partitionDir, FileSystem fs, Schema schema, String
+      fileId,
+                                                               String 
baseCommit, String newCommit, int numberOfRecords, int offset, int logVersion,
+                                                               
HoodieLogBlock.HoodieLogBlockType logBlockType)
       throws InterruptedException, IOException {
     HoodieLogFormat.Writer writer = 
HoodieLogFormat.newWriterBuilder().onParentPath(new 
Path(partitionDir.getPath()))
         
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId(fileId).withLogVersion(logVersion)
@@ -314,7 +323,8 @@ public class InputFormatTestUtil {
     Map<HoodieLogBlock.HeaderMetadataType, String> header = new HashMap<>();
     header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, newCommit);
     header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, 
writeSchema.toString());
-    HoodieAvroDataBlock dataBlock = new HoodieAvroDataBlock(records, header);
+    HoodieDataBlock dataBlock = (logBlockType == 
HoodieLogBlock.HoodieLogBlockType.HFILE_DATA_BLOCK) ? new 
HoodieHFileDataBlock(records, header) :
+        new HoodieAvroDataBlock(records, header);
     writer.appendBlock(dataBlock);
     return writer;
   }
