I guess I was worried about the same thing as @pnowojski. With the helper
methods expanded, the method here will actually look like this:
```
                boolean pruneTriggered = false;
                BufferBuilder bufferBuilder = getBufferBuilder(targetChannel);
                SerializationResult result = serializer.copyToBufferBuilder(bufferBuilder);
                while (result.isFullBuffer()) {
                        if (bufferBuilders[targetChannel].isPresent()) {
                                bufferBuilder = bufferBuilders[targetChannel].get();
                                bufferBuilders[targetChannel] = Optional.empty();
                                numBytesOut.inc(bufferBuilder.finish());
                                numBuffersOut.inc();
                        }
                        tryFinishCurrentBufferBuilder(targetChannel);

                        // If this was a full record, we are done. Not breaking
                        // out of the loop at this point will lead to another
                        // buffer request before breaking out (that would not be
                        // a problem per se, but it can lead to stalls in the
                        // pipeline).
                        if (result.isFullRecord()) {
                                pruneTriggered = true;
                                break;
                        }

                        checkState(!bufferBuilders[targetChannel].isPresent());
                        bufferBuilder = targetPartition.getBufferProvider().requestBufferBuilderBlocking();
                        bufferBuilders[targetChannel] = Optional.of(bufferBuilder);
                        targetPartition.addBufferConsumer(bufferBuilder.createBufferConsumer(), targetChannel);
                        result = serializer.copyToBufferBuilder(bufferBuilder);
                }
```

whereas, taken to the extreme, it could be the following:

```
                boolean pruneTriggered = false;
                BufferBuilder bufferBuilder = getBufferBuilder(targetChannel);
                SerializationResult result = serializer.copyToBufferBuilder(bufferBuilder);
                while (result.isFullBuffer()) {
                        numBytesOut.inc(bufferBuilder.finish());
                        numBuffersOut.inc();

                        // If this was a full record, we are done. Not breaking
                        // out of the loop at this point will lead to another
                        // buffer request before breaking out (that would not be
                        // a problem per se, but it can lead to stalls in the
                        // pipeline).
                        if (result.isFullRecord()) {
                                bufferBuilder = null;
                                pruneTriggered = true;
                                break;
                        }

                        bufferBuilder = targetPartition.getBufferProvider().requestBufferBuilderBlocking();
                        targetPartition.addBufferConsumer(bufferBuilder.createBufferConsumer(), targetChannel);
                        result = serializer.copyToBufferBuilder(bufferBuilder);
                }
                bufferBuilders[targetChannel] = Optional.ofNullable(bufferBuilder);
```

I'll leave it up to you whether this is actually worth it (benchmarks?) and
whether it is feasible to abstract it away a bit more nicely without adding too
much complexity.
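
To make the second variant easier to play with outside of Flink, here is a minimal, self-contained sketch of the same pattern: the loop works on a local variable and the `Optional` slot is written back exactly once at the end. Everything in it is a hypothetical stand-in and not Flink code: `Builder` replaces `BufferBuilder`, `isFull()` plays the role of `result.isFullBuffer()`, `remaining == 0` plays the role of `result.isFullRecord()`, and the fixed `CAPACITY` and the single-channel array are assumptions for illustration only.

```java
import java.util.Optional;

class LocalBuilderSketch {
    /** Hypothetical stand-in for BufferBuilder: counts bytes up to a fixed capacity. */
    static final class Builder {
        final int capacity;
        int written;

        Builder(int capacity) {
            this.capacity = capacity;
        }

        /** Appends as many bytes as fit and returns the number still unwritten. */
        int append(int bytes) {
            int n = Math.min(bytes, capacity - written);
            written += n;
            return bytes - n;
        }

        boolean isFull() {
            return written == capacity;
        }
    }

    static final int CAPACITY = 4; // assumed buffer size, for illustration only

    // One slot per channel; this sketch only has channel 0.
    @SuppressWarnings({"unchecked", "rawtypes"})
    final Optional<Builder>[] builders = new Optional[] {Optional.empty()};
    int buffersOut;
    long bytesOut;

    /** Writes one record of the given size to channel 0. */
    void emit(int recordBytes) {
        // Read the slot once, then only touch the local variable inside the loop.
        Builder builder = builders[0].orElseGet(() -> new Builder(CAPACITY));
        int remaining = builder.append(recordBytes);
        while (builder.isFull()) {
            bytesOut += builder.written;
            buffersOut++;
            if (remaining == 0) {
                // Full record and full buffer: nothing to keep for the next record.
                builder = null;
                break;
            }
            builder = new Builder(CAPACITY);
            remaining = builder.append(remaining);
        }
        // Single write-back of the slot after the loop.
        builders[0] = Optional.ofNullable(builder);
    }

    public static void main(String[] args) {
        LocalBuilderSketch sketch = new LocalBuilderSketch();
        sketch.emit(10); // spans three buffers, the last one stays open
        sketch.emit(2);  // fills the open buffer exactly, so the slot is cleared
        System.out.println(sketch.buffersOut + " buffers, " + sketch.bytesOut + " bytes");
    }
}
```

This only illustrates the control flow; it says nothing about whether the difference is measurable in the real `RecordWriter`, which is what a benchmark would have to show.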

[ Full content available at: https://github.com/apache/flink/pull/6417 ]