sjvanrossum commented on code in PR #33408:
URL: https://github.com/apache/beam/pull/33408#discussion_r1920233993
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaMetrics.java:
##########
@@ -67,15 +74,22 @@ abstract class KafkaMetricsImpl implements KafkaMetrics {
private static final Logger LOG =
LoggerFactory.getLogger(KafkaMetricsImpl.class);
- static HashMap<String, Histogram> latencyHistograms = new HashMap<String,
Histogram>();
+ static ConcurrentHashMap<String, Histogram> latencyHistograms =
+ new ConcurrentHashMap<String, Histogram>();
Review Comment:
```suggestion
private static final Map<String, Histogram> LATENCY_HISTOGRAMS =
new ConcurrentHashMap<>();
```
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaMetrics.java:
##########
@@ -86,13 +100,26 @@ public void updateSuccessfulRpcMetrics(String topic,
Duration elapsedTime) {
if (latencies == null) {
latencies = new ConcurrentLinkedQueue<Duration>();
latencies.add(elapsedTime);
- perTopicRpcLatencies().put(topic, latencies);
+ perTopicRpcLatencies().putIfAbsent(topic, latencies);
Review Comment:
If I'm not mistaken, L99-106 can be replaced with
`perTopicRpcLatencies().computeIfAbsent(topic, k -> new
ConcurrentLinkedQueue<>()).add(elapsedTime);`, which reduces locking
operations. It also changes what happens in L100-104, which currently drops
`elapsedTime` if the call at L103 observes that another write for that key
happened in between.
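For illustration, here is a minimal standalone sketch of that shape. It is not the actual `KafkaMetricsImpl` code: the class `LatencyRecorder` and its `record` and `count` methods are hypothetical names, and the field stands in for whatever `perTopicRpcLatencies()` returns. The mapping function is written as a lambda because `ConcurrentLinkedQueue` has no constructor that accepts the `String` key.

```java
import java.time.Duration;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical stand-in for the per-topic latency bookkeeping discussed above.
final class LatencyRecorder {
  // Assumed equivalent of the map returned by perTopicRpcLatencies().
  private final Map<String, ConcurrentLinkedQueue<Duration>> perTopicRpcLatencies =
      new ConcurrentHashMap<>();

  // computeIfAbsent atomically returns the existing queue for the topic or
  // installs a new one, so every caller appends to the queue that is actually
  // stored in the map and no elapsedTime value is lost to a concurrent insert.
  void record(String topic, Duration elapsedTime) {
    perTopicRpcLatencies
        .computeIfAbsent(topic, k -> new ConcurrentLinkedQueue<>())
        .add(elapsedTime);
  }

  // Number of latencies recorded for a topic; 0 if the topic was never seen.
  int count(String topic) {
    ConcurrentLinkedQueue<Duration> latencies = perTopicRpcLatencies.get(topic);
    return latencies == null ? 0 : latencies.size();
  }

  public static void main(String[] args) throws InterruptedException {
    LatencyRecorder recorder = new LatencyRecorder();
    Thread[] workers = new Thread[4];
    for (int i = 0; i < workers.length; i++) {
      workers[i] =
          new Thread(
              () -> {
                for (int j = 0; j < 1000; j++) {
                  recorder.record("my-topic", Duration.ofMillis(j));
                }
              });
      workers[i].start();
    }
    for (Thread worker : workers) {
      worker.join();
    }
    // All 4 * 1000 values land in the single queue stored for "my-topic".
    System.out.println(recorder.count("my-topic"));
  }
}
```

With a separate `get` followed by `putIfAbsent`, a thread that loses the race would add its value to a queue that never ends up in the map, which is the drop described above.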
##########
runners/google-cloud-dataflow-java/worker/src/test/java/org/apache/beam/runners/dataflow/worker/MetricsToPerStepNamespaceMetricsConverterTest.java:
##########
@@ -345,4 +372,76 @@ public void testConvert_convertCountersAndHistograms() {
parsedMetricNames,
IsMapContaining.hasEntry(histogramMetricName,
parsedHistogramMetricName));
}
+
+ @Test
+ public void testConvert_successfulyConvertGauges() {
Review Comment:
```suggestion
public void testConvert_successfullyConvertGauges() {
```
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaMetrics.java:
##########
@@ -67,15 +74,22 @@ abstract class KafkaMetricsImpl implements KafkaMetrics {
private static final Logger LOG =
LoggerFactory.getLogger(KafkaMetricsImpl.class);
- static HashMap<String, Histogram> latencyHistograms = new HashMap<String,
Histogram>();
+ static ConcurrentHashMap<String, Histogram> latencyHistograms =
+ new ConcurrentHashMap<String, Histogram>();
+
+ abstract ConcurrentHashMap<String, ConcurrentLinkedQueue<Duration>>
perTopicRpcLatencies();
- abstract HashMap<String, ConcurrentLinkedQueue<Duration>>
perTopicRpcLatencies();
+ static ConcurrentHashMap<String, Gauge> backlogGauges = new
ConcurrentHashMap<String, Gauge>();
Review Comment:
Hmmm... I searched for `backlogGauges`, but I'm not finding any uses of it.
Is this still needed?
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaUnboundedReader.java:
##########
@@ -743,6 +747,16 @@ private void reportBacklog() {
backlogElementsOfSplit.set(splitBacklogMessages);
}
+ private void reportBacklogMetrics() {
Review Comment:
SGTM
##########
sdks/java/io/kafka/src/main/java/org/apache/beam/sdk/io/kafka/KafkaMetrics.java:
##########
@@ -67,15 +74,22 @@ abstract class KafkaMetricsImpl implements KafkaMetrics {
private static final Logger LOG =
LoggerFactory.getLogger(KafkaMetricsImpl.class);
- static HashMap<String, Histogram> latencyHistograms = new HashMap<String,
Histogram>();
+ static ConcurrentHashMap<String, Histogram> latencyHistograms =
+ new ConcurrentHashMap<String, Histogram>();
+
+ abstract ConcurrentHashMap<String, ConcurrentLinkedQueue<Duration>>
perTopicRpcLatencies();
- abstract HashMap<String, ConcurrentLinkedQueue<Duration>>
perTopicRpcLatencies();
+ static ConcurrentHashMap<String, Gauge> backlogGauges = new
ConcurrentHashMap<String, Gauge>();
Review Comment:
```suggestion
private static final Map<String, Gauge> BACKLOG_GAUGES = new
ConcurrentHashMap<>();
```