pratyakshsharma commented on a change in pull request #4925:
URL: https://github.com/apache/hudi/pull/4925#discussion_r820212361



##########
File path: 
hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java
##########
@@ -177,6 +182,183 @@ public void testMultiTableExecutionWithKafkaSource() 
throws IOException {
     testNum++;
   }
 
+  @Test
+  public void 
testMultiTableExecutionWithKafkaSourceWhenOneSinkTableBoundMultiSources() 
throws IOException {
+    //create topics for each table
+    String topicName1 = "topic" + testNum++;
+    String topicName2 = "topic" + testNum;
+    testUtils.createTopic(topicName1, 2);
+    testUtils.createTopic(topicName2, 2);
+
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+    testUtils.sendMessages(topicName1, 
Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 5, 
HoodieTestDataGenerator.TRIP_SCHEMA)));
+    testUtils.sendMessages(topicName2, 
Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 10, 
HoodieTestDataGenerator.SHORT_TRIP_SCHEMA)));
+
+    HoodieMultiTableDeltaStreamer.Config cfg = 
TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config", 
JsonKafkaSource.class.getName(), false, false);
+    String commonPropsFile = cfg.propsFilePath;
+    FileSystem fs = FSUtils.getFs(commonPropsFile, jsc.hadoopConfiguration());
+    TypedProperties commonProperties = UtilHelpers.readConfig(fs.getConf(), 
new Path(commonPropsFile), new ArrayList<>()).getProps();
+    
commonProperties.setProperty("hoodie.deltastreamer.ingestion.tablesToBeIngested",
 "sink_table");
+    commonProperties.setProperty("hoodie.deltastreamer.source.sourcesBoundTo" 
+ "." + "sink_table", topicName2 + "," + topicName1);
+    commonProperties.setProperty("hoodie.deltastreamer.source.default." + 
topicName2 + ".configFile", dfsBasePath + "/" + PROPS_FILENAME_TEST_SOURCE1);
+    commonProperties.setProperty("hoodie.deltastreamer.source.default." + 
topicName1 + ".configFile", dfsBasePath + "/" + PROPS_FILENAME_TEST_SOURCE1);
+    
commonProperties.setProperty("hoodie.deltastreamer.source.kafka.enable.commit.offset",
 "true");
+    UtilitiesTestBase.Helpers.savePropsToDFS(commonProperties, fs, dfsBasePath 
+ "/" + PROPS_FILENAME_TEST_SOURCE2);
+
+    HoodieMultiTableDeltaStreamer.Config cfg1 = 
TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE2, dfsBasePath + "/config", 
JsonKafkaSource.class.getName(), false, false);
+    // set the mode that fetch data from multi sources is true
+    cfg1.allowFetchFromMultipleSources = true;
+    cfg1.allowContinuousWhenMultipleSources = true;
+    HoodieMultiTableDeltaStreamer streamer = new 
HoodieMultiTableDeltaStreamer(cfg1, jsc);
+    List<TableExecutionContext> executionContexts = 
streamer.getTableExecutionContexts();
+    TypedProperties properties = executionContexts.get(1).getProperties();
+    
properties.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file",
 dfsBasePath + "/source_uber.avsc");
+    
properties.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file",
 dfsBasePath + "/target_uber.avsc");
+    properties.setProperty("hoodie.deltastreamer.source.kafka.topic", 
topicName2);
+    properties.setProperty("hoodie.deltastreamer.current.source.name", 
topicName2);
+    properties.setProperty("hoodie.deltastreamer.current.source.checkpoint", 
topicName2 + ",0:0,1:0");
+    executionContexts.get(1).setProperties(properties);
+
+    TypedProperties properties1 = executionContexts.get(0).getProperties();
+    
properties1.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file",
 dfsBasePath + "/source_short_trip_uber.avsc");
+    
properties1.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file",
 dfsBasePath + "/target_short_trip_uber.avsc");
+    properties1.setProperty("hoodie.deltastreamer.source.kafka.topic", 
topicName1);
+    properties1.setProperty("hoodie.deltastreamer.current.source.name", 
topicName1);
+    properties1.setProperty("hoodie.deltastreamer.current.source.checkpoint", 
topicName1 + ",0:0,1:0");
+    executionContexts.get(0).setProperties(properties1);
+
+    String targetBasePath1 = 
executionContexts.get(0).getConfig().targetBasePath;
+    String targetBasePath2 = 
executionContexts.get(1).getConfig().targetBasePath;
+    streamer.sync();
+    // assert whether targetBasePath1 equals targetBasePath2 because that 
there is only one sink table.
+    assertEquals(targetBasePath1, targetBasePath2);
+    TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(15, targetBasePath1 
+ "/*/*.parquet", sqlContext);
+
+    //insert updates for already existing records in kafka topics
+    testUtils.sendMessages(topicName1, 
Helpers.jsonifyRecords(dataGenerator.generateUpdatesAsPerSchema("001", 5, 
HoodieTestDataGenerator.TRIP_SCHEMA)));
+    testUtils.sendMessages(topicName2, 
Helpers.jsonifyRecords(dataGenerator.generateUpdatesAsPerSchema("001", 10, 
HoodieTestDataGenerator.SHORT_TRIP_SCHEMA)));
+
+    streamer = new HoodieMultiTableDeltaStreamer(cfg1, jsc);
+    streamer.getTableExecutionContexts().get(1).setProperties(properties);
+    streamer.getTableExecutionContexts().get(0).setProperties(properties1);
+    streamer.sync();
+
+    assertEquals(2, streamer.getSuccessTables().size());
+    assertTrue(streamer.getFailedTables().isEmpty());
+
+    //assert the record count matches now
+    TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(15, targetBasePath1 
+ "/*/*.parquet", sqlContext);
+    testNum++;
+  }
+
+  @Test
+  public void 
testMultiTableExecutionWithKafkaSourceWhenMultiSinkTablesBoundMultiSources() 
throws IOException {

Review comment:
       In line with the difference of schema comment here - 
https://github.com/apache/hudi/pull/4925#issuecomment-1055454800, I feel it 
will be better to modify this test case to include the below scenario ->
   1. sink_table1 bound to 2 topics
   2. sink_table2 bound to a single topic
   3. for sink_table1, let us have different schemas for the 2 source topics, I 
do not intend to have big differences in their schemas, absence of a single 
field in one of the schemas should suffice. Let us show the use case of 
matching the schemas to the target schema as discussed in the quoted comment. 
   
   This should help users in understanding the kind of use cases covered in 
this PR. Later on we can either update the existing blog or write a new one to 
document this feature properly for everyone. Let me know in case of any doubts.




-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to