Repository: kafka Updated Branches: refs/heads/0.10.2 dfbe944e9 -> 71a511c34
KAFKA-4761; Fix producer regression handling small or zero batch size Author: Jason Gustafson <ja...@confluent.io> Reviewers: Apurva Mehta <apurva.1...@gmail.com>, Vahid Hashemian <vahidhashem...@us.ibm.com>, Ismael Juma <ism...@juma.me.uk>, Guozhang Wang <wangg...@gmail.com> Closes #2545 from hachikuji/KAFKA-4761 (cherry picked from commit 3b36d5cff0b7a51e737a97144d6d479af708b2d7) Signed-off-by: Jason Gustafson <ja...@confluent.io> Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/71a511c3 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/71a511c3 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/71a511c3 Branch: refs/heads/0.10.2 Commit: 71a511c3460bfbc2a47e8f03926be8bff19fe916 Parents: dfbe944 Author: Jason Gustafson <ja...@confluent.io> Authored: Mon Feb 13 14:07:19 2017 -0800 Committer: Jason Gustafson <ja...@confluent.io> Committed: Mon Feb 13 14:08:51 2017 -0800 ---------------------------------------------------------------------- .../common/record/MemoryRecordsBuilder.java | 18 +++++++++++- .../kafka/common/record/FileRecordsTest.java | 4 +-- .../common/record/MemoryRecordsBuilderTest.java | 29 ++++++++++++++++++ .../kafka/common/record/MemoryRecordsTest.java | 6 ++-- .../java/org/apache/kafka/test/TestUtils.java | 26 +++++----------- .../kafka/api/BaseProducerSendTest.scala | 31 ++++++++++++++++++-- .../kafka/api/PlaintextProducerSendTest.scala | 8 +++++ .../message/BaseMessageSetTestCases.scala | 4 +-- 8 files changed, 96 insertions(+), 30 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/71a511c3/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java 
b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java index 69e9003..02bfc24 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java @@ -105,6 +105,20 @@ public class MemoryRecordsBuilder { private MemoryRecords builtRecords; + /** + * Construct a new builder. + * + * @param buffer The underlying buffer to use (note that this class will allocate a new buffer if necessary + * to fit the records appended) + * @param magic The magic value to use + * @param compressionType The compression codec to use + * @param timestampType The desired timestamp type. For magic > 0, this cannot be {@link TimestampType#NO_TIMESTAMP_TYPE}. + * @param baseOffset The initial offset to use for the first record appended + * @param logAppendTime The log append time of this record set. Can be set to NO_TIMESTAMP if CREATE_TIME is used. + * @param writeLimit The desired limit on the total bytes for this record set (note that this can be exceeded + * when compression is used since size estimates are rough, and in the case that the first + * record added exceeds the size). + */ public MemoryRecordsBuilder(ByteBuffer buffer, byte magic, CompressionType compressionType, @@ -373,7 +387,9 @@ public class MemoryRecordsBuilder { } public boolean isFull() { - return isClosed() || this.writeLimit <= estimatedBytesWritten(); + // note that the write limit is respected only after the first record is added which ensures we can always + // create non-empty batches (this is used to disable batching when the producer's batch size is set to 0). 
+ return isClosed() || (this.numRecords > 0 && this.writeLimit <= estimatedBytesWritten()); } public int sizeInBytes() { http://git-wip-us.apache.org/repos/asf/kafka/blob/71a511c3/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java index dcd3bef..274bf9d 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/FileRecordsTest.java @@ -381,11 +381,11 @@ public class FileRecordsTest { } private static List<LogEntry> shallowEntries(Records buffer) { - return TestUtils.toList(buffer.shallowEntries().iterator()); + return TestUtils.toList(buffer.shallowEntries()); } private static List<LogEntry> deepEntries(Records buffer) { - return TestUtils.toList(buffer.deepEntries().iterator()); + return TestUtils.toList(buffer.deepEntries()); } private FileRecords createFileRecords(Record ... 
records) throws IOException { http://git-wip-us.apache.org/repos/asf/kafka/blob/71a511c3/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java index 034faf6..02ee75e 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsBuilderTest.java @@ -16,6 +16,7 @@ **/ package org.apache.kafka.common.record; +import org.apache.kafka.test.TestUtils; import org.junit.Test; import org.junit.runner.RunWith; import org.junit.runners.Parameterized; @@ -28,6 +29,7 @@ import java.util.List; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; @RunWith(value = Parameterized.class) public class MemoryRecordsBuilderTest { @@ -178,6 +180,33 @@ public class MemoryRecordsBuilderTest { } @Test + public void testSmallWriteLimit() { + // with a small write limit, we always allow at least one record to be added + + byte[] key = "foo".getBytes(); + byte[] value = "bar".getBytes(); + int writeLimit = 0; + ByteBuffer buffer = ByteBuffer.allocate(512); + MemoryRecordsBuilder builder = new MemoryRecordsBuilder(buffer, Record.CURRENT_MAGIC_VALUE, compressionType, + TimestampType.CREATE_TIME, 0L, Record.NO_TIMESTAMP, writeLimit); + + assertFalse(builder.isFull()); + assertTrue(builder.hasRoomFor(key, value)); + builder.append(0L, key, value); + + assertTrue(builder.isFull()); + assertFalse(builder.hasRoomFor(key, value)); + + MemoryRecords memRecords = builder.build(); + List<Record> records = TestUtils.toList(memRecords.records()); + assertEquals(1, records.size()); + + Record record = records.get(0); + 
assertEquals(ByteBuffer.wrap(key), record.key()); + assertEquals(ByteBuffer.wrap(value), record.value()); + } + + @Test public void writePastLimit() { ByteBuffer buffer = ByteBuffer.allocate(64); buffer.position(bufferOffset); http://git-wip-us.apache.org/repos/asf/kafka/blob/71a511c3/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java index 9c8ca7f..9271a3f 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java @@ -135,7 +135,7 @@ public class MemoryRecordsTest { MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered); - List<ByteBufferLogInputStream.ByteBufferLogEntry> shallowEntries = TestUtils.toList(filteredRecords.shallowEntries().iterator()); + List<ByteBufferLogInputStream.ByteBufferLogEntry> shallowEntries = TestUtils.toList(filteredRecords.shallowEntries()); List<Long> expectedOffsets = compression == CompressionType.NONE ? 
asList(1L, 4L, 5L, 6L) : asList(1L, 5L, 6L); assertEquals(expectedOffsets.size(), shallowEntries.size()); @@ -148,7 +148,7 @@ public class MemoryRecordsTest { shallowEntry.record().timestampType()); } - List<LogEntry> deepEntries = TestUtils.toList(filteredRecords.deepEntries().iterator()); + List<LogEntry> deepEntries = TestUtils.toList(filteredRecords.deepEntries()); assertEquals(4, deepEntries.size()); LogEntry first = deepEntries.get(0); @@ -197,7 +197,7 @@ public class MemoryRecordsTest { filtered.flip(); MemoryRecords filteredRecords = MemoryRecords.readableRecords(filtered); - List<ByteBufferLogInputStream.ByteBufferLogEntry> shallowEntries = TestUtils.toList(filteredRecords.shallowEntries().iterator()); + List<ByteBufferLogInputStream.ByteBufferLogEntry> shallowEntries = TestUtils.toList(filteredRecords.shallowEntries()); assertEquals(compression == CompressionType.NONE ? 3 : 2, shallowEntries.size()); for (LogEntry shallowEntry : shallowEntries) { http://git-wip-us.apache.org/repos/asf/kafka/blob/71a511c3/clients/src/test/java/org/apache/kafka/test/TestUtils.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/test/TestUtils.java b/clients/src/test/java/org/apache/kafka/test/TestUtils.java index c39f402..0cb32be 100644 --- a/clients/src/test/java/org/apache/kafka/test/TestUtils.java +++ b/clients/src/test/java/org/apache/kafka/test/TestUtils.java @@ -31,7 +31,6 @@ import java.nio.file.Files; import java.nio.file.Path; import java.util.ArrayList; import java.util.Collections; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Properties; @@ -43,7 +42,6 @@ import java.util.regex.Pattern; import static java.util.Arrays.asList; import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ 
-291,27 +289,17 @@ public class TestUtils { } /** - * Throw an exception if the two iterators are of differing lengths or contain - * different messages on their Nth element - */ - public static <T> void checkEquals(Iterator<T> s1, Iterator<T> s2) { - while (s1.hasNext() && s2.hasNext()) - assertEquals(s1.next(), s2.next()); - assertFalse("Iterators have uneven length--first has more", s1.hasNext()); - assertFalse("Iterators have uneven length--second has more", s2.hasNext()); - } - - /** * Checks the two iterables for equality by first converting both to a list. */ public static <T> void checkEquals(Iterable<T> it1, Iterable<T> it2) { - assertEquals(toList(it1.iterator()), toList(it2.iterator())); + assertEquals(toList(it1), toList(it2)); } - public static <T> List<T> toList(Iterator<? extends T> iterator) { - List<T> res = new ArrayList<>(); - while (iterator.hasNext()) - res.add(iterator.next()); - return res; + public static <T> List<T> toList(Iterable<? extends T> iterable) { + List<T> list = new ArrayList<>(); + for (T item : iterable) + list.add(item); + return list; } + } http://git-wip-us.apache.org/repos/asf/kafka/blob/71a511c3/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala index ad61a37..da3f651 100644 --- a/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala +++ b/core/src/test/scala/integration/kafka/api/BaseProducerSendTest.scala @@ -96,7 +96,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { @Test def testSendOffset() { val producer = createProducer(brokerList) - val partition = new Integer(0) + val partition = 0 object callback extends Callback { var offset = 0L @@ -175,8 +175,33 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { 
sendAndVerifyTimestamp(producer, TimestampType.CREATE_TIME) } + protected def sendAndVerify(producer: KafkaProducer[Array[Byte], Array[Byte]], + numRecords: Int = numRecords, + timeoutMs: Long = 20000L) { + val partition = 0 + try { + TestUtils.createTopic(zkUtils, topic, 1, 2, servers) + + val recordAndFutures = for (i <- 1 to numRecords) yield { + val record = new ProducerRecord(topic, partition, s"key$i".getBytes, s"value$i".getBytes) + (record, producer.send(record)) + } + producer.close(timeoutMs, TimeUnit.MILLISECONDS) + val lastOffset = recordAndFutures.foldLeft(0) { case (offset, (record, future)) => + val recordMetadata = future.get + assertEquals(topic, recordMetadata.topic) + assertEquals(partition, recordMetadata.partition) + assertEquals(offset, recordMetadata.offset) + offset + 1 + } + assertEquals(numRecords, lastOffset) + } finally { + producer.close() + } + } + protected def sendAndVerifyTimestamp(producer: KafkaProducer[Array[Byte], Array[Byte]], timestampType: TimestampType) { - val partition = new Integer(0) + val partition = 0 val baseTimestamp = 123456L val startTime = System.currentTimeMillis() @@ -212,7 +237,7 @@ abstract class BaseProducerSendTest extends KafkaServerTestHarness { TestUtils.createTopic(zkUtils, topic, 1, 2, servers, topicProps) val recordAndFutures = for (i <- 1 to numRecords) yield { - val record = new ProducerRecord(topic, partition, baseTimestamp + i, "key".getBytes, "value".getBytes) + val record = new ProducerRecord(topic, partition, baseTimestamp + i, s"key$i".getBytes, s"value$i".getBytes) (record, producer.send(record, callback)) } producer.close(20000L, TimeUnit.MILLISECONDS) http://git-wip-us.apache.org/repos/asf/kafka/blob/71a511c3/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala 
b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala index 956fe61..10063a9 100644 --- a/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala +++ b/core/src/test/scala/integration/kafka/api/PlaintextProducerSendTest.scala @@ -42,6 +42,14 @@ class PlaintextProducerSendTest extends BaseProducerSendTest { } @Test + def testBatchSizeZero() { + val producerProps = new Properties() + producerProps.setProperty(ProducerConfig.BATCH_SIZE_CONFIG, "0") + val producer = createProducer(brokerList = brokerList, lingerMs = Long.MaxValue, props = Some(producerProps)) + sendAndVerify(producer) + } + + @Test def testSendCompressedMessageWithLogAppendTime() { val producerProps = new Properties() producerProps.setProperty(ProducerConfig.COMPRESSION_TYPE_CONFIG, "gzip") http://git-wip-us.apache.org/repos/asf/kafka/blob/71a511c3/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala ---------------------------------------------------------------------- diff --git a/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala b/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala index 3327a65..a53602d 100644 --- a/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala +++ b/core/src/test/scala/unit/kafka/javaapi/message/BaseMessageSetTestCases.scala @@ -40,14 +40,14 @@ trait BaseMessageSetTestCases extends JUnitSuite { def testIteratorIsConsistent() { val m = createMessageSet(messages) // two iterators over the same set should give the same results - TestUtils.checkEquals(m.iterator, m.iterator) + TestUtils.checkEquals(m, m) } @Test def testIteratorIsConsistentWithCompression() { val m = createMessageSet(messages, DefaultCompressionCodec) // two iterators over the same set should give the same results - TestUtils.checkEquals(m.iterator, m.iterator) + TestUtils.checkEquals(m, m) } @Test