[ https://issues.apache.org/jira/browse/SAMZA-1065?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15771343#comment-15771343 ]

ASF GitHub Bot commented on SAMZA-1065:
---------------------------------------

GitHub user prateekm opened a pull request:

    https://github.com/apache/samza/pull/35

    SAMZA-1065: Change the commit order to support at least once processing when using local state store for deduping.

    

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/prateekm/samza commit-order

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/samza/pull/35.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #35
    
----
commit 63a38d3fba035cc465abc3dc4258d8a565807fb3
Author: Prateek Maheshwari <[email protected]>
Date:   2016-12-22T22:55:25Z

    SAMZA-1065: Change the commit order to support at least once processing when deduping with local state store.

----


> Change the commit order to support at least once processing when deduping with local state store
> ------------------------------------------------------------------------------------------------
>
>                 Key: SAMZA-1065
>                 URL: https://issues.apache.org/jira/browse/SAMZA-1065
>             Project: Samza
>          Issue Type: Bug
>            Reporter: Prateek Maheshwari
>            Assignee: Prateek Maheshwari
>             Fix For: 0.12.0
>
>
> The current commit order for a task instance
> (https://github.com/apache/samza/blob/1d458050c06deefc1dcf3d3e9534631aece64553/samza-core/src/main/scala/org/apache/samza/container/TaskInstance.scala#L193) is:
> 1. storeManager.flush() (flushes store contents to disk and writes the changelog OFFSET file)
> 2. collector.flush() (flushes producer buffers for both task output and changelog streams)
> 3. offsetManager.checkpoint(taskName)
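A minimal Java sketch of this ordering, assuming hypothetical single-method StoreManager, Collector and OffsetManager interfaces that only record their calls (the real TaskInstance is Scala and its collaborators have richer APIs). It marks the crash window between steps 1 and 2 that the scenario below relies on:

```java
import java.util.ArrayList;
import java.util.List;

// Sketch of the current commit order. The interfaces are hypothetical
// stand-ins for the real Samza collaborators, not Samza's actual API.
class CurrentCommitOrder {
    interface StoreManager { void flush(); }
    interface Collector { void flush(); }
    interface OffsetManager { void checkpoint(String taskName); }

    static void commit(StoreManager storeManager, Collector collector,
                       OffsetManager offsetManager, String taskName) {
        storeManager.flush();               // 1. store contents + changelog OFFSET file to disk
        // A crash here leaves the on-disk store ahead of the unflushed output and changelog.
        collector.flush();                  // 2. output and changelog producer buffers
        offsetManager.checkpoint(taskName); // 3. input checkpoint
    }

    // Runs commit() with recording lambdas and returns the observed call order.
    static List<String> run() {
        List<String> calls = new ArrayList<>();
        commit(() -> calls.add("storeManager.flush"),
               () -> calls.add("collector.flush"),
               name -> calls.add("offsetManager.checkpoint(" + name + ")"),
               "Partition 0");
        return calls;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```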
> Consider a scenario where the store is used for deduping incoming messages
> by recording previously seen message UIDs. If the container dies during
> commit between steps 1 and 2 and restarts on the same host (due to host
> affinity), it will treat the store contents persisted on disk as the source
> of truth. Because no checkpoint was written, the input messages processed
> since the last checkpoint are replayed, but the store already records their
> UIDs, so some of them will not be (re-)processed even though their output
> was never flushed. Since their changelog entries weren't flushed either, a
> container that restarts on another host and restores the store from the
> changelog will reprocess them, so the outcome depends on whether the
> container restarted on the same host or on another host.
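The scenario can be reproduced with a toy in-memory model. Everything in it is hypothetical: the "disk", the changelog and the output stream are plain Java collections, producer buffers are lists that vanish on a crash, and a same-host restart is assumed to trust the persisted store contents (no changelog entries were flushed, so there is nothing to catch up on). Under those assumptions, a crash right after storeManager.flush() loses the output of a replayed message on the same host but not on another host:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model of the failure described above. All names are hypothetical.
class DedupCrashSimulation {
    // Durable state: survives a container crash.
    final Set<String> diskStore = new HashSet<>();            // persisted store contents
    final List<String> flushedChangelog = new ArrayList<>();  // changelog entries on the broker
    final List<String> flushedOutput = new ArrayList<>();     // output messages on the broker

    // Volatile state: lost on a crash.
    Set<String> memStore = new HashSet<>();
    List<String> changelogBuffer = new ArrayList<>();
    List<String> outputBuffer = new ArrayList<>();

    // Dedup logic: emit output only for UIDs not seen before.
    void process(String uid) {
        if (memStore.contains(uid)) return;  // already seen: skip
        memStore.add(uid);
        changelogBuffer.add(uid);            // buffered changelog write
        outputBuffer.add("out-" + uid);      // buffered output write
    }

    // Current order, with the container dying right after step 1.
    void commitCrashingAfterStoreFlush() {
        diskStore.addAll(memStore);          // 1. storeManager.flush()
        // crash: 2. collector.flush() and 3. checkpoint never run
        memStore = new HashSet<>();
        changelogBuffer = new ArrayList<>();
        outputBuffer = new ArrayList<>();
    }

    // Same host (host affinity): the persisted store is the source of truth.
    void restoreOnSameHost() { memStore = new HashSet<>(diskStore); }

    // Another host: the store is rebuilt from the flushed changelog.
    void restoreOnOtherHost() { memStore = new HashSet<>(flushedChangelog); }

    // Flushes buffered output, as a later successful commit would.
    void flushOutput() { flushedOutput.addAll(outputBuffer); outputBuffer.clear(); }

    static List<String> run(boolean sameHost) {
        DedupCrashSimulation s = new DedupCrashSimulation();
        s.process("m1");
        s.commitCrashingAfterStoreFlush();
        if (sameHost) s.restoreOnSameHost(); else s.restoreOnOtherHost();
        s.process("m1");                     // replayed: m1 was never checkpointed
        s.flushOutput();
        return s.flushedOutput;
    }

    public static void main(String[] args) {
        System.out.println("same host:  " + run(true));   // prints []: output for m1 is lost
        System.out.println("other host: " + run(false));  // prints [out-m1]
    }
}
```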
> There are two issues here:
> * Output messages need to be flushed before flushing the store changelog messages.
> * Store changelog messages need to be flushed before the store contents are persisted to disk.
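The two constraints can be stated as a check over a commit sequence. In this sketch the effect labels are hypothetical, not Samza APIs; each step is the set of effects that a single call completes, and effects completed by the same call (such as one collector.flush() covering both output and changelog) are treated as having no guaranteed relative order:

```java
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Checks the two ordering constraints against a commit sequence.
// Effect names are hypothetical labels chosen for this sketch.
class CommitOrderInvariants {
    static final String OUTPUT = "output flushed";
    static final String CHANGELOG = "changelog flushed";
    static final String STORE = "store persisted";
    static final String CHECKPOINT = "checkpoint written";

    // Index of the first step that completes the effect, or -1 if none does.
    static int stepOf(List<Set<String>> steps, String effect) {
        for (int i = 0; i < steps.size(); i++) {
            if (steps.get(i).contains(effect)) return i;
        }
        return -1;
    }

    // Strict ordering: output < changelog < store persist.
    static boolean satisfiesInvariants(List<Set<String>> steps) {
        int out = stepOf(steps, OUTPUT);
        int log = stepOf(steps, CHANGELOG);
        int disk = stepOf(steps, STORE);
        return out >= 0 && log >= 0 && disk >= 0 && out < log && log < disk;
    }

    static Set<String> step(String... effects) {
        return new HashSet<>(Arrays.asList(effects));
    }

    static final List<Set<String>> CURRENT = Arrays.asList(
        step(STORE),              // 1. storeManager.flush()
        step(OUTPUT, CHANGELOG),  // 2. collector.flush()
        step(CHECKPOINT));        // 3. offsetManager.checkpoint()

    static final List<Set<String>> SEPARATED = Arrays.asList(
        step(OUTPUT), step(CHANGELOG), step(STORE), step(CHECKPOINT));

    public static void main(String[] args) {
        System.out.println("current:   " + satisfiesInvariants(CURRENT));   // false
        System.out.println("separated: " + satisfiesInvariants(SEPARATED)); // true
    }
}
```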
> Note: the LoggedStore can be flushed independently of TaskInstance#commit()
> (e.g. in CachedStore). Furthermore, the underlying raw store (e.g. RocksDB)
> can flush its in-memory contents to disk independently of LoggedStore#flush().
> One solution would be to:
> a. Change LoggedStore#put to call collector.send() before calling the underlying store.put().
> b. Change the commit order to the following:
>     1. output collector.flush()
>     2. changelog collector.flush() (requires separating output and changelog system producers)
>     3.1. get changelog offset
>     3.2. taskStores.flush()
>     3.3. write changelog OFFSET file
>     4. offsetManager.checkpoint()
> c. Document that if users need an at-least-once guarantee AND deduping, they
> should turn on synchronous send for the output and changelog system producers
> and call commit after processing each message.
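A toy Java model of items a and b, with hypothetical names throughout and plain collections standing in for the disk, the changelog and the output stream. It assumes that a same-host restart reuses the persisted store and then replays changelog entries written after the last OFFSET file, and it ignores independent raw-store flushes (the case item c addresses). Crashing after any prefix of the four commit steps, the replayed message's output is either already present or emitted again, never lost, and the result no longer depends on which host the container restarts on:

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Toy model of the proposed commit order. All names are hypothetical.
class ProposedCommitSimulation {
    // Durable state: survives a container crash.
    final Set<String> diskStore = new HashSet<>();
    final List<String> flushedChangelog = new ArrayList<>();
    final List<String> flushedOutput = new ArrayList<>();
    int offsetFile = 0;            // changelog position recorded in the OFFSET file
    boolean checkpointed = false;

    // Volatile state: lost on a crash.
    Set<String> memStore = new HashSet<>();
    List<String> changelogBuffer = new ArrayList<>();
    List<String> outputBuffer = new ArrayList<>();

    // (a) Send to the changelog before writing to the underlying store.
    void loggedPut(String uid) {
        changelogBuffer.add(uid);
        memStore.add(uid);
    }

    // Dedup: emit output only for UIDs that are not yet in the store.
    void process(String uid) {
        if (memStore.contains(uid)) return;
        loggedPut(uid);
        outputBuffer.add("out-" + uid);
    }

    // (b) Runs the first `steps` commit steps (0-4); fewer than 4 means a crash.
    void commit(int steps) {
        if (steps >= 1) { flushedOutput.addAll(outputBuffer); outputBuffer.clear(); }         // 1
        if (steps >= 2) { flushedChangelog.addAll(changelogBuffer); changelogBuffer.clear(); } // 2
        if (steps >= 3) {
            int offset = flushedChangelog.size();  // 3.1 get changelog offset
            diskStore.addAll(memStore);            // 3.2 flush stores to disk
            offsetFile = offset;                   // 3.3 write OFFSET file
        }
        if (steps >= 4) { checkpointed = true; return; }                                       // 4
        memStore = new HashSet<>();                // crash: volatile state is gone
        changelogBuffer = new ArrayList<>();
        outputBuffer = new ArrayList<>();
    }

    // Same host: reuse disk contents, then catch up from the OFFSET file.
    void restoreOnSameHost() {
        memStore = new HashSet<>(diskStore);
        memStore.addAll(flushedChangelog.subList(offsetFile, flushedChangelog.size()));
    }

    // Another host: rebuild the store from the whole changelog.
    void restoreOnOtherHost() { memStore = new HashSet<>(flushedChangelog); }

    static List<String> run(boolean sameHost, int stepsBeforeCrash) {
        ProposedCommitSimulation s = new ProposedCommitSimulation();
        s.process("m1");
        s.commit(stepsBeforeCrash);
        if (!s.checkpointed) {                     // m1 is replayed from the old checkpoint
            if (sameHost) s.restoreOnSameHost(); else s.restoreOnOtherHost();
            s.process("m1");
            s.commit(4);
        }
        return s.flushedOutput;
    }

    public static void main(String[] args) {
        for (int steps = 0; steps <= 4; steps++) {
            System.out.println(steps + " steps: same host " + run(true, steps)
                + ", other host " + run(false, steps));
        }
    }
}
```

With a crash after step 1, both restarts emit out-m1 twice: a duplicate, which at-least-once processing permits, rather than a loss.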



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
