This is an automated email from the ASF dual-hosted git repository.

bharathkk pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/samza.git


The following commit(s) were added to refs/heads/master by this push:
     new a97f249  SAMZA-2515: Kafka consumer synchronized (#1351)
a97f249 is described below

commit a97f24985dc6074601b84c9980fc90f5f0ec2728
Author: lakshmi-manasa-g <[email protected]>
AuthorDate: Mon Apr 27 18:06:05 2020 -0700

    SAMZA-2515: Kafka consumer synchronized (#1351)
---
 .../java/org/apache/samza/system/kafka/KafkaConsumerProxy.java | 10 +++++++++-
 1 file changed, 9 insertions(+), 1 deletion(-)

diff --git 
a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaConsumerProxy.java
 
b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaConsumerProxy.java
index 4ecfc6a..71ebe00 100644
--- 
a/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaConsumerProxy.java
+++ 
b/samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaConsumerProxy.java
@@ -172,7 +172,15 @@ public class KafkaConsumerProxy<K, V> {
 
   private void initializeLags() {
     // This is expensive, so only do it once at the beginning. After the first 
poll, we can rely on metrics for lag.
-    Map<TopicPartition, Long> endOffsets = 
kafkaConsumer.endOffsets(topicPartitionToSSP.keySet());
+
+    Map<TopicPartition, Long> endOffsets;
+    // Synchronize, in case the consumer is used in some other thread 
(metadata or something else)
+    synchronized (kafkaConsumer) {
+      endOffsets = kafkaConsumer.endOffsets(topicPartitionToSSP.keySet());
+    }
+    if (endOffsets == null) {
+      throw new SamzaException("Failed to fetch kafka consumer endoffsets for 
system " + systemName);
+    }
     endOffsets.forEach((tp, offset) -> {
       SystemStreamPartition ssp = topicPartitionToSSP.get(tp);
       long startingOffset = nextOffsets.get(ssp);

Reply via email to