pnowojski commented on a change in pull request #13614:
URL: https://github.com/apache/flink/pull/13614#discussion_r508270969



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
##########
@@ -211,46 +223,83 @@ protected void releaseInternal() {
                }
        }
 
-       private BufferBuilder getSubpartitionBufferBuilder(int 
targetSubpartition) throws IOException {
-               final BufferBuilder bufferBuilder = 
subpartitionBufferBuilders[targetSubpartition];
-               if (bufferBuilder != null) {
-                       return bufferBuilder;
+       private BufferBuilder appendDataToSubpartitionForNewRecord(
+                       ByteBuffer record, int targetSubpartition) throws 
IOException {
+               BufferBuilder buffer = 
subpartitionBufferBuilders[targetSubpartition];
+
+               if (buffer == null) {
+                       buffer = 
requestNewSubpartitionBufferBuilder(targetSubpartition);
+                       
subpartitions[targetSubpartition].add(buffer.createBufferConsumerFromBeginning(),
 0);
                }
 
-               return getNewSubpartitionBufferBuilder(targetSubpartition);
+               buffer.appendAndCommit(record);
+
+               return buffer;
        }
 
-       private BufferBuilder getNewSubpartitionBufferBuilder(int 
targetSubpartition) throws IOException {
-               checkInProduceState();
-               ensureUnicastMode();
+       private BufferBuilder appendDataToSubpartitionForRecordContinuation(
+                       final ByteBuffer remainingRecordBytes,
+                       final int targetSubpartition) throws IOException {
+               final BufferBuilder buffer = 
requestNewSubpartitionBufferBuilder(targetSubpartition);
+               // !! Be aware, in case of partialRecordBytes != 0, partial 
length and data has to `appendAndCommit` first
+               // before consumer is created. Otherwise it would be confused 
with the case the buffer starting
+               // with a complete record.
+               // !! The next two lines can not change order.
+               final int partialRecordBytes = 
buffer.appendAndCommit(remainingRecordBytes);
+               
subpartitions[targetSubpartition].add(buffer.createBufferConsumerFromBeginning(),
 partialRecordBytes);
+
+               return buffer;
+       }
 
-               final BufferBuilder bufferBuilder = 
requestNewBufferBuilderFromPool(targetSubpartition);
-               
subpartitions[targetSubpartition].add(bufferBuilder.createBufferConsumer());
-               subpartitionBufferBuilders[targetSubpartition] = bufferBuilder;
-               return bufferBuilder;
+       private BufferBuilder appendDataToBroadcastForNewRecord(ByteBuffer 
record) throws IOException {
+               BufferBuilder buffer = broadcastBufferBuilder;
+
+               if (buffer == null) {
+                       buffer = requestNewBroadcastBufferBuilder();
+                       createBroadcastBufferConsumers(buffer, 0);
+               }
+
+               buffer.appendAndCommit(record);
+
+               return buffer;
        }
 
-       private BufferBuilder getBroadcastBufferBuilder() throws IOException {
-               if (broadcastBufferBuilder != null) {
-                       return broadcastBufferBuilder;
+       private BufferBuilder appendDataToBroadcastForRecordContinuation(
+                       final ByteBuffer remainingRecordBytes) throws 
IOException {
+               final BufferBuilder buffer = requestNewBroadcastBufferBuilder();
+               // !! Be aware, in case of partialRecordBytes != 0, partial 
length and data has to `appendAndCommit` first
+               // before consumer is created. Otherwise it would be confused 
with the case the buffer starting
+               // with a complete record.
+               // !! The next two lines can not change order.
+               final int partialRecordBytes = 
buffer.appendAndCommit(remainingRecordBytes);
+               createBroadcastBufferConsumers(buffer, partialRecordBytes);
+
+               return buffer;
+       }
+
+       private void createBroadcastBufferConsumers(BufferBuilder buffer, int 
partialRecordBytes) throws IOException {
+               try (final BufferConsumer consumer = 
buffer.createBufferConsumerFromBeginning()) {
+                       for (ResultSubpartition subpartition : subpartitions) {
+                               subpartition.add(consumer.copy(), 
partialRecordBytes);
+                       }
                }
+       }
 
-               return getNewBroadcastBufferBuilder();
+       private BufferBuilder requestNewSubpartitionBufferBuilder(int 
targetSubpartition) throws IOException {
+               checkInProduceState();
+               ensureUnicastMode();

Review comment:
       Please do so in the same PR :) as I described. New code that you are 
adding please add with already corrected name. Pre existing code that you are 
going to rename, please rename in a separate hotfix commit.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to