Repository: kafka Updated Branches: refs/heads/trunk 74e6dc842 -> 327400449
KAFKA-3594; After calling MemoryRecords.close() method, hasRoomFor() method should return false This exception is occurring when producer is trying to append a record to a Re-enqueued record batch in the accumulator. We should not allow to add a record to Re-enqueued record batch. This is due a bug in MemoryRecords.java/hasRoomFor() method. After calling MemoryRecords.close() method, hasRoomFor() method should return false. Author: Manikumar reddy O <[email protected]> Reviewers: Ismael Juma, Grant Henke, Guozhang Wang Closes #1249 from omkreddy/KAFKA-3594 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/32740044 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/32740044 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/32740044 Branch: refs/heads/trunk Commit: 32740044972ad3bfc9539b5d76128dceddedc2ba Parents: 74e6dc8 Author: Manikumar reddy O <[email protected]> Authored: Thu Apr 21 15:13:25 2016 -0700 Committer: Guozhang Wang <[email protected]> Committed: Thu Apr 21 15:13:25 2016 -0700 ---------------------------------------------------------------------- .../org/apache/kafka/common/record/MemoryRecords.java | 5 ++++- .../apache/kafka/common/record/MemoryRecordsTest.java | 11 +++++++++++ 2 files changed, 15 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/32740044/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java index f37ef39..7175953 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java +++ b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java @@ -115,7 +115,10 @@ public class MemoryRecords implements Records { * to accept this single record. */ public boolean hasRoomFor(byte[] key, byte[] value) { - return this.writable && this.compressor.numRecordsWritten() == 0 ? + if (!this.writable) + return false; + + return this.compressor.numRecordsWritten() == 0 ? this.initialCapacity >= Records.LOG_OVERHEAD + Record.recordSize(key, value) : this.writeLimit >= this.compressor.estimatedBytesWritten() + Records.LOG_OVERHEAD + Record.recordSize(key, value); } http://git-wip-us.apache.org/repos/asf/kafka/blob/32740044/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java index ed64f63..b1117f1 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java @@ -71,6 +71,17 @@ public class MemoryRecordsTest { } } + @Test + public void testHasRoomForMethod() { + MemoryRecords recs1 = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), compression); + recs1.append(0, new Record(0L, "a".getBytes(), "1".getBytes())); + + assertTrue(recs1.hasRoomFor("b".getBytes(), "2".getBytes())); + recs1.close(); + assertFalse(recs1.hasRoomFor("b".getBytes(), "2".getBytes())); + + } + @Parameterized.Parameters public static Collection<Object[]> data() { List<Object[]> values = new ArrayList<Object[]>();
