[ 
https://issues.apache.org/jira/browse/KAFKA-4825?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15898312#comment-15898312
 ] 

Jun Rao edited comment on KAFKA-4825 at 3/6/17 10:53 PM:
---------------------------------------------------------

[~benstopford], thanks for reporting this. I took a look at the logs and it does 
appear that this is a result of the issue that KIP-101 addresses.

I first grepped for all log truncations on partition topic-11. As you can see, a 
few brokers truncated the log to exactly offset 372 twice in a very short 
window. This typically means that the leader changed very quickly, which can 
expose the issue described in KIP-101.

{code}
cat worker2/debug/server.log | grep -i "Truncating log" | grep topic-11
[2017-03-01 06:32:43,853] INFO Truncating log test_topic-11 to offset 0. (kafka.log.Log)
[2017-03-01 06:33:00,661] INFO Truncating log test_topic-11 to offset 372. (kafka.log.Log)
[2017-03-01 06:33:04,172] INFO Truncating log test_topic-11 to offset 372. (kafka.log.Log)

cat worker5/debug/server.log | grep -i "Truncating log" | grep topic-11
[2017-03-01 06:32:43,765] INFO Truncating log test_topic-11 to offset 0. (kafka.log.Log)
[2017-03-01 06:33:00,600] INFO Truncating log test_topic-11 to offset 372. (kafka.log.Log)
[2017-03-01 06:35:28,133] INFO Truncating log test_topic-11 to offset 1031. (kafka.log.Log)

cat worker6/debug/server.log | grep -i "Truncating log" | grep topic-11
[2017-03-01 06:32:54,875] INFO Truncating log test_topic-11 to offset 0. (kafka.log.Log)
[2017-03-01 06:32:55,665] INFO Truncating log test_topic-11 to offset 372. (kafka.log.Log)
[2017-03-01 06:33:00,676] INFO Truncating log test_topic-11 to offset 372. (kafka.log.Log)
[2017-03-01 06:35:28,145] INFO Truncating log test_topic-11 to offset 1031. (kafka.log.Log)
{code}

The worker-to-broker mapping is as follows:
worker2 (broker 3), worker5 (broker 1), worker6 (broker 2), worker8 (broker 4)

Looking at the controller log in broker 4, it first took over as the controller 
at 06:32:58.

{code}
[2017-03-01 06:32:58,351] INFO [Controller 4]: Broker 4 starting become controller state transition (kafka.controller.KafkaController)
{code}

As part of the controller initialization logic, it sends the first 
LeaderAndIsrRequest for topic-11 with leader=3 (the current leader), which 
triggers the first log truncation to offset 372 (before KIP-101, a replica 
truncates its log to its own high watermark whenever it starts fetching from 
the leader).

{code}
[2017-03-01 06:33:00,641] DEBUG [Partition state machine on Controller 4]: 
After leader election, leader cache is updated to Map([test_topic,7] -> 
(Leader:4,ISR:4,2,3,LeaderEpoch:2,ControllerEpoch:1), [test_topic,0] -> 
(Leader:3,ISR:2,3,4,LeaderEpoch:3,ControllerEpoch:1), [test_topic,4] -> 
(Leader:2,ISR:3,4,2,LeaderEpoch:3,ControllerEpoch:1), [test_topic,15] -> 
(Leader:4,ISR:4,2,1,LeaderEpoch:2,ControllerEpoch:1), [test_topic,19] -> 
(Leader:4,ISR:4,3,LeaderEpoch:4,ControllerEpoch:1), [test_topic,10] -> 
(Leader:2,ISR:2,4,LeaderEpoch:2,ControllerEpoch:2), [test_topic,16] -> 
(Leader:3,ISR:3,4,LeaderEpoch:1,ControllerEpoch:1), [test_topic,2] -> 
(Leader:4,ISR:4,2,1,LeaderEpoch:4,ControllerEpoch:1), [test_topic,3] -> 
(Leader:4,ISR:4,2,3,LeaderEpoch:3,ControllerEpoch:1), [test_topic,18] -> 
(Leader:3,ISR:3,2,LeaderEpoch:1,ControllerEpoch:1), [test_topic,9] -> 
(Leader:3,ISR:3,4,LeaderEpoch:4,ControllerEpoch:1), [test_topic,6] -> 
(Leader:3,ISR:3,2,1,LeaderEpoch:2,ControllerEpoch:1), [test_topic,13] -> 
(Leader:2,ISR:2,4,LeaderEpoch:5,ControllerEpoch:2), [test_topic,12] -> 
(Leader:4,ISR:2,3,4,LeaderEpoch:3,ControllerEpoch:1), [test_topic,14] -> 
(Leader:3,ISR:3,4,2,LeaderEpoch:2,ControllerEpoch:1), [test_topic,8] -> 
(Leader:2,ISR:4,2,LeaderEpoch:5,ControllerEpoch:2), [test_topic,1] -> 
(Leader:2,ISR:2,3,4,LeaderEpoch:2,ControllerEpoch:1), [test_topic,11] -> 
(Leader:3,ISR:3,2,1,LeaderEpoch:4,ControllerEpoch:1), [test_topic,5] -> 
(Leader:2,ISR:2,4,3,LeaderEpoch:2,ControllerEpoch:1), [test_topic,17] -> 
(Leader:2,ISR:2,4,3,LeaderEpoch:2,ControllerEpoch:1)) 
(kafka.controller.PartitionStateMachine)
{code}

Almost immediately after that, broker 3 undergoes controlled shutdown, so the 
controller moves the leader to broker 1. When broker 1 becomes the new leader, 
it may not have refetched the message at offset 372 that it had just truncated 
away, causing the data loss.

{code}
[2017-03-01 06:33:00,668] DEBUG [ControlledShutdownLeaderSelector]: Partition 
[test_topic,11] : current leader = 3, new leader = 1 
(kafka.controller.ControlledShutdownLeaderSelector)
[2017-03-01 06:33:00,669] DEBUG [Partition state machine on Controller 4]: 
After leader election, leader cache is updated to Map([test_topic,7] -> 
(Leader:4,ISR:4,2,3,LeaderEpoch:2,ControllerEpoch:1), [test_topic,0] -> 
(Leader:3,ISR:2,3,4,LeaderEpoch:3,ControllerEpoch:1), [test_topic,4] -> 
(Leader:2,ISR:3,4,2,LeaderEpoch:3,ControllerEpoch:1), [test_topic,15] -> 
(Leader:4,ISR:4,2,1,LeaderEpoch:2,ControllerEpoch:1), [test_topic,19] -> 
(Leader:4,ISR:4,LeaderEpoch:5,ControllerEpoch:2), [test_topic,10] -> 
(Leader:2,ISR:2,4,LeaderEpoch:2,ControllerEpoch:2), [test_topic,16] -> 
(Leader:3,ISR:3,4,LeaderEpoch:1,ControllerEpoch:1), [test_topic,2] -> 
(Leader:4,ISR:4,2,1,LeaderEpoch:4,ControllerEpoch:1), [test_topic,3] -> 
(Leader:4,ISR:4,2,3,LeaderEpoch:3,ControllerEpoch:1), [test_topic,18] -> 
(Leader:2,ISR:2,LeaderEpoch:2,ControllerEpoch:2), [test_topic,9] -> 
(Leader:4,ISR:4,LeaderEpoch:5,ControllerEpoch:2), [test_topic,6] -> 
(Leader:3,ISR:3,2,1,LeaderEpoch:2,ControllerEpoch:1), [test_topic,13] -> 
(Leader:2,ISR:2,4,LeaderEpoch:5,ControllerEpoch:2), [test_topic,12] -> 
(Leader:4,ISR:2,4,LeaderEpoch:4,ControllerEpoch:2), [test_topic,14] -> 
(Leader:3,ISR:3,4,2,LeaderEpoch:2,ControllerEpoch:1), [test_topic,8] -> 
(Leader:2,ISR:4,2,LeaderEpoch:5,ControllerEpoch:2), [test_topic,1] -> 
(Leader:2,ISR:2,3,4,LeaderEpoch:2,ControllerEpoch:1), [test_topic,11] -> 
(Leader:1,ISR:2,1,LeaderEpoch:5,ControllerEpoch:2), [test_topic,5] -> 
(Leader:2,ISR:2,4,3,LeaderEpoch:2,ControllerEpoch:1), [test_topic,17] -> 
(Leader:2,ISR:2,4,3,LeaderEpoch:2,ControllerEpoch:1)) 
(kafka.controller.PartitionStateMachine)
{code}
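
To make the failure mode concrete, the following is a minimal sketch in plain 
Python (not Kafka code) of the pre-KIP-101 behavior described above. It assumes 
the simplified model that a replica truncates to its own high watermark 
whenever it starts following, and that broker 1's high watermark still lagged 
at offset 372 when it processed the LeaderAndIsrRequest; the Replica class and 
its methods are purely illustrative, while the broker names and message values 
mirror the logs above.

{code}
# Minimal sketch (not Kafka code) of the pre-KIP-101 truncation behavior.
# Assumed scenario: broker 1 is a follower whose high watermark still lags
# at offset 372 even though it has already fetched the message at offset 372.

class Replica:
    def __init__(self, name):
        self.name = name
        self.log = []            # message values; list index == offset
        self.high_watermark = 0

    def append(self, value):
        self.log.append(value)
        return len(self.log) - 1          # offset of the new message

    def fetch_from(self, leader):
        # Follower copies whatever the leader has beyond its own log end.
        self.log += leader.log[len(self.log):]

    def become_follower_pre_kip101(self):
        # Pre-KIP-101: on (re)becoming a follower, truncate to the local
        # high watermark before fetching from the leader again.
        del self.log[self.high_watermark:]

broker3 = Replica("broker3")   # leader of test_topic-11
broker1 = Replica("broker1")   # follower

# Offsets 0..371 are fully replicated and the high watermark is 372.
for i in range(372):
    broker3.append("m%d" % i)
broker1.fetch_from(broker3)
broker1.high_watermark = 372

# Message "7447" is appended at offset 372, replicated to broker 1, and
# therefore acknowledged with acks=-1 ...
print("acked 7447 at offset", broker3.append("7447"))     # offset 372
broker1.fetch_from(broker3)

# ... but broker 1's high watermark has not advanced past 372 yet.  The
# controller's LeaderAndIsrRequest (leader still broker 3) makes broker 1
# truncate to its high watermark, dropping the acknowledged message.
broker1.become_follower_pre_kip101()

# Broker 3 then shuts down and broker 1 is elected leader before it has
# refetched offset 372, so the next message reuses that offset.
print("acked 7487 at offset", broker1.append("7487"))     # offset 372 again
{code}

With KIP-101's leader-epoch-based truncation, broker 1 would instead ask the 
leader for the end offset of its latest leader epoch and truncate to that 
(373 here), so the acknowledged message at offset 372 would survive the rapid 
leader changes.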


> Likely Data Loss in ReassignPartitionsTest System Test
> ------------------------------------------------------
>
>                 Key: KAFKA-4825
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4825
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Ben Stopford
>              Labels: reliability
>         Attachments: problem.zip
>
>
> A failure in the below test may point to a genuine missing message. 
> kafkatest.tests.core.reassign_partitions_test.ReassignPartitionsTest.test_reassign_partitions.bounce_brokers=True.security_protocol=PLAINTEXT
> The test - which reassigns partitions whilst bouncing cluster members - 
> reconciles messages ack'd with messages received in the consumer. 
> The interesting part is that we received two acks for the same offset, with 
> different messages:
> {"topic":"test_topic","partition":11,"name":"producer_send_success","value":"7447","time_ms":1488349980718,"offset":372,"key":null}
> {"topic":"test_topic","partition":11,"name":"producer_send_success","value":"7487","time_ms":1488349981780,"offset":372,"key":null}
> When searching the log files via kafka.tools.DumpLogSegments, only the later 
> message is found. 
> The missing message lies midway through the test, and the loss appears to 
> occur after a leader moves (after 7447 is sent there is a ~1s pause, then 
> 7487 is sent, along with a backlog of messages for partitions 11, 16, and 6). 
> The overall implication is that a message appears to be acknowledged but is 
> later lost. 
> Looking at the test itself, it seems valid. The producer is initialised with 
> acks = -1. The producer's onCompletion callback checks for an exception and 
> uses this to track acknowledgement in the test. 
> https://jenkins.confluent.io/job/system-test-kafka/521/console
> http://testing.confluent.io/confluent-kafka-system-test-results/?prefix=2017-03-01--001.1488363091--apache--trunk--c9872cb/ReassignPartitionsTest/test_reassign_partitions/bounce_brokers=True.security_protocol=PLAINTEXT/



--
This message was sent by Atlassian JIRA
(v6.3.15#6346)
