Repository: kafka

Updated Branches:
  refs/heads/trunk a08634642 -> 8e8b3c565
KAFKA-5360; Down-converted uncompressed batches should respect fetch offset

More specifically, V2 messages are always batched (whether compressed or not)
while V0/V1 are only batched if they are compressed. Clients like librdkafka
expect to receive messages from the fetch offset when dealing with uncompressed
V0/V1 messages. When converting from V2 to V0/1, we were returning all the
messages in the V2 batch.

Author: Ismael Juma <[email protected]>

Reviewers: Jason Gustafson <[email protected]>

Closes #3191 from ijuma/kafka-5360-down-converted-uncompressed-respect-offset

Project: http://git-wip-us.apache.org/repos/asf/kafka/repo
Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/8e8b3c56
Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/8e8b3c56
Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/8e8b3c56

Branch: refs/heads/trunk
Commit: 8e8b3c56572a825d3c1beb6ad77ce88571354f51
Parents: a086346
Author: Ismael Juma <[email protected]>
Authored: Thu Jun 1 10:17:03 2017 -0700
Committer: Jason Gustafson <[email protected]>
Committed: Thu Jun 1 10:17:03 2017 -0700

----------------------------------------------------------------------
 .../clients/producer/internals/Sender.java       |  2 +-
 .../kafka/common/record/AbstractRecords.java     | 22 ++++-
 .../apache/kafka/common/record/FileRecords.java  |  4 +-
 .../kafka/common/record/MemoryRecords.java       |  4 +-
 .../org/apache/kafka/common/record/Records.java  |  4 +-
 .../kafka/common/record/FileRecordsTest.java     | 54 +++++++++---
 .../common/record/MemoryRecordsBuilderTest.java  | 43 ++++++++--
 core/src/main/scala/kafka/server/KafkaApis.scala | 29 ++++---
 .../unit/kafka/server/FetchRequestTest.scala     | 89 +++++++++++++++++---
 9 files changed, 201 insertions(+), 50 deletions(-)
----------------------------------------------------------------------
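To illustrate the behaviour this patch changes, here is a minimal, hypothetical sketch (the wrapper class and main method below are illustrative and not part of the patch; only the org.apache.kafka.common.record calls are from the code changed here). It builds one uncompressed v2 batch and down-converts it to v1 starting from an offset inside the batch, so records before that offset are dropped rather than returned to the client.

    import java.nio.ByteBuffer;

    import org.apache.kafka.common.record.CompressionType;
    import org.apache.kafka.common.record.MemoryRecords;
    import org.apache.kafka.common.record.MemoryRecordsBuilder;
    import org.apache.kafka.common.record.Record;
    import org.apache.kafka.common.record.RecordBatch;
    import org.apache.kafka.common.record.Records;
    import org.apache.kafka.common.record.SimpleRecord;
    import org.apache.kafka.common.record.TimestampType;

    // Hypothetical driver class, not part of this patch.
    public class DownConvertFetchOffsetExample {
        public static void main(String[] args) {
            // One uncompressed v2 batch holding offsets 0, 1 and 2.
            ByteBuffer buffer = ByteBuffer.allocate(1024);
            MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2,
                    CompressionType.NONE, TimestampType.CREATE_TIME, 0L);
            builder.appendWithOffset(0L, new SimpleRecord(1L, "k0".getBytes(), "v0".getBytes()));
            builder.appendWithOffset(1L, new SimpleRecord(2L, "k1".getBytes(), "v1".getBytes()));
            builder.appendWithOffset(2L, new SimpleRecord(3L, "k2".getBytes(), "v2".getBytes()));
            builder.close();
            buffer.flip();

            // Down-convert to v1 starting at offset 1. Uncompressed v0/v1 messages are not
            // batched, so the record at offset 0 should be dropped instead of being returned
            // to a client that asked to fetch from offset 1.
            Records converted = MemoryRecords.readableRecords(buffer)
                    .downConvert(RecordBatch.MAGIC_VALUE_V1, 1L);
            for (Record record : converted.records())
                System.out.println(record.offset()); // expected: 1, 2
        }
    }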
http://git-wip-us.apache.org/repos/asf/kafka/blob/8e8b3c56/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
index 01ff91a..4f1c7d4 100644
--- a/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
+++ b/clients/src/main/java/org/apache/kafka/clients/producer/internals/Sender.java
@@ -648,7 +648,7 @@ public class Sender implements Runnable {
                 // not all support the same message format version. For example, if a partition migrates from a broker
                 // which is supporting the new magic version to one which doesn't, then we will need to convert.
                 if (!records.hasMatchingMagic(minUsedMagic))
-                    records = batch.records().downConvert(minUsedMagic);
+                    records = batch.records().downConvert(minUsedMagic, 0);
                 produceRecordsByPartition.put(tp, records);
                 recordsByPartition.put(tp, batch);
             }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e8b3c56/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
index 2771ab7..04d7071 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/AbstractRecords.java
@@ -50,7 +50,16 @@ public abstract class AbstractRecords implements Records {
         return true;
     }
 
-    protected MemoryRecords downConvert(Iterable<? extends RecordBatch> batches, byte toMagic) {
+    /**
+     * Down convert batches to the provided message format version. The first offset parameter is only relevant in the
+     * conversion from uncompressed v2 or higher to v1 or lower. The reason is that uncompressed records in v0 and v1
+     * are not batched (put another way, each batch always has 1 record).
+     *
+     * If a client requests records in v1 format starting from the middle of an uncompressed batch in v2 format, we
+     * need to drop records from the batch during the conversion. Some versions of librdkafka rely on this for
+     * correctness.
+     */
+    protected MemoryRecords downConvert(Iterable<? extends RecordBatch> batches, byte toMagic, long firstOffset) {
         // maintain the batch along with the decompressed records to avoid the need to decompress again
         List<RecordBatchAndRecords> recordBatchAndRecordsList = new ArrayList<>();
         int totalSizeEstimate = 0;
@@ -63,9 +72,16 @@ public abstract class AbstractRecords implements Records {
                 totalSizeEstimate += batch.sizeInBytes();
                 recordBatchAndRecordsList.add(new RecordBatchAndRecords(batch, null, null));
             } else {
-                List<Record> records = Utils.toList(batch.iterator());
+                List<Record> records = new ArrayList<>();
+                for (Record record : batch) {
+                    // See the method javadoc for an explanation
+                    if (toMagic > RecordBatch.MAGIC_VALUE_V1 || batch.isCompressed() || record.offset() >= firstOffset)
+                        records.add(record);
+                }
+                if (records.isEmpty())
+                    continue;
                 final long baseOffset;
-                if (batch.magic() >= RecordBatch.MAGIC_VALUE_V2)
+                if (batch.magic() >= RecordBatch.MAGIC_VALUE_V2 && toMagic >= RecordBatch.MAGIC_VALUE_V2)
                     baseOffset = batch.baseOffset();
                 else
                     baseOffset = records.get(0).offset();

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e8b3c56/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
index 32ca1a7..35431d8 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/FileRecords.java
@@ -230,7 +230,7 @@ public class FileRecords extends AbstractRecords implements Closeable {
     }
 
     @Override
-    public Records downConvert(byte toMagic) {
+    public Records downConvert(byte toMagic, long firstOffset) {
         List<? extends RecordBatch> batches = Utils.toList(batches().iterator());
         if (batches.isEmpty()) {
             // This indicates that the message is too large, which means that the buffer is not large
@@ -242,7 +242,7 @@ public class FileRecords extends AbstractRecords implements Closeable {
             // one full message, even if it requires exceeding the max fetch size requested by the client.
             return this;
         } else {
-            return downConvert(batches, toMagic);
+            return downConvert(batches, toMagic, firstOffset);
         }
     }

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e8b3c56/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
index 46798cf..e158e2f 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java
@@ -109,8 +109,8 @@ public class MemoryRecords extends AbstractRecords {
     }
 
     @Override
-    public MemoryRecords downConvert(byte toMagic) {
-        return downConvert(batches(), toMagic);
+    public MemoryRecords downConvert(byte toMagic, long firstOffset) {
+        return downConvert(batches(), toMagic, firstOffset);
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e8b3c56/clients/src/main/java/org/apache/kafka/common/record/Records.java
----------------------------------------------------------------------
diff --git a/clients/src/main/java/org/apache/kafka/common/record/Records.java b/clients/src/main/java/org/apache/kafka/common/record/Records.java
index a5a5036..ec2e717 100644
--- a/clients/src/main/java/org/apache/kafka/common/record/Records.java
+++ b/clients/src/main/java/org/apache/kafka/common/record/Records.java
@@ -96,9 +96,11 @@ public interface Records {
      * Convert all batches in this buffer to the format passed as a parameter. Note that this requires
      * deep iteration since all of the deep records must also be converted to the desired format.
      * @param toMagic The magic value to convert to
+     * @param firstOffset The starting offset for returned records. This only impacts some cases. See
+     *                    {@link AbstractRecords#downConvert(Iterable, byte, long)} for an explanation.
      * @return A Records instance (which may or may not be the same instance)
      */
-    Records downConvert(byte toMagic);
+    Records downConvert(byte toMagic, long firstOffset);
 
     /**
      * Get an iterator over the records in this log. Note that this generally requires decompression,

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e8b3c56/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
index 8b9c900..b41db67 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java
@@ -17,6 +17,7 @@
 package org.apache.kafka.common.record;
 
 import org.apache.kafka.common.KafkaException;
+import org.apache.kafka.common.utils.Utils;
 import org.apache.kafka.test.TestUtils;
 import org.easymock.EasyMock;
 import org.junit.Before;
@@ -26,6 +27,7 @@ import java.io.File;
 import java.io.IOException;
 import java.nio.ByteBuffer;
 import java.nio.channels.FileChannel;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
@@ -308,7 +310,7 @@ public class FileRecordsTest {
         int start = fileRecords.searchForOffsetWithSize(1, 0).position;
         int size = batch.sizeInBytes();
         FileRecords slice = fileRecords.read(start, size - 1);
-        Records messageV0 = slice.downConvert(RecordBatch.MAGIC_VALUE_V0);
+        Records messageV0 = slice.downConvert(RecordBatch.MAGIC_VALUE_V0, 0);
         assertTrue("No message should be there", batches(messageV0).isEmpty());
         assertEquals("There should be " + (size - 1) + " bytes", size - 1, messageV0.sizeInBytes());
     }
@@ -324,31 +326,34 @@ public class FileRecordsTest {
     }
 
     private void doTestConversion(CompressionType compressionType, byte toMagic) throws IOException {
-        List<Long> offsets = asList(0L, 2L, 3L, 9L, 11L, 15L);
+        List<Long> offsets = asList(0L, 2L, 3L, 9L, 11L, 15L, 16L, 17L, 22L, 24L);
         List<SimpleRecord> records = asList(
                 new SimpleRecord(1L, "k1".getBytes(), "hello".getBytes()),
                 new SimpleRecord(2L, "k2".getBytes(), "goodbye".getBytes()),
                 new SimpleRecord(3L, "k3".getBytes(), "hello again".getBytes()),
                 new SimpleRecord(4L, "k4".getBytes(), "goodbye for now".getBytes()),
                 new SimpleRecord(5L, "k5".getBytes(), "hello again".getBytes()),
-                new SimpleRecord(6L, "k6".getBytes(), "goodbye forever".getBytes()));
+                new SimpleRecord(6L, "k6".getBytes(), "I sense indecision".getBytes()),
+                new SimpleRecord(7L, "k7".getBytes(), "what now".getBytes()),
+                new SimpleRecord(8L, "k8".getBytes(), "running out".getBytes()),
+                new SimpleRecord(9L, "k9".getBytes(), "ok, almost done".getBytes()),
+                new SimpleRecord(10L, "k10".getBytes(), "finally".getBytes()));
 
         ByteBuffer buffer = ByteBuffer.allocate(1024);
         MemoryRecordsBuilder builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V0, compressionType,
                 TimestampType.CREATE_TIME, 0L);
-        for (int i = 0; i < 2; i++)
+        for (int i = 0; i < 3; i++)
             builder.appendWithOffset(offsets.get(i), records.get(i));
         builder.close();
 
-        builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V1, compressionType,
-                TimestampType.CREATE_TIME, 0L);
-        for (int i = 2; i < 4; i++)
+        builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V1, compressionType, TimestampType.CREATE_TIME,
+                0L);
+        for (int i = 3; i < 6; i++)
             builder.appendWithOffset(offsets.get(i), records.get(i));
         builder.close();
 
-        builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, compressionType,
-                TimestampType.CREATE_TIME, 0L);
-        for (int i = 4; i < 6; i++)
+        builder = MemoryRecords.builder(buffer, RecordBatch.MAGIC_VALUE_V2, compressionType, TimestampType.CREATE_TIME, 0L);
+        for (int i = 6; i < 10; i++)
             builder.appendWithOffset(offsets.get(i), records.get(i));
         builder.close();
 
@@ -357,11 +362,34 @@ public class FileRecordsTest {
         try (FileRecords fileRecords = FileRecords.open(tempFile())) {
             fileRecords.append(MemoryRecords.readableRecords(buffer));
             fileRecords.flush();
-            Records convertedRecords = fileRecords.downConvert(toMagic);
+            Records convertedRecords = fileRecords.downConvert(toMagic, 0L);
             verifyConvertedRecords(records, offsets, convertedRecords, compressionType, toMagic);
+
+            if (toMagic <= RecordBatch.MAGIC_VALUE_V1 && compressionType == CompressionType.NONE) {
+                long firstOffset;
+                if (toMagic == RecordBatch.MAGIC_VALUE_V0)
+                    firstOffset = 11L; // v1 record
+                else
+                    firstOffset = 17; // v2 record
+                Records convertedRecords2 = fileRecords.downConvert(toMagic, firstOffset);
+                List<Long> filteredOffsets = new ArrayList<>(offsets);
+                List<SimpleRecord> filteredRecords = new ArrayList<>(records);
+                int index = filteredOffsets.indexOf(firstOffset) - 1;
+                filteredRecords.remove(index);
+                filteredOffsets.remove(index);
+                verifyConvertedRecords(filteredRecords, filteredOffsets, convertedRecords2, compressionType, toMagic);
+            } else {
+                // firstOffset doesn't have any effect in this case
+                Records convertedRecords2 = fileRecords.downConvert(toMagic, 10L);
+                verifyConvertedRecords(records, offsets, convertedRecords2, compressionType, toMagic);
+            }
         }
     }
 
+    private String utf8(ByteBuffer buffer) {
+        return Utils.utf8(buffer, buffer.remaining());
+    }
+
     private void verifyConvertedRecords(List<SimpleRecord> initialRecords,
                                         List<Long> initialOffsets,
                                         Records convertedRecords,
@@ -378,8 +406,8 @@ public class FileRecordsTest {
             for (Record record : batch) {
                 assertTrue("Inner record should have magic " + magicByte, record.hasMagic(batch.magic()));
                 assertEquals("Offset should not change", initialOffsets.get(i).longValue(), record.offset());
-                assertEquals("Key should not change", initialRecords.get(i).key(), record.key());
-                assertEquals("Value should not change", initialRecords.get(i).value(), record.value());
+                assertEquals("Key should not change", utf8(initialRecords.get(i).key()), utf8(record.key()));
+                assertEquals("Value should not change", utf8(initialRecords.get(i).value()), utf8(record.value()));
                 assertFalse(record.hasTimestampType(TimestampType.LOG_APPEND_TIME));
                 if (batch.magic() == RecordBatch.MAGIC_VALUE_V0) {
                     assertEquals(RecordBatch.NO_TIMESTAMP, record.timestamp());

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e8b3c56/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
----------------------------------------------------------------------
diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
index 9734f59..f10bd98 100644
--- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
+++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java
@@ -432,7 +432,7 @@ public class MemoryRecordsBuilderTest {
 
         buffer.flip();
 
-        Records records = MemoryRecords.readableRecords(buffer).downConvert(RecordBatch.MAGIC_VALUE_V1);
+        Records records = MemoryRecords.readableRecords(buffer).downConvert(RecordBatch.MAGIC_VALUE_V1, 0);
         List<? extends RecordBatch> batches = Utils.toList(records.batches().iterator());
 
         if (compressionType != CompressionType.NONE) {
@@ -469,24 +469,57 @@
 
         buffer.flip();
 
-        Records records = MemoryRecords.readableRecords(buffer).downConvert(RecordBatch.MAGIC_VALUE_V1);
+        Records records = MemoryRecords.readableRecords(buffer).downConvert(RecordBatch.MAGIC_VALUE_V1, 0);
 
         List<? extends RecordBatch> batches = Utils.toList(records.batches().iterator());
         if (compressionType != CompressionType.NONE) {
             assertEquals(2, batches.size());
             assertEquals(RecordBatch.MAGIC_VALUE_V0, batches.get(0).magic());
+            assertEquals(0, batches.get(0).baseOffset());
             assertEquals(RecordBatch.MAGIC_VALUE_V1, batches.get(1).magic());
+            assertEquals(1, batches.get(1).baseOffset());
         } else {
             assertEquals(3, batches.size());
             assertEquals(RecordBatch.MAGIC_VALUE_V0, batches.get(0).magic());
+            assertEquals(0, batches.get(0).baseOffset());
             assertEquals(RecordBatch.MAGIC_VALUE_V1, batches.get(1).magic());
+            assertEquals(1, batches.get(1).baseOffset());
             assertEquals(RecordBatch.MAGIC_VALUE_V1, batches.get(2).magic());
+            assertEquals(2, batches.get(2).baseOffset());
         }
 
         List<Record> logRecords = Utils.toList(records.records().iterator());
-        assertEquals(ByteBuffer.wrap("1".getBytes()), logRecords.get(0).key());
-        assertEquals(ByteBuffer.wrap("2".getBytes()), logRecords.get(1).key());
-        assertEquals(ByteBuffer.wrap("3".getBytes()), logRecords.get(2).key());
+        assertEquals("1", utf8(logRecords.get(0).key()));
+        assertEquals("2", utf8(logRecords.get(1).key()));
+        assertEquals("3", utf8(logRecords.get(2).key()));
+
+        records = MemoryRecords.readableRecords(buffer).downConvert(RecordBatch.MAGIC_VALUE_V1, 2L);
+
+        batches = Utils.toList(records.batches().iterator());
+        logRecords = Utils.toList(records.records().iterator());
+
+        if (compressionType != CompressionType.NONE) {
+            assertEquals(2, batches.size());
+            assertEquals(RecordBatch.MAGIC_VALUE_V0, batches.get(0).magic());
+            assertEquals(0, batches.get(0).baseOffset());
+            assertEquals(RecordBatch.MAGIC_VALUE_V1, batches.get(1).magic());
+            assertEquals(1, batches.get(1).baseOffset());
+            assertEquals("1", utf8(logRecords.get(0).key()));
+            assertEquals("2", utf8(logRecords.get(1).key()));
+            assertEquals("3", utf8(logRecords.get(2).key()));
+        } else {
+            assertEquals(2, batches.size());
+            assertEquals(RecordBatch.MAGIC_VALUE_V0, batches.get(0).magic());
+            assertEquals(0, batches.get(0).baseOffset());
+            assertEquals(RecordBatch.MAGIC_VALUE_V1, batches.get(1).magic());
+            assertEquals(2, batches.get(1).baseOffset());
+            assertEquals("1", utf8(logRecords.get(0).key()));
+            assertEquals("3", utf8(logRecords.get(1).key()));
+        }
+    }
+
+    private String utf8(ByteBuffer buffer) {
+        return Utils.utf8(buffer, buffer.remaining());
     }
 
     @Test

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e8b3c56/core/src/main/scala/kafka/server/KafkaApis.scala
----------------------------------------------------------------------
diff --git a/core/src/main/scala/kafka/server/KafkaApis.scala b/core/src/main/scala/kafka/server/KafkaApis.scala
index eb0bf3b..5ce590f 100644
--- a/core/src/main/scala/kafka/server/KafkaApis.scala
+++ b/core/src/main/scala/kafka/server/KafkaApis.scala
@@ -508,19 +508,24 @@ class KafkaApis(val requestChannel: RequestChannel,
         // know it must be supported. However, if the magic version is changed from a higher version back to a
         // lower version, this check will no longer be valid and we will fail to down-convert the messages
         // which were written in the new format prior to the version downgrade.
-        replicaManager.getMagic(tp) match {
-          case Some(magic) if magic > 0 && versionId <= 1 && !data.records.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V0) =>
-            trace(s"Down converting message to V0 for fetch request from $clientId")
-            new FetchResponse.PartitionData(data.error, data.highWatermark, FetchResponse.INVALID_LAST_STABLE_OFFSET,
-              data.logStartOffset, data.abortedTransactions, data.records.downConvert(RecordBatch.MAGIC_VALUE_V0))
+        replicaManager.getMagic(tp).flatMap { magic =>
+          val downConvertMagic = {
+            if (magic > RecordBatch.MAGIC_VALUE_V0 && versionId <= 1 && !data.records.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V0))
+              Some(RecordBatch.MAGIC_VALUE_V0)
+            else if (magic > RecordBatch.MAGIC_VALUE_V1 && versionId <= 3 && !data.records.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V1))
+              Some(RecordBatch.MAGIC_VALUE_V1)
+            else
+              None
+          }
 
-          case Some(magic) if magic > 1 && versionId <= 3 && !data.records.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V1) =>
-            trace(s"Down converting message to V1 for fetch request from $clientId")
+          downConvertMagic.map { magic =>
+            trace(s"Down converting records from partition $tp to message format version $magic for fetch request from $clientId")
+            val converted = data.records.downConvert(magic, fetchRequest.fetchData.get(tp).fetchOffset)
             new FetchResponse.PartitionData(data.error, data.highWatermark, FetchResponse.INVALID_LAST_STABLE_OFFSET,
-              data.logStartOffset, data.abortedTransactions, data.records.downConvert(RecordBatch.MAGIC_VALUE_V1))
+              data.logStartOffset, data.abortedTransactions, converted)
+          }
 
-          case _ => data
-        }
+        }.getOrElse(data)
     }
 
     // the callback for process a fetch response, invoked before throttling
@@ -549,7 +554,9 @@ class KafkaApis(val requestChannel: RequestChannel,
     def fetchResponseCallback(bandwidthThrottleTimeMs: Int) {
       def createResponse(requestThrottleTimeMs: Int): RequestChannel.Response = {
         val convertedData = new util.LinkedHashMap[TopicPartition, FetchResponse.PartitionData]
-        fetchedPartitionData.asScala.foreach(e => convertedData.put(e._1, convertedPartitionData(e._1, e._2)))
+        fetchedPartitionData.asScala.foreach { case (tp, partitionData) =>
+          convertedData.put(tp, convertedPartitionData(tp, partitionData))
+        }
         val response = new FetchResponse(convertedData, 0)
         val responseStruct = response.toStruct(versionId)

http://git-wip-us.apache.org/repos/asf/kafka/blob/8e8b3c56/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
----------------------------------------------------------------------
diff --git a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
index 48b3945..c5d40f6 100644
--- a/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
+++ b/core/src/test/scala/unit/kafka/server/FetchRequestTest.scala
@@ -19,13 +19,14 @@ package kafka.server
 
 import java.util
 import java.util.Properties
 
+import kafka.api.KAFKA_0_11_0_IV2
 import kafka.log.LogConfig
 import kafka.utils.TestUtils
 import kafka.utils.TestUtils._
 import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
 import org.apache.kafka.common.TopicPartition
 import org.apache.kafka.common.protocol.{ApiKeys, Errors}
-import org.apache.kafka.common.record.Record
+import org.apache.kafka.common.record.{Record, RecordBatch}
 import org.apache.kafka.common.requests.{FetchRequest, FetchResponse, IsolationLevel}
 import org.apache.kafka.common.serialization.StringSerializer
 import org.junit.Assert._
@@ -42,12 +43,6 @@ class FetchRequestTest extends BaseRequestTest {
 
   private var producer: KafkaProducer[String, String] = null
 
-  override def setUp() {
-    super.setUp()
-    producer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(servers),
-      retries = 5, keySerializer = new StringSerializer, valueSerializer = new StringSerializer)
-  }
-
   override def tearDown() {
     producer.close()
     super.tearDown()
@@ -67,14 +62,20 @@ class FetchRequestTest extends BaseRequestTest {
     partitionMap
   }
 
-  private def sendFetchRequest(leaderId: Int, request: FetchRequest,
-                               version: Short = ApiKeys.FETCH.latestVersion): FetchResponse = {
+  private def sendFetchRequest(leaderId: Int, request: FetchRequest): FetchResponse = {
     val response = connectAndSend(request, ApiKeys.FETCH, destination = brokerSocketServer(leaderId))
-    FetchResponse.parse(response, version)
+    FetchResponse.parse(response, request.version)
+  }
+
+  private def initProducer(): Unit = {
+    producer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(servers),
+      retries = 5, keySerializer = new StringSerializer, valueSerializer = new StringSerializer)
   }
 
   @Test
   def testBrokerRespectsPartitionsOrderAndSizeLimits(): Unit = {
+    initProducer()
+
     val messagesPerPartition = 9
     val maxResponseBytes = 800
     val maxPartitionBytes = 190
@@ -152,13 +153,14 @@ class FetchRequestTest extends BaseRequestTest {
 
   @Test
   def testFetchRequestV2WithOversizedMessage(): Unit = {
+    initProducer()
     val maxPartitionBytes = 200
     val (topicPartition, leaderId) = createTopics(numTopics = 1, numPartitions = 1).head
     producer.send(new ProducerRecord(topicPartition.topic, topicPartition.partition, "key",
       new String(new Array[Byte](maxPartitionBytes + 1)))).get
     val fetchRequest = FetchRequest.Builder.forConsumer(Int.MaxValue, 0, createPartitionMap(maxPartitionBytes,
       Seq(topicPartition))).build(2)
-    val fetchResponse = sendFetchRequest(leaderId, fetchRequest, version = 2)
+    val fetchResponse = sendFetchRequest(leaderId, fetchRequest)
     val partitionData = fetchResponse.responseData.get(topicPartition)
     assertEquals(Errors.NONE, partitionData.error)
     assertTrue(partitionData.highWatermark > 0)
@@ -166,6 +168,68 @@ class FetchRequestTest extends BaseRequestTest {
     assertEquals(0, records(partitionData).map(_.sizeInBytes).sum)
   }
 
+  /**
+   * Ensure that we respect the fetch offset when returning records that were converted from an uncompressed v2
+   * record batch to multiple v0/v1 record batches with size 1. If the fetch offset points to inside the record batch,
+   * some records have to be dropped during the conversion.
+   */
+  @Test
+  def testDownConversionFromBatchedToUnbatchedRespectsOffset(): Unit = {
+    // Increase linger so that we have control over the batches created
+    producer = TestUtils.createNewProducer(TestUtils.getBrokerListStrFromServers(servers),
+      retries = 5, keySerializer = new StringSerializer, valueSerializer = new StringSerializer,
+      lingerMs = 300 * 1000)
+
+    val topicConfig = Map(LogConfig.MessageFormatVersionProp -> KAFKA_0_11_0_IV2.version)
+    val (topicPartition, leaderId) = createTopics(numTopics = 1, numPartitions = 1, topicConfig).head
+    val topic = topicPartition.topic
+
+    val firstBatchFutures = (0 until 10).map(i => producer.send(new ProducerRecord(topic, s"key-$i", s"value-$i")))
+    producer.flush()
+    val secondBatchFutures = (10 until 25).map(i => producer.send(new ProducerRecord(topic, s"key-$i", s"value-$i")))
+    producer.flush()
+
+    firstBatchFutures.foreach(_.get)
+    secondBatchFutures.foreach(_.get)
+
+    def check(fetchOffset: Long, requestVersion: Short, expectedOffset: Long, expectedNumBatches: Int, expectedMagic: Byte): Unit = {
+      val fetchRequest = FetchRequest.Builder.forConsumer(Int.MaxValue, 0, createPartitionMap(Int.MaxValue,
+        Seq(topicPartition), Map(topicPartition -> fetchOffset))).build(requestVersion)
+      val fetchResponse = sendFetchRequest(leaderId, fetchRequest)
+      val partitionData = fetchResponse.responseData.get(topicPartition)
+      assertEquals(Errors.NONE, partitionData.error)
+      assertTrue(partitionData.highWatermark > 0)
+      val batches = partitionData.records.batches.asScala.toBuffer
+      assertEquals(expectedNumBatches, batches.size)
+      val batch = batches.head
+      assertEquals(expectedMagic, batch.magic)
+      assertEquals(expectedOffset, batch.baseOffset)
+    }
+
+    // down conversion to message format 0, batches of 1 message are returned so we receive the exact offset we requested
+    check(fetchOffset = 3, expectedOffset = 3, requestVersion = 1, expectedNumBatches = 22,
+      expectedMagic = RecordBatch.MAGIC_VALUE_V0)
+    check(fetchOffset = 15, expectedOffset = 15, requestVersion = 1, expectedNumBatches = 10,
+      expectedMagic = RecordBatch.MAGIC_VALUE_V0)
+
+    // down conversion to message format 1, batches of 1 message are returned so we receive the exact offset we requested
+    check(fetchOffset = 3, expectedOffset = 3, requestVersion = 3, expectedNumBatches = 22,
+      expectedMagic = RecordBatch.MAGIC_VALUE_V1)
+    check(fetchOffset = 15, expectedOffset = 15, requestVersion = 3, expectedNumBatches = 10,
+      expectedMagic = RecordBatch.MAGIC_VALUE_V1)
+
+    // no down conversion, we receive a single batch so the received offset won't necessarily be the same
+    check(fetchOffset = 3, expectedOffset = 0, requestVersion = 4, expectedNumBatches = 2,
+      expectedMagic = RecordBatch.MAGIC_VALUE_V2)
+    check(fetchOffset = 15, expectedOffset = 10, requestVersion = 4, expectedNumBatches = 1,
+      expectedMagic = RecordBatch.MAGIC_VALUE_V2)
+
+    // no down conversion, we receive a single batch and the exact offset we requested because it happens to be the
+    // offset of the first record in the batch
+    check(fetchOffset = 10, expectedOffset = 10, requestVersion = 4, expectedNumBatches = 1,
+      expectedMagic = RecordBatch.MAGIC_VALUE_V2)
+  }
+
   private def records(partitionData: FetchResponse.PartitionData): Seq[Record] = {
     partitionData.records.records.asScala.toIndexedSeq
   }
@@ -207,10 +271,11 @@ class FetchRequestTest extends BaseRequestTest {
     assertTrue(responseSize <= maxResponseBytes)
   }
 
-  private def createTopics(numTopics: Int, numPartitions: Int): Map[TopicPartition, Int] = {
+  private def createTopics(numTopics: Int, numPartitions: Int, configs: Map[String, String] = Map.empty): Map[TopicPartition, Int] = {
     val topics = (0 until numPartitions).map(t => s"topic$t")
     val topicConfig = new Properties
     topicConfig.setProperty(LogConfig.MinInSyncReplicasProp, 2.toString)
+    configs.foreach { case (k, v) => topicConfig.setProperty(k, v) }
     topics.flatMap { topic =>
       val partitionToLeader = createTopic(zkUtils, topic, numPartitions = numPartitions, replicationFactor = 2,
         servers = servers, topicConfig = topicConfig)
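For readers following the KafkaApis.scala change above, the broker-side decision roughly amounts to the following. This is a hypothetical Java rendering (the helper class, method name, and parameter names are illustrative, not part of the patch): the fetch request version selects the target magic, and the request's fetch offset is now passed through to downConvert.

    import org.apache.kafka.common.record.RecordBatch;
    import org.apache.kafka.common.record.Records;

    // Rough sketch of the Scala logic in KafkaApis.convertedPartitionData, for orientation only.
    final class DownConversionSketch {
        static Records maybeDownConvert(Records records, byte partitionMagic, short fetchVersion, long fetchOffset) {
            Byte toMagic = null;
            if (partitionMagic > RecordBatch.MAGIC_VALUE_V0 && fetchVersion <= 1
                    && !records.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V0))
                toMagic = RecordBatch.MAGIC_VALUE_V0;
            else if (partitionMagic > RecordBatch.MAGIC_VALUE_V1 && fetchVersion <= 3
                    && !records.hasCompatibleMagic(RecordBatch.MAGIC_VALUE_V1))
                toMagic = RecordBatch.MAGIC_VALUE_V1;
            // The fetch offset from the request is threaded through to downConvert so that
            // uncompressed v0/v1 output drops any records that precede it.
            return toMagic == null ? records : records.downConvert(toMagic, fetchOffset);
        }
    }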
