[ https://issues.apache.org/jira/browse/SAMZA-179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13947052#comment-13947052 ]
Jakob Homan commented on SAMZA-179:
-----------------------------------
bq. It is an interesting idea to just have an idle timeout INSTEAD of the
metadata, though. Something like task.shutdown.idle.ms. If a SamzaContainer
receives no messages for X milliseconds, then shut it down. That actually seems
generally useful, but it IS something that can be done simply by setting
task.window.ms and doing the operation in the window method.
+1 to doing this through the windowing mechanism. I'm very hesitant about
building too many conveniences into the framework that task implementations
can easily write themselves.
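A minimal sketch of the window-based approach, written in plain Java so it runs without Samza on the classpath. Coordinator and IdleShutdownTask are hypothetical stand-ins, not Samza's actual StreamTask, WindowableTask, or TaskCoordinator signatures. In a real job, window() would be driven by task.window.ms, and idleTimeoutMs plays the role of the proposed (not existing) task.shutdown.idle.ms setting.

```java
import java.util.function.LongSupplier;

// Simplified, hypothetical stand-in for Samza's TaskCoordinator.
interface Coordinator {
    void shutdown();
}

// Hypothetical task that shuts itself down after a period with no messages.
// idleTimeoutMs plays the role of the proposed (not existing) task.shutdown.idle.ms.
class IdleShutdownTask {
    private final long idleTimeoutMs;
    private final LongSupplier clock; // injected so the idle logic can be tested
    private long lastMessageAtMs;

    IdleShutdownTask(long idleTimeoutMs, LongSupplier clock) {
        this.idleTimeoutMs = idleTimeoutMs;
        this.clock = clock;
        this.lastMessageAtMs = clock.getAsLong(); // idle timer starts at construction
    }

    // Stand-in for StreamTask.process: every message resets the idle timer.
    void process(String message, Coordinator coordinator) {
        lastMessageAtMs = clock.getAsLong();
        // ... real message handling would go here ...
    }

    // Stand-in for WindowableTask.window, called every task.window.ms in a real job.
    void window(Coordinator coordinator) {
        if (clock.getAsLong() - lastMessageAtMs >= idleTimeoutMs) {
            coordinator.shutdown();
        }
    }
}
```

Outside of tests the clock would simply be System::currentTimeMillis. Keeping the check inside the task, rather than adding a container-level setting, is the trade-off argued for above.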
Is there any merit in looking at this another way, and having the input systems
themselves signal that they're done producing, which would then cause Samza to
shut down the tasks? For some pound-it-into-a-stream-shape systems, like a file
system, this would happen at the end of the file. For
reprocess-the-old-data-with-new-logic jobs, the input stream could be told how
far to process and then signal that it is done.
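To make the end-of-input idea concrete, here is a sketch assuming a hypothetical BoundedInput interface that can report it will never produce another message. LinesInput mimics a file-backed input that is done at the end of the file, and RunUntilInputsDone keeps delivering messages until every input has signalled completion. None of these types exist in Samza; they only illustrate the proposed contract.

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical input contract: unlike an unbounded stream, it can say it is finished.
interface BoundedInput {
    String poll();          // next message, or null if none is available right now
    boolean isExhausted();  // true once the input has signalled end-of-stream
}

// Hypothetical file-like input: exhausted once every line has been handed out.
class LinesInput implements BoundedInput {
    private final Iterator<String> lines;

    LinesInput(List<String> lines) {
        this.lines = lines.iterator();
    }

    public String poll() {
        return lines.hasNext() ? lines.next() : null;
    }

    public boolean isExhausted() {
        return !lines.hasNext();
    }
}

// Hypothetical run loop: deliver messages until every input is done, then return
// so the caller can shut the task down.
class RunUntilInputsDone {
    static void run(List<BoundedInput> inputs, Consumer<String> task) {
        List<BoundedInput> active = new ArrayList<>(inputs);
        while (!active.isEmpty()) {
            Iterator<BoundedInput> it = active.iterator();
            while (it.hasNext()) {
                BoundedInput input = it.next();
                String message = input.poll();
                if (message != null) {
                    task.accept(message);
                }
                if (input.isExhausted()) {
                    it.remove(); // this input will never produce again
                }
            }
            // A real loop would back off here instead of spinning when a
            // non-exhausted input has nothing to return.
        }
    }
}
```

The appeal of this design is that the task itself needs no shutdown logic; whether an input is finite becomes a property of the system, not of every job that reads it.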
> Support a way to shutdown when a task is caught up
> --------------------------------------------------
>
> Key: SAMZA-179
> URL: https://issues.apache.org/jira/browse/SAMZA-179
> Project: Samza
> Issue Type: Bug
> Components: container
> Affects Versions: 0.6.0
> Reporter: Chris Riccomini
> Assignee: Martin Kleppmann
> Attachments: SAMZA-179-v1.patch, SAMZA-179-v3.patch
>
>
> There is currently no easy way for a task to process all messages from some
> offset all the way to the head of a stream, and then shut down. This behavior
> is useful in cases where a Samza job is re-processing old data, or for Samza
> jobs that only run periodically.
> Now that we have SAMZA-147 merged in, SamzaContainer has access to the stream
> metadata required to do these operations. We should expose this to the tasks
> in some way, so they can decide when to shut down.
> Two ways that I can think of doing this are:
> 1. Add some sort of mix-in interface (à la InitableTask, ClosableTask, etc.)
> that has a callback that's triggered whenever the last message processed by a
> task is equal to metadata.newest for the SystemStreamPartition.
> 2. Expose the SystemStreamMetadata information through the
> InitableTask.init's TaskContext object.
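Option 1 could look roughly like the following sketch. CaughtUpTask and CaughtUpDispatcher are hypothetical names, offsets are simplified to longs (Samza represents offsets as system-specific strings), and newestOffsets stands in for the per-partition newest offsets from SystemStreamMetadata.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical mix-in in the spirit of InitableTask/ClosableTask.
interface CaughtUpTask {
    void onCaughtUp(String partition);
}

// Hypothetical container-side hook. newestOffsets stands in for the newest
// offset of each partition, taken from the stream metadata at startup.
class CaughtUpDispatcher {
    private final Map<String, Long> newestOffsets;
    private final CaughtUpTask task;

    CaughtUpDispatcher(Map<String, Long> newestOffsets, CaughtUpTask task) {
        this.newestOffsets = new HashMap<>(newestOffsets);
        this.task = task;
    }

    // Called by the container after the task processed the message at `offset`.
    void afterProcess(String partition, long offset) {
        Long newest = newestOffsets.get(partition);
        if (newest != null && offset == newest) {
            task.onCaughtUp(partition); // last processed message == metadata.newest
        }
    }
}
```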
> I favor (2) right now, but we should dig into the code and think through
> other potential solutions.
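Option 2 puts the decision in the task: it receives the newest offsets at init time and requests shutdown once every partition it owns has reached them. This is a hypothetical sketch; init() and process() are simplified stand-ins for InitableTask.init and StreamTask.process, Coordinator stands in for TaskCoordinator, and offsets are simplified to longs.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

// Simplified, hypothetical stand-in for Samza's TaskCoordinator.
interface Coordinator {
    void shutdown();
}

// Hypothetical task for option (2): stops once all of its partitions are caught up.
class CatchUpThenStopTask {
    private final Map<String, Long> newestOffsets = new HashMap<>();
    private final Set<String> pending = new HashSet<>(); // partitions not yet caught up
    private boolean shutdownRequested = false;

    // Stand-in for init(Config, TaskContext): the metadata would come from the context.
    void init(Map<String, Long> newestOffsetsFromContext) {
        newestOffsets.putAll(newestOffsetsFromContext);
        pending.addAll(newestOffsetsFromContext.keySet());
    }

    // Stand-in for process(envelope, collector, coordinator).
    void process(String partition, long offset, Coordinator coordinator) {
        // ... real message handling would go here ...
        Long newest = newestOffsets.get(partition);
        if (newest != null && offset >= newest) {
            pending.remove(partition);
        }
        if (pending.isEmpty() && !shutdownRequested) {
            shutdownRequested = true; // request shutdown only once
            coordinator.shutdown();
        }
    }
}
```

One caveat of this sketch: a partition that has no messages at all never reaches process(), so it would stay pending forever; a real implementation would need to treat such partitions as caught up from the start.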
> Also, something else to consider is that TaskCoordinator.shutdown currently
> immediately shuts down the task. There is an argument to be made that it
> should just mean "cease to process any messages for this task instance", and
> SamzaContainer should only be shut down when ALL task instances have called
> coordinator.shutdown (or there is an error). This would be useful in
> situations where a StreamTask wishes to shut down when it's fully caught up to
> head across all partitions. Right now, the first task instance to call
> shutdown (the first one caught up) would shut down the entire SamzaContainer.
> This would shut down partitions that might not be caught up yet.
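The proposed semantics for TaskCoordinator.shutdown can be sketched as container-side bookkeeping. ContainerShutdownTracker is a hypothetical name, and the sketch leaves out the error path, where the container would presumably still stop immediately.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical bookkeeping: a task's shutdown() means "stop processing this task
// instance", and the container stops only once every task instance has asked to.
class ContainerShutdownTracker {
    private final Set<String> running;

    ContainerShutdownTracker(Set<String> taskNames) {
        this.running = new HashSet<>(taskNames);
    }

    // Called when a task instance invokes coordinator.shutdown().
    void requestShutdown(String taskName) {
        running.remove(taskName);
    }

    // The run loop skips task instances for which this returns false.
    boolean shouldProcess(String taskName) {
        return running.contains(taskName);
    }

    // The container exits once no task instance is still running.
    boolean shouldShutDownContainer() {
        return running.isEmpty();
    }
}
```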
--
This message was sent by Atlassian JIRA
(v6.2#6252)