Github user NicoK commented on a diff in the pull request:
https://github.com/apache/flink/pull/4485#discussion_r135481583
--- Diff:
flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/consumer/SingleInputGate.java
---
@@ -259,17 +267,72 @@ public int getNumberOfQueuedBuffers() {
public void setBufferPool(BufferPool bufferPool) {
// Sanity checks
- checkArgument(numberOfInputChannels ==
bufferPool.getNumberOfRequiredMemorySegments(),
+ if (!getConsumedPartitionType().isCreditBased()) {
+ checkArgument(numberOfInputChannels ==
bufferPool.getNumberOfRequiredMemorySegments(),
"Bug in input gate setup logic: buffer pool has
not enough guaranteed buffers " +
- "for this input gate. Input
gates require at least as many buffers as " +
+ "for this input gate. Input gates
require at least as many buffers as " +
"there are input channels.");
+ }
checkState(this.bufferPool == null, "Bug in input gate setup
logic: buffer pool has" +
- "already been set for this input gate.");
+ "already been set for this input gate.");
this.bufferPool = checkNotNull(bufferPool);
}
+ /**
+ * Assign the exclusive buffers to all remote input channels directly
for credit-based mode.
+ *
+ * @param networkBufferPool The global pool to request and recycle
exclusive buffers
+ * @param networkBuffersPerChannel The number of exclusive buffers for
each channel
+ */
+ public void assignExclusiveSegments(NetworkBufferPool
networkBufferPool, int networkBuffersPerChannel) throws IOException {
+ this.networkBufferPool = checkNotNull(networkBufferPool);
--- End diff --
please guard against using this method multiple times (like in
`setBufferPool`) as a sanity check
---
If your project is set up for it, you can reply to this email and have your
reply appear on GitHub as well. If your project does not have this feature
enabled and wishes so, or if the feature is enabled but not working, please
contact infrastructure at [email protected] or file a JIRA ticket
with INFRA.
---