This is an automated email from the ASF dual-hosted git repository.

nkruber pushed a commit to branch release-1.6
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 6c77b3183602f3d0d2a84e57663f21cc4fa0a87d
Author: Nico Kruber <[email protected]>
AuthorDate: Thu Sep 13 12:14:05 2018 +0200

    [hotfix][network] ensure deserialization buffer capacity for the whole record length
    
    Once we know the record length and if we are not spilling, we should size the
    buffer immediately to the expected record size, and not incrementally for each
    received buffer chunk.
---
 .../serialization/SpillingAdaptiveSpanningRecordDeserializer.java | 8 ++++----
 1 file changed, 4 insertions(+), 4 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
index 41ee03d..196287b 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
@@ -485,7 +485,7 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T 
extends IOReadableWrit
                        }
                        else {
                                // collect in memory
-                               ensureBufferCapacity(numBytesChunk);
+                               ensureBufferCapacity(nextRecordLength);
                                partial.segment.get(partial.position, buffer, 
0, numBytesChunk);
                        }
 
@@ -515,6 +515,8 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T 
extends IOReadableWrit
                                        segmentRemaining -= toPut;
                                        if (this.recordLength > 
THRESHOLD_FOR_SPILLING) {
                                                this.spillingChannel = 
createSpillingChannel();
+                                       } else {
+                                               
ensureBufferCapacity(this.recordLength);
                                        }
                                }
                        }
@@ -527,9 +529,7 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T 
extends IOReadableWrit
                                // spill to file
                                ByteBuffer toWrite = 
segment.wrap(segmentPosition, toCopy);
                                this.spillingChannel.write(toWrite);
-                       }
-                       else {
-                               ensureBufferCapacity(accumulatedRecordBytes + 
toCopy);
+                       } else {
                                segment.get(segmentPosition, buffer, 
this.accumulatedRecordBytes, toCopy);
                        }
 

Reply via email to