[ https://issues.apache.org/jira/browse/HUDI-7508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Vinish Reddy updated HUDI-7508:
-------------------------------
Status: In Progress (was: Open)
> Avoid converting iterator to list in HoodieStreamerUtils.createHoodieRecords
> ----------------------------------------------------------------------------
>
> Key: HUDI-7508
> URL: https://issues.apache.org/jira/browse/HUDI-7508
> Project: Apache Hudi
> Issue Type: Bug
> Components: deltastreamer
> Reporter: Vinish Reddy
> Assignee: Vinish Reddy
> Priority: Major
> Labels: pull-request-available
>
> This block of code is problematic and can lead to an OOM: it drains the
> partition's iterator into a list and only then returns an iterator over that
> list. As a result, every converted record of the partition is held on the
> executor heap at the same time while this block of code runs.
> [https://github.com/apache/hudi/blob/master/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java#L86]
> {code:java}
> records = avroRDD.mapPartitions(
>     (FlatMapFunction<Iterator<GenericRecord>, Either<HoodieRecord, String>>) genericRecordIterator -> {
>       if (autoGenerateRecordKeys) {
>         props.setProperty(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG,
>             String.valueOf(TaskContext.getPartitionId()));
>         props.setProperty(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG, instantTime);
>       }
>       BuiltinKeyGenerator builtinKeyGenerator =
>           (BuiltinKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
>       // Every converted record of the partition is buffered in this list.
>       List<Either<HoodieRecord, String>> avroRecords = new ArrayList<>();
>       while (genericRecordIterator.hasNext()) {
>         GenericRecord genRec = genericRecordIterator.next();
>         try {
>           HoodieKey hoodieKey = new HoodieKey(builtinKeyGenerator.getRecordKey(genRec),
>               builtinKeyGenerator.getPartitionPath(genRec));
>           GenericRecord gr = isDropPartitionColumns(props)
>               ? HoodieAvroUtils.removeFields(genRec, partitionColumns)
>               : genRec;
>           HoodieRecordPayload payload = shouldCombine
>               ? DataSourceUtils.createPayload(cfg.payloadClassName, gr,
>                   (Comparable) HoodieAvroUtils.getNestedFieldVal(gr, cfg.sourceOrderingField, false,
>                       props.getBoolean(
>                           KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
>                           Boolean.parseBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()))))
>               : DataSourceUtils.createPayload(cfg.payloadClassName, gr);
>           avroRecords.add(Either.left(new HoodieAvroRecord<>(hoodieKey, payload)));
>         } catch (Exception e) {
>           if (!shouldErrorTable) {
>             throw e;
>           }
>           avroRecords.add(generateErrorRecord(genRec));
>         }
>       }
>       // Spark only receives an iterator after the whole partition is in memory.
>       return avroRecords.iterator();
>     });
> {code}
>
>
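A minimal, Spark-free sketch of the lazy alternative, assuming the per-record conversion can be expressed as a plain function. `convertLazily` and `CountingIterator` are hypothetical names used only for illustration, not Hudi APIs; Strings stand in for `HoodieRecord` and error records, and `java.util.function.Function` stands in for the key generator and payload construction. Because a `Function` can only throw unchecked exceptions, the sketch catches `RuntimeException` where the original catches `Exception`.

```java
import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.function.Function;

public class LazyConversionSketch {

  // Hypothetical stand-in for the per-record conversion in createHoodieRecords:
  // each input element is converted only when the caller asks for it via next(),
  // so no per-partition List is ever built.
  static <I, O> Iterator<O> convertLazily(Iterator<I> source,
                                          Function<I, O> convert,
                                          Function<I, O> toErrorRecord,
                                          boolean shouldErrorTable) {
    return new Iterator<O>() {
      @Override
      public boolean hasNext() {
        return source.hasNext();
      }

      @Override
      public O next() {
        if (!source.hasNext()) {
          throw new NoSuchElementException();
        }
        I input = source.next();
        try {
          return convert.apply(input);
        } catch (RuntimeException e) {
          // Mirrors the original branch: rethrow unless failures go to an error table.
          if (!shouldErrorTable) {
            throw e;
          }
          return toErrorRecord.apply(input);
        }
      }
    };
  }

  // Counts how many elements have been pulled from the wrapped iterator,
  // which makes the one-at-a-time consumption observable.
  static final class CountingIterator<T> implements Iterator<T> {
    private final Iterator<T> delegate;
    int pulled = 0;

    CountingIterator(Iterator<T> delegate) {
      this.delegate = delegate;
    }

    @Override
    public boolean hasNext() {
      return delegate.hasNext();
    }

    @Override
    public T next() {
      T value = delegate.next();
      pulled++;
      return value;
    }
  }

  public static void main(String[] args) {
    CountingIterator<String> source =
        new CountingIterator<>(List.of("1", "2", "oops", "4").iterator());
    Iterator<String> records = convertLazily(
        source, s -> "record:" + Integer.parseInt(s), s -> "error:" + s, true);

    System.out.println("pulled before consuming: " + source.pulled); // 0
    while (records.hasNext()) {
      String r = records.next();
      System.out.println(r + " (pulled so far: " + source.pulled + ")");
    }
  }
}
```

Returning an iterator of this shape from `mapPartitions` would let Spark pull one converted record at a time instead of holding the whole partition in an `ArrayList`; the counter shows that input is consumed only as output is requested.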
--
This message was sent by Atlassian Jira
(v8.20.10#820010)