This is an automated email from the ASF dual-hosted git repository.

pnowojski pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/flink.git


The following commit(s) were added to refs/heads/master by this push:
     new 955a683  [FLINK-17842][network] Remove NextRecordResponse to improve 
deserialisation performance
955a683 is described below

commit 955a6832b9fc3f68c5accef3a22a0b4d45140d68
Author: Piotr Nowojski <piotr.nowoj...@gmail.com>
AuthorDate: Fri May 29 09:30:18 2020 +0200

    [FLINK-17842][network] Remove NextRecordResponse to improve deserialisation 
performance
    
    This removes NextRecordResponse and inlines NonSpanningWrapper#getNextRecord to
    fix a second performance regression that happened on May 19th. This second regression
    is visible as a difference between results from May 19th (before the regression)
    and May 22nd - 29th (the period after fixing the first performance regression):
    
    
http://codespeed.dak8s.net:8000/timeline/?ben=networkBroadcastThroughput&env=2
    
    It was introduced by the following commit:
    [824100e1460dd2f78f689da72bc5d7b0c9dcbbde] [FLINK-17547][task][hotfix] 
Extract methods from RecordsDeserializer
    
    networkBroadcastThroughput benchmark results are restored from ~1000 to 
~1100 ops/ms on the Hetzner worker.
---
 .../api/serialization/NonSpanningWrapper.java      | 21 +++-----------
 ...SpillingAdaptiveSpanningRecordDeserializer.java | 33 +++++++---------------
 2 files changed, 14 insertions(+), 40 deletions(-)

diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/NonSpanningWrapper.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/NonSpanningWrapper.java
index 343c6f4..59deae3 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/NonSpanningWrapper.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/NonSpanningWrapper.java
@@ -21,7 +21,7 @@ import org.apache.flink.core.io.IOReadableWritable;
 import org.apache.flink.core.memory.DataInputView;
 import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.core.memory.MemorySegmentFactory;
-import 
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.NextRecordResponse;
+import 
org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler;
 import org.apache.flink.runtime.io.network.buffer.NetworkBuffer;
@@ -34,7 +34,6 @@ import java.nio.ByteBuffer;
 
 import static 
org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult.INTERMEDIATE_RECORD_FROM_BUFFER;
 import static 
org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult.LAST_RECORD_FROM_BUFFER;
-import static 
org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult.PARTIAL_RECORD;
 import static 
org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.LENGTH_BYTES;
 import static 
org.apache.flink.runtime.io.network.buffer.Buffer.DataType.DATA_BUFFER;
 
@@ -55,8 +54,6 @@ final class NonSpanningWrapper implements DataInputView {
        private byte[] utfByteBuffer; // reusable byte buffer for utf-8 decoding
        private char[] utfCharBuffer; // reusable char buffer for utf-8 decoding
 
-       private final NextRecordResponse reusedNextRecordResponse = new 
NextRecordResponse(null, 0); // performance impact of immutable objects not 
benchmarked
-
        private int remaining() {
                return this.limit - this.position;
        }
@@ -333,16 +330,7 @@ final class NonSpanningWrapper implements DataInputView {
                clear();
        }
 
-       NextRecordResponse getNextRecord(IOReadableWritable target) throws 
IOException {
-               int recordLen = readInt();
-               if (canReadRecord(recordLen)) {
-                       return readInto(target);
-               } else {
-                       return reusedNextRecordResponse.updated(PARTIAL_RECORD, 
recordLen);
-               }
-       }
-
-       private NextRecordResponse readInto(IOReadableWritable target) throws 
IOException {
+       DeserializationResult readInto(IOReadableWritable target) throws 
IOException {
                try {
                        target.read(this);
                } catch (IndexOutOfBoundsException e) {
@@ -352,14 +340,14 @@ final class NonSpanningWrapper implements DataInputView {
                if (remaining < 0) {
                        throw new 
IOException(BROKEN_SERIALIZATION_ERROR_MESSAGE, new 
IndexOutOfBoundsException("Remaining = " + remaining));
                }
-               return reusedNextRecordResponse.updated(remaining == 0 ? 
LAST_RECORD_FROM_BUFFER : INTERMEDIATE_RECORD_FROM_BUFFER, remaining);
+               return remaining == 0 ? LAST_RECORD_FROM_BUFFER : 
INTERMEDIATE_RECORD_FROM_BUFFER;
        }
 
        boolean hasCompleteLength() {
                return remaining() >= LENGTH_BYTES;
        }
 
-       private boolean canReadRecord(int recordLength) {
+       boolean canReadRecord(int recordLength) {
                return recordLength <= remaining();
        }
 
@@ -368,5 +356,4 @@ final class NonSpanningWrapper implements DataInputView {
                        new NetworkBuffer(target, 
FreeingBufferRecycler.INSTANCE, DATA_BUFFER, target.size()),
                        Buffer::recycleBuffer);
        }
-
 }
diff --git 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
index 2d4c24c..7984403 100644
--- 
a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
+++ 
b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java
@@ -23,8 +23,6 @@ import org.apache.flink.core.memory.MemorySegment;
 import org.apache.flink.runtime.io.network.buffer.Buffer;
 import org.apache.flink.util.CloseableIterator;
 
-import javax.annotation.concurrent.NotThreadSafe;
-
 import java.io.IOException;
 
 import static 
org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult.INTERMEDIATE_RECORD_FROM_BUFFER;
@@ -101,11 +99,17 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T 
extends IOReadableWrit
        }
 
        private DeserializationResult readNonSpanningRecord(T target) throws 
IOException {
-               NextRecordResponse response = 
nonSpanningWrapper.getNextRecord(target);
-               if (response.result == PARTIAL_RECORD) {
-                       spanningWrapper.transferFrom(nonSpanningWrapper, 
response.bytesLeft);
+               // following three calls to nonSpanningWrapper from object 
oriented design would be better
+               // to encapsulate inside nonSpanningWrapper, but then 
nonSpanningWrapper.readInto equivalent
+               // would have to return a tuple of DeserializationResult and 
recordLen, which would affect
+               // performance too much
+               int recordLen = nonSpanningWrapper.readInt();
+               if (nonSpanningWrapper.canReadRecord(recordLen)) {
+                       return nonSpanningWrapper.readInto(target);
+               } else {
+                       spanningWrapper.transferFrom(nonSpanningWrapper, 
recordLen);
+                       return PARTIAL_RECORD;
                }
-               return response.result;
        }
 
        @Override
@@ -118,21 +122,4 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T 
extends IOReadableWrit
        public boolean hasUnfinishedData() {
                return this.nonSpanningWrapper.hasRemaining() || 
this.spanningWrapper.getNumGatheredBytes() > 0;
        }
-
-       @NotThreadSafe
-       static class NextRecordResponse {
-               DeserializationResult result;
-               int bytesLeft;
-
-               NextRecordResponse(DeserializationResult result, int bytesLeft) 
{
-                       this.result = result;
-                       this.bytesLeft = bytesLeft;
-               }
-
-               public NextRecordResponse updated(DeserializationResult result, 
int bytesLeft) {
-                       this.result = result;
-                       this.bytesLeft = bytesLeft;
-                       return this;
-               }
-       }
 }

Reply via email to