[ https://issues.apache.org/jira/browse/SAMZA-179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13983173#comment-13983173 ]
Chris Riccomini commented on SAMZA-179:
---------------------------------------
I think we're going to punt on this for now and use a time-based approach for
shutting our tasks down. Our messages all have headers with timestamps in them,
so we're simply going to shut down when we read messages that are within N
seconds of System.currentTimeMillis().
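A minimal sketch of the time-based approach, assuming each message exposes its header timestamp in milliseconds since the epoch. TimestampedMessage, ShutdownRequester (a stand-in for the shutdown call on Samza's TaskCoordinator), TimeBasedCaughtUpCheck and ReprocessingTask are hypothetical names used for illustration, not Samza APIs; N is the constructor's thresholdSeconds parameter.

```java
import java.util.concurrent.TimeUnit;

/** Hypothetical message type: only the header timestamp (ms since epoch) is modeled. */
final class TimestampedMessage {
    final long headerTimestampMs;

    TimestampedMessage(long headerTimestampMs) {
        this.headerTimestampMs = headerTimestampMs;
    }
}

/** Hypothetical stand-in for the shutdown request on Samza's TaskCoordinator. */
interface ShutdownRequester {
    void shutdown();
}

/** Time-based "caught up" test: the message is at most N seconds older than now. */
final class TimeBasedCaughtUpCheck {
    private final long thresholdMs;

    TimeBasedCaughtUpCheck(long thresholdSeconds) {
        this.thresholdMs = TimeUnit.SECONDS.toMillis(thresholdSeconds);
    }

    boolean isCaughtUp(TimestampedMessage msg, long nowMs) {
        // Messages stamped in the future (clock skew) also count as caught up.
        return nowMs - msg.headerTimestampMs <= thresholdMs;
    }
}

/** Hypothetical re-processing task that asks to shut down once it reaches recent messages. */
final class ReprocessingTask {
    private final TimeBasedCaughtUpCheck check;

    ReprocessingTask(TimeBasedCaughtUpCheck check) {
        this.check = check;
    }

    // nowMs is passed in for testability; a real task would use System.currentTimeMillis().
    void process(TimestampedMessage msg, ShutdownRequester coordinator, long nowMs) {
        // ... application-specific processing of msg would go here ...
        if (check.isCaughtUp(msg, nowMs)) {
            coordinator.shutdown();
        }
    }
}
```

Passing the current time in keeps the check deterministic. Because the timestamp is written by the producer and compared against the task's own clock, the heuristic assumes the two clocks are reasonably synchronized.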
One thing that would make our lives much easier is the ShutdownMethod code that
[~martinkl] posted. Perhaps we can move the shutdown code to a separate JIRA?
I'm going to move this JIRA out of scope for the 0.7.0 release if that's OK
with everyone.
> Support a way to shutdown when a task is caught up
> --------------------------------------------------
>
> Key: SAMZA-179
> URL: https://issues.apache.org/jira/browse/SAMZA-179
> Project: Samza
> Issue Type: Bug
> Components: container
> Affects Versions: 0.6.0
> Reporter: Chris Riccomini
> Assignee: Martin Kleppmann
> Attachments: SAMZA-179-v1.patch, SAMZA-179-v3.patch
>
>
> There is currently no easy way for a task to process all messages from some
> offset all the way to the head of a stream and then shut down. This behavior
> is useful in cases where a Samza job is re-processing old data, or for Samza
> jobs that only run periodically.
> Now that we have SAMZA-147 merged in, SamzaContainer has access to the stream
> metadata required to do these operations. We should expose this to the tasks
> in some way, so they can decide when to shut down.
> Two ways that I can think of doing this are:
> 1. Add some sort of mix-in interface (à la InitableTask, ClosableTask, etc.)
> that has a callback that's triggered whenever the offset of the last message
> processed by a task is equal to metadata.newest for the SystemStreamPartition.
> 2. Expose the SystemStreamMetadata information through the
> InitableTask.init's TaskContext object.
> I favor (2) right now, but we should dig into the code and think through
> other potential solutions.
> Also, something else to consider is that TaskCoordinator.shutdown currently
> immediately shuts down the task. There is an argument to be made that it
> should just mean "cease to process any messages for this task instance", and
> that SamzaContainer should only be shut down when ALL task instances have
> called coordinator.shutdown (or there is an error). This would be useful in
> situations where a StreamTask wishes to shut down when it's fully caught up to
> the head across all partitions. Right now, the first task instance to call
> shutdown (the first one caught up) would shut down the entire SamzaContainer,
> including partitions that might not be caught up yet.
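A minimal sketch combining two ideas from the description: the per-partition comparison against metadata.newest (which option (2) would let a task perform itself using metadata from its TaskContext, or which option (1) would run inside the container before firing the callback), and the proposed "all task instances" shutdown semantics. CaughtUpTracker and ContainerShutdownCoordinator are hypothetical names, not Samza APIs; partitions and task instances are identified by plain strings, and offsets are modeled as increasing long values, which are assumptions.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

/**
 * Hypothetical per-task tracker: a partition is caught up once the last processed
 * offset reaches the newest offset captured from stream metadata at startup.
 * Offsets are modeled as monotonically increasing longs (an assumption).
 */
final class CaughtUpTracker {
    private final Map<String, Long> newestOffsets;
    private final Map<String, Long> lastProcessed = new HashMap<>();

    CaughtUpTracker(Map<String, Long> newestOffsets) {
        this.newestOffsets = new HashMap<>(newestOffsets);
    }

    void recordProcessed(String partition, long offset) {
        lastProcessed.put(partition, offset);
    }

    boolean isCaughtUp(String partition) {
        Long newest = newestOffsets.get(partition);
        Long last = lastProcessed.get(partition);
        return newest != null && last != null && last >= newest;
    }

    /** True only when every partition this task owns has reached its newest offset. */
    boolean allCaughtUp() {
        for (String partition : newestOffsets.keySet()) {
            if (!isCaughtUp(partition)) {
                return false;
            }
        }
        return true;
    }
}

/**
 * Hypothetical container-side bookkeeping for the proposed semantics: a shutdown
 * request only stops the requesting task instance, and the container stops once
 * every task instance has requested shutdown. The error path is not modeled.
 */
final class ContainerShutdownCoordinator {
    private final Set<String> taskNames;
    private final Set<String> shutdownRequested = new HashSet<>();

    ContainerShutdownCoordinator(Set<String> taskNames) {
        this.taskNames = new HashSet<>(taskNames);
    }

    /** Records that a task instance wants to stop; it receives no further messages. */
    void requestShutdown(String taskName) {
        if (!taskNames.contains(taskName)) {
            throw new IllegalArgumentException("unknown task instance: " + taskName);
        }
        shutdownRequested.add(taskName);
    }

    /** Whether the container should keep delivering messages to this task instance. */
    boolean isActive(String taskName) {
        return taskNames.contains(taskName) && !shutdownRequested.contains(taskName);
    }

    /** The whole container shuts down only after all task instances have asked to. */
    boolean shouldShutDownContainer() {
        return shutdownRequested.containsAll(taskNames);
    }
}
```

In this model a task instance would call requestShutdown once its tracker's allCaughtUp() returns true, so an instance that finishes early no longer stops partitions owned by instances that are still behind.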
--
This message was sent by Atlassian JIRA
(v6.2#6252)