dnishimura commented on a change in pull request #1016: SAMZA-2179: Move the StartpointVisitor abstraction to SystemAdmin interface.
URL: https://github.com/apache/samza/pull/1016#discussion_r279802695
##########
File path: samza-kafka/src/main/java/org/apache/samza/system/kafka/KafkaSystemAdmin.java
##########
@@ -386,12 +389,10 @@ private OffsetsMaps fetchTopicPartitionsMetadata(List<TopicPartition> topicParti
     Map<TopicPartition, Long> oldestOffsetsWithLong;
     Map<TopicPartition, Long> upcomingOffsetsWithLong;
-    synchronized (metadataConsumer) {
-      oldestOffsetsWithLong = metadataConsumer.beginningOffsets(topicPartitions);
-      LOG.debug("Kafka-fetched beginningOffsets: {}", oldestOffsetsWithLong);
-      upcomingOffsetsWithLong = metadataConsumer.endOffsets(topicPartitions);
-      LOG.debug("Kafka-fetched endOffsets: {}", upcomingOffsetsWithLong);
-    }
+    oldestOffsetsWithLong = threadSafeKafkaConsumer.execute(consumer -> consumer.beginningOffsets(topicPartitions));
+    LOG.debug("Kafka-fetched beginningOffsets: {}", oldestOffsetsWithLong);
+    upcomingOffsetsWithLong = threadSafeKafkaConsumer.execute(consumer -> consumer.endOffsets(topicPartitions));

Review comment:
Could splitting the `beginningOffsets` and `endOffsets` calls from one synchronized block into two have side effects? Is there a reason to narrow the scope of the synchronization rather than pass the body of the previous synchronized block in as a single lambda?
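
To make the question concrete, here is a minimal sketch of the two locking scopes. It is not the PR's code: `SynchronizedExecutor`, `FakeLog`, `roll`, `fetchSeparately`, and `fetchTogether` are hypothetical names invented for illustration, and the wrapper only assumes that `execute` runs the lambda while holding a lock on the wrapped object, as the call shape in the diff suggests. The `Runnable` passed to `fetchSeparately` stands in for a competing thread that wins the lock between the two calls, so the example stays deterministic.

```java
import java.util.Arrays;
import java.util.function.Function;

/** Entry point comparing the two locking scopes (hypothetical sketch, no Kafka involved). */
final class OffsetFetchSketch {
    /** Two lock acquisitions: a competing caller may take the lock in between. */
    static long[] fetchSeparately(SynchronizedExecutor<FakeLog> exec, Runnable competingCaller) {
        long oldest = exec.execute(FakeLog::beginningOffset);
        competingCaller.run(); // simulates another thread acquiring the lock here
        long upcoming = exec.execute(FakeLog::endOffset);
        return new long[] {oldest, upcoming};
    }

    /** One lock acquisition: both reads happen while the same lock is held. */
    static long[] fetchTogether(SynchronizedExecutor<FakeLog> exec) {
        return exec.execute(log -> new long[] {log.beginningOffset(), log.endOffset()});
    }

    public static void main(String[] args) {
        SynchronizedExecutor<FakeLog> split = new SynchronizedExecutor<>(new FakeLog());
        long[] a = fetchSeparately(split, () -> split.execute(log -> { log.roll(5); return null; }));
        System.out.println(Arrays.toString(a)); // prints [0, 15]

        SynchronizedExecutor<FakeLog> single = new SynchronizedExecutor<>(new FakeLog());
        long[] b = fetchTogether(single);
        System.out.println(Arrays.toString(b)); // prints [0, 10]
    }
}

/** Hypothetical stand-in for a thread-safe wrapper: runs the action under the resource's lock. */
final class SynchronizedExecutor<C> {
    private final C resource;

    SynchronizedExecutor(C resource) {
        this.resource = resource;
    }

    <R> R execute(Function<C, R> action) {
        synchronized (resource) {
            return action.apply(resource);
        }
    }
}

/** Hypothetical in-memory log whose offsets move together when it rolls. */
final class FakeLog {
    private long beginning = 0;
    private long end = 10;

    long beginningOffset() { return beginning; }
    long endOffset() { return end; }

    /** Advances both offsets, e.g. retention dropping old records while new ones arrive. */
    void roll(long n) {
        beginning += n;
        end += n;
    }
}
```

In the split version the pair `[0, 15]` mixes the state before and after the interleaved `roll`, a combination the log never held at one instant. The single-lambda version cannot be interleaved by other users of the same wrapper. With a real `KafkaConsumer` the two offset lookups are separate calls either way, so a client-side lock would only serialize access to the consumer and would not by itself give a consistent view of the broker.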