[ 
https://issues.apache.org/jira/browse/SAMZA-402?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14122155#comment-14122155
 ] 

Martin Kleppmann commented on SAMZA-402:
----------------------------------------

I think this is a useful feature. Some more example use cases for a "map-side 
join" (shared state that is only read by the job):

* A control channel for a job, allowing an administrator to adjust aspects of 
the job at runtime without job restart. For example, a job could be deployed 
with two different algorithms and an A/B-testing facility. An administrator 
could adjust whether algorithm A or algorithm B should be used by updating a 
particular key in the shared store.
* A repository for Avro schemas (see SAMZA-317), where the store contains a 
mapping from schema ID to schema.
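
To make the control-channel use case concrete, here is a minimal sketch. 
Everything in it is an assumption for illustration: SharedStoreReader, 
AbSwitchingTask and the "active-algorithm" key are hypothetical names, not 
part of Samza's API.

```java
import java.util.function.Function;

// Hypothetical read-only view of the shared store (not a Samza interface).
interface SharedStoreReader {
    String get(String key); // returns null when the key is absent
}

// Chooses algorithm A or B per message, based on a key in the shared store.
class AbSwitchingTask {
    // Assumed key name that the administrator updates at runtime.
    static final String ALGORITHM_KEY = "active-algorithm";

    private final SharedStoreReader store;
    private final Function<String, String> algorithmA;
    private final Function<String, String> algorithmB;

    AbSwitchingTask(SharedStoreReader store,
                    Function<String, String> algorithmA,
                    Function<String, String> algorithmB) {
        this.store = store;
        this.algorithmA = algorithmA;
        this.algorithmB = algorithmB;
    }

    String process(String message) {
        // Look the key up on every message so that an update takes effect
        // without a job restart. Anything other than "B" falls back to A.
        String active = store.get(ALGORITHM_KEY);
        return "B".equals(active)
                ? algorithmB.apply(message)
                : algorithmA.apply(message);
    }
}
```

The task only ever reads the store; the administrator's update would reach it 
through whatever mechanism populates the shared store.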

At the moment, if you want to implement those things, you have to do one of the 
following:

* Create a stream for the shared state (with at least as many partitions as any 
other input stream), co-partition that stream with the input streams, and write 
each state update to *all* partitions of that stream (so that it is delivered 
to all StreamTasks in the job). That seems wasteful and ugly.
* Connect to some external system (not managed by Samza), e.g. an external 
database. This increases the number of infrastructure components that need to 
be deployed, makes the job more complicated, and potentially performs worse. 
For example, the job may cache lookups to an external database, but without 
consuming a changelog from that database it won't know when cache entries need 
to be invalidated (you can only use a TTL and cross your fingers). A cold cache 
after container restart will perform badly as every lookup incurs a network 
round-trip.

Both of those suck. Even a simple shared state abstraction (read-only, no 
support for atomic swaps, no special handling of deletions) would make the 
implementation of these kinds of use cases significantly nicer.

If we want to support use cases where a batch job pushes a new version of the 
state that completely replaces the old version, then we would probably need 
atomic swaps and handling of deletions. For that reason, I'm inclined to not 
support such batch updates of shared state. Batch-updated state can continue to 
use Voldemort.

Regarding mutable state shared between StreamTasks: I think this would be a 
dangerous abstraction for the reasons you describe. A true implementation of 
mutable shared state would require a consensus algorithm and would be a 
nightmare.

I think single-writer state would probably be safe (as you describe, using the 
task name as key). However, I would prefer to think of this as a kind of 
asynchronous message passing: one task is sending a message to the other tasks, 
saying "my counter value is now x". Put that way, the key is the "sender" of 
the message.
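
A rough sketch of that message-passing view, with hypothetical names 
(CounterUpdate and CounterReplica are assumptions for illustration, not 
existing Samza classes): each task keeps a local replica that records the 
latest value announced by each sender.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical message meaning "my counter value is now x".
final class CounterUpdate {
    final String senderTaskName; // the key identifies the single writer
    final long counterValue;

    CounterUpdate(String senderTaskName, long counterValue) {
        this.senderTaskName = senderTaskName;
        this.counterValue = counterValue;
    }
}

// A task's local replica of the shared counters, built from received updates.
class CounterReplica {
    private final Map<String, Long> latestBySender = new HashMap<>();

    // A later update from the same sender replaces its earlier value. Since
    // each key has only one writer, there are no conflicting writes to merge.
    void apply(CounterUpdate update) {
        latestBySender.put(update.senderTaskName, update.counterValue);
    }

    // Sum of the most recent value announced by each sender.
    long total() {
        long sum = 0;
        for (long value : latestBySender.values()) {
            sum += value;
        }
        return sum;
    }
}
```

The replica may lag behind the senders, which is exactly the expectation that 
the message-passing framing sets.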

In my opinion, the key-value interface for a shared store should not permit 
writing (calling put() should raise an exception), to avoid setting false 
expectations of synchronous updates and magical distributed consistency. Job 
authors who know what they're doing can still write to the store asynchronously 
by sending a message to the output stream that is the changelog for the store. 
That way the write looks conceptually more like sending a message to the other 
tasks, and less like a state update. But that should be considered advanced 
usage, because it's up to the job author to enforce things like the 
single-writer constraint.
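
As a sketch of what this could look like (all names here are hypothetical: 
ReadOnlySharedStore and InMemoryChangelog are stand-ins for illustration, not 
Samza classes, and the in-memory queue only imitates a changelog stream):

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Queue;

// Hypothetical shared store: reads are served locally, put() is rejected.
class ReadOnlySharedStore {
    private final Map<String, String> local = new HashMap<>();

    String get(String key) {
        return local.get(key);
    }

    void put(String key, String value) {
        throw new UnsupportedOperationException(
                "shared store is read-only; send a message to its changelog");
    }

    // Assumed hook: called by the container when a changelog message arrives.
    void applyChangelogEntry(String key, String value) {
        local.put(key, value);
    }
}

// In-memory stand-in for the output stream that is the store's changelog.
class InMemoryChangelog {
    private final List<ReadOnlySharedStore> replicas = new ArrayList<>();
    private final Queue<String[]> pending = new ArrayDeque<>();

    void subscribe(ReadOnlySharedStore replica) {
        replicas.add(replica);
    }

    // Sending only enqueues the message; no replica has seen it yet.
    void send(String key, String value) {
        pending.add(new String[] {key, value});
    }

    // Delivers queued messages, in order, to every subscribed replica.
    void deliverPending() {
        String[] entry;
        while ((entry = pending.poll()) != null) {
            for (ReadOnlySharedStore replica : replicas) {
                replica.applyChangelogEntry(entry[0], entry[1]);
            }
        }
    }
}
```

Between send() and deliverPending() the store still returns the old value, 
which is the asynchrony a synchronous-looking put() would hide.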

(If this kind of cross-task coordination use case turns out to be common, we 
could consider adding an abstraction on top of shared stores which enforces 
things like "one writer per key". But that can be a separate, future issue.)

On SAMZA-353 we discussed whether the StreamTask should be notified about 
changes in the store. I now think that probably isn't necessary, at least for a 
first version.

In summary: just because certain use cases can't easily be satisfied, we 
shouldn't throw the baby out with the bathwater. I think we should implement a 
simple version of shared state which is read-only and which only supports 
single-key updates (no batch updates, no atomic switching), like you describe 
in the implementation section of the design doc. That would already be very 
useful, and would leave our options open to support more use cases in future.

> Provide a "shared state" store among StreamTasks
> ------------------------------------------------
>
>                 Key: SAMZA-402
>                 URL: https://issues.apache.org/jira/browse/SAMZA-402
>             Project: Samza
>          Issue Type: Bug
>          Components: container, kv
>    Affects Versions: 0.8.0
>            Reporter: Chris Riccomini
>         Attachments: DESIGN-SAMZA-402-0.md, DESIGN-SAMZA-402-0.pdf
>
>
> There has been a lot of discussion about shared state stores in SAMZA-353. 
> Initially, it seemed as though we might implement them through SAMZA-353, but 
> now it seems more preferable to implement them separately. As such, this 
> ticket is to discuss global state/shared state (terms that are being used 
> interchangeably) between StreamTasks.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
