pratyakshsharma commented on a change in pull request #1566:
URL: https://github.com/apache/hudi/pull/1566#discussion_r495499116
##########
File path:
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
##########
@@ -536,14 +564,35 @@ public void syncHive(HiveConf conf) {
* SchemaProvider creation is a precursor to HoodieWriteClient and AsyncCompactor creation. This method takes care of
* this constraint.
*/
- private void setupWriteClient() {
- LOG.info("Setting up Hoodie Write Client");
- if ((null != schemaProvider) && (null == writeClient)) {
- registerAvroSchemas(schemaProvider);
- HoodieWriteConfig hoodieCfg = getHoodieClientConfig(schemaProvider);
- writeClient = new HoodieWriteClient<>(jssc, hoodieCfg, true);
- onInitializingHoodieWriteClient.apply(writeClient);
+ public void setupWriteClient() throws IOException {
+ if ((null != schemaProvider)) {
+ Schema sourceSchema = schemaProvider.getSourceSchema();
+ Schema targetSchema = schemaProvider.getTargetSchema();
+ createNewWriteClient(sourceSchema, targetSchema);
+ }
+ }
+
+ private void createNewWriteClient(Schema sourceSchema, Schema targetSchema) throws IOException {
+ LOG.info("Setting up new Hoodie Write Client");
+ registerAvroSchemas(sourceSchema, targetSchema);
+ HoodieWriteConfig hoodieCfg = getHoodieClientConfig(targetSchema);
+ if (hoodieCfg.isEmbeddedTimelineServerEnabled()) {
+ if (!embeddedTimelineService.isPresent()) {
+ embeddedTimelineService = EmbeddedTimelineServerHelper.createEmbeddedTimelineService(jssc.hadoopConfiguration(),
+ jssc.getConf(), hoodieCfg);
+ } else {
+ EmbeddedTimelineServerHelper.updateWriteConfigWithTimelineServer(embeddedTimelineService.get(), hoodieCfg);
Review comment:
As per my understanding, in every iteration of deltaStreamer only the
target schema can change; the rest of the HoodieWriteConfig remains the same.
Rather than creating a new hoodieCfg variable in this method every time the
schema changes, can we have a class-level HoodieWriteConfig variable, where we
only update the schema whenever it changes? This way we can do away with the
`updateWriteConfigWithTimelineServer` call here. WDYT?
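
To make the suggestion concrete, here is a rough sketch of what I have in
mind. It deliberately uses stand-in classes rather than the real Hudi types:
`SketchWriteConfig`, `SketchDeltaSync`, `configFor` and the property key are
all hypothetical names made up for illustration, and the real
HoodieWriteConfig may not expose its schema or properties this way.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;

/** Hypothetical stand-in for HoodieWriteConfig; NOT the real Hudi class. */
final class SketchWriteConfig {
  private final Map<String, String> props = new HashMap<>();
  private String schema;

  SketchWriteConfig(Map<String, String> baseProps, String schema) {
    this.props.putAll(baseProps);
    this.schema = schema;
  }

  String getSchema() {
    return schema;
  }

  void setSchema(String schema) {
    this.schema = schema;
  }

  String get(String key) {
    return props.get(key);
  }

  void set(String key, String value) {
    props.put(key, value);
  }
}

/** Hypothetical stand-in for the part of DeltaSync that owns the config. */
final class SketchDeltaSync {
  // Class-level config: built once, then only its schema is updated.
  private SketchWriteConfig writeConfig;
  private int configBuilds = 0;

  SketchWriteConfig configFor(String targetSchema) {
    if (writeConfig == null) {
      // First iteration: build the full config exactly once.
      writeConfig = new SketchWriteConfig(new HashMap<>(), targetSchema);
      configBuilds++;
    } else if (!Objects.equals(writeConfig.getSchema(), targetSchema)) {
      // Later iterations: only the schema changes; every other setting
      // (including anything applied for the timeline server) is retained.
      writeConfig.setSchema(targetSchema);
    }
    return writeConfig;
  }

  int getConfigBuilds() {
    return configBuilds;
  }
}
```

Because the same config object is reused, whatever the timeline server setup
wrote into it on the first iteration is still there after a schema change, so
there would be nothing to re-apply. Whether this maps cleanly onto the real
classes depends on HoodieWriteConfig allowing its schema to be updated in
place, which I have not verified.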