[
https://issues.apache.org/jira/browse/FLINK-21996?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17309697#comment-17309697
]
Stephan Ewen commented on FLINK-21996:
--------------------------------------
[~trohrmann] What do you think of the proposed solution?
I have played around with the code a bit; the implementation is pretty simple.
The harder part is setting up tests for this.
> Transient RPC failure without TaskManager failure can lead to split
> assignment loss
> -----------------------------------------------------------------------------------
>
> Key: FLINK-21996
> URL: https://issues.apache.org/jira/browse/FLINK-21996
> Project: Flink
> Issue Type: Bug
> Components: Runtime / Checkpointing
> Affects Versions: 1.12.2
> Reporter: Stephan Ewen
> Assignee: Stephan Ewen
> Priority: Blocker
> Fix For: 1.13.0
>
>
> NOTE: This bug has not been actually observed. It is based on reviews of the
> current implementation.
> I would expect it to be a pretty rare case, but at scale even the rare cases
> happen often enough.
> h2. Problem
> Intermediate RPC messages from JM to TM can get dropped, even when the TM is
> not marked as failed.
> That can happen when the connection recovers before the heartbeat times out.
> RPCs therefore generally retry or handle failures: for example, the
> Deploy-Task RPC retries, and the Trigger-Checkpoint RPC aborts the checkpoint
> on failure and triggers a new one.
> The "Send OperatorEvent" RPC call (from Coordinator to Operator) gives you a
> Future with the acknowledgement. But if that one fails, we are in the
> situation where we do not know whether the event sending was successful or
> not (only the ack failed).
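> A rough sketch of that ambiguity from the coordinator's side (the interface
> and method names here are simplified placeholders, not the actual runtime
> API):
>
>     import java.util.concurrent.CompletableFuture;
>
>     final class EventSendSketch {
>
>         /** Hypothetical stand-in for the gateway the coordinator sends through. */
>         interface SubtaskGateway {
>             CompletableFuture<Void> sendEvent(Object operatorEvent);
>         }
>
>         static void assignSplit(SubtaskGateway gateway, Object splitAssignment) {
>             CompletableFuture<Void> ack = gateway.sendEvent(splitAssignment);
>             ack.whenComplete((ignored, failure) -> {
>                 if (failure != null) {
>                     // The ack failed or timed out. The event may or may not have
>                     // reached the operator, so coordinator and operator may now
>                     // disagree about which splits are assigned.
>                 }
>             });
>         }
>     }
>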
> This is especially tricky for split assignments and checkpoints. Consider
> this sequence of actions:
> 1. Coordinator assigns a split. Ack not yet received.
> 2. Coordinator takes a checkpoint. The split was sent before the checkpoint,
> so it is no longer included in the coordinator's checkpointed state.
> 3. Split assignment RPC response is "failed".
> 4. Checkpoint completes.
> Now we don't know whether the split was in the checkpoint on the Operator
> (TaskManager) or not, and with that we don't know whether we should add it
> back to the coordinator. We need to do something to make sure the split is
> now either on the coordinator or on the Operator. Currently, the split is
> implicitly assumed to be on the Operator; if it isn't, then that split is
> lost.
> Now, it is worth pointing out that this is a pretty rare situation, because
> it means that the RPC with the split assignment fails while the one for the
> checkpoint succeeds, even though they are in close proximity. The way the
> Akka-based RPC transport works (with retries, etc.), this can happen, but it
> is not very likely. That is why we have not seen this bug in practice so far,
> nor received a report of it yet.
> h2. Proposed solution
> The solution has two components:
> 1. Fall back to a consistent point: If the system does not know whether two
> parts are still consistent with each other (here coordinator and Operator),
> fall back to a consistent point. That is the case when the Ack-Future for the
> "Send Operator Event" RPC fails or times out. We then call the scheduler to
> trigger a failover of the target operator to the latest checkpoint and signal
> the same to the coordinator. That restores consistency. We can later optimize
> this (see below).
> 2. We cannot trigger checkpoints while we are "in limbo" concerning our
> knowledge about splits. Concretely, that means the Coordinator can only
> acknowledge the checkpoint once the Acks for pending Operator Event RPCs
> (Assign-Splits) have arrived. The checkpoint future is conditional on all
> pending RPC futures. If the RPC futures fail (or time out), then the
> checkpoint cannot complete (and the target operator will go through a
> failover anyway). In the common case, the RPC round trip time is on the order
> of milliseconds, which would be added to the checkpoint latency if the
> checkpoint happens to overlap with a split assignment (most won't). A sketch
> of both mechanisms follows below.
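> The following sketch illustrates both mechanisms with hypothetical names
> (pendingEventAcks, triggerSubtaskFailover, and the snapshot future are
> placeholders, not the actual coordinator implementation):
>
>     import java.util.Set;
>     import java.util.concurrent.CompletableFuture;
>     import java.util.concurrent.ConcurrentHashMap;
>
>     final class ConsistencyGuardSketch {
>
>         // Acks of "Send Operator Event" RPCs that are still in flight.
>         private final Set<CompletableFuture<Void>> pendingEventAcks =
>                 ConcurrentHashMap.newKeySet();
>
>         // (1) If an event ack fails or times out, restore consistency by
>         // restarting the target subtask from the latest checkpoint.
>         void onEventSent(CompletableFuture<Void> ack, Runnable triggerSubtaskFailover) {
>             pendingEventAcks.add(ack);
>             ack.whenComplete((ignored, failure) -> {
>                 pendingEventAcks.remove(ack);
>                 if (failure != null) {
>                     triggerSubtaskFailover.run();
>                 }
>             });
>         }
>
>         // (2) Only acknowledge the coordinator checkpoint once all pending
>         // event acks have arrived; if any of them fails, the checkpoint
>         // fails with it.
>         CompletableFuture<byte[]> checkpointCoordinator(
>                 CompletableFuture<byte[]> coordinatorSnapshot) {
>             CompletableFuture<Void> allAcks = CompletableFuture.allOf(
>                     pendingEventAcks.toArray(new CompletableFuture<?>[0]));
>             return allAcks.thenCombine(coordinatorSnapshot, (ignored, state) -> state);
>         }
>     }
>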
> h2. Possible Future Improvements
> Step (1) above can be optimized by first retrying the RPC and using sequence
> numbers to deduplicate the calls. That can help reduce the number of cases
> where a failover is needed. However, the number of situations where the RPC
> would need a retry and has a chance of succeeding (the TM is not down) should
> be very small to begin with, so whether this optimization is worth it remains
> to be seen.
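> A minimal sketch of such deduplication on the operator side (all names here
> are hypothetical):
>
>     final class EventDedupSketch {
>
>         private long lastSeenSequenceNumber = -1;
>
>         // The coordinator attaches an increasing sequence number to every
>         // event and retries on ack failure; the operator applies an event
>         // only if its number is new, so a retried delivery of the same
>         // event is ignored.
>         boolean acceptEvent(long sequenceNumber) {
>             if (sequenceNumber <= lastSeenSequenceNumber) {
>                 return false; // duplicate from a retry
>             }
>             lastSeenSequenceNumber = sequenceNumber;
>             return true;
>         }
>     }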