This is an automated email from the ASF dual-hosted git repository. nkruber pushed a commit to branch release-1.5 in repository https://gitbox.apache.org/repos/asf/flink.git
commit 11ada4b96ec51819c758708d89525e26cd743867 Author: Nico Kruber <[email protected]> AuthorDate: Thu Sep 13 12:14:05 2018 +0200 [hotfix][network] ensure deserialization buffer capacity for the whole record length Once we know the record length and if we are not spilling, we should size the buffer immediately to the expected record size, and not incrementally for each received buffer chunk. --- .../serialization/SpillingAdaptiveSpanningRecordDeserializer.java | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java index 41ee03d..196287b 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java @@ -485,7 +485,7 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit } else { // collect in memory - ensureBufferCapacity(numBytesChunk); + ensureBufferCapacity(nextRecordLength); partial.segment.get(partial.position, buffer, 0, numBytesChunk); } @@ -515,6 +515,8 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit segmentRemaining -= toPut; if (this.recordLength > THRESHOLD_FOR_SPILLING) { this.spillingChannel = createSpillingChannel(); + } else { + ensureBufferCapacity(this.recordLength); } } } @@ -527,9 +529,7 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit // spill to file ByteBuffer toWrite = segment.wrap(segmentPosition, toCopy); this.spillingChannel.write(toWrite); - } - else { - ensureBufferCapacity(accumulatedRecordBytes + toCopy); + } else { segment.get(segmentPosition, buffer, this.accumulatedRecordBytes, toCopy); }
