This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6836fa259e0 KAFKA-17320 Move SensorAccess to server-common module 
(#16864)
6836fa259e0 is described below

commit 6836fa259e0c50cc6a8d5099b3fbbacda9c2aadc
Author: Dmitry Werner <[email protected]>
AuthorDate: Sun Sep 1 20:41:28 2024 +0500

    KAFKA-17320 Move SensorAccess to server-common module (#16864)
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../kafka/log/remote/quota/RLMQuotaManager.java    | 10 +--
 .../kafka/log/remote/quota/RLMQuotaMetrics.java    |  6 +-
 .../scala/kafka/server/ClientQuotaManager.scala    |  7 +-
 .../kafka/server/ReplicationQuotaManager.scala     |  2 +-
 .../src/main/scala/kafka/server/SensorAccess.scala | 71 ------------------
 .../apache/kafka/server/quota/SensorAccess.java    | 83 ++++++++++++++++++++++
 6 files changed, 91 insertions(+), 88 deletions(-)

diff --git a/core/src/main/java/kafka/log/remote/quota/RLMQuotaManager.java 
b/core/src/main/java/kafka/log/remote/quota/RLMQuotaManager.java
index eb677c7fee4..1d6b5baaa15 100644
--- a/core/src/main/java/kafka/log/remote/quota/RLMQuotaManager.java
+++ b/core/src/main/java/kafka/log/remote/quota/RLMQuotaManager.java
@@ -16,8 +16,6 @@
  */
 package kafka.log.remote.quota;
 
-import kafka.server.SensorAccess;
-
 import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.metrics.KafkaMetric;
 import org.apache.kafka.common.metrics.MetricConfig;
@@ -29,6 +27,7 @@ import org.apache.kafka.common.metrics.stats.SimpleRate;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.server.quota.QuotaType;
 import org.apache.kafka.server.quota.QuotaUtils;
+import org.apache.kafka.server.quota.SensorAccess;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,8 +37,6 @@ import java.util.Map;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import scala.runtime.BoxedUnit;
-
 public class RLMQuotaManager {
     private static final Logger LOGGER = 
LoggerFactory.getLogger(RLMQuotaManager.class);
 
@@ -112,10 +109,7 @@ public class RLMQuotaManager {
         return sensorAccess.getOrCreate(
             quotaType.toString(),
             RLMQuotaManagerConfig.INACTIVE_SENSOR_EXPIRATION_TIME_SECONDS,
-            sensor -> {
-                sensor.add(metricName(), new SimpleRate(), 
getQuotaMetricConfig(quota));
-                return BoxedUnit.UNIT;
-            }
+            sensor -> sensor.add(metricName(), new SimpleRate(), 
getQuotaMetricConfig(quota))
         );
     }
 }
\ No newline at end of file
diff --git a/core/src/main/java/kafka/log/remote/quota/RLMQuotaMetrics.java 
b/core/src/main/java/kafka/log/remote/quota/RLMQuotaMetrics.java
index 96dd6b1b7e6..d5b7ac3268c 100644
--- a/core/src/main/java/kafka/log/remote/quota/RLMQuotaMetrics.java
+++ b/core/src/main/java/kafka/log/remote/quota/RLMQuotaMetrics.java
@@ -16,17 +16,14 @@
  */
 package kafka.log.remote.quota;
 
-import kafka.server.SensorAccess;
-
 import org.apache.kafka.common.metrics.Metrics;
 import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.metrics.stats.Avg;
 import org.apache.kafka.common.metrics.stats.Max;
+import org.apache.kafka.server.quota.SensorAccess;
 
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
-import scala.runtime.BoxedUnit;
-
 public class RLMQuotaMetrics {
 
     private final Sensor sensor;
@@ -37,7 +34,6 @@ public class RLMQuotaMetrics {
         this.sensor = sensorAccess.getOrCreate(name, expirationTime, s -> {
             s.add(metrics.metricName(name + "-avg", group, 
String.format(descriptionFormat, "average")), new Avg());
             s.add(metrics.metricName(name + "-max", group, 
String.format(descriptionFormat, "maximum")), new Max());
-            return BoxedUnit.UNIT;
         });
     }
 
diff --git a/core/src/main/scala/kafka/server/ClientQuotaManager.scala 
b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
index 144b9d2cfce..82c0532161b 100644
--- a/core/src/main/scala/kafka/server/ClientQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ClientQuotaManager.scala
@@ -19,6 +19,7 @@ package kafka.server
 import java.{lang, util}
 import java.util.concurrent.{ConcurrentHashMap, DelayQueue, TimeUnit}
 import java.util.concurrent.locks.ReentrantReadWriteLock
+import java.util.function.Consumer
 import kafka.network.RequestChannel
 import kafka.server.ClientQuotaManager._
 import kafka.utils.Logging
@@ -29,7 +30,7 @@ import org.apache.kafka.common.metrics.stats.{Avg, 
CumulativeSum, Rate}
 import org.apache.kafka.common.security.auth.KafkaPrincipal
 import org.apache.kafka.common.utils.{Sanitizer, Time}
 import org.apache.kafka.server.config.{ClientQuotaManagerConfig, 
ZooKeeperInternals}
-import org.apache.kafka.server.quota.{ClientQuotaCallback, ClientQuotaEntity, 
ClientQuotaType, QuotaType, QuotaUtils, ThrottleCallback, ThrottledChannel}
+import org.apache.kafka.server.quota.{ClientQuotaCallback, ClientQuotaEntity, 
ClientQuotaType, QuotaType, QuotaUtils, SensorAccess, ThrottleCallback, 
ThrottledChannel}
 import org.apache.kafka.server.util.ShutdownableThread
 import org.apache.kafka.network.Session
 
@@ -350,7 +351,7 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
       sensorAccessor.getOrCreate(
         getQuotaSensorName(metricTags),
         ClientQuotaManager.InactiveSensorExpirationTimeSeconds,
-        registerQuotaMetrics(metricTags)
+        sensor => registerQuotaMetrics(metricTags)(sensor)
       ),
       sensorAccessor.getOrCreate(
         getThrottleTimeSensorName(metricTags),
@@ -391,7 +392,7 @@ class ClientQuotaManager(private val config: 
ClientQuotaManagerConfig,
       .quota(new Quota(quotaLimit, true))
   }
 
-  protected def getOrCreateSensor(sensorName: String, expirationTimeSeconds: 
Long, registerMetrics: Sensor => Unit): Sensor = {
+  protected def getOrCreateSensor(sensorName: String, expirationTimeSeconds: 
Long, registerMetrics: Consumer[Sensor]): Sensor = {
     sensorAccessor.getOrCreate(
       sensorName,
       expirationTimeSeconds,
diff --git a/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala 
b/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala
index ce50b99ff54..6ae4b5aa284 100644
--- a/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicationQuotaManager.scala
@@ -27,7 +27,7 @@ import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.metrics.stats.SimpleRate
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.server.config.ReplicationQuotaManagerConfig
-import org.apache.kafka.server.quota.QuotaType
+import org.apache.kafka.server.quota.{QuotaType, SensorAccess}
 
 trait ReplicaQuota {
   def record(value: Long): Unit
diff --git a/core/src/main/scala/kafka/server/SensorAccess.scala 
b/core/src/main/scala/kafka/server/SensorAccess.scala
deleted file mode 100644
index 3a063f2aba7..00000000000
--- a/core/src/main/scala/kafka/server/SensorAccess.scala
+++ /dev/null
@@ -1,71 +0,0 @@
-/**
-  * Licensed to the Apache Software Foundation (ASF) under one or more
-  * contributor license agreements.  See the NOTICE file distributed with
-  * this work for additional information regarding copyright ownership.
-  * The ASF licenses this file to You under the Apache License, Version 2.0
-  * (the "License"); you may not use this file except in compliance with
-  * the License.  You may obtain a copy of the License at
-  *
-  * http://www.apache.org/licenses/LICENSE-2.0
-  *
-  * Unless required by applicable law or agreed to in writing, software
-  * distributed under the License is distributed on an "AS IS" BASIS,
-  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  * See the License for the specific language governing permissions and
-  * limitations under the License.
-  */
-package kafka.server
-
-import java.util.concurrent.locks.ReadWriteLock
-
-import org.apache.kafka.common.metrics.{Metrics, Sensor}
-
-/**
-  * Class which centralises the logic for creating/accessing sensors.
-  * The quota can be updated by wrapping it in the passed MetricConfig
-  *
-  * The later arguments are passed as methods as they are only called when the 
sensor is instantiated.
-  */
-class SensorAccess(lock: ReadWriteLock, metrics: Metrics) {
-
-  def getOrCreate(sensorName: String, expirationTime: Long, registerMetrics: 
Sensor => Unit): Sensor = {
-    var sensor: Sensor = null
-
-    /* Acquire the read lock to fetch the sensor. It is safe to call getSensor 
from multiple threads.
-     * The read lock allows a thread to create a sensor in isolation. The 
thread creating the sensor
-     * will acquire the write lock and prevent the sensors from being read 
while they are being created.
-     * It should be sufficient to simply check if the sensor is null without 
acquiring a read lock but the
-     * sensor being present doesn't mean that it is fully initialized i.e. all 
the Metrics may not have been added.
-     * This read lock waits until the writer thread has released its lock i.e. 
fully initialized the sensor
-     * at which point it is safe to read
-     */
-    lock.readLock().lock()
-    try sensor = metrics.getSensor(sensorName)
-    finally lock.readLock().unlock()
-
-    /* If the sensor is null, try to create it else return the existing sensor
-     * The sensor can be null, hence the null checks
-     */
-    if (sensor == null) {
-      /* Acquire a write lock because the sensor may not have been created and 
we only want one thread to create it.
-       * Note that multiple threads may acquire the write lock if they all see 
a null sensor initially
-       * In this case, the writer checks the sensor after acquiring the lock 
again.
-       * This is safe from Double Checked Locking because the references are 
read
-       * after acquiring read locks and hence they cannot see a partially 
published reference
-       */
-      lock.writeLock().lock()
-      try {
-        // Set the var for both sensors in case another thread has won the 
race to acquire the write lock. This will
-        // ensure that we initialise `ClientSensors` with non-null parameters.
-        sensor = metrics.getSensor(sensorName)
-        if (sensor == null) {
-          sensor = metrics.sensor(sensorName, null, expirationTime)
-          registerMetrics(sensor)
-        }
-      } finally {
-        lock.writeLock().unlock()
-      }
-    }
-    sensor
-  }
-}
diff --git 
a/server-common/src/main/java/org/apache/kafka/server/quota/SensorAccess.java 
b/server-common/src/main/java/org/apache/kafka/server/quota/SensorAccess.java
new file mode 100644
index 00000000000..ed68048dc3d
--- /dev/null
+++ 
b/server-common/src/main/java/org/apache/kafka/server/quota/SensorAccess.java
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.kafka.server.quota;
+
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+
+import java.util.concurrent.locks.ReadWriteLock;
+import java.util.function.Consumer;
+
+/**
+ * Class which centralises the logic for creating/accessing sensors.
+ * The quota can be updated by wrapping it in the passed MetricConfig.
+ * The later arguments are passed as methods as they are only called when the 
sensor is instantiated.
+ */
+public class SensorAccess {
+    private final ReadWriteLock lock;
+    private final Metrics metrics;
+
+    public SensorAccess(ReadWriteLock lock, Metrics metrics) {
+        this.lock = lock;
+        this.metrics = metrics;
+    }
+
+    public Sensor getOrCreate(String sensorName, long expirationTime, 
Consumer<Sensor> registerMetrics) {
+        Sensor sensor;
+
+        /* Acquire the read lock to fetch the sensor. It is safe to call 
getSensor from multiple threads.
+         * The read lock allows a thread to create a sensor in isolation. The 
thread creating the sensor
+         * will acquire the write lock and prevent the sensors from being read 
while they are being created.
+         * It should be sufficient to simply check if the sensor is null 
without acquiring a read lock but the
+         * sensor being present doesn't mean that it is fully initialized i.e. 
all the Metrics may not have been added.
+         * This read lock waits until the writer thread has released its lock 
i.e. fully initialized the sensor
+         * at which point it is safe to read
+         */
+        lock.readLock().lock();
+        try {
+            sensor = metrics.getSensor(sensorName);
+        } finally {
+            lock.readLock().unlock();
+        }
+
+        /* If the sensor is null, try to create it else return the existing 
sensor
+         * The sensor can be null, hence the null checks
+         */
+        if (sensor == null) {
+            /* Acquire a write lock because the sensor may not have been 
created and we only want one thread to create it.
+             * Note that multiple threads may acquire the write lock if they 
all see a null sensor initially
+             * In this case, the writer checks the sensor after acquiring the 
lock again.
+             * This is safe from Double-Checked Locking because the references 
are read
+             * after acquiring read locks and hence they cannot see a 
partially published reference
+             */
+            lock.writeLock().lock();
+            try {
+                // Set the var for both sensors in case another thread has won 
the race to acquire the write lock. This will
+                // ensure that we initialise `ClientSensors` with non-null 
parameters.
+                sensor = metrics.getSensor(sensorName);
+                if (sensor == null) {
+                    sensor = metrics.sensor(sensorName, null, expirationTime);
+                    registerMetrics.accept(sensor);
+                }
+            } finally {
+                lock.writeLock().unlock();
+            }
+        }
+
+        return sensor;
+    }
+}

Reply via email to