[ 
https://issues.apache.org/jira/browse/SAMZA-179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13947100#comment-13947100
 ] 

Jakob Homan commented on SAMZA-179:
-----------------------------------

Yet another way to look at the problem, using the situation Chris just 
described, is that the existing job with the new logic has a temporary surge in 
work to do (i.e., starting from 0 on its input) and needs extra capacity in 
order to do it in a reasonable time frame.  This makes it easy to reframe the 
problem as a capacity issue: give the job extra containers to increase the rate 
at which it can process messages, so that it can reprocess everything in the 
same amount of time it would take with the assistance of the catch-up job from 
above.  
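
As a rough illustration of the capacity framing, here is a minimal Java sketch 
of the arithmetic involved. It is not Samza code: the class name, the method 
name and all of the figures are hypothetical, and it assumes throughput scales 
linearly with the number of containers, which will not hold exactly in practice.

```java
// Hypothetical back-of-the-envelope helper; not part of Samza.
final class CatchUpCapacity {

    /**
     * Estimates how many containers are needed to drain a backlog within a
     * deadline while still keeping up with newly arriving messages.
     * Assumes (hypothetically) that throughput scales linearly per container.
     */
    static long containersNeeded(long backlogMessages,
                                 double incomingPerSec,
                                 double perContainerPerSec,
                                 double deadlineSec) {
        if (perContainerPerSec <= 0 || deadlineSec <= 0) {
            throw new IllegalArgumentException("rate and deadline must be positive");
        }
        // Rate needed to clear the backlog in time, plus the live traffic.
        double requiredRate = backlogMessages / deadlineSec + incomingPerSec;
        return (long) Math.ceil(requiredRate / perContainerPerSec);
    }

    public static void main(String[] args) {
        // Made-up numbers: 36M message backlog, 1,000 msg/s arriving,
        // 2,500 msg/s per container, one hour to catch up.
        long n = containersNeeded(36_000_000L, 1_000.0, 2_500.0, 3_600.0);
        System.out.println("containers needed: " + n);
    }
}
```

Once the job has caught up, the same calculation with a backlog of zero gives 
the steady-state container count, so the difference is the temporary extra 
capacity.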

SAMZA-71 could help with this, as could the dynamic capacity promised by 
YARN-1051.  The former is much closer to being available than the latter, and 
the latter isn't strictly necessary.

> Support a way to shutdown when a task is caught up
> --------------------------------------------------
>
>                 Key: SAMZA-179
>                 URL: https://issues.apache.org/jira/browse/SAMZA-179
>             Project: Samza
>          Issue Type: Bug
>          Components: container
>    Affects Versions: 0.6.0
>            Reporter: Chris Riccomini
>            Assignee: Martin Kleppmann
>         Attachments: SAMZA-179-v1.patch, SAMZA-179-v3.patch
>
>
> There is currently no easy way for a task to process all messages from some 
> offset all the way to the head of a stream, and then shut down. This behavior 
> is useful in cases where a Samza job is re-processing old data, or for Samza 
> jobs that only run periodically.
> Now that we have SAMZA-147 merged in, SamzaContainer has access to the stream 
> metadata required to do these operations. We should expose this to the tasks 
> in some way, so they can decide when to shut down.
> Two ways that I can think of doing this are:
> 1. Add some sort of mix-in interface (a la InitableTask, ClosableTask, etc) 
> that has a callback that's triggered whenever the last message processed by a 
> task is equal to metadata.newest for the SystemStreamPartition.
> 2. Expose the SystemStreamMetadata information through the 
> InitableTask.init's TaskContext object.
> I favor (2) right now, but we should dig into the code and think through 
> other potential solutions.
> Also, something else to consider is that TaskCoordinator.shutdown currently 
> immediately shuts down the task. There is an argument to be made that it 
> should just mean "cease to process any messages for this task instance", and 
> SamzaContainer should only be shut down when ALL task instances have called 
> coordinator.shutdown (or there is an error). This would be useful in 
> situations where a StreamTask wishes to shut down when it's fully caught up to 
> head across all partitions. Right now, the first task instance to call 
> shutdown (the first one caught up) would shut down the entire SamzaContainer. 
> This would shut down partitions that might not be caught up yet.
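
The shutdown semantics discussed at the end of the description can be sketched 
roughly as follows. This is a standalone illustration, not Samza code: the 
class and method names are hypothetical, offsets are modelled as plain longs 
keyed by partition number for simplicity, and the "caught up" check simply 
compares the last processed offset against the newest offset recorded at 
startup.

```java
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch; none of these types exist in Samza.
final class CaughtUpShutdownSketch {

    /**
     * Returns true when, for every partition, the last processed offset has
     * reached the newest offset that was recorded when the job started.
     */
    static boolean isCaughtUp(Map<Integer, Long> lastProcessed,
                              Map<Integer, Long> newestAtStart) {
        for (Map.Entry<Integer, Long> e : newestAtStart.entrySet()) {
            Long processed = lastProcessed.get(e.getKey());
            if (processed == null || processed < e.getValue()) {
                return false;
            }
        }
        return true;
    }

    /** Stops the container only after every task instance has asked to stop. */
    static final class ContainerShutdownTracker {
        private final Set<String> allTasks;
        private final Set<String> requested = new HashSet<>();

        ContainerShutdownTracker(Set<String> allTasks) {
            this.allTasks = new HashSet<>(allTasks);
        }

        /** Records a shutdown request; returns true if the container should now stop. */
        boolean requestShutdown(String taskName) {
            if (!allTasks.contains(taskName)) {
                throw new IllegalArgumentException("unknown task: " + taskName);
            }
            requested.add(taskName);
            return shouldStopContainer();
        }

        /** A task that has requested shutdown receives no further messages. */
        boolean isActive(String taskName) {
            return !requested.contains(taskName);
        }

        boolean shouldStopContainer() {
            return requested.containsAll(allTasks);
        }
    }

    public static void main(String[] args) {
        ContainerShutdownTracker tracker =
                new ContainerShutdownTracker(Set.of("task-0", "task-1"));
        // task-0 catches up first: it goes idle, but the container keeps running.
        System.out.println(tracker.requestShutdown("task-0"));
        // task-1 catches up later: now the whole container may stop.
        System.out.println(tracker.requestShutdown("task-1"));
    }
}
```

With this shape, the first task instance to catch up only stops receiving 
messages; partitions owned by slower task instances keep being processed until 
they have also requested shutdown.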



