[ 
https://issues.apache.org/jira/browse/KAFKA-626?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Neha Narkhede resolved KAFKA-626.
---------------------------------

    Resolution: Fixed
      Assignee: Neha Narkhede
    
> Produce requests dropped due to socket timeouts on get metadata requests
> ------------------------------------------------------------------------
>
>                 Key: KAFKA-626
>                 URL: https://issues.apache.org/jira/browse/KAFKA-626
>             Project: Kafka
>          Issue Type: Bug
>    Affects Versions: 0.8
>            Reporter: Neha Narkhede
>            Assignee: Neha Narkhede
>            Priority: Blocker
>              Labels: bugs
>
> The test setup consists of 2 servers with the following properties 
> overridden:
> num.partitions=10
> default.replication.factor=2
> Ran the producer performance tool to send 1000 messages to each of 8 topics 
> in async mode. Each topic is auto-created on the broker and defaults to 10 
> partitions. No broker was bounced during this test. 
> The producer log has the following errors:
> [2012-11-18 17:44:04,622] WARN fetching topic metadata for topics 
> [Set(test1114, test1117, test1115, test1116, test1118)] from broker 
> [id:0,creatorId:localhost-1353289442325,host:localhost,port:9091] failed 
> (kafka.client.ClientUtils$)
> java.net.SocketTimeoutException
>         at 
> sun.nio.ch.SocketAdaptor$SocketInputStream.read(SocketAdaptor.java:201)
>         at sun.nio.ch.ChannelInputStream.read(ChannelInputStream.java:86)
>         at 
> java.nio.channels.Channels$ReadableByteChannelImpl.read(Channels.java:221)
>         at kafka.utils.Utils$.read(Utils.scala:393)
>         at 
> kafka.network.BoundedByteBufferReceive.readFrom(BoundedByteBufferReceive.scala:54)
>         at kafka.network.Receive$class.readCompletely(Transmission.scala:56)
>         at 
> kafka.network.BoundedByteBufferReceive.readCompletely(BoundedByteBufferReceive.scala:29)
>         at kafka.network.BlockingChannel.receive(BlockingChannel.scala:100)
>         at kafka.producer.SyncProducer.liftedTree1$1(SyncProducer.scala:76)
>         at 
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:74)
>         at kafka.producer.SyncProducer.send(SyncProducer.scala:101)
>         at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:25)
>         at 
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:75)
>         at 
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$1.apply$mcV$sp(DefaultEventHandler.scala:62)
>         at kafka.utils.Utils$.swallow(Utils.scala:185)
>         at kafka.utils.Logging$class.swallowError(Logging.scala:105)
>         at kafka.utils.Utils$.swallowError(Utils.scala:44)
>         at 
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:62)
>         at 
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:103)
>         at 
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:86)
>         at 
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:66)
>         at scala.collection.immutable.Stream.foreach(Stream.scala:254)
>         at 
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:65)
>         at 
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:43)
> [2012-11-18 17:44:04,624] INFO Fetching metadata for topic Set(test1114, 
> test1117, test1115, test1116, test1118) (kafka.client.ClientUtils$)
> [2012-11-18 17:44:04,624] INFO Connected to localhost:9092 for producing 
> (kafka.producer.SyncProducer)
> [2012-11-18 17:44:04,805] INFO Disconnecting from localhost:9092 
> (kafka.producer.SyncProducer)
> [2012-11-18 17:44:04,806] INFO Disconnecting from 127.0.0.1:9091 
> (kafka.producer.SyncProducer)
> [2012-11-18 17:44:04,806] INFO Disconnecting from 127.0.0.1:9092 
> (kafka.producer.SyncProducer)
> [2012-11-18 17:44:04,815] INFO Connected to 127.0.0.1:9092 for producing 
> (kafka.producer.SyncProducer)
> [2012-11-18 17:44:04,910] INFO Connected to 127.0.0.1:9091 for producing 
> (kafka.producer.SyncProducer)
> [2012-11-18 17:44:05,048] INFO Fetching metadata for topic Set(test1115, 
> test1118) (kafka.client.ClientUtils$)
> [2012-11-18 17:44:05,049] INFO Connected to localhost:9091 for producing 
> (kafka.producer.SyncProducer)
> [2012-11-18 17:44:05,111] INFO Disconnecting from localhost:9091 
> (kafka.producer.SyncProducer)
> [2012-11-18 17:44:05,112] INFO Disconnecting from 127.0.0.1:9091 
> (kafka.producer.SyncProducer)
> [2012-11-18 17:44:05,112] INFO Disconnecting from 127.0.0.1:9092 
> (kafka.producer.SyncProducer)
> [2012-11-18 17:44:05,114] ERROR Failed to send the following requests: 
> ArrayBuffer(KeyedMessage(test1115,1,Message(magic = 2, attributes = 0, crc = 
> 1950606895, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=100 
> cap=100])), KeyedMessage(test1115,11,Message(magic = 2, attributes = 0, crc = 
> 1950606895, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=100 
> cap=100])), KeyedMessage(test1115,21,Message(magic = 2, attributes = 0, crc = 
> 1950606895, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=100 
> cap=100])), KeyedMessage(test1118,5,Message(magic = 2, attributes = 0, crc = 
> 1950606895, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=100 
> cap=100])), KeyedMessage(test1118,15,Message(magic = 2, attributes = 0, crc = 
> 1950606895, key = null, payload = java.nio.HeapByteBuffer[pos=0 lim=100 
> cap=100]))) (kafka.producer.async.DefaultEventHandler)
> [2012-11-18 17:44:05,122] ERROR Error in handling batch of 200 events 
> (kafka.producer.async.ProducerSendThread)
> kafka.common.FailedToSendMessageException: Failed to send messages after 3 
> tries.
>         at 
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:70)
>         at 
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:103)
>         at 
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:86)
>         at 
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:66)
>         at scala.collection.immutable.Stream.foreach(Stream.scala:254)
>         at 
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:65)
>         at 
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:43)
> These errors don't happen when I run producer performance on fewer topics. 
> Also, the consumer received 8995 messages, but the expected count is 8000 
> (1000/topic). Since the producer failed to send a request, most of the 
> extra messages could be duplicates from previous requests.
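The duplicate hypothesis in the description can be illustrated with a toy model. This is only a sketch, not the Kafka producer code: `send_with_retries`, `broker_log`, and `ack_lost_on_attempts` are hypothetical names, and it assumes the broker appends a batch even when the producer times out waiting for the response. Under that assumption, every retried batch is written again, which would inflate the consumer's count above the 8000 messages expected (here 8995 - 8000 = 995 extra).

```python
def send_with_retries(batch, broker_log, ack_lost_on_attempts, max_tries=3):
    """Send a batch, retrying whenever the acknowledgement is lost.

    Hypothetical model: the broker appends the batch on every attempt, but
    the producer only learns of success if the ack arrives in time.
    """
    for attempt in range(1, max_tries + 1):
        broker_log.extend(batch)              # broker has written the messages
        if attempt not in ack_lost_on_attempts:
            return True                       # ack received, producer stops
    return False                              # producer gives up after max_tries


log = []
batch = ["m%d" % i for i in range(5)]

# The first attempt times out on the producer side, the second succeeds.
ok = send_with_retries(batch, log, ack_lost_on_attempts={1})
duplicates = len(log) - len(set(log))
print(ok, len(log), duplicates)  # prints: True 10 5
```

In this model a single lost acknowledgement doubles the batch on the broker, so the consumer can see more messages than were logically sent even though the producer also reports failures for other batches.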

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira
