mynameborat commented on a change in pull request #1351:
URL: https://github.com/apache/samza/pull/1351#discussion_r416119790



##########
File path: 
samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaConsumerProxy.java
##########
@@ -170,9 +171,18 @@ Throwable getFailureCause() {
     return failureCause;
   }
 
-  private void initializeLags() {
+  @VisibleForTesting
+  void initializeLags() {

Review comment:
       revert this to private?

##########
File path: 
samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaConsumerProxy.java
##########
@@ -170,9 +171,18 @@ Throwable getFailureCause() {
     return failureCause;
   }
 
-  private void initializeLags() {
+  @VisibleForTesting
+  void initializeLags() {
     // This is expensive, so only do it once at the beginning. After the first 
poll, we can rely on metrics for lag.
-    Map<TopicPartition, Long> endOffsets = 
kafkaConsumer.endOffsets(topicPartitionToSSP.keySet());
+
+    Map<TopicPartition, Long> endOffsets;
+    // Synchronize, in case the consumer is used in some other thread 
(metadata or something else)
+    synchronized (kafkaConsumer) {
+      endOffsets = kafkaConsumer.endOffsets(topicPartitionToSSP.keySet());
+    }
+    if (endOffsets == null) {
+      throw new SamzaException("Failed to fetch kafka consumer endoffsets for 
system " + systemName);

Review comment:
       sounds good 👍 




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to