[ 
https://issues.apache.org/jira/browse/SAMZA-179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13943223#comment-13943223
 ] 

Martin Kleppmann commented on SAMZA-179:
----------------------------------------

bq. The SystemAdmin API has had a lot of churn in it, and I don't think we can 
commit to a user-facing API with it.

Agree, we should keep the surface area of the public API as small as possible.

bq. Another thing to think about is whether there are use cases where a 
StreamTask might want access to all metadata, not just the metadata for its 
partition.

Perhaps for the 'broadcast streams' feature we discussed (where every task 
receives messages from all partitions)?

bq. it would be nice to also have a config value specifically for the use case 
of shutting down when all tasks have fully caught up

Sure, can do that. In order to make sure the job has processed any messages 
that came in while it was catching up, should we make it something like "shut 
down after the newest offset has been reached and no new message has been 
received for x minutes"?

(Actually, that raises the question: would it be ok to approximate "caught up" 
as "has been idle for x minutes"? That would avoid all the issues of giving 
tasks access to stream metadata.)

bq. This is strictly a usability requirement, but it's a very common use case.

Are you sure? We've commented above that this kind of batch-oriented use of 
Samza doesn't really fit the model well. Do we want to actively encourage 
people using the framework in this way?

(Not questioning the validity of Alan's use case, just questioning how common 
it will be in general.)

bq. The ReadbleCoordinator changes are good, but I think the name for it now 
needs to be changed to something like TaskCoordinators, or something.

Sounds good, done on the RB update.

bq. If we decide that the TaskContext should return the latest metadata, I 
think it should call straight through to SystemAdmin.getSystemStreamMetadata as 
you suggest. Can you think of an alternative way of doing it?

Not obviously, I'm afraid.

bq. Regarding TopicMetadtaCache, we actually DO contact the broker for each 
getSystemStreamMetadata call. The topic metadata cache just stores the broker 
locations for each topic.

Oh yes, I completely missed that!

bq. Perhaps we should introduce a generic cache for metadata, and use it for 
all SystemAdmins?

That seems like a good idea to me. I think the code is simpler if all parts of 
the system can just call one thing to get the system metadata (and have it be 
cached there), rather than trying to make the call once and passing the result 
to everyone who needs it.

bq. We could also support a useCache: Boolean argument, which would allow tasks 
to get the "real" latest metadata in cases where they absolutely need it

My gut sense is that a short-term cache (say 5 seconds, like 
TopicMetadataCache) should be good for most applications; if you're depending 
on more frequent metadata than that, you likely have a performance problem 
anyway.

bq. Can we keep TaskCoordinator.shutdown, and have it just call shutdown(true)?

Sounds good.

bq. What do you think about using an enum instead of true/false to make the 
code more readable? Something like shutdown(NOW)/shutdown(WHEN_ALL_FINISH) or 
something?

Good idea. Also gives us more flexibility in case we want to introduce other 
shutdown strategies in future (e.g. shutdown after being idle for some time?).

I've updated the RB https://reviews.apache.org/r/19384/ with the renaming of 
ReadableCoordinator and the shutdown method improvements. I'll look into the 
generic StreamMetadataCache in a separate revision.

> Support a way to shutdown when a task is caught up
> --------------------------------------------------
>
>                 Key: SAMZA-179
>                 URL: https://issues.apache.org/jira/browse/SAMZA-179
>             Project: Samza
>          Issue Type: Bug
>          Components: container
>    Affects Versions: 0.6.0
>            Reporter: Chris Riccomini
>            Assignee: Martin Kleppmann
>         Attachments: SAMZA-179-v1.patch
>
>
> There is currently no easy way for a task to process all messages from some 
> offset all the way to the head of a stream, and then shutdown. This behavior 
> is useful in cases where a Samza job is re-processing old data, or for Samza 
> jobs that only run periodically.
> Now that we have SAMZA-147 merged in, SamzaContainer has access to the stream 
> metadata required to do these operations. We should expose this to the tasks 
> in some way, so they can decide when to shut down.
> Two ways that I can think of doing this are:
> 1. Add some sort of mix-in interface (a la InitableTask, ClosableTask, etc) 
> that has a callback that's triggered whenever the last message processed by a 
> task is equal to metadata.newest for the SystemStreamPartition.
> 2. Expose the SystemStreamMetadata information through the 
> InitableTask.init's TaskContext object.
> I favor (2) right now, but we should dig into the code and think through 
> other potential solutions.
> Also, something else to consider is that TaskCoordinator.shutdown currently 
> immediately shuts down the task. There is an argument to be made that it 
> should just mean "cease to process any messages for this task instance", and 
> SamzaContainer should only be shutdown when ALL task instances have called 
> coordinator.shutdown (or there is an error). This would be useful in 
> situations where a StreamTask wishes to shutdown when it's fully caught up to 
> head across all partitions. Right now, the first task instance to call 
> shutdown (the first one caught up) would shut down the entire SamzaContainer. 
> This would shut down partitions that might not be caught up yet.



--
This message was sent by Atlassian JIRA
(v6.2#6252)

Reply via email to