scwhittle commented on code in PR #33503:
URL: https://github.com/apache/beam/pull/33503#discussion_r1984730676
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java:
##########
@@ -388,7 +372,8 @@ private void deleteStaleCounters(
@VisibleForTesting
Iterable<PerStepNamespaceMetrics> extractPerWorkerMetricUpdates() {
ConcurrentHashMap<MetricName, Long> counters = new
ConcurrentHashMap<MetricName, Long>();
- ConcurrentHashMap<MetricName, Long> gauges = new
ConcurrentHashMap<MetricName, Long>();
+ ConcurrentHashMap<MetricName, Long> per_worker_gauges =
Review Comment:
nit: I don't think this (or counters) needs to be concurrent. How about
using ImmutableMap.Builder and then building the map when passing it to convert?
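
To illustrate the shape of this suggestion, here is a minimal sketch using only the JDK. It assumes the map is filled by a single thread inside the extraction method and is read-only afterwards. `GaugeSnapshotSketch` and `snapshot` are hypothetical names, and `String` keys stand in for `MetricName`. With Guava, the same pattern would presumably use `ImmutableMap.builder()`, `put`, and `build()`.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the extraction method; not the PR's actual code.
final class GaugeSnapshotSketch {
  // Copies the current values into a method-local map and returns a read-only view.
  // Because the local map never escapes before it is wrapped, no concurrent
  // collection is needed here.
  static Map<String, Long> snapshot(Map<String, Long> source) {
    Map<String, Long> local = new HashMap<>();
    source.forEach(local::put);
    return Collections.unmodifiableMap(local);
  }
}
```

The returned map rejects writes, so a later accidental `put` fails fast instead of silently mutating the reported values.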
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java:
##########
@@ -402,12 +387,17 @@ Iterable<PerStepNamespaceMetrics>
extractPerWorkerMetricUpdates() {
counters.put(k, val);
});
- perWorkerGauges.forEach(
+ gauges.forEach(
(k, v) -> {
- Long val = v.getCumulative().value();
- gauges.put(k, val);
- v.reset();
+ // Check if metric name has the per worker label set
+ if (k.getLabels().containsKey("PER_WORKER_METRIC")
+ && k.getLabels().get("PER_WORKER_METRIC").equals("true")) {
Review Comment:
nit: just look up the label once
@Nullable String value = k.getLabels().get("PER_WORKER_METRIC");
if (value != null && value.equals("true"))
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaMetrics.java:
##########
@@ -37,6 +37,7 @@ public interface KafkaMetrics {
void updateBacklogBytes(String topic, int partitionId, long backlog);
+ /*Used to update all metrics in container*/
Review Comment:
How about:
"Flushes the buffered metrics to the current metric container for this thread."
Could we also rename it to flushBufferedMetrics?
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSinkMetrics.java:
##########
@@ -97,7 +86,15 @@ public static Gauge createBacklogGauge(String topic, int
partitionId) {
* @return Counter.
*/
public static Gauge createBacklogGauge(MetricName name) {
- return new DelegatingGauge(name, false, true);
+ // Use label to differenciate between the type of gauge metric is created
+ // TODO(34195):
+ if
(name.getLabels().containsKey(MonitoringInfoConstants.Labels.PER_WORKER_METRIC)
Review Comment:
Maybe we could add a method for this in MonitoringInfoConstants? It would reduce
duplication of this lookup/comparison logic:
boolean isPerWorkerMetric(Map labels)
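
A minimal sketch of what such a helper might look like, using only the JDK. The class name `PerWorkerLabelSketch` is hypothetical, the constant simply mirrors the `"PER_WORKER_METRIC"` key visible in the diff, and where the method would actually live is for the PR to decide.

```java
import java.util.Map;

// Hypothetical holder class; in the PR this would presumably sit next to the label constants.
final class PerWorkerLabelSketch {
  // Assumption: same key string as the literal used in the diff.
  static final String PER_WORKER_METRIC = "PER_WORKER_METRIC";

  // Single lookup; "true".equals(...) also handles a missing label (null) safely.
  static boolean isPerWorkerMetric(Map<String, String> labels) {
    return "true".equals(labels.get(PER_WORKER_METRIC));
  }
}
```

Call sites would then reduce to `if (PerWorkerLabelSketch.isPerWorkerMetric(name.getLabels()))`, which removes both the duplicated `containsKey`/`get` pair and the string literal.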
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java:
##########
@@ -402,12 +387,17 @@ Iterable<PerStepNamespaceMetrics>
extractPerWorkerMetricUpdates() {
counters.put(k, val);
});
- perWorkerGauges.forEach(
+ gauges.forEach(
(k, v) -> {
- Long val = v.getCumulative().value();
- gauges.put(k, val);
- v.reset();
+ // Check if metric name has the per worker label set
+ if (k.getLabels().containsKey("PER_WORKER_METRIC")
Review Comment:
use the PER_WORKER_METRIC constant
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSinkMetrics.java:
##########
@@ -97,7 +86,15 @@ public static Gauge createBacklogGauge(String topic, int
partitionId) {
* @return Counter.
*/
public static Gauge createBacklogGauge(MetricName name) {
- return new DelegatingGauge(name, false, true);
+ // Use label to differenciate between the type of gauge metric is created
+ // TODO(34195):
+ if
(name.getLabels().containsKey(MonitoringInfoConstants.Labels.PER_WORKER_METRIC)
+ &&
name.getLabels().get(MonitoringInfoConstants.Labels.PER_WORKER_METRIC).equals("true"))
{
+ return new DelegatingGauge(name, false);
+ } else {
+ // Currently KafkaSink metrics only supports aggregated per worker
metrics.
Review Comment:
Should we make it a Preconditions.checkState(isPerWorkerMetric)?
It seems better to fail, so that if more metrics are added it's not
confusing why they don't do anything.
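
A rough sketch of the fail-fast variant, kept to the JDK so it stands alone. The local `checkState` is a stand-in with the same contract as Guava's `Preconditions.checkState` (it throws `IllegalStateException` when the condition is false). `BacklogGaugeGuardSketch` is a hypothetical name, and the `String` return value is only a placeholder for the real gauge object.

```java
import java.util.Map;

// Hypothetical illustration only; names and return type are placeholders.
final class BacklogGaugeGuardSketch {
  // Stand-in for Guava's Preconditions.checkState.
  static void checkState(boolean expression, String message) {
    if (!expression) {
      throw new IllegalStateException(message);
    }
  }

  // Fails loudly for metrics that lack the per-worker label instead of
  // returning a gauge that silently does nothing.
  static String createBacklogGauge(String name, Map<String, String> labels) {
    checkState(
        "true".equals(labels.get("PER_WORKER_METRIC")),
        "Only per-worker gauges are supported, got: " + name);
    return "gauge:" + name; // placeholder for constructing the real gauge
  }
}
```

The trade-off is that an unsupported metric now surfaces as an exception at creation time rather than as missing data later.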
##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java:
##########
@@ -402,12 +387,17 @@ Iterable<PerStepNamespaceMetrics>
extractPerWorkerMetricUpdates() {
counters.put(k, val);
});
- perWorkerGauges.forEach(
+ gauges.forEach(
(k, v) -> {
- Long val = v.getCumulative().value();
- gauges.put(k, val);
- v.reset();
+ // Check if metric name has the per worker label set
+ if (k.getLabels().containsKey("PER_WORKER_METRIC")
Review Comment:
I think this could be optimized by keeping the perWorkerGauges and
populating it by doing this labels check when the gauge is created.
Could add a comment about possible optimization and do it when fixing the
other metric types.
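
One way the suggested optimization could look, as a sketch under stated assumptions: the label check runs once when a gauge is first created, so extraction only walks the per-worker subset. `GaugeRegistrySketch` is a hypothetical class, `String` names stand in for `MetricName`, and `AtomicLong` stands in for the real gauge cell.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical registry; not the container's actual structure.
final class GaugeRegistrySketch {
  private final ConcurrentHashMap<String, AtomicLong> gauges = new ConcurrentHashMap<>();
  private final ConcurrentHashMap<String, AtomicLong> perWorkerGauges = new ConcurrentHashMap<>();

  // Classifies the gauge once, at creation, instead of on every extraction pass.
  AtomicLong getGauge(String name, Map<String, String> labels) {
    return gauges.computeIfAbsent(
        name,
        n -> {
          AtomicLong cell = new AtomicLong();
          if ("true".equals(labels.get("PER_WORKER_METRIC"))) {
            perWorkerGauges.put(n, cell);
          }
          return cell;
        });
  }

  // Extraction touches only the per-worker gauges; no label lookups are needed here.
  Map<String, Long> extractPerWorkerGauges() {
    Map<String, Long> out = new HashMap<>();
    perWorkerGauges.forEach((n, cell) -> out.put(n, cell.get()));
    return out;
  }
}
```

This keeps a second map in sync with the first, which is extra state to maintain, so deferring it until the other metric types are fixed seems reasonable.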