This is an automated email from the ASF dual-hosted git repository. pnowojski pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/flink.git
The following commit(s) were added to refs/heads/master by this push: new 955a683 [FLINK-17842][network] Remove NextRecordResponse to improve deserialisation performance 955a683 is described below commit 955a6832b9fc3f68c5accef3a22a0b4d45140d68 Author: Piotr Nowojski <piotr.nowoj...@gmail.com> AuthorDate: Fri May 29 09:30:18 2020 +0200 [FLINK-17842][network] Remove NextRecordResponse to improve deserialisation performance This removes NextRecordResponse and inlines NonSpanningWrapper#getNextRecord to fix the second performance regression that happened on May 19th. This second regression is visible as a difference between results from May 19th (before the regression) and May 22nd - 29th (the period after fixing the first performance regression): http://codespeed.dak8s.net:8000/timeline/?ben=networkBroadcastThroughput&env=2 It was introduced by the following commit: [824100e1460dd2f78f689da72bc5d7b0c9dcbbde] [FLINK-17547][task][hotfix] Extract methods from RecordsDeserializer networkBroadcastThroughput benchmark results are restored from ~1000 to ~1100 ops/ms on the Hetzner worker. 
--- .../api/serialization/NonSpanningWrapper.java | 21 +++----------- ...SpillingAdaptiveSpanningRecordDeserializer.java | 33 +++++++--------------- 2 files changed, 14 insertions(+), 40 deletions(-) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/NonSpanningWrapper.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/NonSpanningWrapper.java index 343c6f4..59deae3 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/NonSpanningWrapper.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/NonSpanningWrapper.java @@ -21,7 +21,7 @@ import org.apache.flink.core.io.IOReadableWritable; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.core.memory.MemorySegmentFactory; -import org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.NextRecordResponse; +import org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.runtime.io.network.buffer.FreeingBufferRecycler; import org.apache.flink.runtime.io.network.buffer.NetworkBuffer; @@ -34,7 +34,6 @@ import java.nio.ByteBuffer; import static org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult.INTERMEDIATE_RECORD_FROM_BUFFER; import static org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult.LAST_RECORD_FROM_BUFFER; -import static org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult.PARTIAL_RECORD; import static org.apache.flink.runtime.io.network.api.serialization.SpillingAdaptiveSpanningRecordDeserializer.LENGTH_BYTES; import static org.apache.flink.runtime.io.network.buffer.Buffer.DataType.DATA_BUFFER; @@ 
-55,8 +54,6 @@ final class NonSpanningWrapper implements DataInputView { private byte[] utfByteBuffer; // reusable byte buffer for utf-8 decoding private char[] utfCharBuffer; // reusable char buffer for utf-8 decoding - private final NextRecordResponse reusedNextRecordResponse = new NextRecordResponse(null, 0); // performance impact of immutable objects not benchmarked - private int remaining() { return this.limit - this.position; } @@ -333,16 +330,7 @@ final class NonSpanningWrapper implements DataInputView { clear(); } - NextRecordResponse getNextRecord(IOReadableWritable target) throws IOException { - int recordLen = readInt(); - if (canReadRecord(recordLen)) { - return readInto(target); - } else { - return reusedNextRecordResponse.updated(PARTIAL_RECORD, recordLen); - } - } - - private NextRecordResponse readInto(IOReadableWritable target) throws IOException { + DeserializationResult readInto(IOReadableWritable target) throws IOException { try { target.read(this); } catch (IndexOutOfBoundsException e) { @@ -352,14 +340,14 @@ final class NonSpanningWrapper implements DataInputView { if (remaining < 0) { throw new IOException(BROKEN_SERIALIZATION_ERROR_MESSAGE, new IndexOutOfBoundsException("Remaining = " + remaining)); } - return reusedNextRecordResponse.updated(remaining == 0 ? LAST_RECORD_FROM_BUFFER : INTERMEDIATE_RECORD_FROM_BUFFER, remaining); + return remaining == 0 ? 
LAST_RECORD_FROM_BUFFER : INTERMEDIATE_RECORD_FROM_BUFFER; } boolean hasCompleteLength() { return remaining() >= LENGTH_BYTES; } - private boolean canReadRecord(int recordLength) { + boolean canReadRecord(int recordLength) { return recordLength <= remaining(); } @@ -368,5 +356,4 @@ final class NonSpanningWrapper implements DataInputView { new NetworkBuffer(target, FreeingBufferRecycler.INSTANCE, DATA_BUFFER, target.size()), Buffer::recycleBuffer); } - } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java index 2d4c24c..7984403 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/serialization/SpillingAdaptiveSpanningRecordDeserializer.java @@ -23,8 +23,6 @@ import org.apache.flink.core.memory.MemorySegment; import org.apache.flink.runtime.io.network.buffer.Buffer; import org.apache.flink.util.CloseableIterator; -import javax.annotation.concurrent.NotThreadSafe; - import java.io.IOException; import static org.apache.flink.runtime.io.network.api.serialization.RecordDeserializer.DeserializationResult.INTERMEDIATE_RECORD_FROM_BUFFER; @@ -101,11 +99,17 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit } private DeserializationResult readNonSpanningRecord(T target) throws IOException { - NextRecordResponse response = nonSpanningWrapper.getNextRecord(target); - if (response.result == PARTIAL_RECORD) { - spanningWrapper.transferFrom(nonSpanningWrapper, response.bytesLeft); + // following three calls to nonSpanningWrapper from object oriented design would be better + // to encapsulate inside nonSpanningWrapper, but then nonSpanningWrapper.readInto equivalent + 
// would have to return a tuple of DeserializationResult and recordLen, which would affect + // performance too much + int recordLen = nonSpanningWrapper.readInt(); + if (nonSpanningWrapper.canReadRecord(recordLen)) { + return nonSpanningWrapper.readInto(target); + } else { + spanningWrapper.transferFrom(nonSpanningWrapper, recordLen); + return PARTIAL_RECORD; } - return response.result; } @Override @@ -118,21 +122,4 @@ public class SpillingAdaptiveSpanningRecordDeserializer<T extends IOReadableWrit public boolean hasUnfinishedData() { return this.nonSpanningWrapper.hasRemaining() || this.spanningWrapper.getNumGatheredBytes() > 0; } - - @NotThreadSafe - static class NextRecordResponse { - DeserializationResult result; - int bytesLeft; - - NextRecordResponse(DeserializationResult result, int bytesLeft) { - this.result = result; - this.bytesLeft = bytesLeft; - } - - public NextRecordResponse updated(DeserializationResult result, int bytesLeft) { - this.result = result; - this.bytesLeft = bytesLeft; - return this; - } - } }