pnowojski commented on a change in pull request #11877:
URL: https://github.com/apache/flink/pull/11877#discussion_r666756241



##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/ResultPartitionFactory.java
##########
@@ -156,15 +155,16 @@ public ResultPartition create(
                             bufferCompressor,
                             bufferPoolFactory);
 
-            BiFunction<Integer, PipelinedResultPartition, PipelinedSubpartition> factory;
-            if (type == ResultPartitionType.PIPELINED_APPROXIMATE) {
-                factory = PipelinedApproximateSubpartition::new;
-            } else {
-                factory = PipelinedSubpartition::new;
-            }
-
             for (int i = 0; i < subpartitions.length; i++) {
-                subpartitions[i] = factory.apply(i, pipelinedPartition);
+                if (type == ResultPartitionType.PIPELINED_APPROXIMATE) {
+                    subpartitions[i] =
+                            new PipelinedApproximateSubpartition(
+                                    i, networkBuffersPerChannel, pipelinedPartition);

Review comment:
       Could we rename `networkBuffersPerChannel` to 
`configuredNetworkBuffersPerChannel`? That would better reflect that we are 
actually overriding this value for the output.

##########
File path: flink-runtime/src/main/java/org/apache/flink/runtime/io/network/partition/PipelinedSubpartition.java
##########
@@ -112,8 +120,11 @@
 
     // ------------------------------------------------------------------------
 
-    PipelinedSubpartition(int index, ResultPartition parent) {
+    PipelinedSubpartition(int index, int buffersPerChannel, ResultPartition parent) {
         super(index, parent);
+
+        checkArgument(buffersPerChannel >= 0, "Buffers per channel must be non-negative.");
+        this.buffersPerChannel = buffersPerChannel;

Review comment:
       Can we rename this property to something like 
`receiverExclusiveBuffersPerChannel`? The current name `buffersPerChannel` is 
misleading, because this value is not the number of buffers per channel on the 
sender side.
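
       A minimal sketch of what the rename could look like. `SubpartitionSketch` 
and its getter are hypothetical stand-ins, not the real `PipelinedSubpartition`, 
and the precondition is inlined instead of using `checkArgument` so that the 
snippet compiles on its own:

```java
// Hypothetical, simplified stand-in for PipelinedSubpartition (not Flink code).
class SubpartitionSketch {
    private final int index;

    // Named after the receiver side, as suggested in the review:
    // this is not a sender-side buffer count.
    private final int receiverExclusiveBuffersPerChannel;

    SubpartitionSketch(int index, int receiverExclusiveBuffersPerChannel) {
        // Same precondition as in the diff, inlined to avoid a Flink dependency.
        if (receiverExclusiveBuffersPerChannel < 0) {
            throw new IllegalArgumentException(
                    "Buffers per channel must be non-negative.");
        }
        this.index = index;
        this.receiverExclusiveBuffersPerChannel = receiverExclusiveBuffersPerChannel;
    }

    int getIndex() {
        return index;
    }

    int getReceiverExclusiveBuffersPerChannel() {
        return receiverExclusiveBuffersPerChannel;
    }
}
```

The error message could be adjusted to mention the receiver as well, but that is 
optional.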



