pratyakshsharma commented on a change in pull request #4645:
URL: https://github.com/apache/hudi/pull/4645#discussion_r810612662
##########
File path:
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
##########
@@ -104,43 +108,111 @@ private void checkIfTableConfigFileExists(String
configFolder, FileSystem fs, St
}
}
- //commonProps are passed as parameter which contain table to config file
mapping
+ // commonProps are passed as parameter which contain table to config file
mapping
private void populateTableExecutionContextList(TypedProperties properties,
String configFolder, FileSystem fs, Config config) throws IOException {
- List<String> tablesToBeIngested = getTablesToBeIngested(properties);
- logger.info("tables to be ingested via MultiTableDeltaStreamer : " +
tablesToBeIngested);
- TableExecutionContext executionContext;
- for (String table : tablesToBeIngested) {
- String[] tableWithDatabase = table.split("\\.");
- String database = tableWithDatabase.length > 1 ? tableWithDatabase[0] :
"default";
- String currentTable = tableWithDatabase.length > 1 ?
tableWithDatabase[1] : table;
- String configProp = Constants.INGESTION_PREFIX + database +
Constants.DELIMITER + currentTable + Constants.INGESTION_CONFIG_SUFFIX;
- String configFilePath = properties.getString(configProp,
Helpers.getDefaultConfigFilePath(configFolder, database, currentTable));
- checkIfTableConfigFileExists(configFolder, fs, configFilePath);
- TypedProperties tableProperties = UtilHelpers.readConfig(fs.getConf(),
new Path(configFilePath), new ArrayList<String>()).getProps();
- properties.forEach((k, v) -> {
- if (tableProperties.get(k) == null) {
- tableProperties.setProperty(k.toString(), v.toString());
- }
- });
- final HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
- //copy all the values from config to cfg
- String targetBasePath = resetTarget(config, database, currentTable);
- Helpers.deepCopyConfigs(config, cfg);
- String overriddenTargetBasePath =
tableProperties.getString(Constants.TARGET_BASE_PATH_PROP, "");
- cfg.targetBasePath = StringUtils.isNullOrEmpty(overriddenTargetBasePath)
? targetBasePath : overriddenTargetBasePath;
- if (cfg.enableMetaSync &&
StringUtils.isNullOrEmpty(tableProperties.getString(DataSourceWriteOptions.HIVE_TABLE().key(),
""))) {
- throw new HoodieException("Meta sync table field not provided!");
+ // Parameter SOURCES_TO_BE_BOUND indicates that multiple sources update
the same target.
+ String sourcesToBeBound =
properties.getProperty(Constants.SOURCES_TO_BE_BOUND);
+ if (!StringUtils.isNullOrEmpty(sourcesToBeBound)) {
+ // populate the table execution context by traversing source tables
+ List<String> sourcesToBeBonded = getSourcesToBeBound(properties);
Review comment:
Could we rename this variable to something clearer, such as
`sourcesToFetchFrom`? As written, `sourcesToBeBonded` also looks like a typo for
`sourcesToBeBound` (cf. `Constants.SOURCES_TO_BE_BOUND` above). I'll leave the final
decision to you, though.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]