This is an automated email from the ASF dual-hosted git repository.

mimaison pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 6eb44ad8699 KAFKA-14485: Move LogCleaner exceptions to storage module (#18534)
6eb44ad8699 is described below

commit 6eb44ad86994b92edd3c0d807830b79807adda59
Author: Mickael Maison <mimai...@users.noreply.github.com>
AuthorDate: Thu Jan 16 17:26:29 2025 +0100

    KAFKA-14485: Move LogCleaner exceptions to storage module (#18534)
    
    
    Reviewers: Luke Chen <show...@gmail.com>, Ken Huang <s7133...@gmail.com>
---
 core/src/main/scala/kafka/log/LogCleaner.scala              |  3 +--
 core/src/main/scala/kafka/log/LogCleanerManager.scala       |  3 +--
 core/src/test/scala/unit/kafka/log/LogCleanerTest.scala     | 13 ++++++-------
 .../storage/internals/log/LogCleaningAbortedException.java  | 11 +++++------
 .../storage/internals/log/ThreadShutdownException.java      | 11 +++++------
 5 files changed, 18 insertions(+), 23 deletions(-)

diff --git a/core/src/main/scala/kafka/log/LogCleaner.scala b/core/src/main/scala/kafka/log/LogCleaner.scala
index 43193016fd0..aac22865c98 100644
--- a/core/src/main/scala/kafka/log/LogCleaner.scala
+++ b/core/src/main/scala/kafka/log/LogCleaner.scala
@@ -21,7 +21,6 @@ import java.io.{File, IOException}
 import java.nio._
 import java.util.Date
 import java.util.concurrent.TimeUnit
-import kafka.common._
 import kafka.log.LogCleaner.{CleanerRecopyPercentMetricName, DeadThreadCountMetricName, MaxBufferUtilizationPercentMetricName, MaxCleanTimeMetricName, MaxCompactionDelayMetricsName}
 import kafka.server.{BrokerReconfigurable, KafkaConfig}
 import kafka.utils.{Logging, Pool}
@@ -35,7 +34,7 @@ import org.apache.kafka.common.utils.{BufferSupplier, Time}
 import org.apache.kafka.server.config.ServerConfigs
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
 import org.apache.kafka.server.util.ShutdownableThread
-import org.apache.kafka.storage.internals.log.{AbortedTxn, CleanerConfig, LastRecord, LogDirFailureChannel, LogSegment, LogSegmentOffsetOverflowException, OffsetMap, SkimpyOffsetMap, TransactionIndex}
+import org.apache.kafka.storage.internals.log.{AbortedTxn, CleanerConfig, LastRecord, LogCleaningAbortedException, LogDirFailureChannel, LogSegment, LogSegmentOffsetOverflowException, OffsetMap, SkimpyOffsetMap, ThreadShutdownException, TransactionIndex}
 import org.apache.kafka.storage.internals.utils.Throttler
 
 import scala.jdk.CollectionConverters._
diff --git a/core/src/main/scala/kafka/log/LogCleanerManager.scala b/core/src/main/scala/kafka/log/LogCleanerManager.scala
index 7238eacad9e..3e126e45ffe 100755
--- a/core/src/main/scala/kafka/log/LogCleanerManager.scala
+++ b/core/src/main/scala/kafka/log/LogCleanerManager.scala
@@ -21,14 +21,13 @@ import java.lang.{Long => JLong}
 import java.io.File
 import java.util.concurrent.TimeUnit
 import java.util.concurrent.locks.ReentrantLock
-import kafka.common.LogCleaningAbortedException
 import kafka.utils.CoreUtils._
 import kafka.utils.{Logging, Pool}
 import org.apache.kafka.common.{KafkaException, TopicPartition}
 import org.apache.kafka.common.errors.KafkaStorageException
 import org.apache.kafka.common.utils.Time
 import org.apache.kafka.storage.internals.checkpoint.OffsetCheckpointFile
-import org.apache.kafka.storage.internals.log.LogDirFailureChannel
+import org.apache.kafka.storage.internals.log.{LogCleaningAbortedException, LogDirFailureChannel}
 import org.apache.kafka.server.metrics.KafkaMetricsGroup
 
 import java.util.Comparator
diff --git a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
index 8e3a69dde1e..f7746c7a69b 100644
--- a/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogCleanerTest.scala
@@ -17,7 +17,6 @@
 
 package kafka.log
 
-import kafka.common._
 import kafka.server.KafkaConfig
 import kafka.utils.{CoreUtils, Logging, Pool, TestUtils}
 import org.apache.kafka.common.TopicPartition
@@ -29,7 +28,7 @@ import org.apache.kafka.common.utils.Utils
 import org.apache.kafka.coordinator.transaction.TransactionLogConfig
 import org.apache.kafka.server.metrics.{KafkaMetricsGroup, KafkaYammerMetrics}
 import org.apache.kafka.server.util.MockTime
-import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, CleanerConfig, LocalLog, LogAppendInfo, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, LogSegment, LogSegments, LogStartOffsetIncrementReason, OffsetMap, ProducerStateManager, ProducerStateManagerConfig}
+import org.apache.kafka.storage.internals.log.{AbortedTxn, AppendOrigin, CleanerConfig, LocalLog, LogAppendInfo, LogCleaningAbortedException, LogConfig, LogDirFailureChannel, LogFileUtils, LogLoader, LogSegment, LogSegments, LogStartOffsetIncrementReason, OffsetMap, ProducerStateManager, ProducerStateManagerConfig}
 import org.apache.kafka.storage.internals.utils.Throttler
 import org.apache.kafka.storage.log.metrics.BrokerTopicStats
 import org.junit.jupiter.api.Assertions._
@@ -1217,7 +1216,7 @@ class LogCleanerTest extends Logging {
 
     def distinctValuesBySegment = log.logSegments.asScala.map(s => s.log.records.asScala.map(record => TestUtils.readString(record.value)).toSet.size).toSeq
 
-    val disctinctValuesBySegmentBeforeClean = distinctValuesBySegment
+    val distinctValuesBySegmentBeforeClean = distinctValuesBySegment
     assertTrue(distinctValuesBySegment.reverse.tail.forall(_ > N),
       "Test is not effective unless each segment contains duplicates. Increase segment size or decrease number of keys.")
 
@@ -1225,10 +1224,10 @@ class LogCleanerTest extends Logging {
 
     val distinctValuesBySegmentAfterClean = distinctValuesBySegment
 
-    assertTrue(disctinctValuesBySegmentBeforeClean.zip(distinctValuesBySegmentAfterClean)
+    assertTrue(distinctValuesBySegmentBeforeClean.zip(distinctValuesBySegmentAfterClean)
       .take(numCleanableSegments).forall { case (before, after) => after < before },
       "The cleanable segments should have fewer number of values after cleaning")
-    assertTrue(disctinctValuesBySegmentBeforeClean.zip(distinctValuesBySegmentAfterClean)
+    assertTrue(distinctValuesBySegmentBeforeClean.zip(distinctValuesBySegmentAfterClean)
       .slice(numCleanableSegments, numTotalSegments).forall { x => x._1 == x._2 }, "The uncleanable segments should have the same number of values after cleaning")
   }
 
@@ -1240,9 +1239,9 @@ class LogCleanerTest extends Logging {
     val log = makeLog(config = LogConfig.fromProps(logConfig.originals, logProps))
 
     // create 6 segments with only one message in each segment
-    def createRecorcs = TestUtils.singletonRecords(value = Array.fill[Byte](25)(0), key = 1.toString.getBytes)
+    def createRecords = TestUtils.singletonRecords(value = Array.fill[Byte](25)(0), key = 1.toString.getBytes)
     for (_ <- 0 until 6)
-      log.appendAsLeader(createRecorcs, leaderEpoch = 0)
+      log.appendAsLeader(createRecords, leaderEpoch = 0)
 
     val logToClean = LogToClean(new TopicPartition("test", 0), log, log.activeSegment.baseOffset, log.activeSegment.baseOffset)
 
diff --git a/core/src/main/scala/kafka/common/LogCleaningAbortedException.scala b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogCleaningAbortedException.java
similarity index 75%
rename from core/src/main/scala/kafka/common/LogCleaningAbortedException.scala
rename to storage/src/main/java/org/apache/kafka/storage/internals/log/LogCleaningAbortedException.java
index dfded33f009..1c1f8d90752 100644
--- a/core/src/main/scala/kafka/common/LogCleaningAbortedException.scala
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/LogCleaningAbortedException.java
@@ -1,10 +1,10 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
  *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
@@ -14,11 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-package kafka.common
+package org.apache.kafka.storage.internals.log;
 
 /**
  * Thrown when a log cleaning task is requested to be aborted.
  */
-class LogCleaningAbortedException extends RuntimeException() {
+public class LogCleaningAbortedException extends RuntimeException {
 }
diff --git a/core/src/main/scala/kafka/common/ThreadShutdownException.scala b/storage/src/main/java/org/apache/kafka/storage/internals/log/ThreadShutdownException.java
similarity index 75%
rename from core/src/main/scala/kafka/common/ThreadShutdownException.scala
rename to storage/src/main/java/org/apache/kafka/storage/internals/log/ThreadShutdownException.java
index 8cd6601ce5a..02c7167487a 100644
--- a/core/src/main/scala/kafka/common/ThreadShutdownException.scala
+++ b/storage/src/main/java/org/apache/kafka/storage/internals/log/ThreadShutdownException.java
@@ -1,10 +1,10 @@
-/**
+/*
  * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
+ * contributor license agreements. See the NOTICE file distributed with
  * this work for additional information regarding copyright ownership.
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
+ * the License. You may obtain a copy of the License at
  *
  *    http://www.apache.org/licenses/LICENSE-2.0
  *
@@ -14,11 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
-package kafka.common
+package org.apache.kafka.storage.internals.log;
 
 /**
  * An exception that indicates a thread is being shut down normally.
  */
-class ThreadShutdownException extends RuntimeException {
+public class ThreadShutdownException extends RuntimeException {
 }

Reply via email to