StephanEwen opened a new pull request #15605:
URL: https://github.com/apache/flink/pull/15605


   ## This is based on #15557 and #15601
   
   Only the last commit, named *"[FLINK-21996][coordination] Ensure exactly-once guarantees for OperatorEvent RPCs"*, is relevant.
   It is fairly small compared to the previous PR.
   
   ## What is the purpose of the change
   
   This is the third and final PR on the way to fixing [FLINK-21996]. The overall steps of the fix are:
   
     1. Add a test and do preliminary refactoring (#15557)
     2. Fix [FLINK-18071] to make sure the target tasks for `OperatorEvents` are well-defined (#15601)
     3. Fail over the target task when the *Acks* for sent `OperatorEvents` time out **(This PR)**
     4. Delay operator coordinator checkpoint completion until all pending event acks have been received or have failed **(This PR)**
   
   This PR activates the `OperatorEventSendingCheckpointITCase`, which ran successfully 500 times in a row with this patch.
   
   ## Changes in this PR
   
   **Ensure checkpoints don't happen while the status of in-flight OperatorCoordinator events is unclear.**
   
   We cannot complete a checkpoint while events that were sent before it have not yet been confirmed as either received or failed. We would not know whether the data carried by those events is pre-checkpoint (and handled by the receiving tasks) or post-checkpoint (and still to be handled on the coordinator).
   
   This change delays the confirmation of checkpoints until the status of all such events is clear. Furthermore, if sending an event may have failed, the current checkpoint is discarded; in that case we trigger a recovery of the event's target subtask anyway.
   
   **When events are presumed lost in transfer, fail the target task**
   
   Because we cannot tell whether an event itself was lost or only its ack, we trigger a failover of the receiving task. This both restarts the receiver task from the latest checkpoint and notifies the coordinator to reset its view of that task to the latest checkpoint, which is a safe way to restore consistency.
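   The failover-on-timeout rule could look roughly like the following sketch. It assumes a hypothetical `EventTransport` whose returned future completes when the ack arrives, and a hypothetical `failoverSubtask` callback standing in for whatever actually triggers the task failover; neither is Flink's API. The ack timeout uses `CompletableFuture.orTimeout` (Java 9+):

   ```java
   import java.util.concurrent.CompletableFuture;
   import java.util.concurrent.TimeUnit;
   import java.util.function.BiConsumer;

   /** Hypothetical transport: sends an event to a subtask; the future completes on ack. */
   interface EventTransport {
       CompletableFuture<Void> send(int subtask, String event);
   }

   /** Hypothetical sketch: fails over the target subtask when an event's ack does not arrive. */
   final class AckingEventSender {
       private final EventTransport transport;
       private final BiConsumer<Integer, Throwable> failoverSubtask;
       private final long ackTimeoutMillis;

       AckingEventSender(EventTransport transport,
                         BiConsumer<Integer, Throwable> failoverSubtask,
                         long ackTimeoutMillis) {
           this.transport = transport;
           this.failoverSubtask = failoverSubtask;
           this.ackTimeoutMillis = ackTimeoutMillis;
       }

       CompletableFuture<Void> send(int subtask, String event) {
           // copy() so that the timeout does not complete the transport's own future.
           CompletableFuture<Void> ack = transport.send(subtask, event)
                   .copy()
                   .orTimeout(ackTimeoutMillis, TimeUnit.MILLISECONDS);
           return ack.whenComplete((ignored, error) -> {
               if (error != null) {
                   // A missing ack may mean the event was lost, or only the ack was.
                   // We cannot tell which, so restart the target subtask.
                   failoverSubtask.accept(subtask, error);
               }
           });
       }
   }
   ```

   Returning the future produced by `whenComplete` (rather than the raw ack future) means callers only observe the failure after the failover has been triggered.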
   
   As a future optimization, we could add idempotent retries to event sending in Flink (on top of what Akka already does) to reduce the number of cases where such a failover is necessary. However, the failover must remain as a last resort, because we cannot retry indefinitely.
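   The idempotent-retry optimization described above is not part of this PR. The sketch below only illustrates the general pattern under simplifying assumptions: events carry a sequence number, the receiver deduplicates by that number, the sender retries a bounded number of times, and failover remains the fallback. All names (`IdempotentRetrySketch`, `Receiver`, `Channel`, `sendWithRetries`) are hypothetical, and Akka's own delivery semantics are ignored:

   ```java
   import java.util.HashSet;
   import java.util.Set;

   /** Hypothetical sketch: idempotent retries with a bounded budget, then failover. */
   final class IdempotentRetrySketch {

       /** Receiver that applies each event at most once, keyed by its sequence number. */
       static final class Receiver {
           private final Set<Long> seen = new HashSet<>();
           int applied = 0;

           void onEvent(long seq) {
               if (seen.add(seq)) {
                   applied++; // first delivery: apply the event
               }
               // A duplicate from a retry is ignored, but would still be acked.
           }
       }

       /** One delivery attempt; returns true only if the sender received the ack. */
       interface Channel {
           boolean attempt(long seq, int attemptNumber);
       }

       static boolean sendWithRetries(Channel channel, long seq, int maxAttempts, Runnable failover) {
           for (int attempt = 1; attempt <= maxAttempts; attempt++) {
               if (channel.attempt(seq, attempt)) {
                   return true; // acked: no failover needed
               }
           }
           failover.run(); // retries exhausted: fall back to failing over the target task
           return false;
       }
   }
   ```

   The receiver's `seen` set grows without bound here; a real implementation would need some way to bound it.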
   
   ## Verifying this change
   
   This is hard to test in a real-world setup: it would need a very specific network setup that provokes exactly the right packet losses, combined with very specific thread races.
   
   The `CoordinatorEventsExactlyOnceITCase` and `OperatorEventSendingCheckpointITCase` target the relevant scenarios precisely. Both fail reliably on `master` and pass with this series of patches.
   
   ## Does this pull request potentially affect one of the following parts:
   
     - Dependencies (does it add or upgrade a dependency): **no**
     - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: **no**
     - The serializers: **no**
     - The runtime per-record code paths (performance sensitive): **no**
     - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn/Mesos, ZooKeeper: **no**
     - The S3 file system connector: **no**
   
   ## Documentation
   
     - Does this pull request introduce a new feature? **no**
     - If yes, how is the feature documented? **not applicable**


-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
