yihua commented on code in PR #18036:
URL: https://github.com/apache/hudi/pull/18036#discussion_r3035044502
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java:
##########
@@ -139,21 +147,89 @@ public HoodieRowParquetWriteSupport(Configuration conf,
StructType structType, O
HoodieSchema parsedSchema = HoodieSchema.parse(schemaString);
return HoodieSchemaUtils.addMetadataFields(parsedSchema,
config.getBooleanOrDefault(ALLOW_OPERATION_METADATA_FIELD));
});
+ // Generate shredded schema if there are shredded Variant columns
+ this.shreddedSchema = generateShreddedSchema(structType, schema);
ParquetWriteSupport.setSchema(structType, hadoopConf);
- this.rootFieldWriters = getFieldWriters(structType, schema);
+ // Use shreddedSchema for creating writers when shredded Variants are present
+ this.rootFieldWriters = getFieldWriters(shreddedSchema, schema);
this.hadoopConf = hadoopConf;
this.bloomFilterWriteSupportOpt =
bloomFilterOpt.map(HoodieBloomFilterRowWriteSupport::new);
}
+ /**
+ * Generates a shredded schema from the given structType and hoodieSchema.
+ * <p>
+ * For Variant fields that are configured for shredding (based on HoodieSchema.Variant.isShredded()), the VariantType is replaced with a shredded struct schema.
+ *
+ * @param structType The original Spark StructType
+ * @param hoodieSchema The HoodieSchema containing shredding information
+ * @return A StructType with shredded Variant fields replaced by their shredded schemas
+ */
+ private StructType generateShreddedSchema(StructType structType, HoodieSchema hoodieSchema) {
+ StructField[] fields = structType.fields();
+ StructField[] shreddedFields = new StructField[fields.length];
+ boolean hasShredding = false;
+
+ for (int i = 0; i < fields.length; i++) {
+ StructField field = fields[i];
+ DataType dataType = field.dataType();
+
+ // Check if this is a Variant field that should be shredded
+ if (SparkAdapterSupport$.MODULE$.sparkAdapter().isVariantType(dataType)) {
+ HoodieSchema fieldHoodieSchema = Option.ofNullable(hoodieSchema)
Review Comment:
🤖 Good question. `fieldHoodieSchema` can indeed be null (via `.orElse(null)`
at line 89). When it is null, the `isVariantType` check at line 92 still
passes, but the null check on `fieldHoodieSchema` at line 93 causes shredding
to be silently skipped. This looks like an intentional graceful fallback (the
`testSchemaMismatchNonVariantSparkType` test exercises it), but it may be worth
logging a warning when a Variant field has no corresponding HoodieSchema so
that misconfigurations don't go unnoticed.
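
A minimal, standalone sketch of the warn-then-fall-back idea, not the PR's
code. `ShreddingFallbackSketch`, `FieldSchema`, and `shouldShred` are
hypothetical names, the schema lookup is reduced to a plain map, and
`java.util.logging` stands in for whatever logger the class already uses:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.logging.Logger;

public class ShreddingFallbackSketch {
    // Assumption: java.util.logging is used only to keep the sketch dependency-free.
    private static final Logger LOG =
        Logger.getLogger(ShreddingFallbackSketch.class.getName());

    /** Hypothetical stand-in for a per-field schema; it only carries the shredding flag. */
    static final class FieldSchema {
        final boolean shredded;

        FieldSchema(boolean shredded) {
            this.shredded = shredded;
        }
    }

    /**
     * Decides whether a Variant field should be shredded. A field with no
     * schema entry still falls back to unshredded, but a warning is logged
     * so the mismatch is visible instead of silent.
     */
    static boolean shouldShred(String fieldName, Map<String, FieldSchema> schemaFields) {
        Optional<FieldSchema> fieldSchema = Optional.ofNullable(schemaFields.get(fieldName));
        if (!fieldSchema.isPresent()) {
            LOG.warning("Variant field '" + fieldName
                + "' has no corresponding schema entry; writing it unshredded");
            return false;
        }
        return fieldSchema.get().shredded;
    }

    public static void main(String[] args) {
        Map<String, FieldSchema> schemaFields = new HashMap<>();
        schemaFields.put("v_shredded", new FieldSchema(true));
        schemaFields.put("v_plain", new FieldSchema(false));

        System.out.println(shouldShred("v_shredded", schemaFields));
        System.out.println(shouldShred("v_plain", schemaFields));
        System.out.println(shouldShred("v_missing", schemaFields)); // logs a warning first
    }
}
```

The fallback behaviour is unchanged; the only difference is that the
missing-schema case leaves a trace in the logs.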
##########
hudi-spark-datasource/hudi-spark4.0.x/src/test/java/org/apache/hudi/io/storage/row/TestHoodieRowParquetWriteSupportVariant.java:
##########
@@ -155,9 +190,71 @@ public void testWriteShreddedVariant() throws IOException {
assertEquals(BINARY, valueField.asPrimitiveType().getPrimitiveTypeName());
assertEquals(OPTIONAL, valueField.getRepetition(), "Shredded variant value
must be OPTIONAL");
- // Verify typed_value is omitted (as implementation skips it)
boolean hasTypedValue = vGroup.getFields().stream().anyMatch(f ->
f.getName().equals("typed_value"));
Review Comment:
🤖 The assertion was changed from `assertFalse` to `assertTrue` but the
failure message still says "should be omitted". If this assertion ever fails,
the message will be misleading. Could you update it to something like
`"typed_value field should be present in shredded variant"`?
##########
hudi-client/hudi-flink-client/src/main/java/org/apache/hudi/util/HoodieSchemaConverter.java:
##########
@@ -554,8 +554,7 @@ private static DataType convertUnion(HoodieSchema schema) {
/**
* Converts a Variant schema to Flink's ROW type.
- * Variant is represented as ROW<`metadata` BYTES, `value` BYTES> in Flink.
- * // TODO: We are only supporting unshredded for now, support shredded in the future
+ * Variant is represented as ROW<`value` BYTES, `metadata` BYTES> in Flink.
Review Comment:
🤖 The updated comment says `ROW<value BYTES, metadata BYTES>` but the
implementation at lines 570-572 creates `ROW<metadata BYTES, value BYTES>`
(metadata first). Looks like the field order was accidentally swapped in the
comment.
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java:
##########
@@ -403,6 +557,33 @@ private ValueWriter makeWriter(HoodieSchema schema, DataType dataType) {
}
}
+ /**
+ * Creates a ValueWriter for a shredded Variant column.
+ * This writer converts a Variant value into its shredded components (metadata, value, typed_value) and writes them to Parquet.
+ *
+ * @param shreddedStructType The shredded StructType (with shredding metadata)
+ * @return A ValueWriter that handles shredded Variant writing
+ */
+ private ValueWriter makeShreddedVariantWriter(StructType shreddedStructType) {
+ // Create writers for the shredded struct fields
+ // The shreddedStructType contains: metadata (binary), value (binary), typed_value (optional)
+ ValueWriter[] shreddedFieldWriters = Arrays.stream(shreddedStructType.fields())
Review Comment:
🤖 The field writers here are created with `makeWriter(null,
field.dataType())`, passing `null` for the HoodieSchema. For metadata/value
(BinaryType) that's fine, but for `typed_value` sub-fields with parameterized
types (e.g., timestamps, decimals), won't this lose type metadata such as
precision? Or does `SparkShreddingUtils.castShredded` guarantee the output
matches Spark's default type assumptions?