nsivabalan commented on a change in pull request #1804:
URL: https://github.com/apache/hudi/pull/1804#discussion_r456376140



##########
File path: 
hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieHFileConfig.java
##########
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hudi.common.bloom.BloomFilter;
+
+public class HoodieHFileConfig {
+
+  private Compression.Algorithm compressionAlgorithm;
+  private int blockSize;
+  private long maxFileSize;
+  private Configuration hadoopConf;
+  private BloomFilter bloomFilter;
+
+  public HoodieHFileConfig(Compression.Algorithm compressionAlgorithm, int 
blockSize, long maxFileSize,

Review comment:
       when I was benchmarking Hfile, found few other cache config params to be 
useful. 
   prefetch on open, cache L1 and drop behind compaction. More details can be 
found [here](https://issues.apache.org/jira/browse/HUDI-432). Can we add these 
3 configs to this class as well

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieLogBlock.java
##########
@@ -110,7 +110,7 @@ public long getLogBlockLength() {
    * Type of the log block WARNING: This enum is serialized as the ordinal. 
Only add new enums at the end.
    */
   public enum HoodieLogBlockType {
-    COMMAND_BLOCK, DELETE_BLOCK, CORRUPT_BLOCK, AVRO_DATA_BLOCK
+    COMMAND_BLOCK, DELETE_BLOCK, CORRUPT_BLOCK, AVRO_DATA_BLOCK, 
HFILE_DATA_BLOCK

Review comment:
       correct me if I am wrong. we may not have a separate delete block for 
hfile, since everything is a key, value in bytes. So, we might have to fetch 
all values and resolve to the latest one to find if the value represents delete 
or active. 

##########
File path: 
hudi-client/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.client.SparkTaskContextSupplier;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieUpsertException;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Queue;
+
+@SuppressWarnings("Duplicates")

Review comment:
       java docs

##########
File path: 
hudi-client/src/main/java/org/apache/hudi/io/storage/HoodieHFileWriter.java
##########
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.client.SparkTaskContextSupplier;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.fs.HoodieWrapperFileSystem;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.io.Writable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.concurrent.atomic.AtomicLong;
+
+/**
+ * HoodieHFileWriter writes IndexedRecords into an HFile. The record's key is 
used as the key and the
+ * AVRO encoded record bytes are saved as the value.
+ *
+ * Limitations (compared to columnar formats like Parquet or ORC):
+ *  1. Records should be added in order of keys
+ *  2. There are no column stats
+ */
+public class HoodieHFileWriter<T extends HoodieRecordPayload, R extends 
IndexedRecord>
+    implements HoodieFileWriter<R> {
+  private static AtomicLong recordIndex = new AtomicLong(1);
+  private static final Logger LOG = 
LogManager.getLogger(HoodieHFileWriter.class);
+
+  public static final String KEY_SCHEMA = "schema";
+  public static final String KEY_BLOOM_FILTER_META_BLOCK = "bloomFilter";
+  public static final String KEY_BLOOM_FILTER_TYPE_CODE = 
"bloomFilterTypeCode";
+  public static final String KEY_MIN_RECORD = "minRecordKey";
+  public static final String KEY_MAX_RECORD = "maxRecordKey";
+
+  private final Path file;
+  private HoodieHFileConfig hfileConfig;
+  private final HoodieWrapperFileSystem fs;
+  private final long maxFileSize;
+  private final String instantTime;
+  private final SparkTaskContextSupplier sparkTaskContextSupplier;
+  private HFile.Writer writer;
+  private String minRecordKey;
+  private String maxRecordKey;
+
+  public HoodieHFileWriter(String instantTime, Path file, HoodieHFileConfig 
hfileConfig, Schema schema,
+      SparkTaskContextSupplier sparkTaskContextSupplier) throws IOException {
+
+    Configuration conf = FSUtils.registerFileSystem(file, 
hfileConfig.getHadoopConf());
+    this.file = HoodieWrapperFileSystem.convertToHoodiePath(file, conf);
+    this.fs = (HoodieWrapperFileSystem) this.file.getFileSystem(conf);
+    this.hfileConfig = hfileConfig;
+
+    // We cannot accurately measure the snappy compressed output file size. We 
are choosing a
+    // conservative 10%
+    // TODO - compute this compression ratio dynamically by looking at the 
bytes written to the
+    // stream and the actual file size reported by HDFS
+    // this.maxFileSize = hfileConfig.getMaxFileSize()
+    //    + Math.round(hfileConfig.getMaxFileSize() * 
hfileConfig.getCompressionRatio());
+    this.maxFileSize = hfileConfig.getMaxFileSize();
+    this.instantTime = instantTime;
+    this.sparkTaskContextSupplier = sparkTaskContextSupplier;
+
+    HFileContext context = new 
HFileContextBuilder().withBlockSize(hfileConfig.getBlockSize())
+          .withCompression(hfileConfig.getCompressionAlgorithm())
+          .build();
+    CacheConfig cacheConfig = new CacheConfig(conf);
+    this.writer = HFile.getWriterFactory(conf, cacheConfig).withPath(this.fs, 
this.file).withFileContext(context).create();
+
+    writer.appendFileInfo(KEY_SCHEMA.getBytes(), schema.toString().getBytes());
+  }
+
+  @Override
+  public void writeAvroWithMetadata(R avroRecord, HoodieRecord record) throws 
IOException {
+    String seqId =
+        HoodieRecord.generateSequenceId(instantTime, 
sparkTaskContextSupplier.getPartitionIdSupplier().get(), 
recordIndex.getAndIncrement());
+    HoodieAvroUtils.addHoodieKeyToRecord((GenericRecord) avroRecord, 
record.getRecordKey(), record.getPartitionPath(),
+        file.getName());
+    HoodieAvroUtils.addCommitMetadataToRecord((GenericRecord) avroRecord, 
instantTime, seqId);
+
+    writeAvro(record.getRecordKey(), (IndexedRecord)avroRecord);
+  }
+
+  @Override
+  public boolean canWrite() {
+    return fs.getBytesWritten(file) < maxFileSize;
+  }
+
+  @Override
+  public void writeAvro(String recordKey, IndexedRecord object) throws 
IOException {
+    byte[] value = HoodieAvroUtils.avroToBytes((GenericRecord)object);
+    KeyValue kv = new KeyValue(recordKey.getBytes(), null, null, value);
+    writer.append(kv);
+
+    if (hfileConfig.useBloomFilter()) {

Review comment:
       can we move all bloom filter related code to a separate class. So that 
this class is in line w/ ParquetWriter as well. I am not suggesting to 
introduce an interface, just another class. 

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.log.block;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.storage.HoodieHFileReader;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.util.Pair;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+import javax.annotation.Nonnull;
+
+/**
+ * HoodieHFileDataBlock contains a list of records stored inside an HFile 
format. It is used with the HFile
+ * base file format.
+ */
+public class HoodieHFileDataBlock extends HoodieDataBlock {
+  private static final Logger LOG = 
LogManager.getLogger(HoodieHFileDataBlock.class);
+  private static Compression.Algorithm compressionAlgorithm = 
Compression.Algorithm.GZ;
+  private static int blockSize = 1 * 1024 * 1024;
+
+  public HoodieHFileDataBlock(@Nonnull Map<HeaderMetadataType, String> 
logBlockHeader,
+       @Nonnull Map<HeaderMetadataType, String> logBlockFooter,
+       @Nonnull Option<HoodieLogBlockContentLocation> blockContentLocation, 
@Nonnull Option<byte[]> content,
+       FSDataInputStream inputStream, boolean readBlockLazily) {
+    super(logBlockHeader, logBlockFooter, blockContentLocation, content, 
inputStream, readBlockLazily);
+  }
+
+  public HoodieHFileDataBlock(HoodieLogFile logFile, FSDataInputStream 
inputStream, Option<byte[]> content,
+       boolean readBlockLazily, long position, long blockSize, long 
blockEndpos, Schema readerSchema,
+       Map<HeaderMetadataType, String> header, Map<HeaderMetadataType, String> 
footer) {
+    super(content, inputStream, readBlockLazily,
+          Option.of(new HoodieLogBlockContentLocation(logFile, position, 
blockSize, blockEndpos)), readerSchema, header,
+          footer);
+  }
+
+  public HoodieHFileDataBlock(@Nonnull List<IndexedRecord> records, @Nonnull 
Map<HeaderMetadataType, String> header) {
+    super(records, header, new HashMap<>());
+  }
+
+  @Override
+  public HoodieLogBlockType getBlockType() {
+    return HoodieLogBlockType.HFILE_DATA_BLOCK;
+  }
+
+  @Override
+  protected byte[] serializeRecords() throws IOException {
+    HFileContext context = new 
HFileContextBuilder().withBlockSize(blockSize).withCompression(compressionAlgorithm)
+        .build();
+    Configuration conf = new Configuration();
+    CacheConfig cacheConfig = new CacheConfig(conf);
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    FSDataOutputStream ostream = new FSDataOutputStream(baos, null);
+
+    HFile.Writer writer = HFile.getWriterFactory(conf, cacheConfig)
+        .withOutputStream(ostream).withFileContext(context).create();
+
+    // Serialize records into bytes
+    Map<String, byte[]> recordMap = new TreeMap<>();
+    Iterator<IndexedRecord> itr = records.iterator();
+    boolean useIntegerKey = false;
+    int key = 0;
+    int keySize = 0;
+    Field keyField = 
records.get(0).getSchema().getField(HoodieRecord.RECORD_KEY_METADATA_FIELD);
+    if (keyField == null) {

Review comment:
       can you help me understand whats the usecase for this ?

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/common/table/log/block/HoodieHFileDataBlock.java
##########
@@ -0,0 +1,159 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.common.table.log.block;
+
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.model.HoodieLogFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.hudi.io.storage.HoodieHFileReader;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import org.apache.avro.Schema;
+import org.apache.avro.Schema.Field;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.FSDataOutputStream;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.compress.Compression;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileContext;
+import org.apache.hadoop.hbase.io.hfile.HFileContextBuilder;
+import org.apache.hadoop.hbase.util.Pair;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.TreeMap;
+import java.util.stream.Collectors;
+
+import javax.annotation.Nonnull;
+
+/**
+ * HoodieHFileDataBlock contains a list of records stored inside an HFile 
format. It is used with the HFile
+ * base file format.
+ */
+public class HoodieHFileDataBlock extends HoodieDataBlock {
+  private static final Logger LOG = 
LogManager.getLogger(HoodieHFileDataBlock.class);
+  private static Compression.Algorithm compressionAlgorithm = 
Compression.Algorithm.GZ;
+  private static int blockSize = 1 * 1024 * 1024;
+
+  public HoodieHFileDataBlock(@Nonnull Map<HeaderMetadataType, String> 
logBlockHeader,
+       @Nonnull Map<HeaderMetadataType, String> logBlockFooter,
+       @Nonnull Option<HoodieLogBlockContentLocation> blockContentLocation, 
@Nonnull Option<byte[]> content,
+       FSDataInputStream inputStream, boolean readBlockLazily) {
+    super(logBlockHeader, logBlockFooter, blockContentLocation, content, 
inputStream, readBlockLazily);
+  }
+
+  public HoodieHFileDataBlock(HoodieLogFile logFile, FSDataInputStream 
inputStream, Option<byte[]> content,
+       boolean readBlockLazily, long position, long blockSize, long 
blockEndpos, Schema readerSchema,
+       Map<HeaderMetadataType, String> header, Map<HeaderMetadataType, String> 
footer) {
+    super(content, inputStream, readBlockLazily,
+          Option.of(new HoodieLogBlockContentLocation(logFile, position, 
blockSize, blockEndpos)), readerSchema, header,
+          footer);
+  }
+
+  public HoodieHFileDataBlock(@Nonnull List<IndexedRecord> records, @Nonnull 
Map<HeaderMetadataType, String> header) {
+    super(records, header, new HashMap<>());
+  }
+
+  @Override
+  public HoodieLogBlockType getBlockType() {
+    return HoodieLogBlockType.HFILE_DATA_BLOCK;
+  }
+
+  @Override
+  protected byte[] serializeRecords() throws IOException {
+    HFileContext context = new 
HFileContextBuilder().withBlockSize(blockSize).withCompression(compressionAlgorithm)
+        .build();
+    Configuration conf = new Configuration();
+    CacheConfig cacheConfig = new CacheConfig(conf);
+    ByteArrayOutputStream baos = new ByteArrayOutputStream();
+    FSDataOutputStream ostream = new FSDataOutputStream(baos, null);
+
+    HFile.Writer writer = HFile.getWriterFactory(conf, cacheConfig)
+        .withOutputStream(ostream).withFileContext(context).create();
+
+    // Serialize records into bytes
+    Map<String, byte[]> recordMap = new TreeMap<>();
+    Iterator<IndexedRecord> itr = records.iterator();
+    boolean useIntegerKey = false;
+    int key = 0;
+    int keySize = 0;
+    Field keyField = 
records.get(0).getSchema().getField(HoodieRecord.RECORD_KEY_METADATA_FIELD);
+    if (keyField == null) {
+      // Missing key metadata field so we should use an integer sequence key
+      useIntegerKey = true;
+      keySize = (int) Math.ceil(Math.log(records.size())) + 1;
+    }
+    while (itr.hasNext()) {
+      IndexedRecord record = itr.next();
+      String recordKey;
+      if (useIntegerKey) {
+        recordKey = String.format("%" + keySize + "s", key++);
+      } else {
+        recordKey = record.get(keyField.pos()).toString();
+      }
+      byte[] recordBytes = HoodieAvroUtils.avroToBytes(record);
+      recordMap.put(recordKey, recordBytes);
+    }
+
+    // Write the records
+    recordMap.forEach((recordKey, recordBytes) -> {

Review comment:
       why another loop rather than writing it in line 121?

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileReader.java
##########
@@ -34,7 +35,17 @@
 
   public Set<String> filterRowKeys(Set<String> candidateRowKeys);
 
-  public Iterator<R> getRecordIterator(Schema schema) throws IOException;
+  public Iterator<R> getRecordIterator(Schema readerSchema) throws IOException;
+
+  default Iterator<R> getRecordIterator() throws IOException {
+    return getRecordIterator(getSchema());
+  }
+
+  public Option<R> getRecordByKey(String key, Schema readerSchema) throws 
IOException;

Review comment:
       do you think we introduce a config to say whether a particular file 
format supports single key look up. Not all file formats might support look up 
by Key. So, for those, we can keep this as unsupported and for Hfile kind of 
formats, we can have impl. 

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java
##########
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.BloomFilterFactory;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+public class HoodieHFileReader<R extends IndexedRecord> implements 
HoodieFileReader {
+  private static final Logger LOG = 
LogManager.getLogger(HoodieHFileReader.class);
+  private Path path;
+  private Configuration conf;
+  private HFile.Reader reader;
+  private Schema schema;
+
+  public static final String KEY_SCHEMA = "schema";
+  public static final String KEY_BLOOM_FILTER_META_BLOCK = "bloomFilter";
+  public static final String KEY_BLOOM_FILTER_TYPE_CODE = 
"bloomFilterTypeCode";
+  public static final String KEY_MIN_RECORD = "minRecordKey";
+  public static final String KEY_MAX_RECORD = "maxRecordKey";
+
+  public HoodieHFileReader(Configuration configuration, Path path, CacheConfig 
cacheConfig) throws IOException {
+    this.conf = configuration;
+    this.path = path;
+    this.reader = HFile.createReader(FSUtils.getFs(path.toString(), 
configuration), path, cacheConfig, conf);
+  }
+
+  public HoodieHFileReader(byte[] content) throws IOException {
+    Configuration conf = new Configuration();
+    Path path = new Path("hoodie");
+    SeekableByteArrayInputStream bis = new 
SeekableByteArrayInputStream(content);
+    FSDataInputStream fsdis = new FSDataInputStream(bis);
+    this.reader = HFile.createReader(FSUtils.getFs("hoodie", conf), path, new 
FSDataInputStreamWrapper(fsdis),
+        content.length, new CacheConfig(conf), conf);
+  }
+
+  @Override
+  public String[] readMinMaxRecordKeys() {
+    Map<byte[], byte[]> fileInfo;
+    try {
+      fileInfo = reader.loadFileInfo();
+      return new String[] { new 
String(fileInfo.get(KEY_MIN_RECORD.getBytes())),
+          new String(fileInfo.get(KEY_MAX_RECORD.getBytes()))};
+    } catch (IOException e) {
+      throw new HoodieException("Could not read min/max record key out of file 
information block correctly from path", e);
+    }
+  }
+
+  @Override
+  public Schema getSchema() {
+    if (schema == null) {
+      try {
+        Map<byte[], byte[]> fileInfo = reader.loadFileInfo();
+        schema = new Schema.Parser().parse(new 
String(fileInfo.get(KEY_SCHEMA.getBytes())));
+      } catch (IOException e) {
+        throw new HoodieException("Could not read schema of file from path", 
e);
+      }
+    }
+
+    return schema;
+  }
+
+  @Override
+  public BloomFilter readBloomFilter() {
+    Map<byte[], byte[]> fileInfo;
+    try {
+      fileInfo = reader.loadFileInfo();
+      ByteBuffer serializedFilter = 
reader.getMetaBlock(KEY_BLOOM_FILTER_META_BLOCK, false);
+      byte[] filterBytes = new byte[serializedFilter.remaining()];
+      serializedFilter.get(filterBytes); // read the bytes that were written
+      return BloomFilterFactory.fromString(new String(filterBytes),
+          new String(fileInfo.get(KEY_BLOOM_FILTER_TYPE_CODE.getBytes())));
+    } catch (IOException e) {
+      throw new HoodieException("Could not read bloom filter from " + path, e);
+    }
+  }
+
+  @Override
+  public Set<String> filterRowKeys(Set candidateRowKeys) {

Review comment:
       not required in this patch. but we should think if we can decide whether 
to read entire set and filter or just do random seeks based on candidate set 
size to be filtered for.

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java
##########
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.BloomFilterFactory;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+public class HoodieHFileReader<R extends IndexedRecord> implements 
HoodieFileReader {
+  private static final Logger LOG = 
LogManager.getLogger(HoodieHFileReader.class);
+  private Path path;
+  private Configuration conf;
+  private HFile.Reader reader;
+  private Schema schema;
+
+  public static final String KEY_SCHEMA = "schema";
+  public static final String KEY_BLOOM_FILTER_META_BLOCK = "bloomFilter";
+  public static final String KEY_BLOOM_FILTER_TYPE_CODE = 
"bloomFilterTypeCode";
+  public static final String KEY_MIN_RECORD = "minRecordKey";
+  public static final String KEY_MAX_RECORD = "maxRecordKey";
+
+  public HoodieHFileReader(Configuration configuration, Path path, CacheConfig 
cacheConfig) throws IOException {
+    this.conf = configuration;
+    this.path = path;
+    this.reader = HFile.createReader(FSUtils.getFs(path.toString(), 
configuration), path, cacheConfig, conf);
+  }
+
+  public HoodieHFileReader(byte[] content) throws IOException {
+    Configuration conf = new Configuration();
+    Path path = new Path("hoodie");
+    SeekableByteArrayInputStream bis = new 
SeekableByteArrayInputStream(content);
+    FSDataInputStream fsdis = new FSDataInputStream(bis);
+    this.reader = HFile.createReader(FSUtils.getFs("hoodie", conf), path, new 
FSDataInputStreamWrapper(fsdis),
+        content.length, new CacheConfig(conf), conf);
+  }
+
+  @Override
+  public String[] readMinMaxRecordKeys() {
+    Map<byte[], byte[]> fileInfo;
+    try {
+      fileInfo = reader.loadFileInfo();
+      return new String[] { new 
String(fileInfo.get(KEY_MIN_RECORD.getBytes())),
+          new String(fileInfo.get(KEY_MAX_RECORD.getBytes()))};
+    } catch (IOException e) {
+      throw new HoodieException("Could not read min/max record key out of file 
information block correctly from path", e);
+    }
+  }
+
+  @Override
+  public Schema getSchema() {
+    if (schema == null) {
+      try {
+        Map<byte[], byte[]> fileInfo = reader.loadFileInfo();
+        schema = new Schema.Parser().parse(new 
String(fileInfo.get(KEY_SCHEMA.getBytes())));
+      } catch (IOException e) {
+        throw new HoodieException("Could not read schema of file from path", 
e);
+      }
+    }
+
+    return schema;
+  }
+
+  @Override
+  public BloomFilter readBloomFilter() {
+    Map<byte[], byte[]> fileInfo;
+    try {
+      fileInfo = reader.loadFileInfo();
+      ByteBuffer serializedFilter = 
reader.getMetaBlock(KEY_BLOOM_FILTER_META_BLOCK, false);
+      byte[] filterBytes = new byte[serializedFilter.remaining()];
+      serializedFilter.get(filterBytes); // read the bytes that were written
+      return BloomFilterFactory.fromString(new String(filterBytes),
+          new String(fileInfo.get(KEY_BLOOM_FILTER_TYPE_CODE.getBytes())));
+    } catch (IOException e) {
+      throw new HoodieException("Could not read bloom filter from " + path, e);
+    }
+  }
+
+  @Override
+  public Set<String> filterRowKeys(Set candidateRowKeys) {
+    try {
+      List<Pair<String, R>> allRecords = readAllRecords();
+      Set<String> rowKeys = new HashSet<>();
+      allRecords.forEach(t -> {
+        if (candidateRowKeys.contains(t.getFirst())) {
+          rowKeys.add(t.getFirst());
+        }
+      });
+      return rowKeys;
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to read row keys from " + path, e);
+    }
+  }
+
+  public List<Pair<String, R>> readAllRecords(Schema writerSchema, Schema 
readerSchema) throws IOException {
+    List<Pair<String, R>> recordList = new LinkedList<>();
+    try {
+      HFileScanner scanner = reader.getScanner(false, false);

Review comment:
       why cacheBlocks and pread args are hard coded? shouldn't we configurize 
it. 

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java
##########
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.BloomFilterFactory;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+public class HoodieHFileReader<R extends IndexedRecord> implements 
HoodieFileReader {
+  private static final Logger LOG = 
LogManager.getLogger(HoodieHFileReader.class);
+  private Path path;
+  private Configuration conf;
+  private HFile.Reader reader;
+  private Schema schema;
+
+  public static final String KEY_SCHEMA = "schema";
+  public static final String KEY_BLOOM_FILTER_META_BLOCK = "bloomFilter";
+  public static final String KEY_BLOOM_FILTER_TYPE_CODE = 
"bloomFilterTypeCode";
+  public static final String KEY_MIN_RECORD = "minRecordKey";
+  public static final String KEY_MAX_RECORD = "maxRecordKey";
+
+  public HoodieHFileReader(Configuration configuration, Path path, CacheConfig 
cacheConfig) throws IOException {
+    this.conf = configuration;
+    this.path = path;
+    this.reader = HFile.createReader(FSUtils.getFs(path.toString(), 
configuration), path, cacheConfig, conf);
+  }
+
+  public HoodieHFileReader(byte[] content) throws IOException {
+    Configuration conf = new Configuration();
+    Path path = new Path("hoodie");
+    SeekableByteArrayInputStream bis = new 
SeekableByteArrayInputStream(content);

Review comment:
       why do we need this seekable. HFile reader by default support look up by 
key right. Can you help me understand 

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java
##########
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.BloomFilterFactory;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+public class HoodieHFileReader<R extends IndexedRecord> implements 
HoodieFileReader {
+  private static final Logger LOG = 
LogManager.getLogger(HoodieHFileReader.class);
+  private Path path;
+  private Configuration conf;
+  private HFile.Reader reader;
+  private Schema schema;
+
+  public static final String KEY_SCHEMA = "schema";
+  public static final String KEY_BLOOM_FILTER_META_BLOCK = "bloomFilter";
+  public static final String KEY_BLOOM_FILTER_TYPE_CODE = 
"bloomFilterTypeCode";
+  public static final String KEY_MIN_RECORD = "minRecordKey";
+  public static final String KEY_MAX_RECORD = "maxRecordKey";
+
+  public HoodieHFileReader(Configuration configuration, Path path, CacheConfig 
cacheConfig) throws IOException {
+    this.conf = configuration;
+    this.path = path;
+    this.reader = HFile.createReader(FSUtils.getFs(path.toString(), 
configuration), path, cacheConfig, conf);
+  }
+
+  public HoodieHFileReader(byte[] content) throws IOException {
+    Configuration conf = new Configuration();
+    Path path = new Path("hoodie");
+    SeekableByteArrayInputStream bis = new 
SeekableByteArrayInputStream(content);
+    FSDataInputStream fsdis = new FSDataInputStream(bis);
+    this.reader = HFile.createReader(FSUtils.getFs("hoodie", conf), path, new 
FSDataInputStreamWrapper(fsdis),
+        content.length, new CacheConfig(conf), conf);
+  }
+
+  @Override
+  public String[] readMinMaxRecordKeys() {
+    Map<byte[], byte[]> fileInfo;
+    try {
+      fileInfo = reader.loadFileInfo();
+      return new String[] { new 
String(fileInfo.get(KEY_MIN_RECORD.getBytes())),
+          new String(fileInfo.get(KEY_MAX_RECORD.getBytes()))};
+    } catch (IOException e) {
+      throw new HoodieException("Could not read min/max record key out of file 
information block correctly from path", e);
+    }
+  }
+
+  @Override
+  public Schema getSchema() {
+    if (schema == null) {
+      try {
+        Map<byte[], byte[]> fileInfo = reader.loadFileInfo();
+        schema = new Schema.Parser().parse(new 
String(fileInfo.get(KEY_SCHEMA.getBytes())));
+      } catch (IOException e) {
+        throw new HoodieException("Could not read schema of file from path", 
e);
+      }
+    }
+
+    return schema;
+  }
+
+  @Override
+  public BloomFilter readBloomFilter() {
+    Map<byte[], byte[]> fileInfo;
+    try {
+      fileInfo = reader.loadFileInfo();
+      ByteBuffer serializedFilter = 
reader.getMetaBlock(KEY_BLOOM_FILTER_META_BLOCK, false);
+      byte[] filterBytes = new byte[serializedFilter.remaining()];
+      serializedFilter.get(filterBytes); // read the bytes that were written
+      return BloomFilterFactory.fromString(new String(filterBytes),
+          new String(fileInfo.get(KEY_BLOOM_FILTER_TYPE_CODE.getBytes())));
+    } catch (IOException e) {
+      throw new HoodieException("Could not read bloom filter from " + path, e);
+    }
+  }
+
+  @Override
+  public Set<String> filterRowKeys(Set candidateRowKeys) {
+    try {
+      List<Pair<String, R>> allRecords = readAllRecords();
+      Set<String> rowKeys = new HashSet<>();
+      allRecords.forEach(t -> {
+        if (candidateRowKeys.contains(t.getFirst())) {
+          rowKeys.add(t.getFirst());
+        }
+      });
+      return rowKeys;
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to read row keys from " + path, e);
+    }
+  }
+
+  public List<Pair<String, R>> readAllRecords(Schema writerSchema, Schema 
readerSchema) throws IOException {
+    List<Pair<String, R>> recordList = new LinkedList<>();
+    try {
+      HFileScanner scanner = reader.getScanner(false, false);
+      if (scanner.seekTo()) {
+        do {
+          Cell c = scanner.getKeyValue();
+          byte[] keyBytes = Arrays.copyOfRange(c.getRowArray(), 
c.getRowOffset(), c.getRowOffset() + c.getRowLength());
+          R record = readNextRecord(c, writerSchema, readerSchema);
+          recordList.add(new Pair<>(new String(keyBytes), record));
+        } while (scanner.next());
+      }
+
+      return recordList;
+    } catch (IOException e) {
+      throw new HoodieException("Error reading hfile " + path + " as a 
dataframe", e);
+    }
+  }
+
+  public List<Pair<String, R>> readAllRecords() throws IOException {
+    Schema schema = new Schema.Parser().parse(new 
String(reader.loadFileInfo().get("schema".getBytes())));

Review comment:
       minor. why using hard coded string. we should use constant for "schema".

##########
File path: 
hudi-client/src/main/java/org/apache/hudi/io/HoodieSortedMergeHandle.java
##########
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io;
+
+import org.apache.hudi.client.SparkTaskContextSupplier;
+import org.apache.hudi.client.WriteStatus;
+import org.apache.hudi.common.model.HoodieBaseFile;
+import org.apache.hudi.common.model.HoodieRecord;
+import org.apache.hudi.common.model.HoodieRecordPayload;
+import org.apache.hudi.config.HoodieWriteConfig;
+import org.apache.hudi.exception.HoodieUpsertException;
+import org.apache.hudi.table.HoodieTable;
+
+import org.apache.avro.generic.GenericRecord;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.PriorityQueue;
+import java.util.Queue;
+
+@SuppressWarnings("Duplicates")
+public class HoodieSortedMergeHandle<T extends HoodieRecordPayload> extends 
HoodieMergeHandle<T> {
+
+  private static final Logger LOG = 
LogManager.getLogger(HoodieSortedMergeHandle.class);
+
+  private Queue<String> newRecordKeysSorted = new PriorityQueue<>();
+
+  public HoodieSortedMergeHandle(HoodieWriteConfig config, String instantTime, 
HoodieTable<T> hoodieTable,
+       Iterator<HoodieRecord<T>> recordItr, String partitionPath, String 
fileId, SparkTaskContextSupplier sparkTaskContextSupplier) {
+    super(config, instantTime, hoodieTable, recordItr, partitionPath, fileId, 
sparkTaskContextSupplier);
+    newRecordKeysSorted.addAll(keyToNewRecords.keySet());
+  }
+
+  /**
+   * Called by compactor code path.
+   */
+  public HoodieSortedMergeHandle(HoodieWriteConfig config, String instantTime, 
HoodieTable<T> hoodieTable,
+      Map<String, HoodieRecord<T>> keyToNewRecordsOrig, String partitionPath, 
String fileId,
+      HoodieBaseFile dataFileToBeMerged, SparkTaskContextSupplier 
sparkTaskContextSupplier) {
+    super(config, instantTime, hoodieTable, keyToNewRecordsOrig, 
partitionPath, fileId, dataFileToBeMerged,
+        sparkTaskContextSupplier);
+
+    newRecordKeysSorted.addAll(keyToNewRecords.keySet());
+  }
+
+  /**
+   * Go through an old record. Here if we detect a newer version shows up, we 
write the new one to the file.
+   */
+  @Override
+  public void write(GenericRecord oldRecord) {
+    String key = 
oldRecord.get(HoodieRecord.RECORD_KEY_METADATA_FIELD).toString();
+
+    // To maintain overall sorted order across updates and inserts, write any 
new inserts whose keys are less than
+    // the oldRecord's key.
+    while (!newRecordKeysSorted.isEmpty() && 
newRecordKeysSorted.peek().compareTo(key) <= 0) {
+      String keyToPreWrite = newRecordKeysSorted.remove();
+      if (keyToPreWrite.equals(key)) {
+        // will be handled as an update later
+        break;
+      }
+
+      // This is a new insert
+      HoodieRecord<T> hoodieRecord = new 
HoodieRecord<>(keyToNewRecords.get(keyToPreWrite));
+      if (writtenRecordKeys.contains(keyToPreWrite)) {
+        throw new HoodieUpsertException("Insert/Update not in sorted order");
+      }
+      try {
+        if (useWriterSchema) {
+          writeRecord(hoodieRecord, 
hoodieRecord.getData().getInsertValue(writerSchema));
+        } else {
+          writeRecord(hoodieRecord, 
hoodieRecord.getData().getInsertValue(originalSchema));
+        }
+        insertRecordsWritten++;
+        writtenRecordKeys.add(keyToPreWrite);
+      } catch (IOException e) {
+        throw new HoodieUpsertException("Failed to write records", e);
+      }
+    }
+
+    super.write(oldRecord);
+  }
+
+  @Override
+  public WriteStatus close() {
+    // write out any pending records (this can happen when inserts are turned 
into updates)
+    newRecordKeysSorted.stream().sorted().forEach(key -> {

Review comment:
       can you remind me why sorting is required here? my understanding is that 
newRecordKeysSorted is already sorted. 

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java
##########
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.BloomFilterFactory;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+public class HoodieHFileReader<R extends IndexedRecord> implements 
HoodieFileReader {
+  private static final Logger LOG = 
LogManager.getLogger(HoodieHFileReader.class);
+  private Path path;
+  private Configuration conf;
+  private HFile.Reader reader;
+  private Schema schema;
+
+  public static final String KEY_SCHEMA = "schema";
+  public static final String KEY_BLOOM_FILTER_META_BLOCK = "bloomFilter";
+  public static final String KEY_BLOOM_FILTER_TYPE_CODE = 
"bloomFilterTypeCode";
+  public static final String KEY_MIN_RECORD = "minRecordKey";
+  public static final String KEY_MAX_RECORD = "maxRecordKey";
+
+  public HoodieHFileReader(Configuration configuration, Path path, CacheConfig 
cacheConfig) throws IOException {
+    this.conf = configuration;
+    this.path = path;
+    this.reader = HFile.createReader(FSUtils.getFs(path.toString(), 
configuration), path, cacheConfig, conf);
+  }
+
+  public HoodieHFileReader(byte[] content) throws IOException {
+    Configuration conf = new Configuration();
+    Path path = new Path("hoodie");
+    SeekableByteArrayInputStream bis = new 
SeekableByteArrayInputStream(content);
+    FSDataInputStream fsdis = new FSDataInputStream(bis);
+    this.reader = HFile.createReader(FSUtils.getFs("hoodie", conf), path, new 
FSDataInputStreamWrapper(fsdis),
+        content.length, new CacheConfig(conf), conf);
+  }
+
+  @Override
+  public String[] readMinMaxRecordKeys() {
+    Map<byte[], byte[]> fileInfo;
+    try {
+      fileInfo = reader.loadFileInfo();
+      return new String[] { new 
String(fileInfo.get(KEY_MIN_RECORD.getBytes())),
+          new String(fileInfo.get(KEY_MAX_RECORD.getBytes()))};
+    } catch (IOException e) {
+      throw new HoodieException("Could not read min/max record key out of file 
information block correctly from path", e);
+    }
+  }
+
+  @Override
+  public Schema getSchema() {
+    if (schema == null) {
+      try {
+        Map<byte[], byte[]> fileInfo = reader.loadFileInfo();
+        schema = new Schema.Parser().parse(new 
String(fileInfo.get(KEY_SCHEMA.getBytes())));
+      } catch (IOException e) {
+        throw new HoodieException("Could not read schema of file from path", 
e);
+      }
+    }
+
+    return schema;
+  }
+
+  @Override
+  public BloomFilter readBloomFilter() {
+    Map<byte[], byte[]> fileInfo;
+    try {
+      fileInfo = reader.loadFileInfo();
+      ByteBuffer serializedFilter = 
reader.getMetaBlock(KEY_BLOOM_FILTER_META_BLOCK, false);
+      byte[] filterBytes = new byte[serializedFilter.remaining()];
+      serializedFilter.get(filterBytes); // read the bytes that were written
+      return BloomFilterFactory.fromString(new String(filterBytes),
+          new String(fileInfo.get(KEY_BLOOM_FILTER_TYPE_CODE.getBytes())));
+    } catch (IOException e) {
+      throw new HoodieException("Could not read bloom filter from " + path, e);
+    }
+  }
+
+  @Override
+  public Set<String> filterRowKeys(Set candidateRowKeys) {
+    try {
+      List<Pair<String, R>> allRecords = readAllRecords();
+      Set<String> rowKeys = new HashSet<>();
+      allRecords.forEach(t -> {
+        if (candidateRowKeys.contains(t.getFirst())) {
+          rowKeys.add(t.getFirst());
+        }
+      });
+      return rowKeys;
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to read row keys from " + path, e);
+    }
+  }
+
+  public List<Pair<String, R>> readAllRecords(Schema writerSchema, Schema 
readerSchema) throws IOException {
+    List<Pair<String, R>> recordList = new LinkedList<>();
+    try {
+      HFileScanner scanner = reader.getScanner(false, false);
+      if (scanner.seekTo()) {
+        do {
+          Cell c = scanner.getKeyValue();
+          byte[] keyBytes = Arrays.copyOfRange(c.getRowArray(), 
c.getRowOffset(), c.getRowOffset() + c.getRowLength());
+          R record = readNextRecord(c, writerSchema, readerSchema);
+          recordList.add(new Pair<>(new String(keyBytes), record));
+        } while (scanner.next());
+      }
+
+      return recordList;
+    } catch (IOException e) {
+      throw new HoodieException("Error reading hfile " + path + " as a 
dataframe", e);
+    }
+  }
+
+  public List<Pair<String, R>> readAllRecords() throws IOException {
+    Schema schema = new Schema.Parser().parse(new 
String(reader.loadFileInfo().get("schema".getBytes())));
+    return readAllRecords(schema, schema);
+  }
+
+  @Override
+  public Iterator getRecordIterator(Schema readerSchema) throws IOException {
+    final HFileScanner scanner = reader.getScanner(false, false);
+    return new Iterator<R>() {
+      private R next = null;
+      private boolean eof = false;
+
+      @Override
+      public boolean hasNext() {
+        try {
+          // To handle when hasNext() is called multiple times for idempotency 
and/or the first time
+          if (this.next == null && !this.eof) {
+            if (!scanner.isSeeked() && scanner.seekTo()) {
+                this.next = (R)readNextRecord(scanner.getKeyValue(), 
getSchema(), readerSchema);
+            }
+          }
+          return this.next != null;
+        } catch (IOException io) {
+          throw new HoodieIOException("unable to read next record from hfile 
", io);
+        }
+      }
+
+      @Override
+      public R next() {
+        try {
+          // To handle case when next() is called before hasNext()
+          if (this.next == null) {
+            if (!hasNext()) {
+              throw new HoodieIOException("No more records left to read from 
hfile");
+            }
+          }
+          R retVal = this.next;
+          if (scanner.next()) {
+            this.next = (R)readNextRecord(scanner.getKeyValue(), getSchema(), 
readerSchema);
+          } else {
+            this.next = null;
+            this.eof = true;
+          }
+          return retVal;
+        } catch (IOException io) {
+          throw new HoodieIOException("unable to read next record from parquet 
file ", io);
+        }
+      }
+    };
+  }
+
+  @Override
+  public Option getRecordByKey(String key, Schema readerSchema) throws 
IOException {
+    HFileScanner scanner = reader.getScanner(false, false);
+    KeyValue kv = new KeyValue(key.getBytes(), null, null, null);
+    if (scanner.seekTo(kv) == 0) {
+      Cell c = scanner.getKeyValue();
+      byte[] keyBytes = Arrays.copyOfRange(c.getRowArray(), c.getRowOffset(), 
c.getRowOffset() + c.getRowLength());
+      R record = readNextRecord(c, getSchema(), readerSchema);
+      return Option.of(record);
+    }
+
+    return Option.empty();
+  }
+
+  private R readNextRecord(Cell c, Schema writerSchema, Schema readerSchema) 
throws IOException {

Review comment:
       curious. why named it as readNextRecord. felt "getRecord" or 
"getRecordFromCell" would be more apt. 

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java
##########
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.BloomFilterFactory;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+public class HoodieHFileReader<R extends IndexedRecord> implements 
HoodieFileReader {
+  private static final Logger LOG = 
LogManager.getLogger(HoodieHFileReader.class);
+  private Path path;
+  private Configuration conf;
+  private HFile.Reader reader;
+  private Schema schema;
+
+  public static final String KEY_SCHEMA = "schema";
+  public static final String KEY_BLOOM_FILTER_META_BLOCK = "bloomFilter";
+  public static final String KEY_BLOOM_FILTER_TYPE_CODE = 
"bloomFilterTypeCode";
+  public static final String KEY_MIN_RECORD = "minRecordKey";
+  public static final String KEY_MAX_RECORD = "maxRecordKey";
+
+  public HoodieHFileReader(Configuration configuration, Path path, CacheConfig 
cacheConfig) throws IOException {
+    this.conf = configuration;
+    this.path = path;
+    this.reader = HFile.createReader(FSUtils.getFs(path.toString(), 
configuration), path, cacheConfig, conf);
+  }
+
+  public HoodieHFileReader(byte[] content) throws IOException {
+    Configuration conf = new Configuration();
+    Path path = new Path("hoodie");
+    SeekableByteArrayInputStream bis = new 
SeekableByteArrayInputStream(content);
+    FSDataInputStream fsdis = new FSDataInputStream(bis);
+    this.reader = HFile.createReader(FSUtils.getFs("hoodie", conf), path, new 
FSDataInputStreamWrapper(fsdis),
+        content.length, new CacheConfig(conf), conf);
+  }
+
+  @Override
+  public String[] readMinMaxRecordKeys() {
+    Map<byte[], byte[]> fileInfo;

Review comment:
       minor. why define this outside the try block

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java
##########
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.BloomFilterFactory;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+public class HoodieHFileReader<R extends IndexedRecord> implements 
HoodieFileReader {
+  private static final Logger LOG = 
LogManager.getLogger(HoodieHFileReader.class);
+  private Path path;
+  private Configuration conf;
+  private HFile.Reader reader;
+  private Schema schema;
+
+  public static final String KEY_SCHEMA = "schema";
+  public static final String KEY_BLOOM_FILTER_META_BLOCK = "bloomFilter";
+  public static final String KEY_BLOOM_FILTER_TYPE_CODE = 
"bloomFilterTypeCode";
+  public static final String KEY_MIN_RECORD = "minRecordKey";
+  public static final String KEY_MAX_RECORD = "maxRecordKey";
+
+  public HoodieHFileReader(Configuration configuration, Path path, CacheConfig 
cacheConfig) throws IOException {
+    this.conf = configuration;
+    this.path = path;
+    this.reader = HFile.createReader(FSUtils.getFs(path.toString(), 
configuration), path, cacheConfig, conf);
+  }
+
+  public HoodieHFileReader(byte[] content) throws IOException {
+    Configuration conf = new Configuration();
+    Path path = new Path("hoodie");
+    SeekableByteArrayInputStream bis = new 
SeekableByteArrayInputStream(content);
+    FSDataInputStream fsdis = new FSDataInputStream(bis);
+    this.reader = HFile.createReader(FSUtils.getFs("hoodie", conf), path, new 
FSDataInputStreamWrapper(fsdis),

Review comment:
       is this where exception will be thrown if file does not exists or do we 
check file exists check elsewhere prior to this step ?

##########
File path: 
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieHFileRealtimeInputFormat.java
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop.realtime;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.stream.Stream;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.hadoop.HoodieHFileInputFormat;
+import org.apache.hudi.hadoop.UseFileSplitsFromInputFormat;
+import org.apache.hudi.hadoop.UseRecordReaderFromInputFormat;
+import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+/**
+ * HoodieRealtimeInputFormat for HUDI datasets which store data in HFile base 
file format.
+ */
+@UseRecordReaderFromInputFormat
+@UseFileSplitsFromInputFormat
+public class HoodieHFileRealtimeInputFormat extends HoodieHFileInputFormat {
+
+  private static final Logger LOG = 
LogManager.getLogger(HoodieHFileRealtimeInputFormat.class);
+
+  @Override
+  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException 
{

Review comment:
       similar comment as above. can we see if we can re-use code if possible. 

##########
File path: 
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/HoodieHFileInputFormat.java
##########
@@ -0,0 +1,163 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop;
+
+import org.apache.hadoop.conf.Configurable;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileInputFormat;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapreduce.Job;
+import org.apache.hudi.common.model.HoodieFileFormat;
+import org.apache.hudi.common.table.HoodieTableMetaClient;
+import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
+import org.apache.hudi.common.table.timeline.HoodieInstant;
+import org.apache.hudi.common.table.timeline.HoodieTimeline;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.hadoop.utils.HoodieHiveUtils;
+import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * HoodieInputFormat for HUDI datasets which store data in HFile base file 
format.
+ */
+@UseFileSplitsFromInputFormat
+public class HoodieHFileInputFormat extends FileInputFormat<NullWritable, 
ArrayWritable> implements Configurable {
+
+  private static final Logger LOG = 
LogManager.getLogger(HoodieHFileInputFormat.class);
+
+  protected Configuration conf;
+
+  protected HoodieDefaultTimeline filterInstantsTimeline(HoodieDefaultTimeline 
timeline) {
+    return HoodieInputFormatUtils.filterInstantsTimeline(timeline);
+  }
+
+  @Override
+  public FileStatus[] listStatus(JobConf job) throws IOException {

Review comment:
       I see lot of similarities between this and HoodieParquetInputFormat. Is 
there a way to re-use the code in anyway. 

##########
File path: 
hudi-hadoop-mr/src/main/java/org/apache/hudi/hadoop/realtime/HoodieHFileRealtimeInputFormat.java
##########
@@ -0,0 +1,111 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.hadoop.realtime;
+
+import java.io.IOException;
+import java.util.Arrays;
+import java.util.stream.Stream;
+
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.io.ArrayWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapred.FileSplit;
+import org.apache.hadoop.mapred.InputSplit;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.RecordReader;
+import org.apache.hadoop.mapred.Reporter;
+import org.apache.hudi.common.table.timeline.HoodieDefaultTimeline;
+import org.apache.hudi.common.util.ValidationUtils;
+import org.apache.hudi.hadoop.HoodieHFileInputFormat;
+import org.apache.hudi.hadoop.UseFileSplitsFromInputFormat;
+import org.apache.hudi.hadoop.UseRecordReaderFromInputFormat;
+import org.apache.hudi.hadoop.utils.HoodieInputFormatUtils;
+import org.apache.hudi.hadoop.utils.HoodieRealtimeInputFormatUtils;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+/**
+ * HoodieRealtimeInputFormat for HUDI datasets which store data in HFile base 
file format.
+ */
+@UseRecordReaderFromInputFormat
+@UseFileSplitsFromInputFormat
+public class HoodieHFileRealtimeInputFormat extends HoodieHFileInputFormat {
+
+  private static final Logger LOG = 
LogManager.getLogger(HoodieHFileRealtimeInputFormat.class);
+
+  @Override
+  public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException 
{
+    Stream<FileSplit> fileSplits = Arrays.stream(super.getSplits(job, 
numSplits)).map(is -> (FileSplit) is);
+    return HoodieRealtimeInputFormatUtils.getRealtimeSplits(job, fileSplits);
+  }
+
+  @Override
+  public FileStatus[] listStatus(JobConf job) throws IOException {
+    // Call the HoodieInputFormat::listStatus to obtain all latest parquet 
files, based on commit

Review comment:
       minor: fix java docs. // parquet -> hfile

##########
File path: 
hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieHFileReader.java
##########
@@ -0,0 +1,301 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.io.storage;
+
+import java.io.ByteArrayInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FSDataInputStream;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.fs.PositionedReadable;
+import org.apache.hadoop.fs.Seekable;
+import org.apache.hadoop.hbase.Cell;
+import org.apache.hadoop.hbase.KeyValue;
+import org.apache.hadoop.hbase.io.FSDataInputStreamWrapper;
+import org.apache.hadoop.hbase.io.hfile.CacheConfig;
+import org.apache.hadoop.hbase.io.hfile.HFile;
+import org.apache.hadoop.hbase.io.hfile.HFileScanner;
+import org.apache.hadoop.hbase.util.Pair;
+import org.apache.hudi.avro.HoodieAvroUtils;
+import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.bloom.BloomFilterFactory;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.util.Option;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.exception.HoodieIOException;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+public class HoodieHFileReader<R extends IndexedRecord> implements 
HoodieFileReader {
+  private static final Logger LOG = 
LogManager.getLogger(HoodieHFileReader.class);
+  private Path path;
+  private Configuration conf;
+  private HFile.Reader reader;
+  private Schema schema;
+
+  public static final String KEY_SCHEMA = "schema";
+  public static final String KEY_BLOOM_FILTER_META_BLOCK = "bloomFilter";
+  public static final String KEY_BLOOM_FILTER_TYPE_CODE = 
"bloomFilterTypeCode";
+  public static final String KEY_MIN_RECORD = "minRecordKey";
+  public static final String KEY_MAX_RECORD = "maxRecordKey";
+
+  public HoodieHFileReader(Configuration configuration, Path path, CacheConfig 
cacheConfig) throws IOException {
+    this.conf = configuration;
+    this.path = path;
+    this.reader = HFile.createReader(FSUtils.getFs(path.toString(), 
configuration), path, cacheConfig, conf);
+  }
+
+  public HoodieHFileReader(byte[] content) throws IOException {
+    Configuration conf = new Configuration();
+    Path path = new Path("hoodie");
+    SeekableByteArrayInputStream bis = new 
SeekableByteArrayInputStream(content);
+    FSDataInputStream fsdis = new FSDataInputStream(bis);
+    this.reader = HFile.createReader(FSUtils.getFs("hoodie", conf), path, new 
FSDataInputStreamWrapper(fsdis),
+        content.length, new CacheConfig(conf), conf);
+  }
+
+  @Override
+  public String[] readMinMaxRecordKeys() {
+    Map<byte[], byte[]> fileInfo;
+    try {
+      fileInfo = reader.loadFileInfo();
+      return new String[] { new 
String(fileInfo.get(KEY_MIN_RECORD.getBytes())),
+          new String(fileInfo.get(KEY_MAX_RECORD.getBytes()))};
+    } catch (IOException e) {
+      throw new HoodieException("Could not read min/max record key out of file 
information block correctly from path", e);
+    }
+  }
+
+  @Override
+  public Schema getSchema() {
+    if (schema == null) {
+      try {
+        Map<byte[], byte[]> fileInfo = reader.loadFileInfo();
+        schema = new Schema.Parser().parse(new 
String(fileInfo.get(KEY_SCHEMA.getBytes())));
+      } catch (IOException e) {
+        throw new HoodieException("Could not read schema of file from path", 
e);
+      }
+    }
+
+    return schema;
+  }
+
+  @Override
+  public BloomFilter readBloomFilter() {
+    Map<byte[], byte[]> fileInfo;
+    try {
+      fileInfo = reader.loadFileInfo();
+      ByteBuffer serializedFilter = 
reader.getMetaBlock(KEY_BLOOM_FILTER_META_BLOCK, false);
+      byte[] filterBytes = new byte[serializedFilter.remaining()];
+      serializedFilter.get(filterBytes); // read the bytes that were written
+      return BloomFilterFactory.fromString(new String(filterBytes),
+          new String(fileInfo.get(KEY_BLOOM_FILTER_TYPE_CODE.getBytes())));
+    } catch (IOException e) {
+      throw new HoodieException("Could not read bloom filter from " + path, e);
+    }
+  }
+
+  @Override
+  public Set<String> filterRowKeys(Set candidateRowKeys) {
+    try {
+      List<Pair<String, R>> allRecords = readAllRecords();
+      Set<String> rowKeys = new HashSet<>();
+      allRecords.forEach(t -> {
+        if (candidateRowKeys.contains(t.getFirst())) {
+          rowKeys.add(t.getFirst());
+        }
+      });
+      return rowKeys;
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to read row keys from " + path, e);
+    }
+  }
+
+  public List<Pair<String, R>> readAllRecords(Schema writerSchema, Schema 
readerSchema) throws IOException {
+    List<Pair<String, R>> recordList = new LinkedList<>();
+    try {
+      HFileScanner scanner = reader.getScanner(false, false);
+      if (scanner.seekTo()) {
+        do {
+          Cell c = scanner.getKeyValue();
+          byte[] keyBytes = Arrays.copyOfRange(c.getRowArray(), 
c.getRowOffset(), c.getRowOffset() + c.getRowLength());
+          R record = readNextRecord(c, writerSchema, readerSchema);
+          recordList.add(new Pair<>(new String(keyBytes), record));
+        } while (scanner.next());
+      }
+
+      return recordList;
+    } catch (IOException e) {
+      throw new HoodieException("Error reading hfile " + path + " as a 
dataframe", e);
+    }
+  }
+
+  public List<Pair<String, R>> readAllRecords() throws IOException {
+    Schema schema = new Schema.Parser().parse(new 
String(reader.loadFileInfo().get("schema".getBytes())));
+    return readAllRecords(schema, schema);
+  }
+
+  @Override
+  public Iterator getRecordIterator(Schema readerSchema) throws IOException {
+    final HFileScanner scanner = reader.getScanner(false, false);
+    return new Iterator<R>() {
+      private R next = null;
+      private boolean eof = false;
+
+      @Override
+      public boolean hasNext() {
+        try {
+          // To handle when hasNext() is called multiple times for idempotency 
and/or the first time
+          if (this.next == null && !this.eof) {
+            if (!scanner.isSeeked() && scanner.seekTo()) {
+                this.next = (R)readNextRecord(scanner.getKeyValue(), 
getSchema(), readerSchema);
+            }
+          }
+          return this.next != null;
+        } catch (IOException io) {
+          throw new HoodieIOException("unable to read next record from hfile 
", io);
+        }
+      }
+
+      @Override
+      public R next() {
+        try {
+          // To handle case when next() is called before hasNext()
+          if (this.next == null) {
+            if (!hasNext()) {
+              throw new HoodieIOException("No more records left to read from 
hfile");
+            }
+          }
+          R retVal = this.next;
+          if (scanner.next()) {
+            this.next = (R)readNextRecord(scanner.getKeyValue(), getSchema(), 
readerSchema);

Review comment:
       why move to next element here itself. shouldn't we rely on hasNext() 
everytime to move to to next record as per the Iterator interface contract?




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to