bvaradar commented on code in PR #18036:
URL: https://github.com/apache/hudi/pull/18036#discussion_r2915348921


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java:
##########
@@ -139,21 +148,153 @@ public HoodieRowParquetWriteSupport(Configuration conf, 
StructType structType, O
           HoodieSchema parsedSchema = HoodieSchema.parse(schemaString);
           return HoodieSchemaUtils.addMetadataFields(parsedSchema, 
config.getBooleanOrDefault(ALLOW_OPERATION_METADATA_FIELD));
         });
+    // Generate shredded schema if there are shredded Variant columns.
+    // Falls back to the provided schema if no shredded Variant columns are 
present.
+    this.shreddedSchema = generateShreddedSchema(structType, schema);
     ParquetWriteSupport.setSchema(structType, hadoopConf);
-    this.rootFieldWriters = getFieldWriters(structType, schema);
+    // Use shreddedSchema for creating writers when shredded Variants are 
present
+    this.rootFieldWriters = getFieldWriters(shreddedSchema, schema);
     this.hadoopConf = hadoopConf;
     this.bloomFilterWriteSupportOpt = 
bloomFilterOpt.map(HoodieBloomFilterRowWriteSupport::new);
   }
 
+  /**
+   * Generates a shredded schema from the given structType and hoodieSchema.
+   * <p>
+   * For Variant fields that are configured for shredding (based on 
HoodieSchema.Variant.isShredded()),
+   * the VariantType is replaced with a shredded struct schema. This method 
recursively processes
+   * nested struct fields to handle Variant fields at any depth.
+   *
+   * @param structType The original Spark StructType
+   * @param hoodieSchema The HoodieSchema containing shredding information
+   * @return A StructType with shredded Variant fields replaced by their 
shredded schemas
+   */
+  private StructType generateShreddedSchema(StructType structType, 
HoodieSchema hoodieSchema) {
+    StructField[] fields = structType.fields();
+    StructField[] shreddedFields = new StructField[fields.length];
+    boolean hasShredding = false;
+
+    for (int i = 0; i < fields.length; i++) {
+      StructField field = fields[i];
+      DataType dataType = field.dataType();
+
+      // Get the HoodieSchema for this field (if available)
+      // Use getNonNullType() to unwrap nullable unions (e.g., ["null", 
"string"] -> "string")
+      HoodieSchema fieldHoodieSchema = Option.ofNullable(hoodieSchema)
+          .map(HoodieSchema::getNonNullType)
+          .flatMap(s -> s.hasFields() ? s.getField(field.name()) : 
Option.empty())
+          .map(HoodieSchemaField::schema)
+          .orElse(null);
+
+      // Check if this is a Variant field that should be shredded
+      if (SparkAdapterSupport$.MODULE$.sparkAdapter().isVariantType(dataType)) 
{
+        if (fieldHoodieSchema != null && fieldHoodieSchema.getType() == 
HoodieSchemaType.VARIANT) {
+          HoodieSchema.Variant variantSchema = (HoodieSchema.Variant) 
fieldHoodieSchema;
+          // If typed_value field exists, the variant is shredded
+          if (variantSchema.getTypedValueField().isPresent()) {
+            // Use plain types for SparkShreddingUtils (unwraps nested {value, 
typed_value} structs if present)
+            HoodieSchema typedValueSchema = 
variantSchema.getPlainTypedValueSchema().get();
+            DataType typedValueDataType = 
HoodieSchemaConversionUtils.convertHoodieSchemaToDataType(typedValueSchema);
+
+            // Generate the shredding schema with write metadata using 
SparkAdapter
+            StructType markedShreddedStruct = 
SparkAdapterSupport$.MODULE$.sparkAdapter()
+                .generateVariantWriteShreddingSchema(typedValueDataType, true, 
false);
+
+            shreddedFields[i] = new StructField(field.name(), 
markedShreddedStruct, field.nullable(), field.metadata());
+            hasShredding = true;
+            continue;
+          }
+        }
+      }
+
+      // Recursively process nested fields (for VariantType without shredding, 
or for nested structures like StructType/ArrayType/MapType)
+      DataType processedDataType = processNestedDataType(field.dataType(), 
fieldHoodieSchema);
+      if (processedDataType != field.dataType()) {
+        shreddedFields[i] = new StructField(field.name(), processedDataType, 
field.nullable(), field.metadata());
+        hasShredding = true;
+      } else {
+        // No shredding in this field or its nested fields
+        shreddedFields[i] = field;
+      }
+    }
+
+    return hasShredding ? new StructType(shreddedFields) : structType;
+  }
+
+  /**
+   * Recursively processes nested data types (structs, arrays, maps) to handle 
Variant shredding at any depth.
+   *
+   * @param dataType The data type to process
+   * @param hoodieSchema The corresponding HoodieSchema
+   * @return The processed data type with shredded Variants, or the original 
if no shredding needed
+   */
+  private DataType processNestedDataType(DataType dataType, HoodieSchema 
hoodieSchema) {
+    // Check if this is a Variant type that should be shredded
+    if (hoodieSchema != null && hoodieSchema.getType() == 
HoodieSchemaType.VARIANT) {
+      HoodieSchema.Variant variantSchema = (HoodieSchema.Variant) hoodieSchema;
+      // If typed_value field exists, the variant is shredded
+      if (variantSchema.getTypedValueField().isPresent()) {
+        // Use plain types for SparkShreddingUtils (unwraps nested {value, 
typed_value} structs if present)
+        HoodieSchema typedValueSchema = 
variantSchema.getPlainTypedValueSchema().get();
+        DataType typedValueDataType = 
HoodieSchemaConversionUtils.convertHoodieSchemaToDataType(typedValueSchema);
+
+        // Generate the shredding schema with write metadata using SparkAdapter
+        return SparkAdapterSupport$.MODULE$.sparkAdapter()
+            .generateVariantWriteShreddingSchema(typedValueDataType, true, 
false);
+      }
+      return dataType;
+    }
+
+    if (dataType instanceof StructType) {
+      StructType structType = (StructType) dataType;
+      StructType processedStruct = generateShreddedSchema(structType, 
hoodieSchema);

Review Comment:
   In generateShreddedSchema, we call hoodieSchema.getNonNullType() before 
attempting to retrieve fields, specifically to unwrap nullable unions. But in 
the MapType branch of processNestedDataType, we call 
hoodieSchema.getValueType() directly without any getNonNullType() call first. 
If the map's value schema happens to be wrapped in a nullable union in 
HoodieSchema. Can you check if there is inconsistency here and add testcases 
with nonnull cases.



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java:
##########
@@ -139,21 +147,89 @@ public HoodieRowParquetWriteSupport(Configuration conf, 
StructType structType, O
           HoodieSchema parsedSchema = HoodieSchema.parse(schemaString);
           return HoodieSchemaUtils.addMetadataFields(parsedSchema, 
config.getBooleanOrDefault(ALLOW_OPERATION_METADATA_FIELD));
         });
+    // Generate shredded schema if there are shredded Variant columns
+    this.shreddedSchema = generateShreddedSchema(structType, schema);
     ParquetWriteSupport.setSchema(structType, hadoopConf);
-    this.rootFieldWriters = getFieldWriters(structType, schema);
+    // Use shreddedSchema for creating writers when shredded Variants are 
present
+    this.rootFieldWriters = getFieldWriters(shreddedSchema, schema);
     this.hadoopConf = hadoopConf;
     this.bloomFilterWriteSupportOpt = 
bloomFilterOpt.map(HoodieBloomFilterRowWriteSupport::new);
   }
 
+  /**
+   * Generates a shredded schema from the given structType and hoodieSchema.
+   * <p>
+   * For Variant fields that are configured for shredding (based on 
HoodieSchema.Variant.isShredded()), the VariantType is replaced with a shredded 
struct schema.
+   *
+   * @param structType The original Spark StructType
+   * @param hoodieSchema The HoodieSchema containing shredding information
+   * @return A StructType with shredded Variant fields replaced by their 
shredded schemas
+   */
+  private StructType generateShreddedSchema(StructType structType, 
HoodieSchema hoodieSchema) {
+    StructField[] fields = structType.fields();
+    StructField[] shreddedFields = new StructField[fields.length];
+    boolean hasShredding = false;
+
+    for (int i = 0; i < fields.length; i++) {

Review Comment:
   @voonhous :  Checking : Do we have tests for the nested shredded schema ? 



##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.java:
##########
@@ -411,6 +411,8 @@ public static DataType convertToDataType(HoodieSchema 
hoodieSchema) {
         return convertRecord(hoodieSchema);
       case UNION:
         return convertUnion(hoodieSchema);
+      case VARIANT:

Review Comment:
   Do we need to handle shredded vs non-shredded here ?



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java:
##########
@@ -139,21 +148,153 @@ public HoodieRowParquetWriteSupport(Configuration conf, 
StructType structType, O
           HoodieSchema parsedSchema = HoodieSchema.parse(schemaString);
           return HoodieSchemaUtils.addMetadataFields(parsedSchema, 
config.getBooleanOrDefault(ALLOW_OPERATION_METADATA_FIELD));
         });
+    // Generate shredded schema if there are shredded Variant columns.
+    // Falls back to the provided schema if no shredded Variant columns are 
present.
+    this.shreddedSchema = generateShreddedSchema(structType, schema);
     ParquetWriteSupport.setSchema(structType, hadoopConf);

Review Comment:
   sanity checking: we call ParquetWriteSupport.setSchema(structType, 
hadoopConf) using the original Spark schema, but later build the Parquet 
MessageType from shreddedSchema. Is it intentional for those two schema views 
to diverge?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to