jonvex commented on code in PR #9743:
URL: https://github.com/apache/hudi/pull/9743#discussion_r1361283440


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/StreamSync.java:
##########
@@ -661,6 +652,35 @@ private Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> fetchFromSourc
     return Pair.of(schemaProvider, Pair.of(checkpointStr, records));
   }
 
+  /**
+   * Apply schema reconcile and schema evolution rules(schema on read) and generate new target schema provider.
+   *
+   * @param incomingSchema schema of the source data
+   * @param sourceSchemaProvider Source schema provider.
+   * @return the SchemaProvider that can be used as writer schema.
+   */
+  private SchemaProvider getDeducedSchemaProvider(Schema incomingSchema, SchemaProvider sourceSchemaProvider) {
+    Option<Schema> latestTableSchemaOpt = UtilHelpers.getLatestTableSchema(hoodieSparkContext.jsc(), fs, cfg.targetBasePath);
+    HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(new Configuration(fs.getConf()))
+        .setBasePath(cfg.targetBasePath)
+        .setPayloadClassName(cfg.payloadClassName)
+        .build();
+    Option<InternalSchema> internalSchemaOpt = HoodieConversionUtils.toJavaOption(
+        HoodieSchemaUtils.getLatestTableInternalSchema(
+            new HoodieConfig(HoodieStreamer.Config.getProps(fs, cfg)), metaClient));
+    // Deduce proper target (writer's) schema for the input dataset, reconciling its
+    // schema w/ the table's one
+    Schema targetSchema = HoodieSparkSqlWriter.deduceWriterSchema(
+          incomingSchema,
+          HoodieConversionUtils.toScalaOption(latestTableSchemaOpt),
+          HoodieConversionUtils.toScalaOption(internalSchemaOpt),
+          HoodieConversionUtils.fromProperties(props));
+
+    // Override schema provider with the reconciled target schema
+    return new DelegatingSchemaProvider(props, hoodieSparkContext.jsc(), sourceSchemaProvider,

Review Comment:
   In some cases the incoming schema comes from the DataFrame rather than from
   the SchemaProvider, so we can't simply return the source SchemaProvider when
   deduceWriterSchema leaves the incoming schema unchanged.
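
   To illustrate the concern, here is a minimal, self-contained sketch. It does
   not use Hudi's classes: `SimpleSchemaProvider`, `fixed`, `delegating`,
   `shortcut` and `alwaysWrap` are hypothetical names, schemas are modelled as
   plain strings, and the stand-in `deduceWriterSchema` is simply the identity.
   It only shows why "return the source provider when nothing changed" can
   report the wrong target schema when the incoming schema came from the
   DataFrame.

```java
// Hypothetical stand-ins only: a schema is a plain String here, and
// SimpleSchemaProvider is NOT Hudi's SchemaProvider.
final class DeducedSchemaSketch {

  interface SimpleSchemaProvider {
    String getSourceSchema();
    String getTargetSchema();
  }

  /** A provider whose source and target schema are the same fixed value. */
  static SimpleSchemaProvider fixed(String schema) {
    return new SimpleSchemaProvider() {
      @Override public String getSourceSchema() { return schema; }
      @Override public String getTargetSchema() { return schema; }
    };
  }

  /** Delegates the source schema but reports an explicitly supplied target schema. */
  static SimpleSchemaProvider delegating(SimpleSchemaProvider source, String targetSchema) {
    return new SimpleSchemaProvider() {
      @Override public String getSourceSchema() { return source.getSourceSchema(); }
      @Override public String getTargetSchema() { return targetSchema; }
    };
  }

  /** Stand-in for the reconcile step; assumed to leave the schema unchanged. */
  static String deduceWriterSchema(String incomingSchema) {
    return incomingSchema;
  }

  /** Tempting shortcut: reuse the source provider when reconciliation is a no-op. */
  static SimpleSchemaProvider shortcut(String incomingSchema, SimpleSchemaProvider source) {
    String target = deduceWriterSchema(incomingSchema);
    return target.equals(incomingSchema) ? source : delegating(source, target);
  }

  /** Always wrap, so the target schema reflects what was actually read. */
  static SimpleSchemaProvider alwaysWrap(String incomingSchema, SimpleSchemaProvider source) {
    return delegating(source, deduceWriterSchema(incomingSchema));
  }
}
```

   With a provider that declares `id:int` and a DataFrame whose schema is
   `id:int,name:string`, `shortcut` hands back a provider that still reports
   the provider's schema as the target, while `alwaysWrap` reports the
   DataFrame's schema.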


