[ https://issues.apache.org/jira/browse/SAMZA-353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14114994#comment-14114994 ]

Chris Riccomini commented on SAMZA-353:
---------------------------------------

Continuing on the "global state only" proposals, I have another idea.

Let's say you have four SamzaContainers. First, introduce the concept of a 
"shared state store". This is a container-level store (vs. a task-level store), 
so there is one per container. This "shared state store" has a change log, 
which has one partition per container (vs. one partition per task, as in the 
task-level store's change log). During the "restore" phase of a SamzaContainer, 
the "shared state store" is restored for *all* partitions in its change log, 
not just a single partition (as is the case with a task-level store). Next, a 
given SamzaContainer should register all "shared state store" change log SSPs 
with the SystemConsumers, except for the partitions that it, itself, produces 
to. The messages from these SSPs should *not* be forwarded to a TaskInstance, 
but should instead be shunted off directly into the "shared state store"'s raw 
DB (likely in RunLoop.process).
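
As a rough sketch, the registration step might look something like the Java 
below. The containerId/numContainers pair and the change log name are 
assumptions, as is the detail that SystemConsumers.register takes an SSP plus 
a starting offset:

    import org.apache.samza.Partition;
    import org.apache.samza.system.SystemConsumers;
    import org.apache.samza.system.SystemStreamPartition;

    public class SharedStoreRegistration {
      // Register every shared-store change log partition except this
      // container's own, so remote writes flow in without the container
      // re-consuming what it produces itself.
      public static void registerChangeLog(SystemConsumers consumers,
                                           String system, String changeLog,
                                           int containerId, int numContainers) {
        for (int p = 0; p < numContainers; p++) {
          if (p == containerId) {
            continue; // this container produces to its own partition
          }
          consumers.register(
            new SystemStreamPartition(system, changeLog, new Partition(p)),
            "oldest"); // assumed: start from the oldest offset
        }
      }
    }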

As a concrete example, consider a job that is configured to have two input 
streams: ip-domain and page-views. The ip-domain stream contains key-value 
pairs, where the key is the first three octets of an IP (e.g. 50.131.31.0), and the 
value is a domain name associated with the IP (e.g. comcast.net). The goal of 
the job is to read the page-views stream, which contains an IP address for 
every page view, and emit the same page-view event to a stream called 
page-views-with-domain with the domain name added into the page-view event.
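
For example (the page-view payload shown here is hypothetical; only the 
IP-to-domain pair comes from the description above):

    ip-domain message:       key = "50.131.31.0", value = "comcast.net"
    page-views message:      { "ip": "50.131.31.7", "page": "/home" }
    page-views-with-domain:  { "ip": "50.131.31.7", "page": "/home",
                               "domain": "comcast.net" }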

To accomplish this, you would have to re-partition the page-views stream to be 
keyed by the IP address's first three octets, so that the join could be done 
against a partitioned store (ip-domain, in this case). This is because Samza 
currently only provides partitioned state stores.

If we implemented the "shared state store" described above, you could skip the 
repartition phase, and do the equivalent of a map-side join by having a single 
ip-domain shared state store, which all StreamTasks would have full access to.

In such a case, the job would end up with some number of containers, say 4 
(identified by IDs 0-3). The job would also be configured with a single "shared 
state store" called "ip-domain-store", which would be configured to have a 
change log with the same name. In this configuration, the "ip-domain-store" 
change log would have 4 partitions (since there are 4 containers). Each 
container would bootstrap all 4 partitions at container start time. Then, each 
container would register all partitions except its own with its 
SystemConsumers: container 0 would call SystemConsumers.register for 
partitions 1, 2, and 3, container 1 would call SystemConsumers.register with 
partitions 0, 2, and 3, and so on.

The StreamTask.process implementation would simply be written to take messages 
from the "ip-domain" stream and write them to the "ip-domain-store". Thus, when 
container 0 receives an "ip-domain" message for 50.131.31.0, it writes a 
message to the "shared state store" with a key of "50.131.31.0" and a value of 
"comcast.net". This message is then logged to partition 0 of the 
"ip-domain-store" change log, which containers 1, 2, and 3 are consuming from. 
Each of these containers receives the message and shunts the write off to its 
own "shared state store". Care must be taken at this step to make sure that the 
container's write does not get replicated to the change log again, as this 
would create a cycle (similar to TaskStorageManager.restore, where the writes 
are sent to rawDb).
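
A minimal sketch of such a StreamTask (Java). It assumes the shared store is 
handed out through the usual TaskContext.getStore call, a map-shaped page-view 
payload with an "ip" field, and a "kafka" output system; none of these details 
are settled above:

    import java.util.Map;
    import org.apache.samza.config.Config;
    import org.apache.samza.storage.kv.KeyValueStore;
    import org.apache.samza.system.IncomingMessageEnvelope;
    import org.apache.samza.system.OutgoingMessageEnvelope;
    import org.apache.samza.system.SystemStream;
    import org.apache.samza.task.InitableTask;
    import org.apache.samza.task.MessageCollector;
    import org.apache.samza.task.StreamTask;
    import org.apache.samza.task.TaskContext;
    import org.apache.samza.task.TaskCoordinator;

    public class IpDomainJoinTask implements StreamTask, InitableTask {
      private static final SystemStream OUTPUT =
        new SystemStream("kafka", "page-views-with-domain");

      private KeyValueStore<String, String> ipDomainStore;

      @SuppressWarnings("unchecked")
      public void init(Config config, TaskContext context) {
        // Assumes the shared store is exposed like any other task store.
        ipDomainStore =
          (KeyValueStore<String, String>) context.getStore("ip-domain-store");
      }

      @SuppressWarnings("unchecked")
      public void process(IncomingMessageEnvelope envelope,
                          MessageCollector collector,
                          TaskCoordinator coordinator) {
        String stream = envelope.getSystemStreamPartition().getStream();
        if ("ip-domain".equals(stream)) {
          // The put is logged to this container's change log partition; the
          // other containers consume it and apply it to their shared stores.
          ipDomainStore.put((String) envelope.getKey(),
                            (String) envelope.getMessage());
        } else { // page-views
          Map<String, Object> pageView =
            (Map<String, Object>) envelope.getMessage();
          String keyPrefix = firstThreeOctets((String) pageView.get("ip"));
          pageView.put("domain", ipDomainStore.get(keyPrefix));
          collector.send(new OutgoingMessageEnvelope(OUTPUT, pageView));
        }
      }

      // e.g. "50.131.31.7" -> "50.131.31.0", matching the ip-domain key format
      private static String firstThreeOctets(String ip) {
        return ip.substring(0, ip.lastIndexOf('.')) + ".0";
      }
    }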

The ip-domain stream should also be configured to have bootstrap=true, and 
offset.default=oldest. This will force the SamzaContainers to start reading the 
ip-domain stream from the oldest offset (i.e. read all of the data in the 
stream), and to fully read the stream (i.e. bootstrap the stream) before 
processing any page-views.
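
In config terms, the bootstrap half of this uses existing stream-level keys; 
the store half is sketched with a hypothetical "shared" flag, since no such 
property exists today:

    # Fully consume ip-domain before processing page-views (existing keys):
    systems.kafka.streams.ip-domain.samza.bootstrap=true
    systems.kafka.streams.ip-domain.samza.offset.default=oldest

    # Proposed shared store; the "shared" flag below is hypothetical:
    stores.ip-domain-store.changelog=kafka.ip-domain-store
    stores.ip-domain-store.shared=true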

The fascinating thing about this approach is that it also enables use cases 
beyond joins. Suppose that you simply want to count all messages in a 
partitioned stream. This could be done by having a shared state store where 
the key is a TaskName, and the value is a count. Each task would increment the 
counter for its key for every message it receives. To get the total number of 
messages, the task then needs to call store.all(), and sum all of the values 
together. This is incredibly useful for doing very common computations such as 
calculating percentages (e.g. "What % of page views are for the search page, 
home page, etc?").

This implementation does start to feel a bit like a special case of a broadcast 
stream (where the broadcast stream is the change log), since tasks are 
communicating with each other within a single job. I haven't yet thought 
through how the two relate.

> Support assigning the same SSP to multiple tasknames
> ----------------------------------------------------
>
>                 Key: SAMZA-353
>                 URL: https://issues.apache.org/jira/browse/SAMZA-353
>             Project: Samza
>          Issue Type: Bug
>          Components: container
>    Affects Versions: 0.8.0
>            Reporter: Jakob Homan
>         Attachments: DESIGN-SAMZA-353-0.md, DESIGN-SAMZA-353-0.pdf
>
>
> Post SAMZA-123, it is possible to add the same SSP to multiple tasknames, 
> although currently we check for this and error out if this is done.  We 
> should think through the implications of having the same SSP appear in 
> multiple tasknames and support this if it makes sense.  
> This could be used as a broadcast stream that's either added by Samza itself 
> to each taskname, or individual groupers could do this as makes sense.  Right 
> now the container maintains a map of SSP to TaskInstance and delivers the SSP 
> to that task instance.  With this change, we'd need to change the map to SSP 
> to Set[TaskInstance] and deliver the message to each TI in the set.


