pnowojski commented on a change in pull request #13642:
URL: https://github.com/apache/flink/pull/13642#discussion_r505449067



##########
File path: 
flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/channel/RecoveredChannelStateHandler.java
##########
@@ -101,7 +101,10 @@ private RecoveredInputChannel getChannel(InputChannelInfo 
info) {
        public void recover(ResultSubpartitionInfo subpartitionInfo, 
Tuple2<BufferBuilder, BufferConsumer> bufferBuilderAndConsumer) {
                bufferBuilderAndConsumer.f0.finish();
                if (bufferBuilderAndConsumer.f1.isDataAvailable()) {
-                       
getSubpartition(subpartitionInfo).addBufferConsumer(bufferBuilderAndConsumer.f1);
+                       boolean added = 
getSubpartition(subpartitionInfo).add(bufferBuilderAndConsumer.f1);
+                       if (!added) {
+                               throw new RuntimeException("Buffer consumer 
couldn't be added to ResultSubpartition");

Review comment:
       I suggested `IOException` because that's what was in the signature of 
the parent/caller method.
   
   Maybe alternatively `CancelTaskException`, but I'm not entirely sure if 
that's always the case (what if there was some error in this task, that caused 
the partition to be released?)
   
   What adds to my confusion is that this return value is ignored in rest of 
the code base? 🤷‍♂️ 
   
   I think for now it would be the best to go with `IOException`.




----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to