scwhittle commented on code in PR #33503:
URL: https://github.com/apache/beam/pull/33503#discussion_r1962369386


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaMetrics.java:
##########
@@ -146,14 +154,31 @@ private void recordRpcLatencyMetrics() {
       }
     }
 
-    private void recordBacklogBytes() {
+    private void recordBacklogBytesInternal() {
       for (Map.Entry<String, Long> backlogs : perTopicPartitionBacklogs().entrySet()) {
         Gauge gauge =
            KafkaSinkMetrics.createBacklogGauge(MetricName.named("KafkaSink", backlogs.getKey()));
         gauge.set(backlogs.getValue());
       }
     }
 
+    /**
+     * Records backlog bytes for the given topic partition on the current thread.
+     *
+     * @param topicName the topic name
+     * @param partitionId partitionId for the topic. Only included in the metric key if
+     *     'supportsMetricsDeletion' is enabled.
+     * @param backlogBytes backlog for the topic. Only included in the metric key if
+     *     'supportsMetricsDeletion' is enabled.
+     */
+    @Override
+    public void recordBacklogBytes(String topicName, int partitionId, long backlogBytes) {
+      Gauge perPartition =
+          Metrics.gauge(

Review Comment:
   i.e., can we create the gauge here with the getPerWorkerGauge method, and then ensure that those metrics have the per-worker metadata populated appropriately?
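   A minimal, self-contained sketch of the suggestion (the `getPerWorkerGauge` factory and the `Gauge` stand-in here are hypothetical illustrations, not Beam's actual API): the per-worker flag is attached when the gauge is created at the call site, so downstream reporting does not need to infer it from the namespace.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

public class PerWorkerGaugeSketch {
  /** Minimal stand-in for a metrics Gauge; carries a per-worker flag alongside the value. */
  public static final class Gauge {
    private final AtomicLong value = new AtomicLong();
    private final boolean perWorker;

    Gauge(boolean perWorker) {
      this.perWorker = perWorker;
    }

    public void set(long v) {
      value.set(v);
    }

    public long get() {
      return value.get();
    }

    public boolean isPerWorker() {
      return perWorker;
    }
  }

  private static final Map<String, Gauge> GAUGES = new ConcurrentHashMap<>();

  /**
   * Hypothetical factory in the spirit of the suggested getPerWorkerGauge: the
   * per-worker metadata is fixed at creation time, at the call site.
   */
  public static Gauge getPerWorkerGauge(String namespace, String name) {
    return GAUGES.computeIfAbsent(namespace + ":" + name, k -> new Gauge(true));
  }

  public static void main(String[] args) {
    String topicName = "topic-1";
    int partitionId = 0;
    // Create the gauge with per-worker metadata directly where backlog is recorded.
    Gauge perPartition =
        getPerWorkerGauge("KafkaSink", "backlogBytes_" + topicName + "-" + partitionId);
    perPartition.set(1024L);
    System.out.println(perPartition.get() + " " + perPartition.isPerWorker());
  }
}
```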



##########
runners/core-java/src/main/java/org/apache/beam/runners/core/metrics/MetricsContainerImpl.java:
##########
@@ -293,6 +293,13 @@ public MetricUpdates getUpdates() {
           .setLabel(MonitoringInfoConstants.Labels.NAME, metricKey.metricName().getName())
           .setLabel(MonitoringInfoConstants.Labels.PTRANSFORM, metricKey.stepName());
     }
+
+    // Based on the namespace, add the per-worker metrics label so the runner can
+    // process these sink metrics separately.
+    if (metricName.getNamespace().equals("BigQuerySink")

Review Comment:
   It seems like this would be cleaner if it was done at the call site when creating the metric.
   
   That would make it more obvious there, make it less dependent on magic constants, and would allow us to support some Kafka metrics per-worker while others are aggregated, or to add per-worker metrics to a namespace that already has metrics.
   
   It seems like we could either add it to MetricKey, or we could plumb it into the cell and then update the function where we generate metadata to base it on that instead.
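   A rough, self-contained sketch of the MetricKey variant (the field names and label strings here are illustrative assumptions, not Beam's real MetricKey or MonitoringInfo constants): the per-worker flag lives on the key, and label generation keys off the flag instead of matching namespace strings.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class MetricKeySketch {
  /** Minimal stand-in for Beam's MetricKey, extended with an explicit perWorker flag. */
  public static final class MetricKey {
    public final String stepName;
    public final String namespace;
    public final String name;
    public final boolean perWorker;

    public MetricKey(String stepName, String namespace, String name, boolean perWorker) {
      this.stepName = stepName;
      this.namespace = namespace;
      this.name = name;
      this.perWorker = perWorker;
    }
  }

  /**
   * Metadata generation based on the flag carried by the key, rather than on a
   * hard-coded set of namespaces like "BigQuerySink" or "KafkaSink".
   */
  public static Map<String, String> monitoringLabels(MetricKey key) {
    Map<String, String> labels = new LinkedHashMap<>();
    labels.put("NAMESPACE", key.namespace);
    labels.put("NAME", key.name);
    labels.put("PTRANSFORM", key.stepName);
    if (key.perWorker) {
      // Only per-worker keys get the extra label; other metrics in the same
      // namespace can still be aggregated normally.
      labels.put("PER_WORKER_METRIC", "true");
    }
    return labels;
  }

  public static void main(String[] args) {
    MetricKey backlog = new MetricKey("step1", "KafkaSink", "backlogBytes", true);
    MetricKey latency = new MetricKey("step1", "KafkaSink", "rpcLatency", false);
    System.out.println(monitoringLabels(backlog).containsKey("PER_WORKER_METRIC"));
    System.out.println(monitoringLabels(latency).containsKey("PER_WORKER_METRIC"));
  }
}
```

   This keeps the decision at metric-creation time, so the same namespace can mix per-worker and aggregated metrics.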



##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java:
##########
@@ -332,7 +332,6 @@ public long getSplitBacklogBytes() {
       if (pBacklog == UnboundedReader.BACKLOG_UNKNOWN) {
         return UnboundedReader.BACKLOG_UNKNOWN;
       }
-      backlogBytes += pBacklog;

Review Comment:
   revert?



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to