This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 26c00a3adef [HUDI-7187] Fix integ test props to honor new streamer properties (#10866)
26c00a3adef is described below
commit 26c00a3adefff9217187ca0ab9a5b2a7c9e42199
Author: wombatu-kun <[email protected]>
AuthorDate: Sun Mar 31 11:16:01 2024 +0700
[HUDI-7187] Fix integ test props to honor new streamer properties (#10866)
Co-authored-by: Vova Kolmakov <[email protected]>
---
.../TestKafkaConnectHdfsProvider.java | 4 +-
.../hudi/utilities/config/SourceTestConfig.java | 15 +++--
.../deltastreamer/HoodieDeltaStreamerTestBase.java | 54 ++++++++--------
.../deltastreamer/TestHoodieDeltaStreamer.java | 48 +++++++-------
.../TestHoodieDeltaStreamerWithMultiWriter.java | 4 +-
.../TestHoodieMultiTableDeltaStreamer.java | 14 ++--
.../functional/TestHiveSchemaProvider.java | 10 +--
.../functional/TestJdbcbasedSchemaProvider.java | 14 ++--
.../schema/TestSchemaRegistryProvider.java | 16 ++---
.../utilities/sources/BaseTestKafkaSource.java | 2 +-
.../hudi/utilities/sources/TestAvroDFSSource.java | 2 +-
.../utilities/sources/TestAvroKafkaSource.java | 12 ++--
.../hudi/utilities/sources/TestCsvDFSSource.java | 6 +-
.../sources/TestGcsEventsHoodieIncrSource.java | 18 +++---
.../utilities/sources/TestHoodieIncrSource.java | 4 +-
.../hudi/utilities/sources/TestJdbcSource.java | 74 +++++++++++-----------
.../hudi/utilities/sources/TestJsonDFSSource.java | 2 +-
.../utilities/sources/TestJsonKafkaSource.java | 6 +-
.../sources/TestJsonKafkaSourcePostProcessor.java | 2 +-
.../utilities/sources/TestParquetDFSSource.java | 2 +-
.../utilities/sources/TestProtoKafkaSource.java | 4 +-
.../sources/TestS3EventsHoodieIncrSource.java | 20 +++---
.../utilities/sources/TestSqlFileBasedSource.java | 4 +-
.../hudi/utilities/sources/TestSqlSource.java | 2 +-
.../debezium/TestAbstractDebeziumSource.java | 6 +-
.../helpers/TestCloudObjectsSelectorCommon.java | 18 +++---
.../sources/helpers/TestKafkaOffsetGen.java | 6 +-
.../utilities/testutils/UtilitiesTestBase.java | 4 +-
.../testutils/sources/AbstractBaseTestSource.java | 24 ++++---
.../sources/DistributedTestDataSource.java | 11 ++--
.../transform/TestSqlFileBasedTransformer.java | 8 +--
.../transform/TestSqlQueryBasedTransformer.java | 2 +-
.../streamer-config/dfs-source.properties | 6 +-
.../invalid_hive_sync_uber_config.properties | 6 +-
.../streamer-config/kafka-source.properties | 6 +-
.../short_trip_uber_config.properties | 12 ++--
.../streamer-config/sql-transformer.properties | 2 +-
.../streamer-config/uber_config.properties | 10 +--
38 files changed, 232 insertions(+), 228 deletions(-)
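
Every hunk below makes the same substitution: integration-test properties move from the legacy "hoodie.deltastreamer." prefix to the new "hoodie.streamer." prefix, with the old spelling kept resolvable as an alternative key (see the withAlternatives() calls in SourceTestConfig.java below). For example, in a test that builds its own TypedProperties (the path value here is illustrative only):

    import org.apache.hudi.common.config.TypedProperties;

    TypedProperties props = new TypedProperties();
    // Old spelling, still resolvable through the registered alternative:
    //   props.setProperty("hoodie.deltastreamer.source.dfs.root", "/data/in");
    // New primary spelling used throughout this patch:
    props.setProperty("hoodie.streamer.source.dfs.root", "/data/in");
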
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/checkpointing/TestKafkaConnectHdfsProvider.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/checkpointing/TestKafkaConnectHdfsProvider.java
index fb6f5d649cb..e90cfdb6856 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/checkpointing/TestKafkaConnectHdfsProvider.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/checkpointing/TestKafkaConnectHdfsProvider.java
@@ -62,7 +62,7 @@ public class TestKafkaConnectHdfsProvider extends HoodieCommonTestHarness {
     new File(topicPath + "/year=2016/month=05/day=02/"
         + "random_snappy_2" + BASE_FILE_EXTENSION).createNewFile();
     final TypedProperties props = new TypedProperties();
-    props.put("hoodie.deltastreamer.checkpoint.provider.path", topicPath.toString());
+    props.put("hoodie.streamer.checkpoint.provider.path", topicPath.toString());
     final InitialCheckPointProvider provider = new KafkaConnectHdfsProvider(props);
     provider.init(HoodieTestUtils.getDefaultHadoopConf());
     assertEquals("topic1,0:300,1:200", provider.getCheckpoint());
@@ -83,7 +83,7 @@ public class TestKafkaConnectHdfsProvider extends HoodieCommonTestHarness {
     new File(topicPath + "/year=2016/month=05/day=02/"
         + "topic1+0+201+300" + BASE_FILE_EXTENSION).createNewFile();
     final TypedProperties props = new TypedProperties();
-    props.put("hoodie.deltastreamer.checkpoint.provider.path", topicPath.toString());
+    props.put("hoodie.streamer.checkpoint.provider.path", topicPath.toString());
     final InitialCheckPointProvider provider = new KafkaConnectHdfsProvider(props);
     provider.init(HoodieTestUtils.getDefaultHadoopConf());
     assertThrows(HoodieException.class, provider::getCheckpoint);
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/config/SourceTestConfig.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/config/SourceTestConfig.java
index 450d6e8dc3a..760e7ed7ff4 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/config/SourceTestConfig.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/config/SourceTestConfig.java
@@ -21,29 +21,36 @@ package org.apache.hudi.utilities.config;
 import org.apache.hudi.common.config.ConfigProperty;
+import static org.apache.hudi.common.util.ConfigUtils.DELTA_STREAMER_CONFIG_PREFIX;
+import static org.apache.hudi.common.util.ConfigUtils.STREAMER_CONFIG_PREFIX;
+
 /**
  * Configurations for Test Data Sources.
  */
 public class SourceTestConfig {
   public static final ConfigProperty<Integer> NUM_SOURCE_PARTITIONS_PROP = ConfigProperty
-      .key("hoodie.deltastreamer.source.test.num_partitions")
+      .key(STREAMER_CONFIG_PREFIX + "source.test.num_partitions")
       .defaultValue(10)
+      .withAlternatives(DELTA_STREAMER_CONFIG_PREFIX + "source.test.num_partitions")
       .withDocumentation("Used by DistributedTestDataSource only. Number of partitions where each partitions generates test-data");
   public static final ConfigProperty<Integer> MAX_UNIQUE_RECORDS_PROP = ConfigProperty
-      .key("hoodie.deltastreamer.source.test.max_unique_records")
+      .key(STREAMER_CONFIG_PREFIX + "source.test.max_unique_records")
       .defaultValue(Integer.MAX_VALUE)
+      .withAlternatives(DELTA_STREAMER_CONFIG_PREFIX + "source.test.max_unique_records")
       .withDocumentation("Maximum number of unique records generated for the run");
   public static final ConfigProperty<Boolean> USE_ROCKSDB_FOR_TEST_DATAGEN_KEYS = ConfigProperty
-      .key("hoodie.deltastreamer.source.test.datagen.use_rocksdb_for_storing_existing_keys")
+      .key(STREAMER_CONFIG_PREFIX + "source.test.datagen.use_rocksdb_for_storing_existing_keys")
       .defaultValue(false)
+      .withAlternatives(DELTA_STREAMER_CONFIG_PREFIX + "source.test.datagen.use_rocksdb_for_storing_existing_keys")
       .withDocumentation("If true, uses Rocks DB for storing datagen keys");
   public static final ConfigProperty<String> ROCKSDB_BASE_DIR_FOR_TEST_DATAGEN_KEYS = ConfigProperty
-      .key("hoodie.deltastreamer.source.test.datagen.rocksdb_base_dir")
+      .key(STREAMER_CONFIG_PREFIX + "source.test.datagen.rocksdb_base_dir")
       .noDefaultValue()
+      .withAlternatives(DELTA_STREAMER_CONFIG_PREFIX + "source.test.datagen.rocksdb_base_dir")
       .withDocumentation("Base Dir for storing datagen keys");
 }
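
SourceTestConfig.java above shows the definition side of the pattern: key() carries the new STREAMER_CONFIG_PREFIX spelling and withAlternatives() registers the legacy DELTA_STREAMER_CONFIG_PREFIX one. A minimal, self-contained sketch of that contract (MiniConfigProperty is a hypothetical stand-in for org.apache.hudi.common.config.ConfigProperty, not Hudi's actual implementation):

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Properties;

    // Hypothetical stand-in illustrating how a primary key plus registered
    // alternatives resolves against user-supplied properties.
    final class MiniConfigProperty {
      private final String key;                                    // new primary key
      private final List<String> alternatives = new ArrayList<>(); // legacy spellings
      private String defaultValue;

      MiniConfigProperty(String key) {
        this.key = key;
      }

      MiniConfigProperty withAlternatives(String... alts) {
        for (String alt : alts) {
          alternatives.add(alt);
        }
        return this;
      }

      MiniConfigProperty defaultValue(String value) {
        this.defaultValue = value;
        return this;
      }

      // Resolution order: primary key first, then each alternative, then the default.
      String getString(Properties props) {
        if (props.containsKey(key)) {
          return props.getProperty(key);
        }
        for (String alt : alternatives) {
          if (props.containsKey(alt)) {
            return props.getProperty(alt);
          }
        }
        return defaultValue;
      }
    }

Under this contract, a property file that still sets the old hoodie.deltastreamer.-prefixed key resolves to the same value as the renamed hoodie.streamer.-prefixed key, which is why this rename does not break older test configs.
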
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
index 3c74388860e..2b2013d04cd 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/HoodieDeltaStreamerTestBase.java
@@ -204,8 +204,8 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
     downstreamProps.setProperty("hoodie.datasource.write.partitionpath.field", "partition_path");
     // Source schema is the target schema of upstream table
-    downstreamProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/target.avsc");
-    downstreamProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc");
+    downstreamProps.setProperty("hoodie.streamer.schemaprovider.source.schema.file", dfsBasePath + "/target.avsc");
+    downstreamProps.setProperty("hoodie.streamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc");
     UtilitiesTestBase.Helpers.savePropsToDFS(downstreamProps, dfs, dfsBasePath + "/test-downstream-source.properties");
     // Properties used for testing invalid key generator
@@ -214,8 +214,8 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
     invalidProps.setProperty("hoodie.datasource.write.keygenerator.class", "invalid");
     invalidProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
     invalidProps.setProperty("hoodie.datasource.write.partitionpath.field", "partition_path");
-    invalidProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc");
-    invalidProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc");
+    invalidProps.setProperty("hoodie.streamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc");
+    invalidProps.setProperty("hoodie.streamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc");
     UtilitiesTestBase.Helpers.savePropsToDFS(invalidProps, dfs, dfsBasePath + "/" + PROPS_FILENAME_TEST_INVALID);
     // Properties used for testing inferring key generator for complex key generator
@@ -223,8 +223,8 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
     inferKeygenProps.setProperty("include", "base.properties");
     inferKeygenProps.setProperty("hoodie.datasource.write.recordkey.field", "timestamp,_row_key");
     inferKeygenProps.setProperty("hoodie.datasource.write.partitionpath.field", "partition_path");
-    inferKeygenProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc");
-    inferKeygenProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc");
+    inferKeygenProps.setProperty("hoodie.streamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc");
+    inferKeygenProps.setProperty("hoodie.streamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc");
     UtilitiesTestBase.Helpers.savePropsToDFS(inferKeygenProps, dfs, dfsBasePath + "/" + PROPS_FILENAME_INFER_COMPLEX_KEYGEN);
     // Properties used for testing inferring key generator for non-partitioned key generator
@@ -240,8 +240,8 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
     UtilitiesTestBase.Helpers.savePropsToDFS(properties, dfs, dfsBasePath + "/" + PROPS_INVALID_TABLE_CONFIG_FILE);
     TypedProperties invalidHiveSyncProps = new TypedProperties();
-    invalidHiveSyncProps.setProperty("hoodie.deltastreamer.ingestion.tablesToBeIngested", "uber_db.dummy_table_uber");
-    invalidHiveSyncProps.setProperty("hoodie.deltastreamer.ingestion.uber_db.dummy_table_uber.configFile", dfsBasePath + "/config/invalid_hive_sync_uber_config.properties");
+    invalidHiveSyncProps.setProperty("hoodie.streamer.ingestion.tablesToBeIngested", "uber_db.dummy_table_uber");
+    invalidHiveSyncProps.setProperty("hoodie.streamer.ingestion.uber_db.dummy_table_uber.configFile", dfsBasePath + "/config/invalid_hive_sync_uber_config.properties");
     UtilitiesTestBase.Helpers.savePropsToDFS(invalidHiveSyncProps, dfs, dfsBasePath + "/" + PROPS_INVALID_HIVE_SYNC_TEST_SOURCE1);
   }
@@ -251,8 +251,8 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
     props.setProperty("hoodie.datasource.write.keygenerator.class", TestHoodieDeltaStreamer.TestGenerator.class.getName());
     props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
     props.setProperty("hoodie.datasource.write.partitionpath.field", "partition_path");
-    props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc");
-    props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc");
+    props.setProperty("hoodie.streamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc");
+    props.setProperty("hoodie.streamer.schemaprovider.target.schema.file", dfsBasePath + "/target.avsc");
     // Hive Configs
     props.setProperty(HiveSyncConfigHolder.HIVE_URL.key(), HiveTestService.HS2_JDBC_URL);
@@ -266,9 +266,9 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
   protected static void populateInvalidTableConfigFilePathProps(TypedProperties props, String dfsBasePath) {
     props.setProperty("hoodie.datasource.write.keygenerator.class", TestHoodieDeltaStreamer.TestGenerator.class.getName());
-    props.setProperty("hoodie.deltastreamer.keygen.timebased.output.dateformat", "yyyyMMdd");
-    props.setProperty("hoodie.deltastreamer.ingestion.tablesToBeIngested", "uber_db.dummy_table_uber");
-    props.setProperty("hoodie.deltastreamer.ingestion.uber_db.dummy_table_uber.configFile", dfsBasePath + "/config/invalid_uber_config.properties");
+    props.setProperty("hoodie.keygen.timebased.output.dateformat", "yyyyMMdd");
+    props.setProperty("hoodie.streamer.ingestion.tablesToBeIngested", "uber_db.dummy_table_uber");
+    props.setProperty("hoodie.streamer.ingestion.uber_db.dummy_table_uber.configFile", dfsBasePath + "/config/invalid_uber_config.properties");
   }
   protected static void populateAllCommonProps(TypedProperties props, String dfsBasePath, String brokerAddress) {
@@ -279,10 +279,10 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
   protected static void populateCommonProps(TypedProperties props, String dfsBasePath) {
     props.setProperty("hoodie.datasource.write.keygenerator.class", TestHoodieDeltaStreamer.TestGenerator.class.getName());
-    props.setProperty("hoodie.deltastreamer.keygen.timebased.output.dateformat", "yyyyMMdd");
-    props.setProperty("hoodie.deltastreamer.ingestion.tablesToBeIngested", "short_trip_db.dummy_table_short_trip,uber_db.dummy_table_uber");
-    props.setProperty("hoodie.deltastreamer.ingestion.uber_db.dummy_table_uber.configFile", dfsBasePath + "/config/uber_config.properties");
-    props.setProperty("hoodie.deltastreamer.ingestion.short_trip_db.dummy_table_short_trip.configFile", dfsBasePath + "/config/short_trip_uber_config.properties");
+    props.setProperty("hoodie.keygen.timebased.output.dateformat", "yyyyMMdd");
+    props.setProperty("hoodie.streamer.ingestion.tablesToBeIngested", "short_trip_db.dummy_table_short_trip,uber_db.dummy_table_uber");
+    props.setProperty("hoodie.streamer.ingestion.uber_db.dummy_table_uber.configFile", dfsBasePath + "/config/uber_config.properties");
+    props.setProperty("hoodie.streamer.ingestion.short_trip_db.dummy_table_short_trip.configFile", dfsBasePath + "/config/short_trip_uber_config.properties");
   }
   protected static void populateCommonKafkaProps(TypedProperties props, String brokerAddress) {
@@ -291,7 +291,7 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
     props.setProperty("auto.offset.reset", "earliest");
     props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
     props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
-    props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", String.valueOf(5000));
+    props.setProperty("hoodie.streamer.kafka.source.maxEvents", String.valueOf(5000));
   }
   protected static void populateCommonHiveProps(TypedProperties props) {
@@ -384,12 +384,12 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
     parquetProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
     parquetProps.setProperty("hoodie.datasource.write.partitionpath.field", partitionPath);
     if (useSchemaProvider) {
-      parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", basePath + "/" + sourceSchemaFile);
+      parquetProps.setProperty("hoodie.streamer.schemaprovider.source.schema.file", basePath + "/" + sourceSchemaFile);
       if (hasTransformer) {
-        parquetProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", basePath + "/" + targetSchemaFile);
+        parquetProps.setProperty("hoodie.streamer.schemaprovider.target.schema.file", basePath + "/" + targetSchemaFile);
       }
     }
-    parquetProps.setProperty("hoodie.deltastreamer.source.dfs.root", parquetSourceRoot);
+    parquetProps.setProperty("hoodie.streamer.source.dfs.root", parquetSourceRoot);
     if (!StringUtils.isNullOrEmpty(emptyBatchParam)) {
       parquetProps.setProperty(TestParquetDFSSourceEmptyBatch.RETURN_EMPTY_BATCH, emptyBatchParam);
     }
@@ -405,11 +405,11 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
     props.setProperty("hoodie.embed.timeline.server", "false");
     props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
     props.setProperty("hoodie.datasource.write.partitionpath.field", partitionPath);
-    props.setProperty("hoodie.deltastreamer.source.kafka.topic", topicName);
-    props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", String.valueOf(5000));
+    props.setProperty("hoodie.streamer.source.kafka.topic", topicName);
+    props.setProperty("hoodie.streamer.kafka.source.maxEvents", String.valueOf(5000));
     props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
     props.setProperty(KafkaSourceConfig.KAFKA_AVRO_VALUE_DESERIALIZER_CLASS.key(), ByteArrayDeserializer.class.getName());
-    props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents",
+    props.setProperty("hoodie.streamer.kafka.source.maxEvents",
         maxEventsToReadFromKafkaSource != null ? String.valueOf(maxEventsToReadFromKafkaSource) :
             String.valueOf(KafkaSourceConfig.MAX_EVENTS_FROM_KAFKA_SOURCE.defaultValue()));
     props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
@@ -617,10 +617,10 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {
     }
     List<String> cfgs = new ArrayList<>();
     cfgs.add(HoodieCommonConfig.SET_NULL_FOR_MISSING_COLUMNS.key() + "=true");
-    cfgs.add("hoodie.deltastreamer.source.hoodieincr.read_latest_on_missing_ckpt=" + addReadLatestOnMissingCkpt);
-    cfgs.add("hoodie.deltastreamer.source.hoodieincr.path=" + srcBasePath);
+    cfgs.add("hoodie.streamer.source.hoodieincr.read_latest_on_missing_ckpt=" + addReadLatestOnMissingCkpt);
+    cfgs.add("hoodie.streamer.source.hoodieincr.path=" + srcBasePath);
     // No partition
-    cfgs.add("hoodie.deltastreamer.source.hoodieincr.partition.fields=datestr");
+    cfgs.add("hoodie.streamer.source.hoodieincr.partition.fields=datestr");
     cfg.configs = cfgs;
     return cfg;
   }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
index 2a2c4dafb1e..34486a07ab8 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamer.java
@@ -376,7 +376,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeDropAllConfig(tableBasePath, WriteOperationType.UPSERT);
     TypedProperties props =
         new DFSPropertiesConfiguration(fs.getConf(), new Path(basePath + "/" + PROPS_FILENAME_TEST_SOURCE)).getProps();
-    props.put("hoodie.deltastreamer.checkpoint.provider.path", bootstrapPath);
+    props.put("hoodie.streamer.checkpoint.provider.path", bootstrapPath);
     cfg.initialCheckpointProvider = checkpointProviderClass;
     // create regular kafka connect hdfs dirs
     fs.mkdirs(new Path(bootstrapPath));
@@ -568,8 +568,8 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     HoodieDeltaStreamer.Config cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.INSERT,
         Collections.singletonList(TestIdentityTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, false, true, false, null, tableType);
     addRecordMerger(recordType, cfg.configs);
-    cfg.configs.add("hoodie.deltastreamer.schemaprovider.source.schema.file=" + basePath + "/source.avsc");
-    cfg.configs.add("hoodie.deltastreamer.schemaprovider.target.schema.file=" + basePath + "/source.avsc");
+    cfg.configs.add("hoodie.streamer.schemaprovider.source.schema.file=" + basePath + "/source.avsc");
+    cfg.configs.add("hoodie.streamer.schemaprovider.target.schema.file=" + basePath + "/source.avsc");
     cfg.configs.add(DataSourceWriteOptions.RECONCILE_SCHEMA().key() + "=true");
     if (!useSchemaPostProcessor) {
       cfg.configs.add(HoodieSchemaProviderConfig.SPARK_AVRO_POST_PROCESSOR_ENABLE.key() + "=false");
@@ -582,8 +582,8 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT,
         Collections.singletonList(TripsWithEvolvedOptionalFieldTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, false, true, false, null, tableType);
     addRecordMerger(recordType, cfg.configs);
-    cfg.configs.add("hoodie.deltastreamer.schemaprovider.source.schema.file=" + basePath + "/source.avsc");
-    cfg.configs.add("hoodie.deltastreamer.schemaprovider.target.schema.file=" + basePath + "/source_evolved.avsc");
+    cfg.configs.add("hoodie.streamer.schemaprovider.source.schema.file=" + basePath + "/source.avsc");
+    cfg.configs.add("hoodie.streamer.schemaprovider.target.schema.file=" + basePath + "/source_evolved.avsc");
     cfg.configs.add(DataSourceWriteOptions.RECONCILE_SCHEMA().key() + "=true");
     if (!useSchemaPostProcessor) {
       cfg.configs.add(HoodieSchemaProviderConfig.SPARK_AVRO_POST_PROCESSOR_ENABLE.key() + "=false");
@@ -607,9 +607,9 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     cfg = TestHelpers.makeConfig(tableBasePath, WriteOperationType.UPSERT,
         Collections.singletonList(TestIdentityTransformer.class.getName()), PROPS_FILENAME_TEST_SOURCE, false, true, false, null, tableType);
     addRecordMerger(recordType, cfg.configs);
-    cfg.configs.add("hoodie.deltastreamer.schemaprovider.source.schema.file=" + basePath + "/source.avsc");
+    cfg.configs.add("hoodie.streamer.schemaprovider.source.schema.file=" + basePath + "/source.avsc");
     if (useUserProvidedSchema) {
-      cfg.configs.add("hoodie.deltastreamer.schemaprovider.target.schema.file=" + basePath + "/source_evolved.avsc");
+      cfg.configs.add("hoodie.streamer.schemaprovider.target.schema.file=" + basePath + "/source_evolved.avsc");
     }
     if (!useSchemaPostProcessor) {
       cfg.configs.add(HoodieSchemaProviderConfig.SPARK_AVRO_POST_PROCESSOR_ENABLE.key() + "=false");
@@ -1822,12 +1822,12 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     orcProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
     orcProps.setProperty("hoodie.datasource.write.partitionpath.field", "partition_path");
     if (useSchemaProvider) {
-      orcProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", basePath + "/" + "source.avsc");
+      orcProps.setProperty("hoodie.streamer.schemaprovider.source.schema.file", basePath + "/" + "source.avsc");
       if (transformerClassNames != null) {
-        orcProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", basePath + "/" + "target.avsc");
+        orcProps.setProperty("hoodie.streamer.schemaprovider.target.schema.file", basePath + "/" + "target.avsc");
       }
     }
-    orcProps.setProperty("hoodie.deltastreamer.source.dfs.root", ORC_SOURCE_ROOT);
+    orcProps.setProperty("hoodie.streamer.source.dfs.root", ORC_SOURCE_ROOT);
     UtilitiesTestBase.Helpers.savePropsToDFS(orcProps, fs, basePath + "/" + PROPS_FILENAME_TEST_ORC);
     String tableBasePath = basePath + "/test_orc_source_table" + testNum;
@@ -1852,11 +1852,11 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     props.setProperty("hoodie.embed.timeline.server", "false");
     props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
     props.setProperty("hoodie.datasource.write.partitionpath.field", "driver");
-    props.setProperty("hoodie.deltastreamer.source.dfs.root", JSON_KAFKA_SOURCE_ROOT);
-    props.setProperty("hoodie.deltastreamer.source.kafka.topic", topicName);
-    props.setProperty("hoodie.deltastreamer.source.kafka.checkpoint.type", kafkaCheckpointType);
-    props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", basePath + "/source_uber.avsc");
-    props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", basePath + "/target_uber.avsc");
+    props.setProperty("hoodie.streamer.source.dfs.root", JSON_KAFKA_SOURCE_ROOT);
+    props.setProperty("hoodie.streamer.source.kafka.topic", topicName);
+    props.setProperty("hoodie.streamer.source.kafka.checkpoint.type", kafkaCheckpointType);
+    props.setProperty("hoodie.streamer.schemaprovider.source.schema.file", basePath + "/source_uber.avsc");
+    props.setProperty("hoodie.streamer.schemaprovider.target.schema.file", basePath + "/target_uber.avsc");
     props.setProperty("auto.offset.reset", autoResetValue);
     if (extraProps != null && !extraProps.isEmpty()) {
       extraProps.forEach(props::setProperty);
@@ -2255,22 +2255,22 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     csvProps.setProperty("hoodie.datasource.write.recordkey.field", recordKeyField);
     csvProps.setProperty("hoodie.datasource.write.partitionpath.field", partitionPath);
     if (useSchemaProvider) {
-      csvProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", basePath + "/source-flattened.avsc");
+      csvProps.setProperty("hoodie.streamer.schemaprovider.source.schema.file", basePath + "/source-flattened.avsc");
       if (hasTransformer) {
-        csvProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", basePath + "/target-flattened.avsc");
+        csvProps.setProperty("hoodie.streamer.schemaprovider.target.schema.file", basePath + "/target-flattened.avsc");
       }
     }
-    csvProps.setProperty("hoodie.deltastreamer.source.dfs.root", sourceRoot);
+    csvProps.setProperty("hoodie.streamer.source.dfs.root", sourceRoot);
     if (sep != ',') {
       if (sep == '\t') {
-        csvProps.setProperty("hoodie.deltastreamer.csv.sep", "\\t");
+        csvProps.setProperty("hoodie.streamer.csv.sep", "\\t");
       } else {
-        csvProps.setProperty("hoodie.deltastreamer.csv.sep", Character.toString(sep));
+        csvProps.setProperty("hoodie.streamer.csv.sep", Character.toString(sep));
       }
     }
     if (hasHeader) {
-      csvProps.setProperty("hoodie.deltastreamer.csv.header", Boolean.toString(hasHeader));
+      csvProps.setProperty("hoodie.streamer.csv.header", Boolean.toString(hasHeader));
     }
     UtilitiesTestBase.Helpers.savePropsToDFS(csvProps, fs, basePath + "/" + PROPS_FILENAME_TEST_CSV);
@@ -2391,7 +2391,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     sqlSourceProps.setProperty("hoodie.embed.timeline.server", "false");
     sqlSourceProps.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
     sqlSourceProps.setProperty("hoodie.datasource.write.partitionpath.field", "partition_path");
-    sqlSourceProps.setProperty("hoodie.deltastreamer.source.sql.sql.query", "select * from test_sql_table");
+    sqlSourceProps.setProperty("hoodie.streamer.source.sql.sql.query", "select * from test_sql_table");
     UtilitiesTestBase.Helpers.savePropsToDFS(sqlSourceProps, fs, basePath + "/" + PROPS_FILENAME_TEST_SQL_SOURCE);
@@ -2465,7 +2465,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     HoodieDeltaStreamer.Config downstreamCfg =
         TestHelpers.makeConfigForHudiIncrSrc(tableBasePath, downstreamTableBasePath,
             WriteOperationType.BULK_INSERT, true, null);
-    downstreamCfg.configs.add("hoodie.deltastreamer.source.hoodieincr.num_instants=1");
+    downstreamCfg.configs.add("hoodie.streamer.source.hoodieincr.num_instants=1");
     new HoodieDeltaStreamer(downstreamCfg, jsc).sync();
     insertInTable(tableBasePath, 9, WriteOperationType.UPSERT);
@@ -2481,7 +2481,7 @@ public class TestHoodieDeltaStreamer extends HoodieDeltaStreamerTestBase {
     downstreamCfg.configs.remove(downstreamCfg.configs.size() - 1);
     downstreamCfg.configs.add(DataSourceReadOptions.INCREMENTAL_FALLBACK_TO_FULL_TABLE_SCAN().key() + "=true");
     //Adding this conf to make testing easier :)
-    downstreamCfg.configs.add("hoodie.deltastreamer.source.hoodieincr.num_instants=10");
+    downstreamCfg.configs.add("hoodie.streamer.source.hoodieincr.num_instants=10");
     downstreamCfg.operation = WriteOperationType.UPSERT;
     new HoodieDeltaStreamer(downstreamCfg, jsc).sync();
     new HoodieDeltaStreamer(downstreamCfg, jsc).sync();
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerWithMultiWriter.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerWithMultiWriter.java
index 2745edef584..635b57c9fa6 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerWithMultiWriter.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieDeltaStreamerWithMultiWriter.java
@@ -320,8 +320,8 @@ public class TestHoodieDeltaStreamerWithMultiWriter extends HoodieDeltaStreamerT
     props.setProperty("hoodie.datasource.write.keygenerator.class", TestHoodieDeltaStreamer.TestGenerator.class.getName());
     props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
     props.setProperty("hoodie.datasource.write.partitionpath.field", "partition_path");
-    props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", basePath + "/source.avsc");
-    props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", basePath + "/target.avsc");
+    props.setProperty("hoodie.streamer.schemaprovider.source.schema.file", basePath + "/source.avsc");
+    props.setProperty("hoodie.streamer.schemaprovider.target.schema.file", basePath + "/target.avsc");
     props.setProperty("include", "base.properties");
     props.setProperty("hoodie.write.concurrency.mode", "optimistic_concurrency_control");
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieMultiTableDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieMultiTableDeltaStreamer.java
index 26ea61e31fe..0c5de863436 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieMultiTableDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/deltastreamer/TestHoodieMultiTableDeltaStreamer.java
@@ -178,16 +178,16 @@ public class TestHoodieMultiTableDeltaStreamer extends HoodieDeltaStreamerTestBa
     HoodieMultiTableDeltaStreamer streamer = new HoodieMultiTableDeltaStreamer(cfg, jsc);
     List<TableExecutionContext> executionContexts = streamer.getTableExecutionContexts();
     TypedProperties properties = executionContexts.get(1).getProperties();
-    properties.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", basePath + "/source_uber.avsc");
-    properties.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", basePath + "/target_uber.avsc");
+    properties.setProperty("hoodie.streamer.schemaprovider.source.schema.file", basePath + "/source_uber.avsc");
+    properties.setProperty("hoodie.streamer.schemaprovider.target.schema.file", basePath + "/target_uber.avsc");
     properties.setProperty("hoodie.datasource.write.partitionpath.field", "timestamp");
-    properties.setProperty("hoodie.deltastreamer.source.kafka.topic", topicName2);
+    properties.setProperty("hoodie.streamer.source.kafka.topic", topicName2);
     executionContexts.get(1).setProperties(properties);
     TypedProperties properties1 = executionContexts.get(0).getProperties();
-    properties1.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", basePath + "/source_short_trip_uber.avsc");
-    properties1.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", basePath + "/target_short_trip_uber.avsc");
+    properties1.setProperty("hoodie.streamer.schemaprovider.source.schema.file", basePath + "/source_short_trip_uber.avsc");
+    properties1.setProperty("hoodie.streamer.schemaprovider.target.schema.file", basePath + "/target_short_trip_uber.avsc");
     properties1.setProperty("hoodie.datasource.write.partitionpath.field", "timestamp");
-    properties1.setProperty("hoodie.deltastreamer.source.kafka.topic", topicName1);
+    properties1.setProperty("hoodie.streamer.source.kafka.topic", topicName1);
     executionContexts.get(0).setProperties(properties1);
     String targetBasePath1 = executionContexts.get(0).getConfig().targetBasePath;
     String targetBasePath2 = executionContexts.get(1).getConfig().targetBasePath;
@@ -288,7 +288,7 @@ public class TestHoodieMultiTableDeltaStreamer extends HoodieDeltaStreamerTestBa
     props.setProperty("include", "base.properties");
     props.setProperty("hoodie.datasource.write.recordkey.field", "_row_key");
     props.setProperty("hoodie.datasource.write.partitionpath.field", "partition_path");
-    props.setProperty("hoodie.deltastreamer.source.dfs.root", parquetSourceRoot);
+    props.setProperty("hoodie.streamer.source.dfs.root", parquetSourceRoot);
     return props;
   }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHiveSchemaProvider.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHiveSchemaProvider.java
index e2ae67aae23..75e812acf37 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHiveSchemaProvider.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHiveSchemaProvider.java
@@ -55,8 +55,8 @@ public class TestHiveSchemaProvider extends SparkClientFunctionalTestHarnessWith
   @BeforeAll
   public static void init() {
     Pair<String, String> dbAndTableName = paresDBAndTableName(SOURCE_SCHEMA_TABLE_NAME);
-    PROPS.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.hive.database", dbAndTableName.getLeft());
-    PROPS.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.hive.table", dbAndTableName.getRight());
+    PROPS.setProperty("hoodie.streamer.schemaprovider.source.schema.hive.database", dbAndTableName.getLeft());
+    PROPS.setProperty("hoodie.streamer.schemaprovider.source.schema.hive.table", dbAndTableName.getRight());
   }
   @Disabled
@@ -84,8 +84,8 @@ public class TestHiveSchemaProvider extends SparkClientFunctionalTestHarnessWith
   public void testTargetSchema() throws Exception {
     try {
       Pair<String, String> dbAndTableName = paresDBAndTableName(TARGET_SCHEMA_TABLE_NAME);
-      PROPS.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.hive.database", dbAndTableName.getLeft());
-      PROPS.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.hive.table", dbAndTableName.getRight());
+      PROPS.setProperty("hoodie.streamer.schemaprovider.target.schema.hive.database", dbAndTableName.getLeft());
+      PROPS.setProperty("hoodie.streamer.schemaprovider.target.schema.hive.table", dbAndTableName.getRight());
       createSchemaTable(SOURCE_SCHEMA_TABLE_NAME);
       createSchemaTable(TARGET_SCHEMA_TABLE_NAME);
       Schema targetSchema = UtilHelpers.createSchemaProvider(HiveSchemaProvider.class.getName(), PROPS, jsc()).getTargetSchema();
@@ -105,7 +105,7 @@ public class TestHiveSchemaProvider extends SparkClientFunctionalTestHarnessWith
   @Test
   public void testNotExistTable() {
     String wrongName = "wrong_schema_tab";
-    PROPS.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.hive.table", wrongName);
+    PROPS.setProperty("hoodie.streamer.schemaprovider.source.schema.hive.table", wrongName);
     Assertions.assertThrows(NoSuchTableException.class, () -> {
       try {
         UtilHelpers.createSchemaProvider(HiveSchemaProvider.class.getName(), PROPS, jsc()).getSourceSchema();
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestJdbcbasedSchemaProvider.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestJdbcbasedSchemaProvider.java
index 05a623f0e09..82588429db5 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestJdbcbasedSchemaProvider.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestJdbcbasedSchemaProvider.java
@@ -51,13 +51,13 @@ public class TestJdbcbasedSchemaProvider extends SparkClientFunctionalTestHarnes
   @BeforeAll
   public static void init() {
-    PROPS.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.jdbc.connection.url", JDBC_URL);
-    PROPS.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.jdbc.driver.type", JDBC_DRIVER);
-    PROPS.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.jdbc.username", JDBC_USER);
-    PROPS.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.jdbc.password", JDBC_PASS);
-    PROPS.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.jdbc.dbtable", "triprec");
-    PROPS.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.jdbc.timeout", "0");
-    PROPS.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.jdbc.nullable", "false");
+    PROPS.setProperty("hoodie.streamer.schemaprovider.source.schema.jdbc.connection.url", JDBC_URL);
+    PROPS.setProperty("hoodie.streamer.schemaprovider.source.schema.jdbc.driver.type", JDBC_DRIVER);
+    PROPS.setProperty("hoodie.streamer.schemaprovider.source.schema.jdbc.username", JDBC_USER);
+    PROPS.setProperty("hoodie.streamer.schemaprovider.source.schema.jdbc.password", JDBC_PASS);
+    PROPS.setProperty("hoodie.streamer.schemaprovider.source.schema.jdbc.dbtable", "triprec");
+    PROPS.setProperty("hoodie.streamer.schemaprovider.source.schema.jdbc.timeout", "0");
+    PROPS.setProperty("hoodie.streamer.schemaprovider.source.schema.jdbc.nullable", "false");
   }
   @Test
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestSchemaRegistryProvider.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestSchemaRegistryProvider.java
index 397e72a0ec4..88f67723c85 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestSchemaRegistryProvider.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/schema/TestSchemaRegistryProvider.java
@@ -64,10 +64,10 @@ class TestSchemaRegistryProvider {
   private static TypedProperties getProps() {
     return new TypedProperties() {
       {
-        put("hoodie.deltastreamer.schemaprovider.registry.baseUrl", "http://" + BASIC_AUTH + "@localhost");
-        put("hoodie.deltastreamer.schemaprovider.registry.urlSuffix", "-value");
-        put("hoodie.deltastreamer.schemaprovider.registry.url", "http://foo:bar@localhost");
-        put("hoodie.deltastreamer.source.kafka.topic", "foo");
+        put("hoodie.streamer.schemaprovider.registry.baseUrl", "http://" + BASIC_AUTH + "@localhost");
+        put("hoodie.streamer.schemaprovider.registry.urlSuffix", "-value");
+        put("hoodie.streamer.schemaprovider.registry.url", "http://foo:bar@localhost");
+        put("hoodie.streamer.source.kafka.topic", "foo");
       }
     };
   }
@@ -102,8 +102,8 @@ class TestSchemaRegistryProvider {
   @Test
   public void testGetSourceSchemaShouldRequestSchemaWithoutCreds() throws IOException {
     TypedProperties props = getProps();
-    props.put("hoodie.deltastreamer.schemaprovider.registry.url", "http://localhost");
-    props.put("hoodie.deltastreamer.schemaprovider.registry.schemaconverter", DummySchemaConverter.class.getName());
+    props.put("hoodie.streamer.schemaprovider.registry.url", "http://localhost");
+    props.put("hoodie.streamer.schemaprovider.registry.schemaconverter", DummySchemaConverter.class.getName());
     SchemaRegistryProvider spyUnderTest = getUnderTest(props);
     Schema actual = spyUnderTest.getSourceSchema();
     assertNotNull(actual);
@@ -114,8 +114,8 @@ class TestSchemaRegistryProvider {
   @Test
   public void testGetTargetSchemaShouldRequestSchemaWithoutCreds() throws IOException {
     TypedProperties props = getProps();
-    props.put("hoodie.deltastreamer.schemaprovider.registry.url", "http://localhost");
-    props.put("hoodie.deltastreamer.schemaprovider.registry.schemaconverter", DummySchemaConverter.class.getName());
+    props.put("hoodie.streamer.schemaprovider.registry.url", "http://localhost");
+    props.put("hoodie.streamer.schemaprovider.registry.schemaconverter", DummySchemaConverter.class.getName());
     SchemaRegistryProvider spyUnderTest = getUnderTest(props);
     Schema actual = spyUnderTest.getTargetSchema();
     assertNotNull(actual);
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java
index 011a1f626b2..c5fc7bfaafa 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/BaseTestKafkaSource.java
@@ -169,7 +169,7 @@ abstract class BaseTestKafkaSource extends SparkClientFunctionalTestHarness {
     testUtils.createTopic(topic, 2);
     TypedProperties props = createPropsForKafkaSource(topic, Long.MAX_VALUE, "earliest");
     SourceFormatAdapter kafkaSource = createSource(props);
-    props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents", "500");
+    props.setProperty("hoodie.streamer.kafka.source.maxEvents", "500");
     /*
      1. maxEventsFromKafkaSourceProp set to more than generated insert records
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroDFSSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroDFSSource.java
index 1cda910b707..5ccf9ad2b29 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroDFSSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroDFSSource.java
@@ -41,7 +41,7 @@ public class TestAvroDFSSource extends AbstractDFSSourceTestBase {
   @Override
   protected Source prepareDFSSource() {
     TypedProperties props = new TypedProperties();
-    props.setProperty("hoodie.deltastreamer.source.dfs.root", dfsRoot);
+    props.setProperty("hoodie.streamer.source.dfs.root", dfsRoot);
     try {
       return new AvroDFSSource(props, jsc, sparkSession, schemaProvider);
     } catch (IOException e) {
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java
index 558181f4258..497757ab378 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestAvroKafkaSource.java
@@ -97,11 +97,11 @@ public class TestAvroKafkaSource extends SparkClientFunctionalTestHarness {
   protected TypedProperties createPropsForKafkaSource(String topic, Long maxEventsToReadFromKafkaSource, String resetStrategy) {
     TypedProperties props = new TypedProperties();
-    props.setProperty("hoodie.deltastreamer.source.kafka.topic", topic);
+    props.setProperty("hoodie.streamer.source.kafka.topic", topic);
     props.setProperty("bootstrap.servers", testUtils.brokerAddress());
     props.setProperty("auto.offset.reset", resetStrategy);
     props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
-    props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents",
+    props.setProperty("hoodie.streamer.kafka.source.maxEvents",
         maxEventsToReadFromKafkaSource != null ? String.valueOf(maxEventsToReadFromKafkaSource) :
             String.valueOf(KafkaSourceConfig.MAX_EVENTS_FROM_KAFKA_SOURCE.defaultValue()));
     props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
@@ -160,8 +160,8 @@ public class TestAvroKafkaSource extends SparkClientFunctionalTestHarness {
         "test", dataGen.generateGenericRecord());
     JavaRDD<ConsumerRecord<Object, Object>> rdd = jsc().parallelize(Arrays.asList(recordConsumerRecord));
     TypedProperties props = new TypedProperties();
-    props.put("hoodie.deltastreamer.source.kafka.topic", "test");
-    props.put("hoodie.deltastreamer.schemaprovider.source.schema.file", SCHEMA_PATH);
+    props.put("hoodie.streamer.source.kafka.topic", "test");
+    props.put("hoodie.streamer.schemaprovider.source.schema.file", SCHEMA_PATH);
     SchemaProvider schemaProvider = UtilHelpers.wrapSchemaProviderWithPostProcessor(
         UtilHelpers.createSchemaProvider(FilebasedSchemaProvider.class.getName(), props, jsc()), props, jsc(), new ArrayList<>());
@@ -191,11 +191,11 @@ public class TestAvroKafkaSource extends SparkClientFunctionalTestHarness {
     final String topic = TEST_TOPIC_PREFIX + "testKafkaOffsetAppend";
     TypedProperties props = createPropsForKafkaSource(topic, null, "earliest");
-    props.put("hoodie.deltastreamer.schemaprovider.source.schema.file", SCHEMA_PATH);
+    props.put("hoodie.streamer.schemaprovider.source.schema.file", SCHEMA_PATH);
     SchemaProvider schemaProvider = UtilHelpers.wrapSchemaProviderWithPostProcessor(
         UtilHelpers.createSchemaProvider(FilebasedSchemaProvider.class.getName(), props, jsc()), props, jsc(), new ArrayList<>());
-    props.put("hoodie.deltastreamer.source.kafka.value.deserializer.class", ByteArrayDeserializer.class.getName());
+    props.put("hoodie.streamer.source.kafka.value.deserializer.class", ByteArrayDeserializer.class.getName());
     int numPartitions = 2;
     int numMessages = 30;
     testUtils.createTopic(topic,numPartitions);
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestCsvDFSSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestCsvDFSSource.java
index 8eaa1d95b23..6a2bbcd0136 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestCsvDFSSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestCsvDFSSource.java
@@ -48,9 +48,9 @@ public class TestCsvDFSSource extends AbstractDFSSourceTestBase {
   @Override
   public Source prepareDFSSource() {
     TypedProperties props = new TypedProperties();
-    props.setProperty("hoodie.deltastreamer.source.dfs.root", dfsRoot);
-    props.setProperty("hoodie.deltastreamer.csv.header", Boolean.toString(true));
-    props.setProperty("hoodie.deltastreamer.csv.sep", "\t");
+    props.setProperty("hoodie.streamer.source.dfs.root", dfsRoot);
+    props.setProperty("hoodie.streamer.csv.header", Boolean.toString(true));
+    props.setProperty("hoodie.streamer.csv.sep", "\t");
     return new CsvDFSSource(props, jsc, sparkSession, schemaProvider);
   }
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
index c1844c7a2a1..3b018473dc4 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestGcsEventsHoodieIncrSource.java
@@ -114,8 +114,8 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
     jsc = JavaSparkContext.fromSparkContext(spark().sparkContext());
     String schemaFilePath = TestGcsEventsHoodieIncrSource.class.getClassLoader().getResource("schema/sample_gcs_data.avsc").getPath();
     TypedProperties props = new TypedProperties();
-    props.put("hoodie.deltastreamer.schemaprovider.source.schema.file", schemaFilePath);
-    props.put("hoodie.deltastreamer.schema.provider.class.name", FilebasedSchemaProvider.class.getName());
+    props.put("hoodie.streamer.schemaprovider.source.schema.file", schemaFilePath);
+    props.put("hoodie.streamer.schema.provider.class.name", FilebasedSchemaProvider.class.getName());
     this.schemaProvider = Option.of(new FilebasedSchemaProvider(props, jsc));
     MockitoAnnotations.initMocks(this);
   }
@@ -263,14 +263,14 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
     setMockQueryRunner(inputDs, Option.of(snapshotCheckPoint));
     TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT);
-    typedProperties.setProperty("hoodie.deltastreamer.source.cloud.data.ignore.relpath.prefix", "path/to/skip");
+    typedProperties.setProperty("hoodie.streamer.source.cloud.data.ignore.relpath.prefix", "path/to/skip");
     //1. snapshot query, read all records
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50000L, exptected1, typedProperties);
     //2. incremental query, as commit is present in timeline
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(exptected1), 10L, exptected2, typedProperties);
     //3. snapshot query with source limit less than first commit size
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50L, exptected3, typedProperties);
-    typedProperties.setProperty("hoodie.deltastreamer.source.cloud.data.ignore.relpath.prefix", "path/to");
+    typedProperties.setProperty("hoodie.streamer.source.cloud.data.ignore.relpath.prefix", "path/to");
     //4. As snapshotQuery will return 1 -> same would be return as nextCheckpoint (dataset is empty due to ignore prefix).
     readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50L, exptected4, typedProperties);
   }
@@ -316,7 +316,7 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
   private void readAndAssert(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy,
                              Option<String> checkpointToPull, long sourceLimit, String expectedCheckpoint) {
     TypedProperties typedProperties = setProps(missingCheckpointStrategy);
-    typedProperties.put("hoodie.deltastreamer.source.hoodieincr.file.format", "json");
+    typedProperties.put("hoodie.streamer.source.hoodieincr.file.format", "json");
     readAndAssert(missingCheckpointStrategy, checkpointToPull, sourceLimit, expectedCheckpoint, typedProperties);
   }
@@ -388,10 +388,10 @@ public class TestGcsEventsHoodieIncrSource extends SparkClientFunctionalTestHarn
   private TypedProperties setProps(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy) {
     Properties properties = new Properties();
     //String schemaFilePath = TestGcsEventsHoodieIncrSource.class.getClassLoader().getResource("schema/sample_gcs_data.avsc").getPath();
-    //properties.put("hoodie.deltastreamer.schemaprovider.source.schema.file", schemaFilePath);
-    properties.put("hoodie.deltastreamer.schema.provider.class.name", FilebasedSchemaProvider.class.getName());
-    properties.setProperty("hoodie.deltastreamer.source.hoodieincr.path", basePath());
-    properties.setProperty("hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy",
+    //properties.put("hoodie.streamer.schemaprovider.source.schema.file", schemaFilePath);
+    properties.put("hoodie.streamer.schema.provider.class.name", FilebasedSchemaProvider.class.getName());
+    properties.setProperty("hoodie.streamer.source.hoodieincr.path", basePath());
+    properties.setProperty("hoodie.streamer.source.hoodieincr.missing.checkpoint.strategy",
         missingCheckpointStrategy.name());
     properties.setProperty(CloudSourceConfig.DATAFILE_FORMAT.key(), "json");
     return new TypedProperties(properties);
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
index b9e20fb3a19..3d9f3362a15 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestHoodieIncrSource.java
@@ -337,8 +337,8 @@ public class TestHoodieIncrSource extends SparkClientFunctionalTestHarness {
                              String expectedCheckpoint, Option<String> snapshotCheckPointImplClassOpt) {
     Properties properties = new Properties();
-    properties.setProperty("hoodie.deltastreamer.source.hoodieincr.path", basePath());
-    properties.setProperty("hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy", missingCheckpointStrategy.name());
+    properties.setProperty("hoodie.streamer.source.hoodieincr.path", basePath());
+    properties.setProperty("hoodie.streamer.source.hoodieincr.missing.checkpoint.strategy", missingCheckpointStrategy.name());
     // TODO: [HUDI-7081] get rid of this
     properties.setProperty(HoodieReaderConfig.FILE_GROUP_READER_ENABLED.key(), "false");
     snapshotCheckPointImplClassOpt.map(className ->
diff --git
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJdbcSource.java
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJdbcSource.java
index dcd12ac7c8e..ade781e6c8b 100644
---
a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJdbcSource.java
+++
b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJdbcSource.java
@@ -77,11 +77,11 @@ public class TestJdbcSource extends UtilitiesTestBase {
@BeforeEach
public void setup() throws Exception {
super.setup();
- PROPS.setProperty("hoodie.deltastreamer.jdbc.url", JDBC_URL);
- PROPS.setProperty("hoodie.deltastreamer.jdbc.driver.class", JDBC_DRIVER);
- PROPS.setProperty("hoodie.deltastreamer.jdbc.user", JDBC_USER);
- PROPS.setProperty("hoodie.deltastreamer.jdbc.password", JDBC_PASS);
- PROPS.setProperty("hoodie.deltastreamer.jdbc.table.name", "triprec");
+ PROPS.setProperty("hoodie.streamer.jdbc.url", JDBC_URL);
+ PROPS.setProperty("hoodie.streamer.jdbc.driver.class", JDBC_DRIVER);
+ PROPS.setProperty("hoodie.streamer.jdbc.user", JDBC_USER);
+ PROPS.setProperty("hoodie.streamer.jdbc.password", JDBC_PASS);
+ PROPS.setProperty("hoodie.streamer.jdbc.table.name", "triprec");
connection = DriverManager.getConnection(JDBC_URL, JDBC_USER, JDBC_PASS);
}
@@ -93,8 +93,8 @@ public class TestJdbcSource extends UtilitiesTestBase {
@Test
public void testSingleCommit() {
- PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "true");
- PROPS.setProperty("hoodie.deltastreamer.jdbc.table.incr.column.name",
"last_insert");
+ PROPS.setProperty("hoodie.streamer.jdbc.incr.pull", "true");
+ PROPS.setProperty("hoodie.streamer.jdbc.table.incr.column.name",
"last_insert");
try {
int numRecords = 100;
@@ -116,8 +116,8 @@ public class TestJdbcSource extends UtilitiesTestBase {
@Test
public void testInsertAndUpdate() {
- PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "true");
- PROPS.setProperty("hoodie.deltastreamer.jdbc.table.incr.column.name",
"last_insert");
+ PROPS.setProperty("hoodie.streamer.jdbc.incr.pull", "true");
+ PROPS.setProperty("hoodie.streamer.jdbc.table.incr.column.name",
"last_insert");
try {
final String commitTime = "000";
@@ -150,8 +150,8 @@ public class TestJdbcSource extends UtilitiesTestBase {
@Test
public void testTwoCommits() {
- PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "true");
- PROPS.setProperty("hoodie.deltastreamer.jdbc.table.incr.column.name",
"last_insert");
+ PROPS.setProperty("hoodie.streamer.jdbc.incr.pull", "true");
+ PROPS.setProperty("hoodie.streamer.jdbc.table.incr.column.name",
"last_insert");
try {
// Add 10 records with commit time "000"
@@ -178,8 +178,8 @@ public class TestJdbcSource extends UtilitiesTestBase {
@Test
public void testIncrementalFetchWithCommitTime() {
- PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "true");
- PROPS.setProperty("hoodie.deltastreamer.jdbc.table.incr.column.name",
"last_insert");
+ PROPS.setProperty("hoodie.streamer.jdbc.incr.pull", "true");
+ PROPS.setProperty("hoodie.streamer.jdbc.table.incr.column.name",
"last_insert");
try {
// Add 10 records with commit time "000"
@@ -204,8 +204,8 @@ public class TestJdbcSource extends UtilitiesTestBase {
@Test
public void testIncrementalFetchWithNoMatchingRows() {
- PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "true");
- PROPS.setProperty("hoodie.deltastreamer.jdbc.table.incr.column.name",
"last_insert");
+ PROPS.setProperty("hoodie.streamer.jdbc.incr.pull", "true");
+ PROPS.setProperty("hoodie.streamer.jdbc.table.incr.column.name",
"last_insert");
try {
// Add 10 records with commit time "000"
@@ -226,8 +226,8 @@ public class TestJdbcSource extends UtilitiesTestBase {
@Test
public void testIncrementalFetchWhenTableRecordsMoreThanSourceLimit() {
- PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "true");
- PROPS.setProperty("hoodie.deltastreamer.jdbc.table.incr.column.name",
"id");
+ PROPS.setProperty("hoodie.streamer.jdbc.incr.pull", "true");
+ PROPS.setProperty("hoodie.streamer.jdbc.table.incr.column.name", "id");
try {
// Add 100 records with commit time "000"
@@ -257,8 +257,8 @@ public class TestJdbcSource extends UtilitiesTestBase {
@Test
public void testIncrementalFetchWhenLastCheckpointMoreThanTableRecords() {
- PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "true");
- PROPS.setProperty("hoodie.deltastreamer.jdbc.table.incr.column.name",
"id");
+ PROPS.setProperty("hoodie.streamer.jdbc.incr.pull", "true");
+ PROPS.setProperty("hoodie.streamer.jdbc.table.incr.column.name", "id");
try {
// Add 100 records with commit time "000"
@@ -284,8 +284,8 @@ public class TestJdbcSource extends UtilitiesTestBase {
@Test
public void testIncrementalFetchFallbackToFullFetchWhenError() {
- PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "true");
- PROPS.setProperty("hoodie.deltastreamer.jdbc.table.incr.column.name",
"last_insert");
+ PROPS.setProperty("hoodie.streamer.jdbc.incr.pull", "true");
+ PROPS.setProperty("hoodie.streamer.jdbc.table.incr.column.name",
"last_insert");
try {
// Add 10 records with commit time "000"
@@ -299,14 +299,14 @@ public class TestJdbcSource extends UtilitiesTestBase {
// Add 10 records with commit time "001"
insert("001", 10, connection, DATA_GENERATOR, PROPS);
- PROPS.setProperty("hoodie.deltastreamer.jdbc.table.incr.column.name",
"dummy_col");
+ PROPS.setProperty("hoodie.streamer.jdbc.table.incr.column.name",
"dummy_col");
assertThrows(HoodieException.class, () -> {
// Start incremental scan with a dummy column that does not exist.
// This will throw an exception as the default behavior is to not
fallback to full fetch.
runSource(Option.of(batch.getCheckpointForNextBatch()), -1);
});
-
PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.fallback.to.full.fetch",
"true");
+ PROPS.setProperty("hoodie.streamer.jdbc.incr.fallback.to.full.fetch",
"true");
// Start incremental scan with a dummy column that does not exist.
// This will fallback to full fetch mode but still throw an exception
checkpointing will fail.
@@ -321,7 +321,7 @@ public class TestJdbcSource extends UtilitiesTestBase {
@Test
public void testFullFetchWithCommitTime() {
- PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "false");
+ PROPS.setProperty("hoodie.streamer.jdbc.incr.pull", "false");
try {
// Add 10 records with commit time "000"
@@ -345,8 +345,8 @@ public class TestJdbcSource extends UtilitiesTestBase {
@Test
public void testFullFetchWithCheckpoint() {
- PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "false");
- PROPS.setProperty("hoodie.deltastreamer.jdbc.table.incr.column.name",
"last_insert");
+ PROPS.setProperty("hoodie.streamer.jdbc.incr.pull", "false");
+ PROPS.setProperty("hoodie.streamer.jdbc.table.incr.column.name",
"last_insert");
try {
// Add 10 records with commit time "000"
@@ -360,7 +360,7 @@ public class TestJdbcSource extends UtilitiesTestBase {
// Get max of incremental column
Column incrementalColumn = rowDataset
- .col(PROPS.getString("hoodie.deltastreamer.jdbc.table.incr.column.name"));
+ .col(PROPS.getString("hoodie.streamer.jdbc.table.incr.column.name"));
final String max = rowDataset.agg(functions.max(incrementalColumn).cast(DataTypes.StringType)).first()
.getString(0);
@@ -382,10 +382,10 @@ public class TestJdbcSource extends UtilitiesTestBase {
// Write secret string to fs in a file
writeSecretToFs();
// Remove secret string from props
- PROPS.remove("hoodie.deltastreamer.jdbc.password");
+ PROPS.remove("hoodie.streamer.jdbc.password");
// Set property to read secret from fs file
- PROPS.setProperty("hoodie.deltastreamer.jdbc.password.file",
"file:///tmp/hudi/config/secret");
- PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "false");
+ PROPS.setProperty("hoodie.streamer.jdbc.password.file",
"file:///tmp/hudi/config/secret");
+ PROPS.setProperty("hoodie.streamer.jdbc.incr.pull", "false");
// Add 10 records with commit time 000
clearAndInsert("000", 10, connection, DATA_GENERATOR, PROPS);
Dataset<Row> rowDataset = runSource(Option.empty(), 10).getBatch().get();
@@ -401,8 +401,8 @@ public class TestJdbcSource extends UtilitiesTestBase {
// Write secret string to fs in a file
writeSecretToFs();
// Remove secret string from props
- PROPS.remove("hoodie.deltastreamer.jdbc.password");
- PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "false");
+ PROPS.remove("hoodie.streamer.jdbc.password");
+ PROPS.setProperty("hoodie.streamer.jdbc.incr.pull", "false");
// Add 10 records with commit time 000
clearAndInsert("000", 10, connection, DATA_GENERATOR, PROPS);
runSource(Option.empty(), 10);
@@ -411,9 +411,9 @@ public class TestJdbcSource extends UtilitiesTestBase {
@Test
public void testSourceWithExtraOptions() {
- PROPS.setProperty("hoodie.deltastreamer.jdbc.extra.options.fetchsize",
"10");
- PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "false");
- PROPS.remove("hoodie.deltastreamer.jdbc.table.incr.column.name");
+ PROPS.setProperty("hoodie.streamer.jdbc.extra.options.fetchsize", "10");
+ PROPS.setProperty("hoodie.streamer.jdbc.incr.pull", "false");
+ PROPS.remove("hoodie.streamer.jdbc.table.incr.column.name");
try {
// Add 20 records with commit time 000
clearAndInsert("000", 20, connection, DATA_GENERATOR, PROPS);
@@ -426,8 +426,8 @@ public class TestJdbcSource extends UtilitiesTestBase {
@Test
public void testSourceWithStorageLevel() {
- PROPS.setProperty("hoodie.deltastreamer.jdbc.storage.level", "NONE");
- PROPS.setProperty("hoodie.deltastreamer.jdbc.incr.pull", "false");
+ PROPS.setProperty("hoodie.streamer.jdbc.storage.level", "NONE");
+ PROPS.setProperty("hoodie.streamer.jdbc.incr.pull", "false");
try {
// Add 10 records with commit time 000
clearAndInsert("000", 10, connection, DATA_GENERATOR, PROPS);
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonDFSSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonDFSSource.java
index fde10b2d9a5..24a341fe9c3 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonDFSSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonDFSSource.java
@@ -44,7 +44,7 @@ public class TestJsonDFSSource extends AbstractDFSSourceTestBase {
@Override
public Source prepareDFSSource() {
TypedProperties props = new TypedProperties();
- props.setProperty("hoodie.deltastreamer.source.dfs.root", dfsRoot);
+ props.setProperty("hoodie.streamer.source.dfs.root", dfsRoot);
return new JsonDFSSource(props, jsc, sparkSession, schemaProvider);
}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
index 6b24f57a50d..4b615c50ee1 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSource.java
@@ -82,7 +82,7 @@ public class TestJsonKafkaSource extends BaseTestKafkaSource {
public void init() throws Exception {
String schemaFilePath = Objects.requireNonNull(SCHEMA_FILE_URL).toURI().getPath();
TypedProperties props = new TypedProperties();
- props.put("hoodie.deltastreamer.schemaprovider.source.schema.file", schemaFilePath);
+ props.put("hoodie.streamer.schemaprovider.source.schema.file", schemaFilePath);
schemaProvider = new FilebasedSchemaProvider(props, jsc());
}
@@ -93,11 +93,11 @@ public class TestJsonKafkaSource extends BaseTestKafkaSource {
static TypedProperties createPropsForJsonKafkaSource(String brokerAddress, String topic, Long maxEventsToReadFromKafkaSource, String resetStrategy) {
TypedProperties props = new TypedProperties();
- props.setProperty("hoodie.deltastreamer.source.kafka.topic", topic);
+ props.setProperty("hoodie.streamer.source.kafka.topic", topic);
props.setProperty("bootstrap.servers", brokerAddress);
props.setProperty("auto.offset.reset", resetStrategy);
props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
- props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents",
+ props.setProperty("hoodie.streamer.kafka.source.maxEvents",
maxEventsToReadFromKafkaSource != null ? String.valueOf(maxEventsToReadFromKafkaSource) :
String.valueOf(KafkaSourceConfig.MAX_EVENTS_FROM_KAFKA_SOURCE.defaultValue()));
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSourcePostProcessor.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSourcePostProcessor.java
index b6bc3480e3d..1f1a4e2b5c1 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSourcePostProcessor.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonKafkaSourcePostProcessor.java
@@ -80,7 +80,7 @@ public class TestJsonKafkaSourcePostProcessor extends SparkClientFunctionalTestH
public void init() throws Exception {
String schemaFilePath = Objects.requireNonNull(TestJsonKafkaSource.SCHEMA_FILE_URL).toURI().getPath();
TypedProperties props = new TypedProperties();
- props.put("hoodie.deltastreamer.schemaprovider.source.schema.file", schemaFilePath);
+ props.put("hoodie.streamer.schemaprovider.source.schema.file", schemaFilePath);
schemaProvider = new FilebasedSchemaProvider(props, jsc());
}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestParquetDFSSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestParquetDFSSource.java
index 44489037e82..159ababcf47 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestParquetDFSSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestParquetDFSSource.java
@@ -43,7 +43,7 @@ public class TestParquetDFSSource extends AbstractDFSSourceTestBase {
@Override
public Source prepareDFSSource() {
TypedProperties props = new TypedProperties();
- props.setProperty("hoodie.deltastreamer.source.dfs.root", dfsRoot);
+ props.setProperty("hoodie.streamer.source.dfs.root", dfsRoot);
return new ParquetDFSSource(props, jsc, sparkSession, schemaProvider);
}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestProtoKafkaSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestProtoKafkaSource.java
index b56d87c9263..f9679211144 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestProtoKafkaSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestProtoKafkaSource.java
@@ -75,11 +75,11 @@ public class TestProtoKafkaSource extends BaseTestKafkaSource {
protected TypedProperties createPropsForKafkaSource(String topic, Long maxEventsToReadFromKafkaSource, String resetStrategy) {
TypedProperties props = new TypedProperties();
- props.setProperty("hoodie.deltastreamer.source.kafka.topic", topic);
+ props.setProperty("hoodie.streamer.source.kafka.topic", topic);
props.setProperty("bootstrap.servers", testUtils.brokerAddress());
props.setProperty("auto.offset.reset", resetStrategy);
props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
- props.setProperty("hoodie.deltastreamer.kafka.source.maxEvents",
+ props.setProperty("hoodie.streamer.kafka.source.maxEvents",
maxEventsToReadFromKafkaSource != null ? String.valueOf(maxEventsToReadFromKafkaSource) :
String.valueOf(KafkaSourceConfig.MAX_EVENTS_FROM_KAFKA_SOURCE.defaultValue()));
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG, UUID.randomUUID().toString());
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
index 90fbeb3bb35..a9dd11c5544 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestS3EventsHoodieIncrSource.java
@@ -105,8 +105,8 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne
metaClient = getHoodieMetaClient(hadoopConf(), basePath());
String schemaFilePath = TestCloudObjectsSelectorCommon.class.getClassLoader().getResource("schema/sample_gcs_data.avsc").getPath();
TypedProperties props = new TypedProperties();
- props.put("hoodie.deltastreamer.schemaprovider.source.schema.file",
schemaFilePath);
- props.put("hoodie.deltastreamer.schema.provider.class.name",
FilebasedSchemaProvider.class.getName());
+ props.put("hoodie.streamer.schemaprovider.source.schema.file",
schemaFilePath);
+ props.put("hoodie.streamer.schema.provider.class.name",
FilebasedSchemaProvider.class.getName());
this.schemaProvider = Option.of(new FilebasedSchemaProvider(props, jsc));
}
@@ -186,10 +186,10 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne
private TypedProperties setProps(IncrSourceHelper.MissingCheckpointStrategy missingCheckpointStrategy) {
Properties properties = new Properties();
- properties.setProperty("hoodie.deltastreamer.source.hoodieincr.path",
basePath());
-
properties.setProperty("hoodie.deltastreamer.source.hoodieincr.missing.checkpoint.strategy",
+ properties.setProperty("hoodie.streamer.source.hoodieincr.path",
basePath());
+
properties.setProperty("hoodie.streamer.source.hoodieincr.missing.checkpoint.strategy",
missingCheckpointStrategy.name());
-
properties.setProperty("hoodie.deltastreamer.source.hoodieincr.file.format",
"json");
+ properties.setProperty("hoodie.streamer.source.hoodieincr.file.format",
"json");
return new TypedProperties(properties);
}
@@ -354,7 +354,7 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne
setMockQueryRunner(inputDs);
TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT);
- typedProperties.setProperty("hoodie.deltastreamer.source.s3incr.ignore.key.prefix", "path/to/skip");
+ typedProperties.setProperty("hoodie.streamer.source.s3incr.ignore.key.prefix", "path/to/skip");
readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1"), 1000L, "2", typedProperties);
readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file3.json"), 1000L, "2", typedProperties);
@@ -388,7 +388,7 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne
when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), Mockito.any(), Mockito.any(), eq(schemaProvider)))
.thenReturn(Option.empty());
TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT);
- typedProperties.setProperty("hoodie.deltastreamer.source.s3incr.ignore.key.prefix", "path/to/skip");
+ typedProperties.setProperty("hoodie.streamer.source.s3incr.ignore.key.prefix", "path/to/skip");
readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1"), 50L, "2#path/to/file4.json", typedProperties);
}
@@ -420,7 +420,7 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne
when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), Mockito.any(), Mockito.any(), eq(schemaProvider)))
.thenReturn(Option.empty());
TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT);
- typedProperties.setProperty("hoodie.deltastreamer.source.s3incr.ignore.key.prefix", "path/to/skip");
+ typedProperties.setProperty("hoodie.streamer.source.s3incr.ignore.key.prefix", "path/to/skip");
readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of("1#path/to/file3.json"), 50L, "3#path/to/file4.json", typedProperties);
@@ -457,14 +457,14 @@ public class TestS3EventsHoodieIncrSource extends SparkClientFunctionalTestHarne
when(mockCloudDataFetcher.getCloudObjectDataDF(Mockito.any(), Mockito.any(), Mockito.any(), eq(schemaProvider)))
.thenReturn(Option.empty());
TypedProperties typedProperties = setProps(READ_UPTO_LATEST_COMMIT);
- typedProperties.setProperty("hoodie.deltastreamer.source.s3incr.ignore.key.prefix", "path/to/skip");
+ typedProperties.setProperty("hoodie.streamer.source.s3incr.ignore.key.prefix", "path/to/skip");
//1. snapshot query, read all records
readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50000L, exptected1, typedProperties);
//2. incremental query, as commit is present in timeline
readAndAssert(READ_UPTO_LATEST_COMMIT, Option.of(exptected1), 10L, exptected2, typedProperties);
//3. snapshot query with source limit less than first commit size
readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50L, exptected3, typedProperties);
- typedProperties.setProperty("hoodie.deltastreamer.source.s3incr.ignore.key.prefix", "path/to");
+ typedProperties.setProperty("hoodie.streamer.source.s3incr.ignore.key.prefix", "path/to");
//4. As snapshotQuery will return 1 -> same would be returned as nextCheckpoint (dataset is empty due to ignore prefix).
readAndAssert(READ_UPTO_LATEST_COMMIT, Option.empty(), 50L, exptected4, typedProperties);
}
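
The S3 incremental-source tests above exercise a small, self-contained property set under the new key names. A sketch with illustrative values (keys are taken from this diff; the path and values are placeholders):

TypedProperties props = new TypedProperties();
props.setProperty("hoodie.streamer.source.hoodieincr.path", "/tmp/hudi/s3-events-table");
props.setProperty("hoodie.streamer.source.hoodieincr.missing.checkpoint.strategy", "READ_UPTO_LATEST_COMMIT");
props.setProperty("hoodie.streamer.source.hoodieincr.file.format", "json");
// Cloud objects whose keys begin with this prefix are skipped by the source.
props.setProperty("hoodie.streamer.source.s3incr.ignore.key.prefix", "path/to/skip");
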
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlFileBasedSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlFileBasedSource.java
index 89769954d38..ee488e38c6a 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlFileBasedSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlFileBasedSource.java
@@ -51,8 +51,8 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
public class TestSqlFileBasedSource extends UtilitiesTestBase {
private final boolean useFlattenedSchema = false;
- private final String sqlFileSourceConfig = "hoodie.deltastreamer.source.sql.file";
- private final String sqlFileSourceConfigEmitChkPointConf = "hoodie.deltastreamer.source.sql.checkpoint.emit";
+ private final String sqlFileSourceConfig = "hoodie.streamer.source.sql.file";
+ private final String sqlFileSourceConfigEmitChkPointConf = "hoodie.streamer.source.sql.checkpoint.emit";
protected FilebasedSchemaProvider schemaProvider;
protected HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
private String dfsRoot;
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlSource.java
index 64578f3bae3..a738003a3fc 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestSqlSource.java
@@ -50,7 +50,7 @@ import static org.junit.jupiter.api.Assertions.assertThrows;
public class TestSqlSource extends UtilitiesTestBase {
private final boolean useFlattenedSchema = false;
- private final String sqlSourceConfig = "hoodie.deltastreamer.source.sql.sql.query";
+ private final String sqlSourceConfig = "hoodie.streamer.source.sql.sql.query";
protected FilebasedSchemaProvider schemaProvider;
protected HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
private String dfsRoot;
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java
index c9f46144e96..a57383c43b2 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/debezium/TestAbstractDebeziumSource.java
@@ -86,12 +86,12 @@ public abstract class TestAbstractDebeziumSource extends UtilitiesTestBase {
private TypedProperties createPropsForJsonSource() {
TypedProperties props = new TypedProperties();
- props.setProperty("hoodie.deltastreamer.source.kafka.topic",
testTopicName);
+ props.setProperty("hoodie.streamer.source.kafka.topic", testTopicName);
props.setProperty("bootstrap.servers", testUtils.brokerAddress());
props.setProperty("auto.offset.reset", "earliest");
props.setProperty(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
- props.setProperty("hoodie.deltastreamer.schemaprovider.registry.url",
"localhost");
-
props.setProperty("hoodie.deltastreamer.source.kafka.value.deserializer.class",
StringDeserializer.class.getName());
+ props.setProperty("hoodie.streamer.schemaprovider.registry.url",
"localhost");
+ props.setProperty("hoodie.streamer.source.kafka.value.deserializer.class",
StringDeserializer.class.getName());
props.setProperty(ConsumerConfig.GROUP_ID_CONFIG,
UUID.randomUUID().toString());
return props;
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java
index b97e2fa80a0..79f15975cb5 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestCloudObjectsSelectorCommon.java
@@ -69,7 +69,7 @@ public class TestCloudObjectsSelectorCommon extends HoodieSparkClientTestHarness
List<CloudObjectMetadata> input = Collections.singletonList(new
CloudObjectMetadata("src/test/resources/data/partitioned/country=US/state=CA/data.json",
1));
TypedProperties properties = new TypedProperties();
-
properties.put("hoodie.deltastreamer.source.cloud.data.partition.fields.from.path",
"country,state");
+
properties.put("hoodie.streamer.source.cloud.data.partition.fields.from.path",
"country,state");
Option<Dataset<Row>> result =
CloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, properties,
"json");
Assertions.assertTrue(result.isPresent());
Assertions.assertEquals(1, result.get().count());
@@ -82,9 +82,9 @@ public class TestCloudObjectsSelectorCommon extends HoodieSparkClientTestHarness
TypedProperties props = new TypedProperties();
TestCloudObjectsSelectorCommon.class.getClassLoader().getResource("schema/sample_data_schema.avsc");
String schemaFilePath = TestCloudObjectsSelectorCommon.class.getClassLoader().getResource("schema/sample_data_schema.avsc").getPath();
- props.put("hoodie.deltastreamer.schemaprovider.source.schema.file", schemaFilePath);
- props.put("hoodie.deltastreamer.schema.provider.class.name", FilebasedSchemaProvider.class.getName());
- props.put("hoodie.deltastreamer.source.cloud.data.partition.fields.from.path", "country,state");
+ props.put("hoodie.streamer.schemaprovider.source.schema.file", schemaFilePath);
+ props.put("hoodie.streamer.schema.provider.class.name", FilebasedSchemaProvider.class.getName());
+ props.put("hoodie.streamer.source.cloud.data.partition.fields.from.path", "country,state");
List<CloudObjectMetadata> input = Collections.singletonList(new CloudObjectMetadata("src/test/resources/data/partitioned/country=US/state=CA/data.json", 1));
Option<Dataset<Row>> result = CloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, props, "json", Option.of(new FilebasedSchemaProvider(props, jsc)));
Assertions.assertTrue(result.isPresent());
@@ -97,8 +97,8 @@ public class TestCloudObjectsSelectorCommon extends HoodieSparkClientTestHarness
public void partitionKeyNotPresentInPath() {
List<CloudObjectMetadata> input = Collections.singletonList(new CloudObjectMetadata("src/test/resources/data/partitioned/country=US/state=CA/data.json", 1));
TypedProperties properties = new TypedProperties();
- properties.put("hoodie.deltastreamer.source.cloud.data.reader.comma.separated.path.format", "false");
- properties.put("hoodie.deltastreamer.source.cloud.data.partition.fields.from.path", "unknown");
+ properties.put("hoodie.streamer.source.cloud.data.reader.comma.separated.path.format", "false");
+ properties.put("hoodie.streamer.source.cloud.data.partition.fields.from.path", "unknown");
Option<Dataset<Row>> result = CloudObjectsSelectorCommon.loadAsDataset(sparkSession, input, properties, "json");
Assertions.assertTrue(result.isPresent());
Assertions.assertEquals(1, result.get().count());
@@ -111,9 +111,9 @@ public class TestCloudObjectsSelectorCommon extends HoodieSparkClientTestHarness
TypedProperties props = new TypedProperties();
TestCloudObjectsSelectorCommon.class.getClassLoader().getResource("schema/sample_data_schema.avsc");
String schemaFilePath = TestCloudObjectsSelectorCommon.class.getClassLoader().getResource("schema/sample_data_schema.avsc").getPath();
- props.put("hoodie.deltastreamer.schemaprovider.source.schema.file", schemaFilePath);
- props.put("hoodie.deltastreamer.schema.provider.class.name", FilebasedSchemaProvider.class.getName());
- props.put("hoodie.deltastreamer.source.cloud.data.partition.fields.from.path", "country,state");
+ props.put("hoodie.streamer.schemaprovider.source.schema.file", schemaFilePath);
+ props.put("hoodie.streamer.schema.provider.class.name", FilebasedSchemaProvider.class.getName());
+ props.put("hoodie.streamer.source.cloud.data.partition.fields.from.path", "country,state");
// Setting this config so that dataset repartition happens inside `loadAsDataset`
props.put("hoodie.streamer.source.cloud.data.partition.max.size", "1");
List<CloudObjectMetadata> input = Arrays.asList(
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java
index d3031729e6e..fc3ab90a036 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/helpers/TestKafkaOffsetGen.java
@@ -65,9 +65,9 @@ public class TestKafkaOffsetGen {
private TypedProperties getConsumerConfigs(String autoOffsetReset, String kafkaCheckpointType) {
TypedProperties props = new TypedProperties();
- props.put("hoodie.deltastreamer.source.kafka.checkpoint.type",
kafkaCheckpointType);
+ props.put("hoodie.streamer.source.kafka.checkpoint.type",
kafkaCheckpointType);
props.put("auto.offset.reset", autoOffsetReset);
- props.put("hoodie.deltastreamer.source.kafka.topic", testTopicName);
+ props.put("hoodie.streamer.source.kafka.topic", testTopicName);
props.setProperty("bootstrap.servers", testUtils.brokerAddress());
props.setProperty("key.deserializer", StringDeserializer.class.getName());
props.setProperty("value.deserializer",
StringDeserializer.class.getName());
@@ -250,7 +250,7 @@ public class TestKafkaOffsetGen {
testUtils.createTopic(testTopicName, 1);
boolean topicExists = kafkaOffsetGen.checkTopicExists(new KafkaConsumer(props));
assertTrue(topicExists);
- props.put("hoodie.deltastreamer.source.kafka.topic", "random");
+ props.put("hoodie.streamer.source.kafka.topic", "random");
kafkaOffsetGen = new KafkaOffsetGen(props);
topicExists = kafkaOffsetGen.checkTopicExists(new KafkaConsumer(props));
assertFalse(topicExists);
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
index a949335a21a..a200f3a5151 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/UtilitiesTestBase.java
@@ -450,14 +450,14 @@ public class UtilitiesTestBase {
public static TypedProperties setupSchemaOnDFS(String scope, String filename) throws IOException {
UtilitiesTestBase.Helpers.copyToDFS(scope + "/" + filename, fs, basePath + "/" + filename);
TypedProperties props = new TypedProperties();
- props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", basePath + "/" + filename);
+ props.setProperty("hoodie.streamer.schemaprovider.source.schema.file", basePath + "/" + filename);
return props;
}
public static TypedProperties setupSchemaOnDFSWithAbsoluteScope(String scope, String filename) throws IOException {
UtilitiesTestBase.Helpers.copyToDFSFromAbsolutePath(scope + "/" + filename, fs, basePath + "/" + filename);
TypedProperties props = new TypedProperties();
- props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", basePath + "/" + filename);
+ props.setProperty("hoodie.streamer.schemaprovider.source.schema.file", basePath + "/" + filename);
return props;
}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractBaseTestSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractBaseTestSource.java
index 56d435ddf0f..08e73d36bc0 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractBaseTestSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/AbstractBaseTestSource.java
@@ -23,6 +23,7 @@ import org.apache.hudi.common.model.HoodieKey;
import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.testutils.HoodieTestDataGenerator;
import org.apache.hudi.common.testutils.RawTripTestPayload;
+import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.collection.RocksDBBasedMap;
import org.apache.hudi.exception.HoodieIOException;
import org.apache.hudi.utilities.config.SourceTestConfig;
@@ -63,11 +64,10 @@ public abstract class AbstractBaseTestSource extends AvroSource {
public static void initDataGen(TypedProperties props, int partition) {
try {
- boolean useRocksForTestDataGenKeys = props.getBoolean(SourceTestConfig.USE_ROCKSDB_FOR_TEST_DATAGEN_KEYS.key(),
- SourceTestConfig.USE_ROCKSDB_FOR_TEST_DATAGEN_KEYS.defaultValue());
- String baseStoreDir = props.getString(SourceTestConfig.ROCKSDB_BASE_DIR_FOR_TEST_DATAGEN_KEYS.key(),
+ boolean useRocksForTestDataGenKeys = ConfigUtils.getBooleanWithAltKeys(props, SourceTestConfig.USE_ROCKSDB_FOR_TEST_DATAGEN_KEYS);
+ String baseStoreDir = ConfigUtils.getStringWithAltKeys(props, SourceTestConfig.ROCKSDB_BASE_DIR_FOR_TEST_DATAGEN_KEYS,
File.createTempFile("test_data_gen", ".keys").getParent()) + "/" + partition;
- LOG.info("useRocksForTestDataGenKeys=" + useRocksForTestDataGenKeys + ", BaseStoreDir=" + baseStoreDir);
+ LOG.info("useRocksForTestDataGenKeys={}, BaseStoreDir={}", useRocksForTestDataGenKeys, baseStoreDir);
dataGeneratorMap.put(partition, new HoodieTestDataGenerator(HoodieTestDataGenerator.DEFAULT_PARTITION_PATHS,
useRocksForTestDataGenKeys ? new RocksDBBasedMap<>(baseStoreDir) : new HashMap<>()));
} catch (IOException e) {
@@ -106,18 +106,17 @@ public abstract class AbstractBaseTestSource extends AvroSource {
protected static Stream<GenericRecord> fetchNextBatch(TypedProperties props, int sourceLimit, String instantTime,
int partition) {
- int maxUniqueKeys =
- props.getInteger(SourceTestConfig.MAX_UNIQUE_RECORDS_PROP.key(), SourceTestConfig.MAX_UNIQUE_RECORDS_PROP.defaultValue());
+ int maxUniqueKeys = ConfigUtils.getIntWithAltKeys(props, SourceTestConfig.MAX_UNIQUE_RECORDS_PROP);
HoodieTestDataGenerator dataGenerator = dataGeneratorMap.get(partition);
// generate `sourceLimit` number of upserts each time.
int numExistingKeys = dataGenerator.getNumExistingKeys(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA);
- LOG.info("NumExistingKeys=" + numExistingKeys);
+ LOG.info("NumExistingKeys={}", numExistingKeys);
int numUpdates = Math.min(numExistingKeys, sourceLimit / 2);
int numInserts = sourceLimit - numUpdates;
- LOG.info("Before adjustments => numInserts=" + numInserts + ",
numUpdates=" + numUpdates);
+ LOG.info("Before adjustments => numInserts={}, numUpdates={}", numInserts,
numUpdates);
boolean reachedMax = false;
if (numInserts + numExistingKeys > maxUniqueKeys) {
@@ -134,17 +133,16 @@ public abstract class AbstractBaseTestSource extends
AvroSource {
Stream<GenericRecord> deleteStream = Stream.empty();
Stream<GenericRecord> updateStream;
long memoryUsage1 = Runtime.getRuntime().totalMemory() - Runtime.getRuntime().freeMemory();
- LOG.info("Before DataGen. Memory Usage=" + memoryUsage1 + ", Total Memory=" + Runtime.getRuntime().totalMemory()
- + ", Free Memory=" + Runtime.getRuntime().freeMemory());
+ LOG.info("Before DataGen. Memory Usage={}, Total Memory={}, Free Memory={}", memoryUsage1, Runtime.getRuntime().totalMemory(),
+ Runtime.getRuntime().freeMemory());
if (!reachedMax && numUpdates >= 50) {
- LOG.info("After adjustments => NumInserts=" + numInserts + ",
NumUpdates=" + (numUpdates - 50) + ", NumDeletes=50, maxUniqueRecords="
- + maxUniqueKeys);
+ LOG.info("After adjustments => NumInserts={}, NumUpdates={},
NumDeletes=50, maxUniqueRecords={}", numInserts, (numUpdates - 50),
maxUniqueKeys);
// if we generate update followed by deletes -> some keys in update
batch might be picked up for deletes. Hence generating delete batch followed by
updates
deleteStream =
dataGenerator.generateUniqueDeleteRecordStream(instantTime,
50).map(AbstractBaseTestSource::toGenericRecord);
updateStream = dataGenerator.generateUniqueUpdatesStream(instantTime,
numUpdates - 50, HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
.map(AbstractBaseTestSource::toGenericRecord);
} else {
- LOG.info("After adjustments => NumInserts=" + numInserts + ",
NumUpdates=" + numUpdates + ", maxUniqueRecords=" + maxUniqueKeys);
+ LOG.info("After adjustments => NumInserts={}, NumUpdates={},
maxUniqueRecords={}", numInserts, numUpdates, maxUniqueKeys);
updateStream = dataGenerator.generateUniqueUpdatesStream(instantTime,
numUpdates, HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
.map(AbstractBaseTestSource::toGenericRecord);
}
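
Besides the alt-key lookups, the hunks above also move the test sources to SLF4J parameterized logging, which skips message construction when the log level is disabled. A minimal sketch of the two styles side by side (the logger and values here are illustrative, not from the patch):

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

class LoggingSketch {
  private static final Logger LOG = LoggerFactory.getLogger(LoggingSketch.class);

  void report(int numInserts, int numUpdates) {
    // Old style: the concatenated string is built even when INFO is off.
    LOG.info("numInserts=" + numInserts + ", numUpdates=" + numUpdates);
    // New style: placeholders are substituted only if INFO is enabled.
    LOG.info("numInserts={}, numUpdates={}", numInserts, numUpdates);
  }
}
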
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/DistributedTestDataSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/DistributedTestDataSource.java
index 4bcbdbbe874..808a8efb8a4 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/DistributedTestDataSource.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/testutils/sources/DistributedTestDataSource.java
@@ -19,6 +19,7 @@
package org.apache.hudi.utilities.testutils.sources;
import org.apache.hudi.common.config.TypedProperties;
+import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.utilities.config.SourceTestConfig;
import org.apache.hudi.utilities.schema.SchemaProvider;
@@ -46,15 +47,14 @@ public class DistributedTestDataSource extends AbstractBaseTestSource {
public DistributedTestDataSource(TypedProperties props, JavaSparkContext sparkContext, SparkSession sparkSession,
SchemaProvider schemaProvider) {
super(props, sparkContext, sparkSession, schemaProvider);
- this.numTestSourcePartitions =
- props.getInteger(SourceTestConfig.NUM_SOURCE_PARTITIONS_PROP.key(), SourceTestConfig.NUM_SOURCE_PARTITIONS_PROP.defaultValue());
+ this.numTestSourcePartitions = ConfigUtils.getIntWithAltKeys(props, SourceTestConfig.NUM_SOURCE_PARTITIONS_PROP);
}
@Override
protected InputBatch<JavaRDD<GenericRecord>> fetchNewData(Option<String>
lastCkptStr, long sourceLimit) {
int nextCommitNum = lastCkptStr.map(s -> Integer.parseInt(s) +
1).orElse(0);
String instantTime = String.format("%05d", nextCommitNum);
- LOG.info("Source Limit is set to " + sourceLimit);
+ LOG.info("Source Limit is set to {}", sourceLimit);
// No new data.
if (sourceLimit <= 0) {
@@ -65,15 +65,14 @@ public class DistributedTestDataSource extends AbstractBaseTestSource {
newProps.putAll(props);
// Set the maxUniqueRecords per partition for TestDataSource
- int maxUniqueRecords =
- props.getInteger(SourceTestConfig.MAX_UNIQUE_RECORDS_PROP.key(), SourceTestConfig.MAX_UNIQUE_RECORDS_PROP.defaultValue());
+ int maxUniqueRecords = ConfigUtils.getIntWithAltKeys(props, SourceTestConfig.MAX_UNIQUE_RECORDS_PROP);
String maxUniqueRecordsPerPartition = String.valueOf(Math.max(1, maxUniqueRecords / numTestSourcePartitions));
newProps.setProperty(SourceTestConfig.MAX_UNIQUE_RECORDS_PROP.key(), maxUniqueRecordsPerPartition);
int perPartitionSourceLimit = Math.max(1, (int) (sourceLimit / numTestSourcePartitions));
JavaRDD<GenericRecord> avroRDD = sparkContext.parallelize(IntStream.range(0, numTestSourcePartitions).boxed().collect(Collectors.toList()),
numTestSourcePartitions).mapPartitionsWithIndex((p, idx) -> {
- LOG.info("Initializing source with newProps=" + newProps);
+ LOG.info("Initializing source with newProps={}", newProps);
if (!dataGeneratorMap.containsKey(p)) {
initDataGen(newProps, p);
}
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlFileBasedTransformer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlFileBasedTransformer.java
index 1b0cc7f52a6..ea2ce8ed86f 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlFileBasedTransformer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlFileBasedTransformer.java
@@ -87,7 +87,7 @@ public class TestSqlFileBasedTransformer extends UtilitiesTestBase {
public void testSqlFileBasedTransformerIncorrectConfig() {
// Test if the class throws hoodie IO exception correctly when given an incorrect config.
props.setProperty(
- "hoodie.deltastreamer.transformer.sql.file",
+ "hoodie.streamer.transformer.sql.file",
UtilitiesTestBase.basePath + "/non-exist-sql-file.sql");
assertThrows(
HoodieTransformException.class,
@@ -103,7 +103,7 @@ public class TestSqlFileBasedTransformer extends UtilitiesTestBase {
// Test if the SQL file based transformer works as expected for the invalid SQL statements.
props.setProperty(
- "hoodie.deltastreamer.transformer.sql.file",
+ "hoodie.streamer.transformer.sql.file",
UtilitiesTestBase.basePath + "/sql-file-transformer-invalid.sql");
assertThrows(
ParseException.class,
@@ -119,7 +119,7 @@ public class TestSqlFileBasedTransformer extends UtilitiesTestBase {
// Test if the SQL file based transformer works as expected for the empty SQL statements.
props.setProperty(
- "hoodie.deltastreamer.transformer.sql.file",
+ "hoodie.streamer.transformer.sql.file",
UtilitiesTestBase.basePath + "/sql-file-transformer-empty.sql");
Dataset<Row> emptyRow = sqlFileTransformer.apply(jsc, sparkSession,
inputDatasetRows, props);
String[] actualRows =
emptyRow.as(Encoders.STRING()).collectAsList().toArray(new String[0]);
@@ -136,7 +136,7 @@ public class TestSqlFileBasedTransformer extends UtilitiesTestBase {
// Test if the SQL file based transformer works as expected for the correct input.
props.setProperty(
- "hoodie.deltastreamer.transformer.sql.file",
+ "hoodie.streamer.transformer.sql.file",
UtilitiesTestBase.basePath + "/sql-file-transformer.sql");
Dataset<Row> transformedRow =
sqlFileTransformer.apply(jsc, sparkSession, inputDatasetRows, props);
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlQueryBasedTransformer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlQueryBasedTransformer.java
index 6f05dc1b184..e9f6f9e4fd3 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlQueryBasedTransformer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/transform/TestSqlQueryBasedTransformer.java
@@ -78,7 +78,7 @@ public class TestSqlQueryBasedTransformer {
+ "from\n"
+ "\t<SRC>";
TypedProperties props = new TypedProperties();
- props.put("hoodie.deltastreamer.transformer.sql", transSql);
+ props.put("hoodie.streamer.transformer.sql", transSql);
// transform
SqlQueryBasedTransformer transformer = new SqlQueryBasedTransformer();
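
For context, a minimal sketch of how the renamed transformer key is consumed; `jsc`, `sparkSession`, and `inputRows` are assumed to be initialized elsewhere, as in the test above:

TypedProperties props = new TypedProperties();
// The transformer registers the input rows as a temp view and substitutes it for <SRC>.
props.put("hoodie.streamer.transformer.sql", "select *, CAST(1.0 AS DOUBLE) as one from <SRC>");
SqlQueryBasedTransformer transformer = new SqlQueryBasedTransformer();
Dataset<Row> out = transformer.apply(jsc, sparkSession, inputRows, props);
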
diff --git a/hudi-utilities/src/test/resources/streamer-config/dfs-source.properties b/hudi-utilities/src/test/resources/streamer-config/dfs-source.properties
index 3a5edb2b6f2..35beefab7b2 100644
--- a/hudi-utilities/src/test/resources/streamer-config/dfs-source.properties
+++ b/hudi-utilities/src/test/resources/streamer-config/dfs-source.properties
@@ -20,8 +20,8 @@ include=base.properties
hoodie.datasource.write.recordkey.field=_row_key
hoodie.datasource.write.partitionpath.field=driver
# Schema provider props (change to absolute path based on your installation)
-hoodie.deltastreamer.filebased.schemaprovider.source.schema.file=file:///path/to/hoodie/hoodie-utilities/src/main/resources/streamer-props/source.avsc
-hoodie.deltastreamer.filebased.schemaprovider.target.schema.file=file:///path/to/hoodie/hoodie-utilities/src/main/resources/streamer-props/target.avsc
+hoodie.streamer.filebased.schemaprovider.source.schema.file=file:///path/to/hoodie/hoodie-utilities/src/main/resources/streamer-props/source.avsc
+hoodie.streamer.filebased.schemaprovider.target.schema.file=file:///path/to/hoodie/hoodie-utilities/src/main/resources/streamer-props/target.avsc
# DFS Source
-hoodie.deltastreamer.source.dfs.root=file:///tmp/hoodie-dfs-input
+hoodie.streamer.source.dfs.root=file:///tmp/hoodie-dfs-input
diff --git a/hudi-utilities/src/test/resources/streamer-config/invalid_hive_sync_uber_config.properties b/hudi-utilities/src/test/resources/streamer-config/invalid_hive_sync_uber_config.properties
index 5c569c5d0a0..248de399272 100644
--- a/hudi-utilities/src/test/resources/streamer-config/invalid_hive_sync_uber_config.properties
+++ b/hudi-utilities/src/test/resources/streamer-config/invalid_hive_sync_uber_config.properties
@@ -18,6 +18,6 @@
include=base.properties
hoodie.datasource.write.recordkey.field=_row_key
hoodie.datasource.write.partitionpath.field=created_at
-hoodie.deltastreamer.source.kafka.topic=test_topic
-hoodie.deltastreamer.keygen.timebased.timestamp.type=UNIX_TIMESTAMP
-hoodie.deltastreamer.keygen.timebased.input.dateformat=yyyy-MM-dd
\ No newline at end of file
+hoodie.streamer.source.kafka.topic=test_topic
+hoodie.keygen.timebased.timestamp.type=UNIX_TIMESTAMP
+hoodie.keygen.timebased.input.dateformat=yyyy-MM-dd
\ No newline at end of file
diff --git a/hudi-utilities/src/test/resources/streamer-config/kafka-source.properties b/hudi-utilities/src/test/resources/streamer-config/kafka-source.properties
index e256b8c77fb..87edb1a1df7 100644
--- a/hudi-utilities/src/test/resources/streamer-config/kafka-source.properties
+++ b/hudi-utilities/src/test/resources/streamer-config/kafka-source.properties
@@ -20,10 +20,10 @@ include=base.properties
hoodie.datasource.write.recordkey.field=impressionid
hoodie.datasource.write.partitionpath.field=userid
# schema provider configs
-hoodie.deltastreamer.schemaprovider.registry.url=http://localhost:8081/subjects/impressions-value/versions/latest
+hoodie.streamer.schemaprovider.registry.url=http://localhost:8081/subjects/impressions-value/versions/latest
# Kafka Source
-#hoodie.deltastreamer.source.kafka.topic=uber_trips
-hoodie.deltastreamer.source.kafka.topic=impressions
+#hoodie.streamer.source.kafka.topic=uber_trips
+hoodie.streamer.source.kafka.topic=impressions
#Kafka props
bootstrap.servers=localhost:9092
auto.offset.reset=earliest
diff --git a/hudi-utilities/src/test/resources/streamer-config/short_trip_uber_config.properties b/hudi-utilities/src/test/resources/streamer-config/short_trip_uber_config.properties
index d415e19eb20..b74f5a080f3 100644
--- a/hudi-utilities/src/test/resources/streamer-config/short_trip_uber_config.properties
+++ b/hudi-utilities/src/test/resources/streamer-config/short_trip_uber_config.properties
@@ -18,11 +18,11 @@
include=base.properties
hoodie.datasource.write.recordkey.field=_row_key
hoodie.datasource.write.partitionpath.field=created_at
-hoodie.deltastreamer.source.kafka.topic=topic2
-hoodie.deltastreamer.keygen.timebased.timestamp.type=UNIX_TIMESTAMP
-hoodie.deltastreamer.keygen.timebased.input.dateformat=yyyy-MM-dd HH:mm:ss.S
+hoodie.streamer.source.kafka.topic=topic2
+hoodie.keygen.timebased.timestamp.type=UNIX_TIMESTAMP
+hoodie.keygen.timebased.input.dateformat=yyyy-MM-dd HH:mm:ss.S
hoodie.datasource.hive_sync.table=short_trip_uber_hive_dummy_table
hoodie.datasource.write.keygenerator.class=org.apache.hudi.utilities.deltastreamer.TestHoodieDeltaStreamer$TestGenerator
-hoodie.deltastreamer.schemaprovider.registry.baseUrl=http://localhost:8081/subjects/
-hoodie.deltastreamer.schemaprovider.registry.urlSuffix=-value/versions/latest
-hoodie.deltastreamer.transformer.class=org.apache.hudi.utilities.deltastreamer.TestHoodieDeltaStreamer$TestIdentityTransformer
+hoodie.streamer.schemaprovider.registry.baseUrl=http://localhost:8081/subjects/
+hoodie.streamer.schemaprovider.registry.urlSuffix=-value/versions/latest
+hoodie.streamer.transformer.class=org.apache.hudi.utilities.deltastreamer.TestHoodieDeltaStreamer$TestIdentityTransformer
diff --git a/hudi-utilities/src/test/resources/streamer-config/sql-transformer.properties b/hudi-utilities/src/test/resources/streamer-config/sql-transformer.properties
index 9172337d038..9bfbd889de9 100644
--- a/hudi-utilities/src/test/resources/streamer-config/sql-transformer.properties
+++ b/hudi-utilities/src/test/resources/streamer-config/sql-transformer.properties
@@ -16,4 +16,4 @@
# limitations under the License.
###
include=base.properties
-hoodie.deltastreamer.transformer.sql=SELECT a.timestamp, a._row_key, a.partition_path, a.trip_type, a.rider, a.driver, a.begin_lat, a.begin_lon, a.end_lat, a.end_lon, a.distance_in_meters, a.seconds_since_epoch, a.weight, a.nation, a.current_date, a.current_ts, a.height, a.city_to_state, a.fare, a.tip_history, a.`_hoodie_is_deleted`, CAST(1.0 AS DOUBLE) AS haversine_distance FROM <SRC> a
+hoodie.streamer.transformer.sql=SELECT a.timestamp, a._row_key, a.partition_path, a.trip_type, a.rider, a.driver, a.begin_lat, a.begin_lon, a.end_lat, a.end_lon, a.distance_in_meters, a.seconds_since_epoch, a.weight, a.nation, a.current_date, a.current_ts, a.height, a.city_to_state, a.fare, a.tip_history, a.`_hoodie_is_deleted`, CAST(1.0 AS DOUBLE) AS haversine_distance FROM <SRC> a
diff --git a/hudi-utilities/src/test/resources/streamer-config/uber_config.properties b/hudi-utilities/src/test/resources/streamer-config/uber_config.properties
index f5b079265d4..a8e278249e8 100644
--- a/hudi-utilities/src/test/resources/streamer-config/uber_config.properties
+++ b/hudi-utilities/src/test/resources/streamer-config/uber_config.properties
@@ -18,10 +18,10 @@
include=base.properties
hoodie.datasource.write.recordkey.field=_row_key
hoodie.datasource.write.partitionpath.field=created_at
-hoodie.deltastreamer.source.kafka.topic=topic1
-hoodie.deltastreamer.keygen.timebased.timestamp.type=UNIX_TIMESTAMP
-hoodie.deltastreamer.keygen.timebased.input.dateformat=yyyy-MM-dd HH:mm:ss.S
+hoodie.streamer.source.kafka.topic=topic1
+hoodie.keygen.timebased.timestamp.type=UNIX_TIMESTAMP
+hoodie.keygen.timebased.input.dateformat=yyyy-MM-dd HH:mm:ss.S
hoodie.datasource.hive_sync.database=uber_hive_db
hoodie.datasource.hive_sync.table=uber_hive_dummy_table
-hoodie.deltastreamer.schemaprovider.registry.url=http://localhost:8081/subjects/random-value/versions/latest
-hoodie.deltastreamer.schemaprovider.registry.targetUrl=http://localhost:8081/subjects/random-value/versions/latest
\ No newline at end of file
+hoodie.streamer.schemaprovider.registry.url=http://localhost:8081/subjects/random-value/versions/latest
+hoodie.streamer.schemaprovider.registry.targetUrl=http://localhost:8081/subjects/random-value/versions/latest
\ No newline at end of file