This is an automated email from the ASF dual-hosted git repository.
bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git
The following commit(s) were added to refs/heads/master by this push:
new a97f249 SAMZA-2515: Kafka consumer synchronized (#1351)
a97f249 is described below
commit a97f24985dc6074601b84c9980fc90f5f0ec2728
Author: lakshmi-manasa-g <[email protected]>
AuthorDate: Mon Apr 27 18:06:05 2020 -0700
SAMZA-2515: Kafka consumer synchronized (#1351)
---
.../java/org/apache/samza/system/kafka/KafkaConsumerProxy.java | 10 +++++++++-
1 file changed, 9 insertions(+), 1 deletion(-)
diff --git
a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaConsumerProxy.java
b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaConsumerProxy.java
index 4ecfc6a..71ebe00 100644
---
a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaConsumerProxy.java
+++
b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaConsumerProxy.java
@@ -172,7 +172,15 @@ public class KafkaConsumerProxy<K, V> {
private void initializeLags() {
// This is expensive, so only do it once at the beginning. After the first
poll, we can rely on metrics for lag.
- Map<TopicPartition, Long> endOffsets =
kafkaConsumer.endOffsets(topicPartitionToSSP.keySet());
+
+ Map<TopicPartition, Long> endOffsets;
+ // Synchronize, in case the consumer is used in some other thread
(metadata or something else)
+ synchronized (kafkaConsumer) {
+ endOffsets = kafkaConsumer.endOffsets(topicPartitionToSSP.keySet());
+ }
+ if (endOffsets == null) {
+ throw new SamzaException("Failed to fetch kafka consumer endoffsets for
system " + systemName);
+ }
endOffsets.forEach((tp, offset) -> {
SystemStreamPartition ssp = topicPartitionToSSP.get(tp);
long startingOffset = nextOffsets.get(ssp);