[ 
https://issues.apache.org/jira/browse/SAMZA-568?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14324652#comment-14324652
 ] 

Chris Riccomini commented on SAMZA-568:
---------------------------------------

This ticket relates to SAMZA-567. If we re-order the init lifecycle as proposed 
in SAMZA-567, this patch gets a bit more complicated. Currently, the patch 
simply sets the OffsetManager's offset. This works because the consumers are 
started after the init() method is called. If we start the consumers before the 
init() method is called, this won't work anymore, because the consumers will 
already be running by the time the offset is set.

We could make this continue to work by doing:

{noformat}
startProducers
startTask
startConsumers
{noformat}
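
As a rough illustration of why the ordering matters, here is a minimal sketch. 
All class and method names in it are hypothetical stand-ins, not Samza's real 
classes, and it assumes the consumer reads its starting offset exactly once, 
when it is started. An override made in init() only takes effect if the task 
is initialized before the consumers are started.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for illustration only; these are not Samza's real classes.
class LifecycleSketch {
    /** Holds the starting offset per stream partition (keyed by a plain string here). */
    static class OffsetManager {
        private final Map<String, String> startingOffsets = new HashMap<>();

        void setStartingOffset(String ssp, String offset) {
            startingOffsets.put(ssp, offset);
        }

        String getStartingOffset(String ssp) {
            return startingOffsets.get(ssp);
        }
    }

    /** Assumption: the consumer reads its starting position once, in start(). */
    static class Consumer {
        private final Map<String, String> positions = new HashMap<>();

        void start(OffsetManager offsets, String ssp) {
            positions.put(ssp, offsets.getStartingOffset(ssp));
        }

        String position(String ssp) {
            return positions.get(ssp);
        }
    }

    /** Runs the lifecycle and returns the offset the consumer actually starts from. */
    static String run(boolean taskBeforeConsumers) {
        String ssp = "kafka.input.0";
        OffsetManager offsets = new OffsetManager();
        offsets.setStartingOffset(ssp, "100"); // offset restored from the last checkpoint
        Consumer consumer = new Consumer();
        // Stand-in for the override a task would make in its init() method.
        Runnable init = () -> offsets.setStartingOffset(ssp, "42");

        if (taskBeforeConsumers) {
            init.run();                   // startTask
            consumer.start(offsets, ssp); // startConsumers sees the override
        } else {
            consumer.start(offsets, ssp); // startConsumers uses the checkpointed offset
            init.run();                   // startTask: the override arrives too late
        }
        return consumer.position(ssp);
    }

    public static void main(String[] args) {
        System.out.println("startTask, startConsumers: " + run(true));
        System.out.println("startConsumers, startTask: " + run(false));
    }
}
```

In this sketch the first ordering starts from the overridden offset, while the 
second keeps the checkpointed one.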

Alternatively, we could do what's proposed in SAMZA-567:

{noformat}
startProducers
startConsumers
startTask
{noformat}

We would then change the TaskContext method to be setOffset(SSP, offset). When 
this is called, we'd have to completely tear down SystemConsumers and restart 
it with the new offsets set. Because of this, it might make a bit more sense to 
have the API be setOffsets(Map<SSP, String>), so that multiple offsets can be 
set at once, with a single restart.
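
To make the trade-off concrete, here is a minimal sketch of the batch variant. 
The classes below are hypothetical stand-ins (SSPs are represented as plain 
strings, and the consumer wrapper only counts restarts); this is not the code 
in the attached patch. It assumes every change to the offsets forces one full 
stop/start of the consumers.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-ins for illustration only; these are not Samza's real classes.
class SetOffsetsSketch {
    /** Stand-in for SystemConsumers that counts how often it is started. */
    static class Consumers {
        final Map<String, String> positions = new HashMap<>();
        int starts = 0;

        void start(Map<String, String> offsets) {
            positions.putAll(offsets);
            starts++;
        }

        void stop() {
            positions.clear();
        }
    }

    /** Stand-in for the task context; SSPs are plain strings to keep the sketch short. */
    static class Context {
        final Map<String, String> offsets = new HashMap<>();
        final Consumers consumers = new Consumers();

        Context(Map<String, String> checkpointed) {
            offsets.putAll(checkpointed);
            consumers.start(offsets); // consumers are already running when init() is called
        }

        /** Single-offset variant: every call costs one teardown and restart. */
        void setOffset(String ssp, String offset) {
            setOffsets(Collections.singletonMap(ssp, offset));
        }

        /** Batch variant: one teardown and restart for the whole map. */
        void setOffsets(Map<String, String> overrides) {
            offsets.putAll(overrides);
            consumers.stop();
            consumers.start(offsets);
        }

        int restarts() {
            return consumers.starts - 1; // do not count the initial start
        }
    }

    static Context newContext() {
        Map<String, String> checkpointed = new HashMap<>();
        checkpointed.put("kafka.input.0", "100");
        checkpointed.put("kafka.input.1", "200");
        return new Context(checkpointed);
    }

    static int restartsWithSingleCalls() {
        Context ctx = newContext();
        ctx.setOffset("kafka.input.0", "42");
        ctx.setOffset("kafka.input.1", "43");
        return ctx.restarts();
    }

    static int restartsWithBatchCall() {
        Context ctx = newContext();
        Map<String, String> overrides = new HashMap<>();
        overrides.put("kafka.input.0", "42");
        overrides.put("kafka.input.1", "43");
        ctx.setOffsets(overrides);
        return ctx.restarts();
    }

    public static void main(String[] args) {
        System.out.println("restarts with two setOffset calls: " + restartsWithSingleCalls());
        System.out.println("restarts with one setOffsets call: " + restartsWithBatchCall());
    }
}
```

Under these assumptions, overriding two partitions one at a time restarts the 
consumers twice, while the batch call restarts them once.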

> Start offset override in Task init
> ----------------------------------
>
>                 Key: SAMZA-568
>                 URL: https://issues.apache.org/jira/browse/SAMZA-568
>             Project: Samza
>          Issue Type: Improvement
>            Reporter: Ben Kirwin
>            Priority: Minor
>         Attachments: 
> 0001-Allow-overriding-starting-offsets-in-TaskContext.patch
>
>
> A couple months back -- [on the mailing list | 
> http://mail-archives.apache.org/mod_mbox/incubator-samza-dev/201411.mbox/%3ccacux-d_zwzp2emqse4nou76skfh6bkifitzsmnm_b8dxjut...@mail.gmail.com%3E]
>  -- I mentioned a couple offset management issues I'd been having. (I'm happy 
> to elaborate on this, but in short: I associate some extra state / ordering 
> information with the input offsets, and there's a nontrivial performance cost 
> keeping Samza's checkpoints and my task's state in sync.)
>
> It occurs to me now that there's a simple workaround for this: disable 
> Samza's checkpointing entirely, and let `StreamTask.init` choose the starting 
> offsets. The task can just keep its checkpoints in an ordinary StorageEngine 
> -- and by managing all the state from a single place, it's easy to keep 
> everything in sync.
>
> The basic implementation actually seems fairly straightforward -- the 
> consumers are not started until after the tasks are initialized, so all we'd 
> need to do is allow the `init` method to override the starting offsets. I've 
> attached a small patch that exposes this through the TaskContext interface, 
> just to illustrate the idea -- if this seems like an interesting feature for 
> Samza, I'm happy to add more tests / documentation / etc.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
