sjvanrossum commented on code in PR #33408:
URL: https://github.com/apache/beam/pull/33408#discussion_r1920233993


##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaMetrics.java:
##########
@@ -67,15 +74,22 @@ abstract class KafkaMetricsImpl implements KafkaMetrics {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(KafkaMetricsImpl.class);
 
-    static HashMap<String, Histogram> latencyHistograms = new HashMap<String, 
Histogram>();
+    static ConcurrentHashMap<String, Histogram> latencyHistograms =
+        new ConcurrentHashMap<String, Histogram>();

Review Comment:
   ```suggestion
       private static final Map<String, Histogram> LATENCY_HISTOGRAMS =
           new ConcurrentHashMap<>();
   ```



##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaMetrics.java:
##########
@@ -86,13 +100,26 @@ public void updateSuccessfulRpcMetrics(String topic, 
Duration elapsedTime) {
         if (latencies == null) {
           latencies = new ConcurrentLinkedQueue<Duration>();
           latencies.add(elapsedTime);
-          perTopicRpcLatencies().put(topic, latencies);
+          perTopicRpcLatencies().putIfAbsent(topic, latencies);

Review Comment:
   If I'm not mistaken, L99-106 can be replaced with 
`perTopicRpcLatencies().computeIfAbsent(topic, 
ConcurrentLinkedQueue::new).add(elapsedTime);` which reduces locking 
operations. It also changes what happens in L100-104, which currently drops 
`elapsedTime` if the call at 103 observes that a write for that key had 
happened in between.



##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/MetricsToPerStepNamespaceMetricsConverterTest.java:
##########
@@ -345,4 +372,76 @@ public void testConvert_convertCountersAndHistograms() {
         parsedMetricNames,
         IsMapContaining.hasEntry(histogramMetricName, 
parsedHistogramMetricName));
   }
+
+  @Test
+  public void testConvert_successfulyConvertGauges() {

Review Comment:
   ```suggestion
     public void testConvert_successfullyConvertGauges() {
   ```



##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaMetrics.java:
##########
@@ -67,15 +74,22 @@ abstract class KafkaMetricsImpl implements KafkaMetrics {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(KafkaMetricsImpl.class);
 
-    static HashMap<String, Histogram> latencyHistograms = new HashMap<String, 
Histogram>();
+    static ConcurrentHashMap<String, Histogram> latencyHistograms =
+        new ConcurrentHashMap<String, Histogram>();
+
+    abstract ConcurrentHashMap<String, ConcurrentLinkedQueue<Duration>> 
perTopicRpcLatencies();
 
-    abstract HashMap<String, ConcurrentLinkedQueue<Duration>> 
perTopicRpcLatencies();
+    static ConcurrentHashMap<String, Gauge> backlogGauges = new 
ConcurrentHashMap<String, Gauge>();

Review Comment:
   Hmmm... I searched for `backlogGauges`, but I'm not finding any uses of this 
it.
   Is this still needed?



##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java:
##########
@@ -743,6 +747,16 @@ private void reportBacklog() {
     backlogElementsOfSplit.set(splitBacklogMessages);
   }
 
+  private void reportBacklogMetrics() {

Review Comment:
   SGTM



##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaMetrics.java:
##########
@@ -67,15 +74,22 @@ abstract class KafkaMetricsImpl implements KafkaMetrics {
 
     private static final Logger LOG = 
LoggerFactory.getLogger(KafkaMetricsImpl.class);
 
-    static HashMap<String, Histogram> latencyHistograms = new HashMap<String, 
Histogram>();
+    static ConcurrentHashMap<String, Histogram> latencyHistograms =
+        new ConcurrentHashMap<String, Histogram>();
+
+    abstract ConcurrentHashMap<String, ConcurrentLinkedQueue<Duration>> 
perTopicRpcLatencies();
 
-    abstract HashMap<String, ConcurrentLinkedQueue<Duration>> 
perTopicRpcLatencies();
+    static ConcurrentHashMap<String, Gauge> backlogGauges = new 
ConcurrentHashMap<String, Gauge>();

Review Comment:
   ```suggestion
       private static final Map<String, Gauge> BACKLOG_GAUGES = new 
ConcurrentHashMap<>();
   ```



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to