Repository: kafka Updated Branches: refs/heads/0.11.0 eef121f2f -> c54ef62f2
KAFKA-3123; Ensure cleaning is resumed if truncateTo throws …f range From https://github.com/apache/kafka/pull/1716#discussion_r112000498, ensure the cleaner is restarted if Log.truncateTo throws Author: Mickael Maison <mickael.mai...@gmail.com> Reviewers: Ismael Juma <ism...@juma.me.uk> Closes #3296 from mimaison/KAFKA-3123 (cherry picked from commit 9180cb23d7a9816927ae9a7736d7faff673e1def) Signed-off-by: Ismael Juma <ism...@juma.me.uk> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c54ef62f Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c54ef62f Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c54ef62f Branch: refs/heads/0.11.0 Commit: c54ef62f21bccfbc6edda042026d13f234e915eb Parents: eef121f Author: Mickael Maison <mickael.mai...@gmail.com> Authored: Mon Jun 12 22:36:47 2017 +0100 Committer: Ismael Juma <ism...@juma.me.uk> Committed: Mon Jun 12 22:37:28 2017 +0100 ---------------------------------------------------------------------- core/src/main/scala/kafka/log/LogManager.scala | 15 +++++++++------ 1 file changed, 9 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/c54ef62f/core/src/main/scala/kafka/log/LogManager.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala index d8cdf90..61879be 100755 --- a/core/src/main/scala/kafka/log/LogManager.scala +++ b/core/src/main/scala/kafka/log/LogManager.scala @@ -324,13 +324,16 @@ class LogManager(val logDirs: Array[File], // If the log does not exist, skip it if (log != null) { //May need to abort and pause the cleaning of the log, and resume after truncation is done. 
- val needToStopCleaner: Boolean = truncateOffset < log.activeSegment.baseOffset - if (needToStopCleaner && cleaner != null) + val needToStopCleaner = cleaner != null && truncateOffset < log.activeSegment.baseOffset + if (needToStopCleaner) cleaner.abortAndPauseCleaning(topicPartition) - log.truncateTo(truncateOffset) - if (needToStopCleaner && cleaner != null) { - cleaner.maybeTruncateCheckpoint(log.dir.getParentFile, topicPartition, log.activeSegment.baseOffset) - cleaner.resumeCleaning(topicPartition) + try { + log.truncateTo(truncateOffset) + if (needToStopCleaner) + cleaner.maybeTruncateCheckpoint(log.dir.getParentFile, topicPartition, log.activeSegment.baseOffset) + } finally { + if (needToStopCleaner) + cleaner.resumeCleaning(topicPartition) } } }