[ 
https://issues.apache.org/jira/browse/SAMZA-179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13946166#comment-13946166
 ] 

Chris Riccomini commented on SAMZA-179:
---------------------------------------

Updated the RB with a few more comments.

bq. Perhaps for the 'broadcast streams' feature we discussed (where every task 
receives messages from all partitions)?

True. I was thinking about this more. Once SAMZA-71 gets committed, the 
TaskInstance:Partition mapping is broken, as well. I think we need to give 
access to all SystemStreamMetadata, not just per-partition.

bq. Sure, can do that. In order to make sure the job has processed any messages 
that came in while it was catching up, should we make it something like "shut 
down after the newest offset has been reached and no new message has been 
received for x minutes"? (Actually, that raises the question: would it be ok to 
approximate "caught up" as "has been idle for x minutes"? That would avoid all 
the issues of giving tasks access to stream metadata.)

Hmm, interesting point. I think there are two scenarios here. Either messages 
are still being added to the stream, or they are not. If the stream is not 
being written to, then there's no need to sit idle for x minutes. If the stream 
IS being written to, then there are two potential outcomes. Either the writer 
will stop sending messages, or the writer will continue sending messages 
forever. If the writer stops sending messages at some point in the future, then 
it makes sense to add an idle setting. If the writer never fully stops writing 
messages, then I don't think the idle setting buys us that much, and indeed 
might never get triggered if the writer is faster than the idle timeout.

Basically, I think the idle setting is only useful in cases where a stream is 
still being populated by something like a Hadoop job when the SamzaContainer 
starts. In such a case, the container might catch up to the Hadoop job's 
output, but it's not actually "done" until the Hadoop job finishes populating 
the topic.

There are race conditions here, though, I think. For example, a leadership 
election in Kafka could take longer than the idle timeout, the producer might 
GC for a while, the MessageChooser might starve the stream in favor of higher 
priority streams, etc. I don't think it would be safe to rely on an idle 
timeout to assume that you'd caught up in these cases unless the timeout were 
very high (e.g. an hour or more).

It is an interesting idea to just have an idle timeout INSTEAD of the metadata, 
though. Something like task.shutdown.idle.ms. If a SamzaContainer receives no 
messages for X milliseconds, then shut it down. That actually seems generally 
useful, but it IS something that can be done simply by setting a 
task.window.ms, and doing the operation in the window method.
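For illustration, the bookkeeping behind such a window-based idle timeout might look like this. This is a standalone sketch: the class and method names are hypothetical, and the actual Samza plumbing is elided. In a real job, process() would call recordMessage() on every envelope, and a WindowableTask.window() callback (fired every task.window.ms) would call shouldShutDown() and, if true, invoke coordinator.shutdown().

```java
// Hypothetical helper for the idle-timeout approach described above.
// Tracks when the last message arrived; the window() callback decides
// whether the idle threshold (e.g. the proposed task.shutdown.idle.ms)
// has been exceeded.
class IdleShutdownTracker {
    private final long idleTimeoutMs;
    private long lastMessageTimeMs;

    IdleShutdownTracker(long idleTimeoutMs, long startTimeMs) {
        this.idleTimeoutMs = idleTimeoutMs;
        this.lastMessageTimeMs = startTimeMs;
    }

    // Call from process() for every incoming message.
    void recordMessage(long nowMs) {
        lastMessageTimeMs = nowMs;
    }

    // Call from window(); true once no message has arrived for idleTimeoutMs.
    boolean shouldShutDown(long nowMs) {
        return nowMs - lastMessageTimeMs >= idleTimeoutMs;
    }
}
```

Note this sketch also makes the race conditions above concrete: any stall longer than idleTimeoutMs (leader election, producer GC, MessageChooser starvation) would trip shouldShutDown() even though the task is not actually caught up.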

In addition, I don't think the timeout approach satisfies all of our 
requirements. We have use cases where a job is processing stream X, which is a 
realtime feed which has producers running for undefined (infinite) amounts of 
time. The developer wishes to reprocess the data, so they re-deploy their job, 
which picks up where it left off, but maybe with some new processing logic. The 
developer then runs a SECOND instance of the job with the new processing logic, 
but this time with the offsets forced to 0 (see SAMZA-180). This second 
instance should shut down when it's read to some point AFTER the point the 
first instance restarted from. This can be achieved simply by running the 
second instance after the first one has started up, provided that the second 
instance can check the newest offsets and shut itself down. If the only 
setting the developer has is an idle timeout, and the producers are infinitely 
running, then the second instance would never shut down.
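The offset-based check the second instance needs can be sketched as follows. This is a standalone sketch with hypothetical names; a real implementation would snapshot the newest offsets from SystemStreamMetadata at startup and call recordProcessed() from process().

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical tracker: snapshot the newest offset of each partition at
// startup, then report "caught up" once every partition has processed at
// least that offset -- regardless of how fast producers keep writing.
class CatchUpTracker {
    // partition -> newest offset observed at startup
    private final Map<Integer, Long> targetOffsets;
    // partition -> highest offset processed so far
    private final Map<Integer, Long> processedOffsets = new HashMap<>();

    CatchUpTracker(Map<Integer, Long> newestOffsetsAtStartup) {
        this.targetOffsets = new HashMap<>(newestOffsetsAtStartup);
    }

    // Call for every processed message.
    void recordProcessed(int partition, long offset) {
        processedOffsets.merge(partition, offset, Math::max);
    }

    // True once every partition has reached its startup snapshot.
    boolean isCaughtUp() {
        for (Map.Entry<Integer, Long> e : targetOffsets.entrySet()) {
            Long processed = processedOffsets.get(e.getKey());
            if (processed == null || processed < e.getValue()) {
                return false;
            }
        }
        return true;
    }
}
```

Unlike the idle timeout, this check terminates even against infinitely running producers, because the target is fixed at startup.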

bq. Are you sure? We've commented above that this kind of batch-oriented use of 
Samza doesn't really fit the model well. Do we want to actively encourage 
people using the framework in this way? (Not questioning the validity of Alan's 
use case, just questioning how common it will be in general.)

Sorry, I don't think I was too clear. The batch-oriented use case for Samza is 
certainly less common than just realtime processing, BUT of the batch-oriented 
cases, I think most will simply want to do what [~alanwli] is trying to do: 
just shut down when all TaskInstances have fully caught up. It seems like a 
good thing to support this as a config parameter, so that everyone doesn't 
have to copy and paste the same shutdown-on-caught-up logic into their 
StreamTasks.
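If such a config parameter were added, usage might look something like the following job config fragment. Both property names here are purely hypothetical, just to contrast the two modes discussed above (only task.shutdown.idle.ms was floated by name earlier in this thread):

```properties
# Hypothetical: shut the container down once every TaskInstance has reached
# the newest offset snapshotted at startup.
task.shutdown.on.caught.up=true

# Hypothetical alternative discussed above: shut down after no message has
# been received for this many milliseconds.
# task.shutdown.idle.ms=60000
```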

> Support a way to shutdown when a task is caught up
> --------------------------------------------------
>
>                 Key: SAMZA-179
>                 URL: https://issues.apache.org/jira/browse/SAMZA-179
>             Project: Samza
>          Issue Type: Bug
>          Components: container
>    Affects Versions: 0.6.0
>            Reporter: Chris Riccomini
>            Assignee: Martin Kleppmann
>         Attachments: SAMZA-179-v1.patch, SAMZA-179-v3.patch
>
>
> There is currently no easy way for a task to process all messages from some 
> offset all the way to the head of a stream, and then shut down. This behavior 
> is useful in cases where a Samza job is re-processing old data, or for Samza 
> jobs that only run periodically.
> Now that we have SAMZA-147 merged in, SamzaContainer has access to the stream 
> metadata required to do these operations. We should expose this to the tasks 
> in some way, so they can decide when to shut down.
> Two ways that I can think of doing this are:
> 1. Add some sort of mix-in interface (a la InitableTask, ClosableTask, etc) 
> that has a callback that's triggered whenever the last message processed by a 
> task is equal to metadata.newest for the SystemStreamPartition.
> 2. Expose the SystemStreamMetadata information through the 
> InitableTask.init's TaskContext object.
> I favor (2) right now, but we should dig into the code and think through 
> other potential solutions.
> Also, something else to consider is that TaskCoordinator.shutdown currently 
> immediately shuts down the task. There is an argument to be made that it 
> should just mean "cease to process any messages for this task instance", and 
> SamzaContainer should only be shutdown when ALL task instances have called 
> coordinator.shutdown (or there is an error). This would be useful in 
> situations where a StreamTask wishes to shut down when it's fully caught up to 
> head across all partitions. Right now, the first task instance to call 
> shutdown (the first one caught up) would shut down the entire SamzaContainer. 
> This would shut down partitions that might not be caught up yet.



--
This message was sent by Atlassian JIRA
(v6.2#6252)
