Repository: kafka
Updated Branches:
  refs/heads/trunk a08634642 -> 8e8b3c565


KAFKA-5360; Down-converted uncompressed batches should respect fetch offset

More specifically, V2 messages are always batched (whether compressed or not) 
while
V0/V1 are only batched if they are compressed.

Clients like librdkafka expect to receive messages from the fetch offset when 
dealing with uncompressed V0/V1 messages. When converting from V2 to V0/1, we 
were returning all the
messages in the V2 batch.

Author: Ismael Juma <[email protected]>

Reviewers: Jason Gustafson <[email protected]>

Closes #3191 from ijuma/kafka-5360-down-converted-uncompressed-respect-offset


Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8e8b3c56
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8e8b3c56
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8e8b3c56

Branch: refs/heads/trunk
Commit: 8e8b3c56572a825d3c1beb6ad77ce88571354f51
Parents: a086346
Author: Ismael Juma <[email protected]>
Authored: Thu Jun 1 10:17:03 2017 -0700
Committer: Jason Gustafson <[email protected]>
Committed: Thu Jun 1 10:17:03 2017 -0700

----------------------------------------------------------------------
 .../clients/producer/internals/Sender.java      |  2 +-
 .../kafka/common/record/AbstractRecords.java    | 22 ++++-
 .../apache/kafka/common/record/FileRecords.java |  4 +-
 .../kafka/common/record/MemoryRecords.java      |  4 +-
 .../org/apache/kafka/common/record/Records.java |  4 +-
 .../kafka/common/record/FileRecordsTest.java    | 54 +++++++++---
 .../common/record/MemoryRecordsBuilderTest.java | 43 ++++++++--
 .../src/main/scala/kafka/server/KafkaApis.scala | 29 ++++---
 .../unit/kafka/server/FetchRequestTest.scala    | 89 +++++++++++++++++---
 9 files changed, 201 insertions(+), 50 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/kafka/blob/8e8b3c56/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 01ff91a..4f1c7d4 100644
--- 
a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ 
b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -648,7 +648,7 @@ public class Sender implements Runnable {
             // not all support the same message format version. For example, 
if a partition migrates from a broker
             // which is supporting the new magic version to one which doesn't, 
then we will need to convert.
             if (!records.hasMatchingMagic(minUsedMagic))
-                records = batch.records().downConvert(minUsedMagic);
+                records = batch.records().downConvert(minUsedMagic, 0);
             produceRecordsByPartition.put(tp, records);
             recordsByPartition.put(tp, batch);
         }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e8b3c56/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java 
b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
index 2771ab7..04d7071 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
@@ -50,7 +50,16 @@ public abstract class AbstractRecords implements Records {
         return true;
     }
 
-    protected MemoryRecords downConvert(Iterable<? extends RecordBatch> 
batches, byte toMagic) {
+    /**
+     * Down convert batches to the provided message format version. The first 
offset parameter is only relevant in the
+     * conversion from uncompressed v2 or higher to v1 or lower. The reason is 
that uncompressed records in v0 and v1
+     * are not batched (put another way, each batch always has 1 record).
+     *
+     * If a client requests records in v1 format starting from the middle of 
an uncompressed batch in v2 format, we
+     * need to drop records from the batch during the conversion. Some 
versions of librdkafka rely on this for
+     * correctness.
+     */
+    protected MemoryRecords downConvert(Iterable<? extends RecordBatch> 
batches, byte toMagic, long firstOffset) {
         // maintain the batch along with the decompressed records to avoid the 
need to decompress again
         List<RecordBatchAndRecords> recordBatchAndRecordsList = new 
ArrayList<>();
         int totalSizeEstimate = 0;
@@ -63,9 +72,16 @@ public abstract class AbstractRecords implements Records {
                 totalSizeEstimate += batch.sizeInBytes();
                 recordBatchAndRecordsList.add(new RecordBatchAndRecords(batch, 
null, null));
             } else {
-                List<Record> records = Utils.toList(batch.iterator());
+                List<Record> records = new ArrayList<>();
+                for (Record record : batch) {
+                    // See the method javadoc for an explanation
+                    if (toMagic > RecordBatch.MAGIC_VALUE_V1 || 
batch.isCompressed() || record.offset() >= firstOffset)
+                        records.add(record);
+                }
+                if (records.isEmpty())
+                    continue;
                 final long baseOffset;
-                if (batch.magic() >= RecordBatch.MAGIC_VALUE_V2)
+                if (batch.magic() >= RecordBatch.MAGIC_VALUE_V2 && toMagic >= 
RecordBatch.MAGIC_VALUE_V2)
                     baseOffset = batch.baseOffset();
                 else
                     baseOffset = records.get(0).offset();

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e8b3c56/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java 
b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
index 32ca1a7..35431d8 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
@@ -230,7 +230,7 @@ public class FileRecords extends AbstractRecords implements 
Closeable {
     }
 
     @Override
-    public Records downConvert(byte toMagic) {
+    public Records downConvert(byte toMagic, long firstOffset) {
         List<? extends RecordBatch> batches = 
Utils.toList(batches().iterator());
         if (batches.isEmpty()) {
             // This indicates that the message is too large, which means that 
the buffer is not large
@@ -242,7 +242,7 @@ public class FileRecords extends AbstractRecords implements 
Closeable {
             // one full message, even if it requires exceeding the max fetch 
size requested by the client.
             return this;
         } else {
-            return downConvert(batches, toMagic);
+            return downConvert(batches, toMagic, firstOffset);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e8b3c56/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git 
a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java 
b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index 46798cf..e158e2f 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -109,8 +109,8 @@ public class MemoryRecords extends AbstractRecords {
     }
 
     @Override
-    public MemoryRecords downConvert(byte toMagic) {
-        return downConvert(batches(), toMagic);
+    public MemoryRecords downConvert(byte toMagic, long firstOffset) {
+        return downConvert(batches(), toMagic, firstOffset);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e8b3c56/clients/src/main/java/org/apache/kafka/common/record/Records.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Records.java 
b/clients/src/main/java/org/apache/kafka/common/record/Records.java
index a5a5036..ec2e717 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Records.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Records.java
@@ -96,9 +96,11 @@ public interface Records {
      * Convert all batches in this buffer to the format passed as a parameter. 
Note that this requires
      * deep iteration since all of the deep records must also be converted to 
the desired format.
      * @param toMagic The magic value to convert to
+     * @param firstOffset The starting offset for returned records. This only 
impacts some cases. See
+     *                    {@link AbstractRecords#downConvert(Iterable, byte, 
long)} for an explanation.
      * @return A Records instance (which may or may not be the same instance)
      */
-    Records downConvert(byte toMagic);
+    Records downConvert(byte toMagic, long firstOffset);
 
     /**
      * Get an iterator over the records in this log. Note that this generally 
requires decompression,

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e8b3c56/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java 
b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
index 8b9c900..b41db67 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.common.record;
 
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.test.TestUtils;
 import org.easymock.EasyMock;
 import org.junit.Before;
@@ -26,6 +27,7 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -308,7 +310,7 @@ public class FileRecordsTest {
         int start = fileRecords.searchForOffsetWithSize(1, 0).position;
         int size = batch.sizeInBytes();
         FileRecords slice = fileRecords.read(start, size - 1);
-        Records messageV0 = slice.downConvert(RecordBatch.MAGIC_VALUE_V0);
+        Records messageV0 = slice.downConvert(RecordBatch.MAGIC_VALUE_V0, 0);
         assertTrue("No message should be there", batches(messageV0).isEmpty());
         assertEquals("There should be " + (size - 1) + " bytes", size - 1, 
messageV0.sizeInBytes());
     }
@@ -324,31 +326,34 @@ public class FileRecordsTest {
     }
 
     private void doTestConversion(CompressionType compressionType, byte 
toMagic) throws IOException {
-        List<Long> offsets = asList(0L, 2L, 3L, 9L, 11L, 15L);
+        List<Long> offsets = asList(0L, 2L, 3L, 9L, 11L, 15L, 16L, 17L, 22L, 
24L);
         List<SimpleRecord> records = asList(
                 new SimpleRecord(1L, "k1".getBytes(), "hello".getBytes()),
                 new SimpleRecord(2L, "k2".getBytes(), "goodbye".getBytes()),
                 new SimpleRecord(3L, "k3".getBytes(), "hello 
again".getBytes()),
                 new SimpleRecord(4L, "k4".getBytes(), "goodbye for 
now".getBytes()),
                 new SimpleRecord(5L, "k5".getBytes(), "hello 
again".getBytes()),
-                new SimpleRecord(6L, "k6".getBytes(), "goodbye 
forever".getBytes()));
+                new SimpleRecord(6L, "k6".getBytes(), "I sense 
indecision".getBytes()),
+                new SimpleRecord(7L, "k7".getBytes(), "what now".getBytes()),
+                new SimpleRecord(8L, "k8".getBytes(), "running 
out".getBytes()),
+                new SimpleRecord(9L, "k9".getBytes(), "ok, almost 
done".getBytes()),
+                new SimpleRecord(10L, "k10".getBytes(), "finally".getBytes()));
 
         ByteBuffer buffer = ByteBuffer.allocate(1024);
         MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, 
RecordBatch.MAGIC_VALUE_V0, compressionType,
                 TimestampType.CREATE_TIME, 0L);
-        for (int i = 0; i < 2; i++)
+        for (int i = 0; i < 3; i++)
             builder.appendWithOffset(offsets.get(i), records.get(i));
         builder.close();
 
-        builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V1, 
compressionType,
-                TimestampType.CREATE_TIME, 0L);
-        for (int i = 2; i < 4; i++)
+        builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V1, 
compressionType, TimestampType.CREATE_TIME,
+                0L);
+        for (int i = 3; i < 6; i++)
             builder.appendWithOffset(offsets.get(i), records.get(i));
         builder.close();
 
-        builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, 
compressionType,
-                TimestampType.CREATE_TIME, 0L);
-        for (int i = 4; i < 6; i++)
+        builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, 
compressionType, TimestampType.CREATE_TIME, 0L);
+        for (int i = 6; i < 10; i++)
             builder.appendWithOffset(offsets.get(i), records.get(i));
         builder.close();
 
@@ -357,11 +362,34 @@ public class FileRecordsTest {
         try (FileRecords fileRecords = FileRecords.open(tempFile())) {
             fileRecords.append(MemoryRecords.readableRecords(buffer));
             fileRecords.flush();
-            Records convertedRecords = fileRecords.downConvert(toMagic);
+            Records convertedRecords = fileRecords.downConvert(toMagic, 0L);
             verifyConvertedRecords(records, offsets, convertedRecords, 
compressionType, toMagic);
+
+            if (toMagic <= RecordBatch.MAGIC_VALUE_V1 && compressionType == 
CompressionType.NONE) {
+                long firstOffset;
+                if (toMagic == RecordBatch.MAGIC_VALUE_V0)
+                    firstOffset = 11L; // v1 record
+                else
+                    firstOffset = 17; // v2 record
+                Records convertedRecords2 = fileRecords.downConvert(toMagic, 
firstOffset);
+                List<Long> filteredOffsets = new ArrayList<>(offsets);
+                List<SimpleRecord> filteredRecords = new ArrayList<>(records);
+                int index = filteredOffsets.indexOf(firstOffset) - 1;
+                filteredRecords.remove(index);
+                filteredOffsets.remove(index);
+                verifyConvertedRecords(filteredRecords, filteredOffsets, 
convertedRecords2, compressionType, toMagic);
+            } else {
+                // firstOffset doesn't have any effect in this case
+                Records convertedRecords2 = fileRecords.downConvert(toMagic, 
10L);
+                verifyConvertedRecords(records, offsets, convertedRecords2, 
compressionType, toMagic);
+            }
         }
     }
 
+    private String utf8(ByteBuffer buffer) {
+        return Utils.utf8(buffer, buffer.remaining());
+    }
+
     private void verifyConvertedRecords(List<SimpleRecord> initialRecords,
                                         List<Long> initialOffsets,
                                         Records convertedRecords,
@@ -378,8 +406,8 @@ public class FileRecordsTest {
             for (Record record : batch) {
                 assertTrue("Inner record should have magic " + magicByte, 
record.hasMagic(batch.magic()));
                 assertEquals("Offset should not change", 
initialOffsets.get(i).longValue(), record.offset());
-                assertEquals("Key should not change", 
initialRecords.get(i).key(), record.key());
-                assertEquals("Value should not change", 
initialRecords.get(i).value(), record.value());
+                assertEquals("Key should not change", 
utf8(initialRecords.get(i).key()), utf8(record.key()));
+                assertEquals("Value should not change", 
utf8(initialRecords.get(i).value()), utf8(record.value()));
                 
assertFalse(record.hasTimestampType(TimestampType.LOG_APPEND_TIME));
                 if (batch.magic() == RecordBatch.MAGIC_VALUE_V0) {
                     assertEquals(RecordBatch.NO_TIMESTAMP, record.timestamp());

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e8b3c56/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
----------------------------------------------------------------------
diff --git 
a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
 
b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
index 9734f59..f10bd98 100644
--- 
a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
+++ 
b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
@@ -432,7 +432,7 @@ public class MemoryRecordsBuilderTest {
 
         buffer.flip();
 
-        Records records = 
MemoryRecords.readableRecords(buffer).downConvert(RecordBatch.MAGIC_VALUE_V1);
+        Records records = 
MemoryRecords.readableRecords(buffer).downConvert(RecordBatch.MAGIC_VALUE_V1, 
0);
 
         List<? extends RecordBatch> batches = 
Utils.toList(records.batches().iterator());
         if (compressionType != CompressionType.NONE) {
@@ -469,24 +469,57 @@ public class MemoryRecordsBuilderTest {
 
         buffer.flip();
 
-        Records records = 
MemoryRecords.readableRecords(buffer).downConvert(RecordBatch.MAGIC_VALUE_V1);
+        Records records = 
MemoryRecords.readableRecords(buffer).downConvert(RecordBatch.MAGIC_VALUE_V1, 
0);
 
         List<? extends RecordBatch> batches = 
Utils.toList(records.batches().iterator());
         if (compressionType != CompressionType.NONE) {
             assertEquals(2, batches.size());
             assertEquals(RecordBatch.MAGIC_VALUE_V0, batches.get(0).magic());
+            assertEquals(0, batches.get(0).baseOffset());
             assertEquals(RecordBatch.MAGIC_VALUE_V1, batches.get(1).magic());
+            assertEquals(1, batches.get(1).baseOffset());
         } else {
             assertEquals(3, batches.size());
             assertEquals(RecordBatch.MAGIC_VALUE_V0, batches.get(0).magic());
+            assertEquals(0, batches.get(0).baseOffset());
             assertEquals(RecordBatch.MAGIC_VALUE_V1, batches.get(1).magic());
+            assertEquals(1, batches.get(1).baseOffset());
             assertEquals(RecordBatch.MAGIC_VALUE_V1, batches.get(2).magic());
+            assertEquals(2, batches.get(2).baseOffset());
         }
 
         List<Record> logRecords = Utils.toList(records.records().iterator());
-        assertEquals(ByteBuffer.wrap("1".getBytes()), logRecords.get(0).key());
-        assertEquals(ByteBuffer.wrap("2".getBytes()), logRecords.get(1).key());
-        assertEquals(ByteBuffer.wrap("3".getBytes()), logRecords.get(2).key());
+        assertEquals("1", utf8(logRecords.get(0).key()));
+        assertEquals("2", utf8(logRecords.get(1).key()));
+        assertEquals("3", utf8(logRecords.get(2).key()));
+
+        records = 
MemoryRecords.readableRecords(buffer).downConvert(RecordBatch.MAGIC_VALUE_V1, 
2L);
+
+        batches = Utils.toList(records.batches().iterator());
+        logRecords = Utils.toList(records.records().iterator());
+
+        if (compressionType != CompressionType.NONE) {
+            assertEquals(2, batches.size());
+            assertEquals(RecordBatch.MAGIC_VALUE_V0, batches.get(0).magic());
+            assertEquals(0, batches.get(0).baseOffset());
+            assertEquals(RecordBatch.MAGIC_VALUE_V1, batches.get(1).magic());
+            assertEquals(1, batches.get(1).baseOffset());
+            assertEquals("1", utf8(logRecords.get(0).key()));
+            assertEquals("2", utf8(logRecords.get(1).key()));
+            assertEquals("3", utf8(logRecords.get(2).key()));
+        } else {
+            assertEquals(2, batches.size());
+            assertEquals(RecordBatch.MAGIC_VALUE_V0, batches.get(0).magic());
+            assertEquals(0, batches.get(0).baseOffset());
+            assertEquals(RecordBatch.MAGIC_VALUE_V1, batches.get(1).magic());
+            assertEquals(2, batches.get(1).baseOffset());
+            assertEquals("1", utf8(logRecords.get(0).key()));
+            assertEquals("3", utf8(logRecords.get(1).key()));
+        }
+    }
+
+    private String utf8(ByteBuffer buffer) {
+        return Utils.utf8(buffer, buffer.remaining());
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e8b3c56/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala 
b/core/src/main/scala/kafka/server/KafkaApis.scala
index eb0bf3b..5ce590f 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -508,19 +508,24 @@ class KafkaApis(val requestChannel: RequestChannel,
       // know it must be supported. However, if the magic version is changed 
from a higher version back to a
       // lower version, this check will no longer be valid and we will fail to 
down-convert the messages
       // which were written in the new format prior to the version downgrade.
-      replicaManager.getMagic(tp) match {
-        case Some(magic) if magic > 0 && versionId <= 1 && 
!data.records.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V0) =>
-          trace(s"Down converting message to V0 for fetch request from 
$clientId")
-          new FetchResponse.PartitionData(data.error, data.highWatermark, 
FetchResponse.INVALID_LAST_STABLE_OFFSET,
-              data.logStartOffset, data.abortedTransactions, 
data.records.downConvert(RecordBatch.MAGIC_VALUE_V0))
+      replicaManager.getMagic(tp).flatMap { magic =>
+        val downConvertMagic = {
+          if (magic > RecordBatch.MAGIC_VALUE_V0 && versionId <= 1 && 
!data.records.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V0))
+            Some(RecordBatch.MAGIC_VALUE_V0)
+          else if (magic > RecordBatch.MAGIC_VALUE_V1 && versionId <= 3 && 
!data.records.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V1))
+            Some(RecordBatch.MAGIC_VALUE_V1)
+          else
+            None
+        }
 
-        case Some(magic) if magic > 1 && versionId <= 3 && 
!data.records.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V1) =>
-          trace(s"Down converting message to V1 for fetch request from 
$clientId")
+        downConvertMagic.map { magic =>
+          trace(s"Down converting records from partition $tp to message format 
version $magic for fetch request from $clientId")
+          val converted = data.records.downConvert(magic, 
fetchRequest.fetchData.get(tp).fetchOffset)
           new FetchResponse.PartitionData(data.error, data.highWatermark, 
FetchResponse.INVALID_LAST_STABLE_OFFSET,
-              data.logStartOffset, data.abortedTransactions, 
data.records.downConvert(RecordBatch.MAGIC_VALUE_V1))
+            data.logStartOffset, data.abortedTransactions, converted)
+        }
 
-        case _ => data
-      }
+      }.getOrElse(data)
     }
 
     // the callback for process a fetch response, invoked before throttling
@@ -549,7 +554,9 @@ class KafkaApis(val requestChannel: RequestChannel,
       def fetchResponseCallback(bandwidthThrottleTimeMs: Int) {
         def createResponse(requestThrottleTimeMs: Int): 
RequestChannel.Response = {
           val convertedData = new util.LinkedHashMap[TopicPartition, 
FetchResponse.PartitionData]
-          fetchedPartitionData.asScala.foreach(e => convertedData.put(e._1, 
convertedPartitionData(e._1, e._2)))
+          fetchedPartitionData.asScala.foreach { case (tp, partitionData) =>
+            convertedData.put(tp, convertedPartitionData(tp, partitionData))
+          }
           val response = new FetchResponse(convertedData, 0)
           val responseStruct = response.toStruct(versionId)
 

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e8b3c56/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala 
b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
index 48b3945..c5d40f6 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
@@ -19,13 +19,14 @@ package kafka.server
 import java.util
 import java.util.Properties
 
+import kafka.api.KAFKA_0_11_0_IV2
 import kafka.log.LogConfig
 import kafka.utils.TestUtils
 import kafka.utils.TestUtils._
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.record.Record
+import org.apache.kafka.common.record.{Record, RecordBatch}
 import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, 
IsolationLevel}
 import org.apache.kafka.common.serialization.StringSerializer
 import org.junit.Assert._
@@ -42,12 +43,6 @@ class FetchRequestTest extends BaseRequestTest {
 
   private var producer: KafkaProducer[String, String] = null
 
-  override def setUp() {
-    super.setUp()
-    producer = 
TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(servers),
-      retries = 5, keySerializer = new StringSerializer, valueSerializer = new 
StringSerializer)
-  }
-
   override def tearDown() {
     producer.close()
     super.tearDown()
@@ -67,14 +62,20 @@ class FetchRequestTest extends BaseRequestTest {
     partitionMap
   }
 
-  private def sendFetchRequest(leaderId: Int, request: FetchRequest,
-                               version: Short = ApiKeys.FETCH.latestVersion): 
FetchResponse = {
+  private def sendFetchRequest(leaderId: Int, request: FetchRequest): 
FetchResponse = {
     val response = connectAndSend(request, ApiKeys.FETCH, destination = 
brokerSocketServer(leaderId))
-    FetchResponse.parse(response, version)
+    FetchResponse.parse(response, request.version)
+  }
+
+  private def initProducer(): Unit = {
+    producer = 
TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(servers),
+      retries = 5, keySerializer = new StringSerializer, valueSerializer = new 
StringSerializer)
   }
 
   @Test
   def testBrokerRespectsPartitionsOrderAndSizeLimits(): Unit = {
+    initProducer()
+
     val messagesPerPartition = 9
     val maxResponseBytes = 800
     val maxPartitionBytes = 190
@@ -152,13 +153,14 @@ class FetchRequestTest extends BaseRequestTest {
 
   @Test
   def testFetchRequestV2WithOversizedMessage(): Unit = {
+    initProducer()
     val maxPartitionBytes = 200
     val (topicPartition, leaderId) = createTopics(numTopics = 1, numPartitions 
= 1).head
     producer.send(new ProducerRecord(topicPartition.topic, 
topicPartition.partition,
       "key", new String(new Array[Byte](maxPartitionBytes + 1)))).get
     val fetchRequest = FetchRequest.Builder.forConsumer(Int.MaxValue, 0, 
createPartitionMap(maxPartitionBytes,
       Seq(topicPartition))).build(2)
-    val fetchResponse = sendFetchRequest(leaderId, fetchRequest, version = 2)
+    val fetchResponse = sendFetchRequest(leaderId, fetchRequest)
     val partitionData = fetchResponse.responseData.get(topicPartition)
     assertEquals(Errors.NONE, partitionData.error)
     assertTrue(partitionData.highWatermark > 0)
@@ -166,6 +168,68 @@ class FetchRequestTest extends BaseRequestTest {
     assertEquals(0, records(partitionData).map(_.sizeInBytes).sum)
   }
 
+  /**
+    * Ensure that we respect the fetch offset when returning records that were 
converted from an uncompressed v2
+    * record batch to multiple v0/v1 record batches with size 1. If the fetch 
offset points to inside the record batch,
+    * some records have to be dropped during the conversion.
+    */
+  @Test
+  def testDownConversionFromBatchedToUnbatchedRespectsOffset(): Unit = {
+    // Increase linger so that we have control over the batches created
+    producer = 
TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(servers),
+      retries = 5, keySerializer = new StringSerializer, valueSerializer = new 
StringSerializer,
+      lingerMs = 300 * 1000)
+
+    val topicConfig = Map(LogConfig.MessageFormatVersionProp -> 
KAFKA_0_11_0_IV2.version)
+    val (topicPartition, leaderId) = createTopics(numTopics = 1, numPartitions 
= 1, topicConfig).head
+    val topic = topicPartition.topic
+
+    val firstBatchFutures = (0 until 10).map(i => producer.send(new 
ProducerRecord(topic, s"key-$i", s"value-$i")))
+    producer.flush()
+    val secondBatchFutures = (10 until 25).map(i => producer.send(new 
ProducerRecord(topic, s"key-$i", s"value-$i")))
+    producer.flush()
+
+    firstBatchFutures.foreach(_.get)
+    secondBatchFutures.foreach(_.get)
+
+    def check(fetchOffset: Long, requestVersion: Short, expectedOffset: Long, 
expectedNumBatches: Int, expectedMagic: Byte): Unit = {
+      val fetchRequest = FetchRequest.Builder.forConsumer(Int.MaxValue, 0, 
createPartitionMap(Int.MaxValue,
+        Seq(topicPartition), Map(topicPartition -> 
fetchOffset))).build(requestVersion)
+      val fetchResponse = sendFetchRequest(leaderId, fetchRequest)
+      val partitionData = fetchResponse.responseData.get(topicPartition)
+      assertEquals(Errors.NONE, partitionData.error)
+      assertTrue(partitionData.highWatermark > 0)
+      val batches = partitionData.records.batches.asScala.toBuffer
+      assertEquals(expectedNumBatches, batches.size)
+      val batch = batches.head
+      assertEquals(expectedMagic, batch.magic)
+      assertEquals(expectedOffset, batch.baseOffset)
+    }
+
+    // down conversion to message format 0, batches of 1 message are returned 
so we receive the exact offset we requested
+    check(fetchOffset = 3, expectedOffset = 3, requestVersion = 1, 
expectedNumBatches = 22,
+      expectedMagic = RecordBatch.MAGIC_VALUE_V0)
+    check(fetchOffset = 15, expectedOffset = 15, requestVersion = 1, 
expectedNumBatches = 10,
+      expectedMagic = RecordBatch.MAGIC_VALUE_V0)
+
+    // down conversion to message format 1, batches of 1 message are returned 
so we receive the exact offset we requested
+    check(fetchOffset = 3, expectedOffset = 3, requestVersion = 3, 
expectedNumBatches = 22,
+      expectedMagic = RecordBatch.MAGIC_VALUE_V1)
+    check(fetchOffset = 15, expectedOffset = 15, requestVersion = 3, 
expectedNumBatches = 10,
+      expectedMagic = RecordBatch.MAGIC_VALUE_V1)
+
+    // no down conversion, we receive a single batch so the received offset 
won't necessarily be the same
+    check(fetchOffset = 3, expectedOffset = 0, requestVersion = 4, 
expectedNumBatches = 2,
+      expectedMagic = RecordBatch.MAGIC_VALUE_V2)
+    check(fetchOffset = 15, expectedOffset = 10, requestVersion = 4, 
expectedNumBatches = 1,
+      expectedMagic = RecordBatch.MAGIC_VALUE_V2)
+
+    // no down conversion, we receive a single batch and the exact offset we 
requested because it happens to be the
+    // offset of the first record in the batch
+    check(fetchOffset = 10, expectedOffset = 10, requestVersion = 4, 
expectedNumBatches = 1,
+      expectedMagic = RecordBatch.MAGIC_VALUE_V2)
+  }
+
   private def records(partitionData: FetchResponse.PartitionData): Seq[Record] 
= {
     partitionData.records.records.asScala.toIndexedSeq
   }
@@ -207,10 +271,11 @@ class FetchRequestTest extends BaseRequestTest {
     assertTrue(responseSize <= maxResponseBytes)
   }
 
-  private def createTopics(numTopics: Int, numPartitions: Int): 
Map[TopicPartition, Int] = {
+  private def createTopics(numTopics: Int, numPartitions: Int, configs: 
Map[String, String] = Map.empty): Map[TopicPartition, Int] = {
     val topics = (0 until numPartitions).map(t => s"topic$t")
     val topicConfig = new Properties
     topicConfig.setProperty(LogConfig.MinInSyncReplicasProp, 2.toString)
+    configs.foreach { case (k, v) => topicConfig.setProperty(k, v) }
     topics.flatMap { topic =>
       val partitionToLeader = createTopic(zkUtils, topic, numPartitions = 
numPartitions, replicationFactor = 2,
         servers = servers, topicConfig = topicConfig)

Reply via email to