This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new b791473  Introduce HoodieReadHandle abstraction into index
b791473 is described below

commit b791473a6d663cd4ac2ce535aaa62bcd89fea3af
Author: Vinoth Chandar <[email protected]>
AuthorDate: Tue May 21 15:37:38 2019 -0700

    Introduce HoodieReadHandle abstraction into index
    
     - Generalized BloomIndex to work with file ids instead of paths
     - Abstracted away Bloom filter checking into HoodieLookupHandle
     - Abstracted away range information retrieval into HoodieRangeInfoHandle
---
 .../java/com/uber/hoodie/HoodieReadClient.java     |  18 ++-
 .../hoodie/func/CopyOnWriteLazyInsertIterable.java |   5 +-
 .../java/com/uber/hoodie/index/HoodieIndex.java    |   9 +-
 .../com/uber/hoodie/index/InMemoryHashIndex.java   |   3 +-
 .../hoodie/index/bloom/BloomIndexFileInfo.java     |  22 +--
 .../uber/hoodie/index/bloom/HoodieBloomIndex.java  |  95 ++++++-------
 .../index/bloom/HoodieBloomIndexCheckFunction.java | 138 ++++--------------
 .../hoodie/index/bloom/HoodieGlobalBloomIndex.java |   5 +-
 .../IntervalTreeBasedGlobalIndexFileFilter.java    |   4 +-
 .../bloom/IntervalTreeBasedIndexFileFilter.java    |   4 +-
 .../bloom/ListBasedGlobalIndexFileFilter.java      |   2 +-
 .../index/bloom/ListBasedIndexFileFilter.java      |   2 +-
 .../com/uber/hoodie/index/hbase/HBaseIndex.java    |   7 +-
 .../com/uber/hoodie/io/HoodieAppendHandle.java     |  16 +--
 .../com/uber/hoodie/io/HoodieCreateHandle.java     |   4 +-
 .../java/com/uber/hoodie/io/HoodieIOHandle.java    | 152 +-------------------
 .../com/uber/hoodie/io/HoodieKeyLookupHandle.java  | 158 +++++++++++++++++++++
 .../java/com/uber/hoodie/io/HoodieMergeHandle.java |  76 ++++++++--
 .../com/uber/hoodie/io/HoodieRangeInfoHandle.java  |  43 ++++++
 .../java/com/uber/hoodie/io/HoodieReadHandle.java  |  59 ++++++++
 ...{HoodieIOHandle.java => HoodieWriteHandle.java} |  47 +++---
 .../uber/hoodie/table/HoodieCopyOnWriteTable.java  |   2 +-
 .../java/com/uber/hoodie/table/WorkloadStat.java   |   2 +-
 .../java/com/uber/hoodie/TestHoodieClientBase.java |   2 +-
 .../java/com/uber/hoodie/index/TestHbaseIndex.java |   2 +-
 .../hoodie/index/bloom/TestHoodieBloomIndex.java   |  27 ++--
 .../index/bloom/TestHoodieGlobalBloomIndex.java    |  41 +++---
 .../hoodie/common/model/HoodieRecordLocation.java  |  16 +--
 .../uber/hoodie/common/model/HoodieTestUtils.java  |   4 +-
 .../util/collection/TestExternalSpillableMap.java  |   2 +-
 30 files changed, 537 insertions(+), 430 deletions(-)

diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieReadClient.java 
b/hoodie-client/src/main/java/com/uber/hoodie/HoodieReadClient.java
index e5e7239..a1e7ab7 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieReadClient.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieReadClient.java
@@ -20,6 +20,7 @@ package com.uber.hoodie;
 
 import com.google.common.base.Optional;
 import com.uber.hoodie.avro.model.HoodieCompactionPlan;
+import com.uber.hoodie.common.model.HoodieDataFile;
 import com.uber.hoodie.common.model.HoodieKey;
 import com.uber.hoodie.common.model.HoodieRecord;
 import com.uber.hoodie.common.model.HoodieRecordPayload;
@@ -119,15 +120,27 @@ public class HoodieReadClient<T extends 
HoodieRecordPayload> extends AbstractHoo
     }
   }
 
+  private Optional<String> convertToDataFilePath(Optional<Pair<String, 
String>> partitionPathFileIDPair) {
+    if (partitionPathFileIDPair.isPresent()) {
+      HoodieDataFile dataFile = hoodieTable.getROFileSystemView()
+          .getLatestDataFile(partitionPathFileIDPair.get().getLeft(), 
partitionPathFileIDPair.get().getRight()).get();
+      return Optional.of(dataFile.getPath());
+    } else {
+      return Optional.absent();
+    }
+  }
+
   /**
    * Given a bunch of hoodie keys, fetches all the individual records out as a 
data frame
    *
    * @return a dataframe
    */
-  public Dataset<Row> read(JavaRDD<HoodieKey> hoodieKeys, int parallelism) 
throws Exception {
+  public Dataset<Row> readROView(JavaRDD<HoodieKey> hoodieKeys, int 
parallelism) {
     assertSqlContext();
-    JavaPairRDD<HoodieKey, Optional<String>> keyToFileRDD = index
+    JavaPairRDD<HoodieKey, Optional<Pair<String, String>>> lookupResultRDD = 
index
         .fetchRecordLocation(hoodieKeys, jsc, hoodieTable);
+    JavaPairRDD<HoodieKey, Optional<String>> keyToFileRDD = lookupResultRDD
+        .mapToPair(r -> new Tuple2<>(r._1, convertToDataFilePath(r._2)));
     List<String> paths = keyToFileRDD.filter(keyFileTuple -> 
keyFileTuple._2().isPresent())
         .map(keyFileTuple -> keyFileTuple._2().get()).collect();
 
@@ -144,7 +157,6 @@ public class HoodieReadClient<T extends 
HoodieRecordPayload> extends AbstractHoo
 
     // Now, we need to further filter out, for only rows that match the 
supplied hoodie keys
     JavaRDD<Row> rowRDD = keyRowRDD.join(keyToFileRDD, parallelism).map(tuple 
-> tuple._2()._1());
-
     return sqlContextOpt.get().createDataFrame(rowRDD, schema);
   }
 
diff --git 
a/hoodie-client/src/main/java/com/uber/hoodie/func/CopyOnWriteLazyInsertIterable.java
 
b/hoodie-client/src/main/java/com/uber/hoodie/func/CopyOnWriteLazyInsertIterable.java
index f2e9834..f885ce2 100644
--- 
a/hoodie-client/src/main/java/com/uber/hoodie/func/CopyOnWriteLazyInsertIterable.java
+++ 
b/hoodie-client/src/main/java/com/uber/hoodie/func/CopyOnWriteLazyInsertIterable.java
@@ -26,7 +26,7 @@ import 
com.uber.hoodie.common.util.queue.BoundedInMemoryQueueConsumer;
 import com.uber.hoodie.config.HoodieWriteConfig;
 import com.uber.hoodie.exception.HoodieException;
 import com.uber.hoodie.io.HoodieCreateHandle;
-import com.uber.hoodie.io.HoodieIOHandle;
+import com.uber.hoodie.io.HoodieWriteHandle;
 import com.uber.hoodie.table.HoodieTable;
 import java.util.ArrayList;
 import java.util.Iterator;
@@ -131,12 +131,11 @@ public class CopyOnWriteLazyInsertIterable<T extends 
HoodieRecordPayload> extend
       BoundedInMemoryQueueConsumer<HoodieInsertValueGenResult<HoodieRecord>, 
List<WriteStatus>> {
 
     protected final List<WriteStatus> statuses = new ArrayList<>();
-    protected HoodieIOHandle handle;
+    protected HoodieWriteHandle handle;
 
     @Override
     protected void consumeOneRecord(HoodieInsertValueGenResult<HoodieRecord> 
payload) {
       final HoodieRecord insertPayload = payload.record;
-
       // lazily initialize the handle, for the first time
       if (handle == null) {
         handle = new HoodieCreateHandle(hoodieConfig, commitTime, hoodieTable, 
insertPayload.getPartitionPath(),
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/index/HoodieIndex.java 
b/hoodie-client/src/main/java/com/uber/hoodie/index/HoodieIndex.java
index e3f3578..7f4eb63 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/index/HoodieIndex.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/index/HoodieIndex.java
@@ -23,6 +23,7 @@ import com.uber.hoodie.WriteStatus;
 import com.uber.hoodie.common.model.HoodieKey;
 import com.uber.hoodie.common.model.HoodieRecord;
 import com.uber.hoodie.common.model.HoodieRecordPayload;
+import com.uber.hoodie.common.util.collection.Pair;
 import com.uber.hoodie.config.HoodieWriteConfig;
 import com.uber.hoodie.exception.HoodieIndexException;
 import com.uber.hoodie.index.bloom.HoodieBloomIndex;
@@ -63,12 +64,10 @@ public abstract class HoodieIndex<T extends 
HoodieRecordPayload> implements Seri
   }
 
   /**
-   * Checks if the given [Keys] exists in the hoodie table and returns [Key, 
Optional[FullFilePath]]
-   * If the optional FullFilePath value is not present, then the key is not 
found. If the
-   * FullFilePath value is present, it is the path component (without scheme) 
of the URI underlying
-   * file
+   * Checks if the given [Keys] exists in the hoodie table and returns [Key, 
Optional[partitionPath, fileID]]
+   * If the optional is empty, then the key is not found.
    */
-  public abstract JavaPairRDD<HoodieKey, Optional<String>> fetchRecordLocation(
+  public abstract JavaPairRDD<HoodieKey, Optional<Pair<String, String>>> 
fetchRecordLocation(
       JavaRDD<HoodieKey> hoodieKeys, final JavaSparkContext jsc, 
HoodieTable<T> hoodieTable);
 
   /**
diff --git 
a/hoodie-client/src/main/java/com/uber/hoodie/index/InMemoryHashIndex.java 
b/hoodie-client/src/main/java/com/uber/hoodie/index/InMemoryHashIndex.java
index 672fa84..61da5ab 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/index/InMemoryHashIndex.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/index/InMemoryHashIndex.java
@@ -24,6 +24,7 @@ import com.uber.hoodie.common.model.HoodieKey;
 import com.uber.hoodie.common.model.HoodieRecord;
 import com.uber.hoodie.common.model.HoodieRecordLocation;
 import com.uber.hoodie.common.model.HoodieRecordPayload;
+import com.uber.hoodie.common.util.collection.Pair;
 import com.uber.hoodie.config.HoodieWriteConfig;
 import com.uber.hoodie.table.HoodieTable;
 import java.util.ArrayList;
@@ -55,7 +56,7 @@ public class InMemoryHashIndex<T extends HoodieRecordPayload> 
extends HoodieInde
   }
 
   @Override
-  public JavaPairRDD<HoodieKey, Optional<String>> 
fetchRecordLocation(JavaRDD<HoodieKey> hoodieKeys,
+  public JavaPairRDD<HoodieKey, Optional<Pair<String, String>>> 
fetchRecordLocation(JavaRDD<HoodieKey> hoodieKeys,
       JavaSparkContext jsc, HoodieTable<T> hoodieTable) {
     throw new UnsupportedOperationException("InMemory index does not implement 
check exist yet");
   }
diff --git 
a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/BloomIndexFileInfo.java
 
b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/BloomIndexFileInfo.java
index 1114262..2a56d8a 100644
--- 
a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/BloomIndexFileInfo.java
+++ 
b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/BloomIndexFileInfo.java
@@ -22,30 +22,30 @@ import com.google.common.base.Objects;
 import java.io.Serializable;
 
 /**
- * Metadata about a given file, useful for index lookup
+ * Metadata about a given file group, useful for index lookup
  */
 public class BloomIndexFileInfo implements Serializable {
 
-  private final String fileName;
+  private final String fileId;
 
   private final String minRecordKey;
 
   private final String maxRecordKey;
 
-  public BloomIndexFileInfo(String fileName, String minRecordKey, String 
maxRecordKey) {
-    this.fileName = fileName;
+  public BloomIndexFileInfo(String fileId, String minRecordKey, String 
maxRecordKey) {
+    this.fileId = fileId;
     this.minRecordKey = minRecordKey;
     this.maxRecordKey = maxRecordKey;
   }
 
-  public BloomIndexFileInfo(String fileName) {
-    this.fileName = fileName;
+  public BloomIndexFileInfo(String fileId) {
+    this.fileId = fileId;
     this.minRecordKey = null;
     this.maxRecordKey = null;
   }
 
-  public String getFileName() {
-    return fileName;
+  public String getFileId() {
+    return fileId;
   }
 
   public String getMinRecordKey() {
@@ -77,19 +77,19 @@ public class BloomIndexFileInfo implements Serializable {
     }
 
     BloomIndexFileInfo that = (BloomIndexFileInfo) o;
-    return Objects.equal(that.fileName, fileName) && 
Objects.equal(that.minRecordKey, minRecordKey)
+    return Objects.equal(that.fileId, fileId) && 
Objects.equal(that.minRecordKey, minRecordKey)
         && Objects.equal(that.maxRecordKey, maxRecordKey);
 
   }
 
   @Override
   public int hashCode() {
-    return Objects.hashCode(fileName, minRecordKey, maxRecordKey);
+    return Objects.hashCode(fileId, minRecordKey, maxRecordKey);
   }
 
   public String toString() {
     final StringBuilder sb = new StringBuilder("BloomIndexFileInfo {");
-    sb.append(" fileName=").append(fileName);
+    sb.append(" fileId=").append(fileId);
     sb.append(" minRecordKey=").append(minRecordKey);
     sb.append(" maxRecordKey=").append(maxRecordKey);
     sb.append('}');
diff --git 
a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndex.java 
b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndex.java
index 0b93b9c..195738a 100644
--- 
a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndex.java
+++ 
b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndex.java
@@ -25,26 +25,22 @@ import static java.util.stream.Collectors.toList;
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Optional;
 import com.uber.hoodie.WriteStatus;
-import com.uber.hoodie.common.model.HoodieDataFile;
 import com.uber.hoodie.common.model.HoodieKey;
 import com.uber.hoodie.common.model.HoodieRecord;
 import com.uber.hoodie.common.model.HoodieRecordLocation;
 import com.uber.hoodie.common.model.HoodieRecordPayload;
-import com.uber.hoodie.common.table.HoodieTableMetaClient;
 import com.uber.hoodie.common.table.timeline.HoodieInstant;
-import com.uber.hoodie.common.util.FSUtils;
-import com.uber.hoodie.common.util.ParquetUtils;
 import com.uber.hoodie.common.util.collection.Pair;
 import com.uber.hoodie.config.HoodieWriteConfig;
 import com.uber.hoodie.exception.MetadataNotFoundException;
 import com.uber.hoodie.index.HoodieIndex;
+import com.uber.hoodie.io.HoodieRangeInfoHandle;
 import com.uber.hoodie.table.HoodieTable;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
-import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.Partitioner;
@@ -85,7 +81,8 @@ public class HoodieBloomIndex<T extends HoodieRecordPayload> 
extends HoodieIndex
         .mapToPair(record -> new Tuple2<>(record.getPartitionPath(), 
record.getRecordKey()));
 
     // Lookup indexes for all the partition/recordkey pair
-    JavaPairRDD<HoodieKey, String> keyFilenamePairRDD = 
lookupIndex(partitionRecordKeyPairRDD, jsc, hoodieTable);
+    JavaPairRDD<HoodieKey, HoodieRecordLocation> keyFilenamePairRDD = 
lookupIndex(partitionRecordKeyPairRDD, jsc,
+        hoodieTable);
 
     // Cache the result, for subsequent stages.
     if (config.getBloomIndexUseCaching()) {
@@ -109,27 +106,33 @@ public class HoodieBloomIndex<T extends 
HoodieRecordPayload> extends HoodieIndex
     return taggedRecordRDD;
   }
 
-  public JavaPairRDD<HoodieKey, Optional<String>> 
fetchRecordLocation(JavaRDD<HoodieKey> hoodieKeys,
+  /**
+   * Returns an RDD mapping each HoodieKey with a partitionPath/fileID which 
contains it. Optional.Empty if the key is
+   * not found.
+   *
+   * @param hoodieKeys keys to lookup
+   * @param jsc spark context
+   * @param hoodieTable hoodie table object
+   */
+  @Override
+  public JavaPairRDD<HoodieKey, Optional<Pair<String, String>>> 
fetchRecordLocation(JavaRDD<HoodieKey> hoodieKeys,
       JavaSparkContext jsc, HoodieTable<T> hoodieTable) {
     JavaPairRDD<String, String> partitionRecordKeyPairRDD = hoodieKeys
         .mapToPair(key -> new Tuple2<>(key.getPartitionPath(), 
key.getRecordKey()));
 
     // Lookup indexes for all the partition/recordkey pair
-    JavaPairRDD<HoodieKey, String> keyFilenamePairRDD = 
lookupIndex(partitionRecordKeyPairRDD, jsc, hoodieTable);
+    JavaPairRDD<HoodieKey, HoodieRecordLocation> recordKeyLocationRDD = 
lookupIndex(partitionRecordKeyPairRDD, jsc,
+        hoodieTable);
     JavaPairRDD<HoodieKey, String> keyHoodieKeyPairRDD = 
hoodieKeys.mapToPair(key -> new Tuple2<>(key, null));
 
-    return 
keyHoodieKeyPairRDD.leftOuterJoin(keyFilenamePairRDD).mapToPair(keyPathTuple -> 
{
-      Optional<String> recordLocationPath;
-      if (keyPathTuple._2._2.isPresent()) {
-        String fileName = keyPathTuple._2._2.get();
-        String partitionPath = keyPathTuple._1.getPartitionPath();
-        recordLocationPath = Optional
-            .of(new Path(new Path(hoodieTable.getMetaClient().getBasePath(), 
partitionPath), fileName)
-                .toUri().getPath());
+    return 
keyHoodieKeyPairRDD.leftOuterJoin(recordKeyLocationRDD).mapToPair(keyLoc -> {
+      Optional<Pair<String, String>> partitionPathFileidPair;
+      if (keyLoc._2._2.isPresent()) {
+        partitionPathFileidPair = 
Optional.of(Pair.of(keyLoc._1().getPartitionPath(), 
keyLoc._2._2.get().getFileId()));
       } else {
-        recordLocationPath = Optional.absent();
+        partitionPathFileidPair = Optional.absent();
       }
-      return new Tuple2<>(keyPathTuple._1, recordLocationPath);
+      return new Tuple2<>(keyLoc._1, partitionPathFileidPair);
     });
   }
 
@@ -137,9 +140,9 @@ public class HoodieBloomIndex<T extends 
HoodieRecordPayload> extends HoodieIndex
    * Lookup the location for each record key and return the 
pair<record_key,location> for all record keys already
    * present and drop the record keys if not present
    */
-  private JavaPairRDD<HoodieKey, String> lookupIndex(
-      JavaPairRDD<String, String> partitionRecordKeyPairRDD, final 
JavaSparkContext
-      jsc, final HoodieTable hoodieTable) {
+  private JavaPairRDD<HoodieKey, HoodieRecordLocation> lookupIndex(
+      JavaPairRDD<String, String> partitionRecordKeyPairRDD, final 
JavaSparkContext jsc,
+      final HoodieTable hoodieTable) {
     // Obtain records per partition, in the incoming records
     Map<String, Long> recordsPerPartition = 
partitionRecordKeyPairRDD.countByKey();
     List<String> affectedPartitionPathList = new 
ArrayList<>(recordsPerPartition.keySet());
@@ -157,7 +160,7 @@ public class HoodieBloomIndex<T extends 
HoodieRecordPayload> extends HoodieIndex
     int safeParallelism = computeSafeParallelism(recordsPerPartition, 
comparisonsPerFileGroup);
     int joinParallelism = 
determineParallelism(partitionRecordKeyPairRDD.partitions().size(), 
safeParallelism);
     return findMatchingFilesForRecordKeys(partitionToFileInfo, 
partitionRecordKeyPairRDD, joinParallelism,
-        hoodieTable.getMetaClient(), comparisonsPerFileGroup);
+        hoodieTable, comparisonsPerFileGroup);
   }
 
   /**
@@ -178,7 +181,7 @@ public class HoodieBloomIndex<T extends 
HoodieRecordPayload> extends HoodieIndex
       partitionToFileInfo.entrySet().stream().forEach(e -> {
         for (BloomIndexFileInfo fileInfo : e.getValue()) {
           //each file needs to be compared against all the records coming into 
the partition
-          fileToComparisons.put(fileInfo.getFileName(), 
recordsPerPartition.get(e.getKey()));
+          fileToComparisons.put(fileInfo.getFileId(), 
recordsPerPartition.get(e.getKey()));
         }
       });
     }
@@ -227,35 +230,35 @@ public class HoodieBloomIndex<T extends 
HoodieRecordPayload> extends HoodieIndex
       final HoodieTable hoodieTable) {
 
     // Obtain the latest data files from all the partitions.
-    List<Tuple2<String, HoodieDataFile>> dataFilesList = jsc
-        .parallelize(partitions, Math.max(partitions.size(), 
1)).flatMapToPair(partitionPath -> {
+    List<Pair<String, String>> partitionPathFileIDList = jsc
+        .parallelize(partitions, Math.max(partitions.size(), 1))
+        .flatMap(partitionPath -> {
           java.util.Optional<HoodieInstant> latestCommitTime = 
hoodieTable.getMetaClient().getCommitsTimeline()
               .filterCompletedInstants().lastInstant();
-          List<Tuple2<String, HoodieDataFile>> filteredFiles = new 
ArrayList<>();
+          List<Pair<String, String>> filteredFiles = new ArrayList<>();
           if (latestCommitTime.isPresent()) {
             filteredFiles = hoodieTable.getROFileSystemView()
                 .getLatestDataFilesBeforeOrOn(partitionPath, 
latestCommitTime.get().getTimestamp())
-                .map(f -> new Tuple2<>(partitionPath, f)).collect(toList());
+                .map(f -> Pair.of(partitionPath, 
f.getFileId())).collect(toList());
           }
           return filteredFiles.iterator();
         }).collect();
 
     if (config.getBloomIndexPruneByRanges()) {
       // also obtain file ranges, if range pruning is enabled
-      return jsc.parallelize(dataFilesList, Math.max(dataFilesList.size(), 
1)).mapToPair(ft -> {
+      return jsc.parallelize(partitionPathFileIDList, 
Math.max(partitionPathFileIDList.size(), 1)).mapToPair(pf -> {
         try {
-          String[] minMaxKeys = ParquetUtils
-              .readMinMaxRecordKeys(hoodieTable.getHadoopConf(), new 
Path(ft._2().getPath()));
-          return new Tuple2<>(ft._1(),
-              new BloomIndexFileInfo(ft._2().getFileName(), minMaxKeys[0], 
minMaxKeys[1]));
+          HoodieRangeInfoHandle<T> rangeInfoHandle = new 
HoodieRangeInfoHandle<T>(config, hoodieTable, pf);
+          String[] minMaxKeys = rangeInfoHandle.getMinMaxKeys();
+          return new Tuple2<>(pf.getKey(), new 
BloomIndexFileInfo(pf.getValue(), minMaxKeys[0], minMaxKeys[1]));
         } catch (MetadataNotFoundException me) {
-          logger.warn("Unable to find range metadata in file :" + ft._2());
-          return new Tuple2<>(ft._1(), new 
BloomIndexFileInfo(ft._2().getFileName()));
+          logger.warn("Unable to find range metadata in file :" + pf);
+          return new Tuple2<>(pf.getKey(), new 
BloomIndexFileInfo(pf.getValue()));
         }
       }).collect();
     } else {
-      return dataFilesList.stream()
-          .map(ft -> new Tuple2<>(ft._1(), new 
BloomIndexFileInfo(ft._2().getFileName())))
+      return partitionPathFileIDList.stream()
+          .map(pf -> new Tuple2<>(pf.getKey(), new 
BloomIndexFileInfo(pf.getValue())))
           .collect(toList());
     }
   }
@@ -324,9 +327,9 @@ public class HoodieBloomIndex<T extends 
HoodieRecordPayload> extends HoodieIndex
    * parallelism for tagging location
    */
   @VisibleForTesting
-  JavaPairRDD<HoodieKey, String> findMatchingFilesForRecordKeys(
+  JavaPairRDD<HoodieKey, HoodieRecordLocation> findMatchingFilesForRecordKeys(
       final Map<String, List<BloomIndexFileInfo>> partitionToFileIndexInfo,
-      JavaPairRDD<String, String> partitionRecordKeyPairRDD, int 
shuffleParallelism, HoodieTableMetaClient metaClient,
+      JavaPairRDD<String, String> partitionRecordKeyPairRDD, int 
shuffleParallelism, HoodieTable hoodieTable,
       Map<String, Long> fileGroupToComparisons) {
     JavaRDD<Tuple2<String, HoodieKey>> fileComparisonsRDD =
         explodeRecordRDDWithFileComparisons(partitionToFileIndexInfo, 
partitionRecordKeyPairRDD);
@@ -347,17 +350,18 @@ public class HoodieBloomIndex<T extends 
HoodieRecordPayload> extends HoodieIndex
     }
 
     return fileComparisonsRDD
-        .mapPartitionsWithIndex(new HoodieBloomIndexCheckFunction(metaClient, 
config.getBasePath()), true)
+        .mapPartitionsWithIndex(new HoodieBloomIndexCheckFunction(hoodieTable, 
config), true)
         .flatMap(List::iterator)
         .filter(lr -> lr.getMatchingRecordKeys().size() > 0)
         .flatMapToPair(lookupResult -> 
lookupResult.getMatchingRecordKeys().stream()
             .map(recordKey -> new Tuple2<>(new HoodieKey(recordKey, 
lookupResult.getPartitionPath()),
-                lookupResult.getFileName()))
+                new HoodieRecordLocation(lookupResult.getBaseInstantTime(), 
lookupResult.getFileId())))
             .collect(Collectors.toList())
             .iterator());
   }
 
-  HoodieRecord<T> getTaggedRecord(HoodieRecord<T> inputRecord, 
org.apache.spark.api.java.Optional<String> location) {
+  HoodieRecord<T> getTaggedRecord(HoodieRecord<T> inputRecord,
+      org.apache.spark.api.java.Optional<HoodieRecordLocation> location) {
     HoodieRecord<T> record = inputRecord;
     if (location.isPresent()) {
       // When you have a record in multiple files in the same partition, then 
rowKeyRecordPairRDD
@@ -366,11 +370,7 @@ public class HoodieBloomIndex<T extends 
HoodieRecordPayload> extends HoodieIndex
       // currentLocation 2 times and it will fail the second time. So creating 
a new in memory
       // copy of the hoodie record.
       record = new HoodieRecord<>(inputRecord);
-      String filename = location.get();
-      if (filename != null && !filename.isEmpty()) {
-        record.setCurrentLocation(new 
HoodieRecordLocation(FSUtils.getCommitTime(filename),
-            FSUtils.getFileId(filename)));
-      }
+      record.setCurrentLocation(location.get());
     }
     return record;
   }
@@ -379,10 +379,9 @@ public class HoodieBloomIndex<T extends 
HoodieRecordPayload> extends HoodieIndex
    * Tag the <rowKey, filename> back to the original HoodieRecord RDD.
    */
   protected JavaRDD<HoodieRecord<T>> tagLocationBacktoRecords(
-      JavaPairRDD<HoodieKey, String> keyFilenamePairRDD, 
JavaRDD<HoodieRecord<T>> recordRDD) {
+      JavaPairRDD<HoodieKey, HoodieRecordLocation> keyFilenamePairRDD, 
JavaRDD<HoodieRecord<T>> recordRDD) {
     JavaPairRDD<HoodieKey, HoodieRecord<T>> keyRecordPairRDD = recordRDD
         .mapToPair(record -> new Tuple2<>(record.getKey(), record));
-
     // Here as the recordRDD might have more data than rowKeyRDD (some 
rowKeys' fileId is null),
     // so we do left outer join.
     return keyRecordPairRDD.leftOuterJoin(keyFilenamePairRDD).values().map(v1 
-> getTaggedRecord(v1._1, v1._2));
diff --git 
a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndexCheckFunction.java
 
b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndexCheckFunction.java
index 381e745..3dd3df5 100644
--- 
a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndexCheckFunction.java
+++ 
b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieBloomIndexCheckFunction.java
@@ -18,23 +18,18 @@
 
 package com.uber.hoodie.index.bloom;
 
-import com.uber.hoodie.common.BloomFilter;
 import com.uber.hoodie.common.model.HoodieKey;
-import com.uber.hoodie.common.table.HoodieTableMetaClient;
-import com.uber.hoodie.common.util.HoodieTimer;
-import com.uber.hoodie.common.util.ParquetUtils;
+import com.uber.hoodie.common.util.collection.Pair;
+import com.uber.hoodie.config.HoodieWriteConfig;
 import com.uber.hoodie.exception.HoodieException;
 import com.uber.hoodie.exception.HoodieIndexException;
 import com.uber.hoodie.func.LazyIterableIterator;
+import com.uber.hoodie.io.HoodieKeyLookupHandle;
+import com.uber.hoodie.io.HoodieKeyLookupHandle.KeyLookupResult;
+import com.uber.hoodie.table.HoodieTable;
 import java.util.ArrayList;
-import java.util.HashSet;
 import java.util.Iterator;
 import java.util.List;
-import java.util.Set;
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.fs.Path;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
 import org.apache.spark.api.java.function.Function2;
 import scala.Tuple2;
 
@@ -43,150 +38,69 @@ import scala.Tuple2;
  * actual files
  */
 public class HoodieBloomIndexCheckFunction implements
-    Function2<Integer, Iterator<Tuple2<String, HoodieKey>>,
-        Iterator<List<KeyLookupResult>>> {
+    Function2<Integer, Iterator<Tuple2<String, HoodieKey>>, 
Iterator<List<KeyLookupResult>>> {
 
-  private static Logger logger = 
LogManager.getLogger(HoodieBloomIndexCheckFunction.class);
+  private final HoodieTable hoodieTable;
 
-  private final String basePath;
+  private final HoodieWriteConfig config;
 
-  private final HoodieTableMetaClient metaClient;
-
-  public HoodieBloomIndexCheckFunction(HoodieTableMetaClient metaClient, 
String basePath) {
-    this.metaClient = metaClient;
-    this.basePath = basePath;
-  }
-
-  /**
-   * Given a list of row keys and one file, return only row keys existing in 
that file.
-   */
-  public static List<String> checkCandidatesAgainstFile(Configuration 
configuration,
-      List<String> candidateRecordKeys, Path filePath) throws 
HoodieIndexException {
-    List<String> foundRecordKeys = new ArrayList<>();
-    try {
-      // Load all rowKeys from the file, to double-confirm
-      if (!candidateRecordKeys.isEmpty()) {
-        HoodieTimer timer = new HoodieTimer().startTimer();
-        Set<String> fileRowKeys = 
ParquetUtils.filterParquetRowKeys(configuration, filePath,
-            new HashSet<>(candidateRecordKeys));
-        foundRecordKeys.addAll(fileRowKeys);
-        logger.info(String.format("Checked keys against file %s, in %d ms. 
#candidates (%d) #found (%d)", filePath,
-            timer.endTimer(), candidateRecordKeys.size(), 
foundRecordKeys.size()));
-        if (logger.isDebugEnabled()) {
-          logger.debug("Keys matching for file " + filePath + " => " + 
foundRecordKeys);
-        }
-      }
-    } catch (Exception e) {
-      throw new HoodieIndexException("Error checking candidate keys against 
file.", e);
-    }
-    return foundRecordKeys;
+  public HoodieBloomIndexCheckFunction(HoodieTable hoodieTable, 
HoodieWriteConfig config) {
+    this.hoodieTable = hoodieTable;
+    this.config = config;
   }
 
   @Override
   public Iterator<List<KeyLookupResult>> call(Integer partition,
-      Iterator<Tuple2<String, HoodieKey>> fileParitionRecordKeyTripletItr)
-      throws Exception {
+      Iterator<Tuple2<String, HoodieKey>> fileParitionRecordKeyTripletItr) {
     return new LazyKeyCheckIterator(fileParitionRecordKeyTripletItr);
   }
 
-  class LazyKeyCheckIterator extends
-      LazyIterableIterator<Tuple2<String, HoodieKey>, List<KeyLookupResult>> {
+  class LazyKeyCheckIterator extends LazyIterableIterator<Tuple2<String, 
HoodieKey>, List<KeyLookupResult>> {
 
-    private List<String> candidateRecordKeys;
-
-    private BloomFilter bloomFilter;
-
-    private String currentFile;
-
-    private String currentPartitionPath;
-
-    private long totalKeysChecked;
+    private HoodieKeyLookupHandle keyLookupHandle;
 
     LazyKeyCheckIterator(
         Iterator<Tuple2<String, HoodieKey>> filePartitionRecordKeyTripletItr) {
       super(filePartitionRecordKeyTripletItr);
-      currentFile = null;
-      candidateRecordKeys = new ArrayList<>();
-      bloomFilter = null;
-      currentPartitionPath = null;
-      totalKeysChecked = 0;
     }
 
     @Override
     protected void start() {
     }
 
-    private void initState(String fileName, String partitionPath) throws 
HoodieIndexException {
-      try {
-        Path filePath = new Path(basePath + "/" + partitionPath + "/" + 
fileName);
-        HoodieTimer timer = new HoodieTimer().startTimer();
-        bloomFilter = 
ParquetUtils.readBloomFilterFromParquetMetadata(metaClient.getHadoopConf(), 
filePath);
-        logger.info(String.format("Read bloom filter from %s/%s in %d ms", 
partitionPath, fileName, timer.endTimer()));
-        candidateRecordKeys = new ArrayList<>();
-        currentFile = fileName;
-        currentPartitionPath = partitionPath;
-        totalKeysChecked = 0;
-      } catch (Exception e) {
-        throw new HoodieIndexException("Error checking candidate keys against 
file.", e);
-      }
-    }
-
-    // check record key against bloom filter of current file & add to possible 
keys if needed
-    private void checkAndAddCandidates(String recordKey) {
-      if (bloomFilter.mightContain(recordKey)) {
-        if (logger.isDebugEnabled()) {
-          logger.debug("Record key " + recordKey + " matches bloom filter in 
file " + currentPartitionPath
-              + "/" + currentFile);
-        }
-        candidateRecordKeys.add(recordKey);
-      }
-      totalKeysChecked++;
-    }
-
-    private List<String> checkAgainstCurrentFile() {
-      Path filePath = new Path(basePath + "/" + currentPartitionPath + "/" + 
currentFile);
-      if (logger.isDebugEnabled()) {
-        logger.debug("#The candidate row keys for " + filePath + " => " + 
candidateRecordKeys);
-      }
-      List<String> matchingKeys = 
checkCandidatesAgainstFile(metaClient.getHadoopConf(), candidateRecordKeys, 
filePath);
-      logger.info(String.format("Total records (%d), bloom filter candidates 
(%d)/fp(%d), actual matches (%d)",
-          totalKeysChecked, candidateRecordKeys.size(), 
candidateRecordKeys.size() - matchingKeys.size(),
-          matchingKeys.size()));
-      return matchingKeys;
-    }
-
     @Override
-    protected List<KeyLookupResult> computeNext() {
+    protected List<HoodieKeyLookupHandle.KeyLookupResult> computeNext() {
 
-      List<KeyLookupResult> ret = new ArrayList<>();
+      List<HoodieKeyLookupHandle.KeyLookupResult> ret = new ArrayList<>();
       try {
         // process one file in each go.
         while (inputItr.hasNext()) {
           Tuple2<String, HoodieKey> currentTuple = inputItr.next();
-          String fileName = currentTuple._1;
+          String fileId = currentTuple._1;
           String partitionPath = currentTuple._2.getPartitionPath();
           String recordKey = currentTuple._2.getRecordKey();
+          Pair<String, String> partitionPathFilePair = Pair.of(partitionPath, 
fileId);
 
           // lazily init state
-          if (currentFile == null) {
-            initState(fileName, partitionPath);
+          if (keyLookupHandle == null) {
+            keyLookupHandle = new HoodieKeyLookupHandle(config, hoodieTable, 
partitionPathFilePair);
           }
 
           // if continue on current file
-          if (fileName.equals(currentFile)) {
-            checkAndAddCandidates(recordKey);
+          if 
(keyLookupHandle.getPartitionPathFilePair().equals(partitionPathFilePair)) {
+            keyLookupHandle.addKey(recordKey);
           } else {
             // do the actual checking of file & break out
-            ret.add(new KeyLookupResult(currentFile, currentPartitionPath, 
checkAgainstCurrentFile()));
-            initState(fileName, partitionPath);
-            checkAndAddCandidates(recordKey);
+            ret.add(keyLookupHandle.getLookupResult());
+            keyLookupHandle = new HoodieKeyLookupHandle(config, hoodieTable, 
partitionPathFilePair);
+            keyLookupHandle.addKey(recordKey);
             break;
           }
         }
 
         // handle case, where we ran out of input, close pending work, update 
return val
         if (!inputItr.hasNext()) {
-          ret.add(new KeyLookupResult(currentFile, currentPartitionPath, 
checkAgainstCurrentFile()));
+          ret.add(keyLookupHandle.getLookupResult());
         }
       } catch (Throwable e) {
         if (e instanceof HoodieException) {
diff --git 
a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieGlobalBloomIndex.java
 
b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieGlobalBloomIndex.java
index 2331729..4ec51d9 100644
--- 
a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieGlobalBloomIndex.java
+++ 
b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/HoodieGlobalBloomIndex.java
@@ -21,6 +21,7 @@ package com.uber.hoodie.index.bloom;
 import com.google.common.annotations.VisibleForTesting;
 import com.uber.hoodie.common.model.HoodieKey;
 import com.uber.hoodie.common.model.HoodieRecord;
+import com.uber.hoodie.common.model.HoodieRecordLocation;
 import com.uber.hoodie.common.model.HoodieRecordPayload;
 import com.uber.hoodie.common.table.HoodieTableMetaClient;
 import com.uber.hoodie.common.util.FSUtils;
@@ -83,7 +84,7 @@ public class HoodieGlobalBloomIndex<T extends 
HoodieRecordPayload> extends Hoodi
       JavaPairRDD<String, String> partitionRecordKeyPairRDD) {
     Map<String, String> indexToPartitionMap = new HashMap<>();
     for (Entry<String, List<BloomIndexFileInfo>> entry : 
partitionToFileIndexInfo.entrySet()) {
-      entry.getValue().forEach(indexFile -> 
indexToPartitionMap.put(indexFile.getFileName(), entry.getKey()));
+      entry.getValue().forEach(indexFile -> 
indexToPartitionMap.put(indexFile.getFileId(), entry.getKey()));
     }
 
     IndexFileFilter indexFileFilter = config.getBloomIndexPruneByRanges()
@@ -106,7 +107,7 @@ public class HoodieGlobalBloomIndex<T extends 
HoodieRecordPayload> extends Hoodi
    */
   @Override
   protected JavaRDD<HoodieRecord<T>> tagLocationBacktoRecords(
-      JavaPairRDD<HoodieKey, String> keyFilenamePairRDD, 
JavaRDD<HoodieRecord<T>> recordRDD) {
+      JavaPairRDD<HoodieKey, HoodieRecordLocation> keyFilenamePairRDD, 
JavaRDD<HoodieRecord<T>> recordRDD) {
     JavaPairRDD<String, HoodieRecord<T>> rowKeyRecordPairRDD = recordRDD
         .mapToPair(record -> new Tuple2<>(record.getRecordKey(), record));
 
diff --git 
a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/IntervalTreeBasedGlobalIndexFileFilter.java
 
b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/IntervalTreeBasedGlobalIndexFileFilter.java
index c03de08..c675ffc 100644
--- 
a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/IntervalTreeBasedGlobalIndexFileFilter.java
+++ 
b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/IntervalTreeBasedGlobalIndexFileFilter.java
@@ -50,9 +50,9 @@ class IntervalTreeBasedGlobalIndexFileFilter implements 
IndexFileFilter {
     allIndexFiles.forEach(indexFile -> {
       if (indexFile.hasKeyRanges()) {
         indexLookUpTree.insert(new KeyRangeNode(indexFile.getMinRecordKey(),
-            indexFile.getMaxRecordKey(), indexFile.getFileName()));
+            indexFile.getMaxRecordKey(), indexFile.getFileId()));
       } else {
-        filesWithNoRanges.add(indexFile.getFileName());
+        filesWithNoRanges.add(indexFile.getFileId());
       }
     });
   }
diff --git 
a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/IntervalTreeBasedIndexFileFilter.java
 
b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/IntervalTreeBasedIndexFileFilter.java
index 5e34d54..412e778 100644
--- 
a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/IntervalTreeBasedIndexFileFilter.java
+++ 
b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/IntervalTreeBasedIndexFileFilter.java
@@ -49,12 +49,12 @@ class IntervalTreeBasedIndexFileFilter implements 
IndexFileFilter {
       bloomIndexFiles.forEach(indexFileInfo -> {
         if (indexFileInfo.hasKeyRanges()) {
           lookUpTree.insert(new KeyRangeNode(indexFileInfo.getMinRecordKey(),
-              indexFileInfo.getMaxRecordKey(), indexFileInfo.getFileName()));
+              indexFileInfo.getMaxRecordKey(), indexFileInfo.getFileId()));
         } else {
           if (!partitionToFilesWithNoRanges.containsKey(partition)) {
             partitionToFilesWithNoRanges.put(partition, new HashSet<>());
           }
-          
partitionToFilesWithNoRanges.get(partition).add(indexFileInfo.getFileName());
+          
partitionToFilesWithNoRanges.get(partition).add(indexFileInfo.getFileId());
         }
       });
       partitionToFileIndexLookUpTree.put(partition, lookUpTree);
diff --git 
a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/ListBasedGlobalIndexFileFilter.java
 
b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/ListBasedGlobalIndexFileFilter.java
index c5d627d..f904126 100644
--- 
a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/ListBasedGlobalIndexFileFilter.java
+++ 
b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/ListBasedGlobalIndexFileFilter.java
@@ -43,7 +43,7 @@ class ListBasedGlobalIndexFileFilter extends 
ListBasedIndexFileFilter {
         // for each candidate file in partition, that needs to be compared.
         for (BloomIndexFileInfo indexInfo : indexInfos) {
           if (shouldCompareWithFile(indexInfo, recordKey)) {
-            toReturn.add(indexInfo.getFileName());
+            toReturn.add(indexInfo.getFileId());
           }
         }
       }
diff --git 
a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/ListBasedIndexFileFilter.java
 
b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/ListBasedIndexFileFilter.java
index 353f22d..b99280b 100644
--- 
a/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/ListBasedIndexFileFilter.java
+++ 
b/hoodie-client/src/main/java/com/uber/hoodie/index/bloom/ListBasedIndexFileFilter.java
@@ -48,7 +48,7 @@ class ListBasedIndexFileFilter implements IndexFileFilter {
       // for each candidate file in partition, that needs to be compared.
       for (BloomIndexFileInfo indexInfo : indexInfos) {
         if (shouldCompareWithFile(indexInfo, recordKey)) {
-          toReturn.add(indexInfo.getFileName());
+          toReturn.add(indexInfo.getFileId());
         }
       }
     }
diff --git 
a/hoodie-client/src/main/java/com/uber/hoodie/index/hbase/HBaseIndex.java 
b/hoodie-client/src/main/java/com/uber/hoodie/index/hbase/HBaseIndex.java
index 87f3ba5..4eb9443 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/index/hbase/HBaseIndex.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/index/hbase/HBaseIndex.java
@@ -29,6 +29,7 @@ import com.uber.hoodie.common.table.HoodieTableMetaClient;
 import com.uber.hoodie.common.table.HoodieTimeline;
 import com.uber.hoodie.common.table.timeline.HoodieInstant;
 import com.uber.hoodie.common.util.ReflectionUtils;
+import com.uber.hoodie.common.util.collection.Pair;
 import com.uber.hoodie.config.HoodieIndexConfig;
 import com.uber.hoodie.config.HoodieWriteConfig;
 import com.uber.hoodie.exception.HoodieDependentSystemUnavailableException;
@@ -60,7 +61,6 @@ import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.api.java.function.Function2;
-
 import scala.Tuple2;
 
 /**
@@ -123,9 +123,8 @@ public class HBaseIndex<T extends HoodieRecordPayload> 
extends HoodieIndex<T> {
   }
 
   @Override
-  public JavaPairRDD<HoodieKey, Optional<String>> 
fetchRecordLocation(JavaRDD<HoodieKey> hoodieKeys,
+  public JavaPairRDD<HoodieKey, Optional<Pair<String, String>>> 
fetchRecordLocation(JavaRDD<HoodieKey> hoodieKeys,
       JavaSparkContext jsc, HoodieTable<T> hoodieTable) {
-    //TODO : Change/Remove filterExists in HoodieReadClient() and revisit
     throw new UnsupportedOperationException("HBase index does not implement 
check exist");
   }
 
@@ -297,7 +296,7 @@ public class HBaseIndex<T extends HoodieRecordPayload> 
extends HoodieIndex<T> {
                   }
                   Put put = new Put(Bytes.toBytes(rec.getRecordKey()));
                   put.addColumn(SYSTEM_COLUMN_FAMILY, COMMIT_TS_COLUMN,
-                      Bytes.toBytes(loc.get().getCommitTime()));
+                      Bytes.toBytes(loc.get().getInstantTime()));
                   put.addColumn(SYSTEM_COLUMN_FAMILY, FILE_NAME_COLUMN,
                       Bytes.toBytes(loc.get().getFileId()));
                   put.addColumn(SYSTEM_COLUMN_FAMILY, PARTITION_PATH_COLUMN,
diff --git 
a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java 
b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java
index 1b4081c..cc98f56 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieAppendHandle.java
@@ -59,7 +59,7 @@ import org.apache.spark.util.SizeEstimator;
 /**
  * IO Operation to append data onto an existing file.
  */
-public class HoodieAppendHandle<T extends HoodieRecordPayload> extends 
HoodieIOHandle<T> {
+public class HoodieAppendHandle<T extends HoodieRecordPayload> extends 
HoodieWriteHandle<T> {
 
   private static Logger logger = 
LogManager.getLogger(HoodieAppendHandle.class);
   // This acts as the sequenceID for records written
@@ -114,7 +114,7 @@ public class HoodieAppendHandle<T extends 
HoodieRecordPayload> extends HoodieIOH
       RealtimeView rtView = hoodieTable.getRTFileSystemView();
       Option<FileSlice> fileSlice = rtView.getLatestFileSlice(partitionPath, 
fileId);
       // Set the base commit time as the current commitTime for new inserts 
into log files
-      String baseInstantTime = commitTime;
+      String baseInstantTime = instantTime;
       if (fileSlice.isPresent()) {
         baseInstantTime = fileSlice.get().getBaseInstantTime();
       } else {
@@ -134,11 +134,11 @@ public class HoodieAppendHandle<T extends 
HoodieRecordPayload> extends HoodieIOH
         ((HoodieDeltaWriteStat) 
writeStatus.getStat()).setLogVersion(currentLogFile.getLogVersion());
         ((HoodieDeltaWriteStat) 
writeStatus.getStat()).setLogOffset(writer.getCurrentSize());
       } catch (Exception e) {
-        logger.error("Error in update task at commit " + commitTime, e);
+        logger.error("Error in update task at commit " + instantTime, e);
         writeStatus.setGlobalError(e);
         throw new HoodieUpsertException(
             "Failed to initialize HoodieAppendHandle for FileId: " + fileId + 
" on commit "
-                + commitTime + " on HDFS path " + 
hoodieTable.getMetaClient().getBasePath()
+                + instantTime + " on HDFS path " + 
hoodieTable.getMetaClient().getBasePath()
                 + partitionPath, e);
       }
       Path path = new Path(partitionPath, writer.getLogFile().getFileName());
@@ -154,13 +154,13 @@ public class HoodieAppendHandle<T extends 
HoodieRecordPayload> extends HoodieIOH
       if (avroRecord.isPresent()) {
         // Convert GenericRecord to GenericRecord with hoodie commit metadata 
in schema
         avroRecord = Optional.of(rewriteRecord((GenericRecord) 
avroRecord.get()));
-        String seqId = HoodieRecord.generateSequenceId(commitTime, 
TaskContext.getPartitionId(),
+        String seqId = HoodieRecord.generateSequenceId(instantTime, 
TaskContext.getPartitionId(),
             recordIndex.getAndIncrement());
         HoodieAvroUtils
             .addHoodieKeyToRecord((GenericRecord) avroRecord.get(), 
hoodieRecord.getRecordKey(),
                 hoodieRecord.getPartitionPath(), fileId);
         HoodieAvroUtils
-            .addCommitMetadataToRecord((GenericRecord) avroRecord.get(), 
commitTime, seqId);
+            .addCommitMetadataToRecord((GenericRecord) avroRecord.get(), 
instantTime, seqId);
         // If currentLocation is present, then this is an update
         if (hoodieRecord.getCurrentLocation() != null) {
           updatedRecordsWritten++;
@@ -200,7 +200,7 @@ public class HoodieAppendHandle<T extends 
HoodieRecordPayload> extends HoodieIOH
 
   private void doAppend(Map<HoodieLogBlock.HeaderMetadataType, String> header) 
{
     try {
-      header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, commitTime);
+      header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, instantTime);
       header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, 
writerSchema.toString());
       if (recordList.size() > 0) {
         writer = writer.appendBlock(new HoodieAvroDataBlock(recordList, 
header));
@@ -286,7 +286,7 @@ public class HoodieAppendHandle<T extends 
HoodieRecordPayload> extends HoodieIOH
 
   private void writeToBuffer(HoodieRecord<T> record) {
     // update the new location of the record, so we know where to find it next
-    record.setNewLocation(new HoodieRecordLocation(commitTime, fileId));
+    record.setNewLocation(new HoodieRecordLocation(instantTime, fileId));
     Optional<IndexedRecord> indexedRecord = getIndexedRecord(record);
     if (indexedRecord.isPresent()) {
       recordList.add(indexedRecord.get());
diff --git 
a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java 
b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java
index 0b75427..cc4596e 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieCreateHandle.java
@@ -41,7 +41,7 @@ import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.TaskContext;
 
-public class HoodieCreateHandle<T extends HoodieRecordPayload> extends 
HoodieIOHandle<T> {
+public class HoodieCreateHandle<T extends HoodieRecordPayload> extends 
HoodieWriteHandle<T> {
 
   private static Logger logger = 
LogManager.getLogger(HoodieCreateHandle.class);
 
@@ -101,7 +101,7 @@ public class HoodieCreateHandle<T extends 
HoodieRecordPayload> extends HoodieIOH
         IndexedRecord recordWithMetadataInSchema = 
rewriteRecord((GenericRecord) avroRecord.get());
         storageWriter.writeAvroWithMetadata(recordWithMetadataInSchema, 
record);
         // update the new location of record, so we know where to find it next
-        record.setNewLocation(new HoodieRecordLocation(commitTime, 
writeStatus.getFileId()));
+        record.setNewLocation(new HoodieRecordLocation(instantTime, 
writeStatus.getFileId()));
         recordsWritten++;
         insertRecordsWritten++;
       } else {
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java 
b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java
index 8fd0391..89d0aa2 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java
@@ -18,167 +18,25 @@
 
 package com.uber.hoodie.io;
 
-import com.uber.hoodie.WriteStatus;
-import com.uber.hoodie.common.io.storage.HoodieWrapperFileSystem;
-import com.uber.hoodie.common.model.HoodieRecord;
 import com.uber.hoodie.common.model.HoodieRecordPayload;
-import com.uber.hoodie.common.util.FSUtils;
-import com.uber.hoodie.common.util.FailSafeConsistencyGuard;
-import com.uber.hoodie.common.util.HoodieAvroUtils;
-import com.uber.hoodie.common.util.HoodieTimer;
-import com.uber.hoodie.common.util.NoOpConsistencyGuard;
-import com.uber.hoodie.common.util.ReflectionUtils;
 import com.uber.hoodie.config.HoodieWriteConfig;
-import com.uber.hoodie.exception.HoodieException;
-import com.uber.hoodie.exception.HoodieIOException;
 import com.uber.hoodie.table.HoodieTable;
-import java.io.IOException;
-import java.util.Optional;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.log4j.LogManager;
-import org.apache.log4j.Logger;
-import org.apache.spark.TaskContext;
 
 
 public abstract class HoodieIOHandle<T extends HoodieRecordPayload> {
 
-  private static Logger logger = LogManager.getLogger(HoodieIOHandle.class);
-  protected final String commitTime;
-  protected final String fileId;
-  protected final String writeToken;
+  protected final String instantTime;
   protected final HoodieWriteConfig config;
   protected final FileSystem fs;
   protected final HoodieTable<T> hoodieTable;
-  protected final Schema originalSchema;
-  protected final Schema writerSchema;
-  protected HoodieTimer timer;
-  protected final WriteStatus writeStatus;
 
-  public HoodieIOHandle(HoodieWriteConfig config, String commitTime, String 
fileId,
-      HoodieTable<T> hoodieTable) {
-    this.commitTime = commitTime;
-    this.fileId = fileId;
-    this.writeToken = makeSparkWriteToken();
+  HoodieIOHandle(HoodieWriteConfig config, String instantTime, HoodieTable<T> 
hoodieTable) {
+    this.instantTime = instantTime;
     this.config = config;
-    this.fs = getFileSystem(hoodieTable, config);
     this.hoodieTable = hoodieTable;
-    this.originalSchema = new Schema.Parser().parse(config.getSchema());
-    this.writerSchema = createHoodieWriteSchema(originalSchema);
-    this.timer = new HoodieTimer().startTimer();
-    this.writeStatus = (WriteStatus) 
ReflectionUtils.loadClass(config.getWriteStatusClassName(),
-        !hoodieTable.getIndex().isImplicitWithStorage(),
-        config.getWriteStatusFailureFraction());
+    this.fs = getFileSystem();
   }
 
-  private static FileSystem getFileSystem(HoodieTable hoodieTable, 
HoodieWriteConfig config) {
-    return new HoodieWrapperFileSystem(hoodieTable.getMetaClient().getFs(), 
config.isConsistencyCheckEnabled()
-        ? new FailSafeConsistencyGuard(hoodieTable.getMetaClient().getFs(),
-        config.getMaxConsistencyChecks(), 
config.getInitialConsistencyCheckIntervalMs(),
-        config.getMaxConsistencyCheckIntervalMs()) : new 
NoOpConsistencyGuard());
-  }
-
-  /**
-   * Generate a write token based on the currently running spark task and its 
place in the spark dag.
-   */
-  private static String makeSparkWriteToken() {
-    return FSUtils.makeWriteToken(TaskContext.getPartitionId(), 
TaskContext.get().stageId(),
-        TaskContext.get().taskAttemptId());
-  }
-
-  public static Schema createHoodieWriteSchema(Schema originalSchema) {
-    return HoodieAvroUtils.addMetadataFields(originalSchema);
-  }
-
-  public Path makeNewPath(String partitionPath) {
-    Path path = FSUtils.getPartitionPath(config.getBasePath(), partitionPath);
-    try {
-      fs.mkdirs(path); // create a new partition as needed.
-    } catch (IOException e) {
-      throw new HoodieIOException("Failed to make dir " + path, e);
-    }
-
-    return new Path(path.toString(), FSUtils.makeDataFileName(commitTime, 
writeToken, fileId));
-  }
-
-  /**
-   * Creates an empty marker file corresponding to storage writer path
-   * @param partitionPath Partition path
-   */
-  protected void createMarkerFile(String partitionPath) {
-    Path markerPath = makeNewMarkerPath(partitionPath);
-    try {
-      logger.info("Creating Marker Path=" + markerPath);
-      fs.create(markerPath, false).close();
-    } catch (IOException e) {
-      throw new HoodieException("Failed to create marker file " + markerPath, 
e);
-    }
-  }
-
-  /**
-   * THe marker path will be  
<base-path>/.hoodie/.temp/<instant_ts>/2019/04/25/filename
-   * @param partitionPath
-   * @return
-   */
-  private Path makeNewMarkerPath(String partitionPath) {
-    Path markerRootPath = new 
Path(hoodieTable.getMetaClient().getMarkerFolderPath(commitTime));
-    Path path = FSUtils.getPartitionPath(markerRootPath, partitionPath);
-    try {
-      fs.mkdirs(path); // create a new partition as needed.
-    } catch (IOException e) {
-      throw new HoodieIOException("Failed to make dir " + path, e);
-    }
-    return new Path(path.toString(), FSUtils.makeMarkerFile(commitTime, 
writeToken, fileId));
-  }
-
-  public Schema getWriterSchema() {
-    return writerSchema;
-  }
-
-  /**
-   * Determines whether we can accept the incoming records, into the current 
file, depending on
-   * <p>
-   * - Whether it belongs to the same partitionPath as existing records - 
Whether the current file
-   * written bytes lt max file size
-   */
-  public boolean canWrite(HoodieRecord record) {
-    return false;
-  }
-
-  /**
-   * Perform the actual writing of the given record into the backing file.
-   */
-  public void write(HoodieRecord record, Optional<IndexedRecord> insertValue) {
-    // NO_OP
-  }
-
-  /**
-   * Perform the actual writing of the given record into the backing file.
-   */
-  public void write(HoodieRecord record, Optional<IndexedRecord> avroRecord, 
Optional<Exception> exception) {
-    Optional recordMetadata = record.getData().getMetadata();
-    if (exception.isPresent() && exception.get() instanceof Throwable) {
-      // Not throwing exception from here, since we don't want to fail the 
entire job for a single record
-      writeStatus.markFailure(record, exception.get(), recordMetadata);
-      logger.error("Error writing record " + record, exception.get());
-    } else {
-      write(record, avroRecord);
-    }
-  }
-
-  /**
-   * Rewrite the GenericRecord with the Schema containing the Hoodie Metadata 
fields
-   * @param record
-   * @return
-   */
-  protected GenericRecord rewriteRecord(GenericRecord record) {
-    return HoodieAvroUtils.rewriteRecord(record, writerSchema);
-  }
-
-  public abstract WriteStatus close();
-
-  public abstract WriteStatus getWriteStatus();
+  protected abstract FileSystem getFileSystem();
 }
diff --git 
a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieKeyLookupHandle.java 
b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieKeyLookupHandle.java
new file mode 100644
index 0000000..c5bdbf6
--- /dev/null
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieKeyLookupHandle.java
@@ -0,0 +1,158 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.io;
+
+import com.uber.hoodie.common.BloomFilter;
+import com.uber.hoodie.common.model.HoodieDataFile;
+import com.uber.hoodie.common.model.HoodieRecordPayload;
+import com.uber.hoodie.common.model.HoodieTableType;
+import com.uber.hoodie.common.util.HoodieTimer;
+import com.uber.hoodie.common.util.ParquetUtils;
+import com.uber.hoodie.common.util.collection.Pair;
+import com.uber.hoodie.config.HoodieWriteConfig;
+import com.uber.hoodie.exception.HoodieIndexException;
+import com.uber.hoodie.table.HoodieTable;
+import java.util.ArrayList;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+/**
+ * Takes a bunch of keys and returns ones that are present in the file group.
+ */
+public class HoodieKeyLookupHandle<T extends HoodieRecordPayload> extends 
HoodieReadHandle<T> {
+
+  private static Logger logger = 
LogManager.getLogger(HoodieKeyLookupHandle.class);
+
+  private final HoodieTableType tableType;
+
+  private final BloomFilter bloomFilter;
+
+  private final List<String> candidateRecordKeys;
+
+  private long totalKeysChecked;
+
+  public HoodieKeyLookupHandle(HoodieWriteConfig config, HoodieTable<T> 
hoodieTable,
+      Pair<String, String> partitionPathFilePair) {
+    super(config, null, hoodieTable, partitionPathFilePair);
+    this.tableType = hoodieTable.getMetaClient().getTableType();
+    this.candidateRecordKeys = new ArrayList<>();
+    this.totalKeysChecked = 0;
+    HoodieTimer timer = new HoodieTimer().startTimer();
+    this.bloomFilter = 
ParquetUtils.readBloomFilterFromParquetMetadata(hoodieTable.getHadoopConf(),
+        new Path(getLatestDataFile().getPath()));
+    logger.info(String.format("Read bloom filter from %s in %d ms", 
partitionPathFilePair, timer.endTimer()));
+  }
+
+  /**
+   * Given a list of row keys and one file, return only row keys existing in 
that file.
+   */
+  public static List<String> checkCandidatesAgainstFile(Configuration 
configuration,
+      List<String> candidateRecordKeys, Path filePath) throws 
HoodieIndexException {
+    List<String> foundRecordKeys = new ArrayList<>();
+    try {
+      // Load all rowKeys from the file, to double-confirm
+      if (!candidateRecordKeys.isEmpty()) {
+        HoodieTimer timer = new HoodieTimer().startTimer();
+        Set<String> fileRowKeys = 
ParquetUtils.filterParquetRowKeys(configuration, filePath,
+            new HashSet<>(candidateRecordKeys));
+        foundRecordKeys.addAll(fileRowKeys);
+        logger.info(String.format("Checked keys against file %s, in %d ms. 
#candidates (%d) #found (%d)", filePath,
+            timer.endTimer(), candidateRecordKeys.size(), 
foundRecordKeys.size()));
+        if (logger.isDebugEnabled()) {
+          logger.debug("Keys matching for file " + filePath + " => " + 
foundRecordKeys);
+        }
+      }
+    } catch (Exception e) {
+      throw new HoodieIndexException("Error checking candidate keys against 
file.", e);
+    }
+    return foundRecordKeys;
+  }
+
+  /**
+   * Adds the key for look up.
+   */
+  public void addKey(String recordKey) {
+    // check record key against bloom filter of current file & add to possible 
keys if needed
+    if (bloomFilter.mightContain(recordKey)) {
+      if (logger.isDebugEnabled()) {
+        logger.debug("Record key " + recordKey + " matches bloom filter in  " 
+ partitionPathFilePair);
+      }
+      candidateRecordKeys.add(recordKey);
+    }
+    totalKeysChecked++;
+  }
+
+  /**
+   * Of all the keys, that were added, return a list of keys that were 
actually found in the file group.
+   */
+  public KeyLookupResult getLookupResult() {
+    if (logger.isDebugEnabled()) {
+      logger.debug("#The candidate row keys for " + partitionPathFilePair + " 
=> " + candidateRecordKeys);
+    }
+
+    HoodieDataFile dataFile = getLatestDataFile();
+    List<String> matchingKeys = 
checkCandidatesAgainstFile(hoodieTable.getHadoopConf(), candidateRecordKeys,
+        new Path(dataFile.getPath()));
+    logger.info(String.format("Total records (%d), bloom filter candidates 
(%d)/fp(%d), actual matches (%d)",
+        totalKeysChecked, candidateRecordKeys.size(), 
candidateRecordKeys.size() - matchingKeys.size(),
+        matchingKeys.size()));
+    return new KeyLookupResult(partitionPathFilePair.getRight(), 
partitionPathFilePair.getLeft(),
+        dataFile.getCommitTime(), matchingKeys);
+  }
+
+  /**
+   * Encapsulates the result from a key lookup
+   */
+  public static class KeyLookupResult {
+
+    private final String fileId;
+    private final String baseInstantTime;
+    private final List<String> matchingRecordKeys;
+    private final String partitionPath;
+
+    public KeyLookupResult(String fileId, String partitionPath, String 
baseInstantTime,
+        List<String> matchingRecordKeys) {
+      this.fileId = fileId;
+      this.partitionPath = partitionPath;
+      this.baseInstantTime = baseInstantTime;
+      this.matchingRecordKeys = matchingRecordKeys;
+    }
+
+    public String getFileId() {
+      return fileId;
+    }
+
+    public String getBaseInstantTime() {
+      return baseInstantTime;
+    }
+
+    public String getPartitionPath() {
+      return partitionPath;
+    }
+
+    public List<String> getMatchingRecordKeys() {
+      return matchingRecordKeys;
+    }
+  }
+}
diff --git 
a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java 
b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java
index f94e293..b22bcb3 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieMergeHandle.java
@@ -28,6 +28,7 @@ import com.uber.hoodie.common.model.HoodieWriteStat;
 import com.uber.hoodie.common.model.HoodieWriteStat.RuntimeStats;
 import com.uber.hoodie.common.util.DefaultSizeEstimator;
 import com.uber.hoodie.common.util.FSUtils;
+import com.uber.hoodie.common.util.HoodieAvroUtils;
 import com.uber.hoodie.common.util.HoodieRecordSizeEstimator;
 import com.uber.hoodie.common.util.collection.ExternalSpillableMap;
 import com.uber.hoodie.config.HoodieWriteConfig;
@@ -42,6 +43,7 @@ import java.util.Iterator;
 import java.util.Map;
 import java.util.Optional;
 import java.util.Set;
+import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.generic.IndexedRecord;
 import org.apache.hadoop.fs.Path;
@@ -50,7 +52,7 @@ import org.apache.log4j.Logger;
 import org.apache.spark.TaskContext;
 
 @SuppressWarnings("Duplicates")
-public class HoodieMergeHandle<T extends HoodieRecordPayload> extends 
HoodieIOHandle<T> {
+public class HoodieMergeHandle<T extends HoodieRecordPayload> extends 
HoodieWriteHandle<T> {
 
   private static Logger logger = LogManager.getLogger(HoodieMergeHandle.class);
 
@@ -85,6 +87,64 @@ public class HoodieMergeHandle<T extends 
HoodieRecordPayload> extends HoodieIOHa
         .getPartitionPath(), dataFileToBeMerged);
   }
 
+
+  public static Schema createHoodieWriteSchema(Schema originalSchema) {
+    return HoodieAvroUtils.addMetadataFields(originalSchema);
+  }
+
+  public Path makeNewPath(String partitionPath) {
+    Path path = FSUtils.getPartitionPath(config.getBasePath(), partitionPath);
+    try {
+      fs.mkdirs(path); // create a new partition as needed.
+    } catch (IOException e) {
+      throw new HoodieIOException("Failed to make dir " + path, e);
+    }
+
+    return new Path(path.toString(), FSUtils.makeDataFileName(instantTime, 
writeToken, fileId));
+  }
+
+  public Schema getWriterSchema() {
+    return writerSchema;
+  }
+
+  /**
+   * Determines whether we can accept the incoming records, into the current 
file, depending on
+   * <p>
+   * - Whether it belongs to the same partitionPath as existing records - 
Whether the current file written bytes lt max
+   * file size
+   */
+  public boolean canWrite(HoodieRecord record) {
+    return false;
+  }
+
+  /**
+   * Perform the actual writing of the given record into the backing file.
+   */
+  public void write(HoodieRecord record, Optional<IndexedRecord> insertValue) {
+    // NO_OP
+  }
+
+  /**
+   * Perform the actual writing of the given record into the backing file.
+   */
+  public void write(HoodieRecord record, Optional<IndexedRecord> avroRecord, 
Optional<Exception> exception) {
+    Optional recordMetadata = record.getData().getMetadata();
+    if (exception.isPresent() && exception.get() instanceof Throwable) {
+      // Not throwing exception from here, since we don't want to fail the 
entire job for a single record
+      writeStatus.markFailure(record, exception.get(), recordMetadata);
+      logger.error("Error writing record " + record, exception.get());
+    } else {
+      write(record, avroRecord);
+    }
+  }
+
+  /**
+   * Rewrite the GenericRecord with the Schema containing the Hoodie Metadata 
fields
+   */
+  protected GenericRecord rewriteRecord(GenericRecord record) {
+    return HoodieAvroUtils.rewriteRecord(record, writerSchema);
+  }
+
   /**
    * Extract old file path, initialize StorageWriter and WriteStatus
    */
@@ -95,14 +155,14 @@ public class HoodieMergeHandle<T extends 
HoodieRecordPayload> extends HoodieIOHa
       String latestValidFilePath = dataFileToBeMerged.getFileName();
       
writeStatus.getStat().setPrevCommit(FSUtils.getCommitTime(latestValidFilePath));
 
-      HoodiePartitionMetadata partitionMetadata = new 
HoodiePartitionMetadata(fs, commitTime,
+      HoodiePartitionMetadata partitionMetadata = new 
HoodiePartitionMetadata(fs, instantTime,
           new Path(config.getBasePath()), 
FSUtils.getPartitionPath(config.getBasePath(), partitionPath));
       partitionMetadata.trySave(TaskContext.getPartitionId());
 
       oldFilePath = new Path(
           config.getBasePath() + "/" + partitionPath + "/" + 
latestValidFilePath);
-      String relativePath = new Path((partitionPath.isEmpty() ? "" : 
partitionPath + "/") + FSUtils
-          .makeDataFileName(commitTime, writeToken, fileId)).toString();
+      String relativePath = new Path((partitionPath.isEmpty() ? "" : 
partitionPath + "/")
+          + FSUtils.makeDataFileName(instantTime, writeToken, 
fileId)).toString();
       newFilePath = new Path(config.getBasePath(), relativePath);
 
       logger.info(String
@@ -120,13 +180,13 @@ public class HoodieMergeHandle<T extends 
HoodieRecordPayload> extends HoodieIOHa
 
       // Create the writer for writing the new version file
       storageWriter = HoodieStorageWriterFactory
-          .getStorageWriter(commitTime, newFilePath, hoodieTable, config, 
writerSchema);
+          .getStorageWriter(instantTime, newFilePath, hoodieTable, config, 
writerSchema);
     } catch (IOException io) {
-      logger.error("Error in update task at commit " + commitTime, io);
+      logger.error("Error in update task at commit " + instantTime, io);
       writeStatus.setGlobalError(io);
       throw new HoodieUpsertException(
           "Failed to initialize HoodieUpdateHandle for FileId: " + fileId + " 
on commit "
-              + commitTime + " on path " + 
hoodieTable.getMetaClient().getBasePath(), io);
+              + instantTime + " on path " + 
hoodieTable.getMetaClient().getBasePath(), io);
     }
   }
 
@@ -148,7 +208,7 @@ public class HoodieMergeHandle<T extends 
HoodieRecordPayload> extends HoodieIOHa
       partitionPath = record.getPartitionPath();
       keyToNewRecords.put(record.getRecordKey(), record);
       // update the new location of the record, so we know where to find it 
next
-      record.setNewLocation(new HoodieRecordLocation(commitTime, fileId));
+      record.setNewLocation(new HoodieRecordLocation(instantTime, fileId));
     }
     logger.info("Number of entries in MemoryBasedMap => "
         + ((ExternalSpillableMap) keyToNewRecords).getInMemoryMapNumEntries()
diff --git 
a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieRangeInfoHandle.java 
b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieRangeInfoHandle.java
new file mode 100644
index 0000000..1f2369e
--- /dev/null
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieRangeInfoHandle.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.io;
+
+import com.uber.hoodie.common.model.HoodieDataFile;
+import com.uber.hoodie.common.model.HoodieRecordPayload;
+import com.uber.hoodie.common.util.ParquetUtils;
+import com.uber.hoodie.common.util.collection.Pair;
+import com.uber.hoodie.config.HoodieWriteConfig;
+import com.uber.hoodie.table.HoodieTable;
+import org.apache.hadoop.fs.Path;
+
/**
 * Extract range information for a given file slice.
 *
 * <p>Reads the min and max record keys from the footer of the latest data file in the
 * file group identified by {@code partitionPathFilePair} (used by the bloom index for
 * key-range based file pruning).
 */
public class HoodieRangeInfoHandle<T extends HoodieRecordPayload> extends 
HoodieReadHandle<T> {

  /**
   * @param config                write config supplying the base path and table settings
   * @param hoodieTable           table whose file system view is used to locate the latest data file
   * @param partitionPathFilePair pair of (partitionPath, fileId) identifying the file group
   */
  public HoodieRangeInfoHandle(HoodieWriteConfig config, HoodieTable<T> 
hoodieTable,
      Pair<String, String> partitionPathFilePair) {
    // instantTime is null: range lookup is a read against the latest file slice and is
    // not tied to any particular write instant
    super(config, null, hoodieTable, partitionPathFilePair);
  }

  /**
   * Returns the min/max record keys stored in the parquet footer of the latest data file
   * for this file group, as produced by {@code ParquetUtils.readMinMaxRecordKeys}.
   */
  public String[] getMinMaxKeys() {
    HoodieDataFile dataFile = getLatestDataFile();
    return ParquetUtils.readMinMaxRecordKeys(hoodieTable.getHadoopConf(), new 
Path(dataFile.getPath()));
  }
}
diff --git 
a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieReadHandle.java 
b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieReadHandle.java
new file mode 100644
index 0000000..e0d3323
--- /dev/null
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieReadHandle.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.io;
+
+import com.uber.hoodie.common.model.HoodieDataFile;
+import com.uber.hoodie.common.model.HoodieRecordPayload;
+import com.uber.hoodie.common.util.collection.Pair;
+import com.uber.hoodie.config.HoodieWriteConfig;
+import com.uber.hoodie.table.HoodieTable;
+import org.apache.hadoop.fs.FileSystem;
+
/**
 * Base class for read operations done logically on the file group.
 *
 * <p>Unlike write handles, read handles go straight to the table's meta-client file system
 * (no consistency-guard wrapping — see {@code getFileSystem} below) and address the file
 * group by a (partitionPath, fileId) pair rather than a physical file name.
 */
public abstract class HoodieReadHandle<T extends HoodieRecordPayload> extends 
HoodieIOHandle {

  // Identifies the file group being read: left = partition path, right = file id.
  protected final Pair<String, String> partitionPathFilePair;

  /**
   * @param config                write config for the table
   * @param instantTime           instant this read is associated with (may be null for
   *                              reads not tied to a specific instant)
   * @param hoodieTable           table providing meta-client and file system views
   * @param partitionPathFilePair pair of (partitionPath, fileId) identifying the file group
   */
  public HoodieReadHandle(HoodieWriteConfig config, String instantTime, 
HoodieTable<T> hoodieTable,
      Pair<String, String> partitionPathFilePair) {
    super(config, instantTime, hoodieTable);
    this.partitionPathFilePair = partitionPathFilePair;
  }

  // Reads use the raw file system from the meta client directly (writes instead wrap it
  // with a consistency guard in HoodieWriteHandle).
  @Override
  protected FileSystem getFileSystem() {
    return hoodieTable.getMetaClient().getFs();
  }


  public Pair<String, String> getPartitionPathFilePair() {
    return partitionPathFilePair;
  }

  /** The file id (right element of the pair) of the file group being read. */
  public String getFileId() {
    return partitionPathFilePair.getRight();
  }

  /**
   * Latest data file of this file group from the read-optimized file system view.
   * NOTE(review): calls Optional.get() unchecked — throws NoSuchElementException if the
   * file group has no data file; callers presumably only invoke this when one exists.
   */
  protected HoodieDataFile getLatestDataFile() {
    return hoodieTable.getROFileSystemView()
        .getLatestDataFile(partitionPathFilePair.getLeft(), 
partitionPathFilePair.getRight()).get();
  }
}
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java 
b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieWriteHandle.java
similarity index 84%
copy from hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java
copy to hoodie-client/src/main/java/com/uber/hoodie/io/HoodieWriteHandle.java
index 8fd0391..1adeecc 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieIOHandle.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieWriteHandle.java
@@ -43,29 +43,23 @@ import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.TaskContext;
 
+/**
+ * Base class for all write operations logically performed at the file group 
level.
+ */
+public abstract class HoodieWriteHandle<T extends HoodieRecordPayload> extends 
HoodieIOHandle {
 
-public abstract class HoodieIOHandle<T extends HoodieRecordPayload> {
-
-  private static Logger logger = LogManager.getLogger(HoodieIOHandle.class);
-  protected final String commitTime;
-  protected final String fileId;
-  protected final String writeToken;
-  protected final HoodieWriteConfig config;
-  protected final FileSystem fs;
-  protected final HoodieTable<T> hoodieTable;
+  private static Logger logger = LogManager.getLogger(HoodieWriteHandle.class);
   protected final Schema originalSchema;
   protected final Schema writerSchema;
   protected HoodieTimer timer;
   protected final WriteStatus writeStatus;
+  protected final String fileId;
+  protected final String writeToken;
 
-  public HoodieIOHandle(HoodieWriteConfig config, String commitTime, String 
fileId,
-      HoodieTable<T> hoodieTable) {
-    this.commitTime = commitTime;
+  public HoodieWriteHandle(HoodieWriteConfig config, String instantTime, 
String fileId, HoodieTable<T> hoodieTable) {
+    super(config, instantTime, hoodieTable);
     this.fileId = fileId;
     this.writeToken = makeSparkWriteToken();
-    this.config = config;
-    this.fs = getFileSystem(hoodieTable, config);
-    this.hoodieTable = hoodieTable;
     this.originalSchema = new Schema.Parser().parse(config.getSchema());
     this.writerSchema = createHoodieWriteSchema(originalSchema);
     this.timer = new HoodieTimer().startTimer();
@@ -101,11 +95,12 @@ public abstract class HoodieIOHandle<T extends 
HoodieRecordPayload> {
       throw new HoodieIOException("Failed to make dir " + path, e);
     }
 
-    return new Path(path.toString(), FSUtils.makeDataFileName(commitTime, 
writeToken, fileId));
+    return new Path(path.toString(), FSUtils.makeDataFileName(instantTime, 
writeToken, fileId));
   }
 
   /**
    * Creates an empty marker file corresponding to storage writer path
+   *
    * @param partitionPath Partition path
    */
   protected void createMarkerFile(String partitionPath) {
@@ -120,18 +115,16 @@ public abstract class HoodieIOHandle<T extends 
HoodieRecordPayload> {
 
   /**
    * THe marker path will be  
<base-path>/.hoodie/.temp/<instant_ts>/2019/04/25/filename
-   * @param partitionPath
-   * @return
    */
   private Path makeNewMarkerPath(String partitionPath) {
-    Path markerRootPath = new 
Path(hoodieTable.getMetaClient().getMarkerFolderPath(commitTime));
+    Path markerRootPath = new 
Path(hoodieTable.getMetaClient().getMarkerFolderPath(instantTime));
     Path path = FSUtils.getPartitionPath(markerRootPath, partitionPath);
     try {
       fs.mkdirs(path); // create a new partition as needed.
     } catch (IOException e) {
       throw new HoodieIOException("Failed to make dir " + path, e);
     }
-    return new Path(path.toString(), FSUtils.makeMarkerFile(commitTime, 
writeToken, fileId));
+    return new Path(path.toString(), FSUtils.makeMarkerFile(instantTime, 
writeToken, fileId));
   }
 
   public Schema getWriterSchema() {
@@ -141,8 +134,8 @@ public abstract class HoodieIOHandle<T extends 
HoodieRecordPayload> {
   /**
    * Determines whether we can accept the incoming records, into the current 
file, depending on
    * <p>
-   * - Whether it belongs to the same partitionPath as existing records - 
Whether the current file
-   * written bytes lt max file size
+   * - Whether it belongs to the same partitionPath as existing records - 
Whether the current file written bytes lt max
+   * file size
    */
   public boolean canWrite(HoodieRecord record) {
     return false;
@@ -171,8 +164,6 @@ public abstract class HoodieIOHandle<T extends 
HoodieRecordPayload> {
 
   /**
    * Rewrite the GenericRecord with the Schema containing the Hoodie Metadata 
fields
-   * @param record
-   * @return
    */
   protected GenericRecord rewriteRecord(GenericRecord record) {
     return HoodieAvroUtils.rewriteRecord(record, writerSchema);
@@ -181,4 +172,12 @@ public abstract class HoodieIOHandle<T extends 
HoodieRecordPayload> {
   public abstract WriteStatus close();
 
   public abstract WriteStatus getWriteStatus();
+
+  @Override
+  protected FileSystem getFileSystem() {
+    return new HoodieWrapperFileSystem(hoodieTable.getMetaClient().getFs(), 
config.isConsistencyCheckEnabled()
+        ? new FailSafeConsistencyGuard(hoodieTable.getMetaClient().getFs(),
+        config.getMaxConsistencyChecks(), 
config.getInitialConsistencyCheckIntervalMs(),
+        config.getMaxConsistencyCheckIntervalMs()) : new 
NoOpConsistencyGuard());
+  }
 }
diff --git 
a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java 
b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java
index ac81a43..6b9f4f6 100644
--- 
a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java
+++ 
b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieCopyOnWriteTable.java
@@ -500,7 +500,7 @@ public class HoodieCopyOnWriteTable<T extends 
HoodieRecordPayload> extends Hoodi
   /**
    * Helper class for a small file's location and its actual size on disk
    */
-  class SmallFile implements Serializable {
+  static class SmallFile implements Serializable {
 
     HoodieRecordLocation location;
     long sizeBytes;
diff --git 
a/hoodie-client/src/main/java/com/uber/hoodie/table/WorkloadStat.java 
b/hoodie-client/src/main/java/com/uber/hoodie/table/WorkloadStat.java
index 2739981..a613f3b 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/table/WorkloadStat.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/table/WorkloadStat.java
@@ -43,7 +43,7 @@ public class WorkloadStat implements Serializable {
   }
 
   long addUpdates(HoodieRecordLocation location, long numUpdates) {
-    updateLocationToCount.put(location.getFileId(), 
Pair.of(location.getCommitTime(), numUpdates));
+    updateLocationToCount.put(location.getFileId(), 
Pair.of(location.getInstantTime(), numUpdates));
     return this.numUpdates += numUpdates;
   }
 
diff --git 
a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientBase.java 
b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientBase.java
index fb1d1ee..ae2e57c 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientBase.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientBase.java
@@ -217,7 +217,7 @@ public class TestHoodieClientBase implements Serializable {
     for (HoodieRecord rec : taggedRecords) {
       assertTrue("Record " + rec + " found with no location.", 
rec.isCurrentLocationKnown());
       assertEquals("All records should have commit time " + commitTime + ", 
since updates were made",
-          rec.getCurrentLocation().getCommitTime(), commitTime);
+          rec.getCurrentLocation().getInstantTime(), commitTime);
     }
   }
 
diff --git 
a/hoodie-client/src/test/java/com/uber/hoodie/index/TestHbaseIndex.java 
b/hoodie-client/src/test/java/com/uber/hoodie/index/TestHbaseIndex.java
index 603ef8f..24bb4fa 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/index/TestHbaseIndex.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/index/TestHbaseIndex.java
@@ -166,7 +166,7 @@ public class TestHbaseIndex {
     assertTrue(javaRDD.filter(record -> 
record.isCurrentLocationKnown()).collect().size() == 200);
     assertTrue(javaRDD.map(record -> 
record.getKey().getRecordKey()).distinct().count() == 200);
     assertTrue(javaRDD.filter(
-        record -> (record.getCurrentLocation() != null && 
record.getCurrentLocation().getCommitTime()
+        record -> (record.getCurrentLocation() != null && 
record.getCurrentLocation().getInstantTime()
             .equals(newCommitTime))).distinct().count() == 200);
 
   }
diff --git 
a/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieBloomIndex.java
 
b/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieBloomIndex.java
index e62760a..f2e0784 100644
--- 
a/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieBloomIndex.java
+++ 
b/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieBloomIndex.java
@@ -36,8 +36,10 @@ import com.uber.hoodie.common.model.HoodieTestUtils;
 import com.uber.hoodie.common.table.HoodieTableMetaClient;
 import com.uber.hoodie.common.util.FSUtils;
 import com.uber.hoodie.common.util.HoodieAvroUtils;
+import com.uber.hoodie.common.util.collection.Pair;
 import com.uber.hoodie.config.HoodieIndexConfig;
 import com.uber.hoodie.config.HoodieWriteConfig;
+import com.uber.hoodie.io.HoodieKeyLookupHandle;
 import com.uber.hoodie.table.HoodieTable;
 import java.io.File;
 import java.io.IOException;
@@ -200,10 +202,10 @@ public class TestHoodieBloomIndex {
       // no longer sorted, but should have same files.
 
       List<Tuple2<String, BloomIndexFileInfo>> expected = Arrays.asList(
-          new Tuple2<>("2016/04/01", new 
BloomIndexFileInfo("2_0_20160401010101.parquet")),
-          new Tuple2<>("2015/03/12", new 
BloomIndexFileInfo("1_0_20150312101010.parquet")),
-          new Tuple2<>("2015/03/12", new 
BloomIndexFileInfo("3_0_20150312101010.parquet", "000", "000")),
-          new Tuple2<>("2015/03/12", new 
BloomIndexFileInfo("4_0_20150312101010.parquet", "001", "003")));
+          new Tuple2<>("2016/04/01", new BloomIndexFileInfo("2")),
+          new Tuple2<>("2015/03/12", new BloomIndexFileInfo("1")),
+          new Tuple2<>("2015/03/12", new BloomIndexFileInfo("3", "000", 
"000")),
+          new Tuple2<>("2015/03/12", new BloomIndexFileInfo("4", "001", 
"003")));
       assertEquals(expected, filesList);
     }
   }
@@ -279,7 +281,7 @@ public class TestHoodieBloomIndex {
     List<String> uuids = Arrays.asList(record1.getRecordKey(), 
record2.getRecordKey(), record3.getRecordKey(),
         record4.getRecordKey());
 
-    List<String> results = 
HoodieBloomIndexCheckFunction.checkCandidatesAgainstFile(jsc.hadoopConfiguration(),
 uuids,
+    List<String> results = 
HoodieKeyLookupHandle.checkCandidatesAgainstFile(jsc.hadoopConfiguration(), 
uuids,
         new Path(basePath + "/2016/01/31/" + filename));
     assertEquals(results.size(), 2);
     assertTrue(results.get(0).equals("1eb5b87a-1feh-4edd-87b4-6ec96dc405a0") 
|| results.get(1).equals(
@@ -417,10 +419,11 @@ public class TestHoodieBloomIndex {
 
     // Let's tag
     HoodieBloomIndex bloomIndex = new HoodieBloomIndex(config);
-    JavaPairRDD<HoodieKey, Optional<String>> taggedRecordRDD = 
bloomIndex.fetchRecordLocation(keysRDD, jsc, table);
+    JavaPairRDD<HoodieKey, Optional<Pair<String, String>>> taggedRecordRDD = 
bloomIndex
+        .fetchRecordLocation(keysRDD, jsc, table);
 
     // Should not find any files
-    for (Tuple2<HoodieKey, Optional<String>> record : 
taggedRecordRDD.collect()) {
+    for (Tuple2<HoodieKey, Optional<Pair<String, String>>> record : 
taggedRecordRDD.collect()) {
       assertTrue(!record._2.isPresent());
     }
 
@@ -438,18 +441,16 @@ public class TestHoodieBloomIndex {
     taggedRecordRDD = bloomIndex.fetchRecordLocation(keysRDD, jsc, table);
 
     // Check results
-    for (Tuple2<HoodieKey, Optional<String>> record : 
taggedRecordRDD.collect()) {
+    for (Tuple2<HoodieKey, Optional<Pair<String, String>>> record : 
taggedRecordRDD.collect()) {
       if 
(record._1.getRecordKey().equals("1eb5b87a-1feh-4edd-87b4-6ec96dc405a0")) {
         assertTrue(record._2.isPresent());
-        Path path1 = new Path(record._2.get());
-        assertEquals(FSUtils.getFileId(filename1), 
FSUtils.getFileId(path1.getName()));
+        assertEquals(FSUtils.getFileId(filename1), record._2.get().getRight());
       } else if 
(record._1.getRecordKey().equals("2eb5b87b-1feu-4edd-87b4-6ec96dc405a0")) {
         assertTrue(record._2.isPresent());
-        Path path2 = new Path(record._2.get());
         if (record._1.getPartitionPath().equals("2015/01/31")) {
-          assertEquals(FSUtils.getFileId(filename3), 
FSUtils.getFileId(path2.getName()));
+          assertEquals(FSUtils.getFileId(filename3), 
record._2.get().getRight());
         } else {
-          assertEquals(FSUtils.getFileId(filename2), 
FSUtils.getFileId(path2.getName()));
+          assertEquals(FSUtils.getFileId(filename2), 
record._2.get().getRight());
         }
       } else if 
(record._1.getRecordKey().equals("3eb5b87c-1fej-4edd-87b4-6ec96dc405a0")) {
         assertTrue(!record._2.isPresent());
diff --git 
a/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieGlobalBloomIndex.java
 
b/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieGlobalBloomIndex.java
index 0a812e3..4048a5e 100644
--- 
a/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieGlobalBloomIndex.java
+++ 
b/hoodie-client/src/test/java/com/uber/hoodie/index/bloom/TestHoodieGlobalBloomIndex.java
@@ -18,7 +18,11 @@
 
 package com.uber.hoodie.index.bloom;
 
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 
 import com.google.common.collect.Lists;
 import com.uber.hoodie.common.HoodieClientTestUtils;
@@ -32,16 +36,17 @@ import com.uber.hoodie.common.util.FSUtils;
 import com.uber.hoodie.common.util.HoodieAvroUtils;
 import com.uber.hoodie.config.HoodieWriteConfig;
 import com.uber.hoodie.table.HoodieTable;
-
 import java.io.File;
 import java.io.IOException;
-import java.util.*;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
 import java.util.stream.Collectors;
-
 import org.apache.avro.Schema;
 import org.apache.commons.io.IOUtils;
 import org.apache.hadoop.fs.FileSystem;
-
 import org.apache.spark.api.java.JavaPairRDD;
 import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
@@ -154,20 +159,20 @@ public class TestHoodieGlobalBloomIndex {
 
     Map<String, BloomIndexFileInfo> filesMap = toFileMap(filesList);
     // key ranges checks
-    
assertNull(filesMap.get("2016/04/01/2_0_20160401010101.parquet").getMaxRecordKey());
-    
assertNull(filesMap.get("2016/04/01/2_0_20160401010101.parquet").getMinRecordKey());
-    
assertFalse(filesMap.get("2015/03/12/1_0_20150312101010.parquet").hasKeyRanges());
-    
assertNotNull(filesMap.get("2015/03/12/3_0_20150312101010.parquet").getMaxRecordKey());
-    
assertNotNull(filesMap.get("2015/03/12/3_0_20150312101010.parquet").getMinRecordKey());
-    
assertTrue(filesMap.get("2015/03/12/3_0_20150312101010.parquet").hasKeyRanges());
+    assertNull(filesMap.get("2016/04/01/2").getMaxRecordKey());
+    assertNull(filesMap.get("2016/04/01/2").getMinRecordKey());
+    assertFalse(filesMap.get("2015/03/12/1").hasKeyRanges());
+    assertNotNull(filesMap.get("2015/03/12/3").getMaxRecordKey());
+    assertNotNull(filesMap.get("2015/03/12/3").getMinRecordKey());
+    assertTrue(filesMap.get("2015/03/12/3").hasKeyRanges());
 
     Map<String, BloomIndexFileInfo> expected = new HashMap<>();
-    expected.put("2016/04/01/2_0_20160401010101.parquet", new 
BloomIndexFileInfo("2_0_20160401010101.parquet"));
-    expected.put("2015/03/12/1_0_20150312101010.parquet", new 
BloomIndexFileInfo("1_0_20150312101010.parquet"));
-    expected.put("2015/03/12/3_0_20150312101010.parquet",
-        new BloomIndexFileInfo("3_0_20150312101010.parquet", "000", "000"));
-    expected.put("2015/03/12/4_0_20150312101010.parquet",
-        new BloomIndexFileInfo("4_0_20150312101010.parquet", "001", "003"));
+    expected.put("2016/04/01/2", new BloomIndexFileInfo("2"));
+    expected.put("2015/03/12/1", new BloomIndexFileInfo("1"));
+    expected.put("2015/03/12/3",
+        new BloomIndexFileInfo("3", "000", "000"));
+    expected.put("2015/03/12/4",
+        new BloomIndexFileInfo("4", "001", "003"));
 
     assertEquals(expected, filesMap);
   }
@@ -300,7 +305,7 @@ public class TestHoodieGlobalBloomIndex {
   private Map<String, BloomIndexFileInfo> toFileMap(List<Tuple2<String, 
BloomIndexFileInfo>> filesList) {
     Map<String, BloomIndexFileInfo> filesMap = new HashMap<>();
     for (Tuple2<String, BloomIndexFileInfo> t : filesList) {
-      filesMap.put(t._1() + "/" + t._2().getFileName(), t._2());
+      filesMap.put(t._1() + "/" + t._2().getFileId(), t._2());
     }
     return filesMap;
   }
diff --git 
a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieRecordLocation.java
 
b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieRecordLocation.java
index ddf9777..75f41ef 100644
--- 
a/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieRecordLocation.java
+++ 
b/hoodie-common/src/main/java/com/uber/hoodie/common/model/HoodieRecordLocation.java
@@ -27,11 +27,11 @@ import java.io.Serializable;
  */
 public class HoodieRecordLocation implements Serializable {
 
-  private final String commitTime;
+  private final String instantTime;
   private final String fileId;
 
-  public HoodieRecordLocation(String commitTime, String fileId) {
-    this.commitTime = commitTime;
+  public HoodieRecordLocation(String instantTime, String fileId) {
+    this.instantTime = instantTime;
     this.fileId = fileId;
   }
 
@@ -44,26 +44,26 @@ public class HoodieRecordLocation implements Serializable {
       return false;
     }
     HoodieRecordLocation otherLoc = (HoodieRecordLocation) o;
-    return Objects.equal(commitTime, otherLoc.commitTime)
+    return Objects.equal(instantTime, otherLoc.instantTime)
         && Objects.equal(fileId, otherLoc.fileId);
   }
 
   @Override
   public int hashCode() {
-    return Objects.hashCode(commitTime, fileId);
+    return Objects.hashCode(instantTime, fileId);
   }
 
   @Override
   public String toString() {
     final StringBuilder sb = new StringBuilder("HoodieRecordLocation {");
-    sb.append("commitTime=").append(commitTime).append(", ");
+    sb.append("instantTime=").append(instantTime).append(", ");
     sb.append("fileId=").append(fileId);
     sb.append('}');
     return sb.toString();
   }
 
-  public String getCommitTime() {
-    return commitTime;
+  public String getInstantTime() {
+    return instantTime;
   }
 
   public String getFileId() {
diff --git 
a/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java 
b/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java
index e42dbcd..fd821e7 100644
--- 
a/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java
+++ 
b/hoodie-common/src/test/java/com/uber/hoodie/common/model/HoodieTestUtils.java
@@ -346,10 +346,10 @@ public class HoodieTestUtils {
       try {
         logWriter = HoodieLogFormat.newWriterBuilder().onParentPath(new 
Path(basePath, partitionPath))
             
.withFileExtension(HoodieLogFile.DELTA_EXTENSION).withFileId(location.getFileId())
-            .overBaseCommit(location.getCommitTime()).withFs(fs).build();
+            .overBaseCommit(location.getInstantTime()).withFs(fs).build();
 
         Map<HoodieLogBlock.HeaderMetadataType, String> header = 
Maps.newHashMap();
-        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, 
location.getCommitTime());
+        header.put(HoodieLogBlock.HeaderMetadataType.INSTANT_TIME, 
location.getInstantTime());
         header.put(HoodieLogBlock.HeaderMetadataType.SCHEMA, 
schema.toString());
         logWriter.appendBlock(new 
HoodieAvroDataBlock(s.getValue().stream().map(r -> {
           try {
diff --git 
a/hoodie-common/src/test/java/com/uber/hoodie/common/util/collection/TestExternalSpillableMap.java
 
b/hoodie-common/src/test/java/com/uber/hoodie/common/util/collection/TestExternalSpillableMap.java
index 572ea4d..1cfb254 100644
--- 
a/hoodie-common/src/test/java/com/uber/hoodie/common/util/collection/TestExternalSpillableMap.java
+++ 
b/hoodie-common/src/test/java/com/uber/hoodie/common/util/collection/TestExternalSpillableMap.java
@@ -151,7 +151,7 @@ public class TestExternalSpillableMap {
     assert onDiskHoodieRecord.getKey().equals(records.get(dkey).getKey());
     // compare the member variables of HoodieRecord not set by the constructor
     assert 
records.get(ikey).getCurrentLocation().getFileId().equals(SpillableMapTestUtils.DUMMY_FILE_ID);
-    assert 
records.get(ikey).getCurrentLocation().getCommitTime().equals(SpillableMapTestUtils.DUMMY_COMMIT_TIME);
+    assert 
records.get(ikey).getCurrentLocation().getInstantTime().equals(SpillableMapTestUtils.DUMMY_COMMIT_TIME);
 
     // test contains
     assertTrue(records.containsKey(ikey));

Reply via email to