[ https://issues.apache.org/jira/browse/KAFKA-1138?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Manikumar resolved KAFKA-1138.
------------------------------
    Resolution: Fixed

> Remote producer uses the hostname defined in broker
> ---------------------------------------------------
>
>                 Key: KAFKA-1138
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1138
>             Project: Kafka
>          Issue Type: Bug
>          Components: producer 
>    Affects Versions: 0.8.0
>            Reporter: Hyun-Gul Roh
>            Assignee: Jun Rao
>
> When the producer API runs on a node other than the broker and sends a 
> message to the broker, only a TopicMetadataRequest is sent. No 
> ProducerRequest appears in "kafka-request.log".
> According to my analysis, when the producer API sends a ProducerRequest, it 
> seems to use the hostname defined in the broker. So, if that hostname is not 
> registered in DNS, the producer cannot send the ProducerRequest. 
> I am attaching the log:
> [2013-11-21 15:28:49,464] ERROR Failed to collate messages by topic, 
> partition due to: fetching topic metadata for topics [Set(test)] from broker 
> [ArrayBuffer(id:0,host:111.111.111.111,port:9092)] failed 
> (kafka.producer.async.DefaultEventHandler)
> [2013-11-21 15:28:49,465] INFO Back off for 100 ms before retrying send. 
> Remaining retries = 1 (kafka.producer.async.DefaultEventHandler)
> [2013-11-21 15:28:49,566] INFO Fetching metadata from broker 
> id:0,host:111.111.111.111,port:9092 with correlation id 6 for 1 topic(s) 
> Set(test) (kafka.client.ClientUtils$)
> [2013-11-21 15:28:49,819] ERROR Producer connection to 111.111.111.111:9092 
> unsuccessful (kafka.producer.SyncProducer)
> java.net.ConnectException: Connection refused
>       at sun.nio.ch.Net.connect0(Native Method)
>       at sun.nio.ch.Net.connect(Net.java:465)
>       at sun.nio.ch.Net.connect(Net.java:457)
>       at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
>       at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
>       at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
>       at 
> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
>       at 
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
>       at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
>       at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
>       at 
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
>       at 
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
>       at kafka.utils.Utils$.swallow(Utils.scala:186)
>       at kafka.utils.Logging$class.swallowError(Logging.scala:105)
>       at kafka.utils.Utils$.swallowError(Utils.scala:45)
>       at 
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
>       at 
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
>       at 
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
>       at 
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
>       at scala.collection.immutable.Stream.foreach(Stream.scala:254)
>       at 
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
>       at 
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
> [2013-11-21 15:28:49,821] WARN Fetching topic metadata with correlation id 6 
> for topics [Set(test)] from broker [id:0,host:111.111.111.111,port:9092] 
> failed (kafka.client.ClientUtils$)
> java.net.ConnectException: Connection refused
>       at sun.nio.ch.Net.connect0(Native Method)
>       at sun.nio.ch.Net.connect(Net.java:465)
>       at sun.nio.ch.Net.connect(Net.java:457)
>       at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
>       at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
>       at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
>       at 
> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
>       at 
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
>       at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
>       at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
>       at 
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
>       at 
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
>       at kafka.utils.Utils$.swallow(Utils.scala:186)
>       at kafka.utils.Logging$class.swallowError(Logging.scala:105)
>       at kafka.utils.Utils$.swallowError(Utils.scala:45)
>       at 
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
>       at 
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
>       at 
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
>       at 
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
>       at scala.collection.immutable.Stream.foreach(Stream.scala:254)
>       at 
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
>       at 
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
> [2013-11-21 15:28:49,822] ERROR fetching topic metadata for topics 
> [Set(test)] from broker [ArrayBuffer(id:0,host:111.111.111.111,port:9092)] 
> failed (kafka.utils.Utils$)
> kafka.common.KafkaException: fetching topic metadata for topics [Set(test)] 
> from broker [ArrayBuffer(id:0,host:111.111.111.111,port:9092)] failed
>       at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:67)
>       at 
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
>       at 
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
>       at kafka.utils.Utils$.swallow(Utils.scala:186)
>       at kafka.utils.Logging$class.swallowError(Logging.scala:105)
>       at kafka.utils.Utils$.swallowError(Utils.scala:45)
>       at 
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
>       at 
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
>       at 
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
>       at 
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
>       at scala.collection.immutable.Stream.foreach(Stream.scala:254)
>       at 
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
>       at 
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
> Caused by: java.net.ConnectException: Connection refused
>       at sun.nio.ch.Net.connect0(Native Method)
>       at sun.nio.ch.Net.connect(Net.java:465)
>       at sun.nio.ch.Net.connect(Net.java:457)
>       at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
>       at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
>       at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
>       at 
> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
>       at 
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
>       at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
>       at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
>       ... 12 more
> [2013-11-21 15:28:49,825] INFO Fetching metadata from broker 
> id:0,host:111.111.111.111,port:9092 with correlation id 7 for 1 topic(s) 
> Set(test) (kafka.client.ClientUtils$)
> [2013-11-21 15:28:50,021] ERROR Producer connection to 111.111.111.111:9092 
> unsuccessful (kafka.producer.SyncProducer)
> java.net.ConnectException: Connection refused
>       at sun.nio.ch.Net.connect0(Native Method)
>       at sun.nio.ch.Net.connect(Net.java:465)
>       at sun.nio.ch.Net.connect(Net.java:457)
>       at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
>       at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
>       at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
>       at 
> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
>       at 
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
>       at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
>       at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
>       at 
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
>       at 
> kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
>       at 
> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:187)
>       at 
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151)
>       at 
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
>       at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
>       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
>       at 
> kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:150)
>       at 
> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96)
>       at 
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
>       at 
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
>       at 
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
>       at 
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
>       at scala.collection.immutable.Stream.foreach(Stream.scala:254)
>       at 
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
>       at 
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
> [2013-11-21 15:28:50,024] WARN Fetching topic metadata with correlation id 7 
> for topics [Set(test)] from broker [id:0,host:111.111.111.111,port:9092] 
> failed (kafka.client.ClientUtils$)
> java.net.ConnectException: Connection refused
>       at sun.nio.ch.Net.connect0(Native Method)
>       at sun.nio.ch.Net.connect(Net.java:465)
>       at sun.nio.ch.Net.connect(Net.java:457)
>       at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
>       at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
>       at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
>       at 
> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
>       at 
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
>       at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
>       at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
>       at 
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
>       at 
> kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49)
>       at 
> kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:187)
>       at 
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151)
>       at 
> kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150)
>       at 
> scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57)
>       at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43)
>       at 
> kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:150)
>       at 
> kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96)
>       at 
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73)
>       at 
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
>       at 
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
>       at 
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
>       at scala.collection.immutable.Stream.foreach(Stream.scala:254)
>       at 
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
>       at 
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
> [2013-11-21 15:28:50,025] ERROR Failed to collate messages by topic, 
> partition due to: fetching topic metadata for topics [Set(test)] from broker 
> [ArrayBuffer(id:0,host:111.111.111.111,port:9092)] failed 
> (kafka.producer.async.DefaultEventHandler)
> [2013-11-21 15:28:50,026] INFO Back off for 100 ms before retrying send. 
> Remaining retries = 0 (kafka.producer.async.DefaultEventHandler)
> [2013-11-21 15:28:50,127] INFO Fetching metadata from broker 
> id:0,host:111.111.111.111,port:9092 with correlation id 8 for 1 topic(s) 
> Set(test) (kafka.client.ClientUtils$)
> [2013-11-21 15:28:50,324] ERROR Producer connection to 111.111.111.111:9092 
> unsuccessful (kafka.producer.SyncProducer)
> java.net.ConnectException: Connection refused
>       at sun.nio.ch.Net.connect0(Native Method)
>       at sun.nio.ch.Net.connect(Net.java:465)
>       at sun.nio.ch.Net.connect(Net.java:457)
>       at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
>       at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
>       at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
>       at 
> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
>       at 
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
>       at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
>       at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
>       at 
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
>       at 
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
>       at kafka.utils.Utils$.swallow(Utils.scala:186)
>       at kafka.utils.Logging$class.swallowError(Logging.scala:105)
>       at kafka.utils.Utils$.swallowError(Utils.scala:45)
>       at 
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
>       at 
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
>       at 
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
>       at 
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
>       at scala.collection.immutable.Stream.foreach(Stream.scala:254)
>       at 
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
>       at 
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
> [2013-11-21 15:28:50,326] WARN Fetching topic metadata with correlation id 8 
> for topics [Set(test)] from broker [id:0,host:111.111.111.111,port:9092] 
> failed (kafka.client.ClientUtils$)
> java.net.ConnectException: Connection refused
>       at sun.nio.ch.Net.connect0(Native Method)
>       at sun.nio.ch.Net.connect(Net.java:465)
>       at sun.nio.ch.Net.connect(Net.java:457)
>       at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
>       at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
>       at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
>       at 
> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
>       at 
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
>       at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
>       at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
>       at 
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
>       at 
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
>       at kafka.utils.Utils$.swallow(Utils.scala:186)
>       at kafka.utils.Logging$class.swallowError(Logging.scala:105)
>       at kafka.utils.Utils$.swallowError(Utils.scala:45)
>       at 
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
>       at 
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
>       at 
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
>       at 
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
>       at scala.collection.immutable.Stream.foreach(Stream.scala:254)
>       at 
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
>       at 
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
> [2013-11-21 15:28:50,328] ERROR fetching topic metadata for topics 
> [Set(test)] from broker [ArrayBuffer(id:0,host:111.111.111.111,port:9092)] 
> failed (kafka.utils.Utils$)
> kafka.common.KafkaException: fetching topic metadata for topics [Set(test)] 
> from broker [ArrayBuffer(id:0,host:111.111.111.111,port:9092)] failed
>       at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:67)
>       at 
> kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82)
>       at 
> kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79)
>       at kafka.utils.Utils$.swallow(Utils.scala:186)
>       at kafka.utils.Logging$class.swallowError(Logging.scala:105)
>       at kafka.utils.Utils$.swallowError(Utils.scala:45)
>       at 
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79)
>       at 
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
>       at 
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
>       at 
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
>       at scala.collection.immutable.Stream.foreach(Stream.scala:254)
>       at 
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
>       at 
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
> Caused by: java.net.ConnectException: Connection refused
>       at sun.nio.ch.Net.connect0(Native Method)
>       at sun.nio.ch.Net.connect(Net.java:465)
>       at sun.nio.ch.Net.connect(Net.java:457)
>       at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639)
>       at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57)
>       at kafka.producer.SyncProducer.connect(SyncProducer.scala:146)
>       at 
> kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161)
>       at 
> kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68)
>       at kafka.producer.SyncProducer.send(SyncProducer.scala:112)
>       at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53)
>       ... 12 more
> [2013-11-21 15:28:50,332] ERROR Failed to send requests for topics test with 
> correlation ids in [0,8] (kafka.producer.async.DefaultEventHandler)
> [2013-11-21 15:28:50,333] ERROR Error in handling batch of 1 events 
> (kafka.producer.async.ProducerSendThread)
> kafka.common.FailedToSendMessageException: Failed to send messages after 3 
> tries.
>       at 
> kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90)
>       at 
> kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104)
>       at 
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87)
>       at 
> kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67)
>       at scala.collection.immutable.Stream.foreach(Stream.scala:254)
>       at 
> kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66)
>       at 
> kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44)
>        
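
A minimal diagnostic sketch, not part of the original report and not Kafka
code: it pulls the broker address out of a log line like the ones quoted above
and checks, from the producer node, whether that host resolves and whether the
port accepts a TCP connection. The names parse_brokers and check_broker are
hypothetical, and the regex assumes the "id:N,host:H,port:P" form seen in this
log.

```python
import re
import socket

# Matches broker descriptors as printed in the quoted log,
# e.g. "id:0,host:111.111.111.111,port:9092" (format assumed from that log).
BROKER_RE = re.compile(r"id:(\d+),host:([^,\]\s]+),port:(\d+)")


def parse_brokers(log_line):
    """Return a list of (broker_id, host, port) found in one log line."""
    return [(int(i), host, int(port))
            for i, host, port in BROKER_RE.findall(log_line)]


def check_broker(host, port, timeout=3.0):
    """Classify reachability of host:port from this machine.

    Returns one of "unresolvable", "refused", "unreachable", "ok".
    """
    try:
        # Name resolution only; fails if the host is not known to DNS/hosts.
        socket.getaddrinfo(host, port)
    except socket.gaierror:
        return "unresolvable"
    try:
        # Plain TCP connect; no Kafka protocol traffic is sent.
        with socket.create_connection((host, port), timeout=timeout):
            return "ok"
    except ConnectionRefusedError:
        return "refused"
    except OSError:
        # Timeouts, no route to host, and similar failures.
        return "unreachable"
```

Running check_broker on each tuple returned by parse_brokers separates the two
failure modes discussed in this issue: a hostname that does not resolve would
come back as "unresolvable", while a connection that is rejected, as in the
ConnectException entries above, would come back as "refused".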



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
