This is an automated email from the ASF dual-hosted git repository.
yihua pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 49072d1e2e7 [HUDI-7508] Avoid collecting records in HoodieStreamerUtils.createHoodieRecords and JsonKafkaSource mapPartitions (#10872)
49072d1e2e7 is described below
commit 49072d1e2e721f27623dba840ad6ea41a252fd15
Author: Vinish Reddy <[email protected]>
AuthorDate: Sat May 11 08:50:59 2024 +0530
[HUDI-7508] Avoid collecting records in HoodieStreamerUtils.createHoodieRecords and JsonKafkaSource mapPartitions (#10872)
Co-authored-by: Y Ethan Guo <[email protected]>
---
.../hudi/utilities/sources/JsonKafkaSource.java | 18 ++++++++----------
.../hudi/utilities/streamer/HoodieStreamerUtils.java | 20 ++++++++------------
2 files changed, 16 insertions(+), 22 deletions(-)
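Both files apply the same pattern: instead of draining the partition iterator into an in-memory LinkedList/ArrayList and returning stringList.iterator() (or avroRecords.iterator()), the mapPartitions body now returns a lazy iterator that transforms one record at a time, so the whole partition is never materialized at once. Below is a minimal stand-alone sketch of that pattern; it uses plain java.util.Iterator rather than Hudi's ClosableIterator/CloseableMappingIterator (which the actual diff uses), and the class and method names are illustrative only.

// Illustrative sketch only: EagerMapper/LazyMapper-style helpers with
// hypothetical names; the real change uses Hudi's ClosableIterator.wrap(...)
// and CloseableMappingIterator, shown in the diff below.
import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.function.Function;

public class LazyMappingSketch {

  // Before: the whole partition is materialized in a LinkedList before the
  // caller can consume a single element, holding every mapped record in memory.
  static <I, O> Iterator<O> eagerMap(Iterator<I> in, Function<I, O> fn) {
    List<O> out = new LinkedList<>();
    in.forEachRemaining(rec -> out.add(fn.apply(rec)));
    return out.iterator();
  }

  // After: each element is mapped on demand as the downstream consumer pulls
  // it, so memory use stays bounded regardless of partition size.
  static <I, O> Iterator<O> lazyMap(Iterator<I> in, Function<I, O> fn) {
    return new Iterator<O>() {
      @Override public boolean hasNext() { return in.hasNext(); }
      @Override public O next() { return fn.apply(in.next()); }
    };
  }

  public static void main(String[] args) {
    System.out.println("eager:");
    eagerMap(List.of(1, 2).iterator(), i -> "record-" + i).forEachRemaining(System.out::println);
    System.out.println("lazy:");
    lazyMap(List.of(1, 2).iterator(), i -> "record-" + i).forEachRemaining(System.out::println);
  }
}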
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
index 71f0c4db3f1..a8f70e7c854 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/JsonKafkaSource.java
@@ -21,6 +21,8 @@ package org.apache.hudi.utilities.sources;
import org.apache.hudi.common.config.TypedProperties;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.StringUtils;
+import org.apache.hudi.common.util.collection.ClosableIterator;
+import org.apache.hudi.common.util.collection.CloseableMappingIterator;
import org.apache.hudi.utilities.UtilHelpers;
import org.apache.hudi.utilities.config.JsonKafkaPostProcessorConfig;
import org.apache.hudi.utilities.exception.HoodieSourcePostProcessException;
@@ -43,8 +45,6 @@ import org.apache.spark.streaming.kafka010.LocationStrategies;
import org.apache.spark.streaming.kafka010.OffsetRange;
import java.io.IOException;
-import java.util.LinkedList;
-import java.util.List;
import static org.apache.hudi.common.util.ConfigUtils.getStringWithAltKeys;
import static org.apache.hudi.utilities.schema.KafkaOffsetPostProcessor.KAFKA_SOURCE_KEY_COLUMN;
@@ -80,28 +80,26 @@ public class JsonKafkaSource extends KafkaSource<JavaRDD<String>> {
return postProcess(maybeAppendKafkaOffsets(kafkaRDD));
}
- protected JavaRDD<String> maybeAppendKafkaOffsets(JavaRDD<ConsumerRecord<Object, Object>> kafkaRDD) {
+ protected JavaRDD<String> maybeAppendKafkaOffsets(JavaRDD<ConsumerRecord<Object, Object>> kafkaRDD) {
if (this.shouldAddOffsets) {
return kafkaRDD.mapPartitions(partitionIterator -> {
- List<String> stringList = new LinkedList<>();
- ObjectMapper om = new ObjectMapper();
- partitionIterator.forEachRemaining(consumerRecord -> {
+ ObjectMapper objectMapper = new ObjectMapper();
+ return new CloseableMappingIterator<>(ClosableIterator.wrap(partitionIterator), consumerRecord -> {
String recordValue = consumerRecord.value().toString();
String recordKey = StringUtils.objToString(consumerRecord.key());
try {
- ObjectNode jsonNode = (ObjectNode) om.readTree(recordValue);
+ ObjectNode jsonNode = (ObjectNode) objectMapper.readTree(recordValue);
jsonNode.put(KAFKA_SOURCE_OFFSET_COLUMN, consumerRecord.offset());
jsonNode.put(KAFKA_SOURCE_PARTITION_COLUMN, consumerRecord.partition());
jsonNode.put(KAFKA_SOURCE_TIMESTAMP_COLUMN, consumerRecord.timestamp());
if (recordKey != null) {
jsonNode.put(KAFKA_SOURCE_KEY_COLUMN, recordKey);
}
- stringList.add(om.writeValueAsString(jsonNode));
+ return objectMapper.writeValueAsString(jsonNode);
} catch (Throwable e) {
- stringList.add(recordValue);
+ return recordValue;
}
});
- return stringList.iterator();
});
}
return kafkaRDD.map(consumerRecord -> (String) consumerRecord.value());
diff --git a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java
index 2ecf0b02fb6..3be64fefbb3 100644
--- a/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java
+++ b/hudi-utilities/src/main/java/org/apache/hudi/utilities/streamer/HoodieStreamerUtils.java
@@ -31,6 +31,7 @@ import org.apache.hudi.common.model.HoodieRecord;
import org.apache.hudi.common.model.HoodieRecordPayload;
import org.apache.hudi.common.model.HoodieSparkRecord;
import org.apache.hudi.common.model.WriteOperationType;
+import org.apache.hudi.common.util.ConfigUtils;
import org.apache.hudi.common.util.Either;
import org.apache.hudi.common.util.Option;
import org.apache.hudi.common.util.collection.ClosableIterator;
@@ -55,10 +56,8 @@ import org.apache.spark.sql.avro.HoodieAvroDeserializer;
import org.apache.spark.sql.catalyst.InternalRow;
import org.apache.spark.sql.types.StructType;
-import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
-import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
@@ -81,6 +80,8 @@ public class HoodieStreamerUtils {
String instantTime, Option<BaseErrorTableWriter> errorTableWriter) {
boolean shouldCombine = cfg.filterDupes || cfg.operation.equals(WriteOperationType.UPSERT);
boolean shouldErrorTable = errorTableWriter.isPresent() && props.getBoolean(ERROR_ENABLE_VALIDATE_RECORD_CREATION.key(), ERROR_ENABLE_VALIDATE_RECORD_CREATION.defaultValue());
+ boolean useConsistentLogicalTimestamp = ConfigUtils.getBooleanWithAltKeys(
+ props, KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED);
Set<String> partitionColumns = getPartitionColumns(props);
return avroRDDOptional.map(avroRDD -> {
SerializableSchema avroSchema = new SerializableSchema(schemaProvider.getTargetSchema());
@@ -94,23 +95,18 @@ public class HoodieStreamerUtils {
props.setProperty(KeyGenUtils.RECORD_KEY_GEN_INSTANT_TIME_CONFIG, instantTime);
}
BuiltinKeyGenerator builtinKeyGenerator = (BuiltinKeyGenerator) HoodieSparkKeyGeneratorFactory.createKeyGenerator(props);
- List<Either<HoodieRecord,String>> avroRecords = new ArrayList<>();
- while (genericRecordIterator.hasNext()) {
- GenericRecord genRec = genericRecordIterator.next();
+ return new CloseableMappingIterator<>(ClosableIterator.wrap(genericRecordIterator), genRec -> {
try {
HoodieKey hoodieKey = new HoodieKey(builtinKeyGenerator.getRecordKey(genRec), builtinKeyGenerator.getPartitionPath(genRec));
GenericRecord gr = isDropPartitionColumns(props) ? HoodieAvroUtils.removeFields(genRec, partitionColumns) : genRec;
HoodieRecordPayload payload = shouldCombine ? DataSourceUtils.createPayload(cfg.payloadClassName, gr,
- (Comparable) HoodieAvroUtils.getNestedFieldVal(gr, cfg.sourceOrderingField, false, props.getBoolean(
- KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.key(),
- Boolean.parseBoolean(KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED.defaultValue()))))
+ (Comparable) HoodieAvroUtils.getNestedFieldVal(gr, cfg.sourceOrderingField, false, useConsistentLogicalTimestamp))
: DataSourceUtils.createPayload(cfg.payloadClassName, gr);
- avroRecords.add(Either.left(new HoodieAvroRecord<>(hoodieKey, payload)));
+ return Either.left(new HoodieAvroRecord<>(hoodieKey, payload));
+ return Either.left(new HoodieAvroRecord<>(hoodieKey,
payload));
} catch (Exception e) {
- avroRecords.add(generateErrorRecordOrThrowException(genRec, e, shouldErrorTable));
+ return generateErrorRecordOrThrowException(genRec, e, shouldErrorTable);
}
- }
- return avroRecords.iterator();
+ });
});
} else if (recordType == HoodieRecord.HoodieRecordType.SPARK) {
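A secondary cleanup in HoodieStreamerUtils: the consistent-logical-timestamp flag is now resolved once via ConfigUtils.getBooleanWithAltKeys before the iterator is built, instead of re-parsing props inside the per-record lambda as the removed props.getBoolean(...) lines did. A minimal sketch of that hoisting follows; the class, method, and property key are hypothetical stand-ins, not the real Hudi API.

// Illustrative sketch only: hypothetical names and property key; the real code
// reads KeyGeneratorOptions.KEYGENERATOR_CONSISTENT_LOGICAL_TIMESTAMP_ENABLED
// through ConfigUtils.getBooleanWithAltKeys, as shown in the diff above.
import java.util.Iterator;
import java.util.List;
import java.util.Properties;

public class HoistedConfigSketch {

  static void processPartition(Iterator<String> records, Properties props) {
    // Resolved once per partition; before this commit the equivalent
    // props.getBoolean(...) parse ran inside the lambda, once per record.
    boolean useConsistentLogicalTimestamp = Boolean.parseBoolean(
        props.getProperty("consistent.logical.timestamp.enabled", "false"));

    records.forEachRemaining(rec -> {
      // Per-record work only reads the precomputed flag.
      if (useConsistentLogicalTimestamp) {
        // ... resolve the ordering field with logical-timestamp handling ...
      }
    });
  }

  public static void main(String[] args) {
    Properties props = new Properties();
    props.setProperty("consistent.logical.timestamp.enabled", "true");
    processPartition(List.of("r1", "r2").iterator(), props);
  }
}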