This is an automated email from the ASF dual-hosted git repository.

frankvicky pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new aaed164be63 MINOR: Refactor brokerContactTimesMs and brokerRegistrationStates to use Long and Integer (#19888)
aaed164be63 is described below

commit aaed164be63f48d7e6c1a0614b45a670156d8701
Author: Hong-Yi Chen <[email protected]>
AuthorDate: Fri Jun 6 16:03:23 2025 +0800

    MINOR: Refactor brokerContactTimesMs and brokerRegistrationStates to use Long and Integer (#19888)
    
    This PR simplifies two ConcurrentHashMap fields by removing their Atomic
    wrappers:
    
    - Change `brokerContactTimesMs` from `ConcurrentHashMap<Integer,
    AtomicLong>` to `ConcurrentHashMap<Integer, Long>`.
    
    - Change `brokerRegistrationStates` from `ConcurrentHashMap<Integer,
    AtomicInteger>` to `ConcurrentHashMap<Integer, Integer>`.
    
    This removes mutable holders without affecting thread safety (see
    discussion in #19828).
    
    Reviewers: Chia-Ping Tsai <[email protected]>, TengYao Chi
    <[email protected]>, Kevin Wu <[email protected]>, Ken Huang
    <[email protected]>
---
 .../controller/metrics/ControllerMetadataMetrics.java    | 12 ++++--------
 .../controller/metrics/QuorumControllerMetrics.java      | 16 ++++++----------
 2 files changed, 10 insertions(+), 18 deletions(-)

diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetrics.java
 
b/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetrics.java
index 072f83fad3c..d85f8b86d4a 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetrics.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetrics.java
@@ -75,7 +75,7 @@ public final class ControllerMetadataMetrics implements 
AutoCloseable {
     private final AtomicInteger fencedBrokerCount = new AtomicInteger(0);
     private final AtomicInteger activeBrokerCount = new AtomicInteger(0);
     private final AtomicInteger controllerShutdownBrokerCount = new 
AtomicInteger(0);
-    private final Map<Integer, AtomicInteger> brokerRegistrationStates = new 
ConcurrentHashMap<>();
+    private final Map<Integer, Integer> brokerRegistrationStates = new 
ConcurrentHashMap<>();
     private final AtomicInteger globalTopicCount = new AtomicInteger(0);
     private final AtomicInteger globalPartitionCount = new AtomicInteger(0);
     private final AtomicInteger offlinePartitionCount = new AtomicInteger(0);
@@ -227,18 +227,14 @@ public final class ControllerMetadataMetrics implements 
AutoCloseable {
             return;
         }
         BrokerRegistrationState brokerState = 
getBrokerRegistrationState(brokerRegistration);
-        if (brokerRegistrationStates.containsKey(brokerId)) {
-            brokerRegistrationStates.get(brokerId).set(brokerState.state());
-        } else {
-            brokerRegistrationStates.put(brokerId, new 
AtomicInteger(brokerState.state()));
-        }
+        brokerRegistrationStates.put(brokerId, brokerState.state());
     }
 
     public int brokerRegistrationState(int brokerId) {
         return this.brokerRegistrationStates.getOrDefault(
             brokerId,
-            new AtomicInteger(BrokerRegistrationState.UNREGISTERED.state())
-        ).get();
+            BrokerRegistrationState.UNREGISTERED.state()
+        );
     }
     
     public void setGlobalTopicCount(int topicCount) {
diff --git 
a/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java
 
b/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java
index 9a9f8f70bb5..310a2c1dd61 100644
--- 
a/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java
+++ 
b/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java
@@ -80,7 +80,7 @@ public class QuorumControllerMetrics implements AutoCloseable 
{
     private final AtomicLong operationsStarted = new AtomicLong(0);
     private final AtomicLong operationsTimedOut = new AtomicLong(0);
     private final AtomicLong newActiveControllers = new AtomicLong(0);
-    private final Map<Integer, AtomicLong> brokerContactTimesMs = new 
ConcurrentHashMap<>();
+    private final Map<Integer, Long> brokerContactTimesMs = new 
ConcurrentHashMap<>();
     private final int sessionTimeoutMs;
 
     private Consumer<Long> newHistogram(MetricName name, boolean biased) {
@@ -160,7 +160,7 @@ public class QuorumControllerMetrics implements 
AutoCloseable {
     }
 
     public void addTimeSinceLastHeartbeatMetric(int brokerId) {
-        brokerContactTimesMs.put(brokerId, new 
AtomicLong(time.milliseconds()));
+        brokerContactTimesMs.put(brokerId, time.milliseconds());
         registry.ifPresent(r -> r.newGauge(
             getBrokerIdTagMetricName(
                 "KafkaController",
@@ -267,19 +267,15 @@ public class QuorumControllerMetrics implements 
AutoCloseable {
     }
 
     public void updateBrokerContactTime(int brokerId) {
-        AtomicLong contactTime = 
brokerContactTimesMs.computeIfAbsent(brokerId, k -> new AtomicLong());
-        contactTime.set(time.milliseconds());
+        brokerContactTimesMs.put(brokerId, time.milliseconds());
     }
 
     public int timeSinceLastHeartbeatMs(int brokerId) {
-        if (!brokerContactTimesMs.containsKey(brokerId)) {
+        Long lastTime = brokerContactTimesMs.get(brokerId);
+        if (lastTime == null) {
             return sessionTimeoutMs;
-        } else {
-            return Math.min(
-                (int) (time.milliseconds() - 
brokerContactTimesMs.get(brokerId).get()),
-                sessionTimeoutMs
-            );
         }
+        return Math.min((int) (time.milliseconds() - lastTime), 
sessionTimeoutMs);
     }
 
     @Override

Reply via email to