This is an automated email from the ASF dual-hosted git repository. sewen pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
commit 35e57a8460c7f03010972f587bb24052ea694cce
Author: kevin.cyj <kevin....@alibaba-inc.com>
AuthorDate: Thu Sep 19 11:20:56 2019 +0800

    [FLINK-11859][runtime] Small improvement to performance of SpanningRecordSerializer

    This removes the length buffer of SpanningRecordSerializer and serializes the record length to the data buffer directly.

    This closes #9710
---
 .../serialization/SpanningRecordSerializer.java | 23 ++++++++--------------
 1 file changed, 8 insertions(+), 15 deletions(-)

diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
index f066679..d6da1ad 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpanningRecordSerializer.java
@@ -24,7 +24,6 @@ import org.apache.flink.runtime.io.network.buffer.BufferBuilder;
 
 import java.io.IOException;
 import java.nio.ByteBuffer;
-import java.nio.ByteOrder;
 
 /**
  * Record serializer which serializes the complete record to an intermediate
@@ -44,18 +43,11 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R
 	/** Intermediate buffer for data serialization (wrapped from {@link #serializationBuffer}). */
 	private ByteBuffer dataBuffer;
 
-	/** Intermediate buffer for length serialization. */
-	private final ByteBuffer lengthBuffer;
-
 	public SpanningRecordSerializer() {
 		serializationBuffer = new DataOutputSerializer(128);
 
-		lengthBuffer = ByteBuffer.allocate(4);
-		lengthBuffer.order(ByteOrder.BIG_ENDIAN);
-
 		// ensure initial state with hasRemaining false (for correct continueWritingWithNextBufferBuilder logic)
 		dataBuffer = serializationBuffer.wrapAsByteBuffer();
-		lengthBuffer.position(4);
 	}
 
 	/**
@@ -72,13 +64,16 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R
 		}
 
 		serializationBuffer.clear();
-		lengthBuffer.clear();
+		// the initial capacity of the serialization buffer should be no less than 4
+		serializationBuffer.skipBytesToWrite(4);
 
 		// write data and length
 		record.write(serializationBuffer);
 
-		int len = serializationBuffer.length();
-		lengthBuffer.putInt(0, len);
+		int len = serializationBuffer.length() - 4;
+		serializationBuffer.setPosition(0);
+		serializationBuffer.writeInt(len);
+		serializationBuffer.skipBytesToWrite(len);
 
 		dataBuffer = serializationBuffer.wrapAsByteBuffer();
 	}
@@ -92,7 +87,6 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R
 	 */
 	@Override
 	public SerializationResult copyToBufferBuilder(BufferBuilder targetBuffer) {
-		targetBuffer.append(lengthBuffer);
 		targetBuffer.append(dataBuffer);
 		targetBuffer.commit();
 
@@ -100,7 +94,7 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R
 	}
 
 	private SerializationResult getSerializationResult(BufferBuilder targetBuffer) {
-		if (dataBuffer.hasRemaining() || lengthBuffer.hasRemaining()) {
+		if (dataBuffer.hasRemaining()) {
 			return SerializationResult.PARTIAL_RECORD_MEMORY_SEGMENT_FULL;
 		}
 		return !targetBuffer.isFull()
@@ -111,7 +105,6 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R
 	@Override
 	public void reset() {
 		dataBuffer.position(0);
-		lengthBuffer.position(0);
 	}
 
 	@Override
@@ -122,6 +115,6 @@ public class SpanningRecordSerializer<T extends IOReadableWritable> implements R
 
 	@Override
 	public boolean hasSerializedData() {
-		return lengthBuffer.hasRemaining() || dataBuffer.hasRemaining();
+		return dataBuffer.hasRemaining();
 	}
 }