Repository: kafka
Updated Branches:
  refs/heads/0.11.0 eef121f2f -> c54ef62f2


KAFKA-3123; Ensure cleaning is resumed if truncateTo throws

…f range

From https://github.com/apache/kafka/pull/1716#discussion_r112000498, ensure
the cleaner is restarted if Log.truncateTo throws

Author: Mickael Maison <mickael.mai...@gmail.com>

Reviewers: Ismael Juma <ism...@juma.me.uk>

Closes #3296 from mimaison/KAFKA-3123

(cherry picked from commit 9180cb23d7a9816927ae9a7736d7faff673e1def)
Signed-off-by: Ismael Juma <ism...@juma.me.uk>


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c54ef62f
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c54ef62f
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c54ef62f

Branch: refs/heads/0.11.0
Commit: c54ef62f21bccfbc6edda042026d13f234e915eb
Parents: eef121f
Author: Mickael Maison <mickael.mai...@gmail.com>
Authored: Mon Jun 12 22:36:47 2017 +0100
Committer: Ismael Juma <ism...@juma.me.uk>
Committed: Mon Jun 12 22:37:28 2017 +0100

----------------------------------------------------------------------
 core/src/main/scala/kafka/log/LogManager.scala | 15 +++++++++------
 1 file changed, 9 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c54ef62f/core/src/main/scala/kafka/log/LogManager.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/log/LogManager.scala b/core/src/main/scala/kafka/log/LogManager.scala
index d8cdf90..61879be 100755
--- a/core/src/main/scala/kafka/log/LogManager.scala
+++ b/core/src/main/scala/kafka/log/LogManager.scala
@@ -324,13 +324,16 @@ class LogManager(val logDirs: Array[File],
       // If the log does not exist, skip it
       if (log != null) {
         //May need to abort and pause the cleaning of the log, and resume after truncation is done.
-        val needToStopCleaner: Boolean = truncateOffset < log.activeSegment.baseOffset
-        if (needToStopCleaner && cleaner != null)
+        val needToStopCleaner = cleaner != null && truncateOffset < log.activeSegment.baseOffset
+        if (needToStopCleaner)
           cleaner.abortAndPauseCleaning(topicPartition)
-        log.truncateTo(truncateOffset)
-        if (needToStopCleaner && cleaner != null) {
-          cleaner.maybeTruncateCheckpoint(log.dir.getParentFile, topicPartition, log.activeSegment.baseOffset)
-          cleaner.resumeCleaning(topicPartition)
+        try {
+          log.truncateTo(truncateOffset)
+          if (needToStopCleaner)
+            cleaner.maybeTruncateCheckpoint(log.dir.getParentFile, topicPartition, log.activeSegment.baseOffset)
+        } finally {
+          if (needToStopCleaner)
+            cleaner.resumeCleaning(topicPartition)
         }
       }
     }
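
The pattern in the hunk above (pause the cleaner, truncate inside try, resume in finally) can be illustrated in isolation. The following is a minimal sketch only, not Kafka code: StubCleaner, TruncateSketch and truncateWithCleanerPaused are hypothetical names, and partitions are represented as plain strings rather than TopicPartition.

```scala
import scala.collection.mutable

// Hypothetical stand-in for the log cleaner: it only tracks paused partitions.
class StubCleaner {
  val paused = mutable.Set.empty[String]
  def abortAndPauseCleaning(tp: String): Unit = { paused += tp; () }
  def resumeCleaning(tp: String): Unit = { paused -= tp; () }
}

object TruncateSketch {
  // Pauses cleaning for `tp`, runs `truncate`, and resumes cleaning in a
  // finally block so the partition is not left paused if `truncate` throws.
  def truncateWithCleanerPaused(cleaner: StubCleaner, tp: String)(truncate: => Unit): Unit = {
    val needToStopCleaner = cleaner != null
    if (needToStopCleaner)
      cleaner.abortAndPauseCleaning(tp)
    try {
      truncate
    } finally {
      if (needToStopCleaner)
        cleaner.resumeCleaning(tp)
    }
  }
}
```

In this sketch the exception from the truncate step still propagates to the caller; the finally block only guarantees that the pause is undone first.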
