Repository: incubator-apex-core Updated Branches: refs/heads/master cf4e29ea3 -> 6b78b57bd
APEXCORE-365 - Log error when buffer server receives a tuple with the length that exceeds buffer server data list block size Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/6b78b57b Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/6b78b57b Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/6b78b57b Branch: refs/heads/master Commit: 6b78b57bd6d13cb855f7b767106d33c2b5f97885 Parents: cf4e29e Author: Vlad Rozov <[email protected]> Authored: Mon Feb 29 13:11:49 2016 -0800 Committer: Thomas Weise <[email protected]> Committed: Mon Feb 29 20:04:56 2016 -0800 ---------------------------------------------------------------------- .../com/datatorrent/bufferserver/internal/DataList.java | 7 ++++++- .../com/datatorrent/bufferserver/server/Server.java | 12 ++++++------ 2 files changed, 12 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/6b78b57b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java index 0de7261..95c32b0 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java @@ -447,8 +447,13 @@ public class DataList return (storage == null) || (numberOfInMemBlockPermits.get() > 0); } - public byte[] newBuffer() + public byte[] newBuffer(final int size) { + if (size > blockSize) { + logger.error("Tuple size {} exceeds buffer server current block size {}. Please decrease tuple size. " + + "Proceeding with allocating larger block that may cause out of memory exception.", size, blockSize); + return new byte[size]; + } return new byte[blockSize]; } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/6b78b57b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java index 03d96ee..c2da111 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java @@ -665,7 +665,7 @@ public class Server implements ServerListener * so we allocate a new byteBuffer and copy over the partially written data to the * new byteBuffer and start as if we always had full room but not enough data. */ - if (!switchToNewBufferOrSuspendRead(buffer, readOffset)) { + if (!switchToNewBufferOrSuspendRead(buffer, readOffset, size)) { return false; } } @@ -694,7 +694,7 @@ public class Server implements ServerListener /* * hit wall while writing serialized data, so have to allocate a new byteBuffer. */ - if (!switchToNewBufferOrSuspendRead(buffer, readOffset - VarInt.getSize(size))) { + if (!switchToNewBufferOrSuspendRead(buffer, readOffset - VarInt.getSize(size), size)) { readOffset -= VarInt.getSize(size); size = 0; return false; @@ -710,19 +710,19 @@ public class Server implements ServerListener while (true); } - private boolean switchToNewBufferOrSuspendRead(final byte[] array, final int offset) + private boolean switchToNewBufferOrSuspendRead(final byte[] array, final int offset, final int size) { - if (switchToNewBuffer(array, offset)) { + if (switchToNewBuffer(array, offset, size)) { return true; } datalist.suspendRead(this); return false; } - private boolean switchToNewBuffer(final byte[] array, final int offset) + private boolean switchToNewBuffer(final byte[] array, final int offset, final int size) { if (datalist.isMemoryBlockAvailable()) { - final byte[] newBuffer = datalist.newBuffer(); + final byte[] newBuffer = datalist.newBuffer(size); byteBuffer = ByteBuffer.wrap(newBuffer); if (array == null || array.length - offset == 0) { writeOffset = 0;
