This is an automated email from the ASF dual-hosted git repository.
ijuma pushed a commit to branch 2.3
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.3 by this push:
new a784008 KAFKA-9265: Fix kafka.log.Log instance leak on log deletion
(#7773)
a784008 is described below
commit a78400827651d952d6f4746dc5bd2205e25862de
Author: Vikas Singh <[email protected]>
AuthorDate: Wed Dec 4 05:52:29 2019 -0800
KAFKA-9265: Fix kafka.log.Log instance leak on log deletion (#7773)
KAFKA-8448 fixed a similar leak. The Log objects are
held by the ScheduledExecutor's PeriodicProducerExpirationCheck callback. The
fix in KAFKA-8448 was to change the policy of the ScheduledExecutor to
remove the scheduled task when it gets canceled (by calling
setRemoveOnCancelPolicy(true)).
This works when a log is closed using the close() method. But when a log is
deleted, either because the topic gets deleted or because a rebalancing
operation moves the replica away from the broker, the delete() operation is
invoked. Log.delete() doesn't cancel the pending scheduled task, and that
leaks the Log instance.
The fix is to cancel the scheduled task in the Log.delete() method too.
Reviewers: Jason Gustafson <[email protected]>, Ismael Juma
<[email protected]>
---
core/src/main/scala/kafka/log/Log.scala | 1 +
.../main/scala/kafka/utils/KafkaScheduler.scala | 2 +-
core/src/test/scala/unit/kafka/log/LogTest.scala | 24 ++++++++++++++++++++++
3 files changed, 26 insertions(+), 1 deletion(-)
diff --git a/core/src/main/scala/kafka/log/Log.scala
b/core/src/main/scala/kafka/log/Log.scala
index 6ca83f8..23f5e23 100644
--- a/core/src/main/scala/kafka/log/Log.scala
+++ b/core/src/main/scala/kafka/log/Log.scala
@@ -1798,6 +1798,7 @@ class Log(@volatile var dir: File,
lock synchronized {
checkIfMemoryMappedBufferClosed()
removeLogMetrics()
+ producerExpireCheck.cancel(true)
logSegments.foreach(_.deleteIfExists())
segments.clear()
leaderEpochCache.foreach(_.clear())
diff --git a/core/src/main/scala/kafka/utils/KafkaScheduler.scala
b/core/src/main/scala/kafka/utils/KafkaScheduler.scala
index cee4478..f41b8f8 100755
--- a/core/src/main/scala/kafka/utils/KafkaScheduler.scala
+++ b/core/src/main/scala/kafka/utils/KafkaScheduler.scala
@@ -130,7 +130,7 @@ class KafkaScheduler(val threads: Int,
/**
* Package private for testing.
*/
- private[utils] def taskRunning(task: ScheduledFuture[_]): Boolean = {
+ private[kafka] def taskRunning(task: ScheduledFuture[_]): Boolean = {
executor.getQueue().contains(task)
}
diff --git a/core/src/test/scala/unit/kafka/log/LogTest.scala
b/core/src/test/scala/unit/kafka/log/LogTest.scala
index f62205f..0a669c5 100755
--- a/core/src/test/scala/unit/kafka/log/LogTest.scala
+++ b/core/src/test/scala/unit/kafka/log/LogTest.scala
@@ -374,6 +374,30 @@ class LogTest {
}
}
+ /**
+ * Test that "PeriodicProducerExpirationCheck" scheduled task gets canceled
after log
+ * is deleted.
+ */
+ @Test
+ def testProducerExpireCheckAfterDelete(): Unit = {
+ val scheduler = new KafkaScheduler(1)
+ try {
+ scheduler.startup()
+ val logConfig = LogTest.createLogConfig()
+ val log = createLog(logDir, logConfig, scheduler = scheduler)
+
+ val producerExpireCheck = log.producerExpireCheck
+ assertTrue("producerExpireCheck isn't as part of scheduled tasks",
+ scheduler.taskRunning(producerExpireCheck))
+
+ log.delete()
+ assertFalse("producerExpireCheck is part of scheduled tasks even after
log deletion",
+ scheduler.taskRunning(producerExpireCheck))
+ } finally {
+ scheduler.shutdown();
+ }
+ }
+
private def
testProducerSnapshotsRecoveryAfterUncleanShutdown(messageFormatVersion:
String): Unit = {
val logConfig = LogTest.createLogConfig(segmentBytes = 64 * 10,
messageFormatVersion = messageFormatVersion)
var log = createLog(logDir, logConfig)