This is an automated email from the ASF dual-hosted git repository.

vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new 7bdae69  [HUDI-2253] Refactoring few tests to reduce runningtime. 
DeltaStreamer and MultiDeltaStreamer tests. Bulk insert row writer tests (#3371)
7bdae69 is described below

commit 7bdae69053afc5ef604a15806d78317cb976f2ce
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Fri Jul 30 01:22:26 2021 -0400

    [HUDI-2253] Refactoring few tests to reduce runningtime. DeltaStreamer and 
MultiDeltaStreamer tests. Bulk insert row writer tests (#3371)
    
    Co-authored-by: Sivabalan Narayanan <[email protected]>
---
 .../TestHoodieBulkInsertDataInternalWriter.java    |   4 +-
 .../TestHoodieDataSourceInternalWriter.java        |   9 +-
 .../TestHoodieBulkInsertDataInternalWriter.java    |   4 +-
 .../TestHoodieDataSourceInternalBatchWrite.java    |   7 +-
 .../functional/TestHoodieDeltaStreamer.java        | 228 +------------------
 .../functional/TestHoodieDeltaStreamerBase.java    | 245 +++++++++++++++++++++
 .../TestHoodieMultiTableDeltaStreamer.java         |   4 +-
 7 files changed, 267 insertions(+), 234 deletions(-)

diff --git 
a/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieBulkInsertDataInternalWriter.java
 
b/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieBulkInsertDataInternalWriter.java
index 9735379..fd943b7 100644
--- 
a/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieBulkInsertDataInternalWriter.java
+++ 
b/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieBulkInsertDataInternalWriter.java
@@ -74,7 +74,7 @@ public class TestHoodieBulkInsertDataInternalWriter extends
     HoodieWriteConfig cfg = getWriteConfig(populateMetaFields);
     HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
     // execute N rounds
-    for (int i = 0; i < 3; i++) {
+    for (int i = 0; i < 2; i++) {
       String instantTime = "00" + i;
       // init writer
       HoodieBulkInsertDataInternalWriter writer = new 
HoodieBulkInsertDataInternalWriter(table, cfg, instantTime, 
RANDOM.nextInt(100000), RANDOM.nextLong(), RANDOM.nextLong(),
@@ -82,7 +82,7 @@ public class TestHoodieBulkInsertDataInternalWriter extends
 
       int size = 10 + RANDOM.nextInt(1000);
       // write N rows to partition1, N rows to partition2 and N rows to 
partition3 ... Each batch should create a new RowCreateHandle and a new file
-      int batches = 5;
+      int batches = 3;
       Dataset<Row> totalInputRows = null;
 
       for (int j = 0; j < batches; j++) {
diff --git 
a/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieDataSourceInternalWriter.java
 
b/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieDataSourceInternalWriter.java
index 342e2ae..eea49e6 100644
--- 
a/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieDataSourceInternalWriter.java
+++ 
b/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieDataSourceInternalWriter.java
@@ -30,6 +30,7 @@ import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.sources.v2.DataSourceOptions;
 import org.apache.spark.sql.sources.v2.writer.DataWriter;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
@@ -87,7 +88,7 @@ public class TestHoodieDataSourceInternalWriter extends
     }
 
     int size = 10 + RANDOM.nextInt(1000);
-    int batches = 5;
+    int batches = 2;
     Dataset<Row> totalInputRows = null;
     for (int j = 0; j < batches; j++) {
       String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[j 
% 3];
@@ -158,7 +159,7 @@ public class TestHoodieDataSourceInternalWriter extends
     int partitionCounter = 0;
 
     // execute N rounds
-    for (int i = 0; i < 5; i++) {
+    for (int i = 0; i < 2; i++) {
       String instantTime = "00" + i;
       // init writer
       HoodieDataSourceInternalWriter dataSourceInternalWriter =
@@ -168,7 +169,7 @@ public class TestHoodieDataSourceInternalWriter extends
       DataWriter<InternalRow> writer = 
dataSourceInternalWriter.createWriterFactory().createDataWriter(partitionCounter++,
 RANDOM.nextLong(), RANDOM.nextLong());
 
       int size = 10 + RANDOM.nextInt(1000);
-      int batches = 5; // one batch per partition
+      int batches = 2; // one batch per partition
 
       for (int j = 0; j < batches; j++) {
         String partitionPath = 
HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[j % 3];
@@ -195,6 +196,8 @@ public class TestHoodieDataSourceInternalWriter extends
     }
   }
 
+  // takes up lot of running time with CI.
+  @Disabled
   @ParameterizedTest
   @MethodSource("bulkInsertTypeParams")
   public void testLargeWrites(boolean populateMetaFields) throws Exception {
diff --git 
a/hudi-spark-datasource/hudi-spark3/src/test/java/org/apache/hudi/spark3/internal/TestHoodieBulkInsertDataInternalWriter.java
 
b/hudi-spark-datasource/hudi-spark3/src/test/java/org/apache/hudi/spark3/internal/TestHoodieBulkInsertDataInternalWriter.java
index 3b7fb97..a3d0e32 100644
--- 
a/hudi-spark-datasource/hudi-spark3/src/test/java/org/apache/hudi/spark3/internal/TestHoodieBulkInsertDataInternalWriter.java
+++ 
b/hudi-spark-datasource/hudi-spark3/src/test/java/org/apache/hudi/spark3/internal/TestHoodieBulkInsertDataInternalWriter.java
@@ -75,7 +75,7 @@ public class TestHoodieBulkInsertDataInternalWriter extends
     HoodieWriteConfig cfg = getWriteConfig(populateMetaFields);
     HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
     // execute N rounds
-    for (int i = 0; i < 5; i++) {
+    for (int i = 0; i < 2; i++) {
       String instantTime = "00" + i;
       // init writer
       HoodieBulkInsertDataInternalWriter writer = new 
HoodieBulkInsertDataInternalWriter(table, cfg, instantTime, 
RANDOM.nextInt(100000),
@@ -83,7 +83,7 @@ public class TestHoodieBulkInsertDataInternalWriter extends
 
       int size = 10 + RANDOM.nextInt(1000);
       // write N rows to partition1, N rows to partition2 and N rows to 
partition3 ... Each batch should create a new RowCreateHandle and a new file
-      int batches = 5;
+      int batches = 3;
       Dataset<Row> totalInputRows = null;
 
       for (int j = 0; j < batches; j++) {
diff --git 
a/hudi-spark-datasource/hudi-spark3/src/test/java/org/apache/hudi/spark3/internal/TestHoodieDataSourceInternalBatchWrite.java
 
b/hudi-spark-datasource/hudi-spark3/src/test/java/org/apache/hudi/spark3/internal/TestHoodieDataSourceInternalBatchWrite.java
index 3c3866e..ae49804 100644
--- 
a/hudi-spark-datasource/hudi-spark3/src/test/java/org/apache/hudi/spark3/internal/TestHoodieDataSourceInternalBatchWrite.java
+++ 
b/hudi-spark-datasource/hudi-spark3/src/test/java/org/apache/hudi/spark3/internal/TestHoodieDataSourceInternalBatchWrite.java
@@ -32,6 +32,7 @@ import org.apache.spark.sql.Dataset;
 import org.apache.spark.sql.Row;
 import org.apache.spark.sql.catalyst.InternalRow;
 import org.apache.spark.sql.connector.write.DataWriter;
+import org.junit.jupiter.api.Disabled;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
@@ -161,7 +162,7 @@ public class TestHoodieDataSourceInternalBatchWrite extends
     int partitionCounter = 0;
 
     // execute N rounds
-    for (int i = 0; i < 5; i++) {
+    for (int i = 0; i < 2; i++) {
       String instantTime = "00" + i;
       // init writer
       HoodieDataSourceInternalBatchWrite dataSourceInternalBatchWrite =
@@ -171,7 +172,7 @@ public class TestHoodieDataSourceInternalBatchWrite extends
       DataWriter<InternalRow> writer = 
dataSourceInternalBatchWrite.createBatchWriterFactory(null).createWriter(partitionCounter++,
 RANDOM.nextLong());
 
       int size = 10 + RANDOM.nextInt(1000);
-      int batches = 5; // one batch per partition
+      int batches = 3; // one batch per partition
 
       for (int j = 0; j < batches; j++) {
         String partitionPath = 
HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[j % 3];
@@ -197,6 +198,8 @@ public class TestHoodieDataSourceInternalBatchWrite extends
     }
   }
 
+  // Large writes are not required to be executed w/ regular CI jobs. Takes 
lot of running time.
+  @Disabled
   @ParameterizedTest
   @MethodSource("bulkInsertTypeParams")
   public void testLargeWrites(boolean populateMetaFields) throws Exception {
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index 80f471e..1c68476 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -18,11 +18,6 @@
 
 package org.apache.hudi.utilities.functional;
 
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.util.ConcurrentModificationException;
-import java.util.concurrent.ExecutorService;
-import org.apache.hudi.DataSourceWriteOptions;
 import org.apache.hudi.common.config.DFSPropertiesConfiguration;
 import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.config.TypedProperties;
@@ -45,7 +40,6 @@ import org.apache.hudi.exception.HoodieException;
 import org.apache.hudi.exception.TableNotFoundException;
 import org.apache.hudi.hive.HiveSyncConfig;
 import org.apache.hudi.hive.HoodieHiveClient;
-import org.apache.hudi.hive.MultiPartKeysValueExtractor;
 import org.apache.hudi.keygen.SimpleKeyGenerator;
 import org.apache.hudi.utilities.DummySchemaProvider;
 import org.apache.hudi.utilities.HoodieClusteringJob;
@@ -65,7 +59,6 @@ import 
org.apache.hudi.utilities.testutils.sources.config.SourceConfigs;
 import org.apache.hudi.utilities.transform.SqlQueryBasedTransformer;
 import org.apache.hudi.utilities.transform.Transformer;
 
-import org.apache.avro.Schema;
 import org.apache.avro.generic.GenericRecord;
 import org.apache.hadoop.fs.FSDataInputStream;
 import org.apache.hadoop.fs.FileSystem;
@@ -84,24 +77,22 @@ import org.apache.spark.sql.api.java.UDF4;
 import org.apache.spark.sql.functions;
 import org.apache.spark.sql.types.DataTypes;
 import org.apache.spark.sql.types.StructField;
-import org.apache.spark.streaming.kafka010.KafkaTestUtils;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.AfterEach;
 import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.junit.jupiter.params.ParameterizedTest;
 import org.junit.jupiter.params.provider.Arguments;
 import org.junit.jupiter.params.provider.MethodSource;
 
 import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collections;
+import java.util.ConcurrentModificationException;
 import java.util.List;
 import java.util.Properties;
-import java.util.Random;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -119,176 +110,9 @@ import static org.junit.jupiter.api.Assertions.fail;
 /**
  * Basic tests against {@link HoodieDeltaStreamer}, by issuing bulk_inserts, 
upserts, inserts. Check counts at the end.
  */
-public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
-
-  private static final Random RANDOM = new Random();
-  private static final String PROPS_FILENAME_TEST_SOURCE = 
"test-source.properties";
-  public static final String PROPS_FILENAME_TEST_SOURCE1 = 
"test-source1.properties";
-  public static final String PROPS_INVALID_HIVE_SYNC_TEST_SOURCE1 = 
"test-invalid-hive-sync-source1.properties";
-  public static final String PROPS_INVALID_FILE = 
"test-invalid-props.properties";
-  public static final String PROPS_INVALID_TABLE_CONFIG_FILE = 
"test-invalid-table-config.properties";
-  private static final String PROPS_FILENAME_TEST_INVALID = 
"test-invalid.properties";
-  private static final String PROPS_FILENAME_TEST_CSV = 
"test-csv-dfs-source.properties";
-  protected static final String PROPS_FILENAME_TEST_PARQUET = 
"test-parquet-dfs-source.properties";
-  private static final String PROPS_FILENAME_TEST_JSON_KAFKA = 
"test-json-kafka-dfs-source.properties";
-  private static final String PROPS_FILENAME_TEST_MULTI_WRITER = 
"test-multi-writer.properties";
-  private static final String FIRST_PARQUET_FILE_NAME = "1.parquet";
-  private static String PARQUET_SOURCE_ROOT;
-  private static String JSON_KAFKA_SOURCE_ROOT;
-  private static final int PARQUET_NUM_RECORDS = 5;
-  private static final int CSV_NUM_RECORDS = 3;
-  private static final int JSON_KAFKA_NUM_RECORDS = 5;
-  private String kafkaCheckpointType = "string";
-  // Required fields
-  private static final String TGT_BASE_PATH_PARAM = "--target-base-path";
-  private static final String TGT_BASE_PATH_VALUE = "s3://mybucket/blah";
-  private static final String TABLE_TYPE_PARAM = "--table-type";
-  private static final String TABLE_TYPE_VALUE = "COPY_ON_WRITE";
-  private static final String TARGET_TABLE_PARAM = "--target-table";
-  private static final String TARGET_TABLE_VALUE = "test";
-  private static final String BASE_FILE_FORMAT_PARAM = "--base-file-format";
-  private static final String BASE_FILE_FORMAT_VALUE = "PARQUET";
-  private static final String SOURCE_LIMIT_PARAM = "--source-limit";
-  private static final String SOURCE_LIMIT_VALUE = "500";
-  private static final String ENABLE_HIVE_SYNC_PARAM = "--enable-hive-sync";
-  private static final String HOODIE_CONF_PARAM = "--hoodie-conf";
-  private static final String HOODIE_CONF_VALUE1 = 
"hoodie.datasource.hive_sync.table=test_table";
-  private static final String HOODIE_CONF_VALUE2 = 
"hoodie.datasource.write.recordkey.field=Field1,Field2,Field3";
-  private static final Logger LOG = 
LogManager.getLogger(TestHoodieDeltaStreamer.class);
-  public static KafkaTestUtils testUtils;
-  protected static String topicName;
-
-  protected static int testNum = 1;
-
-  @BeforeAll
-  public static void initClass() throws Exception {
-    UtilitiesTestBase.initClass(true);
-    PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFiles";
-    JSON_KAFKA_SOURCE_ROOT = dfsBasePath + "/jsonKafkaFiles";
-    testUtils = new KafkaTestUtils();
-    testUtils.setup();
-    topicName = "topic" + testNum;
-
-    // prepare the configs.
-    
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/base.properties", 
dfs, dfsBasePath + "/base.properties");
-    
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/base.properties", 
dfs, dfsBasePath + "/config/base.properties");
-    
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/sql-transformer.properties",
 dfs,
-        dfsBasePath + "/sql-transformer.properties");
-    UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source.avsc", 
dfs, dfsBasePath + "/source.avsc");
-    
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source-flattened.avsc",
 dfs, dfsBasePath + "/source-flattened.avsc");
-    UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target.avsc", 
dfs, dfsBasePath + "/target.avsc");
-    
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target-flattened.avsc",
 dfs, dfsBasePath + "/target-flattened.avsc");
-
-    
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source_short_trip_uber.avsc",
 dfs, dfsBasePath + "/source_short_trip_uber.avsc");
-    
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source_uber.avsc", 
dfs, dfsBasePath + "/source_uber.avsc");
-    
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target_short_trip_uber.avsc",
 dfs, dfsBasePath + "/target_short_trip_uber.avsc");
-    
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target_uber.avsc", 
dfs, dfsBasePath + "/target_uber.avsc");
-    
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/invalid_hive_sync_uber_config.properties",
 dfs, dfsBasePath + "/config/invalid_hive_sync_uber_config.properties");
-    
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/uber_config.properties",
 dfs, dfsBasePath + "/config/uber_config.properties");
-    
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/short_trip_uber_config.properties",
 dfs, dfsBasePath + "/config/short_trip_uber_config.properties");
-    
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/clusteringjob.properties",
 dfs, dfsBasePath + "/clusteringjob.properties");
-
-    TypedProperties props = new TypedProperties();
-    props.setProperty("include", "sql-transformer.properties");
-    props.setProperty("hoodie.datasource.write.keygenerator.class", 
TestGenerator.class.getName());
-    props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
-    props.setProperty("hoodie.datasource.write.partitionpath.field", 
"not_there");
-    
props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", 
dfsBasePath + "/source.avsc");
-    
props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", 
dfsBasePath + "/target.avsc");
-
-    // Hive Configs
-    props.setProperty(DataSourceWriteOptions.HIVE_URL_OPT_KEY().key(), 
"jdbc:hive2://127.0.0.1:9999/");
-    props.setProperty(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY().key(), 
"testdb1");
-    props.setProperty(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY().key(), 
"hive_trips");
-    
props.setProperty(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY().key(), 
"datestr");
-    
props.setProperty(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY().key(),
-        MultiPartKeysValueExtractor.class.getName());
-    UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/" + 
PROPS_FILENAME_TEST_SOURCE);
-
-    // Properties used for the delta-streamer which incrementally pulls from 
upstream Hudi source table and writes to
-    // downstream hudi table
-    TypedProperties downstreamProps = new TypedProperties();
-    downstreamProps.setProperty("include", "base.properties");
-    downstreamProps.setProperty("hoodie.datasource.write.recordkey.field", 
"_row_key");
-    downstreamProps.setProperty("hoodie.datasource.write.partitionpath.field", 
"not_there");
-
-    // Source schema is the target schema of upstream table
-    
downstreamProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file",
 dfsBasePath + "/target.avsc");
-    
downstreamProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file",
 dfsBasePath + "/target.avsc");
-    UtilitiesTestBase.Helpers.savePropsToDFS(downstreamProps, dfs, dfsBasePath 
+ "/test-downstream-source.properties");
-
-    // Properties used for testing invalid key generator
-    TypedProperties invalidProps = new TypedProperties();
-    invalidProps.setProperty("include", "sql-transformer.properties");
-    invalidProps.setProperty("hoodie.datasource.write.keygenerator.class", 
"invalid");
-    invalidProps.setProperty("hoodie.datasource.write.recordkey.field", 
"_row_key");
-    invalidProps.setProperty("hoodie.datasource.write.partitionpath.field", 
"not_there");
-    
invalidProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file",
 dfsBasePath + "/source.avsc");
-    
invalidProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file",
 dfsBasePath + "/target.avsc");
-    UtilitiesTestBase.Helpers.savePropsToDFS(invalidProps, dfs, dfsBasePath + 
"/" + PROPS_FILENAME_TEST_INVALID);
-
-    TypedProperties props1 = new TypedProperties();
-    populateAllCommonProps(props1);
-    UtilitiesTestBase.Helpers.savePropsToDFS(props1, dfs, dfsBasePath + "/" + 
PROPS_FILENAME_TEST_SOURCE1);
-
-    TypedProperties properties = new TypedProperties();
-    populateInvalidTableConfigFilePathProps(properties);
-    UtilitiesTestBase.Helpers.savePropsToDFS(properties, dfs, dfsBasePath + 
"/" + PROPS_INVALID_TABLE_CONFIG_FILE);
-
-    TypedProperties invalidHiveSyncProps = new TypedProperties();
-    
invalidHiveSyncProps.setProperty("hoodie.deltastreamer.ingestion.tablesToBeIngested",
 "uber_db.dummy_table_uber");
-    
invalidHiveSyncProps.setProperty("hoodie.deltastreamer.ingestion.uber_db.dummy_table_uber.configFile",
 dfsBasePath + "/config/invalid_hive_sync_uber_config.properties");
-    UtilitiesTestBase.Helpers.savePropsToDFS(invalidHiveSyncProps, dfs, 
dfsBasePath + "/" + PROPS_INVALID_HIVE_SYNC_TEST_SOURCE1);
-
-    prepareParquetDFSFiles(PARQUET_NUM_RECORDS, PARQUET_SOURCE_ROOT);
-  }
-
-  @AfterAll
-  public static void release() {
-    if (testUtils != null) {
-      testUtils.teardown();
-    }
-  }
-
-  private static void populateInvalidTableConfigFilePathProps(TypedProperties 
props) {
-    props.setProperty("hoodie.datasource.write.keygenerator.class", 
TestHoodieDeltaStreamer.TestGenerator.class.getName());
-    
props.setProperty("hoodie.deltastreamer.keygen.timebased.output.dateformat", 
"yyyyMMdd");
-    props.setProperty("hoodie.deltastreamer.ingestion.tablesToBeIngested", 
"uber_db.dummy_table_uber");
-    
props.setProperty("hoodie.deltastreamer.ingestion.uber_db.dummy_table_uber.configFile",
 dfsBasePath + "/config/invalid_uber_config.properties");
-  }
+public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase {
 
-  private static void populateAllCommonProps(TypedProperties props) {
-    populateCommonProps(props);
-    populateCommonKafkaProps(props);
-    populateCommonHiveProps(props);
-  }
-
-  protected static void populateCommonProps(TypedProperties props) {
-    props.setProperty("hoodie.datasource.write.keygenerator.class", 
TestHoodieDeltaStreamer.TestGenerator.class.getName());
-    
props.setProperty("hoodie.deltastreamer.keygen.timebased.output.dateformat", 
"yyyyMMdd");
-    props.setProperty("hoodie.deltastreamer.ingestion.tablesToBeIngested", 
"short_trip_db.dummy_table_short_trip,uber_db.dummy_table_uber");
-    
props.setProperty("hoodie.deltastreamer.ingestion.uber_db.dummy_table_uber.configFile",
 dfsBasePath + "/config/uber_config.properties");
-    
props.setProperty("hoodie.deltastreamer.ingestion.short_trip_db.dummy_table_short_trip.configFile",
 dfsBasePath + "/config/short_trip_uber_config.properties");
-  }
-
-  protected static void populateCommonKafkaProps(TypedProperties props) {
-    //Kafka source properties
-    props.setProperty("bootstrap.servers", testUtils.brokerAddress());
-    props.setProperty("auto.offset.reset", "earliest");
-    props.setProperty("key.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
-    props.setProperty("value.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
-    props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", 
String.valueOf(5000));
-  }
-
-  protected static void populateCommonHiveProps(TypedProperties props) {
-    // Hive Configs
-    props.setProperty(DataSourceWriteOptions.HIVE_URL_OPT_KEY().key(), 
"jdbc:hive2://127.0.0.1:9999/");
-    props.setProperty(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY().key(), 
"testdb2");
-    
props.setProperty(DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY().key(),
 "false");
-    
props.setProperty(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY().key(), 
"datestr");
-    
props.setProperty(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY().key(),
-        MultiPartKeysValueExtractor.class.getName());
-  }
+  private static final Logger LOG = 
LogManager.getLogger(TestHoodieDeltaStreamer.class);
 
   protected static TypedProperties prepareMultiWriterProps(String 
propsFileName) throws IOException {
     TypedProperties props = new TypedProperties();
@@ -318,24 +142,6 @@ public class TestHoodieDeltaStreamer extends 
UtilitiesTestBase {
     return props;
   }
 
-  @AfterAll
-  public static void cleanupClass() {
-    UtilitiesTestBase.cleanupClass();
-    if (testUtils != null) {
-      testUtils.teardown();
-    }
-  }
-
-  @BeforeEach
-  public void setup() throws Exception {
-    super.setup();
-  }
-
-  @AfterEach
-  public void teardown() throws Exception {
-    super.teardown();
-  }
-
   static class TestHelpers {
 
     static HoodieDeltaStreamer.Config makeDropAllConfig(String basePath, 
WriteOperationType op) {
@@ -1333,28 +1139,6 @@ public class TestHoodieDeltaStreamer extends 
UtilitiesTestBase {
     assertEquals(1000, c);
   }
 
-  private static void prepareParquetDFSFiles(int numRecords) throws 
IOException {
-    prepareParquetDFSFiles(numRecords, PARQUET_SOURCE_ROOT);
-  }
-
-  protected static void prepareParquetDFSFiles(int numRecords, String 
baseParquetPath) throws IOException {
-    prepareParquetDFSFiles(numRecords, baseParquetPath, 
FIRST_PARQUET_FILE_NAME, false, null, null);
-  }
-
-  protected static void prepareParquetDFSFiles(int numRecords, String 
baseParquetPath, String fileName, boolean useCustomSchema,
-      String schemaStr, Schema schema) throws IOException {
-    String path = baseParquetPath + "/" + fileName;
-    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
-    if (useCustomSchema) {
-      Helpers.saveParquetToDFS(Helpers.toGenericRecords(
-          dataGenerator.generateInsertsAsPerSchema("000", numRecords, 
schemaStr),
-          schema), new Path(path), HoodieTestDataGenerator.AVRO_TRIP_SCHEMA);
-    } else {
-      Helpers.saveParquetToDFS(Helpers.toGenericRecords(
-          dataGenerator.generateInserts("000", numRecords)), new Path(path));
-    }
-  }
-
   private static void prepareJsonKafkaDFSFiles(int numRecords, boolean 
createTopic, String topicName) throws IOException {
     if (createTopic) {
       try {
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerBase.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerBase.java
new file mode 100644
index 0000000..ae477dc
--- /dev/null
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerBase.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.functional;
+
+import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.hive.MultiPartKeysValueExtractor;
+import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.streaming.kafka010.KafkaTestUtils;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+
+import java.io.IOException;
+import java.util.Random;
+
+public class TestHoodieDeltaStreamerBase extends UtilitiesTestBase {
+
+
+  static final Random RANDOM = new Random();
+  static final String PROPS_FILENAME_TEST_SOURCE = "test-source.properties";
+  static final String PROPS_FILENAME_TEST_SOURCE1 = "test-source1.properties";
+  static final String PROPS_INVALID_HIVE_SYNC_TEST_SOURCE1 = 
"test-invalid-hive-sync-source1.properties";
+  static final String PROPS_INVALID_FILE = "test-invalid-props.properties";
+  static final String PROPS_INVALID_TABLE_CONFIG_FILE = 
"test-invalid-table-config.properties";
+  static final String PROPS_FILENAME_TEST_INVALID = "test-invalid.properties";
+  static final String PROPS_FILENAME_TEST_CSV = 
"test-csv-dfs-source.properties";
+  static final String PROPS_FILENAME_TEST_PARQUET = 
"test-parquet-dfs-source.properties";
+  static final String PROPS_FILENAME_TEST_JSON_KAFKA = 
"test-json-kafka-dfs-source.properties";
+  static final String PROPS_FILENAME_TEST_MULTI_WRITER = 
"test-multi-writer.properties";
+  static final String FIRST_PARQUET_FILE_NAME = "1.parquet";
+  static String PARQUET_SOURCE_ROOT;
+  static String JSON_KAFKA_SOURCE_ROOT;
+  static final int PARQUET_NUM_RECORDS = 5;
+  static final int CSV_NUM_RECORDS = 3;
+  static final int JSON_KAFKA_NUM_RECORDS = 5;
+  String kafkaCheckpointType = "string";
+  // Required fields
+  static final String TGT_BASE_PATH_PARAM = "--target-base-path";
+  static final String TGT_BASE_PATH_VALUE = "s3://mybucket/blah";
+  static final String TABLE_TYPE_PARAM = "--table-type";
+  static final String TABLE_TYPE_VALUE = "COPY_ON_WRITE";
+  static final String TARGET_TABLE_PARAM = "--target-table";
+  static final String TARGET_TABLE_VALUE = "test";
+  static final String BASE_FILE_FORMAT_PARAM = "--base-file-format";
+  static final String BASE_FILE_FORMAT_VALUE = "PARQUET";
+  static final String SOURCE_LIMIT_PARAM = "--source-limit";
+  static final String SOURCE_LIMIT_VALUE = "500";
+  static final String ENABLE_HIVE_SYNC_PARAM = "--enable-hive-sync";
+  static final String HOODIE_CONF_PARAM = "--hoodie-conf";
+  static final String HOODIE_CONF_VALUE1 = 
"hoodie.datasource.hive_sync.table=test_table";
+  static final String HOODIE_CONF_VALUE2 = 
"hoodie.datasource.write.recordkey.field=Field1,Field2,Field3";
+  static final Logger LOG = 
LogManager.getLogger(TestHoodieDeltaStreamerBase.class);
+  public static KafkaTestUtils testUtils;
+  protected static String topicName;
+
+  protected static int testNum = 1;
+
+  @BeforeAll
+  public static void initClass() throws Exception {
+    UtilitiesTestBase.initClass(true);
+    PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFiles";
+    JSON_KAFKA_SOURCE_ROOT = dfsBasePath + "/jsonKafkaFiles";
+    testUtils = new KafkaTestUtils();
+    testUtils.setup();
+    topicName = "topic" + testNum;
+
+    // prepare the configs.
+    
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/base.properties", 
dfs, dfsBasePath + "/base.properties");
+    
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/base.properties", 
dfs, dfsBasePath + "/config/base.properties");
+    
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/sql-transformer.properties",
 dfs,
+        dfsBasePath + "/sql-transformer.properties");
+    UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source.avsc", 
dfs, dfsBasePath + "/source.avsc");
+    
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source-flattened.avsc",
 dfs, dfsBasePath + "/source-flattened.avsc");
+    UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target.avsc", 
dfs, dfsBasePath + "/target.avsc");
+    
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target-flattened.avsc",
 dfs, dfsBasePath + "/target-flattened.avsc");
+
+    
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source_short_trip_uber.avsc",
 dfs, dfsBasePath + "/source_short_trip_uber.avsc");
+    
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source_uber.avsc", 
dfs, dfsBasePath + "/source_uber.avsc");
+    
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target_short_trip_uber.avsc",
 dfs, dfsBasePath + "/target_short_trip_uber.avsc");
+    
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target_uber.avsc", 
dfs, dfsBasePath + "/target_uber.avsc");
+    
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/invalid_hive_sync_uber_config.properties",
 dfs, dfsBasePath + "/config/invalid_hive_sync_uber_config.properties");
+    
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/uber_config.properties",
 dfs, dfsBasePath + "/config/uber_config.properties");
+    
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/short_trip_uber_config.properties",
 dfs, dfsBasePath + "/config/short_trip_uber_config.properties");
+    
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/clusteringjob.properties",
 dfs, dfsBasePath + "/clusteringjob.properties");
+
+    TypedProperties props = new TypedProperties();
+    props.setProperty("include", "sql-transformer.properties");
+    props.setProperty("hoodie.datasource.write.keygenerator.class", 
TestHoodieDeltaStreamer.TestGenerator.class.getName());
+    props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
+    props.setProperty("hoodie.datasource.write.partitionpath.field", 
"not_there");
+    
props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", 
dfsBasePath + "/source.avsc");
+    
props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", 
dfsBasePath + "/target.avsc");
+
+    // Hive Configs
+    props.setProperty(DataSourceWriteOptions.HIVE_URL_OPT_KEY().key(), 
"jdbc:hive2://127.0.0.1:9999/");
+    props.setProperty(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY().key(), 
"testdb1");
+    props.setProperty(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY().key(), 
"hive_trips");
+    
props.setProperty(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY().key(), 
"datestr");
+    
props.setProperty(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY().key(),
+        MultiPartKeysValueExtractor.class.getName());
+    UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/" + 
PROPS_FILENAME_TEST_SOURCE);
+
+    // Properties used for the delta-streamer which incrementally pulls from 
upstream Hudi source table and writes to
+    // downstream hudi table
+    TypedProperties downstreamProps = new TypedProperties();
+    downstreamProps.setProperty("include", "base.properties");
+    downstreamProps.setProperty("hoodie.datasource.write.recordkey.field", 
"_row_key");
+    downstreamProps.setProperty("hoodie.datasource.write.partitionpath.field", 
"not_there");
+
+    // Source schema is the target schema of upstream table
+    
downstreamProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file",
 dfsBasePath + "/target.avsc");
+    
downstreamProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file",
 dfsBasePath + "/target.avsc");
+    UtilitiesTestBase.Helpers.savePropsToDFS(downstreamProps, dfs, dfsBasePath 
+ "/test-downstream-source.properties");
+
+    // Properties used for testing invalid key generator
+    TypedProperties invalidProps = new TypedProperties();
+    invalidProps.setProperty("include", "sql-transformer.properties");
+    invalidProps.setProperty("hoodie.datasource.write.keygenerator.class", 
"invalid");
+    invalidProps.setProperty("hoodie.datasource.write.recordkey.field", 
"_row_key");
+    invalidProps.setProperty("hoodie.datasource.write.partitionpath.field", 
"not_there");
+    
invalidProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file",
 dfsBasePath + "/source.avsc");
+    
invalidProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file",
 dfsBasePath + "/target.avsc");
+    UtilitiesTestBase.Helpers.savePropsToDFS(invalidProps, dfs, dfsBasePath + 
"/" + PROPS_FILENAME_TEST_INVALID);
+
+    TypedProperties props1 = new TypedProperties();
+    populateAllCommonProps(props1);
+    UtilitiesTestBase.Helpers.savePropsToDFS(props1, dfs, dfsBasePath + "/" + 
PROPS_FILENAME_TEST_SOURCE1);
+
+    TypedProperties properties = new TypedProperties();
+    populateInvalidTableConfigFilePathProps(properties);
+    UtilitiesTestBase.Helpers.savePropsToDFS(properties, dfs, dfsBasePath + 
"/" + PROPS_INVALID_TABLE_CONFIG_FILE);
+
+    TypedProperties invalidHiveSyncProps = new TypedProperties();
+    
invalidHiveSyncProps.setProperty("hoodie.deltastreamer.ingestion.tablesToBeIngested",
 "uber_db.dummy_table_uber");
+    
invalidHiveSyncProps.setProperty("hoodie.deltastreamer.ingestion.uber_db.dummy_table_uber.configFile",
 dfsBasePath + "/config/invalid_hive_sync_uber_config.properties");
+    UtilitiesTestBase.Helpers.savePropsToDFS(invalidHiveSyncProps, dfs, 
dfsBasePath + "/" + PROPS_INVALID_HIVE_SYNC_TEST_SOURCE1);
+
+    prepareParquetDFSFiles(PARQUET_NUM_RECORDS, PARQUET_SOURCE_ROOT);
+  }
+
+  @BeforeEach
+  public void setup() throws Exception {
+    super.setup();
+  }
+
+  @AfterAll
+  public static void cleanupClass() {
+    UtilitiesTestBase.cleanupClass();
+    if (testUtils != null) {
+      testUtils.teardown();
+    }
+  }
+
+  @AfterEach
+  public void teardown() throws Exception {
+    super.teardown();
+  }
+
+  private static void populateInvalidTableConfigFilePathProps(TypedProperties 
props) {
+    props.setProperty("hoodie.datasource.write.keygenerator.class", 
TestHoodieDeltaStreamer.TestGenerator.class.getName());
+    
props.setProperty("hoodie.deltastreamer.keygen.timebased.output.dateformat", 
"yyyyMMdd");
+    props.setProperty("hoodie.deltastreamer.ingestion.tablesToBeIngested", 
"uber_db.dummy_table_uber");
+    
props.setProperty("hoodie.deltastreamer.ingestion.uber_db.dummy_table_uber.configFile",
 dfsBasePath + "/config/invalid_uber_config.properties");
+  }
+
+  static void populateAllCommonProps(TypedProperties props) {
+    populateCommonProps(props);
+    populateCommonKafkaProps(props);
+    populateCommonHiveProps(props);
+  }
+
+  protected static void populateCommonProps(TypedProperties props) {
+    props.setProperty("hoodie.datasource.write.keygenerator.class", 
TestHoodieDeltaStreamer.TestGenerator.class.getName());
+    
props.setProperty("hoodie.deltastreamer.keygen.timebased.output.dateformat", 
"yyyyMMdd");
+    props.setProperty("hoodie.deltastreamer.ingestion.tablesToBeIngested", 
"short_trip_db.dummy_table_short_trip,uber_db.dummy_table_uber");
+    
props.setProperty("hoodie.deltastreamer.ingestion.uber_db.dummy_table_uber.configFile",
 dfsBasePath + "/config/uber_config.properties");
+    
props.setProperty("hoodie.deltastreamer.ingestion.short_trip_db.dummy_table_short_trip.configFile",
 dfsBasePath + "/config/short_trip_uber_config.properties");
+  }
+
+  protected static void populateCommonKafkaProps(TypedProperties props) {
+    //Kafka source properties
+    props.setProperty("bootstrap.servers", testUtils.brokerAddress());
+    props.setProperty("auto.offset.reset", "earliest");
+    props.setProperty("key.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
+    props.setProperty("value.serializer", 
"org.apache.kafka.common.serialization.StringSerializer");
+    props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", 
String.valueOf(5000));
+  }
+
+  protected static void populateCommonHiveProps(TypedProperties props) {
+    // Hive Configs
+    props.setProperty(DataSourceWriteOptions.HIVE_URL_OPT_KEY().key(), 
"jdbc:hive2://127.0.0.1:9999/");
+    props.setProperty(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY().key(), 
"testdb2");
+    
props.setProperty(DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY().key(),
 "false");
+    
props.setProperty(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY().key(), 
"datestr");
+    
props.setProperty(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY().key(),
+        MultiPartKeysValueExtractor.class.getName());
+  }
+
+  protected static void prepareParquetDFSFiles(int numRecords) throws 
IOException {
+    prepareParquetDFSFiles(numRecords, PARQUET_SOURCE_ROOT);
+  }
+
+  protected static void prepareParquetDFSFiles(int numRecords, String 
baseParquetPath) throws IOException {
+    prepareParquetDFSFiles(numRecords, baseParquetPath, 
FIRST_PARQUET_FILE_NAME, false, null, null);
+  }
+
+  protected static void prepareParquetDFSFiles(int numRecords, String 
baseParquetPath, String fileName, boolean useCustomSchema,
+                                               String schemaStr, Schema 
schema) throws IOException {
+    String path = baseParquetPath + "/" + fileName;
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+    if (useCustomSchema) {
+      Helpers.saveParquetToDFS(Helpers.toGenericRecords(
+          dataGenerator.generateInsertsAsPerSchema("000", numRecords, 
schemaStr),
+          schema), new Path(path), HoodieTestDataGenerator.AVRO_TRIP_SCHEMA);
+    } else {
+      Helpers.saveParquetToDFS(Helpers.toGenericRecords(
+          dataGenerator.generateInserts("000", numRecords)), new Path(path));
+    }
+  }
+
+}
diff --git 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java
 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java
index eed7c4c..f264ec6 100644
--- 
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java
+++ 
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java
@@ -38,16 +38,14 @@ import org.junit.jupiter.api.Test;
 import java.io.IOException;
 import java.util.Arrays;
 import java.util.List;
-import java.util.Random;
 
 import static org.junit.jupiter.api.Assertions.assertEquals;
 import static org.junit.jupiter.api.Assertions.assertThrows;
 import static org.junit.jupiter.api.Assertions.assertTrue;
 
-public class TestHoodieMultiTableDeltaStreamer extends TestHoodieDeltaStreamer 
{
+public class TestHoodieMultiTableDeltaStreamer extends 
TestHoodieDeltaStreamerBase {
 
   private static volatile Logger log = 
LogManager.getLogger(TestHoodieMultiTableDeltaStreamer.class);
-  private static final Random RANDOM = new Random();
 
   static class TestHelpers {
 

Reply via email to