azagrebin commented on a change in pull request #8310: [FLINK-12331][network]
Introduce partition/gate setup to decouple task registration with
NetworkEnvironment
URL: https://github.com/apache/flink/pull/8310#discussion_r279426797
##########
File path:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
##########
@@ -290,27 +304,38 @@ public String getOwningTaskName() {
// Setup/Life-cycle
//
------------------------------------------------------------------------
- public void setBufferPool(BufferPool bufferPool) {
+ @Override
+ public void setup() throws IOException {
checkState(this.bufferPool == null, "Bug in input gate setup
logic: buffer pool has" +
"already been set for this input gate.");
- this.bufferPool = checkNotNull(bufferPool);
+ int maxNumberOfMemorySegments;
+ try {
+ if (isCreditBased) {
+ // assign exclusive buffers to input channels
directly and use the rest for floating buffers
+ assignExclusiveSegments();
+
+ maxNumberOfMemorySegments =
consumedPartitionType.isBounded() ? floatingNetworkBuffersPerGate :
Integer.MAX_VALUE;
+ bufferPool =
networkBufferPool.createBufferPool(0, maxNumberOfMemorySegments);
+ } else {
+ maxNumberOfMemorySegments =
consumedPartitionType.isBounded() ?
+ numberOfInputChannels *
networkBuffersPerChannel + floatingNetworkBuffersPerGate : Integer.MAX_VALUE;
+ bufferPool =
networkBufferPool.createBufferPool(numberOfInputChannels,
maxNumberOfMemorySegments);
+ }
+ } catch (Throwable t) {
+ if (bufferPool != null) {
+ bufferPool.lazyDestroy();
+ }
+
+ ExceptionUtils.rethrowIOException(t);
+ }
}
/**
* Assign the exclusive buffers to all remote input channels directly
for credit-based mode.
- *
- * @param networkBufferPool The global pool to request and recycle
exclusive buffers
- * @param networkBuffersPerChannel The number of exclusive buffers for
each channel
*/
- public void assignExclusiveSegments(NetworkBufferPool
networkBufferPool, int networkBuffersPerChannel) throws IOException {
- checkState(this.isCreditBased, "Bug in input gate setup logic:
exclusive buffers only exist with credit-based flow control.");
- checkState(this.networkBufferPool == null, "Bug in input gate
setup logic: global buffer pool has" +
Review comment:
Maybe, we could keep this check `networkBufferPool == null` here where it is
really relevant and allow nulls in the constructor? I would also keep the
existing methods used in tests as they are now to avoid so many changes in
tests at the moment because it looks like another refactoring which could be
done at least in another commit.
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services