nsivabalan commented on code in PR #6358:
URL: https://github.com/apache/hudi/pull/6358#discussion_r1027173356


##########
hudi-common/src/main/java/org/apache/hudi/common/table/log/HoodieLogFileReader.java:
##########
@@ -243,6 +243,18 @@ private HoodieLogBlock readBlock() throws IOException {
     }
   }
 
+  private Option<Schema> getTargetReaderSchemaForBlock() {
+    // We should use the write schema to read the log file: after a DDL operation the
+    // readerSchema may differ from the writeSchema, and the Avro reader will throw an exception.
+    // e.g. the original writeSchema is "a string, b double"; after we add a new column the
+    // readerSchema becomes "a string, c int, b double", so it is wrong to use the readerSchema
+    // to read an old log file.
+    // After we read those records with the writeSchema, we rewrite them with the readerSchema
+    // in AbstractHoodieLogRecordReader.
+    if (internalSchema.isEmptySchema()) {
+      return Option.ofNullable(this.readerSchema);
+    } else {
+      return Option.empty();

Review Comment:
   even w/o enabling comprehensive schema evolution, one can evolve the schema w/ Hudi, right?
   So, in such a case, we will still be reading a record written in an older schema w/ a newer schema. Is that ok?
   
   Because, in AbstractHoodieLogRecordReader, within composeEvolvedSchemaTransformer, we add a transformer to rewrite the record only if comprehensive schema evolution is enabled (i.e. internalSchema is non-empty).

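   To make the DDL example in the code comment concrete, here is a toy model (plain Java, not Hudi or Avro library code; all names are made up) of how Avro-style schema resolution matches fields by name, and why a reader-only field needs a default when old log records are read with the newer schema:

```java
import java.util.*;

// Toy sketch of Avro-style schema resolution: a record written with an older
// writer schema is projected onto a newer reader schema. Fields are matched by
// name, not position; a reader-only field must supply a default, otherwise
// resolution fails -- which is why the log reader prefers the writer's schema
// and rewrites records to the reader schema afterwards.
public class SchemaResolutionSketch {
  public static Map<String, Object> resolve(Map<String, Object> record,
                                            List<String> readerFields,
                                            Map<String, Object> readerDefaults) {
    Map<String, Object> out = new LinkedHashMap<>();
    for (String field : readerFields) {
      if (record.containsKey(field)) {
        out.put(field, record.get(field));           // matched by name
      } else if (readerDefaults.containsKey(field)) {
        out.put(field, readerDefaults.get(field));   // reader-only field with default
      } else {
        throw new IllegalStateException("No default for new field: " + field);
      }
    }
    return out;
  }

  public static void main(String[] args) {
    // writer schema: "a string, b double"; reader schema adds "c int" (with default)
    Map<String, Object> old = new LinkedHashMap<>();
    old.put("a", "x");
    old.put("b", 1.5);
    Map<String, Object> resolved =
        resolve(old, Arrays.asList("a", "c", "b"), Collections.singletonMap("c", 0));
    System.out.println(resolved); // {a=x, c=0, b=1.5}
  }
}
```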


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/schema/SparkAvroPostProcessor.java:
##########
@@ -40,8 +41,13 @@ public SparkAvroPostProcessor(TypedProperties props, JavaSparkContext jssc) {
 
   @Override
   public Schema processSchema(Schema schema) {
-    return schema != null ? AvroConversionUtils.convertStructTypeToAvroSchema(
-        AvroConversionUtils.convertAvroSchemaToStructType(schema), RowBasedSchemaProvider.HOODIE_RECORD_STRUCT_NAME,
-        RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE) : null;
+    if (schema == null) {
+      return null;
+    }
+
+    StructType structType = AvroConversionUtils.convertAvroSchemaToStructType(schema);
+    // NOTE: It's critical that we preserve incoming schema's qualified record-name to make
+    //       sure we maintain schema's compatibility (as defined by [[AvroSchemaCompatibility]])
+    return AvroConversionUtils.convertStructTypeToAvroSchema(structType, schema.getFullName());

Review Comment:
   is this backwards compatible? i.e. if someone already had a pipeline w/ 0.12.0 (where the schema was using RowBasedSchemaProvider.HOODIE_RECORD_NAMESPACE), then once they start using 0.13.0, the schema's qualified record name is gonna be different. 
   
   so, just to make sure: all our schema backwards-compatibility checks are resilient to this, right? 

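   To illustrate the record-name concern above: Avro's compatibility/resolution rules match record schemas by their qualified full name first (with aliases as the escape hatch), so a rename can make otherwise-identical schemas look incompatible. A toy sketch (not the real AvroSchemaCompatibility implementation; the names in main are hypothetical):

```java
import java.util.Set;

// Mirrors only the name-matching step of Avro schema resolution: the writer's
// full name must equal the reader's full name, or appear among the reader's
// declared aliases, before field-level checks even run.
public class NameCompatSketch {
  public static boolean namesMatch(String writerFullName, String readerFullName,
                                   Set<String> readerAliases) {
    return writerFullName.equals(readerFullName) || readerAliases.contains(writerFullName);
  }

  public static void main(String[] args) {
    // Hypothetical names for illustration only
    String oldName = "hoodie.source.hoodie_source_record";
    String newName = "com.example.events.ClickEvent";
    System.out.println(namesMatch(oldName, newName, Set.of()));        // false: rename breaks the match
    System.out.println(namesMatch(oldName, newName, Set.of(oldName))); // true: alias restores it
  }
}
```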


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java:
##########
@@ -462,37 +466,33 @@ private Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> fetchFromSourc
      checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch();
      boolean reconcileSchema = props.getBoolean(DataSourceWriteOptions.RECONCILE_SCHEMA().key());
      if (this.userProvidedSchemaProvider != null && this.userProvidedSchemaProvider.getTargetSchema() != null) {
-        // If the target schema is specified through Avro schema,
-        // pass in the schema for the Row-to-Avro conversion
-        // to avoid nullability mismatch between Avro schema and Row schema
-        avroRDDOptional = transformed
-            .map(t -> HoodieSparkUtils.createRdd(
-                t, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE, reconcileSchema,
-                Option.of(this.userProvidedSchemaProvider.getTargetSchema())
-            ).toJavaRDD());
         schemaProvider = this.userProvidedSchemaProvider;
       } else {
-        // Use Transformed Row's schema if not overridden. If target schema is not specified
-        // default to RowBasedSchemaProvider
-        schemaProvider =
-            transformed
-                .map(r -> {
-                  // determine the targetSchemaProvider. use latestTableSchema if reconcileSchema is enabled.
-                  SchemaProvider targetSchemaProvider = null;
-                  if (reconcileSchema) {
-                    targetSchemaProvider = UtilHelpers.createLatestSchemaProvider(r.schema(), jssc, fs, cfg.targetBasePath);
-                  } else {
-                    targetSchemaProvider = UtilHelpers.createRowBasedSchemaProvider(r.schema(), props, jssc);
-                  }
-                  return (SchemaProvider) new DelegatingSchemaProvider(props, jssc,
-                      dataAndCheckpoint.getSchemaProvider(), targetSchemaProvider); })
-                .orElse(dataAndCheckpoint.getSchemaProvider());
-        avroRDDOptional = transformed
-            .map(t -> HoodieSparkUtils.createRdd(
-                t, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE, reconcileSchema,
-                Option.ofNullable(schemaProvider.getTargetSchema())
-            ).toJavaRDD());
+        Option<Schema> latestTableSchemaOpt = UtilHelpers.getLatestTableSchema(jssc, fs, cfg.targetBasePath);
+        // Deduce proper target (writer's) schema for the transformed dataset, reconciling its
+        // schema w/ the table's one
+        Option<Schema> targetSchemaOpt = transformed.map(df -> {
+          Schema sourceSchema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema(),
+              latestTableSchemaOpt.map(Schema::getFullName).orElse(getAvroRecordQualifiedName(cfg.targetTableName)));
+          // Target (writer's) schema is determined based on the incoming source schema
+          // and existing table's one, reconciling the two (if necessary) based on configuration
+          return HoodieSparkSqlWriter.deduceWriterSchema(

Review Comment:
   can we move deduceWriterSchema to SchemaUtils (a new class) or HoodieSparkUtils (instead of HoodieSparkSqlWriter) or some other class, since it's being used across all write paths. 



##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala:
##########
@@ -57,10 +57,19 @@ class ExpressionPayload(record: GenericRecord,
   }
 
   /**
-   * The schema of this table.
+   * Target schema used for writing records into the table
    */
   private var writeSchema: Schema = _
 
+  /**
+   * Original record's schema
+   *
+   * NOTE: To avoid the excessive overhead of serializing the original record's Avro schema along
+   *       w/ _every_ record, we instead require it to be provided along with every request
+   *       that needs this record to be deserialized
+   */
+  private var recordSchema: Schema = _

Review Comment:
   One difference I spot b/w other payloads (OverwriteWithLatest, etc.) and ExpressionPayload is the usage of these reader and writer schemas as instance variables. So, once we call combineAndGetUpdateValue() and the call completes, these schemas will add to the payload size in memory, which is not the case w/ other payloads. 
   
   So, for large datasets w/ 100s of cols, this might pose a problem. 
   Can we make them local variables? Just that we might re-init these schemas whenever combineAndGetUpdateValue() or getInsertValue() is called; if not, we initialize only once. 
   

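   One possible shape for the "local variables" suggestion (a sketch only, not ExpressionPayload code; names are hypothetical): keep no parsed schema per payload instance and intern parsed schemas in a JVM-wide static cache keyed by the schema string, so re-initialization per call stays cheap while a million payload instances share one parsed object instead of each holding their own:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Sketch: payloads hold at most the schema *string*; the parsed form is
// resolved on demand from a shared cache, keeping per-payload memory flat.
public class SchemaInternSketch {
  private static final Map<String, Object> PARSED = new ConcurrentHashMap<>();

  // Stand-in for new Schema.Parser().parse(json); returns the same cached
  // instance for the same schema string.
  public static Object parseCached(String schemaJson) {
    return PARSED.computeIfAbsent(schemaJson, s -> new Object());
  }
}
```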


##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java:
##########
@@ -462,37 +466,33 @@ private Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> fetchFromSourc
      checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch();
      boolean reconcileSchema = props.getBoolean(DataSourceWriteOptions.RECONCILE_SCHEMA().key());
      if (this.userProvidedSchemaProvider != null && this.userProvidedSchemaProvider.getTargetSchema() != null) {
-        // If the target schema is specified through Avro schema,
-        // pass in the schema for the Row-to-Avro conversion
-        // to avoid nullability mismatch between Avro schema and Row schema
-        avroRDDOptional = transformed
-            .map(t -> HoodieSparkUtils.createRdd(
-                t, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE, reconcileSchema,
-                Option.of(this.userProvidedSchemaProvider.getTargetSchema())
-            ).toJavaRDD());
         schemaProvider = this.userProvidedSchemaProvider;
       } else {
-        // Use Transformed Row's schema if not overridden. If target schema is not specified
-        // default to RowBasedSchemaProvider
-        schemaProvider =
-            transformed
-                .map(r -> {
-                  // determine the targetSchemaProvider. use latestTableSchema if reconcileSchema is enabled.
-                  SchemaProvider targetSchemaProvider = null;
-                  if (reconcileSchema) {
-                    targetSchemaProvider = UtilHelpers.createLatestSchemaProvider(r.schema(), jssc, fs, cfg.targetBasePath);
-                  } else {
-                    targetSchemaProvider = UtilHelpers.createRowBasedSchemaProvider(r.schema(), props, jssc);
-                  }
-                  return (SchemaProvider) new DelegatingSchemaProvider(props, jssc,
-                      dataAndCheckpoint.getSchemaProvider(), targetSchemaProvider); })
-                .orElse(dataAndCheckpoint.getSchemaProvider());
-        avroRDDOptional = transformed
-            .map(t -> HoodieSparkUtils.createRdd(
-                t, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE, reconcileSchema,
-                Option.ofNullable(schemaProvider.getTargetSchema())
-            ).toJavaRDD());
+        Option<Schema> latestTableSchemaOpt = UtilHelpers.getLatestTableSchema(jssc, fs, cfg.targetBasePath);
+        // Deduce proper target (writer's) schema for the transformed dataset, reconciling its
+        // schema w/ the table's one
+        Option<Schema> targetSchemaOpt = transformed.map(df -> {
+          Schema sourceSchema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema(),
+              latestTableSchemaOpt.map(Schema::getFullName).orElse(getAvroRecordQualifiedName(cfg.targetTableName)));
+          // Target (writer's) schema is determined based on the incoming source schema
+          // and existing table's one, reconciling the two (if necessary) based on configuration
+          return HoodieSparkSqlWriter.deduceWriterSchema(
+                  sourceSchema,
+                  HoodieConversionUtils.<Schema>toScalaOption(latestTableSchemaOpt),
+                  HoodieConversionUtils.<InternalSchema>toScalaOption(Option.empty()),

Review Comment:
   why setting null for InternalSchema? someone could enable comprehensive schema evolution w/ deltastreamer as well, right? in such a case, shouldn't deduceWriterSchema honor that.



##########
hudi-utilities/src/main/java/org/apache/hudi/utilities/deltastreamer/DeltaSync.java:
##########
@@ -462,37 +466,33 @@ private Pair<SchemaProvider, Pair<String, JavaRDD<HoodieRecord>>> fetchFromSourc
      checkpointStr = dataAndCheckpoint.getCheckpointForNextBatch();
      boolean reconcileSchema = props.getBoolean(DataSourceWriteOptions.RECONCILE_SCHEMA().key());
      if (this.userProvidedSchemaProvider != null && this.userProvidedSchemaProvider.getTargetSchema() != null) {
-        // If the target schema is specified through Avro schema,
-        // pass in the schema for the Row-to-Avro conversion
-        // to avoid nullability mismatch between Avro schema and Row schema
-        avroRDDOptional = transformed
-            .map(t -> HoodieSparkUtils.createRdd(
-                t, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE, reconcileSchema,
-                Option.of(this.userProvidedSchemaProvider.getTargetSchema())
-            ).toJavaRDD());
         schemaProvider = this.userProvidedSchemaProvider;
       } else {
-        // Use Transformed Row's schema if not overridden. If target schema is not specified
-        // default to RowBasedSchemaProvider
-        schemaProvider =
-            transformed
-                .map(r -> {
-                  // determine the targetSchemaProvider. use latestTableSchema if reconcileSchema is enabled.
-                  SchemaProvider targetSchemaProvider = null;
-                  if (reconcileSchema) {
-                    targetSchemaProvider = UtilHelpers.createLatestSchemaProvider(r.schema(), jssc, fs, cfg.targetBasePath);
-                  } else {
-                    targetSchemaProvider = UtilHelpers.createRowBasedSchemaProvider(r.schema(), props, jssc);
-                  }
-                  return (SchemaProvider) new DelegatingSchemaProvider(props, jssc,
-                      dataAndCheckpoint.getSchemaProvider(), targetSchemaProvider); })
-                .orElse(dataAndCheckpoint.getSchemaProvider());
-        avroRDDOptional = transformed
-            .map(t -> HoodieSparkUtils.createRdd(
-                t, HOODIE_RECORD_STRUCT_NAME, HOODIE_RECORD_NAMESPACE, reconcileSchema,
-                Option.ofNullable(schemaProvider.getTargetSchema())
-            ).toJavaRDD());
+        Option<Schema> latestTableSchemaOpt = UtilHelpers.getLatestTableSchema(jssc, fs, cfg.targetBasePath);
+        // Deduce proper target (writer's) schema for the transformed dataset, reconciling its
+        // schema w/ the table's one
+        Option<Schema> targetSchemaOpt = transformed.map(df -> {
+          Schema sourceSchema = AvroConversionUtils.convertStructTypeToAvroSchema(df.schema(),
+              latestTableSchemaOpt.map(Schema::getFullName).orElse(getAvroRecordQualifiedName(cfg.targetTableName)));
+          // Target (writer's) schema is determined based on the incoming source schema
+          // and existing table's one, reconciling the two (if necessary) based on configuration
+          return HoodieSparkSqlWriter.deduceWriterSchema(
+                  sourceSchema,
+                  HoodieConversionUtils.<Schema>toScalaOption(latestTableSchemaOpt),
+                  HoodieConversionUtils.<InternalSchema>toScalaOption(Option.empty()),
+                  HoodieConversionUtils.fromProperties(props));
+        });

Review Comment:
   something to be mindful about: not breaking any of the existing pipelines. the new deduceWriterSchema will canonicalize the schema for all flows. So, are there chances that we did not canonicalize w/ 0.12.0 for an existing pipeline, but now adding canonicalization might break it or cause other issues? 



##########
hudi-spark-datasource/hudi-spark/src/main/scala/org/apache/spark/sql/hudi/command/payload/ExpressionPayload.scala:
##########
@@ -274,13 +297,11 @@ object ExpressionPayload {
     .maximumSize(1024)
     .build[String, Map[IExpressionEvaluator, IExpressionEvaluator]]()
 
-  private val writeSchemaCache = Caffeine.newBuilder()
+  private val schemaCache = Caffeine.newBuilder()

Review Comment:
   these caches (cache at L296 and schemaCache at L300) also add to this payload's memory footprint compared to other payloads. So, yet another difference b/w this and the others. 

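   For context on the memory ceiling of such caches: a maximumSize-bounded cache evicts least-recently-used entries, so its footprint is capped regardless of how many distinct keys flow through it. A minimal stand-in for that behavior (a sketch, not Caffeine itself) using LinkedHashMap's access-order eviction:

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Size-bounded LRU cache: once size() exceeds maxSize, the least-recently
// accessed entry is evicted, capping memory no matter how many keys are seen.
public class BoundedCache<K, V> extends LinkedHashMap<K, V> {
  private final int maxSize;

  public BoundedCache(int maxSize) {
    super(16, 0.75f, true); // access-order iteration enables LRU eviction
    this.maxSize = maxSize;
  }

  @Override
  protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
    return size() > maxSize;
  }
}
```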

