This is an automated email from the ASF dual-hosted git repository.

voonhous pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git


The following commit(s) were added to refs/heads/master by this push:
     new b7adeccfbb5e feat(schema): Config path implemented for spark record type (#18062)
b7adeccfbb5e is described below

commit b7adeccfbb5ea6af5f41369bd452fdb326a37b04
Author: voonhous <[email protected]>
AuthorDate: Tue Jun 2 20:06:30 2026 +0800

    feat(schema): Config path implemented for spark record type (#18062)
    
    - Address comments
---
 .../storage/row/HoodieRowParquetWriteSupport.java  | 53 ++++++++++++++++++++--
 .../hudi/common/config/HoodieStorageConfig.java    | 45 ++++++++++++++++++
 2 files changed, 94 insertions(+), 4 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java
index 0c6f25e3e50c..f8204169d56c 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java
@@ -84,6 +84,9 @@ import scala.Enumeration;
 import scala.Function1;
 
 import static 
org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_FIELD_ID_WRITE_ENABLED;
+import static 
org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_VARIANT_ALLOW_READING_SHREDDED;
+import static 
org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_VARIANT_FORCE_SHREDDING_SCHEMA_FOR_TEST;
+import static 
org.apache.hudi.common.config.HoodieStorageConfig.PARQUET_VARIANT_WRITE_SHREDDING_ENABLED;
 import static 
org.apache.hudi.config.HoodieWriteConfig.ALLOW_OPERATION_METADATA_FIELD;
 import static org.apache.hudi.config.HoodieWriteConfig.AVRO_SCHEMA_STRING;
 import static org.apache.hudi.config.HoodieWriteConfig.INTERNAL_SCHEMA_STRING;
@@ -116,6 +119,8 @@ public class HoodieRowParquetWriteSupport extends 
WriteSupport<InternalRow> {
   private static final String MAP_REPEATED_NAME = "key_value";
   private static final String MAP_KEY_NAME = "key";
   private static final String MAP_VALUE_NAME = "value";
+  private static final String SPARK_VARIANT_WRITE_SHREDDING_ENABLED = 
"spark.sql.variant.writeShredding.enabled";
+  private static final String SPARK_VARIANT_ALLOW_READING_SHREDDED = 
"spark.sql.variant.allowReadingShredded";
 
   private static final String SESSION_LOCAL_TIME_ZONE_KEY = 
"spark.sql.session.timeZone";
   private static final String PARQUET_METADATA_TIME_ZONE_KEY = 
"org.apache.spark.timeZone";
@@ -137,6 +142,8 @@ public class HoodieRowParquetWriteSupport extends 
WriteSupport<InternalRow> {
    * For non-shredded cases, this is identical to structType.
    */
   private final StructType shreddedSchema;
+  private final boolean variantWriteShreddingEnabled;
+  private final String variantForceShreddingSchemaForTest;
   private RecordConsumer recordConsumer;
 
   public HoodieRowParquetWriteSupport(Configuration conf, StructType 
structType, Option<BloomFilter> bloomFilterOpt, HoodieConfig config) {
@@ -145,6 +152,16 @@ public class HoodieRowParquetWriteSupport extends 
WriteSupport<InternalRow> {
     hadoopConf.set("spark.sql.parquet.writeLegacyFormat", 
writeLegacyFormatEnabled);
     hadoopConf.set("spark.sql.parquet.outputTimestampType", 
config.getStringOrDefault(HoodieStorageConfig.PARQUET_OUTPUT_TIMESTAMP_TYPE));
     hadoopConf.set("spark.sql.parquet.fieldId.write.enabled", 
config.getStringOrDefault(PARQUET_FIELD_ID_WRITE_ENABLED));
+
+    // Variant shredding configs
+    this.variantWriteShreddingEnabled = 
config.getBooleanOrDefault(PARQUET_VARIANT_WRITE_SHREDDING_ENABLED);
+    this.variantForceShreddingSchemaForTest = 
config.getString(PARQUET_VARIANT_FORCE_SHREDDING_SCHEMA_FOR_TEST);
+    hadoopConf.setBoolean(SPARK_VARIANT_WRITE_SHREDDING_ENABLED, 
variantWriteShreddingEnabled);
+    hadoopConf.setBoolean(SPARK_VARIANT_ALLOW_READING_SHREDDED, 
config.getBooleanOrDefault(PARQUET_VARIANT_ALLOW_READING_SHREDDED));
+    if (variantForceShreddingSchemaForTest != null && 
!variantForceShreddingSchemaForTest.isEmpty()) {
+      hadoopConf.set("spark.sql.variant.forceShreddingSchemaForTest", 
variantForceShreddingSchemaForTest);
+    }
+
     this.writeLegacyListFormat = Boolean.parseBoolean(writeLegacyFormatEnabled)
         || 
Boolean.parseBoolean(config.getStringOrDefault(AvroWriteSupport.WRITE_OLD_LIST_STRUCTURE,
 "false"));
     this.structType = structType;
@@ -168,15 +185,34 @@ public class HoodieRowParquetWriteSupport extends 
WriteSupport<InternalRow> {
   /**
    * Generates a shredded schema from the given structType and hoodieSchema.
    * <p>
-   * For Variant fields that are configured for shredding (based on 
HoodieSchema.Variant.isShredded()),
-   * the VariantType is replaced with a shredded struct schema. This method 
recursively processes
-   * nested struct fields to handle Variant fields at any depth.
+   * For Variant fields that are configured for shredding (based on 
HoodieSchema.Variant.isShredded()), the VariantType is replaced with a shredded 
struct schema.
+   * <p>
+   * Shredding behavior is controlled by:
+   * <ul>
+   *   <li>{@code hoodie.parquet.variant.write.shredding.enabled} - Master 
switch for shredding (default: true).
+   *       When false, no shredding happens regardless of schema 
configuration.</li>
+   *   <li>{@code hoodie.parquet.variant.force.shredding.schema.for.test} - 
When set, forces this DDL schema
+   *       as the typed_value schema for ALL variant columns, overriding 
schema-driven shredding.</li>
+   * </ul>
+   *
+   * This method recursively processes nested struct fields to handle Variant 
fields at any depth.
    *
    * @param structType The original Spark StructType
    * @param hoodieSchema The HoodieSchema containing shredding information
    * @return A StructType with shredded Variant fields replaced by their 
shredded schemas
    */
   private StructType generateShreddedSchema(StructType structType, 
HoodieSchema hoodieSchema) {
+    // If write shredding is disabled, skip shredding entirely
+    if (!variantWriteShreddingEnabled) {
+      return structType;
+    }
+
+    // Parse forced shredding schema if configured
+    StructType forcedShreddingSchema = null;
+    if (variantForceShreddingSchemaForTest != null && 
!variantForceShreddingSchemaForTest.isEmpty()) {
+      forcedShreddingSchema = 
StructType.fromDDL(variantForceShreddingSchemaForTest);
+    }
+
     StructField[] fields = structType.fields();
     StructField[] shreddedFields = new StructField[fields.length];
     boolean hasShredding = false;
@@ -185,6 +221,16 @@ public class HoodieRowParquetWriteSupport extends 
WriteSupport<InternalRow> {
       StructField field = fields[i];
       DataType dataType = field.dataType();
 
+      // If a forced shredding schema is configured, use it for all variant 
columns
+      if (forcedShreddingSchema != null
+          && 
SparkAdapterSupport$.MODULE$.sparkAdapter().isVariantType(dataType)) {
+        StructType markedShreddedStruct = 
SparkAdapterSupport$.MODULE$.sparkAdapter()
+            .generateVariantWriteShreddingSchema(forcedShreddingSchema, true, 
false);
+        shreddedFields[i] = new StructField(field.name(), 
markedShreddedStruct, field.nullable(), field.metadata());
+        hasShredding = true;
+        continue;
+      }
+
       // Get the HoodieSchema for this field (if available)
       // Use getNonNullType() to unwrap nullable unions (e.g., ["null", 
"string"] -> "string")
       HoodieSchema fieldHoodieSchema = Option.ofNullable(hoodieSchema)
@@ -193,7 +239,6 @@ public class HoodieRowParquetWriteSupport extends 
WriteSupport<InternalRow> {
           .map(HoodieSchemaField::schema)
           .orElse(null);
 
-      // Check if this is a Variant field that should be shredded
       if (SparkAdapterSupport$.MODULE$.sparkAdapter().isVariantType(dataType)) 
{
         if (fieldHoodieSchema != null && fieldHoodieSchema.getType() == 
HoodieSchemaType.VARIANT) {
           HoodieSchema.Variant variantSchema = (HoodieSchema.Variant) 
fieldHoodieSchema;
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
index 95947e6092c2..17f2bc6a9b89 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
@@ -213,6 +213,36 @@ public class HoodieStorageConfig extends HoodieConfig {
       .withDocumentation("Control whether to write bloom filter or not. 
Default true. "
           + "We can set to false in non bloom index cases for CPU resource 
saving.");
 
+  public static final ConfigProperty<Boolean> 
PARQUET_VARIANT_WRITE_SHREDDING_ENABLED = ConfigProperty
+      .key("hoodie.parquet.variant.write.shredding.enabled")
+      .defaultValue(true)
+      .sinceVersion("1.1.0")
+      .withDocumentation("Controls whether variant columns are written in 
shredded format. "
+          + "When enabled (default), variant columns with shredding 
information in the schema will be written "
+          + "in shredded format with typed_value columns. When disabled, 
variant columns are always written "
+          + "in unshredded format regardless of the schema. "
+          + "Equivalent to Spark's spark.sql.variant.writeShredding.enabled.");
+
+  public static final ConfigProperty<String> 
PARQUET_VARIANT_FORCE_SHREDDING_SCHEMA_FOR_TEST = ConfigProperty
+      .key("hoodie.parquet.variant.force.shredding.schema.for.test")
+      .noDefaultValue()
+      .markAdvanced()
+      .sinceVersion("1.1.0")
+      .withDocumentation("Forces a specific shredding schema for all variant 
columns, intended for testing. "
+          + "The value should be a DDL-format schema string (e.g., 'a int, b 
string, c decimal(15, 1)'). "
+          + "When set and write shredding is enabled, this schema overrides 
the schema-driven shredding "
+          + "configuration for all variant columns. "
+          + "Equivalent to Spark's 
spark.sql.variant.forceShreddingSchemaForTest.");
+
+  public static final ConfigProperty<Boolean> 
PARQUET_VARIANT_ALLOW_READING_SHREDDED = ConfigProperty
+      .key("hoodie.parquet.variant.allow.reading.shredded")
+      .defaultValue(true)
+      .sinceVersion("1.1.0")
+      .withDocumentation("Controls whether shredded variant data can be read. "
+          + "When enabled (default), the reader will reconstruct variant 
values from shredded components. "
+          + "When disabled, only unshredded variant data can be read. "
+          + "Equivalent to Spark's spark.sql.variant.allowReadingShredded.");
+
   public static final ConfigProperty<Boolean> WRITE_UTC_TIMEZONE = 
ConfigProperty
       .key("hoodie.parquet.write.utc-timezone.enabled")
       .defaultValue(true)
@@ -549,6 +579,21 @@ public class HoodieStorageConfig extends HoodieConfig {
       return this;
     }
 
+    public Builder parquetVariantWriteShreddingEnabled(boolean enabled) {
+      storageConfig.setValue(PARQUET_VARIANT_WRITE_SHREDDING_ENABLED, 
String.valueOf(enabled));
+      return this;
+    }
+
+    public Builder parquetVariantForceShreddingSchemaForTest(String 
schemaString) {
+      storageConfig.setValue(PARQUET_VARIANT_FORCE_SHREDDING_SCHEMA_FOR_TEST, 
schemaString);
+      return this;
+    }
+
+    public Builder parquetVariantAllowReadingShredded(boolean allowed) {
+      storageConfig.setValue(PARQUET_VARIANT_ALLOW_READING_SHREDDED, 
String.valueOf(allowed));
+      return this;
+    }
+
     public Builder hfileCompressionAlgorithm(String hfileCompressionAlgorithm) 
{
       storageConfig.setValue(HFILE_COMPRESSION_ALGORITHM_NAME, 
hfileCompressionAlgorithm);
       return this;

Reply via email to