This is an automated email from the ASF dual-hosted git repository.

divijv pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new b0fd99106d3 MINOR: Close UnifiedLog created in tests to avoid resource leak (#14453)
b0fd99106d3 is described below

commit b0fd99106d3f814026f0c6ab7a58c54d65b96a3b
Author: Gantigmaa Selenge <[email protected]>
AuthorDate: Fri Sep 29 11:00:01 2023 +0100

    MINOR: Close UnifiedLog created in tests to avoid resource leak (#14453)
    
    Reviewers: Divij Vaidya <[email protected]>, Luke Chen <[email protected]>
---
 core/src/main/scala/kafka/log/UnifiedLog.scala                 |  4 ++--
 core/src/test/scala/unit/kafka/log/LogLoaderTest.scala         | 10 ++++++++--
 core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala        |  6 +++++-
 core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala |  4 ++--
 4 files changed, 17 insertions(+), 7 deletions(-)

diff --git a/core/src/main/scala/kafka/log/UnifiedLog.scala b/core/src/main/scala/kafka/log/UnifiedLog.scala
index f5a01c15637..10164a3541f 100644
--- a/core/src/main/scala/kafka/log/UnifiedLog.scala
+++ b/core/src/main/scala/kafka/log/UnifiedLog.scala
@@ -104,7 +104,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
                  @volatile private var _topicId: Option[Uuid],
                  val keepPartitionMetadataFile: Boolean,
                  val remoteStorageSystemEnable: Boolean = false,
-                 @volatile private var logOffsetsListener: LogOffsetsListener = LogOffsetsListener.NO_OP_OFFSETS_LISTENER) extends Logging {
+                 @volatile private var logOffsetsListener: LogOffsetsListener = LogOffsetsListener.NO_OP_OFFSETS_LISTENER) extends Logging with AutoCloseable {
 
   import kafka.log.UnifiedLog._
 
@@ -643,7 +643,7 @@ class UnifiedLog(@volatile var logStartOffset: Long,
    * Close this log.
   * The memory mapped buffer for index files of this log will be left open until the log is deleted.
    */
-  def close(): Unit = {
+  override def close(): Unit = {
     debug("Closing log")
     lock synchronized {
       logOffsetsListener = LogOffsetsListener.NO_OP_OFFSETS_LISTENER
diff --git a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
index c2aa991e38e..577e4a6f735 100644
--- a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
@@ -58,6 +58,7 @@ class LogLoaderTest {
   val producerIdExpirationCheckIntervalMs: Int = kafka.server.Defaults.ProducerIdExpirationCheckIntervalMs
   val tmpDir = TestUtils.tempDir()
   val logDir = TestUtils.randomPartitionLogDir(tmpDir)
+  var logsToClose: Seq[UnifiedLog] = Seq()
   val mockTime = new MockTime()
 
   @BeforeEach
@@ -69,6 +70,7 @@ class LogLoaderTest {
   @AfterEach
   def tearDown(): Unit = {
     brokerTopicStats.close()
+    logsToClose.foreach(l => Utils.closeQuietly(l, "UnifiedLog"))
     Utils.delete(tmpDir)
   }
 
@@ -257,8 +259,10 @@ class LogLoaderTest {
                         maxProducerIdExpirationMs: Int = producerStateManagerConfig.producerIdExpirationMs,
                         producerIdExpirationCheckIntervalMs: Int = producerIdExpirationCheckIntervalMs,
                         lastShutdownClean: Boolean = true): UnifiedLog = {
-    LogTestUtils.createLog(dir, config, brokerTopicStats, scheduler, time, logStartOffset, recoveryPoint,
+    val log = LogTestUtils.createLog(dir, config, brokerTopicStats, scheduler, time, logStartOffset, recoveryPoint,
       maxTransactionTimeoutMs, new ProducerStateManagerConfig(maxProducerIdExpirationMs, false), producerIdExpirationCheckIntervalMs, lastShutdownClean)
+    logsToClose = logsToClose :+ log
+    log
   }
 
   private def createLogWithOffsetOverflow(logConfig: LogConfig): (UnifiedLog, LogSegment) = {
@@ -274,7 +278,9 @@ class LogLoaderTest {
 
   private def recoverAndCheck(config: LogConfig, expectedKeys: Iterable[Long]): UnifiedLog = {
     // method is called only in case of recovery from hard reset
-    LogTestUtils.recoverAndCheck(logDir, config, expectedKeys, brokerTopicStats, mockTime, mockTime.scheduler)
+    val recoveredLog = LogTestUtils.recoverAndCheck(logDir, config, expectedKeys, brokerTopicStats, mockTime, mockTime.scheduler)
+    logsToClose = logsToClose :+ recoveredLog
+    recoveredLog
   }
 
   /**
diff --git a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
index 90d911e0adf..6cd1a5f3e19 100755
--- a/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/UnifiedLogTest.scala
@@ -59,6 +59,7 @@ class UnifiedLogTest {
   val tmpDir = TestUtils.tempDir()
   val logDir = TestUtils.randomPartitionLogDir(tmpDir)
   val mockTime = new MockTime()
+  var logsToClose: Seq[UnifiedLog] = Seq()
   val producerStateManagerConfig = new ProducerStateManagerConfig(kafka.server.Defaults.ProducerIdExpirationMs, false)
   def metricsKeySet = KafkaYammerMetrics.defaultRegistry.allMetrics.keySet.asScala
 
@@ -71,6 +72,7 @@ class UnifiedLogTest {
   @AfterEach
   def tearDown(): Unit = {
     brokerTopicStats.close()
+    logsToClose.foreach(l => Utils.closeQuietly(l, "UnifiedLog"))
     Utils.delete(tmpDir)
   }
 
@@ -3954,10 +3956,12 @@ class UnifiedLogTest {
                         remoteStorageSystemEnable: Boolean = false,
                         remoteLogManager: Option[RemoteLogManager] = None,
                         logOffsetsListener: LogOffsetsListener = LogOffsetsListener.NO_OP_OFFSETS_LISTENER): UnifiedLog = {
-    LogTestUtils.createLog(dir, config, brokerTopicStats, scheduler, time, logStartOffset, recoveryPoint,
+    val log = LogTestUtils.createLog(dir, config, brokerTopicStats, scheduler, time, logStartOffset, recoveryPoint,
       maxTransactionTimeoutMs, producerStateManagerConfig, producerIdExpirationCheckIntervalMs,
       lastShutdownClean, topicId, keepPartitionMetadataFile, new ConcurrentHashMap[String, Int],
       remoteStorageSystemEnable, remoteLogManager, logOffsetsListener)
+    logsToClose = logsToClose :+ log
+    log
   }
 
   private def createLogWithOffsetOverflow(logConfig: LogConfig): (UnifiedLog, LogSegment) = {
diff --git a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
index 7cb26277e93..d2c8e663764 100644
--- a/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
+++ b/core/src/test/scala/unit/kafka/tools/DumpLogSegmentsTest.scala
@@ -105,7 +105,7 @@ class DumpLogSegmentsTest {
 
   @AfterEach
   def tearDown(): Unit = {
-    log.close()
+    Utils.closeQuietly(log, "UnifiedLog")
     Utils.delete(tmpDir)
   }
 
@@ -236,7 +236,7 @@ class DumpLogSegmentsTest {
   def testDumpMetadataRecords(): Unit = {
     val mockTime = new MockTime
     val logConfig = LogTestUtils.createLogConfig(segmentBytes = 1024 * 1024)
-    val log = LogTestUtils.createLog(logDir, logConfig, new BrokerTopicStats, mockTime.scheduler, mockTime)
+    log = LogTestUtils.createLog(logDir, logConfig, new BrokerTopicStats, mockTime.scheduler, mockTime)
 
     val metadataRecords = Seq(
       new ApiMessageAndVersion(

Reply via email to