Repository: kafka
Updated Branches:
  refs/heads/0.9.0 c6ecc7809 -> c3f575d5f


KAFKA-3179; Fix seek on compressed messages

The fix itself is simple.

Some explanation on the unit tests. Currently the vast majority of unit 
tests run with uncompressed messages. I initially considered running all 
the tests with compressed messages, but uncompressed messages are necessary 
in many test cases because we need the bytes sent and appended to the log 
to be predictable. In most other cases it does not matter whether the 
messages are compressed, and compression would slow down the unit tests. So 
I just added a method to BaseConsumerTest that sends compressed messages 
whenever we need it.
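
To make the user-visible symptom concrete, here is a minimal sketch of the
scenario the fix addresses, written against the 0.9 consumer API. The topic
name, bootstrap server, and seek offset are illustrative, and the partition
is assumed to already hold compressed (e.g. gzip) message sets:

    import java.util.Collections;
    import java.util.Properties;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.TopicPartition;

    public class SeekOnCompressedDemo {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put("bootstrap.servers", "localhost:9092");
            props.put("group.id", "seek-demo");
            props.put("key.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
            KafkaConsumer<byte[], byte[]> consumer = new KafkaConsumer<>(props);
            TopicPartition tp = new TopicPartition("compressed-topic", 0);
            consumer.assign(Collections.singletonList(tp));

            // Seek into the middle of a compressed message set. The broker
            // returns the whole wrapper message, so before this patch the
            // consumer also surfaced inner messages with offsets below 25.
            consumer.seek(tp, 25L);
            for (ConsumerRecord<byte[], byte[]> record : consumer.poll(1000)) {
                // With the fix, nothing below the seek position comes back.
                System.out.println(record.offset());
            }
            consumer.close();
        }
    }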

Author: Jiangjie Qin <[email protected]>

Reviewers: Aditya Auradkar <[email protected]>, Ismael Juma <[email protected]>, Joel Koshy <[email protected]>

Closes #842 from becketqin/KAFKA-3179


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c3f575d5
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c3f575d5
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c3f575d5

Branch: refs/heads/0.9.0
Commit: c3f575d5f248cd60daada7037838d0c7c5be1277
Parents: c6ecc78
Author: Jiangjie Qin <[email protected]>
Authored: Thu Feb 4 16:08:21 2016 -0800
Committer: Joel Koshy <[email protected]>
Committed: Thu Feb 4 16:53:25 2016 -0800

----------------------------------------------------------------------
 .../clients/consumer/internals/Fetcher.java     |  7 +++-
 .../clients/consumer/internals/FetcherTest.java | 10 ++++-
 .../kafka/api/BaseConsumerTest.scala            | 31 ++++++++++------
 .../kafka/api/PlaintextConsumerTest.scala       | 39 ++++++++++++++++++--
 4 files changed, 69 insertions(+), 18 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/c3f575d5/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
index c5d1bfe..e894102 100644
--- a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
+++ b/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java
@@ -561,8 +561,11 @@ public class Fetcher<K, V> {
                 MemoryRecords records = MemoryRecords.readableRecords(buffer);
                 List<ConsumerRecord<K, V>> parsed = new ArrayList<>();
                 for (LogEntry logEntry : records) {
-                    parsed.add(parseRecord(tp, logEntry));
-                    bytes += logEntry.size();
+                    // Skip the messages earlier than current position.
+                    if (logEntry.offset() >= position) {
+                        parsed.add(parseRecord(tp, logEntry));
+                        bytes += logEntry.size();
+                    }
                 }
 
                 if (!parsed.isEmpty()) {
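
For context on why entries below the fetch position can appear at all:
with compression the broker returns the entire compressed message set that
contains the requested offset, and iterating a MemoryRecords instance
yields each inner message with its absolute offset. A rough, self-contained
sketch of that behavior, using only client classes and method signatures
that already appear in this patch (buffer size and offsets are arbitrary):

    import java.nio.ByteBuffer;
    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.record.LogEntry;
    import org.apache.kafka.common.record.MemoryRecords;

    public class CompressedSetIteration {
        public static void main(String[] args) {
            // Build one compressed message set holding offsets 0..4.
            MemoryRecords records = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.GZIP);
            for (long offset = 0; offset < 5; offset++)
                records.append(offset, ("key-" + offset).getBytes(), ("value-" + offset).getBytes());
            records.close();

            // Pretend the consumer seeked to offset 3: iteration still sees
            // offsets 0..4, so the fetcher must skip everything below 3,
            // which is exactly what the hunk above does.
            long position = 3L;
            for (LogEntry entry : MemoryRecords.readableRecords(records.buffer())) {
                if (entry.offset() >= position)
                    System.out.println("deliver offset " + entry.offset());
            }
        }
    }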

http://git-wip-us.apache.org/repos/asf/kafka/blob/c3f575d5/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
index 0a976fe..57fc67f 100644
--- a/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
+++ b/clients/src/test/java/org/apache/kafka/clients/consumer/internals/FetcherTest.java
@@ -466,8 +466,16 @@ public class FetcherTest {
 
         // normal fetch
         for (int i = 1; i < 4; i++) {
+            // We need to make sure the message offset grows. Otherwise they will be considered as already consumed
+            // and filtered out by consumer.
+            if (i > 1) {
+                this.records = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), CompressionType.NONE);
+                for (int v = 0; v < 3; v++) {
+                    this.records.append((long) i * 3 + v, "key".getBytes(), String.format("value-%d", v).getBytes());
+                }
+                this.records.close();
+            }
             fetcher.initFetches(cluster);
-
             client.prepareResponse(fetchResponse(this.records.buffer(), Errors.NONE.code(), 100L, 100 * i));
             consumerClient.poll(0);
             records = fetcher.fetchedRecords().get(tp);

http://git-wip-us.apache.org/repos/asf/kafka/blob/c3f575d5/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
index 819e690..eb24706 100644
--- a/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/BaseConsumerTest.scala
@@ -15,7 +15,7 @@ package kafka.api
 import java.util
 
 import org.apache.kafka.clients.consumer._
-import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
+import org.apache.kafka.clients.producer.{Producer, ProducerConfig, ProducerRecord}
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import org.apache.kafka.common.{PartitionInfo, TopicPartition}
 
@@ -73,7 +73,7 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
     assertEquals(0, this.consumers(0).assignment.size)
     this.consumers(0).assign(List(tp).asJava)
     assertEquals(1, this.consumers(0).assignment.size)
-    
+
     this.consumers(0).seek(tp, 0)
     consumeAndVerifyRecords(this.consumers(0), numRecords = numRecords, startingOffset = 0)
 
@@ -143,20 +143,20 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
     this.consumers(0).poll(50)
     val pos1 = this.consumers(0).position(tp)
     val pos2 = this.consumers(0).position(tp2)
-    this.consumers(0).commitSync(Map[TopicPartition,OffsetAndMetadata]((tp, new OffsetAndMetadata(3L))).asJava)
+    this.consumers(0).commitSync(Map[TopicPartition, OffsetAndMetadata]((tp, new OffsetAndMetadata(3L))).asJava)
     assertEquals(3, this.consumers(0).committed(tp).offset)
     assertNull(this.consumers(0).committed(tp2))
 
     // positions should not change
     assertEquals(pos1, this.consumers(0).position(tp))
     assertEquals(pos2, this.consumers(0).position(tp2))
-    this.consumers(0).commitSync(Map[TopicPartition,OffsetAndMetadata]((tp2, new OffsetAndMetadata(5L))).asJava)
+    this.consumers(0).commitSync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(5L))).asJava)
     assertEquals(3, this.consumers(0).committed(tp).offset)
     assertEquals(5, this.consumers(0).committed(tp2).offset)
 
     // Using async should pick up the committed changes after commit completes
     val commitCallback = new CountConsumerCommitCallback()
-    this.consumers(0).commitAsync(Map[TopicPartition,OffsetAndMetadata]((tp2, new OffsetAndMetadata(7L))).asJava, commitCallback)
+    this.consumers(0).commitAsync(Map[TopicPartition, OffsetAndMetadata]((tp2, new OffsetAndMetadata(7L))).asJava, commitCallback)
     awaitCommitCallback(this.consumers(0), commitCallback)
     assertEquals(7, this.consumers(0).committed(tp2).offset)
   }
@@ -259,10 +259,12 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
   protected class TestConsumerReassignmentListener extends ConsumerRebalanceListener {
     var callsToAssigned = 0
     var callsToRevoked = 0
+
     def onPartitionsAssigned(partitions: java.util.Collection[TopicPartition]) {
       info("onPartitionsAssigned called.")
       callsToAssigned += 1
     }
+
     def onPartitionsRevoked(partitions: java.util.Collection[TopicPartition]) {
       info("onPartitionsRevoked called.")
       callsToRevoked += 1
@@ -274,13 +276,20 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
   }
 
   protected def sendRecords(numRecords: Int, tp: TopicPartition) {
-    (0 until numRecords).map { i =>
-      this.producers(0).send(new ProducerRecord(tp.topic(), tp.partition(), s"key $i".getBytes, s"value $i".getBytes))
-    }.foreach(_.get)
+    sendRecords(this.producers(0), numRecords, tp)
+  }
+
+  protected def sendRecords(producer: Producer[Array[Byte], Array[Byte]],
+                            numRecords: Int,
+                            tp: TopicPartition) {
+    (0 until numRecords).foreach { i =>
+      producer.send(new ProducerRecord(tp.topic(), tp.partition(), s"key $i".getBytes, s"value $i".getBytes))
+    }
+    producer.flush()
   }
 
   protected def consumeAndVerifyRecords(consumer: Consumer[Array[Byte], Array[Byte]], numRecords: Int, startingOffset: Int,
-                                      startingKeyAndValueIndex: Int = 0) {
+                                      startingKeyAndValueIndex: Int = 0, tp: TopicPartition = tp) {
     val records = new ArrayList[ConsumerRecord[Array[Byte], Array[Byte]]]()
     val maxIters = numRecords * 300
     var iters = 0
@@ -294,8 +303,8 @@ abstract class BaseConsumerTest extends IntegrationTestHarness with Logging {
     for (i <- 0 until numRecords) {
       val record = records.get(i)
       val offset = startingOffset + i
-      assertEquals(topic, record.topic())
-      assertEquals(part, record.partition())
+      assertEquals(tp.topic(), record.topic())
+      assertEquals(tp.partition(), record.partition())
       assertEquals(offset.toLong, record.offset())
       val keyAndValueIndex = startingKeyAndValueIndex + i
       assertEquals(s"key $keyAndValueIndex", new String(record.key()))

http://git-wip-us.apache.org/repos/asf/kafka/blob/c3f575d5/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
index 90e9562..c92b083 100644
--- a/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
+++ b/core/src/test/scala/integration/kafka/api/PlaintextConsumerTest.scala
@@ -12,13 +12,15 @@
   */
 package kafka.api
 
+import java.util.Properties
 import java.util.regex.Pattern
 
 import kafka.server.KafkaConfig
 import kafka.utils.TestUtils
 import org.apache.kafka.clients.consumer._
-import org.apache.kafka.clients.producer.ProducerRecord
-import org.apache.kafka.common.{KafkaException, TopicPartition}
+import org.apache.kafka.clients.producer.{ProducerConfig, ProducerRecord}
+import org.apache.kafka.common.TopicPartition
+import org.apache.kafka.common.record.CompressionType
 import org.apache.kafka.common.serialization.ByteArrayDeserializer
 import org.apache.kafka.common.errors.{InvalidTopicException, RecordTooLargeException}
 import org.junit.Assert._
@@ -266,7 +268,10 @@ class PlaintextConsumerTest extends BaseConsumerTest {
   def testSeek() {
     val consumer = this.consumers(0)
     val totalRecords = 50L
-    sendRecords(totalRecords.toInt)
+    val mid = totalRecords / 2
+
+    // Test seek non-compressed message
+    sendRecords(totalRecords.toInt, tp)
     consumer.assign(List(tp).asJava)
 
     consumer.seekToEnd(tp)
@@ -277,10 +282,36 @@ class PlaintextConsumerTest extends BaseConsumerTest {
     assertEquals(0, consumer.position(tp), 0)
     consumeAndVerifyRecords(consumer, numRecords = 1, startingOffset = 0)
 
-    val mid = totalRecords / 2
     consumer.seek(tp, mid)
     assertEquals(mid, consumer.position(tp))
     consumeAndVerifyRecords(consumer, numRecords = 1, startingOffset = mid.toInt, startingKeyAndValueIndex = mid.toInt)
+
+    // Test seek compressed message
+    sendCompressedMessages(totalRecords.toInt, tp2)
+    consumer.assign(List(tp2).asJava)
+
+    consumer.seekToEnd(tp2)
+    assertEquals(totalRecords, consumer.position(tp2))
+    assertFalse(consumer.poll(totalRecords).iterator().hasNext)
+
+    consumer.seekToBeginning(tp2)
+    assertEquals(0, consumer.position(tp2), 0)
+    consumeAndVerifyRecords(consumer, numRecords = 1, startingOffset = 0, tp = tp2)
+
+    consumer.seek(tp2, mid)
+    assertEquals(mid, consumer.position(tp2))
+    consumeAndVerifyRecords(consumer, numRecords = 1, startingOffset = mid.toInt, startingKeyAndValueIndex = mid.toInt,
+      tp = tp2)
+  }
+
+  private def sendCompressedMessages(numRecords: Int, tp: TopicPartition) {
+    val producerProps = new Properties()
+    producerProps.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, CompressionType.GZIP.name)
+    producerProps.setProperty(ProducerConfig.LINGER_MS_CONFIG, Long.MaxValue.toString)
+    val producer = TestUtils.createNewProducer(brokerList, securityProtocol = securityProtocol, trustStoreFile = trustStoreFile,
+        retries = 0, lingerMs = Long.MaxValue, props = Some(producerProps))
+    sendRecords(producer, numRecords, tp)
+    producer.close()
   }
 
   def testPositionAndCommit() {
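
A side note on the sendCompressedMessages helper above: pinning linger.ms
at a very large value keeps every record in the producer's accumulator
until the explicit flush/close, so (assuming they all fit within one
producer batch) the test records land in a single compressed message set,
which is exactly the shape that exercises the seek fix. A stand-alone Java
equivalent of that helper, with the topic name and bootstrap server as
illustrative placeholders:

    import java.util.Properties;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerConfig;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class CompressedBatchProducer {
        public static void main(String[] args) {
            Properties props = new Properties();
            props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
            props.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip");
            // Hold records in the accumulator until an explicit flush/close.
            props.put(ProducerConfig.LINGER_MS_CONFIG, String.valueOf(Long.MAX_VALUE));
            props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
            props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");

            KafkaProducer<byte[], byte[]> producer = new KafkaProducer<>(props);
            for (int i = 0; i < 50; i++)
                producer.send(new ProducerRecord<byte[], byte[]>("compressed-topic", 0, ("key " + i).getBytes(), ("value " + i).getBytes()));
            producer.flush();  // all records leave as one compressed message set
            producer.close();
        }
    }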
