voonhous commented on code in PR #18062:
URL: https://github.com/apache/hudi/pull/18062#discussion_r2945414225
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java:
##########
@@ -139,21 +162,118 @@ public HoodieRowParquetWriteSupport(Configuration conf,
StructType structType, O
HoodieSchema parsedSchema = HoodieSchema.parse(schemaString);
return HoodieSchemaUtils.addMetadataFields(parsedSchema,
config.getBooleanOrDefault(ALLOW_OPERATION_METADATA_FIELD));
});
+ // Generate shredded schema if there are shredded Variant columns
+ this.shreddedSchema = generateShreddedSchema(structType, schema);
ParquetWriteSupport.setSchema(structType, hadoopConf);
- this.rootFieldWriters = getFieldWriters(structType, schema);
+ // Use shreddedSchema for creating writers when shredded Variants are
present
+ this.rootFieldWriters = getFieldWriters(shreddedSchema, schema);
this.hadoopConf = hadoopConf;
this.bloomFilterWriteSupportOpt =
bloomFilterOpt.map(HoodieBloomFilterRowWriteSupport::new);
}
+ /**
+ * Generates a shredded schema from the given structType and hoodieSchema.
+ * <p>
+ * For Variant fields that are configured for shredding (based on
HoodieSchema.Variant.isShredded()), the VariantType is replaced with a shredded
struct schema.
+ * <p>
+ * Shredding behavior is controlled by:
+ * <ul>
+ * <li>{@code hoodie.parquet.variant.write.shredding.enabled} - Master
switch for shredding (default: true).
+ * When false, no shredding happens regardless of schema
configuration.</li>
+ * <li>{@code hoodie.parquet.variant.force.shredding.schema.for.test} -
When set, forces this DDL schema
+ * as the typed_value schema for ALL variant columns, overriding
schema-driven shredding.</li>
+ * </ul>
+ *
+ * @param structType The original Spark StructType
+ * @param hoodieSchema The HoodieSchema containing shredding information
+ * @return A StructType with shredded Variant fields replaced by their
shredded schemas
+ */
+ private StructType generateShreddedSchema(StructType structType,
HoodieSchema hoodieSchema) {
+ // If write shredding is disabled, skip shredding entirely
+ if (!variantWriteShreddingEnabled) {
+ return structType;
+ }
+
+ // Parse forced shredding schema if configured
+ StructType forcedShreddingSchema = null;
+ if (variantForceShreddingSchemaForTest != null &&
!variantForceShreddingSchemaForTest.isEmpty()) {
+ forcedShreddingSchema =
StructType.fromDDL(variantForceShreddingSchemaForTest);
+ }
+
+ StructField[] fields = structType.fields();
+ StructField[] shreddedFields = new StructField[fields.length];
+ boolean hasShredding = false;
+
+ for (int i = 0; i < fields.length; i++) {
+ StructField field = fields[i];
+ DataType dataType = field.dataType();
+
+ // Check if this is a Variant field that should be shredded
+ if (SparkAdapterSupport$.MODULE$.sparkAdapter().isVariantType(dataType))
{
+ // If a forced shredding schema is configured, use it for all variant
columns
Review Comment:
Fields that are not found are simply skipped. For example, if the forced
shredding schema defines `field_a` for a variant column and `field_a` is not
found in the data, that field is skipped, which is the expected behaviour.
This follows from the nature of unstructured data: not all batches of the same
variant column will have the same schema. Some batches might contain `field_a`,
while others might not. So, if `field_a` is defined in the forced shredding
schema but not found in the data, it is simply skipped.
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java:
##########
@@ -139,21 +162,118 @@ public HoodieRowParquetWriteSupport(Configuration conf,
StructType structType, O
HoodieSchema parsedSchema = HoodieSchema.parse(schemaString);
return HoodieSchemaUtils.addMetadataFields(parsedSchema,
config.getBooleanOrDefault(ALLOW_OPERATION_METADATA_FIELD));
});
+ // Generate shredded schema if there are shredded Variant columns
+ this.shreddedSchema = generateShreddedSchema(structType, schema);
ParquetWriteSupport.setSchema(structType, hadoopConf);
- this.rootFieldWriters = getFieldWriters(structType, schema);
+ // Use shreddedSchema for creating writers when shredded Variants are
present
+ this.rootFieldWriters = getFieldWriters(shreddedSchema, schema);
this.hadoopConf = hadoopConf;
this.bloomFilterWriteSupportOpt =
bloomFilterOpt.map(HoodieBloomFilterRowWriteSupport::new);
}
+ /**
+ * Generates a shredded schema from the given structType and hoodieSchema.
+ * <p>
+ * For Variant fields that are configured for shredding (based on
HoodieSchema.Variant.isShredded()), the VariantType is replaced with a shredded
struct schema.
+ * <p>
+ * Shredding behavior is controlled by:
+ * <ul>
+ * <li>{@code hoodie.parquet.variant.write.shredding.enabled} - Master
switch for shredding (default: true).
+ * When false, no shredding happens regardless of schema
configuration.</li>
+ * <li>{@code hoodie.parquet.variant.force.shredding.schema.for.test} -
When set, forces this DDL schema
+ * as the typed_value schema for ALL variant columns, overriding
schema-driven shredding.</li>
+ * </ul>
+ *
+ * @param structType The original Spark StructType
+ * @param hoodieSchema The HoodieSchema containing shredding information
+ * @return A StructType with shredded Variant fields replaced by their
shredded schemas
+ */
+ private StructType generateShreddedSchema(StructType structType,
HoodieSchema hoodieSchema) {
+ // If write shredding is disabled, skip shredding entirely
+ if (!variantWriteShreddingEnabled) {
+ return structType;
+ }
+
+ // Parse forced shredding schema if configured
+ StructType forcedShreddingSchema = null;
+ if (variantForceShreddingSchemaForTest != null &&
!variantForceShreddingSchemaForTest.isEmpty()) {
+ forcedShreddingSchema =
StructType.fromDDL(variantForceShreddingSchemaForTest);
+ }
+
+ StructField[] fields = structType.fields();
+ StructField[] shreddedFields = new StructField[fields.length];
+ boolean hasShredding = false;
+
+ for (int i = 0; i < fields.length; i++) {
+ StructField field = fields[i];
+ DataType dataType = field.dataType();
+
+ // Check if this is a Variant field that should be shredded
+ if (SparkAdapterSupport$.MODULE$.sparkAdapter().isVariantType(dataType))
{
+ // If a forced shredding schema is configured, use it for all variant
columns
Review Comment:
Fields that are not found are simply skipped. For example, if the forced
shredding schema defines `field_a` for a variant column and `field_a` is not
found in the data, that field is skipped, which is the expected behaviour.
This follows from the nature of unstructured data: not all batches of the same
variant column will have the same schema. Some batches might contain `field_a`,
while others might not. So, if `field_a` is defined in the forced shredding
schema but not found in the data, it is simply skipped and no error is thrown.
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]