This is an automated email from the ASF dual-hosted git repository.
frankvicky pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new aaed164be63 MINOR: Refactor brokerContactTimesMs and
brokerRegistrationStates to use Long and Integer (#19888)
aaed164be63 is described below
commit aaed164be63f48d7e6c1a0614b45a670156d8701
Author: Hong-Yi Chen <[email protected]>
AuthorDate: Fri Jun 6 16:03:23 2025 +0800
MINOR: Refactor brokerContactTimesMs and brokerRegistrationStates to use
Long and Integer (#19888)
This PR simplifies two ConcurrentHashMap fields by removing their Atomic
wrappers:
- Change `brokerContactTimesMs` from
`ConcurrentHashMap<Integer, AtomicLong>` to
`ConcurrentHashMap<Integer, Long>`.
- Change `brokerRegistrationStates` from
`ConcurrentHashMap<Integer, AtomicInteger>` to
`ConcurrentHashMap<Integer, Integer>`.
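This removes the mutable holders without affecting thread safety: both
maps are still `ConcurrentHashMap`s, and each update now replaces the
entry with an immutable boxed value via `put` instead of mutating a
shared Atomic holder (see discussion in #19828).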
Reviewers: Chia-Ping Tsai <[email protected]>, TengYao Chi
<[email protected]>, Kevin Wu <[email protected]>, Ken Huang
<[email protected]>
---
.../controller/metrics/ControllerMetadataMetrics.java | 12 ++++--------
.../controller/metrics/QuorumControllerMetrics.java | 16 ++++++----------
2 files changed, 10 insertions(+), 18 deletions(-)
diff --git a/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetrics.java b/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetrics.java
index 072f83fad3c..d85f8b86d4a 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetrics.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/metrics/ControllerMetadataMetrics.java
@@ -75,7 +75,7 @@ public final class ControllerMetadataMetrics implements
AutoCloseable {
private final AtomicInteger fencedBrokerCount = new AtomicInteger(0);
private final AtomicInteger activeBrokerCount = new AtomicInteger(0);
private final AtomicInteger controllerShutdownBrokerCount = new
AtomicInteger(0);
- private final Map<Integer, AtomicInteger> brokerRegistrationStates = new
ConcurrentHashMap<>();
+ private final Map<Integer, Integer> brokerRegistrationStates = new
ConcurrentHashMap<>();
private final AtomicInteger globalTopicCount = new AtomicInteger(0);
private final AtomicInteger globalPartitionCount = new AtomicInteger(0);
private final AtomicInteger offlinePartitionCount = new AtomicInteger(0);
@@ -227,18 +227,14 @@ public final class ControllerMetadataMetrics implements
AutoCloseable {
return;
}
BrokerRegistrationState brokerState =
getBrokerRegistrationState(brokerRegistration);
- if (brokerRegistrationStates.containsKey(brokerId)) {
- brokerRegistrationStates.get(brokerId).set(brokerState.state());
- } else {
- brokerRegistrationStates.put(brokerId, new
AtomicInteger(brokerState.state()));
- }
+ brokerRegistrationStates.put(brokerId, brokerState.state());
}
public int brokerRegistrationState(int brokerId) {
return this.brokerRegistrationStates.getOrDefault(
brokerId,
- new AtomicInteger(BrokerRegistrationState.UNREGISTERED.state())
- ).get();
+ BrokerRegistrationState.UNREGISTERED.state()
+ );
}
public void setGlobalTopicCount(int topicCount) {
diff --git a/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java b/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java
index 9a9f8f70bb5..310a2c1dd61 100644
--- a/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java
+++ b/metadata/src/main/java/org/apache/kafka/controller/metrics/QuorumControllerMetrics.java
@@ -80,7 +80,7 @@ public class QuorumControllerMetrics implements AutoCloseable
{
private final AtomicLong operationsStarted = new AtomicLong(0);
private final AtomicLong operationsTimedOut = new AtomicLong(0);
private final AtomicLong newActiveControllers = new AtomicLong(0);
- private final Map<Integer, AtomicLong> brokerContactTimesMs = new
ConcurrentHashMap<>();
+ private final Map<Integer, Long> brokerContactTimesMs = new
ConcurrentHashMap<>();
private final int sessionTimeoutMs;
private Consumer<Long> newHistogram(MetricName name, boolean biased) {
@@ -160,7 +160,7 @@ public class QuorumControllerMetrics implements
AutoCloseable {
}
public void addTimeSinceLastHeartbeatMetric(int brokerId) {
- brokerContactTimesMs.put(brokerId, new
AtomicLong(time.milliseconds()));
+ brokerContactTimesMs.put(brokerId, time.milliseconds());
registry.ifPresent(r -> r.newGauge(
getBrokerIdTagMetricName(
"KafkaController",
@@ -267,19 +267,15 @@ public class QuorumControllerMetrics implements
AutoCloseable {
}
public void updateBrokerContactTime(int brokerId) {
- AtomicLong contactTime =
brokerContactTimesMs.computeIfAbsent(brokerId, k -> new AtomicLong());
- contactTime.set(time.milliseconds());
+ brokerContactTimesMs.put(brokerId, time.milliseconds());
}
public int timeSinceLastHeartbeatMs(int brokerId) {
- if (!brokerContactTimesMs.containsKey(brokerId)) {
+ Long lastTime = brokerContactTimesMs.get(brokerId);
+ if (lastTime == null) {
return sessionTimeoutMs;
- } else {
- return Math.min(
- (int) (time.milliseconds() -
brokerContactTimesMs.get(brokerId).get()),
- sessionTimeoutMs
- );
}
+ return Math.min((int) (time.milliseconds() - lastTime),
sessionTimeoutMs);
}
@Override