[ https://issues.apache.org/jira/browse/KAFKA-6388?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16297674#comment-16297674 ]

Ismael Juma commented on KAFKA-6388:
------------------------------------

It does look like the code is inconsistent.

{code}
val newOffset = math.max(expectedNextOffset, logEndOffset)
val logFile = Log.logFile(dir, newOffset)
val offsetIdxFile = offsetIndexFile(dir, newOffset)
val timeIdxFile = timeIndexFile(dir, newOffset)
val txnIdxFile = transactionIndexFile(dir, newOffset)
for (file <- List(logFile, offsetIdxFile, timeIdxFile, txnIdxFile) if file.exists) {
  warn(s"Newly rolled segment file ${file.getAbsolutePath} already exists; deleting it first")
  Files.delete(file.toPath)
}

Option(segments.lastEntry).foreach(_.getValue.onBecomeInactiveSegment())

// take a snapshot of the producer state to facilitate recovery. It is useful to have the snapshot
// offset align with the new segment offset since this ensures we can recover the segment by beginning
// with the corresponding snapshot file and scanning the segment data. Because the segment base offset
// may actually be ahead of the current producer state end offset (which corresponds to the log end offset),
// we manually override the state offset here prior to taking the snapshot.
producerStateManager.updateMapEndOffset(newOffset)
producerStateManager.takeSnapshot()

val segment = LogSegment.open(dir,
  baseOffset = newOffset,
  config,
  time = time,
  fileAlreadyExists = false,
  initFileSize = initFileSize,
  preallocate = config.preallocate)
val prev = addSegment(segment)
if (prev != null)
  throw new KafkaException("Trying to roll a new log segment for topic partition %s with start offset %d while it already exists.".format(name, newOffset))
{code}

Note how it deletes the segment files if they already exist on disk, but then throws an exception if a segment with that base offset already exists in memory. This also means we potentially attempt to delete files that are still open, because the on-disk check happens before the in-memory one.
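
To illustrate the ordering issue, here is a minimal sketch (not the actual fix, and the names {{segments}}/{{filesFor}} are assumptions standing in for the real fields and file lookups in Log.scala) of consulting the in-memory segment map before touching any files:

{code}
import java.io.File
import java.nio.file.Files
import java.util.concurrent.ConcurrentSkipListMap

object RollSketch {
  // Sketch only: check the in-memory map first so we never delete a file
  // that backs a segment which is still registered (and therefore open).
  def rollTo(newOffset: Long,
             segments: ConcurrentSkipListMap[java.lang.Long, AnyRef],
             filesFor: Long => Seq[File]): Unit = {
    // Fail fast if a segment with this base offset already exists in memory.
    if (segments.containsKey(newOffset))
      throw new IllegalStateException(
        s"Segment with base offset $newOffset already exists in memory; refusing to delete its files")

    // Only orphaned on-disk files (no corresponding in-memory segment) are safe to remove.
    for (file <- filesFor(newOffset) if file.exists)
      Files.delete(file.toPath)

    // ... open the new segment and register it in `segments` as before ...
  }
}
{code}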

> Error while trying to roll a segment that already exists
> --------------------------------------------------------
>
>                 Key: KAFKA-6388
>                 URL: https://issues.apache.org/jira/browse/KAFKA-6388
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 1.0.0
>            Reporter: David Hay
>            Assignee: Neha Narkhede
>            Priority: Blocker
>
> Recreating this issue from KAFKA-654 as we've been hitting it repeatedly in 
> our attempts to get a stable 1.0 cluster running (upgrading from 0.8.2.2).
> After spending 30 min or more spewing log messages like this:
> {noformat}
> [2017-12-19 16:44:28,998] INFO Replica loaded for partition 
> screening.save.results.screening.save.results.processor.error-43 with initial 
> high watermark 0 (kafka.cluster.Replica)
> {noformat}
> Eventually, the replica thread throws the error below (also referenced in the 
> original issue).  If I remove that partition from the data directory and 
> bounce the broker, it eventually rebalances (assuming it doesn't hit a 
> different partition with the same error).
> {noformat}
> 2017-12-19 15:16:24,227] WARN Newly rolled segment file 
> 00000000000000000002.log already exists; deleting it first (kafka.log.Log)
> [2017-12-19 15:16:24,227] WARN Newly rolled segment file 
> 00000000000000000002.index already exists; deleting it first (kafka.log.Log)
> [2017-12-19 15:16:24,227] WARN Newly rolled segment file 
> 00000000000000000002.timeindex already exists; deleting it first 
> (kafka.log.Log)
> [2017-12-19 15:16:24,232] INFO [ReplicaFetcherManager on broker 2] Removed 
> fetcher for partitions __consumer_offsets-20 
> (kafka.server.ReplicaFetcherManager)
> [2017-12-19 15:16:24,297] ERROR [ReplicaFetcher replicaId=2, leaderId=1, 
> fetcherId=0] Error due to (kafka.server.ReplicaFetcherThread)
> kafka.common.KafkaException: Error processing data for partition 
> sr.new.sr.new.processor.error-38 offset 2
>         at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:204)
>         at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:172)
>         at scala.Option.foreach(Option.scala:257)
>         at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:172)
>         at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1.apply(AbstractFetcherThread.scala:169)
>         at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
>         at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:48)
>         at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply$mcV$sp(AbstractFetcherThread.scala:169)
>         at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:169)
>         at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2.apply(AbstractFetcherThread.scala:169)
>         at kafka.utils.CoreUtils$.inLock(CoreUtils.scala:217)
>         at 
> kafka.server.AbstractFetcherThread.processFetchRequest(AbstractFetcherThread.scala:167)
>         at 
> kafka.server.AbstractFetcherThread.doWork(AbstractFetcherThread.scala:113)
>         at kafka.utils.ShutdownableThread.run(ShutdownableThread.scala:64)
> Caused by: kafka.common.KafkaException: Trying to roll a new log segment for 
> topic partition sr.new.sr.new.processor.error-38 with start offset 2 while it 
> already exists.
>         at kafka.log.Log$$anonfun$roll$2.apply(Log.scala:1338)
>         at kafka.log.Log$$anonfun$roll$2.apply(Log.scala:1297)
>         at kafka.log.Log.maybeHandleIOException(Log.scala:1669)
>         at kafka.log.Log.roll(Log.scala:1297)
>         at kafka.log.Log.kafka$log$Log$$maybeRoll(Log.scala:1284)
>         at kafka.log.Log$$anonfun$append$2.apply(Log.scala:710)
>         at kafka.log.Log$$anonfun$append$2.apply(Log.scala:624)
>         at kafka.log.Log.maybeHandleIOException(Log.scala:1669)
>         at kafka.log.Log.append(Log.scala:624)
>         at kafka.log.Log.appendAsFollower(Log.scala:607)
>         at 
> kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:102)
>         at 
> kafka.server.ReplicaFetcherThread.processPartitionData(ReplicaFetcherThread.scala:41)
>         at 
> kafka.server.AbstractFetcherThread$$anonfun$processFetchRequest$2$$anonfun$apply$mcV$sp$1$$anonfun$apply$2.apply(AbstractFetcherThread.scala:184)
>         ... 13 more
> [2017-12-19 15:16:24,302] INFO [ReplicaFetcher replicaId=2, leaderId=1, 
> fetcherId=0] Stopped (kafka.server.ReplicaFetcherThread)
> {noformat}


