curcur edited a comment on pull request #13648:
URL: https://github.com/apache/flink/pull/13648#issuecomment-718354004
Hey @tillrohrmann , thanks for bringing this up. We haven't explicitly explained anywhere why we introduce a new ResultPartitionType, so this seems like a good place to discuss the reasoning. I will summarize the thoughts we discussed offline, plus some further considerations from my perspective after that discussion. cc @pnowojski
> I wanted to ask why we need a special ResultPartitionType for the
approximate local recovery? Shouldn't it be conceptually possible that we
support the normal and approximative recovery behaviour with the same pipelined
partitions?
Conceptually speaking, yes, it is possible to unify the normal and approximate pipelined partitions; practically speaking, I am not 100% sure they can be. Two main considerations led me to introduce a separate type.
**1) The first and most important reason is to isolate the changes and avoid affecting the normal execution path.**
Since I am not sure whether the two can be unified, I went with the safe step first. This also helps identify the differences between the two types (there might be more differences for downstream reconnection as well). There could be corner cases that I will not become aware of until the real implementation.
Even if they can conceptually be unified, having **different implementation subclasses for different connection behaviors** does seem reasonable: it keeps the logic for each behavior simple. **So personally, I am leaning towards not unifying them**.
That said, if it turns out to be cleaner and simpler to unify the two types, I have no objection to doing so. But for safety and ease of development, starting with a separate type seems to be the better choice.
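To make the isolation idea concrete, here is a rough sketch (hypothetical names, not the actual Flink classes) of what I mean by putting the new semantics behind a separate type: the existing constants keep their single-consumption contract untouched, and only the new constant opts into reconnectable behavior, so the subpartition/factory code can branch on it without touching the normal path.

```java
// Simplified, illustrative only -- not the real Flink ResultPartitionType enum.
enum ResultPartitionTypeSketch {
    PIPELINED(false),
    PIPELINED_BOUNDED(false),
    // New type: same write path as PIPELINED, but a restarted downstream task
    // may reconnect to a partially consumed subpartition.
    PIPELINED_APPROXIMATE(true);

    private final boolean reconnectable;

    ResultPartitionTypeSketch(boolean reconnectable) {
        this.reconnectable = reconnectable;
    }

    boolean isReconnectable() {
        return reconnectable;
    }
}
```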
**2) Differences between these two types.**
For upstream reconnection, there are mainly two differences between the two types: **read** and **release** behavior.
- In normal pipelined mode, each subpartition's view is created once and released when the downstream disconnects. Releasing the view releases the subpartition, and eventually the partition.
- In approximate mode, a subpartition's view can be created and released multiple times, as long as at most one view exists per subpartition at any instant.
  - For reading: upon reconnection, the reader should clean up the partial record left over by the downstream failure (this could easily be unified).
  - For release: a partition is released only when it has been fully consumed (all data read) or its producer has failed. It must not be released when all of its views are released, because new views can still be created (a bit difficult with the current setting; let's discuss this in the lifecycle part later). See the sketch below.
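A rough sketch of those two differences for the approximate type. All names here (`requestNewView`, `discardPartialRecord`, `notifyViewReleased`) are hypothetical and only illustrate the behavior described above, not the actual implementation:

```java
// Illustrative only -- not Flink code.
class ApproximateSubpartitionSketch {
    private ViewSketch currentView;
    private boolean partialRecordPending;

    synchronized ViewSketch requestNewView() {
        // At most one view may exist at a time; drop the stale one first.
        if (currentView != null) {
            currentView.release();
            currentView = null;
        }
        // Reading difference: a downstream failure may have left a half-sent
        // record behind, which must be dropped before the new reader attaches.
        if (partialRecordPending) {
            discardPartialRecord();
        }
        currentView = new ViewSketch();
        return currentView;
    }

    synchronized void notifyViewReleased() {
        // Release difference: do NOT release the subpartition here; a new view
        // can still be created later. The partition is released only once it is
        // fully consumed or its producer fails/cancels.
        currentView = null;
    }

    private void discardPartialRecord() {
        partialRecordPending = false;
    }

    static class ViewSketch {
        void release() { /* detach the reader */ }
    }
}
```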
> If we say that we can reconnect to every pipelined result partition
(including dropping partially consumed results), then it can be the
responsibility of the scheduler to make sure that producers are restarted as
well in order to ensure exactly/at-least once processing guarantees. If not,
then we would simply consume from where we have left off.
This seems true for now, since we can achieve exactly-once/at-least-once through RegionPipeline failover, and approximate guarantees through single-task failover. But I am not sure about the future. If we later want to support single-task failover with at-least-once/exactly-once semantics, where channel data may need to persist somewhere, I cannot say for sure that this remains purely a scheduler decision. There is a good chance we would end up introducing more connection types for single-task failover to support at-least-once/exactly-once.
> As far as I understand the existing
ResultPartitionType.PIPELINED(_BOUNDED) cannot be used because we release the
result partition if the downstream consumer disconnects. I believe that this is
not a strict contract of pipelined result partitions but more of an
implementation artefact. Couldn't we solve the problem of disappearing
pipelined result partitions by binding the lifecyle of a pipelined result
partition to the lifecycle of a Task? We could say that a Task can only
terminate once the pipelined result partition has been consumed. Moreover, a
Task will clean up the result partition if it fails or gets canceled. That way,
we have a clearly defined lifecycle and make sure that these results get
cleaned up (iff the Task reaches a terminal state).
I totally agree.
Right now, the lifecycle of `ResultPartitionType.PIPELINED(_BOUNDED)` is "bound" to the consumer task. That is not very intuitive, but it is reasonable, because `PIPELINED(_BOUNDED)` is consumed only once, and whenever the downstream restarts, the upstream restarts correspondingly.
Is it reasonable to bind the partition to the producer instead? Yes, I think that matches intuition best, as long as we make the task terminate only after its produced result partition has been consumed. I think this can also simplify logic that should be applied to partitions through task resources but currently cannot be, because the task has already terminated.
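A minimal sketch of the producer-bound lifecycle as I understand the suggestion (hypothetical names, not Flink code): the producing task only reaches a terminal state once its pipelined result partition has been consumed, and it cleans the partition up itself on failure or cancellation.

```java
import java.util.concurrent.CountDownLatch;

// Illustrative only -- not Flink code.
class ProducerTaskSketch {
    private final PartitionSketch partition = new PartitionSketch();

    void finish() throws InterruptedException {
        // The task stays alive until its pipelined partition is fully consumed,
        // so the partition never outlives its producer.
        partition.awaitConsumed();
        // ... then release resources and transition to FINISHED.
    }

    void failOrCancel() {
        // On failure or cancellation, the producer cleans up its own partition.
        partition.release();
    }

    static class PartitionSketch {
        private final CountDownLatch consumed = new CountDownLatch(1);

        void markFullyConsumed() { consumed.countDown(); }

        void awaitConsumed() throws InterruptedException { consumed.await(); }

        void release() { /* free buffers */ }
    }
}
```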
On second thought (last night): why can't we put all lifecycle management of result partitions into the JobMaster, no matter whether it is the blocking family or the pipelined family? Is there a problem with doing so? cc @tillrohrmann
In short:

**The current behavior of PIPELINED is:**
- release the partition as soon as the consumer exits
- release the partition as soon as the producer fails/is canceled

**The current behavior of PIPELINED_APPROXIMATE is:**
- do nothing when the consumer exits
- release the partition as soon as the producer fails/is canceled
- release the partition when the job exits

**I think what Till prefers is to unify the pipelined family to:**
- the producer releases the partition when the producer exits

**And my question is whether we can unify the blocking + pipelined families to:**
- the producer releases the partition when the producer fails/is canceled
- release the partition when the job exits
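For reference, the same rules as a tiny predicate sketch (purely illustrative, hypothetical helper, not Flink code), just restating the summary above:

```java
// Illustrative only -- encodes the release rules listed above.
final class ReleaseRulesSketch {

    /** Current PIPELINED behavior. */
    static boolean releasePipelined(boolean consumerExited, boolean producerFailedOrCanceled) {
        return consumerExited || producerFailedOrCanceled;
    }

    /** Current PIPELINED_APPROXIMATE behavior: consumer exit alone releases nothing. */
    static boolean releasePipelinedApproximate(boolean producerFailedOrCanceled, boolean jobExited) {
        return producerFailedOrCanceled || jobExited;
    }

    /** The unified rule I am asking about for the blocking + pipelined families. */
    static boolean releaseUnified(boolean producerFailedOrCanceled, boolean jobExited) {
        return producerFailedOrCanceled || jobExited;
    }
}
```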