pnowojski commented on a change in pull request #13614:
URL: https://github.com/apache/flink/pull/13614#discussion_r508262147



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
##########
@@ -211,46 +223,83 @@ protected void releaseInternal() {
                }
        }
 
-       private BufferBuilder getSubpartitionBufferBuilder(int 
targetSubpartition) throws IOException {
-               final BufferBuilder bufferBuilder = 
subpartitionBufferBuilders[targetSubpartition];
-               if (bufferBuilder != null) {
-                       return bufferBuilder;
+       private BufferBuilder appendDataToSubpartitionForNewRecord(
+                       ByteBuffer record, int targetSubpartition) throws 
IOException {
+               BufferBuilder buffer = 
subpartitionBufferBuilders[targetSubpartition];
+
+               if (buffer == null) {
+                       buffer = 
requestNewSubpartitionBufferBuilder(targetSubpartition);
+                       
subpartitions[targetSubpartition].add(buffer.createBufferConsumerFromBeginning(),
 0);
                }
 
-               return getNewSubpartitionBufferBuilder(targetSubpartition);
+               buffer.appendAndCommit(record);
+
+               return buffer;
        }
 
-       private BufferBuilder getNewSubpartitionBufferBuilder(int 
targetSubpartition) throws IOException {
-               checkInProduceState();
-               ensureUnicastMode();
+       private BufferBuilder appendDataToSubpartitionForRecordContinuation(
+                       final ByteBuffer remainingRecordBytes,
+                       final int targetSubpartition) throws IOException {
+               final BufferBuilder buffer = 
requestNewSubpartitionBufferBuilder(targetSubpartition);
+               // !! Be aware, in case of partialRecordBytes != 0, partial 
length and data has to `appendAndCommit` first
+               // before consumer is created. Otherwise it would be confused 
with the case the buffer starting
+               // with a complete record.
+               // !! The next two lines can not change order.
+               final int partialRecordBytes = 
buffer.appendAndCommit(remainingRecordBytes);
+               
subpartitions[targetSubpartition].add(buffer.createBufferConsumerFromBeginning(),
 partialRecordBytes);
+
+               return buffer;
+       }
 
-               final BufferBuilder bufferBuilder = 
requestNewBufferBuilderFromPool(targetSubpartition);
-               
subpartitions[targetSubpartition].add(bufferBuilder.createBufferConsumer());
-               subpartitionBufferBuilders[targetSubpartition] = bufferBuilder;
-               return bufferBuilder;
+       private BufferBuilder appendDataToBroadcastForNewRecord(ByteBuffer 
record) throws IOException {
+               BufferBuilder buffer = broadcastBufferBuilder;
+
+               if (buffer == null) {
+                       buffer = requestNewBroadcastBufferBuilder();
+                       createBroadcastBufferConsumers(buffer, 0);
+               }
+
+               buffer.appendAndCommit(record);
+
+               return buffer;
        }
 
-       private BufferBuilder getBroadcastBufferBuilder() throws IOException {
-               if (broadcastBufferBuilder != null) {
-                       return broadcastBufferBuilder;
+       private BufferBuilder appendDataToBroadcastForRecordContinuation(
+                       final ByteBuffer remainingRecordBytes) throws 
IOException {
+               final BufferBuilder buffer = requestNewBroadcastBufferBuilder();
+               // !! Be aware, in case of partialRecordBytes != 0, partial 
length and data has to `appendAndCommit` first
+               // before consumer is created. Otherwise it would be confused 
with the case the buffer starting
+               // with a complete record.
+               // !! The next two lines can not change order.
+               final int partialRecordBytes = 
buffer.appendAndCommit(remainingRecordBytes);
+               createBroadcastBufferConsumers(buffer, partialRecordBytes);
+
+               return buffer;
+       }
+
+       private void createBroadcastBufferConsumers(BufferBuilder buffer, int 
partialRecordBytes) throws IOException {
+               try (final BufferConsumer consumer = 
buffer.createBufferConsumerFromBeginning()) {
+                       for (ResultSubpartition subpartition : subpartitions) {
+                               subpartition.add(consumer.copy(), 
partialRecordBytes);
+                       }
                }
+       }
 
-               return getNewBroadcastBufferBuilder();
+       private BufferBuilder requestNewSubpartitionBufferBuilder(int 
targetSubpartition) throws IOException {
+               checkInProduceState();
+               ensureUnicastMode();

Review comment:
       I think the `unicast` is more accurate and better prefix/infix compared 
to `subpartition`. `unicast` is after all exactly describing this:
   > unicast is a one-to-one transmission from one point in the network to 
another point.
   
   While `subpartition` is hmmm, what does it actually mean? Without a context 
you would have no idea what this name represents. And `unicast` is from the 
same domain as already used `broadcast` name, so I would vote to rename all 
`subpartition` usages in this context to `unicast`. 
   
   Maybe in your commit, when you are adding some methods, use `unicast`, and 
add a follow-up hotfix commit to rename the remaining things like the one you 
mentioned:
   ```
   private final BufferBuilder[] subpartitionBufferBuilders;
   ```
   




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to