[ https://issues.apache.org/jira/browse/KAFKA-1138?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Manikumar resolved KAFKA-1138. ------------------------------ Resolution: Fixed > Remote producer uses the hostname defined in broker > --------------------------------------------------- > > Key: KAFKA-1138 > URL: https://issues.apache.org/jira/browse/KAFKA-1138 > Project: Kafka > Issue Type: Bug > Components: producer > Affects Versions: 0.8.0 > Reporter: Hyun-Gul Roh > Assignee: Jun Rao > > When the producer API on a node that is not the broker sends a message to a > broker, only the TopicMetadataRequest is sent; the ProducerRequest is not, as > observed in the log "kafka-request.log". > According to my analysis, when the producer API sends a ProducerRequest, it > seems to use the hostname defined in the broker. So, if that hostname is not > one registered in DNS, the producer cannot send the ProducerRequest. > I am attaching the log: > [2013-11-21 15:28:49,464] ERROR Failed to collate messages by topic, > partition due to: fetching topic metadata for topics [Set(test)] from broker > [ArrayBuffer(id:0,host:111.111.111.111,port:9092)] failed > (kafka.producer.async.DefaultEventHandler) > [2013-11-21 15:28:49,465] INFO Back off for 100 ms before retrying send. 
> Remaining retries = 1 (kafka.producer.async.DefaultEventHandler) > [2013-11-21 15:28:49,566] INFO Fetching metadata from broker > id:0,host:111.111.111.111,port:9092 with correlation id 6 for 1 topic(s) > Set(test) (kafka.client.ClientUtils$) > [2013-11-21 15:28:49,819] ERROR Producer connection to 111.111.111.111:9092 > unsuccessful (kafka.producer.SyncProducer) > java.net.ConnectException: 연결이 거부됨 > at sun.nio.ch.Net.connect0(Native Method) > at sun.nio.ch.Net.connect(Net.java:465) > at sun.nio.ch.Net.connect(Net.java:457) > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639) > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146) > at > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161) > at > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68) > at kafka.producer.SyncProducer.send(SyncProducer.scala:112) > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53) > at > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > at > kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79) > at kafka.utils.Utils$.swallow(Utils.scala:186) > at kafka.utils.Logging$class.swallowError(Logging.scala:105) > at kafka.utils.Utils$.swallowError(Utils.scala:45) > at > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79) > at > kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104) > at > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87) > at > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67) > at scala.collection.immutable.Stream.foreach(Stream.scala:254) > at > kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66) > at > 
kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44) > [2013-11-21 15:28:49,821] WARN Fetching topic metadata with correlation id 6 > for topics [Set(test)] from broker [id:0,host:111.111.111.111,port:9092] > failed (kafka.client.ClientUtils$) > java.net.ConnectException: 연결이 거부됨 > at sun.nio.ch.Net.connect0(Native Method) > at sun.nio.ch.Net.connect(Net.java:465) > at sun.nio.ch.Net.connect(Net.java:457) > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639) > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146) > at > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161) > at > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68) > at kafka.producer.SyncProducer.send(SyncProducer.scala:112) > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53) > at > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > at > kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79) > at kafka.utils.Utils$.swallow(Utils.scala:186) > at kafka.utils.Logging$class.swallowError(Logging.scala:105) > at kafka.utils.Utils$.swallowError(Utils.scala:45) > at > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79) > at > kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104) > at > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87) > at > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67) > at scala.collection.immutable.Stream.foreach(Stream.scala:254) > at > kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66) > at > kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44) > [2013-11-21 15:28:49,822] ERROR fetching topic metadata for topics > 
[Set(test)] from broker [ArrayBuffer(id:0,host:111.111.111.111,port:9092)] > failed (kafka.utils.Utils$) > kafka.common.KafkaException: fetching topic metadata for topics [Set(test)] > from broker [ArrayBuffer(id:0,host:111.111.111.111,port:9092)] failed > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:67) > at > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > at > kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79) > at kafka.utils.Utils$.swallow(Utils.scala:186) > at kafka.utils.Logging$class.swallowError(Logging.scala:105) > at kafka.utils.Utils$.swallowError(Utils.scala:45) > at > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79) > at > kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104) > at > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87) > at > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67) > at scala.collection.immutable.Stream.foreach(Stream.scala:254) > at > kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66) > at > kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44) > Caused by: java.net.ConnectException: 연결이 거부됨 > at sun.nio.ch.Net.connect0(Native Method) > at sun.nio.ch.Net.connect(Net.java:465) > at sun.nio.ch.Net.connect(Net.java:457) > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639) > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146) > at > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161) > at > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68) > at kafka.producer.SyncProducer.send(SyncProducer.scala:112) > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53) > ... 
12 more > [2013-11-21 15:28:49,825] INFO Fetching metadata from broker > id:0,host:111.111.111.111,port:9092 with correlation id 7 for 1 topic(s) > Set(test) (kafka.client.ClientUtils$) > [2013-11-21 15:28:50,021] ERROR Producer connection to 111.111.111.111:9092 > unsuccessful (kafka.producer.SyncProducer) > java.net.ConnectException: 연결이 거부됨 > at sun.nio.ch.Net.connect0(Native Method) > at sun.nio.ch.Net.connect(Net.java:465) > at sun.nio.ch.Net.connect(Net.java:457) > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639) > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146) > at > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161) > at > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68) > at kafka.producer.SyncProducer.send(SyncProducer.scala:112) > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53) > at > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > at > kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49) > at > kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:187) > at > kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151) > at > kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150) > at > scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43) > at > kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:150) > at > kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96) > at > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73) > at > 
kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104) > at > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87) > at > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67) > at scala.collection.immutable.Stream.foreach(Stream.scala:254) > at > kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66) > at > kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44) > [2013-11-21 15:28:50,024] WARN Fetching topic metadata with correlation id 7 > for topics [Set(test)] from broker [id:0,host:111.111.111.111,port:9092] > failed (kafka.client.ClientUtils$) > java.net.ConnectException: 연결이 거부됨 > at sun.nio.ch.Net.connect0(Native Method) > at sun.nio.ch.Net.connect(Net.java:465) > at sun.nio.ch.Net.connect(Net.java:457) > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639) > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146) > at > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161) > at > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68) > at kafka.producer.SyncProducer.send(SyncProducer.scala:112) > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53) > at > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > at > kafka.producer.BrokerPartitionInfo.getBrokerPartitionInfo(BrokerPartitionInfo.scala:49) > at > kafka.producer.async.DefaultEventHandler.kafka$producer$async$DefaultEventHandler$$getPartitionListForTopic(DefaultEventHandler.scala:187) > at > kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:151) > at > kafka.producer.async.DefaultEventHandler$$anonfun$partitionAndCollate$1.apply(DefaultEventHandler.scala:150) > at > 
scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:57) > at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:43) > at > kafka.producer.async.DefaultEventHandler.partitionAndCollate(DefaultEventHandler.scala:150) > at > kafka.producer.async.DefaultEventHandler.dispatchSerializedData(DefaultEventHandler.scala:96) > at > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:73) > at > kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104) > at > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87) > at > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67) > at scala.collection.immutable.Stream.foreach(Stream.scala:254) > at > kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66) > at > kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44) > [2013-11-21 15:28:50,025] ERROR Failed to collate messages by topic, > partition due to: fetching topic metadata for topics [Set(test)] from broker > [ArrayBuffer(id:0,host:111.111.111.111,port:9092)] failed > (kafka.producer.async.DefaultEventHandler) > [2013-11-21 15:28:50,026] INFO Back off for 100 ms before retrying send. 
> Remaining retries = 0 (kafka.producer.async.DefaultEventHandler) > [2013-11-21 15:28:50,127] INFO Fetching metadata from broker > id:0,host:111.111.111.111,port:9092 with correlation id 8 for 1 topic(s) > Set(test) (kafka.client.ClientUtils$) > [2013-11-21 15:28:50,324] ERROR Producer connection to 111.111.111.111:9092 > unsuccessful (kafka.producer.SyncProducer) > java.net.ConnectException: 연결이 거부됨 > at sun.nio.ch.Net.connect0(Native Method) > at sun.nio.ch.Net.connect(Net.java:465) > at sun.nio.ch.Net.connect(Net.java:457) > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639) > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146) > at > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161) > at > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68) > at kafka.producer.SyncProducer.send(SyncProducer.scala:112) > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53) > at > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > at > kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79) > at kafka.utils.Utils$.swallow(Utils.scala:186) > at kafka.utils.Logging$class.swallowError(Logging.scala:105) > at kafka.utils.Utils$.swallowError(Utils.scala:45) > at > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79) > at > kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104) > at > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87) > at > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67) > at scala.collection.immutable.Stream.foreach(Stream.scala:254) > at > kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66) > at > 
kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44) > [2013-11-21 15:28:50,326] WARN Fetching topic metadata with correlation id 8 > for topics [Set(test)] from broker [id:0,host:111.111.111.111,port:9092] > failed (kafka.client.ClientUtils$) > java.net.ConnectException: 연결이 거부됨 > at sun.nio.ch.Net.connect0(Native Method) > at sun.nio.ch.Net.connect(Net.java:465) > at sun.nio.ch.Net.connect(Net.java:457) > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639) > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146) > at > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161) > at > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68) > at kafka.producer.SyncProducer.send(SyncProducer.scala:112) > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53) > at > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > at > kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79) > at kafka.utils.Utils$.swallow(Utils.scala:186) > at kafka.utils.Logging$class.swallowError(Logging.scala:105) > at kafka.utils.Utils$.swallowError(Utils.scala:45) > at > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79) > at > kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104) > at > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87) > at > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67) > at scala.collection.immutable.Stream.foreach(Stream.scala:254) > at > kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66) > at > kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44) > [2013-11-21 15:28:50,328] ERROR fetching topic metadata for topics > 
[Set(test)] from broker [ArrayBuffer(id:0,host:111.111.111.111,port:9092)] > failed (kafka.utils.Utils$) > kafka.common.KafkaException: fetching topic metadata for topics [Set(test)] > from broker [ArrayBuffer(id:0,host:111.111.111.111,port:9092)] failed > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:67) > at > kafka.producer.BrokerPartitionInfo.updateInfo(BrokerPartitionInfo.scala:82) > at > kafka.producer.async.DefaultEventHandler$$anonfun$handle$2.apply$mcV$sp(DefaultEventHandler.scala:79) > at kafka.utils.Utils$.swallow(Utils.scala:186) > at kafka.utils.Logging$class.swallowError(Logging.scala:105) > at kafka.utils.Utils$.swallowError(Utils.scala:45) > at > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:79) > at > kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104) > at > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87) > at > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67) > at scala.collection.immutable.Stream.foreach(Stream.scala:254) > at > kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66) > at > kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44) > Caused by: java.net.ConnectException: 연결이 거부됨 > at sun.nio.ch.Net.connect0(Native Method) > at sun.nio.ch.Net.connect(Net.java:465) > at sun.nio.ch.Net.connect(Net.java:457) > at sun.nio.ch.SocketChannelImpl.connect(SocketChannelImpl.java:639) > at kafka.network.BlockingChannel.connect(BlockingChannel.scala:57) > at kafka.producer.SyncProducer.connect(SyncProducer.scala:146) > at > kafka.producer.SyncProducer.getOrMakeConnection(SyncProducer.scala:161) > at > kafka.producer.SyncProducer.kafka$producer$SyncProducer$$doSend(SyncProducer.scala:68) > at kafka.producer.SyncProducer.send(SyncProducer.scala:112) > at kafka.client.ClientUtils$.fetchTopicMetadata(ClientUtils.scala:53) > ... 
12 more > [2013-11-21 15:28:50,332] ERROR Failed to send requests for topics test with > correlation ids in [0,8] (kafka.producer.async.DefaultEventHandler) > [2013-11-21 15:28:50,333] ERROR Error in handling batch of 1 events > (kafka.producer.async.ProducerSendThread) > kafka.common.FailedToSendMessageException: Failed to send messages after 3 > tries. > at > kafka.producer.async.DefaultEventHandler.handle(DefaultEventHandler.scala:90) > at > kafka.producer.async.ProducerSendThread.tryToHandle(ProducerSendThread.scala:104) > at > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:87) > at > kafka.producer.async.ProducerSendThread$$anonfun$processEvents$3.apply(ProducerSendThread.scala:67) > at scala.collection.immutable.Stream.foreach(Stream.scala:254) > at > kafka.producer.async.ProducerSendThread.processEvents(ProducerSendThread.scala:66) > at > kafka.producer.async.ProducerSendThread.run(ProducerSendThread.scala:44) > -- This message was sent by Atlassian JIRA (v6.4.14#64029)