Repository: kafka Updated Branches: refs/heads/0.9.0 ec276b38f -> cc884ee6a
KAFKA-3594; After calling MemoryRecords.close() method, hasRoomFor() method should return false. This exception occurs when the producer tries to append a record to a re-enqueued record batch in the accumulator. We should not allow a record to be added to a re-enqueued record batch. This is due to a bug in the hasRoomFor() method of MemoryRecords.java. After MemoryRecords.close() has been called, hasRoomFor() should return false. This is a backport to the 0.9.0 branch. Author: Manikumar reddy O <[email protected]> Reviewers: Guozhang Wang <[email protected]>, Ismael Juma <[email protected]> Closes #1547 from ijuma/kafka-3594-for-0.9.0 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/cc884ee6 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/cc884ee6 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/cc884ee6 Branch: refs/heads/0.9.0 Commit: cc884ee6a264d3552a99876b199d09bb3d640839 Parents: ec276b3 Author: Manikumar reddy O <[email protected]> Authored: Fri Jun 24 10:57:07 2016 +0200 Committer: Ismael Juma <[email protected]> Committed: Fri Jun 24 10:57:07 2016 +0200 ---------------------------------------------------------------------- .../org/apache/kafka/common/record/MemoryRecords.java | 5 ++++- .../apache/kafka/common/record/MemoryRecordsTest.java | 11 +++++++++++ 2 files changed, 15 insertions(+), 1 deletion(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/cc884ee6/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java index 971f0a2..de27d5f 100644 --- a/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java +++ 
b/clients/src/main/java/org/apache/kafka/common/record/MemoryRecords.java @@ -112,7 +112,10 @@ public class MemoryRecords implements Records { * to accept this single record. */ public boolean hasRoomFor(byte[] key, byte[] value) { - return this.writable && this.compressor.numRecordsWritten() == 0 ? + if (!this.writable) + return false; + + return this.compressor.numRecordsWritten() == 0 ? this.initialCapacity >= Records.LOG_OVERHEAD + Record.recordSize(key, value) : this.writeLimit >= this.compressor.estimatedBytesWritten() + Records.LOG_OVERHEAD + Record.recordSize(key, value); } http://git-wip-us.apache.org/repos/asf/kafka/blob/cc884ee6/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java index e343327..7338503 100644 --- a/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java +++ b/clients/src/test/java/org/apache/kafka/common/record/MemoryRecordsTest.java @@ -67,6 +67,17 @@ public class MemoryRecordsTest { } } + @Test + public void testHasRoomForMethod() { + MemoryRecords recs1 = MemoryRecords.emptyRecords(ByteBuffer.allocate(1024), compression); + recs1.append(0, new Record("a".getBytes(), "1".getBytes())); + + assertTrue(recs1.hasRoomFor("b".getBytes(), "2".getBytes())); + recs1.close(); + assertFalse(recs1.hasRoomFor("b".getBytes(), "2".getBytes())); + + } + @Parameterized.Parameters public static Collection<Object[]> data() { List<Object[]> values = new ArrayList<Object[]>();
