zhijiangW commented on a change in pull request #13209:
URL: https://github.com/apache/flink/pull/13209#discussion_r484164187



##########
File path: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/graph/StreamEdge.java
##########
@@ -136,6 +172,14 @@ public void setPartitioner(StreamPartitioner<?> partitioner) {
                this.outputPartitioner = partitioner;
        }
 
+       public void setBufferTimeout(long bufferTimeout) {
+               this.bufferTimeout = bufferTimeout;

Review comment:
       Moving `checkAndResetBufferTimeout` into `StreamEdge` does not seem 
possible, because the `ResultPartitionType` cannot be derived from the internal 
`StreamEdge#outputPartitioner` alone; it also depends on `StreamGraph`, see 
`StreamingJobGraphGenerator#determineResultPartitionType`. Passing the 
generated `ResultPartitionType` as an argument to 
`StreamEdge#setBufferTimeout` does not seem like a good option either.
   
   The initial value of `StreamEdge#bufferTimeout` is actually `-1`, assigned by 
the constructor in the core code when it is not set explicitly via 
`StreamExecutionEnvironment`/`Transformation`, and we set this value properly 
based on the partition type during graph generation. I can add a check that 
the passed argument satisfies `bufferTimeout >= -1`.
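
   A minimal sketch of what that argument check could look like. This is 
not the code from the pull request: `StreamEdgeSketch` and 
`UNDEFINED_BUFFER_TIMEOUT` are hypothetical names used only for illustration, 
and plain Java exceptions stand in for whatever precondition helper the real 
class would use. It assumes `-1` is the "not set explicitly" sentinel 
described above.

```java
// Hypothetical stand-in for StreamEdge; only the buffer-timeout handling is sketched.
final class StreamEdgeSketch {

    // Assumed sentinel meaning "not set explicitly" (the -1 mentioned above).
    static final long UNDEFINED_BUFFER_TIMEOUT = -1L;

    // Starts out undefined, mirroring the constructor default described in the comment.
    private long bufferTimeout = UNDEFINED_BUFFER_TIMEOUT;

    // Rejects any value below the sentinel, i.e. enforces bufferTimeout >= -1.
    void setBufferTimeout(long bufferTimeout) {
        if (bufferTimeout < UNDEFINED_BUFFER_TIMEOUT) {
            throw new IllegalArgumentException(
                    "bufferTimeout must be >= -1, but was " + bufferTimeout);
        }
        this.bufferTimeout = bufferTimeout;
    }

    long getBufferTimeout() {
        return bufferTimeout;
    }

    public static void main(String[] args) {
        StreamEdgeSketch edge = new StreamEdgeSketch();
        edge.setBufferTimeout(100L);   // accepted: an explicit timeout
        edge.setBufferTimeout(-1L);    // accepted: back to "undefined"
        try {
            edge.setBufferTimeout(-2L); // rejected by the check
        } catch (IllegalArgumentException e) {
            System.out.println("rejected: " + e.getMessage());
        }
    }
}
```

   With a check like this in the setter, resolving the `-1` sentinel to a 
concrete value can stay in the graph generation step, where the partition 
type is known.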





