This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 55d65cb3bad MINOR: Cleanups in CoreUtils (#19175)
55d65cb3bad is described below

commit 55d65cb3badaa8ec89d0085a59474ed9455d5ba0
Author: Mickael Maison <[email protected]>
AuthorDate: Wed Mar 12 19:43:30 2025 +0100

    MINOR: Cleanups in CoreUtils (#19175)
    
    Delete unused methods in CoreUtils and switch to Utils.newInstance().
    
    Reviewers: Chia-Ping Tsai <[email protected]>
---
 .../org/apache/kafka/common/utils/UtilsTest.java   |  1 +
 .../scala/kafka/metrics/KafkaMetricsReporter.scala |  5 +-
 .../main/scala/kafka/server/ReplicaManager.scala   |  6 +--
 .../main/scala/kafka/tools/DumpLogSegments.scala   |  5 +-
 core/src/main/scala/kafka/utils/CoreUtils.scala    | 55 ++--------------------
 .../kafka/server/DynamicConfigChangeTest.scala     |  6 +--
 .../scala/unit/kafka/utils/CoreUtilsTest.scala     | 46 ------------------
 7 files changed, 15 insertions(+), 109 deletions(-)

diff --git a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java 
b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
index 16fc6af154b..4220e84b7cc 100755
--- a/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/utils/UtilsTest.java
@@ -202,6 +202,7 @@ public class UtilsTest {
         assertEquals(10, Utils.abs(10));
         assertEquals(0, Utils.abs(0));
         assertEquals(1, Utils.abs(-1));
+        assertEquals(Integer.MAX_VALUE, Utils.abs(Integer.MAX_VALUE));
     }
 
     @Test
diff --git a/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala 
b/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala
index 136bb88b289..eb6bae3ced6 100755
--- a/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala
+++ b/core/src/main/scala/kafka/metrics/KafkaMetricsReporter.scala
@@ -21,8 +21,9 @@
 package kafka.metrics
 
 import kafka.utils.{CoreUtils, VerifiableProperties}
-import java.util.concurrent.atomic.AtomicBoolean
+import org.apache.kafka.common.utils.Utils
 
+import java.util.concurrent.atomic.AtomicBoolean
 import scala.collection.Seq
 import scala.collection.mutable.ArrayBuffer
 
@@ -62,7 +63,7 @@ object KafkaMetricsReporter {
         val metricsConfig = new KafkaMetricsConfig(verifiableProps)
         if (metricsConfig.reporters.nonEmpty) {
           metricsConfig.reporters.foreach(reporterType => {
-            val reporter = 
CoreUtils.createObject[KafkaMetricsReporter](reporterType)
+            val reporter = Utils.newInstance(reporterType, 
classOf[KafkaMetricsReporter])
             reporter.init(verifiableProps)
             reporters += reporter
             reporter match {
diff --git a/core/src/main/scala/kafka/server/ReplicaManager.scala 
b/core/src/main/scala/kafka/server/ReplicaManager.scala
index 9fdd42b3174..e052d840cb2 100644
--- a/core/src/main/scala/kafka/server/ReplicaManager.scala
+++ b/core/src/main/scala/kafka/server/ReplicaManager.scala
@@ -45,7 +45,7 @@ import org.apache.kafka.common.replica._
 import org.apache.kafka.common.requests.FetchRequest.PartitionData
 import org.apache.kafka.common.requests.ProduceResponse.PartitionResponse
 import org.apache.kafka.common.requests._
-import org.apache.kafka.common.utils.{Exit, Time}
+import org.apache.kafka.common.utils.{Exit, Time, Utils}
 import org.apache.kafka.common.{IsolationLevel, Node, TopicIdPartition, 
TopicPartition, Uuid}
 import org.apache.kafka.image.{LocalReplicaChanges, MetadataImage, TopicsDelta}
 import org.apache.kafka.metadata.LeaderConstants.NO_LEADER
@@ -58,7 +58,7 @@ import 
org.apache.kafka.server.share.fetch.{DelayedShareFetchKey, DelayedShareFe
 import org.apache.kafka.server.storage.log.{FetchParams, FetchPartitionData}
 import org.apache.kafka.server.util.{Scheduler, ShutdownableThread}
 import org.apache.kafka.storage.internals.checkpoint.{LazyOffsetCheckpoints, 
OffsetCheckpointFile, OffsetCheckpoints}
-import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, 
LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, 
LogOffsetMetadata, LogReadInfo, OffsetResultHolder, RecordValidationException, 
RemoteLogReadResult, RemoteStorageFetchInfo, UnifiedLog => JUnifiedLog, 
VerificationGuard}
+import org.apache.kafka.storage.internals.log.{AppendOrigin, FetchDataInfo, 
LeaderHwChange, LogAppendInfo, LogConfig, LogDirFailureChannel, 
LogOffsetMetadata, LogReadInfo, OffsetResultHolder, RecordValidationException, 
RemoteLogReadResult, RemoteStorageFetchInfo, VerificationGuard, UnifiedLog => 
JUnifiedLog}
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 
 import java.io.File
@@ -2585,7 +2585,7 @@ class ReplicaManager(val config: KafkaConfig,
 
   private def createReplicaSelector(): Option[ReplicaSelector] = {
     config.replicaSelectorClassName.map { className =>
-      val tmpReplicaSelector: ReplicaSelector = 
CoreUtils.createObject[ReplicaSelector](className)
+      val tmpReplicaSelector: ReplicaSelector = Utils.newInstance(className, 
classOf[ReplicaSelector])
       tmpReplicaSelector.configure(config.originals())
       tmpReplicaSelector
     }
diff --git a/core/src/main/scala/kafka/tools/DumpLogSegments.scala 
b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
index 68545e0271c..0791d73b6af 100755
--- a/core/src/main/scala/kafka/tools/DumpLogSegments.scala
+++ b/core/src/main/scala/kafka/tools/DumpLogSegments.scala
@@ -21,7 +21,6 @@ import com.fasterxml.jackson.databind.JsonNode
 
 import java.io._
 import com.fasterxml.jackson.databind.node.{IntNode, JsonNodeFactory, 
ObjectNode, TextNode}
-import kafka.utils.CoreUtils
 import org.apache.kafka.clients.consumer.internals.ConsumerProtocol
 import org.apache.kafka.common.message.ConsumerProtocolAssignment
 import org.apache.kafka.common.message.ConsumerProtocolAssignmentJsonConverter
@@ -646,8 +645,8 @@ object DumpLogSegments {
       } else if (options.has(shareStateOpt)) {
         new ShareGroupStateMessageParser
       } else {
-        val valueDecoder = 
CoreUtils.createObject[org.apache.kafka.tools.api.Decoder[_]](options.valueOf(valueDecoderOpt))
-        val keyDecoder = 
CoreUtils.createObject[org.apache.kafka.tools.api.Decoder[_]](options.valueOf(keyDecoderOpt))
+        val valueDecoder = Utils.newInstance(options.valueOf(valueDecoderOpt), 
classOf[Decoder[_]])
+        val keyDecoder = Utils.newInstance(options.valueOf(keyDecoderOpt), 
classOf[Decoder[_]])
         new DecoderMessageParser(keyDecoder, valueDecoder)
       }
 
diff --git a/core/src/main/scala/kafka/utils/CoreUtils.scala 
b/core/src/main/scala/kafka/utils/CoreUtils.scala
index 41af6c752fa..98a06f80f46 100755
--- a/core/src/main/scala/kafka/utils/CoreUtils.scala
+++ b/core/src/main/scala/kafka/utils/CoreUtils.scala
@@ -17,14 +17,12 @@
 
 package kafka.utils
 
-import java.io._
-import java.nio._
+import java.io.File
 import java.util.concurrent.locks.{Lock, ReadWriteLock}
-import java.lang.management._
-import java.util.{Base64, Properties, UUID}
+import java.lang.management.ManagementFactory
 import com.typesafe.scalalogging.Logger
 
-import javax.management._
+import javax.management.ObjectName
 import scala.collection._
 import scala.collection.Seq
 import org.apache.kafka.network.EndPoint
@@ -35,7 +33,6 @@ import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.network.SocketServerConfigs
 import org.slf4j.event.Level
 
-import java.util
 import scala.jdk.CollectionConverters._
 
 /**
@@ -109,15 +106,6 @@ object CoreUtils {
     }
   }
 
-  /**
-   * Create an instance of the class with the given class name
-   */
-  def createObject[T <: AnyRef](className: String, args: AnyRef*): T = {
-    val klass = Utils.loadClass(className, 
classOf[Object]).asInstanceOf[Class[T]]
-    val constructor = klass.getConstructor(args.map(_.getClass): _*)
-    constructor.newInstance(args: _*)
-  }
-
   /**
    * Execute the given function inside the lock
    */
@@ -134,16 +122,6 @@ object CoreUtils {
 
   def inWriteLock[T](lock: ReadWriteLock)(fun: => T): T = 
inLock[T](lock.writeLock)(fun)
 
-  /**
-   * Returns a list of duplicated items
-   */
-  def duplicates[T](s: Iterable[T]): Iterable[T] = {
-    s.groupBy(identity)
-      .map { case (k, l) => (k, l.size)}
-      .filter { case (_, l) => l > 1 }
-      .keys
-  }
-
   def listenerListToEndPoints(listeners: String, securityProtocolMap: 
Map[ListenerName, SecurityProtocol]): Seq[EndPoint] = {
     listenerListToEndPoints(listeners, securityProtocolMap, 
requireDistinctPorts = true)
   }
@@ -217,31 +195,4 @@ object CoreUtils {
     validate(endPoints)
     endPoints
   }
-
-  def generateUuidAsBase64(): String = {
-    val uuid = UUID.randomUUID()
-    Base64.getUrlEncoder.withoutPadding.encodeToString(getBytesFromUuid(uuid))
-  }
-
-  def getBytesFromUuid(uuid: UUID): Array[Byte] = {
-    // Extract bytes for uuid which is 128 bits (or 16 bytes) long.
-    val uuidBytes = ByteBuffer.wrap(new Array[Byte](16))
-    uuidBytes.putLong(uuid.getMostSignificantBits)
-    uuidBytes.putLong(uuid.getLeastSignificantBits)
-    uuidBytes.array
-  }
-
-  def propsWith(key: String, value: String): Properties = {
-    propsWith((key, value))
-  }
-
-  def propsWith(props: (String, String)*): Properties = {
-    val properties = new Properties()
-    props.foreach { case (k, v) => properties.put(k, v) }
-    properties
-  }
-
-  def replicaToBrokerAssignmentAsScala(map: util.Map[Integer, 
util.List[Integer]]): Map[Int, Seq[Int]] = {
-    map.asScala.map(e => (e._1.asInstanceOf[Int], 
e._2.asScala.map(_.asInstanceOf[Int])))
-  }
 }
diff --git 
a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala 
b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
index f18c4ca980d..048da3eedc8 100644
--- a/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
+++ b/core/src/test/scala/unit/kafka/server/DynamicConfigChangeTest.scala
@@ -518,9 +518,9 @@ class DynamicConfigChangeUnitTest {
   @Test
   def shouldParseRegardlessOfWhitespaceAroundValues(): Unit = {
     def parse(configHandler: TopicConfigHandler, value: String): Seq[Int] = {
-      configHandler.parseThrottledPartitions(
-        
CoreUtils.propsWith(QuotaConfig.LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG, 
value),
-        102, QuotaConfig.LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG)
+      val props = new Properties()
+      props.put(QuotaConfig.LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG, 
value)
+      configHandler.parseThrottledPartitions(props, 102, 
QuotaConfig.LEADER_REPLICATION_THROTTLED_REPLICAS_CONFIG)
     }
     val configHandler: TopicConfigHandler = new TopicConfigHandler(null, null, 
null)
     
assertEquals(ReplicationQuotaManager.ALL_REPLICAS.asScala.map(_.toInt).toSeq, 
parse(configHandler, "* "))
diff --git a/core/src/test/scala/unit/kafka/utils/CoreUtilsTest.scala 
b/core/src/test/scala/unit/kafka/utils/CoreUtilsTest.scala
index 5f703c7be33..73a2403870f 100755
--- a/core/src/test/scala/unit/kafka/utils/CoreUtilsTest.scala
+++ b/core/src/test/scala/unit/kafka/utils/CoreUtilsTest.scala
@@ -17,16 +17,12 @@
 
 package kafka.utils
 
-import java.util
-import java.util.{Base64, UUID}
 import java.util.concurrent.locks.ReentrantLock
-import java.nio.ByteBuffer
 import java.util.regex.Pattern
 import org.junit.jupiter.api.Assertions._
 import org.junit.jupiter.api.Test
 import kafka.utils.CoreUtils.inLock
 import org.apache.kafka.common.KafkaException
-import org.apache.kafka.common.utils.Utils
 import org.slf4j.event.Level
 
 
@@ -67,23 +63,6 @@ class CoreUtilsTest extends Logging {
     assertEquals(Some("test"+Level.ERROR),loggedMessage)
   }
 
-  @Test
-  def testReadBytes(): Unit = {
-    for (testCase <- List("", "a", "abcd")) {
-      val bytes = testCase.getBytes
-      assertTrue(util.Arrays.equals(bytes, 
Utils.readBytes(ByteBuffer.wrap(bytes))))
-    }
-  }
-
-  @Test
-  def testAbs(): Unit = {
-    assertEquals(0, Utils.abs(Integer.MIN_VALUE))
-    assertEquals(1, Utils.abs(-1))
-    assertEquals(0, Utils.abs(0))
-    assertEquals(1, Utils.abs(1))
-    assertEquals(Integer.MAX_VALUE, Utils.abs(Integer.MAX_VALUE))
-  }
-
   @Test
   def testInLock(): Unit = {
     val lock = new ReentrantLock()
@@ -94,29 +73,4 @@ class CoreUtilsTest extends Logging {
     assertEquals(2, result)
     assertFalse(lock.isLocked, "Should be unlocked")
   }
-
-  @Test
-  def testUrlSafeBase64EncodeUUID(): Unit = {
-
-    // Test a UUID that has no + or / characters in base64 encoding 
[a149b4a3-06e1-4b49-a8cb-8a9c4a59fa46 ->(base64)-> oUm0owbhS0moy4qcSln6Rg==]
-    val clusterId1 = 
Base64.getUrlEncoder.withoutPadding.encodeToString(CoreUtils.getBytesFromUuid(UUID.fromString(
-      "a149b4a3-06e1-4b49-a8cb-8a9c4a59fa46")))
-    assertEquals(clusterId1, "oUm0owbhS0moy4qcSln6Rg")
-    assertEquals(clusterId1.length, 22)
-    assertTrue(clusterIdPattern.matcher(clusterId1).matches())
-
-    // Test a UUID that has + or / characters in base64 encoding 
[d418ec02-277e-4853-81e6-afe30259daec ->(base64)-> 1BjsAid+SFOB5q/jAlna7A==]
-    val clusterId2 = 
Base64.getUrlEncoder.withoutPadding.encodeToString(CoreUtils.getBytesFromUuid(UUID.fromString(
-      "d418ec02-277e-4853-81e6-afe30259daec")))
-    assertEquals(clusterId2, "1BjsAid-SFOB5q_jAlna7A")
-    assertEquals(clusterId2.length, 22)
-    assertTrue(clusterIdPattern.matcher(clusterId2).matches())
-  }
-
-  @Test
-  def testGenerateUuidAsBase64(): Unit = {
-    val clusterId = CoreUtils.generateUuidAsBase64()
-    assertEquals(clusterId.length, 22)
-    assertTrue(clusterIdPattern.matcher(clusterId).matches())
-  }
 }

Reply via email to