Vinish Reddy created HUDI-7508:
----------------------------------
Summary: Avoid converting iterator to list in HoodieStreamerUtils.createHoodieRecords
Key: HUDI-7508
URL: https://issues.apache.org/jira/browse/HUDI-7508
Project: Apache Hudi
Issue Type: Bug
Components: deltastreamer
Reporter: Vinish Reddy
Assignee: Vinish Reddy
This block of code is problematic and can lead to an OOM: it converts the
iterator into a list and then returns an iterator over that list. While the
executor runs this block, every converted record of the partition is held in the
heap at the same time instead of being streamed through one at a time.
[https://github.com/apache/hudi/blob/master/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java#L86]
records = avroRDD.mapPartitions(
    (FlatMapFunction<Iterator<GenericRecord>, Either<HoodieRecord, String>>) genericRecordIterator -> {
      if (autoGenerateRecordKeys) {
        props.setProperty(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG, String.valueOf(TaskContext.getPartitionId()));
        props.setProperty(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG, instantTime);
      }
      BuiltinKeyGenerator builtinKeyGenerator = (BuiltinKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
      // Every converted record of the partition is buffered in this list before
      // anything is returned to Spark; this is what holds up executor heap.
      List<Either<HoodieRecord, String>> avroRecords = new ArrayList<>();
      while (genericRecordIterator.hasNext()) {
        GenericRecord genRec = genericRecordIterator.next();
        try {
          HoodieKey hoodieKey = new HoodieKey(builtinKeyGenerator.getRecordKey(genRec), builtinKeyGenerator.getPartitionPath(genRec));
          GenericRecord gr = isDropPartitionColumns(props) ? HoodieAvroUtils.removeFields(genRec, partitionColumns) : genRec;
          HoodieRecordPayload payload = shouldCombine
              ? DataSourceUtils.createPayload(cfg.payloadClassName, gr,
                  (Comparable) HoodieAvroUtils.getNestedFieldVal(gr, cfg.sourceOrderingField, false,
                      props.getBoolean(
                          KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
                          Boolean.parseBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()))))
              : DataSourceUtils.createPayload(cfg.payloadClassName, gr);
          avroRecords.add(Either.left(new HoodieAvroRecord<>(hoodieKey, payload)));
        } catch (Exception e) {
          if (!shouldErrorTable) {
            throw e;
          }
          avroRecords.add(generateErrorRecord(genRec));
        }
      }
      // Only an iterator over the fully materialized list is handed back.
      return avroRecords.iterator();
    });
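One possible direction is to return a lazily mapping iterator from mapPartitions, so each GenericRecord is converted only when Spark pulls the next element and no per-partition list is built. The sketch below is a standalone illustration under stated assumptions, not Hudi code: Result is a hypothetical stand-in for Either<HoodieRecord, String>, Integer.parseInt stands in for key generation and payload creation, and lazyMap / convertPartition are hypothetical helpers. It keeps the same error semantics as the block above: rethrow when shouldErrorTable is false, otherwise emit an error record.

```java
import java.util.Iterator;
import java.util.List;
import java.util.function.Function;

public class LazyMappingDemo {

  // Hypothetical stand-in for Either<HoodieRecord, String>: carries either a
  // converted record (left) or an error payload (right).
  static final class Result<L, R> {
    final L left;
    final R right;
    final boolean isLeft;

    private Result(L left, R right, boolean isLeft) {
      this.left = left;
      this.right = right;
      this.isLeft = isLeft;
    }

    static <L, R> Result<L, R> left(L value) {
      return new Result<>(value, null, true);
    }

    static <L, R> Result<L, R> right(R value) {
      return new Result<>(null, value, false);
    }
  }

  // Returns a view over `source` that applies `fn` only when next() is called,
  // so no list of converted records is ever materialized.
  static <I, O> Iterator<O> lazyMap(Iterator<I> source, Function<I, O> fn) {
    return new Iterator<O>() {
      @Override
      public boolean hasNext() {
        return source.hasNext();
      }

      @Override
      public O next() {
        // source.next() throws NoSuchElementException when exhausted.
        return fn.apply(source.next());
      }
    };
  }

  // Hypothetical per-partition conversion mirroring the try/catch above:
  // Integer.parseInt stands in for building the HoodieKey and payload.
  static Iterator<Result<Integer, String>> convertPartition(
      Iterator<String> rawRecords, boolean shouldErrorTable) {
    return lazyMap(rawRecords, raw -> {
      try {
        return Result.<Integer, String>left(Integer.parseInt(raw));
      } catch (NumberFormatException e) {
        if (!shouldErrorTable) {
          throw e;
        }
        // Stands in for generateErrorRecord(genRec).
        return Result.<Integer, String>right(raw);
      }
    });
  }

  public static void main(String[] args) {
    List<String> input = List.of("1", "2", "oops", "4");
    Iterator<Result<Integer, String>> it = convertPartition(input.iterator(), true);
    int converted = 0;
    int errors = 0;
    while (it.hasNext()) {
      if (it.next().isLeft) {
        converted++;
      } else {
        errors++;
      }
    }
    System.out.println(converted + " converted, " + errors + " routed to error table");
  }
}
```

Running main prints "3 converted, 1 routed to error table". With this shape, the per-partition setup (props and key generator creation) still happens once before iteration, but a conversion exception now surfaces when the downstream consumer calls next() rather than inside the mapPartitions lambda body; it still fails the same Spark task.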
--
This message was sent by Atlassian Jira
(v8.20.10#820010)