[https://issues.apache.org/jira/browse/SAMZA-402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14122155#comment-14122155]
Martin Kleppmann commented on SAMZA-402:
----------------------------------------
I think this is a useful feature. Some more example use cases for a "map-side
join" (shared state that is only read by the job):
* A control channel for a job, allowing an administrator to adjust aspects of
the job at runtime without restarting it. For example, a job could be deployed
with two different algorithms and an A/B-testing facility. An administrator
could then choose whether algorithm A or algorithm B is used by updating a
particular key in the shared store.
* A repository for Avro schemas (see SAMZA-317), where the store contains a
mapping from schema ID to schema.
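As a rough illustration of the control-channel case, here is a minimal sketch assuming the shared store is exposed to the task as a plain read-only `Map` that something else keeps up to date. The class `AbTestingTask`, the key `control.algorithm`, and both "algorithms" are hypothetical placeholders, not Samza API.

```java
import java.util.Map;

// Hypothetical sketch: the shared store is modeled as a plain Map kept up to date
// by some other component. This is not Samza's StreamTask or KeyValueStore API.
final class AbTestingTask {
    // Hypothetical control key that an administrator updates to switch algorithms.
    static final String ALGORITHM_KEY = "control.algorithm";

    private final Map<String, String> sharedState;

    AbTestingTask(Map<String, String> sharedState) {
        this.sharedState = sharedState;
    }

    // Called once per input message. The control key is re-read every time, so a
    // change in the shared store takes effect without restarting the job.
    String process(String message) {
        String algorithm = sharedState.getOrDefault(ALGORITHM_KEY, "A");
        return "B".equals(algorithm) ? algorithmB(message) : algorithmA(message);
    }

    // Placeholder "algorithm A": report the message length.
    private String algorithmA(String message) {
        return "A:" + message.length();
    }

    // Placeholder "algorithm B": reverse the message.
    private String algorithmB(String message) {
        return "B:" + new StringBuilder(message).reverse();
    }
}
```

The task never writes the control key itself; it only reads it, which matches the read-only shared state described here.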
At the moment, if you want to implement those things, you have to do one of the
following:
* Create a stream for the shared state (with at least as many partitions as any
other input stream), co-partition that stream with the input streams, and write
each state update to *all* partitions of that stream (so that it is delivered
to all StreamTasks in the job). That seems wasteful and ugly.
* Connect to some external system (not managed by Samza), e.g. an external
database. This increases the number of infrastructure components that need to
be deployed, makes the job more complicated, and potentially performs worse.
For example, the job may cache lookups to an external database, but without
consuming a changelog from that database it won't know when cache entries need
to be invalidated (you can only use a TTL and cross your fingers). A cold cache
after container restart will perform badly as every lookup incurs a network
round-trip.
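To make the cost of the first workaround concrete, here is a small in-memory model of it, assuming partitions are simulated as lists rather than real stream partitions. `BroadcastStateStream` and its methods are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;

// Hypothetical model of the "broadcast" workaround: the partitions of the
// shared-state stream are simulated as in-memory lists, not real stream partitions.
final class BroadcastStateStream {
    private final List<List<Map.Entry<String, String>>> partitions = new ArrayList<>();

    BroadcastStateStream(int numPartitions) {
        for (int i = 0; i < numPartitions; i++) {
            partitions.add(new ArrayList<>());
        }
    }

    // One logical update is written once per partition so that the task consuming
    // each partition sees it. Write volume grows linearly with the partition count.
    void broadcast(String key, String value) {
        Map.Entry<String, String> update = Map.entry(key, value);
        for (List<Map.Entry<String, String>> partition : partitions) {
            partition.add(update);
        }
    }

    // Total number of physical messages written across all partitions.
    int totalMessages() {
        int total = 0;
        for (List<Map.Entry<String, String>> partition : partitions) {
            total += partition.size();
        }
        return total;
    }

    List<Map.Entry<String, String>> partition(int i) {
        return partitions.get(i);
    }
}
```

With 8 partitions, 3 logical updates already turn into 24 physical messages, which is the waste the comment refers to.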
Both of those suck. Even a simple shared state abstraction (read-only, no
support for atomic swaps, no special handling of deletions) would make
implementing these kinds of use cases significantly nicer.
If we want to support use cases where a batch job pushes a new version of the
state that completely replaces the old version, then we would probably need
atomic swaps and handling of deletions. For that reason, I'm inclined to not
support such batch updates of shared state. Batch-updated state can continue to
use Voldemort.
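The reason batch replacement needs atomic swaps and deletion handling can be shown with a short sketch, assuming the store is a `Map`. `SnapshotReplacement` and its methods are hypothetical and only contrast the two update styles; they are not proposed Samza code.

```java
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical illustration (not Samza code) of why replacing a whole version of
// the state needs more than single-key updates.
final class SnapshotReplacement {
    // Single-key updates only: each entry of the new version is put() individually.
    // Keys dropped from the new version are never removed, and a concurrent reader
    // may observe a mix of old and new entries while the loop runs.
    static void applyAsSingleKeyUpdates(Map<String, String> store,
                                        Map<String, String> newVersion) {
        for (Map.Entry<String, String> e : newVersion.entrySet()) {
            store.put(e.getKey(), e.getValue());
        }
    }

    // Atomic swap: readers go through a reference that is switched to a complete
    // new map in one step, so deletions are implied and no mixed state is visible.
    static void applyAsAtomicSwap(AtomicReference<Map<String, String>> current,
                                  Map<String, String> newVersion) {
        current.set(Map.copyOf(newVersion));
    }
}
```

Applying a new version `{a=10}` on top of `{a=1, b=2}` with single-key updates leaves the stale `b=2` behind, while the swap does not.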
Regarding mutable state shared between StreamTasks: I think this would be a
dangerous abstraction for the reasons you describe. A true implementation of
mutable shared state would require a consensus algorithm and would be a
nightmare.
I think single-writer state would probably be safe (as you describe, using the
task name as key). However, I would prefer to think of this as a kind of
asynchronous message passing: one task is sending a message to the other tasks,
saying "my counter value is now x". Put that way, the key is the "sender" of
the message.
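The message-passing view can be sketched as follows, assuming each update is a message keyed by the sending task's name. `CounterView` and the task names used with it are hypothetical; the point is that last-write-wins per key is safe when only one sender ever writes a given key.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: counter updates are messages keyed by the sending task's
// name. Only the named task ever sends under its own key (single writer), so
// last-write-wins per key needs no consensus protocol.
final class CounterView {
    private final Map<String, Long> latestBySender = new HashMap<>();

    // Apply one message: "task <senderTaskName> says its counter is now <counterValue>".
    void onMessage(String senderTaskName, long counterValue) {
        latestBySender.put(senderTaskName, counterValue);
    }

    // Global total as seen by this task. It may lag behind other tasks' latest
    // values, because the messages are delivered asynchronously.
    long total() {
        long sum = 0;
        for (long v : latestBySender.values()) {
            sum += v;
        }
        return sum;
    }
}
```

If `task-0` reports 3, `task-1` reports 5, and `task-0` later reports 7, the view's total is 12: the newer message from the same sender replaces the older one.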
In my opinion, the key-value interface for a shared store should not permit
writing (calling put() should raise an exception), to avoid setting false
expectations of synchronous updates and magical distributed consistency. Job
authors who know what they're doing can still write to the store asynchronously
by sending a message to the output stream that is the changelog for the store.
That way the write looks conceptually more like sending a message to the other
tasks, and less like a state update. But that should be considered advanced
usage, because it's up to the job author to enforce things like the
single-writer constraint.
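A read-only store along these lines could look roughly like the sketch below. `ReadOnlySharedStore` and `applyChangelogMessage` are hypothetical names, not part of Samza's `KeyValueStore` API, and treating a `null` value as a deletion is an assumption of this sketch.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical read-only view of a shared store. Names are illustrative and are
// not part of Samza's KeyValueStore API.
final class ReadOnlySharedStore<K, V> {
    private final Map<K, V> data = new ConcurrentHashMap<>();

    V get(K key) {
        return data.get(key);
    }

    // Writing through the store is rejected, so that nobody expects a synchronous,
    // consistent update across tasks.
    void put(K key, V value) {
        throw new UnsupportedOperationException(
            "shared store is read-only; send the update to its changelog stream instead");
    }

    // Invoked (hypothetically) by the container for each message on the store's
    // changelog stream. Treating a null value as a deletion is an assumption here.
    void applyChangelogMessage(K key, V value) {
        if (value == null) {
            data.remove(key);
        } else {
            data.put(key, value);
        }
    }
}
```

A job author who wants to write would produce a message to the changelog stream; every task's copy then picks it up through `applyChangelogMessage`, not through `put()`.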
(If this kind of cross-task coordination use case turns out to be common, we
could consider adding an abstraction on top of shared stores which enforces
things like "one writer per key". But that can be a separate, future issue.)
On SAMZA-353 we discussed whether the StreamTask should be notified about
changes in the store. I now think that probably isn't necessary, at least for a
first version.
In summary: just because certain use cases can't easily be satisfied, we
shouldn't throw the baby out with the bathwater. I think we should implement a
simple version of shared state which is read-only and which only supports
single-key updates (no batch updates, no atomic switching), like you describe
in the implementation section of the design doc. That would already be very
useful, and leave open our options to support more use cases in future.
> Provide a "shared state" store among StreamTasks
> ------------------------------------------------
>
> Key: SAMZA-402
> URL: https://issues.apache.org/jira/browse/SAMZA-402
> Project: Samza
> Issue Type: Bug
> Components: container, kv
> Affects Versions: 0.8.0
> Reporter: Chris Riccomini
> Attachments: DESIGN-SAMZA-402-0.md, DESIGN-SAMZA-402-0.pdf
>
>
> There has been a lot of discussion about shared state stores in SAMZA-353.
> Initially, it seemed as though we might implement them through SAMZA-353, but
> it now seems preferable to implement them separately. As such, this
> ticket is to discuss global state/shared state (terms that are being used
> interchangeably) between StreamTasks.