Naireen commented on code in PR #33408:
URL: https://github.com/apache/beam/pull/33408#discussion_r1909343596


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaMetrics.java:
##########
@@ -71,11 +79,17 @@ abstract class KafkaMetricsImpl implements KafkaMetrics {
 
     abstract HashMap<String, ConcurrentLinkedQueue<Duration>> perTopicRpcLatencies();
 
+    static ConcurrentHashMap<String, Gauge> backlogGauges = new ConcurrentHashMap<String, Gauge>();
+
+    abstract HashMap<String, Long> perTopicPartitionBacklogs();

Review Comment:
   What would the sum represent? The sum of latencies? Each individual latency
matters, and a sum would lose that information.
   A gauge isn't a clear fit either: if two concurrent RPCs complete, which
value do you report?
   
   A histogram provides more information and lets us see the spread of values.
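
   To make the comparison concrete, here is a minimal plain-Java sketch. It does
not use Beam's metrics API; the class name, bucket bounds, and sample latencies
are all hypothetical and chosen only for illustration. Two latency samples with
very different shapes produce the same sum, a last-value "gauge" depends on
which RPC happens to report last, and bucketed counts keep the spread visible.

```java
import java.util.Arrays;
import java.util.List;

public class LatencyAggregationSketch {
  // Hypothetical bucket upper bounds in milliseconds; the final bucket is overflow.
  static final long[] BUCKET_UPPER_BOUNDS_MS = {10, 100, 1000};

  // Sum of all latencies: collapses the sample into one number.
  static long sum(List<Long> latenciesMs) {
    long total = 0;
    for (long v : latenciesMs) {
      total += v;
    }
    return total;
  }

  // Gauge-like behaviour: only the most recently reported value survives.
  static long lastValue(List<Long> latenciesMs) {
    return latenciesMs.get(latenciesMs.size() - 1);
  }

  // Histogram: count of values per bucket, so the spread stays visible.
  static long[] histogram(List<Long> latenciesMs) {
    long[] counts = new long[BUCKET_UPPER_BOUNDS_MS.length + 1];
    for (long v : latenciesMs) {
      int i = 0;
      while (i < BUCKET_UPPER_BOUNDS_MS.length && v > BUCKET_UPPER_BOUNDS_MS[i]) {
        i++;
      }
      counts[i]++;
    }
    return counts;
  }

  public static void main(String[] args) {
    List<Long> steady = Arrays.asList(50L, 50L, 50L, 50L);
    List<Long> spiky = Arrays.asList(5L, 5L, 5L, 185L);

    // Both samples have the same sum, so the sum cannot tell them apart.
    System.out.println("sum steady=" + sum(steady) + " spiky=" + sum(spiky));

    // The last value depends on arrival order, which is arbitrary for concurrent RPCs.
    System.out.println("last steady=" + lastValue(steady) + " spiky=" + lastValue(spiky));

    // The bucket counts differ, exposing the slow outlier in the spiky sample.
    System.out.println("hist steady=" + Arrays.toString(histogram(steady)));
    System.out.println("hist spiky=" + Arrays.toString(histogram(spiky)));
  }
}
```

   In a real pipeline the bucket layout would come from whatever histogram
type the metrics backend supports; the fixed bounds above are only a stand-in.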



##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaMetrics.java:
##########
@@ -93,6 +107,21 @@ public void updateSuccessfulRpcMetrics(String topic, Duration elapsedTime) {
       }
     }
 
+    /**
+     * @param topicName topicName
+     * @param partitionId partitionId for the topic Only included in the metric key if
+     *     'supportsMetricsDeletion' is enabled.
+     * @param backlog backlog for the topic Only included in the metric key if
+     *     'supportsMetricsDeletion' is enabled.
+     */

Review Comment:
   Removed, thanks for catching that.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.