bvaradar commented on code in PR #18062:
URL: https://github.com/apache/hudi/pull/18062#discussion_r2921881229
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java:
##########
@@ -129,6 +142,16 @@ public HoodieRowParquetWriteSupport(Configuration conf,
StructType structType, O
hadoopConf.set("spark.sql.parquet.writeLegacyFormat",
writeLegacyFormatEnabled);
hadoopConf.set("spark.sql.parquet.outputTimestampType",
config.getStringOrDefault(HoodieStorageConfig.PARQUET_OUTPUT_TIMESTAMP_TYPE));
hadoopConf.set("spark.sql.parquet.fieldId.write.enabled",
config.getStringOrDefault(PARQUET_FIELD_ID_WRITE_ENABLED));
+
+ // Variant shredding configs
+ this.variantWriteShreddingEnabled =
config.getBooleanOrDefault(PARQUET_VARIANT_WRITE_SHREDDING_ENABLED);
+ this.variantForceShreddingSchemaForTest =
config.getString(PARQUET_VARIANT_FORCE_SHREDDING_SCHEMA_FOR_TEST);
+ hadoopConf.setBoolean("spark.sql.variant.writeShredding.enabled",
variantWriteShreddingEnabled);
+ hadoopConf.setBoolean("spark.sql.variant.allowReadingShredded",
config.getBooleanOrDefault(PARQUET_VARIANT_ALLOW_READING_SHREDDED));
Review Comment:
Question: Does this setting need to be set for all readers?
##########
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java:
##########
@@ -168,6 +168,36 @@ public class HoodieStorageConfig extends HoodieConfig {
.withDocumentation("Control whether to write bloom filter or not.
Default true. "
+ "We can set to false in non bloom index cases for CPU resource
saving.");
+ public static final ConfigProperty<Boolean>
PARQUET_VARIANT_WRITE_SHREDDING_ENABLED = ConfigProperty
+ .key("hoodie.parquet.variant.write.shredding.enabled")
+ .defaultValue(true)
+ .sinceVersion("1.1.0")
+ .withDocumentation("Controls whether variant columns are written in
shredded format. "
+ + "When enabled (default), variant columns with shredding
information in the schema will be written "
+ + "in shredded format with typed_value columns. When disabled,
variant columns are always written "
+ + "in unshredded format regardless of the schema. "
+ + "Equivalent to Spark's spark.sql.variant.writeShredding.enabled.");
+
+ public static final ConfigProperty<String>
PARQUET_VARIANT_FORCE_SHREDDING_SCHEMA_FOR_TEST = ConfigProperty
+ .key("hoodie.parquet.variant.force.shredding.schema.for.test")
Review Comment:
Can a table have multiple variant columns? If so, how is this config set
and used?
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java:
##########
@@ -139,21 +162,118 @@ public HoodieRowParquetWriteSupport(Configuration conf,
StructType structType, O
HoodieSchema parsedSchema = HoodieSchema.parse(schemaString);
return HoodieSchemaUtils.addMetadataFields(parsedSchema,
config.getBooleanOrDefault(ALLOW_OPERATION_METADATA_FIELD));
});
+ // Generate shredded schema if there are shredded Variant columns
+ this.shreddedSchema = generateShreddedSchema(structType, schema);
ParquetWriteSupport.setSchema(structType, hadoopConf);
Review Comment:
Same question as the last PR: The original non-shredded structType is still
being passed to setSchema(...) even though shreddedSchema is what gets passed
to convert(...) in init(). This means the schema stored in Hadoop configuration
doesn't match the actual Parquet message type written to the file footer.
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java:
##########
@@ -139,21 +162,118 @@ public HoodieRowParquetWriteSupport(Configuration conf,
StructType structType, O
HoodieSchema parsedSchema = HoodieSchema.parse(schemaString);
return HoodieSchemaUtils.addMetadataFields(parsedSchema,
config.getBooleanOrDefault(ALLOW_OPERATION_METADATA_FIELD));
});
+ // Generate shredded schema if there are shredded Variant columns
+ this.shreddedSchema = generateShreddedSchema(structType, schema);
ParquetWriteSupport.setSchema(structType, hadoopConf);
- this.rootFieldWriters = getFieldWriters(structType, schema);
+ // Use shreddedSchema for creating writers when shredded Variants are
present
+ this.rootFieldWriters = getFieldWriters(shreddedSchema, schema);
this.hadoopConf = hadoopConf;
this.bloomFilterWriteSupportOpt =
bloomFilterOpt.map(HoodieBloomFilterRowWriteSupport::new);
}
+ /**
+ * Generates a shredded schema from the given structType and hoodieSchema.
+ * <p>
+ * For Variant fields that are configured for shredding (based on
HoodieSchema.Variant.isShredded()), the VariantType is replaced with a shredded
struct schema.
+ * <p>
+ * Shredding behavior is controlled by:
+ * <ul>
+ * <li>{@code hoodie.parquet.variant.write.shredding.enabled} - Master
switch for shredding (default: true).
+ * When false, no shredding happens regardless of schema
configuration.</li>
+ * <li>{@code hoodie.parquet.variant.force.shredding.schema.for.test} -
When set, forces this DDL schema
+ * as the typed_value schema for ALL variant columns, overriding
schema-driven shredding.</li>
+ * </ul>
+ *
+ * @param structType The original Spark StructType
+ * @param hoodieSchema The HoodieSchema containing shredding information
+ * @return A StructType with shredded Variant fields replaced by their
shredded schemas
+ */
+ private StructType generateShreddedSchema(StructType structType,
HoodieSchema hoodieSchema) {
Review Comment:
Hmmm — does this process nested types? Maybe a rebase issue?
##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java:
##########
@@ -139,21 +162,118 @@ public HoodieRowParquetWriteSupport(Configuration conf,
StructType structType, O
HoodieSchema parsedSchema = HoodieSchema.parse(schemaString);
return HoodieSchemaUtils.addMetadataFields(parsedSchema,
config.getBooleanOrDefault(ALLOW_OPERATION_METADATA_FIELD));
});
+ // Generate shredded schema if there are shredded Variant columns
+ this.shreddedSchema = generateShreddedSchema(structType, schema);
ParquetWriteSupport.setSchema(structType, hadoopConf);
- this.rootFieldWriters = getFieldWriters(structType, schema);
+ // Use shreddedSchema for creating writers when shredded Variants are
present
+ this.rootFieldWriters = getFieldWriters(shreddedSchema, schema);
this.hadoopConf = hadoopConf;
this.bloomFilterWriteSupportOpt =
bloomFilterOpt.map(HoodieBloomFilterRowWriteSupport::new);
}
+ /**
+ * Generates a shredded schema from the given structType and hoodieSchema.
+ * <p>
+ * For Variant fields that are configured for shredding (based on
HoodieSchema.Variant.isShredded()), the VariantType is replaced with a shredded
struct schema.
+ * <p>
+ * Shredding behavior is controlled by:
+ * <ul>
+ * <li>{@code hoodie.parquet.variant.write.shredding.enabled} - Master
switch for shredding (default: true).
+ * When false, no shredding happens regardless of schema
configuration.</li>
+ * <li>{@code hoodie.parquet.variant.force.shredding.schema.for.test} -
When set, forces this DDL schema
+ * as the typed_value schema for ALL variant columns, overriding
schema-driven shredding.</li>
+ * </ul>
+ *
+ * @param structType The original Spark StructType
+ * @param hoodieSchema The HoodieSchema containing shredding information
+ * @return A StructType with shredded Variant fields replaced by their
shredded schemas
+ */
+ private StructType generateShreddedSchema(StructType structType,
HoodieSchema hoodieSchema) {
+ // If write shredding is disabled, skip shredding entirely
+ if (!variantWriteShreddingEnabled) {
+ return structType;
+ }
+
+ // Parse forced shredding schema if configured
+ StructType forcedShreddingSchema = null;
+ if (variantForceShreddingSchemaForTest != null &&
!variantForceShreddingSchemaForTest.isEmpty()) {
+ forcedShreddingSchema =
StructType.fromDDL(variantForceShreddingSchemaForTest);
+ }
+
+ StructField[] fields = structType.fields();
+ StructField[] shreddedFields = new StructField[fields.length];
+ boolean hasShredding = false;
+
+ for (int i = 0; i < fields.length; i++) {
+ StructField field = fields[i];
+ DataType dataType = field.dataType();
+
+ // Check if this is a Variant field that should be shredded
+ if (SparkAdapterSupport$.MODULE$.sparkAdapter().isVariantType(dataType))
{
+ // If a forced shredding schema is configured, use it for all variant
columns
Review Comment:
// If a forced shredding schema is configured, use it for all
variant columns
-- How will this work for variant columns in the same table with different
schemas?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]