This is an automated email from the ASF dual-hosted git repository.

vbalaji pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new d610252  [HUDI-288]: Add support for ingesting multiple kafka streams 
in a single DeltaStreamer deployment (#1150)
d610252 is described below

commit d610252d6b54dcbb1897bb8881d59c3838a46c18
Author: Pratyaksh Sharma <[email protected]>
AuthorDate: Wed Apr 8 04:40:26 2020 +0530

    [HUDI-288]: Add support for ingesting multiple kafka streams in a single 
DeltaStreamer deployment (#1150)
    
    * [HUDI-288]: Add support for ingesting multiple kafka streams in a single 
DeltaStreamer deployment
---
 .../apache/hudi/table/HoodieCommitArchiveLog.java  |  10 +-
 .../java/org/apache/hudi/client/TestMultiFS.java   |   4 +-
 .../hudi/common/HoodieTestDataGenerator.java       | 151 ++++++--
 .../org/apache/hudi/hive/TestHiveSyncTool.java     |   6 +
 .../test/java/org/apache/hudi/hive/TestUtil.java   |   9 +-
 .../org/apache/hudi/hive/util/HiveTestService.java |  25 ++
 .../org/apache/hudi/integ/ITTestHoodieDemo.java    |   8 +-
 .../scala/org/apache/hudi/DataSourceOptions.scala  |   1 +
 .../hudi/utilities/deltastreamer/DeltaSync.java    |   9 +-
 .../deltastreamer/HoodieDeltaStreamer.java         |  32 +-
 .../HoodieMultiTableDeltaStreamer.java             | 393 +++++++++++++++++++++
 .../deltastreamer/TableExecutionContext.java       |  85 +++++
 .../hudi/utilities/TestHoodieDeltaStreamer.java    |  64 +++-
 .../TestHoodieMultiTableDeltaStreamer.java         | 166 +++++++++
 .../apache/hudi/utilities/UtilitiesTestBase.java   |  19 +-
 .../utilities/sources/AbstractBaseTestSource.java  |  19 +-
 .../sources/AbstractDFSSourceTestBase.java         |   2 +-
 .../hudi/utilities/sources/TestKafkaSource.java    |   9 +-
 .../utilities/sources/TestParquetDFSSource.java    |   2 +-
 .../invalid_hive_sync_uber_config.properties       |  23 ++
 .../short_trip_uber_config.properties              |  24 ++
 .../source_short_trip_uber.avsc                    |  44 +++
 .../delta-streamer-config/source_uber.avsc         |  44 +++
 .../target_short_trip_uber.avsc                    |  44 +++
 .../delta-streamer-config/target_uber.avsc         |  44 +++
 .../delta-streamer-config/uber_config.properties   |  25 ++
 26 files changed, 1184 insertions(+), 78 deletions(-)

diff --git 
a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCommitArchiveLog.java 
b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCommitArchiveLog.java
index 635e96b..73dd799 100644
--- 
a/hudi-client/src/main/java/org/apache/hudi/table/HoodieCommitArchiveLog.java
+++ 
b/hudi-client/src/main/java/org/apache/hudi/table/HoodieCommitArchiveLog.java
@@ -292,7 +292,8 @@ public class HoodieCommitArchiveLog {
         archivedMetaWrapper.setActionType(ActionType.clean.name());
         break;
       }
-      case HoodieTimeline.COMMIT_ACTION: {
+      case HoodieTimeline.COMMIT_ACTION:
+      case HoodieTimeline.DELTA_COMMIT_ACTION: {
         HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
             .fromBytes(commitTimeline.getInstantDetails(hoodieInstant).get(), 
HoodieCommitMetadata.class);
         
archivedMetaWrapper.setHoodieCommitMetadata(convertCommitMetadata(commitMetadata));
@@ -311,13 +312,6 @@ public class HoodieCommitArchiveLog {
         archivedMetaWrapper.setActionType(ActionType.savepoint.name());
         break;
       }
-      case HoodieTimeline.DELTA_COMMIT_ACTION: {
-        HoodieCommitMetadata commitMetadata = HoodieCommitMetadata
-            .fromBytes(commitTimeline.getInstantDetails(hoodieInstant).get(), 
HoodieCommitMetadata.class);
-        
archivedMetaWrapper.setHoodieCommitMetadata(convertCommitMetadata(commitMetadata));
-        archivedMetaWrapper.setActionType(ActionType.commit.name());
-        break;
-      }
       case HoodieTimeline.COMPACTION_ACTION: {
         HoodieCompactionPlan plan = 
CompactionUtils.getCompactionPlan(metaClient, hoodieInstant.getTimestamp());
         archivedMetaWrapper.setHoodieCompactionPlan(plan);
diff --git a/hudi-client/src/test/java/org/apache/hudi/client/TestMultiFS.java 
b/hudi-client/src/test/java/org/apache/hudi/client/TestMultiFS.java
index 0e606e4..c6ec523 100644
--- a/hudi-client/src/test/java/org/apache/hudi/client/TestMultiFS.java
+++ b/hudi-client/src/test/java/org/apache/hudi/client/TestMultiFS.java
@@ -68,7 +68,7 @@ public class TestMultiFS extends HoodieClientTestHarness {
     cleanupTestDataGenerator();
   }
 
-  private HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig config) 
throws Exception {
+  private HoodieWriteClient getHoodieWriteClient(HoodieWriteConfig config) {
     return new HoodieWriteClient(jsc, config);
   }
 
@@ -89,7 +89,7 @@ public class TestMultiFS extends HoodieClientTestHarness {
     HoodieWriteConfig localConfig = getHoodieWriteConfig(tablePath);
 
     try (HoodieWriteClient hdfsWriteClient = getHoodieWriteClient(cfg);
-        HoodieWriteClient localWriteClient = 
getHoodieWriteClient(localConfig);) {
+        HoodieWriteClient localWriteClient = 
getHoodieWriteClient(localConfig)) {
 
       // Write generated data to hdfs (only inserts)
       String readCommitTime = hdfsWriteClient.startCommit();
diff --git 
a/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java 
b/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java
index 087587a..8e9036a 100644
--- 
a/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java
+++ 
b/hudi-client/src/test/java/org/apache/hudi/common/HoodieTestDataGenerator.java
@@ -41,6 +41,8 @@ import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.FileSystem;
 import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
 
 import java.io.IOException;
 import java.io.Serializable;
@@ -69,6 +71,7 @@ public class HoodieTestDataGenerator {
 
   // based on examination of sample file, the schema produces the following 
per record size
   public static final int SIZE_PER_RECORD = 50 * 1024;
+  private static Logger logger = 
LogManager.getLogger(HoodieTestDataGenerator.class);
   public static final String DEFAULT_FIRST_PARTITION_PATH = "2016/03/15";
   public static final String DEFAULT_SECOND_PARTITION_PATH = "2015/03/16";
   public static final String DEFAULT_THIRD_PARTITION_PATH = "2015/03/17";
@@ -89,12 +92,18 @@ public class HoodieTestDataGenerator {
   public static final String TIP_NESTED_SCHEMA = "{\"name\": \"tip_history\", 
\"type\": {\"type\": \"array\", \"items\": {\"type\": \"record\", \"name\": 
\"tip_history\", \"fields\": ["
       + "{\"name\": \"amount\", \"type\": \"double\"}, {\"name\": 
\"currency\", \"type\": \"string\"}]}}},";
   public static final String MAP_TYPE_SCHEMA = "{\"name\": \"city_to_state\", 
\"type\": {\"type\": \"map\", \"values\": \"string\"}},";
-
   public static final String TRIP_EXAMPLE_SCHEMA =
       TRIP_SCHEMA_PREFIX + MAP_TYPE_SCHEMA + FARE_NESTED_SCHEMA + 
TIP_NESTED_SCHEMA + TRIP_SCHEMA_SUFFIX;
   public static final String TRIP_FLATTENED_SCHEMA =
       TRIP_SCHEMA_PREFIX + FARE_FLATTENED_SCHEMA + TRIP_SCHEMA_SUFFIX;
 
+  public static final String TRIP_SCHEMA = 
"{\"type\":\"record\",\"name\":\"tripUberRec\",\"fields\":["
+      + 
"{\"name\":\"timestamp\",\"type\":\"double\"},{\"name\":\"_row_key\",\"type\":\"string\"},{\"name\":\"rider\",\"type\":\"string\"},"
+      + 
"{\"name\":\"driver\",\"type\":\"string\"},{\"name\":\"fare\",\"type\":\"double\"},{\"name\":
 \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false}]}";
+  public static final String SHORT_TRIP_SCHEMA = 
"{\"type\":\"record\",\"name\":\"shortTripRec\",\"fields\":["
+      + 
"{\"name\":\"timestamp\",\"type\":\"double\"},{\"name\":\"_row_key\",\"type\":\"string\"},{\"name\":\"rider\",\"type\":\"string\"},"
+      + 
"{\"name\":\"driver\",\"type\":\"string\"},{\"name\":\"fare\",\"type\":\"double\"},{\"name\":
 \"_hoodie_is_deleted\", \"type\": \"boolean\", \"default\": false}]}";
+
   public static final String NULL_SCHEMA = 
Schema.create(Schema.Type.NULL).toString();
   public static final String TRIP_HIVE_COLUMN_TYPES = 
"double,string,string,string,double,double,double,double,"
       + 
"map<string,string>,struct<amount:double,currency:string>,array<struct<amount:double,currency:string>>,boolean";
@@ -102,13 +111,17 @@ public class HoodieTestDataGenerator {
   public static final Schema AVRO_SCHEMA = new 
Schema.Parser().parse(TRIP_EXAMPLE_SCHEMA);
   public static final Schema AVRO_SCHEMA_WITH_METADATA_FIELDS =
       HoodieAvroUtils.addMetadataFields(AVRO_SCHEMA);
+  public static final Schema AVRO_SHORT_TRIP_SCHEMA = new 
Schema.Parser().parse(SHORT_TRIP_SCHEMA);
+  public static final Schema AVRO_TRIP_SCHEMA = new 
Schema.Parser().parse(TRIP_SCHEMA);
   public static final Schema FLATTENED_AVRO_SCHEMA = new 
Schema.Parser().parse(TRIP_FLATTENED_SCHEMA);
 
   private static final Random RAND = new Random(46474747);
 
-  private final Map<Integer, KeyPartition> existingKeys;
+  //Maintains all the existing keys schema wise
+  private final Map<String, Map<Integer, KeyPartition>> existingKeysBySchema;
   private final String[] partitionPaths;
-  private int numExistingKeys;
+  //maintains the count of existing keys schema wise
+  private Map<String, Integer> numKeysBySchema;
 
   public HoodieTestDataGenerator(String[] partitionPaths) {
     this(partitionPaths, new HashMap<>());
@@ -120,7 +133,9 @@ public class HoodieTestDataGenerator {
 
   public HoodieTestDataGenerator(String[] partitionPaths, Map<Integer, 
KeyPartition> keyPartitionMap) {
     this.partitionPaths = Arrays.copyOf(partitionPaths, partitionPaths.length);
-    this.existingKeys = keyPartitionMap;
+    this.existingKeysBySchema = new HashMap<>();
+    existingKeysBySchema.put(TRIP_EXAMPLE_SCHEMA, keyPartitionMap);
+    numKeysBySchema = new HashMap<>();
   }
 
   public static void writePartitionMetadata(FileSystem fs, String[] 
partitionPaths, String basePath) {
@@ -129,6 +144,18 @@ public class HoodieTestDataGenerator {
     }
   }
 
+  public TestRawTripPayload generateRandomValueAsPerSchema(String schemaStr, 
HoodieKey key, String commitTime, boolean isFlattened) throws IOException {
+    if (TRIP_EXAMPLE_SCHEMA.equals(schemaStr)) {
+      return generateRandomValue(key, commitTime, isFlattened);
+    } else if (TRIP_SCHEMA.equals(schemaStr)) {
+      return generatePayloadForTripSchema(key, commitTime);
+    } else if (SHORT_TRIP_SCHEMA.equals(schemaStr)) {
+      return generatePayloadForShortTripSchema(key, commitTime);
+    }
+
+    return null;
+  }
+
   /**
    * Generates a new avro record of the above nested schema format,
    * retaining the key if optionally provided.
@@ -161,6 +188,19 @@ public class HoodieTestDataGenerator {
   }
 
   /**
+   * Generates a new avro record with TRIP_SCHEMA, retaining the key if 
optionally provided.
+   */
+  public TestRawTripPayload generatePayloadForTripSchema(HoodieKey key, String 
commitTime) throws IOException {
+    GenericRecord rec = generateRecordForTripSchema(key.getRecordKey(), 
"rider-" + commitTime, "driver-" + commitTime, 0.0);
+    return new TestRawTripPayload(rec.toString(), key.getRecordKey(), 
key.getPartitionPath(), TRIP_SCHEMA);
+  }
+
+  public TestRawTripPayload generatePayloadForShortTripSchema(HoodieKey key, 
String commitTime) throws IOException {
+    GenericRecord rec = generateRecordForShortTripSchema(key.getRecordKey(), 
"rider-" + commitTime, "driver-" + commitTime, 0.0);
+    return new TestRawTripPayload(rec.toString(), key.getRecordKey(), 
key.getPartitionPath(), SHORT_TRIP_SCHEMA);
+  }
+
+  /**
    * Generates a new avro record of the above schema format for a delete.
    */
   public static TestRawTripPayload generateRandomDeleteValue(HoodieKey key, 
String instantTime) throws IOException {
@@ -223,6 +263,31 @@ public class HoodieTestDataGenerator {
     return rec;
   }
 
+  /*
+  Generate random record using TRIP_SCHEMA
+   */
+  public GenericRecord generateRecordForTripSchema(String rowKey, String 
riderName, String driverName, double timestamp) {
+    GenericRecord rec = new GenericData.Record(AVRO_TRIP_SCHEMA);
+    rec.put("_row_key", rowKey);
+    rec.put("timestamp", timestamp);
+    rec.put("rider", riderName);
+    rec.put("driver", driverName);
+    rec.put("fare", RAND.nextDouble() * 100);
+    rec.put("_hoodie_is_deleted", false);
+    return rec;
+  }
+
+  public GenericRecord generateRecordForShortTripSchema(String rowKey, String 
riderName, String driverName, double timestamp) {
+    GenericRecord rec = new GenericData.Record(AVRO_SHORT_TRIP_SCHEMA);
+    rec.put("_row_key", rowKey);
+    rec.put("timestamp", timestamp);
+    rec.put("rider", riderName);
+    rec.put("driver", driverName);
+    rec.put("fare", RAND.nextDouble() * 100);
+    rec.put("_hoodie_is_deleted", false);
+    return rec;
+  }
+
   public static void createCommitFile(String basePath, String instantTime, 
Configuration configuration) {
     Arrays.asList(HoodieTimeline.makeCommitFileName(instantTime), 
HoodieTimeline.makeInflightCommitFileName(instantTime),
         HoodieTimeline.makeRequestedCommitFileName(instantTime))
@@ -283,6 +348,10 @@ public class HoodieTestDataGenerator {
     }
   }
 
+  public List<HoodieRecord> generateInsertsAsPerSchema(String commitTime, 
Integer n, String schemaStr) {
+    return generateInsertsStream(commitTime, n, 
schemaStr).collect(Collectors.toList());
+  }
+
   /**
    * Generates new inserts with nested schema, uniformly across the partition 
paths above.
    * It also updates the list of existing keys.
@@ -301,15 +370,22 @@ public class HoodieTestDataGenerator {
    * @return  List of {@link HoodieRecord}s
    */
   public List<HoodieRecord> generateInserts(String instantTime, Integer n, 
boolean isFlattened) {
-    return generateInsertsStream(instantTime, n, 
isFlattened).collect(Collectors.toList());
+    return generateInsertsStream(instantTime, n, isFlattened, 
TRIP_EXAMPLE_SCHEMA).collect(Collectors.toList());
+  }
+
+  /**
+   * Generates new inserts, uniformly across the partition paths above. It 
also updates the list of existing keys.
+   */
+  public Stream<HoodieRecord> generateInsertsStream(String commitTime, Integer 
n, String schemaStr) {
+    return generateInsertsStream(commitTime, n, false, schemaStr);
   }
 
   /**
    * Generates new inserts, uniformly across the partition paths above. It 
also updates the list of existing keys.
    */
   public Stream<HoodieRecord> generateInsertsStream(
-      String instantTime, Integer n, boolean isFlattened) {
-    int currSize = getNumExistingKeys();
+      String instantTime, Integer n, boolean isFlattened, String schemaStr) {
+    int currSize = getNumExistingKeys(schemaStr);
 
     return IntStream.range(0, n).boxed().map(i -> {
       String partitionPath = 
partitionPaths[RAND.nextInt(partitionPaths.length)];
@@ -317,16 +393,36 @@ public class HoodieTestDataGenerator {
       KeyPartition kp = new KeyPartition();
       kp.key = key;
       kp.partitionPath = partitionPath;
-      existingKeys.put(currSize + i, kp);
-      numExistingKeys++;
+      populateKeysBySchema(schemaStr, currSize + i, kp);
+      incrementNumExistingKeysBySchema(schemaStr);
       try {
-        return new HoodieRecord(key, generateRandomValue(key, instantTime, 
isFlattened));
+        return new HoodieRecord(key, generateRandomValueAsPerSchema(schemaStr, 
key, instantTime, isFlattened));
       } catch (IOException e) {
         throw new HoodieIOException(e.getMessage(), e);
       }
     });
   }
 
+  /*
+  Takes care of populating keys schema wise
+   */
+  private void populateKeysBySchema(String schemaStr, int i, KeyPartition kp) {
+    if (existingKeysBySchema.containsKey(schemaStr)) {
+      existingKeysBySchema.get(schemaStr).put(i, kp);
+    } else {
+      existingKeysBySchema.put(schemaStr, new HashMap<>());
+      existingKeysBySchema.get(schemaStr).put(i, kp);
+    }
+  }
+
+  private void incrementNumExistingKeysBySchema(String schemaStr) {
+    if (numKeysBySchema.containsKey(schemaStr)) {
+      numKeysBySchema.put(schemaStr, numKeysBySchema.get(schemaStr) + 1);
+    } else {
+      numKeysBySchema.put(schemaStr, 1);
+    }
+  }
+
   public List<HoodieRecord> generateSameKeyInserts(String instantTime, 
List<HoodieRecord> origin) throws IOException {
     List<HoodieRecord> copy = new ArrayList<>();
     for (HoodieRecord r : origin) {
@@ -339,7 +435,7 @@ public class HoodieTestDataGenerator {
 
   public List<HoodieRecord> generateInsertsWithHoodieAvroPayload(String 
instantTime, int limit) {
     List<HoodieRecord> inserts = new ArrayList<>();
-    int currSize = getNumExistingKeys();
+    int currSize = getNumExistingKeys(TRIP_EXAMPLE_SCHEMA);
     for (int i = 0; i < limit; i++) {
       String partitionPath = 
partitionPaths[RAND.nextInt(partitionPaths.length)];
       HoodieKey key = new HoodieKey(UUID.randomUUID().toString(), 
partitionPath);
@@ -349,8 +445,8 @@ public class HoodieTestDataGenerator {
       KeyPartition kp = new KeyPartition();
       kp.key = key;
       kp.partitionPath = partitionPath;
-      existingKeys.put(currSize + i, kp);
-      numExistingKeys++;
+      populateKeysBySchema(TRIP_EXAMPLE_SCHEMA, currSize + i, kp);
+      incrementNumExistingKeysBySchema(TRIP_EXAMPLE_SCHEMA);
     }
     return inserts;
   }
@@ -431,6 +527,8 @@ public class HoodieTestDataGenerator {
   public List<HoodieRecord> generateUpdates(String instantTime, Integer n) 
throws IOException {
     List<HoodieRecord> updates = new ArrayList<>();
     for (int i = 0; i < n; i++) {
+      Map<Integer, KeyPartition> existingKeys = 
existingKeysBySchema.get(TRIP_EXAMPLE_SCHEMA);
+      Integer numExistingKeys = numKeysBySchema.get(TRIP_EXAMPLE_SCHEMA);
       KeyPartition kp = existingKeys.get(RAND.nextInt(numExistingKeys - 1));
       HoodieRecord record = generateUpdateRecord(kp.key, instantTime);
       updates.add(record);
@@ -438,6 +536,10 @@ public class HoodieTestDataGenerator {
     return updates;
   }
 
+  public List<HoodieRecord> generateUpdatesAsPerSchema(String commitTime, 
Integer n, String schemaStr) {
+    return generateUniqueUpdatesStream(commitTime, n, 
schemaStr).collect(Collectors.toList());
+  }
+
   /**
    * Generates deduped updates of keys previously inserted, randomly 
distributed across the keys above.
    *
@@ -446,7 +548,7 @@ public class HoodieTestDataGenerator {
    * @return list of hoodie record updates
    */
   public List<HoodieRecord> generateUniqueUpdates(String instantTime, Integer 
n) {
-    return generateUniqueUpdatesStream(instantTime, 
n).collect(Collectors.toList());
+    return generateUniqueUpdatesStream(instantTime, n, 
TRIP_EXAMPLE_SCHEMA).collect(Collectors.toList());
   }
 
   /**
@@ -466,8 +568,10 @@ public class HoodieTestDataGenerator {
    * @param n          Number of unique records
    * @return stream of hoodie record updates
    */
-  public Stream<HoodieRecord> generateUniqueUpdatesStream(String instantTime, 
Integer n) {
+  public Stream<HoodieRecord> generateUniqueUpdatesStream(String instantTime, 
Integer n, String schemaStr) {
     final Set<KeyPartition> used = new HashSet<>();
+    int numExistingKeys = numKeysBySchema.getOrDefault(schemaStr, 0);
+    Map<Integer, KeyPartition> existingKeys = 
existingKeysBySchema.get(schemaStr);
     if (n > numExistingKeys) {
       throw new IllegalArgumentException("Requested unique updates is greater 
than number of available keys");
     }
@@ -480,9 +584,10 @@ public class HoodieTestDataGenerator {
         index = (index + 1) % numExistingKeys;
         kp = existingKeys.get(index);
       }
+      logger.debug("key getting updated: " + kp.key.getRecordKey());
       used.add(kp);
       try {
-        return new HoodieRecord(kp.key, generateRandomValue(kp.key, 
instantTime));
+        return new HoodieRecord(kp.key, 
generateRandomValueAsPerSchema(schemaStr, kp.key, instantTime, false));
       } catch (IOException e) {
         throw new HoodieIOException(e.getMessage(), e);
       }
@@ -497,6 +602,8 @@ public class HoodieTestDataGenerator {
    */
   public Stream<HoodieKey> generateUniqueDeleteStream(Integer n) {
     final Set<KeyPartition> used = new HashSet<>();
+    Map<Integer, KeyPartition> existingKeys = 
existingKeysBySchema.get(TRIP_EXAMPLE_SCHEMA);
+    Integer numExistingKeys = numKeysBySchema.get(TRIP_EXAMPLE_SCHEMA);
     if (n > numExistingKeys) {
       throw new IllegalArgumentException("Requested unique deletes is greater 
than number of available keys");
     }
@@ -514,6 +621,7 @@ public class HoodieTestDataGenerator {
       used.add(kp);
       result.add(kp.key);
     }
+    numKeysBySchema.put(TRIP_EXAMPLE_SCHEMA, numExistingKeys);
     return result.stream();
   }
 
@@ -526,6 +634,8 @@ public class HoodieTestDataGenerator {
    */
   public Stream<HoodieRecord> generateUniqueDeleteRecordStream(String 
instantTime, Integer n) {
     final Set<KeyPartition> used = new HashSet<>();
+    Map<Integer, KeyPartition> existingKeys = 
existingKeysBySchema.get(TRIP_EXAMPLE_SCHEMA);
+    Integer numExistingKeys = numKeysBySchema.get(TRIP_EXAMPLE_SCHEMA);
     if (n > numExistingKeys) {
       throw new IllegalArgumentException("Requested unique deletes is greater 
than number of available keys");
     }
@@ -536,7 +646,7 @@ public class HoodieTestDataGenerator {
       while (!existingKeys.containsKey(index)) {
         index = (index + 1) % numExistingKeys;
       }
-      // swap chosen index with last index and remove last entry. 
+      // swap chosen index with last index and remove last entry.
       KeyPartition kp = existingKeys.remove(index);
       existingKeys.put(index, existingKeys.get(numExistingKeys - 1));
       existingKeys.remove(numExistingKeys - 1);
@@ -548,6 +658,7 @@ public class HoodieTestDataGenerator {
         throw new HoodieIOException(e.getMessage(), e);
       }
     }
+    numKeysBySchema.put(TRIP_EXAMPLE_SCHEMA, numExistingKeys);
     return result.stream();
   }
 
@@ -555,8 +666,8 @@ public class HoodieTestDataGenerator {
     return partitionPaths;
   }
 
-  public int getNumExistingKeys() {
-    return numExistingKeys;
+  public int getNumExistingKeys(String schemaStr) {
+    return numKeysBySchema.getOrDefault(schemaStr, 0);
   }
 
   public static class KeyPartition implements Serializable {
@@ -566,6 +677,6 @@ public class HoodieTestDataGenerator {
   }
 
   public void close() {
-    existingKeys.clear();
+    existingKeysBySchema.clear();
   }
 }
diff --git 
a/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java 
b/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
index 449c7f3..f394314 100644
--- a/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
+++ b/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestHiveSyncTool.java
@@ -31,6 +31,7 @@ import org.apache.parquet.schema.PrimitiveType;
 import org.apache.parquet.schema.Types;
 import org.joda.time.DateTime;
 import org.junit.After;
+import org.junit.AfterClass;
 import org.junit.Before;
 import org.junit.Test;
 import org.junit.runner.RunWith;
@@ -70,6 +71,11 @@ public class TestHiveSyncTool {
     TestUtil.clear();
   }
 
+  @AfterClass
+  public static void cleanUpClass() {
+    TestUtil.shutdown();
+  }
+
   /**
    * Testing converting array types to Hive field declaration strings. 
According to the Parquet-113 spec:
    * https://github.com/apache/parquet-format/blob/master/LogicalTypes.md#lists
diff --git a/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestUtil.java 
b/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestUtil.java
index 3c0f551..cee1330 100644
--- a/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestUtil.java
+++ b/hudi-hive-sync/src/test/java/org/apache/hudi/hive/TestUtil.java
@@ -83,6 +83,7 @@ public class TestUtil {
   private static MiniDFSCluster dfsCluster;
   private static ZooKeeperServer zkServer;
   private static HiveServer2 hiveServer;
+  private static HiveTestService hiveTestService;
   private static Configuration configuration;
   static HiveSyncConfig hiveSyncConfig;
   private static DateTimeFormatter dtfOut;
@@ -100,8 +101,8 @@ public class TestUtil {
       zkServer = zkService.start();
     }
     if (hiveServer == null) {
-      HiveTestService hiveService = new HiveTestService(configuration);
-      hiveServer = hiveService.start();
+      hiveTestService = new HiveTestService(configuration);
+      hiveServer = hiveTestService.start();
     }
     fileSystem = FileSystem.get(configuration);
 
@@ -139,11 +140,13 @@ public class TestUtil {
     return hiveServer.getHiveConf();
   }
 
-  @SuppressWarnings("unused")
   public static void shutdown() {
     if (hiveServer != null) {
       hiveServer.stop();
     }
+    if (hiveTestService != null) {
+      hiveTestService.stop();
+    }
     if (dfsCluster != null) {
       dfsCluster.shutdown();
     }
diff --git 
a/hudi-hive-sync/src/test/java/org/apache/hudi/hive/util/HiveTestService.java 
b/hudi-hive-sync/src/test/java/org/apache/hudi/hive/util/HiveTestService.java
index d2808d6..c1c355e 100644
--- 
a/hudi-hive-sync/src/test/java/org/apache/hudi/hive/util/HiveTestService.java
+++ 
b/hudi-hive-sync/src/test/java/org/apache/hudi/hive/util/HiveTestService.java
@@ -121,6 +121,20 @@ public class HiveTestService {
     return hiveServer;
   }
 
+  public void stop() {
+    resetSystemProperties();
+    if (tServer != null) {
+      tServer.stop();
+    }
+    if (hiveServer != null) {
+      hiveServer.stop();
+    }
+    LOG.info("Hive Minicluster service shut down.");
+    tServer = null;
+    hiveServer = null;
+    hadoopConf = null;
+  }
+
   private HiveConf configureHive(Configuration conf, String localHiveLocation) 
throws IOException {
     conf.set("hive.metastore.local", "false");
     conf.set(HiveConf.ConfVars.METASTOREURIS.varname, "thrift://" + bindIP + 
":" + metastorePort);
@@ -183,6 +197,17 @@ public class HiveTestService {
     }
   }
 
+  private void resetSystemProperties() {
+    for (Map.Entry<String, String> entry : sysProps.entrySet()) {
+      if (entry.getValue() != null) {
+        System.setProperty(entry.getKey(), entry.getValue());
+      } else {
+        System.getProperties().remove(entry.getKey());
+      }
+    }
+    sysProps.clear();
+  }
+
   private static String getHiveLocation(String baseLocation) {
     return baseLocation + Path.SEPARATOR + "hive";
   }
diff --git 
a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java 
b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java
index 5e6bc33..01eecd0 100644
--- a/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java
+++ b/hudi-integ-test/src/test/java/org/apache/hudi/integ/ITTestHoodieDemo.java
@@ -121,14 +121,14 @@ public class ITTestHoodieDemo extends ITTestBase {
             + " --table-type COPY_ON_WRITE "
             + " --source-class org.apache.hudi.utilities.sources.JsonDFSSource 
--source-ordering-field ts "
             + " --target-base-path " + COW_BASE_PATH + " --target-table " + 
COW_TABLE_NAME
-            + " --props /var/demo/config/dfs-source.properties "
+            + " --props /var/demo/config/dfs-source.properties"
             + " --schemaprovider-class 
org.apache.hudi.utilities.schema.FilebasedSchemaProvider "
             + String.format(HIVE_SYNC_CMD_FMT, "dt", COW_TABLE_NAME),
             ("spark-submit --class 
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer " + 
HUDI_UTILITIES_BUNDLE
             + " --table-type MERGE_ON_READ "
             + " --source-class org.apache.hudi.utilities.sources.JsonDFSSource 
--source-ordering-field ts "
             + " --target-base-path " + MOR_BASE_PATH + " --target-table " + 
MOR_TABLE_NAME
-            + " --props /var/demo/config/dfs-source.properties "
+            + " --props /var/demo/config/dfs-source.properties"
             + " --schemaprovider-class 
org.apache.hudi.utilities.schema.FilebasedSchemaProvider "
             + " --disable-compaction " + String.format(HIVE_SYNC_CMD_FMT, 
"dt", MOR_TABLE_NAME)));
 
@@ -173,14 +173,14 @@ public class ITTestHoodieDemo extends ITTestBase {
             + " --table-type COPY_ON_WRITE "
             + " --source-class org.apache.hudi.utilities.sources.JsonDFSSource 
--source-ordering-field ts "
             + " --target-base-path " + COW_BASE_PATH + " --target-table " + 
COW_TABLE_NAME
-            + " --props /var/demo/config/dfs-source.properties "
+            + " --props /var/demo/config/dfs-source.properties"
             + " --schemaprovider-class 
org.apache.hudi.utilities.schema.FilebasedSchemaProvider "
             + String.format(HIVE_SYNC_CMD_FMT, "dt", COW_TABLE_NAME)),
             ("spark-submit --class 
org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer " + 
HUDI_UTILITIES_BUNDLE
             + " --table-type MERGE_ON_READ "
             + " --source-class org.apache.hudi.utilities.sources.JsonDFSSource 
--source-ordering-field ts "
             + " --target-base-path " + MOR_BASE_PATH + " --target-table " + 
MOR_TABLE_NAME
-            + " --props /var/demo/config/dfs-source.properties "
+            + " --props /var/demo/config/dfs-source.properties"
             + " --schemaprovider-class 
org.apache.hudi.utilities.schema.FilebasedSchemaProvider "
             + " --disable-compaction " + String.format(HIVE_SYNC_CMD_FMT, 
"dt", MOR_TABLE_NAME)));
     executeCommandStringsInDocker(ADHOC_1_CONTAINER, cmds);
diff --git a/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala 
b/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
index 10978e4..9d7d6cc 100644
--- a/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
+++ b/hudi-spark/src/main/scala/org/apache/hudi/DataSourceOptions.scala
@@ -262,6 +262,7 @@ object DataSourceWriteOptions {
   val HIVE_URL_OPT_KEY = "hoodie.datasource.hive_sync.jdbcurl"
   val HIVE_PARTITION_FIELDS_OPT_KEY = 
"hoodie.datasource.hive_sync.partition_fields"
   val HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY = 
"hoodie.datasource.hive_sync.partition_extractor_class"
+  val HIVE_ASSUME_DATE_PARTITION_OPT_KEY = 
"hoodie.datasource.hive_sync.assume_date_partitioning"
   val HIVE_USE_PRE_APACHE_INPUT_FORMAT_OPT_KEY = 
"hoodie.datasource.hive_sync.use_pre_apache_input_format"
 
   // DEFAULT FOR HIVE SPECIFIC CONFIGS
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
index 5cc33ee..c964c91 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
@@ -26,7 +26,6 @@ import org.apache.hudi.common.config.TypedProperties;
 import org.apache.hudi.common.model.HoodieCommitMetadata;
 import org.apache.hudi.common.model.HoodieRecord;
 import org.apache.hudi.common.model.HoodieRecordPayload;
-import org.apache.hudi.common.model.HoodieTableType;
 import org.apache.hudi.common.table.HoodieTableMetaClient;
 import org.apache.hudi.common.table.timeline.HoodieInstant;
 import org.apache.hudi.common.table.timeline.HoodieTimeline;
@@ -153,20 +152,14 @@ public class DeltaSync implements Serializable {
    */
   private transient HoodieWriteClient writeClient;
 
-  /**
-   * Table Type.
-   */
-  private final HoodieTableType tableType;
-
   public DeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession, 
SchemaProvider schemaProvider,
-                   HoodieTableType tableType, TypedProperties props, 
JavaSparkContext jssc, FileSystem fs, HiveConf hiveConf,
+                   TypedProperties props, JavaSparkContext jssc, FileSystem 
fs, HiveConf hiveConf,
                    Function<HoodieWriteClient, Boolean> 
onInitializingHoodieWriteClient) throws IOException {
 
     this.cfg = cfg;
     this.jssc = jssc;
     this.sparkSession = sparkSession;
     this.fs = fs;
-    this.tableType = tableType;
     this.onInitializingHoodieWriteClient = onInitializingHoodieWriteClient;
     this.props = props;
     this.schemaProvider = schemaProvider;
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
index bc4c85d..8368478 100644
--- 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamer.java
@@ -93,6 +93,17 @@ public class HoodieDeltaStreamer implements Serializable {
         getDefaultHiveConf(jssc.hadoopConfiguration()));
   }
 
+  public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc, 
TypedProperties props) throws IOException {
+    this(cfg, jssc, FSUtils.getFs(cfg.targetBasePath, 
jssc.hadoopConfiguration()),
+        getDefaultHiveConf(jssc.hadoopConfiguration()), props);
+  }
+
+  public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc, FileSystem fs, 
HiveConf hiveConf,
+                             TypedProperties properties) throws IOException {
+    this.cfg = cfg;
+    this.deltaSyncService = new DeltaSyncService(cfg, jssc, fs, hiveConf, 
properties);
+  }
+
   public HoodieDeltaStreamer(Config cfg, JavaSparkContext jssc, FileSystem fs, 
HiveConf hiveConf) throws IOException {
     this.cfg = cfg;
     this.deltaSyncService = new DeltaSyncService(cfg, jssc, fs, hiveConf);
@@ -142,7 +153,7 @@ public class HoodieDeltaStreamer implements Serializable {
     UPSERT, INSERT, BULK_INSERT
   }
 
-  private static class OperationConvertor implements 
IStringConverter<Operation> {
+  protected static class OperationConvertor implements 
IStringConverter<Operation> {
 
     @Override
     public Operation convert(String value) throws ParameterException {
@@ -150,7 +161,7 @@ public class HoodieDeltaStreamer implements Serializable {
     }
   }
 
-  private static class TransformersConverter implements 
IStringConverter<List<String>> {
+  protected static class TransformersConverter implements 
IStringConverter<List<String>> {
 
     @Override
     public List<String> convert(String value) throws ParameterException {
@@ -169,6 +180,7 @@ public class HoodieDeltaStreamer implements Serializable {
         required = true)
     public String targetBasePath;
 
+    // TODO: How to obtain hive configs to register?
     @Parameter(names = {"--target-table"}, description = "name of the target 
table in Hive", required = true)
     public String targetTableName;
 
@@ -359,8 +371,8 @@ public class HoodieDeltaStreamer implements Serializable {
      */
     private transient DeltaSync deltaSync;
 
-    public DeltaSyncService(HoodieDeltaStreamer.Config cfg, JavaSparkContext 
jssc, FileSystem fs, HiveConf hiveConf)
-        throws IOException {
+    public DeltaSyncService(Config cfg, JavaSparkContext jssc, FileSystem fs, 
HiveConf hiveConf,
+                            TypedProperties properties) throws IOException {
       this.cfg = cfg;
       this.jssc = jssc;
       this.sparkSession = 
SparkSession.builder().config(jssc.getConf()).getOrCreate();
@@ -376,7 +388,7 @@ public class HoodieDeltaStreamer implements Serializable {
         tableType = HoodieTableType.valueOf(cfg.tableType);
       }
 
-      this.props = UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), 
cfg.configs).getConfig();
+      this.props = properties != null ? properties : 
UtilHelpers.readConfig(fs, new Path(cfg.propsFilePath), 
cfg.configs).getConfig();
       LOG.info("Creating delta streamer with configs : " + props.toString());
       this.schemaProvider = 
UtilHelpers.createSchemaProvider(cfg.schemaProviderClassName, props, jssc);
 
@@ -384,8 +396,14 @@ public class HoodieDeltaStreamer implements Serializable {
         cfg.operation = cfg.operation == Operation.UPSERT ? Operation.INSERT : 
cfg.operation;
       }
 
-      deltaSync = new DeltaSync(cfg, sparkSession, schemaProvider, tableType, 
props, jssc, fs, hiveConf,
-          this::onInitializingWriteClient);
+      deltaSync = new DeltaSync(cfg, sparkSession, schemaProvider, props, 
jssc, fs, hiveConf,
+        this::onInitializingWriteClient);
+
+    }
+
+    public DeltaSyncService(HoodieDeltaStreamer.Config cfg, JavaSparkContext 
jssc, FileSystem fs, HiveConf hiveConf)
+        throws IOException {
+      this(cfg, jssc, fs, hiveConf, null);
     }
 
     public DeltaSync getDeltaSync() {
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
new file mode 100644
index 0000000..74455f2
--- /dev/null
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
@@ -0,0 +1,393 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer;
+
+import com.beust.jcommander.Parameter;
+import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.hudi.common.fs.FSUtils;
+import org.apache.hudi.common.model.OverwriteWithLatestAvroPayload;
+import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.utilities.UtilHelpers;
+import org.apache.hudi.utilities.schema.SchemaRegistryProvider;
+
+import com.beust.jcommander.JCommander;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FileUtil;
+import org.apache.hadoop.fs.Path;
+import org.apache.hudi.utilities.sources.JsonDFSSource;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.api.java.JavaSparkContext;
+
+import java.io.IOException;
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * Wrapper over HoodieDeltaStreamer.java class.
+ * Helps with ingesting incremental data into hoodie datasets for multiple 
tables.
+ * Currently supports only COPY_ON_WRITE storage type.
+ */
+public class HoodieMultiTableDeltaStreamer {
+
+  private static Logger logger = 
LogManager.getLogger(HoodieMultiTableDeltaStreamer.class);
+
+  private List<TableExecutionContext> tableExecutionContexts;
+  private transient JavaSparkContext jssc;
+  private Set<String> successTables;
+  private Set<String> failedTables;
+
+  public HoodieMultiTableDeltaStreamer(Config config, JavaSparkContext jssc) 
throws IOException {
+    this.tableExecutionContexts = new ArrayList<>();
+    this.successTables = new HashSet<>();
+    this.failedTables = new HashSet<>();
+    this.jssc = jssc;
+    String commonPropsFile = config.propsFilePath;
+    String configFolder = config.configFolder;
+    FileSystem fs = FSUtils.getFs(commonPropsFile, jssc.hadoopConfiguration());
+    configFolder = configFolder.charAt(configFolder.length() - 1) == '/' ? 
configFolder.substring(0, configFolder.length() - 1) : configFolder;
+    checkIfPropsFileAndConfigFolderExist(commonPropsFile, configFolder, fs);
+    TypedProperties properties = UtilHelpers.readConfig(fs, new 
Path(commonPropsFile), new ArrayList<>()).getConfig();
+    //get the tables to be ingested and their corresponding config files from 
this properties instance
+    populateTableExecutionContextList(properties, configFolder, fs, config);
+  }
+
+  private void checkIfPropsFileAndConfigFolderExist(String commonPropsFile, 
String configFolder, FileSystem fs) throws IOException {
+    if (!fs.exists(new Path(commonPropsFile))) {
+      throw new IllegalArgumentException("Please provide valid common config 
file path!");
+    }
+
+    if (!fs.exists(new Path(configFolder))) {
+      fs.mkdirs(new Path(configFolder));
+    }
+  }
+
+  private void checkIfTableConfigFileExists(String configFolder, FileSystem 
fs, String configFilePath) throws IOException {
+    if (!fs.exists(new Path(configFilePath)) || !fs.isFile(new 
Path(configFilePath))) {
+      throw new IllegalArgumentException("Please provide valid table config 
file path!");
+    }
+
+    Path path = new Path(configFilePath);
+    Path filePathInConfigFolder = new Path(configFolder, path.getName());
+    if (!fs.exists(filePathInConfigFolder)) {
+      FileUtil.copy(fs, path, fs, filePathInConfigFolder, false, fs.getConf());
+    }
+  }
+
+  //commonProps are passed as parameter which contain table to config file 
mapping
+  private void populateTableExecutionContextList(TypedProperties properties, 
String configFolder, FileSystem fs, Config config) throws IOException {
+    List<String> tablesToBeIngested = getTablesToBeIngested(properties);
+    logger.info("tables to be ingested via MultiTableDeltaStreamer : " + 
tablesToBeIngested);
+    TableExecutionContext executionContext;
+    for (String table : tablesToBeIngested) {
+      String[] tableWithDatabase = table.split("\\.");
+      String database = tableWithDatabase.length > 1 ? tableWithDatabase[0] : 
"default";
+      String currentTable = tableWithDatabase.length > 1 ? 
tableWithDatabase[1] : table;
+      String configProp = Constants.INGESTION_PREFIX + database + 
Constants.DELIMITER + currentTable + Constants.INGESTION_CONFIG_SUFFIX;
+      String configFilePath = properties.getString(configProp, 
Helpers.getDefaultConfigFilePath(configFolder, database, currentTable));
+      checkIfTableConfigFileExists(configFolder, fs, configFilePath);
+      TypedProperties tableProperties = UtilHelpers.readConfig(fs, new 
Path(configFilePath), new ArrayList<>()).getConfig();
+      properties.forEach((k,v) -> {
+        tableProperties.setProperty(k.toString(), v.toString());
+      });
+      final HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
+      //copy all the values from config to cfg
+      String targetBasePath = resetTarget(config, database, currentTable);
+      Helpers.deepCopyConfigs(config, cfg);
+      String overriddenTargetBasePath = 
tableProperties.getString(Constants.TARGET_BASE_PATH_PROP, "");
+      cfg.targetBasePath = StringUtils.isNullOrEmpty(overriddenTargetBasePath) 
? targetBasePath : overriddenTargetBasePath;
+      if (cfg.enableHiveSync && 
StringUtils.isNullOrEmpty(tableProperties.getString(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY(),
 ""))) {
+        throw new HoodieException("Hive sync table field not provided!");
+      }
+      populateSchemaProviderProps(cfg, tableProperties);
+      executionContext = new TableExecutionContext();
+      executionContext.setProperties(tableProperties);
+      executionContext.setConfig(cfg);
+      executionContext.setDatabase(database);
+      executionContext.setTableName(currentTable);
+      this.tableExecutionContexts.add(executionContext);
+    }
+  }
+
+  private List<String> getTablesToBeIngested(TypedProperties properties) {
+    String combinedTablesString = 
properties.getString(Constants.TABLES_TO_BE_INGESTED_PROP);
+    if (combinedTablesString == null) {
+      return new ArrayList<>();
+    }
+    String[] tablesArray = 
combinedTablesString.split(Constants.COMMA_SEPARATOR);
+    return Arrays.asList(tablesArray);
+  }
+
+  private void populateSchemaProviderProps(HoodieDeltaStreamer.Config cfg, 
TypedProperties typedProperties) {
+    if 
(cfg.schemaProviderClassName.equals(SchemaRegistryProvider.class.getName())) {
+      String schemaRegistryBaseUrl = 
typedProperties.getString(Constants.SCHEMA_REGISTRY_BASE_URL_PROP);
+      String schemaRegistrySuffix = 
typedProperties.getString(Constants.SCHEMA_REGISTRY_URL_SUFFIX_PROP);
+      typedProperties.setProperty(Constants.SOURCE_SCHEMA_REGISTRY_URL_PROP, 
schemaRegistryBaseUrl + typedProperties.getString(Constants.KAFKA_TOPIC_PROP) + 
schemaRegistrySuffix);
+      typedProperties.setProperty(Constants.TARGET_SCHEMA_REGISTRY_URL_PROP, 
schemaRegistryBaseUrl + typedProperties.getString(Constants.KAFKA_TOPIC_PROP) + 
schemaRegistrySuffix);
+    }
+  }
+
+  public static class Helpers {
+
+    static String getDefaultConfigFilePath(String configFolder, String 
database, String currentTable) {
+      return configFolder + Constants.FILEDELIMITER + database + 
Constants.UNDERSCORE + currentTable + Constants.DEFAULT_CONFIG_FILE_NAME_SUFFIX;
+    }
+
+    static String getTableWithDatabase(TableExecutionContext context) {
+      return context.getDatabase() + Constants.DELIMITER + 
context.getTableName();
+    }
+
+    static void deepCopyConfigs(Config globalConfig, 
HoodieDeltaStreamer.Config tableConfig) {
+      tableConfig.enableHiveSync = globalConfig.enableHiveSync;
+      tableConfig.schemaProviderClassName = 
globalConfig.schemaProviderClassName;
+      tableConfig.sourceOrderingField = globalConfig.sourceOrderingField;
+      tableConfig.sourceClassName = globalConfig.sourceClassName;
+      tableConfig.tableType = globalConfig.tableType;
+      tableConfig.targetTableName = globalConfig.targetTableName;
+      tableConfig.operation = globalConfig.operation;
+      tableConfig.sourceLimit = globalConfig.sourceLimit;
+      tableConfig.checkpoint = globalConfig.checkpoint;
+      tableConfig.continuousMode = globalConfig.continuousMode;
+      tableConfig.filterDupes = globalConfig.filterDupes;
+      tableConfig.payloadClassName = globalConfig.payloadClassName;
+      tableConfig.forceDisableCompaction = globalConfig.forceDisableCompaction;
+      tableConfig.maxPendingCompactions = globalConfig.maxPendingCompactions;
+      tableConfig.minSyncIntervalSeconds = globalConfig.minSyncIntervalSeconds;
+      tableConfig.transformerClassNames = globalConfig.transformerClassNames;
+      tableConfig.commitOnErrors = globalConfig.commitOnErrors;
+      tableConfig.compactSchedulingMinShare = 
globalConfig.compactSchedulingMinShare;
+      tableConfig.compactSchedulingWeight = 
globalConfig.compactSchedulingWeight;
+      tableConfig.deltaSyncSchedulingMinShare = 
globalConfig.deltaSyncSchedulingMinShare;
+      tableConfig.deltaSyncSchedulingWeight = 
globalConfig.deltaSyncSchedulingWeight;
+      tableConfig.sparkMaster = globalConfig.sparkMaster;
+    }
+  }
+
+  public static void main(String[] args) throws IOException {
+    final Config config = new Config();
+    JCommander cmd = new JCommander(config, null, args);
+    if (config.help || args.length == 0) {
+      cmd.usage();
+      System.exit(1);
+    }
+    JavaSparkContext jssc = 
UtilHelpers.buildSparkContext("multi-table-delta-streamer", 
Constants.LOCAL_SPARK_MASTER);
+    try {
+      new HoodieMultiTableDeltaStreamer(config, jssc).sync();
+    } finally {
+      jssc.stop();
+    }
+  }
+
+  public static class Config implements Serializable {
+
+    @Parameter(names = {"--base-path-prefix"},
+        description = "base path prefix for multi table support via 
HoodieMultiTableDeltaStreamer class")
+    public String basePathPrefix;
+
+    @Parameter(names = {"--target-table"}, description = "name of the target 
table in Hive", required = true)
+    public String targetTableName;
+
+    @Parameter(names = {"--table-type"}, description = "Type of table. 
COPY_ON_WRITE (or) MERGE_ON_READ", required = true)
+    public String tableType;
+
+    @Parameter(names = {"--config-folder"}, description = "Path to folder 
which contains all the properties file", required = true)
+    public String configFolder;
+
+    @Parameter(names = {"--props"}, description = "path to properties file on 
localfs or dfs, with configurations for "
+        + "hoodie client, schema provider, key generator and data source. For 
hoodie client props, sane defaults are "
+        + "used, but recommend use to provide basic things like metrics 
endpoints, hive configs etc. For sources, refer"
+        + "to individual classes, for supported properties.")
+    public String propsFilePath =
+        "file://" + System.getProperty("user.dir") + 
"/src/test/resources/delta-streamer-config/dfs-source.properties";
+
+    @Parameter(names = {"--hoodie-conf"}, description = "Any configuration 
that can be set in the properties file "
+        + "(using the CLI parameter \"--propsFilePath\") can also be passed 
command line using this parameter")
+    public List<String> configs = new ArrayList<>();
+
+    @Parameter(names = {"--source-class"},
+        description = "Subclass of org.apache.hudi.utilities.sources to read 
data. "
+        + "Built-in options: org.apache.hudi.utilities.sources.{JsonDFSSource 
(default), AvroDFSSource, "
+        + "JsonKafkaSource, AvroKafkaSource, HiveIncrPullSource}")
+    public String sourceClassName = JsonDFSSource.class.getName();
+
+    @Parameter(names = {"--source-ordering-field"}, description = "Field 
within source record to decide how"
+        + " to break ties between records with same key in input data. 
Default: 'ts' holding unix timestamp of record")
+    public String sourceOrderingField = "ts";
+
+    @Parameter(names = {"--payload-class"}, description = "subclass of 
HoodieRecordPayload, that works off "
+        + "a GenericRecord. Implement your own, if you want to do something 
other than overwriting existing value")
+    public String payloadClassName = 
OverwriteWithLatestAvroPayload.class.getName();
+
+    @Parameter(names = {"--schemaprovider-class"}, description = "subclass of 
org.apache.hudi.utilities.schema"
+        + ".SchemaProvider to attach schemas to input & target table data, 
built in options: "
+        + "org.apache.hudi.utilities.schema.FilebasedSchemaProvider."
+        + "Source (See org.apache.hudi.utilities.sources.Source) 
implementation can implement their own SchemaProvider."
+        + " For Sources that return Dataset<Row>, the schema is obtained 
implicitly. "
+        + "However, this CLI option allows overriding the schemaprovider 
returned by Source.")
+    public String schemaProviderClassName = null;
+
+    @Parameter(names = {"--transformer-class"},
+        description = "A subclass or a list of subclasses of 
org.apache.hudi.utilities.transform.Transformer"
+        + ". Allows transforming raw source Dataset to a target Dataset 
(conforming to target schema) before "
+        + "writing. Default : Not set. E:g - 
org.apache.hudi.utilities.transform.SqlQueryBasedTransformer (which "
+        + "allows a SQL query templated to be passed as a transformation 
function). "
+        + "Pass a comma-separated list of subclass names to chain the 
transformations.",
+        converter = HoodieDeltaStreamer.TransformersConverter.class)
+    public List<String> transformerClassNames = null;
+
+    @Parameter(names = {"--source-limit"}, description = "Maximum amount of 
data to read from source. "
+        + "Default: No limit For e.g: DFS-Source => max bytes to read, 
Kafka-Source => max events to read")
+    public long sourceLimit = Long.MAX_VALUE;
+
+    @Parameter(names = {"--op"}, description = "Takes one of these values : 
UPSERT (default), INSERT (use when input "
+        + "is purely new data/inserts to gain speed)", converter = 
HoodieDeltaStreamer.OperationConvertor.class)
+    public HoodieDeltaStreamer.Operation operation = 
HoodieDeltaStreamer.Operation.UPSERT;
+
+    @Parameter(names = {"--filter-dupes"},
+        description = "Should duplicate records from source be 
dropped/filtered out before insert/bulk-insert")
+    public Boolean filterDupes = false;
+
+    @Parameter(names = {"--enable-hive-sync"}, description = "Enable syncing 
to hive")
+    public Boolean enableHiveSync = false;
+
+    @Parameter(names = {"--max-pending-compactions"},
+        description = "Maximum number of outstanding inflight/requested 
compactions. Delta Sync will not happen unless"
+        + "outstanding compactions is less than this number")
+    public Integer maxPendingCompactions = 5;
+
+    @Parameter(names = {"--continuous"}, description = "Delta Streamer runs in 
continuous mode running"
+        + " source-fetch -> Transform -> Hudi Write in loop")
+    public Boolean continuousMode = false;
+
+    @Parameter(names = {"--min-sync-interval-seconds"},
+        description = "the min sync interval of each sync in continuous mode")
+    public Integer minSyncIntervalSeconds = 0;
+
+    @Parameter(names = {"--spark-master"}, description = "spark master to 
use.")
+    public String sparkMaster = "local[2]";
+
+    @Parameter(names = {"--commit-on-errors"}, description = "Commit even when 
some records failed to be written")
+    public Boolean commitOnErrors = false;
+
+    @Parameter(names = {"--delta-sync-scheduling-weight"},
+        description = "Scheduling weight for delta sync as defined in "
+        + "https://spark.apache.org/docs/latest/job-scheduling.html";)
+    public Integer deltaSyncSchedulingWeight = 1;
+
+    @Parameter(names = {"--compact-scheduling-weight"}, description = 
"Scheduling weight for compaction as defined in "
+        + "https://spark.apache.org/docs/latest/job-scheduling.html";)
+    public Integer compactSchedulingWeight = 1;
+
+    @Parameter(names = {"--delta-sync-scheduling-minshare"}, description = 
"Minshare for delta sync as defined in "
+        + "https://spark.apache.org/docs/latest/job-scheduling.html";)
+    public Integer deltaSyncSchedulingMinShare = 0;
+
+    @Parameter(names = {"--compact-scheduling-minshare"}, description = 
"Minshare for compaction as defined in "
+        + "https://spark.apache.org/docs/latest/job-scheduling.html";)
+    public Integer compactSchedulingMinShare = 0;
+
+    /**
+     * Compaction is enabled for MoR table by default. This flag disables it
+     */
+    @Parameter(names = {"--disable-compaction"},
+        description = "Compaction is enabled for MoR table by default. This 
flag disables it ")
+    public Boolean forceDisableCompaction = false;
+
+    /**
+     * Resume Delta Streamer from this checkpoint.
+     */
+    @Parameter(names = {"--checkpoint"}, description = "Resume Delta Streamer 
from this checkpoint.")
+    public String checkpoint = null;
+
+    @Parameter(names = {"--help", "-h"}, help = true)
+    public Boolean help = false;
+  }
+
+  /**
+   * Resets target table name and target path using base-path-prefix.
+   * @param configuration
+   * @param database
+   * @param tableName
+   * @return
+   */
+  private static String resetTarget(Config configuration, String database, 
String tableName) {
+    String basePathPrefix = configuration.basePathPrefix;
+    basePathPrefix = basePathPrefix.charAt(basePathPrefix.length() - 1) == '/' 
? basePathPrefix.substring(0, basePathPrefix.length() - 1) : basePathPrefix;
+    String targetBasePath = basePathPrefix + Constants.FILEDELIMITER + 
database + Constants.FILEDELIMITER + tableName;
+    configuration.targetTableName = database + Constants.DELIMITER + tableName;
+    return targetBasePath;
+  }
+
+  /*
+  Creates actual HoodieDeltaStreamer objects for every table/topic and does 
incremental sync
+   */
+  public void sync() {
+    for (TableExecutionContext context : tableExecutionContexts) {
+      try {
+        new HoodieDeltaStreamer(context.getConfig(), jssc, 
context.getProperties()).sync();
+        successTables.add(Helpers.getTableWithDatabase(context));
+      } catch (Exception e) {
+        logger.error("error while running MultiTableDeltaStreamer for table: " 
+ context.getTableName(), e);
+        failedTables.add(Helpers.getTableWithDatabase(context));
+      }
+    }
+
+    logger.info("Ingestion was successful for topics: " + successTables);
+    if (!failedTables.isEmpty()) {
+      logger.info("Ingestion failed for topics: " + failedTables);
+    }
+  }
+
+  public static class Constants {
+    public static final String KAFKA_TOPIC_PROP = 
"hoodie.deltastreamer.source.kafka.topic";
+    private static final String SOURCE_SCHEMA_REGISTRY_URL_PROP = 
"hoodie.deltastreamer.schemaprovider.registry.url";
+    private static final String TARGET_SCHEMA_REGISTRY_URL_PROP = 
"hoodie.deltastreamer.schemaprovider.registry.targetUrl";
+    public static final String HIVE_SYNC_TABLE_PROP = 
"hoodie.datasource.hive_sync.table";
+    private static final String SCHEMA_REGISTRY_BASE_URL_PROP = 
"hoodie.deltastreamer.schemaprovider.registry.baseUrl";
+    private static final String SCHEMA_REGISTRY_URL_SUFFIX_PROP = 
"hoodie.deltastreamer.schemaprovider.registry.urlSuffix";
+    private static final String TABLES_TO_BE_INGESTED_PROP = 
"hoodie.deltastreamer.ingestion.tablesToBeIngested";
+    private static final String INGESTION_PREFIX = 
"hoodie.deltastreamer.ingestion.";
+    private static final String INGESTION_CONFIG_SUFFIX = ".configFile";
+    private static final String DEFAULT_CONFIG_FILE_NAME_SUFFIX = 
"_config.properties";
+    private static final String TARGET_BASE_PATH_PROP = 
"hoodie.deltastreamer.ingestion.targetBasePath";
+    private static final String LOCAL_SPARK_MASTER = "local[2]";
+    private static final String FILEDELIMITER = "/";
+    private static final String DELIMITER = ".";
+    private static final String UNDERSCORE = "_";
+    private static final String COMMA_SEPARATOR = ",";
+  }
+
+  public Set<String> getSuccessTables() {
+    return successTables;
+  }
+
+  public Set<String> getFailedTables() {
+    return failedTables;
+  }
+
+  public List<TableExecutionContext> getTableExecutionContexts() {
+    return this.tableExecutionContexts;
+  }
+}
diff --git 
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/TableExecutionContext.java
 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/TableExecutionContext.java
new file mode 100644
index 0000000..12c00d4
--- /dev/null
+++ 
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/TableExecutionContext.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.deltastreamer;
+
+import org.apache.hudi.common.config.TypedProperties;
+
+import java.util.Objects;
+
+/**
+ * Wrapper over TableConfig objects.
+ * Useful for incrementally syncing multiple tables one by one via 
HoodieMultiTableDeltaStreamer.java class.
+ */
+public class TableExecutionContext {
+
+  private TypedProperties properties;
+  private HoodieDeltaStreamer.Config config;
+  private String database;
+  private String tableName;
+
+  public HoodieDeltaStreamer.Config getConfig() {
+    return config;
+  }
+
+  public void setConfig(HoodieDeltaStreamer.Config config) {
+    this.config = config;
+  }
+
+  public String getDatabase() {
+    return database;
+  }
+
+  public void setDatabase(String database) {
+    this.database = database;
+  }
+
+  public String getTableName() {
+    return tableName;
+  }
+
+  public void setTableName(String tableName) {
+    this.tableName = tableName;
+  }
+
+  public TypedProperties getProperties() {
+    return properties;
+  }
+
+  public void setProperties(TypedProperties properties) {
+    this.properties = properties;
+  }
+
+  @Override
+  public boolean equals(Object o) {
+    if (this == o) {
+      return true;
+    }
+    if (o == null || getClass() != o.getClass()) {
+      return false;
+    }
+
+    TableExecutionContext that = (TableExecutionContext) o;
+    return Objects.equals(properties, that.properties) && 
Objects.equals(database, that.database) && Objects.equals(tableName, 
that.tableName);
+  }
+
+  @Override
+  public int hashCode() {
+    return Objects.hash(properties, database, tableName);
+  }
+}
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
index b7323d4..7edf534 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieDeltaStreamer.java
@@ -69,6 +69,7 @@ import org.apache.spark.sql.SparkSession;
 import org.apache.spark.sql.api.java.UDF4;
 import org.apache.spark.sql.functions;
 import org.apache.spark.sql.types.DataTypes;
+import org.apache.spark.streaming.kafka010.KafkaTestUtils;
 import org.junit.After;
 import org.junit.AfterClass;
 import org.junit.Assert;
@@ -100,6 +101,10 @@ public class TestHoodieDeltaStreamer extends 
UtilitiesTestBase {
 
   private static final Random RANDOM = new Random();
   private static final String PROPS_FILENAME_TEST_SOURCE = 
"test-source.properties";
+  public static final String PROPS_FILENAME_TEST_SOURCE1 = 
"test-source1.properties";
+  public static final String PROPS_INVALID_HIVE_SYNC_TEST_SOURCE1 = 
"test-invalid-hive-sync-source1.properties";
+  public static final String PROPS_INVALID_FILE = 
"test-invalid-props.properties";
+  public static final String PROPS_INVALID_TABLE_CONFIG_FILE = 
"test-invalid-table-config.properties";
   private static final String PROPS_FILENAME_TEST_INVALID = 
"test-invalid.properties";
   private static final String PROPS_FILENAME_TEST_CSV = 
"test-csv-dfs-source.properties";
   private static final String PROPS_FILENAME_TEST_PARQUET = 
"test-parquet-dfs-source.properties";
@@ -107,6 +112,7 @@ public class TestHoodieDeltaStreamer extends 
UtilitiesTestBase {
   private static final int PARQUET_NUM_RECORDS = 5;
   private static final int CSV_NUM_RECORDS = 3;
   private static final Logger LOG = 
LogManager.getLogger(TestHoodieDeltaStreamer.class);
+  public static KafkaTestUtils testUtils;
 
   private static int testNum = 1;
 
@@ -114,9 +120,12 @@ public class TestHoodieDeltaStreamer extends 
UtilitiesTestBase {
   public static void initClass() throws Exception {
     UtilitiesTestBase.initClass(true);
     PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFiles";
+    testUtils = new KafkaTestUtils();
+    testUtils.setup();
 
     // prepare the configs.
     
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/base.properties", 
dfs, dfsBasePath + "/base.properties");
+    
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/base.properties", 
dfs, dfsBasePath + "/config/base.properties");
     
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/sql-transformer.properties",
 dfs,
         dfsBasePath + "/sql-transformer.properties");
     UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source.avsc", 
dfs, dfsBasePath + "/source.avsc");
@@ -124,6 +133,14 @@ public class TestHoodieDeltaStreamer extends 
UtilitiesTestBase {
     UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target.avsc", 
dfs, dfsBasePath + "/target.avsc");
     
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target-flattened.avsc",
 dfs, dfsBasePath + "/target-flattened.avsc");
 
+    
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source_short_trip_uber.avsc",
 dfs, dfsBasePath + "/source_short_trip_uber.avsc");
+    
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source_uber.avsc", 
dfs, dfsBasePath + "/source_uber.avsc");
+    
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target_short_trip_uber.avsc",
 dfs, dfsBasePath + "/target_short_trip_uber.avsc");
+    
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target_uber.avsc", 
dfs, dfsBasePath + "/target_uber.avsc");
+    
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/invalid_hive_sync_uber_config.properties",
 dfs, dfsBasePath + "/config/invalid_hive_sync_uber_config.properties");
+    
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/uber_config.properties",
 dfs, dfsBasePath + "/config/uber_config.properties");
+    
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/short_trip_uber_config.properties",
 dfs, dfsBasePath + "/config/short_trip_uber_config.properties");
+
     TypedProperties props = new TypedProperties();
     props.setProperty("include", "sql-transformer.properties");
     props.setProperty("hoodie.datasource.write.keygenerator.class", 
TestGenerator.class.getName());
@@ -163,11 +180,54 @@ public class TestHoodieDeltaStreamer extends 
UtilitiesTestBase {
     
invalidProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file",
 dfsBasePath + "/target.avsc");
     UtilitiesTestBase.Helpers.savePropsToDFS(invalidProps, dfs, dfsBasePath + 
"/" + PROPS_FILENAME_TEST_INVALID);
 
+    TypedProperties props1 = new TypedProperties();
+    populateCommonProps(props1);
+    UtilitiesTestBase.Helpers.savePropsToDFS(props1, dfs, dfsBasePath + "/" + 
PROPS_FILENAME_TEST_SOURCE1);
+
+    TypedProperties properties = new TypedProperties();
+    populateInvalidTableConfigFilePathProps(properties);
+    UtilitiesTestBase.Helpers.savePropsToDFS(properties, dfs, dfsBasePath + 
"/" + PROPS_INVALID_TABLE_CONFIG_FILE);
+
+    TypedProperties invalidHiveSyncProps = new TypedProperties();
+    
invalidHiveSyncProps.setProperty("hoodie.deltastreamer.ingestion.tablesToBeIngested",
 "uber_db.dummy_table_uber");
+    
invalidHiveSyncProps.setProperty("hoodie.deltastreamer.ingestion.uber_db.dummy_table_uber.configFile",
 dfsBasePath + "/config/invalid_hive_sync_uber_config.properties");
+    UtilitiesTestBase.Helpers.savePropsToDFS(invalidHiveSyncProps, dfs, 
dfsBasePath + "/" + PROPS_INVALID_HIVE_SYNC_TEST_SOURCE1);
+
     prepareParquetDFSFiles(PARQUET_NUM_RECORDS);
   }
 
+  private static void populateInvalidTableConfigFilePathProps(TypedProperties 
props) {
+    props.setProperty("hoodie.datasource.write.keygenerator.class", 
TestHoodieDeltaStreamer.TestGenerator.class.getName());
+    
props.setProperty("hoodie.deltastreamer.keygen.timebased.output.dateformat", 
"yyyyMMdd");
+    props.setProperty("hoodie.deltastreamer.ingestion.tablesToBeIngested", 
"uber_db.dummy_table_uber");
+    
props.setProperty("hoodie.deltastreamer.ingestion.uber_db.dummy_table_uber.configFile",
 dfsBasePath + "/config/invalid_uber_config.properties");
+  }
+
+  private static void populateCommonProps(TypedProperties props) {
+    props.setProperty("hoodie.datasource.write.keygenerator.class", 
TestHoodieDeltaStreamer.TestGenerator.class.getName());
+    
props.setProperty("hoodie.deltastreamer.keygen.timebased.output.dateformat", 
"yyyyMMdd");
+    props.setProperty("hoodie.deltastreamer.ingestion.tablesToBeIngested", 
"short_trip_db.dummy_table_short_trip,uber_db.dummy_table_uber");
+    
props.setProperty("hoodie.deltastreamer.ingestion.uber_db.dummy_table_uber.configFile",
 dfsBasePath + "/config/uber_config.properties");
+    
props.setProperty("hoodie.deltastreamer.ingestion.short_trip_db.dummy_table_short_trip.configFile",
 dfsBasePath + "/config/short_trip_uber_config.properties");
+
+    //Kafka source properties
+    props.setProperty("bootstrap.servers", testUtils.brokerAddress());
+    props.setProperty("auto.offset.reset", "earliest");
+    props.setProperty("key.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
+    props.setProperty("value.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
+    props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", 
String.valueOf(5000));
+
+    // Hive Configs
+    props.setProperty(DataSourceWriteOptions.HIVE_URL_OPT_KEY(), 
"jdbc:hive2://127.0.0.1:9999/");
+    props.setProperty(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY(), 
"testdb2");
+    
props.setProperty(DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY(), 
"false");
+    props.setProperty(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY(), 
"datestr");
+    
props.setProperty(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY(),
+        MultiPartKeysValueExtractor.class.getName());
+  }
+
   @AfterClass
-  public static void cleanupClass() throws Exception {
+  public static void cleanupClass() {
     UtilitiesTestBase.cleanupClass();
   }
 
@@ -649,7 +709,7 @@ public class TestHoodieDeltaStreamer extends 
UtilitiesTestBase {
     String path = PARQUET_SOURCE_ROOT + "/1.parquet";
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
     Helpers.saveParquetToDFS(Helpers.toGenericRecords(
-        dataGenerator.generateInserts("000", numRecords), dataGenerator), new 
Path(path));
+        dataGenerator.generateInserts("000", numRecords)), new Path(path));
   }
 
   private void prepareParquetDFSSource(boolean useSchemaProvider, boolean 
hasTransformer) throws IOException {
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMultiTableDeltaStreamer.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMultiTableDeltaStreamer.java
new file mode 100644
index 0000000..db0e777
--- /dev/null
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/TestHoodieMultiTableDeltaStreamer.java
@@ -0,0 +1,166 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities;
+
+import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.hudi.common.HoodieTestDataGenerator;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.exception.HoodieException;
+import org.apache.hudi.utilities.deltastreamer.HoodieMultiTableDeltaStreamer;
+import org.apache.hudi.utilities.deltastreamer.TableExecutionContext;
+import org.apache.hudi.utilities.schema.FilebasedSchemaProvider;
+import org.apache.hudi.utilities.sources.JsonKafkaSource;
+import org.apache.hudi.utilities.sources.TestDataSource;
+
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.io.IOException;
+import java.util.List;
+
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
+
+public class TestHoodieMultiTableDeltaStreamer extends TestHoodieDeltaStreamer 
{
+
+  private static volatile Logger log = 
LogManager.getLogger(TestHoodieMultiTableDeltaStreamer.class);
+
+  static class TestHelpers {
+
+    static HoodieMultiTableDeltaStreamer.Config getConfig(String fileName, 
String configFolder, String sourceClassName, boolean enableHiveSync) {
+      HoodieMultiTableDeltaStreamer.Config config = new 
HoodieMultiTableDeltaStreamer.Config();
+      config.configFolder = configFolder;
+      config.targetTableName = "dummy_table";
+      config.basePathPrefix = dfsBasePath + "/multi_table_dataset";
+      config.propsFilePath = dfsBasePath + "/" + fileName;
+      config.tableType = "COPY_ON_WRITE";
+      config.sourceClassName = sourceClassName;
+      config.sourceOrderingField = "timestamp";
+      config.schemaProviderClassName = FilebasedSchemaProvider.class.getName();
+      config.enableHiveSync = enableHiveSync;
+      return config;
+    }
+  }
+
+  @Test
+  public void testInvalidHiveSyncProps() throws IOException {
+    HoodieMultiTableDeltaStreamer.Config cfg = 
TestHelpers.getConfig(PROPS_INVALID_HIVE_SYNC_TEST_SOURCE1,dfsBasePath + 
"/config", TestDataSource.class.getName(), true);
+    try {
+      new HoodieMultiTableDeltaStreamer(cfg, jsc);
+      fail("Should fail when hive sync table not provided with enableHiveSync 
flag");
+    } catch (HoodieException he) {
+      log.error("Expected error when creating table execution objects", he);
+      assertTrue(he.getMessage().contains("Hive sync table field not 
provided!"));
+    }
+  }
+
+  @Test
+  public void testInvalidPropsFilePath() throws IOException {
+    HoodieMultiTableDeltaStreamer.Config cfg = 
TestHelpers.getConfig(PROPS_INVALID_FILE,dfsBasePath + "/config", 
TestDataSource.class.getName(), true);
+    try {
+      new HoodieMultiTableDeltaStreamer(cfg, jsc);
+      fail("Should fail when invalid props file is provided");
+    } catch (IllegalArgumentException iae) {
+      log.error("Expected error when creating table execution objects", iae);
+      assertTrue(iae.getMessage().contains("Please provide valid common config 
file path!"));
+    }
+  }
+
+  @Test
+  public void testInvalidTableConfigFilePath() throws IOException {
+    HoodieMultiTableDeltaStreamer.Config cfg = 
TestHelpers.getConfig(PROPS_INVALID_TABLE_CONFIG_FILE,dfsBasePath + "/config", 
TestDataSource.class.getName(), true);
+    try {
+      new HoodieMultiTableDeltaStreamer(cfg, jsc);
+      fail("Should fail when invalid table config props file path is 
provided");
+    } catch (IllegalArgumentException iae) {
+      log.error("Expected error when creating table execution objects", iae);
+      assertTrue(iae.getMessage().contains("Please provide valid table config 
file path!"));
+    }
+  }
+
+  @Test
+  public void testCustomConfigProps() throws IOException {
+    HoodieMultiTableDeltaStreamer.Config cfg = 
TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1,dfsBasePath + "/config", 
TestDataSource.class.getName(), false);
+    HoodieMultiTableDeltaStreamer streamer = new 
HoodieMultiTableDeltaStreamer(cfg, jsc);
+    TableExecutionContext executionContext = 
streamer.getTableExecutionContexts().get(1);
+    assertEquals(streamer.getTableExecutionContexts().size(), 2);
+    assertEquals(executionContext.getConfig().targetBasePath, dfsBasePath + 
"/multi_table_dataset/uber_db/dummy_table_uber");
+    assertEquals(executionContext.getConfig().targetTableName, 
"uber_db.dummy_table_uber");
+    
assertEquals(executionContext.getProperties().getString(HoodieMultiTableDeltaStreamer.Constants.KAFKA_TOPIC_PROP),
 "topic1");
+    
assertEquals(executionContext.getProperties().getString(DataSourceWriteOptions.RECORDKEY_FIELD_OPT_KEY()),
 "_row_key");
+    
assertEquals(executionContext.getProperties().getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY()),
 TestHoodieDeltaStreamer.TestGenerator.class.getName());
+    
assertEquals(executionContext.getProperties().getString(HoodieMultiTableDeltaStreamer.Constants.HIVE_SYNC_TABLE_PROP),
 "uber_hive_dummy_table");
+  }
+
+  @Test
+  @Ignore
+  public void testInvalidIngestionProps() {
+    try {
+      HoodieMultiTableDeltaStreamer.Config cfg = 
TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1,dfsBasePath + "/config", 
TestDataSource.class.getName(), true);
+      new HoodieMultiTableDeltaStreamer(cfg, jsc);
+      fail("Creation of execution object should fail without kafka topic");
+    } catch (Exception e) {
+      log.error("Creation of execution object failed with error: " + 
e.getMessage(), e);
+      assertTrue(e.getMessage().contains("Please provide valid table config 
arguments!"));
+    }
+  }
+
+  @Test //0 corresponds to fg
+  public void testMultiTableExecution() throws IOException {
+    //create topics for each table
+    testUtils.createTopic("topic1", 2);
+    testUtils.createTopic("topic2", 2);
+
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+    testUtils.sendMessages("topic1", 
Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 5, 
HoodieTestDataGenerator.TRIP_SCHEMA)));
+    testUtils.sendMessages("topic2", 
Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 10, 
HoodieTestDataGenerator.SHORT_TRIP_SCHEMA)));
+
+    HoodieMultiTableDeltaStreamer.Config cfg = 
TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1,dfsBasePath + "/config", 
JsonKafkaSource.class.getName(), false);
+    HoodieMultiTableDeltaStreamer streamer = new 
HoodieMultiTableDeltaStreamer(cfg, jsc);
+    List<TableExecutionContext> executionContexts = 
streamer.getTableExecutionContexts();
+    TypedProperties properties = executionContexts.get(1).getProperties();
+    
properties.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file",
 dfsBasePath + "/source_uber.avsc");
+    
properties.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file",
 dfsBasePath + "/target_uber.avsc");
+    executionContexts.get(1).setProperties(properties);
+    TypedProperties properties1 = executionContexts.get(0).getProperties();
+    
properties1.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file",
 dfsBasePath + "/source_short_trip_uber.avsc");
+    
properties1.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file",
 dfsBasePath + "/target_short_trip_uber.avsc");
+    executionContexts.get(0).setProperties(properties1);
+    String targetBasePath1 = 
executionContexts.get(1).getConfig().targetBasePath;
+    String targetBasePath2 = 
executionContexts.get(0).getConfig().targetBasePath;
+    streamer.sync();
+
+    TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(5, targetBasePath1 + 
"/*/*.parquet", sqlContext);
+    TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(10, targetBasePath2 
+ "/*/*.parquet", sqlContext);
+
+    //insert updates for already existing records in kafka topics
+    testUtils.sendMessages("topic1", 
Helpers.jsonifyRecords(dataGenerator.generateUpdatesAsPerSchema("001", 5, 
HoodieTestDataGenerator.TRIP_SCHEMA)));
+    testUtils.sendMessages("topic2", 
Helpers.jsonifyRecords(dataGenerator.generateUpdatesAsPerSchema("001", 10, 
HoodieTestDataGenerator.SHORT_TRIP_SCHEMA)));
+    streamer.sync();
+    assertEquals(streamer.getSuccessTables().size(), 2);
+    assertTrue(streamer.getFailedTables().isEmpty());
+
+    //assert the record count matches now
+    TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(5, targetBasePath1 + 
"/*/*.parquet", sqlContext);
+    TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(10, targetBasePath2 
+ "/*/*.parquet", sqlContext);
+  }
+}
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/UtilitiesTestBase.java 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/UtilitiesTestBase.java
index 7f14957..e82d66e 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/UtilitiesTestBase.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/UtilitiesTestBase.java
@@ -83,6 +83,7 @@ public class UtilitiesTestBase {
   protected transient SparkSession sparkSession = null;
   protected transient SQLContext sqlContext;
   protected static HiveServer2 hiveServer;
+  protected static HiveTestService hiveTestService;
   private static ObjectMapper mapper = new ObjectMapper();
 
   @BeforeClass
@@ -97,20 +98,23 @@ public class UtilitiesTestBase {
     dfsBasePath = dfs.getWorkingDirectory().toString();
     dfs.mkdirs(new Path(dfsBasePath));
     if (startHiveService) {
-      HiveTestService hiveService = new 
HiveTestService(hdfsTestService.getHadoopConf());
-      hiveServer = hiveService.start();
+      hiveTestService = new HiveTestService(hdfsTestService.getHadoopConf());
+      hiveServer = hiveTestService.start();
       clearHiveDb();
     }
   }
 
   @AfterClass
-  public static void cleanupClass() throws Exception {
+  public static void cleanupClass() {
     if (hdfsTestService != null) {
       hdfsTestService.stop();
     }
     if (hiveServer != null) {
       hiveServer.stop();
     }
+    if (hiveTestService != null) {
+      hiveTestService.stop();
+    }
   }
 
   @Before
@@ -263,20 +267,19 @@ public class UtilitiesTestBase {
       return props;
     }
 
-    public static GenericRecord toGenericRecord(HoodieRecord hoodieRecord, 
HoodieTestDataGenerator dataGenerator) {
+    public static GenericRecord toGenericRecord(HoodieRecord hoodieRecord) {
       try {
-        Option<IndexedRecord> recordOpt = 
hoodieRecord.getData().getInsertValue(dataGenerator.AVRO_SCHEMA);
+        Option<IndexedRecord> recordOpt = 
hoodieRecord.getData().getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA);
         return (GenericRecord) recordOpt.get();
       } catch (IOException e) {
         return null;
       }
     }
 
-    public static List<GenericRecord> toGenericRecords(List<HoodieRecord> 
hoodieRecords,
-        HoodieTestDataGenerator dataGenerator) {
+    public static List<GenericRecord> toGenericRecords(List<HoodieRecord> 
hoodieRecords) {
       List<GenericRecord> records = new ArrayList<GenericRecord>();
       for (HoodieRecord hoodieRecord : hoodieRecords) {
-        records.add(toGenericRecord(hoodieRecord, dataGenerator));
+        records.add(toGenericRecord(hoodieRecord));
       }
       return records;
     }
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractBaseTestSource.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractBaseTestSource.java
index 262a14f..9e28833 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractBaseTestSource.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractBaseTestSource.java
@@ -88,7 +88,7 @@ public abstract class AbstractBaseTestSource extends 
AvroSource {
     HoodieTestDataGenerator dataGenerator = dataGeneratorMap.get(partition);
 
     // generate `sourceLimit` number of upserts each time.
-    int numExistingKeys = dataGenerator.getNumExistingKeys();
+    int numExistingKeys = 
dataGenerator.getNumExistingKeys(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA);
     LOG.info("NumExistingKeys=" + numExistingKeys);
 
     int numUpdates = Math.min(numExistingKeys, sourceLimit / 2);
@@ -116,21 +116,22 @@ public abstract class AbstractBaseTestSource extends 
AvroSource {
       LOG.info("After adjustments => NumInserts=" + numInserts + ", 
NumUpdates=" + (numUpdates - 50) + ", NumDeletes=50, maxUniqueRecords="
           + maxUniqueKeys);
       // if we generate update followed by deletes -> some keys in update 
batch might be picked up for deletes. Hence generating delete batch followed by 
updates
-      deleteStream = 
dataGenerator.generateUniqueDeleteRecordStream(instantTime, 50).map(hr -> 
AbstractBaseTestSource.toGenericRecord(hr, dataGenerator));
-      updateStream = dataGenerator.generateUniqueUpdatesStream(instantTime, 
numUpdates - 50).map(hr -> AbstractBaseTestSource.toGenericRecord(hr, 
dataGenerator));
+      deleteStream = 
dataGenerator.generateUniqueDeleteRecordStream(instantTime, 
50).map(AbstractBaseTestSource::toGenericRecord);
+      updateStream = dataGenerator.generateUniqueUpdatesStream(instantTime, 
numUpdates - 50, HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
+        .map(AbstractBaseTestSource::toGenericRecord);
     } else {
       LOG.info("After adjustments => NumInserts=" + numInserts + ", 
NumUpdates=" + numUpdates + ", maxUniqueRecords=" + maxUniqueKeys);
-      updateStream = dataGenerator.generateUniqueUpdatesStream(instantTime, 
numUpdates)
-          .map(hr -> AbstractBaseTestSource.toGenericRecord(hr, 
dataGenerator));
+      updateStream = dataGenerator.generateUniqueUpdatesStream(instantTime, 
numUpdates, HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
+        .map(AbstractBaseTestSource::toGenericRecord);
     }
-    Stream<GenericRecord> insertStream = 
dataGenerator.generateInsertsStream(instantTime, numInserts, false)
-        .map(hr -> AbstractBaseTestSource.toGenericRecord(hr, dataGenerator));
+    Stream<GenericRecord> insertStream = 
dataGenerator.generateInsertsStream(instantTime, numInserts, false, 
HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
+        .map(AbstractBaseTestSource::toGenericRecord);
     return Stream.concat(deleteStream, Stream.concat(updateStream, 
insertStream));
   }
 
-  private static GenericRecord toGenericRecord(HoodieRecord hoodieRecord, 
HoodieTestDataGenerator dataGenerator) {
+  private static GenericRecord toGenericRecord(HoodieRecord hoodieRecord) {
     try {
-      Option<IndexedRecord> recordOpt = 
hoodieRecord.getData().getInsertValue(dataGenerator.AVRO_SCHEMA);
+      Option<IndexedRecord> recordOpt = 
hoodieRecord.getData().getInsertValue(HoodieTestDataGenerator.AVRO_SCHEMA);
       return (GenericRecord) recordOpt.get();
     } catch (IOException e) {
       return null;
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractDFSSourceTestBase.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractDFSSourceTestBase.java
index b4a023e..c0f0a3d 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractDFSSourceTestBase.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractDFSSourceTestBase.java
@@ -64,7 +64,7 @@ public abstract class AbstractDFSSourceTestBase extends 
UtilitiesTestBase {
   }
 
   @AfterClass
-  public static void cleanupClass() throws Exception {
+  public static void cleanupClass() {
     UtilitiesTestBase.cleanupClass();
   }
 
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java
index e592c74..c522557 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestKafkaSource.java
@@ -42,7 +42,6 @@ import org.junit.Before;
 import org.junit.BeforeClass;
 import org.junit.Test;
 
-import java.io.IOException;
 import java.util.HashMap;
 import java.util.UUID;
 
@@ -64,7 +63,7 @@ public class TestKafkaSource extends UtilitiesTestBase {
   }
 
   @AfterClass
-  public static void cleanupClass() throws Exception {
+  public static void cleanupClass() {
     UtilitiesTestBase.cleanupClass();
   }
 
@@ -95,7 +94,7 @@ public class TestKafkaSource extends UtilitiesTestBase {
   }
 
   @Test
-  public void testJsonKafkaSource() throws IOException {
+  public void testJsonKafkaSource() {
 
     // topic setup.
     testUtils.createTopic(TEST_TOPIC_NAME, 2);
@@ -143,7 +142,7 @@ public class TestKafkaSource extends UtilitiesTestBase {
   }
 
   @Test
-  public void testJsonKafkaSourceWithDefaultUpperCap() throws IOException {
+  public void testJsonKafkaSourceWithDefaultUpperCap() {
     // topic setup.
     testUtils.createTopic(TEST_TOPIC_NAME, 2);
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
@@ -172,7 +171,7 @@ public class TestKafkaSource extends UtilitiesTestBase {
   }
 
   @Test
-  public void testJsonKafkaSourceWithConfigurableUpperCap() throws IOException 
{
+  public void testJsonKafkaSourceWithConfigurableUpperCap() {
     // topic setup.
     testUtils.createTopic(TEST_TOPIC_NAME, 2);
     HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestParquetDFSSource.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestParquetDFSSource.java
index a1a7697..a2b3574 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestParquetDFSSource.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestParquetDFSSource.java
@@ -48,6 +48,6 @@ public class TestParquetDFSSource extends 
AbstractDFSSourceTestBase {
 
   @Override
   void writeNewDataToFile(List<HoodieRecord> records, Path path) throws 
IOException {
-    Helpers.saveParquetToDFS(Helpers.toGenericRecords(records, dataGenerator), 
path);
+    Helpers.saveParquetToDFS(Helpers.toGenericRecords(records), path);
   }
 }
diff --git 
a/hudi-utilities/src/test/resources/delta-streamer-config/invalid_hive_sync_uber_config.properties
 
b/hudi-utilities/src/test/resources/delta-streamer-config/invalid_hive_sync_uber_config.properties
new file mode 100644
index 0000000..5c569c5
--- /dev/null
+++ 
b/hudi-utilities/src/test/resources/delta-streamer-config/invalid_hive_sync_uber_config.properties
@@ -0,0 +1,23 @@
+###
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+###
+include=base.properties
+hoodie.datasource.write.recordkey.field=_row_key
+hoodie.datasource.write.partitionpath.field=created_at
+hoodie.deltastreamer.source.kafka.topic=test_topic
+hoodie.deltastreamer.keygen.timebased.timestamp.type=UNIX_TIMESTAMP
+hoodie.deltastreamer.keygen.timebased.input.dateformat=yyyy-MM-dd
\ No newline at end of file
diff --git 
a/hudi-utilities/src/test/resources/delta-streamer-config/short_trip_uber_config.properties
 
b/hudi-utilities/src/test/resources/delta-streamer-config/short_trip_uber_config.properties
new file mode 100644
index 0000000..52d39ba
--- /dev/null
+++ 
b/hudi-utilities/src/test/resources/delta-streamer-config/short_trip_uber_config.properties
@@ -0,0 +1,24 @@
+###
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+###
+include=base.properties
+hoodie.datasource.write.recordkey.field=_row_key
+hoodie.datasource.write.partitionpath.field=created_at
+hoodie.deltastreamer.source.kafka.topic=topic2
+hoodie.deltastreamer.keygen.timebased.timestamp.type=UNIX_TIMESTAMP
+hoodie.deltastreamer.keygen.timebased.input.dateformat=yyyy-MM-dd HH:mm:ss.S
+hoodie.datasource.hive_sync.table=short_trip_uber_hive_dummy_table
\ No newline at end of file
diff --git 
a/hudi-utilities/src/test/resources/delta-streamer-config/source_short_trip_uber.avsc
 
b/hudi-utilities/src/test/resources/delta-streamer-config/source_short_trip_uber.avsc
new file mode 100644
index 0000000..8a589bd
--- /dev/null
+++ 
b/hudi-utilities/src/test/resources/delta-streamer-config/source_short_trip_uber.avsc
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+  "type" : "record",
+  "name" : "shortTripRec",
+  "fields" : [
+  {
+    "name" : "timestamp",
+    "type" : "double"
+  }, {
+    "name" : "_row_key",
+    "type" : "string"
+  }, {
+    "name" : "rider",
+    "type" : "string"
+  }, {
+    "name" : "driver",
+    "type" : "string"
+  }, {
+    "name" : "fare",
+    "type" : "double"
+  },
+  {
+    "name" : "_hoodie_is_deleted",
+    "type" : "boolean",
+    "default" : false
+  } ]
+}
+
diff --git 
a/hudi-utilities/src/test/resources/delta-streamer-config/source_uber.avsc 
b/hudi-utilities/src/test/resources/delta-streamer-config/source_uber.avsc
new file mode 100644
index 0000000..324862e
--- /dev/null
+++ b/hudi-utilities/src/test/resources/delta-streamer-config/source_uber.avsc
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+  "type" : "record",
+  "name" : "tripUberRec",
+  "fields" : [
+  {
+    "name" : "timestamp",
+    "type" : "double"
+  }, {
+    "name" : "_row_key",
+    "type" : "string"
+  }, {
+    "name" : "rider",
+    "type" : "string"
+  }, {
+    "name" : "driver",
+    "type" : "string"
+  }, {
+    "name" : "fare",
+    "type" : "double"
+  },
+  {
+    "name" : "_hoodie_is_deleted",
+    "type" : "boolean",
+    "default" : false
+  } ]
+}
+
diff --git 
a/hudi-utilities/src/test/resources/delta-streamer-config/target_short_trip_uber.avsc
 
b/hudi-utilities/src/test/resources/delta-streamer-config/target_short_trip_uber.avsc
new file mode 100644
index 0000000..8a589bd
--- /dev/null
+++ 
b/hudi-utilities/src/test/resources/delta-streamer-config/target_short_trip_uber.avsc
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+  "type" : "record",
+  "name" : "shortTripRec",
+  "fields" : [
+  {
+    "name" : "timestamp",
+    "type" : "double"
+  }, {
+    "name" : "_row_key",
+    "type" : "string"
+  }, {
+    "name" : "rider",
+    "type" : "string"
+  }, {
+    "name" : "driver",
+    "type" : "string"
+  }, {
+    "name" : "fare",
+    "type" : "double"
+  },
+  {
+    "name" : "_hoodie_is_deleted",
+    "type" : "boolean",
+    "default" : false
+  } ]
+}
+
diff --git 
a/hudi-utilities/src/test/resources/delta-streamer-config/target_uber.avsc 
b/hudi-utilities/src/test/resources/delta-streamer-config/target_uber.avsc
new file mode 100644
index 0000000..324862e
--- /dev/null
+++ b/hudi-utilities/src/test/resources/delta-streamer-config/target_uber.avsc
@@ -0,0 +1,44 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+{
+  "type" : "record",
+  "name" : "tripUberRec",
+  "fields" : [
+  {
+    "name" : "timestamp",
+    "type" : "double"
+  }, {
+    "name" : "_row_key",
+    "type" : "string"
+  }, {
+    "name" : "rider",
+    "type" : "string"
+  }, {
+    "name" : "driver",
+    "type" : "string"
+  }, {
+    "name" : "fare",
+    "type" : "double"
+  },
+  {
+    "name" : "_hoodie_is_deleted",
+    "type" : "boolean",
+    "default" : false
+  } ]
+}
+
diff --git 
a/hudi-utilities/src/test/resources/delta-streamer-config/uber_config.properties
 
b/hudi-utilities/src/test/resources/delta-streamer-config/uber_config.properties
new file mode 100644
index 0000000..3d3501f
--- /dev/null
+++ 
b/hudi-utilities/src/test/resources/delta-streamer-config/uber_config.properties
@@ -0,0 +1,25 @@
+###
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#      http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+###
+include=base.properties
+hoodie.datasource.write.recordkey.field=_row_key
+hoodie.datasource.write.partitionpath.field=created_at
+hoodie.deltastreamer.source.kafka.topic=topic1
+hoodie.deltastreamer.keygen.timebased.timestamp.type=UNIX_TIMESTAMP
+hoodie.deltastreamer.keygen.timebased.input.dateformat=yyyy-MM-dd HH:mm:ss.S
+hoodie.datasource.hive_sync.database=uber_hive_db
+hoodie.datasource.hive_sync.table=uber_hive_dummy_table
\ No newline at end of file

Reply via email to