Github user StefanRRichter commented on a diff in the pull request:
https://github.com/apache/flink/pull/5423#discussion_r167528361
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
---
@@ -209,22 +171,38 @@ public void setMetricGroup(TaskIOMetricGroup metrics)
{
}
/**
- * Writes the buffer to the {@link ResultPartitionWriter} and removes
the
- * buffer from the serializer state.
+ * Marks the current {@link BufferBuilder} as finished and clears the
state for next one.
*
- * <p><b>Needs to be synchronized on the serializer!</b>
+ * @return true if some data were written
*/
- private void writeAndClearBuffer(
- Buffer buffer,
+ private boolean tryFinishCurrentBufferBuilder(
int targetChannel,
RecordSerializer<T> serializer) throws IOException {
- try {
- targetPartition.writeBuffer(buffer, targetChannel);
- }
- finally {
- serializer.clearCurrentBuffer();
+ if (!bufferBuilders[targetChannel].isPresent()) {
+ return false;
}
+ BufferBuilder bufferBuilder =
bufferBuilders[targetChannel].get();
+ bufferBuilders[targetChannel] = Optional.empty();
+
+ numBytesOut.inc(bufferBuilder.getWrittenBytes());
+ bufferBuilder.finish();
--- End diff --
You could combine this into `numBytesOut.inc(bufferBuilder.finish())` or
maybe `finish()` should not need to have a return value?
---