This is an automated email from the ASF dual-hosted git repository.

vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new a0d7ab2  HUDI-70 : Making DeltaStreamer run in continuous mode with concurrent compaction
a0d7ab2 is described below

commit a0d7ab238473f22347e140b0e1e273ab80583eb7
Author: Balaji Varadarajan <varad...@uber.com>
AuthorDate: Wed May 15 13:21:55 2019 -0700

    HUDI-70 : Making DeltaStreamer run in continuous mode with concurrent compaction
---
 .../java/com/uber/hoodie/AbstractHoodieClient.java |  45 +-
 .../com/uber/hoodie/CompactionAdminClient.java     |   6 +
 .../java/com/uber/hoodie/HoodieReadClient.java     |  27 +-
 .../java/com/uber/hoodie/HoodieWriteClient.java    |  14 +-
 .../client/embedded/EmbeddedTimelineService.java   |   4 +
 .../hoodie/common/HoodieTestDataGenerator.java     |  99 ++-
 .../uber/hoodie/table/TestMergeOnReadTable.java    |   1 +
 .../hoodie/common/table/HoodieTableMetaClient.java |   2 +-
 .../table/view/RocksDbBasedFileSystemView.java     |   4 +-
 .../uber/hoodie/common/util/CompactionUtils.java   |   8 +
 .../com/uber/hoodie/common/util/RocksDBDAO.java    |  91 ++-
 .../common/util/collection/DiskBasedMap.java       |   3 +-
 .../common/util/collection/RocksDBBasedMap.java    | 125 ++++
 .../hoodie/common/util/TestRocksDBManager.java     |   2 +-
 .../util/collection/TestRocksDbBasedMap.java       |  56 ++
 .../main/java/com/uber/hoodie/DataSourceUtils.java |  12 +-
 .../com/uber/hoodie/HoodieSparkSqlWriter.scala     |  33 +-
 hoodie-spark/src/test/scala/DataSourceTest.scala   |   3 +-
 hoodie-utilities/pom.xml                           |  17 +-
 .../com/uber/hoodie/utilities/UtilHelpers.java     |  17 +-
 .../AbstractDeltaStreamerService.java              | 146 +++++
 .../hoodie/utilities/deltastreamer/Compactor.java  |  62 ++
 .../{HoodieDeltaStreamer.java => DeltaSync.java}   | 438 +++++++------
 .../deltastreamer/HoodieDeltaStreamer.java         | 674 +++++++++++----------
 .../deltastreamer/SchedulerConfGenerator.java      |  94 +++
 .../hoodie/utilities/TestHoodieDeltaStreamer.java  | 106 ++++
 .../utilities/sources/AbstractBaseTestSource.java  | 103 ++++
 .../sources/DistributedTestDataSource.java         |  79 +++
 .../hoodie/utilities/sources/TestDataSource.java   |  46 +-
 .../utilities/sources/config/TestSourceConfig.java |  43 ++
 packaging/hoodie-utilities-bundle/pom.xml          |   5 +
 pom.xml                                            |   5 +
 32 files changed, 1709 insertions(+), 661 deletions(-)

diff --git 
a/hoodie-client/src/main/java/com/uber/hoodie/AbstractHoodieClient.java 
b/hoodie-client/src/main/java/com/uber/hoodie/AbstractHoodieClient.java
index 243ecf5..5aa051c 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/AbstractHoodieClient.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/AbstractHoodieClient.java
@@ -23,6 +23,7 @@ import com.uber.hoodie.common.util.FSUtils;
 import com.uber.hoodie.config.HoodieWriteConfig;
 import java.io.IOException;
 import java.io.Serializable;
+import java.util.Optional;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
@@ -47,13 +48,21 @@ public abstract class AbstractHoodieClient implements 
Serializable {
    * of the cached file-system view. New completed actions will be synced 
automatically
    * in an incremental fashion.
    */
-  private transient EmbeddedTimelineService timelineServer;
+  private transient Optional<EmbeddedTimelineService> timelineServer;
+  private final boolean shouldStopTimelineServer;
 
   protected AbstractHoodieClient(JavaSparkContext jsc, HoodieWriteConfig 
clientConfig) {
+    this(jsc, clientConfig, Optional.empty());
+  }
+
+  protected AbstractHoodieClient(JavaSparkContext jsc, HoodieWriteConfig 
clientConfig,
+      Optional<EmbeddedTimelineService> timelineServer) {
     this.fs = FSUtils.getFs(clientConfig.getBasePath(), 
jsc.hadoopConfiguration());
     this.jsc = jsc;
     this.basePath = clientConfig.getBasePath();
     this.config = clientConfig;
+    this.timelineServer = timelineServer;
+    shouldStopTimelineServer = !timelineServer.isPresent();
     startEmbeddedServerView();
   }
 
@@ -65,28 +74,30 @@ public abstract class AbstractHoodieClient implements 
Serializable {
   }
 
   private synchronized void stopEmbeddedServerView(boolean 
resetViewStorageConfig) {
-    if (timelineServer != null) {
+    if (timelineServer.isPresent() && shouldStopTimelineServer) {
+      // Stop the timeline service only if this client started (owns) it
       logger.info("Stopping Timeline service !!");
-      timelineServer.stop();
-      timelineServer = null;
-      // Reset Storage Config to Client specified config
-      if (resetViewStorageConfig) {
-        config.resetViewStorageConfig();
-      }
+      timelineServer.get().stop();
+    }
+
+    timelineServer = Optional.empty();
+    // Reset Storage Config to Client specified config
+    if (resetViewStorageConfig) {
+      config.resetViewStorageConfig();
     }
   }
 
   private synchronized void startEmbeddedServerView() {
     if (config.isEmbeddedTimelineServerEnabled()) {
-      if (timelineServer == null) {
+      if (!timelineServer.isPresent()) {
         // Run Embedded Timeline Server
         logger.info("Starting Timeline service !!");
-        timelineServer = new 
EmbeddedTimelineService(jsc.hadoopConfiguration(), jsc.getConf(),
-            config.getClientSpecifiedViewStorageConfig());
+        timelineServer = Optional.of(new 
EmbeddedTimelineService(jsc.hadoopConfiguration(), jsc.getConf(),
+            config.getClientSpecifiedViewStorageConfig()));
         try {
-          timelineServer.startServer();
+          timelineServer.get().startServer();
           // Allow executor to find this newly instantiated timeline service
-          
config.setViewStorageConfig(timelineServer.getRemoteFileSystemViewConfig());
+          
config.setViewStorageConfig(timelineServer.get().getRemoteFileSystemViewConfig());
         } catch (IOException e) {
           logger.warn("Unable to start timeline service. Proceeding as if 
embedded server is disabled", e);
           stopEmbeddedServerView(false);
@@ -98,4 +109,12 @@ public abstract class AbstractHoodieClient implements 
Serializable {
       logger.info("Embedded Timeline Server is disabled. Not starting timeline 
service");
     }
   }
+
+  public HoodieWriteConfig getConfig() {
+    return config;
+  }
+
+  public Optional<EmbeddedTimelineService> getTimelineServer() {
+    return timelineServer;
+  }
 }
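
For illustration only, a minimal sketch (not part of this commit; the sketch class and method names are hypothetical) of how the new Optional<EmbeddedTimelineService> plumbing lets several clients share one embedded timeline server. Because shouldStopTimelineServer is false when a server is passed in, the second client will not stop a service it does not own.

import java.util.Optional;

import org.apache.spark.api.java.JavaSparkContext;

import com.uber.hoodie.HoodieReadClient;
import com.uber.hoodie.HoodieWriteClient;
import com.uber.hoodie.client.embedded.EmbeddedTimelineService;
import com.uber.hoodie.config.HoodieWriteConfig;
import com.uber.hoodie.index.HoodieIndex;

public class TimelineServerSharingSketch {
  public static HoodieReadClient share(JavaSparkContext jsc, HoodieWriteConfig config, String basePath) {
    // The write client starts (and owns) the embedded timeline server when none is supplied
    HoodieWriteClient writeClient = new HoodieWriteClient(
        jsc, config, false, HoodieIndex.createIndex(config, jsc), Optional.empty());
    Optional<EmbeddedTimelineService> timelineServer = writeClient.getTimelineServer();
    // The read client reuses the same server and will not stop it when it is closed
    return new HoodieReadClient(jsc, basePath, timelineServer);
  }
}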
diff --git 
a/hoodie-client/src/main/java/com/uber/hoodie/CompactionAdminClient.java 
b/hoodie-client/src/main/java/com/uber/hoodie/CompactionAdminClient.java
index 751e7dc..d2b007f 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/CompactionAdminClient.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/CompactionAdminClient.java
@@ -24,6 +24,7 @@ import com.google.common.base.Optional;
 import com.google.common.base.Preconditions;
 import com.uber.hoodie.avro.model.HoodieCompactionOperation;
 import com.uber.hoodie.avro.model.HoodieCompactionPlan;
+import com.uber.hoodie.client.embedded.EmbeddedTimelineService;
 import com.uber.hoodie.common.model.CompactionOperation;
 import com.uber.hoodie.common.model.FileSlice;
 import com.uber.hoodie.common.model.HoodieDataFile;
@@ -68,6 +69,11 @@ public class CompactionAdminClient extends 
AbstractHoodieClient {
     super(jsc, HoodieWriteConfig.newBuilder().withPath(basePath).build());
   }
 
+  public CompactionAdminClient(JavaSparkContext jsc, String basePath,
+      java.util.Optional<EmbeddedTimelineService> timelineServer) {
+    super(jsc, HoodieWriteConfig.newBuilder().withPath(basePath).build(), 
timelineServer);
+  }
+
   /**
    * Validate all compaction operations in a compaction plan. Verifies the 
file-slices are consistent with corresponding
    * compaction operations.
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieReadClient.java 
b/hoodie-client/src/main/java/com/uber/hoodie/HoodieReadClient.java
index a1e7ab7..86c2fa5 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieReadClient.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieReadClient.java
@@ -20,6 +20,7 @@ package com.uber.hoodie;
 
 import com.google.common.base.Optional;
 import com.uber.hoodie.avro.model.HoodieCompactionPlan;
+import com.uber.hoodie.client.embedded.EmbeddedTimelineService;
 import com.uber.hoodie.common.model.HoodieDataFile;
 import com.uber.hoodie.common.model.HoodieKey;
 import com.uber.hoodie.common.model.HoodieRecord;
@@ -69,12 +70,20 @@ public class HoodieReadClient<T extends 
HoodieRecordPayload> extends AbstractHoo
   /**
    * @param basePath path to Hoodie dataset
    */
-  public HoodieReadClient(JavaSparkContext jsc, String basePath) {
+  public HoodieReadClient(JavaSparkContext jsc, String basePath,
+      java.util.Optional<EmbeddedTimelineService> timelineService) {
     this(jsc, HoodieWriteConfig.newBuilder().withPath(basePath)
         // by default we use HoodieBloomIndex
         .withIndexConfig(
             
HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
-        .build());
+        .build(), timelineService);
+  }
+
+  /**
+   * @param basePath path to Hoodie dataset
+   */
+  public HoodieReadClient(JavaSparkContext jsc, String basePath) {
+    this(jsc, basePath, java.util.Optional.empty());
   }
 
   /**
@@ -91,13 +100,19 @@ public class HoodieReadClient<T extends 
HoodieRecordPayload> extends AbstractHoo
    * @param clientConfig instance of HoodieWriteConfig
    */
   public HoodieReadClient(JavaSparkContext jsc, HoodieWriteConfig 
clientConfig) {
-    super(jsc, clientConfig);
+    this(jsc, clientConfig, java.util.Optional.empty());
+  }
+
+  /**
+   * @param clientConfig instance of HoodieWriteConfig
+   */
+  public HoodieReadClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig,
+      java.util.Optional<EmbeddedTimelineService> timelineService) {
+    super(jsc, clientConfig, timelineService);
     final String basePath = clientConfig.getBasePath();
     // Create a Hoodie table which encapsulated the commits and files visible
     HoodieTableMetaClient metaClient = new 
HoodieTableMetaClient(jsc.hadoopConfiguration(), basePath, true);
-    this.hoodieTable = HoodieTable
-        .getHoodieTable(metaClient,
-            clientConfig, jsc);
+    this.hoodieTable = HoodieTable.getHoodieTable(metaClient, clientConfig, 
jsc);
     this.commitTimeline = 
metaClient.getCommitTimeline().filterCompletedInstants();
     this.index = HoodieIndex.createIndex(clientConfig, jsc);
     this.sqlContextOpt = Optional.absent();
diff --git a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java 
b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java
index 51776be..a9b548b 100644
--- a/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java
+++ b/hoodie-client/src/main/java/com/uber/hoodie/HoodieWriteClient.java
@@ -27,6 +27,7 @@ import com.uber.hoodie.avro.model.HoodieCompactionPlan;
 import com.uber.hoodie.avro.model.HoodieRestoreMetadata;
 import com.uber.hoodie.avro.model.HoodieRollbackMetadata;
 import com.uber.hoodie.avro.model.HoodieSavepointMetadata;
+import com.uber.hoodie.client.embedded.EmbeddedTimelineService;
 import com.uber.hoodie.common.HoodieCleanStat;
 import com.uber.hoodie.common.HoodieRollbackStat;
 import com.uber.hoodie.common.model.HoodieCommitMetadata;
@@ -75,6 +76,7 @@ import java.util.Map;
 import java.util.Optional;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.Partitioner;
@@ -124,7 +126,12 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
   @VisibleForTesting
   HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig clientConfig,
       boolean rollbackInFlight, HoodieIndex index) {
-    super(jsc, clientConfig);
+    this(jsc, clientConfig, rollbackInFlight, index, Optional.empty());
+  }
+
+  public HoodieWriteClient(JavaSparkContext jsc, HoodieWriteConfig 
clientConfig,
+      boolean rollbackInFlight, HoodieIndex index, 
Optional<EmbeddedTimelineService> timelineService) {
+    super(jsc, clientConfig, timelineService);
     this.index = index;
     this.metrics = new HoodieMetrics(config, config.getTableName());
     this.rollbackInFlight = rollbackInFlight;
@@ -1184,7 +1191,10 @@ public class HoodieWriteClient<T extends 
HoodieRecordPayload> extends AbstractHo
   private HoodieTable getTableAndInitCtx(JavaRDD<HoodieRecord<T>> records) {
     // Create a Hoodie table which encapsulated the commits and files visible
     HoodieTable table = HoodieTable.getHoodieTable(
-        new HoodieTableMetaClient(jsc.hadoopConfiguration(), 
config.getBasePath(), true), config, jsc);
+        new HoodieTableMetaClient(
+            // Clone Configuration here. Otherwise we could see 
ConcurrentModificationException (race) in multi-threaded
+            // execution (HoodieDeltaStreamer) when Configuration gets 
serialized by Spark.
+            new Configuration(jsc.hadoopConfiguration()), 
config.getBasePath(), true), config, jsc);
     if 
(table.getMetaClient().getCommitActionType().equals(HoodieTimeline.COMMIT_ACTION))
 {
       writeContext = metrics.getCommitCtx();
     } else {
diff --git 
a/hoodie-client/src/main/java/com/uber/hoodie/client/embedded/EmbeddedTimelineService.java
 
b/hoodie-client/src/main/java/com/uber/hoodie/client/embedded/EmbeddedTimelineService.java
index 91a925e..fbd54c9 100644
--- 
a/hoodie-client/src/main/java/com/uber/hoodie/client/embedded/EmbeddedTimelineService.java
+++ 
b/hoodie-client/src/main/java/com/uber/hoodie/client/embedded/EmbeddedTimelineService.java
@@ -91,6 +91,10 @@ public class EmbeddedTimelineService {
         
.withRemoteServerHost(hostAddr).withRemoteServerPort(serverPort).build();
   }
 
+  public FileSystemViewManager getViewManager() {
+    return viewManager;
+  }
+
   public void stop() {
     if (null != server) {
       this.server.close();
diff --git 
a/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieTestDataGenerator.java
 
b/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieTestDataGenerator.java
index f50baaf..1f79bb8 100644
--- 
a/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieTestDataGenerator.java
+++ 
b/hoodie-client/src/test/java/com/uber/hoodie/common/HoodieTestDataGenerator.java
@@ -31,16 +31,23 @@ import com.uber.hoodie.common.table.timeline.HoodieInstant;
 import com.uber.hoodie.common.util.AvroUtils;
 import com.uber.hoodie.common.util.FSUtils;
 import com.uber.hoodie.common.util.HoodieAvroUtils;
+import com.uber.hoodie.exception.HoodieIOException;
 import java.io.IOException;
+import java.io.Serializable;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import java.util.Random;
 import java.util.Set;
 import java.util.UUID;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import java.util.stream.Stream;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericData;
 import org.apache.avro.generic.GenericRecord;
@@ -83,17 +90,23 @@ public class HoodieTestDataGenerator {
 
   private static Random rand = new Random(46474747);
 
-  private List<KeyPartition> existingKeysList = new ArrayList<>();
-  private String[] partitionPaths;
+  private final Map<Integer, KeyPartition> existingKeys;
+  private final String[] partitionPaths;
+  private int numExistingKeys;
 
   public HoodieTestDataGenerator(String[] partitionPaths) {
-    this.partitionPaths = Arrays.copyOf(partitionPaths, partitionPaths.length);
+    this(partitionPaths, new HashMap<>());
   }
 
   public HoodieTestDataGenerator() {
     this(DEFAULT_PARTITION_PATHS);
   }
 
+  public HoodieTestDataGenerator(String[] partitionPaths, Map<Integer, 
KeyPartition> keyPartitionMap) {
+    this.partitionPaths = Arrays.copyOf(partitionPaths, partitionPaths.length);
+    this.existingKeys = keyPartitionMap;
+  }
+
   public static void writePartitionMetadata(FileSystem fs, String[] 
partitionPaths, String basePath) {
     for (String partitionPath : partitionPaths) {
       new HoodiePartitionMetadata(fs, "000", new Path(basePath), new 
Path(basePath, partitionPath)).trySave(0);
@@ -193,19 +206,29 @@ public class HoodieTestDataGenerator {
    * Generates new inserts, uniformly across the partition paths above. It 
also updates the list of existing keys.
    */
   public List<HoodieRecord> generateInserts(String commitTime, Integer n) 
throws IOException {
-    List<HoodieRecord> inserts = new ArrayList<>();
-    for (int i = 0; i < n; i++) {
+    return generateInsertsStream(commitTime, n).collect(Collectors.toList());
+  }
+
+  /**
+   * Generates new inserts, uniformly across the partition paths above. It 
also updates the list of existing keys.
+   */
+  public Stream<HoodieRecord> generateInsertsStream(String commitTime, Integer 
n)  {
+    int currSize = getNumExistingKeys();
+
+    return IntStream.range(0, n).boxed().map(i -> {
       String partitionPath = 
partitionPaths[rand.nextInt(partitionPaths.length)];
       HoodieKey key = new HoodieKey(UUID.randomUUID().toString(), 
partitionPath);
-      HoodieRecord record = new HoodieRecord(key, generateRandomValue(key, 
commitTime));
-      inserts.add(record);
-
       KeyPartition kp = new KeyPartition();
       kp.key = key;
       kp.partitionPath = partitionPath;
-      existingKeysList.add(kp);
-    }
-    return inserts;
+      existingKeys.put(currSize + i, kp);
+      numExistingKeys++;
+      try {
+        return new HoodieRecord(key, generateRandomValue(key, commitTime));
+      } catch (IOException e) {
+        throw new HoodieIOException(e.getMessage(), e);
+      }
+    });
   }
 
   public List<HoodieRecord> generateSameKeyInserts(String commitTime, 
List<HoodieRecord> origin) throws IOException {
@@ -221,6 +244,7 @@ public class HoodieTestDataGenerator {
   public List<HoodieRecord> generateInsertsWithHoodieAvroPayload(String 
commitTime, int limit) throws
       IOException {
     List<HoodieRecord> inserts = new ArrayList<>();
+    int currSize = getNumExistingKeys();
     for (int i = 0; i < limit; i++) {
       String partitionPath = 
partitionPaths[rand.nextInt(partitionPaths.length)];
       HoodieKey key = new HoodieKey(UUID.randomUUID().toString(), 
partitionPath);
@@ -230,7 +254,8 @@ public class HoodieTestDataGenerator {
       KeyPartition kp = new KeyPartition();
       kp.key = key;
       kp.partitionPath = partitionPath;
-      existingKeysList.add(kp);
+      existingKeys.put(currSize + i, kp);
+      numExistingKeys++;
     }
     return inserts;
   }
@@ -293,7 +318,7 @@ public class HoodieTestDataGenerator {
   public List<HoodieRecord> generateUpdates(String commitTime, Integer n) 
throws IOException {
     List<HoodieRecord> updates = new ArrayList<>();
     for (int i = 0; i < n; i++) {
-      KeyPartition kp = 
existingKeysList.get(rand.nextInt(existingKeysList.size() - 1));
+      KeyPartition kp = existingKeys.get(rand.nextInt(numExistingKeys - 1));
       HoodieRecord record = generateUpdateRecord(kp.key, commitTime);
       updates.add(record);
     }
@@ -307,39 +332,55 @@ public class HoodieTestDataGenerator {
    * @param n Number of unique records
    * @return list of hoodie record updates
    */
-  public List<HoodieRecord> generateUniqueUpdates(String commitTime, Integer 
n) throws IOException {
-    List<HoodieRecord> updates = new ArrayList<>();
-    Set<KeyPartition> used = new HashSet<>();
+  public List<HoodieRecord> generateUniqueUpdates(String commitTime, Integer 
n) {
+    return generateUniqueUpdatesStream(commitTime, 
n).collect(Collectors.toList());
+  }
+
+  /**
+   * Generates deduped updates of keys previously inserted, randomly 
distributed across the keys above.
+   *
+   * @param commitTime Commit Timestamp
+   * @param n Number of unique records
+   * @return stream of hoodie record updates
+   */
+  public Stream<HoodieRecord> generateUniqueUpdatesStream(String commitTime, 
Integer n) {
+    final Set<KeyPartition> used = new HashSet<>();
 
-    if (n > existingKeysList.size()) {
+    if (n > numExistingKeys) {
       throw new IllegalArgumentException("Requested unique updates is greater 
than number of available keys");
     }
 
-    for (int i = 0; i < n; i++) {
-      int index = rand.nextInt(existingKeysList.size() - 1);
-      KeyPartition kp = existingKeysList.get(index);
+    return IntStream.range(0, n).boxed().map(i -> {
+      int index = numExistingKeys == 1 ? 0 : rand.nextInt(numExistingKeys - 1);
+      KeyPartition kp = existingKeys.get(index);
       // Find the available keyPartition starting from randomly chosen one.
       while (used.contains(kp)) {
-        index = (index + 1) % existingKeysList.size();
-        kp = existingKeysList.get(index);
+        index = (index + 1) % numExistingKeys;
+        kp = existingKeys.get(index);
       }
-      HoodieRecord record = new HoodieRecord(kp.key, 
generateRandomValue(kp.key, commitTime));
-      updates.add(record);
       used.add(kp);
-    }
-    return updates;
+      try {
+        return new HoodieRecord(kp.key, generateRandomValue(kp.key, 
commitTime));
+      } catch (IOException e) {
+        throw new HoodieIOException(e.getMessage(), e);
+      }
+    });
   }
 
   public String[] getPartitionPaths() {
     return partitionPaths;
   }
 
-  public List<KeyPartition> getExistingKeysList() {
-    return existingKeysList;
+  public int getNumExistingKeys() {
+    return numExistingKeys;
   }
 
-  public static class KeyPartition {
+  public static class KeyPartition implements Serializable {
     HoodieKey key;
     String partitionPath;
   }
+
+  public void close() {
+    existingKeys.clear();
+  }
 }
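
A short sketch (assumed test usage, not part of this commit; the class name is hypothetical) of the stream-based generator APIs above. Keys are registered in the shared key map as the insert stream is consumed, so updates can be generated afterwards.

import java.util.List;
import java.util.stream.Collectors;

import com.uber.hoodie.common.HoodieTestDataGenerator;
import com.uber.hoodie.common.model.HoodieRecord;

public class TestDataGeneratorSketch {
  public static void generate() {
    HoodieTestDataGenerator dataGen = new HoodieTestDataGenerator();
    // Collecting the stream materializes the inserts and records their keys
    List<HoodieRecord> inserts = dataGen.generateInsertsStream("001", 100).collect(Collectors.toList());
    // Unique updates draw, without repetition, from the keys inserted so far
    List<HoodieRecord> updates = dataGen.generateUniqueUpdates("002", 50);
    dataGen.close();
  }
}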
diff --git 
a/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java 
b/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java
index 5f22de0..2a663c3 100644
--- 
a/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java
+++ 
b/hoodie-client/src/test/java/com/uber/hoodie/table/TestMergeOnReadTable.java
@@ -905,6 +905,7 @@ public class TestMergeOnReadTable {
           .filter(writeStatus -> 
writeStatus.getStat().getPartitionPath().contentEquals(partitionPath))
           .count() > 0);
     }
+    writeClient.close();
   }
 
   @Test
diff --git 
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableMetaClient.java
 
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableMetaClient.java
index c8dd7d2..7551748 100644
--- 
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableMetaClient.java
+++ 
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/HoodieTableMetaClient.java
@@ -85,7 +85,7 @@ public class HoodieTableMetaClient implements Serializable {
       throws DatasetNotFoundException {
     log.info("Loading HoodieTableMetaClient from " + basePath);
     this.basePath = basePath;
-    this.hadoopConf = new SerializableConfiguration(conf);
+    this.hadoopConf = new SerializableConfiguration(new Configuration(conf));
     Path basePathDir = new Path(this.basePath);
     this.metaPath = basePath + File.separator + METAFOLDER_NAME;
     Path metaPathDir = new Path(this.metaPath);
diff --git 
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/RocksDbBasedFileSystemView.java
 
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/RocksDbBasedFileSystemView.java
index dc371bc..9edbb94 100644
--- 
a/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/RocksDbBasedFileSystemView.java
+++ 
b/hoodie-common/src/main/java/com/uber/hoodie/common/table/view/RocksDbBasedFileSystemView.java
@@ -74,7 +74,7 @@ public class RocksDbBasedFileSystemView extends 
IncrementalTimelineSyncFileSyste
     super(config.isIncrementalTimelineSyncEnabled());
     this.config = config;
     this.schemaHelper = new RocksDBSchemaHelper(metaClient);
-    this.rocksDB = new RocksDBDAO(metaClient.getBasePath(), config);
+    this.rocksDB = new RocksDBDAO(metaClient.getBasePath(), 
config.getRocksdbBasePath());
     init(metaClient, visibleActiveTimeline);
   }
 
@@ -138,7 +138,7 @@ public class RocksDbBasedFileSystemView extends 
IncrementalTimelineSyncFileSyste
   protected void resetViewState() {
     log.info("Deleting all rocksdb data associated with dataset filesystem 
view");
     rocksDB.close();
-    rocksDB = new RocksDBDAO(metaClient.getBasePath(), config);
+    rocksDB = new RocksDBDAO(metaClient.getBasePath(), 
config.getRocksdbBasePath());
   }
 
   @Override
diff --git 
a/hoodie-common/src/main/java/com/uber/hoodie/common/util/CompactionUtils.java 
b/hoodie-common/src/main/java/com/uber/hoodie/common/util/CompactionUtils.java
index cc188a7..e0927ce 100644
--- 
a/hoodie-common/src/main/java/com/uber/hoodie/common/util/CompactionUtils.java
+++ 
b/hoodie-common/src/main/java/com/uber/hoodie/common/util/CompactionUtils.java
@@ -174,4 +174,12 @@ public class CompactionUtils {
       return Stream.empty();
     }
   }
+
+  /**
+   * Return all pending compaction instant times.
+   * @return list of pending compaction instants in the active timeline
+   */
+  public static List<HoodieInstant> 
getPendingCompactionInstantTimes(HoodieTableMetaClient metaClient) {
+    return 
metaClient.getActiveTimeline().filterPendingCompactionTimeline().getInstants().collect(Collectors.toList());
+  }
 }
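
For illustration, a sketch (hypothetical wrapper class) of the new helper that lists pending compactions from a table's active timeline:

import java.util.List;

import org.apache.hadoop.conf.Configuration;

import com.uber.hoodie.common.table.HoodieTableMetaClient;
import com.uber.hoodie.common.table.timeline.HoodieInstant;
import com.uber.hoodie.common.util.CompactionUtils;

public class PendingCompactionSketch {
  public static List<HoodieInstant> pending(Configuration hadoopConf, String basePath) {
    // Load the meta client for the dataset and ask for all pending compaction instants
    HoodieTableMetaClient metaClient = new HoodieTableMetaClient(hadoopConf, basePath, true);
    return CompactionUtils.getPendingCompactionInstantTimes(metaClient);
  }
}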
diff --git 
a/hoodie-common/src/main/java/com/uber/hoodie/common/util/RocksDBDAO.java 
b/hoodie-common/src/main/java/com/uber/hoodie/common/util/RocksDBDAO.java
index 5b7d9cc..739c778 100644
--- a/hoodie-common/src/main/java/com/uber/hoodie/common/util/RocksDBDAO.java
+++ b/hoodie-common/src/main/java/com/uber/hoodie/common/util/RocksDBDAO.java
@@ -20,7 +20,6 @@ package com.uber.hoodie.common.util;
 
 import com.google.common.annotations.VisibleForTesting;
 import com.google.common.base.Preconditions;
-import com.uber.hoodie.common.table.view.FileSystemViewStorageConfig;
 import com.uber.hoodie.common.util.collection.Pair;
 import com.uber.hoodie.exception.HoodieException;
 import com.uber.hoodie.exception.HoodieIOException;
@@ -57,7 +56,6 @@ public class RocksDBDAO {
 
   protected static final transient Logger log = 
LogManager.getLogger(RocksDBDAO.class);
 
-  private final FileSystemViewStorageConfig config;
   private transient ConcurrentHashMap<String, ColumnFamilyHandle> 
managedHandlesMap;
   private transient ConcurrentHashMap<String, ColumnFamilyDescriptor> 
managedDescriptorMap;
   private transient RocksDB rocksDB;
@@ -65,10 +63,9 @@ public class RocksDBDAO {
   private final String basePath;
   private final String rocksDBBasePath;
 
-  public RocksDBDAO(String basePath, FileSystemViewStorageConfig config) {
+  public RocksDBDAO(String basePath, String rocksDBBasePath) {
     this.basePath = basePath;
-    this.config = config;
-    this.rocksDBBasePath = String.format("%s/%s/%s", 
config.getRocksdbBasePath(),
+    this.rocksDBBasePath = String.format("%s/%s/%s", rocksDBBasePath,
         this.basePath.replace("/", "_"), UUID.randomUUID().toString());
     init();
   }
@@ -95,6 +92,7 @@ public class RocksDBDAO {
       managedDescriptorMap = new ConcurrentHashMap<>();
 
       // If already present, loads the existing column-family handles
+
       final DBOptions dbOptions = new 
DBOptions().setCreateIfMissing(true).setCreateMissingColumnFamilies(true)
           
.setWalDir(rocksDBBasePath).setStatsDumpPeriodSec(300).setStatistics(new 
Statistics());
       dbOptions.setLogger(new org.rocksdb.Logger(dbOptions) {
@@ -184,6 +182,26 @@ public class RocksDBDAO {
   }
 
   /**
+   * Helper to add put operation in batch
+   *
+   * @param batch Batch Handle
+   * @param columnFamilyName Column Family
+   * @param key Key
+   * @param value Payload
+   * @param <T> Type of payload
+   */
+  public <K extends Serializable, T extends Serializable> void 
putInBatch(WriteBatch batch, String columnFamilyName,
+      K key, T value) {
+    try {
+      byte[] keyBytes = SerializationUtils.serialize(key);
+      byte[] payload = SerializationUtils.serialize(value);
+      batch.put(managedHandlesMap.get(columnFamilyName), keyBytes, payload);
+    } catch (Exception e) {
+      throw new HoodieException(e);
+    }
+  }
+
+  /**
    * Perform single PUT on a column-family
    *
    * @param columnFamilyName Column family name
@@ -201,6 +219,23 @@ public class RocksDBDAO {
   }
 
   /**
+   * Perform single PUT on a column-family
+   *
+   * @param columnFamilyName Column family name
+   * @param key Key
+   * @param value Payload
+   * @param <T> Type of Payload
+   */
+  public <K extends Serializable, T extends Serializable> void put(String 
columnFamilyName, K key, T value) {
+    try {
+      byte[] payload = SerializationUtils.serialize(value);
+      getRocksDB().put(managedHandlesMap.get(columnFamilyName), 
SerializationUtils.serialize(key), payload);
+    } catch (Exception e) {
+      throw new HoodieException(e);
+    }
+  }
+
+  /**
    * Helper to add delete operation in batch
    *
    * @param batch Batch Handle
@@ -216,6 +251,21 @@ public class RocksDBDAO {
   }
 
   /**
+   * Helper to add delete operation in batch
+   *
+   * @param batch Batch Handle
+   * @param columnFamilyName Column Family
+   * @param key Key
+   */
+  public <K extends Serializable> void deleteInBatch(WriteBatch batch, String 
columnFamilyName, K key) {
+    try {
+      batch.delete(managedHandlesMap.get(columnFamilyName), 
SerializationUtils.serialize(key));
+    } catch (Exception e) {
+      throw new HoodieException(e);
+    }
+  }
+
+  /**
    * Perform a single Delete operation
    *
    * @param columnFamilyName Column Family name
@@ -230,6 +280,20 @@ public class RocksDBDAO {
   }
 
   /**
+   * Perform a single Delete operation
+   *
+   * @param columnFamilyName Column Family name
+   * @param key Key to be deleted
+   */
+  public <K extends Serializable> void delete(String columnFamilyName, K key) {
+    try {
+      getRocksDB().delete(managedHandlesMap.get(columnFamilyName), 
SerializationUtils.serialize(key));
+    } catch (Exception e) {
+      throw new HoodieException(e);
+    }
+  }
+
+  /**
    * Retrieve a value for a given key in a column family
    *
    * @param columnFamilyName Column Family Name
@@ -247,6 +311,23 @@ public class RocksDBDAO {
   }
 
   /**
+   * Retrieve a value for a given key in a column family
+   *
+   * @param columnFamilyName Column Family Name
+   * @param key Key to be retrieved
+   * @param <T> Type of object stored.
+   */
+  public <K extends Serializable, T extends Serializable> T get(String 
columnFamilyName, K key) {
+    Preconditions.checkArgument(!closed);
+    try {
+      byte[] val = getRocksDB().get(managedHandlesMap.get(columnFamilyName), 
SerializationUtils.serialize(key));
+      return val == null ? null : SerializationUtils.deserialize(val);
+    } catch (Exception e) {
+      throw new HoodieException(e);
+    }
+  }
+
+  /**
    * Perform a prefix search and return stream of key-value pairs retrieved
    *
    * @param columnFamilyName Column Family Name
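
A minimal sketch (not part of this commit; the class name and both paths are placeholders) of the key-typed put/get/delete helpers added above:

import com.uber.hoodie.common.util.RocksDBDAO;

public class RocksDBDAOSketch {
  public static void roundTrip() {
    // Both path arguments are illustrative placeholders
    RocksDBDAO dao = new RocksDBDAO("/data/hoodie/base", "/tmp/hoodie-rocksdb");
    dao.addColumnFamily("sketch_family");
    // Keys and values only need to be Serializable
    dao.put("sketch_family", "key-1", "value-1");
    String value = dao.get("sketch_family", "key-1");
    dao.delete("sketch_family", "key-1");
    dao.close();
  }
}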
diff --git 
a/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/DiskBasedMap.java
 
b/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/DiskBasedMap.java
index 34e2276..6b617a0 100644
--- 
a/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/DiskBasedMap.java
+++ 
b/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/DiskBasedMap.java
@@ -51,7 +51,7 @@ import org.apache.log4j.Logger;
  * without any rollover support. It uses the following : 1) An in-memory map 
that tracks the key-> latest ValueMetadata.
  * 2) Current position in the file NOTE : Only String.class type supported for 
Key
  */
-public final class DiskBasedMap<T extends Serializable, R extends 
Serializable> implements Map<T, R> {
+public final class DiskBasedMap<T extends Serializable, R extends 
Serializable> implements Map<T, R>, Iterable<R> {
 
   private static final Logger log = LogManager.getLogger(DiskBasedMap.class);
   // Stores the key and corresponding value's latest metadata spilled to disk
@@ -149,6 +149,7 @@ public final class DiskBasedMap<T extends Serializable, R 
extends Serializable>
   /**
    * Custom iterator to iterate over values written to disk
    */
+  @Override
   public Iterator<R> iterator() {
     return new LazyFileIterable(filePath, valueMetadataMap).iterator();
   }
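
With DiskBasedMap now implementing Iterable<R>, spilled values can be consumed with a plain for-each loop. A small sketch (hypothetical helper, assuming an already populated map):

import com.uber.hoodie.common.model.HoodieRecord;
import com.uber.hoodie.common.util.collection.DiskBasedMap;

public class DiskBasedMapIterationSketch {
  public static long countValues(DiskBasedMap<String, HoodieRecord> spillMap) {
    long count = 0;
    // Iterates the values written to disk via the lazy file iterator
    for (HoodieRecord record : spillMap) {
      count++;
    }
    return count;
  }
}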
diff --git 
a/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/RocksDBBasedMap.java
 
b/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/RocksDBBasedMap.java
new file mode 100644
index 0000000..267723f
--- /dev/null
+++ 
b/hoodie-common/src/main/java/com/uber/hoodie/common/util/collection/RocksDBBasedMap.java
@@ -0,0 +1,125 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.common.util.collection;
+
+import com.uber.hoodie.common.util.RocksDBDAO;
+import com.uber.hoodie.exception.HoodieNotSupportedException;
+import java.io.Serializable;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+
+public final class RocksDBBasedMap<K extends Serializable, R extends 
Serializable> implements Map<K, R> {
+
+  private static final String COL_FAMILY_NAME = "map_handle";
+
+  private final String rocksDbStoragePath;
+  private RocksDBDAO rocksDBDAO;
+  private final String columnFamilyName;
+
+  public RocksDBBasedMap(String rocksDbStoragePath) {
+    this.rocksDbStoragePath = rocksDbStoragePath;
+    this.columnFamilyName = COL_FAMILY_NAME;
+  }
+
+  @Override
+  public int size() {
+    return (int)getRocksDBDAO().prefixSearch(columnFamilyName, "").count();
+  }
+
+  @Override
+  public boolean isEmpty() {
+    return size() == 0;
+  }
+
+  @Override
+  public boolean containsKey(Object key) {
+    // Won't be able to store nulls as values
+    return getRocksDBDAO().get(columnFamilyName, key.toString()) != null;
+  }
+
+  @Override
+  public boolean containsValue(Object value) {
+    throw new HoodieNotSupportedException("Not Supported");
+  }
+
+  @Override
+  public R get(Object key) {
+    return getRocksDBDAO().get(columnFamilyName, (Serializable)key);
+  }
+
+  @Override
+  public R put(K key, R value) {
+    getRocksDBDAO().put(columnFamilyName, key, value);
+    return value;
+  }
+
+  @Override
+  public R remove(Object key) {
+    R val = getRocksDBDAO().get(columnFamilyName, key.toString());
+    getRocksDBDAO().delete(columnFamilyName, key.toString());
+    return val;
+  }
+
+  @Override
+  public void putAll(Map<? extends K, ? extends R> m) {
+    getRocksDBDAO().writeBatch(batch -> {
+      m.entrySet().forEach(entry -> {
+        getRocksDBDAO().putInBatch(batch, columnFamilyName, entry.getKey(), 
entry.getValue());
+      });
+    });
+  }
+
+  private RocksDBDAO getRocksDBDAO() {
+    if (null == rocksDBDAO) {
+      rocksDBDAO = new RocksDBDAO("default", rocksDbStoragePath);
+      rocksDBDAO.addColumnFamily(columnFamilyName);
+    }
+    return rocksDBDAO;
+  }
+
+  @Override
+  public void clear() {
+    if (null != rocksDBDAO) {
+      rocksDBDAO.close();
+    }
+    rocksDBDAO = null;
+  }
+
+  @Override
+  public Set<K> keySet() {
+    throw new HoodieNotSupportedException("Not Supported");
+  }
+
+  @Override
+  public Collection<R> values() {
+    throw new HoodieNotSupportedException("Not Supported");
+  }
+
+  @Override
+  public Set<Entry<K, R>> entrySet() {
+    throw new HoodieNotSupportedException("Not Supported");
+  }
+
+  public Iterator<R> iterator() {
+    return getRocksDBDAO().prefixSearch(columnFamilyName, "")
+        .map(p -> (R)(p.getValue())).iterator();
+  }
+}
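
A small sketch (hypothetical class; the storage path is a placeholder) of the new RocksDB-backed map in isolation. Since keySet/values/entrySet are intentionally unsupported, iteration goes through iterator():

import java.util.Iterator;

import com.uber.hoodie.common.util.collection.RocksDBBasedMap;

public class RocksDBBasedMapSketch {
  public static void roundTrip() {
    RocksDBBasedMap<String, String> map = new RocksDBBasedMap<>("/tmp/hoodie-map");
    map.put("key-1", "value-1");
    map.put("key-2", "value-2");
    String value = map.get("key-1");
    // Values are streamed back via a prefix search over the single column family
    Iterator<String> itr = map.iterator();
    while (itr.hasNext()) {
      itr.next();
    }
    // clear() closes and releases the underlying RocksDB instance
    map.clear();
  }
}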
diff --git 
a/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestRocksDBManager.java
 
b/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestRocksDBManager.java
index 0c7bf31..dd239e1 100644
--- 
a/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestRocksDBManager.java
+++ 
b/hoodie-common/src/test/java/com/uber/hoodie/common/util/TestRocksDBManager.java
@@ -67,7 +67,7 @@ public class TestRocksDBManager {
     }).collect(Collectors.toList());
 
     dbManager = new RocksDBDAO("/dummy/path",
-        FileSystemViewStorageConfig.newBuilder().build().newBuilder().build());
+        
FileSystemViewStorageConfig.newBuilder().build().newBuilder().build().getRocksdbBasePath());
     colFamilies.stream().forEach(family -> dbManager.dropColumnFamily(family));
     colFamilies.stream().forEach(family -> dbManager.addColumnFamily(family));
 
diff --git 
a/hoodie-common/src/test/java/com/uber/hoodie/common/util/collection/TestRocksDbBasedMap.java
 
b/hoodie-common/src/test/java/com/uber/hoodie/common/util/collection/TestRocksDbBasedMap.java
new file mode 100644
index 0000000..e91625d
--- /dev/null
+++ 
b/hoodie-common/src/test/java/com/uber/hoodie/common/util/collection/TestRocksDbBasedMap.java
@@ -0,0 +1,56 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.common.util.collection;
+
+import com.uber.hoodie.common.model.HoodieRecord;
+import com.uber.hoodie.common.model.HoodieRecordPayload;
+import com.uber.hoodie.common.util.SchemaTestUtil;
+import com.uber.hoodie.common.util.SpillableMapTestUtils;
+import java.io.IOException;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TestRocksDbBasedMap {
+
+  private static final String BASE_OUTPUT_PATH = "/tmp/";
+
+  @Test
+  public void testSimple() throws IOException, URISyntaxException {
+    RocksDBBasedMap records = new RocksDBBasedMap(BASE_OUTPUT_PATH);
+    List<IndexedRecord> iRecords = SchemaTestUtil.generateHoodieTestRecords(0, 
100);
+    ((GenericRecord) 
iRecords.get(0)).get(HoodieRecord.COMMIT_TIME_METADATA_FIELD).toString();
+    List<String> recordKeys = SpillableMapTestUtils.upsertRecords(iRecords, 
records);
+
+    // make sure records have spilled to disk
+    Iterator<HoodieRecord<? extends HoodieRecordPayload>> itr = 
records.iterator();
+    List<HoodieRecord> oRecords = new ArrayList<>();
+    while (itr.hasNext()) {
+      HoodieRecord<? extends HoodieRecordPayload> rec = itr.next();
+      oRecords.add(rec);
+      assert recordKeys.contains(rec.getRecordKey());
+    }
+    Assert.assertEquals(recordKeys.size(), oRecords.size());
+  }
+}
diff --git a/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java 
b/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java
index 3401266..e7b9494 100644
--- a/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java
+++ b/hoodie-spark/src/main/java/com/uber/hoodie/DataSourceUtils.java
@@ -18,6 +18,7 @@
 
 package com.uber.hoodie;
 
+import com.uber.hoodie.client.embedded.EmbeddedTimelineService;
 import com.uber.hoodie.common.model.HoodieKey;
 import com.uber.hoodie.common.model.HoodieRecord;
 import com.uber.hoodie.common.model.HoodieRecordPayload;
@@ -38,6 +39,7 @@ import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Map;
+import java.util.Optional;
 import java.util.stream.Collectors;
 import org.apache.avro.Schema.Field;
 import org.apache.avro.generic.GenericRecord;
@@ -182,10 +184,10 @@ public class DataSourceUtils {
   @SuppressWarnings("unchecked")
   public static JavaRDD<HoodieRecord> dropDuplicates(JavaSparkContext jssc,
       JavaRDD<HoodieRecord> incomingHoodieRecords,
-      HoodieWriteConfig writeConfig) throws Exception {
+      HoodieWriteConfig writeConfig, Optional<EmbeddedTimelineService> 
timelineService) throws Exception {
     HoodieReadClient client = null;
     try {
-      client = new HoodieReadClient<>(jssc, writeConfig);
+      client = new HoodieReadClient<>(jssc, writeConfig, timelineService);
       return client.tagLocation(incomingHoodieRecords)
           .filter(r -> !((HoodieRecord<HoodieRecordPayload>) 
r).isCurrentLocationKnown());
     } catch (DatasetNotFoundException e) {
@@ -202,12 +204,14 @@ public class DataSourceUtils {
   @SuppressWarnings("unchecked")
   public static JavaRDD<HoodieRecord> dropDuplicates(JavaSparkContext jssc,
                                                      JavaRDD<HoodieRecord> 
incomingHoodieRecords,
-                                                     Map<String, String> 
parameters) throws Exception {
+                                                     Map<String, String> 
parameters,
+                                                     
Optional<EmbeddedTimelineService> timelineService)
+      throws Exception {
     HoodieWriteConfig writeConfig = HoodieWriteConfig
         .newBuilder()
         .withPath(parameters.get("path"))
         .withProps(parameters).build();
-    return dropDuplicates(jssc, incomingHoodieRecords, writeConfig);
+    return dropDuplicates(jssc, incomingHoodieRecords, writeConfig, 
timelineService);
   }
 
   public static HiveSyncConfig buildHiveSyncConfig(TypedProperties props, 
String basePath) {
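
For illustration, a sketch (hypothetical wrapper; variable names are not from this patch) of deduplicating incoming records while reusing an existing client's timeline server instead of starting a second embedded service:

import java.util.Map;

import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;

import com.uber.hoodie.DataSourceUtils;
import com.uber.hoodie.HoodieWriteClient;
import com.uber.hoodie.common.model.HoodieRecord;

public class DropDuplicatesSketch {
  public static JavaRDD<HoodieRecord> dedupe(JavaSparkContext jssc, JavaRDD<HoodieRecord> incoming,
      Map<String, String> parameters, HoodieWriteClient client) throws Exception {
    // Pass along the client's timeline server so the tag-location lookup shares it
    return DataSourceUtils.dropDuplicates(jssc, incoming, parameters, client.getTimelineServer());
  }
}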
diff --git 
a/hoodie-spark/src/main/scala/com/uber/hoodie/HoodieSparkSqlWriter.scala 
b/hoodie-spark/src/main/scala/com/uber/hoodie/HoodieSparkSqlWriter.scala
index e4f6fc3..35c19aa 100644
--- a/hoodie-spark/src/main/scala/com/uber/hoodie/HoodieSparkSqlWriter.scala
+++ b/hoodie-spark/src/main/scala/com/uber/hoodie/HoodieSparkSqlWriter.scala
@@ -32,7 +32,7 @@ import org.apache.hadoop.hive.conf.HiveConf
 import org.apache.log4j.LogManager
 import org.apache.spark.api.java.JavaSparkContext
 import org.apache.spark.rdd.RDD
-import org.apache.spark.sql.{DataFrame, SaveMode, SQLContext}
+import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
 
 import scala.collection.JavaConversions._
 import scala.collection.mutable.ListBuffer
@@ -98,21 +98,6 @@ private[hoodie] object HoodieSparkSqlWriter {
 
     val jsc = new JavaSparkContext(sparkContext)
 
-    val hoodieRecords =
-      if (parameters(INSERT_DROP_DUPS_OPT_KEY).toBoolean) {
-        DataSourceUtils.dropDuplicates(
-          jsc,
-          hoodieAllIncomingRecords,
-          mapAsJavaMap(parameters))
-      } else {
-        hoodieAllIncomingRecords
-      }
-
-    if (hoodieRecords.isEmpty()) {
-      log.info("new batch has no new records, skipping...")
-      return (true, None)
-    }
-
     val basePath = new Path(parameters("path"))
     val fs = basePath.getFileSystem(sparkContext.hadoopConfiguration)
     var exists = fs.exists(basePath)
@@ -141,6 +126,22 @@ private[hoodie] object HoodieSparkSqlWriter {
     val client = DataSourceUtils.createHoodieClient(jsc, schema.toString, 
path.get, tblName.get,
       mapAsJavaMap(parameters)
     )
+
+    val hoodieRecords =
+      if (parameters(INSERT_DROP_DUPS_OPT_KEY).toBoolean) {
+        DataSourceUtils.dropDuplicates(
+          jsc,
+          hoodieAllIncomingRecords,
+          mapAsJavaMap(parameters), client.getTimelineServer)
+      } else {
+        hoodieAllIncomingRecords
+      }
+
+    if (hoodieRecords.isEmpty()) {
+      log.info("new batch has no new records, skipping...")
+      return (true, None)
+    }
+
     val commitTime = client.startCommit()
 
     val writeStatuses = DataSourceUtils.doWriteOperation(client, 
hoodieRecords, commitTime, operation)
diff --git a/hoodie-spark/src/test/scala/DataSourceTest.scala 
b/hoodie-spark/src/test/scala/DataSourceTest.scala
index 6a93498..66c0ac7 100644
--- a/hoodie-spark/src/test/scala/DataSourceTest.scala
+++ b/hoodie-spark/src/test/scala/DataSourceTest.scala
@@ -236,8 +236,9 @@ class DataSourceTest extends AssertionsForJUnit {
 
       inputDF2.write.mode(SaveMode.Append).json(sourcePath)
       // wait for spark streaming to process one microbatch
-      Thread.sleep(3000)
+      Thread.sleep(10000)
       val commitInstantTime2: String = 
HoodieDataSourceHelpers.latestCommit(fs, destPath)
+
       assertEquals(2, HoodieDataSourceHelpers.listCommitsSince(fs, destPath, 
"000").size())
       // Read RO View
       val hoodieROViewDF2 = spark.read.format("com.uber.hoodie")
diff --git a/hoodie-utilities/pom.xml b/hoodie-utilities/pom.xml
index 4179837..2625cc9 100644
--- a/hoodie-utilities/pom.xml
+++ b/hoodie-utilities/pom.xml
@@ -44,6 +44,18 @@
           <target>1.8</target>
         </configuration>
       </plugin>
+      <plugin>
+        <groupId>org.apache.maven.plugins</groupId>
+        <artifactId>maven-jar-plugin</artifactId>
+        <version>3.1.2</version>
+        <executions>
+          <execution>
+            <goals>
+              <goal>test-jar</goal>
+            </goals>
+          </execution>
+        </executions>
+      </plugin>
     </plugins>
 
     <resources>
@@ -218,10 +230,13 @@
       <artifactId>commons-dbcp</artifactId>
     </dependency>
     <dependency>
+      <groupId>commons-lang</groupId>
+      <artifactId>commons-lang</artifactId>
+    </dependency>
+    <dependency>
       <groupId>commons-pool</groupId>
       <artifactId>commons-pool</artifactId>
     </dependency>
-
     <dependency>
       <groupId>org.apache.httpcomponents</groupId>
       <artifactId>httpcore</artifactId>
diff --git 
a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/UtilHelpers.java 
b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/UtilHelpers.java
index 730777c..9e6204e 100644
--- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/UtilHelpers.java
+++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/UtilHelpers.java
@@ -37,7 +37,9 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.io.StringReader;
 import java.nio.ByteBuffer;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -132,7 +134,11 @@ public class UtilHelpers {
   }
 
   private static SparkConf buildSparkConf(String appName, String 
defaultMaster) {
-    SparkConf sparkConf = new SparkConf().setAppName(appName);
+    return buildSparkConf(appName, defaultMaster, new HashMap<>());
+  }
+
+  private static SparkConf buildSparkConf(String appName, String 
defaultMaster, Map<String, String> additionalConfigs) {
+    final SparkConf sparkConf = new SparkConf().setAppName(appName);
     String master = sparkConf.get("spark.master", defaultMaster);
     sparkConf.setMaster(master);
     if (master.startsWith("yarn")) {
@@ -147,8 +153,13 @@ public class UtilHelpers {
         "org.apache.hadoop.io.compress.GzipCodec");
     sparkConf.set("spark.hadoop.mapred.output.compression.type", "BLOCK");
 
-    sparkConf = HoodieWriteClient.registerClasses(sparkConf);
-    return sparkConf;
+    additionalConfigs.entrySet().forEach(e -> sparkConf.set(e.getKey(), 
e.getValue()));
+    SparkConf newSparkConf = HoodieWriteClient.registerClasses(sparkConf);
+    return newSparkConf;
+  }
+
+  public static JavaSparkContext buildSparkContext(String appName, String 
defaultMaster, Map<String, String> configs) {
+    return new JavaSparkContext(buildSparkConf(appName, defaultMaster, 
configs));
   }
 
   public static JavaSparkContext buildSparkContext(String appName, String 
defaultMaster) {
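
A sketch (hypothetical caller; the Spark setting shown is illustrative and not taken from this patch) of the new overload that threads additional Spark configs into the context:

import java.util.HashMap;
import java.util.Map;

import org.apache.spark.api.java.JavaSparkContext;

import com.uber.hoodie.utilities.UtilHelpers;

public class SparkContextSketch {
  public static JavaSparkContext build() {
    Map<String, String> additionalConfigs = new HashMap<>();
    // Illustrative setting only; the real keys come from the caller
    additionalConfigs.put("spark.scheduler.mode", "FAIR");
    return UtilHelpers.buildSparkContext("delta-streamer-continuous", "local[2]", additionalConfigs);
  }
}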
diff --git 
a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/AbstractDeltaStreamerService.java
 
b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/AbstractDeltaStreamerService.java
new file mode 100644
index 0000000..179fdbd
--- /dev/null
+++ 
b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/AbstractDeltaStreamerService.java
@@ -0,0 +1,146 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.utilities.deltastreamer;
+
+import com.uber.hoodie.common.util.collection.Pair;
+import java.io.Serializable;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+/**
+ * Base class for running delta-sync/compaction in a separate thread and controlling its lifecycle.
+ */
+public abstract class AbstractDeltaStreamerService implements Serializable {
+
+  protected static volatile Logger log = 
LogManager.getLogger(AbstractDeltaStreamerService.class);
+
+  // Flag to track if the service is started.
+  private boolean started;
+  // Flag indicating shutdown is externally requested
+  private boolean shutdownRequested;
+  // Flag indicating the service is shutdown
+  private volatile boolean shutdown;
+  // Executor Service for running delta-sync/compaction
+  private transient ExecutorService executor;
+  // Future tracking delta-sync/compaction
+  private transient CompletableFuture future;
+
+  AbstractDeltaStreamerService() {
+    shutdownRequested = false;
+  }
+
+  boolean isShutdownRequested() {
+    return shutdownRequested;
+  }
+
+  boolean isShutdown() {
+    return shutdown;
+  }
+
+  /**
+   * Wait till the service shuts down. If the service shut down with an exception, it is rethrown here.
+   * @throws ExecutionException
+   * @throws InterruptedException
+   */
+  void waitForShutdown() throws ExecutionException, InterruptedException {
+    try {
+      future.get();
+    } catch (ExecutionException ex) {
+      log.error("Service shutdown with error", ex);
+      throw ex;
+    }
+  }
+
+  /**
+   * Request shutdown either forcefully or gracefully. A graceful shutdown lets the service finish the current
+   * round of work before stopping, and this method waits until the service has shut down.
+   * @param force whether to shut down forcefully
+   */
+  void shutdown(boolean force) {
+    if (!shutdownRequested || force) {
+      shutdownRequested = true;
+      if (executor != null) {
+        if (force) {
+          executor.shutdownNow();
+        } else {
+          executor.shutdown();
+          try {
+            // Wait for some max time after requesting shutdown
+            executor.awaitTermination(24, TimeUnit.HOURS);
+          } catch (InterruptedException ie) {
+            log.error("Interrupted while waiting for shutdown", ie);
+          }
+        }
+      }
+    }
+  }
+
+  /**
+   * Start the service. Runs the service in a different thread and returns. Also starts a monitor thread
+   * that runs a callback once the service shuts down.
+   * @param onShutdownCallback callback invoked on shutdown; receives true if the service ended with an error
+   */
+  public void start(Function<Boolean, Boolean> onShutdownCallback) {
+    Pair<CompletableFuture, ExecutorService> res = startService();
+    future = res.getKey();
+    executor = res.getValue();
+    started = true;
+    monitorThreads(onShutdownCallback);
+  }
+
+  /**
+   * Service implementation.
+   * @return pair of the future tracking the service and the executor running it
+   */
+  protected abstract Pair<CompletableFuture, ExecutorService> startService();
+
+  /**
+   * Starts a monitor thread that triggers the callback once the service shuts down.
+   * @param onShutdownCallback callback invoked on shutdown; receives true if the service ended with an error
+   */
+  private void monitorThreads(Function<Boolean, Boolean> onShutdownCallback) {
+    log.info("Submitting monitor thread !!");
+    Executors.newSingleThreadExecutor().submit(() -> {
+      boolean error = false;
+      try {
+        log.info("Monitoring thread(s) !!");
+        future.get();
+      } catch (ExecutionException ex) {
+        log.error("Monitor noticed one or more threads failed."
+            + " Requesting graceful shutdown of other threads", ex);
+        error = true;
+        shutdown(false);
+      } catch (InterruptedException ie) {
+        log.error("Got interrupted Monitoring threads", ie);
+        error = true;
+        shutdown(false);
+      } finally {
+        // Mark as shutdown
+        shutdown = true;
+        onShutdownCallback.apply(error);
+      }
+    });
+  }
+}
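
A minimal subclass sketch (hypothetical, placed in the same package because the lifecycle accessors are package-private) showing the contract startService() is expected to satisfy: return the future doing the work and the executor running it, and keep looping until a shutdown is requested. Callers then invoke start(onShutdownCallback), and code in this package can block on waitForShutdown().

package com.uber.hoodie.utilities.deltastreamer;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

import com.uber.hoodie.common.util.collection.Pair;

public class LoopingServiceSketch extends AbstractDeltaStreamerService {

  @Override
  protected Pair<CompletableFuture, ExecutorService> startService() {
    ExecutorService executor = Executors.newFixedThreadPool(1);
    return Pair.of(CompletableFuture.supplyAsync(() -> {
      // Keep performing rounds of work until a shutdown is requested
      while (!isShutdownRequested()) {
        // one round of delta-sync/compaction style work would go here;
        // a real service would block or sleep between rounds
      }
      return true;
    }, executor), executor);
  }
}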
diff --git 
a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/Compactor.java
 
b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/Compactor.java
new file mode 100644
index 0000000..d72d7da
--- /dev/null
+++ 
b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/Compactor.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.uber.hoodie.utilities.deltastreamer;
+
+import com.uber.hoodie.HoodieWriteClient;
+import com.uber.hoodie.WriteStatus;
+import com.uber.hoodie.common.table.timeline.HoodieInstant;
+import com.uber.hoodie.exception.HoodieException;
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.Optional;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+
+/**
+ * Run one round of compaction
+ */
+public class Compactor implements Serializable {
+
+  protected static volatile Logger log = LogManager.getLogger(Compactor.class);
+
+  private transient HoodieWriteClient compactionClient;
+  private transient JavaSparkContext jssc;
+
+  public Compactor(HoodieWriteClient compactionClient, JavaSparkContext jssc) {
+    this.jssc = jssc;
+    this.compactionClient = compactionClient;
+  }
+
+  public void compact(HoodieInstant instant) throws IOException {
+    log.info("Compactor executing compaction " + instant);
+    JavaRDD<WriteStatus> res = compactionClient.compact(instant.getTimestamp());
+    long numWriteErrors = res.collect().stream().filter(r -> r.hasErrors()).count();
+    if (numWriteErrors != 0) {
+      // We treat even a single error in compaction as fatal
+      log.error("Compaction for instant (" + instant + ") failed with write errors. "
+          + "Errors :" + numWriteErrors);
+      throw new HoodieException("Compaction for instant (" + instant + ") failed with write errors. "
+          + "Errors :" + numWriteErrors);
+    }
+    // Commit compaction
+    compactionClient.commitCompaction(instant.getTimestamp(), res, Optional.empty());
+  }
+}
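
A brief usage sketch (not part of this patch) for Compactor above: run one compaction round for an instant that has already been scheduled, and let the caller react to the HoodieException raised on write errors. The write config (writeConfig), Spark context (jssc) and the instant time string are placeholders.

HoodieWriteClient compactionClient = new HoodieWriteClient<>(jssc, writeConfig, true);
Compactor compactor = new Compactor(compactionClient, jssc);
// Hypothetical, already-scheduled compaction instant
HoodieInstant requested = new HoodieInstant(HoodieInstant.State.REQUESTED,
    HoodieTimeline.COMPACTION_ACTION, "20190515132155");
compactor.compact(requested); // commits on success, throws HoodieException if any write status has errors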
diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/DeltaSync.java
similarity index 52%
copy from hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java
copy to hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/DeltaSync.java
index 7e76ff5..c372b3c 100644
--- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/DeltaSync.java
@@ -21,17 +21,11 @@ package com.uber.hoodie.utilities.deltastreamer;
 import static com.uber.hoodie.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE;
 import static com.uber.hoodie.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME;
 
-import com.beust.jcommander.IStringConverter;
-import com.beust.jcommander.JCommander;
-import com.beust.jcommander.Parameter;
-import com.beust.jcommander.ParameterException;
 import com.codahale.metrics.Timer;
 import com.uber.hoodie.AvroConversionUtils;
 import com.uber.hoodie.DataSourceUtils;
 import com.uber.hoodie.HoodieWriteClient;
 import com.uber.hoodie.KeyGenerator;
-import com.uber.hoodie.OverwriteWithLatestAvroPayload;
-import com.uber.hoodie.SimpleKeyGenerator;
 import com.uber.hoodie.WriteStatus;
 import com.uber.hoodie.common.model.HoodieCommitMetadata;
 import com.uber.hoodie.common.model.HoodieRecord;
@@ -40,29 +34,29 @@ import com.uber.hoodie.common.model.HoodieTableType;
 import com.uber.hoodie.common.table.HoodieTableMetaClient;
 import com.uber.hoodie.common.table.HoodieTimeline;
 import com.uber.hoodie.common.table.timeline.HoodieInstant;
-import com.uber.hoodie.common.util.FSUtils;
 import com.uber.hoodie.common.util.TypedProperties;
+import com.uber.hoodie.common.util.collection.Pair;
 import com.uber.hoodie.config.HoodieCompactionConfig;
 import com.uber.hoodie.config.HoodieIndexConfig;
 import com.uber.hoodie.config.HoodieWriteConfig;
+import com.uber.hoodie.exception.HoodieException;
 import com.uber.hoodie.hive.HiveSyncConfig;
 import com.uber.hoodie.hive.HiveSyncTool;
 import com.uber.hoodie.index.HoodieIndex;
-import com.uber.hoodie.utilities.HiveIncrementalPuller;
 import com.uber.hoodie.utilities.UtilHelpers;
+import com.uber.hoodie.utilities.deltastreamer.HoodieDeltaStreamer.Operation;
 import com.uber.hoodie.utilities.exception.HoodieDeltaStreamerException;
 import com.uber.hoodie.utilities.schema.RowBasedSchemaProvider;
 import com.uber.hoodie.utilities.schema.SchemaProvider;
 import com.uber.hoodie.utilities.sources.InputBatch;
-import com.uber.hoodie.utilities.sources.JsonDFSSource;
 import com.uber.hoodie.utilities.transform.Transformer;
 import java.io.IOException;
 import java.io.Serializable;
-import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Optional;
+import java.util.function.Function;
 import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.conf.Configuration;
@@ -78,19 +72,19 @@ import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
 import scala.collection.JavaConversions;
 
+
 /**
- * An Utility which can incrementally take the output from {@link HiveIncrementalPuller} and apply
- * it to the target dataset. Does not maintain any state, queries at runtime to see how far behind
- * the target dataset is from the source dataset. This can be overriden to force sync from a
- * timestamp.
+ * Syncs one batch of data to the hoodie dataset.
  */
-public class HoodieDeltaStreamer implements Serializable {
-
-  private static volatile Logger log = LogManager.getLogger(HoodieDeltaStreamer.class);
+public class DeltaSync implements Serializable {
 
+  protected static volatile Logger log = LogManager.getLogger(DeltaSync.class);
   public static String CHECKPOINT_KEY = "deltastreamer.checkpoint.key";
 
-  private final Config cfg;
+  /**
+   * Delta Sync Config
+   */
+  private final HoodieDeltaStreamer.Config cfg;
 
   /**
    * Source to pull deltas from
@@ -98,8 +92,7 @@ public class HoodieDeltaStreamer implements Serializable {
   private transient SourceFormatAdapter formatAdapter;
 
   /**
-   * Schema provider that supplies the command for reading the input and writing out the target
-   * table.
+   * Schema provider that supplies the command for reading the input and writing out the target table.
    */
   private transient SchemaProvider schemaProvider;
 
@@ -119,11 +112,6 @@ public class HoodieDeltaStreamer implements Serializable {
   private transient FileSystem fs;
 
   /**
-   * Timeline with completed commits
-   */
-  private transient Optional<HoodieTimeline> commitTimelineOpt;
-
-  /**
    * Spark context
    */
   private transient JavaSparkContext jssc;
@@ -141,49 +129,114 @@ public class HoodieDeltaStreamer implements Serializable {
   /**
    * Bag of properties with source, hoodie client, key generator etc.
    */
-  TypedProperties props;
+  private final TypedProperties props;
 
-  public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc) throws IOException {
-    this(cfg, jssc, FSUtils.getFs(cfg.targetBasePath, jssc.hadoopConfiguration()),
-        getDefaultHiveConf(jssc.hadoopConfiguration()));
-  }
+  /**
+   * Callback when write client is instantiated
+   */
+  private transient Function<HoodieWriteClient, Boolean> onInitializingHoodieWriteClient;
+
+  /**
+   * Timeline with completed commits
+   */
+  private transient Optional<HoodieTimeline> commitTimelineOpt;
+
+  /**
+   * Write Client
+   */
+  private transient HoodieWriteClient writeClient;
+
+  /**
+   * Table Type
+   */
+  private final HoodieTableType tableType;
+
+  public DeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession,
+      SchemaProvider schemaProvider, HoodieTableType tableType, TypedProperties props,
+      JavaSparkContext jssc, FileSystem fs, HiveConf hiveConf,
+      Function<HoodieWriteClient, Boolean> onInitializingHoodieWriteClient)
+      throws IOException {
 
-  public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc, FileSystem fs, HiveConf hiveConf) throws IOException {
     this.cfg = cfg;
     this.jssc = jssc;
-    this.sparkSession = SparkSession.builder().config(jssc.getConf()).getOrCreate();
-    this.fs = FSUtils.getFs(cfg.targetBasePath, jssc.hadoopConfiguration());
+    this.sparkSession = sparkSession;
+    this.fs = fs;
+    this.tableType = tableType;
+    this.onInitializingHoodieWriteClient = onInitializingHoodieWriteClient;
+    this.props = props;
+    log.info("Creating delta streamer with configs : " + props.toString());
+    this.schemaProvider = schemaProvider;
+
+    refreshTimeline();
 
+    this.transformer = UtilHelpers.createTransformer(cfg.transformerClassName);
+    this.keyGenerator = DataSourceUtils.createKeyGenerator(cfg.keyGeneratorClass, props);
+
+    this.formatAdapter = new SourceFormatAdapter(UtilHelpers.createSource(cfg.sourceClassName, props, jssc,
+        sparkSession, schemaProvider));
+
+    this.hiveConf = hiveConf;
+    if (cfg.filterDupes) {
+      cfg.operation = cfg.operation == Operation.UPSERT ? Operation.INSERT : cfg.operation;
+    }
+
+    // If schemaRegistry already resolved, setup write-client
+    setupWriteClient();
+  }
+
+  /**
+   * Refresh Timeline
+   */
+  private void refreshTimeline() throws IOException {
     if (fs.exists(new Path(cfg.targetBasePath))) {
-      HoodieTableMetaClient meta = new HoodieTableMetaClient(fs.getConf(), cfg.targetBasePath);
+      HoodieTableMetaClient meta = new HoodieTableMetaClient(new Configuration(fs.getConf()), cfg.targetBasePath);
       this.commitTimelineOpt = Optional.of(meta.getActiveTimeline().getCommitsTimeline()
           .filterCompletedInstants());
     } else {
       this.commitTimelineOpt = Optional.empty();
+      HoodieTableMetaClient.initTableType(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath,
+          cfg.storageType, cfg.targetTableName, "archived");
     }
+  }
 
-    this.props = UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getConfig();
-    log.info("Creating delta streamer with configs : " + props.toString());
-    this.schemaProvider = UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, props, jssc);
-    this.transformer = UtilHelpers.createTransformer(cfg.transformerClassName);
-    this.keyGenerator = DataSourceUtils.createKeyGenerator(cfg.keyGeneratorClass, props);
+  /**
+   * Run one round of delta sync and return new compaction instant if one got scheduled
+   */
+  public Optional<String> syncOnce() throws Exception {
+    Optional<String> scheduledCompaction = Optional.empty();
+    HoodieDeltaStreamerMetrics metrics = new HoodieDeltaStreamerMetrics(getHoodieClientConfig(schemaProvider));
+    Timer.Context overallTimerContext = metrics.getOverallTimerContext();
 
-    this.formatAdapter =
-        new SourceFormatAdapter(UtilHelpers.createSource(cfg.sourceClassName, 
props, jssc, sparkSession,
-            schemaProvider));
+    // Refresh Timeline
+    refreshTimeline();
 
-    this.hiveConf = hiveConf;
-  }
+    Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> srcRecordsWithCkpt =
+        readFromSource(commitTimelineOpt);
+
+    if (null != srcRecordsWithCkpt) {
+      // this is the first input batch. If schemaProvider not set, use it and register Avro Schema and start
+      // compactor
+      if (null == schemaProvider) {
+        // Set the schemaProvider if not user-provided
+        this.schemaProvider = srcRecordsWithCkpt.getKey();
+        // Setup HoodieWriteClient and compaction now that we decided on schema
+        setupWriteClient();
+      }
+
+      scheduledCompaction = writeToSink(srcRecordsWithCkpt.getRight().getRight(),
+          srcRecordsWithCkpt.getRight().getLeft(), metrics, overallTimerContext);
+    }
 
-  private static HiveConf getDefaultHiveConf(Configuration cfg) {
-    HiveConf hiveConf = new HiveConf();
-    hiveConf.addResource(cfg);
-    return hiveConf;
+    // Clear persistent RDDs
+    jssc.getPersistentRDDs().values().forEach(JavaRDD::unpersist);
+    return scheduledCompaction;
   }
 
-  public void sync() throws Exception {
-    HoodieDeltaStreamerMetrics metrics = new HoodieDeltaStreamerMetrics(getHoodieClientConfig(null));
-    Timer.Context overallTimerContext = metrics.getOverallTimerContext();
+  /**
+   * Read from Upstream Source and apply transformation if needed
+   */
+  private Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> readFromSource(
+      Optional<HoodieTimeline> commitTimelineOpt) throws Exception {
     // Retrieve the previous round checkpoints, if any
     Optional<String> resumeCheckpointStr = Optional.empty();
     if (commitTimelineOpt.isPresent()) {
@@ -200,7 +253,7 @@ public class HoodieDeltaStreamer implements Serializable {
         }
       }
     } else {
-      HoodieTableMetaClient.initTableType(jssc.hadoopConfiguration(), cfg.targetBasePath,
-          cfg.storageType, cfg.targetTableName, "archived");
+      HoodieTableMetaClient.initTableType(new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath,
     }
     log.info("Checkpoint to resume from : " + resumeCheckpointStr);
@@ -218,11 +271,11 @@ public class HoodieDeltaStreamer implements Serializable {
           dataAndCheckpoint.getBatch().map(data -> transformer.apply(jssc, sparkSession, data, props));
       checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch();
       avroRDDOptional = transformed.map(t ->
-         AvroConversionUtils.createRdd(t, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD()
+          AvroConversionUtils.createRdd(t, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE).toJavaRDD()
       );
       // Use Transformed Row's schema if not overridden
       schemaProvider =
-          this.schemaProvider == null ? transformed.map(r -> (SchemaProvider)new RowBasedSchemaProvider(r.schema()))
+          this.schemaProvider == null ? transformed.map(r -> (SchemaProvider) new RowBasedSchemaProvider(r.schema()))
              .orElse(dataAndCheckpoint.getSchemaProvider()) : this.schemaProvider;
     } else {
       // Pull the data from the source & prepare the write
@@ -235,43 +288,54 @@ public class HoodieDeltaStreamer implements Serializable {
 
     if ((!avroRDDOptional.isPresent()) || (avroRDDOptional.get().isEmpty())) {
       log.info("No new data, nothing to commit.. ");
-      return;
+      return null;
     }
 
-    registerAvroSchemas(schemaProvider);
-
     JavaRDD<GenericRecord> avroRDD = avroRDDOptional.get();
     JavaRDD<HoodieRecord> records = avroRDD.map(gr -> {
       HoodieRecordPayload payload = DataSourceUtils.createPayload(cfg.payloadClassName, gr,
-          (Comparable) DataSourceUtils.getNestedFieldVal(gr, cfg.sourceOrderingField));
+          (Comparable) gr.get(cfg.sourceOrderingField));
       return new HoodieRecord<>(keyGenerator.getKey(gr), payload);
     });
+    return Pair.of(schemaProvider, Pair.of(checkpointStr, records));
+  }
+
+  /**
+   * Perform the Hoodie write. Run the cleaner, schedule compaction and sync to Hive if needed.
+   *
+   * @param records Input Records
+   * @param checkpointStr Checkpoint String
+   * @param metrics Metrics
+   * @return Optional Compaction instant if one is scheduled
+   */
+  private Optional<String> writeToSink(JavaRDD<HoodieRecord> records, String checkpointStr,
+      HoodieDeltaStreamerMetrics metrics, Timer.Context overallTimerContext) throws Exception {
+
+    Optional<String> scheduledCompactionInstant = Optional.empty();
 
     // filter dupes if needed
-    HoodieWriteConfig hoodieCfg = getHoodieClientConfig(schemaProvider);
     if (cfg.filterDupes) {
       // turn upserts to insert
       cfg.operation = cfg.operation == Operation.UPSERT ? Operation.INSERT : cfg.operation;
-      records = DataSourceUtils.dropDuplicates(jssc, records, hoodieCfg);
+      records = DataSourceUtils.dropDuplicates(jssc, records, writeClient.getConfig(),
+          writeClient.getTimelineServer());
 
       if (records.isEmpty()) {
         log.info("No new data, nothing to commit.. ");
-        return;
+        return Optional.empty();
       }
     }
 
-    // Perform the write
-    HoodieWriteClient client = new HoodieWriteClient<>(jssc, hoodieCfg, true);
-    String commitTime = client.startCommit();
+    String commitTime = startCommit();
     log.info("Starting commit  : " + commitTime);
 
     JavaRDD<WriteStatus> writeStatusRDD;
     if (cfg.operation == Operation.INSERT) {
-      writeStatusRDD = client.insert(records, commitTime);
+      writeStatusRDD = writeClient.insert(records, commitTime);
     } else if (cfg.operation == Operation.UPSERT) {
-      writeStatusRDD = client.upsert(records, commitTime);
+      writeStatusRDD = writeClient.upsert(records, commitTime);
     } else if (cfg.operation == Operation.BULK_INSERT) {
-      writeStatusRDD = client.bulkInsert(records, commitTime);
+      writeStatusRDD = writeClient.bulkInsert(records, commitTime);
     } else {
       throw new HoodieDeltaStreamerException("Unknown operation :" + cfg.operation);
     }
@@ -289,19 +353,27 @@ public class HoodieDeltaStreamer implements Serializable {
             + totalErrorRecords + "/" + totalRecords);
       }
 
-      boolean success = client.commit(commitTime, writeStatusRDD,
+      boolean success = writeClient.commit(commitTime, writeStatusRDD,
           Optional.of(checkpointCommitMetadata));
       if (success) {
         log.info("Commit " + commitTime + " successful!");
+
+        // Schedule compaction if needed
+        if (tableType.equals(HoodieTableType.MERGE_ON_READ) && cfg.continuousMode) {
+          scheduledCompactionInstant = writeClient
+              .scheduleCompaction(Optional.of(checkpointCommitMetadata));
+        }
+
         // Sync to hive if enabled
         Timer.Context hiveSyncContext = metrics.getHiveSyncTimerContext();
         syncHive();
         hiveSyncTimeMs = hiveSyncContext != null ? hiveSyncContext.stop() : 0;
       } else {
         log.info("Commit " + commitTime + " failed!");
+        throw new HoodieException("Commit " + commitTime + " failed!");
       }
     } else {
-      log.error("There are errors when ingesting records. Errors/Total="
+      log.error("Delta Sync found errors when writing. Errors/Total="
           + totalErrorRecords + "/" + totalRecords);
       log.error("Printing out the top 100 errors");
       writeStatusRDD.filter(ws -> ws.hasErrors()).take(100).forEach(ws -> {
@@ -311,15 +383,43 @@ public class HoodieDeltaStreamer implements Serializable {
               log.trace("Error for key:" + r.getKey() + " is " + 
r.getValue()));
         }
       });
+      // Rolling back instant
+      writeClient.rollback(commitTime);
+      throw new HoodieException("Commit " + commitTime + " failed and 
rolled-back !");
     }
-    client.close();
     long overallTimeMs = overallTimerContext != null ? overallTimerContext.stop() : 0;
 
     // Send DeltaStreamer Metrics
     metrics.updateDeltaStreamerMetrics(overallTimeMs, hiveSyncTimeMs);
+
+    return scheduledCompactionInstant;
   }
 
-  public void syncHive() {
+  private String startCommit() {
+    final int maxRetries = 2;
+    int retryNum = 1;
+    RuntimeException lastException = null;
+    while (retryNum <= maxRetries) {
+      try {
+        return writeClient.startCommit();
+      } catch (IllegalArgumentException ie) {
+        lastException = ie;
+        log.error("Got error trying to start a new commit. Retrying after 
sleeping for a sec", ie);
+        retryNum++;
+        try {
+          Thread.sleep(1000);
+        } catch (InterruptedException e) {
+          //No-Op
+        }
+      }
+    }
+    throw lastException;
+  }
+
+  /**
+   * Sync to Hive
+   */
+  private void syncHive() {
     if (cfg.enableHiveSync) {
       HiveSyncConfig hiveSyncConfig = DataSourceUtils.buildHiveSyncConfig(props, cfg.targetBasePath);
       log.info("Syncing target hoodie table with hive table(" + hiveSyncConfig.tableName
@@ -330,30 +430,38 @@ public class HoodieDeltaStreamer implements Serializable {
   }
 
   /**
-   * Register Avro Schemas
-   * @param schemaProvider Schema Provider
+   * Note that depending on configs and source-type, schemaProvider could either be eagerly or lazily created.
+   * SchemaProvider creation is a precursor to HoodieWriteClient and AsyncCompactor creation. This method takes care of
+   * this constraint.
    */
-  private void registerAvroSchemas(SchemaProvider schemaProvider) {
-    // register the schemas, so that shuffle does not serialize the full schemas
-    if (null != schemaProvider) {
-      List<Schema> schemas = Arrays.asList(schemaProvider.getSourceSchema(), schemaProvider.getTargetSchema());
-      log.info("Registering Schema :" + schemas);
-      jssc.sc().getConf().registerAvroSchemas(JavaConversions.asScalaBuffer(schemas).toList());
+  public void setupWriteClient() {
+    log.info("Setting up Hoodie Write Client");
+    if ((null != schemaProvider) && (null == writeClient)) {
+      registerAvroSchemas(schemaProvider);
+      HoodieWriteConfig hoodieCfg = getHoodieClientConfig(schemaProvider);
+      writeClient = new HoodieWriteClient<>(jssc, hoodieCfg, true);
+      onInitializingHoodieWriteClient.apply(writeClient);
     }
   }
 
+  /**
+   * Helper to construct Write Client config
+   *
+   * @param schemaProvider Schema Provider
+   */
   private HoodieWriteConfig getHoodieClientConfig(SchemaProvider schemaProvider) {
     HoodieWriteConfig.Builder builder =
-        HoodieWriteConfig.newBuilder().withPath(cfg.targetBasePath)
-            .withAutoCommit(false).combineInput(cfg.filterDupes, true)
+        HoodieWriteConfig.newBuilder()
+            .withProps(props)
+            .withPath(cfg.targetBasePath)
+            .combineInput(cfg.filterDupes, true)
             .withCompactionConfig(HoodieCompactionConfig.newBuilder()
                 .withPayloadClass(cfg.payloadClassName)
-                // turn on inline compaction by default, for MOR tables
-                .withInlineCompaction(HoodieTableType.valueOf(cfg.storageType) == HoodieTableType.MERGE_ON_READ)
-                .build())
+                // Inline compaction is disabled for continuous mode, otherwise enabled for MOR
+                .withInlineCompaction(!cfg.continuousMode && tableType.equals(HoodieTableType.MERGE_ON_READ)).build())
             .forTable(cfg.targetTableName)
             .withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
-            .withProps(props);
+            .withAutoCommit(false);
     if (null != schemaProvider) {
       builder = builder.withSchema(schemaProvider.getTargetSchema().toString());
     }
@@ -361,151 +469,27 @@ public class HoodieDeltaStreamer implements Serializable {
     return builder.build();
   }
 
-  public enum Operation {
-    UPSERT, INSERT, BULK_INSERT
-  }
-
-  private static class OperationConvertor implements IStringConverter<Operation> {
-    @Override
-    public Operation convert(String value) throws ParameterException {
-      return Operation.valueOf(value);
+  /**
+   * Register Avro Schemas
+   *
+   * @param schemaProvider Schema Provider
+   */
+  private void registerAvroSchemas(SchemaProvider schemaProvider) {
+    // register the schemas, so that shuffle does not serialize the full schemas
+    if (null != schemaProvider) {
+      List<Schema> schemas = Arrays.asList(schemaProvider.getSourceSchema(), schemaProvider.getTargetSchema());
+      log.info("Registering Schema :" + schemas);
+      jssc.sc().getConf().registerAvroSchemas(JavaConversions.asScalaBuffer(schemas).toList());
     }
   }
 
-  public static class Config implements Serializable {
-
-    @Parameter(names = {"--target-base-path"}, description = "base path for 
the target hoodie dataset. "
-        + "(Will be created if did not exist first time around. If exists, 
expected to be a hoodie dataset)",
-        required = true)
-    public String targetBasePath;
-
-    // TODO: How to obtain hive configs to register?
-    @Parameter(names = {"--target-table"}, description = "name of the target 
table in Hive", required = true)
-    public String targetTableName;
-
-    @Parameter(names = {"--storage-type"}, description = "Type of Storage. "
-        + "COPY_ON_WRITE (or) MERGE_ON_READ", required = true)
-    public String storageType;
-
-    @Parameter(names = {"--props"}, description = "path to properties file on 
localfs or dfs, with configurations for "
-        + "hoodie client, schema provider, key generator and data source. For 
hoodie client props, sane defaults are "
-        + "used, but recommend use to provide basic things like metrics 
endpoints, hive configs etc. For sources, refer"
-        + "to individual classes, for supported properties.")
-    public String propsFilePath =
-        "file://" + System.getProperty("user.dir") + 
"/src/test/resources/delta-streamer-config/dfs-source.properties";
-
-    @Parameter(names = {"--hoodie-conf"}, description = "Any configuration 
that can be set in the properties file "
-        + "(using the CLI parameter \"--propsFilePath\") can also be passed 
command line using this parameter")
-    public List<String> configs = new ArrayList<>();
-
-    @Parameter(names = {"--source-class"}, description = "Subclass of 
com.uber.hoodie.utilities.sources to read data. "
-        + "Built-in options: com.uber.hoodie.utilities.sources.{JsonDFSSource 
(default), AvroDFSSource, "
-        + "JsonKafkaSource, AvroKafkaSource, HiveIncrPullSource}")
-    public String sourceClassName = JsonDFSSource.class.getName();
-
-    @Parameter(names = {"--source-ordering-field"}, description = "Field 
within source record to decide how"
-        + " to break ties between records with same key in input data. 
Default: 'ts' holding unix timestamp of record")
-    public String sourceOrderingField = "ts";
-
-    @Parameter(names = {"--key-generator-class"}, description = "Subclass of 
com.uber.hoodie.KeyGenerator "
-        + "to generate a HoodieKey from the given avro record. Built in: 
SimpleKeyGenerator (uses "
-        + "provided field names as recordkey & partitionpath. Nested fields 
specified via dot notation, e.g: a.b.c)")
-    public String keyGeneratorClass = SimpleKeyGenerator.class.getName();
-
-    @Parameter(names = {"--payload-class"}, description = "subclass of 
HoodieRecordPayload, that works off "
-        + "a GenericRecord. Implement your own, if you want to do something 
other than overwriting existing value")
-    public String payloadClassName = 
OverwriteWithLatestAvroPayload.class.getName();
-
-    @Parameter(names = {"--schemaprovider-class"}, description = "subclass of 
com.uber.hoodie.utilities.schema"
-        + ".SchemaProvider to attach schemas to input & target table data, 
built in options: "
-        + "com.uber.hoodie.utilities.schema.FilebasedSchemaProvider."
-        + "Source (See com.uber.hoodie.utilities.sources.Source) 
implementation can implement their own SchemaProvider."
-        + " For Sources that return Dataset<Row>, the schema is obtained 
implicitly. "
-        + "However, this CLI option allows overriding the schemaprovider 
returned by Source.")
-    public String schemaProviderClassName = null;
-
-    @Parameter(names = {"--transformer-class"},
-        description = "subclass of 
com.uber.hoodie.utilities.transform.Transformer"
-        + ". Allows transforming raw source dataset to a target dataset 
(conforming to target schema) before writing."
-            + " Default : Not set. E:g - 
com.uber.hoodie.utilities.transform.SqlQueryBasedTransformer (which allows"
-            + "a SQL query templated to be passed as a transformation 
function)")
-    public String transformerClassName = null;
-
-    @Parameter(names = {"--source-limit"}, description = "Maximum amount of 
data to read from source. "
-        + "Default: No limit For e.g: DFS-Source => max bytes to read, 
Kafka-Source => max events to read")
-    public long sourceLimit = Long.MAX_VALUE;
-
-    @Parameter(names = {"--op"}, description = "Takes one of these values : 
UPSERT (default), INSERT (use when input "
-        + "is purely new data/inserts to gain speed)",
-        converter = OperationConvertor.class)
-    public Operation operation = Operation.UPSERT;
-
-    @Parameter(names = {"--filter-dupes"}, description = "Should duplicate 
records from source be dropped/filtered out"
-        + "before insert/bulk-insert")
-    public Boolean filterDupes = false;
-
-    @Parameter(names = {"--enable-hive-sync"}, description = "Enable syncing 
to hive")
-    public Boolean enableHiveSync = false;
-
-    @Parameter(names = {"--spark-master"}, description = "spark master to 
use.")
-    public String sparkMaster = "local[2]";
-
-    @Parameter(names = {"--commit-on-errors"}, description = "Commit even when 
some records failed to be written")
-    public Boolean commitOnErrors = false;
-
-    @Parameter(names = {"--help", "-h"}, help = true)
-    public Boolean help = false;
-  }
-
-  public static void main(String[] args) throws Exception {
-    final Config cfg = new Config();
-    JCommander cmd = new JCommander(cfg, args);
-    if (cfg.help || args.length == 0) {
-      cmd.usage();
-      System.exit(1);
+  /**
+   * Close all resources
+   */
+  public void close() {
+    if (null != writeClient) {
+      writeClient.close();
+      writeClient = null;
     }
-
-    JavaSparkContext jssc = UtilHelpers.buildSparkContext("delta-streamer-" + cfg.targetTableName, cfg.sparkMaster);
-    new HoodieDeltaStreamer(cfg, jssc).sync();
-  }
-
-  public SourceFormatAdapter getFormatAdapter() {
-    return formatAdapter;
-  }
-
-  public SchemaProvider getSchemaProvider() {
-    return schemaProvider;
-  }
-
-  public Transformer getTransformer() {
-    return transformer;
-  }
-
-  public KeyGenerator getKeyGenerator() {
-    return keyGenerator;
-  }
-
-  public FileSystem getFs() {
-    return fs;
-  }
-
-  public Optional<HoodieTimeline> getCommitTimelineOpt() {
-    return commitTimelineOpt;
-  }
-
-  public JavaSparkContext getJssc() {
-    return jssc;
-  }
-
-  public SparkSession getSparkSession() {
-    return sparkSession;
-  }
-
-  public HiveConf getHiveConf() {
-    return hiveConf;
-  }
-
-  public TypedProperties getProps() {
-    return props;
   }
 }
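
A minimal sketch (not part of this patch) of driving DeltaSync above directly for a single round, outside the DeltaSyncService wrapper added further below; construction of cfg, props, schemaProvider, fs and hiveConf is assumed, and the no-op write-client callback stands in for the compactor bootstrap that DeltaSyncService performs.

DeltaSync deltaSync = new DeltaSync(cfg, sparkSession, schemaProvider, HoodieTableType.MERGE_ON_READ,
    props, jssc, fs, hiveConf, writeClient -> true /* no-op callback in this sketch */);
Optional<String> scheduledCompaction = deltaSync.syncOnce();
scheduledCompaction.ifPresent(instant -> log.info("Compaction scheduled at instant " + instant));
deltaSync.close();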
diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java
index 7e76ff5..3342bff 100644
--- a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -18,71 +18,68 @@
 
 package com.uber.hoodie.utilities.deltastreamer;
 
-import static com.uber.hoodie.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE;
-import static com.uber.hoodie.utilities.schema.RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME;
+import static com.uber.hoodie.common.table.HoodieTimeline.COMPACTION_ACTION;
 
 import com.beust.jcommander.IStringConverter;
 import com.beust.jcommander.JCommander;
 import com.beust.jcommander.Parameter;
 import com.beust.jcommander.ParameterException;
-import com.codahale.metrics.Timer;
-import com.uber.hoodie.AvroConversionUtils;
-import com.uber.hoodie.DataSourceUtils;
 import com.uber.hoodie.HoodieWriteClient;
-import com.uber.hoodie.KeyGenerator;
 import com.uber.hoodie.OverwriteWithLatestAvroPayload;
 import com.uber.hoodie.SimpleKeyGenerator;
-import com.uber.hoodie.WriteStatus;
-import com.uber.hoodie.common.model.HoodieCommitMetadata;
-import com.uber.hoodie.common.model.HoodieRecord;
-import com.uber.hoodie.common.model.HoodieRecordPayload;
 import com.uber.hoodie.common.model.HoodieTableType;
 import com.uber.hoodie.common.table.HoodieTableMetaClient;
-import com.uber.hoodie.common.table.HoodieTimeline;
 import com.uber.hoodie.common.table.timeline.HoodieInstant;
+import com.uber.hoodie.common.table.timeline.HoodieInstant.State;
+import com.uber.hoodie.common.util.CompactionUtils;
 import com.uber.hoodie.common.util.FSUtils;
 import com.uber.hoodie.common.util.TypedProperties;
-import com.uber.hoodie.config.HoodieCompactionConfig;
-import com.uber.hoodie.config.HoodieIndexConfig;
-import com.uber.hoodie.config.HoodieWriteConfig;
-import com.uber.hoodie.hive.HiveSyncConfig;
-import com.uber.hoodie.hive.HiveSyncTool;
-import com.uber.hoodie.index.HoodieIndex;
+import com.uber.hoodie.common.util.collection.Pair;
+import com.uber.hoodie.exception.HoodieException;
+import com.uber.hoodie.exception.HoodieIOException;
 import com.uber.hoodie.utilities.HiveIncrementalPuller;
 import com.uber.hoodie.utilities.UtilHelpers;
-import com.uber.hoodie.utilities.exception.HoodieDeltaStreamerException;
-import com.uber.hoodie.utilities.schema.RowBasedSchemaProvider;
 import com.uber.hoodie.utilities.schema.SchemaProvider;
-import com.uber.hoodie.utilities.sources.InputBatch;
 import com.uber.hoodie.utilities.sources.JsonDFSSource;
-import com.uber.hoodie.utilities.transform.Transformer;
 import java.io.IOException;
 import java.io.Serializable;
 import java.util.ArrayList;
-import java.util.Arrays;
 import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 import java.util.Optional;
-import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.ReentrantLock;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.hive.conf.HiveConf;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
-import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
-import org.apache.spark.sql.Dataset;
-import org.apache.spark.sql.Row;
 import org.apache.spark.sql.SparkSession;
-import scala.collection.JavaConversions;
+
 
 /**
- * An Utility which can incrementally take the output from {@link HiveIncrementalPuller} and apply
- * it to the target dataset. Does not maintain any state, queries at runtime to see how far behind
- * the target dataset is from the source dataset. This can be overriden to force sync from a
- * timestamp.
+ * A utility which can incrementally take the output from {@link HiveIncrementalPuller} and apply it to the target
+ * dataset. Does not maintain any state, queries at runtime to see how far behind the target dataset is from the source
+ * dataset. This can be overridden to force sync from a timestamp.
+ *
+ * In continuous mode, DeltaStreamer runs in loop-mode going through the below operations
+ *    (a) pull-from-source
+ *    (b) write-to-sink
+ *    (c) Schedule Compactions if needed
+ *    (d) Conditionally Sync to Hive
+ *  each cycle. For a MOR table with continuous mode enabled, a separate compactor thread is allocated to execute
+ *  compactions.
  */
 public class HoodieDeltaStreamer implements Serializable {
 
@@ -90,58 +87,9 @@ public class HoodieDeltaStreamer implements Serializable {
 
   public static String CHECKPOINT_KEY = "deltastreamer.checkpoint.key";
 
-  private final Config cfg;
-
-  /**
-   * Source to pull deltas from
-   */
-  private transient SourceFormatAdapter formatAdapter;
-
-  /**
-   * Schema provider that supplies the command for reading the input and writing out the target
-   * table.
-   */
-  private transient SchemaProvider schemaProvider;
-
-  /**
-   * Allows transforming source to target dataset before writing
-   */
-  private transient Transformer transformer;
-
-  /**
-   * Extract the key for the target dataset
-   */
-  private KeyGenerator keyGenerator;
-
-  /**
-   * Filesystem used
-   */
-  private transient FileSystem fs;
-
-  /**
-   * Timeline with completed commits
-   */
-  private transient Optional<HoodieTimeline> commitTimelineOpt;
+  private final transient Config cfg;
 
-  /**
-   * Spark context
-   */
-  private transient JavaSparkContext jssc;
-
-  /**
-   * Spark Session
-   */
-  private transient SparkSession sparkSession;
-
-  /**
-   * Hive Config
-   */
-  private transient HiveConf hiveConf;
-
-  /**
-   * Bag of properties with source, hoodie client, key generator etc.
-   */
-  TypedProperties props;
+  private transient DeltaSyncService deltaSyncService;
 
   public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc) throws IOException {
     this(cfg, jssc, FSUtils.getFs(cfg.targetBasePath, jssc.hadoopConfiguration()),
@@ -150,29 +98,11 @@ public class HoodieDeltaStreamer implements Serializable {
 
   public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc, FileSystem fs, HiveConf hiveConf) throws IOException {
     this.cfg = cfg;
-    this.jssc = jssc;
-    this.sparkSession = SparkSession.builder().config(jssc.getConf()).getOrCreate();
-    this.fs = FSUtils.getFs(cfg.targetBasePath, jssc.hadoopConfiguration());
-
-    if (fs.exists(new Path(cfg.targetBasePath))) {
-      HoodieTableMetaClient meta = new HoodieTableMetaClient(fs.getConf(), cfg.targetBasePath);
-      this.commitTimelineOpt = Optional.of(meta.getActiveTimeline().getCommitsTimeline()
-          .filterCompletedInstants());
-    } else {
-      this.commitTimelineOpt = Optional.empty();
-    }
-
-    this.props = UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getConfig();
-    log.info("Creating delta streamer with configs : " + props.toString());
-    this.schemaProvider = UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, props, jssc);
-    this.transformer = UtilHelpers.createTransformer(cfg.transformerClassName);
-    this.keyGenerator = DataSourceUtils.createKeyGenerator(cfg.keyGeneratorClass, props);
-
-    this.formatAdapter =
-        new SourceFormatAdapter(UtilHelpers.createSource(cfg.sourceClassName, props, jssc, sparkSession,
-            schemaProvider));
+    this.deltaSyncService = new DeltaSyncService(cfg, jssc, fs, hiveConf);
+  }
 
-    this.hiveConf = hiveConf;
+  public void shutdownGracefully() {
+    deltaSyncService.shutdown(false);
   }
 
   private static HiveConf getDefaultHiveConf(Configuration cfg) {
@@ -181,184 +111,27 @@ public class HoodieDeltaStreamer implements Serializable {
     return hiveConf;
   }
 
-  public void sync() throws Exception {
-    HoodieDeltaStreamerMetrics metrics = new 
HoodieDeltaStreamerMetrics(getHoodieClientConfig(null));
-    Timer.Context overallTimerContext = metrics.getOverallTimerContext();
-    // Retrieve the previous round checkpoints, if any
-    Optional<String> resumeCheckpointStr = Optional.empty();
-    if (commitTimelineOpt.isPresent()) {
-      Optional<HoodieInstant> lastCommit = 
commitTimelineOpt.get().lastInstant();
-      if (lastCommit.isPresent()) {
-        HoodieCommitMetadata commitMetadata = HoodieCommitMetadata.fromBytes(
-            commitTimelineOpt.get().getInstantDetails(lastCommit.get()).get(), 
HoodieCommitMetadata.class);
-        if (commitMetadata.getMetadata(CHECKPOINT_KEY) != null) {
-          resumeCheckpointStr = 
Optional.of(commitMetadata.getMetadata(CHECKPOINT_KEY));
-        } else {
-          throw new HoodieDeltaStreamerException(
-              "Unable to find previous checkpoint. Please double check if this 
table "
-                  + "was indeed built via delta streamer ");
-        }
-      }
-    } else {
-      HoodieTableMetaClient.initTableType(jssc.hadoopConfiguration(), 
cfg.targetBasePath,
-          cfg.storageType, cfg.targetTableName, "archived");
-    }
-    log.info("Checkpoint to resume from : " + resumeCheckpointStr);
-
-    final Optional<JavaRDD<GenericRecord>> avroRDDOptional;
-    final String checkpointStr;
-    final SchemaProvider schemaProvider;
-    if (transformer != null) {
-      // Transformation is needed. Fetch New rows in Row Format, apply 
transformation and then convert them
-      // to generic records for writing
-      InputBatch<Dataset<Row>> dataAndCheckpoint = 
formatAdapter.fetchNewDataInRowFormat(
-          resumeCheckpointStr, cfg.sourceLimit);
-
-      Optional<Dataset<Row>> transformed =
-          dataAndCheckpoint.getBatch().map(data -> transformer.apply(jssc, 
sparkSession, data, props));
-      checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch();
-      avroRDDOptional = transformed.map(t ->
-         AvroConversionUtils.createRdd(t, HOODIE_RECORD_STRUCT_NAME, 
HOODIE_RECORD_NAMESPACE).toJavaRDD()
-      );
-      // Use Transformed Row's schema if not overridden
-      schemaProvider =
-          this.schemaProvider == null ? transformed.map(r -> 
(SchemaProvider)new RowBasedSchemaProvider(r.schema()))
-              .orElse(dataAndCheckpoint.getSchemaProvider()) : 
this.schemaProvider;
-    } else {
-      // Pull the data from the source & prepare the write
-      InputBatch<JavaRDD<GenericRecord>> dataAndCheckpoint =
-          formatAdapter.fetchNewDataInAvroFormat(resumeCheckpointStr, 
cfg.sourceLimit);
-      avroRDDOptional = dataAndCheckpoint.getBatch();
-      checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch();
-      schemaProvider = dataAndCheckpoint.getSchemaProvider();
-    }
-
-    if ((!avroRDDOptional.isPresent()) || (avroRDDOptional.get().isEmpty())) {
-      log.info("No new data, nothing to commit.. ");
-      return;
-    }
-
-    registerAvroSchemas(schemaProvider);
-
-    JavaRDD<GenericRecord> avroRDD = avroRDDOptional.get();
-    JavaRDD<HoodieRecord> records = avroRDD.map(gr -> {
-      HoodieRecordPayload payload = 
DataSourceUtils.createPayload(cfg.payloadClassName, gr,
-          (Comparable) DataSourceUtils.getNestedFieldVal(gr, 
cfg.sourceOrderingField));
-      return new HoodieRecord<>(keyGenerator.getKey(gr), payload);
-    });
-
-    // filter dupes if needed
-    HoodieWriteConfig hoodieCfg = getHoodieClientConfig(schemaProvider);
-    if (cfg.filterDupes) {
-      // turn upserts to insert
-      cfg.operation = cfg.operation == Operation.UPSERT ? Operation.INSERT : 
cfg.operation;
-      records = DataSourceUtils.dropDuplicates(jssc, records, hoodieCfg);
-
-      if (records.isEmpty()) {
-        log.info("No new data, nothing to commit.. ");
-        return;
-      }
-    }
-
-    // Perform the write
-    HoodieWriteClient client = new HoodieWriteClient<>(jssc, hoodieCfg, true);
-    String commitTime = client.startCommit();
-    log.info("Starting commit  : " + commitTime);
-
-    JavaRDD<WriteStatus> writeStatusRDD;
-    if (cfg.operation == Operation.INSERT) {
-      writeStatusRDD = client.insert(records, commitTime);
-    } else if (cfg.operation == Operation.UPSERT) {
-      writeStatusRDD = client.upsert(records, commitTime);
-    } else if (cfg.operation == Operation.BULK_INSERT) {
-      writeStatusRDD = client.bulkInsert(records, commitTime);
-    } else {
-      throw new HoodieDeltaStreamerException("Unknown operation :" + 
cfg.operation);
-    }
-
-    long totalErrorRecords = writeStatusRDD.mapToDouble(ws -> 
ws.getTotalErrorRecords()).sum().longValue();
-    long totalRecords = writeStatusRDD.mapToDouble(ws -> 
ws.getTotalRecords()).sum().longValue();
-    boolean hasErrors = totalErrorRecords > 0;
-    long hiveSyncTimeMs = 0;
-    if (!hasErrors || cfg.commitOnErrors) {
-      HashMap<String, String> checkpointCommitMetadata = new HashMap<>();
-      checkpointCommitMetadata.put(CHECKPOINT_KEY, checkpointStr);
-
-      if (hasErrors) {
-        log.warn("Some records failed to be merged but forcing commit since 
commitOnErrors set. Errors/Total="
-            + totalErrorRecords + "/" + totalRecords);
-      }
-
-      boolean success = client.commit(commitTime, writeStatusRDD,
-          Optional.of(checkpointCommitMetadata));
-      if (success) {
-        log.info("Commit " + commitTime + " successful!");
-        // Sync to hive if enabled
-        Timer.Context hiveSyncContext = metrics.getHiveSyncTimerContext();
-        syncHive();
-        hiveSyncTimeMs = hiveSyncContext != null ? hiveSyncContext.stop() : 0;
-      } else {
-        log.info("Commit " + commitTime + " failed!");
-      }
-    } else {
-      log.error("There are errors when ingesting records. Errors/Total="
-          + totalErrorRecords + "/" + totalRecords);
-      log.error("Printing out the top 100 errors");
-      writeStatusRDD.filter(ws -> ws.hasErrors()).take(100).forEach(ws -> {
-        log.error("Global error :", ws.getGlobalError());
-        if (ws.getErrors().size() > 0) {
-          ws.getErrors().entrySet().forEach(r ->
-              log.trace("Error for key:" + r.getKey() + " is " + 
r.getValue()));
-        }
-      });
-    }
-    client.close();
-    long overallTimeMs = overallTimerContext != null ? 
overallTimerContext.stop() : 0;
-
-    // Send DeltaStreamer Metrics
-    metrics.updateDeltaStreamerMetrics(overallTimeMs, hiveSyncTimeMs);
-  }
-
-  public void syncHive() {
-    if (cfg.enableHiveSync) {
-      HiveSyncConfig hiveSyncConfig = 
DataSourceUtils.buildHiveSyncConfig(props, cfg.targetBasePath);
-      log.info("Syncing target hoodie table with hive table(" + 
hiveSyncConfig.tableName
-          + "). Hive metastore URL :" + hiveSyncConfig.jdbcUrl + ", basePath 
:" + cfg.targetBasePath);
-
-      new HiveSyncTool(hiveSyncConfig, hiveConf, fs).syncHoodieTable();
-    }
-  }
-
   /**
-   * Register Avro Schemas
-   * @param schemaProvider Schema Provider
+   * Main method to start syncing
+   * @throws Exception
    */
-  private void registerAvroSchemas(SchemaProvider schemaProvider) {
-    // register the schemas, so that shuffle does not serialize the full schemas
-    if (null != schemaProvider) {
-      List<Schema> schemas = Arrays.asList(schemaProvider.getSourceSchema(), schemaProvider.getTargetSchema());
-      log.info("Registering Schema :" + schemas);
-      jssc.sc().getConf().registerAvroSchemas(JavaConversions.asScalaBuffer(schemas).toList());
+  public void sync() throws Exception {
+    if (cfg.continuousMode) {
+      deltaSyncService.start(this::onDeltaSyncShutdown);
+      deltaSyncService.waitForShutdown();
+      log.info("Delta Sync shutting down");
+    } else {
+      log.info("Delta Streamer running only single round");
+      deltaSyncService.getDeltaSync().syncOnce();
+      deltaSyncService.close();
+      log.info("Shut down deltastreamer");
     }
   }
 
-  private HoodieWriteConfig getHoodieClientConfig(SchemaProvider 
schemaProvider) {
-    HoodieWriteConfig.Builder builder =
-        HoodieWriteConfig.newBuilder().withPath(cfg.targetBasePath)
-            .withAutoCommit(false).combineInput(cfg.filterDupes, true)
-            .withCompactionConfig(HoodieCompactionConfig.newBuilder()
-                .withPayloadClass(cfg.payloadClassName)
-                // turn on inline compaction by default, for MOR tables
-                .withInlineCompaction(HoodieTableType.valueOf(cfg.storageType) 
== HoodieTableType.MERGE_ON_READ)
-                .build())
-            .forTable(cfg.targetTableName)
-            
.withIndexConfig(HoodieIndexConfig.newBuilder().withIndexType(HoodieIndex.IndexType.BLOOM).build())
-            .withProps(props);
-    if (null != schemaProvider) {
-      builder = 
builder.withSchema(schemaProvider.getTargetSchema().toString());
-    }
-
-    return builder.build();
+  private boolean onDeltaSyncShutdown(boolean error) {
+    log.info("DeltaSync shutdown. Closing write client. Error?" + error);
+    deltaSyncService.close();
+    return true;
   }
 
   public enum Operation {
@@ -366,6 +139,7 @@ public class HoodieDeltaStreamer implements Serializable {
   }
 
   private static class OperationConvertor implements IStringConverter<Operation> {
+
     @Override
     public Operation convert(String value) throws ParameterException {
       return Operation.valueOf(value);
@@ -426,9 +200,9 @@ public class HoodieDeltaStreamer implements Serializable {
 
     @Parameter(names = {"--transformer-class"},
         description = "subclass of 
com.uber.hoodie.utilities.transform.Transformer"
-        + ". Allows transforming raw source dataset to a target dataset 
(conforming to target schema) before writing."
-            + " Default : Not set. E:g - 
com.uber.hoodie.utilities.transform.SqlQueryBasedTransformer (which allows"
-            + "a SQL query templated to be passed as a transformation 
function)")
+            + ". Allows transforming raw source dataset to a target dataset 
(conforming to target schema) before "
+            + "writing. Default : Not set. E:g - 
com.uber.hoodie.utilities.transform.SqlQueryBasedTransformer (which "
+            + "allows a SQL query templated to be passed as a transformation 
function)")
     public String transformerClassName = null;
 
     @Parameter(names = {"--source-limit"}, description = "Maximum amount of 
data to read from source. "
@@ -447,16 +221,57 @@ public class HoodieDeltaStreamer implements Serializable {
     @Parameter(names = {"--enable-hive-sync"}, description = "Enable syncing 
to hive")
     public Boolean enableHiveSync = false;
 
+    @Parameter(names = {"--max-pending-compactions"},
+        description = "Maximum number of outstanding inflight/requested 
compactions. Delta Sync will not happen unless"
+            + "outstanding compactions is less than this number")
+    public Integer maxPendingCompactions = 5;
+
+    @Parameter(names = {"--continuous"}, description = "Delta Streamer runs in 
continuous mode running"
+        + " source-fetch -> Transform -> Hudi Write in loop")
+    public Boolean continuousMode = false;
+
     @Parameter(names = {"--spark-master"}, description = "spark master to 
use.")
     public String sparkMaster = "local[2]";
 
     @Parameter(names = {"--commit-on-errors"}, description = "Commit even when 
some records failed to be written")
     public Boolean commitOnErrors = false;
 
+    @Parameter(names = {"--delta-sync-scheduling-weight"}, description =
+        "Scheduling weight for delta sync as defined in "
+            + "https://spark.apache.org/docs/latest/job-scheduling.html";)
+    public Integer deltaSyncSchedulingWeight = 1;
+
+    @Parameter(names = {"--compact-scheduling-weight"}, description = 
"Scheduling weight for compaction as defined in "
+        + "https://spark.apache.org/docs/latest/job-scheduling.html";)
+    public Integer compactSchedulingWeight = 1;
+
+    @Parameter(names = {"--delta-sync-scheduling-minshare"}, description = 
"Minshare for delta sync as defined in "
+        + "https://spark.apache.org/docs/latest/job-scheduling.html";)
+    public Integer deltaSyncSchedulingMinShare = 0;
+
+    @Parameter(names = {"--compact-scheduling-minshare"}, description = 
"Minshare for compaction as defined in "
+        + "https://spark.apache.org/docs/latest/job-scheduling.html";)
+    public Integer compactSchedulingMinShare = 0;
+
     @Parameter(names = {"--help", "-h"}, help = true)
     public Boolean help = false;
   }
 
+  /**
+   * Helper to set Spark Scheduling Configs dynamically
+   *
+   * @param cfg Config
+   */
+  public static Map<String, String> getSparkSchedulingConfigs(Config cfg) throws Exception {
+    Map<String, String> additionalSparkConfigs = new HashMap<>();
+    if (cfg.continuousMode && cfg.storageType.equals(HoodieTableType.MERGE_ON_READ.name())) {
+      String sparkSchedulingConfFile = SchedulerConfGenerator.generateAndStoreConfig(cfg.deltaSyncSchedulingWeight,
+          cfg.compactSchedulingWeight, cfg.deltaSyncSchedulingMinShare, cfg.compactSchedulingMinShare);
+      additionalSparkConfigs.put("spark.scheduler.allocation.file", sparkSchedulingConfFile);
+    }
+    return additionalSparkConfigs;
+  }
+
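
For orientation, a hedged sketch (not part of this patch) of what the scheduling wiring amounts to when continuous mode is used with a MERGE_ON_READ table: the generated fair-scheduler file is handed to Spark, and the delta-sync thread then selects its pool. The wrapping method, app name, master and weight/minshare values are assumptions; SchedulerConfGenerator.generateAndStoreConfig and DELTASYNC_POOL_NAME come from this change.

// Assumed imports: org.apache.spark.SparkConf, org.apache.spark.api.java.JavaSparkContext
void sparkSchedulingSketch() throws Exception {
  SparkConf sparkConf = new SparkConf()
      .setAppName("delta-streamer-sketch")       // hypothetical app name
      .setMaster("local[2]")                     // matches the CLI default above
      .set("spark.scheduler.mode", "FAIR")       // pools only take effect under FAIR scheduling
      .set("spark.scheduler.allocation.file",
          SchedulerConfGenerator.generateAndStoreConfig(1, 1, 0, 0));
  JavaSparkContext jssc = new JavaSparkContext(sparkConf);
  // On the delta-sync thread; the compactor thread picks its own pool the same way.
  jssc.setLocalProperty("spark.scheduler.pool", SchedulerConfGenerator.DELTASYNC_POOL_NAME);
}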
   public static void main(String[] args) throws Exception {
     final Config cfg = new Config();
     JCommander cmd = new JCommander(cfg, args);
@@ -465,47 +280,288 @@ public class HoodieDeltaStreamer implements Serializable {
       System.exit(1);
     }
 
-    JavaSparkContext jssc = UtilHelpers.buildSparkContext("delta-streamer-" + cfg.targetTableName, cfg.sparkMaster);
+    Map<String, String> additionalSparkConfigs = getSparkSchedulingConfigs(cfg);
+    JavaSparkContext jssc = UtilHelpers.buildSparkContext("delta-streamer-" + cfg.targetTableName,
+        cfg.sparkMaster, additionalSparkConfigs);
+    if (!("FAIR".equals(jssc.getConf().get("spark.scheduler.mode")))
+        && cfg.continuousMode && cfg.storageType.equals(HoodieTableType.MERGE_ON_READ.name())) {
+      log.warn("Job Scheduling Configs will not be in effect as spark.scheduler.mode "
+          + "is not set to FAIR at instantiation time. Continuing without scheduling configs");
+    }
     new HoodieDeltaStreamer(cfg, jssc).sync();
   }
 
-  public SourceFormatAdapter getFormatAdapter() {
-    return formatAdapter;
-  }
 
-  public SchemaProvider getSchemaProvider() {
-    return schemaProvider;
-  }
+  /**
+   * Syncs data either in single-run or in continuous mode.
+   */
+  public static class DeltaSyncService extends AbstractDeltaStreamerService {
+
+    /**
+     * Delta Sync Config
+     */
+    private final HoodieDeltaStreamer.Config cfg;
+
+    /**
+     * Schema provider that supplies the command for reading the input and writing out the target table.
+     */
+    private transient SchemaProvider schemaProvider;
+
+    /**
+     * Spark Session
+     */
+    private transient SparkSession sparkSession;
+
+    /**
+     * Spark context
+     */
+    private transient JavaSparkContext jssc;
+
+    /**
+     * Bag of properties with source, hoodie client, key generator etc.
+     */
+    TypedProperties props;
+
+    /**
+     * Async Compactor Service
+     */
+    private AsyncCompactService asyncCompactService;
+
+    /**
+     * Table Type
+     */
+    private final HoodieTableType tableType;
+
+    /**
+     * Delta Sync
+     */
+    private transient DeltaSync deltaSync;
+
+    public DeltaSyncService(HoodieDeltaStreamer.Config cfg, JavaSparkContext jssc, FileSystem fs, HiveConf hiveConf)
+        throws IOException {
+      this.cfg = cfg;
+      this.jssc = jssc;
+      this.sparkSession = SparkSession.builder().config(jssc.getConf()).getOrCreate();
+
+      if (fs.exists(new Path(cfg.targetBasePath))) {
+        HoodieTableMetaClient meta = new HoodieTableMetaClient(
+            new Configuration(fs.getConf()), cfg.targetBasePath, false);
+        tableType = meta.getTableType();
+      } else {
+        tableType = HoodieTableType.valueOf(cfg.storageType);
+      }
 
-  public Transformer getTransformer() {
-    return transformer;
-  }
+      this.props = UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), cfg.configs).getConfig();
+      log.info("Creating delta streamer with configs : " + props.toString());
+      this.schemaProvider = UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, props, jssc);
 
-  public KeyGenerator getKeyGenerator() {
-    return keyGenerator;
-  }
+      if (cfg.filterDupes) {
+        cfg.operation = cfg.operation == Operation.UPSERT ? Operation.INSERT : cfg.operation;
+      }
 
-  public FileSystem getFs() {
-    return fs;
-  }
+      deltaSync = new DeltaSync(cfg, sparkSession, schemaProvider, tableType,
+          props, jssc, fs, hiveConf, this::onInitializingWriteClient);
+    }
 
-  public Optional<HoodieTimeline> getCommitTimelineOpt() {
-    return commitTimelineOpt;
-  }
+    public DeltaSync getDeltaSync() {
+      return deltaSync;
+    }
 
-  public JavaSparkContext getJssc() {
-    return jssc;
-  }
+    @Override
+    protected Pair<CompletableFuture, ExecutorService> startService() {
+      ExecutorService executor = Executors.newFixedThreadPool(1);
+      return Pair.of(CompletableFuture.supplyAsync(() -> {
+        boolean error = false;
+        if (cfg.continuousMode && tableType.equals(HoodieTableType.MERGE_ON_READ)) {
+          // set Scheduler Pool.
+          log.info("Setting Spark Pool name for delta-sync to " + SchedulerConfGenerator.DELTASYNC_POOL_NAME);
+          jssc.setLocalProperty("spark.scheduler.pool", SchedulerConfGenerator.DELTASYNC_POOL_NAME);
+        }
+        try {
+          while (!isShutdownRequested()) {
+            try {
+              Optional<String> scheduledCompactionInstant = deltaSync.syncOnce();
+              if (scheduledCompactionInstant.isPresent()) {
+                log.info("Enqueuing new pending compaction instant (" + scheduledCompactionInstant + ")");
+                asyncCompactService.enqueuePendingCompaction(new HoodieInstant(State.REQUESTED, COMPACTION_ACTION,
+                    scheduledCompactionInstant.get()));
+                asyncCompactService.waitTillPendingCompactionsReducesTo(cfg.maxPendingCompactions);
+              }
+            } catch (Exception e) {
+              log.error("Shutting down delta-sync due to exception", e);
+              error = true;
+              throw new HoodieException(e.getMessage(), e);
+            }
+          }
+        } finally {
+          shutdownCompactor(error);
+        }
+        return true;
+      }, executor), executor);
+    }
+
+    /**
+     * Shutdown compactor as DeltaSync is shutdown
+     */
+    private void shutdownCompactor(boolean error) {
+      log.info("Delta Sync shutdown. Error ?" + error);
+      if (asyncCompactService != null) {
+        log.warn("Gracefully shutting down compactor");
+        asyncCompactService.shutdown(false);
+      }
+    }
+
+    /**
+     * Callback to initialize the write client and start the compaction service if required.
+     * @param writeClient HoodieWriteClient
+     * @return true on success
+     */
+    protected Boolean onInitializingWriteClient(HoodieWriteClient writeClient) {
+      if (tableType.equals(HoodieTableType.MERGE_ON_READ)) {
+        asyncCompactService = new AsyncCompactService(jssc, writeClient);
+        // Enqueue existing pending compactions first
+        HoodieTableMetaClient meta = new HoodieTableMetaClient(
+            new Configuration(jssc.hadoopConfiguration()), cfg.targetBasePath, true);
+        List<HoodieInstant> pending = CompactionUtils.getPendingCompactionInstantTimes(meta);
+        pending.stream().forEach(hoodieInstant -> asyncCompactService.enqueuePendingCompaction(hoodieInstant));
+        asyncCompactService.start((error) -> {
+          // Shutdown DeltaSync
+          shutdown(false);
+          return true;
+        });
+        try {
+          asyncCompactService.waitTillPendingCompactionsReducesTo(cfg.maxPendingCompactions);
+        } catch (InterruptedException ie) {
+          throw new HoodieException(ie);
+        }
+      }
+      return true;
+    }
+
+    /**
+     * Close all resources.
+     */
+    public void close() {
+      if (null != deltaSync) {
+        deltaSync.close();
+      }
+    }
+
+    public SchemaProvider getSchemaProvider() {
+      return schemaProvider;
+    }
+
+    public SparkSession getSparkSession() {
+      return sparkSession;
+    }
+
+    public JavaSparkContext getJavaSparkContext() {
+      return jssc;
+    }
+
+    public AsyncCompactService getAsyncCompactService() {
+      return asyncCompactService;
+    }
 
-  public SparkSession getSparkSession() {
-    return sparkSession;
+    public TypedProperties getProps() {
+      return props;
+    }
   }
 
-  public HiveConf getHiveConf() {
-    return hiveConf;
+  /**
+   * Async compactor service that runs in a separate thread. Currently, only one compactor is allowed to run at any time.
+   */
+  public static class AsyncCompactService extends AbstractDeltaStreamerService {
+
+    private final int maxConcurrentCompaction;
+    private transient Compactor compactor;
+    private transient JavaSparkContext jssc;
+    private transient BlockingQueue<HoodieInstant> pendingCompactions = new LinkedBlockingQueue<>();
+    private transient ReentrantLock queueLock = new ReentrantLock();
+    private transient Condition consumed = queueLock.newCondition();
+
+    public AsyncCompactService(JavaSparkContext jssc, HoodieWriteClient client) {
+      this.jssc = jssc;
+      this.compactor = new Compactor(client, jssc);
+      // TODO: HUDI-157 : Only allow 1 compactor to run in parallel till Incremental View on MOR is fully implemented.
+      this.maxConcurrentCompaction = 1;
+    }
+
+    /**
+     * Enqueues a new pending compaction.
+     */
+    public void enqueuePendingCompaction(HoodieInstant instant) {
+      pendingCompactions.add(instant);
+    }
+
+    /**
+     * Wait until the number of outstanding pending compactions reduces to the passed-in value.
+     * @param numPendingCompactions Maximum pending compactions allowed
+     * @throws InterruptedException if the waiting thread is interrupted
+     */
+    public void waitTillPendingCompactionsReducesTo(int numPendingCompactions) throws InterruptedException {
+      try {
+        queueLock.lock();
+        while (!isShutdown() && (pendingCompactions.size() > numPendingCompactions)) {
+          consumed.await();
+        }
+      } finally {
+        queueLock.unlock();
+      }
+    }
+
+    /**
+     * Fetch the next pending compaction instant, if available.
+     * @return the next pending compaction instant, or null if none arrived within the wait period
+     * @throws InterruptedException if the waiting thread is interrupted
+     */
+    private HoodieInstant fetchNextCompactionInstant() throws InterruptedException {
+      log.info("Compactor waiting for next instant for compaction up to 60 seconds");
+      HoodieInstant instant = pendingCompactions.poll(60, TimeUnit.SECONDS);
+      if (instant != null) {
+        try {
+          queueLock.lock();
+          // Signal waiting thread
+          consumed.signal();
+        } finally {
+          queueLock.unlock();
+        }
+      }
+      return instant;
+    }
+
+    /**
+     * Start the compaction service.
+     */
+    protected Pair<CompletableFuture, ExecutorService> startService() {
+      ExecutorService executor = Executors.newFixedThreadPool(maxConcurrentCompaction);
+      List<CompletableFuture<Boolean>> compactionFutures =
+          IntStream.range(0, maxConcurrentCompaction).mapToObj(i -> CompletableFuture.supplyAsync(() -> {
+            try {
+              // Set the compactor pool name so that users can prioritize compaction
+              log.info("Setting Spark Pool name for compaction to " + SchedulerConfGenerator.COMPACT_POOL_NAME);
+              jssc.setLocalProperty("spark.scheduler.pool", SchedulerConfGenerator.COMPACT_POOL_NAME);
+
+              while (!isShutdownRequested()) {
+                final HoodieInstant instant = fetchNextCompactionInstant();
+                if (null != instant) {
+                  compactor.compact(instant);
+                }
+              }
+              log.info("Compactor shutting down properly!!");
+            } catch (InterruptedException ie) {
+              log.warn("Compactor executor thread got interrupted. Stopping", ie);
+            } catch (IOException e) {
+              log.error("Compactor executor failed", e);
+              throw new HoodieIOException(e.getMessage(), e);
+            }
+            return true;
+          }, executor)).collect(Collectors.toList());
+      return Pair.of(CompletableFuture.allOf(compactionFutures.stream().toArray(CompletableFuture[]::new)), executor);
+    }
   }
 
-  public TypedProperties getProps() {
-    return props;
+  public DeltaSyncService getDeltaSyncService() {
+    return deltaSyncService;
   }
 }
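The delta-sync loop and the async compactor above coordinate through a small backpressure protocol: delta-sync enqueues each newly scheduled compaction instant and then blocks in waitTillPendingCompactionsReducesTo() once more than cfg.maxPendingCompactions are outstanding, while the compactor thread signals the shared condition after every poll. The standalone sketch below isolates that pattern; the class name and the use of plain String instant times are illustrative only and not part of this patch.

    import java.util.concurrent.LinkedBlockingQueue;
    import java.util.concurrent.TimeUnit;
    import java.util.concurrent.locks.Condition;
    import java.util.concurrent.locks.ReentrantLock;

    // Illustrative sketch of the producer/consumer hand-off used by
    // DeltaSyncService and AsyncCompactService (hypothetical names).
    class PendingCompactionBackpressure {
      private final LinkedBlockingQueue<String> pending = new LinkedBlockingQueue<>();
      private final ReentrantLock lock = new ReentrantLock();
      private final Condition consumed = lock.newCondition();

      // Producer side (delta-sync): enqueue, then wait for the backlog to drain.
      void enqueue(String instantTime) {
        pending.add(instantTime);
      }

      void waitTillPendingReducesTo(int maxPending) throws InterruptedException {
        lock.lock();
        try {
          while (pending.size() > maxPending) {
            consumed.await();
          }
        } finally {
          lock.unlock();
        }
      }

      // Consumer side (compactor): poll with a timeout and signal the producer.
      String pollNextInstant() throws InterruptedException {
        String instantTime = pending.poll(60, TimeUnit.SECONDS);
        if (instantTime != null) {
          lock.lock();
          try {
            consumed.signal();
          } finally {
            lock.unlock();
          }
        }
        return instantTime;
      }
    }

Because the queue itself is unbounded, the cap is enforced entirely by the producer blocking on the condition, which keeps delta-sync from racing too far ahead of a slow compactor.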
diff --git a/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/SchedulerConfGenerator.java b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/SchedulerConfGenerator.java
new file mode 100644
index 0000000..fdc35e4
--- /dev/null
+++ b/hoodie-utilities/src/main/java/com/uber/hoodie/utilities/deltastreamer/SchedulerConfGenerator.java
@@ -0,0 +1,94 @@
+/*
+ *  Copyright (c) 2019 Uber Technologies, Inc. (hoodie-dev-gr...@uber.com)
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *           http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package com.uber.hoodie.utilities.deltastreamer;
+
+import java.io.BufferedWriter;
+import java.io.File;
+import java.io.FileWriter;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.UUID;
+import org.apache.commons.lang.text.StrSubstitutor;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+
+/**
+ * Utility class to generate a Spark fair-scheduler allocation file. This kicks in only when the user
+ * sets spark.scheduler.mode=FAIR at spark-submit time.
+ */
+public class SchedulerConfGenerator {
+
+  protected static volatile Logger log = LogManager.getLogger(SchedulerConfGenerator.class);
+
+  public static final String DELTASYNC_POOL_NAME = "hoodiedeltasync";
+  public static final String COMPACT_POOL_NAME = "hoodiecompact";
+
+
+  private static final String DELTASYNC_POOL_KEY = "deltasync_pool";
+  private static final String COMPACT_POOL_KEY = "compact_pool";
+  private static final String DELTASYNC_POLICY_KEY = "deltasync_policy";
+  private static final String COMPACT_POLICY_KEY = "compact_policy";
+  private static final String DELTASYNC_WEIGHT_KEY = "deltasync_weight";
+  private static final String DELTASYNC_MINSHARE_KEY = "deltasync_minshare";
+  private static final String COMPACT_WEIGHT_KEY = "compact_weight";
+  private static final String COMPACT_MINSHARE_KEY = "compact_minshare";
+
+  private static String SPARK_SCHEDULING_PATTERN =
+      "<?xml version=\"1.0\"?>\n"
+          + "<allocations>\n"
+          + "  <pool name=\"%(deltasync_pool)\">\n"
+          + "    <schedulingMode>%(deltasync_policy)</schedulingMode>\n"
+          + "    <weight>%(deltasync_weight)</weight>\n"
+          + "    <minShare>%(deltasync_minshare)</minShare>\n"
+          + "  </pool>\n"
+          + "  <pool name=\"%(compact_pool)\">\n"
+          + "    <schedulingMode>%(compact_policy)</schedulingMode>\n"
+          + "    <weight>%(compact_weight)</weight>\n"
+          + "    <minShare>%(compact_minshare)</minShare>\n"
+          + "  </pool>\n"
+          + "</allocations>";
+
+  private static String generateConfig(Integer deltaSyncWeight, Integer compactionWeight, Integer deltaSyncMinShare,
+      Integer compactionMinShare) {
+    Map<String, String> schedulingProps = new HashMap<>();
+    schedulingProps.put(DELTASYNC_POOL_KEY, DELTASYNC_POOL_NAME);
+    schedulingProps.put(COMPACT_POOL_KEY, COMPACT_POOL_NAME);
+    schedulingProps.put(DELTASYNC_POLICY_KEY, "FAIR");
+    schedulingProps.put(COMPACT_POLICY_KEY, "FAIR");
+    schedulingProps.put(DELTASYNC_WEIGHT_KEY, deltaSyncWeight.toString());
+    schedulingProps.put(DELTASYNC_MINSHARE_KEY, deltaSyncMinShare.toString());
+    schedulingProps.put(COMPACT_WEIGHT_KEY, compactionWeight.toString());
+    schedulingProps.put(COMPACT_MINSHARE_KEY, compactionMinShare.toString());
+
+    StrSubstitutor sub = new StrSubstitutor(schedulingProps, "%(", ")");
+    String xmlString = sub.replace(SPARK_SCHEDULING_PATTERN);
+    log.info("Scheduling Configurations generated. Config=\n" + xmlString);
+    return xmlString;
+  }
+
+  public static String generateAndStoreConfig(Integer deltaSyncWeight, Integer compactionWeight,
+      Integer deltaSyncMinShare, Integer compactionMinShare) throws IOException {
+    File tempConfigFile = File.createTempFile(UUID.randomUUID().toString(), ".xml");
+    BufferedWriter bw = new BufferedWriter(new FileWriter(tempConfigFile));
+    bw.write(generateConfig(deltaSyncWeight, compactionWeight, deltaSyncMinShare, compactionMinShare));
+    bw.close();
+    log.info("Configs written to file " + tempConfigFile.getAbsolutePath());
+    return tempConfigFile.getAbsolutePath();
+  }
+}
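SchedulerConfGenerator only produces the allocation file; for the pools to take effect, the file has to be handed to Spark together with spark.scheduler.mode=FAIR, and each job has to opt into a pool via the spark.scheduler.pool local property (which DeltaSyncService and AsyncCompactService do above). A minimal, hypothetical wiring sketch follows, assuming arbitrary weight/min-share values of 1/1/0/0 rather than whatever HoodieDeltaStreamer actually passes in the parts of the patch not shown here.

    import com.uber.hoodie.utilities.deltastreamer.SchedulerConfGenerator;
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaSparkContext;

    public class FairSchedulerWiringSketch {
      public static void main(String[] args) throws Exception {
        // Generate the allocation XML and point Spark at it (standard Spark settings).
        String allocFile = SchedulerConfGenerator.generateAndStoreConfig(1, 1, 0, 0);
        SparkConf conf = new SparkConf()
            .setAppName("delta-streamer-scheduling-sketch")
            .setMaster("local[2]")
            .set("spark.scheduler.mode", "FAIR")
            .set("spark.scheduler.allocation.file", allocFile);
        JavaSparkContext jssc = new JavaSparkContext(conf);
        // Jobs submitted from this thread now compete inside the delta-sync pool.
        jssc.setLocalProperty("spark.scheduler.pool", SchedulerConfGenerator.DELTASYNC_POOL_NAME);
        jssc.stop();
      }
    }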
diff --git a/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/TestHoodieDeltaStreamer.java b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/TestHoodieDeltaStreamer.java
index 14a7be5..68a5c8d 100644
--- a/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/TestHoodieDeltaStreamer.java
+++ b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/TestHoodieDeltaStreamer.java
@@ -24,11 +24,13 @@ import static org.junit.Assert.fail;
 
 import com.uber.hoodie.DataSourceWriteOptions;
 import com.uber.hoodie.common.model.HoodieCommitMetadata;
+import com.uber.hoodie.common.model.HoodieTableType;
 import com.uber.hoodie.common.table.HoodieTableMetaClient;
 import com.uber.hoodie.common.table.HoodieTimeline;
 import com.uber.hoodie.common.table.timeline.HoodieInstant;
 import com.uber.hoodie.common.util.DFSPropertiesConfiguration;
 import com.uber.hoodie.common.util.TypedProperties;
+import com.uber.hoodie.config.HoodieCompactionConfig;
 import com.uber.hoodie.exception.DatasetNotFoundException;
 import com.uber.hoodie.hive.HiveSyncConfig;
 import com.uber.hoodie.hive.HoodieHiveClient;
@@ -36,17 +38,28 @@ import com.uber.hoodie.hive.MultiPartKeysValueExtractor;
 import com.uber.hoodie.utilities.deltastreamer.HoodieDeltaStreamer;
 import com.uber.hoodie.utilities.deltastreamer.HoodieDeltaStreamer.Operation;
 import com.uber.hoodie.utilities.schema.FilebasedSchemaProvider;
+import com.uber.hoodie.utilities.sources.DistributedTestDataSource;
 import com.uber.hoodie.utilities.sources.HoodieIncrSource;
+import com.uber.hoodie.utilities.sources.InputBatch;
 import com.uber.hoodie.utilities.sources.TestDataSource;
+import com.uber.hoodie.utilities.sources.config.TestSourceConfig;
 import com.uber.hoodie.utilities.transform.SqlQueryBasedTransformer;
 import com.uber.hoodie.utilities.transform.Transformer;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.Executors;
+import java.util.concurrent.Future;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Function;
+import java.util.stream.Collectors;
+import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaRDD;
 import org.apache.spark.api.java.JavaSparkContext;
 import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
@@ -57,6 +70,7 @@ import org.apache.spark.sql.functions;
 import org.apache.spark.sql.types.DataTypes;
 import org.junit.After;
 import org.junit.AfterClass;
+import org.junit.Assert;
 import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
@@ -197,6 +211,22 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
       assertEquals(expected, recordCount);
     }
 
+    static void assertAtleastNCompactionCommits(int minExpected, String datasetPath, FileSystem fs) {
+      HoodieTableMetaClient meta = new HoodieTableMetaClient(fs.getConf(), datasetPath);
+      HoodieTimeline timeline = meta.getActiveTimeline().getCommitTimeline().filterCompletedInstants();
+      log.info("Timeline Instants=" + meta.getActiveTimeline().getInstants().collect(Collectors.toList()));
+      int numCompactionCommits = (int) timeline.getInstants().count();
+      assertTrue("Got=" + numCompactionCommits + ", exp >=" + minExpected, minExpected <= numCompactionCommits);
+    }
+
+    static void assertAtleastNDeltaCommits(int minExpected, String datasetPath, FileSystem fs) {
+      HoodieTableMetaClient meta = new HoodieTableMetaClient(fs.getConf(), datasetPath);
+      HoodieTimeline timeline = meta.getActiveTimeline().getDeltaCommitTimeline().filterCompletedInstants();
+      log.info("Timeline Instants=" + meta.getActiveTimeline().getInstants().collect(Collectors.toList()));
+      int numDeltaCommits = (int) timeline.getInstants().count();
+      assertTrue("Got=" + numDeltaCommits + ", exp >=" + minExpected, minExpected <= numDeltaCommits);
+    }
+
     static String assertCommitMetadata(String expected, String datasetPath, FileSystem fs, int totalCommits)
         throws IOException {
       HoodieTableMetaClient meta = new HoodieTableMetaClient(fs.getConf(), datasetPath);
@@ -208,6 +238,23 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
       assertEquals(expected, commitMetadata.getMetadata(HoodieDeltaStreamer.CHECKPOINT_KEY));
       return lastInstant.getTimestamp();
     }
+
+    static void waitTillCondition(Function<Boolean, Boolean> condition, long timeoutInSecs) throws Exception {
+      Future<Boolean> res = Executors.newSingleThreadExecutor().submit(() -> {
+        boolean ret = false;
+        while (!ret) {
+          try {
+            Thread.sleep(3000);
+            ret = condition.apply(true);
+          } catch (Throwable error) {
+            log.warn("Got error :", error);
+            ret = false;
+          }
+        }
+        return true;
+      });
+      res.get(timeoutInSecs, TimeUnit.SECONDS);
+    }
   }
 
   @Test
@@ -261,6 +308,51 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
     assertEquals(2000, counts.get(0).getLong(1));
   }
 
+  @Test
+  public void testUpsertsCOWContinuousMode() throws Exception {
+    testUpsertsContinuousMode(HoodieTableType.COPY_ON_WRITE, "continuous_cow");
+  }
+
+  @Test
+  public void testUpsertsMORContinuousMode() throws Exception {
+    testUpsertsContinuousMode(HoodieTableType.MERGE_ON_READ, "continuous_mor");
+  }
+
+  private void testUpsertsContinuousMode(HoodieTableType tableType, String tempDir) throws Exception {
+    String datasetBasePath = dfsBasePath + "/" + tempDir;
+    // Keep it higher than batch-size to test continuous mode
+    int totalRecords = 3000;
+
+    // Initial bulk insert
+    HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(datasetBasePath, Operation.UPSERT);
+    cfg.continuousMode = true;
+    cfg.storageType = tableType.name();
+    cfg.configs.add(String.format("%s=%d", TestSourceConfig.MAX_UNIQUE_RECORDS_PROP, totalRecords));
+    cfg.configs.add(String.format("%s=false", HoodieCompactionConfig.AUTO_CLEAN_PROP));
+    HoodieDeltaStreamer ds = new HoodieDeltaStreamer(cfg, jsc);
+    Future dsFuture = Executors.newSingleThreadExecutor().submit(() -> {
+      try {
+        ds.sync();
+      } catch (Exception ex) {
+        throw new RuntimeException(ex.getMessage(), ex);
+      }
+    });
+
+    TestHelpers.waitTillCondition((r) -> {
+      if (tableType.equals(HoodieTableType.MERGE_ON_READ)) {
+        TestHelpers.assertAtleastNDeltaCommits(5, datasetBasePath, dfs);
+        TestHelpers.assertAtleastNCompactionCommits(2, datasetBasePath, dfs);
+      } else {
+        TestHelpers.assertAtleastNCompactionCommits(5, datasetBasePath, dfs);
+      }
+      TestHelpers.assertRecordCount(totalRecords, datasetBasePath + "/*/*.parquet", sqlContext);
+      TestHelpers.assertDistanceCount(totalRecords, datasetBasePath + "/*/*.parquet", sqlContext);
+      return true;
+    }, 180);
+    ds.shutdownGracefully();
+    dsFuture.get();
+  }
+
   /**
    * Test Bulk Insert and upserts with hive syncing. Tests Hudi incremental processing using a 2 step pipeline
    * The first step involves using a SQL template to transform a source
@@ -366,6 +458,20 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
     assertEquals(1000, counts.get(1).getLong(1));
   }
 
+  @Test
+  public void testDistributedTestDataSource() throws Exception {
+    TypedProperties props = new TypedProperties();
+    props.setProperty(TestSourceConfig.MAX_UNIQUE_RECORDS_PROP, "1000");
+    props.setProperty(TestSourceConfig.NUM_SOURCE_PARTITIONS_PROP, "1");
+    props.setProperty(TestSourceConfig.USE_ROCKSDB_FOR_TEST_DATAGEN_KEYS, "true");
+    DistributedTestDataSource distributedTestDataSource = new DistributedTestDataSource(props,
+        jsc, sparkSession, null);
+    InputBatch<JavaRDD<GenericRecord>> batch = distributedTestDataSource.fetchNext(Optional.empty(), 10000000);
+    batch.getBatch().get().cache();
+    long c = batch.getBatch().get().count();
+    Assert.assertEquals(1000, c);
+  }
+
   /**
    * UDF to calculate Haversine distance
    */
diff --git a/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/sources/AbstractBaseTestSource.java b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/sources/AbstractBaseTestSource.java
new file mode 100644
index 0000000..22b1324
--- /dev/null
+++ b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/sources/AbstractBaseTestSource.java
@@ -0,0 +1,103 @@
+package com.uber.hoodie.utilities.sources;
+
+import com.uber.hoodie.common.HoodieTestDataGenerator;
+import com.uber.hoodie.common.model.HoodieRecord;
+import com.uber.hoodie.common.util.TypedProperties;
+import com.uber.hoodie.common.util.collection.RocksDBBasedMap;
+import com.uber.hoodie.exception.HoodieIOException;
+import com.uber.hoodie.utilities.schema.SchemaProvider;
+import com.uber.hoodie.utilities.sources.config.TestSourceConfig;
+import java.io.File;
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Optional;
+import java.util.stream.Stream;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+
+public abstract class AbstractBaseTestSource extends AvroSource {
+
+  // Static instance, helps with reuse across a test.
+  protected static transient HoodieTestDataGenerator dataGenerator;
+
+  public static void initDataGen() {
+    dataGenerator = new HoodieTestDataGenerator(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS);
+  }
+
+  public static void initDataGen(TypedProperties props) {
+    try {
+      boolean useRocksForTestDataGenKeys = props.getBoolean(TestSourceConfig.USE_ROCKSDB_FOR_TEST_DATAGEN_KEYS,
+          TestSourceConfig.DEFAULT_USE_ROCKSDB_FOR_TEST_DATAGEN_KEYS);
+      String baseStoreDir = props.getString(TestSourceConfig.ROCKSDB_BASE_DIR_FOR_TEST_DATAGEN_KEYS, null);
+      if (null == baseStoreDir) {
+        baseStoreDir = File.createTempFile("test_data_gen", ".keys").getParent();
+      }
+      log.info("useRocksForTestDataGenKeys=" + useRocksForTestDataGenKeys + ", BaseStoreDir=" + baseStoreDir);
+      dataGenerator = new HoodieTestDataGenerator(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS,
+          useRocksForTestDataGenKeys ? new RocksDBBasedMap<>(baseStoreDir) : new HashMap<>());
+    } catch (IOException e) {
+      throw new HoodieIOException(e.getMessage(), e);
+    }
+  }
+
+  public static void resetDataGen() {
+    if (null != dataGenerator) {
+      dataGenerator.close();
+    }
+    dataGenerator = null;
+  }
+
+  protected AbstractBaseTestSource(TypedProperties props,
+      JavaSparkContext sparkContext, SparkSession sparkSession,
+      SchemaProvider schemaProvider) {
+    super(props, sparkContext, sparkSession, schemaProvider);
+  }
+
+  protected static Stream<GenericRecord> fetchNextBatch(TypedProperties props, int sourceLimit, String commitTime) {
+    int maxUniqueKeys = props.getInteger(TestSourceConfig.MAX_UNIQUE_RECORDS_PROP,
+        TestSourceConfig.DEFAULT_MAX_UNIQUE_RECORDS);
+
+    // generate `sourceLimit` number of upserts each time.
+    int numExistingKeys = dataGenerator.getNumExistingKeys();
+    log.info("NumExistingKeys=" + numExistingKeys);
+
+    int numUpdates = Math.min(numExistingKeys, sourceLimit / 2);
+    int numInserts = sourceLimit - numUpdates;
+    log.info("Before adjustments => numInserts=" + numInserts + ", numUpdates=" + numUpdates);
+
+    if (numInserts + numExistingKeys > maxUniqueKeys) {
+      // Limit inserts so that maxUniqueRecords is maintained
+      numInserts = Math.max(0, maxUniqueKeys - numExistingKeys);
+    }
+
+    if ((numInserts + numUpdates) < sourceLimit) {
+      // try to expand updates to safe limit
+      numUpdates = Math.min(numExistingKeys, sourceLimit - numInserts);
+    }
+
+    log.info("NumInserts=" + numInserts + ", NumUpdates=" + numUpdates + ", maxUniqueRecords=" + maxUniqueKeys);
+    long memoryUsage1 = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
+    log.info("Before DataGen. Memory Usage=" + memoryUsage1 + ", Total Memory=" + Runtime.getRuntime().totalMemory()
+        + ", Free Memory=" + Runtime.getRuntime().freeMemory());
+
+    List<GenericRecord> records = new ArrayList<>();
+    Stream<GenericRecord> updateStream = dataGenerator.generateUniqueUpdatesStream(commitTime, numUpdates)
+        .map(AbstractBaseTestSource::toGenericRecord);
+    Stream<GenericRecord> insertStream = dataGenerator.generateInsertsStream(commitTime, numInserts)
+        .map(AbstractBaseTestSource::toGenericRecord);
+    return Stream.concat(updateStream, insertStream);
+  }
+
+  private static GenericRecord toGenericRecord(HoodieRecord hoodieRecord) {
+    try {
+      Optional<IndexedRecord> recordOpt = hoodieRecord.getData().getInsertValue(dataGenerator.avroSchema);
+      return (GenericRecord) recordOpt.get();
+    } catch (IOException e) {
+      return null;
+    }
+  }
+}
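The sizing logic in fetchNextBatch() above first splits sourceLimit roughly in half between updates and inserts, then caps inserts so the total number of unique keys never exceeds maxUniqueKeys, and finally backfills with extra updates if the batch fell short. A small worked example with assumed values (sourceLimit=1000, numExistingKeys=400, maxUniqueKeys=600); the class is illustrative only:

    // Worked example of the batch-sizing arithmetic used by fetchNextBatch.
    public class BatchSizingExample {
      public static void main(String[] args) {
        int sourceLimit = 1000;
        int numExistingKeys = 400;
        int maxUniqueKeys = 600;

        int numUpdates = Math.min(numExistingKeys, sourceLimit / 2);   // 400
        int numInserts = sourceLimit - numUpdates;                     // 600

        // Cap inserts so total unique keys never exceed maxUniqueKeys.
        if (numInserts + numExistingKeys > maxUniqueKeys) {
          numInserts = Math.max(0, maxUniqueKeys - numExistingKeys);   // 200
        }
        // Backfill with more updates if the batch fell short of sourceLimit.
        if ((numInserts + numUpdates) < sourceLimit) {
          numUpdates = Math.min(numExistingKeys, sourceLimit - numInserts); // still 400
        }
        System.out.println("inserts=" + numInserts + ", updates=" + numUpdates); // inserts=200, updates=400
      }
    }

Note that the resulting batch (600 records here) can be smaller than sourceLimit once the unique-key cap binds, which is what keeps the continuous-mode tests bounded.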
diff --git a/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/sources/DistributedTestDataSource.java b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/sources/DistributedTestDataSource.java
new file mode 100644
index 0000000..533e25e
--- /dev/null
+++ b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/sources/DistributedTestDataSource.java
@@ -0,0 +1,79 @@
+/*
+ *  Copyright (c) 2019 Uber Technologies, Inc. (hoodie-dev-gr...@uber.com)
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *           http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ */
+
+package com.uber.hoodie.utilities.sources;
+
+import com.uber.hoodie.common.util.TypedProperties;
+import com.uber.hoodie.utilities.schema.SchemaProvider;
+import com.uber.hoodie.utilities.sources.config.TestSourceConfig;
+import java.util.Iterator;
+import java.util.Optional;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.spark.api.java.JavaRDD;
+import org.apache.spark.api.java.JavaSparkContext;
+import org.apache.spark.sql.SparkSession;
+
+/**
+ * A test data source which scales test-data generation by using Spark parallelism.
+ */
+public class DistributedTestDataSource extends AbstractBaseTestSource {
+
+  private final int numTestSourcePartitions;
+
+  public DistributedTestDataSource(TypedProperties props,
+      JavaSparkContext sparkContext, SparkSession sparkSession,
+      SchemaProvider schemaProvider) {
+    super(props, sparkContext, sparkSession, schemaProvider);
+    this.numTestSourcePartitions = props.getInteger(TestSourceConfig.NUM_SOURCE_PARTITIONS_PROP,
+        TestSourceConfig.DEFAULT_NUM_SOURCE_PARTITIONS);
+  }
+
+  @Override
+  protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Optional<String> lastCkptStr, long sourceLimit) {
+    int nextCommitNum = lastCkptStr.map(s -> Integer.parseInt(s) + 1).orElse(0);
+    String commitTime = String.format("%05d", nextCommitNum);
+    log.info("Source Limit is set to " + sourceLimit);
+
+    // No new data.
+    if (sourceLimit <= 0) {
+      return new InputBatch<>(Optional.empty(), commitTime);
+    }
+
+    TypedProperties newProps = new TypedProperties();
+    newProps.putAll(props);
+
+    // Set the maxUniqueRecords per partition for TestDataSource
+    int maxUniqueRecords = props.getInteger(TestSourceConfig.MAX_UNIQUE_RECORDS_PROP,
+        TestSourceConfig.DEFAULT_MAX_UNIQUE_RECORDS);
+    String maxUniqueRecordsPerPartition = String.valueOf(Math.max(1, maxUniqueRecords / numTestSourcePartitions));
+    newProps.setProperty(TestSourceConfig.MAX_UNIQUE_RECORDS_PROP, maxUniqueRecordsPerPartition);
+    int perPartitionSourceLimit = Math.max(1, (int) (sourceLimit / numTestSourcePartitions));
+    JavaRDD<GenericRecord> avroRDD = sparkContext.parallelize(IntStream.range(0, numTestSourcePartitions).boxed()
+        .collect(Collectors.toList()), numTestSourcePartitions).mapPartitions(idx -> {
+          log.info("Initializing source with newProps=" + newProps);
+          if (null == dataGenerator) {
+            initDataGen(newProps);
+          }
+          Iterator<GenericRecord> itr = fetchNextBatch(newProps, perPartitionSourceLimit, commitTime).iterator();
+          return itr;
+        });
+    return new InputBatch<>(Optional.of(avroRDD), commitTime);
+  }
+}
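Each partition runs the same fetchNextBatch() logic independently, so DistributedTestDataSource divides both the unique-key budget and the per-round source limit by the partition count before shipping the adjusted properties to the executors. Illustrative arithmetic with assumed values (10 partitions, 1000 unique keys, sourceLimit=50000); the class is hypothetical:

    // Worked example of the per-partition split performed in fetchNewData.
    public class PartitionSplitExample {
      public static void main(String[] args) {
        int numTestSourcePartitions = 10;   // TestSourceConfig.DEFAULT_NUM_SOURCE_PARTITIONS
        int maxUniqueRecords = 1000;
        long sourceLimit = 50000;

        int maxUniquePerPartition = Math.max(1, maxUniqueRecords / numTestSourcePartitions);      // 100
        int perPartitionSourceLimit = Math.max(1, (int) (sourceLimit / numTestSourcePartitions)); // 5000

        System.out.println(maxUniquePerPartition + " unique keys and up to " + perPartitionSourceLimit
            + " records per round in each of the " + numTestSourcePartitions + " partitions");
      }
    }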
diff --git a/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/sources/TestDataSource.java b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/sources/TestDataSource.java
index bc46472..e4bd4ff 100644
--- a/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/sources/TestDataSource.java
+++ b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/sources/TestDataSource.java
@@ -18,17 +18,12 @@
 
 package com.uber.hoodie.utilities.sources;
 
-import com.uber.hoodie.common.HoodieTestDataGenerator;
-import com.uber.hoodie.common.model.HoodieRecord;
 import com.uber.hoodie.common.util.TypedProperties;
 import com.uber.hoodie.utilities.schema.SchemaProvider;
-import java.io.IOException;
-import java.util.ArrayList;
 import java.util.List;
 import java.util.Optional;
 import java.util.stream.Collectors;
 import org.apache.avro.generic.GenericRecord;
-import org.apache.avro.generic.IndexedRecord;
 import org.apache.log4j.LogManager;
 import org.apache.log4j.Logger;
 import org.apache.spark.api.java.JavaRDD;
@@ -38,32 +33,15 @@ import org.apache.spark.sql.SparkSession;
 /**
  * An implementation of {@link Source}, that emits test upserts.
  */
-public class TestDataSource extends AvroSource {
+public class TestDataSource extends AbstractBaseTestSource {
 
   private static volatile Logger log = LogManager.getLogger(TestDataSource.class);
 
-  // Static instance, helps with reuse across a test.
-  private static HoodieTestDataGenerator dataGenerator;
-
-  public static void initDataGen() {
-    dataGenerator = new HoodieTestDataGenerator();
-  }
-
-  public static void resetDataGen() {
-    dataGenerator = null;
-  }
-
   public TestDataSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
       SchemaProvider schemaProvider) {
     super(props, sparkContext, sparkSession, schemaProvider);
-  }
-
-  private GenericRecord toGenericRecord(HoodieRecord hoodieRecord) {
-    try {
-      Optional<IndexedRecord> recordOpt = hoodieRecord.getData().getInsertValue(dataGenerator.avroSchema);
-      return (GenericRecord) recordOpt.get();
-    } catch (IOException e) {
-      return null;
+    if (null == dataGenerator) {
+      initDataGen(props);
     }
   }
 
@@ -73,26 +51,14 @@ public class TestDataSource extends AvroSource {
 
     int nextCommitNum = lastCheckpointStr.map(s -> Integer.parseInt(s) + 1).orElse(0);
     String commitTime = String.format("%05d", nextCommitNum);
+    log.info("Source Limit is set to " + sourceLimit);
+
     // No new data.
     if (sourceLimit <= 0) {
       return new InputBatch<>(Optional.empty(), commitTime);
     }
 
-    // generate `sourceLimit` number of upserts each time.
-    int numExistingKeys = dataGenerator.getExistingKeysList().size();
-    int numUpdates = Math.min(numExistingKeys, (int) sourceLimit / 2);
-    int numInserts = (int) sourceLimit - numUpdates;
-
-    List<GenericRecord> records = new ArrayList<>();
-    try {
-      records.addAll(dataGenerator.generateUniqueUpdates(commitTime, numUpdates).stream()
-          .map(this::toGenericRecord).collect(Collectors.toList()));
-      records.addAll(dataGenerator.generateInserts(commitTime, numInserts).stream()
-          .map(this::toGenericRecord).collect(Collectors.toList()));
-    } catch (IOException e) {
-      log.error("Error generating test data.", e);
-    }
-    
+    List<GenericRecord> records = fetchNextBatch(props, (int) sourceLimit, commitTime).collect(Collectors.toList());
     JavaRDD<GenericRecord> avroRDD = sparkContext.<GenericRecord>parallelize(records, 4);
     return new InputBatch<>(Optional.of(avroRDD), commitTime);
   }
diff --git a/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/sources/config/TestSourceConfig.java b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/sources/config/TestSourceConfig.java
new file mode 100644
index 0000000..0f63221
--- /dev/null
+++ b/hoodie-utilities/src/test/java/com/uber/hoodie/utilities/sources/config/TestSourceConfig.java
@@ -0,0 +1,43 @@
+/*
+ *  Copyright (c) 2019 Uber Technologies, Inc. (hoodie-dev-gr...@uber.com)
+ *
+ *  Licensed under the Apache License, Version 2.0 (the "License");
+ *  you may not use this file except in compliance with the License.
+ *  You may obtain a copy of the License at
+ *
+ *           http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ *
+ */
+
+package com.uber.hoodie.utilities.sources.config;
+
+/**
+ * Configurations for test data sources.
+ */
+public class TestSourceConfig {
+
+  // Used by DistributedTestDataSource only. Number of partitions, where each partition generates test-data
+  public static final String NUM_SOURCE_PARTITIONS_PROP = "hoodie.deltastreamer.source.test.num_partitions";
+  public static final Integer DEFAULT_NUM_SOURCE_PARTITIONS = 10;
+
+  // Maximum number of unique records generated for the run
+  public static final String MAX_UNIQUE_RECORDS_PROP = "hoodie.deltastreamer.source.test.max_unique_records";
+  public static final Integer DEFAULT_MAX_UNIQUE_RECORDS = Integer.MAX_VALUE;
+
+  // Use RocksDB for storing datagen keys
+  public static final String USE_ROCKSDB_FOR_TEST_DATAGEN_KEYS =
+      "hoodie.deltastreamer.source.test.datagen.use_rocksdb_for_storing_existing_keys";
+  public static final Boolean DEFAULT_USE_ROCKSDB_FOR_TEST_DATAGEN_KEYS = false;
+
+  // Base dir for storing datagen keys
+  public static final String ROCKSDB_BASE_DIR_FOR_TEST_DATAGEN_KEYS =
+      "hoodie.deltastreamer.source.test.datagen.rocksdb_base_dir";
+
+}
diff --git a/packaging/hoodie-utilities-bundle/pom.xml b/packaging/hoodie-utilities-bundle/pom.xml
index a3842ab..76d478d 100644
--- a/packaging/hoodie-utilities-bundle/pom.xml
+++ b/packaging/hoodie-utilities-bundle/pom.xml
@@ -66,6 +66,7 @@
                 <includes>
                   <include>commons-codec:commons-codec</include>
                   <include>commons-dbcp:commons-dbcp</include>
+                  <include>commons-lang:commons-lang</include>
                   <include>commons-pool:commons-pool</include>
                   <include>com.uber.hoodie:hoodie-common</include>
                   <include>com.uber.hoodie:hoodie-client</include>
@@ -110,6 +111,10 @@
                   <shadedPattern>com.uber.hoodie.org.apache.commons.dbcp.</shadedPattern>
                 </relocation>
                 <relocation>
+                  <pattern>org.apache.commons.lang.</pattern>
+                  <shadedPattern>com.uber.hoodie.org.apache.commons.lang.</shadedPattern>
+                </relocation>
+                <relocation>
                   <pattern>org.apache.commons.pool.</pattern>
                   <shadedPattern>com.uber.hoodie.org.apache.commons.pool.</shadedPattern>
                 </relocation>
diff --git a/pom.xml b/pom.xml
index 44e9dd8..59437ec 100644
--- a/pom.xml
+++ b/pom.xml
@@ -575,6 +575,11 @@
         <version>1.4</version>
       </dependency>
       <dependency>
+        <groupId>commons-lang</groupId>
+        <artifactId>commons-lang</artifactId>
+        <version>2.6</version>
+      </dependency>
+      <dependency>
         <groupId>commons-logging</groupId>
         <artifactId>commons-logging</artifactId>
         <version>1.2</version>
