[
https://issues.apache.org/jira/browse/SAMZA-179?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13942483#comment-13942483
]
Chris Riccomini commented on SAMZA-179:
---------------------------------------
# The TaskContext.getStreamMetadata API needs to be thought through, since it's a user-facing API and requires backwards compatibility. (A rough sketch of the usage under discussion follows this list.)
# One could argue that we should give direct access to SystemAdmin instead of
making a custom context method. I don't think so. The SystemAdmin API has had a
lot of churn in it, and I don't think we can commit to a user-facing API with
it. Limiting the context's scope to just the stream metadata is much safer, I
think.
# Another thing to think about is whether there are use cases where a
StreamTask might want access to all metadata, not just the metadata for its
partition. Right now, the TaskContext API is at the partition level. I'm
drawing a blank on use cases for this, but I'd bet that there are some. Does
anyone have any?
# Some feedback I've gotten from [~alanwli] is that this API is really useful, and we should keep it, but it would be nice to also have a config value specifically for the use case of shutting down when all tasks have fully caught up. This is strictly a usability requirement, but it's a very common use case. It'd be good to support it with a single config, rather than having every task implement the same logic as the example you've shown (above).
# The ReadableCoordinator changes are good, but I think it now needs a more descriptive name, something like TaskCoordinators. It's basically just managing a collection of coordinators (one for each partition).
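To keep the discussion concrete, here's a rough sketch of the usage I'm imagining, assuming the API shape from the patch (everything here is illustrative, nothing is final):

{code:java}
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.samza.config.Config;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemStreamMetadata.SystemStreamPartitionMetadata;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.task.*;

// A task that processes each of its input SSPs up to head, then asks to stop.
public class CatchUpTask implements InitableTask, StreamTask {
  // Partition-scoped metadata handed out by TaskContext (see point 3 above).
  private Map<SystemStreamPartition, SystemStreamPartitionMetadata> metadata;
  private Set<SystemStreamPartition> caughtUp = new HashSet<SystemStreamPartition>();

  public void init(Config config, TaskContext context) {
    // getStreamMetadata is the proposed (not yet committed) method under discussion.
    metadata = context.getStreamMetadata();
  }

  public void process(IncomingMessageEnvelope envelope, MessageCollector collector, TaskCoordinator coordinator) {
    SystemStreamPartition ssp = envelope.getSystemStreamPartition();
    if (envelope.getOffset().equals(metadata.get(ssp).getNewestOffset())) {
      caughtUp.add(ssp);
    }
    if (caughtUp.size() == metadata.size()) {
      coordinator.shutdown(); // with the patch: shutdown(false), i.e. wait for all tasks
    }
  }
}
{code}

The config in point 4 would exist precisely so that nobody has to hand-roll the caughtUp bookkeeping above.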
bq. Should TaskContext.getStreamMetadata return the current metadata from the
broker, or just the version of the metadata that was fetched when the job was
initialized? BootstrappingChooser just uses the version from job startup, but I
thought that people might use the stream metadata for other things too, and it
would be really confusing if it's not up-to-date.
Intuitively, it feels like it should always return current information. Jakob and I discussed this before as part of SAMZA-157, and I was also concerned about it being a slow operation, which might hurt performance if users abuse it. The cache is helpful from a performance perspective, but it means you're not getting the real metadata all the time, which is also confusing. For example, a task that compares its current offset against a cached newest offset would conclude it's caught up at whatever the head was when the metadata was fetched, even though the stream may have grown since.
bq. Is it ok for TaskContext.getStreamMetadata to just call straight through to
SystemAdmin.getSystemStreamMetadata? It's not ideal, as
TaskContext.getStreamMetadata gets called for each task — however, thanks to
TopicMetadataCache, we don't actually contact the broker for each task.
I think this is related to the first question. If we decide that the
TaskContext should return the latest metadata, I think it should call straight
through to SystemAdmin.getSystemStreamMetadata as you suggest. Can you think of
an alternative way of doing it?
Regarding TopicMetadataCache, we actually DO contact the broker for each
getSystemStreamMetadata call. The topic metadata cache just stores the broker
locations for each topic. We then use the brokers to execute
consumer.getOffsetsBefore, which is a network hop for each broker. The
TopicMetadataCache is a Kafka-specific implementation that just caches broker
leader information. Here's a complete list of what you get in the topic
metadata:
https://github.com/apache/kafka/blob/0.8.1/core/src/main/scala/kafka/api/TopicMetadata.scala
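To make that cost concrete, here's roughly what fetching the newest offset for a single partition looks like against the 0.8 simple consumer, once the TopicMetadataCache has told us who the leader is (error handling and leader failover elided):

{code:java}
import java.util.HashMap;
import java.util.Map;

import kafka.api.PartitionOffsetRequestInfo;
import kafka.common.TopicAndPartition;
import kafka.javaapi.OffsetResponse;
import kafka.javaapi.consumer.SimpleConsumer;

public class NewestOffsetExample {
  public static void main(String[] args) {
    // The TopicMetadataCache only saves us the leader lookup; getOffsetsBefore
    // is still a network round trip to the leader broker on every invocation.
    SimpleConsumer consumer = new SimpleConsumer("leader-host", 9092, 10000, 64 * 1024, "samza-admin");
    TopicAndPartition tp = new TopicAndPartition("my-topic", 0);
    Map<TopicAndPartition, PartitionOffsetRequestInfo> requestInfo =
      new HashMap<TopicAndPartition, PartitionOffsetRequestInfo>();
    requestInfo.put(tp, new PartitionOffsetRequestInfo(kafka.api.OffsetRequest.LatestTime(), 1));
    OffsetResponse response = consumer.getOffsetsBefore(
      new kafka.javaapi.OffsetRequest(requestInfo, kafka.api.OffsetRequest.CurrentVersion(), "samza-admin"));
    long newestOffset = response.offsets("my-topic", 0)[0];
    System.out.println("newest offset: " + newestOffset);
    consumer.close();
  }
}
{code}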
Also, I'm not crazy about each system having to implement their own caching.
Perhaps we should introduce a generic cache for metadata, and use it for all
SystemAdmins? If we went that route, I think I'd be much more comfortable
having the TaskContext method return the latest metadata information, since
we'd be caching for all calls. We could also support a useCache: Boolean
argument, which would allow tasks to get the "real" latest metadata in cases
where they absolutely need it (though this might be over-thinking it).
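Something like this is what I'm picturing for the generic cache (entirely hypothetical; no StreamMetadataCache exists yet):

{code:java}
import java.util.Map;
import java.util.Set;

import org.apache.samza.system.SystemAdmin;
import org.apache.samza.system.SystemStreamMetadata;

// Hypothetical: a system-agnostic cache wrapped around a SystemAdmin, so every
// system gets caching for free instead of rolling its own.
public class StreamMetadataCache {
  private final SystemAdmin admin;
  private final long ttlMs;
  private Map<String, SystemStreamMetadata> cached = null;
  private long lastFetchMs = 0;

  public StreamMetadataCache(SystemAdmin admin, long ttlMs) {
    this.admin = admin;
    this.ttlMs = ttlMs;
  }

  // useCache = false forces a trip to the underlying system, for callers that
  // absolutely need the latest metadata. (Simplification: assumes the same
  // streamNames set on every call.)
  public synchronized Map<String, SystemStreamMetadata> getSystemStreamMetadata(Set<String> streamNames, boolean useCache) {
    long now = System.currentTimeMillis();
    if (!useCache || cached == null || now - lastFetchMs > ttlMs) {
      cached = admin.getSystemStreamMetadata(streamNames);
      lastFetchMs = now;
    }
    return cached;
  }
}
{code}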
bq. I agree that shutdown should wait for all tasks to be caught up, so I've
added a boolean parameter to the shutdown method. force=true keeps the current
semantics, force=false waits until all tasks have requested shutdown before
actually doing it. A task may continue processing messages, even after it has
requested shutdown (can't really do anything else – can't throw them away!).
Can we keep TaskCoordinator.shutdown, and have it just call shutdown(true)?
Deleting the method is backwards incompatible.
What do you think about using an enum instead of true/false to make the code more readable? Something like shutdown(NOW)/shutdown(WHEN_ALL_FINISH)?
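Strawman for the enum version (just a sketch; keeping the no-arg method preserves compatibility):

{code:java}
// Strawman only; names up for debate.
public interface TaskCoordinator {
  enum ShutdownRequest {
    NOW,             // current semantics: stop the container immediately
    WHEN_ALL_FINISH  // wait until every task instance has requested shutdown
  }

  void shutdown(ShutdownRequest request);

  // Retained for backwards compatibility; equivalent to shutdown(NOW).
  void shutdown();
}
{code}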
I'm not crazy about a StreamTask having to keep processing messages after calling shutdown. I think we can avoid processing messages after shutdown by:
1. Removing all SystemStreamPartitions from SystemConsumers for partitions
where the StreamTask has called shutdown(false).
2. Disregarding any new messages chosen by the MessageChooser for any shutdown
partition.
3. Stopping all window() calls for the partition.
That said, it's a really intrusive and error-prone change (I've surely missed
some stuff in the list above), and I think we should punt on it for now. We
should open a separate JIRA to track it.
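For the record, the guard in step (2) might look something like this inside the container's run loop (a hypothetical fragment, not real SamzaContainer code; all names made up):

{code:java}
// Hypothetical fragment of the run loop:
IncomingMessageEnvelope envelope = consumerMultiplexer.choose();
if (envelope != null) {
  Partition partition = envelope.getSystemStreamPartition().getPartition();
  if (shutdownRequested.contains(partition)) {
    // Step 2: disregard messages for partitions that called shutdown(false).
  } else {
    taskInstances.get(partition).process(envelope, coordinator);
  }
}
{code}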
In summary, my thoughts:
0. Introduce a StreamMetadataCache that uses SystemAdmins to cache metadata
information.
1. Have TaskContext.getStreamMetadata call the StreamMetadataCache under the hood to fetch the latest (cached) metadata.
2. Add a parameter to TaskContext.getStreamMetadata to toggle usage of cache,
so users can go directly to the underlying system to get the latest metadata if
they need it.
3. Open a JIRA to track disabling message processing for a task after it's
called shutdown(false).
4. Get more feedback from others on the TaskContext.getStreamMetadata API,
since it's user-facing, and a big commitment to backwards compatibility.
5. Rename ReadableCoordinator to something more descriptive of what it does.
> Support a way to shutdown when a task is caught up
> --------------------------------------------------
>
> Key: SAMZA-179
> URL: https://issues.apache.org/jira/browse/SAMZA-179
> Project: Samza
> Issue Type: Bug
> Components: container
> Affects Versions: 0.6.0
> Reporter: Chris Riccomini
> Assignee: Martin Kleppmann
> Attachments: SAMZA-179-v1.patch
>
>
> There is currently no easy way for a task to process all messages from some
> offset all the way to the head of a stream, and then shutdown. This behavior
> is useful in cases where a Samza job is re-processing old data, or for Samza
> jobs that only run periodically.
> Now that we have SAMZA-147 merged in, SamzaContainer has access to the stream
> metadata required to do these operations. We should expose this to the tasks
> in some way, so they can decide when to shut down.
> Two ways that I can think of doing this are:
> 1. Add some sort of mix-in interface (a la InitableTask, ClosableTask, etc)
> that has a callback that's triggered whenever the last message processed by a
> task is equal to metadata.newest for the SystemStreamPartition.
> 2. Expose the SystemStreamMetadata information through the
> InitableTask.init's TaskContext object.
> I favor (2) right now, but we should dig into the code and think through
> other potential solutions.
> Also, something else to consider is that TaskCoordinator.shutdown currently
> immediately shuts down the task. There is an argument to be made that it
> should just mean "cease to process any messages for this task instance", and
> SamzaContainer should only be shutdown when ALL task instances have called
> coordinator.shutdown (or there is an error). This would be useful in
> situations where a StreamTask wishes to shutdown when it's fully caught up to
> head across all partitions. Right now, the first task instance to call
> shutdown (the first one caught up) would shut down the entire SamzaContainer.
> This would shut down partitions that might not be caught up yet.