pratyakshsharma commented on a change in pull request #4645:
URL: https://github.com/apache/hudi/pull/4645#discussion_r810612142



##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/HoodieMultiTableDeltaStreamer.java
##########
@@ -104,43 +108,111 @@ private void checkIfTableConfigFileExists(String configFolder, FileSystem fs, St
     }
   }
 
-  //commonProps are passed as parameter which contain table to config file 
mapping
+  // commonProps are passed as parameter which contain table to config file 
mapping
   private void populateTableExecutionContextList(TypedProperties properties, 
String configFolder, FileSystem fs, Config config) throws IOException {
-    List<String> tablesToBeIngested = getTablesToBeIngested(properties);
-    logger.info("tables to be ingested via MultiTableDeltaStreamer : " + 
tablesToBeIngested);
-    TableExecutionContext executionContext;
-    for (String table : tablesToBeIngested) {
-      String[] tableWithDatabase = table.split("\\.");
-      String database = tableWithDatabase.length > 1 ? tableWithDatabase[0] : 
"default";
-      String currentTable = tableWithDatabase.length > 1 ? 
tableWithDatabase[1] : table;
-      String configProp = Constants.INGESTION_PREFIX + database + 
Constants.DELIMITER + currentTable + Constants.INGESTION_CONFIG_SUFFIX;
-      String configFilePath = properties.getString(configProp, 
Helpers.getDefaultConfigFilePath(configFolder, database, currentTable));
-      checkIfTableConfigFileExists(configFolder, fs, configFilePath);
-      TypedProperties tableProperties = UtilHelpers.readConfig(fs.getConf(), 
new Path(configFilePath), new ArrayList<String>()).getProps();
-      properties.forEach((k, v) -> {
-        if (tableProperties.get(k) == null) {
-          tableProperties.setProperty(k.toString(), v.toString());
-        }
-      });
-      final HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
-      //copy all the values from config to cfg
-      String targetBasePath = resetTarget(config, database, currentTable);
-      Helpers.deepCopyConfigs(config, cfg);
-      String overriddenTargetBasePath = 
tableProperties.getString(Constants.TARGET_BASE_PATH_PROP, "");
-      cfg.targetBasePath = StringUtils.isNullOrEmpty(overriddenTargetBasePath) 
? targetBasePath : overriddenTargetBasePath;
-      if (cfg.enableMetaSync && 
StringUtils.isNullOrEmpty(tableProperties.getString(DataSourceWriteOptions.HIVE_TABLE().key(),
 ""))) {
-        throw new HoodieException("Meta sync table field not provided!");
+    // Parameter SOURCES_TO_BE_BOUND indicates that multiple sources update 
the same target.
+    String sourcesToBeBound = 
properties.getProperty(Constants.SOURCES_TO_BE_BOUND);
+    if (!StringUtils.isNullOrEmpty(sourcesToBeBound)) {
+      // populate the table execution context by traversing source tables
+      List<String> sourcesToBeBonded = getSourcesToBeBound(properties);
+      logger.info("Source tables to be bound via MultiTableDeltaStreamer : " + 
sourcesToBeBonded);
+      String tableToBeIngested = getTableToBeIngested(properties);
+      String[] targetTableWithDataBase = tableToBeIngested.split("\\.");
+      String targetDataBase = targetTableWithDataBase.length > 1 ? 
targetTableWithDataBase[0] : "default";
+      String targetTable = targetTableWithDataBase.length > 1 ? 
targetTableWithDataBase[1] : targetTableWithDataBase[0];
+      String targetBasePath = resetTarget(config, targetDataBase, targetTable);
+
+      for (String source : sourcesToBeBonded) {
+        String[] tableWithDatabase = source.split("\\.");
+        String currentSourceDataBase = tableWithDatabase.length > 1 ? 
tableWithDatabase[0] : "default";
+        String currentSourceTable = tableWithDatabase.length > 1 ? 
tableWithDatabase[1] : source;
+        String configProp = Constants.SOURCE_PREFIX + currentSourceDataBase + 
Constants.PATH_CUR_DIR + currentSourceTable + Constants.INGESTION_CONFIG_SUFFIX;
+        TableExecutionContext executionContext = 
populateTableExecutionContext(properties, configFolder, fs, config, configProp, 
currentSourceDataBase, currentSourceTable, targetBasePath);
+        this.tableExecutionContexts.add(executionContext);
+      }
+    } else {
+      // populate the table execution context by traversing target tables
+      List<String> tablesToBeIngested = getTablesToBeIngested(properties);
+      logger.info("Tables to be ingested via MultiTableDeltaStreamer : " + 
tablesToBeIngested);
+
+      for (String table : tablesToBeIngested) {
+        String[] tableWithDatabase = table.split("\\.");
+        String database = tableWithDatabase.length > 1 ? tableWithDatabase[0] 
: "default";
+        String currentTable = tableWithDatabase.length > 1 ? 
tableWithDatabase[1] : table;
+        String configProp = Constants.INGESTION_PREFIX + database + 
Constants.PATH_CUR_DIR + currentTable + Constants.INGESTION_CONFIG_SUFFIX;
+        TableExecutionContext executionContext = 
populateTableExecutionContext(properties, configFolder, fs, config, configProp, 
database, currentTable, null);
+        this.tableExecutionContexts.add(executionContext);
       }
-      populateSchemaProviderProps(cfg, tableProperties);
-      executionContext = new TableExecutionContext();
-      executionContext.setProperties(tableProperties);
-      executionContext.setConfig(cfg);
-      executionContext.setDatabase(database);
-      executionContext.setTableName(currentTable);
-      this.tableExecutionContexts.add(executionContext);
     }
   }
 
+  private TableExecutionContext populateTableExecutionContext(TypedProperties 
properties, String configFolder,
+      FileSystem fs, Config config, String configProp, String database, String 
currentTable, String targetBasePath) throws IOException {
+    // copy all common properties to current table properties
+    TypedProperties currentTableProperties = 
getCurrentTableProperties(properties, configFolder, fs, configProp, database,
+        currentTable);
+
+    // copy all the values from config to cfg
+    final HoodieDeltaStreamer.Config cfg = new HoodieDeltaStreamer.Config();
+    Helpers.deepCopyConfigs(config, cfg);
+
+    // calculate the value of targetBasePath which is a property of cfg
+    calculateTargetBasePath(config, database, currentTable, targetBasePath, 
currentTableProperties, cfg);
+
+    if (cfg.enableHiveSync && 
StringUtils.isNullOrEmpty(currentTableProperties.getString(DataSourceWriteOptions.HIVE_TABLE().key())))
 {
+      throw new HoodieException("Hive sync table field not provided!");
+    }
+
+    populateSchemaProviderProps(cfg, currentTableProperties);
+    TableExecutionContext executionContext = new TableExecutionContext();
+    executionContext.setProperties(currentTableProperties);
+    executionContext.setConfig(cfg);
+    executionContext.setDatabase(database);
+    executionContext.setTableName(currentTable);
+    return executionContext;
+  }
+
+  private TypedProperties getCurrentTableProperties(TypedProperties 
properties, String configFolder, FileSystem fs,
+      String configProp, String database, String currentTable) throws 
IOException {
+
+    String configFilePath = properties.getString(configProp, 
Helpers.getDefaultConfigFilePath(configFolder, database, currentTable));
+    checkIfTableConfigFileExists(configFolder, fs, configFilePath);
+
+    TypedProperties tableProperties = UtilHelpers.readConfig(fs.getConf(), new 
Path(configFilePath), new ArrayList<>()).getProps();
+    properties.forEach((k, v) -> {
+      if (tableProperties.get(k) == null) {
+        tableProperties.setProperty(k.toString(), v.toString());
+      }
+    });
+
+    return tableProperties;
+  }
+
+  private void calculateTargetBasePath(Config config, String database, String 
currentTable, String targetBasePath,
+      TypedProperties currentTableProperties, HoodieDeltaStreamer.Config cfg) {
+
+    String overriddenTargetBasePath = 
currentTableProperties.getString(Constants.TARGET_BASE_PATH_PROP, "");
+
+    if (StringUtils.isNullOrEmpty(targetBasePath)) {
+      targetBasePath = resetTarget(config, database, currentTable);
+    }
+
+    cfg.targetBasePath = StringUtils.isNullOrEmpty(overriddenTargetBasePath)
+      ? targetBasePath
+      : overriddenTargetBasePath;
+  }
+
+  private List<String> getSourcesToBeBound(TypedProperties properties) {
+    String combinedSourcesString = 
properties.getString(Constants.SOURCES_TO_BE_BOUND, null);
+    return StringUtils.isNullOrEmpty(combinedSourcesString)
+      ? new ArrayList<>()
+      : Arrays.asList(combinedSourcesString.split(Constants.COMMA_SEPARATOR));
+  }
+
+  private String getTableToBeIngested(TypedProperties properties) {

Review comment:
       What is the need for introducing this new function? Can we reuse the existing `getTablesToBeIngested` in your new code as well?




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to