Repository: kafka Updated Branches: refs/heads/trunk 3022c1938 -> 87b894d68
KAFKA-2878; Guard against OutOfMemory in Kafka broker Sanity check array size in requests before allocation Author: Rajini Sivaram <[email protected]> Reviewers: Ismael Juma <[email protected]>, Ashish Singh <[email protected]>, Jun Rao <[email protected]> Closes #577 from rajinisivaram/KAFKA-2878 Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/87b894d6 Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/87b894d6 Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/87b894d6 Branch: refs/heads/trunk Commit: 87b894d682a5f10b2e1d32dd028ddb258f2f1bbf Parents: 3022c19 Author: Rajini Sivaram <[email protected]> Authored: Wed Nov 25 08:53:13 2015 -0800 Committer: Jun Rao <[email protected]> Committed: Wed Nov 25 08:53:13 2015 -0800 ---------------------------------------------------------------------- .../kafka/common/protocol/types/ArrayOf.java | 2 ++ .../protocol/types/ProtocolSerializationTest.java | 17 +++++++++++++++++ 2 files changed, 19 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/87b894d6/clients/src/main/java/org/apache/kafka/common/protocol/types/ArrayOf.java ---------------------------------------------------------------------- diff --git a/clients/src/main/java/org/apache/kafka/common/protocol/types/ArrayOf.java b/clients/src/main/java/org/apache/kafka/common/protocol/types/ArrayOf.java index eec456e..4a36cb7 100644 --- a/clients/src/main/java/org/apache/kafka/common/protocol/types/ArrayOf.java +++ b/clients/src/main/java/org/apache/kafka/common/protocol/types/ArrayOf.java @@ -41,6 +41,8 @@ public class ArrayOf extends Type { @Override public Object read(ByteBuffer buffer) { int size = buffer.getInt(); + if (size > buffer.remaining()) + throw new SchemaException("Error reading array of size " + size + ", only " + buffer.remaining() + " bytes available"); Object[] objs = new Object[size]; for (int i = 0; i < size; i++) objs[i] = type.read(buffer); http://git-wip-us.apache.org/repos/asf/kafka/blob/87b894d6/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java ---------------------------------------------------------------------- diff --git a/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java b/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java index 6c335a1..d2e2782 100644 --- a/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java +++ b/clients/src/test/java/org/apache/kafka/common/protocol/types/ProtocolSerializationTest.java @@ -90,6 +90,23 @@ public class ProtocolSerializationTest { struct.validate(); // should be valid even with missing value } + @Test + public void testArray() { + Type type = new ArrayOf(Type.INT8); + int size = 10; + ByteBuffer invalidBuffer = ByteBuffer.allocate(4 + size); + invalidBuffer.putInt(Integer.MAX_VALUE); + for (int i = 0; i < size; i++) + invalidBuffer.put((byte) i); + invalidBuffer.rewind(); + try { + type.read(invalidBuffer); + fail("Array size not validated"); + } catch (SchemaException e) { + // Expected exception + } + } + private Object roundtrip(Type type, Object obj) { ByteBuffer buffer = ByteBuffer.allocate(type.sizeOf(obj)); type.write(buffer, obj);
