dnishimura commented on a change in pull request #1016: SAMZA-2179: Move the 
StartpointVisitor abstraction to SystemAdmin interface.
URL: https://github.com/apache/samza/pull/1016#discussion_r279802695
 
 

 ##########
 File path: 
samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
 ##########
 @@ -386,12 +389,10 @@ private OffsetsMaps 
fetchTopicPartitionsMetadata(List<TopicPartition> topicParti
     Map<TopicPartition, Long> oldestOffsetsWithLong;
     Map<TopicPartition, Long> upcomingOffsetsWithLong;
 
-    synchronized (metadataConsumer) {
-      oldestOffsetsWithLong = 
metadataConsumer.beginningOffsets(topicPartitions);
-      LOG.debug("Kafka-fetched beginningOffsets: {}", oldestOffsetsWithLong);
-      upcomingOffsetsWithLong = metadataConsumer.endOffsets(topicPartitions);
-      LOG.debug("Kafka-fetched endOffsets: {}", upcomingOffsetsWithLong);
-    }
+    oldestOffsetsWithLong = threadSafeKafkaConsumer.execute(consumer -> 
consumer.beginningOffsets(topicPartitions));
+    LOG.debug("Kafka-fetched beginningOffsets: {}", oldestOffsetsWithLong);
+    upcomingOffsetsWithLong = threadSafeKafkaConsumer.execute(consumer -> 
consumer.endOffsets(topicPartitions));
 
 Review comment:
   Any potential side-effects from splitting the `beginningOffsets` and 
`endOffsets` call from one synchronized block into two? Any reason to change 
the scope of the synchronized block and not pass in the previous synchronized 
block as part of the lambda function?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services

Reply via email to