Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/5423#discussion_r168724127
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
---
@@ -111,93 +117,49 @@ public void randomEmit(T record) throws IOException,
InterruptedException {
private void sendToTarget(T record, int targetChannel) throws
IOException, InterruptedException {
RecordSerializer<T> serializer = serializers[targetChannel];
- synchronized (serializer) {
- SerializationResult result =
serializer.addRecord(record);
-
- while (result.isFullBuffer()) {
- Buffer buffer = serializer.getCurrentBuffer();
-
- if (buffer != null) {
- numBytesOut.inc(buffer.getSizeUnsafe());
- writeAndClearBuffer(buffer,
targetChannel, serializer);
-
- // If this was a full record, we are
done. Not breaking
- // out of the loop at this point will
lead to another
- // buffer request before breaking out
(that would not be
- // a problem per se, but it can lead to
stalls in the
- // pipeline).
- if (result.isFullRecord()) {
- break;
- }
- } else {
- BufferBuilder bufferBuilder =
-
targetPartition.getBufferProvider().requestBufferBuilderBlocking();
- result =
serializer.setNextBufferBuilder(bufferBuilder);
+ SerializationResult result = serializer.addRecord(record);
+
+ while (result.isFullBuffer()) {
--- End diff --
👍 We can introduce this change later, after some more extensive tests.
---