[FLINK-1326] [runtime] Make sure that serializers release their very large intermediate buffers early


Project: http://git-wip-us.apache.org/repos/asf/flink/repo
Commit: http://git-wip-us.apache.org/repos/asf/flink/commit/bc69b0bb
Tree: http://git-wip-us.apache.org/repos/asf/flink/tree/bc69b0bb
Diff: http://git-wip-us.apache.org/repos/asf/flink/diff/bc69b0bb

Branch: refs/heads/master
Commit: bc69b0bb530a6daeea2c18b1ff9edbb4d1faa48c
Parents: 4825656
Author: Stephan Ewen <[email protected]>
Authored: Thu Dec 4 15:48:03 2014 +0100
Committer: Ufuk Celebi <[email protected]>
Committed: Wed Jan 21 12:01:35 2015 +0100

----------------------------------------------------------------------
 .../AdaptiveSpanningRecordDeserializer.java     |   2 +
 .../serialization/SpanningRecordSerializer.java |  10 +-
 .../runtime/util/DataInputDeserializer.java     |   4 +
 .../runtime/util/DataOutputSerializer.java      |  73 ++++++----
 .../types/SerializationTestTypeFactory.java     |   1 +
 .../network/serialization/LargeRecordsTest.java | 132 +++++++++++++++++++
 .../serialization/types/LargeObjectType.java    |  98 ++++++++++++++
 7 files changed, 290 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/flink/blob/bc69b0bb/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java
index 9eda286..28bcf4a 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/AdaptiveSpanningRecordDeserializer.java
@@ -505,6 +505,8 @@ public class AdaptiveSpanningRecordDeserializer<T extends IOReadableWritable> im
 
                public void clear() {
                        this.serializationBuffer.clear();
+                       this.serializationBuffer.pruneBuffer();
+                       this.serializationReadBuffer.releaseArrays();
 
                        this.recordLength = -1;
                        this.lengthBuffer.clear();

http://git-wip-us.apache.org/repos/asf/flink/blob/bc69b0bb/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
index 9d0256f..ab6fe75 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
@@ -100,7 +100,15 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R
                        copyToTargetBufferFrom(this.dataBuffer);
                }
 
-               return getSerializationResult();
+               SerializationResult result = getSerializationResult();
+               
+               // make sure we don't hold onto the large buffers for too long
+               if (result.isFullRecord()) {
+                       this.serializationBuffer.pruneBuffer();
+                       this.dataBuffer = 
this.serializationBuffer.wrapAsByteBuffer();
+               }
+               
+               return result;
        }
 
        /**

http://git-wip-us.apache.org/repos/asf/flink/blob/bc69b0bb/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java
index 6952ce8..793a2e3 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataInputDeserializer.java
@@ -79,6 +79,10 @@ public class DataInputDeserializer implements DataInputView {
                this.position = start;
                this.end = start + len;
        }
+       
+       public void releaseArrays() {
+               this.buffer = null;
+       }
 
        // 
----------------------------------------------------------------------------------------
        //                               Data Input

http://git-wip-us.apache.org/repos/asf/flink/blob/bc69b0bb/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java
index adfec50..b7a3715 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java
@@ -16,7 +16,7 @@
  * limitations under the License.
  */
 
-package org.apache.flink.runtime.util;
+package org.apache.flink.runtime.io.network.serialization;
 
 import java.io.EOFException;
 import java.io.IOException;
@@ -27,28 +27,36 @@ import java.nio.ByteOrder;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.DataOutputView;
 import org.apache.flink.core.memory.MemoryUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * A simple and efficient serializer for the {@link java.io.DataOutput} 
interface.
  */
 public class DataOutputSerializer implements DataOutputView {
-
+       
+       private static final Logger LOG = 
LoggerFactory.getLogger(DataOutputSerializer.class);
+       
+       private static final int PRUNE_BUFFER_THRESHOLD = 5 * 1024 * 1024;
+       
+       private final byte[] startBuffer;
+       
        private byte[] buffer;
-
+       
        private int position;
 
        private ByteBuffer wrapper;
-
+       
        public DataOutputSerializer(int startSize) {
                if (startSize < 1) {
                        throw new IllegalArgumentException();
                }
 
-               this.buffer = new byte[startSize];
-               this.position = 0;
+               this.startBuffer = new byte[startSize];
+               this.buffer = this.startBuffer;
                this.wrapper = ByteBuffer.wrap(buffer);
        }
-
+       
        public ByteBuffer wrapAsByteBuffer() {
                this.wrapper.position(0);
                this.wrapper.limit(this.position);
@@ -66,6 +74,17 @@ public class DataOutputSerializer implements DataOutputView {
        public int length() {
                return this.position;
        }
+       
+       public void pruneBuffer() {
+               if (this.buffer.length > PRUNE_BUFFER_THRESHOLD) {
+                       if (LOG.isDebugEnabled()) {
+                               LOG.debug("Releasing serialization buffer of " 
+ this.buffer.length + " bytes.");
+                       }
+                       
+                       this.buffer = this.startBuffer;
+                       this.wrapper = ByteBuffer.wrap(this.buffer);
+               }
+       }
 
        @Override
        public String toString() {
@@ -75,7 +94,7 @@ public class DataOutputSerializer implements DataOutputView {
        // 
----------------------------------------------------------------------------------------
        //                               Data Output
        // 
----------------------------------------------------------------------------------------
-
+       
        @Override
        public void write(int b) throws IOException {
                if (this.position >= this.buffer.length) {
@@ -117,7 +136,7 @@ public class DataOutputSerializer implements DataOutputView {
                if (this.position >= this.buffer.length - sLen) {
                        resize(sLen);
                }
-
+               
                for (int i = 0; i < sLen; i++) {
                        writeByte(s.charAt(i));
                }
@@ -136,9 +155,9 @@ public class DataOutputSerializer implements DataOutputView {
        @Override
        public void writeChars(String s) throws IOException {
                final int sLen = s.length();
-               if (this.position >= this.buffer.length - 2 * sLen) {
-                       resize(2 * sLen);
-               }
+               if (this.position >= this.buffer.length - 2*sLen) {
+                       resize(2*sLen);
+               } 
                for (int i = 0; i < sLen; i++) {
                        writeChar(s.charAt(i));
                }
@@ -162,7 +181,7 @@ public class DataOutputSerializer implements DataOutputView {
                }
                if (LITTLE_ENDIAN) {
                        v = Integer.reverseBytes(v);
-               }
+               }                       
                UNSAFE.putInt(this.buffer, BASE_OFFSET + this.position, v);
                this.position += 4;
        }
@@ -200,11 +219,9 @@ public class DataOutputSerializer implements DataOutputView {
                        c = str.charAt(i);
                        if ((c >= 0x0001) && (c <= 0x007F)) {
                                utflen++;
-                       }
-                       else if (c > 0x07FF) {
+                       } else if (c > 0x07FF) {
                                utflen += 3;
-                       }
-                       else {
+                       } else {
                                utflen += 2;
                        }
                }
@@ -215,7 +232,7 @@ public class DataOutputSerializer implements DataOutputView {
                else if (this.position > this.buffer.length - utflen - 2) {
                        resize(utflen + 2);
                }
-
+               
                byte[] bytearr = this.buffer;
                int count = this.position;
 
@@ -236,13 +253,11 @@ public class DataOutputSerializer implements DataOutputView {
                        if ((c >= 0x0001) && (c <= 0x007F)) {
                                bytearr[count++] = (byte) c;
 
-                       }
-                       else if (c > 0x07FF) {
+                       } else if (c > 0x07FF) {
                                bytearr[count++] = (byte) (0xE0 | ((c >> 12) & 
0x0F));
                                bytearr[count++] = (byte) (0x80 | ((c >> 6) & 
0x3F));
                                bytearr[count++] = (byte) (0x80 | ((c >> 0) & 
0x3F));
-                       }
-                       else {
+                       } else {
                                bytearr[count++] = (byte) (0xC0 | ((c >> 6) & 
0x1F));
                                bytearr[count++] = (byte) (0x80 | ((c >> 0) & 
0x3F));
                        }
@@ -250,8 +265,8 @@ public class DataOutputSerializer implements DataOutputView {
 
                this.position = count;
        }
-
-
+       
+       
        private void resize(int minCapacityAdd) throws IOException {
                try {
                        final int newLen = Math.max(this.buffer.length * 2, 
this.buffer.length + minCapacityAdd);
@@ -264,18 +279,18 @@ public class DataOutputSerializer implements DataOutputView {
                        throw new IOException("Serialization failed because the 
record length would exceed 2GB (max addressable array size in Java).");
                }
        }
-
+       
        @SuppressWarnings("restriction")
        private static final sun.misc.Unsafe UNSAFE = MemoryUtils.UNSAFE;
-
+       
        @SuppressWarnings("restriction")
        private static final long BASE_OFFSET = 
UNSAFE.arrayBaseOffset(byte[].class);
-
+       
        private static final boolean LITTLE_ENDIAN = 
(MemoryUtils.NATIVE_BYTE_ORDER == ByteOrder.LITTLE_ENDIAN);
 
        @Override
        public void skipBytesToWrite(int numBytes) throws IOException {
-               if (buffer.length - this.position < numBytes) {
+               if(buffer.length - this.position < numBytes){
                        throw new EOFException("Could not skip " + numBytes + " 
bytes.");
                }
 
@@ -284,7 +299,7 @@ public class DataOutputSerializer implements DataOutputView {
 
        @Override
        public void write(DataInputView source, int numBytes) throws 
IOException {
-               if (buffer.length - this.position < numBytes) {
+               if(buffer.length - this.position < numBytes){
                        throw new EOFException("Could not write " + numBytes + 
" bytes. Buffer overflow.");
                }
 

http://git-wip-us.apache.org/repos/asf/flink/blob/bc69b0bb/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/SerializationTestTypeFactory.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/SerializationTestTypeFactory.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/SerializationTestTypeFactory.java
index 8ecbfce..03a093f 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/SerializationTestTypeFactory.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/serialization/types/SerializationTestTypeFactory.java
@@ -20,6 +20,7 @@
 package org.apache.flink.runtime.io.network.api.serialization.types;
 
 public enum SerializationTestTypeFactory {
+       
        BOOLEAN(new BooleanType()),
        BYTE_ARRAY(new ByteArrayType()),
        BYTE_SUB_ARRAY(new ByteSubArrayType()),

http://git-wip-us.apache.org/repos/asf/flink/blob/bc69b0bb/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java
new file mode 100644
index 0000000..6000fee
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/LargeRecordsTest.java
@@ -0,0 +1,132 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.flink.runtime.io.network.serialization;
+
+import static org.junit.Assert.*;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
+import org.apache.flink.core.memory.MemorySegment;
+import org.apache.flink.runtime.io.network.Buffer;
+import org.apache.flink.runtime.io.network.serialization.types.IntType;
+import org.apache.flink.runtime.io.network.serialization.types.LargeObjectType;
+import 
org.apache.flink.runtime.io.network.serialization.types.SerializationTestType;
+import org.junit.Test;
+
+public class LargeRecordsTest {
+
+       @Test
+       public void testHandleMixedLargeRecords() {
+               try {
+                       final int NUM_RECORDS = 99;
+                       final int SEGMENT_SIZE = 32 * 1024;
+
+                       final RecordSerializer<SerializationTestType> 
serializer = new SpanningRecordSerializer<SerializationTestType>();
+                       final RecordDeserializer<SerializationTestType> 
deserializer = new AdaptiveSpanningRecordDeserializer<SerializationTestType>();
+
+                       final Buffer buffer = new Buffer(new MemorySegment(new 
byte[SEGMENT_SIZE]), SEGMENT_SIZE, null);
+
+                       List<SerializationTestType> originalRecords = new 
ArrayList<SerializationTestType>();
+                       List<SerializationTestType> deserializedRecords = new 
ArrayList<SerializationTestType>();
+                       
+                       LargeObjectType genLarge = new LargeObjectType();
+                       
+                       Random rnd = new Random();
+                       
+                       for (int i = 0; i < NUM_RECORDS; i++) {
+                               if (i % 2 == 0) {
+                                       originalRecords.add(new IntType(42));
+                                       deserializedRecords.add(new IntType());
+                               } else {
+                                       
originalRecords.add(genLarge.getRandom(rnd));
+                                       deserializedRecords.add(new 
LargeObjectType());
+                               }
+                       }
+
+                       // 
-------------------------------------------------------------------------------------------------------------
+
+                       serializer.setNextBuffer(buffer);
+                       
+                       int numRecordsDeserialized = 0;
+                       
+                       for (SerializationTestType record : originalRecords) {
+
+                               // serialize record
+                               if 
(serializer.addRecord(record).isFullBuffer()) {
+
+                                       // buffer is full => move to 
deserializer
+                                       
deserializer.setNextMemorySegment(serializer.getCurrentBuffer().getMemorySegment(),
 SEGMENT_SIZE);
+
+                                       // deserialize records, as many 
complete as there are
+                                       while (numRecordsDeserialized < 
deserializedRecords.size()) {
+                                               SerializationTestType next = 
deserializedRecords.get(numRecordsDeserialized);
+                                       
+                                               if 
(deserializer.getNextRecord(next).isFullRecord()) {
+                                                       
assertEquals(originalRecords.get(numRecordsDeserialized), next);
+                                                       
numRecordsDeserialized++;
+                                               } else {
+                                                       break;
+                                               }
+                                       }
+
+                                       // move buffers as long as necessary 
(for long records)
+                                       while 
(serializer.setNextBuffer(buffer).isFullBuffer()) {
+                                               
deserializer.setNextMemorySegment(serializer.getCurrentBuffer().getMemorySegment(),
 SEGMENT_SIZE);
+                                       }
+                                       
+                                       // deserialize records, as many as 
there are in the last buffer
+                                       while (numRecordsDeserialized < 
deserializedRecords.size()) {
+                                               SerializationTestType next = 
deserializedRecords.get(numRecordsDeserialized);
+                                       
+                                               if 
(deserializer.getNextRecord(next).isFullRecord()) {
+                                                       
assertEquals(originalRecords.get(numRecordsDeserialized), next);
+                                                       
numRecordsDeserialized++;
+                                               } else {
+                                                       break;
+                                               }
+                                       }
+                               }
+                       }
+                       
+                       // move the last (incomplete buffer)
+                       Buffer last = serializer.getCurrentBuffer();
+                       
deserializer.setNextMemorySegment(last.getMemorySegment(), last.size());
+                       serializer.clear();
+                       
+                       // deserialize records, as many as there are in the 
last buffer
+                       while (numRecordsDeserialized < 
deserializedRecords.size()) {
+                               SerializationTestType next = 
deserializedRecords.get(numRecordsDeserialized);
+                       
+                               
assertTrue(deserializer.getNextRecord(next).isFullRecord());
+                               
assertEquals(originalRecords.get(numRecordsDeserialized), next);
+                               numRecordsDeserialized++;
+                       }
+                       
+                       // might be that the last big records has not yet been 
fully moved, and a small one is missing
+                       assertFalse(serializer.hasData());
+                       assertFalse(deserializer.hasUnfinishedData());
+               }
+               catch (Exception e) {
+                       e.printStackTrace();
+                       fail(e.getMessage());
+               }
+       }
+}

http://git-wip-us.apache.org/repos/asf/flink/blob/bc69b0bb/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/LargeObjectType.java
----------------------------------------------------------------------
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/LargeObjectType.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/LargeObjectType.java
new file mode 100644
index 0000000..01a00e4
--- /dev/null
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/serialization/types/LargeObjectType.java
@@ -0,0 +1,98 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+
+package org.apache.flink.runtime.io.network.serialization.types;
+
+import java.io.IOException;
+import java.util.Random;
+
+import org.apache.flink.core.memory.DataInputView;
+import org.apache.flink.core.memory.DataOutputView;
+
+public class LargeObjectType implements SerializationTestType {
+
+       private static final int MIN_LEN = 12 * 1000 * 1000;
+       
+       private static final int MAX_LEN = 40 * 1000 * 1000;
+
+       private int len;
+
+       public LargeObjectType() {
+               this.len = 0;
+       }
+
+       public LargeObjectType(int len) {
+               this.len = len;
+       }
+
+       @Override
+       public LargeObjectType getRandom(Random rnd) {
+               int len = rnd.nextInt(MAX_LEN - MIN_LEN) + MIN_LEN;
+               return new LargeObjectType(len);
+       }
+
+       @Override
+       public int length() {
+               return len + 4;
+       }
+
+       @Override
+       public void write(DataOutputView out) throws IOException {
+               out.writeInt(len);
+               for (int i = 0; i < len / 8; i++) {
+                       out.writeLong(i);
+               }
+               for (int i = 0; i < len % 8; i++) {
+                       out.write(i);
+               }
+       }
+
+       @Override
+       public void read(DataInputView in) throws IOException {
+               final int len = in.readInt();
+               this.len = len;
+               
+               for (int i = 0; i < len / 8; i++) {
+                       if (in.readLong() != i) {
+                               throw new IOException("corrupt serialization");
+                       }
+               }
+               
+               for (int i = 0; i < len % 8; i++) {
+                       if (in.readByte() != i) {
+                               throw new IOException("corrupt serialization");
+                       }
+               }
+       }
+
+       @Override
+       public int hashCode() {
+               return len;
+       }
+
+       @Override
+       public boolean equals(Object obj) {
+               return (obj instanceof LargeObjectType) && ((LargeObjectType) 
obj).len == this.len;
+       }
+       
+       @Override
+       public String toString() {
+               return "Large Object " + len;
+       }
+}

Reply via email to