curcur edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-718354004


   Hey @tillrohrmann , thanks for bringing this up. We've nowhere explicitly 
explained why to bring a new ResultPartitionType, so I think here is a good 
place to discuss the reason. I will summarize the thoughts we've discussed 
offline, and some other considerations from my perspective after our 
discussion. cc @pnowojski 
   
   > I wanted to ask why we need a special ResultPartitionType for the 
approximate local recovery? Shouldn't it be conceptually possible that we 
support the normal and approximative recovery behaviour with the same pipelined 
partitions? 
   
   Conceptually speaking, yes, it is possible to unify normal and approximation 
pipelined partitions; Practically speaking, I am not 100% sure they can be. 
There are mainly two considerations leading me to introduce a different type.
   
   **1) The first and most important reason is isolating changes to avoid 
affecting the normal execution path.**
   Since I am not sure whether the two can be unified, so I go with the safe 
step first. This is also to identify differences between these two types (There 
might be more differences for downstream reconnection as well). There could be 
corner cases that I am not aware of until real implementation. 
   
   Even though conceptually yes, having **different implementation subclasses 
for different connection behavior** does seem reasonable. It simplifies the 
logic for different behavior. **So personally, I am leaning not to unifty 
them**. 
   
   But certainly, if it turns out to be cleaner and simpler to unify the two 
types, I have no objection to doing so. But from safety and easier-developing 
purpose, starting with a different type seems to be a better choice.
   
   **2) Differences between these two types;**
   For upstream-reconnection, there are mainly two differences: **read** and 
**release** upon these two types.
   - In normal pipeline mode,  for each subpartition, its view is created once, 
and released when downstream disconnects. View release will cause subpartition 
release, and eventually partition release.
   - In approximate mode, for each subpartition, a view can be created and 
released multiple times as long as one view is available at one instant for a 
subpartition.
     - for reading: upon reconnection, the reader should clean-up partial 
record caused by downstream failure (This could be easily unified)
     - for release: a partition is released only if the partition finishes 
consumption (all data read) or its producer failed. The partition should not be 
released when all its views are released because new views can be created. (a 
bit difficult based on the current setting, let's discuss in the lifecycle part 
later).
   
   > If we say that we can reconnect to every pipelined result partition 
(including dropping partially consumed results), then it can be the 
responsibility of the scheduler to make sure that producers are restarted as 
well in order to ensure exactly/at-least once processing guarantees. If not, 
then we would simply consume from where we have left off.
   
   This seems true for now, since we can achieve exactly/at least-once through 
RegionPipeline failover, and approximate through single task failover. But I am 
not sure in the future. Later, if we want to support single task failover with 
at least once/exactly once, where channel data may persist somewhere, I can not 
say for sure this is purely a scheduler decision. We may end up having high 
chances to introduce more connection types for single task failover to support 
at least once/exactly once.
   
   > As far as I understand the existing 
ResultPartitionType.PIPELINED(_BOUNDED) cannot be used because we release the 
result partition if the downstream consumer disconnects. I believe that this is 
not a strict contract of pipelined result partitions but more of an 
implementation artefact. Couldn't we solve the problem of disappearing 
pipelined result partitions by binding the lifecyle of a pipelined result 
partition to the lifecycle of a Task? We could say that a Task can only 
terminate once the pipelined result partition has been consumed. Moreover, a 
Task will clean up the result partition if it fails or gets canceled. That way, 
we have a clearly defined lifecycle and make sure that these results get 
cleaned up (iff the Task reaches a terminal state).
   
   I totally agree. 
   
   Right now, the life cycle of `ResultPartitionType.PIPELINED(_BOUNDED)` is 
“binding” to the consumer task, not very intuitive but reasonable. Because 
`PIPELINED(_BOUNDED)`  is consumed only once and as long as the downstream 
restarts, the upstream is restarting correspondingly.
   
   Is it reasonable to bind the partition to the producer? Yes, I think it is 
following the best intuition as long as we make the task terminate after its 
produced result partition is consumed. I think this can also simplify the logic 
that needs to be applied on partitions through task resources but cannot due to 
the task has already terminated.
   
   On second thought last night, why can't we put all the lifecycle management 
of result partitions into JobMaster, no matter is a blocking family or 
pipelined family. Is there a problem to do so?  cc @tillrohrmann 
   
   
   
   


----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

For queries about this service, please contact Infrastructure at:
[email protected]


Reply via email to