bvaradar commented on a change in pull request #1566:
URL: https://github.com/apache/incubator-hudi/pull/1566#discussion_r416220182
##########
File path: hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java
##########
@@ -162,18 +162,23 @@ public DeltaSync(HoodieDeltaStreamer.Config cfg, SparkSession sparkSession, Sche
this.fs = fs;
this.onInitializingHoodieWriteClient = onInitializingHoodieWriteClient;
this.props = props;
- this.schemaProvider = schemaProvider;
refreshTimeline();
-
this.transformer = UtilHelpers.createTransformer(cfg.transformerClassNames);
this.keyGenerator = DataSourceUtils.createKeyGenerator(props);
-
- this.formatAdapter = new SourceFormatAdapter(
- UtilHelpers.createSource(cfg.sourceClassName, props, jssc, sparkSession, schemaProvider));
-
this.conf = conf;
+ refreshSchemaProvider(schemaProvider);
+ }
+ /**
+ * Very useful when DeltaStreamer is running in continuous mode.
+ * @param schemaProvider
+ * @throws IOException
+ */
+ public void refreshSchemaProvider(SchemaProvider schemaProvider) throws IOException {
Review comment:
@pratyakshsharma : It looks like refreshSchemaProvider not only
refreshes the schema provider but also recreates the Source and sets up the
WriteClient.
@vinothchandar : Recreating DeltaSync on each run would require handling
embedded timeline server reuse and async compaction triggering differently.
Another option is to add an explicit refreshSchema() API to SchemaProvider,
with a default implementation for compatibility and an actual refresh
implemented in the existing SchemaProvider implementations, and have
delta-streamer call it. Let me know your thoughts on this.
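
To make the second option concrete, here is a rough sketch. It assumes a
simplified stand-in SchemaProvider that returns the schema as a String; it is
not the actual Hudi class. The names refreshSchema(), StaticSchemaProvider,
ReloadingSchemaProvider and syncOnce() are hypothetical and only illustrate
the shape of the API.

```java
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Supplier;

class RefreshSchemaSketch {

    /** Simplified stand-in for a schema provider; NOT Hudi's actual class. */
    abstract static class SchemaProvider {
        abstract String getSourceSchema();

        /** Hypothetical hook. The default is a no-op so existing providers keep working. */
        void refreshSchema() {
            // nothing to refresh by default
        }
    }

    /** Provider with a fixed schema; it inherits the no-op refresh. */
    static class StaticSchemaProvider extends SchemaProvider {
        private final String schema;

        StaticSchemaProvider(String schema) {
            this.schema = schema;
        }

        @Override
        String getSourceSchema() {
            return schema;
        }
    }

    /** Provider that re-fetches its schema from an external lookup on refresh. */
    static class ReloadingSchemaProvider extends SchemaProvider {
        private final Supplier<String> fetcher;
        private String cached;

        ReloadingSchemaProvider(Supplier<String> fetcher) {
            this.fetcher = fetcher;
            this.cached = fetcher.get();
        }

        @Override
        String getSourceSchema() {
            return cached;
        }

        @Override
        void refreshSchema() {
            cached = fetcher.get();
        }
    }

    /** Stand-in for one sync round: refresh first, then read the schema. */
    static String syncOnce(SchemaProvider provider) {
        provider.refreshSchema();
        return provider.getSourceSchema();
    }

    public static void main(String[] args) {
        // The AtomicReference plays the role of an external schema registry.
        AtomicReference<String> registry = new AtomicReference<>("v1");
        SchemaProvider reloading = new ReloadingSchemaProvider(registry::get);
        SchemaProvider fixed = new StaticSchemaProvider("v1");

        System.out.println(syncOnce(reloading)); // v1
        registry.set("v2");                      // schema evolves between rounds
        System.out.println(syncOnce(reloading)); // v2
        System.out.println(syncOnce(fixed));     // v1, the no-op refresh changes nothing
    }
}
```

With this shape the caller keeps the same provider, Source and WriteClient
across rounds and only asks the provider to reload its schema, so the embedded
timeline server and async compaction wiring would not need to be touched.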