This is an automated email from the ASF dual-hosted git repository.
chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 6836fa259e0 KAFKA-17320 Move SensorAccess to server-common module
(#16864)
6836fa259e0 is described below
commit 6836fa259e0c50cc6a8d5099b3fbbacda9c2aadc
Author: Dmitry Werner <[email protected]>
AuthorDate: Sun Sep 1 20:41:28 2024 +0500
KAFKA-17320 Move SensorAccess to server-common module (#16864)
Reviewers: Chia-Ping Tsai <[email protected]>
---
.../kafka/log/remote/quota/RLMQuotaManager.java | 10 +--
.../kafka/log/remote/quota/RLMQuotaMetrics.java | 6 +-
.../scala/kafka/server/ClientQuotaManager.scala | 7 +-
.../kafka/server/ReplicationQuotaManager.scala | 2 +-
.../src/main/scala/kafka/server/SensorAccess.scala | 71 ------------------
.../apache/kafka/server/quota/SensorAccess.java | 83 ++++++++++++++++++++++
6 files changed, 91 insertions(+), 88 deletions(-)
diff --git a/core/src/main/java/kafka/log/remote/quota/RLMQuotaManager.java
b/core/src/main/java/kafka/log/remote/quota/RLMQuotaManager.java
index eb677c7fee4..1d6b5baaa15 100644
--- a/core/src/main/java/kafka/log/remote/quota/RLMQuotaManager.java
+++ b/core/src/main/java/kafka/log/remote/quota/RLMQuotaManager.java
@@ -16,8 +16,6 @@
*/
package kafka.log.remote.quota;
-import kafka.server.SensorAccess;
-
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.KafkaMetric;
import org.apache.kafka.common.metrics.MetricConfig;
@@ -29,6 +27,7 @@ import org.apache.kafka.common.metrics.stats.SimpleRate;
import org.apache.kafka.common.utils.Time;
import org.apache.kafka.server.quota.QuotaType;
import org.apache.kafka.server.quota.QuotaUtils;
+import org.apache.kafka.server.quota.SensorAccess;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,8 +37,6 @@ import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import scala.runtime.BoxedUnit;
-
public class RLMQuotaManager {
private static final Logger LOGGER =
LoggerFactory.getLogger(RLMQuotaManager.class);
@@ -112,10 +109,7 @@ public class RLMQuotaManager {
return sensorAccess.getOrCreate(
quotaType.toString(),
RLMQuotaManagerConfig.INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS,
- sensor -> {
- sensor.add(metricName(), new SimpleRate(),
getQuotaMetricConfig(quota));
- return BoxedUnit.UNIT;
- }
+ sensor -> sensor.add(metricName(), new SimpleRate(),
getQuotaMetricConfig(quota))
);
}
}
\ No newline at end of file
diff --git a/core/src/main/java/kafka/log/remote/quota/RLMQuotaMetrics.java
b/core/src/main/java/kafka/log/remote/quota/RLMQuotaMetrics.java
index 96dd6b1b7e6..d5b7ac3268c 100644
--- a/core/src/main/java/kafka/log/remote/quota/RLMQuotaMetrics.java
+++ b/core/src/main/java/kafka/log/remote/quota/RLMQuotaMetrics.java
@@ -16,17 +16,14 @@
*/
package kafka.log.remote.quota;
-import kafka.server.SensorAccess;
-
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.metrics.stats.Avg;
import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.server.quota.SensorAccess;
import java.util.concurrent.locks.ReentrantReadWriteLock;
-import scala.runtime.BoxedUnit;
-
public class RLMQuotaMetrics {
private final Sensor sensor;
@@ -37,7 +34,6 @@ public class RLMQuotaMetrics {
this.sensor = sensorAccess.getOrCreate(name, expirationTime, s -> {
s.add(metrics.metricName(name + "-avg", group,
String.format(descriptionFormat, "average")), new Avg());
s.add(metrics.metricName(name + "-max", group,
String.format(descriptionFormat, "maximum")), new Max());
- return BoxedUnit.UNIT;
});
}
diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
index 144b9d2cfce..82c0532161b 100644
--- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
@@ -19,6 +19,7 @@ package kafka.server
import java.{lang, util}
import java.util.concurrent.{ConcurrentHashMap, DelayQueue, TimeUnit}
import java.util.concurrent.locks.ReentrantReadWriteLock
+import java.util.function.Consumer
import kafka.network.RequestChannel
import kafka.server.ClientQuotaManager._
import kafka.utils.Logging
@@ -29,7 +30,7 @@ import org.apache.kafka.common.metrics.stats.{Avg,
CumulativeSum, Rate}
import org.apache.kafka.common.security.auth.KafkaPrincipal
import org.apache.kafka.common.utils.{Sanitizer, Time}
import org.apache.kafka.server.config.{ClientQuotaManagerConfig,
ZooKeeperInternals}
-import org.apache.kafka.server.quota.{ClientQuotaCallback, ClientQuotaEntity,
ClientQuotaType, QuotaType, QuotaUtils, ThrottleCallback, ThrottledChannel}
+import org.apache.kafka.server.quota.{ClientQuotaCallback, ClientQuotaEntity,
ClientQuotaType, QuotaType, QuotaUtils, SensorAccess, ThrottleCallback,
ThrottledChannel}
import org.apache.kafka.server.util.ShutdownableThread
import org.apache.kafka.network.Session
@@ -350,7 +351,7 @@ class ClientQuotaManager(private val config:
ClientQuotaManagerConfig,
sensorAccessor.getOrCreate(
getQuotaSensorName(metricTags),
ClientQuotaManager.InactiveSensorExpirationTimeSeconds,
- registerQuotaMetrics(metricTags)
+ sensor => registerQuotaMetrics(metricTags)(sensor)
),
sensorAccessor.getOrCreate(
getThrottleTimeSensorName(metricTags),
@@ -391,7 +392,7 @@ class ClientQuotaManager(private val config:
ClientQuotaManagerConfig,
.quota(new Quota(quotaLimit, true))
}
- protected def getOrCreateSensor(sensorName: String, expirationTimeSeconds:
Long, registerMetrics: Sensor => Unit): Sensor = {
+ protected def getOrCreateSensor(sensorName: String, expirationTimeSeconds:
Long, registerMetrics: Consumer[Sensor]): Sensor = {
sensorAccessor.getOrCreate(
sensorName,
expirationTimeSeconds,
diff --git a/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala
b/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala
index ce50b99ff54..6ae4b5aa284 100644
--- a/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala
@@ -27,7 +27,7 @@ import org.apache.kafka.common.TopicPartition
import org.apache.kafka.common.metrics.stats.SimpleRate
import org.apache.kafka.common.utils.Time
import org.apache.kafka.server.config.ReplicationQuotaManagerConfig
-import org.apache.kafka.server.quota.QuotaType
+import org.apache.kafka.server.quota.{QuotaType, SensorAccess}
trait ReplicaQuota {
def record(value: Long): Unit
diff --git a/core/src/main/scala/kafka/server/SensorAccess.scala
b/core/src/main/scala/kafka/server/SensorAccess.scala
deleted file mode 100644
index 3a063f2aba7..00000000000
--- a/core/src/main/scala/kafka/server/SensorAccess.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka.server
-
-import java.util.concurrent.locks.ReadWriteLock
-
-import org.apache.kafka.common.metrics.{Metrics, Sensor}
-
-/**
- * Class which centralises the logic for creating/accessing sensors.
- * The quota can be updated by wrapping it in the passed MetricConfig
- *
- * The later arguments are passed as methods as they are only called when the
sensor is instantiated.
- */
-class SensorAccess(lock: ReadWriteLock, metrics: Metrics) {
-
- def getOrCreate(sensorName: String, expirationTime: Long, registerMetrics:
Sensor => Unit): Sensor = {
- var sensor: Sensor = null
-
- /* Acquire the read lock to fetch the sensor. It is safe to call getSensor
from multiple threads.
- * The read lock allows a thread to create a sensor in isolation. The
thread creating the sensor
- * will acquire the write lock and prevent the sensors from being read
while they are being created.
- * It should be sufficient to simply check if the sensor is null without
acquiring a read lock but the
- * sensor being present doesn't mean that it is fully initialized i.e. all
the Metrics may not have been added.
- * This read lock waits until the writer thread has released its lock i.e.
fully initialized the sensor
- * at which point it is safe to read
- */
- lock.readLock().lock()
- try sensor = metrics.getSensor(sensorName)
- finally lock.readLock().unlock()
-
- /* If the sensor is null, try to create it else return the existing sensor
- * The sensor can be null, hence the null checks
- */
- if (sensor == null) {
- /* Acquire a write lock because the sensor may not have been created and
we only want one thread to create it.
- * Note that multiple threads may acquire the write lock if they all see
a null sensor initially
- * In this case, the writer checks the sensor after acquiring the lock
again.
- * This is safe from Double Checked Locking because the references are
read
- * after acquiring read locks and hence they cannot see a partially
published reference
- */
- lock.writeLock().lock()
- try {
- // Set the var for both sensors in case another thread has won the
race to acquire the write lock. This will
- // ensure that we initialise `ClientSensors` with non-null parameters.
- sensor = metrics.getSensor(sensorName)
- if (sensor == null) {
- sensor = metrics.sensor(sensorName, null, expirationTime)
- registerMetrics(sensor)
- }
- } finally {
- lock.writeLock().unlock()
- }
- }
- sensor
- }
-}
diff --git
a/server-common/src/main/java/org/apache/kafka/server/quota/SensorAccess.java
b/server-common/src/main/java/org/apache/kafka/server/quota/SensorAccess.java
new file mode 100644
index 00000000000..ed68048dc3d
--- /dev/null
+++
b/server-common/src/main/java/org/apache/kafka/server/quota/SensorAccess.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.quota;
+
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.function.Consumer;
+
+/**
+ * Centralises the get-or-create logic for {@link Sensor}s held by a
+ * {@link Metrics} instance.
+ *
+ * Lookups run under the read lock of the supplied {@link ReadWriteLock};
+ * creation and metric registration run under its write lock, so a lookup
+ * made through this class waits until a concurrently created sensor has had
+ * its metrics registered.
+ *
+ * The metric registration is passed as a callback because it is only
+ * invoked when the sensor is actually created.
+ *
+ * NOTE(review): the previous wording said the quota can be updated by
+ * wrapping it in "the passed MetricConfig", but no MetricConfig is passed to
+ * this class and the sensor is created with {@code null} as the second
+ * argument of {@code Metrics.sensor} -- confirm whether that sentence still
+ * applies anywhere.
+ */
+public class SensorAccess {
+ private final ReadWriteLock lock;
+ private final Metrics metrics;
+ /**
+ * @param lock lock whose read side guards sensor lookup and whose write
+ * side guards sensor creation and metric registration
+ * @param metrics registry the sensors are fetched from and created in
+ */
+ public SensorAccess(ReadWriteLock lock, Metrics metrics) {
+ this.lock = lock;
+ this.metrics = metrics;
+ }
+ /**
+ * Returns the sensor registered under {@code sensorName}, creating it
+ * first if it does not exist yet.
+ *
+ * @param sensorName name used to look up and, if needed, create the sensor
+ * @param expirationTime forwarded unchanged to {@code Metrics.sensor} when
+ * the sensor is created; the callers visible in this change pass a value
+ * expressed in seconds
+ * @param registerMetrics called with the new sensor, while the write lock
+ * is held, only when this call is the one that creates the sensor
+ * @return the existing sensor, or the newly created one
+ */
+ public Sensor getOrCreate(String sensorName, long expirationTime,
Consumer<Sensor> registerMetrics) {
+ Sensor sensor;
+
+ /* Acquire the read lock to fetch the sensor. It is safe to call
getSensor from multiple threads.
+ * The read lock allows a thread to create a sensor in isolation. The
thread creating the sensor
+ * will acquire the write lock and prevent the sensors from being read
while they are being created.
+ * It should be sufficient to simply check if the sensor is null
without acquiring a read lock but the
+ * sensor being present doesn't mean that it is fully initialized i.e.
all the Metrics may not have been added.
+ * This read lock waits until the writer thread has released its lock
i.e. fully initialized the sensor
+ * at which point it is safe to read
+ */
+ lock.readLock().lock();
+ try {
+ sensor = metrics.getSensor(sensorName);
+ } finally {
+ lock.readLock().unlock();
+ }
+
+ /* If the sensor is null, try to create it else return the existing
sensor
+ * The sensor can be null, hence the null checks
+ */
+ if (sensor == null) {
+ /* Acquire a write lock because the sensor may not have been
created and we only want one thread to create it.
+ * Note that multiple threads may acquire the write lock if they
all see a null sensor initially
+ * In this case, the writer checks the sensor after acquiring the
lock again.
+ * This is safe from Double-Checked Locking because the references
are read
+ * after acquiring read locks and hence they cannot see a
partially published reference
+ */
+ lock.writeLock().lock();
+ try {
+ // Set the var for both sensors in case another thread has won
the race to acquire the write lock. This will
+ // ensure that we initialise `ClientSensors` with non-null
parameters.
+ sensor = metrics.getSensor(sensorName);
+ if (sensor == null) {
+ sensor = metrics.sensor(sensorName, null, expirationTime);
+ registerMetrics.accept(sensor);
+ }
+ } finally {
+ lock.writeLock().unlock();
+ }
+ }
+
+ return sensor;
+ }
+}