Repository: incubator-apex-core Updated Branches: refs/heads/release-3.2 8b4699d22 -> 13bb0f488
APEXCORE-365 - Log error when buffer server receives a tuple with the length that exceeds buffer server data list block size Project: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/repo Commit: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/commit/9e15841a Tree: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/tree/9e15841a Diff: http://git-wip-us.apache.org/repos/asf/incubator-apex-core/diff/9e15841a Branch: refs/heads/release-3.2 Commit: 9e15841af2faac05b97957c7b2cc0d2c519e885e Parents: 75b7168 Author: Vlad Rozov <[email protected]> Authored: Mon Feb 29 13:11:49 2016 -0800 Committer: Vlad Rozov <[email protected]> Committed: Mon Feb 29 13:11:49 2016 -0800 ---------------------------------------------------------------------- .../com/datatorrent/bufferserver/internal/DataList.java | 7 ++++++- .../com/datatorrent/bufferserver/server/Server.java | 12 ++++++------ 2 files changed, 12 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/9e15841a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java index f39eca1..06bfbf6 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/internal/DataList.java @@ -437,8 +437,13 @@ public class DataList return (storage == null) || (numberOfInMemBlockPermits.get() > 0); } - public byte[] newBuffer() + public byte[] newBuffer(final int size) { + if (size > blockSize) { + logger.error("Tuple size {} exceeds buffer server current block size {}. Please decrease tuple size. " + + "Proceeding with allocating larger block that may cause out of memory exception.", size, blockSize); + return new byte[size]; + } return new byte[blockSize]; } http://git-wip-us.apache.org/repos/asf/incubator-apex-core/blob/9e15841a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java ---------------------------------------------------------------------- diff --git a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java index c4cdf5b..3df17c9 100644 --- a/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java +++ b/bufferserver/src/main/java/com/datatorrent/bufferserver/server/Server.java @@ -652,7 +652,7 @@ public class Server implements ServerListener * so we allocate a new byteBuffer and copy over the partially written data to the * new byteBuffer and start as if we always had full room but not enough data. */ - if (!switchToNewBufferOrSuspendRead(buffer, readOffset)) { + if (!switchToNewBufferOrSuspendRead(buffer, readOffset, size)) { return false; } } @@ -681,7 +681,7 @@ public class Server implements ServerListener /* * hit wall while writing serialized data, so have to allocate a new byteBuffer. */ - if (!switchToNewBufferOrSuspendRead(buffer, readOffset - VarInt.getSize(size))) { + if (!switchToNewBufferOrSuspendRead(buffer, readOffset - VarInt.getSize(size), size)) { readOffset -= VarInt.getSize(size); size = 0; return false; @@ -697,19 +697,19 @@ public class Server implements ServerListener while (true); } - private boolean switchToNewBufferOrSuspendRead(final byte[] array, final int offset) + private boolean switchToNewBufferOrSuspendRead(final byte[] array, final int offset, final int size) { - if (switchToNewBuffer(array, offset)) { + if (switchToNewBuffer(array, offset, size)) { return true; } datalist.suspendRead(this); return false; } - private boolean switchToNewBuffer(final byte[] array, final int offset) + private boolean switchToNewBuffer(final byte[] array, final int offset, final int size) { if (datalist.isMemoryBlockAvailable()) { - final byte[] newBuffer = datalist.newBuffer(); + final byte[] newBuffer = datalist.newBuffer(size); byteBuffer = ByteBuffer.wrap(newBuffer); if (array == null || array.length - offset == 0) { writeOffset = 0;
