rahil-c commented on code in PR #18036:
URL: https://github.com/apache/hudi/pull/18036#discussion_r2915064805
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java:
##########
@@ -139,21 +148,153 @@ public HoodieRowParquetWriteSupport(Configuration conf,
StructType structType, O
HoodieSchema parsedSchema = HoodieSchema.parse(schemaString);
return HoodieSchemaUtils.addMetadataFields(parsedSchema,
config.getBooleanOrDefault(ALLOW_OPERATION_METADATA_FIELD));
});
+ // Generate shredded schema if there are shredded Variant columns.
+ // Falls back to the provided schema if no shredded Variant columns are
present.
+ this.shreddedSchema = generateShreddedSchema(structType, schema);
ParquetWriteSupport.setSchema(structType, hadoopConf);
- this.rootFieldWriters = getFieldWriters(structType, schema);
+ // Use shreddedSchema for creating writers when shredded Variants are
present
+ this.rootFieldWriters = getFieldWriters(shreddedSchema, schema);
this.hadoopConf = hadoopConf;
this.bloomFilterWriteSupportOpt =
bloomFilterOpt.map(HoodieBloomFilterRowWriteSupport::new);
}
+ /**
+ * Generates a shredded schema from the given structType and hoodieSchema.
+ * <p>
+ * For Variant fields that are configured for shredding (based on
HoodieSchema.Variant.isShredded()),
+ * the VariantType is replaced with a shredded struct schema. This method
recursively processes
+ * nested struct fields to handle Variant fields at any depth.
+ *
+ * @param structType The original Spark StructType
+ * @param hoodieSchema The HoodieSchema containing shredding information
+ * @return A StructType with shredded Variant fields replaced by their
shredded schemas
+ */
+ private StructType generateShreddedSchema(StructType structType,
HoodieSchema hoodieSchema) {
+ StructField[] fields = structType.fields();
+ StructField[] shreddedFields = new StructField[fields.length];
+ boolean hasShredding = false;
+
+ for (int i = 0; i < fields.length; i++) {
+ StructField field = fields[i];
+ DataType dataType = field.dataType();
+
+ // Get the HoodieSchema for this field (if available)
+ // Use getNonNullType() to unwrap nullable unions (e.g., ["null",
"string"] -> "string")
+ HoodieSchema fieldHoodieSchema = Option.ofNullable(hoodieSchema)
+ .map(HoodieSchema::getNonNullType)
+ .flatMap(s -> s.hasFields() ? s.getField(field.name()) :
Option.empty())
+ .map(HoodieSchemaField::schema)
+ .orElse(null);
+
+ // Check if this is a Variant field that should be shredded
+ if (SparkAdapterSupport$.MODULE$.sparkAdapter().isVariantType(dataType))
{
+ if (fieldHoodieSchema != null && fieldHoodieSchema.getType() ==
HoodieSchemaType.VARIANT) {
+ HoodieSchema.Variant variantSchema = (HoodieSchema.Variant)
fieldHoodieSchema;
+ // If typed_value field exists, the variant is shredded
+ if (variantSchema.getTypedValueField().isPresent()) {
+ // Use plain types for SparkShreddingUtils (unwraps nested {value,
typed_value} structs if present)
+ HoodieSchema typedValueSchema =
variantSchema.getPlainTypedValueSchema().get();
+ DataType typedValueDataType =
HoodieSchemaConversionUtils.convertHoodieSchemaToDataType(typedValueSchema);
+
+ // Generate the shredding schema with write metadata using
SparkAdapter
+ StructType markedShreddedStruct =
SparkAdapterSupport$.MODULE$.sparkAdapter()
+ .generateVariantWriteShreddingSchema(typedValueDataType, true,
false);
 Review Comment:
   Is it always safe to pass `isTopLevel` as `true` and `isObjectField` as `false`? Since this method recursively processes nested struct fields, a shredded Variant nested inside a struct would not be top-level, so these hard-coded flag values may be incorrect for that case — please confirm or compute them from the recursion depth.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]