pnowojski commented on a change in pull request #11567:
URL: https://github.com/apache/flink/pull/11567#discussion_r412999442
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartition.java
##########
@@ -147,6 +151,9 @@ public void setup() throws IOException {
checkArgument(bufferPool.getNumberOfRequiredMemorySegments() >=
getNumberOfSubpartitions(),
"Bug in result partition setup logic: Buffer pool has
not enough guaranteed buffers for this result partition.");
+ // initialize subpartitions and maxBuffersPerChannel
+ bufferPool.setNumSubpartitions(subpartitions.length);
+ bufferPool.setMaxBuffersPerChannel(maxBuffersPerChannel);
Review comment:
Instead of setting it dynamically, which risks race conditions, I think
this could be set in the `LocalBufferPool` constructor. For example, take a
look at `ResultPartitionFactory#createBufferPoolFactory`:
`numberOfSubpartitions` is already accessible there.
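To illustrate the constructor-based approach, here is a minimal sketch. `SketchLocalBufferPool` and its accessors are hypothetical stand-ins, not the real `LocalBufferPool` signature; the point is only that the values become `final` and are fixed before the object is shared between threads.

```java
// Hypothetical stand-in for LocalBufferPool; class name, parameters and
// accessors are assumptions made for illustration only.
final class SketchLocalBufferPool {

    // Both fields are final: they are assigned exactly once, in the
    // constructor, so no setter can race with a concurrent reader.
    private final int maxBuffersPerChannel;
    private final int[] subpartitionBuffersCount;

    SketchLocalBufferPool(int numberOfSubpartitions, int maxBuffersPerChannel) {
        if (numberOfSubpartitions < 0) {
            throw new IllegalArgumentException("numberOfSubpartitions must be >= 0");
        }
        if (maxBuffersPerChannel <= 0) {
            throw new IllegalArgumentException("maxBuffersPerChannel must be > 0");
        }
        this.maxBuffersPerChannel = maxBuffersPerChannel;
        // One counter per subpartition, sized once at construction time.
        this.subpartitionBuffersCount = new int[numberOfSubpartitions];
    }

    int getNumberOfSubpartitions() {
        return subpartitionBuffersCount.length;
    }

    int getMaxBuffersPerChannel() {
        return maxBuffersPerChannel;
    }
}
```

With this shape, `setup()` would not need to call any setters on the pool, and the same change would let the new fields in `LocalBufferPool` be declared `final`.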
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/writer/RecordWriter.java
##########
@@ -290,11 +290,11 @@ protected void addBufferConsumer(BufferConsumer consumer,
int targetChannel) thr
}
@VisibleForTesting
- public BufferBuilder getBufferBuilder() throws IOException,
InterruptedException {
- BufferBuilder builder = targetPartition.tryGetBufferBuilder();
+ public BufferBuilder getNewBufferBuilder(int targetChannel) throws
IOException, InterruptedException {
Review comment:
Hmmm, this is getting a bit messy. We have three different methods:
`getBufferBuilder()`/`getNewBufferBuilder(targetChannel)`,
`getBufferBuilder(targetChannel)` and `requestNewBufferBuilder(targetChannel)`.
(It's not your fault, I missed that in a previous code review which
introduced this `getBufferBuilder()` method.)
I'm not sure how to name those things, but maybe this method should be named
`requestNewBufferBuilder(targetChannel)` (it would no longer be an abstract
method), and it should be overridden in the two subclasses in the
following way:
```
@Override
public BufferBuilder ChannelSelectorRecordWriter#requestNewBufferBuilder(int targetChannel) throws IOException, InterruptedException {
    checkState(bufferBuilders[targetChannel] == null || bufferBuilders[targetChannel].isFinished());
    BufferBuilder bufferBuilder = super.requestNewBufferBuilder(targetChannel);
    addBufferConsumer(bufferBuilder.createBufferConsumer(), targetChannel);
    bufferBuilders[targetChannel] = bufferBuilder;
    return bufferBuilder;
}
```
and
```
@Override
public BufferBuilder BroadcastRecordWriter#requestNewBufferBuilder(int targetChannel) throws IOException, InterruptedException {
    checkState(bufferBuilder == null || bufferBuilder.isFinished());
    BufferBuilder builder = super.requestNewBufferBuilder(targetChannel);
    if (randomTriggered) {
        addBufferConsumer(builder.createBufferConsumer(), targetChannel);
    } else {
        try (BufferConsumer bufferConsumer = builder.createBufferConsumer()) {
            for (int channel = 0; channel < numberOfChannels; channel++) {
                addBufferConsumer(bufferConsumer.copy(), channel);
            }
        }
    }
    bufferBuilder = builder;
    return builder;
}
```
?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
##########
@@ -87,6 +87,14 @@
*/
private int numberOfRequestedMemorySegments;
+ private int maxBuffersPerChannel;
+
+ private int[] subpartitionBuffersCount;
+
+ private BufferRecycler[] subpartitionBufferRecyclers;
Review comment:
Can those fields be final and initialised in the constructor?
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
##########
@@ -463,4 +516,29 @@ private void returnExcessMemorySegments() {
returnMemorySegment(segment);
}
}
+
+ @Nullable
+ @Override
+ public BufferRecycler[] getSubpartitionBufferRecyclers() {
+ return subpartitionBufferRecyclers;
+ }
+
+ /**
+ *
+ */
+ class SubpartitionBufferRecycler implements BufferRecycler {
+
+ private int channel;
+ private LocalBufferPool bufferPool;
Review comment:
As this class is not static, you don't need the `bufferPool` field. You
can either drop the field or make the `SubpartitionBufferRecycler` class
static.
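A minimal sketch of the two options, using hypothetical names (`OuterPoolSketch`, `InnerRecycler`, `StaticRecycler`) rather than the real Flink classes. A non-static inner class already carries an implicit reference to its enclosing instance, while a static nested class has to be handed one explicitly.

```java
// Hypothetical stand-in for the pool; all names here are assumptions.
final class OuterPoolSketch {

    private int recycledCount;

    // Option 1: non-static inner class. It can reach the enclosing pool
    // through the implicit OuterPoolSketch.this reference, so an extra
    // pool field would be redundant.
    final class InnerRecycler {
        private final int channel;

        InnerRecycler(int channel) {
            this.channel = channel;
        }

        void recycle() {
            OuterPoolSketch.this.onRecycle(channel);
        }
    }

    // Option 2: static nested class. There is no implicit outer
    // reference, so the pool is passed in and stored explicitly.
    static final class StaticRecycler {
        private final int channel;
        private final OuterPoolSketch pool;

        StaticRecycler(int channel, OuterPoolSketch pool) {
            this.channel = channel;
            this.pool = pool;
        }

        void recycle() {
            pool.onRecycle(channel);
        }
    }

    private void onRecycle(int channel) {
        // A real pool would update per-channel accounting here.
        recycledCount++;
    }

    int getRecycledCount() {
        return recycledCount;
    }
}
```

Either option works; the current code mixes the two by keeping the class non-static and also storing the pool.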
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/BufferProvider.java
##########
@@ -42,15 +42,15 @@
*
* <p>Returns <code>null</code> if no buffer is available or the buffer
provider has been destroyed.
*/
- BufferBuilder requestBufferBuilder() throws IOException;
+ BufferBuilder requestBufferBuilder(int targetChannel) throws
IOException;
Review comment:
I guess that if we instead overloaded those methods and kept both
versions, with and without the `targetChannel` parameter, that would reduce
the amount of changes in the code AND it would prevent `UNKNOWN_CHANNEL` from
leaking into the API of the `BufferPool` class. `UNKNOWN_CHANNEL` could then
be a private implementation detail of the `LocalBufferPool` class.
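Roughly what I have in mind, as a sketch only. `SketchBufferProvider` and `SketchLocalPool` are hypothetical, and a `String` stands in for `BufferBuilder` so that the example is self-contained; the sentinel value `-1` is also an assumption.

```java
// Hypothetical provider interface: both overloads are kept, so existing
// callers of the no-argument version do not have to change.
interface SketchBufferProvider {
    String requestBufferBuilder();

    String requestBufferBuilder(int targetChannel);
}

// Hypothetical implementation; String is a placeholder for BufferBuilder.
final class SketchLocalPool implements SketchBufferProvider {

    // The sentinel stays private, so it never appears in the interface.
    private static final int UNKNOWN_CHANNEL = -1;

    @Override
    public String requestBufferBuilder() {
        return request(UNKNOWN_CHANNEL);
    }

    @Override
    public String requestBufferBuilder(int targetChannel) {
        if (targetChannel < 0) {
            throw new IllegalArgumentException("targetChannel must be >= 0");
        }
        return request(targetChannel);
    }

    // Single internal code path shared by both overloads.
    private String request(int channel) {
        if (channel == UNKNOWN_CHANNEL) {
            return "buffer(no channel)";
        }
        return "buffer(channel " + channel + ")";
    }
}
```

Callers that do not care about a channel keep using the no-argument overload, and only the per-channel code paths need to be touched.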
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPoolTest.java
##########
@@ -406,14 +407,40 @@ public void testBoundedBuffer() throws Exception {
assertEquals(1,
localBufferPool.getNumberOfAvailableMemorySegments());
}
+ @Test
+ public void testMaxBuffersPerChannelAndAvailability() throws
IOException, InterruptedException {
Review comment:
I would either add another, more complicated test case, or expand this
one to cover more scenarios (two blocked channels + at least one channel that
exceeded the limit by more than one buffer):
```
// same setup, but with 3 subpartitions
requestBufferBuilderBlocking(0)
requestBufferBuilderBlocking(1)
assertTrue(...isDone())
requestBufferBuilderBlocking(0)
assertFalse(...isDone())
requestBufferBuilderBlocking(0)
requestBufferBuilderBlocking(2)
requestBufferBuilderBlocking(2)
assertFalse(...isDone())
recycle(1)
assertFalse(...isDone())
recycle(2)
assertFalse(...isDone())
recycle(0)
assertFalse(...isDone())
recycle(0)
assertTrue(...isDone())
(...)
```
?
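To make the expected availability transitions in that sequence easier to follow, here is a toy model. It is not the `LocalBufferPool` implementation: `PerChannelLimitSketch` is hypothetical, and it assumes a limit of one buffer per channel and that the pool is unavailable whenever any channel holds more buffers than the limit.

```java
// Toy model of per-channel accounting; names and semantics are assumptions.
final class PerChannelLimitSketch {

    private final int maxBuffersPerChannel;
    private final int[] buffersPerChannel;
    // Number of channels currently holding more than the limit.
    private int channelsOverLimit;

    PerChannelLimitSketch(int numberOfChannels, int maxBuffersPerChannel) {
        this.buffersPerChannel = new int[numberOfChannels];
        this.maxBuffersPerChannel = maxBuffersPerChannel;
    }

    void request(int channel) {
        // The channel becomes blocked at the moment it first exceeds the limit.
        if (++buffersPerChannel[channel] == maxBuffersPerChannel + 1) {
            channelsOverLimit++;
        }
    }

    void recycle(int channel) {
        if (buffersPerChannel[channel] == 0) {
            throw new IllegalStateException("nothing to recycle on channel " + channel);
        }
        // The channel is unblocked only when it drops back to the limit.
        if (buffersPerChannel[channel]-- == maxBuffersPerChannel + 1) {
            channelsOverLimit--;
        }
    }

    boolean isAvailable() {
        return channelsOverLimit == 0;
    }
}
```

Under these assumptions, channel 0 ends up two buffers over the limit, so it takes two `recycle(0)` calls before the pool becomes available again, which is the case the simpler test does not exercise.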
##########
File path:
flink-runtime/src/test/java/org/apache/flink/runtime/io/network/partition/ResultPartitionTest.java
##########
@@ -360,6 +361,32 @@ private void testReleaseMemory(final ResultPartitionType
resultPartitionType) th
}
}
+ /**
+ * Tests {@link ResultPartition#getAvailableFuture()}.
+ */
+ @Test
+ public void testIsAvailableOrNot() throws IOException,
InterruptedException {
Review comment:
Is this test adding coverage for something new, or filling in missing
coverage for `getAvailableFuture()`? If only the latter, we could get away
without this test, as the logic is almost entirely covered by
`LocalBufferPoolTest`, but I'm also fine with adding some basic coverage here.
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/LocalBufferPool.java
##########
@@ -463,4 +516,29 @@ private void returnExcessMemorySegments() {
returnMemorySegment(segment);
}
}
+
+ @Nullable
+ @Override
+ public BufferRecycler[] getSubpartitionBufferRecyclers() {
+ return subpartitionBufferRecyclers;
+ }
+
+ /**
+ *
+ */
Review comment:
nit: remove the comment?