This is an automated email from the ASF dual-hosted git repository. chia7712 pushed a commit to branch trunk in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push: new ced79ee12f2 KAFKA-16552 Create an internal config to control InitialTaskDelayMs in LogManager to speed up tests (#15719) ced79ee12f2 is described below commit ced79ee12f295ebe30a4dc9a90228a18c9fc94df Author: Kuan-Po (Cooper) Tseng <brandb...@gmail.com> AuthorDate: Sat Apr 20 20:34:02 2024 +0800 KAFKA-16552 Create an internal config to control InitialTaskDelayMs in LogManager to speed up tests (#15719) Reviewers: Luke Chen <show...@gmail.com>, Chia-Ping Tsai <chia7...@gmail.com> --- .../java/kafka/server/builders/LogManagerBuilder.java | 10 +++++++++- core/src/main/scala/kafka/log/LogManager.scala | 18 +++++++++--------- core/src/main/scala/kafka/server/KafkaConfig.scala | 2 ++ core/src/test/scala/unit/kafka/log/LogLoaderTest.scala | 3 ++- .../src/test/scala/unit/kafka/log/LogManagerTest.scala | 13 ++++++++----- core/src/test/scala/unit/kafka/utils/TestUtils.scala | 6 ++++-- .../org/apache/kafka/jmh/server/CheckpointBench.java | 3 ++- .../apache/kafka/server/config/ServerLogConfigs.java | 5 +++++ 8 files changed, 41 insertions(+), 19 deletions(-) diff --git a/core/src/main/java/kafka/server/builders/LogManagerBuilder.java b/core/src/main/java/kafka/server/builders/LogManagerBuilder.java index fdc9c80913c..629293e3653 100644 --- a/core/src/main/java/kafka/server/builders/LogManagerBuilder.java +++ b/core/src/main/java/kafka/server/builders/LogManagerBuilder.java @@ -22,6 +22,7 @@ import kafka.server.BrokerTopicStats; import kafka.server.metadata.ConfigRepository; import org.apache.kafka.common.utils.Time; import org.apache.kafka.server.common.MetadataVersion; +import org.apache.kafka.server.config.ServerLogConfigs; import org.apache.kafka.storage.internals.log.CleanerConfig; import org.apache.kafka.storage.internals.log.LogConfig; import org.apache.kafka.storage.internals.log.LogDirFailureChannel; @@ -55,6 +56,7 @@ public class LogManagerBuilder { private Time time = Time.SYSTEM; private boolean keepPartitionMetadataFile = true; private boolean remoteStorageSystemEnable = false; + private long initialTaskDelayMs = ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_DEFAULT; public LogManagerBuilder setLogDirs(List<File> logDirs) { this.logDirs = logDirs; @@ -151,6 +153,11 @@ public class LogManagerBuilder { return this; } + public LogManagerBuilder setInitialTaskDelayMs(long initialTaskDelayMs) { + this.initialTaskDelayMs = initialTaskDelayMs; + return this; + } + public LogManager build() { if (logDirs == null) throw new RuntimeException("you must set logDirs"); if (configRepository == null) throw new RuntimeException("you must set configRepository"); @@ -179,6 +186,7 @@ public class LogManagerBuilder { logDirFailureChannel, time, keepPartitionMetadataFile, - remoteStorageSystemEnable); + remoteStorageSystemEnable, + initialTaskDelayMs); } } diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index 3519f72448d..05796e4cbae 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -81,14 +81,13 @@ class LogManager(logDirs: Seq[File], logDirFailureChannel: LogDirFailureChannel, time: Time, val keepPartitionMetadataFile: Boolean, - remoteStorageSystemEnable: Boolean) extends Logging { + remoteStorageSystemEnable: Boolean, + val initialTaskDelayMs: Long) extends Logging { import LogManager._ private val metricsGroup = new KafkaMetricsGroup(this.getClass) - val InitialTaskDelayMs: Int = 30 * 1000 - private val logCreationOrDeletionLock = new Object private val currentLogs = new Pool[TopicPartition, UnifiedLog]() // Future logs are put in the directory with "-future" suffix. Future log is created when user wants to move replica @@ -628,24 +627,24 @@ class LogManager(logDirs: Seq[File], info("Starting log cleanup with a period of %d ms.".format(retentionCheckMs)) scheduler.schedule("kafka-log-retention", () => cleanupLogs(), - InitialTaskDelayMs, + initialTaskDelayMs, retentionCheckMs) info("Starting log flusher with a default period of %d ms.".format(flushCheckMs)) scheduler.schedule("kafka-log-flusher", () => flushDirtyLogs(), - InitialTaskDelayMs, + initialTaskDelayMs, flushCheckMs) scheduler.schedule("kafka-recovery-point-checkpoint", () => checkpointLogRecoveryOffsets(), - InitialTaskDelayMs, + initialTaskDelayMs, flushRecoveryOffsetCheckpointMs) scheduler.schedule("kafka-log-start-offset-checkpoint", () => checkpointLogStartOffsets(), - InitialTaskDelayMs, + initialTaskDelayMs, flushStartOffsetCheckpointMs) scheduler.scheduleOnce("kafka-delete-logs", // will be rescheduled after each delete logs with a dynamic period () => deleteLogs(), - InitialTaskDelayMs) + initialTaskDelayMs) } if (cleanerConfig.enableCleaner) { _cleaner = new LogCleaner(cleanerConfig, liveLogDirs, currentLogs, logDirFailureChannel, time = time) @@ -1584,7 +1583,8 @@ object LogManager { time = time, keepPartitionMetadataFile = keepPartitionMetadataFile, interBrokerProtocolVersion = config.interBrokerProtocolVersion, - remoteStorageSystemEnable = config.remoteLogManagerConfig.enableRemoteStorageSystem()) + remoteStorageSystemEnable = config.remoteLogManagerConfig.enableRemoteStorageSystem(), + initialTaskDelayMs = config.logInitialTaskDelayMs) } /** diff --git a/core/src/main/scala/kafka/server/KafkaConfig.scala b/core/src/main/scala/kafka/server/KafkaConfig.scala index 6c23eb667cf..9cee2aa61a9 100755 --- a/core/src/main/scala/kafka/server/KafkaConfig.scala +++ b/core/src/main/scala/kafka/server/KafkaConfig.scala @@ -573,6 +573,7 @@ object KafkaConfig { .define(ServerLogConfigs.CREATE_TOPIC_POLICY_CLASS_NAME_CONFIG, CLASS, null, LOW, ServerLogConfigs.CREATE_TOPIC_POLICY_CLASS_NAME_DOC) .define(ServerLogConfigs.ALTER_CONFIG_POLICY_CLASS_NAME_CONFIG, CLASS, null, LOW, ServerLogConfigs.ALTER_CONFIG_POLICY_CLASS_NAME_DOC) .define(ServerLogConfigs.LOG_MESSAGE_DOWNCONVERSION_ENABLE_CONFIG, BOOLEAN, ServerLogConfigs.LOG_MESSAGE_DOWNCONVERSION_ENABLE_DEFAULT, LOW, ServerLogConfigs.LOG_MESSAGE_DOWNCONVERSION_ENABLE_DOC) + .defineInternal(ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_CONFIG, LONG, ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_DEFAULT, atLeast(0), LOW, ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_DOC) /** ********* Replication configuration ***********/ .define(ReplicationConfigs.CONTROLLER_SOCKET_TIMEOUT_MS_CONFIG, INT, ReplicationConfigs.CONTROLLER_SOCKET_TIMEOUT_MS_DEFAULT, MEDIUM, ReplicationConfigs.CONTROLLER_SOCKET_TIMEOUT_MS_DOC) @@ -1150,6 +1151,7 @@ class KafkaConfig private(doLog: Boolean, val props: java.util.Map[_, _], dynami def logFlushIntervalMs: java.lang.Long = Option(getLong(ServerLogConfigs.LOG_FLUSH_INTERVAL_MS_CONFIG)).getOrElse(getLong(ServerLogConfigs.LOG_FLUSH_SCHEDULER_INTERVAL_MS_CONFIG)) def minInSyncReplicas = getInt(ServerLogConfigs.MIN_IN_SYNC_REPLICAS_CONFIG) def logPreAllocateEnable: java.lang.Boolean = getBoolean(ServerLogConfigs.LOG_PRE_ALLOCATE_CONFIG) + def logInitialTaskDelayMs: java.lang.Long = Option(getLong(ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_CONFIG)).getOrElse(ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_DEFAULT) // We keep the user-provided String as `MetadataVersion.fromVersionString` can choose a slightly different version (eg if `0.10.0` // is passed, `0.10.0-IV0` may be picked) diff --git a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala index f26924aca19..02dbf35e440 100644 --- a/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogLoaderTest.scala @@ -127,7 +127,8 @@ class LogLoaderTest { logDirFailureChannel = logDirFailureChannel, time = time, keepPartitionMetadataFile = config.usesTopicId, - remoteStorageSystemEnable = config.remoteLogManagerConfig.enableRemoteStorageSystem()) { + remoteStorageSystemEnable = config.remoteLogManagerConfig.enableRemoteStorageSystem(), + initialTaskDelayMs = config.logInitialTaskDelayMs) { override def loadLog(logDir: File, hadCleanShutdown: Boolean, recoveryPoints: Map[TopicPartition, Long], logStartOffsets: Map[TopicPartition, Long], defaultConfig: LogConfig, diff --git a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala index 9992992670a..692e64647a7 100755 --- a/core/src/test/scala/unit/kafka/log/LogManagerTest.scala +++ b/core/src/test/scala/unit/kafka/log/LogManagerTest.scala @@ -69,12 +69,14 @@ class LogManagerTest { var logManager: LogManager = _ val name = "kafka" val veryLargeLogFlushInterval = 10000000L + val initialTaskDelayMs: Long = 10 * 1000 @BeforeEach def setUp(): Unit = { logDir = TestUtils.tempDir() logManager = createLogManager() logManager.startup(Set.empty) + assertEquals(initialTaskDelayMs, logManager.initialTaskDelayMs) } @AfterEach @@ -413,7 +415,7 @@ class LogManagerTest { assertEquals(numMessages * setSize / segmentBytes, log.numberOfSegments, "Check we have the expected number of segments.") // this cleanup shouldn't find any expired segments but should delete some to reduce size - time.sleep(logManager.InitialTaskDelayMs) + time.sleep(logManager.initialTaskDelayMs) assertEquals(6, log.numberOfSegments, "Now there should be exactly 6 segments") time.sleep(log.config.fileDeleteDelayMs + 1) @@ -482,7 +484,7 @@ class LogManagerTest { val set = TestUtils.singletonRecords("test".getBytes()) log.appendAsLeader(set, leaderEpoch = 0) } - time.sleep(logManager.InitialTaskDelayMs) + time.sleep(logManager.initialTaskDelayMs) assertTrue(lastFlush != log.lastFlushTime, "Time based flush should have been triggered") } @@ -604,7 +606,8 @@ class LogManagerTest { configRepository = configRepository, logDirs = logDirs, time = this.time, - recoveryThreadsPerDataDir = recoveryThreadsPerDataDir) + recoveryThreadsPerDataDir = recoveryThreadsPerDataDir, + initialTaskDelayMs = initialTaskDelayMs) } @Test @@ -637,9 +640,9 @@ class LogManagerTest { fileInIndex.get.getAbsolutePath) } - time.sleep(logManager.InitialTaskDelayMs) + time.sleep(logManager.initialTaskDelayMs) assertTrue(logManager.hasLogsToBeDeleted, "Logs deleted too early") - time.sleep(logManager.currentDefaultConfig.fileDeleteDelayMs - logManager.InitialTaskDelayMs) + time.sleep(logManager.currentDefaultConfig.fileDeleteDelayMs - logManager.initialTaskDelayMs) assertFalse(logManager.hasLogsToBeDeleted, "Logs not deleted") } diff --git a/core/src/test/scala/unit/kafka/utils/TestUtils.scala b/core/src/test/scala/unit/kafka/utils/TestUtils.scala index d9383820fca..ff2ed19c32c 100755 --- a/core/src/test/scala/unit/kafka/utils/TestUtils.scala +++ b/core/src/test/scala/unit/kafka/utils/TestUtils.scala @@ -1506,7 +1506,8 @@ object TestUtils extends Logging { recoveryThreadsPerDataDir: Int = 4, transactionVerificationEnabled: Boolean = false, log: Option[UnifiedLog] = None, - remoteStorageSystemEnable: Boolean = false): LogManager = { + remoteStorageSystemEnable: Boolean = false, + initialTaskDelayMs: Long = ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_DEFAULT): LogManager = { val logManager = new LogManager(logDirs = logDirs.map(_.getAbsoluteFile), initialOfflineDirs = Array.empty[File], configRepository = configRepository, @@ -1526,7 +1527,8 @@ object TestUtils extends Logging { logDirFailureChannel = new LogDirFailureChannel(logDirs.size), keepPartitionMetadataFile = true, interBrokerProtocolVersion = interBrokerProtocolVersion, - remoteStorageSystemEnable = remoteStorageSystemEnable) + remoteStorageSystemEnable = remoteStorageSystemEnable, + initialTaskDelayMs = initialTaskDelayMs) if (log.isDefined) { val spyLogManager = Mockito.spy(logManager) diff --git a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java index bb98f8c5be3..fb76bab02cf 100644 --- a/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java +++ b/jmh-benchmarks/src/main/java/org/apache/kafka/jmh/server/CheckpointBench.java @@ -22,6 +22,7 @@ import kafka.server.AlterPartitionManager; import kafka.server.BrokerFeatures; import kafka.server.BrokerTopicStats; import kafka.server.KafkaConfig; +import org.apache.kafka.server.config.ServerLogConfigs; import org.apache.kafka.server.util.MockTime; import org.apache.kafka.storage.internals.log.CleanerConfig; import org.apache.kafka.storage.internals.log.LogConfig; @@ -109,7 +110,7 @@ public class CheckpointBench { JavaConverters.seqAsJavaList(brokerProperties.logDirs()).stream().map(File::new).collect(Collectors.toList()); this.logManager = TestUtils.createLogManager(JavaConverters.asScalaBuffer(files), new LogConfig(new Properties()), new MockConfigRepository(), new CleanerConfig(1, 4 * 1024 * 1024L, 0.9d, - 1024 * 1024, 32 * 1024 * 1024, Double.MAX_VALUE, 15 * 1000, true), time, MetadataVersion.latestTesting(), 4, false, Option.empty(), false); + 1024 * 1024, 32 * 1024 * 1024, Double.MAX_VALUE, 15 * 1000, true), time, MetadataVersion.latestTesting(), 4, false, Option.empty(), false, ServerLogConfigs.LOG_INITIAL_TASK_DELAY_MS_DEFAULT); scheduler.startup(); final BrokerTopicStats brokerTopicStats = new BrokerTopicStats(Optional.empty()); final MetadataCache metadataCache = diff --git a/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java b/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java index dcd5e8ea2dd..4c682c8c1f8 100644 --- a/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java +++ b/server-common/src/main/java/org/apache/kafka/server/config/ServerLogConfigs.java @@ -185,4 +185,9 @@ public class ServerLogConfigs { "broker will not perform down-conversion for consumers expecting an older message format. The broker responds " + "with <code>UNSUPPORTED_VERSION</code> error for consume requests from such older clients. This configuration" + "does not apply to any message format conversion that might be required for replication to followers."; + + public static final String LOG_INITIAL_TASK_DELAY_MS_CONFIG = LOG_PREFIX + "initial.task.delay.ms"; + public static final long LOG_INITIAL_TASK_DELAY_MS_DEFAULT = 30 * 1000L; + public static final String LOG_INITIAL_TASK_DELAY_MS_DOC = "The initial task delay in millisecond when initializing " + + "tasks in LogManager. This should be used for testing only."; }