[ https://issues.apache.org/jira/browse/KAFKA-1011?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13768820#comment-13768820 ]
Guozhang Wang commented on KAFKA-1011:
--------------------------------------

Currently the main issue is that the MessageAndMetadata returned by ConsumerIterator does not retain the compression information, so we cannot tell whether the contained message is compressed or not. For this to work we therefore also have to add the compression info to MessageAndMetadata. With the SyncProducer approach, the changes on MirrorMaker would be:

1) Add compression info into MessageAndMetadata;
2) Use SyncProducer instead of Producer in MirrorMaker;
3) Add a batching and retry mechanism around SyncProducer;
4) Use a dedicated SyncProducer to send MetadataRequest, and maintain a cached metadata structure;
5) Reconstruct the Message object from the MessageAndMetadata object, then construct the ByteBufferMessageSet object, then the ProduceRequest, and call the SyncProducer.

3), 4) and 5) would be implemented as a separate class that mimics the Producer's behavior but uses a different interface taking MessageAndMetadata instead of KeyedMessage.

> Decompression and re-compression on MirrorMaker could result in messages
> being dropped in the pipeline
> ------------------------------------------------------------------------
>
>                 Key: KAFKA-1011
>                 URL: https://issues.apache.org/jira/browse/KAFKA-1011
>             Project: Kafka
>          Issue Type: Bug
>            Reporter: Guozhang Wang
>            Assignee: Guozhang Wang
>             Fix For: 0.8.1
>
>         Attachments: KAFKA-1011.v1.patch
>
>
> The way MirrorMaker works today is that its consumers use a deep iterator
> to decompress messages received from the source brokers, and its producers
> re-compress the messages while sending them to the target brokers.
> Since MirrorMaker uses a centralized data channel for its consumers to pipe
> messages to its producers, and since the producers compress all messages of
> the same topic within a batch into a single produce request, messages
> accepted at the front end of the pipeline can be dropped at the
> MirrorMaker's target brokers with a MessageSizeTooLargeException if one
> batch in the MirrorMaker's producer happens to contain too many messages of
> the same topic. If we can use a shallow iterator on the MirrorMaker's
> consumer side to pipe the compressed messages through directly, this issue
> can be fixed.
>
> Also, as Swapnil pointed out, if the MirrorMaker lags and there are large
> messages in the MirrorMaker queue (large after decompression), it can run
> into an OutOfMemoryException. Shallow iteration will be very helpful in
> avoiding this exception.
>
> The proposed solution to this issue is also related to KAFKA-527.
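The size blow-up described in the issue can be illustrated with a toy model. Everything below is made up for illustration (the 1000-byte limit, the message sizes, and the helper names); it does not use Kafka's wire format, only GZIP over random payloads to stand in for a compressed message set:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Random;
import java.util.zip.GZIPOutputStream;

// Toy model (NOT Kafka's wire format) of the failure mode: the source
// broker's compressed batches fit under the size limit, but after deep
// iteration MirrorMaker's producer may re-batch more same-topic messages
// into one compressed set than the target broker accepts.
class RebatchDemo {
    static final int MAX_MESSAGE_SIZE = 1000; // hypothetical broker limit

    // Compress `count` messages of `size` random (incompressible) bytes
    // into one wrapper, and return the wrapper's size on the wire.
    static int compressedBatchSize(int count, int size) {
        Random rnd = new Random(42); // seeded for determinism
        ByteArrayOutputStream bos = new ByteArrayOutputStream();
        try (GZIPOutputStream gz = new GZIPOutputStream(bos)) {
            byte[] msg = new byte[size];
            for (int i = 0; i < count; i++) {
                rnd.nextBytes(msg);
                gz.write(msg);
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bos.size();
    }

    public static void main(String[] args) {
        // Shallow iteration: forward the source's 5-message wrapper untouched.
        int shallow = compressedBatchSize(5, 100);
        // Deep iteration: decompress, then re-batch 20 same-topic messages.
        int rebatched = compressedBatchSize(20, 100);
        System.out.println("shallow=" + shallow + " rebatched=" + rebatched);
        // The original wrapper fits; the re-batched one would be rejected
        // with a MessageSizeTooLargeException.
        assert shallow <= MAX_MESSAGE_SIZE;
        assert rebatched > MAX_MESSAGE_SIZE;
    }
}
```

The point is that the target broker's limit applies to the compressed wrapper as a whole, so regrouping decompressed messages by topic can produce a wrapper the source side never created.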
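Steps 2) and 3) of the comment above (wrapping a synchronous producer with batching and retries) could look roughly like the following sketch. `BatchingRetrySender` and the `Consumer<List<T>>` standing in for `SyncProducer.send` are hypothetical names for illustration, not Kafka's actual API:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical sketch of the batching-and-retry layer from steps 2) and 3)
// of the comment; not Kafka's real SyncProducer interface. Records are
// buffered and flushed as one blocking send per batch, retrying the whole
// batch a bounded number of times on failure. Single-threaded use assumed.
class BatchingRetrySender<T> {
    private final int batchSize;
    private final int maxRetries;
    private final Consumer<List<T>> syncSend; // stand-in for SyncProducer.send
    private final List<T> buffer = new ArrayList<>();

    BatchingRetrySender(int batchSize, int maxRetries, Consumer<List<T>> syncSend) {
        this.batchSize = batchSize;
        this.maxRetries = maxRetries;
        this.syncSend = syncSend;
    }

    // Buffer a record; flush once a full batch has accumulated.
    void send(T record) {
        buffer.add(record);
        if (buffer.size() >= batchSize) {
            flush();
        }
    }

    // Send the buffered records as one batch, retrying on failure.
    void flush() {
        if (buffer.isEmpty()) return;
        List<T> batch = new ArrayList<>(buffer);
        buffer.clear();
        RuntimeException lastError = null;
        for (int attempt = 0; attempt <= maxRetries; attempt++) {
            try {
                syncSend.accept(batch); // one blocking produce request per batch
                return;
            } catch (RuntimeException e) {
                lastError = e; // transient failure: retry the whole batch
            }
        }
        throw lastError; // retries exhausted
    }
}
```

Retrying the whole batch matches the synchronous model the comment proposes: a blocking producer sends one produce request per batch, so on failure the entire batch is the natural unit to resend.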