Naireen commented on code in PR #33503:
URL: https://github.com/apache/beam/pull/33503#discussion_r1982354211
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaMetrics.java:
##########
@@ -146,14 +154,30 @@ private void recordRpcLatencyMetrics() {
}
}
- private void recordBacklogBytes() {
+ private void recordBacklogBytesInternal() {
for (Map.Entry<String, Long> backlogs : perTopicPartitionBacklogs().entrySet()) {
Gauge gauge =
KafkaSinkMetrics.createBacklogGauge(MetricName.named("KafkaSink", backlogs.getKey()));
gauge.set(backlogs.getValue());
}
}
+ /**
+ * This is for recording backlog bytes on the current thread.
+ *
+ * @param topicName topicName
+ * @param partitionId partitionId for the topic Only included in the metric key if
+ * 'supportsMetricsDeletion' is enabled.
+ * @param backlogBytes backlog for the topic Only included in the metric key if
+ * 'supportsMetricsDeletion' is enabled.
+ */
+ @Override
+ public void recordBacklogBytes(String topicName, int partitionId, long backlogBytes) {
Review Comment:
How the metrics are reported depends on the IO implementation. When the non-SDF
Kafka impl runs on v2, the metrics are recorded on the background thread, so they
would be updated accordingly and would still carry the per-worker label (the
label is added to all metrics, so that should still be supported).
For the SDF impl on v1, the metrics would be created on the container but
likely would not be exported correctly: we use stageInfo for the export, which
requires per-worker metrics, and those would not include the SDF Kafka
metrics.
Your comment:
>Could we change things to be consistent? For example, what if KafkaMetrics
always set the per-worker label and then the dataflow metric container looks
for per-worker metric label and handles them differently if it wants to instead
of the separate perWorkerGauge method?
KafkaMetrics already always sets the label. By the `dataflow metric
container`, do you mean on v1? For v2 there really isn't a separate `dataflow
metric container`; the runner v2 worker just consumes whatever metrics are
included in the [processBundle
response](https://github.com/apache/beam/blob/c7edbb3dda13807c88a4397574b9b016d9dff19f/sdks/java/harness/src/main/java/org/apache/beam/fn/harness/control/ProcessBundleHandler.java#L492),
whose metric container doesn't even have a perWorker counter method (by design,
to avoid bloating it).
I do agree with your approach. If I understand correctly, we’d really want to
update this
[file](https://github.com/apache/beam/blob/c7edbb3dda13807c88a4397574b9b016d9dff19f/runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java#L389)
so that we no longer have perWorker counter handling and instead use the metric
labels to aggregate those metrics separately. Note that this is for the legacy
worker path.
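
To make the label-based idea a bit more concrete, here is a rough sketch of how I read it. It is a simplified stand-in and not the actual `StreamingStepMetricsContainer` code: the class `LabelRoutedGaugesSketch`, the label key `PER_WORKER_METRIC`, and the two maps are all hypothetical names chosen for illustration.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical container that routes gauges by label instead of by method. */
class LabelRoutedGaugesSketch {
  // Assumed label key for illustration; the key KafkaMetrics really uses may differ.
  static final String PER_WORKER_LABEL = "PER_WORKER_METRIC";

  // Gauges reported per worker (not aggregated across workers).
  final Map<String, Long> perWorkerGauges = new HashMap<>();
  // Gauges that go through the regular aggregated path.
  final Map<String, Long> aggregatedGauges = new HashMap<>();

  /** Single entry point: the label, not a separate perWorker method, picks the path. */
  void setGauge(String name, Map<String, String> labels, long value) {
    boolean perWorker = "true".equals(labels.get(PER_WORKER_LABEL));
    (perWorker ? perWorkerGauges : aggregatedGauges).put(name, value);
  }

  public static void main(String[] args) {
    LabelRoutedGaugesSketch container = new LabelRoutedGaugesSketch();
    container.setGauge("backlog-topic-0", Map.of(PER_WORKER_LABEL, "true"), 42L);
    container.setGauge("elements-read", Map.of(), 7L);
    System.out.println("per worker: " + container.perWorkerGauges);
    System.out.println("aggregated: " + container.aggregatedGauges);
  }
}
```

With something along these lines, callers such as KafkaMetrics would only ever call one gauge method, and the container would decide what to do with labeled metrics.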
This should more cleanly support the two use cases you’ve mentioned, but it
would be a larger refactor, since we’d want to port those changes over to BQ as
well at some point.
Much of the legacy per-worker reporting path is shared, so any changes there
would need some BQ IO changes as well to ensure those metrics aren’t broken.
Regardless of that refactoring, we’d still need a way to know whether metrics
are updated on the current thread, to ensure they’re added to the metric
container correctly. If counters are added off the current thread, [they are
dropped](https://github.com/apache/beam/blob/c7edbb3dda13807c88a4397574b9b016d9dff19f/sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/MetricsEnvironment.java#L144).
It's not clear to me how that could be made cleaner. Open to suggestions
though.
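
For anyone following along, the thread issue can be illustrated with a small standalone sketch. It assumes the container is looked up through a plain `ThreadLocal`, which is my simplified reading of the linked code. `ThreadScopedMetricsSketch`, `Container`, and the method names below are hypothetical and are not Beam's API.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Simplified stand-in for a thread-scoped metrics container (not Beam's classes). */
class ThreadScopedMetricsSketch {
  /** Hypothetical container: metric name to latest gauge value. */
  static final class Container {
    final Map<String, Long> gauges = new ConcurrentHashMap<>();
  }

  // Each thread only sees the container that was installed on that thread.
  private static final ThreadLocal<Container> CURRENT = new ThreadLocal<>();

  static void setCurrentContainer(Container container) {
    CURRENT.set(container);
  }

  /** Returns true if the value was recorded, false if it was dropped. */
  static boolean setGauge(String name, long value) {
    Container container = CURRENT.get();
    if (container == null) {
      return false; // no container on this thread, so the update is lost
    }
    container.gauges.put(name, value);
    return true;
  }

  /** Attempts the same update from a fresh thread that has no container installed. */
  static boolean recordFromNewThread(String name, long value) {
    boolean[] recorded = new boolean[1];
    Thread worker = new Thread(() -> recorded[0] = setGauge(name, value));
    worker.start();
    try {
      worker.join();
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
      throw new IllegalStateException(e);
    }
    return recorded[0];
  }

  public static void main(String[] args) {
    Container container = new Container();
    setCurrentContainer(container);
    System.out.println("same thread recorded: " + setGauge("topic-0", 10L));
    System.out.println("other thread recorded: " + recordFromNewThread("topic-1", 20L));
    System.out.println("container contents: " + container.gauges);
  }
}
```

In this sketch the update made from the second thread never reaches the container, which is why the caller needs to know which thread it is on before choosing how to record the backlog.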