vinothchandar commented on code in PR #10241:
URL: https://github.com/apache/hudi/pull/10241#discussion_r1454105252


##########
hudi-common/src/test/java/org/apache/hudi/io/storage/TestHoodieHFileReaderWriter.java:
##########
@@ -474,4 +505,47 @@ private void verifyHFileReader(
     }
     assertEquals(count, reader.getEntries());
   }
+
+  private void writeHFileForTesting(String fileLocation,

Review Comment:
   should this live in a test utils class?



##########
hudi-io/src/main/java/org/apache/hudi/io/hfile/BlockIndexEntry.java:
##########
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.hfile;
+
+import org.apache.hudi.common.util.Option;
+
+/**
+ * Represents the index entry of a data block in the Data Index stored in the
+ * {@link HFileBlockType#ROOT_INDEX} block in the "Load-on-open" section.
+ * <p>
+ * This is a completely in-memory representation and does not involve byte parsing.
+ * <p>
+ * When comparing two {@link BlockIndexEntry} instances, the underlying bytes of the keys
+ * are compared in lexicographical order.
+ */
+public class BlockIndexEntry implements Comparable<BlockIndexEntry> {
+  private final Key key;

Review Comment:
   ```suggestion
     private final Key firstKey;
   ```



##########
hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileDataBlock.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.hfile;
+
+import org.apache.hudi.common.util.Option;
+
+import static org.apache.hudi.io.hfile.HFileReader.SEEK_TO_FOUND;
+import static org.apache.hudi.io.hfile.HFileReader.SEEK_TO_IN_RANGE;
+import static org.apache.hudi.io.hfile.KeyValue.KEY_OFFSET;
+
+/**
+ * Represents a {@link HFileBlockType#DATA} block.
+ */
+public class HFileDataBlock extends HFileBlock {
+  // Hudi does not use HFile MVCC timestamp version so the version
+  // is always 0, thus the byte length of the version is always 1.
+  // This assumption is also validated when parsing {@link HFileInfo},
+  // i.e., the maximum MVCC timestamp in a HFile must be 0.
+  private static final long ZERO_TS_VERSION_BYTE_LENGTH = 1;
+
+  // End offset of content in the block, relative to the start of the byte array {@link byteBuff}
+  protected final int uncompressedContentEndOffset;
+
+  protected HFileDataBlock(HFileContext context,
+                           byte[] byteBuff,
+                           int startOffsetInBuff) {
+    super(context, HFileBlockType.DATA, byteBuff, startOffsetInBuff);
+
+    this.uncompressedContentEndOffset = this.uncompressedEndOffset - this.sizeCheckSum;
+  }
+
+  /**
+   * Seeks to the key to look up. The key may not have an exact match.
+   *
+   * @param position               {@link HFilePosition} containing the current position relative
+   *                               to the beginning of the HFile (not the block start offset).
+   * @param key                    key to look up.
+   * @param blockStartOffsetInFile the start offset of the block relative to the beginning of the
+   *                               HFile.
+   * @return 0 if the block contains the exact same key as the lookup key, and the position points
+   * to the key; or 1 if the lookup key does not exist, and the position points to the
+   * lexicographically largest key that is smaller than the lookup key.
+   */
+  public int seekTo(HFilePosition position, Key key, int blockStartOffsetInFile) {
+    int offset = position.getOffset() - blockStartOffsetInFile;

Review Comment:
   is this a relative offset now?
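For reference, the line in question does appear to compute a relative offset: the cursor tracks a position from the beginning of the HFile, and subtracting `blockStartOffsetInFile` converts it into an in-block offset. A minimal sketch of that arithmetic (class and method names here are hypothetical, not part of the PR):

```java
public class OffsetMath {
  // Converts a cursor position measured from the beginning of the HFile
  // into an offset measured from the start of a block that begins at
  // blockStartOffsetInFile.
  static int toRelativeOffset(int positionInFile, int blockStartOffsetInFile) {
    return positionInFile - blockStartOffsetInFile;
  }
}
```

For example, a cursor at byte 100 inside a block that starts at byte 64 is 36 bytes into that block.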



##########
hudi-io/README.md:
##########
@@ -0,0 +1,40 @@
+<!--
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+-->
+
+# `hudi-io` Module
+
+This module contains classes that are I/O related, including common abstractions and APIs, readers and writers, etc.
+
+## HFile Reader
+
+We implement our own HFile reader (`org.apache.hudi.io.hfile.HFileReaderImpl`) that supports reading HFiles
+in the Hudi metadata tables, based on the format described below.
+
+### HFile Format
+
+[HFile format](https://hbase.apache.org/book.html#_hfile_format_2) is originally designed and implemented
+by [HBase](https://hbase.apache.org/). We use HFile as the base file format of the internal metadata table (MDT). Here
+we describe the parts of the HFile format that are relevant to Hudi, as not all HFile features are used.

Review Comment:
   Can you please add a separate .md file that sketches the format?



##########
hudi-io/README.md:
##########
@@ -0,0 +1,40 @@
+<!--
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+-->
+
+# `hudi-io` Module
+
+This module contains classes that are I/O related, including common abstractions and APIs, readers and writers, etc.
+
+## HFile Reader
+
+We implement our own HFile reader (`org.apache.hudi.io.hfile.HFileReaderImpl`) that supports reading HFiles
+in the Hudi metadata tables, based on the format described below.
+
+### HFile Format
+
+[HFile format](https://hbase.apache.org/book.html#_hfile_format_2) is originally designed and implemented

Review Comment:
   ```suggestion
   [HFile format](https://hbase.apache.org/book.html#_hfile_format_2) is based on SSTable file format optimized for range scans/point lookups, originally designed and implemented
   ```



##########
hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileInfo.java:
##########
@@ -0,0 +1,90 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.hfile;
+
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.io.util.IOUtils;
+
+import java.util.Map;
+
+/**
+ * Represents the HFile info read from {@link HFileBlockType#FILE_INFO} block.
+ */
+public class HFileInfo {
+  private static final String RESERVED_PREFIX = "hfile.";
+  private static final UTF8StringKey LAST_KEY =
+      new UTF8StringKey(RESERVED_PREFIX + "LASTKEY");
+  private static final UTF8StringKey FILE_CREATION_TIME_TS =
+      new UTF8StringKey(RESERVED_PREFIX + "CREATE_TIME_TS");
+  private static final UTF8StringKey KEY_VALUE_VERSION =
+      new UTF8StringKey("KEY_VALUE_VERSION");
+  private static final UTF8StringKey MAX_MVCC_TS_KEY =
+      new UTF8StringKey("MAX_MEMSTORE_TS_KEY");
+
+  private static final int KEY_VALUE_VERSION_WITH_MVCC_TS = 1;
+
+  private final Map<UTF8StringKey, byte[]> infoMap;
+  private final long fileCreationTime;
+  private final Option<Key> lastKey;
+  private final long maxMvccTs;
+  private final boolean containsMvccTs;
+
+  public HFileInfo(Map<UTF8StringKey, byte[]> infoMap) {
+    this.infoMap = infoMap;
+    this.fileCreationTime = parseFileCreationTime();
+    this.lastKey = parseLastKey();
+    this.maxMvccTs = parseMaxMvccTs();
+    this.containsMvccTs = maxMvccTs > 0;
+    if (containsMvccTs) {
+      // The HFile written by Hudi does not contain MVCC timestamps.
+      // Parsing MVCC timestamps is not supported.
+      throw new UnsupportedOperationException("Parsing MVCC timestamp in HFile is not supported");

Review Comment:
   ```suggestion
         throw new UnsupportedOperationException("HFiles with MVCC timestamps not supported");
   ```



##########
hudi-io/src/main/java/org/apache/hudi/io/compress/airlift/HoodieAirliftGzipDecompressor.java:
##########
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.compress.airlift;
+
+import org.apache.hudi.io.compress.CompressionCodec;
+import org.apache.hudi.io.compress.HoodieDecompressor;
+
+import io.airlift.compress.gzip.JdkGzipHadoopStreams;
+import io.airlift.compress.hadoop.HadoopInputStream;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import static org.apache.hudi.io.util.IOUtils.readFully;
+
+/**
+ * Implementation of {@link HoodieDecompressor} for {@link CompressionCodec#GZIP} compression
+ * codec using airlift aircompressor's GZIP decompressor.
+ */
+public class HoodieAirliftGzipDecompressor implements HoodieDecompressor {
+  private final JdkGzipHadoopStreams gzipStreams;

Review Comment:
   do we need a `.close()` in `HoodieDecompressor`? How do the resources used get released? Is just GC enough?
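One way to address this, sketched under the assumption that some decompressor implementations may hold buffers or native resources: have the interface extend `java.io.Closeable` so callers can release resources deterministically instead of relying on GC. The names below are hypothetical, not the PR's actual API:

```java
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;

// Hypothetical sketch: a decompressor contract with an explicit close(),
// so implementations backed by pooled or native resources can be released
// deterministically (e.g. via try-with-resources) rather than by GC.
interface CloseableDecompressor extends Closeable {
  InputStream decompress(InputStream compressed) throws IOException;
}

// A stateless implementation can make close() a no-op.
class NoOpDecompressor implements CloseableDecompressor {
  @Override
  public InputStream decompress(InputStream compressed) {
    return compressed; // pass-through, as for a NONE codec
  }

  @Override
  public void close() {
    // nothing to release
  }
}
```

Callers could then wrap usage in try-with-resources, and stateless codecs pay no cost for the no-op `close()`.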



##########
hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileReaderImpl.java:
##########
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.hfile;
+
+import org.apache.hudi.common.util.Option;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static org.apache.hudi.io.hfile.HFileBlock.HFILEBLOCK_HEADER_SIZE;
+import static org.apache.hudi.io.hfile.HFileUtils.readMajorVersion;
+
+/**
+ * An implementation of {@link HFileReader}.
+ */
+public class HFileReaderImpl implements HFileReader {
+  private final FSDataInputStream stream;
+  private final long fileSize;
+
+  private final HFilePosition currentPos;
+  private boolean isMetadataInitialized = false;
+  private HFileTrailer trailer;
+  private HFileContext context;
+  private TreeMap<Key, BlockIndexEntry> dataBlockIndexEntryMap;
+  private TreeMap<Key, BlockIndexEntry> metaBlockIndexEntryMap;
+  private HFileInfo fileInfo;
+  private Option<BlockIndexEntry> currentDataBlockEntry;
+  private Option<HFileDataBlock> currentDataBlock;
+
+  public HFileReaderImpl(FSDataInputStream stream, long fileSize) {
+    this.stream = stream;
+    this.fileSize = fileSize;
+    this.currentPos = new HFilePosition();
+    this.currentDataBlockEntry = Option.empty();
+    this.currentDataBlock = Option.empty();
+  }
+
+  @Override
+  public synchronized void initializeMetadata() throws IOException {
+    if (this.isMetadataInitialized) {
+      return;
+    }
+
+    // Read Trailer (serialized in Proto)
+    this.trailer = readTrailer(stream, fileSize);
+    this.context = HFileContext.builder()
+        .compressionCodec(trailer.getCompressionCodec())
+        .build();
+    HFileBlockReader blockReader = new HFileBlockReader(
+        context, stream, trailer.getLoadOnOpenDataOffset(),
+        fileSize - HFileTrailer.getTrailerSize());
+    HFileRootIndexBlock dataIndexBlock =
+        (HFileRootIndexBlock) blockReader.nextBlock(HFileBlockType.ROOT_INDEX);
+    this.dataBlockIndexEntryMap = dataIndexBlock.readBlockIndex(trailer.getDataIndexCount(), false);
+    HFileRootIndexBlock metaIndexBlock =
+        (HFileRootIndexBlock) blockReader.nextBlock(HFileBlockType.ROOT_INDEX);
+    this.metaBlockIndexEntryMap = metaIndexBlock.readBlockIndex(trailer.getMetaIndexCount(), true);
+    HFileFileInfoBlock fileInfoBlock =
+        (HFileFileInfoBlock) blockReader.nextBlock(HFileBlockType.FILE_INFO);
+    this.fileInfo = fileInfoBlock.readFileInfo();
+    this.isMetadataInitialized = true;
+  }
+
+  @Override
+  public Option<byte[]> getMetaInfo(UTF8StringKey key) throws IOException {
+    initializeMetadata();
+    byte[] bytes = fileInfo.get(key);
+    return bytes != null ? Option.of(bytes) : Option.empty();

Review Comment:
   Option.ofNullable?
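Assuming Hudi's `Option.ofNullable` behaves like `java.util.Optional.ofNullable`, the ternary collapses into a single call; a sketch using the JDK type for illustration:

```java
import java.util.Optional;

public class OfNullableSketch {
  // Equivalent to: bytes != null ? Optional.of(bytes) : Optional.empty()
  static Optional<byte[]> wrap(byte[] bytes) {
    return Optional.ofNullable(bytes);
  }
}
```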



##########
hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileUtils.java:
##########
@@ -0,0 +1,94 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.hfile;
+
+import org.apache.hudi.io.compress.CompressionCodec;
+import org.apache.hudi.io.util.IOUtils;
+
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Util methods for reading and writing HFile.
+ */
+public class HFileUtils {
+  private static final Map<Integer, CompressionCodec> HFILE_COMPRESSION_CODEC_MAP = createCompressionCodecMap();
+
+  /**
+   * Gets the compression codec based on the ID.  This ID is written to the HFile on storage.
+   *
+   * @param id ID indicating the compression codec.
+   * @return compression codec based on the ID.
+   */
+  public static CompressionCodec decodeCompressionCodec(int id) {
+    CompressionCodec codec = HFILE_COMPRESSION_CODEC_MAP.get(id);
+    if (codec == null) {
+      throw new IllegalArgumentException("Compression codec not found for ID: " + id);
+    }
+    return codec;
+  }
+
+  /**
+   * Reads the HFile major version from the input.
+   *
+   * @param bytes  input data.
+   * @param offset offset to start reading.
+   * @return major version of the file.
+   */
+  public static int readMajorVersion(byte[] bytes, int offset) {
+    int ch1 = bytes[offset] & 0xFF;
+    int ch2 = bytes[offset + 1] & 0xFF;
+    int ch3 = bytes[offset + 2] & 0xFF;
+    return ((ch1 << 16) + (ch2 << 8) + ch3);
+  }
+
+  /**
+   * Compares two HFile {@link Key}.
+   *
+   * @param key1 left operand key.
+   * @param key2 right operand key.
+   * @return 0 if equal, < 0 if left is less than right, > 0 otherwise.
+   */
+  public static int compareKeys(Key key1, Key key2) {
+    return IOUtils.compareTo(
+        key1.getBytes(), key1.getContentOffset(), key1.getContentLength(),
+        key2.getBytes(), key2.getContentOffset(), key2.getContentLength());
+  }
+
+  /**
+   * The ID mapping cannot change or else that breaks all existing HFiles out there,
+   * even the ones that are not compressed! (They use the NONE algorithm)
+   * This is because HFile stores the ID to indicate which compression codec is used.
+   *
+   * @return the mapping of ID to compression codec.
+   */
+  private static Map<Integer, CompressionCodec> createCompressionCodecMap() {

Review Comment:
   cut down the number of codecs supported to what we need?
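A trimmed map could keep only the codecs Hudi actually writes (GZIP and NONE) while preserving the persisted numeric IDs. A standalone sketch; the IDs below (GZ = 1, NONE = 2) are assumed to follow HBase's historical `Compression.Algorithm` ordering and should be verified before relying on them:

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

public class CodecIds {
  enum Codec { GZIP, NONE }

  // The numeric IDs are persisted in existing HFiles, so they must match
  // HBase's historical ordering exactly. Unsupported IDs simply stay
  // unmapped, so lookups for them return null and can fail fast at decode time.
  static Map<Integer, Codec> createCompressionCodecMap() {
    Map<Integer, Codec> map = new HashMap<>();
    map.put(1, Codec.GZIP); // assumed HBase ID for GZ
    map.put(2, Codec.NONE); // assumed HBase ID for NONE
    return Collections.unmodifiableMap(map);
  }
}
```

Keeping the full HBase ID table but mapping only the needed entries retains forward compatibility with on-disk files while shrinking the supported surface.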



##########
hudi-io/src/main/java/org/apache/hudi/io/compress/HoodieCompressionFactory.java:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.compress;
+
+import org.apache.hudi.io.compress.airlift.HoodieAirliftGzipDecompressor;
+import org.apache.hudi.io.compress.builtin.HoodieNoneDecompressor;
+
+/**
+ * Factory for {@link HoodieDecompressor}.
+ */
+public class HoodieCompressionFactory {

Review Comment:
   ```suggestion
   public class HoodieDecompressorFactory {
   ```



##########
hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileReaderImpl.java:
##########
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.hfile;
+
+import org.apache.hudi.common.util.Option;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static org.apache.hudi.io.hfile.HFileBlock.HFILEBLOCK_HEADER_SIZE;
+import static org.apache.hudi.io.hfile.HFileUtils.readMajorVersion;
+
+/**
+ * An implementation of {@link HFileReader}.
+ */
+public class HFileReaderImpl implements HFileReader {
+  private final FSDataInputStream stream;
+  private final long fileSize;
+
+  private final HFilePosition currentPos;
+  private boolean isMetadataInitialized = false;
+  private HFileTrailer trailer;
+  private HFileContext context;
+  private TreeMap<Key, BlockIndexEntry> dataBlockIndexEntryMap;
+  private TreeMap<Key, BlockIndexEntry> metaBlockIndexEntryMap;
+  private HFileInfo fileInfo;
+  private Option<BlockIndexEntry> currentDataBlockEntry;
+  private Option<HFileDataBlock> currentDataBlock;
+
+  public HFileReaderImpl(FSDataInputStream stream, long fileSize) {
+    this.stream = stream;
+    this.fileSize = fileSize;
+    this.currentPos = new HFilePosition();
+    this.currentDataBlockEntry = Option.empty();
+    this.currentDataBlock = Option.empty();
+  }
+
+  @Override
+  public synchronized void initializeMetadata() throws IOException {
+    if (this.isMetadataInitialized) {
+      return;
+    }
+
+    // Read Trailer (serialized in Proto)
+    this.trailer = readTrailer(stream, fileSize);
+    this.context = HFileContext.builder()
+        .compressionCodec(trailer.getCompressionCodec())
+        .build();
+    HFileBlockReader blockReader = new HFileBlockReader(
+        context, stream, trailer.getLoadOnOpenDataOffset(),
+        fileSize - HFileTrailer.getTrailerSize());
+    HFileRootIndexBlock dataIndexBlock =

Review Comment:
   could we read them all at once somehow, i.e., one RPC call and then parse the three blocks?
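A sketch of that idea: issue one positioned read covering the whole load-on-open region (from `getLoadOnOpenDataOffset()` up to the trailer) into a buffer, then parse the root data index, meta index, and file-info blocks out of memory. The helper below only demonstrates the single-read step, with `ByteArrayInputStream` standing in for the actual `FSDataInputStream`; names are hypothetical:

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.IOException;

public class LoadOnOpenRead {
  // Reads the [offset, offset + length) region in a single call so that all
  // three load-on-open blocks can then be parsed from one in-memory buffer
  // instead of three separate reads against storage.
  static byte[] readRegion(ByteArrayInputStream in, long offset, int length) throws IOException {
    long skipped = in.skip(offset);
    if (skipped != offset) {
      throw new IOException("Could not seek to offset " + offset);
    }
    byte[] region = new byte[length];
    new DataInputStream(in).readFully(region);
    return region;
  }
}
```

The existing `HFileBlockReader` could then operate on this buffer, turning three storage round-trips into one.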



##########
hudi-io/src/main/java/org/apache/hudi/io/hfile/HFilePosition.java:
##########
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.hfile;
+
+import org.apache.hudi.common.util.Option;
+
+/**
+ * Stores the current position and {@link KeyValue} at the position in the HFile.
+ * The same instance is used as a position pointer during HFile reading.
+ * The {@link KeyValue} can be lazily read and cached.
+ */
+public class HFilePosition {

Review Comment:
   is HFileCursor a better name?



##########
hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileReaderImpl.java:
##########
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.hfile;
+
+import org.apache.hudi.common.util.Option;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static org.apache.hudi.io.hfile.HFileBlock.HFILEBLOCK_HEADER_SIZE;
+import static org.apache.hudi.io.hfile.HFileUtils.readMajorVersion;
+
+/**
+ * An implementation of {@link HFileReader}.
+ */
+public class HFileReaderImpl implements HFileReader {
+  private final FSDataInputStream stream;
+  private final long fileSize;
+
+  private final HFilePosition currentPos;
+  private boolean isMetadataInitialized = false;
+  private HFileTrailer trailer;
+  private HFileContext context;
+  private TreeMap<Key, BlockIndexEntry> dataBlockIndexEntryMap;
+  private TreeMap<Key, BlockIndexEntry> metaBlockIndexEntryMap;
+  private HFileInfo fileInfo;
+  private Option<BlockIndexEntry> currentDataBlockEntry;
+  private Option<HFileDataBlock> currentDataBlock;
+
+  public HFileReaderImpl(FSDataInputStream stream, long fileSize) {
+    this.stream = stream;
+    this.fileSize = fileSize;
+    this.currentPos = new HFilePosition();
+    this.currentDataBlockEntry = Option.empty();
+    this.currentDataBlock = Option.empty();
+  }
+
+  @Override
+  public synchronized void initializeMetadata() throws IOException {
+    if (this.isMetadataInitialized) {
+      return;
+    }
+
+    // Read Trailer (serialized in Proto)
+    this.trailer = readTrailer(stream, fileSize);
+    this.context = HFileContext.builder()
+        .compressionCodec(trailer.getCompressionCodec())
+        .build();
+    HFileBlockReader blockReader = new HFileBlockReader(
+        context, stream, trailer.getLoadOnOpenDataOffset(),
+        fileSize - HFileTrailer.getTrailerSize());
+    HFileRootIndexBlock dataIndexBlock =
+        (HFileRootIndexBlock) blockReader.nextBlock(HFileBlockType.ROOT_INDEX);
+    this.dataBlockIndexEntryMap = dataIndexBlock.readBlockIndex(trailer.getDataIndexCount(), false);
+    HFileRootIndexBlock metaIndexBlock =
+        (HFileRootIndexBlock) blockReader.nextBlock(HFileBlockType.ROOT_INDEX);
+    this.metaBlockIndexEntryMap = metaIndexBlock.readBlockIndex(trailer.getMetaIndexCount(), true);
+    HFileFileInfoBlock fileInfoBlock =
+        (HFileFileInfoBlock) blockReader.nextBlock(HFileBlockType.FILE_INFO);
+    this.fileInfo = fileInfoBlock.readFileInfo();
+    this.isMetadataInitialized = true;
+  }
+
+  @Override
+  public Option<byte[]> getMetaInfo(UTF8StringKey key) throws IOException {
+    initializeMetadata();
+    byte[] bytes = fileInfo.get(key);
+    return bytes != null ? Option.of(bytes) : Option.empty();
+  }
+
+  @Override
+  public Option<ByteBuffer> getMetaBlock(String metaBlockName) throws IOException {
+    initializeMetadata();
+    BlockIndexEntry blockIndexEntry = metaBlockIndexEntryMap.get(new UTF8StringKey(metaBlockName));
+    if (blockIndexEntry == null) {
+      return Option.empty();
+    }
+    HFileBlockReader blockReader = new HFileBlockReader(
+        context, stream, blockIndexEntry.getOffset(),
+        blockIndexEntry.getOffset() + blockIndexEntry.getSize());
+    HFileMetaBlock block = (HFileMetaBlock) blockReader.nextBlock(HFileBlockType.META);
+    return Option.of(block.readContent());
+  }
+
+  @Override
+  public long getNumKeyValueEntries() {
+    try {
+      initializeMetadata();
+      return trailer.getNumKeyValueEntries();
+    } catch (IOException e) {
+      throw new RuntimeException("Cannot read HFile", e);
+    }
+  }
+
+  @Override
+  public int seekTo(Key key) throws IOException {
+    Option<KeyValue> currentKeyValue = getKeyValue();
+    if (!currentKeyValue.isPresent()) {
+      return SEEK_TO_EOF;
+    }
+    int compareCurrent = key.compareTo(currentKeyValue.get().getKey());
+    if (compareCurrent > 0) {
+      if (currentDataBlockEntry.get().getNextBlockKey().isPresent()) {
+        int comparedNextBlockFirstKey =
+            key.compareTo(currentDataBlockEntry.get().getNextBlockKey().get());
+        if (comparedNextBlockFirstKey >= 0) {
+          // Searches the block that may contain the lookup key based on the starting keys of
+          // all blocks (sorted in the TreeMap of block index entries), using binary search.
+          // The result contains the greatest key less than or equal to the given key,
+          // or null if there is no such key.
+
+          Map.Entry<Key, BlockIndexEntry> floorEntry = dataBlockIndexEntryMap.floorEntry(key);
+          if (floorEntry == null) {
+            // Key smaller than the start key of the first block
+            return SEEK_TO_BACKWARDS;
+          }
+          currentDataBlockEntry = Option.of(floorEntry.getValue());
+          currentDataBlock = Option.empty();
+          currentPos.setOffset(
+              (int) currentDataBlockEntry.get().getOffset() + HFILEBLOCK_HEADER_SIZE);
+        }
+      }
+      if (!currentDataBlockEntry.get().getNextBlockKey().isPresent()) {
+        // This is the last data block.  Check against the last key.
+        if (fileInfo.getLastKey().isPresent()) {
+          int comparedLastKey = key.compareTo(fileInfo.getLastKey().get());
+          if (comparedLastKey > 0) {
+            currentDataBlockEntry = Option.empty();
+            currentDataBlock = Option.empty();
+            currentPos.setEof();
+            return SEEK_TO_EOF;
+          }
+        }
+      }
+
+      if (!currentDataBlock.isPresent()) {
+        currentDataBlock = Option.of(instantiateHFileDataBlock(currentDataBlockEntry.get()));
+      }
+
+      return currentDataBlock.get()
+          .seekTo(currentPos, key, (int) currentDataBlockEntry.get().getOffset());
+    }
+    if (compareCurrent == 0) {
+      return SEEK_TO_FOUND;
+    }
+    // Backward seek not supported
+    return SEEK_TO_BACKWARDS;
+  }
+
+  @Override
+  public boolean seekTo() throws IOException {
+    initializeMetadata();
+    if (trailer.getNumKeyValueEntries() == 0) {
+      currentPos.setEof();
+      return false;
+    }
+    // Move the current position to the beginning of the first data block
+    currentPos.setOffset(dataBlockIndexEntryMap.firstKey().getOffset() + HFILEBLOCK_HEADER_SIZE);
+    currentPos.unsetEof();
+    currentDataBlockEntry = Option.of(dataBlockIndexEntryMap.firstEntry().getValue());
+    // The data block will be read when {@link #getKeyValue} is called
+    currentDataBlock = Option.empty();
+    return true;
+  }
+
+  @Override
+  public boolean next() throws IOException {
+    if (currentPos.isValid()) {
+      if (!currentDataBlock.isPresent()) {
+        currentDataBlock = Option.of(instantiateHFileDataBlock(currentDataBlockEntry.get()));
+      }
+      if (currentDataBlock.get().next(currentPos, (int) currentDataBlockEntry.get().getOffset())) {
+        // The position is advanced by the data block instance
+        return true;
+      }
+      currentDataBlockEntry = getNextBlockIndexEntry(currentDataBlockEntry.get());
+      currentDataBlock = Option.empty();

Review Comment:
   should `currentDataBlock` be updated here when skipping to the next block index entry? or do L216-218 below handle it?
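
The pattern under discussion here, advancing within the current block and, once it is exhausted, moving to the next block index entry while leaving the block itself unloaded until a value is actually read, can be sketched independently of HFile. All names below are illustrative, not from the PR:

```java
import java.util.Arrays;
import java.util.List;
import java.util.Optional;

// Illustrative sketch: a cursor over a sequence of "blocks" that only
// materializes a block when an entry is actually read, mirroring how
// currentDataBlock stays Option.empty() until it is needed.
public class LazyBlockCursor {
  private final List<List<Integer>> blockIndex; // stands in for the block index entries
  private int blockIdx = 0;
  private int posInBlock = -1;
  private Optional<List<Integer>> loadedBlock = Optional.empty(); // lazily loaded

  public LazyBlockCursor(List<List<Integer>> blockIndex) {
    this.blockIndex = blockIndex;
  }

  // Advances to the next entry; loads the current block only when required.
  public boolean next() {
    while (blockIdx < blockIndex.size()) {
      if (!loadedBlock.isPresent()) {
        loadedBlock = Optional.of(blockIndex.get(blockIdx)); // "instantiate" the block
      }
      if (posInBlock + 1 < loadedBlock.get().size()) {
        posInBlock++;
        return true;
      }
      // Block exhausted: move to the next index entry and drop the loaded
      // block, so the following iteration re-loads it lazily.
      blockIdx++;
      posInBlock = -1;
      loadedBlock = Optional.empty();
    }
    return false;
  }

  public int current() {
    return loadedBlock.get().get(posInBlock);
  }
}
```

Either answer to the question works with this shape; the key invariant is that the loaded block and the index entry are always either consistent or the block is empty.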



##########
hudi-io/README.md:
##########
@@ -0,0 +1,40 @@
+<!--
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+-->
+
+# `hudi-io` Module
+
This module contains I/O-related classes, including common abstractions and APIs, readers and writers, etc.
+
+## HFile Reader
+
We implement our own HFile reader (`org.apache.hudi.io.hfile.HFileReaderImpl`) that functionally works on reading HFiles

Review Comment:
   ```suggestion
   We implement our own HFile reader (`org.apache.hudi.io.hfile.HFileReaderImpl`) that functionally works on reading HBase HFiles
   ```



##########
hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileReaderImpl.java:
##########
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.hfile;
+
+import org.apache.hudi.common.util.Option;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static org.apache.hudi.io.hfile.HFileBlock.HFILEBLOCK_HEADER_SIZE;
+import static org.apache.hudi.io.hfile.HFileUtils.readMajorVersion;
+
+/**
+ * An implementation of {@link HFileReader}.
+ */
+public class HFileReaderImpl implements HFileReader {
+  private final FSDataInputStream stream;
+  private final long fileSize;
+
+  private final HFilePosition currentPos;
+  private boolean isMetadataInitialized = false;
+  private HFileTrailer trailer;
+  private HFileContext context;
+  private TreeMap<Key, BlockIndexEntry> dataBlockIndexEntryMap;
+  private TreeMap<Key, BlockIndexEntry> metaBlockIndexEntryMap;
+  private HFileInfo fileInfo;
+  private Option<BlockIndexEntry> currentDataBlockEntry;
+  private Option<HFileDataBlock> currentDataBlock;
+
+  public HFileReaderImpl(FSDataInputStream stream, long fileSize) {
+    this.stream = stream;
+    this.fileSize = fileSize;
+    this.currentPos = new HFilePosition();
+    this.currentDataBlockEntry = Option.empty();
+    this.currentDataBlock = Option.empty();
+  }
+
+  @Override
+  public synchronized void initializeMetadata() throws IOException {
+    if (this.isMetadataInitialized) {
+      return;
+    }
+
+    // Read Trailer (serialized in Proto)
+    this.trailer = readTrailer(stream, fileSize);
+    this.context = HFileContext.builder()
+        .compressionCodec(trailer.getCompressionCodec())
+        .build();
+    HFileBlockReader blockReader = new HFileBlockReader(
+        context, stream, trailer.getLoadOnOpenDataOffset(),
+        fileSize - HFileTrailer.getTrailerSize());
+    HFileRootIndexBlock dataIndexBlock =
+        (HFileRootIndexBlock) blockReader.nextBlock(HFileBlockType.ROOT_INDEX);
+    this.dataBlockIndexEntryMap = dataIndexBlock.readBlockIndex(trailer.getDataIndexCount(), false);
+    HFileRootIndexBlock metaIndexBlock =
+        (HFileRootIndexBlock) blockReader.nextBlock(HFileBlockType.ROOT_INDEX);
+    this.metaBlockIndexEntryMap = metaIndexBlock.readBlockIndex(trailer.getMetaIndexCount(), true);
+    HFileFileInfoBlock fileInfoBlock =
+        (HFileFileInfoBlock) blockReader.nextBlock(HFileBlockType.FILE_INFO);
+    this.fileInfo = fileInfoBlock.readFileInfo();
+    this.isMetadataInitialized = true;
+  }
+
+  @Override
+  public Option<byte[]> getMetaInfo(UTF8StringKey key) throws IOException {
+    initializeMetadata();
+    byte[] bytes = fileInfo.get(key);
+    return bytes != null ? Option.of(bytes) : Option.empty();
+  }
+
+  @Override
+  public Option<ByteBuffer> getMetaBlock(String metaBlockName) throws IOException {
+    initializeMetadata();
+    BlockIndexEntry blockIndexEntry = metaBlockIndexEntryMap.get(new UTF8StringKey(metaBlockName));
+    if (blockIndexEntry == null) {
+      return Option.empty();
+    }
+    HFileBlockReader blockReader = new HFileBlockReader(
+        context, stream, blockIndexEntry.getOffset(),
+        blockIndexEntry.getOffset() + blockIndexEntry.getSize());
+    HFileMetaBlock block = (HFileMetaBlock) blockReader.nextBlock(HFileBlockType.META);
+    return Option.of(block.readContent());
+  }
+
+  @Override
+  public long getNumKeyValueEntries() {
+    try {
+      initializeMetadata();
+      return trailer.getNumKeyValueEntries();
+    } catch (IOException e) {
+      throw new RuntimeException("Cannot read HFile", e);
+    }
+  }
+
+  @Override
+  public int seekTo(Key key) throws IOException {
+    Option<KeyValue> currentKeyValue = getKeyValue();
+    if (!currentKeyValue.isPresent()) {
+      return SEEK_TO_EOF;
+    }
+    int compareCurrent = key.compareTo(currentKeyValue.get().getKey());
+    if (compareCurrent > 0) {
+      if (currentDataBlockEntry.get().getNextBlockKey().isPresent()) {
+        int comparedNextBlockFirstKey =
+            key.compareTo(currentDataBlockEntry.get().getNextBlockKey().get());
+        if (comparedNextBlockFirstKey >= 0) {
+          // Searches the block that may contain the lookup key based on the starting keys of
+          // all blocks (sorted in the TreeMap of block index entries), using binary search.
+          // The result contains the greatest key less than or equal to the given key,
+          // or null if there is no such key.
+
+          Map.Entry<Key, BlockIndexEntry> floorEntry = dataBlockIndexEntryMap.floorEntry(key);
+          if (floorEntry == null) {
+            // Key smaller than the start key of the first block

Review Comment:
   should the currentPos etc be reset to the first block in this case? 
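
For reference, the `floorEntry` lookup this code path relies on behaves as follows. This is a standalone `java.util.TreeMap` sketch, not Hudi code; the key/offset values are made up:

```java
import java.util.Map;
import java.util.TreeMap;

// Standalone illustration of the TreeMap.floorEntry semantics used for the
// block index lookup: each entry maps a block's first key to its offset, and
// floorEntry returns the entry with the greatest key <= the lookup key, or
// null when the lookup key sorts before the first block's start key (the
// case the review comment asks about).
public class FloorEntryDemo {
  // Returns the offset of the block that may contain the key, or -1 when the
  // key is smaller than the first block's start key.
  public static long findBlockOffset(TreeMap<String, Long> blockIndex, String key) {
    Map.Entry<String, Long> floor = blockIndex.floorEntry(key);
    return floor == null ? -1L : floor.getValue();
  }
}
```
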



##########
hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileReaderImpl.java:
##########
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.hfile;
+
+import org.apache.hudi.common.util.Option;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static org.apache.hudi.io.hfile.HFileBlock.HFILEBLOCK_HEADER_SIZE;
+import static org.apache.hudi.io.hfile.HFileUtils.readMajorVersion;
+
+/**
+ * An implementation of {@link HFileReader}.
+ */
+public class HFileReaderImpl implements HFileReader {
+  private final FSDataInputStream stream;
+  private final long fileSize;
+
+  private final HFilePosition currentPos;
+  private boolean isMetadataInitialized = false;
+  private HFileTrailer trailer;
+  private HFileContext context;
+  private TreeMap<Key, BlockIndexEntry> dataBlockIndexEntryMap;
+  private TreeMap<Key, BlockIndexEntry> metaBlockIndexEntryMap;
+  private HFileInfo fileInfo;
+  private Option<BlockIndexEntry> currentDataBlockEntry;
+  private Option<HFileDataBlock> currentDataBlock;
+
+  public HFileReaderImpl(FSDataInputStream stream, long fileSize) {
+    this.stream = stream;
+    this.fileSize = fileSize;
+    this.currentPos = new HFilePosition();
+    this.currentDataBlockEntry = Option.empty();
+    this.currentDataBlock = Option.empty();
+  }
+
+  @Override
+  public synchronized void initializeMetadata() throws IOException {
+    if (this.isMetadataInitialized) {
+      return;
+    }
+
+    // Read Trailer (serialized in Proto)
+    this.trailer = readTrailer(stream, fileSize);
+    this.context = HFileContext.builder()
+        .compressionCodec(trailer.getCompressionCodec())
+        .build();
+    HFileBlockReader blockReader = new HFileBlockReader(
+        context, stream, trailer.getLoadOnOpenDataOffset(),
+        fileSize - HFileTrailer.getTrailerSize());
+    HFileRootIndexBlock dataIndexBlock =
+        (HFileRootIndexBlock) blockReader.nextBlock(HFileBlockType.ROOT_INDEX);
+    this.dataBlockIndexEntryMap = dataIndexBlock.readBlockIndex(trailer.getDataIndexCount(), false);
+    HFileRootIndexBlock metaIndexBlock =
+        (HFileRootIndexBlock) blockReader.nextBlock(HFileBlockType.ROOT_INDEX);
+    this.metaBlockIndexEntryMap = metaIndexBlock.readBlockIndex(trailer.getMetaIndexCount(), true);
+    HFileFileInfoBlock fileInfoBlock =
+        (HFileFileInfoBlock) blockReader.nextBlock(HFileBlockType.FILE_INFO);
+    this.fileInfo = fileInfoBlock.readFileInfo();
+    this.isMetadataInitialized = true;
+  }
+
+  @Override
+  public Option<byte[]> getMetaInfo(UTF8StringKey key) throws IOException {
+    initializeMetadata();
+    byte[] bytes = fileInfo.get(key);
+    return bytes != null ? Option.of(bytes) : Option.empty();
+  }
+
+  @Override
+  public Option<ByteBuffer> getMetaBlock(String metaBlockName) throws IOException {
+    initializeMetadata();
+    BlockIndexEntry blockIndexEntry = metaBlockIndexEntryMap.get(new UTF8StringKey(metaBlockName));
+    if (blockIndexEntry == null) {
+      return Option.empty();
+    }
+    HFileBlockReader blockReader = new HFileBlockReader(
+        context, stream, blockIndexEntry.getOffset(),
+        blockIndexEntry.getOffset() + blockIndexEntry.getSize());
+    HFileMetaBlock block = (HFileMetaBlock) blockReader.nextBlock(HFileBlockType.META);
+    return Option.of(block.readContent());
+  }
+
+  @Override
+  public long getNumKeyValueEntries() {
+    try {
+      initializeMetadata();
+      return trailer.getNumKeyValueEntries();
+    } catch (IOException e) {
+      throw new RuntimeException("Cannot read HFile", e);
+    }
+  }
+
+  @Override
+  public int seekTo(Key key) throws IOException {
+    Option<KeyValue> currentKeyValue = getKeyValue();
+    if (!currentKeyValue.isPresent()) {
+      return SEEK_TO_EOF;
+    }
+    int compareCurrent = key.compareTo(currentKeyValue.get().getKey());
+    if (compareCurrent > 0) {
+      if (currentDataBlockEntry.get().getNextBlockKey().isPresent()) {
+        int comparedNextBlockFirstKey =
+            key.compareTo(currentDataBlockEntry.get().getNextBlockKey().get());
+        if (comparedNextBlockFirstKey >= 0) {
+          // Searches the block that may contain the lookup key based on the starting keys of
+          // all blocks (sorted in the TreeMap of block index entries), using binary search.
+          // The result contains the greatest key less than or equal to the given key,
+          // or null if there is no such key.
+
+          Map.Entry<Key, BlockIndexEntry> floorEntry = dataBlockIndexEntryMap.floorEntry(key);
+          if (floorEntry == null) {
+            // Key smaller than the start key of the first block
+            return SEEK_TO_BACKWARDS;
+          }
+          currentDataBlockEntry = Option.of(floorEntry.getValue());
+          currentDataBlock = Option.empty();
+          currentPos.setOffset(
+              (int) currentDataBlockEntry.get().getOffset() + HFILEBLOCK_HEADER_SIZE);
+        }
+      }
+      if (!currentDataBlockEntry.get().getNextBlockKey().isPresent()) {
+        // This is the last data block.  Check against the last key.
+        if (fileInfo.getLastKey().isPresent()) {
+          int comparedLastKey = key.compareTo(fileInfo.getLastKey().get());
+          if (comparedLastKey > 0) {
+            currentDataBlockEntry = Option.empty();
+            currentDataBlock = Option.empty();
+            currentPos.setEof();
+            return SEEK_TO_EOF;
+          }
+        }
+      }
+
+      if (!currentDataBlock.isPresent()) {

Review Comment:
   why not do this inline at L141 instead? would that be easier to follow?



##########
hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileDataBlock.java:
##########
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.hfile;
+
+import org.apache.hudi.common.util.Option;
+
+import static org.apache.hudi.io.hfile.HFileReader.SEEK_TO_FOUND;
+import static org.apache.hudi.io.hfile.HFileReader.SEEK_TO_IN_RANGE;
+import static org.apache.hudi.io.hfile.KeyValue.KEY_OFFSET;
+
+/**
+ * Represents a {@link HFileBlockType#DATA} block.
+ */
+public class HFileDataBlock extends HFileBlock {
+  // Hudi does not use HFile MVCC timestamp version so the version
+  // is always 0, thus the byte length of the version is always 1.
+  // This assumption is also validated when parsing {@link HFileInfo},
+  // i.e., the maximum MVCC timestamp in a HFile must be 0.
+  private static final long ZERO_TS_VERSION_BYTE_LENGTH = 1;
+
+  // End offset of content in the block, relative to the start of the byte array {@link byteBuff}
+  protected final int uncompressedContentEndOffset;
+
+  protected HFileDataBlock(HFileContext context,
+                           byte[] byteBuff,
+                           int startOffsetInBuff) {
+    super(context, HFileBlockType.DATA, byteBuff, startOffsetInBuff);
+
+    this.uncompressedContentEndOffset = this.uncompressedEndOffset - this.sizeCheckSum;
+  }
+
+  /**
+   * Seeks to the key to look up. The key may not have an exact match.
+   *
+   * @param position               {@link HFilePosition} containing the current position relative
+   *                               to the beginning of the HFile (not the block start offset).
+   * @param key                    key to look up.
+   * @param blockStartOffsetInFile the start offset of the block relative to the beginning of the
+   *                               HFile.
+   * @return 0 if the block contains the exact same key as the lookup key, and the position points
+   * to the key; or 1 if the lookup key does not exist, and the position points to the
+   * lexicographically largest key that is smaller than the lookup key.
+   */
+  public int seekTo(HFilePosition position, Key key, int blockStartOffsetInFile) {
+    int offset = position.getOffset() - blockStartOffsetInFile;
+    int endOffset = uncompressedContentEndOffset;
+    int lastOffset = offset;
+    Option<KeyValue> lastKeyValue = position.getKeyValue();
+    while (offset < endOffset) {
+      // Full length is not known yet until parsing
+      KeyValue kv = readKeyValue(offset);
+      int comp = kv.getKey().compareTo(key);
+      if (comp == 0) {
+        position.set(offset + blockStartOffsetInFile, kv);
+        return SEEK_TO_FOUND;
+      } else if (comp > 0) {

Review Comment:
   add a line of comment on each of the cases?
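
To illustrate what such per-case comments might say, the in-block scan's outcomes can be sketched over a plain sorted array. `SEEK_TO_FOUND = 0` and `SEEK_TO_IN_RANGE = 1` mirror the contract documented in the diff; everything else here (names, the String keys, the `posOut` holder) is illustrative, not the PR's byte-level implementation:

```java
// Simplified sketch of the in-block seek: scan sorted keys forward, compare
// each against the lookup key, and handle the three comparison cases.
public class BlockSeekDemo {
  public static final int SEEK_TO_FOUND = 0;    // exact key present
  public static final int SEEK_TO_IN_RANGE = 1; // key absent but within block range

  // posOut[0] holds the scan position on entry and the result position on exit.
  public static int seekTo(String[] sortedKeys, String key, int[] posOut) {
    int last = posOut[0];
    for (int i = posOut[0]; i < sortedKeys.length; i++) {
      int comp = sortedKeys[i].compareTo(key);
      if (comp == 0) {
        // Case 1: exact match -- the position points at the lookup key.
        posOut[0] = i;
        return SEEK_TO_FOUND;
      } else if (comp > 0) {
        // Case 2: overshot -- the lookup key is absent; stay on the largest
        // key that is smaller than the lookup key.
        posOut[0] = last;
        return SEEK_TO_IN_RANGE;
      }
      // Case 3: current key still smaller -- remember it and keep scanning.
      last = i;
    }
    // Ran off the end of the block: position stays on the block's last key.
    posOut[0] = last;
    return SEEK_TO_IN_RANGE;
  }
}
```
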



##########
hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileReaderImpl.java:
##########
@@ -0,0 +1,284 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.hfile;
+
+import org.apache.hudi.common.util.Option;
+
+import org.apache.hadoop.fs.FSDataInputStream;
+
+import java.io.ByteArrayInputStream;
+import java.io.DataInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+import java.util.TreeMap;
+
+import static org.apache.hudi.io.hfile.HFileBlock.HFILEBLOCK_HEADER_SIZE;
+import static org.apache.hudi.io.hfile.HFileUtils.readMajorVersion;
+
+/**
+ * An implementation of {@link HFileReader}.
+ */
+public class HFileReaderImpl implements HFileReader {
+  private final FSDataInputStream stream;
+  private final long fileSize;
+
+  private final HFilePosition currentPos;
+  private boolean isMetadataInitialized = false;
+  private HFileTrailer trailer;
+  private HFileContext context;
+  private TreeMap<Key, BlockIndexEntry> dataBlockIndexEntryMap;
+  private TreeMap<Key, BlockIndexEntry> metaBlockIndexEntryMap;
+  private HFileInfo fileInfo;
+  private Option<BlockIndexEntry> currentDataBlockEntry;
+  private Option<HFileDataBlock> currentDataBlock;
+
+  public HFileReaderImpl(FSDataInputStream stream, long fileSize) {
+    this.stream = stream;
+    this.fileSize = fileSize;
+    this.currentPos = new HFilePosition();
+    this.currentDataBlockEntry = Option.empty();
+    this.currentDataBlock = Option.empty();
+  }
+
+  @Override
+  public synchronized void initializeMetadata() throws IOException {
+    if (this.isMetadataInitialized) {
+      return;
+    }
+
+    // Read Trailer (serialized in Proto)
+    this.trailer = readTrailer(stream, fileSize);
+    this.context = HFileContext.builder()
+        .compressionCodec(trailer.getCompressionCodec())
+        .build();
+    HFileBlockReader blockReader = new HFileBlockReader(
+        context, stream, trailer.getLoadOnOpenDataOffset(),
+        fileSize - HFileTrailer.getTrailerSize());
+    HFileRootIndexBlock dataIndexBlock =
+        (HFileRootIndexBlock) blockReader.nextBlock(HFileBlockType.ROOT_INDEX);
+    this.dataBlockIndexEntryMap = dataIndexBlock.readBlockIndex(trailer.getDataIndexCount(), false);
+    HFileRootIndexBlock metaIndexBlock =
+        (HFileRootIndexBlock) blockReader.nextBlock(HFileBlockType.ROOT_INDEX);
+    this.metaBlockIndexEntryMap = metaIndexBlock.readBlockIndex(trailer.getMetaIndexCount(), true);
+    HFileFileInfoBlock fileInfoBlock =
+        (HFileFileInfoBlock) blockReader.nextBlock(HFileBlockType.FILE_INFO);
+    this.fileInfo = fileInfoBlock.readFileInfo();
+    this.isMetadataInitialized = true;
+  }
+
+  @Override
+  public Option<byte[]> getMetaInfo(UTF8StringKey key) throws IOException {
+    initializeMetadata();
+    byte[] bytes = fileInfo.get(key);
+    return bytes != null ? Option.of(bytes) : Option.empty();
+  }
+
+  @Override
+  public Option<ByteBuffer> getMetaBlock(String metaBlockName) throws IOException {
+    initializeMetadata();
+    BlockIndexEntry blockIndexEntry = metaBlockIndexEntryMap.get(new UTF8StringKey(metaBlockName));
+    if (blockIndexEntry == null) {
+      return Option.empty();
+    }
+    HFileBlockReader blockReader = new HFileBlockReader(
+        context, stream, blockIndexEntry.getOffset(),
+        blockIndexEntry.getOffset() + blockIndexEntry.getSize());
+    HFileMetaBlock block = (HFileMetaBlock) blockReader.nextBlock(HFileBlockType.META);
+    return Option.of(block.readContent());
+  }
+
+  @Override
+  public long getNumKeyValueEntries() {
+    try {
+      initializeMetadata();
+      return trailer.getNumKeyValueEntries();
+    } catch (IOException e) {
+      throw new RuntimeException("Cannot read HFile", e);
+    }
+  }
+
+  @Override
+  public int seekTo(Key key) throws IOException {
+    Option<KeyValue> currentKeyValue = getKeyValue();
+    if (!currentKeyValue.isPresent()) {
+      return SEEK_TO_EOF;
+    }
+    int compareCurrent = key.compareTo(currentKeyValue.get().getKey());
+    if (compareCurrent > 0) {
+      if (currentDataBlockEntry.get().getNextBlockKey().isPresent()) {
+        int comparedNextBlockFirstKey =
+            key.compareTo(currentDataBlockEntry.get().getNextBlockKey().get());
+        if (comparedNextBlockFirstKey >= 0) {
+          // Searches the block that may contain the lookup key based on the starting keys of
+          // all blocks (sorted in the TreeMap of block index entries), using binary search.
+          // The result contains the greatest key less than or equal to the given key,
+          // or null if there is no such key.
+
+          Map.Entry<Key, BlockIndexEntry> floorEntry = dataBlockIndexEntryMap.floorEntry(key);
+          if (floorEntry == null) {
+            // Key smaller than the start key of the first block
+            return SEEK_TO_BACKWARDS;
+          }
+          currentDataBlockEntry = Option.of(floorEntry.getValue());
+          currentDataBlock = Option.empty();
+          currentPos.setOffset(
+              (int) currentDataBlockEntry.get().getOffset() + HFILEBLOCK_HEADER_SIZE);
+        }
+      }
+      if (!currentDataBlockEntry.get().getNextBlockKey().isPresent()) {
+        // This is the last data block.  Check against the last key.
+        if (fileInfo.getLastKey().isPresent()) {
+          int comparedLastKey = key.compareTo(fileInfo.getLastKey().get());
+          if (comparedLastKey > 0) {
+            currentDataBlockEntry = Option.empty();
+            currentDataBlock = Option.empty();
+            currentPos.setEof();
+            return SEEK_TO_EOF;
+          }
+        }
+      }
+
+      if (!currentDataBlock.isPresent()) {
+        currentDataBlock = Option.of(instantiateHFileDataBlock(currentDataBlockEntry.get()));
+      }
+
+      return currentDataBlock.get()
+          .seekTo(currentPos, key, (int) currentDataBlockEntry.get().getOffset());
+    }
+    if (compareCurrent == 0) {
+      return SEEK_TO_FOUND;
+    }
+    // Backward seek not supported

Review Comment:
   not supported? is that just a comment or should we throw an exception if unsupported?



##########
hudi-io/src/main/java/org/apache/hudi/io/compress/HoodieCompressionFactory.java:
##########
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.compress;
+
+import org.apache.hudi.io.compress.airlift.HoodieAirliftGzipDecompressor;
+import org.apache.hudi.io.compress.builtin.HoodieNoneDecompressor;
+
+/**
+ * Factory for {@link HoodieDecompressor}.
+ */
+public class HoodieCompressionFactory {

Review Comment:
   let's rename later if we add compression.



##########
hudi-io/src/main/java/org/apache/hudi/io/hfile/HFileReader.java:
##########
@@ -0,0 +1,119 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.hudi.io.hfile;
+
+import org.apache.hudi.common.util.Option;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * HFile reader that supports seeks.
+ */
+public interface HFileReader extends Closeable {
+  // Return code of seekTo(Key)
+  int SEEK_TO_BACKWARDS = -1;

Review Comment:
   javadocs on what these constants mean.
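
As a concrete illustration of what such javadocs might look like: `SEEK_TO_BACKWARDS = -1` is taken from the diff, `SEEK_TO_FOUND = 0` and `SEEK_TO_IN_RANGE = 1` follow from the `seekTo` contract documented in `HFileDataBlock`, and the `SEEK_TO_EOF` value plus all wording are assumptions for illustration only:

```java
// Hypothetical javadoc'd version of the seekTo(Key) return codes; only
// SEEK_TO_BACKWARDS = -1 is verbatim from the diff, the rest is assumed.
public interface SeekToCodes {
  /** The lookup key sorts before the current position; backward seek is not supported. */
  int SEEK_TO_BACKWARDS = -1;
  /** The exact lookup key was found and the position points at it. */
  int SEEK_TO_FOUND = 0;
  /** The lookup key is absent; the position is on the largest key smaller than it. */
  int SEEK_TO_IN_RANGE = 1;
  /** The lookup key sorts after the file's last key; the position is at EOF. */
  int SEEK_TO_EOF = 2;
}
```
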



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
