Github user StefanRRichter commented on a diff in the pull request: https://github.com/apache/flink/pull/5423#discussion_r167588455 --- Diff: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java --- @@ -111,93 +117,49 @@ public void randomEmit(T record) throws IOException, InterruptedException { private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException { RecordSerializer<T> serializer = serializers[targetChannel]; - synchronized (serializer) { - SerializationResult result = serializer.addRecord(record); - - while (result.isFullBuffer()) { - Buffer buffer = serializer.getCurrentBuffer(); - - if (buffer != null) { - numBytesOut.inc(buffer.getSizeUnsafe()); - writeAndClearBuffer(buffer, targetChannel, serializer); - - // If this was a full record, we are done. Not breaking - // out of the loop at this point will lead to another - // buffer request before breaking out (that would not be - // a problem per se, but it can lead to stalls in the - // pipeline). - if (result.isFullRecord()) { - break; - } - } else { - BufferBuilder bufferBuilder = - targetPartition.getBufferProvider().requestBufferBuilderBlocking(); - result = serializer.setNextBufferBuilder(bufferBuilder); + SerializationResult result = serializer.addRecord(record); + + while (result.isFullBuffer()) { + if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) { + // If this was a full record, we are done. Not breaking + // out of the loop at this point will lead to another + // buffer request before breaking out (that would not be + // a problem per se, but it can lead to stalls in the + // pipeline). + if (result.isFullRecord()) { + break; } } + BufferBuilder bufferBuilder = requestNewBufferBuilder(targetChannel); + + result = serializer.setNextBufferBuilder(bufferBuilder); } + checkState(!serializer.hasSerializedData(), "All data should be written at once"); } - public void broadcastEvent(AbstractEvent event) throws IOException, InterruptedException { - final Buffer eventBuffer = EventSerializer.toBuffer(event); - try { + public BufferConsumer broadcastEvent(AbstractEvent event) throws IOException, InterruptedException { --- End diff -- I think this method does not truly require a return value. The return value is only used in one test, and I found it confusing that it is first closed and then returned.
---