pnowojski commented on a change in pull request #13614:
URL: https://github.com/apache/flink/pull/13614#discussion_r508262147
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/BufferWritingResultPartition.java
##########
@@ -211,46 +223,83 @@ protected void releaseInternal() {
}
}
- private BufferBuilder getSubpartitionBufferBuilder(int
targetSubpartition) throws IOException {
- final BufferBuilder bufferBuilder =
subpartitionBufferBuilders[targetSubpartition];
- if (bufferBuilder != null) {
- return bufferBuilder;
+ private BufferBuilder appendDataToSubpartitionForNewRecord(
+ ByteBuffer record, int targetSubpartition) throws
IOException {
+ BufferBuilder buffer =
subpartitionBufferBuilders[targetSubpartition];
+
+ if (buffer == null) {
+ buffer =
requestNewSubpartitionBufferBuilder(targetSubpartition);
+
subpartitions[targetSubpartition].add(buffer.createBufferConsumerFromBeginning(),
0);
}
- return getNewSubpartitionBufferBuilder(targetSubpartition);
+ buffer.appendAndCommit(record);
+
+ return buffer;
}
- private BufferBuilder getNewSubpartitionBufferBuilder(int
targetSubpartition) throws IOException {
- checkInProduceState();
- ensureUnicastMode();
+ private BufferBuilder appendDataToSubpartitionForRecordContinuation(
+ final ByteBuffer remainingRecordBytes,
+ final int targetSubpartition) throws IOException {
+ final BufferBuilder buffer =
requestNewSubpartitionBufferBuilder(targetSubpartition);
+ // !! Be aware, in case of partialRecordBytes != 0, partial
length and data has to `appendAndCommit` first
+ // before consumer is created. Otherwise it would be confused
with the case the buffer starting
+ // with a complete record.
+ // !! The next two lines can not change order.
+ final int partialRecordBytes =
buffer.appendAndCommit(remainingRecordBytes);
+
subpartitions[targetSubpartition].add(buffer.createBufferConsumerFromBeginning(),
partialRecordBytes);
+
+ return buffer;
+ }
- final BufferBuilder bufferBuilder =
requestNewBufferBuilderFromPool(targetSubpartition);
-
subpartitions[targetSubpartition].add(bufferBuilder.createBufferConsumer());
- subpartitionBufferBuilders[targetSubpartition] = bufferBuilder;
- return bufferBuilder;
+ private BufferBuilder appendDataToBroadcastForNewRecord(ByteBuffer
record) throws IOException {
+ BufferBuilder buffer = broadcastBufferBuilder;
+
+ if (buffer == null) {
+ buffer = requestNewBroadcastBufferBuilder();
+ createBroadcastBufferConsumers(buffer, 0);
+ }
+
+ buffer.appendAndCommit(record);
+
+ return buffer;
}
- private BufferBuilder getBroadcastBufferBuilder() throws IOException {
- if (broadcastBufferBuilder != null) {
- return broadcastBufferBuilder;
+ private BufferBuilder appendDataToBroadcastForRecordContinuation(
+ final ByteBuffer remainingRecordBytes) throws
IOException {
+ final BufferBuilder buffer = requestNewBroadcastBufferBuilder();
+ // !! Be aware, in case of partialRecordBytes != 0, partial
length and data has to `appendAndCommit` first
+ // before consumer is created. Otherwise it would be confused
with the case the buffer starting
+ // with a complete record.
+ // !! The next two lines can not change order.
+ final int partialRecordBytes =
buffer.appendAndCommit(remainingRecordBytes);
+ createBroadcastBufferConsumers(buffer, partialRecordBytes);
+
+ return buffer;
+ }
+
+ private void createBroadcastBufferConsumers(BufferBuilder buffer, int
partialRecordBytes) throws IOException {
+ try (final BufferConsumer consumer =
buffer.createBufferConsumerFromBeginning()) {
+ for (ResultSubpartition subpartition : subpartitions) {
+ subpartition.add(consumer.copy(),
partialRecordBytes);
+ }
}
+ }
- return getNewBroadcastBufferBuilder();
+ private BufferBuilder requestNewSubpartitionBufferBuilder(int
targetSubpartition) throws IOException {
+ checkInProduceState();
+ ensureUnicastMode();
Review comment:
I think `unicast` is a more accurate prefix/infix than `subpartition`,
so I would vote to rename all of them to use `unicast`.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]