MirrorMaker with shallow.iterator.enable=true produces unreadable messages; patched by Jun Rao; reviewed by Neha Narkhede; kafka-732
Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/771760ce Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/771760ce Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/771760ce Branch: refs/heads/trunk Commit: 771760ce23f00ba86b916420d8e209b2611b23c0 Parents: eae1bd5 Author: Jun Rao <jun...@gmail.com> Authored: Wed Mar 6 16:08:56 2013 -0800 Committer: Jun Rao <jun...@gmail.com> Committed: Wed Mar 6 16:08:56 2013 -0800 ---------------------------------------------------------------------- .../main/scala/kafka/consumer/ConsumerConfig.scala | 6 ------ .../scala/kafka/consumer/ConsumerIterator.scala | 7 +------ .../main/scala/kafka/consumer/KafkaStream.scala | 3 +-- .../consumer/ZookeeperConsumerConnector.scala | 3 +-- .../unit/kafka/consumer/ConsumerIteratorTest.scala | 1 - 5 files changed, 3 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/771760ce/core/src/main/scala/kafka/consumer/ConsumerConfig.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala index 45db07b..2ebd72a 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala @@ -125,12 +125,6 @@ class ConsumerConfig private (val props: VerifiableProperties) extends ZKConfig( /** throw a timeout exception to the consumer if no message is available for consumption after the specified interval */ val consumerTimeoutMs = props.getInt("consumer.timeout.ms", ConsumerTimeoutMs) - /** Use shallow iterator over compressed messages directly. This feature should be used very carefully. - * Typically, it's only used for mirroring raw messages from one kafka cluster to another to save the - * overhead of decompression. 
- * */ - val shallowIteratorEnable = props.getBoolean("shallow.iterator.enable", false) - /** * Client id is specified by the kafka consumer client, used to distinguish different clients */ http://git-wip-us.apache.org/repos/asf/kafka/blob/771760ce/core/src/main/scala/kafka/consumer/ConsumerIterator.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala index 746a4bd..a504534 100644 --- a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala +++ b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala @@ -34,7 +34,6 @@ class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk consumerTimeoutMs: Int, private val keyDecoder: Decoder[K], private val valueDecoder: Decoder[V], - val enableShallowIterator: Boolean, val clientId: String) extends IteratorTemplate[MessageAndMetadata[K, V]] with Logging { @@ -83,11 +82,7 @@ class ConsumerIterator[K, V](private val channel: BlockingQueue[FetchedDataChunk .format(ctiConsumeOffset, cdcFetchOffset, currentTopicInfo)) currentTopicInfo.resetConsumeOffset(cdcFetchOffset) } - localCurrent = - if (enableShallowIterator) - currentDataChunk.messages.shallowIterator - else - currentDataChunk.messages.iterator + localCurrent = currentDataChunk.messages.iterator current.set(localCurrent) } http://git-wip-us.apache.org/repos/asf/kafka/blob/771760ce/core/src/main/scala/kafka/consumer/KafkaStream.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/consumer/KafkaStream.scala b/core/src/main/scala/kafka/consumer/KafkaStream.scala index d4e0e96..31eaf86 100644 --- a/core/src/main/scala/kafka/consumer/KafkaStream.scala +++ b/core/src/main/scala/kafka/consumer/KafkaStream.scala @@ -26,12 +26,11 @@ class KafkaStream[K,V](private val queue: BlockingQueue[FetchedDataChunk], consumerTimeoutMs: Int, private 
val keyDecoder: Decoder[K], private val valueDecoder: Decoder[V], - val enableShallowIterator: Boolean, val clientId: String) extends Iterable[MessageAndMetadata[K,V]] with java.lang.Iterable[MessageAndMetadata[K,V]] { private val iter: ConsumerIterator[K,V] = - new ConsumerIterator[K,V](queue, consumerTimeoutMs, keyDecoder, valueDecoder, enableShallowIterator, clientId) + new ConsumerIterator[K,V](queue, consumerTimeoutMs, keyDecoder, valueDecoder, clientId) /** * Create an iterator over messages in the stream. http://git-wip-us.apache.org/repos/asf/kafka/blob/771760ce/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala ---------------------------------------------------------------------- diff --git a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala index b266f3f..dcbcf21 100644 --- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala +++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala @@ -195,7 +195,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, threadIdSet.map(_ => { val queue = new LinkedBlockingQueue[FetchedDataChunk](config.queuedMaxMessages) val stream = new KafkaStream[K,V]( - queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.shallowIteratorEnable, config.clientId) + queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, config.clientId) (queue, stream) }) ).flatten.toList @@ -695,7 +695,6 @@ private[kafka] class ZookeeperConsumerConnector(val config: ConsumerConfig, config.consumerTimeoutMs, keyDecoder, valueDecoder, - config.shallowIteratorEnable, config.clientId) (queue, stream) }).toList http://git-wip-us.apache.org/repos/asf/kafka/blob/771760ce/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala ---------------------------------------------------------------------- diff --git 
a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala index 8ae30ea..1ee34b9 100644 --- a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala +++ b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala @@ -78,7 +78,6 @@ class ConsumerIteratorTest extends JUnit3Suite with KafkaServerTestHarness { consumerConfig.consumerTimeoutMs, new StringDecoder(), new StringDecoder(), - enableShallowIterator = false, clientId = "") val receivedMessages = (0 until 5).map(i => iter.next.message).toList