[
https://issues.apache.org/jira/browse/FLINK-7378?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16141727#comment-16141727
]
ASF GitHub Bot commented on FLINK-7378:
---------------------------------------
Github user NicoK commented on the issue:
https://github.com/apache/flink/pull/4485
Reviewed 9 of 13 files at r2.
Review status: all files reviewed at latest revision, 7 unresolved
discussions, some commit checks failed.
---
*[a
discussion](https://reviewable.io:443/reviews/apache/flink/4485#-KsOs0jqeqTsAUTwWuFa:-KsOs0jqeqTsAUTwWuFb:b-kg45p7)
(no related file):*
Depending on how you build on this in the other PRs, what do you think
about using a fixed-size `LocalBufferPool` (or a customized sub-class) per
`RemoteInputChannel` instead? This would solve potential issues with recycling
and would also be a lot less code. Additionally, you will gain the buffer
availability listener feature so that you will be notified when the buffer is
released (which may be deep inside other code with no access to the
`RemoteInputChannel` anymore.
FYI: This change of commits in the PR actually would qualify for a separate
PR
---
*[flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java,
line 216 at
r2](https://reviewable.io:443/reviews/apache/flink/4485#-KsOlN5wNvmcco4z2tGj:-KsOlN5xcf-lW0z1FpJo:b-ppkkjd)
([raw
file](https://github.com/apache/flink/blob/b6b8e433d7564c202049a6b21184831dd273510c/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/NetworkEnvironment.java#L216)):*
> ```Java
> if
(gate.getConsumedPartitionType().isCreditBased()) {
> // Create a fix size buffer
pool for floating buffers and assign exclusive buffers to input channels
directly
> bufferPool =
networkBufferPool.createBufferPool(extraNetworkBuffersPerGate,
extraNetworkBuffersPerGate);
> ```
we still need to call `gate.setBufferPool(bufferPool)` in order for the
gate to be aware (this call is common to both paths of the `if`)
---
*[flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java,
line 164 at
r2](https://reviewable.io:443/reviews/apache/flink/4485#-KsOrPb1_aKuUAa_uyC6:-KsOrPb1_aKuUAa_uyC7:b3045fp)
([raw
file](https://github.com/apache/flink/blob/b6b8e433d7564c202049a6b21184831dd273510c/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/buffer/NetworkBufferPool.java#L164)):*
> ```Java
> }
>
> redistributeBuffers();
> ```
now here, you may need to add the try-catch releasing any already added
segments back (see my comments in `SingleInputGate`
---
*[flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java,
line 38 at
r2](https://reviewable.io:443/reviews/apache/flink/4485#-KsOhXwT2FF306uRbf5l:-KsOhXwU2y_hAFTh2tTM:b-mb3jxr)
([raw
file](https://github.com/apache/flink/blob/b6b8e433d7564c202049a6b21184831dd273510c/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java#L38)):*
> ```Java
> * no checkpoint barriers.
> */
> PIPELINED_BOUNDED(true, true, true, false);
> ```
Does it make sense, to already add an `PIPELINE_CREDIT_BASED(true, true,
true, true)`? I guess, credit-based can be considered bounded as well
---
*[flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java,
line 82 at
r2](https://reviewable.io:443/reviews/apache/flink/4485#-KsOi2-hM5IFzcjrGzin:-KsOi2-iXW_-MtjTxono:b-3woyzq)
([raw
file](https://github.com/apache/flink/blob/b6b8e433d7564c202049a6b21184831dd273510c/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionType.java#L82)):*
> ```Java
> return isBounded;
> }
> ```
please add a (simple) javadoc similar to the `isBounded()`method
---
*[flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java,
line 315 at
r2](https://reviewable.io:443/reviews/apache/flink/4485#-KsOoBbKr_kVVDxNIZhm:-KsOoBbKr_kVVDxNIZhn:b85dio4)
([raw
file](https://github.com/apache/flink/blob/b6b8e433d7564c202049a6b21184831dd273510c/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java#L315)):*
> ```Java
> return segments;
> } catch (Throwable t) {
> if (segments != null && segments.size() > 0) {
> ```
Unfortunately, the cleanup will not work as documented - if
`networkBufferPool.requestMemorySegments(networkBuffersPerChannel);` throws an
exception, `segments` will be `null`. In order to handle all cases, e.g.
successfully requested some and afterwards an exception was thrown, you need to
handle this inside `NetworkBufferPool#requestMemorySegments()`.
I guess, after changing this, this method will not be required anymore
---
*[flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java,
line 319 at
r2](https://reviewable.io:443/reviews/apache/flink/4485#-KsOowKk76qDgaVZ6KQK:-KsOowKk76qDgaVZ6KQL:b-n13dga)
([raw
file](https://github.com/apache/flink/blob/b6b8e433d7564c202049a6b21184831dd273510c/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java#L319)):*
> ```Java
> }
>
> if (t instanceof IOException) {
> ```
please use `ExceptionUtils#rethrowIOException` for this pattern
---
*Comments from
[Reviewable](https://reviewable.io:443/reviews/apache/flink/4485)*
<!-- Sent from Reviewable.io -->
> Create a fix size (non rebalancing) buffer pool type for the floating buffers
> -----------------------------------------------------------------------------
>
> Key: FLINK-7378
> URL: https://issues.apache.org/jira/browse/FLINK-7378
> Project: Flink
> Issue Type: Sub-task
> Components: Core
> Reporter: zhijiang
> Assignee: zhijiang
> Fix For: 1.4.0
>
>
> Currently the number of network buffers in {{LocalBufferPool}} for
> {{SingleInputGate}} is limited by {{a * <number of channels> + b}}, where a
> is the number of exclusive buffers for each channel and b is the number of
> floating buffers shared by all channels.
> Considering the credit-based flow control feature, we want to create a fix
> size buffer pool used to manage the floating buffers for {{SingleInputGate}}.
> And the exclusive buffers are assigned to {{InputChannel}}s directly.
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)