Repository: kafka Updated Branches: refs/heads/trunk 0bede30ad -> 970c00eab
KAFKA-5213; Mark a MemoryRecordsBuilder as full as soon as the append stream is closed Author: Apurva Mehta <[email protected]> Reviewers: Ismael Juma <[email protected]>, Jason Gustafson <[email protected]> Closes #3015 from apurvam/KAFKA-5213-illegalstateexception-in-ensureOpenForAppend Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/970c00ea Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/970c00ea Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/970c00ea Branch: refs/heads/trunk Commit: 970c00eab80d82b97456e276ee0f5615cb1ccfa1 Parents: 0bede30 Author: Apurva Mehta <[email protected]> Authored: Wed May 10 18:15:54 2017 -0700 Committer: Jason Gustafson <[email protected]> Committed: Wed May 10 18:15:54 2017 -0700 ---------------------------------------------------------------------- .../kafka/common/record/MemoryRecordsBuilder.java | 2 +- .../producer/internals/ProducerBatchTest.java | 15 +++++++++++++++ 2 files changed, 16 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/970c00ea/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java index f7451cf..025b402 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecordsBuilder.java @@ -664,7 +664,7 @@ public class MemoryRecordsBuilder { public boolean isFull() { // note that the write limit is respected only after the first record is added which ensures we can always // create non-empty batches (this is used to disable batching when the producer's batch size is set to 0). - return isClosed() || (this.numRecords > 0 && this.writeLimit <= estimatedBytesWritten()); + return appendStreamIsClosed || (this.numRecords > 0 && this.writeLimit <= estimatedBytesWritten()); } public int sizeInBytes() { http://git-wip-us.apache.org/repos/asf/kafka/blob/970c00ea/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java index 6895fce..fede528 100644 --- a/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java +++ b/clients/src/test/java/org/apache/kafka/clients/producer/internals/ProducerBatchTest.java @@ -20,12 +20,16 @@ import org.apache.kafka.common.TopicPartition; import org.apache.kafka.common.record.CompressionType; import org.apache.kafka.common.record.MemoryRecords; import org.apache.kafka.common.record.MemoryRecordsBuilder; +import org.apache.kafka.common.record.Record; import org.apache.kafka.common.record.TimestampType; import org.junit.Test; import java.nio.ByteBuffer; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; public class ProducerBatchTest { @@ -70,4 +74,15 @@ public class ProducerBatchTest { // Set `now` to 2ms before the create time. assertFalse(batch.maybeExpire(10240, 10240L, now - 2L, 10240L, true)); } + + @Test + public void testShouldNotAttemptAppendOnceRecordsBuilderIsClosedForAppends() { + ProducerBatch batch = new ProducerBatch(new TopicPartition("topic", 1), memoryRecordsBuilder, now); + FutureRecordMetadata result0 = batch.tryAppend(now, null, new byte[10], Record.EMPTY_HEADERS, null, now); + assertNotNull(result0); + assertTrue(memoryRecordsBuilder.hasRoomFor(now, null, new byte[10])); + memoryRecordsBuilder.closeForRecordAppends(); + assertFalse(memoryRecordsBuilder.hasRoomFor(now, null, new byte[10])); + assertEquals(null, batch.tryAppend(now + 1, null, new byte[10], Record.EMPTY_HEADERS, null, now + 1)); + } }
