This is an automated email from the ASF dual-hosted git repository.
leesf pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new f4e3b94 [HUDI-1742] Improve table level config priority for HoodieMultiTableDeltaStreamer (#2744)
f4e3b94 is described below
commit f4e3b949714aceaf8823fd1659e44d3b7e98089a
Author: Nick Young <[email protected]>
AuthorDate: Mon Apr 26 22:05:06 2021 +0800
[HUDI-1742] Improve table level config priority for HoodieMultiTableDeltaStreamer (#2744)
---
.../deltastreamer/HoodieMultiTableDeltaStreamer.java | 4 +++-
.../utilities/functional/TestHoodieDeltaStreamer.java | 7 +++++++
.../functional/TestHoodieMultiTableDeltaStreamer.java | 18 ++++++++++++++++++
.../short_trip_uber_config.properties | 3 ++-
4 files changed, 30 insertions(+), 2 deletions(-)
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
index a39b973..8e557f1 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
@@ -118,7 +118,9 @@ public class HoodieMultiTableDeltaStreamer {
checkIfTableConfigFileExists(configFolder, fs, configFilePath);
TypedProperties tableProperties = UtilHelpers.readConfig(fs, new
Path(configFilePath), new ArrayList<>()).getConfig();
properties.forEach((k, v) -> {
- tableProperties.setProperty(k.toString(), v.toString());
+ if (tableProperties.get(k) == null) {
+ tableProperties.setProperty(k.toString(), v.toString());
+ }
});
final HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
//copy all the values from config to cfg
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
index 7d4db2c..362a294 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieDeltaStreamer.java
@@ -1631,6 +1631,13 @@ public class TestHoodieDeltaStreamer extends UtilitiesTestBase {
}
}
+ public static class TestTableLevelGenerator extends SimpleKeyGenerator {
+
+ public TestTableLevelGenerator(TypedProperties props) {
+ super(props);
+ }
+ }
+
public static class DummyAvroPayload extends OverwriteWithLatestAvroPayload {
public DummyAvroPayload(GenericRecord gr, Comparable orderingVal) {
diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java
index 7b5ce9d..17450a0 100644
--- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java
+++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java
@@ -213,6 +213,24 @@ public class TestHoodieMultiTableDeltaStreamer extends TestHoodieDeltaStreamer {
}
}
+ @Test
+ public void testTableLevelProperties() throws IOException {
+ HoodieMultiTableDeltaStreamer.Config cfg =
TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config",
TestDataSource.class.getName(), false);
+ HoodieMultiTableDeltaStreamer streamer = new
HoodieMultiTableDeltaStreamer(cfg, jsc);
+ List<TableExecutionContext> tableExecutionContexts =
streamer.getTableExecutionContexts();
+ tableExecutionContexts.forEach(tableExecutionContext -> {
+ switch (tableExecutionContext.getTableName()) {
+ case "dummy_table_short_trip":
+ String tableLevelKeyGeneratorClass =
tableExecutionContext.getProperties().getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY());
+
assertEquals(TestHoodieDeltaStreamer.TestTableLevelGenerator.class.getName(),
tableLevelKeyGeneratorClass);
+ break;
+ default:
+ String defaultKeyGeneratorClass =
tableExecutionContext.getProperties().getString(DataSourceWriteOptions.KEYGENERATOR_CLASS_OPT_KEY());
+ assertEquals(TestHoodieDeltaStreamer.TestGenerator.class.getName(),
defaultKeyGeneratorClass);
+ }
+ });
+ }
+
private String populateCommonPropsAndWriteToFile() throws IOException {
TypedProperties commonProps = new TypedProperties();
populateCommonProps(commonProps);
diff --git a/hudi-utilities/src/test/resources/delta-streamer-config/short_trip_uber_config.properties b/hudi-utilities/src/test/resources/delta-streamer-config/short_trip_uber_config.properties
index 52d39ba..243afc9 100644
--- a/hudi-utilities/src/test/resources/delta-streamer-config/short_trip_uber_config.properties
+++ b/hudi-utilities/src/test/resources/delta-streamer-config/short_trip_uber_config.properties
@@ -21,4 +21,5 @@ hoodie.datasource.write.partitionpath.field=created_at
hoodie.deltastreamer.source.kafka.topic=topic2
hoodie.deltastreamer.keygen.timebased.timestamp.type=UNIX_TIMESTAMP
hoodie.deltastreamer.keygen.timebased.input.dateformat=yyyy-MM-dd HH:mm:ss.S
-hoodie.datasource.hive_sync.table=short_trip_uber_hive_dummy_table
\ No newline at end of file
+hoodie.datasource.hive_sync.table=short_trip_uber_hive_dummy_table
+hoodie.datasource.write.keygenerator.class=org.apache.hudi.utilities.functional.TestHoodieDeltaStreamer$TestTableLevelGenerator
\ No newline at end of file