kbuci opened a new issue, #17921:
URL: https://github.com/apache/hudi/issues/17921

   ### Feature Description
   
   **What the feature achieves:**
   Add a pluggable class that all HUDI parquet-based spark/flink writers will 
invoke before creating write support / parquet file writer objects. 
Specifically, it can have the below interface
   
   ```
   class HoodieParquetConfigInjector
   . . .
   // Returns a copy of both config objects with the additional properties added
   public Pair<Configuration, HoodieConfig> withProps(StoragePath path, Configuration hadoopConf, HoodieConfig hoodieConfig)
   . . .
   ```
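   
   For illustration, a fuller (hypothetical) sketch of the interface is below; the no-op default would keep behavior unchanged for users who do not configure an injector. The class/package names and the use of Hudi's `Pair` utility are assumptions, not a final API:
   
   ```
   import org.apache.hadoop.conf.Configuration;
   import org.apache.hudi.common.config.HoodieConfig;
   import org.apache.hudi.common.util.collection.Pair;
   import org.apache.hudi.storage.StoragePath;
   
   /**
    * Hypothetical pluggable hook invoked before Hudi builds parquet write support /
    * parquet file writer objects. Implementations return copies of both config
    * objects with any additional properties applied.
    */
   public interface HoodieParquetConfigInjector {
   
     default Pair<Configuration, HoodieConfig> withProps(StoragePath path,
                                                         Configuration hadoopConf,
                                                         HoodieConfig hoodieConfig) {
       // No-op default: return the configs unchanged so existing users are unaffected.
       return Pair.of(hadoopConf, hoodieConfig);
     }
   }
   ```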
   And HUDI will invoke the method before passing on the config objects. For 
example, 
`org.apache.hudi.io.storage.HoodieSparkFileWriterFactory#newParquetFileWriter(java.lang.String,
 org.apache.hudi.storage.StoragePath, 
org.apache.hudi.common.config.HoodieConfig, 
org.apache.hudi.common.schema.HoodieSchema, 
org.apache.hudi.common.engine.TaskContextSupplier)` would now look like 
   
   ```
     @Override
     protected HoodieFileWriter newParquetFileWriter(
         String instantTime, StoragePath path, HoodieConfig config, HoodieSchema schema,
         TaskContextSupplier taskContextSupplier) throws IOException {
       boolean populateMetaFields = config.getBooleanOrDefault(HoodieTableConfig.POPULATE_META_FIELDS);
       String compressionCodecName = config.getStringOrDefault(HoodieStorageConfig.PARQUET_COMPRESSION_CODEC_NAME);
       // Support PARQUET_COMPRESSION_CODEC_NAME is ""
       if (compressionCodecName.isEmpty()) {
         compressionCodecName = null;
       }

       // >>>> New code: inject parquet configs
       HoodieParquetConfigInjector hoodieParquetConfigInjector = . . .
       // Call hoodieParquetConfigInjector.withProps(path, storage.getConf(), config)
       // and unpack the returned pair:
       newConf = . . .
       newConfig = . . .
       // <<<<<

       // Now we use newConf and newConfig
       HoodieRowParquetWriteSupport writeSupport = getHoodieRowParquetWriteSupport(newConf, schema,
           newConfig, enableBloomFilter(populateMetaFields, newConfig));
       HoodieRowParquetConfig parquetConfig = new HoodieRowParquetConfig(writeSupport,
           CompressionCodecName.fromConf(compressionCodecName),
           newConfig.getIntOrDefault(HoodieStorageConfig.PARQUET_BLOCK_SIZE),
           newConfig.getIntOrDefault(HoodieStorageConfig.PARQUET_PAGE_SIZE),
           newConfig.getLongOrDefault(HoodieStorageConfig.PARQUET_MAX_FILE_SIZE),
           newConf.unwrapAs(Configuration.class),
           newConfig.getDoubleOrDefault(HoodieStorageConfig.PARQUET_COMPRESSION_RATIO_FRACTION),
           newConfig.getBooleanOrDefault(HoodieStorageConfig.PARQUET_DICTIONARY_ENABLED));
       parquetConfig.getHadoopConf().addResource(writeSupport.getHadoopConf());

       return new HoodieSparkParquetWriter(path, parquetConfig, instantTime, taskContextSupplier, populateMetaFields);
     }
   ```
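   
   The example above elides how the injector instance is obtained (the `. . .` placeholders). One possible wiring, purely as an assumption for illustration, is a new write config key that names the implementation class and is loaded via Hudi's `ReflectionUtils`; the key name and the factory class below are hypothetical:
   
   ```
   import org.apache.hudi.common.config.ConfigProperty;
   import org.apache.hudi.common.config.HoodieConfig;
   import org.apache.hudi.common.util.ReflectionUtils;
   
   public class HoodieParquetConfigInjectorFactory {
   
     // Hypothetical config key naming the injector implementation; not an existing Hudi option.
     public static final ConfigProperty<String> PARQUET_CONFIG_INJECTOR_CLASS_NAME = ConfigProperty
         .key("hoodie.parquet.config.injector.class")
         .defaultValue("org.apache.hudi.io.storage.NoOpHoodieParquetConfigInjector")
         .withDocumentation("Class used to inject extra properties into parquet writer configs");
   
     public static HoodieParquetConfigInjector create(HoodieConfig config) {
       String injectorClass = config.getStringOrDefault(PARQUET_CONFIG_INJECTOR_CLASS_NAME);
       // Instantiate via reflection so each deployment can plug in its own implementation.
       return (HoodieParquetConfigInjector) ReflectionUtils.loadClass(injectorClass);
     }
   }
   ```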
   
   
   **Why this feature is needed:**
   Our internal configuration/deployment of parquet requires that custom (internally-defined) Hadoop keys be passed in based on the HUDI table name and the current partition path. These are needed for parquet library observability and other internal use cases. For example:
   
   ```
   hadoopConf.set("hadoop.some_internal_key", /* HUDI table name from HoodieConfig */);
   hadoopConf.set("hadoop.some_other_internal_key", /* partition path of the current file being written */);
   // Pass in hadoopConf when the raw hadoop config is required

   hoodieConfig.set("hadoop.some_internal_key", /* HUDI table name from HoodieConfig */);
   hoodieConfig.set("hadoop.some_other_internal_key", /* partition path of the current file being written */);
   // Pass in hoodieConfig when a HoodieConfig is required, such as by write support classes
   ```
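   As a concrete sketch, assuming the interface above, our internal implementation of `HoodieParquetConfigInjector` would roughly look like the following; the internal key names are placeholders, and how the table name and partition path are derived here is only an assumption:
   
   ```
   import org.apache.hadoop.conf.Configuration;
   import org.apache.hudi.common.config.HoodieConfig;
   import org.apache.hudi.common.table.HoodieTableConfig;
   import org.apache.hudi.common.util.collection.Pair;
   import org.apache.hudi.storage.StoragePath;
   
   public class InternalParquetConfigInjector implements HoodieParquetConfigInjector {
   
     @Override
     public Pair<Configuration, HoodieConfig> withProps(StoragePath path,
                                                        Configuration hadoopConf,
                                                        HoodieConfig hoodieConfig) {
       // Work on copies so the caller's config objects are not mutated.
       Configuration newHadoopConf = new Configuration(hadoopConf);
       HoodieConfig newHoodieConfig = new HoodieConfig(hoodieConfig.getProps());
   
       // Placeholder keys, derived from the table name and the file's partition path.
       String tableName = hoodieConfig.getString(HoodieTableConfig.NAME);
       String partitionPath = path.getParent().toString();
       newHadoopConf.set("hadoop.some_internal_key", tableName);
       newHadoopConf.set("hadoop.some_other_internal_key", partitionPath);
       newHoodieConfig.setValue("hadoop.some_internal_key", tableName);
       newHoodieConfig.setValue("hadoop.some_other_internal_key", partitionPath);
   
       return Pair.of(newHadoopConf, newHoodieConfig);
     }
   }
   ```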
   We cannot set these "globally" in HUDI-agnostic spark/flink context objects, since:
   - Different write clients may be writing to different HUDI tables
   - We only know the exact partition path when a new "file writer" object is initialized (a file cannot span multiple partitions)
   
   Having the above interface in HUDI would allow us to upstream our changes without requiring other users to pass the exact same configs that we do (since we would maintain our own internal class that implements `HoodieParquetConfigInjector`).
   
   
   
   
   ### User Experience
   
   **How users will use this feature:**
   - Configuration changes needed
   - API changes
   - Usage examples
   
   
   ### Hudi RFC Requirements
   
   **RFC PR link:** (if applicable)
   
   **Why RFC is/isn't needed:**
   - Does this change public interfaces/APIs? (Yes/No)
   - Does this change storage format? (Yes/No)
   - Justification:
   

