[ https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14062680#comment-14062680 ]

ASF GitHub Bot commented on KAFKA-1414:
---------------------------------------

GitHub user ataraxer opened a pull request:

    https://github.com/apache/kafka/pull/26

    KAFKA-1414: Speedup broker startup after hard reset and shutdown

    This patch speeds up both log recovery after a hard reset and broker shutdown by introducing the `log.recovery.threads` and `log.shutdown.threads` properties, which allow the work required for each to be performed in parallel, with one unit of work per log directory.
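
A minimal sketch of what "one unit of work per log directory" could look like, assuming each data directory's logs are handled sequentially by a single task and the tasks share a fixed pool whose size plays the role of the thread-count property. `PerDirectoryParallelism` and `runPerDirectory` are hypothetical names, not part of the patch or of Kafka.

```scala
import java.io.File
import java.util.concurrent.{Callable, ExecutorService, Executors, Future}

object PerDirectoryParallelism {
  // Hypothetical helper: runs `work` on every log, with one task per data
  // directory. A task processes its directory's logs sequentially, so at most
  // one thread touches a given directory at a time; `numThreads` caps how
  // many directories are processed concurrently.
  def runPerDirectory[T](logsByDir: Map[File, Seq[T]], numThreads: Int)(work: T => Unit): Unit = {
    val pool: ExecutorService = Executors.newFixedThreadPool(numThreads)
    try {
      // toList is strict, so every task is submitted before we start waiting.
      val futures: List[Future[Unit]] = logsByDir.values.toList.map { logs =>
        pool.submit(new Callable[Unit] {
          def call(): Unit = logs.foreach(work)
        })
      }
      // Future.get blocks until the task finishes and rethrows any failure
      // wrapped in an ExecutionException.
      futures.foreach(_.get())
    } finally {
      pool.shutdown()
    }
  }
}
```

For example, `runPerDirectory(logDirs.groupBy(_.getParentFile), numThreads)(closeLog)` would close logs with one task per data directory, where `logDirs`, `numThreads`, and `closeLog` are again hypothetical.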
    
    The best performance can be achieved by setting the thread count to the number of log directories, provided that each directory is located on a dedicated drive. However, this setting should be used with caution because of the possibility of a native JVM out-of-memory error.
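
The sizing advice can be written down as a small helper. This is a hypothetical sketch, not code from the patch: because the work is split per directory, a pool larger than the number of log directories gains nothing, while every thread that is actually created needs a stack outside the Java heap, and running out of that native memory is reported as an `OutOfMemoryError` ("unable to create new native thread").

```scala
object RecoveryThreadCount {
  // Hypothetical helper, not part of the patch: chooses a pool size for
  // per-directory recovery or shutdown work.
  def effectiveThreads(configured: Int, numLogDirs: Int): Int = {
    require(configured >= 1, "thread count must be at least 1")
    require(numLogDirs >= 1, "at least one log directory is required")
    // With one task per directory, threads beyond the directory count
    // would have no task to run.
    math.min(configured, numLogDirs)
  }
}
```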
    
    The patch was compiled by Anton Karamanov from changes proposed by Jay Kreps, Alexey Ozeritskiy, and Dmitry Bugaychenko.
    
    All tests are passing.

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/ataraxer/kafka kafka-1414

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/kafka/pull/26.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #26
    
----
commit e4a86709d07030c44f077ab20d4329ddb84c4aec
Author: Anton Karamanov <atara...@gmail.com>
Date:   2014-07-15T17:42:15Z

    KAFKA-1414 Speedup broker startup after hard reset and shutdown; patched by Jay Kreps, Alexey Ozeritskiy, Dmitry Bugaychenko and Anton Karamanov

----


> Speedup broker startup after hard reset
> ---------------------------------------
>
>                 Key: KAFKA-1414
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1414
>             Project: Kafka
>          Issue Type: Improvement
>          Components: log
>    Affects Versions: 0.8.2, 0.9.0, 0.8.1.1
>            Reporter: Dmitry Bugaychenko
>            Assignee: Jay Kreps
>         Attachments: parallel-dir-loading-0.8.patch, 
> parallel-dir-loading-trunk-fixed-threadpool.patch, 
> parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch
>
>
> After a hard reset due to power failure, the broker takes far too long
> recovering unflushed segments in a single thread. This could easily be
> improved by launching multiple threads (one per data directory, assuming that
> each data directory is typically on a dedicated drive). Locally we tried this
> simple patch to LogManager.loadLogs and it seems to work; however, I'm too new
> to Scala, so do not take it literally:
> {code}
>   /**
>    * Recover and load all logs in the given data directories,
>    * using one thread per data directory.
>    */
>   private def loadLogs(dirs: Seq[File]): Unit = {
>     // Exceptions thrown inside worker threads would otherwise be lost,
>     // so collect them here and rethrow one after all threads have finished.
>     val errors = new java.util.concurrent.ConcurrentLinkedQueue[Throwable]()
>     val threads = dirs.map { dir =>
>       new Thread(new Runnable {
>         def run(): Unit = {
>           try {
>             val recoveryPoints = recoveryPointCheckpoints(dir).read
>             /* load the logs */
>             val subDirs = dir.listFiles()
>             if (subDirs != null) {
>               val cleanShutDownFile = new File(dir, Log.CleanShutdownFile)
>               if (cleanShutDownFile.exists())
>                 info("Found clean shutdown file. Skipping recovery for all logs " +
>                   "in data directory '%s'".format(dir.getAbsolutePath))
>               // Each subdirectory holds a single topic-partition log.
>               for (logDir <- subDirs if logDir.isDirectory) {
>                 info("Loading log '" + logDir.getName + "'")
>                 val topicPartition = Log.parseTopicPartitionName(logDir.getName)
>                 val config = topicConfigs.getOrElse(topicPartition.topic, defaultConfig)
>                 val log = new Log(logDir,
>                   config,
>                   recoveryPoints.getOrElse(topicPartition, 0L),
>                   scheduler,
>                   time)
>                 val previous = addLogWithLock(topicPartition, log)
>                 if (previous != null)
>                   throw new IllegalArgumentException(
>                     "Duplicate log directories found: %s, %s!".format(
>                       log.dir.getAbsolutePath, previous.dir.getAbsolutePath))
>               }
>               cleanShutDownFile.delete()
>             }
>           } catch {
>             case t: Throwable => errors.add(t)
>           }
>         }
>       })
>     }
>     // Start every thread first, then wait for all of them to finish.
>     threads.foreach(_.start())
>     threads.foreach(_.join())
>     if (!errors.isEmpty)
>       throw errors.peek()
>   }
>
>   // Registers a log while holding the creation/deletion lock, since several
>   // loader threads call this concurrently. Returns the log previously
>   // registered for the partition, or null if there was none.
>   def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = {
>     logCreationOrDeletionLock synchronized {
>       this.logs.put(topicPartition, log)
>     }
>   }
> {code}
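
The `addLogWithLock` helper above takes a lock because several loader threads register logs at the same time. As an alternative worth comparing (a swapped-in technique, not what the patch does), `java.util.concurrent.ConcurrentHashMap.putIfAbsent` makes the duplicate check and the insert a single atomic step and never overwrites the first registered log. In this sketch `FakeLog` and `LogRegistry` are hypothetical stand-ins for Kafka's `Log` and `LogManager`, and partitions are keyed by plain strings.

```scala
import java.util.concurrent.ConcurrentHashMap

// Hypothetical stand-in for Kafka's Log: only the directory path matters here.
final case class FakeLog(dir: String)

// Hypothetical registry keyed by "topic-partition" strings.
class LogRegistry {
  private val logs = new ConcurrentHashMap[String, FakeLog]()

  // putIfAbsent returns the value already mapped to the key, or null if none,
  // so the duplicate check and the insert happen as one atomic step and the
  // first registered log is never overwritten.
  def register(partition: String, log: FakeLog): Unit = {
    val previous = logs.putIfAbsent(partition, log)
    if (previous != null)
      throw new IllegalArgumentException(
        "Duplicate log directories found: %s, %s!".format(log.dir, previous.dir))
  }

  def size: Int = logs.size
}
```

If two data directories both contain a directory for the same partition, exactly one registration succeeds and the other thread gets the `IllegalArgumentException`, regardless of which thread runs first.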



--
This message was sent by Atlassian JIRA
(v6.2#6252)
