This is an automated email from the ASF dual-hosted git repository.

vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 5823c1e  HUDI-138 - Meta Files handling also need to support 
consistency guard
5823c1e is described below

commit 5823c1ebd7e12a48805eed05532edfd25b69c6fe
Author: Balaji Varadarajan <[email protected]>
AuthorDate: Thu Jun 20 18:05:01 2019 -0700

    HUDI-138 - Meta Files handling also need to support consistency guard
---
 .../main/java/com/uber/hoodie/cli/HoodieCLI.java   |  26 +++-
 .../uber/hoodie/cli/commands/CleansCommand.java    |   6 +-
 .../uber/hoodie/cli/commands/CommitsCommand.java   |   5 +-
 .../uber/hoodie/cli/commands/DatasetsCommand.java  |  27 +++-
 .../hoodie/cli/commands/SavepointsCommand.java     |   6 +-
 .../java/com/uber/hoodie/AbstractHoodieClient.java |   6 +
 .../com/uber/hoodie/CompactionAdminClient.java     |   6 +-
 .../java/com/uber/hoodie/HoodieWriteClient.java    |  59 ++++----
 .../com/uber/hoodie/client/utils/ClientUtils.java  |  39 ++++++
 .../com/uber/hoodie/config/HoodieWriteConfig.java  |  49 +++----
 .../java/com/uber/hoodie/io/HoodieReadHandle.java  |   1 -
 .../java/com/uber/hoodie/io/HoodieWriteHandle.java |  15 +-
 .../java/com/uber/hoodie/table/HoodieTable.java    |  11 +-
 .../src/test/java/com/uber/hoodie/TestCleaner.java |   8 +-
 .../java/com/uber/hoodie/TestConsistencyGuard.java |  21 ++-
 .../java/com/uber/hoodie/TestHoodieClientBase.java |  10 +-
 .../TestHoodieClientOnCopyOnWriteStorage.java      |  10 +-
 .../common/io/storage/HoodieWrapperFileSystem.java | 151 +++++++++++++++++----
 .../hoodie/common/table/HoodieTableMetaClient.java |  38 +++++-
 .../hoodie/common/util/ConsistencyGuardConfig.java | 121 +++++++++++++++++
 .../common/util/FailSafeConsistencyGuard.java      |  32 ++---
 21 files changed, 481 insertions(+), 166 deletions(-)

diff --git a/hoodie-cli/src/main/java/com/uber/hoodie/cli/HoodieCLI.java 
b/hoodie-cli/src/main/java/com/uber/hoodie/cli/HoodieCLI.java
index 67a999a..ba9c023 100644
--- a/hoodie-cli/src/main/java/com/uber/hoodie/cli/HoodieCLI.java
+++ b/hoodie-cli/src/main/java/com/uber/hoodie/cli/HoodieCLI.java
@@ -19,6 +19,7 @@
 package com.uber.hoodie.cli;
 
 import com.uber.hoodie.common.table.HoodieTableMetaClient;
+import com.uber.hoodie.common.util.ConsistencyGuardConfig;
 import com.uber.hoodie.common.util.FSUtils;
 import java.io.IOException;
 import org.apache.hadoop.conf.Configuration;
@@ -27,8 +28,10 @@ import org.apache.hadoop.fs.FileSystem;
 public class HoodieCLI {
 
   public static Configuration conf;
+  public static ConsistencyGuardConfig consistencyGuardConfig = 
ConsistencyGuardConfig.newBuilder().build();
   public static FileSystem fs;
   public static CLIState state = CLIState.INIT;
+  public static String basePath;
   public static HoodieTableMetaClient tableMetadata;
   public static HoodieTableMetaClient syncTableMetadata;
 
@@ -37,6 +40,18 @@ public class HoodieCLI {
     INIT, DATASET, SYNC
   }
 
+  public static void setConsistencyGuardConfig(ConsistencyGuardConfig config) {
+    consistencyGuardConfig = config;
+  }
+
+  private static void setTableMetaClient(HoodieTableMetaClient tableMetadata) {
+    HoodieCLI.tableMetadata = tableMetadata;
+  }
+
+  private static void setBasePath(String basePath) {
+    HoodieCLI.basePath = basePath;
+  }
+
   public static boolean initConf() {
     if (HoodieCLI.conf == null) {
       HoodieCLI.conf = FSUtils.prepareHadoopConf(new Configuration());
@@ -47,11 +62,16 @@ public class HoodieCLI {
 
   public static void initFS(boolean force) throws IOException {
     if (fs == null || force) {
-      fs = FileSystem.get(conf);
+      fs = (tableMetadata != null) ? tableMetadata.getFs() : 
FileSystem.get(conf);
     }
   }
 
-  public static void setTableMetadata(HoodieTableMetaClient tableMetadata) {
-    HoodieCLI.tableMetadata = tableMetadata;
+  public static void refreshTableMetadata() {
+    setTableMetaClient(new HoodieTableMetaClient(HoodieCLI.conf, basePath, 
false, HoodieCLI.consistencyGuardConfig));
+  }
+
+  public static void connectTo(String basePath) {
+    setBasePath(basePath);
+    refreshTableMetadata();
   }
 }
diff --git 
a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/CleansCommand.java 
b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/CleansCommand.java
index 79b271e..c0f7129 100644
--- a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/CleansCommand.java
+++ b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/CleansCommand.java
@@ -23,7 +23,6 @@ import 
com.uber.hoodie.avro.model.HoodieCleanPartitionMetadata;
 import com.uber.hoodie.cli.HoodieCLI;
 import com.uber.hoodie.cli.HoodiePrintHelper;
 import com.uber.hoodie.cli.TableHeader;
-import com.uber.hoodie.common.table.HoodieTableMetaClient;
 import com.uber.hoodie.common.table.HoodieTimeline;
 import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline;
 import com.uber.hoodie.common.table.timeline.HoodieInstant;
@@ -91,9 +90,8 @@ public class CleansCommand implements CommandMarker {
 
   @CliCommand(value = "cleans refresh", help = "Refresh the commits")
   public String refreshCleans() throws IOException {
-    HoodieTableMetaClient metadata = new HoodieTableMetaClient(HoodieCLI.conf, 
HoodieCLI.tableMetadata.getBasePath());
-    HoodieCLI.setTableMetadata(metadata);
-    return "Metadata for table " + metadata.getTableConfig().getTableName() + 
" refreshed.";
+    HoodieCLI.refreshTableMetadata();
+    return "Metadata for table " + 
HoodieCLI.tableMetadata.getTableConfig().getTableName() + " refreshed.";
   }
 
   @CliCommand(value = "clean showpartitions", help = "Show partition level 
details of a clean")
diff --git 
a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/CommitsCommand.java 
b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/CommitsCommand.java
index 641cb80..2360503 100644
--- a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/CommitsCommand.java
+++ b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/CommitsCommand.java
@@ -115,9 +115,8 @@ public class CommitsCommand implements CommandMarker {
 
   @CliCommand(value = "commits refresh", help = "Refresh the commits")
   public String refreshCommits() throws IOException {
-    HoodieTableMetaClient metadata = new HoodieTableMetaClient(HoodieCLI.conf, 
HoodieCLI.tableMetadata.getBasePath());
-    HoodieCLI.setTableMetadata(metadata);
-    return "Metadata for table " + metadata.getTableConfig().getTableName() + 
" refreshed.";
+    HoodieCLI.refreshTableMetadata();
+    return "Metadata for table " + 
HoodieCLI.tableMetadata.getTableConfig().getTableName() + " refreshed.";
   }
 
   @CliCommand(value = "commit rollback", help = "Rollback a commit")
diff --git 
a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/DatasetsCommand.java 
b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/DatasetsCommand.java
index f001f5c..257091e 100644
--- a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/DatasetsCommand.java
+++ b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/DatasetsCommand.java
@@ -23,6 +23,7 @@ import com.uber.hoodie.cli.HoodiePrintHelper;
 import com.uber.hoodie.cli.TableHeader;
 import com.uber.hoodie.common.model.HoodieTableType;
 import com.uber.hoodie.common.table.HoodieTableMetaClient;
+import com.uber.hoodie.common.util.ConsistencyGuardConfig;
 import com.uber.hoodie.exception.DatasetNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -39,11 +40,25 @@ public class DatasetsCommand implements CommandMarker {
 
   @CliCommand(value = "connect", help = "Connect to a hoodie dataset")
   public String connect(
-      @CliOption(key = {"path"}, mandatory = true, help = "Base Path of the 
dataset") final String path)
-      throws IOException {
-    boolean initialized = HoodieCLI.initConf();
-    HoodieCLI.initFS(initialized);
-    HoodieCLI.setTableMetadata(new HoodieTableMetaClient(HoodieCLI.conf, 
path));
+      @CliOption(key = {"path"}, mandatory = true, help = "Base Path of the 
dataset") final String path,
+      @CliOption(key = {"eventuallyConsistent"}, mandatory = false, 
unspecifiedDefaultValue = "false",
+          help = "Enable eventual consistency") final boolean 
eventuallyConsistent,
+      @CliOption(key = {"initialCheckIntervalMs"}, mandatory = false, 
unspecifiedDefaultValue = "2000",
+          help = "Initial wait time for eventual consistency") final Integer 
initialConsistencyIntervalMs,
+      @CliOption(key = {"maxCheckIntervalMs"}, mandatory = false, 
unspecifiedDefaultValue = "300000",
+          help = "Max wait time for eventual consistency") final Integer 
maxConsistencyIntervalMs,
+      @CliOption(key = {"maxCheckIntervalMs"}, mandatory = false, 
unspecifiedDefaultValue = "7",
+          help = "Max checks for eventual consistency") final Integer 
maxConsistencyChecks) throws IOException {
+    HoodieCLI.setConsistencyGuardConfig(
+        ConsistencyGuardConfig.newBuilder()
+            .withConsistencyCheckEnabled(eventuallyConsistent)
+            
.withInitialConsistencyCheckIntervalMs(initialConsistencyIntervalMs)
+            .withMaxConsistencyCheckIntervalMs(maxConsistencyIntervalMs)
+            .withMaxConsistencyChecks(maxConsistencyChecks)
+            .build());
+    HoodieCLI.initConf();
+    HoodieCLI.connectTo(path);
+    HoodieCLI.initFS(true);
     HoodieCLI.state = HoodieCLI.CLIState.DATASET;
     return "Metadata for table " + 
HoodieCLI.tableMetadata.getTableConfig().getTableName() + " loaded";
   }
@@ -85,7 +100,7 @@ public class DatasetsCommand implements CommandMarker {
     HoodieTableMetaClient.initTableType(HoodieCLI.conf, path, tableType, name, 
payloadClass);
 
     // Now connect to ensure loading works
-    return connect(path);
+    return connect(path, false, 0, 0, 0);
   }
 
   @CliAvailabilityIndicator({"desc"})
diff --git 
a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/SavepointsCommand.java 
b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/SavepointsCommand.java
index 82358aa..8709469 100644
--- 
a/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/SavepointsCommand.java
+++ 
b/hoodie-cli/src/main/java/com/uber/hoodie/cli/commands/SavepointsCommand.java
@@ -23,7 +23,6 @@ import com.uber.hoodie.cli.HoodieCLI;
 import com.uber.hoodie.cli.HoodiePrintHelper;
 import com.uber.hoodie.cli.utils.InputStreamConsumer;
 import com.uber.hoodie.cli.utils.SparkUtil;
-import com.uber.hoodie.common.table.HoodieTableMetaClient;
 import com.uber.hoodie.common.table.HoodieTimeline;
 import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline;
 import com.uber.hoodie.common.table.timeline.HoodieInstant;
@@ -133,9 +132,8 @@ public class SavepointsCommand implements CommandMarker {
 
   @CliCommand(value = "savepoints refresh", help = "Refresh the savepoints")
   public String refreshMetaClient() throws IOException {
-    HoodieTableMetaClient metadata = new HoodieTableMetaClient(HoodieCLI.conf, 
HoodieCLI.tableMetadata.getBasePath());
-    HoodieCLI.setTableMetadata(metadata);
-    return "Metadata for table " + metadata.getTableConfig().getTableName() + 
" refreshed.";
+    HoodieCLI.refreshTableMetadata();
+    return "Metadata for table " + 
HoodieCLI.tableMetadata.getTableConfig().getTableName() + " refreshed.";
   }
 
   private static HoodieWriteClient createHoodieClient(JavaSparkContext jsc, 
String basePath) throws Exception {
diff --git 
a/hoodie-client/src/main/java/com/uber/hoodie/AbstractHoodieClient.java 
b/hoodie-client/src/main/java/com/uber/hoodie/AbstractHoodieClient.java
index 5aa051c..c03bd21 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/AbstractHoodieClient.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/AbstractHoodieClient.java
@@ -19,6 +19,8 @@
 package com.uber.hoodie;
 
 import com.uber.hoodie.client.embedded.EmbeddedTimelineService;
+import com.uber.hoodie.client.utils.ClientUtils;
+import com.uber.hoodie.common.table.HoodieTableMetaClient;
 import com.uber.hoodie.common.util.FSUtils;
 import com.uber.hoodie.config.HoodieWriteConfig;
 import java.io.IOException;
@@ -117,4 +119,8 @@ public abstract class AbstractHoodieClient implements 
Serializable {
   public Optional<EmbeddedTimelineService> getTimelineServer() {
     return timelineServer;
   }
+
+  protected HoodieTableMetaClient createMetaClient(boolean 
loadActiveTimelineOnLoad) {
+    return ClientUtils.createMetaClient(jsc, config, loadActiveTimelineOnLoad);
+  }
 }
diff --git 
a/hoodie-client/src/main/java/com/uber/hoodie/CompactionAdminClient.java 
b/hoodie-client/src/main/java/com/uber/hoodie/CompactionAdminClient.java
index d2b007f..03bec59 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/CompactionAdminClient.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/CompactionAdminClient.java
@@ -114,7 +114,7 @@ public class CompactionAdminClient extends 
AbstractHoodieClient {
    */
   public List<RenameOpResult> unscheduleCompactionPlan(
       String compactionInstant, boolean skipValidation, int parallelism, 
boolean dryRun) throws Exception {
-    HoodieTableMetaClient metaClient = new 
HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+    HoodieTableMetaClient metaClient = createMetaClient(false);
     List<Pair<HoodieLogFile, HoodieLogFile>> renameActions =
         getRenamingActionsForUnschedulingCompactionPlan(metaClient, 
compactionInstant, parallelism,
             Optional.absent(), skipValidation);
@@ -156,7 +156,7 @@ public class CompactionAdminClient extends 
AbstractHoodieClient {
    */
   public List<RenameOpResult> unscheduleCompactionFileId(HoodieFileGroupId 
fgId,
       boolean skipValidation, boolean dryRun) throws Exception {
-    HoodieTableMetaClient metaClient = new 
HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+    HoodieTableMetaClient metaClient = createMetaClient(false);
     List<Pair<HoodieLogFile, HoodieLogFile>> renameActions =
         getRenamingActionsForUnschedulingCompactionForFileId(metaClient, fgId,
             Optional.absent(), skipValidation);
@@ -198,7 +198,7 @@ public class CompactionAdminClient extends 
AbstractHoodieClient {
    */
   public List<RenameOpResult> repairCompaction(String compactionInstant,
       int parallelism, boolean dryRun) throws Exception {
-    HoodieTableMetaClient metaClient = new 
HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath);
+    HoodieTableMetaClient metaClient = createMetaClient(false);
     List<ValidationOpResult> validationResults =
         validateCompactionPlan(metaClient, compactionInstant, parallelism);
     List<ValidationOpResult> failed = validationResults.stream()
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java 
b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java
index a9b548b..3c8b36c 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java
@@ -76,7 +76,6 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
-import org.apache.hadoop.conf.Configuration;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.Partitioner;
@@ -153,7 +152,7 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
   public JavaRDD<HoodieRecord<T>> filterExists(JavaRDD<HoodieRecord<T>> 
hoodieRecords) {
     // Create a Hoodie table which encapsulated the commits and files visible
     HoodieTable<T> table = HoodieTable.getHoodieTable(
-        new HoodieTableMetaClient(jsc.hadoopConfiguration(), 
config.getBasePath(), true), config, jsc);
+        createMetaClient(true), config, jsc);
 
     JavaRDD<HoodieRecord<T>> recordsWithLocation = 
index.tagLocation(hoodieRecords, jsc, table);
     return recordsWithLocation.filter(v1 -> !v1.isCurrentLocationKnown());
@@ -471,9 +470,7 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
     JavaRDD<WriteStatus> statuses = index.updateLocation(writeStatusRDD, jsc, 
table);
     // Trigger the insert and collect statuses
     statuses = statuses.persist(config.getWriteStatusStorageLevel());
-    commitOnAutoCommit(commitTime, statuses,
-        new HoodieTableMetaClient(jsc.hadoopConfiguration(), 
config.getBasePath(), true)
-            .getCommitActionType());
+    commitOnAutoCommit(commitTime, statuses, 
table.getMetaClient().getCommitActionType());
     return statuses;
   }
 
@@ -496,7 +493,7 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
    */
   public boolean commit(String commitTime, JavaRDD<WriteStatus> writeStatuses,
       Optional<Map<String, String>> extraMetadata) {
-    HoodieTableMetaClient metaClient = new 
HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath(), true);
+    HoodieTableMetaClient metaClient = createMetaClient(false);
     return commit(commitTime, writeStatuses, extraMetadata, 
metaClient.getCommitActionType());
   }
 
@@ -506,7 +503,7 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
     logger.info("Commiting " + commitTime);
     // Create a Hoodie table which encapsulated the commits and files visible
     HoodieTable<T> table = HoodieTable.getHoodieTable(
-        new HoodieTableMetaClient(jsc.hadoopConfiguration(), 
config.getBasePath(), true), config, jsc);
+        createMetaClient(true), config, jsc);
 
     HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
     HoodieCommitMetadata metadata = new HoodieCommitMetadata();
@@ -536,7 +533,7 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
 
       // We cannot have unbounded commit files. Archive commits if we have to 
archive
       HoodieCommitArchiveLog archiveLog = new HoodieCommitArchiveLog(config,
-          new HoodieTableMetaClient(jsc.hadoopConfiguration(), 
config.getBasePath(), true));
+          createMetaClient(true));
       archiveLog.archiveIfRequired(jsc);
       if (config.isAutoClean()) {
         // Call clean to cleanup if there is anything to cleanup after the 
commit,
@@ -580,7 +577,7 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
    */
   public boolean savepoint(String user, String comment) {
     HoodieTable<T> table = HoodieTable.getHoodieTable(
-        new HoodieTableMetaClient(jsc.hadoopConfiguration(), 
config.getBasePath(), true), config, jsc);
+        createMetaClient(true), config, jsc);
     if (table.getCompletedCommitsTimeline().empty()) {
       throw new HoodieSavepointException("Could not savepoint. Commit timeline 
is empty");
     }
@@ -610,7 +607,7 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
    */
   public boolean savepoint(String commitTime, String user, String comment) {
     HoodieTable<T> table = HoodieTable.getHoodieTable(
-        new HoodieTableMetaClient(jsc.hadoopConfiguration(), 
config.getBasePath(), true), config, jsc);
+        createMetaClient(true), config, jsc);
     if (table.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) 
{
       throw new UnsupportedOperationException("Savepointing is not supported 
or MergeOnRead table types");
     }
@@ -674,7 +671,7 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
    */
   public void deleteSavepoint(String savepointTime) {
     HoodieTable<T> table = HoodieTable.getHoodieTable(
-        new HoodieTableMetaClient(jsc.hadoopConfiguration(), 
config.getBasePath(), true), config, jsc);
+        createMetaClient(true), config, jsc);
     if (table.getMetaClient().getTableType() == HoodieTableType.MERGE_ON_READ) 
{
       throw new UnsupportedOperationException("Savepointing is not supported 
or MergeOnRead table types");
     }
@@ -705,7 +702,7 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
    */
   private void deleteRequestedCompaction(String compactionTime) {
     HoodieTable<T> table = HoodieTable.getHoodieTable(
-        new HoodieTableMetaClient(jsc.hadoopConfiguration(), 
config.getBasePath(), true), config, jsc);
+        createMetaClient(true), config, jsc);
     HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
     HoodieInstant compactionRequestedInstant =
         new HoodieInstant(State.REQUESTED, HoodieTimeline.COMPACTION_ACTION, 
compactionTime);
@@ -734,7 +731,7 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
    */
   public boolean rollbackToSavepoint(String savepointTime) {
     HoodieTable<T> table = HoodieTable.getHoodieTable(
-        new HoodieTableMetaClient(jsc.hadoopConfiguration(), 
config.getBasePath(), true), config, jsc);
+        createMetaClient(true), config, jsc);
     HoodieActiveTimeline activeTimeline = table.getActiveTimeline();
 
     // Rollback to savepoint is expected to be a manual operation and no 
concurrent write or compaction is expected
@@ -788,7 +785,7 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
 
     // Create a Hoodie table which encapsulated the commits and files visible
     HoodieTable<T> table = HoodieTable.getHoodieTable(
-        new HoodieTableMetaClient(jsc.hadoopConfiguration(), 
config.getBasePath(), true), config, jsc);
+        createMetaClient(true), config, jsc);
     // Get all the commits on the timeline after the provided commit time
     List<HoodieInstant> instantsToRollback = 
table.getActiveTimeline().getCommitsAndCompactionTimeline().getInstants()
         .filter(instant -> 
HoodieActiveTimeline.GREATER.test(instant.getTimestamp(), instantTime))
@@ -848,7 +845,7 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
   private List<HoodieRollbackStat> doRollbackAndGetStats(final String 
commitToRollback) throws
       IOException {
     HoodieTable<T> table = HoodieTable.getHoodieTable(
-        new HoodieTableMetaClient(jsc.hadoopConfiguration(), 
config.getBasePath(), true), config, jsc);
+        createMetaClient(true), config, jsc);
     HoodieTimeline inflightCommitTimeline = table.getInflightCommitTimeline();
     HoodieTimeline commitTimeline = table.getCompletedCommitsTimeline();
     // Check if any of the commits is a savepoint - do not allow rollback on 
those commits
@@ -899,7 +896,7 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
   private void finishRollback(final Timer.Context context, 
List<HoodieRollbackStat> rollbackStats,
       List<String> commitsToRollback, final String startRollbackTime) throws 
IOException {
     HoodieTable<T> table = HoodieTable.getHoodieTable(
-        new HoodieTableMetaClient(jsc.hadoopConfiguration(), 
config.getBasePath(), true), config, jsc);
+        createMetaClient(true), config, jsc);
     Optional<Long> durationInMs = Optional.empty();
     Long numFilesDeleted = rollbackStats.stream().mapToLong(stat -> 
stat.getSuccessDeleteFiles().size()).sum();
     if (context != null) {
@@ -925,7 +922,7 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
   private void finishRestore(final Timer.Context context, Map<String, 
List<HoodieRollbackStat>> commitToStats,
       List<String> commitsToRollback, final String startRestoreTime, final 
String restoreToInstant) throws IOException {
     HoodieTable<T> table = HoodieTable.getHoodieTable(
-        new HoodieTableMetaClient(jsc.hadoopConfiguration(), 
config.getBasePath(), true), config, jsc);
+        createMetaClient(true), config, jsc);
     Optional<Long> durationInMs = Optional.empty();
     Long numFilesDeleted = 0L;
     for (Map.Entry<String, List<HoodieRollbackStat>> commitToStat : 
commitToStats.entrySet()) {
@@ -1001,7 +998,7 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
 
       // Create a Hoodie table which encapsulated the commits and files visible
       HoodieTable<T> table = HoodieTable.getHoodieTable(
-          new HoodieTableMetaClient(jsc.hadoopConfiguration(), 
config.getBasePath(), true), config, jsc);
+          createMetaClient(true), config, jsc);
 
       List<HoodieCleanStat> cleanStats = table.clean(jsc);
       if (cleanStats.isEmpty()) {
@@ -1053,7 +1050,7 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
       rollbackInflightCommits();
     }
     logger.info("Generate a new instant time " + instantTime);
-    HoodieTableMetaClient metaClient = new 
HoodieTableMetaClient(jsc.hadoopConfiguration(), config.getBasePath());
+    HoodieTableMetaClient metaClient = createMetaClient(true);
     // if there are pending compactions, their instantTime must not be greater 
than that of this instant time
     
metaClient.getActiveTimeline().filterPendingCompactionTimeline().lastInstant().ifPresent(latestPending
 -> {
       Preconditions.checkArgument(
@@ -1086,8 +1083,7 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
    */
   public boolean scheduleCompactionAtInstant(String instantTime, 
Optional<Map<String, String>> extraMetadata)
       throws IOException {
-    HoodieTableMetaClient metaClient = new 
HoodieTableMetaClient(jsc.hadoopConfiguration(),
-        config.getBasePath(), true);
+    HoodieTableMetaClient metaClient = createMetaClient(true);
     // if there are inflight writes, their instantTime must not be less than 
that of compaction instant time
     
metaClient.getCommitsTimeline().filterInflightsExcludingCompaction().firstInstant().ifPresent(earliestInflight
 -> {
       Preconditions.checkArgument(
@@ -1130,8 +1126,7 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
    */
   public void commitCompaction(String compactionInstantTime, 
JavaRDD<WriteStatus> writeStatuses,
       Optional<Map<String, String>> extraMetadata) throws IOException {
-    HoodieTableMetaClient metaClient = new 
HoodieTableMetaClient(jsc.hadoopConfiguration(),
-        config.getBasePath(), true);
+    HoodieTableMetaClient metaClient = createMetaClient(true);
     HoodieTable<T> table = HoodieTable.getHoodieTable(metaClient, config, jsc);
     HoodieActiveTimeline timeline = metaClient.getActiveTimeline();
     HoodieCompactionPlan compactionPlan = AvroUtils.deserializeCompactionPlan(
@@ -1178,7 +1173,7 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
    */
   private void rollbackInflightCommits() {
     HoodieTable<T> table = HoodieTable.getHoodieTable(
-        new HoodieTableMetaClient(jsc.hadoopConfiguration(), 
config.getBasePath(), true), config, jsc);
+        createMetaClient(true), config, jsc);
     HoodieTimeline inflightTimeline = 
table.getMetaClient().getCommitsTimeline().filterInflightsExcludingCompaction();
     List<String> commits = 
inflightTimeline.getInstants().map(HoodieInstant::getTimestamp)
         .collect(Collectors.toList());
@@ -1190,11 +1185,7 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
 
   private HoodieTable getTableAndInitCtx(JavaRDD<HoodieRecord<T>> records) {
     // Create a Hoodie table which encapsulated the commits and files visible
-    HoodieTable table = HoodieTable.getHoodieTable(
-        new HoodieTableMetaClient(
-            // Clone Configuration here. Otherwise we could see 
ConcurrentModificationException (race) in multi-threaded
-            // execution (HoodieDeltaStreamer) when Configuration gets 
serialized by Spark.
-            new Configuration(jsc.hadoopConfiguration()), 
config.getBasePath(), true), config, jsc);
+    HoodieTable table = HoodieTable.getHoodieTable(createMetaClient(true), 
config, jsc);
     if 
(table.getMetaClient().getCommitActionType().equals(HoodieTimeline.COMMIT_ACTION))
 {
       writeContext = metrics.getCommitCtx();
     } else {
@@ -1214,8 +1205,7 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
    */
   private JavaRDD<WriteStatus> compact(String compactionInstantTime, boolean 
autoCommit) throws IOException {
     // Create a Hoodie table which encapsulated the commits and files visible
-    HoodieTableMetaClient metaClient = new 
HoodieTableMetaClient(jsc.hadoopConfiguration(),
-        config.getBasePath(), true);
+    HoodieTableMetaClient metaClient = createMetaClient(true);
     HoodieTable<T> table = HoodieTable.getHoodieTable(metaClient, config, jsc);
     HoodieTimeline pendingCompactionTimeline = 
metaClient.getActiveTimeline().filterPendingCompactionTimeline();
     HoodieInstant inflightInstant = 
HoodieTimeline.getCompactionInflightInstant(compactionInstantTime);
@@ -1223,7 +1213,7 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
       //inflight compaction - Needs to rollback first deleting new parquet 
files before we run compaction.
       rollbackInflightCompaction(inflightInstant, table);
       // refresh table
-      metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), 
config.getBasePath(), true);
+      metaClient = createMetaClient(true);
       table = HoodieTable.getHoodieTable(metaClient, config, jsc);
       pendingCompactionTimeline = 
metaClient.getActiveTimeline().filterPendingCompactionTimeline();
     }
@@ -1253,8 +1243,7 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
     activeTimeline.transitionCompactionRequestedToInflight(compactionInstant);
     compactionTimer = metrics.getCompactionCtx();
     // Create a Hoodie table which encapsulated the commits and files visible
-    HoodieTableMetaClient metaClient = new 
HoodieTableMetaClient(jsc.hadoopConfiguration(),
-        config.getBasePath(), true);
+    HoodieTableMetaClient metaClient = createMetaClient(true);
     HoodieTable<T> table = HoodieTable.getHoodieTable(metaClient, config, jsc);
     JavaRDD<WriteStatus> statuses = table.compact(jsc, 
compactionInstant.getTimestamp(), compactionPlan);
     // Force compaction action
@@ -1383,7 +1372,7 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
     try {
       // Create a Hoodie table which encapsulated the commits and files visible
       HoodieTable table = HoodieTable.getHoodieTable(
-          new HoodieTableMetaClient(jsc.hadoopConfiguration(), 
config.getBasePath(), true), config, jsc);
+          createMetaClient(true), config, jsc);
       // 0. All of the rolling stat management is only done by the DELTA 
commit for MOR and COMMIT for COW other wise
       // there may be race conditions
       HoodieRollingStatMetadata rollingStatMetadata = new 
HoodieRollingStatMetadata(actionType);
diff --git 
a/hoodie-client/src/main/java/com/uber/hoodie/client/utils/ClientUtils.java 
b/hoodie-client/src/main/java/com/uber/hoodie/client/utils/ClientUtils.java
new file mode 100644
index 0000000..e9859f9
--- /dev/null
+++ b/hoodie-client/src/main/java/com/uber/hoodie/client/utils/ClientUtils.java
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.client.utils;
+
+import com.uber.hoodie.common.table.HoodieTableMetaClient;
+import com.uber.hoodie.config.HoodieWriteConfig;
+import org.apache.spark.api.java.JavaSparkContext;
+
+public class ClientUtils {
+
+  /**
+   * Create Consistency Aware MetaClient
+   *
+   * @param jsc JavaSparkContext
+   * @param config HoodieWriteConfig
+   * @param loadActiveTimelineOnLoad early loading of timeline
+   */
+  public static HoodieTableMetaClient createMetaClient(JavaSparkContext jsc, 
HoodieWriteConfig config,
+      boolean loadActiveTimelineOnLoad) {
+    return new HoodieTableMetaClient(jsc.hadoopConfiguration(), 
config.getBasePath(), loadActiveTimelineOnLoad,
+        config.getConsistencyGuardConfig());
+  }
+}
diff --git 
a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java 
b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java
index 2d522ad..e78c358 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/config/HoodieWriteConfig.java
@@ -22,6 +22,7 @@ import com.google.common.base.Preconditions;
 import com.uber.hoodie.WriteStatus;
 import com.uber.hoodie.common.model.HoodieCleaningPolicy;
 import com.uber.hoodie.common.table.view.FileSystemViewStorageConfig;
+import com.uber.hoodie.common.util.ConsistencyGuardConfig;
 import com.uber.hoodie.common.util.ReflectionUtils;
 import com.uber.hoodie.index.HoodieIndex;
 import com.uber.hoodie.io.compact.strategy.CompactionStrategy;
@@ -66,10 +67,10 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
   private static final String DEFAULT_HOODIE_WRITE_STATUS_CLASS = 
WriteStatus.class.getName();
   private static final String FINALIZE_WRITE_PARALLELISM = 
"hoodie.finalize.write.parallelism";
   private static final String DEFAULT_FINALIZE_WRITE_PARALLELISM = 
DEFAULT_PARALLELISM;
-  private static final String CONSISTENCY_CHECK_ENABLED_PROP = 
"hoodie.consistency.check.enabled";
-  private static final String DEFAULT_CONSISTENCY_CHECK_ENABLED = "false";
+
   private static final String EMBEDDED_TIMELINE_SERVER_ENABLED = 
"hoodie.embed.timeline.server";
   private static final String DEFAULT_EMBEDDED_TIMELINE_SERVER_ENABLED = 
"false";
+
   private static final String FAIL_ON_TIMELINE_ARCHIVING_ENABLED_PROP = 
"hoodie.fail.on.timeline.archiving";
   private static final String DEFAULT_FAIL_ON_TIMELINE_ARCHIVING_ENABLED = 
"true";
   // time between successive attempts to ensure written data's metadata is 
consistent on storage
@@ -85,6 +86,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
   private static final String MAX_CONSISTENCY_CHECKS_PROP = 
"hoodie.consistency.check.max_checks";
   private static int DEFAULT_MAX_CONSISTENCY_CHECKS = 7;
 
+  private ConsistencyGuardConfig consistencyGuardConfig;
+
   // Hoodie Write Client transparently rewrites File System View config when 
embedded mode is enabled
   // We keep track of original config and rewritten config
   private final FileSystemViewStorageConfig clientSpecifiedViewStorageConfig;
@@ -94,6 +97,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
     super(props);
     Properties newProps = new Properties();
     newProps.putAll(props);
+    this.consistencyGuardConfig = 
ConsistencyGuardConfig.newBuilder().fromProperties(newProps).build();
     this.clientSpecifiedViewStorageConfig = 
FileSystemViewStorageConfig.newBuilder().fromProperties(newProps).build();
     this.viewStorageConfig = clientSpecifiedViewStorageConfig;
   }
@@ -162,10 +166,6 @@ public class HoodieWriteConfig extends DefaultHoodieConfig 
{
     return Integer.parseInt(props.getProperty(FINALIZE_WRITE_PARALLELISM));
   }
 
-  public boolean isConsistencyCheckEnabled() {
-    return 
Boolean.parseBoolean(props.getProperty(CONSISTENCY_CHECK_ENABLED_PROP));
-  }
-
   public boolean isEmbeddedTimelineServerEnabled() {
     return 
Boolean.parseBoolean(props.getProperty(EMBEDDED_TIMELINE_SERVER_ENABLED));
   }
@@ -495,6 +495,14 @@ public class HoodieWriteConfig extends DefaultHoodieConfig 
{
     return 
Double.valueOf(props.getProperty(HoodieMemoryConfig.WRITESTATUS_FAILURE_FRACTION_PROP));
   }
 
+  public ConsistencyGuardConfig getConsistencyGuardConfig() {
+    return consistencyGuardConfig;
+  }
+
+  public void setConsistencyGuardConfig(ConsistencyGuardConfig 
consistencyGuardConfig) {
+    this.consistencyGuardConfig = consistencyGuardConfig;
+  }
+
   public FileSystemViewStorageConfig getViewStorageConfig() {
     return viewStorageConfig;
   }
@@ -520,6 +528,7 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
     private boolean isMetricsConfigSet = false;
     private boolean isMemoryConfigSet = false;
     private boolean isViewConfigSet = false;
+    private boolean isConsistencyGuardSet = false;
 
     public Builder fromFile(File propertiesFile) throws IOException {
       FileReader reader = new FileReader(propertiesFile);
@@ -639,13 +648,14 @@ public class HoodieWriteConfig extends 
DefaultHoodieConfig {
       return this;
     }
 
-    public Builder withFinalizeWriteParallelism(int parallelism) {
-      props.setProperty(FINALIZE_WRITE_PARALLELISM, 
String.valueOf(parallelism));
+    public Builder withConsistencyGuardConfig(ConsistencyGuardConfig 
consistencyGuardConfig) {
+      props.putAll(consistencyGuardConfig.getProps());
+      isConsistencyGuardSet = true;
       return this;
     }
 
-    public Builder withConsistencyCheckEnabled(boolean enabled) {
-      props.setProperty(CONSISTENCY_CHECK_ENABLED_PROP, 
String.valueOf(enabled));
+    public Builder withFinalizeWriteParallelism(int parallelism) {
+      props.setProperty(FINALIZE_WRITE_PARALLELISM, 
String.valueOf(parallelism));
       return this;
     }
 
@@ -654,21 +664,6 @@ public class HoodieWriteConfig extends DefaultHoodieConfig 
{
       return this;
     }
 
-    public Builder withInitialConsistencyCheckIntervalMs(int initialIntevalMs) 
{
-      props.setProperty(INITIAL_CONSISTENCY_CHECK_INTERVAL_MS_PROP, 
String.valueOf(initialIntevalMs));
-      return this;
-    }
-
-    public Builder withMaxConsistencyCheckIntervalMs(int maxIntervalMs) {
-      props.setProperty(MAX_CONSISTENCY_CHECK_INTERVAL_MS_PROP, 
String.valueOf(maxIntervalMs));
-      return this;
-    }
-
-    public Builder withMaxConsistencyChecks(int maxConsistencyChecks) {
-      props.setProperty(MAX_CONSISTENCY_CHECKS_PROP, 
String.valueOf(maxConsistencyChecks));
-      return this;
-    }
-
     public HoodieWriteConfig build() {
       // Check for mandatory properties
       setDefaultOnCondition(props, !props.containsKey(INSERT_PARALLELISM), 
INSERT_PARALLELISM,
@@ -691,8 +686,6 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
           HOODIE_WRITE_STATUS_CLASS_PROP, DEFAULT_HOODIE_WRITE_STATUS_CLASS);
       setDefaultOnCondition(props, 
!props.containsKey(FINALIZE_WRITE_PARALLELISM),
           FINALIZE_WRITE_PARALLELISM, DEFAULT_FINALIZE_WRITE_PARALLELISM);
-      setDefaultOnCondition(props, 
!props.containsKey(CONSISTENCY_CHECK_ENABLED_PROP),
-          CONSISTENCY_CHECK_ENABLED_PROP, DEFAULT_CONSISTENCY_CHECK_ENABLED);
       setDefaultOnCondition(props, 
!props.containsKey(EMBEDDED_TIMELINE_SERVER_ENABLED),
           EMBEDDED_TIMELINE_SERVER_ENABLED, 
DEFAULT_EMBEDDED_TIMELINE_SERVER_ENABLED);
       setDefaultOnCondition(props, 
!props.containsKey(INITIAL_CONSISTENCY_CHECK_INTERVAL_MS_PROP),
@@ -717,6 +710,8 @@ public class HoodieWriteConfig extends DefaultHoodieConfig {
           HoodieMemoryConfig.newBuilder().fromProperties(props).build());
       setDefaultOnCondition(props, !isViewConfigSet,
           
FileSystemViewStorageConfig.newBuilder().fromProperties(props).build());
+      setDefaultOnCondition(props, !isConsistencyGuardSet,
+          ConsistencyGuardConfig.newBuilder().fromProperties(props).build());
 
       // Build WriteConfig at the end
       HoodieWriteConfig config = new HoodieWriteConfig(props);
diff --git 
a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieReadHandle.java 
b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieReadHandle.java
index e0d3323..e4b1838 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieReadHandle.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieReadHandle.java
@@ -43,7 +43,6 @@ public abstract class HoodieReadHandle<T extends 
HoodieRecordPayload> extends Ho
     return hoodieTable.getMetaClient().getFs();
   }
 
-
   public Pair<String, String> getPartitionPathFilePair() {
     return partitionPathFilePair;
   }
diff --git 
a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieWriteHandle.java 
b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieWriteHandle.java
index 1adeecc..aa3eec2 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieWriteHandle.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/io/HoodieWriteHandle.java
@@ -19,14 +19,11 @@
 package com.uber.hoodie.io;
 
 import com.uber.hoodie.WriteStatus;
-import com.uber.hoodie.common.io.storage.HoodieWrapperFileSystem;
 import com.uber.hoodie.common.model.HoodieRecord;
 import com.uber.hoodie.common.model.HoodieRecordPayload;
 import com.uber.hoodie.common.util.FSUtils;
-import com.uber.hoodie.common.util.FailSafeConsistencyGuard;
 import com.uber.hoodie.common.util.HoodieAvroUtils;
 import com.uber.hoodie.common.util.HoodieTimer;
-import com.uber.hoodie.common.util.NoOpConsistencyGuard;
 import com.uber.hoodie.common.util.ReflectionUtils;
 import com.uber.hoodie.config.HoodieWriteConfig;
 import com.uber.hoodie.exception.HoodieException;
@@ -68,13 +65,6 @@ public abstract class HoodieWriteHandle<T extends 
HoodieRecordPayload> extends H
         config.getWriteStatusFailureFraction());
   }
 
-  private static FileSystem getFileSystem(HoodieTable hoodieTable, 
HoodieWriteConfig config) {
-    return new HoodieWrapperFileSystem(hoodieTable.getMetaClient().getFs(), 
config.isConsistencyCheckEnabled()
-        ? new FailSafeConsistencyGuard(hoodieTable.getMetaClient().getFs(),
-        config.getMaxConsistencyChecks(), 
config.getInitialConsistencyCheckIntervalMs(),
-        config.getMaxConsistencyCheckIntervalMs()) : new 
NoOpConsistencyGuard());
-  }
-
   /**
    * Generate a write token based on the currently running spark task and its 
place in the spark dag.
    */
@@ -175,9 +165,6 @@ public abstract class HoodieWriteHandle<T extends 
HoodieRecordPayload> extends H
 
   @Override
   protected FileSystem getFileSystem() {
-    return new HoodieWrapperFileSystem(hoodieTable.getMetaClient().getFs(), 
config.isConsistencyCheckEnabled()
-        ? new FailSafeConsistencyGuard(hoodieTable.getMetaClient().getFs(),
-        config.getMaxConsistencyChecks(), 
config.getInitialConsistencyCheckIntervalMs(),
-        config.getMaxConsistencyCheckIntervalMs()) : new 
NoOpConsistencyGuard());
+    return hoodieTable.getMetaClient().getFs();
   }
 }
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java 
b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java
index 11f545a..7fd9a29 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/table/HoodieTable.java
@@ -21,6 +21,7 @@ package com.uber.hoodie.table;
 import com.uber.hoodie.WriteStatus;
 import com.uber.hoodie.avro.model.HoodieCompactionPlan;
 import com.uber.hoodie.avro.model.HoodieSavepointMetadata;
+import com.uber.hoodie.client.utils.ClientUtils;
 import com.uber.hoodie.common.HoodieCleanStat;
 import com.uber.hoodie.common.HoodieRollbackStat;
 import com.uber.hoodie.common.SerializableConfiguration;
@@ -83,7 +84,7 @@ public abstract class HoodieTable<T extends 
HoodieRecordPayload> implements Seri
     this.hadoopConfiguration = new 
SerializableConfiguration(jsc.hadoopConfiguration());
     this.viewManager = FileSystemViewManager.createViewManager(
         new SerializableConfiguration(jsc.hadoopConfiguration()), 
config.getViewStorageConfig());
-    this.metaClient = new HoodieTableMetaClient(jsc.hadoopConfiguration(), 
config.getBasePath(), true);
+    this.metaClient = ClientUtils.createMetaClient(jsc, config, true);
     this.index = HoodieIndex.createIndex(config, jsc);
   }
 
@@ -291,7 +292,7 @@ public abstract class HoodieTable<T extends 
HoodieRecordPayload> implements Seri
    */
   public void finalizeWrite(JavaSparkContext jsc, String instantTs, 
List<HoodieWriteStat> stats)
       throws HoodieIOException {
-    cleanFailedWrites(jsc, instantTs, stats, 
config.isConsistencyCheckEnabled());
+    cleanFailedWrites(jsc, instantTs, stats, 
config.getConsistencyGuardConfig().isConsistencyCheckEnabled());
   }
 
   /**
@@ -412,7 +413,7 @@ public abstract class HoodieTable<T extends 
HoodieRecordPayload> implements Seri
 
   private boolean waitForCondition(String partitionPath, Stream<Pair<String, 
String>> partitionFilePaths,
       FileVisibility visibility) {
-    final FileSystem fileSystem = metaClient.getFs();
+    final FileSystem fileSystem = metaClient.getRawFs();
     List<String> fileList = 
partitionFilePaths.map(Pair::getValue).collect(Collectors.toList());
     try {
       getFailSafeConsistencyGuard(fileSystem).waitTill(partitionPath, 
fileList, visibility);
@@ -424,8 +425,6 @@ public abstract class HoodieTable<T extends 
HoodieRecordPayload> implements Seri
   }
 
   private ConsistencyGuard getFailSafeConsistencyGuard(FileSystem fileSystem) {
-    return new FailSafeConsistencyGuard(fileSystem, 
config.getMaxConsistencyChecks(),
-        config.getInitialConsistencyCheckIntervalMs(),
-        config.getMaxConsistencyCheckIntervalMs());
+    return new FailSafeConsistencyGuard(fileSystem, 
config.getConsistencyGuardConfig());
   }
 }
diff --git a/hoodie-client/src/test/java/com/uber/hoodie/TestCleaner.java 
b/hoodie-client/src/test/java/com/uber/hoodie/TestCleaner.java
index 7798805..ffb6daf 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/TestCleaner.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/TestCleaner.java
@@ -48,6 +48,7 @@ import com.uber.hoodie.common.table.timeline.HoodieInstant;
 import com.uber.hoodie.common.table.timeline.HoodieInstant.State;
 import com.uber.hoodie.common.util.AvroUtils;
 import com.uber.hoodie.common.util.CompactionUtils;
+import com.uber.hoodie.common.util.ConsistencyGuardConfig;
 import com.uber.hoodie.common.util.FSUtils;
 import com.uber.hoodie.common.util.collection.Pair;
 import com.uber.hoodie.config.HoodieCompactionConfig;
@@ -195,7 +196,8 @@ public class TestCleaner extends TestHoodieClientBase {
         
HoodieCompactionConfig.newBuilder().withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS)
             .retainFileVersions(maxVersions).build())
         .withParallelism(1, 1).withBulkInsertParallelism(1)
-        .withFinalizeWriteParallelism(1).withConsistencyCheckEnabled(true)
+        .withFinalizeWriteParallelism(1)
+        
.withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
         .build();
     HoodieWriteClient client = getHoodieWriteClient(cfg);
 
@@ -357,7 +359,9 @@ public class TestCleaner extends TestHoodieClientBase {
         HoodieCompactionConfig.newBuilder()
             
.withCleanerPolicy(HoodieCleaningPolicy.KEEP_LATEST_FILE_VERSIONS).retainCommits(maxCommits).build())
         .withParallelism(1, 1).withBulkInsertParallelism(1)
-        
.withFinalizeWriteParallelism(1).withConsistencyCheckEnabled(true).build();
+        .withFinalizeWriteParallelism(1)
+        
.withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
+        .build();
     HoodieWriteClient client = getHoodieWriteClient(cfg);
 
     final Function2<List<HoodieRecord>, String, Integer> 
recordInsertGenWrappedFunction =
diff --git 
a/hoodie-client/src/test/java/com/uber/hoodie/TestConsistencyGuard.java 
b/hoodie-client/src/test/java/com/uber/hoodie/TestConsistencyGuard.java
index 64d97f8..7712c1c 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/TestConsistencyGuard.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/TestConsistencyGuard.java
@@ -20,6 +20,7 @@ package com.uber.hoodie;
 
 import com.uber.hoodie.common.HoodieClientTestUtils;
 import com.uber.hoodie.common.util.ConsistencyGuard;
+import com.uber.hoodie.common.util.ConsistencyGuardConfig;
 import com.uber.hoodie.common.util.FSUtils;
 import com.uber.hoodie.common.util.FailSafeConsistencyGuard;
 import java.io.IOException;
@@ -58,7 +59,7 @@ public class TestConsistencyGuard {
     HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", 
"f2");
     HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", 
"f3");
 
-    ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, 1, 1000, 1000);
+    ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, 
getConsistencyGuardConfig(1, 1000, 1000));
     passing.waitTillFileAppears(new Path(basePath + 
"/partition/path/f1_1-0-1_000.parquet"));
     passing.waitTillFileAppears(new Path(basePath + 
"/partition/path/f2_1-0-1_000.parquet"));
     passing.waitTillAllFilesAppear(basePath + "/partition/path",
@@ -77,7 +78,7 @@ public class TestConsistencyGuard {
   @Test(expected = TimeoutException.class)
   public void testCheckFailingAppear() throws Exception {
     HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", 
"f1");
-    ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, 3, 10, 10);
+    ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, 
getConsistencyGuardConfig());
     passing.waitTillAllFilesAppear(basePath + "/partition/path",
         Arrays.asList(basePath + "/partition/path/f1_1-0-2_000.parquet",
             basePath + "/partition/path/f2_1-0-2_000.parquet"));
@@ -87,14 +88,14 @@ public class TestConsistencyGuard {
   @Test(expected = TimeoutException.class)
   public void testCheckFailingAppears() throws Exception {
     HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", 
"f1");
-    ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, 3, 10, 10);
+    ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, 
getConsistencyGuardConfig());
     passing.waitTillFileAppears(new Path(basePath + 
"/partition/path/f1_1-0-2_000.parquet"));
   }
 
   @Test(expected = TimeoutException.class)
   public void testCheckFailingDisappear() throws Exception {
     HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", 
"f1");
-    ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, 3, 10, 10);
+    ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, 
getConsistencyGuardConfig());
     passing.waitTillAllFilesDisappear(basePath + "/partition/path",
         Arrays.asList(basePath + "/partition/path/f1_1-0-1_000.parquet",
             basePath + "/partition/path/f2_1-0-2_000.parquet"));
@@ -104,7 +105,17 @@ public class TestConsistencyGuard {
   public void testCheckFailingDisappears() throws Exception {
     HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", 
"f1");
     HoodieClientTestUtils.fakeDataFile(basePath, "partition/path", "000", 
"f1");
-    ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, 3, 10, 10);
+    ConsistencyGuard passing = new FailSafeConsistencyGuard(fs, 
getConsistencyGuardConfig());
     passing.waitTillFileDisappears(new Path(basePath + 
"/partition/path/f1_1-0-1_000.parquet"));
   }
+
+  private ConsistencyGuardConfig getConsistencyGuardConfig() {
+    return getConsistencyGuardConfig(3, 10, 10);
+  }
+
+  private ConsistencyGuardConfig getConsistencyGuardConfig(int maxChecks, int 
initalSleep, int maxSleep) {
+    return 
ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true)
+        
.withInitialConsistencyCheckIntervalMs(initalSleep).withMaxConsistencyCheckIntervalMs(maxSleep)
+        .withMaxConsistencyChecks(maxChecks).build();
+  }
 }
diff --git 
a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientBase.java 
b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientBase.java
index a668d3b..7f0f0d6 100644
--- a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientBase.java
+++ b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientBase.java
@@ -26,7 +26,7 @@ import static org.junit.Assert.assertTrue;
 import com.uber.hoodie.common.HoodieCleanStat;
 import com.uber.hoodie.common.HoodieClientTestUtils;
 import com.uber.hoodie.common.HoodieTestDataGenerator;
-import com.uber.hoodie.common.TestRawTripPayload;
+import com.uber.hoodie.common.TestRawTripPayload.MetadataMergeWriteStatus;
 import com.uber.hoodie.common.model.HoodiePartitionMetadata;
 import com.uber.hoodie.common.model.HoodieRecord;
 import com.uber.hoodie.common.model.HoodieTableType;
@@ -37,12 +37,14 @@ import com.uber.hoodie.common.table.SyncableFileSystemView;
 import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline;
 import com.uber.hoodie.common.table.view.FileSystemViewStorageConfig;
 import com.uber.hoodie.common.table.view.FileSystemViewStorageType;
+import com.uber.hoodie.common.util.ConsistencyGuardConfig;
 import com.uber.hoodie.common.util.FSUtils;
 import com.uber.hoodie.config.HoodieCompactionConfig;
 import com.uber.hoodie.config.HoodieIndexConfig;
 import com.uber.hoodie.config.HoodieStorageConfig;
 import com.uber.hoodie.config.HoodieWriteConfig;
 import com.uber.hoodie.index.HoodieIndex;
+import com.uber.hoodie.index.HoodieIndex.IndexType;
 import com.uber.hoodie.table.HoodieTable;
 import java.io.File;
 import java.io.IOException;
@@ -191,12 +193,12 @@ public class TestHoodieClientBase implements Serializable 
{
     return 
HoodieWriteConfig.newBuilder().withPath(basePath).withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
         .withParallelism(2, 2)
         .withBulkInsertParallelism(2).withFinalizeWriteParallelism(2)
-        
.withWriteStatusClass(TestRawTripPayload.MetadataMergeWriteStatus.class)
-        .withConsistencyCheckEnabled(true)
+        .withWriteStatusClass(MetadataMergeWriteStatus.class)
+        
.withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder().withConsistencyCheckEnabled(true).build())
         
.withCompactionConfig(HoodieCompactionConfig.newBuilder().compactionSmallFileSize(1024
 * 1024).build())
         .withStorageConfig(HoodieStorageConfig.newBuilder().limitFileSize(1024 
* 1024).build())
         .forTable("test-trip-table")
-        
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
+        
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(IndexType.BLOOM).build())
         .withEmbeddedTimelineServerEnabled(true).withFileSystemViewConfig(
           
FileSystemViewStorageConfig.newBuilder().withStorageType(FileSystemViewStorageType.EMBEDDED_KV_STORE)
               .build());
diff --git 
a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java
 
b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java
index aa53d9f..ad5fae8 100644
--- 
a/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java
+++ 
b/hoodie-client/src/test/java/com/uber/hoodie/TestHoodieClientOnCopyOnWriteStorage.java
@@ -39,6 +39,7 @@ import com.uber.hoodie.common.table.HoodieTableMetaClient;
 import com.uber.hoodie.common.table.HoodieTimeline;
 import com.uber.hoodie.common.table.TableFileSystemView;
 import com.uber.hoodie.common.table.timeline.HoodieInstant;
+import com.uber.hoodie.common.util.ConsistencyGuardConfig;
 import com.uber.hoodie.common.util.FSUtils;
 import com.uber.hoodie.common.util.ParquetUtils;
 import com.uber.hoodie.common.util.collection.Pair;
@@ -686,8 +687,13 @@ public class TestHoodieClientOnCopyOnWriteStorage extends 
TestHoodieClientBase {
 
   private Pair<Path, JavaRDD<WriteStatus>> 
testConsistencyCheck(HoodieTableMetaClient metaClient, String commitTime)
       throws Exception {
-    HoodieWriteConfig cfg = 
getConfigBuilder().withAutoCommit(false).withMaxConsistencyCheckIntervalMs(1)
-        .withInitialConsistencyCheckIntervalMs(1).build();
+    HoodieWriteConfig cfg = getConfigBuilder().withAutoCommit(false)
+        .withConsistencyGuardConfig(ConsistencyGuardConfig.newBuilder()
+            .withConsistencyCheckEnabled(true)
+            .withMaxConsistencyCheckIntervalMs(1)
+            .withInitialConsistencyCheckIntervalMs(1)
+            .build())
+        .build();
     HoodieWriteClient client = getHoodieWriteClient(cfg);
 
     client.startCommitWithTime(commitTime);
diff --git 
a/hoodie-common/src/main/java/com/uber/hoodie/common/io/storage/HoodieWrapperFileSystem.java
 
b/hoodie-common/src/main/java/com/uber/hoodie/common/io/storage/HoodieWrapperFileSystem.java
index 14e76ac..72e2d57 100644
--- 
a/hoodie-common/src/main/java/com/uber/hoodie/common/io/storage/HoodieWrapperFileSystem.java
+++ 
b/hoodie-common/src/main/java/com/uber/hoodie/common/io/storage/HoodieWrapperFileSystem.java
@@ -22,6 +22,7 @@ import com.uber.hoodie.common.storage.StorageSchemes;
 import com.uber.hoodie.common.util.ConsistencyGuard;
 import com.uber.hoodie.common.util.FSUtils;
 import com.uber.hoodie.common.util.NoOpConsistencyGuard;
+import com.uber.hoodie.exception.HoodieException;
 import com.uber.hoodie.exception.HoodieIOException;
 import java.io.IOException;
 import java.net.URI;
@@ -236,7 +237,28 @@ public class HoodieWrapperFileSystem extends FileSystem {
 
   @Override
   public boolean rename(Path src, Path dst) throws IOException {
-    return fileSystem.rename(convertToDefaultPath(src), 
convertToDefaultPath(dst));
+    try {
+      consistencyGuard.waitTillFileAppears(convertToDefaultPath(src));
+    } catch (TimeoutException e) {
+      throw new HoodieException("Timed out waiting for " + src + " to appear", 
e);
+    }
+
+    boolean success = fileSystem.rename(convertToDefaultPath(src), 
convertToDefaultPath(dst));
+
+    if (success) {
+      try {
+        consistencyGuard.waitTillFileAppears(convertToDefaultPath(dst));
+      } catch (TimeoutException e) {
+        throw new HoodieException("Timed out waiting for " + dst + " to 
appear", e);
+      }
+
+      try {
+        consistencyGuard.waitTillFileDisappears(convertToDefaultPath(src));
+      } catch (TimeoutException e) {
+        throw new HoodieException("Timed out waiting for " + src + " to 
disappear", e);
+      }
+    }
+    return success;
   }
 
   @Override
@@ -247,7 +269,7 @@ public class HoodieWrapperFileSystem extends FileSystem {
       try {
         consistencyGuard.waitTillFileDisappears(f);
       } catch (TimeoutException e) {
-        return false;
+        throw new HoodieException("Timed out waiting for " + f + " to 
disappear", e);
       }
     }
     return success;
@@ -270,7 +292,15 @@ public class HoodieWrapperFileSystem extends FileSystem {
 
   @Override
   public boolean mkdirs(Path f, FsPermission permission) throws IOException {
-    return fileSystem.mkdirs(convertToDefaultPath(f), permission);
+    boolean success = fileSystem.mkdirs(convertToDefaultPath(f), permission);
+    if (success) {
+      try {
+        consistencyGuard.waitTillFileAppears(convertToDefaultPath(f));
+      } catch (TimeoutException e) {
+        throw new HoodieException("Timed out waiting for directory " + f + " 
to appear", e);
+      }
+    }
+    return success;
   }
 
   @Override
@@ -353,31 +383,39 @@ public class HoodieWrapperFileSystem extends FileSystem {
   @Override
   public FSDataOutputStream createNonRecursive(Path f, boolean overwrite, int 
bufferSize,
       short replication, long blockSize, Progressable progress) throws 
IOException {
-    return fileSystem
-        .createNonRecursive(convertToDefaultPath(f), overwrite, bufferSize, 
replication, blockSize,
-            progress);
+    Path p = convertToDefaultPath(f);
+    return wrapOutputStream(p, fileSystem.createNonRecursive(p, overwrite, 
bufferSize, replication, blockSize,
+        progress));
   }
 
   @Override
   public FSDataOutputStream createNonRecursive(Path f, FsPermission 
permission, boolean overwrite,
       int bufferSize, short replication, long blockSize, Progressable 
progress) throws IOException {
-    return fileSystem
-        .createNonRecursive(convertToDefaultPath(f), permission, overwrite, 
bufferSize, replication,
-            blockSize, progress);
+    Path p = convertToDefaultPath(f);
+    return wrapOutputStream(p, fileSystem.createNonRecursive(p, permission, 
overwrite, bufferSize, replication,
+        blockSize, progress));
   }
 
   @Override
   public FSDataOutputStream createNonRecursive(Path f, FsPermission permission,
       EnumSet<CreateFlag> flags, int bufferSize, short replication, long 
blockSize,
       Progressable progress) throws IOException {
-    return fileSystem
-        .createNonRecursive(convertToDefaultPath(f), permission, flags, 
bufferSize, replication,
-            blockSize, progress);
+    Path p = convertToDefaultPath(f);
+    return wrapOutputStream(p, fileSystem.createNonRecursive(p, permission, 
flags, bufferSize, replication,
+        blockSize, progress));
   }
 
   @Override
   public boolean createNewFile(Path f) throws IOException {
-    return fileSystem.createNewFile(convertToDefaultPath(f));
+    boolean newFile = fileSystem.createNewFile(convertToDefaultPath(f));
+    if (newFile) {
+      try {
+        consistencyGuard.waitTillFileAppears(convertToDefaultPath(f));
+      } catch (TimeoutException e) {
+        throw new HoodieException("Timed out waiting for " + f + " to appear", 
e);
+      }
+    }
+    return newFile;
   }
 
   @Override
@@ -394,6 +432,11 @@ public class HoodieWrapperFileSystem extends FileSystem {
   public void concat(Path trg, Path[] psrcs) throws IOException {
     Path[] psrcsNew = convertDefaults(psrcs);
     fileSystem.concat(convertToDefaultPath(trg), psrcsNew);
+    try {
+      consistencyGuard.waitTillFileAppears(convertToDefaultPath(trg));
+    } catch (TimeoutException e) {
+      throw new HoodieException("Timed out waiting for " + trg + " to appear", 
e);
+    }
   }
 
   @Override
@@ -408,7 +451,7 @@ public class HoodieWrapperFileSystem extends FileSystem {
 
   @Override
   public boolean delete(Path f) throws IOException {
-    return fileSystem.delete(convertToDefaultPath(f));
+    return delete(f, true);
   }
 
   @Override
@@ -493,62 +536,100 @@ public class HoodieWrapperFileSystem extends FileSystem {
 
   @Override
   public boolean mkdirs(Path f) throws IOException {
-    return fileSystem.mkdirs(convertToDefaultPath(f));
+    boolean success = fileSystem.mkdirs(convertToDefaultPath(f));
+    if (success) {
+      try {
+        consistencyGuard.waitTillFileAppears(convertToDefaultPath(f));
+      } catch (TimeoutException e) {
+        throw new HoodieException("Timed out waiting for directory " + f + " 
to appear", e);
+      }
+    }
+    return success;
   }
 
   @Override
   public void copyFromLocalFile(Path src, Path dst) throws IOException {
-    fileSystem.copyFromLocalFile(convertToDefaultPath(src), 
convertToDefaultPath(dst));
+    fileSystem.copyFromLocalFile(convertToLocalPath(src), 
convertToDefaultPath(dst));
+    try {
+      consistencyGuard.waitTillFileAppears(convertToDefaultPath(dst));
+    } catch (TimeoutException e) {
+      throw new HoodieException("Timed out waiting for destination " + dst + " 
to appear", e);
+    }
   }
 
   @Override
   public void moveFromLocalFile(Path[] srcs, Path dst) throws IOException {
-    fileSystem.moveFromLocalFile(convertDefaults(srcs), 
convertToDefaultPath(dst));
+    fileSystem.moveFromLocalFile(convertLocalPaths(srcs), 
convertToDefaultPath(dst));
+    try {
+      consistencyGuard.waitTillFileAppears(convertToDefaultPath(dst));
+    } catch (TimeoutException e) {
+      throw new HoodieException("Timed out waiting for destination " + dst + " 
to appear", e);
+    }
   }
 
   @Override
   public void moveFromLocalFile(Path src, Path dst) throws IOException {
-    fileSystem.moveFromLocalFile(convertToDefaultPath(src), 
convertToDefaultPath(dst));
+    fileSystem.moveFromLocalFile(convertToLocalPath(src), 
convertToDefaultPath(dst));
+    try {
+      consistencyGuard.waitTillFileAppears(convertToDefaultPath(dst));
+    } catch (TimeoutException e) {
+      throw new HoodieException("Timed out waiting for destination " + dst + " 
to appear", e);
+    }
   }
 
   @Override
   public void copyFromLocalFile(boolean delSrc, Path src, Path dst) throws 
IOException {
-    fileSystem.copyFromLocalFile(delSrc, convertToDefaultPath(src), 
convertToDefaultPath(dst));
+    fileSystem.copyFromLocalFile(delSrc, convertToLocalPath(src), 
convertToDefaultPath(dst));
+    try {
+      consistencyGuard.waitTillFileAppears(convertToDefaultPath(dst));
+    } catch (TimeoutException e) {
+      throw new HoodieException("Timed out waiting for destination " + dst + " 
to appear", e);
+    }
   }
 
   @Override
   public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path[] 
srcs, Path dst)
       throws IOException {
     fileSystem
-        .copyFromLocalFile(delSrc, overwrite, convertDefaults(srcs), 
convertToDefaultPath(dst));
+        .copyFromLocalFile(delSrc, overwrite, convertLocalPaths(srcs), 
convertToDefaultPath(dst));
+    try {
+      consistencyGuard.waitTillFileAppears(convertToDefaultPath(dst));
+    } catch (TimeoutException e) {
+      throw new HoodieException("Timed out waiting for destination " + dst + " 
to appear", e);
+    }
   }
 
   @Override
   public void copyFromLocalFile(boolean delSrc, boolean overwrite, Path src, 
Path dst)
       throws IOException {
     fileSystem
-        .copyFromLocalFile(delSrc, overwrite, convertToDefaultPath(src), 
convertToDefaultPath(dst));
+        .copyFromLocalFile(delSrc, overwrite, convertToLocalPath(src), 
convertToDefaultPath(dst));
+    try {
+      consistencyGuard.waitTillFileAppears(convertToDefaultPath(dst));
+    } catch (TimeoutException e) {
+      throw new HoodieException("Timed out waiting for destination " + dst + " 
to appear", e);
+    }
   }
 
   @Override
   public void copyToLocalFile(Path src, Path dst) throws IOException {
-    fileSystem.copyToLocalFile(convertToDefaultPath(src), 
convertToDefaultPath(dst));
+    fileSystem.copyToLocalFile(convertToDefaultPath(src), 
convertToLocalPath(dst));
   }
 
   @Override
   public void moveToLocalFile(Path src, Path dst) throws IOException {
-    fileSystem.moveToLocalFile(convertToDefaultPath(src), 
convertToDefaultPath(dst));
+    fileSystem.moveToLocalFile(convertToDefaultPath(src), 
convertToLocalPath(dst));
   }
 
   @Override
   public void copyToLocalFile(boolean delSrc, Path src, Path dst) throws 
IOException {
-    fileSystem.copyToLocalFile(delSrc, convertToDefaultPath(src), 
convertToDefaultPath(dst));
+    fileSystem.copyToLocalFile(delSrc, convertToDefaultPath(src), 
convertToLocalPath(dst));
   }
 
   @Override
   public void copyToLocalFile(boolean delSrc, Path src, Path dst, boolean 
useRawLocalFileSystem)
       throws IOException {
-    fileSystem.copyToLocalFile(delSrc, convertToDefaultPath(src), 
convertToDefaultPath(dst),
+    fileSystem.copyToLocalFile(delSrc, convertToDefaultPath(src), 
convertToLocalPath(dst),
         useRawLocalFileSystem);
   }
 
@@ -787,6 +868,22 @@ public class HoodieWrapperFileSystem extends FileSystem {
     return convertPathWithScheme(oldPath, fileSystem.getScheme());
   }
 
+  private Path convertToLocalPath(Path oldPath) {
+    try {
+      return convertPathWithScheme(oldPath, 
FileSystem.getLocal(getConf()).getScheme());
+    } catch (IOException e) {
+      throw new HoodieIOException(e.getMessage(), e);
+    }
+  }
+
+  private Path[] convertLocalPaths(Path[] psrcs) {
+    Path[] psrcsNew = new Path[psrcs.length];
+    for (int i = 0; i < psrcs.length; i++) {
+      psrcsNew[i] = convertToLocalPath(psrcs[i]);
+    }
+    return psrcsNew;
+  }
+
   private Path[] convertDefaults(Path[] psrcs) {
     Path[] psrcsNew = new Path[psrcs.length];
     for (int i = 0; i < psrcs.length; i++) {
@@ -803,4 +900,8 @@ public class HoodieWrapperFileSystem extends FileSystem {
     throw new IllegalArgumentException(file.toString()
         + " does not have a open stream. Cannot get the bytes written on the 
stream");
   }
+
+  public FileSystem getFileSystem() {
+    return fileSystem;
+  }
 }
diff --git 
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableMetaClient.java
 
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableMetaClient.java
index c8dd7d2..7ed0214 100644
--- 
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableMetaClient.java
+++ 
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableMetaClient.java
@@ -20,12 +20,17 @@ package com.uber.hoodie.common.table;
 
 import static com.uber.hoodie.common.model.HoodieTableType.MERGE_ON_READ;
 
+import com.google.common.base.Preconditions;
 import com.uber.hoodie.common.SerializableConfiguration;
+import com.uber.hoodie.common.io.storage.HoodieWrapperFileSystem;
 import com.uber.hoodie.common.model.HoodieTableType;
 import com.uber.hoodie.common.table.timeline.HoodieActiveTimeline;
 import com.uber.hoodie.common.table.timeline.HoodieArchivedTimeline;
 import com.uber.hoodie.common.table.timeline.HoodieInstant;
+import com.uber.hoodie.common.util.ConsistencyGuardConfig;
 import com.uber.hoodie.common.util.FSUtils;
+import com.uber.hoodie.common.util.FailSafeConsistencyGuard;
+import com.uber.hoodie.common.util.NoOpConsistencyGuard;
 import com.uber.hoodie.exception.DatasetNotFoundException;
 import com.uber.hoodie.exception.HoodieException;
 import java.io.File;
@@ -66,13 +71,14 @@ public class HoodieTableMetaClient implements Serializable {
   public static final String MARKER_EXTN = ".marker";
 
   private String basePath;
-  private transient FileSystem fs;
+  private transient HoodieWrapperFileSystem fs;
   private String metaPath;
   private SerializableConfiguration hadoopConf;
   private HoodieTableType tableType;
   private HoodieTableConfig tableConfig;
   private HoodieActiveTimeline activeTimeline;
   private HoodieArchivedTimeline archivedTimeline;
+  private ConsistencyGuardConfig consistencyGuardConfig = 
ConsistencyGuardConfig.newBuilder().build();
 
   public HoodieTableMetaClient(Configuration conf, String basePath)
       throws DatasetNotFoundException {
@@ -81,13 +87,19 @@ public class HoodieTableMetaClient implements Serializable {
   }
 
   public HoodieTableMetaClient(Configuration conf, String basePath,
-      boolean loadActiveTimelineOnLoad)
+      boolean loadActiveTimelineOnLoad) {
+    this(conf, basePath, loadActiveTimelineOnLoad, 
ConsistencyGuardConfig.newBuilder().build());
+  }
+
+  public HoodieTableMetaClient(Configuration conf, String basePath,
+      boolean loadActiveTimelineOnLoad, ConsistencyGuardConfig 
consistencyGuardConfig)
       throws DatasetNotFoundException {
     log.info("Loading HoodieTableMetaClient from " + basePath);
     this.basePath = basePath;
+    this.consistencyGuardConfig = consistencyGuardConfig;
     this.hadoopConf = new SerializableConfiguration(conf);
     Path basePathDir = new Path(this.basePath);
-    this.metaPath = basePath + File.separator + METAFOLDER_NAME;
+    this.metaPath = new Path(basePath, METAFOLDER_NAME).toString();
     Path metaPathDir = new Path(this.metaPath);
     this.fs = getFs();
     DatasetNotFoundException.checkValidDataset(fs, basePathDir, metaPathDir);
@@ -190,13 +202,25 @@ public class HoodieTableMetaClient implements 
Serializable {
   /**
    * Get the FS implementation for this table
    */
-  public FileSystem getFs() {
+  public HoodieWrapperFileSystem getFs() {
     if (fs == null) {
-      fs = FSUtils.getFs(metaPath, hadoopConf.get());
+      FileSystem fileSystem = FSUtils.getFs(metaPath, hadoopConf.get());
+      Preconditions.checkArgument(!(fileSystem instanceof 
HoodieWrapperFileSystem),
+          "File System not expected to be that of HoodieWrapperFileSystem");
+      fs = new HoodieWrapperFileSystem(fileSystem, 
consistencyGuardConfig.isConsistencyCheckEnabled()
+            ? new FailSafeConsistencyGuard(fileSystem, consistencyGuardConfig) 
: new NoOpConsistencyGuard());
     }
     return fs;
   }
 
+  /**
+   * Return raw file-system
+   * @return
+   */
+  public FileSystem getRawFs() {
+    return getFs().getFileSystem();
+  }
+
   public Configuration getHadoopConf() {
     return hadoopConf.get();
   }
@@ -223,6 +247,10 @@ public class HoodieTableMetaClient implements Serializable 
{
     return activeTimeline;
   }
 
+  public ConsistencyGuardConfig getConsistencyGuardConfig() {
+    return consistencyGuardConfig;
+  }
+
   /**
    * Get the archived commits as a timeline. This is costly operation, as all 
data from the archived
    * files are read. This should not be used, unless for historical debugging 
purposes
diff --git 
a/hoodie-common/src/main/java/com/uber/hoodie/common/util/ConsistencyGuardConfig.java
 
b/hoodie-common/src/main/java/com/uber/hoodie/common/util/ConsistencyGuardConfig.java
new file mode 100644
index 0000000..1418191
--- /dev/null
+++ 
b/hoodie-common/src/main/java/com/uber/hoodie/common/util/ConsistencyGuardConfig.java
@@ -0,0 +1,121 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.common.util;
+
+import com.uber.hoodie.config.DefaultHoodieConfig;
+import java.io.File;
+import java.io.FileReader;
+import java.io.IOException;
+import java.util.Properties;
+
+public class ConsistencyGuardConfig extends DefaultHoodieConfig {
+
+  private static final String CONSISTENCY_CHECK_ENABLED_PROP = 
"hoodie.consistency.check.enabled";
+  private static final String DEFAULT_CONSISTENCY_CHECK_ENABLED = "false";
+
+  // time between successive attempts to ensure written data's metadata is 
consistent on storage
+  private static final String INITIAL_CONSISTENCY_CHECK_INTERVAL_MS_PROP =
+      "hoodie.consistency.check.initial_interval_ms";
+  private static long DEFAULT_INITIAL_CONSISTENCY_CHECK_INTERVAL_MS = 2000L;
+
+  // max interval time
+  private static final String MAX_CONSISTENCY_CHECK_INTERVAL_MS_PROP = 
"hoodie.consistency.check.max_interval_ms";
+  private static long DEFAULT_MAX_CONSISTENCY_CHECK_INTERVAL_MS = 300000L;
+
+  // maximum number of checks, for consistency of written data. Will wait upto 
256 Secs
+  private static final String MAX_CONSISTENCY_CHECKS_PROP = 
"hoodie.consistency.check.max_checks";
+  private static int DEFAULT_MAX_CONSISTENCY_CHECKS = 7;
+
+  public ConsistencyGuardConfig(Properties props) {
+    super(props);
+  }
+
+  public static ConsistencyGuardConfig.Builder newBuilder() {
+    return new Builder();
+  }
+
+  public boolean isConsistencyCheckEnabled() {
+    return 
Boolean.parseBoolean(props.getProperty(CONSISTENCY_CHECK_ENABLED_PROP));
+  }
+
+  public int getMaxConsistencyChecks() {
+    return Integer.parseInt(props.getProperty(MAX_CONSISTENCY_CHECKS_PROP));
+  }
+
+  public int getInitialConsistencyCheckIntervalMs() {
+    return 
Integer.parseInt(props.getProperty(INITIAL_CONSISTENCY_CHECK_INTERVAL_MS_PROP));
+  }
+
+  public int getMaxConsistencyCheckIntervalMs() {
+    return 
Integer.parseInt(props.getProperty(MAX_CONSISTENCY_CHECK_INTERVAL_MS_PROP));
+  }
+
+  public static class Builder {
+
+    private final Properties props = new Properties();
+
+    public Builder fromFile(File propertiesFile) throws IOException {
+      FileReader reader = new FileReader(propertiesFile);
+      try {
+        props.load(reader);
+        return this;
+      } finally {
+        reader.close();
+      }
+    }
+
+    public Builder fromProperties(Properties props) {
+      this.props.putAll(props);
+      return this;
+    }
+
+    public Builder withConsistencyCheckEnabled(boolean enabled) {
+      props.setProperty(CONSISTENCY_CHECK_ENABLED_PROP, 
String.valueOf(enabled));
+      return this;
+    }
+
+    public Builder withInitialConsistencyCheckIntervalMs(int initialIntevalMs) 
{
+      props.setProperty(INITIAL_CONSISTENCY_CHECK_INTERVAL_MS_PROP, 
String.valueOf(initialIntevalMs));
+      return this;
+    }
+
+    public Builder withMaxConsistencyCheckIntervalMs(int maxIntervalMs) {
+      props.setProperty(MAX_CONSISTENCY_CHECK_INTERVAL_MS_PROP, 
String.valueOf(maxIntervalMs));
+      return this;
+    }
+
+    public Builder withMaxConsistencyChecks(int maxConsistencyChecks) {
+      props.setProperty(MAX_CONSISTENCY_CHECKS_PROP, 
String.valueOf(maxConsistencyChecks));
+      return this;
+    }
+
+    public ConsistencyGuardConfig build() {
+      setDefaultOnCondition(props, 
!props.containsKey(CONSISTENCY_CHECK_ENABLED_PROP),
+          CONSISTENCY_CHECK_ENABLED_PROP, DEFAULT_CONSISTENCY_CHECK_ENABLED);
+      setDefaultOnCondition(props, 
!props.containsKey(INITIAL_CONSISTENCY_CHECK_INTERVAL_MS_PROP),
+          INITIAL_CONSISTENCY_CHECK_INTERVAL_MS_PROP, 
String.valueOf(DEFAULT_INITIAL_CONSISTENCY_CHECK_INTERVAL_MS));
+      setDefaultOnCondition(props, 
!props.containsKey(MAX_CONSISTENCY_CHECK_INTERVAL_MS_PROP),
+          MAX_CONSISTENCY_CHECK_INTERVAL_MS_PROP, 
String.valueOf(DEFAULT_MAX_CONSISTENCY_CHECK_INTERVAL_MS));
+      setDefaultOnCondition(props, 
!props.containsKey(MAX_CONSISTENCY_CHECKS_PROP),
+          MAX_CONSISTENCY_CHECKS_PROP, 
String.valueOf(DEFAULT_MAX_CONSISTENCY_CHECKS));
+
+      return new ConsistencyGuardConfig(props);
+    }
+  }
+}
diff --git 
a/hoodie-common/src/main/java/com/uber/hoodie/common/util/FailSafeConsistencyGuard.java
 
b/hoodie-common/src/main/java/com/uber/hoodie/common/util/FailSafeConsistencyGuard.java
index f69fcef..ee215be 100644
--- 
a/hoodie-common/src/main/java/com/uber/hoodie/common/util/FailSafeConsistencyGuard.java
+++ 
b/hoodie-common/src/main/java/com/uber/hoodie/common/util/FailSafeConsistencyGuard.java
@@ -18,6 +18,7 @@
 
 package com.uber.hoodie.common.util;
 
+import com.google.common.base.Preconditions;
 import java.io.FileNotFoundException;
 import java.io.IOException;
 import java.util.ArrayList;
@@ -40,15 +41,12 @@ public class FailSafeConsistencyGuard implements 
ConsistencyGuard {
   private static final transient Logger log = 
LogManager.getLogger(FailSafeConsistencyGuard.class);
 
   private final FileSystem fs;
-  private final int maxAttempts;
-  private final long initialDelayMs;
-  private final long maxDelayMs;
+  private final ConsistencyGuardConfig consistencyGuardConfig;
 
-  public FailSafeConsistencyGuard(FileSystem fs, int maxAttempts, long 
initalDelayMs, long maxDelayMs) {
+  public FailSafeConsistencyGuard(FileSystem fs, ConsistencyGuardConfig 
consistencyGuardConfig) {
     this.fs = fs;
-    this.maxAttempts = maxAttempts;
-    this.initialDelayMs = initalDelayMs;
-    this.maxDelayMs = maxDelayMs;
+    this.consistencyGuardConfig = consistencyGuardConfig;
+    
Preconditions.checkArgument(consistencyGuardConfig.isConsistencyCheckEnabled());
   }
 
   @Override
@@ -121,13 +119,13 @@ public class FailSafeConsistencyGuard implements 
ConsistencyGuard {
    */
   private boolean checkFileVisibility(Path filePath, FileVisibility 
visibility) throws IOException {
     try {
-      FileStatus[] status = fs.listStatus(filePath);
+      FileStatus status = fs.getFileStatus(filePath);
       switch (visibility) {
         case APPEAR:
-          return status.length != 0;
+          return status != null;
         case DISAPPEAR:
         default:
-          return status.length == 0;
+          return status == null;
       }
     } catch (FileNotFoundException nfe) {
       switch (visibility) {
@@ -147,9 +145,9 @@ public class FailSafeConsistencyGuard implements 
ConsistencyGuard {
    * @throws TimeoutException
    */
   private void waitForFileVisibility(Path filePath, FileVisibility visibility) 
throws TimeoutException {
-    long waitMs = initialDelayMs;
+    long waitMs = 
consistencyGuardConfig.getInitialConsistencyCheckIntervalMs();
     int attempt = 0;
-    while (attempt < maxAttempts) {
+    while (attempt < consistencyGuardConfig.getMaxConsistencyChecks()) {
       try {
         if (checkFileVisibility(filePath, visibility)) {
           return;
@@ -160,7 +158,7 @@ public class FailSafeConsistencyGuard implements 
ConsistencyGuard {
 
       sleepSafe(waitMs);
       waitMs = waitMs * 2; // double check interval every attempt
-      waitMs = waitMs > maxDelayMs ? maxDelayMs : waitMs;
+      waitMs = Math.min(waitMs, 
consistencyGuardConfig.getMaxConsistencyCheckIntervalMs());
       attempt++;
     }
     throw new TimeoutException("Timed-out waiting for the file to " + 
visibility.name());
@@ -173,17 +171,17 @@ public class FailSafeConsistencyGuard implements 
ConsistencyGuard {
    * @throws TimeoutException when retries are exhausted
    */
   private void retryTillSuccess(Function<Integer, Boolean> predicate, String 
timedOutMessage) throws TimeoutException {
-    long waitMs = initialDelayMs;
+    long waitMs = 
consistencyGuardConfig.getInitialConsistencyCheckIntervalMs();
     int attempt = 0;
-    log.warn("Max Attempts=" + maxAttempts);
-    while (attempt < maxAttempts) {
+    log.info("Max Attempts=" + 
consistencyGuardConfig.getMaxConsistencyChecks());
+    while (attempt < consistencyGuardConfig.getMaxConsistencyChecks()) {
       boolean success = predicate.apply(attempt);
       if (success) {
         return;
       }
       sleepSafe(waitMs);
       waitMs = waitMs * 2; // double check interval every attempt
-      waitMs = waitMs > maxDelayMs ? maxDelayMs : waitMs;
+      waitMs = Math.min(waitMs, 
consistencyGuardConfig.getMaxConsistencyCheckIntervalMs());
       attempt++;
     }
     throw new TimeoutException(timedOutMessage);

Reply via email to