bvaradar commented on code in PR #18062:
URL: https://github.com/apache/hudi/pull/18062#discussion_r2921881229


##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java:
##########
@@ -129,6 +142,16 @@ public HoodieRowParquetWriteSupport(Configuration conf, StructType structType, O
     hadoopConf.set("spark.sql.parquet.writeLegacyFormat", writeLegacyFormatEnabled);
     hadoopConf.set("spark.sql.parquet.outputTimestampType", config.getStringOrDefault(HoodieStorageConfig.PARQUET_OUTPUT_TIMESTAMP_TYPE));
     hadoopConf.set("spark.sql.parquet.fieldId.write.enabled", config.getStringOrDefault(PARQUET_FIELD_ID_WRITE_ENABLED));
+
+    // Variant shredding configs
+    this.variantWriteShreddingEnabled = config.getBooleanOrDefault(PARQUET_VARIANT_WRITE_SHREDDING_ENABLED);
+    this.variantForceShreddingSchemaForTest = config.getString(PARQUET_VARIANT_FORCE_SHREDDING_SCHEMA_FOR_TEST);
+    hadoopConf.setBoolean("spark.sql.variant.writeShredding.enabled", variantWriteShreddingEnabled);
+    hadoopConf.setBoolean("spark.sql.variant.allowReadingShredded", config.getBooleanOrDefault(PARQUET_VARIANT_ALLOW_READING_SHREDDED));

Review Comment:
   Question: Does this setting need to be set for all readers?
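
   For concreteness, the conf the diff sets is writer-local; if readers also need it, each reader session would have to set something like the following (sketch only; the conf name is taken from the diff above, and whether every reader path actually requires it is exactly the open question):

   ```
   # Reader-side Spark conf (name from the diff above) — illustrative sketch
   spark.sql.variant.allowReadingShredded=true
   ```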



##########
hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java:
##########
@@ -168,6 +168,36 @@ public class HoodieStorageConfig extends HoodieConfig {
       .withDocumentation("Control whether to write bloom filter or not. Default true. "
           + "We can set to false in non bloom index cases for CPU resource saving.");
 
+  public static final ConfigProperty<Boolean> PARQUET_VARIANT_WRITE_SHREDDING_ENABLED = ConfigProperty
+      .key("hoodie.parquet.variant.write.shredding.enabled")
+      .defaultValue(true)
+      .sinceVersion("1.1.0")
+      .withDocumentation("Controls whether variant columns are written in shredded format. "
+          + "When enabled (default), variant columns with shredding information in the schema will be written "
+          + "in shredded format with typed_value columns. When disabled, variant columns are always written "
+          + "in unshredded format regardless of the schema. "
+          + "Equivalent to Spark's spark.sql.variant.writeShredding.enabled.");
+
+  public static final ConfigProperty<String> PARQUET_VARIANT_FORCE_SHREDDING_SCHEMA_FOR_TEST = ConfigProperty
+      .key("hoodie.parquet.variant.force.shredding.schema.for.test")

Review Comment:
   Can a table have multiple variant columns? If so, how is this config set and used?
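
   To make the concern concrete, here is a minimal standalone sketch (class and method names are hypothetical, not from this PR) of the selection logic implied by the config documentation: because the forced test schema is a single DDL string, it would override the per-column shredding schema for every variant column in the table.

   ```java
   // Hypothetical sketch modelling the selection behavior described in the
   // config docs above; names here are illustrative, not from the PR.
   public class ForcedShreddingSchemaSketch {

     // Per the docs: when the forced test schema is set, it wins for ALL
     // variant columns; otherwise each column uses its own shredding schema.
     static String typedValueDdlFor(String perColumnDdl, String forcedTestDdl) {
       if (forcedTestDdl != null && !forcedTestDdl.isEmpty()) {
         return forcedTestDdl; // same DDL applied to every variant column
       }
       return perColumnDdl; // schema-driven, decided per column
     }

     public static void main(String[] args) {
       // Two variant columns with different per-column shredding schemas:
       String colA = typedValueDdlFor("a INT", "forced STRING");
       String colB = typedValueDdlFor("b DOUBLE", "forced STRING");
       // Both collapse to the forced schema, which is the concern raised here.
       System.out.println(colA + " | " + colB);
     }
   }
   ```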



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java:
##########
@@ -139,21 +162,118 @@ public HoodieRowParquetWriteSupport(Configuration conf, StructType structType, O
           HoodieSchema parsedSchema = HoodieSchema.parse(schemaString);
           return HoodieSchemaUtils.addMetadataFields(parsedSchema, config.getBooleanOrDefault(ALLOW_OPERATION_METADATA_FIELD));
         });
+    // Generate shredded schema if there are shredded Variant columns
+    this.shreddedSchema = generateShreddedSchema(structType, schema);
     ParquetWriteSupport.setSchema(structType, hadoopConf);

Review Comment:
   Same question as the last PR: The original non-shredded structType is still 
being passed to setSchema(...) even though shreddedSchema is what gets passed 
to convert(...) in init(). This means the schema stored in Hadoop configuration 
doesn't match the actual Parquet message type written to the file footer.
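
   If the intent is for the registered schema to match what the writers actually emit, a possible (untested) one-line change would be:

   ```diff
   -    ParquetWriteSupport.setSchema(structType, hadoopConf);
   +    ParquetWriteSupport.setSchema(shreddedSchema, hadoopConf);
   ```

   This is only a sketch; it assumes no downstream consumer of the conf entry depends on seeing the original non-shredded StructType.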



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java:
##########
@@ -139,21 +162,118 @@ public HoodieRowParquetWriteSupport(Configuration conf, StructType structType, O
           HoodieSchema parsedSchema = HoodieSchema.parse(schemaString);
           return HoodieSchemaUtils.addMetadataFields(parsedSchema, config.getBooleanOrDefault(ALLOW_OPERATION_METADATA_FIELD));
         });
+    // Generate shredded schema if there are shredded Variant columns
+    this.shreddedSchema = generateShreddedSchema(structType, schema);
     ParquetWriteSupport.setSchema(structType, hadoopConf);
-    this.rootFieldWriters = getFieldWriters(structType, schema);
+    // Use shreddedSchema for creating writers when shredded Variants are present
+    this.rootFieldWriters = getFieldWriters(shreddedSchema, schema);
     this.hadoopConf = hadoopConf;
     this.bloomFilterWriteSupportOpt = bloomFilterOpt.map(HoodieBloomFilterRowWriteSupport::new);
   }
 
+  /**
+   * Generates a shredded schema from the given structType and hoodieSchema.
+   * <p>
+   * For Variant fields that are configured for shredding (based on HoodieSchema.Variant.isShredded()), the VariantType is replaced with a shredded struct schema.
+   * <p>
+   * Shredding behavior is controlled by:
+   * <ul>
+   *   <li>{@code hoodie.parquet.variant.write.shredding.enabled} - Master switch for shredding (default: true).
+   *       When false, no shredding happens regardless of schema configuration.</li>
+   *   <li>{@code hoodie.parquet.variant.force.shredding.schema.for.test} - When set, forces this DDL schema
+   *       as the typed_value schema for ALL variant columns, overriding schema-driven shredding.</li>
+   * </ul>
+   *
+   * @param structType The original Spark StructType
+   * @param hoodieSchema The HoodieSchema containing shredding information
+   * @return A StructType with shredded Variant fields replaced by their shredded schemas
+   */
+  private StructType generateShreddedSchema(StructType structType, HoodieSchema hoodieSchema) {

Review Comment:
   Hmmm, does this process nested types? Maybe a rebase issue?



##########
hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java:
##########
@@ -139,21 +162,118 @@ public HoodieRowParquetWriteSupport(Configuration conf, StructType structType, O
           HoodieSchema parsedSchema = HoodieSchema.parse(schemaString);
           return HoodieSchemaUtils.addMetadataFields(parsedSchema, config.getBooleanOrDefault(ALLOW_OPERATION_METADATA_FIELD));
         });
+    // Generate shredded schema if there are shredded Variant columns
+    this.shreddedSchema = generateShreddedSchema(structType, schema);
     ParquetWriteSupport.setSchema(structType, hadoopConf);
-    this.rootFieldWriters = getFieldWriters(structType, schema);
+    // Use shreddedSchema for creating writers when shredded Variants are present
+    this.rootFieldWriters = getFieldWriters(shreddedSchema, schema);
     this.hadoopConf = hadoopConf;
     this.bloomFilterWriteSupportOpt = bloomFilterOpt.map(HoodieBloomFilterRowWriteSupport::new);
   }
 
+  /**
+   * Generates a shredded schema from the given structType and hoodieSchema.
+   * <p>
+   * For Variant fields that are configured for shredding (based on HoodieSchema.Variant.isShredded()), the VariantType is replaced with a shredded struct schema.
+   * <p>
+   * Shredding behavior is controlled by:
+   * <ul>
+   *   <li>{@code hoodie.parquet.variant.write.shredding.enabled} - Master switch for shredding (default: true).
+   *       When false, no shredding happens regardless of schema configuration.</li>
+   *   <li>{@code hoodie.parquet.variant.force.shredding.schema.for.test} - When set, forces this DDL schema
+   *       as the typed_value schema for ALL variant columns, overriding schema-driven shredding.</li>
+   * </ul>
+   *
+   * @param structType The original Spark StructType
+   * @param hoodieSchema The HoodieSchema containing shredding information
+   * @return A StructType with shredded Variant fields replaced by their shredded schemas
+   */
+  private StructType generateShreddedSchema(StructType structType, HoodieSchema hoodieSchema) {
+    // If write shredding is disabled, skip shredding entirely
+    if (!variantWriteShreddingEnabled) {
+      return structType;
+    }
+
+    // Parse forced shredding schema if configured
+    StructType forcedShreddingSchema = null;
+    if (variantForceShreddingSchemaForTest != null && !variantForceShreddingSchemaForTest.isEmpty()) {
+      forcedShreddingSchema = StructType.fromDDL(variantForceShreddingSchemaForTest);
+    }
+
+    StructField[] fields = structType.fields();
+    StructField[] shreddedFields = new StructField[fields.length];
+    boolean hasShredding = false;
+
+    for (int i = 0; i < fields.length; i++) {
+      StructField field = fields[i];
+      DataType dataType = field.dataType();
+
+      // Check if this is a Variant field that should be shredded
+      if (SparkAdapterSupport$.MODULE$.sparkAdapter().isVariantType(dataType)) {
+        // If a forced shredding schema is configured, use it for all variant columns

Review Comment:
           // If a forced shredding schema is configured, use it for all variant columns
   -- How will this work for variant columns in the same table with different schemas?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
