[ 
https://issues.apache.org/jira/browse/KAFKA-4128?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15568706#comment-15568706
 ] 

Mazhar Shaikh edited comment on KAFKA-4128 at 10/13/16 2:18 PM:
----------------------------------------------------------------

Hi Gwen Shapira,

My concerns with this bug are as follows:

1. Whenever a follower connects to the leader and the follower has more messages 
(a higher offset) than the leader, the follower truncates/drops these messages 
back to the last high watermark.

   => Is there any configuration that avoids dropping these messages and instead 
replicates them back to the leader? (See the config sketch after the cluster 
details below.)

2. What could be the possible reasons for a ZooKeeper session timeout, considering 
there are no issues with garbage collection?


Brokers = 6
Replication factor = 2
Total partitions: 96
Partitions per broker: 16 (leader) + 16 (follower)
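
For reference, this is roughly the kind of configuration I have in mind: a minimal 
sketch only, where the broker host, topic name, and values are placeholders, 
combining producer acks=all with min.insync.replicas and 
unclean.leader.election.enable=false on the broker/topic side. As I understand it, 
this only narrows the window in which acknowledged messages can be truncated; it 
does not make the follower replicate its extra messages back to the leader.
------------------------------------------------------------------------------------------------------------
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object ProducerConfigSketch {
  def main(args: Array[String]): Unit = {
    // Broker/topic side (set separately in server.properties or per-topic config),
    // shown here only as a comment:
    //   unclean.leader.election.enable=false
    //   min.insync.replicas=2
    val props = new Properties()
    props.put("bootstrap.servers", "b1.broker.com:9092") // placeholder broker address
    props.put("acks", "all")  // leader waits for the full in-sync replica set before acking
    props.put("retries", "3") // retry transient send failures instead of dropping the record
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    producer.send(new ProducerRecord[String, String]("topic", "key", "value"))
    producer.close()
  }
}
------------------------------------------------------------------------------------------------------------
With a replication factor of 2 as above, min.insync.replicas=2 would block producing 
whenever one replica falls out of the ISR, so this trades availability for durability.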





I see this specific case is not handled in the Kafka code:
------------------------------------------------------------------------------------------------------------
 /**
   * Handle a partition whose offset is out of range and return a new fetch offset.
   */
  def handleOffsetOutOfRange(topicAndPartition: TopicAndPartition): Long = {
    val replica = replicaMgr.getReplica(topicAndPartition.topic, topicAndPartition.partition).get

    /**
     * Unclean leader election: A follower goes down, in the meanwhile the leader keeps appending messages. The follower comes back up
     * and before it has completely caught up with the leader's logs, all replicas in the ISR go down. The follower is now uncleanly
     * elected as the new leader, and it starts appending messages from the client. The old leader comes back up, becomes a follower
     * and it may discover that the current leader's end offset is behind its own end offset.
     *
     * In such a case, truncate the current follower's log to the current leader's end offset and continue fetching.
     *
     * There is a potential for a mismatch between the logs of the two replicas here. We don't fix this mismatch as of now.
     */

------------------------------------------------------------------------------------------------------------
There is a potential for a mismatch between the logs of the two replicas here. 
We don't fix this mismatch as of now.
------------------------------------------------------------------------------------------------------------
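
My rough reading of that comment, as a simplified, hypothetical sketch rather than 
the actual ReplicaFetcherThread code: when the leader's end offset turns out to be 
behind the follower's own end offset, the follower truncates to the leader's end 
offset and the extra messages are dropped rather than replicated back.
------------------------------------------------------------------------------------------------------------
// Hypothetical simplification of the behaviour described above; the real handler
// also covers the case where the follower has fallen behind the leader's start offset.
def newFetchOffset(leaderEndOffset: Long, followerEndOffset: Long): Long =
  if (leaderEndOffset < followerEndOffset)
    leaderEndOffset    // truncate the follower's log; its extra messages are dropped
  else
    followerEndOffset  // follower is simply behind; it keeps fetching from its own end offset
------------------------------------------------------------------------------------------------------------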



> Kafka broker losses messages when zookeeper session times out
> -------------------------------------------------------------
>
>                 Key: KAFKA-4128
>                 URL: https://issues.apache.org/jira/browse/KAFKA-4128
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8.2.1, 0.9.0.1
>            Reporter: Mazhar Shaikh
>            Priority: Critical
>
> While pumping 30k msgs/second, after some 6-8 hrs of running, the below logs are 
> printed and messages are lost.
> [More than 5k messages are lost on every partition]
> Below are a few of the logs:
> [2016-09-06 05:00:42,595] INFO Client session timed out, have not heard from 
> server in 20903ms for sessionid 0x256fabec47c0003, closing socket connection 
> and attempting reconnect (org.apache.zookeeper.ClientCnxn)
> [2016-09-06 05:00:42,696] INFO zookeeper state changed (Disconnected) 
> (org.I0Itec.zkclient.ZkClient)
> [2016-09-06 05:00:42,753] INFO Partition [topic,62] on broker 4: Shrinking 
> ISR for partition [topic,62] from 4,2 to 4 (kafka.cluster.Partition)
> [2016-09-06 05:00:43,585] INFO Opening socket connection to server 
> b0/169.254.2.1:2182. Will not attempt to authenticate using SASL (unknown 
> error) (org.apache.zookeeper.ClientCnxn)
> [2016-09-06 05:00:43,586] INFO Socket connection established to 
> b0/169.254.2.1:2182, initiating session (org.apache.zookeeper.ClientCnxn)
> [2016-09-06 05:00:43,587] INFO Unable to read additional data from server 
> sessionid 0x256fabec47c0003, likely server has closed socket, closing socket 
> connection and attempting reconnect (org.apache.zookeeper.ClientCnxn)
> [2016-09-06 05:00:44,644] INFO Opening socket connection to server 
> b1/169.254.2.116:2181. Will not attempt to authenticate using SASL (unknown 
> error) (org.apache.zookeeper.ClientCnxn)
> [2016-09-06 05:00:44,651] INFO Socket connection established to 
> b1/169.254.2.116:2181, initiating session (org.apache.zookeeper.ClientCnxn)
> [2016-09-06 05:00:44,658] INFO zookeeper state changed (Expired) 
> (org.I0Itec.zkclient.ZkClient)
> [2016-09-06 05:00:44,659] INFO Initiating client connection, 
> connectString=b2.broker.com:2181,b1.broker.com:2181,zoo3.broker.com:2182 
> sessionTimeout=15000 watcher=org.I0Itec.zkclient.ZkClient@37b8e86a 
> (org.apache.zookeeper.ZooKeeper)
> [2016-09-06 05:00:44,659] INFO Unable to reconnect to ZooKeeper service, 
> session 0x256fabec47c0003 has expired, closing socket connection 
> (org.apache.zookeeper.ClientCnxn)
> [2016-09-06 05:00:44,661] INFO EventThread shut down 
> (org.apache.zookeeper.ClientCnxn)
> [2016-09-06 05:00:44,662] INFO Opening socket connection to server 
> b2/169.254.2.216:2181. Will not attempt to authenticate using SASL (unknown 
> error) (org.apache.zookeeper.ClientCnxn)
> [2016-09-06 05:00:44,662] INFO Socket connection established to 
> b2/169.254.2.216:2181, initiating session (org.apache.zookeeper.ClientCnxn)
> [2016-09-06 05:00:44,665] ERROR Error handling event ZkEvent[New session 
> event sent to 
> kafka.controller.KafkaController$SessionExpirationListener@33b7dedc] 
> (org.I0Itec.zkclient.ZkEventThread)
> java.lang.IllegalStateException: Kafka scheduler has not been started
>         at kafka.utils.KafkaScheduler.ensureStarted(KafkaScheduler.scala:114)
>         at kafka.utils.KafkaScheduler.shutdown(KafkaScheduler.scala:86)
>         at 
> kafka.controller.KafkaController.onControllerResignation(KafkaController.scala:350)
>         at 
> kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply$mcZ$sp(KafkaController.scala:1108)
>         at 
> kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply(KafkaController.scala:1107)
>         at 
> kafka.controller.KafkaController$SessionExpirationListener$$anonfun$handleNewSession$1.apply(KafkaController.scala:1107)
>         at kafka.utils.Utils$.inLock(Utils.scala:535)
>         at 
> kafka.controller.KafkaController$SessionExpirationListener.handleNewSession(KafkaController.scala:1107)
>         at org.I0Itec.zkclient.ZkClient$4.run(ZkClient.java:472)
>         at org.I0Itec.zkclient.ZkEventThread.run(ZkEventThread.java:71)
> [2016-09-06 05:00:44,666] INFO re-registering broker info in ZK for broker 4 
> (kafka.server.KafkaHealthcheck)
> [2016-09-06 05:00:44,801] INFO Session establishment complete on server 
> b2/169.254.2.216:2181, sessionid = 0x256fabec47c0005, negotiated timeout = 
> 15000 (org.apache.zookeeper.ClientCnxn)
> [2016-09-06 05:00:44,802] INFO zookeeper state changed (SyncConnected) 
> (org.I0Itec.zkclient.ZkClient)
> [2016-09-06 05:00:44,812] INFO Registered broker 4 at path /brokers/ids/4 
> with address b5.broker.com:9092. (kafka.utils.ZkUtils$)
> [2016-09-06 05:00:44,813] INFO done re-registering broker 
> (kafka.server.KafkaHealthcheck)
> [2016-09-06 05:00:44,814] INFO Subscribing to /brokers/topics path to watch 
> for new topics (kafka.server.KafkaHealthcheck)
> [2016-09-06 05:00:44,831] INFO Partition [topic,62] on broker 4: Expanding 
> ISR for partition [topic,62] from 4 to 4,2 (kafka.cluster.Partition)
> [2016-09-06 05:00:44,865] INFO New leader is 1 
> (kafka.server.ZookeeperLeaderElector$LeaderChangeListener)
> [2016-09-06 05:00:45,762] INFO [ReplicaFetcherManager on broker 4] Removed 
> fetcher for partitions 
> [topic,63],[topic,62],[topic,58],[topic,52],[topic,56],[topic,49],[topic,60],[topic,54],[topic,48],[topic,61],[topic,50],[topic,53],[topic,59],[topic,57],[topic,51],[topic,55]
>  (kafka.server.ReplicaFetcherManager)
> [2016-09-06 05:00:45,764] INFO Truncating log topic-63 to offset 9971093. 
> (kafka.log.Log)
> [2016-09-06 05:00:45,771] INFO Truncating log topic-62 to offset 9971439. 
> (kafka.log.Log)
> [2016-09-06 05:00:45,773] INFO Truncating log topic-58 to offset 9971298. 
> (kafka.log.Log)
> [2016-09-06 05:00:45,774] INFO Truncating log topic-52 to offset 9740617. 
> (kafka.log.Log)
> [2016-09-06 05:00:45,775] INFO Truncating log topic-56 to offset 9971829. 
> (kafka.log.Log)
> [2016-09-06 05:00:45,777] INFO Truncating log topic-49 to offset 9740178. 
> (kafka.log.Log)
> [2016-09-06 05:00:45,778] INFO Truncating log topic-60 to offset 9971167. 
> (kafka.log.Log)
> [2016-09-06 05:00:45,780] INFO Truncating log topic-54 to offset 9738892. 
> (kafka.log.Log)
> [2016-09-06 05:00:45,781] INFO Truncating log topic-48 to offset 9739499. 
> (kafka.log.Log)
> [2016-09-06 05:00:45,787] INFO Truncating log topic-61 to offset 9972030. 
> (kafka.log.Log)
> [2016-09-06 05:00:45,788] INFO Truncating log topic-50 to offset 9740175. 
> (kafka.log.Log)
> [2016-09-06 05:00:45,793] INFO Truncating log topic-53 to offset 9740074. 
> (kafka.log.Log)
> [2016-09-06 05:00:45,794] INFO Truncating log topic-59 to offset 9971052. 
> (kafka.log.Log)
> [2016-09-06 05:00:45,795] INFO Truncating log topic-57 to offset 9970962. 
> (kafka.log.Log)



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
