scwhittle commented on code in PR #34244:
URL: https://github.com/apache/beam/pull/34244#discussion_r1988954208


##########
runners/google-cloud-dataflow-java/worker/src/main/java/org/apache/beam/runners/dataflow/worker/StreamingStepMetricsContainer.java:
##########
@@ -377,11 +385,11 @@ private void deleteStaleCounters(
    */
   @VisibleForTesting
   Iterable<PerStepNamespaceMetrics> extractPerWorkerMetricUpdates() {
-    ConcurrentHashMap<MetricName, Long> counters = new ConcurrentHashMap<MetricName, Long>();
-    ConcurrentHashMap<MetricName, Long> per_worker_gauges =
-        new ConcurrentHashMap<MetricName, Long>();
-    ConcurrentHashMap<MetricName, LockFreeHistogram.Snapshot> histograms =
-        new ConcurrentHashMap<MetricName, LockFreeHistogram.Snapshot>();
+    ImmutableMap.Builder<MetricName, Long> counters = new ImmutableMap.Builder<MetricName, Long>();
+    ImmutableMap.Builder<MetricName, Long> per_worker_gauges =

Review Comment:
   How about `perWorkerGaugeUpdates`, and a similar name for the histogram map?
   Using `_` isn't the Java naming convention, and "updates" clarifies that the
   map is based on the counter snapshot.



##########
sdks/java/core/src/main/java/org/apache/beam/sdk/metrics/DelegatingHistogram.java:
##########
@@ -69,11 +50,8 @@ private Optional<Histogram> getHistogram() {
     if (container == null) {
       return Optional.empty();
     }
-    if (perWorkerHistogram) {
-      return Optional.of(container.getPerWorkerHistogram(name, bucketType));
-    } else {
-      return Optional.of(container.getHistogram(name, bucketType));
-    }
+    // LOG.info("Xxx get histogram");

Review Comment:
   Remove this commented-out log line.



##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaSinkMetrics.java:
##########
@@ -72,9 +72,10 @@ public static Histogram createRPCLatencyHistogram(RpcMethod method, String topic
     nameBuilder.addLabel(TOPIC_LABEL, topic);
 
     MetricName metricName = nameBuilder.build(METRICS_NAMESPACE);
-    HistogramData.BucketType buckets = HistogramData.ExponentialBuckets.of(1, 17);
+    nameBuilder.addMetricLabel(MonitoringInfoConstants.Labels.PER_WORKER_METRIC, "true");

Review Comment:
   This needs to go before the `nameBuilder.build(METRICS_NAMESPACE)` call.
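
   A minimal sketch of why the order matters, assuming the builder snapshots its labels when `build` is called. `NameBuilder` and the label key below are hypothetical stand-ins for illustration, not Beam's `LabeledMetricNameUtils.MetricNameBuilder` or its actual behavior.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

public class BuildOrderSketch {
  // Hypothetical label key, used only for this illustration.
  static final String PER_WORKER_LABEL = "PER_WORKER_METRIC";

  /** Hypothetical stand-in for a metric-name builder; not Beam's class. */
  static final class NameBuilder {
    private final Map<String, String> labels = new LinkedHashMap<>();

    NameBuilder addLabel(String key, String value) {
      labels.put(key, value);
      return this;
    }

    /** Copies the labels collected so far; later additions are not reflected. */
    Map<String, String> build() {
      return Collections.unmodifiableMap(new LinkedHashMap<>(labels));
    }
  }

  /** Label added after build(): the built name does not contain it. */
  static Map<String, String> labelAfterBuild() {
    NameBuilder builder = new NameBuilder().addLabel("topic", "my-topic");
    Map<String, String> built = builder.build();
    builder.addLabel(PER_WORKER_LABEL, "true"); // too late, 'built' is already a copy
    return built;
  }

  /** Label added before build(): the built name contains it. */
  static Map<String, String> labelBeforeBuild() {
    NameBuilder builder = new NameBuilder().addLabel("topic", "my-topic");
    builder.addLabel(PER_WORKER_LABEL, "true");
    return builder.build();
  }

  public static void main(String[] args) {
    System.out.println(labelAfterBuild().containsKey(PER_WORKER_LABEL));  // false
    System.out.println(labelBeforeBuild().containsKey(PER_WORKER_LABEL)); // true
  }
}
```

   Under that assumption, the label added after `build` is silently dropped, so the histogram would be registered without the per-worker marker.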



##########
sdks/java/io/google-cloud-platform/src/main/java/org/apache/beam/sdk/io/gcp/bigquery/BigQuerySinkMetrics.java:
##########
@@ -114,13 +115,14 @@ static Histogram createRPCLatencyHistogram(RpcMethod method) {
         LabeledMetricNameUtils.MetricNameBuilder.baseNameBuilder(RPC_LATENCY);
     nameBuilder.addLabel(RPC_METHOD, method.toString());
     MetricName metricName = nameBuilder.build(METRICS_NAMESPACE);
+    nameBuilder.addMetricLabel(MonitoringInfoConstants.Labels.PER_WORKER_METRIC, "true");

Review Comment:
   This needs to be before the `nameBuilder.build(METRICS_NAMESPACE)` call.



##########
sdks/java/io/kafka/src/test/java/org/apache/beam/sdk/io/kafka/KafkaMetricsTest.java:
##########
@@ -73,11 +73,10 @@ public TestMetricsContainer() {
     }
 
     @Override
-    public TestHistogramCell getPerWorkerHistogram(
+    public TestHistogramCell getHistogram(
         MetricName metricName, HistogramData.BucketType bucketType) {
-      perWorkerHistograms.computeIfAbsent(
-          KV.of(metricName, bucketType), kv -> new TestHistogramCell(kv));
-      return perWorkerHistograms.get(KV.of(metricName, bucketType));
+      histograms.computeIfAbsent(KV.of(metricName, bucketType), kv -> new TestHistogramCell(kv));

Review Comment:
   Could the test verify that the per-worker field is set? That would catch the
   error of adding a label to the builder after the name has been built.
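
   One possible shape for such a check, as a rough sketch only. `RecordingContainer`, the label key, and the map-of-labels representation are hypothetical; the real test would inspect the `MetricName` passed to `getHistogram` using whatever accessor the PR provides.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class PerWorkerLabelCheckSketch {
  // Hypothetical label key; the real constant is MonitoringInfoConstants.Labels.PER_WORKER_METRIC.
  static final String PER_WORKER_LABEL = "PER_WORKER_METRIC";

  /** Hypothetical fake container that records the labels of each requested histogram. */
  static final class RecordingContainer {
    final List<Map<String, String>> requestedLabels = new ArrayList<>();

    void getHistogram(Map<String, String> metricLabels) {
      requestedLabels.add(metricLabels);
    }
  }

  /** True only if at least one histogram was requested and every request carries the label. */
  static boolean allRequestsArePerWorker(RecordingContainer container) {
    if (container.requestedLabels.isEmpty()) {
      return false;
    }
    for (Map<String, String> labels : container.requestedLabels) {
      if (!"true".equals(labels.get(PER_WORKER_LABEL))) {
        return false;
      }
    }
    return true;
  }

  /** Simulates a sink that set the label before building the name. */
  static boolean checkWithLabel() {
    RecordingContainer container = new RecordingContainer();
    container.getHistogram(Collections.singletonMap(PER_WORKER_LABEL, "true"));
    return allRequestsArePerWorker(container);
  }

  /** Simulates a sink that added the label too late, so the name lacks it. */
  static boolean checkWithoutLabel() {
    RecordingContainer container = new RecordingContainer();
    container.getHistogram(Collections.<String, String>emptyMap());
    return allRequestsArePerWorker(container);
  }

  public static void main(String[] args) {
    System.out.println(checkWithLabel());    // true
    System.out.println(checkWithoutLabel()); // false
  }
}
```

   Asserting on the recorded name, rather than only on the recorded value, is what lets the test fail when the label is missing.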


