This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git

commit fb456d51316a02cca2b565b71c3f64c95576484f
Author: Jon Vexler <[email protected]>
AuthorDate: Tue May 14 13:02:36 2024 -0700

    [HUDI-7385] Add config for custom write support for parquet row writer (#10598)
    
    Co-authored-by: Jonathan Vexler <=>
---
 .../hudi/io/storage/HoodieSparkFileWriterFactory.java | 19 +++++++++++--------
 .../row/HoodieInternalRowFileWriterFactory.java       |  4 ++--
 .../io/storage/row/HoodieRowParquetWriteSupport.java  | 19 +++++++++++++++----
 .../hudi/common/config/HoodieStorageConfig.java       | 11 +++++++++++
 .../hudi/io/storage/HoodieFileWriterFactory.java      |  2 +-
 .../row/TestHoodieInternalRowParquetWriter.java       |  3 ++-
 6 files changed, 42 insertions(+), 16 deletions(-)
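
 Summary of the change: the Spark row-writer paths now obtain their
 ParquetWriteSupport through a new static factory,
 HoodieRowParquetWriteSupport.getHoodieRowParquetWriteSupport(...), which
 reflectively instantiates the class named by the new config
 hoodie.parquet.spark.row.write.support.class (default:
 HoodieRowParquetWriteSupport). This lets users plug in a custom write
 support implementation; illustrative sketches follow the relevant hunks
 below.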

diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java
index 5feefa3bee2..7091c2b240f 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.spark.sql.HoodieInternalRowUtils;
+import org.apache.spark.sql.types.StructType;
 
 import java.io.IOException;
 
@@ -44,15 +45,13 @@ public class HoodieSparkFileWriterFactory extends HoodieFileWriterFactory {
       String instantTime, Path path, Configuration conf, HoodieConfig config, Schema schema,
       TaskContextSupplier taskContextSupplier) throws IOException {
     boolean populateMetaFields = config.getBooleanOrDefault(HoodieTableConfig.POPULATE_META_FIELDS);
-    Option<BloomFilter> filter = enableBloomFilter(populateMetaFields, config) ? Option.of(createBloomFilter(config)) : Option.empty();
     String compressionCodecName = config.getStringOrDefault(HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME);
     // Support PARQUET_COMPRESSION_CODEC_NAME is ""
     if (compressionCodecName.isEmpty()) {
       compressionCodecName = null;
     }
-    HoodieRowParquetWriteSupport writeSupport = new HoodieRowParquetWriteSupport(conf,
-        HoodieInternalRowUtils.getCachedSchema(schema), filter,
-        HoodieStorageConfig.newBuilder().fromProperties(config.getProps()).build());
+    HoodieRowParquetWriteSupport writeSupport = getHoodieRowParquetWriteSupport(conf, schema,
+        config, enableBloomFilter(populateMetaFields, config));
     HoodieRowParquetConfig parquetConfig = new HoodieRowParquetConfig(writeSupport,
         CompressionCodecName.fromConf(compressionCodecName),
         config.getIntOrDefault(HoodieStorageConfig.PARQUET_BLOCK_SIZE),
@@ -69,10 +68,7 @@ public class HoodieSparkFileWriterFactory extends HoodieFileWriterFactory {
   protected HoodieFileWriter newParquetFileWriter(
       FSDataOutputStream outputStream, Configuration conf, HoodieConfig config, Schema schema) throws IOException {
     boolean enableBloomFilter = false;
-    Option<BloomFilter> filter = enableBloomFilter ? Option.of(createBloomFilter(config)) : Option.empty();
-    HoodieRowParquetWriteSupport writeSupport = new HoodieRowParquetWriteSupport(conf,
-        HoodieInternalRowUtils.getCachedSchema(schema), filter,
-        HoodieStorageConfig.newBuilder().fromProperties(config.getProps()).build());
+    HoodieRowParquetWriteSupport writeSupport = getHoodieRowParquetWriteSupport(conf, schema, config, enableBloomFilter);
     String compressionCodecName = config.getStringOrDefault(HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME);
     // Support PARQUET_COMPRESSION_CODEC_NAME is ""
     if (compressionCodecName.isEmpty()) {
@@ -100,4 +96,11 @@ public class HoodieSparkFileWriterFactory extends HoodieFileWriterFactory {
       TaskContextSupplier taskContextSupplier) throws IOException {
     throw new HoodieIOException("Not support write to Orc file");
   }
+
+  private static HoodieRowParquetWriteSupport getHoodieRowParquetWriteSupport(Configuration conf, Schema schema,
+                                                                              HoodieConfig config, boolean enableBloomFilter) {
+    Option<BloomFilter> filter = enableBloomFilter ? Option.of(createBloomFilter(config)) : Option.empty();
+    StructType structType = HoodieInternalRowUtils.getCachedSchema(schema);
+    return HoodieRowParquetWriteSupport.getHoodieRowParquetWriteSupport(conf, structType, filter, config);
+  }
 }
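
For orientation, the reflective call in the new helper above resolves the
configured class and invokes its four-argument constructor. A minimal
sketch of the equivalent plain-Java reflection (illustrative only; Hudi
actually routes this through ReflectionUtils.loadClass, shown in the
HoodieRowParquetWriteSupport hunk below):

    import java.lang.reflect.Constructor;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hudi.common.bloom.BloomFilter;
    import org.apache.hudi.common.config.HoodieConfig;
    import org.apache.hudi.common.config.HoodieStorageConfig;
    import org.apache.hudi.common.util.Option;
    import org.apache.hudi.io.storage.row.HoodieRowParquetWriteSupport;
    import org.apache.spark.sql.types.StructType;

    final class WriteSupportLoaderSketch {
      // Loads the class named by hoodie.parquet.spark.row.write.support.class
      // and invokes the (Configuration, StructType, Option, HoodieConfig)
      // constructor, mirroring the factory method added in this commit.
      static HoodieRowParquetWriteSupport load(Configuration conf, StructType structType,
          Option<BloomFilter> bloomFilterOpt, HoodieConfig config) throws Exception {
        String className = config.getStringOrDefault(
            HoodieStorageConfig.HOODIE_PARQUET_SPARK_ROW_WRITE_SUPPORT_CLASS);
        Constructor<?> ctor = Class.forName(className)
            .getConstructor(Configuration.class, StructType.class, Option.class, HoodieConfig.class);
        return (HoodieRowParquetWriteSupport) ctor.newInstance(conf, structType, bloomFilterOpt, config);
      }
    }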
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java
index 8a61c7c44d9..ad362d17014 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java
@@ -66,8 +66,8 @@ public class HoodieInternalRowFileWriterFactory {
                                                                              Option<BloomFilter> bloomFilterOpt
   )
       throws IOException {
-    HoodieRowParquetWriteSupport writeSupport =
-            new HoodieRowParquetWriteSupport(table.getHadoopConf(), structType, bloomFilterOpt, writeConfig.getStorageConfig());
+    HoodieRowParquetWriteSupport writeSupport = HoodieRowParquetWriteSupport
+        .getHoodieRowParquetWriteSupport(table.getHadoopConf(), structType, bloomFilterOpt, writeConfig);
 
     return new HoodieInternalRowParquetWriter(
         path,
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java
index 3a1b6d000be..99102c30922 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java
@@ -21,8 +21,11 @@ package org.apache.hudi.io.storage.row;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hudi.avro.HoodieBloomFilterWriteSupport;
 import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+
 import org.apache.parquet.hadoop.api.WriteSupport;
 import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport;
 import org.apache.spark.sql.types.StructType;
@@ -41,11 +44,11 @@ public class HoodieRowParquetWriteSupport extends ParquetWriteSupport {
   private final Configuration hadoopConf;
   private final Option<HoodieBloomFilterWriteSupport<UTF8String>> bloomFilterWriteSupportOpt;
 
-  public HoodieRowParquetWriteSupport(Configuration conf, StructType structType, Option<BloomFilter> bloomFilterOpt, HoodieStorageConfig config) {
+  public HoodieRowParquetWriteSupport(Configuration conf, StructType structType, Option<BloomFilter> bloomFilterOpt, HoodieConfig config) {
     Configuration hadoopConf = new Configuration(conf);
-    hadoopConf.set("spark.sql.parquet.writeLegacyFormat", config.getString(HoodieStorageConfig.PARQUET_WRITE_LEGACY_FORMAT_ENABLED));
-    hadoopConf.set("spark.sql.parquet.outputTimestampType", config.getString(HoodieStorageConfig.PARQUET_OUTPUT_TIMESTAMP_TYPE));
-    hadoopConf.set("spark.sql.parquet.fieldId.write.enabled", config.getString(PARQUET_FIELD_ID_WRITE_ENABLED));
+    hadoopConf.set("spark.sql.parquet.writeLegacyFormat", config.getStringOrDefault(HoodieStorageConfig.PARQUET_WRITE_LEGACY_FORMAT_ENABLED));
+    hadoopConf.set("spark.sql.parquet.outputTimestampType", config.getStringOrDefault(HoodieStorageConfig.PARQUET_OUTPUT_TIMESTAMP_TYPE));
+    hadoopConf.set("spark.sql.parquet.fieldId.write.enabled", config.getStringOrDefault(PARQUET_FIELD_ID_WRITE_ENABLED));
     setSchema(structType, hadoopConf);
 
     this.hadoopConf = hadoopConf;
@@ -89,4 +92,12 @@ public class HoodieRowParquetWriteSupport extends ParquetWriteSupport {
     }
   }
 
+  public static HoodieRowParquetWriteSupport getHoodieRowParquetWriteSupport(Configuration conf, StructType structType,
+                                                                             Option<BloomFilter> bloomFilterOpt, HoodieConfig config) {
+    return (HoodieRowParquetWriteSupport) ReflectionUtils.loadClass(
+        config.getStringOrDefault(HoodieStorageConfig.HOODIE_PARQUET_SPARK_ROW_WRITE_SUPPORT_CLASS),
+        new Class<?>[] {Configuration.class, StructType.class, Option.class, HoodieConfig.class},
+        conf, structType, bloomFilterOpt, config);
+  }
+
 }
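
To use the new extension point, a custom class must extend
HoodieRowParquetWriteSupport and declare a constructor with exactly the
signature the factory looks up above. A minimal hypothetical subclass
(the class name is illustrative, not part of this commit):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hudi.common.bloom.BloomFilter;
    import org.apache.hudi.common.config.HoodieConfig;
    import org.apache.hudi.common.util.Option;
    import org.apache.hudi.io.storage.row.HoodieRowParquetWriteSupport;
    import org.apache.spark.sql.types.StructType;

    public class MyRowParquetWriteSupport extends HoodieRowParquetWriteSupport {
      // This exact constructor signature is required: the factory resolves it
      // reflectively as (Configuration, StructType, Option, HoodieConfig).
      public MyRowParquetWriteSupport(Configuration conf, StructType structType,
          Option<BloomFilter> bloomFilterOpt, HoodieConfig config) {
        super(conf, structType, bloomFilterOpt, config);
        // Custom setup (e.g. extra Hadoop conf keys) would go here.
      }
    }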
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
index d68b8326ca8..f3ad183def4 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
@@ -226,6 +226,17 @@ public class HoodieStorageConfig extends HoodieConfig {
           + "and it is loaded at runtime. This is only required when trying to 
"
           + "override the existing write context.");
 
+  public static final ConfigProperty<String> 
HOODIE_PARQUET_SPARK_ROW_WRITE_SUPPORT_CLASS = ConfigProperty
+      .key("hoodie.parquet.spark.row.write.support.class")
+      
.defaultValue("org.apache.hudi.io.storage.row.HoodieRowParquetWriteSupport")
+      .markAdvanced()
+      .sinceVersion("0.15.0")
+      .withDocumentation("Provided write support class should extend 
HoodieRowParquetWriteSupport class "
+          + "and it is loaded at runtime. This is only required when trying to 
"
+          + "override the existing write context when 
`hoodie.datasource.write.row.writer.enable=true`.");
+
+
+
   /**
    * @deprecated Use {@link #PARQUET_MAX_FILE_SIZE} and its methods instead
    */
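
Wiring it up from a Spark write (the class name is hypothetical; per the
documentation above, the override only applies when
hoodie.datasource.write.row.writer.enable=true):

    // Assumes an existing Dataset<Row> df and a target basePath String.
    df.write().format("hudi")
        .option("hoodie.table.name", "my_table")
        .option("hoodie.datasource.write.row.writer.enable", "true")
        .option("hoodie.parquet.spark.row.write.support.class",
            "com.example.MyRowParquetWriteSupport")
        .mode("append")
        .save(basePath);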
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
index 3c521441b1a..2594ee0e105 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
@@ -121,7 +121,7 @@ public class HoodieFileWriterFactory {
     throw new UnsupportedOperationException();
   }
 
-  protected BloomFilter createBloomFilter(HoodieConfig config) {
+  protected static BloomFilter createBloomFilter(HoodieConfig config) {
     return BloomFilterFactory.createBloomFilter(
         config.getIntOrDefault(HoodieStorageConfig.BLOOM_FILTER_NUM_ENTRIES_VALUE),
         config.getDoubleOrDefault(HoodieStorageConfig.BLOOM_FILTER_FPP_VALUE),
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/row/TestHoodieInternalRowParquetWriter.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/row/TestHoodieInternalRowParquetWriter.java
index fb455926312..0e4dc22b8ce 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/row/TestHoodieInternalRowParquetWriter.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/row/TestHoodieInternalRowParquetWriter.java
@@ -131,6 +131,7 @@ public class TestHoodieInternalRowParquetWriter extends HoodieSparkClientTestHar
         writeConfig.getBloomFilterFPP(),
         writeConfig.getDynamicBloomFilterMaxNumEntries(),
         writeConfig.getBloomFilterType());
-    return new HoodieRowParquetWriteSupport(hadoopConf, SparkDatasetTestUtils.STRUCT_TYPE, Option.of(filter), writeConfig.getStorageConfig());
+    return HoodieRowParquetWriteSupport.getHoodieRowParquetWriteSupport(hadoopConf,
+        SparkDatasetTestUtils.STRUCT_TYPE, Option.of(filter), writeConfig);
   }
 }
