curcur commented on a change in pull request #13614:
URL: https://github.com/apache/flink/pull/13614#discussion_r505183102
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
##########
@@ -211,46 +225,62 @@ protected void releaseInternal() {
}
}
- private BufferBuilder getSubpartitionBufferBuilder(int
targetSubpartition) throws IOException {
- final BufferBuilder bufferBuilder =
subpartitionBufferBuilders[targetSubpartition];
- if (bufferBuilder != null) {
- return bufferBuilder;
- }
+ private BufferBuilder
getNewEmptySubpartitionBufferBuilderForNewRecord(int targetSubpartition) throws
IOException {
+ final BufferBuilder bufferBuilder =
requestNewSubpartitionBufferBuilder(targetSubpartition);
+
subpartitions[targetSubpartition].add(bufferBuilder.createBufferConsumerFromBeginning(),
0);
- return getNewSubpartitionBufferBuilder(targetSubpartition);
+ return bufferBuilder;
}
- private BufferBuilder getNewSubpartitionBufferBuilder(int
targetSubpartition) throws IOException {
+ private BufferBuilder
getNewEmptySubpartitionBufferBuilderForRecordContinuation(
+ final ByteBuffer remainingRecordBytes,
+ final int targetSubpartition) throws IOException {
+ final BufferBuilder bufferBuilder =
requestNewSubpartitionBufferBuilder(targetSubpartition);
+ final int partialRecordBytes =
bufferBuilder.appendAndCommit(remainingRecordBytes);
+
subpartitions[targetSubpartition].add(bufferBuilder.createBufferConsumerFromBeginning(),
partialRecordBytes);
+
+ return bufferBuilder;
+ }
+
+ private BufferBuilder requestNewSubpartitionBufferBuilder(int
targetSubpartition) throws IOException {
checkInProduceState();
ensureUnicastMode();
-
final BufferBuilder bufferBuilder =
requestNewBufferBuilderFromPool(targetSubpartition);
-
subpartitions[targetSubpartition].add(bufferBuilder.createBufferConsumer());
subpartitionBufferBuilders[targetSubpartition] = bufferBuilder;
+
return bufferBuilder;
}
- private BufferBuilder getBroadcastBufferBuilder() throws IOException {
- if (broadcastBufferBuilder != null) {
- return broadcastBufferBuilder;
+ private BufferBuilder getNewEmptyBroadcastBufferBuilderForNewRecord()
throws IOException {
+ final BufferBuilder bufferBuilder =
requestNewBroadcastBufferBuilder();
+ try (final BufferConsumer consumer =
bufferBuilder.createBufferConsumerFromBeginning()) {
+ for (ResultSubpartition subpartition : subpartitions) {
+ subpartition.add(consumer.copy(), 0);
+ }
}
- return getNewBroadcastBufferBuilder();
+ return bufferBuilder;
}
- private BufferBuilder getNewBroadcastBufferBuilder() throws IOException
{
+ private BufferBuilder
getNewEmptyBroadcastBufferBuilderForRecordContinuation(
+ final ByteBuffer remainingRecordBytes) throws
IOException {
+ final BufferBuilder bufferBuilder =
requestNewBroadcastBufferBuilder();
+ final int partialRecordBytes =
bufferBuilder.appendAndCommit(remainingRecordBytes);
+ try (final BufferConsumer consumer =
bufferBuilder.createBufferConsumerFromBeginning()) {
+ for (ResultSubpartition subpartition : subpartitions) {
+ subpartition.add(consumer.copy(),
partialRecordBytes);
Review comment:
I think that's roughly the same (from the complexity point of view).
The only difference is that it is easier to distinguish the case where
remainingRecordLength == buffer.size(), but I don't think that is a big difference.
I agree the original code is a bit duplicated. If that's the concern, I've
rewritten the code (literally just a rewrite, with no change in logic); please
take a look and see whether it reads better now.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]