vinothchandar commented on a change in pull request #4333:
URL: https://github.com/apache/hudi/pull/4333#discussion_r774314975



##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
##########
@@ -88,76 +92,24 @@ public HoodieLogFileReader(FileSystem fs, HoodieLogFile 
logFile, Schema readerSc
   }
 
   public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema 
readerSchema, int bufferSize,
-                             boolean readBlockLazily, boolean reverseReader, 
boolean enableInlineReading,
+                             boolean readBlockLazily, boolean reverseReader, 
boolean enableRecordLookups,
                              String keyField) throws IOException {
-    FSDataInputStream fsDataInputStream = fs.open(logFile.getPath(), 
bufferSize);
-    this.logFile = logFile;
-    this.inputStream = getFSDataInputStream(fsDataInputStream, fs, bufferSize);
+    // NOTE: We repackage {@code HoodieLogFile} here to make sure that the 
provided path
+    //       is prefixed with an appropriate scheme given that we're not 
propagating the FS
+    //       further
+    this.logFile = new HoodieLogFile(fs.getFileStatus(logFile.getPath()));

Review comment:
       let'sgetrid of the extra fs. getFileStatus call. Its an extra Rpc to 
S3/HDFS 

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
##########
@@ -481,4 +442,59 @@ public long moveToPrev() throws IOException {
   public void remove() {
     throw new UnsupportedOperationException("Remove not supported for 
HoodieLogFileReader");
   }
+
+  /**
+   * Fetch the right {@link FSDataInputStream} to be used by wrapping with 
required input streams.
+   * @param fs instance of {@link FileSystem} in use.
+   * @param bufferSize buffer size to be used.
+   * @return the right {@link FSDataInputStream} as required.
+   */
+  private static FSDataInputStream getFSDataInputStream(FileSystem fs,

Review comment:
       has any code Changed here?
   
    Please refrain from relocating code in the same PR that also 
changes/rewrites actual logic. If you must, then annotate the PR with comments 
letting the reviewer know what's going on .

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java
##########
@@ -288,28 +288,14 @@ public synchronized void close() {
     }
   }
 
-  static class SeekableByteArrayInputStream extends ByteArrayInputStream 
implements Seekable, PositionedReadable {
+  static class SeekableByteArrayInputStream extends 
ByteBufferBackedInputStream implements Seekable, PositionedReadable {

Review comment:
       something that changes the reading for HFile in general, should go on 
its own PR?

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
##########
@@ -72,7 +76,7 @@
   private long reverseLogFilePosition;
   private long lastReverseLogFilePosition;
   private boolean reverseReader;
-  private boolean enableInlineReading;
+  private boolean enableRecordLookups;

Review comment:
       Wondering if there are actually two separate concepts here.
   
   inline reading - Where are use a parquet reader or hfile reader to read 
block content, iinstead of reading it out as a byte[] 
   
   record lookups - Whether we scan and filter the keys of interest or push the 
lookups to the reader/fileformat. 
   
   I feel the first one should be what the log reader should care about. We 
should push the record lookups into the file reader?

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
##########
@@ -88,76 +92,24 @@ public HoodieLogFileReader(FileSystem fs, HoodieLogFile 
logFile, Schema readerSc
   }
 
   public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema 
readerSchema, int bufferSize,
-                             boolean readBlockLazily, boolean reverseReader, 
boolean enableInlineReading,
+                             boolean readBlockLazily, boolean reverseReader, 
boolean enableRecordLookups,
                              String keyField) throws IOException {
-    FSDataInputStream fsDataInputStream = fs.open(logFile.getPath(), 
bufferSize);
-    this.logFile = logFile;
-    this.inputStream = getFSDataInputStream(fsDataInputStream, fs, bufferSize);
+    // NOTE: We repackage {@code HoodieLogFile} here to make sure that the 
provided path
+    //       is prefixed with an appropriate scheme given that we're not 
propagating the FS
+    //       further
+    this.logFile = new HoodieLogFile(fs.getFileStatus(logFile.getPath()));

Review comment:
       In general,  there should be no direct list or status fetches here. We 
removed all of them with O.9.0

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
##########
@@ -208,71 +155,85 @@ private HoodieLogBlock readBlock() throws IOException {
     HoodieLogFormat.LogFormatVersion nextBlockVersion = readVersion();
 
     // 3. Read the block type for a log block
-    if (nextBlockVersion.getVersion() != 
HoodieLogFormatVersion.DEFAULT_VERSION) {
-      type = inputStream.readInt();
-
-      ValidationUtils.checkArgument(type < HoodieLogBlockType.values().length, 
"Invalid block byte type found " + type);
-      blockType = HoodieLogBlockType.values()[type];
-    }
+    HoodieLogBlockType blockType = tryReadBlockType(nextBlockVersion);
 
     // 4. Read the header for a log block, if present
-    if (nextBlockVersion.hasHeader()) {
-      header = HoodieLogBlock.getLogMetadata(inputStream);
-    }
 
-    int contentLength = blocksize;
+    Map<HeaderMetadataType, String> header =
+        nextBlockVersion.hasHeader() ? 
HoodieLogBlock.getLogMetadata(inputStream) : null;
+
     // 5. Read the content length for the content
-    if (nextBlockVersion.getVersion() != 
HoodieLogFormatVersion.DEFAULT_VERSION) {
-      contentLength = (int) inputStream.readLong();
-    }
+    // Fallback to full-block size if no content-length
+    // TODO replace w/ hasContentLength
+    int contentLength =
+        nextBlockVersion.getVersion() != 
HoodieLogFormatVersion.DEFAULT_VERSION ? (int) inputStream.readLong() : 
blockSize;
 
     // 6. Read the content or skip content based on IO vs Memory trade-off by 
client
-    // TODO - have a max block size and reuse this buffer in the ByteBuffer
-    // (hard to guess max block size for now)
     long contentPosition = inputStream.getPos();
-    byte[] content = HoodieLogBlock.readOrSkipContent(inputStream, 
contentLength, readBlockLazily);
+    boolean shouldReadLazily = readBlockLazily && 
nextBlockVersion.getVersion() != HoodieLogFormatVersion.DEFAULT_VERSION;
+    Option<byte[]> content = HoodieLogBlock.tryReadContent(inputStream, 
contentLength, shouldReadLazily);
 
     // 7. Read footer if any
-    Map<HeaderMetadataType, String> footer = null;
-    if (nextBlockVersion.hasFooter()) {
-      footer = HoodieLogBlock.getLogMetadata(inputStream);
-    }
+    Map<HeaderMetadataType, String> footer =
+        nextBlockVersion.hasFooter() ? 
HoodieLogBlock.getLogMetadata(inputStream) : null;
 
     // 8. Read log block length, if present. This acts as a reverse pointer 
when traversing a
     // log file in reverse
-    @SuppressWarnings("unused")
-    long logBlockLength = 0;
     if (nextBlockVersion.hasLogBlockLength()) {
-      logBlockLength = inputStream.readLong();
+      inputStream.readLong();
     }
 
     // 9. Read the log block end position in the log file
     long blockEndPos = inputStream.getPos();
 
     switch (Objects.requireNonNull(blockType)) {
-      // based on type read the block
       case AVRO_DATA_BLOCK:
         if (nextBlockVersion.getVersion() == 
HoodieLogFormatVersion.DEFAULT_VERSION) {
-          return HoodieAvroDataBlock.getBlock(content, readerSchema);
+          return HoodieAvroDataBlock.getBlock(content.get(), readerSchema);
         } else {
-          return new HoodieAvroDataBlock(logFile, inputStream, 
Option.ofNullable(content), readBlockLazily,
-              contentPosition, contentLength, blockEndPos, readerSchema, 
header, footer, keyField);
+          return new HoodieAvroDataBlock(logFile, inputStream, content, 
readBlockLazily,
+              contentPosition, contentLength, blockEndPos, 
Option.ofNullable(readerSchema), header, footer, keyField);
         }
+
       case HFILE_DATA_BLOCK:
-        return new HoodieHFileDataBlock(logFile, inputStream, 
Option.ofNullable(content), readBlockLazily,
-            contentPosition, contentLength, blockEndPos, readerSchema,
-            header, footer, enableInlineReading, keyField);
+        checkState(nextBlockVersion.getVersion() != 
HoodieLogFormatVersion.DEFAULT_VERSION,
+            String.format("HFile block could not be of version (%d)", 
HoodieLogFormatVersion.DEFAULT_VERSION));
+
+        return new HoodieHFileDataBlock(logFile, inputStream, content, 
readBlockLazily,
+            contentPosition, contentLength, blockEndPos, 
Option.ofNullable(readerSchema),
+            header, footer, keyField, enableRecordLookups);
+
+      case PARQUET_DATA_BLOCK:
+        checkState(nextBlockVersion.getVersion() != 
HoodieLogFormatVersion.DEFAULT_VERSION,
+            String.format("Parquet block could not be of version (%d)", 
HoodieLogFormatVersion.DEFAULT_VERSION));
+
+        return new HoodieParquetDataBlock(logFile, inputStream, content, 
readBlockLazily,
+            contentPosition, contentLength, blockEndPos, 
Option.ofNullable(readerSchema), header, footer, keyField);
+
       case DELETE_BLOCK:
-        return HoodieDeleteBlock.getBlock(logFile, inputStream, 
Option.ofNullable(content), readBlockLazily,
+        return HoodieDeleteBlock.getBlock(logFile, inputStream, content, 
readBlockLazily,
             contentPosition, contentLength, blockEndPos, header, footer);
+
       case COMMAND_BLOCK:
-        return HoodieCommandBlock.getBlock(logFile, inputStream, 
Option.ofNullable(content), readBlockLazily,
+        return HoodieCommandBlock.getBlock(logFile, inputStream, content, 
readBlockLazily,
             contentPosition, contentLength, blockEndPos, header, footer);
+
       default:
         throw new HoodieNotSupportedException("Unsupported Block " + 
blockType);
     }
   }
 
+  @Nullable
+  private HoodieLogBlockType tryReadBlockType(HoodieLogFormat.LogFormatVersion 
blockVersion) throws IOException {
+    if (blockVersion.getVersion() == HoodieLogFormatVersion.DEFAULT_VERSION) {
+      return null;

Review comment:
       The nulls as sentinel here makes one nervous. Can we use Option?

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
##########
@@ -208,71 +155,85 @@ private HoodieLogBlock readBlock() throws IOException {
     HoodieLogFormat.LogFormatVersion nextBlockVersion = readVersion();
 
     // 3. Read the block type for a log block
-    if (nextBlockVersion.getVersion() != 
HoodieLogFormatVersion.DEFAULT_VERSION) {
-      type = inputStream.readInt();
-
-      ValidationUtils.checkArgument(type < HoodieLogBlockType.values().length, 
"Invalid block byte type found " + type);
-      blockType = HoodieLogBlockType.values()[type];
-    }
+    HoodieLogBlockType blockType = tryReadBlockType(nextBlockVersion);
 
     // 4. Read the header for a log block, if present
-    if (nextBlockVersion.hasHeader()) {
-      header = HoodieLogBlock.getLogMetadata(inputStream);
-    }
 
-    int contentLength = blocksize;
+    Map<HeaderMetadataType, String> header =
+        nextBlockVersion.hasHeader() ? 
HoodieLogBlock.getLogMetadata(inputStream) : null;
+
     // 5. Read the content length for the content
-    if (nextBlockVersion.getVersion() != 
HoodieLogFormatVersion.DEFAULT_VERSION) {
-      contentLength = (int) inputStream.readLong();
-    }
+    // Fallback to full-block size if no content-length
+    // TODO replace w/ hasContentLength
+    int contentLength =
+        nextBlockVersion.getVersion() != 
HoodieLogFormatVersion.DEFAULT_VERSION ? (int) inputStream.readLong() : 
blockSize;
 
     // 6. Read the content or skip content based on IO vs Memory trade-off by 
client
-    // TODO - have a max block size and reuse this buffer in the ByteBuffer
-    // (hard to guess max block size for now)
     long contentPosition = inputStream.getPos();
-    byte[] content = HoodieLogBlock.readOrSkipContent(inputStream, 
contentLength, readBlockLazily);
+    boolean shouldReadLazily = readBlockLazily && 
nextBlockVersion.getVersion() != HoodieLogFormatVersion.DEFAULT_VERSION;
+    Option<byte[]> content = HoodieLogBlock.tryReadContent(inputStream, 
contentLength, shouldReadLazily);
 
     // 7. Read footer if any
-    Map<HeaderMetadataType, String> footer = null;
-    if (nextBlockVersion.hasFooter()) {
-      footer = HoodieLogBlock.getLogMetadata(inputStream);
-    }
+    Map<HeaderMetadataType, String> footer =

Review comment:
       Can we work with empty map instead of null? 

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
##########
@@ -109,7 +116,25 @@ public long getLogBlockLength() {
    * Type of the log block WARNING: This enum is serialized as the ordinal. 
Only add new enums at the end.
    */
   public enum HoodieLogBlockType {
-    COMMAND_BLOCK, DELETE_BLOCK, CORRUPT_BLOCK, AVRO_DATA_BLOCK, 
HFILE_DATA_BLOCK
+    COMMAND_BLOCK(":command"),
+    DELETE_BLOCK(":delete"),
+    CORRUPT_BLOCK(":corrupted"),

Review comment:
       rename: "corrupt" its  a valid adjective. Match the enum name ?

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
##########
@@ -58,24 +57,37 @@
 import java.util.TreeMap;
 import java.util.stream.Collectors;
 
+import static org.apache.hudi.common.util.ValidationUtils.checkState;
+
 /**
  * HoodieHFileDataBlock contains a list of records stored inside an HFile 
format. It is used with the HFile
  * base file format.
  */
 public class HoodieHFileDataBlock extends HoodieDataBlock {
   private static final Logger LOG = 
LogManager.getLogger(HoodieHFileDataBlock.class);
-  private static Compression.Algorithm compressionAlgorithm = 
Compression.Algorithm.GZ;
-  private static int blockSize = 1 * 1024 * 1024;
-  private boolean enableInlineReading = false;
-
-  public HoodieHFileDataBlock(HoodieLogFile logFile, FSDataInputStream 
inputStream, Option<byte[]> content,
-                              boolean readBlockLazily, long position, long 
blockSize, long blockEndpos,
-                              Schema readerSchema, Map<HeaderMetadataType, 
String> header,
-                              Map<HeaderMetadataType, String> footer, boolean 
enableInlineReading, String keyField) {
-    super(content, inputStream, readBlockLazily,
-        Option.of(new HoodieLogBlockContentLocation(logFile, position, 
blockSize, blockEndpos)),
-        readerSchema, header, footer, keyField);
-    this.enableInlineReading = enableInlineReading;
+
+  private static final int DEFAULT_BLOCK_SIZE = 1024 * 1024;
+  private static final Compression.Algorithm DEFAULT_COMPRESSION_ALGO = 
Compression.Algorithm.GZ;
+
+  public HoodieHFileDataBlock(HoodieLogFile logFile,

Review comment:
       let's revert back to the wrapped style. 

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java
##########
@@ -88,76 +92,24 @@ public HoodieLogFileReader(FileSystem fs, HoodieLogFile 
logFile, Schema readerSc
   }
 
   public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema 
readerSchema, int bufferSize,
-                             boolean readBlockLazily, boolean reverseReader, 
boolean enableInlineReading,
+                             boolean readBlockLazily, boolean reverseReader, 
boolean enableRecordLookups,
                              String keyField) throws IOException {
-    FSDataInputStream fsDataInputStream = fs.open(logFile.getPath(), 
bufferSize);
-    this.logFile = logFile;
-    this.inputStream = getFSDataInputStream(fsDataInputStream, fs, bufferSize);
+    // NOTE: We repackage {@code HoodieLogFile} here to make sure that the 
provided path
+    //       is prefixed with an appropriate scheme given that we're not 
propagating the FS
+    //       further
+    this.logFile = new HoodieLogFile(fs.getFileStatus(logFile.getPath()));
+    this.inputStream = getFSDataInputStream(fs, this.logFile, bufferSize);
     this.readerSchema = readerSchema;
     this.readBlockLazily = readBlockLazily;
     this.reverseReader = reverseReader;
-    this.enableInlineReading = enableInlineReading;
+    this.enableRecordLookups = enableRecordLookups;
     this.keyField = keyField;
     if (this.reverseReader) {
-      this.reverseLogFilePosition = this.lastReverseLogFilePosition = 
logFile.getFileSize();
+      this.reverseLogFilePosition = this.lastReverseLogFilePosition = 
this.logFile.getFileSize();
     }
     addShutDownHook();
   }
 
-  public HoodieLogFileReader(FileSystem fs, HoodieLogFile logFile, Schema 
readerSchema) throws IOException {
-    this(fs, logFile, readerSchema, DEFAULT_BUFFER_SIZE, false, false);
-  }
-
-  /**
-   * Fetch the right {@link FSDataInputStream} to be used by wrapping with 
required input streams.
-   * @param fsDataInputStream original instance of {@link FSDataInputStream}.
-   * @param fs instance of {@link FileSystem} in use.
-   * @param bufferSize buffer size to be used.
-   * @return the right {@link FSDataInputStream} as required.
-   */
-  private FSDataInputStream getFSDataInputStream(FSDataInputStream 
fsDataInputStream, FileSystem fs, int bufferSize) {

Review comment:
       Where is all this moved to? (note to self) 

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
##########
@@ -148,65 +165,40 @@ public HoodieLogBlockType getBlockType() {
   }
 
   @Override
-  protected void createRecordsFromContentBytes() throws IOException {
-    if (enableInlineReading) {
-      getRecords(Collections.emptyList());
-    } else {
-      super.createRecordsFromContentBytes();
-    }
-  }
-
-  @Override
-  public List<IndexedRecord> getRecords(List<String> keys) throws IOException {
-    readWithInlineFS(keys);
-    return records;
-  }
-
-  private void readWithInlineFS(List<String> keys) throws IOException {
-    boolean enableFullScan = keys.isEmpty();
-    // Get schema from the header
-    Schema writerSchema = new 
Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
-    // If readerSchema was not present, use writerSchema
-    if (schema == null) {
-      schema = writerSchema;
-    }
-    Configuration conf = new Configuration();
-    CacheConfig cacheConf = new CacheConfig(conf);
+  protected List<IndexedRecord> lookupRecords(List<String> keys) throws 
IOException {
     Configuration inlineConf = new Configuration();
     inlineConf.set("fs." + InLineFileSystem.SCHEME + ".impl", 
InLineFileSystem.class.getName());
 
+    HoodieLogBlockContentLocation blockContentLoc = 
getBlockContentLocation().get();
+
     Path inlinePath = InLineFSUtils.getInlineFilePath(
-        getBlockContentLocation().get().getLogFile().getPath(),
-        
getBlockContentLocation().get().getLogFile().getPath().getFileSystem(conf).getScheme(),
-        getBlockContentLocation().get().getContentPositionInLogFile(),
-        getBlockContentLocation().get().getBlockSize());
-    if (!enableFullScan) {
-      // HFile read will be efficient if keys are sorted, since on storage, 
records are sorted by key. This will avoid unnecessary seeks.
-      Collections.sort(keys);
+        blockContentLoc.getLogFile().getPath(),
+        
blockContentLoc.getLogFile().getPath().getFileSystem(inlineConf).getScheme(),
+        blockContentLoc.getContentPositionInLogFile(),
+        blockContentLoc.getBlockSize());
+
+    // HFile read will be efficient if keys are sorted, since on storage, 
records are sorted by key. This will avoid unnecessary seeks.
+    Collections.sort(keys);
+
+    try (HoodieHFileReader<IndexedRecord> reader =
+             new HoodieHFileReader<>(inlineConf, inlinePath, new 
CacheConfig(inlineConf), inlinePath.getFileSystem(inlineConf))) {
+      // Get writer's schema from the header
+      List<Pair<String, IndexedRecord>> logRecords = reader.readRecords(keys, 
readerSchema);
+      return 
logRecords.stream().map(Pair::getSecond).collect(Collectors.toList());
     }
-    HoodieHFileReader reader = new HoodieHFileReader(inlineConf, inlinePath, 
cacheConf, inlinePath.getFileSystem(inlineConf));
-    List<org.apache.hadoop.hbase.util.Pair<String, IndexedRecord>> logRecords 
= enableFullScan ? reader.readAllRecords(writerSchema, schema) :

Review comment:
       We should just be using our own Pair.

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
##########
@@ -109,7 +116,25 @@ public long getLogBlockLength() {
    * Type of the log block WARNING: This enum is serialized as the ordinal. 
Only add new enums at the end.
    */
   public enum HoodieLogBlockType {
-    COMMAND_BLOCK, DELETE_BLOCK, CORRUPT_BLOCK, AVRO_DATA_BLOCK, 
HFILE_DATA_BLOCK
+    COMMAND_BLOCK(":command"),
+    DELETE_BLOCK(":delete"),
+    CORRUPT_BLOCK(":corrupted"),

Review comment:
       rename: "corrupt" its  a valid adjective. Match the enum name ?

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
##########
@@ -148,65 +165,40 @@ public HoodieLogBlockType getBlockType() {
   }
 
   @Override
-  protected void createRecordsFromContentBytes() throws IOException {
-    if (enableInlineReading) {
-      getRecords(Collections.emptyList());
-    } else {
-      super.createRecordsFromContentBytes();
-    }
-  }
-
-  @Override
-  public List<IndexedRecord> getRecords(List<String> keys) throws IOException {
-    readWithInlineFS(keys);
-    return records;
-  }
-
-  private void readWithInlineFS(List<String> keys) throws IOException {
-    boolean enableFullScan = keys.isEmpty();
-    // Get schema from the header
-    Schema writerSchema = new 
Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
-    // If readerSchema was not present, use writerSchema
-    if (schema == null) {
-      schema = writerSchema;
-    }
-    Configuration conf = new Configuration();
-    CacheConfig cacheConf = new CacheConfig(conf);
+  protected List<IndexedRecord> lookupRecords(List<String> keys) throws 
IOException {
     Configuration inlineConf = new Configuration();
     inlineConf.set("fs." + InLineFileSystem.SCHEME + ".impl", 
InLineFileSystem.class.getName());
 
+    HoodieLogBlockContentLocation blockContentLoc = 
getBlockContentLocation().get();
+
     Path inlinePath = InLineFSUtils.getInlineFilePath(
-        getBlockContentLocation().get().getLogFile().getPath(),
-        
getBlockContentLocation().get().getLogFile().getPath().getFileSystem(conf).getScheme(),
-        getBlockContentLocation().get().getContentPositionInLogFile(),
-        getBlockContentLocation().get().getBlockSize());
-    if (!enableFullScan) {
-      // HFile read will be efficient if keys are sorted, since on storage, 
records are sorted by key. This will avoid unnecessary seeks.
-      Collections.sort(keys);
+        blockContentLoc.getLogFile().getPath(),
+        
blockContentLoc.getLogFile().getPath().getFileSystem(inlineConf).getScheme(),
+        blockContentLoc.getContentPositionInLogFile(),
+        blockContentLoc.getBlockSize());
+
+    // HFile read will be efficient if keys are sorted, since on storage, 
records are sorted by key. This will avoid unnecessary seeks.
+    Collections.sort(keys);
+
+    try (HoodieHFileReader<IndexedRecord> reader =
+             new HoodieHFileReader<>(inlineConf, inlinePath, new 
CacheConfig(inlineConf), inlinePath.getFileSystem(inlineConf))) {
+      // Get writer's schema from the header
+      List<Pair<String, IndexedRecord>> logRecords = reader.readRecords(keys, 
readerSchema);
+      return 
logRecords.stream().map(Pair::getSecond).collect(Collectors.toList());
     }
-    HoodieHFileReader reader = new HoodieHFileReader(inlineConf, inlinePath, 
cacheConf, inlinePath.getFileSystem(inlineConf));
-    List<org.apache.hadoop.hbase.util.Pair<String, IndexedRecord>> logRecords 
= enableFullScan ? reader.readAllRecords(writerSchema, schema) :
-        reader.readRecords(keys, schema);
-    reader.close();
-    this.records = logRecords.stream().map(t -> 
t.getSecond()).collect(Collectors.toList());
   }
 
   @Override
-  protected void deserializeRecords() throws IOException {
+  protected List<IndexedRecord> deserializeRecords(byte[] content) throws 
IOException {
+    checkState(readerSchema != null, "Reader's schema has to be non-null");

Review comment:
       Is this because of change in the PR? the original code seems to expect 
null reader schema at times?

##########
File path: 
hudi-common/src/test/java/org/apache/hudi/common/testutils/HadoopMapRedUtils.java
##########
@@ -0,0 +1,87 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.testutils;
+
+import org.apache.hadoop.mapred.Counters;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hudi.common.util.Option;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+public class HadoopMapRedUtils {

Review comment:
       Could we rewrite the tests in a different way, than relying on the 
mapred classes. generally, they are a world of pain as we test across hadoop 
versions 

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.log.block;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.HoodieAvroWriteSupport;
+import org.apache.hudi.common.fs.inline.InLineFSUtils;
+import org.apache.hudi.common.fs.inline.InLineFileSystem;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ParquetReaderIterator;
+import org.apache.hudi.io.storage.HoodieAvroParquetConfig;
+import org.apache.hudi.io.storage.HoodieParquetStreamWriter;
+import org.apache.parquet.avro.AvroParquetReader;
+import org.apache.parquet.avro.AvroReadSupport;
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+
+import javax.annotation.Nonnull;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * HoodieParquetDataBlock contains a list of records serialized using Parquet.
+ */
+public class HoodieParquetDataBlock extends HoodieDataBlock {
+
+  public HoodieParquetDataBlock(HoodieLogFile logFile,
+                                FSDataInputStream inputStream,
+                                Option<byte[]> content,
+                                boolean readBlockLazily,
+                                long position, long blockSize, long 
blockEndPos,
+                                Option<Schema> readerSchema,
+                                Map<HeaderMetadataType, String> header,
+                                Map<HeaderMetadataType, String> footer,
+                                String keyField) {
+    super(content,
+        inputStream,
+        readBlockLazily,
+        Option.of(new HoodieLogBlockContentLocation(logFile, position, 
blockSize, blockEndPos)),
+        readerSchema,
+        header,
+        footer,
+        keyField,
+        false);
+  }
+
+  public HoodieParquetDataBlock(
+      @Nonnull List<IndexedRecord> records,
+      @Nonnull Map<HeaderMetadataType, String> header,
+      @Nonnull String keyField
+  ) {
+    super(records, header, new HashMap<>(), keyField);
+  }
+
+  public HoodieParquetDataBlock(
+      @Nonnull List<IndexedRecord> records,
+      @Nonnull Map<HeaderMetadataType, String> header
+  ) {
+    super(records, header, new HashMap<>(), 
HoodieRecord.RECORD_KEY_METADATA_FIELD);
+  }
+
+  @Override
+  public HoodieLogBlockType getBlockType() {
+    return HoodieLogBlockType.PARQUET_DATA_BLOCK;
+  }
+
+  @Override
+  protected byte[] serializeRecords(List<IndexedRecord> records) throws 
IOException {
+    if (records.size() == 0) {
+      return new byte[0];
+    }
+
+    Schema writerSchema = new 
Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
+
+    HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(
+        new AvroSchemaConverter().convert(writerSchema), writerSchema, 
Option.empty());
+
+    HoodieAvroParquetConfig avroParquetConfig =
+        new HoodieAvroParquetConfig(
+            writeSupport,
+            // TODO fetch compression codec from the config
+            CompressionCodecName.GZIP,
+            ParquetWriter.DEFAULT_BLOCK_SIZE,
+            ParquetWriter.DEFAULT_PAGE_SIZE,
+            1024 * 1024 * 1024,
+            new Configuration(),
+            
Double.parseDouble(String.valueOf(0.1)));//HoodieStorageConfig.PARQUET_COMPRESSION_RATIO.defaultValue()));
+
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+    try (FSDataOutputStream outputStream = new FSDataOutputStream(baos)) {
+      try (HoodieParquetStreamWriter<IndexedRecord> parquetWriter = new 
HoodieParquetStreamWriter<>(outputStream, avroParquetConfig)) {
+        for (IndexedRecord record : records) {
+          String recordKey = getRecordKey(record);
+          parquetWriter.writeAvro(recordKey, record);
+        }
+        outputStream.flush();
+      }
+    }
+
+    return baos.toByteArray();
+  }
+
+  public static Iterator<IndexedRecord> 
getProjectedParquetRecordsIterator(Configuration conf,
+                                                                           
Schema readerSchema,
+                                                                           
InputFile inputFile) throws IOException {
+    AvroReadSupport.setAvroReadSchema(conf, readerSchema);
+    AvroReadSupport.setRequestedProjection(conf, readerSchema);
+    ParquetReader<IndexedRecord> reader =
+        
AvroParquetReader.<IndexedRecord>builder(inputFile).withConf(conf).build();
+    return new ParquetReaderIterator<>(reader);
+  }
+
+  /**
+   * NOTE: We're overriding the whole reading sequence to make sure we 
properly respect
+   *       the requested Reader's schema and only fetch the columns that have 
been explicitly
+   *       requested by the caller (providing projected Reader's schema)
+   */
+  @Override
+  protected List<IndexedRecord> readRecordsFromBlockPayload() throws 
IOException {
+    Configuration inlineConf = new Configuration();
+    inlineConf.set("fs." + InLineFileSystem.SCHEME + ".impl", 
InLineFileSystem.class.getName());
+
+    HoodieLogBlockContentLocation blockContentLoc = 
getBlockContentLocation().get();
+
+    Path inlineLogFilePath = InLineFSUtils.getInlineFilePath(
+        blockContentLoc.getLogFile().getPath(),
+        
blockContentLoc.getLogFile().getPath().getFileSystem(inlineConf).getScheme(),
+        blockContentLoc.getContentPositionInLogFile(),
+        blockContentLoc.getBlockSize());
+
+    ArrayList<IndexedRecord> records = new ArrayList<>();
+
+    getProjectedParquetRecordsIterator(
+        inlineConf,

Review comment:
       If someone manually sets `spark.sparkContext.hadoopConfiguration.set(x, 
y)` say configuring cloud access creds, this is going to be an issue, since 
`new Configuration()` won't pick that up

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/util/io/ByteBufferBackedInputStream.java
##########
@@ -0,0 +1,138 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.util.io;
+
+import javax.annotation.Nonnull;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * Instance of {@link InputStream} backed by {@link ByteBuffer}, implementing 
following
+ * functionality (on top of what's required by {@link InputStream})
+ *
+ * <ol>
+ *   <li>Seeking: enables random access by allowing to seek to an arbitrary 
position w/in the stream</li>
+ *   <li>(Thread-safe) Copying: enables to copy from the underlying buffer not 
modifying the state of the stream</li>
+ * </ol>
+ *
+ * NOTE: Generally methods of this class are NOT thread-safe, unless specified 
otherwise
+ */
+public class ByteBufferBackedInputStream extends InputStream {

Review comment:
       can we UT this class?

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
##########
@@ -109,7 +116,25 @@ public long getLogBlockLength() {
    * Type of the log block WARNING: This enum is serialized as the ordinal. 
Only add new enums at the end.
    */
   public enum HoodieLogBlockType {
-    COMMAND_BLOCK, DELETE_BLOCK, CORRUPT_BLOCK, AVRO_DATA_BLOCK, 
HFILE_DATA_BLOCK
+    COMMAND_BLOCK(":command"),

Review comment:
       why the :, Why do we need the id?

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
##########
@@ -0,0 +1,180 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.log.block;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.HoodieAvroWriteSupport;
+import org.apache.hudi.common.fs.inline.InLineFSUtils;
+import org.apache.hudi.common.fs.inline.InLineFileSystem;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ParquetReaderIterator;
+import org.apache.hudi.io.storage.HoodieAvroParquetConfig;
+import org.apache.hudi.io.storage.HoodieParquetStreamWriter;
+import org.apache.parquet.avro.AvroParquetReader;
+import org.apache.parquet.avro.AvroReadSupport;
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.hadoop.util.HadoopInputFile;
+import org.apache.parquet.io.InputFile;
+
+import javax.annotation.Nonnull;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * HoodieParquetDataBlock contains a list of records serialized using Parquet.
+ */
+public class HoodieParquetDataBlock extends HoodieDataBlock {
+
+  public HoodieParquetDataBlock(HoodieLogFile logFile,
+                                FSDataInputStream inputStream,
+                                Option<byte[]> content,
+                                boolean readBlockLazily,
+                                long position, long blockSize, long 
blockEndPos,
+                                Option<Schema> readerSchema,
+                                Map<HeaderMetadataType, String> header,
+                                Map<HeaderMetadataType, String> footer,
+                                String keyField) {
+    super(content,
+        inputStream,
+        readBlockLazily,
+        Option.of(new HoodieLogBlockContentLocation(logFile, position, 
blockSize, blockEndPos)),
+        readerSchema,
+        header,
+        footer,
+        keyField,
+        false);
+  }
+
+  public HoodieParquetDataBlock(
+      @Nonnull List<IndexedRecord> records,
+      @Nonnull Map<HeaderMetadataType, String> header,
+      @Nonnull String keyField
+  ) {
+    super(records, header, new HashMap<>(), keyField);
+  }
+
+  public HoodieParquetDataBlock(
+      @Nonnull List<IndexedRecord> records,
+      @Nonnull Map<HeaderMetadataType, String> header
+  ) {
+    super(records, header, new HashMap<>(), 
HoodieRecord.RECORD_KEY_METADATA_FIELD);
+  }
+
+  @Override
+  public HoodieLogBlockType getBlockType() {
+    return HoodieLogBlockType.PARQUET_DATA_BLOCK;
+  }
+
+  @Override
+  protected byte[] serializeRecords(List<IndexedRecord> records) throws 
IOException {
+    if (records.size() == 0) {
+      return new byte[0];
+    }
+
+    Schema writerSchema = new 
Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
+
+    HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(
+        new AvroSchemaConverter().convert(writerSchema), writerSchema, 
Option.empty());
+
+    HoodieAvroParquetConfig avroParquetConfig =
+        new HoodieAvroParquetConfig(
+            writeSupport,
+            // TODO fetch compression codec from the config
+            CompressionCodecName.GZIP,
+            ParquetWriter.DEFAULT_BLOCK_SIZE,
+            ParquetWriter.DEFAULT_PAGE_SIZE,
+            1024 * 1024 * 1024,
+            new Configuration(),
+            
Double.parseDouble(String.valueOf(0.1)));//HoodieStorageConfig.PARQUET_COMPRESSION_RATIO.defaultValue()));
+
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+
+    try (FSDataOutputStream outputStream = new FSDataOutputStream(baos)) {
+      try (HoodieParquetStreamWriter<IndexedRecord> parquetWriter = new 
HoodieParquetStreamWriter<>(outputStream, avroParquetConfig)) {
+        for (IndexedRecord record : records) {
+          String recordKey = getRecordKey(record);
+          parquetWriter.writeAvro(recordKey, record);
+        }
+        outputStream.flush();
+      }
+    }
+
+    return baos.toByteArray();
+  }
+
+  public static Iterator<IndexedRecord> 
getProjectedParquetRecordsIterator(Configuration conf,
+                                                                           
Schema readerSchema,
+                                                                           
InputFile inputFile) throws IOException {
+    AvroReadSupport.setAvroReadSchema(conf, readerSchema);
+    AvroReadSupport.setRequestedProjection(conf, readerSchema);
+    ParquetReader<IndexedRecord> reader =
+        
AvroParquetReader.<IndexedRecord>builder(inputFile).withConf(conf).build();
+    return new ParquetReaderIterator<>(reader);
+  }
+
+  /**
+   * NOTE: We're overriding the whole reading sequence to make sure we 
properly respect
+   *       the requested Reader's schema and only fetch the columns that have 
been explicitly
+   *       requested by the caller (providing projected Reader's schema)
+   */
+  @Override
+  protected List<IndexedRecord> readRecordsFromBlockPayload() throws 
IOException {
+    Configuration inlineConf = new Configuration();
+    inlineConf.set("fs." + InLineFileSystem.SCHEME + ".impl", 
InLineFileSystem.class.getName());
+
+    HoodieLogBlockContentLocation blockContentLoc = 
getBlockContentLocation().get();
+
+    Path inlineLogFilePath = InLineFSUtils.getInlineFilePath(
+        blockContentLoc.getLogFile().getPath(),
+        
blockContentLoc.getLogFile().getPath().getFileSystem(inlineConf).getScheme(),
+        blockContentLoc.getContentPositionInLogFile(),
+        blockContentLoc.getBlockSize());
+
+    ArrayList<IndexedRecord> records = new ArrayList<>();
+
+    getProjectedParquetRecordsIterator(
+        inlineConf,

Review comment:
       +1 , given we are cleaning up a bunch here, lets also fix this. We need 
to wire in the config from the TableMetaClient into this layer.

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieParquetStreamWriter.java
##########
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.avro.HoodieAvroWriteSupport;
+import org.apache.hudi.parquet.io.OutputStreamBackedOutputFile;
+import org.apache.parquet.hadoop.ParquetFileWriter;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.api.WriteSupport;
+import org.apache.parquet.io.OutputFile;
+
+import java.io.IOException;
+
+// TODO unify w/ HoodieParquetWriter
+public class HoodieParquetStreamWriter<R extends IndexedRecord> implements 
AutoCloseable {

Review comment:
       why do we need this separate class? Why not enhance `HoodieParquetWriter`

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
##########
@@ -148,65 +165,40 @@ public HoodieLogBlockType getBlockType() {
   }
 
   @Override
-  protected void createRecordsFromContentBytes() throws IOException {
-    if (enableInlineReading) {
-      getRecords(Collections.emptyList());
-    } else {
-      super.createRecordsFromContentBytes();
-    }
-  }
-
-  @Override
-  public List<IndexedRecord> getRecords(List<String> keys) throws IOException {
-    readWithInlineFS(keys);
-    return records;
-  }
-
-  private void readWithInlineFS(List<String> keys) throws IOException {
-    boolean enableFullScan = keys.isEmpty();
-    // Get schema from the header
-    Schema writerSchema = new 
Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
-    // If readerSchema was not present, use writerSchema
-    if (schema == null) {
-      schema = writerSchema;
-    }
-    Configuration conf = new Configuration();
-    CacheConfig cacheConf = new CacheConfig(conf);
+  protected List<IndexedRecord> lookupRecords(List<String> keys) throws 
IOException {
     Configuration inlineConf = new Configuration();
     inlineConf.set("fs." + InLineFileSystem.SCHEME + ".impl", 
InLineFileSystem.class.getName());
 
+    HoodieLogBlockContentLocation blockContentLoc = 
getBlockContentLocation().get();
+
     Path inlinePath = InLineFSUtils.getInlineFilePath(
-        getBlockContentLocation().get().getLogFile().getPath(),
-        
getBlockContentLocation().get().getLogFile().getPath().getFileSystem(conf).getScheme(),
-        getBlockContentLocation().get().getContentPositionInLogFile(),
-        getBlockContentLocation().get().getBlockSize());
-    if (!enableFullScan) {
-      // HFile read will be efficient if keys are sorted, since on storage, 
records are sorted by key. This will avoid unnecessary seeks.
-      Collections.sort(keys);
+        blockContentLoc.getLogFile().getPath(),
+        
blockContentLoc.getLogFile().getPath().getFileSystem(inlineConf).getScheme(),
+        blockContentLoc.getContentPositionInLogFile(),
+        blockContentLoc.getBlockSize());
+
+    // HFile read will be efficient if keys are sorted, since on storage, 
records are sorted by key. This will avoid unnecessary seeks.
+    Collections.sort(keys);
+
+    try (HoodieHFileReader<IndexedRecord> reader =
+             new HoodieHFileReader<>(inlineConf, inlinePath, new 
CacheConfig(inlineConf), inlinePath.getFileSystem(inlineConf))) {
+      // Get writer's schema from the header
+      List<Pair<String, IndexedRecord>> logRecords = reader.readRecords(keys, 
readerSchema);
+      return 
logRecords.stream().map(Pair::getSecond).collect(Collectors.toList());
     }
-    HoodieHFileReader reader = new HoodieHFileReader(inlineConf, inlinePath, 
cacheConf, inlinePath.getFileSystem(inlineConf));
-    List<org.apache.hadoop.hbase.util.Pair<String, IndexedRecord>> logRecords 
= enableFullScan ? reader.readAllRecords(writerSchema, schema) :
-        reader.readRecords(keys, schema);
-    reader.close();
-    this.records = logRecords.stream().map(t -> 
t.getSecond()).collect(Collectors.toList());
   }
 
   @Override
-  protected void deserializeRecords() throws IOException {
+  protected List<IndexedRecord> deserializeRecords(byte[] content) throws 
IOException {
+    checkState(readerSchema != null, "Reader's schema has to be non-null");

Review comment:
       Is this because of change in the PR? the original code seems to expect 
null reader schema at times?

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/parquet/io/ByteBufferBackedInputFile.java
##########
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.parquet.io;
+
+import org.apache.hudi.common.util.io.ByteBufferBackedInputStream;
+import org.apache.parquet.io.DelegatingSeekableInputStream;
+import org.apache.parquet.io.InputFile;
+import org.apache.parquet.io.SeekableInputStream;
+
+/**
+ * Implementation of {@link InputFile} backed by {@code byte[]} buffer
+ */
+public class ByteBufferBackedInputFile implements InputFile {
+  private final byte[] buffer;
+  private final int offset;
+  private final int length;
+
+  public ByteBufferBackedInputFile(byte[] buffer, int offset, int length) {
+    this.buffer = buffer;
+    this.offset = offset;
+    this.length = length;
+  }
+
+  public ByteBufferBackedInputFile(byte[] buffer) {
+    this(buffer, 0, buffer.length);
+  }
+
+  @Override
+  public long getLength() {
+    return length;
+  }
+
+  @Override
+  public SeekableInputStream newStream() {

Review comment:
       codecov was misleading. So we removed it.  lets add unit tests for the 
new files.

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieParquetDataBlock.java
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.log.block;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hudi.avro.HoodieAvroWriteSupport;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ParquetReaderIterator;
+import org.apache.hudi.io.storage.HoodieAvroParquetConfig;
+import org.apache.hudi.io.storage.HoodieParquetStreamWriter;
+import org.apache.hudi.parquet.io.ByteBufferBackedInputFile;
+import org.apache.parquet.avro.AvroParquetReader;
+import org.apache.parquet.avro.AvroReadSupport;
+import org.apache.parquet.avro.AvroSchemaConverter;
+import org.apache.parquet.hadoop.ParquetReader;
+import org.apache.parquet.hadoop.ParquetWriter;
+import org.apache.parquet.hadoop.metadata.CompressionCodecName;
+import org.apache.parquet.io.InputFile;
+
+import javax.annotation.Nonnull;
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * HoodieParquetDataBlock contains a list of records serialized using Parquet.
+ */
+public class HoodieParquetDataBlock extends HoodieDataBlock {
+
+  public HoodieParquetDataBlock(
+      HoodieLogFile logFile,
+      FSDataInputStream inputStream,
+      Option<byte[]> content,
+      boolean readBlockLazily, long position, long blockSize, long blockEndpos,
+      Option<Schema> readerSchema,
+      Map<HeaderMetadataType, String> header,
+      Map<HeaderMetadataType, String> footer,
+      String keyField
+  ) {
+    super(
+        content,
+        inputStream,
+        readBlockLazily,
+        Option.of(new HoodieLogBlockContentLocation(logFile, position, 
blockSize, blockEndpos)),
+        readerSchema,
+        header,
+        footer,
+        keyField,
+        false);
+  }
+
+  public HoodieParquetDataBlock(
+      @Nonnull List<IndexedRecord> records,
+      @Nonnull Map<HeaderMetadataType, String> header,
+      @Nonnull String keyField
+  ) {
+    super(records, header, new HashMap<>(), keyField);
+  }
+
+  public HoodieParquetDataBlock(
+      @Nonnull List<IndexedRecord> records,
+      @Nonnull Map<HeaderMetadataType, String> header
+  ) {
+    super(records, header, new HashMap<>(), 
HoodieRecord.RECORD_KEY_METADATA_FIELD);
+  }
+
+  @Override
+  public HoodieLogBlockType getBlockType() {
+    return HoodieLogBlockType.PARQUET_DATA_BLOCK;
+  }
+
+  @Override
+  protected byte[] serializeRecords(List<IndexedRecord> records) throws 
IOException {
+    if (records.size() == 0) {
+      return new byte[0];
+    }
+
+    Schema writerSchema = new 
Schema.Parser().parse(super.getLogBlockHeader().get(HeaderMetadataType.SCHEMA));
+
+    HoodieAvroWriteSupport writeSupport = new HoodieAvroWriteSupport(
+        new AvroSchemaConverter().convert(writerSchema), writerSchema, 
Option.empty());
+
+    HoodieAvroParquetConfig avroParquetConfig =
+        new HoodieAvroParquetConfig(
+            writeSupport,
+            // TODO fetch compression codec from the config
+            CompressionCodecName.GZIP,

Review comment:
       But let's fix this occurrence in this PR. Don't want it to be buried in 
a broader effort. Or all references you mention are around log reader/writer 
code - that can be prioritized right away?  

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/parquet/io/OutputStreamBackedOutputFile.java
##########
@@ -0,0 +1,94 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.parquet.io;
+
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.parquet.io.OutputFile;
+import org.apache.parquet.io.PositionOutputStream;
+
+import javax.annotation.Nonnull;
+import java.io.IOException;
+
+/**
+ * Implementation of the {@link OutputFile} backed by {@link 
java.io.OutputStream}
+ */
+public class OutputStreamBackedOutputFile implements OutputFile {

Review comment:
       Do we need these to move away from older parquet APis? can you share 
more on why we are introducing the input/output file classes?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to