[ 
https://issues.apache.org/jira/browse/HUDI-7508?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Vinish Reddy updated HUDI-7508:
-------------------------------
    Description: 
This block of code is problematic and can lead to OOM: it converts the 
iterator into a list and then returns an iterator over that list. This holds 
every record of the partition in the executor's heap while this block of code runs.
[https://github.com/apache/hudi/blob/master/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java#L86]


{{records = avroRDD.mapPartitions(}}
{{(FlatMapFunction<Iterator<GenericRecord>, Either<HoodieRecord,String>>) 
genericRecordIterator -> {}}
{{if (autoGenerateRecordKeys)}}

{{{ props.setProperty(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG, 
String.valueOf(TaskContext.getPartitionId())); 
props.setProperty(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG, instantTime); 
}}}

{{BuiltinKeyGenerator builtinKeyGenerator = (BuiltinKeyGenerator) 
HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);}}
{{List<Either<HoodieRecord,String>> avroRecords = new ArrayList<>();}}
{{while (genericRecordIterator.hasNext()) {}}
{{GenericRecord genRec = genericRecordIterator.next();}}
{{try}}

{{{ HoodieKey hoodieKey = new 
HoodieKey(builtinKeyGenerator.getRecordKey(genRec), 
builtinKeyGenerator.getPartitionPath(genRec)); GenericRecord gr = 
isDropPartitionColumns(props) ? HoodieAvroUtils.removeFields(genRec, 
partitionColumns) : genRec; HoodieRecordPayload payload = shouldCombine ? 
DataSourceUtils.createPayload(cfg.payloadClassName, gr, (Comparable) 
HoodieAvroUtils.getNestedFieldVal(gr, cfg.sourceOrderingField, false, 
props.getBoolean( 
KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(), 
Boolean.parseBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()))))
 : DataSourceUtils.createPayload(cfg.payloadClassName, gr); 
avroRecords.add(Either.left(new HoodieAvroRecord<>(hoodieKey, payload))); }}}

{{catch (Exception e) {}}
{{if (!shouldErrorTable)}}

{{{ throw e; }}}

{{avroRecords.add(generateErrorRecord(genRec));}}
{{}}}
{{}}}
{{return avroRecords.iterator();}}
{{});}}
 
 

  was:
This block of code is problematic and can lead to OOM: it converts the 
iterator into a list and then returns an iterator over that list. This holds 
every record of the partition in the executor's heap while this block of code runs.
[https://github.com/apache/hudi/blob/master/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java#L86]
        records = avroRDD.mapPartitions(
            (FlatMapFunction<Iterator<GenericRecord>, 
Either<HoodieRecord,String>>) genericRecordIterator -> \{
              if (autoGenerateRecordKeys) {
                
props.setProperty(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG, 
String.valueOf(TaskContext.getPartitionId()));
                
props.setProperty(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG, instantTime);
              }
              BuiltinKeyGenerator builtinKeyGenerator = (BuiltinKeyGenerator) 
HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
              List<Either<HoodieRecord,String>> avroRecords = new ArrayList<>();
              while (genericRecordIterator.hasNext()) \{
                GenericRecord genRec = genericRecordIterator.next();
                try {
                  HoodieKey hoodieKey = new 
HoodieKey(builtinKeyGenerator.getRecordKey(genRec), 
builtinKeyGenerator.getPartitionPath(genRec));
                  GenericRecord gr = isDropPartitionColumns(props) ? 
HoodieAvroUtils.removeFields(genRec, partitionColumns) : genRec;
                  HoodieRecordPayload payload = shouldCombine ? 
DataSourceUtils.createPayload(cfg.payloadClassName, gr,
                      (Comparable) HoodieAvroUtils.getNestedFieldVal(gr, 
cfg.sourceOrderingField, false, props.getBoolean(
                          
KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
                          
Boolean.parseBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()))))
                      : DataSourceUtils.createPayload(cfg.payloadClassName, gr);
                  avroRecords.add(Either.left(new HoodieAvroRecord<>(hoodieKey, 
payload)));
                } catch (Exception e) \{
                  if (!shouldErrorTable) {
                    throw e;
                  }
                  avroRecords.add(generateErrorRecord(genRec));
                }
              }
              return avroRecords.iterator();
            });
 
 


> Avoid converting iterator to list HoodieStreamerUtils.createHoodieRecords
> -------------------------------------------------------------------------
>
>                 Key: HUDI-7508
>                 URL: https://issues.apache.org/jira/browse/HUDI-7508
>             Project: Apache Hudi
>          Issue Type: Bug
>          Components: deltastreamer
>            Reporter: Vinish Reddy
>            Assignee: Vinish Reddy
>            Priority: Major
>
> This block of code is problematic and can lead to OOM: it converts the 
> iterator into a list and then returns an iterator over that list. This 
> holds every record of the partition in the executor's heap while this block 
> of code runs.
> [https://github.com/apache/hudi/blob/master/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java#L86]
> {{records = avroRDD.mapPartitions(}}
> {{(FlatMapFunction<Iterator<GenericRecord>, Either<HoodieRecord,String>>) 
> genericRecordIterator -> {}}
> {{if (autoGenerateRecordKeys)}}
> {{{ props.setProperty(KeyGenUtils.RECORD_KEY_GEN_PARTITION_ID_CONFIG, 
> String.valueOf(TaskContext.getPartitionId())); 
> props.setProperty(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG, 
> instantTime); }}}
> {{BuiltinKeyGenerator builtinKeyGenerator = (BuiltinKeyGenerator) 
> HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);}}
> {{List<Either<HoodieRecord,String>> avroRecords = new ArrayList<>();}}
> {{while (genericRecordIterator.hasNext()) {}}
> {{GenericRecord genRec = genericRecordIterator.next();}}
> {{try}}
> {{{ HoodieKey hoodieKey = new 
> HoodieKey(builtinKeyGenerator.getRecordKey(genRec), 
> builtinKeyGenerator.getPartitionPath(genRec)); GenericRecord gr = 
> isDropPartitionColumns(props) ? HoodieAvroUtils.removeFields(genRec, 
> partitionColumns) : genRec; HoodieRecordPayload payload = shouldCombine ? 
> DataSourceUtils.createPayload(cfg.payloadClassName, gr, (Comparable) 
> HoodieAvroUtils.getNestedFieldVal(gr, cfg.sourceOrderingField, false, 
> props.getBoolean( 
> KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(), 
> Boolean.parseBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()))))
>  : DataSourceUtils.createPayload(cfg.payloadClassName, gr); 
> avroRecords.add(Either.left(new HoodieAvroRecord<>(hoodieKey, payload))); }}}
> {{catch (Exception e) {}}
> {{if (!shouldErrorTable)}}
> {{{ throw e; }}}
> {{avroRecords.add(generateErrorRecord(genRec));}}
> {{}}}
> {{}}}
> {{return avroRecords.iterator();}}
> {{});}}
>  
>  



--
This message was sent by Atlassian Jira
(v8.20.10#820010)

Reply via email to