Jonathan Vexler created HUDI-8259:
-------------------------------------
Summary: Use record merger instead of avro payload for COW
Key: HUDI-8259
URL: https://issues.apache.org/jira/browse/HUDI-8259
Project: Apache Hudi
Issue Type: Improvement
Components: writer-core
Reporter: Jonathan Vexler
Fix For: 1.1.0
Currently, we only try to use the record merger when the log data block format
is set to Parquet. The change will probably look something like:
{code:java}
/**
 * Resolves the record merger for this write config.
 *
 * <p>Reads the comma-separated merger implementations from {@code RECORD_MERGER_IMPLS},
 * trims and de-duplicates them, and delegates to the static overload together with this
 * config's base path, merge mode, engine type, log data block format, merger strategy and
 * table type.
 *
 * @return the record merger selected by the static overload
 */
public HoodieRecordMerger getRecordMerger() {
  List<String> mergers = StringUtils.split(getString(RECORD_MERGER_IMPLS), ",").stream()
      .map(String::trim)
      .distinct()
      .collect(Collectors.toList());
  return getRecordMerger(getString(BASE_PATH), getRecordMergeMode(), engineType,
      getLogDataBlockFormat(), mergers, getStringOpt(RECORD_MERGER_STRATEGY), getTableType());
}

/**
 * Resolves the record merger from explicit settings.
 *
 * <p>For {@code COPY_ON_WRITE} tables the log block type is not consulted and the merger is
 * chosen from the merge mode. For any other table type the log block type decides:
 * {@code AVRO_DATA_BLOCK} and {@code HFILE_DATA_BLOCK} use {@code HoodieAvroRecordMerger.INSTANCE},
 * while {@code PARQUET_DATA_BLOCK} is chosen from the merge mode.
 *
 * @param basePath     table base path, forwarded to the merger lookup
 * @param mergeMode    record merge mode
 * @param engineType   engine the writer runs on
 * @param logBlockType log data block format; only consulted when the table is not COPY_ON_WRITE
 * @param mergers      candidate merger implementation class names
 * @param strategy     optional merger strategy, forwarded to the merger lookup
 * @param tableType    table type
 * @return the selected record merger
 * @throws IllegalStateException if the table is not COPY_ON_WRITE and the log block type is
 *                               none of the three handled above
 */
public static HoodieRecordMerger getRecordMerger(String basePath,
                                                 RecordMergeMode mergeMode,
                                                 EngineType engineType,
                                                 HoodieLogBlock.HoodieLogBlockType logBlockType,
                                                 List<String> mergers,
                                                 Option<String> strategy,
                                                 HoodieTableType tableType) {
  if (tableType == HoodieTableType.COPY_ON_WRITE) {
    // COW has no log blocks to constrain the choice, so go straight to the merge mode.
    return getRecordMergerBasedOnMergeMode(basePath, mergeMode, engineType, mergers, strategy);
  }
  switch (logBlockType) {
    case AVRO_DATA_BLOCK:
    case HFILE_DATA_BLOCK:
      return HoodieAvroRecordMerger.INSTANCE;
    case PARQUET_DATA_BLOCK:
      return getRecordMergerBasedOnMergeMode(basePath, mergeMode, engineType, mergers, strategy);
    default:
      throw new IllegalStateException("This log block type is not implemented");
  }
}

/**
 * Picks a merger from the merge mode and engine type.
 *
 * <p>On {@code SPARK}, {@code EVENT_TIME_ORDERING} and {@code OVERWRITE_WITH_LATEST} load a fixed
 * Spark merger class by name. Every other combination (including {@code CUSTOM}) falls back to
 * {@code HoodieRecordUtils.createRecordMerger} with the supplied mergers and strategy.
 *
 * @param basePath   table base path, forwarded to the fallback lookup
 * @param mergeMode  record merge mode
 * @param engineType engine the writer runs on
 * @param mergers    candidate merger implementation class names
 * @param strategy   optional merger strategy, forwarded to the fallback lookup
 * @return the selected record merger
 */
private static HoodieRecordMerger getRecordMergerBasedOnMergeMode(String basePath,
                                                                  RecordMergeMode mergeMode,
                                                                  EngineType engineType,
                                                                  List<String> mergers,
                                                                  Option<String> strategy) {
  //TODO: [HUDI-8202] make this custom mergers only
  final String sparkMergerClass;
  switch (mergeMode) {
    case EVENT_TIME_ORDERING:
      sparkMergerClass = "org.apache.hudi.DefaultSparkRecordMerger";
      break;
    case OVERWRITE_WITH_LATEST:
      sparkMergerClass = "org.apache.hudi.OverwriteWithLatestSparkRecordMerger";
      break;
    case CUSTOM:
    default:
      // No built-in Spark merger for this mode; use the configured mergers.
      sparkMergerClass = null;
      break;
  }
  if (sparkMergerClass != null && engineType == EngineType.SPARK) {
    return HoodieRecordUtils.loadRecordMerger(sparkMergerClass);
  }
  return HoodieRecordUtils.createRecordMerger(basePath, engineType, mergers, strategy);
}
{code}
However, the bulk of the work will be addressing any issues that arise from
this change.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)