I guess, I was worried about the same thing as @pnowojski ... the expanded
method here will actually look like this:
```
boolean pruneTriggered = false;
BufferBuilder bufferBuilder = getBufferBuilder(targetChannel);
SerializationResult result =
serializer.copyToBufferBuilder(bufferBuilder);
while (result.isFullBuffer()) {
if (bufferBuilders[targetChannel].isPresent()) {
bufferBuilder =
bufferBuilders[targetChannel].get();
bufferBuilders[targetChannel] =
Optional.empty();
numBytesOut.inc(bufferBuilder.finish());
numBuffersOut.inc();
}
tryFinishCurrentBufferBuilder(targetChannel);
// If this was a full record, we are done. Not breaking
// out of the loop at this point will lead to another
// buffer request before breaking out (that would not be
// a problem per se, but it can lead to stalls in the
pipeline).
if (result.isFullRecord()) {
pruneTriggered = true;
break;
}
checkState(!bufferBuilders[targetChannel].isPresent());
bufferBuilder =
targetPartition.getBufferProvider().requestBufferBuilderBlocking();
bufferBuilders[targetChannel] =
Optional.of(bufferBuilder);
targetPartition.addBufferConsumer(bufferBuilder.createBufferConsumer(),
targetChannel);
result = serializer.copyToBufferBuilder(bufferBuilder);
}
```
while it could be the following in the extreme
```
boolean pruneTriggered = false;
BufferBuilder bufferBuilder = getBufferBuilder(targetChannel);
SerializationResult result =
serializer.copyToBufferBuilder(bufferBuilder);
while (result.isFullBuffer()) {
numBytesOut.inc(bufferBuilder.finish());
numBuffersOut.inc();
// If this was a full record, we are done. Not breaking
// out of the loop at this point will lead to another
// buffer request before breaking out (that would not be
// a problem per se, but it can lead to stalls in the
pipeline).
if (result.isFullRecord()) {
bufferBuilder = null;
pruneTriggered = true;
break;
}
bufferBuilder =
targetPartition.getBufferProvider().requestBufferBuilderBlocking();
targetPartition.addBufferConsumer(bufferBuilder.createBufferConsumer(),
targetChannel);
result = serializer.copyToBufferBuilder(bufferBuilder);
}
bufferBuilders[targetChannel] =
Optional.ofNullable(bufferBuilder);
```
I'll leave it up to you whether this is actually worth it (benchmarks?) and
feasible to abstract away a bit more nicely without adding too much complexity.
[ Full content available at: https://github.com/apache/flink/pull/6417 ]
This message was relayed via gitbox.apache.org for [email protected]