[ https://issues.apache.org/jira/browse/SAMZA-353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14116392#comment-14116392 ]

Martin Kleppmann commented on SAMZA-353:
----------------------------------------

Just caught up on the design doc and discussion. Here are some opinionated 
ramblings:

# With global (unpartitioned) state, I think the container should definitely 
continue consuming the stream after the initial bootstrapping. Some jobs may 
have global state that changes continuously, or have periodic (e.g. hourly) 
state updates from an external process. It would be very disruptive if you had 
to keep restarting the job in order to pick up those state changes, or if you 
had to wait for some TTL to expire before the state was refreshed.
# Would a "global state" store be read-only, and only be updated by messages on 
the stream? If so, a job could still write to it by sending a message to the 
changelog stream, but the write would only take effect after a round-trip 
through the message broker. I think that's fine.
# If the primary interface to the global state is a key-value interface (rather 
than a stream that is consumed), we may still want some way for a StreamTask to 
be notified when something changes in the state. For example, if the StreamTask 
needs to maintain some internal state that is derived from the global state, it 
would be good if every state change was also sent to the process() method.
# An example of a use case that would benefit from a "global stream" model: a 
daily Hadoop job generates some global state, pushes it out through a stream, 
and the stream task performs a map-side join against it. While the global 
state is in the middle of being updated, the stream task will see some values 
from the previous day's state and some values from the current day's. In 
applications where such inconsistency is unacceptable, it would 
be better if each day had a separate copy of the state store, and the stream 
task did an atomic switchover to the new day's state when the push is complete. 
In a "global stream" model, a stream task could implement this atomic 
switchover itself, but in a "global state" model, the switchover would have to 
be part of the framework. And since it's a somewhat application-specific use 
case, we probably wouldn't want to make it part of the framework. (There's a 
sketch of such a task-level switchover below this list.)
# I think that global state should use the same message broker communication 
mechanism as we use for all other streams. I think it would be short-sighted to 
introduce a new communication mechanism (e.g. RPC to AM, or querying a remote 
database) for the purpose of global state. There's nothing preventing jobs from 
making RPC/database queries in application code if they want to, but I believe 
that framework-managed state should be a stream (see also comment above on 
continuing to consume a stream after initial bootstrapping).
# Multi-subscriber partitions: I think the use case you give in the design doc 
(hot standby) is a good and valid one. I see great potential for Samza as a 
framework for maintaining materialised views of data in streams, and making it 
queryable by external processes (cf. SAMZA-316). However, I have been worried 
about the lack of high availability in such an architecture (after a container 
fails, its partitions become unavailable for a while as they are 
re-bootstrapped). Thus it would be valuable to have the _option_ of 
implementing hot standby (even with the caveat that you need to ensure that 
replicas of the same partition treat the input messages deterministically).
# Regarding ordered offsets: I would prefer a solution which doesn't require 
offsets to be totally ordered. For example, I was thinking about implementing a 
MySQL system consumer (SAMZA-200) in a way that didn't have totally ordered 
offsets. I thought about using the [MySQL 
GTID|http://dev.mysql.com/doc/refman/5.6/en/replication-gtids-concepts.html] as 
offset, which consists of two parts: a UUID and a transaction number. The 
transaction number is sequential, but every time the master changes (failover), 
the transaction number sequence is reset and restarted with a new UUID. This 
means that in order to establish a total ordering of GTIDs, you need a lookup 
table on the side, telling you which UUID corresponds to which period of 
mastership; although there is a total ordering of transactions, it is not 
apparent from the GTIDs themselves.
# Regarding the problem of SamzaContainer consuming the same stream at 
different offsets: as a suggestion, we could specify that if several tasks 
within the same container consume the same SSP, they will always consume at the 
same position. That may still require changing MessageChooser, but would 
probably be less intrusive than consuming at multiple offsets or requiring 
totally ordered offsets. For example, multi-subscriber partitions may be 
special-cased in MessageChooser (if one task instance chooses to process a 
message from a multi-subscriber partition, then all tasks which are assigned 
that SSP within that container must also process the message).
# I like the fact that global state can be shared amongst all tasks in the same 
container. That seems like a worthwhile optimisation. This leads me to think 
that there are two kinds of state stores: (a) task-local stores, whose 
changelog stream is only read during container startup, and which thereafter 
are read-writable by one particular task instance; and (b) global 
(container-local) stores, which continually consume the changelog stream, which 
are readable by all tasks in the container, but which are not writable through 
the key-value interface (only asynchronously writable by sending a message to 
the changelog stream). This read-only discipline is sketched below the list.
# Regarding "shared state store": I'm not keen on using a partition per 
container, because containers are supposed to be purely a unit of computational 
resource, and not have any semantic importance for a job. 
Partition-per-container would violate that principle, and would make it harder 
to change the number of containers used by a job. (Also, restoring from all 
partitions would require some kind of cross-partition ordering.)
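
To make the switchover in point 4 concrete, here is a minimal sketch of how a 
StreamTask could do it under a "global stream" model. It is illustrative only: 
it assumes the Hadoop push ends with an explicit end-of-batch marker on the 
stream, and the stream name and marker key are made up.

{code:java}
import java.util.HashMap;
import java.util.Map;

import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;

/**
 * Buffers the new day's global state in a staging map and only swaps it in
 * when a (hypothetical) end-of-batch marker arrives, so joins never mix two
 * days' state. Safe because Samza invokes process() single-threaded per task.
 */
public class SwitchoverTask implements StreamTask {
  private Map<String, String> liveState = new HashMap<String, String>();
  private Map<String, String> stagedState = new HashMap<String, String>();

  public void process(IncomingMessageEnvelope envelope,
                      MessageCollector collector,
                      TaskCoordinator coordinator) {
    if ("global-state".equals(envelope.getSystemStreamPartition().getStream())) {
      String key = (String) envelope.getKey();
      if ("__end_of_batch__".equals(key)) {   // hypothetical marker message
        liveState = stagedState;              // the atomic switchover
        stagedState = new HashMap<String, String>();
      } else {
        stagedState.put(key, (String) envelope.getMessage());
      }
    } else {
      // Normal input: join against liveState only, which is always a
      // consistent snapshot of one complete push.
      String joined = liveState.get((String) envelope.getKey());
      // ... produce output using the joined value ...
    }
  }
}
{code}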
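And a sketch of the two-part GTID offset from point 7, to show why it is not 
totally ordered on its own. The class and the epoch lookup table are my own 
illustration, not a proposed API:

{code:java}
import java.util.Map;
import java.util.UUID;

/** A MySQL-style GTID offset: (server UUID, transaction number). */
public class GtidOffset {
  final UUID serverUuid;  // changes on every master failover
  final long txnNumber;   // sequential only within one period of mastership

  public GtidOffset(UUID serverUuid, long txnNumber) {
    this.serverUuid = serverUuid;
    this.txnNumber = txnNumber;
  }

  /**
   * Two GTIDs can only be ordered with a side table mapping each server
   * UUID to its mastership epoch; nothing in the GTIDs themselves says
   * which mastership came first.
   */
  public int compareTo(GtidOffset other, Map<UUID, Integer> mastershipEpoch) {
    int e1 = mastershipEpoch.get(this.serverUuid);
    int e2 = mastershipEpoch.get(other.serverUuid);
    if (e1 != e2) {
      return e1 < e2 ? -1 : 1;
    }
    return this.txnNumber < other.txnNumber ? -1
         : this.txnNumber > other.txnNumber ? 1 : 0;
  }
}
{code}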
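Finally, a sketch of the read-only discipline for global stores from points 2 
and 9. The interface is a simplified stand-in, not Samza's actual 
KeyValueStore API: tasks can read the store, but the only write path is the 
changelog consumer, which keeps tailing the stream after bootstrap.

{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

/** Simplified stand-in for the read side of a key-value store. */
interface ReadableStore<K, V> {
  V get(K key);
}

/**
 * A container-shared global store: readable by all task instances, but
 * mutated only by the framework's changelog consumer. There is deliberately
 * no put() for tasks to call; a task writes by sending a message to the
 * changelog stream and waiting for it to come back around.
 */
class GlobalStore<K, V> implements ReadableStore<K, V> {
  private final Map<K, V> state = new ConcurrentHashMap<K, V>();

  public V get(K key) {
    return state.get(key);
  }

  /** Called only by the changelog consumer thread, never by tasks. */
  void applyChangelogEntry(K key, V value) {
    if (value == null) {
      state.remove(key);  // null value = deletion, as in a compacted topic
    } else {
      state.put(key, value);
    }
  }
}
{code}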

In summary, I would vote for the following:

* Allow assigning the same SSP to multiple tasks by removing the check.
* Keep the restriction that each container consumes each SSP at one position, 
and change MessageChooser to enforce that.
* Add configuration for global state stores, which are read-only, continually 
consume the changelog, and are shared between all tasks in a container.

Looking at it this way, the global state and the multi-subscriber partitions 
seem somewhat orthogonal. We could, for example, do only global state stores on 
this ticket, and multi-subscriber partitions separately on another ticket.
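
For concreteness, the container-side change the ticket describes (and the 
first bullet above) amounts to something like the following sketch. 
TaskInstance here is a stand-in for Samza's internal class of the same name:

{code:java}
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.SystemStreamPartition;

/** Stand-in for the container's internal per-task wrapper. */
interface TaskInstance {
  void process(IncomingMessageEnvelope envelope);
}

/**
 * The SSP-to-task map becomes one-to-many, and a chosen message is delivered
 * to every task instance registered for its SSP.
 */
class ContainerDispatch {
  private final Map<SystemStreamPartition, Set<TaskInstance>> taskMap =
      new HashMap<SystemStreamPartition, Set<TaskInstance>>();

  void register(SystemStreamPartition ssp, TaskInstance task) {
    Set<TaskInstance> tasks = taskMap.get(ssp);
    if (tasks == null) {
      tasks = new HashSet<TaskInstance>();
      taskMap.put(ssp, tasks);
    }
    tasks.add(task);
  }

  void dispatch(IncomingMessageEnvelope envelope) {
    Set<TaskInstance> tasks = taskMap.get(envelope.getSystemStreamPartition());
    if (tasks == null) {
      return;
    }
    for (TaskInstance task : tasks) {
      task.process(envelope);  // same message, every subscribed task
    }
  }
}
{code}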


> Support assigning the same SSP to multiple tasknames
> ----------------------------------------------------
>
>                 Key: SAMZA-353
>                 URL: https://issues.apache.org/jira/browse/SAMZA-353
>             Project: Samza
>          Issue Type: Bug
>          Components: container
>    Affects Versions: 0.8.0
>            Reporter: Jakob Homan
>         Attachments: DESIGN-SAMZA-353-0.md, DESIGN-SAMZA-353-0.pdf
>
>
> Post SAMZA-123, it is possible to add the same SSP to multiple tasknames, 
> although currently we check for this and error out if this is done.  We 
> should think through the implications of having the same SSP appear in 
> multiple tasknames and support this if it makes sense.  
> This could be used as a broadcast stream that's either added by Samza itself 
> to each taskname, or individual groupers could do this as makes sense.  Right 
> now the container maintains a map of SSP to TaskInstance and delivers the 
> SSP's messages to that task instance.  With this change, we'd need to change 
> the map to SSP to Set[TaskInstance] and deliver each message to every TI in 
> the set.


