[
https://issues.apache.org/jira/browse/SAMZA-179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13946166#comment-13946166
]
Chris Riccomini commented on SAMZA-179:
---------------------------------------
Updated the RB with a few more comments.
bq. Perhaps for the 'broadcast streams' feature we discussed (where every task
receives messages from all partitions)?
True. I was thinking about this more. Once SAMZA-71 gets committed, the
TaskInstance:Partition mapping is broken, as well. I think we need to give
access to all SystemStreamMetadata, not just per-partition.
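For reference, this is roughly what "all SystemStreamMetadata" amounts to, as the container can already fetch it after SAMZA-147 (a sketch only; the SystemAdmin instance and stream name are placeholders, and tasks currently have no way to get at this):
{code:java}
import java.util.Collections;
import java.util.Map;

import org.apache.samza.Partition;
import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemStreamMetadata;
import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata;

public class MetadataDump {
  // Sketch: print the newest offset of every partition of a stream, using the
  // SystemAdmin metadata call the container uses. "admin" and "stream" are
  // placeholders; a task has no supported way to do this today.
  public static void dump(SystemAdmin admin, String stream) {
    Map<String, SystemStreamMetadata> metadata =
        admin.getSystemStreamMetadata(Collections.singleton(stream));
    Map<Partition, SystemStreamPartitionMetadata> partitions =
        metadata.get(stream).getSystemStreamPartitionMetadata();
    for (Map.Entry<Partition, SystemStreamPartitionMetadata> entry : partitions.entrySet()) {
      System.out.println(entry.getKey() + " newest offset: " + entry.getValue().getNewestOffset());
    }
  }
}
{code}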
bq. Sure, can do that. In order to make sure the job has processed any messages
that came in while it was catching up, should we make it something like "shut
down after the newest offset has been reached and no new message has been
received for x minutes"? (Actually, that raises the question: would it be ok to
approximate "caught up" as "has been idle for x minutes"? That would avoid all
the issues of giving tasks access to stream metadata.)
Hmm, interesting point. I think there are two scenarios here. Either messages
are still being added to the stream, or they are not. If the stream is not
being written to, then there's no need to sit idle for x minutes. If the stream
IS being written to, then there are two potential outcomes. Either the writer
will stop sending messages, or the writer will continue sending messages
forever. If the writer stops sending messages at some point in the future, then
it makes sense to add an idle setting. If the writer never fully stops writing
messages, then I don't think the idle setting buys us that much, and indeed
might never get triggered if the writer is faster than the idle timeout.
Basically, I think the idle setting is only useful in cases where a stream is
still being populated by something like a Hadoop job when the SamzaContainer
starts. In such a case, the container might catch up to the Hadoop job's
output, but it's not actually "done" until the Hadoop job finishes populating
the topic.
There are race conditions here, though, I think. For example, a leadership
election in Kafka could take longer than the idle timeout, the producer might
GC for a while, the MessageChooser might starve the stream in favor of higher
priority streams, etc. I don't think it would be safe to rely on an idle
timeout to assume that you'd caught up in these cases unless the timeout were
very high (e.g. an hour or more).
It is an interesting idea to just have an idle timeout INSTEAD of the metadata,
though. Something like task.shutdown.idle.ms. If a SamzaContainer receives no
messages for X milliseconds, then shut it down. That actually seems generally
useful, but it IS something that can be done simply by setting a
task.window.ms, and doing the operation in the window method.
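For what it's worth, a rough sketch of that window-based approach (assuming the current no-arg TaskCoordinator.shutdown() and a task.window.ms smaller than the timeout; the class name and timeout value are just placeholders):
{code:java}
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;
import org.apache.samza.task.WindowableTask;

public class ShutdownOnIdleTask implements StreamTask, WindowableTask {
  // Placeholder timeout; a real job would read this from config.
  private static final long IDLE_TIMEOUT_MS = 60 * 60 * 1000;

  private long lastMessageTime = System.currentTimeMillis();

  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
      TaskCoordinator coordinator) {
    // Record that we saw a message, then do the normal processing.
    lastMessageTime = System.currentTimeMillis();
    // ... normal processing ...
  }

  @Override
  public void window(MessageCollector collector, TaskCoordinator coordinator) {
    // Called every task.window.ms; shut down if we've been idle too long.
    if (System.currentTimeMillis() - lastMessageTime > IDLE_TIMEOUT_MS) {
      coordinator.shutdown();
    }
  }
}
{code}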
In addition, I don't think the timeout approach satisfies all of our
requirements. We have use cases where a job is processing stream X, which is a
realtime feed which has producers running for undefined (infinite) amounts of
time. The developer wishes to reprocess the data, so they re-deploy their job,
which picks up where it left off, but maybe with some new processing logic. The
developer then runs a SECOND instance of the job with the new processing logic,
but this time with the offsets forced to 0 (see SAMZA-180). This second
instance should shut down when it has read up to some point AFTER the point from
which the first instance restarted. This can be achieved simply by running the
second instance after the first one has started up, provided that the second
instance can shut itself down by just checking the newest offsets. If the only
setting the developer has is an idle timeout, and the producers run forever,
then the second instance would never shut down.
bq. Are you sure? We've commented above that this kind of batch-oriented use of
Samza doesn't really fit the model well. Do we want to actively encourage
people using the framework in this way? (Not questioning the validity of Alan's
use case, just questioning how common it will be in general.)
Sorry, I don't think I was too clear. The batch-oriented use case for Samza is
certainly less common than just realtime processing, BUT of the batch-oriented
cases, I think most will simply want to do what [~alanwli] is trying to do:
just shut down when all TaskInstances have fully caught up. It seems like a good
thing to support this as a config parameter, so that everyone doesn't have to
copy and paste the same logic into their StreamTasks to implement the
shutdown-on-caught-up use case.
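For illustration, the logic everyone would otherwise be copy-pasting looks roughly like this (a sketch only; getNewestOffsets() is hypothetical, since the point of this ticket is that a task has no clean way to obtain that metadata today):
{code:java}
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.samza.config.Config;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.task.InitableTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;

public class ShutdownWhenCaughtUpTask implements InitableTask, StreamTask {
  private final Map<SystemStreamPartition, String> newestOffsets =
      new HashMap<SystemStreamPartition, String>();
  private final Set<SystemStreamPartition> caughtUp = new HashSet<SystemStreamPartition>();

  @Override
  public void init(Config config, TaskContext context) {
    // Hypothetical: snapshot the newest offset per input partition at startup,
    // e.g. via the TaskContext exposure proposed in this ticket.
    newestOffsets.putAll(getNewestOffsets(context));
  }

  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
      TaskCoordinator coordinator) {
    // ... normal processing ...

    SystemStreamPartition ssp = envelope.getSystemStreamPartition();
    String newest = newestOffsets.get(ssp);
    if (newest != null && newest.equals(envelope.getOffset())) {
      caughtUp.add(ssp);
    }
    if (!newestOffsets.isEmpty() && caughtUp.size() == newestOffsets.size()) {
      // Every input partition has reached the offset that was newest at startup.
      coordinator.shutdown();
    }
  }

  private Map<SystemStreamPartition, String> getNewestOffsets(TaskContext context) {
    // Hypothetical helper: there is currently no supported way to do this from
    // inside a task; that is what SAMZA-179 is asking for.
    return new HashMap<SystemStreamPartition, String>();
  }
}
{code}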
> Support a way to shutdown when a task is caught up
> --------------------------------------------------
>
> Key: SAMZA-179
> URL: https://issues.apache.org/jira/browse/SAMZA-179
> Project: Samza
> Issue Type: Bug
> Components: container
> Affects Versions: 0.6.0
> Reporter: Chris Riccomini
> Assignee: Martin Kleppmann
> Attachments: SAMZA-179-v1.patch, SAMZA-179-v3.patch
>
>
> There is currently no easy way for a task to process all messages from some
> offset all the way to the head of a stream, and then shutdown. This behavior
> is useful in cases where a Samza job is re-processing old data, or for Samza
> jobs that only run periodically.
> Now that we have SAMZA-147 merged in, SamzaContainer has access to the stream
> metadata required to do these operations. We should expose this to the tasks
> in some way, so they can decide when to shut down.
> Two ways that I can think of doing this are:
> 1. Add some sort of mix-in interface (a la InitableTask, ClosableTask, etc)
> that has a callback that's triggered whenever the last message processed by a
> task is equal to metadata.newest for the SystemStreamPartition.
> 2. Expose the SystemStreamMetadata information through the
> InitableTask.init's TaskContext object.
> I favor (2) right now, but we should dig into the code and think through
> other potential solutions.
> Also, something else to consider is that TaskCoordinator.shutdown currently
> immediately shuts down the task. There is an argument to be made that it
> should just mean "cease to process any messages for this task instance", and
> SamzaContainer should only be shutdown when ALL task instances have called
> coordinator.shutdown (or there is an error). This would be useful in
> situations where a StreamTask wishes to shutdown when it's fully caught up to
> head across all partitions. Right now, the first task instance to call
> shutdown (the first one caught up) would shut down the entire SamzaContainer.
> This would shut down partitions that might not be caught up yet.