Repository: kafka Updated Branches: refs/heads/trunk 2bd7b6450 -> c36cc60f7
MINOR: Use `Record` instead of `ByteBufferMessageSet` in `ProduceRequestTest` We want to phase out `ByteBufferMessageSet` eventually, so new code should favour `Record` where possible. Also use a fixed timestamp in `testCorruptLz4ProduceRequest` to ensure that the checksum is always the same. Author: Ismael Juma <[email protected]> Reviewers: Jason Gustafson, Guozhang Wang Closes #1357 from ijuma/produce-request-test-improvement Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/c36cc60f Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/c36cc60f Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/c36cc60f Branch: refs/heads/trunk Commit: c36cc60f73ca1fe956fb8792bc538b1fdebb712d Parents: 2bd7b64 Author: Ismael Juma <[email protected]> Authored: Wed May 18 11:01:27 2016 -0700 Committer: Guozhang Wang <[email protected]> Committed: Wed May 18 11:01:27 2016 -0700 ---------------------------------------------------------------------- .../java/org/apache/kafka/test/TestUtils.java | 24 ++++++++++- .../unit/kafka/server/ProduceRequestTest.scala | 45 +++++++++++++------- 2 files changed, 52 insertions(+), 17 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/c36cc60f/clients/src/test/java/org/apache/kafka/test/TestUtils.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java index 1bfe578..742d14f 100644 --- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java +++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java @@ -20,6 +20,7 @@ import static java.util.Arrays.asList; import java.io.File; import java.io.IOException; +import java.nio.ByteBuffer; import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; @@ -31,9 +32,12 @@ import java.util.Random; import org.apache.kafka.common.Cluster; import org.apache.kafka.common.Node; import org.apache.kafka.common.PartitionInfo; +import org.apache.kafka.common.record.CompressionType; +import org.apache.kafka.common.record.MemoryRecords; +import org.apache.kafka.common.record.Record; +import org.apache.kafka.common.record.Records; import org.apache.kafka.common.utils.Utils; - /** * Helper functions for writing unit tests */ @@ -141,4 +145,22 @@ public class TestUtils { return file; } + /** + * Create a records buffer including the offset and message size at the start, which is required if the buffer is to + * be sent as part of `ProduceRequest`. This is the reason why we can't use + * `Record(long timestamp, byte[] key, byte[] value, CompressionType type, int valueOffset, int valueSize)` as this + * constructor does not include either of these fields. + */ + public static ByteBuffer partitionRecordsBuffer(long offset, CompressionType compressionType, Record... records) { + int bufferSize = 0; + for (Record record : records) + bufferSize += Records.LOG_OVERHEAD + record.size(); + ByteBuffer buffer = ByteBuffer.allocate(bufferSize); + MemoryRecords memoryRecords = MemoryRecords.emptyRecords(buffer, compressionType); + for (Record record : records) + memoryRecords.append(offset, record); + memoryRecords.close(); + return memoryRecords.buffer(); + } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/c36cc60f/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala index 67f7d41..f80fa7d 100644 --- a/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala +++ b/core/src/test/scala/unit/kafka/server/ProduceRequestTest.scala @@ -17,10 +17,13 @@ package kafka.server -import kafka.message.{ByteBufferMessageSet, LZ4CompressionCodec, Message} +import java.nio.ByteBuffer + import kafka.utils.TestUtils +import org.apache.kafka.test.{TestUtils => JTestUtils} import org.apache.kafka.common.TopicPartition import org.apache.kafka.common.protocol.{ApiKeys, Errors, ProtoUtils} +import org.apache.kafka.common.record.{CompressionType, Record} import org.apache.kafka.common.requests.{ProduceRequest, ProduceResponse} import org.junit.Assert._ import org.junit.Test @@ -36,17 +39,26 @@ class ProduceRequestTest extends BaseRequestTest { @Test def testSimpleProduceRequest() { val (partition, leader) = createTopicAndFindPartitionWithLeader("topic") - val messageBuffer = new ByteBufferMessageSet(new Message("value".getBytes, "key".getBytes, - System.currentTimeMillis(), 1: Byte)).buffer - val topicPartition = new TopicPartition("topic", partition) - val partitionRecords = Map(topicPartition -> messageBuffer) - val produceResponse = sendProduceRequest(leader, new ProduceRequest(-1, 3000, partitionRecords.asJava)) - assertEquals(1, produceResponse.responses.size) - val (tp, partitionResponse) = produceResponse.responses.asScala.head - assertEquals(topicPartition, tp) - assertEquals(Errors.NONE.code, partitionResponse.errorCode) - assertEquals(0, partitionResponse.baseOffset) - assertEquals(-1, partitionResponse.timestamp) + + def sendAndCheck(recordBuffer: ByteBuffer, expectedOffset: Long): ProduceResponse.PartitionResponse = { + val topicPartition = new TopicPartition("topic", partition) + val partitionRecords = Map(topicPartition -> recordBuffer) + val produceResponse = sendProduceRequest(leader, new ProduceRequest(-1, 3000, partitionRecords.asJava)) + assertEquals(1, produceResponse.responses.size) + val (tp, partitionResponse) = produceResponse.responses.asScala.head + assertEquals(topicPartition, tp) + assertEquals(Errors.NONE.code, partitionResponse.errorCode) + assertEquals(expectedOffset, partitionResponse.baseOffset) + assertEquals(-1, partitionResponse.timestamp) + partitionResponse + } + + sendAndCheck(JTestUtils.partitionRecordsBuffer(0, CompressionType.NONE, + new Record(System.currentTimeMillis(), "key".getBytes, "value".getBytes)), 0) + + sendAndCheck(JTestUtils.partitionRecordsBuffer(0, CompressionType.GZIP, + new Record(System.currentTimeMillis(), "key1".getBytes, "value1".getBytes), + new Record(System.currentTimeMillis(), "key2".getBytes, "value2".getBytes)), 1) } /* returns a pair of partition id and leader id */ @@ -60,12 +72,13 @@ class ProduceRequestTest extends BaseRequestTest { @Test def testCorruptLz4ProduceRequest() { val (partition, leader) = createTopicAndFindPartitionWithLeader("topic") - val messageBuffer = new ByteBufferMessageSet(LZ4CompressionCodec, new Message("value".getBytes, "key".getBytes, - System.currentTimeMillis(), 1: Byte)).buffer + val timestamp = 1000000 + val recordBuffer = JTestUtils.partitionRecordsBuffer(0, CompressionType.LZ4, + new Record(timestamp, "key".getBytes, "value".getBytes)) // Change the lz4 checksum value so that it doesn't match the contents - messageBuffer.array.update(40, 0) + recordBuffer.array.update(40, 0) val topicPartition = new TopicPartition("topic", partition) - val partitionRecords = Map(topicPartition -> messageBuffer) + val partitionRecords = Map(topicPartition -> recordBuffer) val produceResponse = sendProduceRequest(leader, new ProduceRequest(-1, 3000, partitionRecords.asJava)) assertEquals(1, produceResponse.responses.size) val (tp, partitionResponse) = produceResponse.responses.asScala.head
