scwhittle commented on code in PR #34275:
URL: https://github.com/apache/beam/pull/34275#discussion_r1995479936


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java:
##########
@@ -558,8 +560,8 @@ public ProcessContinuation processElement(
                         .subtract(BigDecimal.valueOf(expectedOffset), MathContext.DECIMAL128)
                         .doubleValue()
                     * avgRecordSize.estimateRecordByteSizeToOffsetCountRatio()));
-        KafkaMetrics kafkaResults = KafkaSinkMetrics.kafkaMetrics();
-        kafkaResults.updateBacklogBytes(
+        kafkaMetrics = KafkaSinkMetrics.kafkaMetrics();

Review Comment:
   See above: maybe remove this reassignment and just accumulate into the existing kafkaMetrics instance.
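A minimal sketch of the single-accumulator idea, assuming a buffered metrics object that only publishes on flush. BufferedMetrics, recordPollLatency and updateBacklogBytes here are hypothetical stand-ins written for illustration, not Beam's KafkaMetrics API. One instance is created per unit of work, both the poll latency and the backlog estimate are recorded into it, and a single flush publishes both.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical illustration only; NOT Beam's KafkaMetrics / KafkaSinkMetrics. */
final class SingleAccumulatorSketch {

  /** Buffers values in memory; nothing is visible until flushBufferedMetrics() is called. */
  static final class BufferedMetrics {
    private final List<Duration> pollLatencies = new ArrayList<>();
    private final Map<String, Long> backlogBytes = new HashMap<>();
    /** Stands in for the real metrics sink: everything flushed, in flush order. */
    final List<String> published = new ArrayList<>();
    int flushCount = 0;

    void recordPollLatency(Duration latency) {
      pollLatencies.add(latency);
    }

    void updateBacklogBytes(String topicPartition, long bytes) {
      backlogBytes.put(topicPartition, bytes); // gauge semantics: latest value wins
    }

    void flushBufferedMetrics() {
      flushCount++;
      for (Duration d : pollLatencies) {
        published.add("pollLatencyMs=" + d.toMillis());
      }
      backlogBytes.forEach((tp, b) -> published.add("backlogBytes[" + tp + "]=" + b));
      pollLatencies.clear();
      backlogBytes.clear();
    }
  }

  /** One unit of work: a single instance collects both metrics, then one flush publishes them. */
  static BufferedMetrics processOnce(String topicPartition, long pollMillis, long backlog) {
    BufferedMetrics metrics = new BufferedMetrics(); // created once, not re-fetched later
    metrics.recordPollLatency(Duration.ofMillis(pollMillis));
    // ... records would be emitted here ...
    metrics.updateBacklogBytes(topicPartition, backlog); // accumulate into the same instance
    metrics.flushBufferedMetrics();
    return metrics;
  }
}
```

Reusing the instance avoids a second lookup and keeps the backlog value in the same buffer that the latency values are flushed from.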



##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java:
##########
@@ -454,7 +454,9 @@ public ProcessContinuation processElement(
       final Stopwatch sw = Stopwatch.createStarted();
 
       while (true) {
-        rawRecords = poll(consumer, kafkaSourceDescriptor.getTopicPartition());
+        KafkaMetrics kafkaMetrics = KafkaSinkMetrics.kafkaMetrics();
+        rawRecords = poll(consumer, kafkaSourceDescriptor.getTopicPartition(), kafkaMetrics);
+        kafkaMetrics.flushBufferedMetrics();

Review Comment:
   What about not flushing here and just falling through?
   
   Would it be OK to flush just once, outside the while loop, with a try { } finally { kafkaMetrics.flushBufferedMetrics(); } block?
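A sketch of the try/finally shape, assuming a loop with several exit paths (an early return and a thrown exception). The metrics class and the integer "poll results" are hypothetical stand-ins, not the real KafkaMetrics or KafkaConsumer types.

```java
import java.time.Duration;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;

/** Hypothetical illustration of "flush once in finally"; not Beam code. */
final class FlushOnceSketch {

  static final class BufferedMetrics {
    private final List<Duration> buffered = new ArrayList<>();
    final List<Duration> published = new ArrayList<>();
    int flushCount = 0;

    void recordPollLatency(Duration d) {
      buffered.add(d);
    }

    void flushBufferedMetrics() {
      flushCount++;
      published.addAll(buffered);
      buffered.clear();
    }
  }

  /**
   * Each element of polls is the record count one (fake) poll returned. The loop returns on an
   * empty poll or when input runs out, and throws on a negative count (standing in for a poll
   * error). Every exit path goes through finally, which flushes exactly once.
   */
  static int consume(List<Integer> polls, BufferedMetrics metrics) {
    int total = 0;
    Iterator<Integer> it = polls.iterator();
    try {
      while (true) {
        if (!it.hasNext()) {
          return total; // no more input
        }
        int count = it.next();
        metrics.recordPollLatency(Duration.ofMillis(10)); // fixed fake latency, no flush here
        if (count < 0) {
          throw new IllegalStateException("poll failed");
        }
        if (count == 0) {
          return total; // empty poll: stop and resume later
        }
        total += count;
      }
    } finally {
      metrics.flushBufferedMetrics(); // runs on return and on exception
    }
  }
}
```

Because finally runs on every exit, buffered values are flushed exactly once per call. The trade-off is reporting latency: if the loop runs for a long time before exiting, nothing is published until it does.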



##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/ReadFromKafkaDoFn.java:
##########
@@ -583,13 +585,15 @@ private boolean topicPartitionExists(
 
   // see https://github.com/apache/beam/issues/25962
   private ConsumerRecords<byte[], byte[]> poll(
-      Consumer<byte[], byte[]> consumer, TopicPartition topicPartition) {
+      Consumer<byte[], byte[]> consumer, TopicPartition topicPartition, KafkaMetrics kafkaMetrics) {
     final Stopwatch sw = Stopwatch.createStarted();
     long previousPosition = -1;
     java.time.Duration elapsed = java.time.Duration.ZERO;
     java.time.Duration timeout = java.time.Duration.ofSeconds(this.consumerPollingTimeout);
     while (true) {
       final ConsumerRecords<byte[], byte[]> rawRecords = consumer.poll(timeout.minus(elapsed));
+      elapsed = sw.elapsed();

Review Comment:
   How about removing the elapsed calculation below?
   Also, elapsed doesn't need to be defined outside the loop; it could just be declared here as
   final java.time.Duration elapsed = sw.elapsed();
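A sketch of a bounded poll loop where elapsed is a final local, read once per iteration right after the poll and reused for the timeout check. The poller function and the injected nanosecond clock are hypothetical stand-ins for consumer.poll and the Guava Stopwatch, and the sketch leaves out the previousPosition handling present in the real method. Only the remaining budget is carried across iterations.

```java
import java.time.Duration;
import java.util.List;
import java.util.function.Function;
import java.util.function.LongSupplier;

/** Hypothetical illustration; not the actual ReadFromKafkaDoFn.poll implementation. */
final class ElapsedPerIterationSketch {

  /**
   * Polls until records arrive or the total timeout is used up.
   * poller: stand-in for consumer.poll(Duration); waits at most the given budget.
   * nanoClock: stand-in for a Stopwatch, e.g. System::nanoTime; injectable for tests.
   */
  static <T> List<T> pollUntilTimeout(
      Function<Duration, List<T>> poller, LongSupplier nanoClock, Duration timeout) {
    final long start = nanoClock.getAsLong();
    Duration remaining = timeout; // loop-carried budget for the next poll
    while (true) {
      final List<T> records = poller.apply(remaining);
      // Read the clock once per iteration; no "elapsed" variable outside the loop.
      final Duration elapsed = Duration.ofNanos(nanoClock.getAsLong() - start);
      if (!records.isEmpty() || elapsed.compareTo(timeout) >= 0) {
        return records;
      }
      remaining = timeout.minus(elapsed);
    }
  }
}
```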
   



-- 
This is an automated message from the Apache Git Service.