lakshmi-manasa-g commented on a change in pull request #1351:
URL: https://github.com/apache/samza/pull/1351#discussion_r415854654
##########
File path:
samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaConsumerProxy.java
##########
@@ -170,9 +171,18 @@ Throwable getFailureCause() {
return failureCause;
}
- private void initializeLags() {
+ @VisibleForTesting
+ void initializeLags() {
// This is expensive, so only do it once at the beginning. After the first
poll, we can rely on metrics for lag.
- Map<TopicPartition, Long> endOffsets =
kafkaConsumer.endOffsets(topicPartitionToSSP.keySet());
+
+ Map<TopicPartition, Long> endOffsets;
+ // Synchronize, in case the consumer is used in some other thread
(metadata or something else)
+ synchronized (kafkaConsumer) {
+ endOffsets = kafkaConsumer.endOffsets(topicPartitionToSSP.keySet());
+ }
+ if (endOffsets == null) {
+ throw new SamzaException("Failed to fetch kafka consumer endoffsets for
system " + systemName);
Review comment:
actually, KafkaConsumer.endOffset [does not make a
promise](https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L2189)
that it will be non-null. If i keep digging into implementation details, it
looks like it returns an empty hashmap.. but thats an impl detail not an api
promise. hence added this.
i agree, for this change (to deal with the sync issue), dealing with this
NPE is not needed. but thought it will be a nice to do.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]