[ https://issues.apache.org/jira/browse/KAFKA-772?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13592850#comment-13592850 ]

Sriram Subramanian commented on KAFKA-772:
------------------------------------------

The test failed on Monday and then again on Friday. It was clear that the issue 
was timing related. We tried to reproduce the failure on a local box by running 
the test repeatedly, but could not. I did some code browsing but did not have 
much luck, so I decided to set up tracing and run the test repeatedly in a 
distributed environment over the weekend, hoping that it would fail. Luckily it 
did, and the trace logs proved useful in identifying the issue. Thanks to John 
for setting this up.

What you see below are excerpts from the trace log that pertain to this failure 
at different points in time. In this particular failure, test_2 / partition 2 
had missing logical offsets from 570 to 582 on broker 3 (3 brokers in total).

current fetch offset = 582
current HW = 570
Leader for test_2/partition 2 = broker 2

1. The lines below show the fetch request issued by broker 3 to broker 2 just 
before broker 1 was shut down. The requested offset for [test_2,2] is 582.

[2013-03-02 12:37:56,034] TRACE [ReplicaFetcherThread-0-2], issuing to broker 2 
of fetch request Name: FetchRequest; Version: 0; CorrelationId: 121; ClientId: 
ReplicaFetcherThread-0-2; ReplicaId: 3; MaxWait: 500 ms; MinBytes: 4096 bytes; 
RequestInfo: [test_1,0] -> PartitionFetchInfo(700,1048576),[test_2,1] -> 
PartitionFetchInfo(677,1048576),[test_2,2] -> 
PartitionFetchInfo(582,1048576),[test_2,0] -> 
PartitionFetchInfo(679,1048576),[test_1,2] -> 
PartitionFetchInfo(600,1048576),[test_1,1] -> PartitionFetchInfo(699,1048576) 
(kafka.server.ReplicaFetcherThread)

2. Broker 1 is shut down and broker 3 handles the leader and isr request. Note 
that [test_2,2] still follows broker 2, yet we still issue a makeFollower call 
for it (see the sketch after this log excerpt for why).

[2013-03-02 12:37:56,086] INFO Replica Manager on Broker 3: Handling leader and 
isr request Name: LeaderAndIsrRequest; Version: 0; CorrelationId: 2; ClientId: 
; AckTimeoutMs: 1000 ms; ControllerEpoch: 2; PartitionStateInfo: (test_1,0) -> 
PartitionStateInfo(LeaderIsrAndControllerEpoch({ "ISR":"2,1,3", "leader":"2", 
"leaderEpoch":"1" },1),3),(test_2,1) -> 
PartitionStateInfo(LeaderIsrAndControllerEpoch({ "ISR":"2,3", "leader":"2", 
"leaderEpoch":"2" },2),3),(test_2,2) -> 
PartitionStateInfo(LeaderIsrAndControllerEpoch({ "ISR":"2,1,3", "leader":"2", 
"leaderEpoch":"1" },1),3),(test_2,0) -> 
PartitionStateInfo(LeaderIsrAndControllerEpoch({ "ISR":"2,3", "leader":"2", 
"leaderEpoch":"2" },2),3),(test_1,2) -> 
PartitionStateInfo(LeaderIsrAndControllerEpoch({ "ISR":"2,3", "leader":"2", 
"leaderEpoch":"2" },2),3),(test_1,1) -> 
PartitionStateInfo(LeaderIsrAndControllerEpoch({ "ISR":"2,1,3", "leader":"2", 
"leaderEpoch":"1" },1),3); Leaders: id:2,host:xxxx(kafka.server.ReplicaManager)

3. The leader and isr request results in removing the fetcher to broker 2 for 
[test_2,2], truncating the log to the high watermark (570), and then adding the 
fetcher back to the same broker.

[2013-03-02 12:37:56,088] INFO [ReplicaFetcherManager on broker 3] removing 
fetcher on topic test_2, partition 2 (kafka.server.ReplicaFetcherManager)
[2013-03-02 12:37:56,088] INFO [Kafka Log on Broker 3], Truncated log segment 
/tmp/kafka_server_3_logs/test_2-2/00000000000000000000.log to target offset 570 
(kafka.log.Log)
[2013-03-02 12:37:56,088] INFO [ReplicaFetcherManager on broker 3] adding 
fetcher on topic test_2, partion 2, initOffset 570 to broker 2 with fetcherId 0 
(kafka.server.ReplicaFetcherManager)

4. The leader and isr request completes at this point.

[2013-03-02 12:37:56,090] INFO Replica Manager on Broker 3: Completed leader 
and isr request Name: LeaderAndIsrRequest; Version: 0; CorrelationId: 2; 
ClientId: ; AckTimeoutMs: 1000 ms; ControllerEpoch: 2; PartitionStateInfo: 
(test_1,0) -> PartitionStateInfo(LeaderIsrAndControllerEpoch({ "ISR":"2,1,3", 
"leader":"2", "leaderEpoch":"1" },1),3),(test_2,1) -> 
PartitionStateInfo(LeaderIsrAndControllerEpoch({ "ISR":"2,3", "leader":"2", 
"leaderEpoch":"2" },2),3),(test_2,2) -> 
PartitionStateInfo(LeaderIsrAndControllerEpoch({ "ISR":"2,1,3", "leader":"2", 
"leaderEpoch":"1" },1),3),(test_2,0) -> 
PartitionStateInfo(LeaderIsrAndControllerEpoch({ "ISR":"2,3", "leader":"2", 
"leaderEpoch":"2" },2),3),(test_1,2) -> 
PartitionStateInfo(LeaderIsrAndControllerEpoch({ "ISR":"2,3", "leader":"2", 
"leaderEpoch":"2" },2),3),(test_1,1) -> 
PartitionStateInfo(LeaderIsrAndControllerEpoch({ "ISR":"2,1,3", "leader":"2", 
"leaderEpoch":"1" },1),3); Leaders: id:2,host:xxxx (kafka.server.ReplicaManager)


5. A log append happens at offset 582 even though the nextOffset for the log is 
570. This append actually pertains to the fetch request from step 1, which 
explains the gap in the log (see the sketch after this log excerpt for why the 
append is accepted).

[2013-03-02 12:37:56,098] TRACE [Kafka Log on Broker 3], Appending message set 
to test_2-2 offset: 582 nextOffset: 570 messageSet: 
ByteBufferMessageSet(MessageAndOffset(Message(magic = 0, attributes = 0, crc = 
1408289663, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=500 
cap=500]),582), MessageAndOffset(Message(magic = 0, attributes = 0, crc = 
3696400058, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=500 
cap=500]),583), MessageAndOffset(Message(magic = 0, attributes = 0, crc = 
2403920749, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=500 
cap=500]),584), ) (kafka.log.Log)
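
The reason the append is accepted lies in the follower's processPartitionData: 
its sanity check compares the fetchOffset argument (570 here, since the fetcher 
was just re-added at the high watermark) against the log end offset, and the 
message set's leader-assigned offsets are then written verbatim. A paraphrased 
sketch of that path, assuming the 0.8-era follower appends with 
assignOffsets = false:

// Paraphrased sketch of ReplicaFetcherThread.processPartitionData, not a
// verbatim excerpt.
def processPartitionData(topicAndPartition: TopicAndPartition, fetchOffset: Long,
                         partitionData: FetchResponsePartitionData) {
  val replica = replicaMgr.getReplica(topicAndPartition.topic, topicAndPartition.partition).get
  // 570 == 570 after the truncation, so this check passes
  if (fetchOffset != replica.logEndOffset)
    throw new RuntimeException("Offset mismatch: fetched %d, log end %d"
      .format(fetchOffset, replica.logEndOffset))
  val messages = partitionData.messages.asInstanceOf[ByteBufferMessageSet]
  // Leader-assigned offsets (582..584) are written as-is, leaving 570..581 missing
  replica.log.get.append(messages, assignOffsets = false)
}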

From the set of steps above, it is clear that something is causing the fetch 
request at step 1 to complete even though steps 2 and 3 removed the fetcher for 
that topic/partition.

Looking at the code, the cause becomes obvious. The race condition is between 
the thread that removes the fetcher, truncates the log, and adds the fetcher 
back, and the thread that fetches bytes from the leader. Follow the steps below 
to understand what is happening.

Partition.scala

          replicaFetcherManager.removeFetcher(topic, partitionId)
          // --> step 2: removes the (topic, partition) -> offset mapping from
          //     partitionMap in AbstractFetcherThread
          // make sure local replica exists
          val localReplica = getOrCreateReplica()
          localReplica.log.get.truncateTo(localReplica.highWatermark)
          // --> step 3: truncates to offset 570
          inSyncReplicas = Set.empty[Replica]
          leaderEpoch = leaderAndIsr.leaderEpoch
          zkVersion = leaderAndIsr.zkVersion
          leaderReplicaIdOpt = Some(newLeaderBrokerId)
          // start fetcher thread to current leader
          replicaFetcherManager.addFetcher(topic, partitionId, localReplica.logEndOffset, leaderBroker)
          // --> step 4: sets the new fetcher to fetch from the log end offset,
          //     which is at 570 at this point
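
Note what removeFetcher can and cannot do: it clears the in-memory offset 
mapping for the partition, but it cannot recall a fetch request that is already 
on the wire. A minimal sketch of the remove path, assuming removePartition 
simply drops the map entry under the lock (an assumption, but consistent with 
the behavior observed above):

// Sketch under the stated assumption; field names follow the excerpt below.
def removePartition(topic: String, partitionId: Int) {
  partitionMapLock synchronized {
    // Drops the (topic, partition) -> offset entry. A fetch response already
    // in flight for this partition is NOT cancelled; processFetchRequest will
    // later look it up against whatever the map holds at that time (570 once
    // addFetcher has run).
    partitionMap.remove(TopicAndPartition(topic, partitionId))
  }
}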

AbstractFetcherThread.scala

private def processFetchRequest(fetchRequest: FetchRequest) {
    val partitionsWithError = new mutable.HashSet[TopicAndPartition]
    var response: FetchResponse = null
    try {
      trace("issuing to broker %d of fetch request %s".format(sourceBroker.id, fetchRequest))
      response = simpleConsumer.fetch(fetchRequest)
    } catch {
      case t =>
        debug("error in fetch %s".format(fetchRequest), t)
        if (isRunning.get) {
          partitionMapLock synchronized {
            partitionsWithError ++= partitionMap.keys
          }
        }
    }
    fetcherStats.requestRate.mark()
    // --> step 1: fetch completes. The fetch request is from offset 582.

    if (response != null) {
      // process fetched data
      partitionMapLock.lock()
      // --> step 5: this is where the fetch request is waiting while the
      //     addFetcher in Partition.scala above is executing
      try {
        response.data.foreach {
          case(topicAndPartition, partitionData) =>
            val (topic, partitionId) = topicAndPartition.asTuple
            val currentOffset = partitionMap.get(topicAndPartition)
            if (currentOffset.isDefined) {
              partitionData.error match {
                case ErrorMapping.NoError =>
                  val messages = partitionData.messages.asInstanceOf[ByteBufferMessageSet]
                  val validBytes = messages.validBytes
                  // --> step 6: newOffset is set to 587 and partitionMap is updated
                  val newOffset = messages.lastOption match {
                    case Some(m: MessageAndOffset) => m.nextOffset
                    case None => currentOffset.get
                  }
                  partitionMap.put(topicAndPartition, newOffset)
                  fetcherLagStats.getFetcherLagStats(topic, partitionId).lag = partitionData.hw - newOffset
                  fetcherStats.byteRate.mark(validBytes)
                  // Once we hand off the partition data to the subclass, we can't
                  // mess with it any more in this thread
                  processPartitionData(topicAndPartition, currentOffset.get, partitionData)
                  // --> step 7: this appends data to the log with logical offsets
                  //     582 to 587. Note that the offset passed to this method is
                  //     570 (currentOffset); hence all offset validation checks in
                  //     processPartitionData pass.
                case ErrorMapping.OffsetOutOfRangeCode =>
                  try {
                    val newOffset = handleOffsetOutOfRange(topicAndPartition)
                    partitionMap.put(topicAndPartition, newOffset)
                    warn("current offset %d for topic %s partition %d out of range; reset offset to %d"
                      .format(currentOffset.get, topic, partitionId, newOffset))
                  } catch {
                    case e =>
                      warn("error getting offset for %s %d to broker %d".format(topic, partitionId, sourceBroker.id), e)
                      partitionsWithError += topicAndPartition
                  }
                case _ =>
                  warn("error for %s %d to broker %d".format(topic, partitionId, sourceBroker.id),
                    ErrorMapping.exceptionFor(partitionData.error))
                  partitionsWithError += topicAndPartition
              }
            }
        }
      } finally {
        partitionMapLock.unlock()
      }
    }
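
One way to close the race (a sketch of the idea, not necessarily the committed 
patch) is to have processFetchRequest drop any partition data whose requested 
offset no longer matches the offset currently recorded in partitionMap; a 
mismatch means the fetcher was removed and re-added while the fetch was in 
flight:

// Sketch: guard the response handling on the requested offset still being current.
val currentOffset = partitionMap.get(topicAndPartition)
val requestedOffset = fetchRequest.requestInfo(topicAndPartition).offset
if (currentOffset.isDefined && requestedOffset == currentOffset.get) {
  // ... existing NoError / OffsetOutOfRangeCode handling ...
} else {
  // Stale response (e.g. requested 582 while the map now says 570): skip it;
  // the next fetch, built from the refreshed partitionMap, resumes from 570.
}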
                
> System Test Transient Failure on testcase_0122
> ----------------------------------------------
>
>                 Key: KAFKA-772
>                 URL: https://issues.apache.org/jira/browse/KAFKA-772
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: John Fung
>            Assignee: Sriram Subramanian
>              Labels: kafka-0.8, p1
>         Attachments: KAFKA-772.patch, testcase_0122.tar.gz, 
> testcase_0125.tar.gz
>
>
> * This test case is failing randomly in the past few weeks. Please note there 
> is a small % data loss allowance for the test case with Ack = 1. But the 
> failure in this case is the mismatch of log segment checksum across the 
> replicas.
> * Test description:
> 3 brokers cluster
> Replication factor = 3
> No. topic = 2
> No. partitions = 3
> Controlled failure (kill -15)
> Ack = 1
> * Test case output
> _test_case_name  :  testcase_0122
> _test_class_name  :  ReplicaBasicTest
> arg : auto_create_topic  :  true
> arg : bounce_broker  :  true
> arg : broker_type  :  leader
> arg : message_producing_free_time_sec  :  15
> arg : num_iteration  :  3
> arg : num_partition  :  3
> arg : replica_factor  :  3
> arg : sleep_seconds_between_producer_calls  :  1
> validation_status  : 
>      Leader Election Latency - iter 1 brokerid 3  :  377.00 ms
>      Leader Election Latency - iter 2 brokerid 1  :  374.00 ms
>      Leader Election Latency - iter 3 brokerid 2  :  384.00 ms
>      Leader Election Latency MAX  :  384.00
>      Leader Election Latency MIN  :  374.00
>      Unique messages from consumer on [test_1] at 
> simple_consumer_test_1-0_r1.log  :  1750
>      Unique messages from consumer on [test_1] at 
> simple_consumer_test_1-0_r2.log  :  1750
>      Unique messages from consumer on [test_1] at 
> simple_consumer_test_1-0_r3.log  :  1750
>      Unique messages from consumer on [test_1] at 
> simple_consumer_test_1-1_r1.log  :  1750
>      Unique messages from consumer on [test_1] at 
> simple_consumer_test_1-1_r2.log  :  1750
>      Unique messages from consumer on [test_1] at 
> simple_consumer_test_1-1_r3.log  :  1750
>      Unique messages from consumer on [test_1] at 
> simple_consumer_test_1-2_r1.log  :  1500
>      Unique messages from consumer on [test_1] at 
> simple_consumer_test_1-2_r2.log  :  1500
>      Unique messages from consumer on [test_1] at 
> simple_consumer_test_1-2_r3.log  :  1500
>      Unique messages from consumer on [test_2]  :  5000
>      Unique messages from consumer on [test_2] at 
> simple_consumer_test_2-0_r1.log  :  1714
>      Unique messages from consumer on [test_2] at 
> simple_consumer_test_2-0_r2.log  :  1714
>      Unique messages from consumer on [test_2] at 
> simple_consumer_test_2-0_r3.log  :  1680
>      Unique messages from consumer on [test_2] at 
> simple_consumer_test_2-1_r1.log  :  1708
>      Unique messages from consumer on [test_2] at 
> simple_consumer_test_2-1_r2.log  :  1708
>      Unique messages from consumer on [test_2] at 
> simple_consumer_test_2-1_r3.log  :  1708
>      Unique messages from consumer on [test_2] at 
> simple_consumer_test_2-2_r1.log  :  1469
>      Unique messages from consumer on [test_2] at 
> simple_consumer_test_2-2_r2.log  :  1469
>      Unique messages from consumer on [test_2] at 
> simple_consumer_test_2-2_r3.log  :  1469
>      Unique messages from producer on [test_2]  :  4900
>      Validate for data matched on topic [test_1] across replicas  :  PASSED
>      Validate for data matched on topic [test_2]  :  FAILED
>      Validate for data matched on topic [test_2] across replicas  :  FAILED
>      Validate for merged log segment checksum in cluster [source]  :  FAILED
>      Validate leader election successful  :  PASSED
