This is an automated email from the ASF dual-hosted git repository.
vinoth pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/incubator-hudi.git
The following commit(s) were added to refs/heads/master by this push:
new 347e297 [HUDI-596] Close KafkaConsumer every time (#1303)
347e297 is described below
commit 347e297ac19ed55172e84e13075e19ce060954c6
Author: dengziming <[email protected]>
AuthorDate: Tue Feb 4 15:42:21 2020 +0800
[HUDI-596] Close KafkaConsumer every time (#1303)
---
.../utilities/sources/helpers/KafkaOffsetGen.java | 50 +++++++++++-----------
1 file changed, 26 insertions(+), 24 deletions(-)
diff --git
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
index ed5e4e9..a92a441 100644
---
a/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
+++
b/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/helpers/KafkaOffsetGen.java
@@ -172,33 +172,35 @@ public class KafkaOffsetGen {
public OffsetRange[] getNextOffsetRanges(Option<String> lastCheckpointStr,
long sourceLimit) {
// Obtain current metadata for the topic
- KafkaConsumer consumer = new KafkaConsumer(kafkaParams);
- List<PartitionInfo> partitionInfoList;
- partitionInfoList = consumer.partitionsFor(topicName);
- Set<TopicPartition> topicPartitions = partitionInfoList.stream()
- .map(x -> new TopicPartition(x.topic(),
x.partition())).collect(Collectors.toSet());
-
- // Determine the offset ranges to read from
Map<TopicPartition, Long> fromOffsets;
- if (lastCheckpointStr.isPresent()) {
- fromOffsets = checkupValidOffsets(consumer, lastCheckpointStr,
topicPartitions);
- } else {
- KafkaResetOffsetStrategies autoResetValue = KafkaResetOffsetStrategies
- .valueOf(props.getString("auto.offset.reset",
Config.DEFAULT_AUTO_RESET_OFFSET.toString()).toUpperCase());
- switch (autoResetValue) {
- case EARLIEST:
- fromOffsets = consumer.beginningOffsets(topicPartitions);
- break;
- case LATEST:
- fromOffsets = consumer.endOffsets(topicPartitions);
- break;
- default:
- throw new HoodieNotSupportedException("Auto reset value must be one
of 'smallest' or 'largest' ");
+ Map<TopicPartition, Long> toOffsets;
+ try (KafkaConsumer consumer = new KafkaConsumer(kafkaParams)) {
+ List<PartitionInfo> partitionInfoList;
+ partitionInfoList = consumer.partitionsFor(topicName);
+ Set<TopicPartition> topicPartitions = partitionInfoList.stream()
+ .map(x -> new TopicPartition(x.topic(),
x.partition())).collect(Collectors.toSet());
+
+ // Determine the offset ranges to read from
+ if (lastCheckpointStr.isPresent()) {
+ fromOffsets = checkupValidOffsets(consumer, lastCheckpointStr,
topicPartitions);
+ } else {
+ KafkaResetOffsetStrategies autoResetValue = KafkaResetOffsetStrategies
+ .valueOf(props.getString("auto.offset.reset",
Config.DEFAULT_AUTO_RESET_OFFSET.toString()).toUpperCase());
+ switch (autoResetValue) {
+ case EARLIEST:
+ fromOffsets = consumer.beginningOffsets(topicPartitions);
+ break;
+ case LATEST:
+ fromOffsets = consumer.endOffsets(topicPartitions);
+ break;
+ default:
+ throw new HoodieNotSupportedException("Auto reset value must be
one of 'smallest' or 'largest' ");
+ }
}
- }
- // Obtain the latest offsets.
- Map<TopicPartition, Long> toOffsets = consumer.endOffsets(topicPartitions);
+ // Obtain the latest offsets.
+ toOffsets = consumer.endOffsets(topicPartitions);
+ }
// Come up with final set of OffsetRanges to read (account for new
partitions, limit number of events)
long maxEventsToReadFromKafka =
props.getLong(Config.MAX_EVENTS_FROM_KAFKA_SOURCE_PROP,