[
https://issues.apache.org/jira/browse/KAFKA-391?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14195762#comment-14195762
]
Honghai Chen commented on KAFKA-391:
------------------------------------
This situation happen under below scenario:
one broker is leader for several partitions, for example 3, when send one
messageset which has message for all of the 3 partitions of this broker ,
the response.status.size is 3 and the producerRequest.data.size is 1. then
it hit this exception. Any idea for fix? Do we need compare
response.status.size with messagesPerTopic.Count instead of
producerRequest.data.size ?
private def send(brokerId: Int, messagesPerTopic:
collection.mutable.Map[TopicAndPartition, ByteBufferMessageSet]) = {
if(brokerId < 0) {
warn("Failed to send data since partitions %s don't have a
leader".format(messagesPerTopic.map(_._1).mkString(",")))
messagesPerTopic.keys.toSeq
} else if(messagesPerTopic.size > 0) {
val currentCorrelationId = correlationId.getAndIncrement
val producerRequest = new ProducerRequest(currentCorrelationId,
config.clientId, config.requestRequiredAcks,
config.requestTimeoutMs, messagesPerTopic)
var failedTopicPartitions = Seq.empty[TopicAndPartition]
try {
val syncProducer = producerPool.getProducer(brokerId)
debug("Producer sending messages with correlation id %d for topics %s
to broker %d on %s:%d"
.format(currentCorrelationId, messagesPerTopic.keySet.mkString(","),
brokerId, syncProducer.config.host, syncProducer.config.port))
val response = syncProducer.send(producerRequest)
debug("Producer sent messages with correlation id %d for topics %s to
broker %d on %s:%d"
.format(currentCorrelationId, messagesPerTopic.keySet.mkString(","),
brokerId, syncProducer.config.host, syncProducer.config.port))
if(response != null) {
if (response.status.size != producerRequest.data.size)
throw new KafkaException("Incomplete response (%s) for producer
request (%s)".format(response, producerRequest))
> Producer request and response classes should use maps
> -----------------------------------------------------
>
> Key: KAFKA-391
> URL: https://issues.apache.org/jira/browse/KAFKA-391
> Project: Kafka
> Issue Type: Bug
> Reporter: Joel Koshy
> Assignee: Joel Koshy
> Priority: Blocker
> Labels: optimization
> Fix For: 0.8.0
>
> Attachments: KAFKA-391-draft-r1374069.patch, KAFKA-391-v2.patch,
> KAFKA-391-v3.patch, KAFKA-391-v4.patch
>
>
> Producer response contains two arrays of error codes and offsets - the
> ordering in these arrays correspond to the flattened ordering of the request
> arrays.
> It would be better to switch to maps in the request and response as this
> would make the code clearer and more efficient (right now, linear scans are
> used in handling producer acks).
> We can probably do the same in the fetch request/response.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)