Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/5423#discussion_r168724735
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
---
@@ -111,93 +117,49 @@ public void randomEmit(T record) throws IOException, InterruptedException {
private void sendToTarget(T record, int targetChannel) throws IOException, InterruptedException {
RecordSerializer<T> serializer = serializers[targetChannel];
- synchronized (serializer) {
- SerializationResult result = serializer.addRecord(record);
-
- while (result.isFullBuffer()) {
- Buffer buffer = serializer.getCurrentBuffer();
-
- if (buffer != null) {
- numBytesOut.inc(buffer.getSizeUnsafe());
- writeAndClearBuffer(buffer, targetChannel, serializer);
-
- // If this was a full record, we are done. Not breaking
- // out of the loop at this point will lead to another
- // buffer request before breaking out (that would not be
- // a problem per se, but it can lead to stalls in the
- // pipeline).
- if (result.isFullRecord()) {
- break;
- }
- } else {
- BufferBuilder bufferBuilder =
- targetPartition.getBufferProvider().requestBufferBuilderBlocking();
- result = serializer.setNextBufferBuilder(bufferBuilder);
+ SerializationResult result = serializer.addRecord(record);
+
+ while (result.isFullBuffer()) {
+ if (tryFinishCurrentBufferBuilder(targetChannel, serializer)) {
+ // If this was a full record, we are done. Not breaking
+ // out of the loop at this point will lead to another
+ // buffer request before breaking out (that would not be
+ // a problem per se, but it can lead to stalls in the
+ // pipeline).
+ if (result.isFullRecord()) {
+ break;
}
}
+ BufferBuilder bufferBuilder = requestNewBufferBuilder(targetChannel);
+
+ result = serializer.setNextBufferBuilder(bufferBuilder);
}
+ checkState(!serializer.hasSerializedData(), "All data should be written at once");
}
- public void broadcastEvent(AbstractEvent event) throws IOException, InterruptedException {
- final Buffer eventBuffer = EventSerializer.toBuffer(event);
- try {
+ public BufferConsumer broadcastEvent(AbstractEvent event) throws IOException, InterruptedException {
--- End diff --
Good question. There could be a package-private method that returns the
buffer, with the public method delegating to it but not returning the buffer.
It is questionable whether that is really better, though, because we would
also need to ensure that the public method always goes through the
package-private one, etc.
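
For illustration only, the split described above could look roughly like the sketch below. `SketchBuffer` and `SketchWriter` are hypothetical stand-ins, not Flink's `BufferConsumer` or `RecordWriter`, and the event is simplified to a `String`. Only the visibility split is the point: the public method returns nothing and delegates to a package-private method that returns the buffer, for example for callers such as tests in the same package (that use case is an assumption here).

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the returned buffer type (not Flink's BufferConsumer).
final class SketchBuffer {
    private final String payload;

    SketchBuffer(String payload) {
        this.payload = payload;
    }

    String payload() {
        return payload;
    }
}

// Hypothetical writer; only the public/package-private split mirrors the comment.
class SketchWriter {
    private final List<List<SketchBuffer>> channels = new ArrayList<>();

    SketchWriter(int numChannels) {
        for (int i = 0; i < numChannels; i++) {
            channels.add(new ArrayList<>());
        }
    }

    // Public API: broadcasts the event but does not expose the buffer.
    public void broadcastEvent(String event) {
        // Delegating here is the invariant the comment worries about:
        // nothing enforces it except discipline and code review.
        broadcastEventInternal(event);
    }

    // Package-private: does the actual work and returns the buffer
    // so that code in the same package can inspect it.
    SketchBuffer broadcastEventInternal(String event) {
        SketchBuffer buffer = new SketchBuffer(event);
        for (List<SketchBuffer> channel : channels) {
            channel.add(buffer);
        }
        return buffer;
    }

    // Package-private accessor, present only to make the sketch observable.
    int queuedBuffers(int channel) {
        return channels.get(channel).size();
    }
}
```

The trade-off raised in the comment is visible here: the public signature stays clean, but the guarantee that `broadcastEvent` always routes through `broadcastEventInternal` is purely a convention.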
---