This is an automated email from the ASF dual-hosted git repository.

sewen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git

commit 35e57a8460c7f03010972f587bb24052ea694cce
Author: kevin.cyj <kevin....@alibaba-inc.com>
AuthorDate: Thu Sep 19 11:20:56 2019 +0800

    [FLINK-11859][runtime] Small improvement to performance of 
SpanningRecordSerializer
    
    This removes the length buffer of SpanningRecordSerializer and serializes 
the record length to
    the data buffer directly.
    
    This closes #9710
---
 .../serialization/SpanningRecordSerializer.java    | 23 ++++++++--------------
 1 file changed, 8 insertions(+), 15 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
index f066679..d6da1ad 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
@@ -24,7 +24,6 @@ import 
org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
 
 /**
  * Record serializer which serializes the complete record to an intermediate
@@ -44,18 +43,11 @@ public class SpanningRecordSerializer<T extends 
IOReadableWritable> implements R
        /** Intermediate buffer for data serialization (wrapped from {@link 
#serializationBuffer}). */
        private ByteBuffer dataBuffer;
 
-       /** Intermediate buffer for length serialization. */
-       private final ByteBuffer lengthBuffer;
-
        public SpanningRecordSerializer() {
                serializationBuffer = new DataOutputSerializer(128);
 
-               lengthBuffer = ByteBuffer.allocate(4);
-               lengthBuffer.order(ByteOrder.BIG_ENDIAN);
-
                // ensure initial state with hasRemaining false (for correct 
continueWritingWithNextBufferBuilder logic)
                dataBuffer = serializationBuffer.wrapAsByteBuffer();
-               lengthBuffer.position(4);
        }
 
        /**
@@ -72,13 +64,16 @@ public class SpanningRecordSerializer<T extends 
IOReadableWritable> implements R
                }
 
                serializationBuffer.clear();
-               lengthBuffer.clear();
+               // the initial capacity of the serialization buffer should be 
no less than 4
+               serializationBuffer.skipBytesToWrite(4);
 
                // write data and length
                record.write(serializationBuffer);
 
-               int len = serializationBuffer.length();
-               lengthBuffer.putInt(0, len);
+               int len = serializationBuffer.length() - 4;
+               serializationBuffer.setPosition(0);
+               serializationBuffer.writeInt(len);
+               serializationBuffer.skipBytesToWrite(len);
 
                dataBuffer = serializationBuffer.wrapAsByteBuffer();
        }
@@ -92,7 +87,6 @@ public class SpanningRecordSerializer<T extends 
IOReadableWritable> implements R
         */
        @Override
        public SerializationResult copyToBufferBuilder(BufferBuilder 
targetBuffer) {
-               targetBuffer.append(lengthBuffer);
                targetBuffer.append(dataBuffer);
                targetBuffer.commit();
 
@@ -100,7 +94,7 @@ public class SpanningRecordSerializer<T extends 
IOReadableWritable> implements R
        }
 
        private SerializationResult getSerializationResult(BufferBuilder 
targetBuffer) {
-               if (dataBuffer.hasRemaining() || lengthBuffer.hasRemaining()) {
+               if (dataBuffer.hasRemaining()) {
                        return 
SerializationResult.PARTIAL_RECORD_MEMORY_SEGMENT_FULL;
                }
                return !targetBuffer.isFull()
@@ -111,7 +105,6 @@ public class SpanningRecordSerializer<T extends 
IOReadableWritable> implements R
        @Override
        public void reset() {
                dataBuffer.position(0);
-               lengthBuffer.position(0);
        }
 
        @Override
@@ -122,6 +115,6 @@ public class SpanningRecordSerializer<T extends 
IOReadableWritable> implements R
 
        @Override
        public boolean hasSerializedData() {
-               return lengthBuffer.hasRemaining() || dataBuffer.hasRemaining();
+               return dataBuffer.hasRemaining();
        }
 }

Reply via email to