pratyakshsharma commented on a change in pull request #4645:
URL: https://github.com/apache/hudi/pull/4645#discussion_r810612445



##########
File path: 
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
##########
@@ -104,43 +108,111 @@ private void checkIfTableConfigFileExists(String 
configFolder, FileSystem fs, St
     }
   }
 
-  //commonProps are passed as parameter which contain table to config file 
mapping
+  // commonProps are passed as parameter which contain table to config file 
mapping
   private void populateTableExecutionContextList(TypedProperties properties, 
String configFolder, FileSystem fs, Config config) throws IOException {
-    List<String> tablesToBeIngested = getTablesToBeIngested(properties);
-    logger.info("tables to be ingested via MultiTableDeltaStreamer : " + 
tablesToBeIngested);
-    TableExecutionContext executionContext;
-    for (String table : tablesToBeIngested) {
-      String[] tableWithDatabase = table.split("\\.");
-      String database = tableWithDatabase.length > 1 ? tableWithDatabase[0] : 
"default";
-      String currentTable = tableWithDatabase.length > 1 ? 
tableWithDatabase[1] : table;
-      String configProp = Constants.INGESTION_PREFIX + database + 
Constants.DELIMITER + currentTable + Constants.INGESTION_CONFIG_SUFFIX;
-      String configFilePath = properties.getString(configProp, 
Helpers.getDefaultConfigFilePath(configFolder, database, currentTable));
-      checkIfTableConfigFileExists(configFolder, fs, configFilePath);
-      TypedProperties tableProperties = UtilHelpers.readConfig(fs.getConf(), 
new Path(configFilePath), new ArrayList<String>()).getProps();
-      properties.forEach((k, v) -> {
-        if (tableProperties.get(k) == null) {
-          tableProperties.setProperty(k.toString(), v.toString());
-        }
-      });
-      final HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
-      //copy all the values from config to cfg
-      String targetBasePath = resetTarget(config, database, currentTable);
-      Helpers.deepCopyConfigs(config, cfg);
-      String overriddenTargetBasePath = 
tableProperties.getString(Constants.TARGET_BASE_PATH_PROP, "");
-      cfg.targetBasePath = StringUtils.isNullOrEmpty(overriddenTargetBasePath) 
? targetBasePath : overriddenTargetBasePath;
-      if (cfg.enableMetaSync && 
StringUtils.isNullOrEmpty(tableProperties.getString(DataSourceWriteOptions.HIVE_TABLE().key(),
 ""))) {
-        throw new HoodieException("Meta sync table field not provided!");
+    // Parameter SOURCES_TO_BE_BOUND indicates that multiple sources update 
the same target.
+    String sourcesToBeBound = 
properties.getProperty(Constants.SOURCES_TO_BE_BOUND);
+    if (!StringUtils.isNullOrEmpty(sourcesToBeBound)) {
+      // populate the table execution context by traversing source tables
+      List<String> sourcesToBeBonded = getSourcesToBeBound(properties);
+      logger.info("Source tables to be bound via MultiTableDeltaStreamer : " + 
sourcesToBeBonded);
+      String tableToBeIngested = getTableToBeIngested(properties);
+      String[] targetTableWithDataBase = tableToBeIngested.split("\\.");
+      String targetDataBase = targetTableWithDataBase.length > 1 ? 
targetTableWithDataBase[0] : "default";

Review comment:
       May be we can extract this line as a separate function, this is 
duplicated at L140 and L127 in your code.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to