Jonathan Vexler created HUDI-8259:
-------------------------------------

             Summary: Use record merger instead of avro payload for COW
                 Key: HUDI-8259
                 URL: https://issues.apache.org/jira/browse/HUDI-8259
             Project: Apache Hudi
          Issue Type: Improvement
          Components: writer-core
            Reporter: Jonathan Vexler
             Fix For: 1.1.0


Currently, we only use the record merger when parquet log blocks are configured. The 
change will probably look something like:
{code:java}
  /**
   * Resolves the {@code HoodieRecordMerger} from this config object.
   * Parses the configured comma-separated merger implementation class names,
   * then delegates to the static overload with the table's base path, merge
   * mode, engine type, log block format, merger strategy and table type.
   */
  public HoodieRecordMerger getRecordMerger() {
    // Split the comma-separated impl list, trimming whitespace and dropping duplicates.
    List<String> mergers = StringUtils.split(getString(RECORD_MERGER_IMPLS), ",").stream()
        .map(String::trim)
        .distinct()
        .collect(Collectors.toList());
    return getRecordMerger(getString(BASE_PATH), getRecordMergeMode(), engineType,
        getLogDataBlockFormat(), mergers, getStringOpt(RECORD_MERGER_STRATEGY), getTableType());
  }

  /**
   * Picks a {@code HoodieRecordMerger} for the given table configuration.
   *
   * For COPY_ON_WRITE tables the merger is chosen purely from the merge mode
   * (COW has no log blocks, so {@code logBlockType} is irrelevant there).
   * Otherwise the log block format decides: avro/hfile blocks are pinned to
   * the avro merger, parquet blocks select by merge mode.
   *
   * NOTE(review): for MOR tables {@code logBlockType} must be one of the three
   * handled constants; any other value throws IllegalStateException.
   */
  public static HoodieRecordMerger getRecordMerger(String basePath,
                                                   RecordMergeMode mergeMode,
                                                   EngineType engineType,
                                                   HoodieLogBlock.HoodieLogBlockType logBlockType,
                                                   List<String> mergers,
                                                   Option<String> strategy,
                                                   HoodieTableType tableType) {
    // COW path: this early return is the point of HUDI-8259 — use the
    // merge-mode-based merger instead of always falling back to avro payloads.
    if (tableType == HoodieTableType.COPY_ON_WRITE) {
      return getRecordMergerBasedOnMergeMode(basePath, mergeMode, engineType, mergers, strategy);
    }
    switch (logBlockType) {
      case AVRO_DATA_BLOCK:
      case HFILE_DATA_BLOCK:
        // Avro-serialized log blocks always merge via the avro record merger.
        return HoodieAvroRecordMerger.INSTANCE;
      case PARQUET_DATA_BLOCK:
        return getRecordMergerBasedOnMergeMode(basePath, mergeMode, engineType, mergers, strategy);
      default:
        throw new IllegalStateException("This log block type is not implemented");
    }
  }

  /**
   * Selects a merger implementation from the merge mode, with engine-specific
   * fast paths: SPARK gets a hard-coded Spark-native merger class for the two
   * built-in modes; every other engine (and CUSTOM mode) goes through
   * {@code HoodieRecordUtils.createRecordMerger} with the configured impls
   * and strategy.
   */
  private static HoodieRecordMerger getRecordMergerBasedOnMergeMode(String basePath,
                                                                    RecordMergeMode mergeMode,
                                                                    EngineType engineType,
                                                                    List<String> mergers,
                                                                    Option<String> strategy) {
    //TODO: [HUDI-8202] make this custom mergers only
    switch (mergeMode) {
      case EVENT_TIME_ORDERING:
        switch (engineType) {
          case SPARK:
            // Spark-native merger, loaded by class name.
            return HoodieRecordUtils.loadRecordMerger("org.apache.hudi.DefaultSparkRecordMerger");
          default:
            return HoodieRecordUtils.createRecordMerger(basePath, engineType, mergers, strategy);
        }
      case OVERWRITE_WITH_LATEST:
        switch (engineType) {
          case SPARK:
            return HoodieRecordUtils.loadRecordMerger("org.apache.hudi.OverwriteWithLatestSparkRecordMerger");
          default:
            return HoodieRecordUtils.createRecordMerger(basePath, engineType, mergers, strategy);
        }
      case CUSTOM:
      default:
        return HoodieRecordUtils.createRecordMerger(basePath, engineType, mergers, strategy);
    }
  }
{code}
However, the bulk of the work will be addressing any issues that arise from this 
change.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to