[ 
https://issues.apache.org/jira/browse/SAMZA-179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13947090#comment-13947090
 ] 

Chris Riccomini commented on SAMZA-179:
---------------------------------------

I was talking with some folks who proposed a completely different, and pretty 
interesting, solution to this problem.

The idea is to introduce two configs called something like:

{noformat}
task.catchup.to.job.name
task.catchup.to.job.id
{noformat}

(The names are just strawmen.)

`task.catchup.to.job.name` defines the job.name of the job that you want to 
catch up to. `task.catchup.to.job.id` defines the job.id of the job that you 
want to catch up to.

The idea is that you could then run a second instance of a job with this 
config, have it start from the oldest offset (SAMZA-180), catch up to the first 
instance of the job, then shut itself down.

For example, if we had a job called `classify-articles`, which is processing a 
stream of articles, the job.name would be `classify-articles` and the job.id 
would be `1`. If we update the code for `classify-articles` to have some new 
classifier, and re-deploy, it will pick up where it left off, which is fine, 
but we want to re-process the old articles to re-classify them. One approach is 
to just force job.id `1` to start processing from offset 0. Unfortunately, this 
means that new article updates will not be processed until the job is fully 
caught up. This could take days if there are many articles. Instead, using the 
config proposed above, you could define a second job config with:

{noformat}
job.id=2
task.catchup.to.job.name=classify-articles
task.catchup.to.job.id=1
systems.kafka.samza.offset.default=oldest
{noformat}

Running this second job would then reprocess all messages from the oldest 
offset all the way up to the last offset checkpointed by classify-articles with 
job.id=1. It would then shut itself down. In this way, you re-process all prior 
articles without adding any downtime or lag for the new articles being 
inserted into the stream.
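
To make the catch-up check concrete, here is a rough sketch. It assumes offsets are numeric and comparable (as with Kafka), and that the target offsets are read once, at startup, from the checkpoint of the job being caught up to. `CatchUpTracker` and its methods are hypothetical names for illustration, not part of Samza:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/** Hypothetical helper (not a Samza class): tracks which partitions have caught up. */
class CatchUpTracker {
    // Per-partition target: the last offset checkpointed by the job we are catching up to.
    private final Map<Integer, Long> targetOffsets;
    // Partitions that have not yet reached their target offset.
    private final Set<Integer> pending;

    CatchUpTracker(Map<Integer, Long> targetOffsets) {
        this.targetOffsets = new HashMap<>(targetOffsets);
        this.pending = new HashSet<>(targetOffsets.keySet());
    }

    /**
     * Record that a message at the given offset was processed.
     * Returns true once every partition has reached its target offset.
     */
    boolean recordProcessed(int partition, long offset) {
        Long target = targetOffsets.get(partition);
        if (target != null && offset >= target) {
            pending.remove(partition);
        }
        return isCaughtUp();
    }

    boolean isCaughtUp() {
        return pending.isEmpty();
    }
}
```

The container would call `recordProcessed` after each message and shut itself down once it returns true. Note that in this sketch a partition that never delivers a message never leaves the pending set, so a real implementation would need to handle empty partitions separately.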

This seems like a pretty simple solution, and should require no user-facing API 
changes. What do you guys think?

> Support a way to shutdown when a task is caught up
> --------------------------------------------------
>
>                 Key: SAMZA-179
>                 URL: https://issues.apache.org/jira/browse/SAMZA-179
>             Project: Samza
>          Issue Type: Bug
>          Components: container
>    Affects Versions: 0.6.0
>            Reporter: Chris Riccomini
>            Assignee: Martin Kleppmann
>         Attachments: SAMZA-179-v1.patch, SAMZA-179-v3.patch
>
>
> There is currently no easy way for a task to process all messages from some 
> offset all the way to the head of a stream, and then shut down. This behavior 
> is useful in cases where a Samza job is re-processing old data, or for Samza 
> jobs that only run periodically.
> Now that we have SAMZA-147 merged in, SamzaContainer has access to the stream 
> metadata required to do these operations. We should expose this to the tasks 
> in some way, so they can decide when to shut down.
> Two ways that I can think of doing this are:
> 1. Add some sort of mix-in interface (a la InitableTask, ClosableTask, etc) 
> that has a callback that's triggered whenever the last message processed by a 
> task is equal to metadata.newest for the SystemStreamPartition.
> 2. Expose the SystemStreamMetadata information through the 
> InitableTask.init's TaskContext object.
> I favor (2) right now, but we should dig into the code and think through 
> other potential solutions.
> Also, something else to consider is that TaskCoordinator.shutdown currently 
> immediately shuts down the task. There is an argument to be made that it 
> should just mean "cease to process any messages for this task instance", and 
> SamzaContainer should only be shut down when ALL task instances have called 
> coordinator.shutdown (or there is an error). This would be useful in 
> situations where a StreamTask wishes to shut down when it's fully caught up to 
> head across all partitions. Right now, the first task instance to call 
> shutdown (the first one caught up) would shut down the entire SamzaContainer. 
> This would shut down partitions that might not be caught up yet.
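
The "shut down only when ALL task instances have asked" semantics described in the issue could look roughly like the sketch below. `ShutdownVotes` is a hypothetical name; this is not how TaskCoordinator or SamzaContainer are actually implemented, just an illustration of the bookkeeping:

```java
import java.util.HashSet;
import java.util.Set;

/** Hypothetical sketch (not a Samza class): container-level shutdown bookkeeping. */
class ShutdownVotes {
    // Task instances that are still processing messages.
    private final Set<String> active;

    ShutdownVotes(Set<String> taskNames) {
        this.active = new HashSet<>(taskNames);
    }

    /**
     * Called when a task instance requests shutdown. The task stops receiving
     * messages; returns true only when no active task instances remain, which
     * is the point at which the whole container should shut down.
     */
    boolean requestShutdown(String taskName) {
        active.remove(taskName);
        return active.isEmpty();
    }

    /** Whether the container should still deliver messages to this task instance. */
    boolean isActive(String taskName) {
        return active.contains(taskName);
    }
}
```

With this approach, the first task instance to catch up simply goes idle, and the partitions that are still behind keep processing until they also request shutdown.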



--
This message was sent by Atlassian JIRA
(v6.2#6252)
