This is an automated email from the ASF dual-hosted git repository.

chia7712 pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new ced79ee12f2 KAFKA-16552 Create an internal config to control InitialTaskDelayMs in LogManager to speed up tests (#15719)
ced79ee12f2 is described below

commit ced79ee12f295ebe30a4dc9a90228a18c9fc94df
Author: Kuan-Po (Cooper) Tseng <brandb...@gmail.com>
AuthorDate: Sat Apr 20 20:34:02 2024 +0800

    KAFKA-16552 Create an internal config to control InitialTaskDelayMs in LogManager to speed up tests (#15719)
    
    Reviewers: Luke Chen <show...@gmail.com>, Chia-Ping Tsai <chia7...@gmail.com>
---
 .../java/kafka/server/builders/LogManagerBuilder.java  | 10 +++++++++-
 core/src/main/scala/kafka/log/LogManager.scala         | 18 +++++++++---------
 core/src/main/scala/kafka/server/KafkaConfig.scala     |  2 ++
 core/src/test/scala/unit/kafka/log/LogLoaderTest.scala |  3 ++-
 .../src/test/scala/unit/kafka/log/LogManagerTest.scala | 13 ++++++++-----
 core/src/test/scala/unit/kafka/utils/TestUtils.scala   |  6 ++++--
 .../org/apache/kafka/jmh/server/CheckpointBench.java   |  3 ++-
 .../apache/kafka/server/config/ServerLogConfigs.java   |  5 +++++
 8 files changed, 41 insertions(+), 19 deletions(-)
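
In short: the hard-coded 30-second InitialTaskDelayMs in LogManager becomes the
internal broker property log.initial.task.delay.ms, so tests can start the
retention, flush, checkpoint, and delete-logs tasks almost immediately. As a
minimal sketch (hypothetical test scaffolding; the other mandatory broker
properties are elided), a test could override it like this:

    // Sketch only: assumes the usual KafkaConfig-from-Properties path used in
    // Kafka tests; listeners, log.dirs and other required settings are elided.
    Properties props = new Properties();
    // internal key added by this commit; see ServerLogConfigs below
    props.put(ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_CONFIG, "100");
    KafkaConfig config = KafkaConfig.fromProps(props);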

diff --git a/core/src/main/java/kafka/server/builders/LogManagerBuilder.java b/core/src/main/java/kafka/server/builders/LogManagerBuilder.java
index fdc9c80913c..629293e3653 100644
--- a/core/src/main/java/kafka/server/builders/LogManagerBuilder.java
+++ b/core/src/main/java/kafka/server/builders/LogManagerBuilder.java
@@ -22,6 +22,7 @@ import kafka.server.BrokerTopicStats;
 import kafka.server.metadata.ConfigRepository;
 import org.apache.kafka.common.utils.Time;
 import org.apache.kafka.server.common.MetadataVersion;
+import org.apache.kafka.server.config.ServerLogConfigs;
 import org.apache.kafka.storage.internals.log.CleanerConfig;
 import org.apache.kafka.storage.internals.log.LogConfig;
 import org.apache.kafka.storage.internals.log.LogDirFailureChannel;
@@ -55,6 +56,7 @@ public class LogManagerBuilder {
     private Time time = Time.SYSTEM;
     private boolean keepPartitionMetadataFile = true;
     private boolean remoteStorageSystemEnable = false;
+    private long initialTaskDelayMs = ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_DEFAULT;
 
     public LogManagerBuilder setLogDirs(List<File> logDirs) {
         this.logDirs = logDirs;
@@ -151,6 +153,11 @@ public class LogManagerBuilder {
         return this;
     }
 
+    public LogManagerBuilder setInitialTaskDelayMs(long initialTaskDelayMs) {
+        this.initialTaskDelayMs = initialTaskDelayMs;
+        return this;
+    }
+
     public LogManager build() {
         if (logDirs == null) throw new RuntimeException("you must set logDirs");
         if (configRepository == null) throw new RuntimeException("you must set configRepository");
@@ -179,6 +186,7 @@ public class LogManagerBuilder {
                               logDirFailureChannel,
                               time,
                               keepPartitionMetadataFile,
-                              remoteStorageSystemEnable);
+                              remoteStorageSystemEnable,
+                              initialTaskDelayMs);
     }
 }
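
For code that constructs a LogManager directly, the builder gains a matching
setter that defaults to the production value. A hedged sketch (not runnable
as-is: the other mandatory setters, enforced by the null checks in build()
above, are elided):

    // Sketch: build() throws unless logDirs, configRepository and the other
    // required fields are set first (see the RuntimeExceptions above).
    LogManager logManager = new LogManagerBuilder()
        .setLogDirs(logDirs)            // plus the other mandatory setters
        .setInitialTaskDelayMs(0L)      // new: fire the startup tasks at once
        .build();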
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index 3519f72448d..05796e4cbae 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -81,14 +81,13 @@ class LogManager(logDirs: Seq[File],
                  logDirFailureChannel: LogDirFailureChannel,
                  time: Time,
                  val keepPartitionMetadataFile: Boolean,
-                 remoteStorageSystemEnable: Boolean) extends Logging {
+                 remoteStorageSystemEnable: Boolean,
+                 val initialTaskDelayMs: Long) extends Logging {
 
   import LogManager._
 
   private val metricsGroup = new KafkaMetricsGroup(this.getClass)
 
-  val InitialTaskDelayMs: Int = 30 * 1000
-
   private val logCreationOrDeletionLock = new Object
   private val currentLogs = new Pool[TopicPartition, UnifiedLog]()
   // Future logs are put in the directory with "-future" suffix. Future log is created when user wants to move replica
@@ -628,24 +627,24 @@ class LogManager(logDirs: Seq[File],
       info("Starting log cleanup with a period of %d 
ms.".format(retentionCheckMs))
       scheduler.schedule("kafka-log-retention",
                          () => cleanupLogs(),
-                         InitialTaskDelayMs,
+                         initialTaskDelayMs,
                          retentionCheckMs)
       info("Starting log flusher with a default period of %d 
ms.".format(flushCheckMs))
       scheduler.schedule("kafka-log-flusher",
                          () => flushDirtyLogs(),
-                         InitialTaskDelayMs,
+                         initialTaskDelayMs,
                          flushCheckMs)
       scheduler.schedule("kafka-recovery-point-checkpoint",
                          () => checkpointLogRecoveryOffsets(),
-                         InitialTaskDelayMs,
+                         initialTaskDelayMs,
                          flushRecoveryOffsetCheckpointMs)
       scheduler.schedule("kafka-log-start-offset-checkpoint",
                          () => checkpointLogStartOffsets(),
-                         InitialTaskDelayMs,
+                         initialTaskDelayMs,
                          flushStartOffsetCheckpointMs)
       scheduler.scheduleOnce("kafka-delete-logs", // will be rescheduled after each delete logs with a dynamic period
                          () => deleteLogs(),
-                         InitialTaskDelayMs)
+                         initialTaskDelayMs)
     }
     if (cleanerConfig.enableCleaner) {
       _cleaner = new LogCleaner(cleanerConfig, liveLogDirs, currentLogs, logDirFailureChannel, time = time)
@@ -1584,7 +1583,8 @@ object LogManager {
       time = time,
       keepPartitionMetadataFile = keepPartitionMetadataFile,
       interBrokerProtocolVersion = config.interBrokerProtocolVersion,
-      remoteStorageSystemEnable = config.remoteLogManagerConfig.enableRemoteStorageSystem())
+      remoteStorageSystemEnable = config.remoteLogManagerConfig.enableRemoteStorageSystem(),
+      initialTaskDelayMs = config.logInitialTaskDelayMs)
   }
 
   /**
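
All five startup tasks above now take the configurable delay in place of the
removed InitialTaskDelayMs constant. Roughly, the scheduling shape is (a
sketch assuming org.apache.kafka.server.util.KafkaScheduler's
schedule(name, task, delayMs, periodMs); the values are illustrative):

    // Sketch: a periodic task whose first run is deferred by the new config.
    KafkaScheduler scheduler = new KafkaScheduler(1);
    scheduler.startup();
    long initialTaskDelayMs = 100L; // 30_000 in production, smaller in tests
    scheduler.schedule("kafka-log-retention", () -> { /* cleanupLogs() */ },
            initialTaskDelayMs, 300_000L);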
diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala
index 6c23eb667cf..9cee2aa61a9 100755
--- a/core/src/main/scala/kafka/server/KafkaConfig.scala
+++ b/core/src/main/scala/kafka/server/KafkaConfig.scala
@@ -573,6 +573,7 @@ object KafkaConfig {
       .define(ServerLogConfigs.CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG, CLASS, null, LOW, ServerLogConfigs.CREATE_TOPIC_POLICY_CLASS_NAME_DOC)
       .define(ServerLogConfigs.ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG, CLASS, null, LOW, ServerLogConfigs.ALTER_CONFIG_POLICY_CLASS_NAME_DOC)
       .define(ServerLogConfigs.LOG_MESSAGE_DOWNCONVERSION_ENABLE_CONFIG, BOOLEAN, ServerLogConfigs.LOG_MESSAGE_DOWNCONVERSION_ENABLE_DEFAULT, LOW, ServerLogConfigs.LOG_MESSAGE_DOWNCONVERSION_ENABLE_DOC)
+      .defineInternal(ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_CONFIG, LONG, ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_DEFAULT, atLeast(0), LOW, ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_DOC)
 
       /** ********* Replication configuration ***********/
       .define(ReplicationConfigs.CONTROLLER_SOCKET_TIMEOUT_MS_CONFIG, INT, ReplicationConfigs.CONTROLLER_SOCKET_TIMEOUT_MS_DEFAULT, MEDIUM, ReplicationConfigs.CONTROLLER_SOCKET_TIMEOUT_MS_DOC)
@@ -1150,6 +1151,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami
   def logFlushIntervalMs: java.lang.Long = Option(getLong(ServerLogConfigs.LOG_FLUSH_INTERVAL_MS_CONFIG)).getOrElse(getLong(ServerLogConfigs.LOG_FLUSH_SCHEDULER_INTERVAL_MS_CONFIG))
   def minInSyncReplicas = getInt(ServerLogConfigs.MIN_IN_SYNC_REPLICAS_CONFIG)
   def logPreAllocateEnable: java.lang.Boolean = getBoolean(ServerLogConfigs.LOG_PRE_ALLOCATE_CONFIG)
+  def logInitialTaskDelayMs: java.lang.Long = Option(getLong(ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_CONFIG)).getOrElse(ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_DEFAULT)
 
   // We keep the user-provided String as `MetadataVersion.fromVersionString` can choose a slightly different version (eg if `0.10.0`
   // is passed, `0.10.0-IV0` may be picked)
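
Note that defineInternal registers the key without publishing it in the
generated broker documentation, matching the "testing only" intent, and that
getLong already returns the registered default when the key is unset, so the
Option(...).getOrElse above is a second safety net. In Java terms, roughly:

    // Sketch of the accessor's fallback semantics (config is a built KafkaConfig):
    Long raw = config.getLong(ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_CONFIG);
    long delayMs = raw != null ? raw : ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_DEFAULT;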
diff --git a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
index f26924aca19..02dbf35e440 100644
--- a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala
@@ -127,7 +127,8 @@ class LogLoaderTest {
         logDirFailureChannel = logDirFailureChannel,
         time = time,
         keepPartitionMetadataFile = config.usesTopicId,
-        remoteStorageSystemEnable = config.remoteLogManagerConfig.enableRemoteStorageSystem()) {
+        remoteStorageSystemEnable = config.remoteLogManagerConfig.enableRemoteStorageSystem(),
+        initialTaskDelayMs = config.logInitialTaskDelayMs) {
 
         override def loadLog(logDir: File, hadCleanShutdown: Boolean, recoveryPoints: Map[TopicPartition, Long],
                              logStartOffsets: Map[TopicPartition, Long], defaultConfig: LogConfig,
diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
index 9992992670a..692e64647a7 100755
--- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala
@@ -69,12 +69,14 @@ class LogManagerTest {
   var logManager: LogManager = _
   val name = "kafka"
   val veryLargeLogFlushInterval = 10000000L
+  val initialTaskDelayMs: Long = 10 * 1000
 
   @BeforeEach
   def setUp(): Unit = {
     logDir = TestUtils.tempDir()
     logManager = createLogManager()
     logManager.startup(Set.empty)
+    assertEquals(initialTaskDelayMs, logManager.initialTaskDelayMs)
   }
 
   @AfterEach
@@ -413,7 +415,7 @@ class LogManagerTest {
     assertEquals(numMessages * setSize / segmentBytes, log.numberOfSegments, "Check we have the expected number of segments.")
 
     // this cleanup shouldn't find any expired segments but should delete some to reduce size
-    time.sleep(logManager.InitialTaskDelayMs)
+    time.sleep(logManager.initialTaskDelayMs)
     assertEquals(6, log.numberOfSegments, "Now there should be exactly 6 segments")
     time.sleep(log.config.fileDeleteDelayMs + 1)
 
@@ -482,7 +484,7 @@ class LogManagerTest {
       val set = TestUtils.singletonRecords("test".getBytes())
       log.appendAsLeader(set, leaderEpoch = 0)
     }
-    time.sleep(logManager.InitialTaskDelayMs)
+    time.sleep(logManager.initialTaskDelayMs)
     assertTrue(lastFlush != log.lastFlushTime, "Time based flush should have been triggered")
   }
 
@@ -604,7 +606,8 @@ class LogManagerTest {
       configRepository = configRepository,
       logDirs = logDirs,
       time = this.time,
-      recoveryThreadsPerDataDir = recoveryThreadsPerDataDir)
+      recoveryThreadsPerDataDir = recoveryThreadsPerDataDir,
+      initialTaskDelayMs = initialTaskDelayMs)
   }
 
   @Test
@@ -637,9 +640,9 @@ class LogManagerTest {
         fileInIndex.get.getAbsolutePath)
     }
 
-    time.sleep(logManager.InitialTaskDelayMs)
+    time.sleep(logManager.initialTaskDelayMs)
     assertTrue(logManager.hasLogsToBeDeleted, "Logs deleted too early")
-    time.sleep(logManager.currentDefaultConfig.fileDeleteDelayMs - logManager.InitialTaskDelayMs)
+    time.sleep(logManager.currentDefaultConfig.fileDeleteDelayMs - logManager.initialTaskDelayMs)
     assertFalse(logManager.hasLogsToBeDeleted, "Logs not deleted")
   }
 
diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
index d9383820fca..ff2ed19c32c 100755
--- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala
+++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala
@@ -1506,7 +1506,8 @@ object TestUtils extends Logging {
                        recoveryThreadsPerDataDir: Int = 4,
                        transactionVerificationEnabled: Boolean = false,
                        log: Option[UnifiedLog] = None,
-                       remoteStorageSystemEnable: Boolean = false): LogManager = {
+                       remoteStorageSystemEnable: Boolean = false,
+                       initialTaskDelayMs: Long = ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_DEFAULT): LogManager = {
     val logManager = new LogManager(logDirs = logDirs.map(_.getAbsoluteFile),
                    initialOfflineDirs = Array.empty[File],
                    configRepository = configRepository,
@@ -1526,7 +1527,8 @@ object TestUtils extends Logging {
                    logDirFailureChannel = new LogDirFailureChannel(logDirs.size),
                    keepPartitionMetadataFile = true,
                    interBrokerProtocolVersion = interBrokerProtocolVersion,
-                   remoteStorageSystemEnable = remoteStorageSystemEnable)
+                   remoteStorageSystemEnable = remoteStorageSystemEnable,
+                   initialTaskDelayMs = initialTaskDelayMs)
 
     if (log.isDefined) {
       val spyLogManager = Mockito.spy(logManager)
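
These helpers run against a mock clock, so even the shortened delay is crossed
virtually. A rough sketch, assuming the MockTime/MockScheduler pairing used in
Kafka's unit tests (where sleep advances the clock and runs due tasks rather
than blocking):

    // Sketch: virtual sleep -- advances the mock clock immediately, letting
    // scheduled tasks whose delay has elapsed fire without a real wait.
    MockTime time = new MockTime();
    time.sleep(10 * 1000L); // the initialTaskDelayMs pinned in LogManagerTest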
diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
index bb98f8c5be3..fb76bab02cf 100644
--- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
+++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java
@@ -22,6 +22,7 @@ import kafka.server.AlterPartitionManager;
 import kafka.server.BrokerFeatures;
 import kafka.server.BrokerTopicStats;
 import kafka.server.KafkaConfig;
+import org.apache.kafka.server.config.ServerLogConfigs;
 import org.apache.kafka.server.util.MockTime;
 import org.apache.kafka.storage.internals.log.CleanerConfig;
 import org.apache.kafka.storage.internals.log.LogConfig;
@@ -109,7 +110,7 @@ public class CheckpointBench {
                 JavaConverters.seqAsJavaList(brokerProperties.logDirs()).stream().map(File::new).collect(Collectors.toList());
         this.logManager = TestUtils.createLogManager(JavaConverters.asScalaBuffer(files),
                 new LogConfig(new Properties()), new MockConfigRepository(), new CleanerConfig(1, 4 * 1024 * 1024L, 0.9d,
-                        1024 * 1024, 32 * 1024 * 1024, Double.MAX_VALUE, 15 * 1000, true), time, MetadataVersion.latestTesting(), 4, false, Option.empty(), false);
+                        1024 * 1024, 32 * 1024 * 1024, Double.MAX_VALUE, 15 * 1000, true), time, MetadataVersion.latestTesting(), 4, false, Option.empty(), false, ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_DEFAULT);
         scheduler.startup();
         final BrokerTopicStats brokerTopicStats = new BrokerTopicStats(Optional.empty());
         final MetadataCache metadataCache =
diff --git a/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java b/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java
index dcd5e8ea2dd..4c682c8c1f8 100644
--- a/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java
+++ b/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java
@@ -185,4 +185,9 @@ public class ServerLogConfigs {
             "broker will not perform down-conversion for consumers expecting 
an older message format. The broker responds " +
             "with <code>UNSUPPORTED_VERSION</code> error for consume requests 
from such older clients. This configuration" +
             "does not apply to any message format conversion that might be 
required for replication to followers.";
+
+    public static final String LOG_INITIAL_TASK_DELAY_MS_CONFIG = LOG_PREFIX + "initial.task.delay.ms";
+    public static final long LOG_INITIAL_TASK_DELAY_MS_DEFAULT = 30 * 1000L;
+    public static final String LOG_INITIAL_TASK_DELAY_MS_DOC = "The initial task delay in millisecond when initializing " +
+            "tasks in LogManager. This should be used for testing only.";
 }
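
Putting the new constants together: assuming LOG_PREFIX is "log." (as for the
other keys in this class), the internal key and its default resolve to:

    // Sketch: what the constants evaluate to, assuming LOG_PREFIX == "log."
    String key = ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_CONFIG;   // "log.initial.task.delay.ms"
    long defMs = ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_DEFAULT;  // 30000, the old hard-coded 30 s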
