ASF GitHub Bot commented on SAMZA-1384:

GitHub user jmakes opened a pull request:


    SAMZA-1384: Race condition with async commit affects checkpoint correctness


You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/jmakes/samza samza-1384

Alternatively you can review and apply these changes as the patch at:


To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #263

commit 881d84380abb3bd8bebe5272a566dd711b449456
Author: Jacob Maes <jm...@linkedin.com>
Date:   2017-08-08T18:57:11Z

    Race condition with async commit affects checkpoint correctness


> Race condition with async commit affects checkpoint correctness
> ---------------------------------------------------------------
>                 Key: SAMZA-1384
>                 URL: https://issues.apache.org/jira/browse/SAMZA-1384
>             Project: Samza
>          Issue Type: Bug
>            Reporter: Jake Maes
>            Assignee: Jake Maes
>             Fix For: 0.14.0
> tl;dr: if any in-flight request updates the offsets between producer.flush() 
> and offsetmanager.checkpoint(), we could write a checkpoint for a message that 
> has not yet gone out over the wire and whose send could subsequently fail.
> Consider two threads A and B. A is performing an async commit. B is an 
> in-flight process(). The following sequence will cause data loss:
> A: TaskInstance.commit() begins
> A: producer.flush() is called // no new messages will go out in this batch
> B: producer.send() is called
> B: TaskCallback is invoked for the finished process()
> B: OffsetManager records the offset for the completed process()
> A: producer.flush() finishes
> A: checkpoint is written using the latest offsets from the OffsetManager. 
> This INCLUDES the offset for the latest send, which has not yet gone out over 
> the wire.
> A: TaskInstance.commit() finishes
> B: producer.send()->callback is invoked with an error. Send was unsuccessful, 
> but has been checkpointed already. 
> B: Exception is propagated and container fails
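> Result: Container is restarted and starts from the last checkpoint, which 
> already includes the offset of the failed send, so that message is never 
> reprocessed and its output is lost.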
> Note that this is only an issue when the commit() occurs concurrently with 
> in-flight requests, so it doesn't affect the fully-synchronous mode or 
> concurrent mode with synchronous commit().
> Proposed solution: 
> Take a snapshot of the offsets in the OffsetManager at the beginning of 
> commit(). Only checkpoint those offsets and nothing new that has been sent 
> since the commit() started.
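
A minimal Java sketch of the proposed snapshot approach, for illustration only;
it is not the code in the pull request. SketchOffsetManager, SnapshotCommitSketch
and all of their methods are hypothetical stand-ins, not Samza's real API. The
interleaving is replayed on a single thread (the inFlight callback plays the role
of thread B) so that the outcome is deterministic.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Hypothetical stand-in for an offset manager (assumption, not Samza's OffsetManager). */
class SketchOffsetManager {
    private final Map<String, Long> lastProcessed = new ConcurrentHashMap<>();
    private final Map<String, Long> checkpointed = new HashMap<>();

    /** Called when a process() completes (thread B in the sequence above). */
    void update(String partition, long offset) {
        lastProcessed.put(partition, offset);
    }

    /** Returns an immutable copy of the offsets as they stand right now. */
    Map<String, Long> snapshot() {
        return Collections.unmodifiableMap(new HashMap<>(lastProcessed));
    }

    /** Racy variant: checkpoints whatever offsets are latest at call time. */
    void checkpointLatest() {
        writeCheckpoint(snapshot());
    }

    /** Writes exactly the offsets it is given. */
    void writeCheckpoint(Map<String, Long> offsets) {
        checkpointed.clear();
        checkpointed.putAll(offsets);
    }

    Map<String, Long> lastCheckpoint() {
        return Collections.unmodifiableMap(new HashMap<>(checkpointed));
    }
}

class SnapshotCommitSketch {
    /**
     * Replays the commit sequence. inFlight runs where thread B would sneak in,
     * that is, after flush() has started and before the checkpoint is written.
     */
    static Map<String, Long> commit(SketchOffsetManager offsets,
                                    Runnable inFlight,
                                    boolean useSnapshot) {
        // Proposed fix: capture the offsets at the very beginning of commit().
        Map<String, Long> atCommitStart = offsets.snapshot();

        // producer.flush() would run here; a concurrent process() completes meanwhile.
        inFlight.run();

        if (useSnapshot) {
            offsets.writeCheckpoint(atCommitStart); // only what flush() covered
        } else {
            offsets.checkpointLatest();             // includes the unflushed send
        }
        return offsets.lastCheckpoint();
    }

    public static void main(String[] args) {
        SketchOffsetManager racy = new SketchOffsetManager();
        racy.update("p0", 41L);
        System.out.println(commit(racy, () -> racy.update("p0", 42L), false)); // {p0=42}

        SketchOffsetManager fixed = new SketchOffsetManager();
        fixed.update("p0", 41L);
        System.out.println(commit(fixed, () -> fixed.update("p0", 42L), true)); // {p0=41}
    }
}
```

With the snapshot, offset 42 is left for the next commit, by which time its send
has either been flushed successfully or has already failed the container.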

This message was sent by Atlassian JIRA
