MirrorMaker with shallow.iterator.enable=true produces unreadable messages; 
patched by Jun Rao; reviewed by Neha Narkhede; kafka-732


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/771760ce
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/771760ce
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/771760ce

Branch: refs/heads/trunk
Commit: 771760ce23f00ba86b916420d8e209b2611b23c0
Parents: eae1bd5
Author: Jun Rao <jun...@gmail.com>
Authored: Wed Mar 6 16:08:56 2013 -0800
Committer: Jun Rao <jun...@gmail.com>
Committed: Wed Mar 6 16:08:56 2013 -0800

----------------------------------------------------------------------
 .../main/scala/kafka/consumer/ConsumerConfig.scala |    6 ------
 .../scala/kafka/consumer/ConsumerIterator.scala    |    7 +------
 .../main/scala/kafka/consumer/KafkaStream.scala    |    3 +--
 .../consumer/ZookeeperConsumerConnector.scala      |    3 +--
 .../unit/kafka/consumer/ConsumerIteratorTest.scala |    1 -
 5 files changed, 3 insertions(+), 17 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/771760ce/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala 
b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
index 45db07b..2ebd72a 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerConfig.scala
@@ -125,12 +125,6 @@ class ConsumerConfig private (val props: 
VerifiableProperties) extends ZKConfig(
   /** throw a timeout exception to the consumer if no message is available for 
consumption after the specified interval */
   val consumerTimeoutMs = props.getInt("consumer.timeout.ms", 
ConsumerTimeoutMs)
 
-  /** Use shallow iterator over compressed messages directly. This feature 
should be used very carefully.
-   *  Typically, it's only used for mirroring raw messages from one kafka 
cluster to another to save the
-   *  overhead of decompression.
-   *  */
-  val shallowIteratorEnable = props.getBoolean("shallow.iterator.enable", 
false)
-
   /**
    * Client id is specified by the kafka consumer client, used to distinguish 
different clients
    */

http://git-wip-us.apache.org/repos/asf/kafka/blob/771760ce/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala 
b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
index 746a4bd..a504534 100644
--- a/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
+++ b/core/src/main/scala/kafka/consumer/ConsumerIterator.scala
@@ -34,7 +34,6 @@ class ConsumerIterator[K, V](private val channel: 
BlockingQueue[FetchedDataChunk
                              consumerTimeoutMs: Int,
                              private val keyDecoder: Decoder[K],
                              private val valueDecoder: Decoder[V],
-                             val enableShallowIterator: Boolean,
                              val clientId: String)
   extends IteratorTemplate[MessageAndMetadata[K, V]] with Logging {
 
@@ -83,11 +82,7 @@ class ConsumerIterator[K, V](private val channel: 
BlockingQueue[FetchedDataChunk
             .format(ctiConsumeOffset, cdcFetchOffset, currentTopicInfo))
           currentTopicInfo.resetConsumeOffset(cdcFetchOffset)
         }
-        localCurrent =
-          if (enableShallowIterator)
-            currentDataChunk.messages.shallowIterator
-          else
-            currentDataChunk.messages.iterator
+        localCurrent = currentDataChunk.messages.iterator
 
         current.set(localCurrent)
       }

http://git-wip-us.apache.org/repos/asf/kafka/blob/771760ce/core/src/main/scala/kafka/consumer/KafkaStream.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/consumer/KafkaStream.scala 
b/core/src/main/scala/kafka/consumer/KafkaStream.scala
index d4e0e96..31eaf86 100644
--- a/core/src/main/scala/kafka/consumer/KafkaStream.scala
+++ b/core/src/main/scala/kafka/consumer/KafkaStream.scala
@@ -26,12 +26,11 @@ class KafkaStream[K,V](private val queue: 
BlockingQueue[FetchedDataChunk],
                         consumerTimeoutMs: Int,
                         private val keyDecoder: Decoder[K],
                         private val valueDecoder: Decoder[V],
-                        val enableShallowIterator: Boolean,
                         val clientId: String)
    extends Iterable[MessageAndMetadata[K,V]] with 
java.lang.Iterable[MessageAndMetadata[K,V]] {
 
   private val iter: ConsumerIterator[K,V] =
-    new ConsumerIterator[K,V](queue, consumerTimeoutMs, keyDecoder, 
valueDecoder, enableShallowIterator, clientId)
+    new ConsumerIterator[K,V](queue, consumerTimeoutMs, keyDecoder, 
valueDecoder, clientId)
 
   /**
    *  Create an iterator over messages in the stream.

http://git-wip-us.apache.org/repos/asf/kafka/blob/771760ce/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
----------------------------------------------------------------------
diff --git 
a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala 
b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
index b266f3f..dcbcf21 100644
--- a/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
+++ b/core/src/main/scala/kafka/consumer/ZookeeperConsumerConnector.scala
@@ -195,7 +195,7 @@ private[kafka] class ZookeeperConsumerConnector(val config: 
ConsumerConfig,
       threadIdSet.map(_ => {
         val queue =  new 
LinkedBlockingQueue[FetchedDataChunk](config.queuedMaxMessages)
         val stream = new KafkaStream[K,V](
-          queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, 
config.shallowIteratorEnable, config.clientId)
+          queue, config.consumerTimeoutMs, keyDecoder, valueDecoder, 
config.clientId)
         (queue, stream)
       })
     ).flatten.toList
@@ -695,7 +695,6 @@ private[kafka] class ZookeeperConsumerConnector(val config: 
ConsumerConfig,
                                           config.consumerTimeoutMs, 
                                           keyDecoder, 
                                           valueDecoder, 
-                                          config.shallowIteratorEnable,
                                           config.clientId)
         (queue, stream)
     }).toList

http://git-wip-us.apache.org/repos/asf/kafka/blob/771760ce/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala 
b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
index 8ae30ea..1ee34b9 100644
--- a/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
+++ b/core/src/test/scala/unit/kafka/consumer/ConsumerIteratorTest.scala
@@ -78,7 +78,6 @@ class ConsumerIteratorTest extends JUnit3Suite with 
KafkaServerTestHarness {
                                                     
consumerConfig.consumerTimeoutMs,
                                                     new StringDecoder(), 
                                                     new StringDecoder(),
-                                                    enableShallowIterator = 
false,
                                                     clientId = "")
     val receivedMessages = (0 until 5).map(i => iter.next.message).toList
 

Reply via email to