Naireen commented on code in PR #33503:
URL: https://github.com/apache/beam/pull/33503#discussion_r1984058553
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaMetrics.java:
##########
@@ -146,14 +154,30 @@ private void recordRpcLatencyMetrics() {
}
}
-  private void recordBacklogBytes() {
+  private void recordBacklogBytesInternal() {
     for (Map.Entry<String, Long> backlogs : perTopicPartitionBacklogs().entrySet()) {
       Gauge gauge =
           KafkaSinkMetrics.createBacklogGauge(MetricName.named("KafkaSink", backlogs.getKey()));
       gauge.set(backlogs.getValue());
     }
   }
+  /**
+   * This is for recording backlog bytes on the current thread.
+   *
+   * @param topicName topicName
+   * @param partitionId partitionId for the topic. Only included in the metric key if
+   *     'supportsMetricsDeletion' is enabled.
+   * @param backlogBytes backlog for the topic. Only included in the metric key if
+   *     'supportsMetricsDeletion' is enabled.
+   */
+  @Override
+  public void recordBacklogBytes(String topicName, int partitionId, long backlogBytes) {
Review Comment:
Discussed offline.
I refactored the per-worker gauge metrics out, so the path is now more unified. There's no worry about breaking the BQ per-worker metrics, since those use only counters and histograms.
Next steps would be to do the same refactoring for histograms and counters, and then add the Kafka poll metrics for the portable runner (refactoring as needed).
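
For context, a minimal sketch of what the body of the new recordBacklogBytes override might look like, built only from the calls already visible in this hunk (KafkaSinkMetrics.createBacklogGauge, MetricName.named, Gauge.set). The way the topic and partition are encoded into the gauge name is an assumption for illustration, not the PR's actual implementation:

    @Override
    public void recordBacklogBytes(String topicName, int partitionId, long backlogBytes) {
      // Hypothetical name encoding: how topic and partition are combined (and whether
      // partitionId is included at all when 'supportsMetricsDeletion' is off) is an
      // assumption here; the PR may use a dedicated KafkaSinkMetrics helper instead.
      MetricName name = MetricName.named("KafkaSink", topicName + "-" + partitionId);
      Gauge gauge = KafkaSinkMetrics.createBacklogGauge(name);
      gauge.set(backlogBytes);
    }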