Stephan Ewen created FLINK-21996:
------------------------------------
Summary: Transient RPC failure without TaskManager failure can
lead to split assignment loss
Key: FLINK-21996
URL: https://issues.apache.org/jira/browse/FLINK-21996
Project: Flink
Issue Type: Bug
Components: Runtime / Checkpointing
Affects Versions: 1.12.2
Reporter: Stephan Ewen
Assignee: Stephan Ewen
Fix For: 1.13.0
NOTE: This bug has not actually been observed. It is based on reviews of the
current implementation.
I would expect it to be a pretty rare case, but at scale, even the rare cases
happen often enough.
h2. Problem
Intermediate RPC messages from JM to TM can get dropped, even when the TM is
not marked as failed.
That can happen when the connection can be recovered before the heartbeat times
out.
For that reason, RPCs generally either retry or handle failures explicitly: for
example, the Deploy-Task RPC retries, and the Trigger-Checkpoint RPC aborts the
checkpoint on failure and triggers a new checkpoint.
The "Send OperatorEvent" RPC call (from Coordinator to Operator) returns a
Future for the acknowledgement. But if that Future fails, we do not know
whether the event was never delivered, or whether it was delivered and only the
ack was lost.
This is especially tricky for split assignments and checkpoints. Consider this
sequence of actions:
1. Coordinator assigns a split. Ack not yet received.
2. Coordinator takes a checkpoint. Split was sent before the checkpoint, so
it is not included in the Coordinator's checkpointed state.
3. Split assignment RPC response is "failed".
4. Checkpoint completes.
Now we don't know whether the split was in the checkpoint on the Operator
(TaskManager) or not, and with that we don't know whether we should add it back
to the coordinator. We need to do something to make sure the split is now
either on the coordinator or on the Operator. Currently, the split is
implicitly assumed to be on the Operator; if it isn't, then that split is lost.
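To make the sequence above concrete, here is a minimal, self-contained model of it. This is only a sketch: the class `SplitLossSketch`, the method `lostSplits`, and the split names are hypothetical and do not correspond to Flink code. It assumes the coordinator removes a split from its own state as soon as it sends the assignment, and compares the two possible outcomes behind a failed ack.

```java
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical model of the four-step sequence; not Flink's implementation.
class SplitLossSketch {

    /** Returns the splits that end up in neither the coordinator nor the operator snapshot. */
    static Set<String> lostSplits(boolean requestReachedOperator) {
        Set<String> allSplits = Set.of("split-1", "split-2");
        Set<String> coordinatorSplits = new HashSet<>(allSplits);
        Set<String> operatorSplits = new HashSet<>();

        // 1. Coordinator assigns split-1. The split leaves coordinator state immediately.
        coordinatorSplits.remove("split-1");
        if (requestReachedOperator) {
            operatorSplits.add("split-1");
        }

        // 2. Coordinator takes its part of the checkpoint (without split-1).
        Set<String> coordinatorSnapshot = new HashSet<>(coordinatorSplits);

        // 3. The RPC response is "failed". The coordinator sees the same failure in
        //    both cases and (current behavior) assumes the split is on the operator.

        // 4. Operator state is snapshotted and the checkpoint completes.
        Set<String> operatorSnapshot = new HashSet<>(operatorSplits);

        Set<String> lost = new HashSet<>(allSplits);
        lost.removeAll(coordinatorSnapshot);
        lost.removeAll(operatorSnapshot);
        return lost;
    }

    public static void main(String[] args) {
        System.out.println("only ack lost: " + lostSplits(true));
        System.out.println("request lost:  " + lostSplits(false));
    }
}
```

If only the ack was lost, no split is missing from the checkpoint. If the request itself was lost, `split-1` is in neither snapshot, which is the loss described above.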
Now, it is worth pointing out that this is a pretty rare situation, because it
means that the RPC with the split assignment fails and the one for the
checkpoint succeeds, even though they are in close proximity. The way the
Akka-based RPC transport works (with retries, etc.), this can happen, but isn't
very likely. That is why we have not seen this bug in practice, or received a
report for it, so far.
h2. Proposed solution
The solution has two components:
1. Fallback to consistent point: If the system doesn't know whether two parts
are still consistent with each other (here coordinator and Operator), fall back
to a consistent point. Here that is the case when the Ack-Future for the "Send
Operator Event" RPC fails or times out. In that case, we call the scheduler to
trigger a failover of the target operator to the latest checkpoint and signal
the coordinator to do the same. That restores consistency. We can later
optimize this (see below).
2. We cannot trigger checkpoints while we are "in limbo" concerning our
knowledge about splits. Concretely that means that the Coordinator can only
acknowledge the checkpoint once the Acks for pending Operator Event RPCs
(Assign-Splits) have arrived. The checkpoint future is conditional on all
pending RPC futures. If the RPC futures fail (or time out) then the checkpoint
cannot complete (and the target operator will go through a failover anyway).
In the common case, RPC round trip time is milliseconds, which would be added
to the checkpoint latency if the checkpoint happens to overlap with a split
assignment (most won't).
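The two components above could be sketched roughly as follows. This is an illustration under assumptions, not the proposed patch: `AckGatedCheckpointSketch`, `trackAck`, `checkpoint`, `resetAfterFailover`, and the `failoverTrigger` hook are hypothetical names, the coordinator state is reduced to a `String`, and all methods are assumed to be called from a single thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch; not Flink's actual coordinator code.
// Assumes single-threaded access (e.g. everything runs in the coordinator's main thread).
class AckGatedCheckpointSketch {
    private final List<CompletableFuture<Void>> pendingAcks = new ArrayList<>();
    private final Runnable failoverTrigger; // stand-in for "call the scheduler"

    AckGatedCheckpointSketch(Runnable failoverTrigger) {
        this.failoverTrigger = failoverTrigger;
    }

    /** Component 1: a failed (or timed-out) ack triggers a fallback to a consistent point. */
    void trackAck(CompletableFuture<Void> ack) {
        pendingAcks.add(ack);
        ack.whenComplete((ignored, failure) -> {
            if (failure != null) {
                failoverTrigger.run();
            }
        });
    }

    /** Component 2: the checkpoint future completes only after all pending acks succeed. */
    CompletableFuture<String> checkpoint(String coordinatorState) {
        // Drop acks that already succeeded; failed ones stay and keep failing checkpoints.
        pendingAcks.removeIf(f -> f.isDone() && !f.isCompletedExceptionally());
        CompletableFuture<?>[] acks = pendingAcks.toArray(new CompletableFuture<?>[0]);
        return CompletableFuture.allOf(acks).thenApply(ignored -> coordinatorState);
    }

    /** Called once the failover has restored both sides to the latest checkpoint. */
    void resetAfterFailover() {
        pendingAcks.clear();
    }
}
```

With no pending acks, `checkpoint` returns an already completed future, so the common case adds no latency. A timeout on the ack could be layered on by the caller, for example with `CompletableFuture.orTimeout` (Java 9+), before passing the future to `trackAck`.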
h2. Possible Future Improvements
Step (1) above can be optimized by first retrying the RPC and using sequence
numbers to deduplicate the calls. That can help reduce the number of cases where
a failover is needed. However, the number of situations where the RPC would
need a retry and has a chance of succeeding (the TM is not down) should be very
few to begin with, so whether this optimization is worth it remains to be seen.
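For illustration only, receiver-side deduplication with sequence numbers might look like the sketch below. The class `DedupingOperatorSketch` and its methods are hypothetical. The sketch assumes the sender resends an event with the same sequence number until it is acknowledged, and only then sends the next one, so tracking the highest applied sequence number is sufficient.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical receiver side of a retrying "assign split" call; not Flink code.
// Assumes events from one sender arrive in sequence-number order, possibly repeated.
class DedupingOperatorSketch {
    private long lastAppliedSeq = -1L;
    private final List<String> assignedSplits = new ArrayList<>();

    /**
     * Applies the assignment unless this sequence number was already applied.
     * Returns true if the split was newly applied, false if it was a duplicate.
     * In both cases the caller would send an ack back to the coordinator.
     */
    boolean onAssignSplit(long seq, String split) {
        if (seq <= lastAppliedSeq) {
            return false; // a retry of an event that already arrived
        }
        lastAppliedSeq = seq;
        assignedSplits.add(split);
        return true;
    }

    List<String> assignedSplits() {
        return new ArrayList<>(assignedSplits);
    }
}
```

A retried call whose first attempt did arrive is then harmless: the operator ignores the duplicate and acknowledges again, so the coordinator only needs a failover once its retries are exhausted.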