Naireen commented on code in PR #33503:
URL: https://github.com/apache/beam/pull/33503#discussion_r1982354211


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaMetrics.java:
##########
@@ -146,14 +154,30 @@ private void recordRpcLatencyMetrics() {
       }
     }
 
-    private void recordBacklogBytes() {
+    private void recordBacklogBytesInternal() {
       for (Map.Entry<String, Long> backlogs : perTopicPartitionBacklogs().entrySet()) {
         Gauge gauge =
             KafkaSinkMetrics.createBacklogGauge(MetricName.named("KafkaSink", backlogs.getKey()));
         gauge.set(backlogs.getValue());
       }
     }
 
+    /**
+     * This is for recording backlog bytes on the current thread.
+     *
+     * @param topicName topicName
+     * @param partitionId partitionId for the topic. Only included in the metric key if
+     *     'supportsMetricsDeletion' is enabled.
+     * @param backlogBytes backlog for the topic. Only included in the metric key if
+     *     'supportsMetricsDeletion' is enabled.
+     */
+    @Override
+    public void recordBacklogBytes(String topicName, int partitionId, long backlogBytes) {

Review Comment:
   The metrics depend on the implementation of the IO. For the non-SDF Kafka 
implementation, which runs on v2, the metrics are updated on the background 
thread, so they would be updated accordingly and would still have the per-worker 
label (since the labels are added for all metrics, that should still be 
supported).
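   
   To make the two update paths concrete, here is a minimal sketch. The scheduling, 
the `updateKafkaMetrics()` flush entry point, and the surrounding class are 
illustrative assumptions, not the actual reader/DoFn wiring; only 
`recordBacklogBytes(String, int, long)` comes from this PR:
   
   ```java
   import java.util.concurrent.Executors;
   import java.util.concurrent.ScheduledExecutorService;
   import java.util.concurrent.TimeUnit;
   
   class BacklogReportingSketch {
     private final KafkaMetrics kafkaMetrics;
   
     BacklogReportingSketch(KafkaMetrics kafkaMetrics) {
       this.kafkaMetrics = kafkaMetrics;
     }
   
     // Background-thread path (non-SDF read on Runner v2): a periodic task flushes the
     // accumulated per-topic/partition backlog off the processing thread.
     void startBackgroundFlush() {
       ScheduledExecutorService exec = Executors.newSingleThreadScheduledExecutor();
       exec.scheduleAtFixedRate(kafkaMetrics::updateKafkaMetrics, 0, 5, TimeUnit.SECONDS);
     }
   
     // Current-thread path (the new override in this PR): the DoFn reports backlog while
     // processing a bundle, so the update lands in that thread's metrics container.
     void reportFromProcessingThread(String topic, int partition, long backlogBytes) {
       kafkaMetrics.recordBacklogBytes(topic, partition, backlogBytes);
     }
   }
   ```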
   
   For the SDF implementation on v1, the metrics would be created on the 
container, but they likely would not be exported correctly, since we use 
stageInfo to do that, which requires per-worker metrics and would not include 
the SDF Kafka metrics.
   
   Your comment:
   >Could we change things to be consistent? For example, what if KafkaMetrics 
always set the per-worker label and then the dataflow metric container looks 
for per-worker metric label and handles them differently if it wants to instead 
of the separate perWorkerGauge method?
   
   KafkaMetrics does already always set the label. By `dataflow metric 
container`, do you mean on v1? For v2 there really isn't a separate `dataflow 
metric container`; the Runner v2 worker just consumes whatever metrics are 
included in the [processBundle response 
here](https://github.com/apache/beam/blob/c7edbb3dda13807c88a4397574b9b016d9dff19f/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java#L492),
 whose metric container doesn't even have a perWorker counter method (by design, 
to avoid bloating it).
   
   I do agree with your approach. If I understand correctly, we'd want to update 
this 
[file](https://github.com/apache/beam/blob/c7edbb3dda13807c88a4397574b9b016d9dff19f/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java#L389)
 so we no longer have perWorker counter handling (and instead use the metric 
labels to aggregate those metrics separately). Note that this is the legacy 
worker path.
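   
   Roughly the shape I have in mind for that refactor (purely a sketch; the 
class, field, and marker names below are made up, and this is not the current 
StreamingStepMetricsContainer API):
   
   ```java
   import java.util.Map;
   import java.util.concurrent.ConcurrentHashMap;
   import java.util.concurrent.atomic.AtomicLong;
   import org.apache.beam.sdk.metrics.MetricName;
   
   /** Hypothetical sketch: route on a marker carried by the metric name/labels. */
   class LabelRoutingContainerSketch {
     // Marker that KafkaMetrics (and later BigQuerySinkMetrics) would always set.
     private static final String PER_WORKER_MARKER = "per_worker";
   
     private final Map<MetricName, AtomicLong> perWorkerCounters = new ConcurrentHashMap<>();
     private final Map<MetricName, AtomicLong> bundleCounters = new ConcurrentHashMap<>();
   
     void inc(MetricName name, long value) {
       // Instead of a dedicated perWorkerCounter(...) method, the container decides how
       // to aggregate based on the label baked into the metric name.
       Map<MetricName, AtomicLong> target =
           name.getName().contains(PER_WORKER_MARKER) ? perWorkerCounters : bundleCounters;
       target.computeIfAbsent(name, n -> new AtomicLong()).addAndGet(value);
     }
   }
   ```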
   
   This should more cleanly support the two use cases you've mentioned, but it 
would be a larger refactor to make things consistent, since we'd want to port 
those changes over to BQ as well at some point. Much of the legacy per-worker 
reporting path is shared, so any changes there would also need some BQ IO 
changes to make sure those metrics aren't broken.
   
   Regardless of that refactoring, we'd still need a way to know whether metrics 
are updated on the current thread or not, so that they're added to the metric 
container correctly. If counters are updated off the current thread, [they are 
dropped](https://github.com/apache/beam/blob/c7edbb3dda13807c88a4397574b9b016d9dff19f/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java#L144).
 It's not clear to me how that could be made cleaner, but I'm open to 
suggestions.
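   
   For reference, this is the shape of the check behind that link (paraphrased, 
not the exact code), which is why the current-thread distinction matters:
   
   ```java
   import org.apache.beam.sdk.metrics.MetricName;
   import org.apache.beam.sdk.metrics.MetricsContainer;
   import org.apache.beam.sdk.metrics.MetricsEnvironment;
   
   class DroppedUpdateSketch {
     // The container is looked up from a thread-local, so an update made on a thread
     // that has no container attached sees null and the value simply isn't recorded.
     static void inc(MetricName name, long value) {
       MetricsContainer container = MetricsEnvironment.getCurrentContainer();
       if (container == null) {
         return; // no container bound to this thread: the update is dropped
       }
       container.getCounter(name).inc(value);
     }
   }
   ```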
   
   



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]
