pratyakshsharma commented on a change in pull request #4925:
URL: https://github.com/apache/hudi/pull/4925#discussion_r820212569
##########
File path: hudi-utilities/src/test/java/org/apache/hudi/utilities/functional/TestHoodieMultiTableDeltaStreamer.java
##########
@@ -177,6 +182,183 @@ public void testMultiTableExecutionWithKafkaSource() throws IOException {
     testNum++;
   }
+  @Test
+  public void testMultiTableExecutionWithKafkaSourceWhenOneSinkTableBoundMultiSources() throws IOException {
+    // create topics for each table
+    String topicName1 = "topic" + testNum++;
+    String topicName2 = "topic" + testNum;
+    testUtils.createTopic(topicName1, 2);
+    testUtils.createTopic(topicName2, 2);
+
+    HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator();
+    testUtils.sendMessages(topicName1, Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 5, HoodieTestDataGenerator.TRIP_SCHEMA)));
+    testUtils.sendMessages(topicName2, Helpers.jsonifyRecords(dataGenerator.generateInsertsAsPerSchema("000", 10, HoodieTestDataGenerator.SHORT_TRIP_SCHEMA)));
+
+    HoodieMultiTableDeltaStreamer.Config cfg = TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE1, dfsBasePath + "/config", JsonKafkaSource.class.getName(), false, false);
+    String commonPropsFile = cfg.propsFilePath;
+    FileSystem fs = FSUtils.getFs(commonPropsFile, jsc.hadoopConfiguration());
+    TypedProperties commonProperties = UtilHelpers.readConfig(fs.getConf(), new Path(commonPropsFile), new ArrayList<>()).getProps();
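+    // bind both topics to the single sink table, and point each topic at its own source config file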
+    commonProperties.setProperty("hoodie.deltastreamer.ingestion.tablesToBeIngested", "sink_table");
+    commonProperties.setProperty("hoodie.deltastreamer.source.sourcesBoundTo.sink_table", topicName2 + "," + topicName1);
+    commonProperties.setProperty("hoodie.deltastreamer.source.default." + topicName2 + ".configFile", dfsBasePath + "/" + PROPS_FILENAME_TEST_SOURCE1);
+    commonProperties.setProperty("hoodie.deltastreamer.source.default." + topicName1 + ".configFile", dfsBasePath + "/" + PROPS_FILENAME_TEST_SOURCE1);
+    commonProperties.setProperty("hoodie.deltastreamer.source.kafka.enable.commit.offset", "true");
+    UtilitiesTestBase.Helpers.savePropsToDFS(commonProperties, fs, dfsBasePath + "/" + PROPS_FILENAME_TEST_SOURCE2);
+
+    HoodieMultiTableDeltaStreamer.Config cfg1 = TestHelpers.getConfig(PROPS_FILENAME_TEST_SOURCE2, dfsBasePath + "/config", JsonKafkaSource.class.getName(), false, false);
+    // enable fetching data from multiple sources for a single sink table
+    cfg1.allowFetchFromMultipleSources = true;
+    cfg1.allowContinuousWhenMultipleSources = true;
+    HoodieMultiTableDeltaStreamer streamer = new HoodieMultiTableDeltaStreamer(cfg1, jsc);
+    List<TableExecutionContext> executionContexts = streamer.getTableExecutionContexts();
+    TypedProperties properties = executionContexts.get(1).getProperties();
+    properties.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source_uber.avsc");
+    properties.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target_uber.avsc");
+    properties.setProperty("hoodie.deltastreamer.source.kafka.topic", topicName2);
+    properties.setProperty("hoodie.deltastreamer.current.source.name", topicName2);
+    properties.setProperty("hoodie.deltastreamer.current.source.checkpoint", topicName2 + ",0:0,1:0");
+    executionContexts.get(1).setProperties(properties);
+
+    TypedProperties properties1 = executionContexts.get(0).getProperties();
+    properties1.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source_short_trip_uber.avsc");
+    properties1.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/target_short_trip_uber.avsc");
+    properties1.setProperty("hoodie.deltastreamer.source.kafka.topic", topicName1);
+    properties1.setProperty("hoodie.deltastreamer.current.source.name", topicName1);
+    properties1.setProperty("hoodie.deltastreamer.current.source.checkpoint", topicName1 + ",0:0,1:0");
+    executionContexts.get(0).setProperties(properties1);
+
+    String targetBasePath1 = executionContexts.get(0).getConfig().targetBasePath;
+    String targetBasePath2 = executionContexts.get(1).getConfig().targetBasePath;
+    streamer.sync();
+    // targetBasePath1 and targetBasePath2 should be equal because there is only one sink table
+    assertEquals(targetBasePath1, targetBasePath2);
+    TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(15, targetBasePath1 + "/*/*.parquet", sqlContext);
+
+    // insert updates for already existing records in kafka topics
+    testUtils.sendMessages(topicName1, Helpers.jsonifyRecords(dataGenerator.generateUpdatesAsPerSchema("001", 5, HoodieTestDataGenerator.TRIP_SCHEMA)));
+    testUtils.sendMessages(topicName2, Helpers.jsonifyRecords(dataGenerator.generateUpdatesAsPerSchema("001", 10, HoodieTestDataGenerator.SHORT_TRIP_SCHEMA)));
+
+    streamer = new HoodieMultiTableDeltaStreamer(cfg1, jsc);
+    streamer.getTableExecutionContexts().get(1).setProperties(properties);
+    streamer.getTableExecutionContexts().get(0).setProperties(properties1);
+    streamer.sync();
+
+    assertEquals(2, streamer.getSuccessTables().size());
+    assertTrue(streamer.getFailedTables().isEmpty());
+
+    // assert the record count matches now
+    TestHoodieDeltaStreamer.TestHelpers.assertRecordCount(15, targetBasePath1 + "/*/*.parquet", sqlContext);
+    testNum++;
+  }
+
+  @Test
+  public void testMultiTableExecutionWithKafkaSourceWhenMultiSinkTablesBoundMultiSources() throws IOException {
Review comment:
Once the suggested changes are incorporated, we can actually remove
`testMultiTableExecutionWithKafkaSourceWhenOneSinkTableBoundMultiSources` and
have a single test case covering the entire scenario.
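For instance, something along these lines could work (just a rough sketch, assuming JUnit 5 parameterized tests are available in this module; `runMultiTableKafkaSourceTest` is a hypothetical helper holding the shared topic setup, property wiring, sync, and assertions from the two current tests):

```java
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ValueSource;

@ParameterizedTest
@ValueSource(ints = {1, 2})
public void testMultiTableExecutionWithKafkaSourceBoundToMultipleSources(int numSinkTables) throws IOException {
  // numSinkTables == 1 exercises the one-sink-table scenario,
  // numSinkTables == 2 the multi-sink-tables scenario.
  runMultiTableKafkaSourceTest(numSinkTables);
}
```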
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]