[
https://issues.apache.org/jira/browse/SAMZA-1065?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15773858#comment-15773858
]
ASF GitHub Bot commented on SAMZA-1065:
---------------------------------------
Github user asfgit closed the pull request at:
https://github.com/apache/samza/pull/35
> Change the commit order to support at least once processing when deduping
> with local state store
> ------------------------------------------------------------------------------------------------
>
> Key: SAMZA-1065
> URL: https://issues.apache.org/jira/browse/SAMZA-1065
> Project: Samza
> Issue Type: Bug
> Reporter: Prateek Maheshwari
> Assignee: Prateek Maheshwari
> Fix For: 0.12.0
>
>
> The current commit order for a task instance is
> (https://github.com/apache/samza/blob/1d458050c06deefc1dcf3d3e9534631aece64553/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala#L193):
> 1. storeManager.flush() (flushes store contents to disk and writes the
> changelog OFFSET file)
> 2. collector.flush() (flushes producer buffers for both task output and
> changelog streams)
> 3. offsetManager.checkpoint(taskName)
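As a toy illustration of the sequence above (not the actual Samza code; the method names are invented to mirror the description):

```java
// Hypothetical, simplified model of the current commit order; names echo
// TaskInstance.scala but this is an illustration, not the Samza implementation.
import java.util.ArrayList;
import java.util.List;

public class CurrentCommitOrder {
    public final List<String> steps = new ArrayList<>();

    void storeManagerFlush() { steps.add("storeManager.flush"); }     // store -> disk, writes OFFSET file
    void collectorFlush()    { steps.add("collector.flush"); }        // output + changelog producers
    void checkpoint()        { steps.add("offsetManager.checkpoint"); }

    public void commit() {
        storeManagerFlush();  // 1. persist store contents and changelog OFFSET file
        collectorFlush();     // 2. flush output and changelog producer buffers
        checkpoint();         // 3. checkpoint input offsets
    }
}
```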
> Consider a scenario where the store is being used for de-duping incoming
> messages by recording previously seen message UIDs. If the container dies
> during commit between steps 1 and 2 and restarts on the same host (due to
> host affinity), it'll consider the persisted store contents on the disk as
> the source of truth. This will cause some of the incoming messages to not be
> (re-)processed, even though their output was never flushed. Since their
> changelog entries weren't flushed either, a container restarted on a
> different host would rebuild the store from the changelog, would not see
> those UIDs, and would reprocess the messages. The behavior therefore differs
> depending on whether the container restarted on the same host or on another
> host.
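The failure window can be simulated with a toy model (all names invented): the dedupe store survives the crash on disk, while the buffered output is lost, so the redelivered message is dropped and its output never appears.

```java
// Toy simulation of a crash between steps 1 and 2 with host affinity:
// the persisted dedupe state survives, the unflushed output does not.
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class DedupeLossSim {
    Set<String> persistedStore = new HashSet<>();   // store contents on disk
    Set<String> inMemoryStore  = new HashSet<>();
    List<String> outputBuffer  = new ArrayList<>(); // unflushed producer buffer
    List<String> flushedOutput = new ArrayList<>();

    void process(String uid) {
        if (inMemoryStore.add(uid)) {   // dedupe on previously seen UIDs
            outputBuffer.add(uid);
        }
    }

    public void run() {
        process("m1");
        persistedStore = new HashSet<>(inMemoryStore); // step 1: storeManager.flush()
        // Container dies here: step 2 (collector.flush()) never runs, so the
        // output buffer is lost while the persisted store survives on disk.
        inMemoryStore = new HashSet<>(persistedStore); // restart on the same host
        outputBuffer = new ArrayList<>();
        process("m1");                                 // m1 is redelivered, but deduped
        flushedOutput = new ArrayList<>(outputBuffer); // the eventual flush emits nothing
    }
}
```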
> There are two issues here:
> * Output messages need to be flushed before flushing the store changelog
> messages.
> * Store changelog messages need to be flushed before the store contents are
> persisted to disk.
> Note: the LoggedStore can be flushed independently of a
> TaskInstance#commit() (e.g. in CachedStore). Furthermore, the underlying raw
> store (e.g. RocksDB) can flush its in-memory contents to disk independently
> of LoggedStore#flush().
> One solution would be to:
> a. Change the LoggedStore#put to call collector.send() before calling
> underlying store.put().
> b. Change the commit order to the following:
> 1. output collector.flush()
> 2. changelog collector.flush() (requires separating output and changelog
> system producers)
> 3.1. get changelog offset
> 3.2. taskStores.flush()
> 3.3. write changelog OFFSET file
> 4. offsetManager.checkpoint()
> c. Document that if users need at-least-once guarantees AND deduping, they
> should turn on synchronous send for the output and changelog system producers
> and call commit after processing each message.
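The proposed order in (b) can be sketched the same way (illustrative names only; step 2 assumes the output and changelog producers have been separated as described):

```java
// Sketch of the proposed commit order (b.1 through b.4); not Samza code.
import java.util.ArrayList;
import java.util.List;

public class ProposedCommitOrder {
    public final List<String> steps = new ArrayList<>();

    public void commit() {
        steps.add("outputCollector.flush");     // 1. task output first
        steps.add("changelogCollector.flush");  // 2. changelog via a separate producer
        steps.add("getChangelogOffset");        // 3.1 capture the changelog offsets
        steps.add("taskStores.flush");          // 3.2 persist store contents to disk
        steps.add("writeChangelogOffsetFile");  // 3.3 record the captured offsets
        steps.add("offsetManager.checkpoint");  // 4. checkpoint input offsets
    }
}
```

With this ordering, a crash at any point leaves the on-disk store no newer than the flushed output and changelog, so restarts on the same host can no longer skip messages whose output was never emitted.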
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)