[ 
https://issues.apache.org/jira/browse/KAFKA-1414?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14068021#comment-14068021
 ] 

Jay Kreps commented on KAFKA-1414:
----------------------------------

We should check that having multiple threads accessing the same directory will 
actually have a negative impact.

First, I don't think anyone has actually checked that recovery is actually I/O 
bound. Recovery iterates over the log doing a bunch of small message reads, 
decompressing any compressed messages, and checking the CRC of all the data. 
This may actually be CPU bound.

Secondly the OS will do normal readahead which should help protect somewhat 
against random access for two interspersed linear accesses.

This should be easy to test. Run the perf test on a single node broker with 
multiple partitions on a single drive, then kill -9 it. Then run 
{code}
  echo 1 > /proc/sys/vm/drop_caches
{code}
and restart. If we do this twice, once with 1 thread and once with 2. The 
prediction is that the 2 threaded case will be slower than the 1 thread case, 
but it may actually not be.

> Speedup broker startup after hard reset
> ---------------------------------------
>
>                 Key: KAFKA-1414
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1414
>             Project: Kafka
>          Issue Type: Improvement
>          Components: log
>    Affects Versions: 0.8.2, 0.9.0, 0.8.1.1
>            Reporter: Dmitry Bugaychenko
>            Assignee: Jay Kreps
>         Attachments: 
> 0001-KAFKA-1414-Speedup-broker-startup-after-hard-reset-a.patch, 
> KAFKA-1414-rev1.patch, KAFKA-1414-rev2.fixed.patch, KAFKA-1414-rev2.patch, 
> KAFKA-1414-rev3.patch, freebie.patch, parallel-dir-loading-0.8.patch, 
> parallel-dir-loading-trunk-fixed-threadpool.patch, 
> parallel-dir-loading-trunk-threadpool.patch, parallel-dir-loading-trunk.patch
>
>
> After hard reset due to power failure broker takes way too much time 
> recovering unflushed segments in a single thread. This could be easiliy 
> improved launching multiple threads (one per data dirrectory, assuming that 
> typically each data directory is on a dedicated drive). Localy we trie this 
> simple patch to LogManager.loadLogs and it seems to work, however I'm too new 
> to scala, so do not take it literally:
> {code}
>   /**
>    * Recover and load all logs in the given data directories
>    */
>   private def loadLogs(dirs: Seq[File]) {
>     val threads : Array[Thread] = new Array[Thread](dirs.size)
>     var i: Int = 0
>     val me = this
>     for(dir <- dirs) {
>       val thread = new Thread( new Runnable {
>         def run()
>         {
>           val recoveryPoints = me.recoveryPointCheckpoints(dir).read
>           /* load the logs */
>           val subDirs = dir.listFiles()
>           if(subDirs != null) {
>             val cleanShutDownFile = new File(dir, Log.CleanShutdownFile)
>             if(cleanShutDownFile.exists())
>               info("Found clean shutdown file. Skipping recovery for all logs 
> in data directory '%s'".format(dir.getAbsolutePath))
>             for(dir <- subDirs) {
>               if(dir.isDirectory) {
>                 info("Loading log '" + dir.getName + "'")
>                 val topicPartition = Log.parseTopicPartitionName(dir.getName)
>                 val config = topicConfigs.getOrElse(topicPartition.topic, 
> defaultConfig)
>                 val log = new Log(dir,
>                   config,
>                   recoveryPoints.getOrElse(topicPartition, 0L),
>                   scheduler,
>                   time)
>                 val previous = addLogWithLock(topicPartition, log)
>                 if(previous != null)
>                   throw new IllegalArgumentException("Duplicate log 
> directories found: %s, %s!".format(log.dir.getAbsolutePath, 
> previous.dir.getAbsolutePath))
>               }
>             }
>             cleanShutDownFile.delete()
>           }
>         }
>       })
>       thread.start()
>       threads(i) = thread
>       i = i + 1
>     }
>     for(thread <- threads) {
>       thread.join()
>     }
>   }
>   def addLogWithLock(topicPartition: TopicAndPartition, log: Log): Log = {
>     logCreationOrDeletionLock synchronized {
>       this.logs.put(topicPartition, log)
>     }
>   }
> {code}



--
This message was sent by Atlassian JIRA
(v6.2#6252)

Reply via email to