alexeykudinkin commented on code in PR #6358:
URL: https://github.com/apache/hudi/pull/6358#discussion_r1028368761


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java:
##########
@@ -462,37 +466,33 @@ private Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> fetchFromSource
       checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch();
       boolean reconcileSchema = 
props.getBoolean(DataSourceWriteOptions.RECONCILE_SCHEMA().key());
       if (this.userProvidedSchemaProvider != null && 
this.userProvidedSchemaProvider.getTargetSchema() != null) {
-        // If the target schema is specified through Avro schema,
-        // pass in the schema for the Row-to-Avro conversion
-        // to avoid nullability mismatch between Avro schema and Row schema
-        avroRDDOptional = transformed
-            .map(t -> HoodieSparkUtils.createRdd(
-                t, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE, 
reconcileSchema,
-                Option.of(this.userProvidedSchemaProvider.getTargetSchema())
-            ).toJavaRDD());
         schemaProvider = this.userProvidedSchemaProvider;
       } else {
-        // Use Transformed Row's schema if not overridden. If target schema is 
not specified
-        // default to RowBasedSchemaProvider
-        schemaProvider =
-            transformed
-                .map(r -> {
-                  // determine the targetSchemaProvider. use latestTableSchema 
if reconcileSchema is enabled.
-                  SchemaProvider targetSchemaProvider = null;
-                  if (reconcileSchema) {
-                    targetSchemaProvider = 
UtilHelpers.createLatestSchemaProvider(r.schema(), jssc, fs, 
cfg.targetBasePath);
-                  } else {
-                    targetSchemaProvider = 
UtilHelpers.createRowBasedSchemaProvider(r.schema(), props, jssc);
-                  }
-                  return (SchemaProvider) new DelegatingSchemaProvider(props, 
jssc,
-                      dataAndCheckpoint.getSchemaProvider(), 
targetSchemaProvider); })
-                .orElse(dataAndCheckpoint.getSchemaProvider());
-        avroRDDOptional = transformed
-            .map(t -> HoodieSparkUtils.createRdd(
-                t, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE, 
reconcileSchema,
-                Option.ofNullable(schemaProvider.getTargetSchema())
-            ).toJavaRDD());
+        Option<Schema> latestTableSchemaOpt = 
UtilHelpers.getLatestTableSchema(jssc, fs, cfg.targetBasePath);
+        // Deduce proper target (writer's) schema for the transformed dataset, 
reconciling its
+        // schema w/ the table's one
+        Option<Schema> targetSchemaOpt = transformed.map(df -> {
+          Schema sourceSchema = 
AvroConversionUtils.convertStructTypeToAvroSchema(df.schema(),
+              
latestTableSchemaOpt.map(Schema::getFullName).orElse(getAvroRecordQualifiedName(cfg.targetTableName)));
+          // Target (writer's) schema is determined based on the incoming 
source schema
+          // and existing table's one, reconciling the two (if necessary) 
based on configuration
+          return HoodieSparkSqlWriter.deduceWriterSchema(
+                  sourceSchema,
+                  
HoodieConversionUtils.<Schema>toScalaOption(latestTableSchemaOpt),
+                  
HoodieConversionUtils.<InternalSchema>toScalaOption(Option.empty()),

Review Comment:
   We never had support for comprehensive SA from DS. We can take this up as a 
follow-up



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java:
##########
@@ -462,37 +466,33 @@ private Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> fetchFromSource
       checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch();
       boolean reconcileSchema = 
props.getBoolean(DataSourceWriteOptions.RECONCILE_SCHEMA().key());
       if (this.userProvidedSchemaProvider != null && 
this.userProvidedSchemaProvider.getTargetSchema() != null) {
-        // If the target schema is specified through Avro schema,
-        // pass in the schema for the Row-to-Avro conversion
-        // to avoid nullability mismatch between Avro schema and Row schema
-        avroRDDOptional = transformed
-            .map(t -> HoodieSparkUtils.createRdd(
-                t, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE, 
reconcileSchema,
-                Option.of(this.userProvidedSchemaProvider.getTargetSchema())
-            ).toJavaRDD());
         schemaProvider = this.userProvidedSchemaProvider;
       } else {
-        // Use Transformed Row's schema if not overridden. If target schema is 
not specified
-        // default to RowBasedSchemaProvider
-        schemaProvider =
-            transformed
-                .map(r -> {
-                  // determine the targetSchemaProvider. use latestTableSchema 
if reconcileSchema is enabled.
-                  SchemaProvider targetSchemaProvider = null;
-                  if (reconcileSchema) {
-                    targetSchemaProvider = 
UtilHelpers.createLatestSchemaProvider(r.schema(), jssc, fs, 
cfg.targetBasePath);
-                  } else {
-                    targetSchemaProvider = 
UtilHelpers.createRowBasedSchemaProvider(r.schema(), props, jssc);
-                  }
-                  return (SchemaProvider) new DelegatingSchemaProvider(props, 
jssc,
-                      dataAndCheckpoint.getSchemaProvider(), 
targetSchemaProvider); })
-                .orElse(dataAndCheckpoint.getSchemaProvider());
-        avroRDDOptional = transformed
-            .map(t -> HoodieSparkUtils.createRdd(
-                t, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE, 
reconcileSchema,
-                Option.ofNullable(schemaProvider.getTargetSchema())
-            ).toJavaRDD());
+        Option<Schema> latestTableSchemaOpt = 
UtilHelpers.getLatestTableSchema(jssc, fs, cfg.targetBasePath);
+        // Deduce proper target (writer's) schema for the transformed dataset, 
reconciling its
+        // schema w/ the table's one
+        Option<Schema> targetSchemaOpt = transformed.map(df -> {
+          Schema sourceSchema = 
AvroConversionUtils.convertStructTypeToAvroSchema(df.schema(),
+              
latestTableSchemaOpt.map(Schema::getFullName).orElse(getAvroRecordQualifiedName(cfg.targetTableName)));
+          // Target (writer's) schema is determined based on the incoming 
source schema
+          // and existing table's one, reconciling the two (if necessary) 
based on configuration
+          return HoodieSparkSqlWriter.deduceWriterSchema(
+                  sourceSchema,
+                  
HoodieConversionUtils.<Schema>toScalaOption(latestTableSchemaOpt),
+                  
HoodieConversionUtils.<InternalSchema>toScalaOption(Option.empty()),
+                  HoodieConversionUtils.fromProperties(props));
+        });

Review Comment:
   I don't think so -- we now do schema validation every time we write, so any schema inconsistencies will be weeded out early on.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to