This is an automated email from the ASF dual-hosted git repository.

yihua pushed a commit to branch branch-0.x
in repository https://gitbox.apache.org/repos/asf/hudi.git
commit fb456d51316a02cca2b565b71c3f64c95576484f
Author: Jon Vexler <[email protected]>
AuthorDate: Tue May 14 13:02:36 2024 -0700

    [HUDI-7385] Add config for custom write support for parquet row writer (#10598)

    Co-authored-by: Jonathan Vexler <=>
---
 .../hudi/io/storage/HoodieSparkFileWriterFactory.java | 19 +++++++++++--------
 .../row/HoodieInternalRowFileWriterFactory.java       |  4 ++--
 .../io/storage/row/HoodieRowParquetWriteSupport.java  | 19 +++++++++++++++----
 .../hudi/common/config/HoodieStorageConfig.java       | 11 +++++++++++
 .../hudi/io/storage/HoodieFileWriterFactory.java      |  2 +-
 .../row/TestHoodieInternalRowParquetWriter.java       |  3 ++-
 6 files changed, 42 insertions(+), 16 deletions(-)

diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java
index 5feefa3bee2..7091c2b240f 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/HoodieSparkFileWriterFactory.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.hadoop.metadata.CompressionCodecName;
 import org.apache.spark.sql.HoodieInternalRowUtils;
+import org.apache.spark.sql.types.StructType;
 
 import java.io.IOException;
 
@@ -44,15 +45,13 @@ public class HoodieSparkFileWriterFactory extends HoodieFileWriterFactory {
       String instantTime, Path path, Configuration conf, HoodieConfig config, Schema schema,
       TaskContextSupplier taskContextSupplier) throws IOException {
     boolean populateMetaFields = config.getBooleanOrDefault(HoodieTableConfig.POPULATE_META_FIELDS);
-    Option<BloomFilter> filter = enableBloomFilter(populateMetaFields, config) ? Option.of(createBloomFilter(config)) : Option.empty();
     String compressionCodecName = config.getStringOrDefault(HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME);
     // Support PARQUET_COMPRESSION_CODEC_NAME is ""
     if (compressionCodecName.isEmpty()) {
       compressionCodecName = null;
     }
-    HoodieRowParquetWriteSupport writeSupport = new HoodieRowParquetWriteSupport(conf,
-        HoodieInternalRowUtils.getCachedSchema(schema), filter,
-        HoodieStorageConfig.newBuilder().fromProperties(config.getProps()).build());
+    HoodieRowParquetWriteSupport writeSupport = getHoodieRowParquetWriteSupport(conf, schema,
+        config, enableBloomFilter(populateMetaFields, config));
     HoodieRowParquetConfig parquetConfig = new HoodieRowParquetConfig(writeSupport,
         CompressionCodecName.fromConf(compressionCodecName),
         config.getIntOrDefault(HoodieStorageConfig.PARQUET_BLOCK_SIZE),
@@ -69,10 +68,7 @@ public class HoodieSparkFileWriterFactory extends HoodieFileWriterFactory {
   protected HoodieFileWriter newParquetFileWriter(
       FSDataOutputStream outputStream, Configuration conf, HoodieConfig config, Schema schema) throws IOException {
     boolean enableBloomFilter = false;
-    Option<BloomFilter> filter = enableBloomFilter ? Option.of(createBloomFilter(config)) : Option.empty();
-    HoodieRowParquetWriteSupport writeSupport = new HoodieRowParquetWriteSupport(conf,
-        HoodieInternalRowUtils.getCachedSchema(schema), filter,
-        HoodieStorageConfig.newBuilder().fromProperties(config.getProps()).build());
+    HoodieRowParquetWriteSupport writeSupport = getHoodieRowParquetWriteSupport(conf, schema, config, enableBloomFilter);
     String compressionCodecName = config.getStringOrDefault(HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME);
     // Support PARQUET_COMPRESSION_CODEC_NAME is ""
     if (compressionCodecName.isEmpty()) {
@@ -100,4 +96,11 @@ public class HoodieSparkFileWriterFactory extends HoodieFileWriterFactory {
       TaskContextSupplier taskContextSupplier) throws IOException {
     throw new HoodieIOException("Not support write to Orc file");
   }
+
+  private static HoodieRowParquetWriteSupport getHoodieRowParquetWriteSupport(Configuration conf, Schema schema,
+                                                                              HoodieConfig config, boolean enableBloomFilter) {
+    Option<BloomFilter> filter = enableBloomFilter ? Option.of(createBloomFilter(config)) : Option.empty();
+    StructType structType = HoodieInternalRowUtils.getCachedSchema(schema);
+    return HoodieRowParquetWriteSupport.getHoodieRowParquetWriteSupport(conf, structType, filter, config);
+  }
 }
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java
index 8a61c7c44d9..ad362d17014 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieInternalRowFileWriterFactory.java
@@ -66,8 +66,8 @@ public class HoodieInternalRowFileWriterFactory {
       Option<BloomFilter> bloomFilterOpt
   ) throws IOException {
-    HoodieRowParquetWriteSupport writeSupport =
-        new HoodieRowParquetWriteSupport(table.getHadoopConf(), structType, bloomFilterOpt, writeConfig.getStorageConfig());
+    HoodieRowParquetWriteSupport writeSupport = HoodieRowParquetWriteSupport
+        .getHoodieRowParquetWriteSupport(table.getHadoopConf(), structType, bloomFilterOpt, writeConfig);
 
     return new HoodieInternalRowParquetWriter(
         path,
diff --git a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java
index 3a1b6d000be..99102c30922 100644
--- a/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java
+++ b/hudi-client/hudi-spark-client/src/main/java/org/apache/hudi/io/storage/row/HoodieRowParquetWriteSupport.java
@@ -21,8 +21,11 @@ package org.apache.hudi.io.storage.row;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hudi.avro.HoodieBloomFilterWriteSupport;
 import org.apache.hudi.common.bloom.BloomFilter;
+import org.apache.hudi.common.config.HoodieConfig;
 import org.apache.hudi.common.config.HoodieStorageConfig;
 import org.apache.hudi.common.util.Option;
+import org.apache.hudi.common.util.ReflectionUtils;
+
 import org.apache.parquet.hadoop.api.WriteSupport;
 import org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport;
 import org.apache.spark.sql.types.StructType;
@@ -41,11 +44,11 @@ public class HoodieRowParquetWriteSupport extends ParquetWriteSupport {
 
   private final Configuration hadoopConf;
   private final Option<HoodieBloomFilterWriteSupport<UTF8String>> bloomFilterWriteSupportOpt;
 
-  public HoodieRowParquetWriteSupport(Configuration conf, StructType structType, Option<BloomFilter> bloomFilterOpt, HoodieStorageConfig config) {
+  public HoodieRowParquetWriteSupport(Configuration conf, StructType structType, Option<BloomFilter> bloomFilterOpt, HoodieConfig config) {
     Configuration hadoopConf = new Configuration(conf);
-    hadoopConf.set("spark.sql.parquet.writeLegacyFormat", config.getString(HoodieStorageConfig.PARQUET_WRITE_LEGACY_FORMAT_ENABLED));
-    hadoopConf.set("spark.sql.parquet.outputTimestampType", config.getString(HoodieStorageConfig.PARQUET_OUTPUT_TIMESTAMP_TYPE));
-    hadoopConf.set("spark.sql.parquet.fieldId.write.enabled", config.getString(PARQUET_FIELD_ID_WRITE_ENABLED));
+    hadoopConf.set("spark.sql.parquet.writeLegacyFormat", config.getStringOrDefault(HoodieStorageConfig.PARQUET_WRITE_LEGACY_FORMAT_ENABLED));
+    hadoopConf.set("spark.sql.parquet.outputTimestampType", config.getStringOrDefault(HoodieStorageConfig.PARQUET_OUTPUT_TIMESTAMP_TYPE));
+    hadoopConf.set("spark.sql.parquet.fieldId.write.enabled", config.getStringOrDefault(PARQUET_FIELD_ID_WRITE_ENABLED));
     setSchema(structType, hadoopConf);
     this.hadoopConf = hadoopConf;
@@ -89,4 +92,12 @@ public class HoodieRowParquetWriteSupport extends ParquetWriteSupport {
     }
   }
 
+  public static HoodieRowParquetWriteSupport getHoodieRowParquetWriteSupport(Configuration conf, StructType structType,
+                                                                             Option<BloomFilter> bloomFilterOpt, HoodieConfig config) {
+    return (HoodieRowParquetWriteSupport) ReflectionUtils.loadClass(
+        config.getStringOrDefault(HoodieStorageConfig.HOODIE_PARQUET_SPARK_ROW_WRITE_SUPPORT_CLASS),
+        new Class<?>[] {Configuration.class, StructType.class, Option.class, HoodieConfig.class},
+        conf, structType, bloomFilterOpt, config);
+  }
+
 }
diff --git a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
index d68b8326ca8..f3ad183def4 100644
--- a/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
+++ b/hudi-common/src/main/java/org/apache/hudi/common/config/HoodieStorageConfig.java
@@ -226,6 +226,17 @@ public class HoodieStorageConfig extends HoodieConfig {
       + "and it is loaded at runtime. This is only required when trying to "
       + "override the existing write context.");
 
+  public static final ConfigProperty<String> HOODIE_PARQUET_SPARK_ROW_WRITE_SUPPORT_CLASS = ConfigProperty
+      .key("hoodie.parquet.spark.row.write.support.class")
+      .defaultValue("org.apache.hudi.io.storage.row.HoodieRowParquetWriteSupport")
+      .markAdvanced()
+      .sinceVersion("0.15.0")
+      .withDocumentation("Provided write support class should extend HoodieRowParquetWriteSupport class "
+          + "and it is loaded at runtime. This is only required when trying to "
+          + "override the existing write context when `hoodie.datasource.write.row.writer.enable=true`.");
+
+
+
   /**
    * @deprecated Use {@link #PARQUET_MAX_FILE_SIZE} and its methods instead
    */
diff --git a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
index 3c521441b1a..2594ee0e105 100644
--- a/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
+++ b/hudi-common/src/main/java/org/apache/hudi/io/storage/HoodieFileWriterFactory.java
@@ -121,7 +121,7 @@ public class HoodieFileWriterFactory {
     throw new UnsupportedOperationException();
   }
 
-  protected BloomFilter createBloomFilter(HoodieConfig config) {
+  protected static BloomFilter createBloomFilter(HoodieConfig config) {
     return BloomFilterFactory.createBloomFilter(
         config.getIntOrDefault(HoodieStorageConfig.BLOOM_FILTER_NUM_ENTRIES_VALUE),
         config.getDoubleOrDefault(HoodieStorageConfig.BLOOM_FILTER_FPP_VALUE),
diff --git a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/row/TestHoodieInternalRowParquetWriter.java b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/row/TestHoodieInternalRowParquetWriter.java
index fb455926312..0e4dc22b8ce 100644
--- a/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/row/TestHoodieInternalRowParquetWriter.java
+++ b/hudi-spark-datasource/hudi-spark/src/test/java/org/apache/hudi/io/storage/row/TestHoodieInternalRowParquetWriter.java
@@ -131,6 +131,7 @@ public class TestHoodieInternalRowParquetWriter extends HoodieSparkClientTestHar
         writeConfig.getBloomFilterFPP(), writeConfig.getDynamicBloomFilterMaxNumEntries(),
         writeConfig.getBloomFilterType());
-    return new HoodieRowParquetWriteSupport(hadoopConf, SparkDatasetTestUtils.STRUCT_TYPE, Option.of(filter), writeConfig.getStorageConfig());
+    return HoodieRowParquetWriteSupport.getHoodieRowParquetWriteSupport(hadoopConf,
+        SparkDatasetTestUtils.STRUCT_TYPE, Option.of(filter), writeConfig);
   }
 }
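For anyone trying out the new hook, below is a minimal sketch of a custom write support class; the package and class name are hypothetical, not part of this commit. Per the diff, the provided class must extend HoodieRowParquetWriteSupport and declare the exact four-argument constructor that ReflectionUtils.loadClass invokes: (Configuration, StructType, Option<BloomFilter>, HoodieConfig).

    package org.example.hudi;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hudi.common.bloom.BloomFilter;
    import org.apache.hudi.common.config.HoodieConfig;
    import org.apache.hudi.common.util.Option;
    import org.apache.hudi.io.storage.row.HoodieRowParquetWriteSupport;
    import org.apache.spark.sql.types.StructType;

    // Hypothetical example class. The constructor signature must match the
    // Class<?>[] array passed to ReflectionUtils.loadClass in
    // HoodieRowParquetWriteSupport above, or reflective instantiation
    // will fail at runtime.
    public class CustomRowParquetWriteSupport extends HoodieRowParquetWriteSupport {

      public CustomRowParquetWriteSupport(Configuration conf, StructType structType,
                                          Option<BloomFilter> bloomFilterOpt, HoodieConfig config) {
        super(conf, structType, bloomFilterOpt, config);
        // Custom behavior (e.g. extra Parquet footer metadata) would be
        // layered on top of the parent write support here.
      }
    }

It would then be enabled by setting hoodie.parquet.spark.row.write.support.class=org.example.hudi.CustomRowParquetWriteSupport together with hoodie.datasource.write.row.writer.enable=true, as noted in the config documentation above.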
