[
https://issues.apache.org/jira/browse/SAMZA-1384?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jake Maes updated SAMZA-1384:
-----------------------------
Description:
Consider two threads A and B. A is performing an async commit. B is an
in-flight process(). The following sequence will cause data loss:
A: TaskInstance.commit() begins
A: producer.flush() is called // no new messages will go out in this batch
B: producer.send() is called
B: TaskCallback is invoked for the finished process()
B: OffsetManager records the offset for the completed process()
A: producer.flush() finishes
A: checkpoint is written using the latest offsets from the OffsetManager. This
INCLUDES the offset for the latest send, which has not yet gone out over the
wire.
A: TaskInstance.commit() finishes
B: producer.send()->callback is invoked with an error. Send was unsuccessful,
but has been checkpointed already.
B: Exception is propagated and container fails
Result: Container is restarted and resumes from the last checkpoint. That
checkpoint is already past the failed send, so the message is never re-sent
and is lost.
Note that this is only an issue when the commit() occurs concurrently with
in-flight requests, so it doesn't affect the fully-synchronous mode or
concurrent mode with synchronous commit().
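The interleaving above can be replayed deterministically in a single thread.
The sketch below is only an illustration, not Samza code: the class name, the
map standing in for the OffsetManager, the partition name, and the offsets 41
and 42 are all assumptions made for the example.

```java
import java.util.HashMap;
import java.util.Map;

// Single-threaded replay of the A/B interleaving. All names are hypothetical
// stand-ins and are not part of the Samza API.
class AsyncCommitRaceReplay {
    /** Returns {checkpointedOffset, lastOffsetCoveredByFlush} for one partition. */
    static long[] replay() {
        // Plays the role of the OffsetManager: last completed offset per partition.
        Map<String, Long> processedOffsets = new HashMap<>();
        processedOffsets.put("partition-0", 41L); // offset 41 was sent before the commit

        // A: commit() begins and flush() starts; it only covers sends up to offset 41.
        long lastOffsetCoveredByFlush = processedOffsets.get("partition-0");

        // B: send() for offset 42 is queued, process() completes, the offset is recorded.
        processedOffsets.put("partition-0", 42L);

        // A: flush() finishes and the checkpoint reads the LATEST offsets.
        long checkpointedOffset = processedOffsets.get("partition-0");

        // B: the send for offset 42 fails only now, after 42 has been checkpointed.
        return new long[] {checkpointedOffset, lastOffsetCoveredByFlush};
    }

    public static void main(String[] args) {
        long[] result = replay();
        // prints "checkpointed=42, flushed through=41"
        System.out.println("checkpointed=" + result[0] + ", flushed through=" + result[1]);
    }
}
```

The gap between the two numbers is the window in which a failed send is lost.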
Proposed solution:
Take a snapshot of the offsets in the OffsetManager at the beginning of
commit(). Checkpoint only those offsets and exclude any offset recorded
after the commit() started.
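A minimal sketch of the snapshot idea, assuming a simplified offset tracker.
OffsetTracker, update(), snapshot() and this commit() signature are
hypothetical names for illustration; they are not the actual OffsetManager or
TaskInstance API, and the flush is passed in as a Runnable so that a racing
update can be simulated.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Illustration only: hypothetical names, not the Samza implementation.
class SnapshotCommitSketch {
    // Hypothetical stand-in for the OffsetManager.
    static class OffsetTracker {
        private final Map<String, Long> lastProcessed = new HashMap<>();

        // Called when a process() completes (thread B in the description).
        synchronized void update(String partition, long offset) {
            lastProcessed.put(partition, offset);
        }

        // Returns a read-only copy of the offsets as of this call.
        synchronized Map<String, Long> snapshot() {
            return Collections.unmodifiableMap(new HashMap<>(lastProcessed));
        }
    }

    /** Snapshot first, then flush, then checkpoint only the snapshot. */
    static Map<String, Long> commit(OffsetTracker tracker, Runnable flush) {
        Map<String, Long> toCheckpoint = tracker.snapshot(); // taken before flush()
        flush.run();          // offsets recorded during the flush are not in the copy
        return toCheckpoint;  // this is what would be written as the checkpoint
    }

    public static void main(String[] args) {
        OffsetTracker tracker = new OffsetTracker();
        tracker.update("partition-0", 41L);
        // Simulate thread B completing offset 42 while the flush is in progress.
        Map<String, Long> checkpoint =
            commit(tracker, () -> tracker.update("partition-0", 42L));
        System.out.println(checkpoint);         // prints {partition-0=41}
        System.out.println(tracker.snapshot()); // prints {partition-0=42}
    }
}
```

Offset 42 stays out of this checkpoint and is picked up by the next commit(),
after its send has had a chance to be flushed.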
was:
Consider two threads A and B. A is performing an async commit. B is an
in-flight process(). The following sequence will cause data loss:
1. A: TaskInstance.commit() begins
2. A: producer.flush() is called // no new messages will go out in this batch
3. B: producer.send() is called
4. B: TaskCallback is invoked for the finished process()
5. B: OffsetManager records the offset for the completed process()
6. A: producer.flush() finishes
7. A: checkpoint is written using the latest offsets from the OffsetManager.
This INCLUDES the offset for the latest send, which has not yet gone out over
the wire.
8. A: TaskInstance.commit() finishes
8. B: producer.send()->callback is invoked with an error. Send was
unsuccessful, but has been checkpointed already.
9. B: Exception is propagated and container fails
10. Container is restarted and starts from the last checkpoint.
Note that this is only an issue when the commit() occurs concurrently with
in-flight requests, so it doesn't affect the fully-synchronous mode or
concurrent mode with synchronous commit().
Proposed solution:
Take a snapshot of the offsets in the OffsetManager at the beginning of
commit(). Only checkpoint those offsets and nothing new that has been sent
since the commit() started.
> Race condition with async commit affects checkpoint correctness
> ---------------------------------------------------------------
>
> Key: SAMZA-1384
> URL: https://issues.apache.org/jira/browse/SAMZA-1384
> Project: Samza
> Issue Type: Bug
> Reporter: Jake Maes
> Assignee: Jake Maes
> Fix For: 0.14.0
>