the-other-tim-brown commented on code in PR #18036:
URL: https://github.com/apache/hudi/pull/18036#discussion_r2742338042


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java:
##########
@@ -140,21 +147,119 @@ public HoodieRowParquetWriteSupport(Configuration conf, StructType structType, O
           HoodieSchema parsedSchema = HoodieSchema.parse(schemaString);
           return HoodieSchemaUtils.addMetadataFields(parsedSchema, 
config.getBooleanOrDefault(ALLOW_OPERATION_METADATA_FIELD));
         });
+    // Generate shredded schema if there are shredded Variant columns
+    this.shreddedSchema = generateShreddedSchema(structType, schema);
     ParquetWriteSupport.setSchema(structType, hadoopConf);
-    this.rootFieldWriters = getFieldWriters(structType, schema);
+    // Use shreddedSchema for creating writers when shredded Variants are 
present
+    this.rootFieldWriters = getFieldWriters(shreddedSchema, structType, 
schema);
     this.hadoopConf = hadoopConf;
     this.bloomFilterWriteSupportOpt = 
bloomFilterOpt.map(HoodieBloomFilterRowWriteSupport::new);
   }
 
+  /**
+   * Generates a shredded schema from the given structType and hoodieSchema.
+   * <p>
+   * For Variant fields that are configured for shredding (based on 
HoodieSchema.Variant.isShredded()), the VariantType is replaced with a shredded 
struct schema.
+   *
+   * @param structType The original Spark StructType
+   * @param hoodieSchema The HoodieSchema containing shredding information
+   * @return A StructType with shredded Variant fields replaced by their 
shredded schemas
+   */
+  private StructType generateShreddedSchema(StructType structType, 
HoodieSchema hoodieSchema) {
+    StructField[] fields = structType.fields();
+    StructField[] shreddedFields = new StructField[fields.length];
+    boolean hasShredding = false;
+
+    for (int i = 0; i < fields.length; i++) {
+      StructField field = fields[i];
+      DataType dataType = field.dataType();
+
+      // Check if this is a Variant field that should be shredded
+      if (SparkAdapterSupport$.MODULE$.sparkAdapter().isVariantType(dataType)) 
{
+        HoodieSchema fieldHoodieSchema = Option.ofNullable(hoodieSchema)
+            .flatMap(s -> s.getField(field.name()))
+            .map(f -> f.schema())
+            .orElse(null);
+
+        if (fieldHoodieSchema.getType() == HoodieSchemaType.VARIANT) {
+          HoodieSchema.Variant variantSchema = (HoodieSchema.Variant) fieldHoodieSchema;
+          if (variantSchema.isShredded() && variantSchema.getTypedValueField().isPresent()) {
+            // Use plain types for SparkShreddingUtils (unwraps nested {value, typed_value} structs if present)
+            HoodieSchema typedValueSchema = variantSchema.getPlainTypedValueSchema().get();
+            DataType typedValueDataType = HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(typedValueSchema);
+
+            // Generate the shredding schema using SparkAdapter
+            StructType shreddedStruct = SparkAdapterSupport$.MODULE$.sparkAdapter()
+                .generateVariantShreddingSchema(typedValueDataType, true, false);
+
+            // Add metadata to mark this as a shredding struct
+            StructType markedShreddedStruct = SparkAdapterSupport$.MODULE$.sparkAdapter()
+                .addVariantWriteShreddingMetadata(shreddedStruct);

Review Comment:
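   Could we combine these two adapter calls (`generateVariantShreddingSchema` and `addVariantWriteShreddingMetadata`) into one?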
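
   A rough sketch of what a combined call might look like. Everything here is a hypothetical stand-in: `ShreddedStruct` models a Spark `StructType` plus its shredding marker, the two step methods only mimic the shape of the calls in the diff, and `generateMarkedVariantShreddingSchema` is an invented name, not an existing `SparkAdapter` method.

```java
// Hypothetical stand-ins only; these are not the real Spark or Hudi classes.
public class CombinedShreddingSketch {

  /** Stand-in for a shredded struct schema; `marked` models the write-shredding metadata. */
  static final class ShreddedStruct {
    final String typedValueType;
    final boolean marked;

    ShreddedStruct(String typedValueType, boolean marked) {
      this.typedValueType = typedValueType;
      this.marked = marked;
    }
  }

  // Step 1 in the diff: build the shredding schema (unmarked).
  static ShreddedStruct generateVariantShreddingSchema(String typedValueType) {
    return new ShreddedStruct(typedValueType, false);
  }

  // Step 2 in the diff: mark the struct as a shredding struct.
  static ShreddedStruct addVariantWriteShreddingMetadata(ShreddedStruct struct) {
    return new ShreddedStruct(struct.typedValueType, true);
  }

  // Hypothetical combined entry point: callers make one call instead of two.
  static ShreddedStruct generateMarkedVariantShreddingSchema(String typedValueType) {
    return addVariantWriteShreddingMetadata(generateVariantShreddingSchema(typedValueType));
  }

  public static void main(String[] args) {
    ShreddedStruct struct = generateMarkedVariantShreddingSchema("long");
    System.out.println(struct.typedValueType + " marked=" + struct.marked);
  }
}
```

   With something like this, the call site would never hold an unmarked intermediate struct.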



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java:
##########
@@ -140,21 +147,119 @@ public HoodieRowParquetWriteSupport(Configuration conf, StructType structType, O
           HoodieSchema parsedSchema = HoodieSchema.parse(schemaString);
           return HoodieSchemaUtils.addMetadataFields(parsedSchema, 
config.getBooleanOrDefault(ALLOW_OPERATION_METADATA_FIELD));
         });
+    // Generate shredded schema if there are shredded Variant columns
+    this.shreddedSchema = generateShreddedSchema(structType, schema);
     ParquetWriteSupport.setSchema(structType, hadoopConf);
-    this.rootFieldWriters = getFieldWriters(structType, schema);
+    // Use shreddedSchema for creating writers when shredded Variants are 
present
+    this.rootFieldWriters = getFieldWriters(shreddedSchema, structType, 
schema);
     this.hadoopConf = hadoopConf;
     this.bloomFilterWriteSupportOpt = 
bloomFilterOpt.map(HoodieBloomFilterRowWriteSupport::new);
   }
 
+  /**
+   * Generates a shredded schema from the given structType and hoodieSchema.
+   * <p>
+   * For Variant fields that are configured for shredding (based on 
HoodieSchema.Variant.isShredded()), the VariantType is replaced with a shredded 
struct schema.
+   *
+   * @param structType The original Spark StructType
+   * @param hoodieSchema The HoodieSchema containing shredding information
+   * @return A StructType with shredded Variant fields replaced by their 
shredded schemas
+   */
+  private StructType generateShreddedSchema(StructType structType, 
HoodieSchema hoodieSchema) {
+    StructField[] fields = structType.fields();
+    StructField[] shreddedFields = new StructField[fields.length];
+    boolean hasShredding = false;
+
+    for (int i = 0; i < fields.length; i++) {
+      StructField field = fields[i];
+      DataType dataType = field.dataType();
+
+      // Check if this is a Variant field that should be shredded
+      if (SparkAdapterSupport$.MODULE$.sparkAdapter().isVariantType(dataType)) 
{
+        HoodieSchema fieldHoodieSchema = Option.ofNullable(hoodieSchema)
+            .flatMap(s -> s.getField(field.name()))
+            .map(f -> f.schema())
+            .orElse(null);
+
+        if (fieldHoodieSchema.getType() == HoodieSchemaType.VARIANT) {
+          HoodieSchema.Variant variantSchema = (HoodieSchema.Variant) 
fieldHoodieSchema;
+          if (variantSchema.isShredded() && 
variantSchema.getTypedValueField().isPresent()) {
+            // Use plain types for SparkShreddingUtils (unwraps nested {value, 
typed_value} structs if present)
+            HoodieSchema typedValueSchema = 
variantSchema.getPlainTypedValueSchema().get();
+            DataType typedValueDataType = 
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(typedValueSchema);
+
+            // Generate the shredding schema using SparkAdapter
+            StructType shreddedStruct = 
SparkAdapterSupport$.MODULE$.sparkAdapter()
+                .generateVariantShreddingSchema(typedValueDataType, true, 
false);
+
+            // Add metadata to mark this as a shredding struct
+            StructType markedShreddedStruct = 
SparkAdapterSupport$.MODULE$.sparkAdapter()
+                .addVariantWriteShreddingMetadata(shreddedStruct);
+
+            shreddedFields[i] = new StructField(field.name(), 
markedShreddedStruct, field.nullable(), field.metadata());
+            hasShredding = true;
+            continue;
+          }
+        }
+      }
+
+      // Not a shredded Variant, keep the original field
+      shreddedFields[i] = field;
+    }
+
+    return hasShredding ? new StructType(shreddedFields) : structType;
+  }
+
+  /**
+   * Creates field writers for each field in the schema.
+   * Convenience method for nested structs where shredding doesn't apply.
+   *
+   * @param schema The schema for both writing and data access
+   * @param hoodieSchema The HoodieSchema for type information
+   * @return Array of ValueWriters for each field
+   */
   private ValueWriter[] getFieldWriters(StructType schema, HoodieSchema 
hoodieSchema) {
-    return Arrays.stream(schema.fields()).map(field -> {
+    return getFieldWriters(schema, schema, hoodieSchema);
+  }
+
+  /**
+   * Creates field writers for each field in the schema.
+   * <p>
+   * When shredding is enabled, shreddedSchema contains the shredded struct 
types, while originalSchema contains the original VariantType for data access.
+   *
+   * @param shreddedSchema The schema with shredded Variant fields (may be 
same as originalSchema)
+   * @param originalSchema The original schema for accessing data from 
InternalRow
+   * @param hoodieSchema The HoodieSchema for type information
+   * @return Array of ValueWriters for each field
+   */
+  private ValueWriter[] getFieldWriters(StructType shreddedSchema, StructType 
originalSchema, HoodieSchema hoodieSchema) {
+    StructField[] shreddedFields = shreddedSchema.fields();
+    StructField[] originalFields = originalSchema.fields();
+    ValueWriter[] writers = new ValueWriter[shreddedFields.length];
+
+    for (int i = 0; i < shreddedFields.length; i++) {
+      StructField shreddedField = shreddedFields[i];
+      StructField originalField = originalFields[i];
+
       HoodieSchema fieldSchema = Option.ofNullable(hoodieSchema)
-          .flatMap(s -> s.getField(field.name()))
+          .flatMap(s -> s.getField(shreddedField.name()))
           // Note: Cannot use HoodieSchemaField::schema method reference due 
to Java 17 compilation ambiguity
           .map(f -> f.schema())
           .orElse(null);
-      return makeWriter(fieldSchema, field.dataType());
-    }).toArray(ValueWriter[]::new);
+
+      // Check if this field is a shredded Variant (shreddedField has shredding struct, originalField has VariantType)
+      if (shreddedField.dataType() instanceof StructType
+          && SparkAdapterSupport$.MODULE$.sparkAdapter().isVariantShreddingStruct((StructType) shreddedField.dataType())
+          && SparkAdapterSupport$.MODULE$.sparkAdapter().isVariantType(originalField.dataType())) {

Review Comment:
   Similarly, could these three conditions be collapsed into a single check?
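
   One possible shape for such a check, as a sketch only. The `DataType`, `VariantType`, `LongType`, and `StructType` classes below are hypothetical stand-ins for the Spark types, the `shreddingMarker` flag stands in for whatever `isVariantShreddingStruct` inspects, and `isShreddedVariantField` is an invented helper name.

```java
// Hypothetical stand-ins only; these are not the real Spark or Hudi classes.
public class ShreddedVariantCheckSketch {

  interface DataType {}

  static final class VariantType implements DataType {}

  static final class LongType implements DataType {}

  static final class StructType implements DataType {
    final boolean shreddingMarker; // models the write-shredding metadata

    StructType(boolean shreddingMarker) {
      this.shreddingMarker = shreddingMarker;
    }
  }

  // Hypothetical single predicate replacing the three inline conditions.
  static boolean isShreddedVariantField(DataType shreddedType, DataType originalType) {
    return shreddedType instanceof StructType
        && ((StructType) shreddedType).shreddingMarker
        && originalType instanceof VariantType;
  }

  public static void main(String[] args) {
    System.out.println(isShreddedVariantField(new StructType(true), new VariantType()));
    System.out.println(isShreddedVariantField(new StructType(false), new VariantType()));
    System.out.println(isShreddedVariantField(new LongType(), new LongType()));
  }
}
```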



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java:
##########
@@ -140,21 +147,119 @@ public HoodieRowParquetWriteSupport(Configuration conf, StructType structType, O
           HoodieSchema parsedSchema = HoodieSchema.parse(schemaString);
           return HoodieSchemaUtils.addMetadataFields(parsedSchema, 
config.getBooleanOrDefault(ALLOW_OPERATION_METADATA_FIELD));
         });
+    // Generate shredded schema if there are shredded Variant columns
+    this.shreddedSchema = generateShreddedSchema(structType, schema);
     ParquetWriteSupport.setSchema(structType, hadoopConf);
-    this.rootFieldWriters = getFieldWriters(structType, schema);
+    // Use shreddedSchema for creating writers when shredded Variants are 
present
+    this.rootFieldWriters = getFieldWriters(shreddedSchema, structType, 
schema);
     this.hadoopConf = hadoopConf;
     this.bloomFilterWriteSupportOpt = 
bloomFilterOpt.map(HoodieBloomFilterRowWriteSupport::new);
   }
 
+  /**
+   * Generates a shredded schema from the given structType and hoodieSchema.
+   * <p>
+   * For Variant fields that are configured for shredding (based on 
HoodieSchema.Variant.isShredded()), the VariantType is replaced with a shredded 
struct schema.
+   *
+   * @param structType The original Spark StructType
+   * @param hoodieSchema The HoodieSchema containing shredding information
+   * @return A StructType with shredded Variant fields replaced by their 
shredded schemas
+   */
+  private StructType generateShreddedSchema(StructType structType, 
HoodieSchema hoodieSchema) {
+    StructField[] fields = structType.fields();
+    StructField[] shreddedFields = new StructField[fields.length];
+    boolean hasShredding = false;
+
+    for (int i = 0; i < fields.length; i++) {
+      StructField field = fields[i];
+      DataType dataType = field.dataType();
+
+      // Check if this is a Variant field that should be shredded
+      if (SparkAdapterSupport$.MODULE$.sparkAdapter().isVariantType(dataType)) 
{
+        HoodieSchema fieldHoodieSchema = Option.ofNullable(hoodieSchema)
+            .flatMap(s -> s.getField(field.name()))
+            .map(f -> f.schema())
+            .orElse(null);
+
+        if (fieldHoodieSchema.getType() == HoodieSchemaType.VARIANT) {
+          HoodieSchema.Variant variantSchema = (HoodieSchema.Variant) fieldHoodieSchema;
+          if (variantSchema.isShredded() && variantSchema.getTypedValueField().isPresent()) {

Review Comment:
   Do we expect a case where `isShredded` is true but the typed value field is not present?



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java:
##########
@@ -140,21 +147,119 @@ public HoodieRowParquetWriteSupport(Configuration conf, StructType structType, O
           HoodieSchema parsedSchema = HoodieSchema.parse(schemaString);
           return HoodieSchemaUtils.addMetadataFields(parsedSchema, 
config.getBooleanOrDefault(ALLOW_OPERATION_METADATA_FIELD));
         });
+    // Generate shredded schema if there are shredded Variant columns
+    this.shreddedSchema = generateShreddedSchema(structType, schema);
     ParquetWriteSupport.setSchema(structType, hadoopConf);
-    this.rootFieldWriters = getFieldWriters(structType, schema);
+    // Use shreddedSchema for creating writers when shredded Variants are 
present
+    this.rootFieldWriters = getFieldWriters(shreddedSchema, structType, 
schema);
     this.hadoopConf = hadoopConf;
     this.bloomFilterWriteSupportOpt = 
bloomFilterOpt.map(HoodieBloomFilterRowWriteSupport::new);
   }
 
+  /**
+   * Generates a shredded schema from the given structType and hoodieSchema.
+   * <p>
+   * For Variant fields that are configured for shredding (based on 
HoodieSchema.Variant.isShredded()), the VariantType is replaced with a shredded 
struct schema.
+   *
+   * @param structType The original Spark StructType
+   * @param hoodieSchema The HoodieSchema containing shredding information
+   * @return A StructType with shredded Variant fields replaced by their 
shredded schemas
+   */
+  private StructType generateShreddedSchema(StructType structType, 
HoodieSchema hoodieSchema) {
+    StructField[] fields = structType.fields();
+    StructField[] shreddedFields = new StructField[fields.length];
+    boolean hasShredding = false;
+
+    for (int i = 0; i < fields.length; i++) {
+      StructField field = fields[i];
+      DataType dataType = field.dataType();
+
+      // Check if this is a Variant field that should be shredded
+      if (SparkAdapterSupport$.MODULE$.sparkAdapter().isVariantType(dataType)) 
{
+        HoodieSchema fieldHoodieSchema = Option.ofNullable(hoodieSchema)
+            .flatMap(s -> s.getField(field.name()))
+            .map(f -> f.schema())
+            .orElse(null);
+
+        if (fieldHoodieSchema.getType() == HoodieSchemaType.VARIANT) {
+          HoodieSchema.Variant variantSchema = (HoodieSchema.Variant) 
fieldHoodieSchema;
+          if (variantSchema.isShredded() && 
variantSchema.getTypedValueField().isPresent()) {
+            // Use plain types for SparkShreddingUtils (unwraps nested {value, 
typed_value} structs if present)
+            HoodieSchema typedValueSchema = 
variantSchema.getPlainTypedValueSchema().get();
+            DataType typedValueDataType = 
HoodieSchemaConversionUtils.convertHoodieSchemaToStructType(typedValueSchema);
+
+            // Generate the shredding schema using SparkAdapter
+            StructType shreddedStruct = 
SparkAdapterSupport$.MODULE$.sparkAdapter()
+                .generateVariantShreddingSchema(typedValueDataType, true, 
false);
+
+            // Add metadata to mark this as a shredding struct
+            StructType markedShreddedStruct = 
SparkAdapterSupport$.MODULE$.sparkAdapter()
+                .addVariantWriteShreddingMetadata(shreddedStruct);
+
+            shreddedFields[i] = new StructField(field.name(), 
markedShreddedStruct, field.nullable(), field.metadata());
+            hasShredding = true;
+            continue;
+          }
+        }
+      }
+
+      // Not a shredded Variant, keep the original field
+      shreddedFields[i] = field;
+    }
+
+    return hasShredding ? new StructType(shreddedFields) : structType;
+  }
+
+  /**
+   * Creates field writers for each field in the schema.
+   * Convenience method for nested structs where shredding doesn't apply.
+   *
+   * @param schema The schema for both writing and data access
+   * @param hoodieSchema The HoodieSchema for type information
+   * @return Array of ValueWriters for each field
+   */
   private ValueWriter[] getFieldWriters(StructType schema, HoodieSchema 
hoodieSchema) {
-    return Arrays.stream(schema.fields()).map(field -> {
+    return getFieldWriters(schema, schema, hoodieSchema);
+  }
+
+  /**
+   * Creates field writers for each field in the schema.
+   * <p>
+   * When shredding is enabled, shreddedSchema contains the shredded struct 
types, while originalSchema contains the original VariantType for data access.
+   *
+   * @param shreddedSchema The schema with shredded Variant fields (may be 
same as originalSchema)
+   * @param originalSchema The original schema for accessing data from 
InternalRow
+   * @param hoodieSchema The HoodieSchema for type information
+   * @return Array of ValueWriters for each field
+   */
+  private ValueWriter[] getFieldWriters(StructType shreddedSchema, StructType 
originalSchema, HoodieSchema hoodieSchema) {
+    StructField[] shreddedFields = shreddedSchema.fields();
+    StructField[] originalFields = originalSchema.fields();
+    ValueWriter[] writers = new ValueWriter[shreddedFields.length];
+
+    for (int i = 0; i < shreddedFields.length; i++) {
+      StructField shreddedField = shreddedFields[i];
+      StructField originalField = originalFields[i];
+
       HoodieSchema fieldSchema = Option.ofNullable(hoodieSchema)
-          .flatMap(s -> s.getField(field.name()))
+          .flatMap(s -> s.getField(shreddedField.name()))
           // Note: Cannot use HoodieSchemaField::schema method reference due 
to Java 17 compilation ambiguity
           .map(f -> f.schema())
           .orElse(null);
-      return makeWriter(fieldSchema, field.dataType());
-    }).toArray(ValueWriter[]::new);
+
+      // Check if this field is a shredded Variant (shreddedField has shredding struct, originalField has VariantType)
+      if (shreddedField.dataType() instanceof StructType

Review Comment:
   Nitpick: can we move this check into `makeWriter` so that all the logic for creating writers is contained in one place?
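
   A minimal sketch of that arrangement, under loud assumptions: the types are hypothetical stand-ins, writers are represented by plain string labels rather than real `ValueWriter` instances, and the two-type `makeWriter` signature is invented for illustration (the existing method takes a `HoodieSchema` and a `DataType`).

```java
// Hypothetical stand-ins only; these are not the real Spark or Hudi classes.
public class MakeWriterSketch {

  interface DataType {}

  enum SimpleType implements DataType { VARIANT, LONG }

  /** Stand-in for a struct carrying the write-shredding marker. */
  static final class ShreddingStruct implements DataType {}

  // All writer selection lives here, including the shredded-Variant branch.
  static String makeWriter(DataType writeType, DataType readType) {
    if (writeType instanceof ShreddingStruct && readType == SimpleType.VARIANT) {
      return "shredded-variant-writer";
    }
    if (readType == SimpleType.VARIANT) {
      return "variant-writer";
    }
    return "plain-writer";
  }

  // The loop no longer branches; it only pairs up fields and delegates.
  static String[] getFieldWriters(DataType[] writeTypes, DataType[] readTypes) {
    String[] writers = new String[writeTypes.length];
    for (int i = 0; i < writeTypes.length; i++) {
      writers[i] = makeWriter(writeTypes[i], readTypes[i]);
    }
    return writers;
  }

  public static void main(String[] args) {
    DataType[] writeTypes = {new ShreddingStruct(), SimpleType.VARIANT, SimpleType.LONG};
    DataType[] readTypes = {SimpleType.VARIANT, SimpleType.VARIANT, SimpleType.LONG};
    System.out.println(String.join(", ", getFieldWriters(writeTypes, readTypes)));
  }
}
```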



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
