This is an automated email from the ASF dual-hosted git repository.
vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 7bdae69 [HUDI-2253] Refactoring a few tests to reduce running time.
DeltaStreamer and MultiDeltaStreamer tests. Bulk insert row writer tests (#3371)
7bdae69 is described below
commit 7bdae69053afc5ef604a15806d78317cb976f2ce
Author: Sivabalan Narayanan <[email protected]>
AuthorDate: Fri Jul 30 01:22:26 2021 -0400
[HUDI-2253] Refactoring a few tests to reduce running time. DeltaStreamer and
MultiDeltaStreamer tests. Bulk insert row writer tests (#3371)
Co-authored-by: Sivabalan Narayanan <[email protected]>
---
.../TestHoodieBulkInsertDataInternalWriter.java | 4 +-
.../TestHoodieDataSourceInternalWriter.java | 9 +-
.../TestHoodieBulkInsertDataInternalWriter.java | 4 +-
.../TestHoodieDataSourceInternalBatchWrite.java | 7 +-
.../functional/TestHoodieDeltaStreamer.java | 228 +------------------
.../functional/TestHoodieDeltaStreamerBase.java | 245 +++++++++++++++++++++
.../TestHoodieMultiTableDeltaStreamer.java | 4 +-
7 files changed, 267 insertions(+), 234 deletions(-)
diff --git
a/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieBulkInsertDataInternalWriter.java
b/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieBulkInsertDataInternalWriter.java
index 9735379..fd943b7 100644
---
a/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieBulkInsertDataInternalWriter.java
+++
b/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieBulkInsertDataInternalWriter.java
@@ -74,7 +74,7 @@ public class TestHoodieBulkInsertDataInternalWriter extends
HoodieWriteConfig cfg = getWriteConfig(populateMetaFields);
HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
// execute N rounds
- for (int i = 0; i < 3; i++) {
+ for (int i = 0; i < 2; i++) {
String instantTime = "00" + i;
// init writer
HoodieBulkInsertDataInternalWriter writer = new
HoodieBulkInsertDataInternalWriter(table, cfg, instantTime,
RANDOM.nextInt(100000), RANDOM.nextLong(), RANDOM.nextLong(),
@@ -82,7 +82,7 @@ public class TestHoodieBulkInsertDataInternalWriter extends
int size = 10 + RANDOM.nextInt(1000);
// write N rows to partition1, N rows to partition2 and N rows to
partition3 ... Each batch should create a new RowCreateHandle and a new file
- int batches = 5;
+ int batches = 3;
Dataset<Row> totalInputRows = null;
for (int j = 0; j < batches; j++) {
diff --git
a/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieDataSourceInternalWriter.java
b/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieDataSourceInternalWriter.java
index 342e2ae..eea49e6 100644
---
a/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieDataSourceInternalWriter.java
+++
b/hudi-spark-datasource/hudi-spark2/src/test/java/org/apache/hudi/internal/TestHoodieDataSourceInternalWriter.java
@@ -30,6 +30,7 @@ import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.sources.v2.DataSourceOptions;
import org.apache.spark.sql.sources.v2.writer.DataWriter;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
@@ -87,7 +88,7 @@ public class TestHoodieDataSourceInternalWriter extends
}
int size = 10 + RANDOM.nextInt(1000);
- int batches = 5;
+ int batches = 2;
Dataset<Row> totalInputRows = null;
for (int j = 0; j < batches; j++) {
String partitionPath = HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[j
% 3];
@@ -158,7 +159,7 @@ public class TestHoodieDataSourceInternalWriter extends
int partitionCounter = 0;
// execute N rounds
- for (int i = 0; i < 5; i++) {
+ for (int i = 0; i < 2; i++) {
String instantTime = "00" + i;
// init writer
HoodieDataSourceInternalWriter dataSourceInternalWriter =
@@ -168,7 +169,7 @@ public class TestHoodieDataSourceInternalWriter extends
DataWriter<InternalRow> writer =
dataSourceInternalWriter.createWriterFactory().createDataWriter(partitionCounter++,
RANDOM.nextLong(), RANDOM.nextLong());
int size = 10 + RANDOM.nextInt(1000);
- int batches = 5; // one batch per partition
+ int batches = 2; // one batch per partition
for (int j = 0; j < batches; j++) {
String partitionPath =
HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[j % 3];
@@ -195,6 +196,8 @@ public class TestHoodieDataSourceInternalWriter extends
}
}
+ // takes up lot of running time with CI.
+ @Disabled
@ParameterizedTest
@MethodSource("bulkInsertTypeParams")
public void testLargeWrites(boolean populateMetaFields) throws Exception {
diff --git
a/hudi-spark-datasource/hudi-spark3/src/test/java/org/apache/hudi/spark3/internal/TestHoodieBulkInsertDataInternalWriter.java
b/hudi-spark-datasource/hudi-spark3/src/test/java/org/apache/hudi/spark3/internal/TestHoodieBulkInsertDataInternalWriter.java
index 3b7fb97..a3d0e32 100644
---
a/hudi-spark-datasource/hudi-spark3/src/test/java/org/apache/hudi/spark3/internal/TestHoodieBulkInsertDataInternalWriter.java
+++
b/hudi-spark-datasource/hudi-spark3/src/test/java/org/apache/hudi/spark3/internal/TestHoodieBulkInsertDataInternalWriter.java
@@ -75,7 +75,7 @@ public class TestHoodieBulkInsertDataInternalWriter extends
HoodieWriteConfig cfg = getWriteConfig(populateMetaFields);
HoodieTable table = HoodieSparkTable.create(cfg, context, metaClient);
// execute N rounds
- for (int i = 0; i < 5; i++) {
+ for (int i = 0; i < 2; i++) {
String instantTime = "00" + i;
// init writer
HoodieBulkInsertDataInternalWriter writer = new
HoodieBulkInsertDataInternalWriter(table, cfg, instantTime,
RANDOM.nextInt(100000),
@@ -83,7 +83,7 @@ public class TestHoodieBulkInsertDataInternalWriter extends
int size = 10 + RANDOM.nextInt(1000);
// write N rows to partition1, N rows to partition2 and N rows to
partition3 ... Each batch should create a new RowCreateHandle and a new file
- int batches = 5;
+ int batches = 3;
Dataset<Row> totalInputRows = null;
for (int j = 0; j < batches; j++) {
diff --git
a/hudi-spark-datasource/hudi-spark3/src/test/java/org/apache/hudi/spark3/internal/TestHoodieDataSourceInternalBatchWrite.java
b/hudi-spark-datasource/hudi-spark3/src/test/java/org/apache/hudi/spark3/internal/TestHoodieDataSourceInternalBatchWrite.java
index 3c3866e..ae49804 100644
---
a/hudi-spark-datasource/hudi-spark3/src/test/java/org/apache/hudi/spark3/internal/TestHoodieDataSourceInternalBatchWrite.java
+++
b/hudi-spark-datasource/hudi-spark3/src/test/java/org/apache/hudi/spark3/internal/TestHoodieDataSourceInternalBatchWrite.java
@@ -32,6 +32,7 @@ import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.connector.write.DataWriter;
+import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
@@ -161,7 +162,7 @@ public class TestHoodieDataSourceInternalBatchWrite extends
int partitionCounter = 0;
// execute N rounds
- for (int i = 0; i < 5; i++) {
+ for (int i = 0; i < 2; i++) {
String instantTime = "00" + i;
// init writer
HoodieDataSourceInternalBatchWrite dataSourceInternalBatchWrite =
@@ -171,7 +172,7 @@ public class TestHoodieDataSourceInternalBatchWrite extends
DataWriter<InternalRow> writer =
dataSourceInternalBatchWrite.createBatchWriterFactory(null).createWriter(partitionCounter++,
RANDOM.nextLong());
int size = 10 + RANDOM.nextInt(1000);
- int batches = 5; // one batch per partition
+ int batches = 3; // one batch per partition
for (int j = 0; j < batches; j++) {
String partitionPath =
HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS[j % 3];
@@ -197,6 +198,8 @@ public class TestHoodieDataSourceInternalBatchWrite extends
}
}
+ // Large writes are not required to be executed w/ regular CI jobs. Takes
lot of running time.
+ @Disabled
@ParameterizedTest
@MethodSource("bulkInsertTypeParams")
public void testLargeWrites(boolean populateMetaFields) throws Exception {
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index 80f471e..1c68476 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -18,11 +18,6 @@
package org.apache.hudi.utilities.functional;
-import java.sql.Connection;
-import java.sql.DriverManager;
-import java.util.ConcurrentModificationException;
-import java.util.concurrent.ExecutorService;
-import org.apache.hudi.DataSourceWriteOptions;
import org.apache.hudi.common.config.DFSPropertiesConfiguration;
import org.apache.hudi.common.config.HoodieConfig;
import org.apache.hudi.common.config.TypedProperties;
@@ -45,7 +40,6 @@ import org.apache.hudi.exception.HoodieException;
import org.apache.hudi.exception.TableNotFoundException;
import org.apache.hudi.hive.HiveSyncConfig;
import org.apache.hudi.hive.HoodieHiveClient;
-import org.apache.hudi.hive.MultiPartKeysValueExtractor;
import org.apache.hudi.keygen.SimpleKeyGenerator;
import org.apache.hudi.utilities.DummySchemaProvider;
import org.apache.hudi.utilities.HoodieClusteringJob;
@@ -65,7 +59,6 @@ import
org.apache.hudi.utilities.testutils.sources.config.SourceConfigs;
import org.apache.hudi.utilities.transform.SqlQueryBasedTransformer;
import org.apache.hudi.utilities.transform.Transformer;
-import org.apache.avro.Schema;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
@@ -84,24 +77,22 @@ import org.apache.spark.sql.api.java.UDF4;
import org.apache.spark.sql.functions;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
-import org.apache.spark.streaming.kafka010.KafkaTestUtils;
-import org.junit.jupiter.api.AfterAll;
-import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
-import org.junit.jupiter.api.BeforeAll;
-import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.MethodSource;
import java.io.IOException;
+import java.sql.Connection;
+import java.sql.DriverManager;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
+import java.util.ConcurrentModificationException;
import java.util.List;
import java.util.Properties;
-import java.util.Random;
+import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -119,176 +110,9 @@ import static org.junit.jupiter.api.Assertions.fail;
/**
* Basic tests against {@link HoodieDeltaStreamer}, by issuing bulk_inserts,
upserts, inserts. Check counts at the end.
*/
-public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
-
- private static final Random RANDOM = new Random();
- private static final String PROPS_FILENAME_TEST_SOURCE =
"test-source.properties";
- public static final String PROPS_FILENAME_TEST_SOURCE1 =
"test-source1.properties";
- public static final String PROPS_INVALID_HIVE_SYNC_TEST_SOURCE1 =
"test-invalid-hive-sync-source1.properties";
- public static final String PROPS_INVALID_FILE =
"test-invalid-props.properties";
- public static final String PROPS_INVALID_TABLE_CONFIG_FILE =
"test-invalid-table-config.properties";
- private static final String PROPS_FILENAME_TEST_INVALID =
"test-invalid.properties";
- private static final String PROPS_FILENAME_TEST_CSV =
"test-csv-dfs-source.properties";
- protected static final String PROPS_FILENAME_TEST_PARQUET =
"test-parquet-dfs-source.properties";
- private static final String PROPS_FILENAME_TEST_JSON_KAFKA =
"test-json-kafka-dfs-source.properties";
- private static final String PROPS_FILENAME_TEST_MULTI_WRITER =
"test-multi-writer.properties";
- private static final String FIRST_PARQUET_FILE_NAME = "1.parquet";
- private static String PARQUET_SOURCE_ROOT;
- private static String JSON_KAFKA_SOURCE_ROOT;
- private static final int PARQUET_NUM_RECORDS = 5;
- private static final int CSV_NUM_RECORDS = 3;
- private static final int JSON_KAFKA_NUM_RECORDS = 5;
- private String kafkaCheckpointType = "string";
- // Required fields
- private static final String TGT_BASE_PATH_PARAM = "--target-base-path";
- private static final String TGT_BASE_PATH_VALUE = "s3://mybucket/blah";
- private static final String TABLE_TYPE_PARAM = "--table-type";
- private static final String TABLE_TYPE_VALUE = "COPY_ON_WRITE";
- private static final String TARGET_TABLE_PARAM = "--target-table";
- private static final String TARGET_TABLE_VALUE = "test";
- private static final String BASE_FILE_FORMAT_PARAM = "--base-file-format";
- private static final String BASE_FILE_FORMAT_VALUE = "PARQUET";
- private static final String SOURCE_LIMIT_PARAM = "--source-limit";
- private static final String SOURCE_LIMIT_VALUE = "500";
- private static final String ENABLE_HIVE_SYNC_PARAM = "--enable-hive-sync";
- private static final String HOODIE_CONF_PARAM = "--hoodie-conf";
- private static final String HOODIE_CONF_VALUE1 =
"hoodie.datasource.hive_sync.table=test_table";
- private static final String HOODIE_CONF_VALUE2 =
"hoodie.datasource.write.recordkey.field=Field1,Field2,Field3";
- private static final Logger LOG =
LogManager.getLogger(TestHoodieDeltaStreamer.class);
- public static KafkaTestUtils testUtils;
- protected static String topicName;
-
- protected static int testNum = 1;
-
- @BeforeAll
- public static void initClass() throws Exception {
- UtilitiesTestBase.initClass(true);
- PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFiles";
- JSON_KAFKA_SOURCE_ROOT = dfsBasePath + "/jsonKafkaFiles";
- testUtils = new KafkaTestUtils();
- testUtils.setup();
- topicName = "topic" + testNum;
-
- // prepare the configs.
-
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/base.properties",
dfs, dfsBasePath + "/base.properties");
-
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/base.properties",
dfs, dfsBasePath + "/config/base.properties");
-
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/sql-transformer.properties",
dfs,
- dfsBasePath + "/sql-transformer.properties");
- UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source.avsc",
dfs, dfsBasePath + "/source.avsc");
-
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source-flattened.avsc",
dfs, dfsBasePath + "/source-flattened.avsc");
- UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target.avsc",
dfs, dfsBasePath + "/target.avsc");
-
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target-flattened.avsc",
dfs, dfsBasePath + "/target-flattened.avsc");
-
-
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source_short_trip_uber.avsc",
dfs, dfsBasePath + "/source_short_trip_uber.avsc");
-
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source_uber.avsc",
dfs, dfsBasePath + "/source_uber.avsc");
-
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target_short_trip_uber.avsc",
dfs, dfsBasePath + "/target_short_trip_uber.avsc");
-
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target_uber.avsc",
dfs, dfsBasePath + "/target_uber.avsc");
-
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/invalid_hive_sync_uber_config.properties",
dfs, dfsBasePath + "/config/invalid_hive_sync_uber_config.properties");
-
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/uber_config.properties",
dfs, dfsBasePath + "/config/uber_config.properties");
-
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/short_trip_uber_config.properties",
dfs, dfsBasePath + "/config/short_trip_uber_config.properties");
-
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/clusteringjob.properties",
dfs, dfsBasePath + "/clusteringjob.properties");
-
- TypedProperties props = new TypedProperties();
- props.setProperty("include", "sql-transformer.properties");
- props.setProperty("hoodie.datasource.write.keygenerator.class",
TestGenerator.class.getName());
- props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
- props.setProperty("hoodie.datasource.write.partitionpath.field",
"not_there");
-
props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file",
dfsBasePath + "/source.avsc");
-
props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file",
dfsBasePath + "/target.avsc");
-
- // Hive Configs
- props.setProperty(DataSourceWriteOptions.HIVE_URL_OPT_KEY().key(),
"jdbc:hive2://127.0.0.1:9999/");
- props.setProperty(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY().key(),
"testdb1");
- props.setProperty(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY().key(),
"hive_trips");
-
props.setProperty(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY().key(),
"datestr");
-
props.setProperty(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY().key(),
- MultiPartKeysValueExtractor.class.getName());
- UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/" +
PROPS_FILENAME_TEST_SOURCE);
-
- // Properties used for the delta-streamer which incrementally pulls from
upstream Hudi source table and writes to
- // downstream hudi table
- TypedProperties downstreamProps = new TypedProperties();
- downstreamProps.setProperty("include", "base.properties");
- downstreamProps.setProperty("hoodie.datasource.write.recordkey.field",
"_row_key");
- downstreamProps.setProperty("hoodie.datasource.write.partitionpath.field",
"not_there");
-
- // Source schema is the target schema of upstream table
-
downstreamProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file",
dfsBasePath + "/target.avsc");
-
downstreamProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file",
dfsBasePath + "/target.avsc");
- UtilitiesTestBase.Helpers.savePropsToDFS(downstreamProps, dfs, dfsBasePath
+ "/test-downstream-source.properties");
-
- // Properties used for testing invalid key generator
- TypedProperties invalidProps = new TypedProperties();
- invalidProps.setProperty("include", "sql-transformer.properties");
- invalidProps.setProperty("hoodie.datasource.write.keygenerator.class",
"invalid");
- invalidProps.setProperty("hoodie.datasource.write.recordkey.field",
"_row_key");
- invalidProps.setProperty("hoodie.datasource.write.partitionpath.field",
"not_there");
-
invalidProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file",
dfsBasePath + "/source.avsc");
-
invalidProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file",
dfsBasePath + "/target.avsc");
- UtilitiesTestBase.Helpers.savePropsToDFS(invalidProps, dfs, dfsBasePath +
"/" + PROPS_FILENAME_TEST_INVALID);
-
- TypedProperties props1 = new TypedProperties();
- populateAllCommonProps(props1);
- UtilitiesTestBase.Helpers.savePropsToDFS(props1, dfs, dfsBasePath + "/" +
PROPS_FILENAME_TEST_SOURCE1);
-
- TypedProperties properties = new TypedProperties();
- populateInvalidTableConfigFilePathProps(properties);
- UtilitiesTestBase.Helpers.savePropsToDFS(properties, dfs, dfsBasePath +
"/" + PROPS_INVALID_TABLE_CONFIG_FILE);
-
- TypedProperties invalidHiveSyncProps = new TypedProperties();
-
invalidHiveSyncProps.setProperty("hoodie.deltastreamer.ingestion.tablesToBeIngested",
"uber_db.dummy_table_uber");
-
invalidHiveSyncProps.setProperty("hoodie.deltastreamer.ingestion.uber_db.dummy_table_uber.configFile",
dfsBasePath + "/config/invalid_hive_sync_uber_config.properties");
- UtilitiesTestBase.Helpers.savePropsToDFS(invalidHiveSyncProps, dfs,
dfsBasePath + "/" + PROPS_INVALID_HIVE_SYNC_TEST_SOURCE1);
-
- prepareParquetDFSFiles(PARQUET_NUM_RECORDS, PARQUET_SOURCE_ROOT);
- }
-
- @AfterAll
- public static void release() {
- if (testUtils != null) {
- testUtils.teardown();
- }
- }
-
- private static void populateInvalidTableConfigFilePathProps(TypedProperties
props) {
- props.setProperty("hoodie.datasource.write.keygenerator.class",
TestHoodieDeltaStreamer.TestGenerator.class.getName());
-
props.setProperty("hoodie.deltastreamer.keygen.timebased.output.dateformat",
"yyyyMMdd");
- props.setProperty("hoodie.deltastreamer.ingestion.tablesToBeIngested",
"uber_db.dummy_table_uber");
-
props.setProperty("hoodie.deltastreamer.ingestion.uber_db.dummy_table_uber.configFile",
dfsBasePath + "/config/invalid_uber_config.properties");
- }
+public class TestHoodieDeltaStreamer extends TestHoodieDeltaStreamerBase {
- private static void populateAllCommonProps(TypedProperties props) {
- populateCommonProps(props);
- populateCommonKafkaProps(props);
- populateCommonHiveProps(props);
- }
-
- protected static void populateCommonProps(TypedProperties props) {
- props.setProperty("hoodie.datasource.write.keygenerator.class",
TestHoodieDeltaStreamer.TestGenerator.class.getName());
-
props.setProperty("hoodie.deltastreamer.keygen.timebased.output.dateformat",
"yyyyMMdd");
- props.setProperty("hoodie.deltastreamer.ingestion.tablesToBeIngested",
"short_trip_db.dummy_table_short_trip,uber_db.dummy_table_uber");
-
props.setProperty("hoodie.deltastreamer.ingestion.uber_db.dummy_table_uber.configFile",
dfsBasePath + "/config/uber_config.properties");
-
props.setProperty("hoodie.deltastreamer.ingestion.short_trip_db.dummy_table_short_trip.configFile",
dfsBasePath + "/config/short_trip_uber_config.properties");
- }
-
- protected static void populateCommonKafkaProps(TypedProperties props) {
- //Kafka source properties
- props.setProperty("bootstrap.servers", testUtils.brokerAddress());
- props.setProperty("auto.offset.reset", "earliest");
- props.setProperty("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
- props.setProperty("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
- props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents",
String.valueOf(5000));
- }
-
- protected static void populateCommonHiveProps(TypedProperties props) {
- // Hive Configs
- props.setProperty(DataSourceWriteOptions.HIVE_URL_OPT_KEY().key(),
"jdbc:hive2://127.0.0.1:9999/");
- props.setProperty(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY().key(),
"testdb2");
-
props.setProperty(DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY().key(),
"false");
-
props.setProperty(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY().key(),
"datestr");
-
props.setProperty(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY().key(),
- MultiPartKeysValueExtractor.class.getName());
- }
+ private static final Logger LOG =
LogManager.getLogger(TestHoodieDeltaStreamer.class);
protected static TypedProperties prepareMultiWriterProps(String
propsFileName) throws IOException {
TypedProperties props = new TypedProperties();
@@ -318,24 +142,6 @@ public class TestHoodieDeltaStreamer extends
UtilitiesTestBase {
return props;
}
- @AfterAll
- public static void cleanupClass() {
- UtilitiesTestBase.cleanupClass();
- if (testUtils != null) {
- testUtils.teardown();
- }
- }
-
- @BeforeEach
- public void setup() throws Exception {
- super.setup();
- }
-
- @AfterEach
- public void teardown() throws Exception {
- super.teardown();
- }
-
static class TestHelpers {
static HoodieDeltaStreamer.Config makeDropAllConfig(String basePath,
WriteOperationType op) {
@@ -1333,28 +1139,6 @@ public class TestHoodieDeltaStreamer extends
UtilitiesTestBase {
assertEquals(1000, c);
}
- private static void prepareParquetDFSFiles(int numRecords) throws
IOException {
- prepareParquetDFSFiles(numRecords, PARQUET_SOURCE_ROOT);
- }
-
- protected static void prepareParquetDFSFiles(int numRecords, String
baseParquetPath) throws IOException {
- prepareParquetDFSFiles(numRecords, baseParquetPath,
FIRST_PARQUET_FILE_NAME, false, null, null);
- }
-
- protected static void prepareParquetDFSFiles(int numRecords, String
baseParquetPath, String fileName, boolean useCustomSchema,
- String schemaStr, Schema schema) throws IOException {
- String path = baseParquetPath + "/" + fileName;
- HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
- if (useCustomSchema) {
- Helpers.saveParquetToDFS(Helpers.toGenericRecords(
- dataGenerator.generateInsertsAsPerSchema("000", numRecords,
schemaStr),
- schema), new Path(path), HoodieTestDataGenerator.AVRO_TRIP_SCHEMA);
- } else {
- Helpers.saveParquetToDFS(Helpers.toGenericRecords(
- dataGenerator.generateInserts("000", numRecords)), new Path(path));
- }
- }
-
private static void prepareJsonKafkaDFSFiles(int numRecords, boolean
createTopic, String topicName) throws IOException {
if (createTopic) {
try {
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerBase.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerBase.java
new file mode 100644
index 0000000..ae477dc
--- /dev/null
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamerBase.java
@@ -0,0 +1,245 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hudi.utilities.functional;
+
+import org.apache.hudi.DataSourceWriteOptions;
+import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
+import org.apache.hudi.hive.MultiPartKeysValueExtractor;
+import org.apache.hudi.utilities.testutils.UtilitiesTestBase;
+
+import org.apache.avro.Schema;
+import org.apache.hadoop.fs.Path;
+import org.apache.log4j.LogManager;
+import org.apache.log4j.Logger;
+import org.apache.spark.streaming.kafka010.KafkaTestUtils;
+import org.junit.jupiter.api.AfterAll;
+import org.junit.jupiter.api.AfterEach;
+import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.BeforeEach;
+
+import java.io.IOException;
+import java.util.Random;
+
+public class TestHoodieDeltaStreamerBase extends UtilitiesTestBase {
+
+
+ static final Random RANDOM = new Random();
+ static final String PROPS_FILENAME_TEST_SOURCE = "test-source.properties";
+ static final String PROPS_FILENAME_TEST_SOURCE1 = "test-source1.properties";
+ static final String PROPS_INVALID_HIVE_SYNC_TEST_SOURCE1 =
"test-invalid-hive-sync-source1.properties";
+ static final String PROPS_INVALID_FILE = "test-invalid-props.properties";
+ static final String PROPS_INVALID_TABLE_CONFIG_FILE =
"test-invalid-table-config.properties";
+ static final String PROPS_FILENAME_TEST_INVALID = "test-invalid.properties";
+ static final String PROPS_FILENAME_TEST_CSV =
"test-csv-dfs-source.properties";
+ static final String PROPS_FILENAME_TEST_PARQUET =
"test-parquet-dfs-source.properties";
+ static final String PROPS_FILENAME_TEST_JSON_KAFKA =
"test-json-kafka-dfs-source.properties";
+ static final String PROPS_FILENAME_TEST_MULTI_WRITER =
"test-multi-writer.properties";
+ static final String FIRST_PARQUET_FILE_NAME = "1.parquet";
+ static String PARQUET_SOURCE_ROOT;
+ static String JSON_KAFKA_SOURCE_ROOT;
+ static final int PARQUET_NUM_RECORDS = 5;
+ static final int CSV_NUM_RECORDS = 3;
+ static final int JSON_KAFKA_NUM_RECORDS = 5;
+ String kafkaCheckpointType = "string";
+ // Required fields
+ static final String TGT_BASE_PATH_PARAM = "--target-base-path";
+ static final String TGT_BASE_PATH_VALUE = "s3://mybucket/blah";
+ static final String TABLE_TYPE_PARAM = "--table-type";
+ static final String TABLE_TYPE_VALUE = "COPY_ON_WRITE";
+ static final String TARGET_TABLE_PARAM = "--target-table";
+ static final String TARGET_TABLE_VALUE = "test";
+ static final String BASE_FILE_FORMAT_PARAM = "--base-file-format";
+ static final String BASE_FILE_FORMAT_VALUE = "PARQUET";
+ static final String SOURCE_LIMIT_PARAM = "--source-limit";
+ static final String SOURCE_LIMIT_VALUE = "500";
+ static final String ENABLE_HIVE_SYNC_PARAM = "--enable-hive-sync";
+ static final String HOODIE_CONF_PARAM = "--hoodie-conf";
+ static final String HOODIE_CONF_VALUE1 =
"hoodie.datasource.hive_sync.table=test_table";
+ static final String HOODIE_CONF_VALUE2 =
"hoodie.datasource.write.recordkey.field=Field1,Field2,Field3";
+ static final Logger LOG =
LogManager.getLogger(TestHoodieDeltaStreamerBase.class);
+ public static KafkaTestUtils testUtils;
+ protected static String topicName;
+
+ protected static int testNum = 1;
+
+ @BeforeAll
+ public static void initClass() throws Exception {
+ UtilitiesTestBase.initClass(true);
+ PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFiles";
+ JSON_KAFKA_SOURCE_ROOT = dfsBasePath + "/jsonKafkaFiles";
+ testUtils = new KafkaTestUtils();
+ testUtils.setup();
+ topicName = "topic" + testNum;
+
+ // prepare the configs.
+
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/base.properties",
dfs, dfsBasePath + "/base.properties");
+
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/base.properties",
dfs, dfsBasePath + "/config/base.properties");
+
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/sql-transformer.properties",
dfs,
+ dfsBasePath + "/sql-transformer.properties");
+ UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source.avsc",
dfs, dfsBasePath + "/source.avsc");
+
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source-flattened.avsc",
dfs, dfsBasePath + "/source-flattened.avsc");
+ UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target.avsc",
dfs, dfsBasePath + "/target.avsc");
+
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target-flattened.avsc",
dfs, dfsBasePath + "/target-flattened.avsc");
+
+
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source_short_trip_uber.avsc",
dfs, dfsBasePath + "/source_short_trip_uber.avsc");
+
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/source_uber.avsc",
dfs, dfsBasePath + "/source_uber.avsc");
+
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target_short_trip_uber.avsc",
dfs, dfsBasePath + "/target_short_trip_uber.avsc");
+
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/target_uber.avsc",
dfs, dfsBasePath + "/target_uber.avsc");
+
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/invalid_hive_sync_uber_config.properties",
dfs, dfsBasePath + "/config/invalid_hive_sync_uber_config.properties");
+
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/uber_config.properties",
dfs, dfsBasePath + "/config/uber_config.properties");
+
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/short_trip_uber_config.properties",
dfs, dfsBasePath + "/config/short_trip_uber_config.properties");
+
UtilitiesTestBase.Helpers.copyToDFS("delta-streamer-config/clusteringjob.properties",
dfs, dfsBasePath + "/clusteringjob.properties");
+
+ TypedProperties props = new TypedProperties();
+ props.setProperty("include", "sql-transformer.properties");
+ props.setProperty("hoodie.datasource.write.keygenerator.class",
TestHoodieDeltaStreamer.TestGenerator.class.getName());
+ props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
+ props.setProperty("hoodie.datasource.write.partitionpath.field",
"not_there");
+
props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file",
dfsBasePath + "/source.avsc");
+
props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file",
dfsBasePath + "/target.avsc");
+
+ // Hive Configs
+ props.setProperty(DataSourceWriteOptions.HIVE_URL_OPT_KEY().key(),
"jdbc:hive2://127.0.0.1:9999/");
+ props.setProperty(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY().key(),
"testdb1");
+ props.setProperty(DataSourceWriteOptions.HIVE_TABLE_OPT_KEY().key(),
"hive_trips");
+
props.setProperty(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY().key(),
"datestr");
+
props.setProperty(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY().key(),
+ MultiPartKeysValueExtractor.class.getName());
+ UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/" +
PROPS_FILENAME_TEST_SOURCE);
+
+ // Properties used for the delta-streamer which incrementally pulls from
upstream Hudi source table and writes to
+ // downstream hudi table
+ TypedProperties downstreamProps = new TypedProperties();
+ downstreamProps.setProperty("include", "base.properties");
+ downstreamProps.setProperty("hoodie.datasource.write.recordkey.field",
"_row_key");
+ downstreamProps.setProperty("hoodie.datasource.write.partitionpath.field",
"not_there");
+
+ // Source schema is the target schema of upstream table
+
downstreamProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file",
dfsBasePath + "/target.avsc");
+
downstreamProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file",
dfsBasePath + "/target.avsc");
+ UtilitiesTestBase.Helpers.savePropsToDFS(downstreamProps, dfs, dfsBasePath
+ "/test-downstream-source.properties");
+
+ // Properties used for testing invalid key generator
+ TypedProperties invalidProps = new TypedProperties();
+ invalidProps.setProperty("include", "sql-transformer.properties");
+ invalidProps.setProperty("hoodie.datasource.write.keygenerator.class",
"invalid");
+ invalidProps.setProperty("hoodie.datasource.write.recordkey.field",
"_row_key");
+ invalidProps.setProperty("hoodie.datasource.write.partitionpath.field",
"not_there");
+
invalidProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file",
dfsBasePath + "/source.avsc");
+
invalidProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file",
dfsBasePath + "/target.avsc");
+ UtilitiesTestBase.Helpers.savePropsToDFS(invalidProps, dfs, dfsBasePath +
"/" + PROPS_FILENAME_TEST_INVALID);
+
+ TypedProperties props1 = new TypedProperties();
+ populateAllCommonProps(props1);
+ UtilitiesTestBase.Helpers.savePropsToDFS(props1, dfs, dfsBasePath + "/" +
PROPS_FILENAME_TEST_SOURCE1);
+
+ TypedProperties properties = new TypedProperties();
+ populateInvalidTableConfigFilePathProps(properties);
+ UtilitiesTestBase.Helpers.savePropsToDFS(properties, dfs, dfsBasePath +
"/" + PROPS_INVALID_TABLE_CONFIG_FILE);
+
+ TypedProperties invalidHiveSyncProps = new TypedProperties();
+
invalidHiveSyncProps.setProperty("hoodie.deltastreamer.ingestion.tablesToBeIngested",
"uber_db.dummy_table_uber");
+
invalidHiveSyncProps.setProperty("hoodie.deltastreamer.ingestion.uber_db.dummy_table_uber.configFile",
dfsBasePath + "/config/invalid_hive_sync_uber_config.properties");
+ UtilitiesTestBase.Helpers.savePropsToDFS(invalidHiveSyncProps, dfs,
dfsBasePath + "/" + PROPS_INVALID_HIVE_SYNC_TEST_SOURCE1);
+
+ prepareParquetDFSFiles(PARQUET_NUM_RECORDS, PARQUET_SOURCE_ROOT);
+ }
+
+ @BeforeEach
+ public void setup() throws Exception {
+ super.setup();
+ }
+
+ @AfterAll
+ public static void cleanupClass() {
+ UtilitiesTestBase.cleanupClass();
+ if (testUtils != null) {
+ testUtils.teardown();
+ }
+ }
+
+ @AfterEach
+ public void teardown() throws Exception {
+ super.teardown();
+ }
+
+ private static void populateInvalidTableConfigFilePathProps(TypedProperties
props) {
+ props.setProperty("hoodie.datasource.write.keygenerator.class",
TestHoodieDeltaStreamer.TestGenerator.class.getName());
+
props.setProperty("hoodie.deltastreamer.keygen.timebased.output.dateformat",
"yyyyMMdd");
+ props.setProperty("hoodie.deltastreamer.ingestion.tablesToBeIngested",
"uber_db.dummy_table_uber");
+
props.setProperty("hoodie.deltastreamer.ingestion.uber_db.dummy_table_uber.configFile",
dfsBasePath + "/config/invalid_uber_config.properties");
+ }
+
+ static void populateAllCommonProps(TypedProperties props) {
+ populateCommonProps(props);
+ populateCommonKafkaProps(props);
+ populateCommonHiveProps(props);
+ }
+
+ protected static void populateCommonProps(TypedProperties props) {
+ props.setProperty("hoodie.datasource.write.keygenerator.class",
TestHoodieDeltaStreamer.TestGenerator.class.getName());
+
props.setProperty("hoodie.deltastreamer.keygen.timebased.output.dateformat",
"yyyyMMdd");
+ props.setProperty("hoodie.deltastreamer.ingestion.tablesToBeIngested",
"short_trip_db.dummy_table_short_trip,uber_db.dummy_table_uber");
+
props.setProperty("hoodie.deltastreamer.ingestion.uber_db.dummy_table_uber.configFile",
dfsBasePath + "/config/uber_config.properties");
+
props.setProperty("hoodie.deltastreamer.ingestion.short_trip_db.dummy_table_short_trip.configFile",
dfsBasePath + "/config/short_trip_uber_config.properties");
+ }
+
+ protected static void populateCommonKafkaProps(TypedProperties props) {
+ //Kafka source properties
+ props.setProperty("bootstrap.servers", testUtils.brokerAddress());
+ props.setProperty("auto.offset.reset", "earliest");
+ props.setProperty("key.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
+ props.setProperty("value.serializer",
"org.apache.kafka.common.serialization.StringSerializer");
+ props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents",
String.valueOf(5000));
+ }
+
+ protected static void populateCommonHiveProps(TypedProperties props) {
+ // Hive Configs
+ props.setProperty(DataSourceWriteOptions.HIVE_URL_OPT_KEY().key(),
"jdbc:hive2://127.0.0.1:9999/");
+ props.setProperty(DataSourceWriteOptions.HIVE_DATABASE_OPT_KEY().key(),
"testdb2");
+
props.setProperty(DataSourceWriteOptions.HIVE_ASSUME_DATE_PARTITION_OPT_KEY().key(),
"false");
+
props.setProperty(DataSourceWriteOptions.HIVE_PARTITION_FIELDS_OPT_KEY().key(),
"datestr");
+
props.setProperty(DataSourceWriteOptions.HIVE_PARTITION_EXTRACTOR_CLASS_OPT_KEY().key(),
+ MultiPartKeysValueExtractor.class.getName());
+ }
+
+ protected static void prepareParquetDFSFiles(int numRecords) throws
IOException {
+ prepareParquetDFSFiles(numRecords, PARQUET_SOURCE_ROOT);
+ }
+
+ protected static void prepareParquetDFSFiles(int numRecords, String
baseParquetPath) throws IOException {
+ prepareParquetDFSFiles(numRecords, baseParquetPath,
FIRST_PARQUET_FILE_NAME, false, null, null);
+ }
+
+ protected static void prepareParquetDFSFiles(int numRecords, String
baseParquetPath, String fileName, boolean useCustomSchema,
+ String schemaStr, Schema
schema) throws IOException {
+ String path = baseParquetPath + "/" + fileName;
+ HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+ if (useCustomSchema) {
+ Helpers.saveParquetToDFS(Helpers.toGenericRecords(
+ dataGenerator.generateInsertsAsPerSchema("000", numRecords,
schemaStr),
+ schema), new Path(path), HoodieTestDataGenerator.AVRO_TRIP_SCHEMA);
+ } else {
+ Helpers.saveParquetToDFS(Helpers.toGenericRecords(
+ dataGenerator.generateInserts("000", numRecords)), new Path(path));
+ }
+ }
+
+}
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java
index eed7c4c..f264ec6 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java
@@ -38,16 +38,14 @@ import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
-import java.util.Random;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.junit.jupiter.api.Assertions.assertTrue;
-public class TestHoodieMultiTableDeltaStreamer extends TestHoodieDeltaStreamer
{
+public class TestHoodieMultiTableDeltaStreamer extends
TestHoodieDeltaStreamerBase {
private static volatile Logger log =
LogManager.getLogger(TestHoodieMultiTableDeltaStreamer.class);
- private static final Random RANDOM = new Random();
static class TestHelpers {