[ 
https://issues.apache.org/jira/browse/SAMZA-353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14118627#comment-14118627
 ] 

Chris Riccomini commented on SAMZA-353:
---------------------------------------

bq. With global (unpartitioned) state, I think the container should definitely 
continue consuming the stream after the initial bootstrapping. Some jobs may 
have global state that changes continuously, or have periodic (e.g. hourly) 
state updates from an external process. It would be very disruptive if you had 
to keep restarting the job in order to pick up those state changes, or if you 
had to wait for some TTL to expire before the state was refreshed.

I agree. I would actually take this one step further, and consider how we might 
implement atomic swaps of shared state within a container (note: oops, I see 
that you mention this down below). We have an existing pattern where a DB index 
is built offline in Hadoop, pushed to the live site, and then atomically 
swapped in to serve reads. This is the case with Voldemort's read-only stores. 
Even though the existing proposals come close with shared state, there's still 
no easy way to do an atomic swap. The nearest approach that I can think of is 
to somehow have two stores, plus some control message that determines which one 
should be read from at any given point. This seems ugly, though.
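
Something like the following is what I mean by the two-store idea. This is only 
a rough sketch with made-up names (nothing here is existing Samza API): the push 
stream fills a staging copy, reads always hit the live copy, and a control 
message triggers the flip.

{code:java}
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch, not existing Samza API: a double-buffered store wrapper.
// Reads always go to whichever copy is currently live; an end-of-push control
// message flips the reference atomically once the new data is fully loaded.
public class DoubleBufferedStore<K, V> {
  private final AtomicReference<Map<K, V>> live =
      new AtomicReference<>(new ConcurrentHashMap<>());
  private Map<K, V> staging = new ConcurrentHashMap<>();

  // Updates from the push stream land in the staging copy only.
  public void stagePut(K key, V value) {
    staging.put(key, value);
  }

  // Called when the end-of-push control message arrives: expose the staged copy.
  public void swap() {
    live.set(staging);
    staging = new ConcurrentHashMap<>();
  }

  // Readers always see one complete, consistent push.
  public V get(K key) {
    return live.get().get(key);
  }
}
{code}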

Related to this is the problem where you're generating a data set in Hadoop, 
and pushing it through Kafka to your Samza job's shared state store. If you 
generate a feed that has a value for key A, and then future generations of the 
feed don't contain key A, you'll still have the value for key A forever, since 
it's sitting in a log-compacted Kafka topic. The only way around this, that I 
can see, is to have the Hadoop job diff the keys from its latest run against 
the keys from the prior run, and send a message with a null value (a delete) to 
the Kafka topic for each key that has disappeared. This seems ugly as well.
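
For illustration, a rough sketch of that workaround using the Kafka producer 
API; the topic name, and the idea that the Hadoop job exports its key sets, are 
assumptions on my part:

{code:java}
import java.util.Properties;
import java.util.Set;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical sketch: after the latest run, publish a null-valued (tombstone)
// message for every key that was present in the prior run but is now gone, so
// log compaction eventually removes it from the topic.
public class TombstoneEmitter {
  public static void emitDeletes(Set<String> previousKeys,
                                 Set<String> latestKeys,
                                 String topic,
                                 Properties producerConfig) {
    try (KafkaProducer<String, byte[]> producer = new KafkaProducer<>(producerConfig)) {
      for (String key : previousKeys) {
        if (!latestKeys.contains(key)) {
          // A null value is a delete marker on a log-compacted topic.
          producer.send(new ProducerRecord<>(topic, key, null));
        }
      }
      producer.flush();
    }
  }
}
{code}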

bq. Would a "global state" store be read-only, and only be updated by messages 
on the stream? If so, a job could still write to it by sending a message to the 
changelog stream, but the write would only take effect after a round-trip 
through the message broker. I think that's fine.

I had been thinking that it'd be read-write in my latest proposal, but I 
actually think that proposal is broken. In it, I propose having each container 
have a partition for the shared state store, and then having all containers 
except the partition's owner read from it. The problem with this approach is 
that it totally breaks partitioning by key, which every state store needs. The 
reason it breaks this is that if container 0 sends a write for key A to its 
partition, and container 1 does the same, then key A will exist in two 
partitions. Which write wins? There's no ordering across partitions, so reads 
could be non-deterministic.

To fix this, you have to basically arrive at the approach that you propose, I 
think: a read-only store, where the writes are handled by sending messages to a 
stream. The trade-off with this approach is that it makes writes asynchronous, 
which is a little weird. I haven't fully considered the problems that this 
might cause.
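
To make the trade-off concrete, here's a hypothetical interface sketch (the 
names are made up) of what such a store might look like to a task:

{code:java}
// Hypothetical sketch: reads hit the locally materialised replica, while writes
// only produce a message to the store's changelog stream. A write becomes
// visible only after it round-trips through the broker and is applied locally.
public interface GlobalStore<K, V> {
  // Served from the local replica; may lag behind writes still in flight.
  V get(K key);

  // Does not mutate the local replica directly; returns immediately after the
  // message is handed to the producer.
  void sendWrite(K key, V value);
}
{code}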

bq. If the primary interface to the global state is a key-value interface 
(rather than a stream that is consumed), we may still want some way for a 
StreamTask to be notified when something changes in the state. For example, if 
the StreamTask needs to maintain some internal state that is derived from the 
global state, it would be good if every state change was also sent to the 
process() method.

Yeah, this seems like it'd be useful. I think it might also mitigate some of 
the risk of asynchronous writes to the KV store (see above), since you'd be 
notified when your write comes through.
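
Something like the following hypothetical callback (again, made-up names) is 
what I have in mind; the container's changelog consumer would invoke it after 
applying each update:

{code:java}
// Hypothetical sketch: lets a StreamTask refresh derived state, or detect that
// its own asynchronous write has landed, whenever a global store is updated.
public interface GlobalStoreListener<K, V> {
  void onStoreUpdate(String storeName, K key, V newValue);
}
{code}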

bq. An example of a use case that would benefit from a "global stream" model: 
Take the use case of a daily Hadoop job generating some global state, pushing 
it out through a stream, and the stream task performing a map-side join against 
it. While the global state is in the middle of being updated, the stream task 
will see some values from the previous day's state, and some values from the 
current day. In applications where such inconsistency is unacceptable, it would 
be better if each day had a separate copy of the state store, and the stream 
task did an atomic switchover to the new day's state when the push is complete. 
In a "global stream" model, a stream task could implement this atomic 
switchover itself, but in a "global state" model, the switchover would have to 
be part of the framework. And since it's a somewhat application-specific use 
case, we probably wouldn't want to make it part of the framework.

Yes, this use case is problematic. Even if we have "global streams", I think 
the actual StreamTask implementation would be pretty ugly. You'd need either a 
control message topic, or a timeout that defines the edge of a store push.
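
To make the ugliness concrete, this is roughly what the control-message variant 
might look like in a StreamTask, reusing the DoubleBufferedStore wrapper 
sketched above. The stream names are made up, and none of the switchover logic 
exists in the framework today:

{code:java}
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;

// Hypothetical sketch: the task fills a staging copy of the store from the push
// stream and only swaps it in when an end-of-push marker arrives on a separate
// control stream. Uses the DoubleBufferedStore wrapper sketched earlier.
public class SwitchoverTask implements StreamTask {
  private final DoubleBufferedStore<String, byte[]> store = new DoubleBufferedStore<>();

  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
                      TaskCoordinator coordinator) {
    String stream = envelope.getSystemStreamPartition().getStream();
    if ("profile-push".equals(stream)) {
      // Data from the in-progress push goes to the staging copy.
      store.stagePut((String) envelope.getKey(), (byte[]) envelope.getMessage());
    } else if ("profile-push-control".equals(stream)) {
      // End-of-push marker: atomically expose the new day's data.
      store.swap();
    } else {
      // Normal input: join against whichever copy is currently live.
      byte[] profile = store.get((String) envelope.getKey());
      // ... perform the map-side join and emit via collector ...
    }
  }
}
{code}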

bq. I think that global state should use the same message broker communication 
mechanism as we use for all other streams. I think it would be short-sighted to 
introduce a new communication mechanism (e.g. RPC to AM, or querying a remote 
database) for the purpose of global state. There's nothing preventing jobs from 
making RPC/database queries in application code if they want to, but I believe 
that framework-managed state should be a stream (see also comment above on 
continuing to consume a stream after initial bootstrapping).

I agree.

bq. Multi-subscriber partitions: I think the use case you give in the design 
doc (hot standby) is a good and valid one. I see great potential for Samza as a 
framework for maintaining materialised views of data in streams, and making it 
queryable by external processes (cf. SAMZA-316). However, I have been worried 
about the lack of high availability in such an architecture (after container 
failure, its partitions become unavailable for a while as they are 
re-bootstrapped). Thus it would be valuable to have the option of implementing 
hot standby (even with the caveat that you need to ensure that replicas of the 
same partition treat the input messages deterministically).

I agree hot standby seems desirable in the long run. I do question this 
implementation style, though. It seems more elegant to simply implement a hot 
standby by having it consume from the changelog and checkpoint topics for all 
StreamTasks in a container. In such a case, you don't need to have 
multi-subscriber streams. I really struggled to come up with any use case where 
multi-subscriber streams would be desirable.
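
A rough sketch of the alternative I'm describing, using the (modern) Kafka 
consumer API for brevity; the topic name is an assumption, and a real standby 
would also need to track the checkpoint topic:

{code:java}
import java.time.Duration;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.KafkaConsumer;

// Hypothetical sketch: a standby process tails a store's changelog topic and
// keeps a warm replica, without any multi-subscriber support in the container.
public class ChangelogTailer {
  public static void main(String[] args) {
    Properties props = new Properties();
    props.put("bootstrap.servers", "localhost:9092");
    props.put("group.id", "standby-replica");
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
    props.put("value.deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");

    Map<String, byte[]> replica = new HashMap<>();
    try (KafkaConsumer<String, byte[]> consumer = new KafkaConsumer<>(props)) {
      consumer.subscribe(Collections.singletonList("my-store-changelog"));
      while (true) {
        for (ConsumerRecord<String, byte[]> record : consumer.poll(Duration.ofMillis(500))) {
          if (record.value() == null) {
            replica.remove(record.key());       // tombstone
          } else {
            replica.put(record.key(), record.value());
          }
        }
      }
    }
  }
}
{code}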

bq. Regarding ordered offsets: I would prefer a solution which doesn't require 
offsets to be totally ordered. For example, I was thinking about implementing a 
MySQL system consumer (SAMZA-200) in a way that didn't have totally ordered 
offsets. I thought about using the MySQL GTID as offset, which consists of two 
parts: a UUID and a transaction number. The transaction number is sequential, 
but every time the master changes (failover), the transaction number sequence 
is reset and restarted with a new UUID. This means that in order to establish a 
total ordering of GTIDs, you need a lookup table on the side, telling you which 
UUID corresponds to which period of mastership; although there is a total 
ordering of transactions, it is not apparent from the GTIDs themselves.

That's really good to know. I don't think we've had any non-ordered offset 
examples yet, so this is really helpful. All things being equal, I'd prefer the 
more general approach over the more restrictive one.
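
To illustrate the GTID point (borrowing MySQL's source_id:transaction_id 
format), a hypothetical comparator that only works given an external record of 
which UUID held mastership in which order:

{code:java}
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch: GTID offsets like
// "3E11FA47-71CA-11E1-9E33-C80AA9429562:23" are not comparable on their own;
// you need the side lookup table of mastership periods to order them.
public class GtidComparator implements Comparator<String> {
  private final List<String> mastershipOrder;  // server UUIDs, oldest master first

  public GtidComparator(List<String> mastershipOrder) {
    this.mastershipOrder = mastershipOrder;
  }

  @Override
  public int compare(String a, String b) {
    String[] pa = a.split(":");
    String[] pb = b.split(":");
    int epochA = mastershipOrder.indexOf(pa[0]);
    int epochB = mastershipOrder.indexOf(pb[0]);
    if (epochA != epochB) {
      return Integer.compare(epochA, epochB);  // different masters: order by epoch
    }
    return Long.compare(Long.parseLong(pa[1]), Long.parseLong(pb[1]));  // same master
  }
}
{code}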

bq. Regarding the problem of SamzaContainer consuming the same stream at 
different offsets: as a suggestion, we could specify that if several tasks 
within the same container consume the same SSP, they will always consume at the 
same position. That may still require changing MessageChooser, but would 
probably be less intrusive than consuming at multiple offsets or requiring 
totally ordered offsets. For example, multi-subscriber partitions may be 
special-cased in MessageChooser (if one task instance chooses to process a 
message from a multi-subscriber partition, then all tasks which are assigned 
that SSP within that container must also process the message).

I thought this initially, too. The problem that I see is that it's actually 
impossible to enforce this. There are two cases where your StreamTasks can 
diverge within a single container:

# Checkpoints are per-StreamTask, so a failure could occur half-way through 
checkpointing all StreamTasks. Some will be left with the old SSP offset, while 
others will have the new one.
# The container count could change, which would involve moving StreamTasks from 
one container to another. You could end up with some StreamTasks on one offset 
while others are on a different one, since we are only talking about 
container-level lock-step offsets.

This line of thinking is what led me to the ordered offset approach. With 
ordering, we can have a single offset (the oldest one), and just skip 
process() calls for StreamTasks that were ahead.
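
Roughly what I have in mind, as a hypothetical sketch (the Task interface and 
the offset bookkeeping here are made up for illustration):

{code:java}
import java.util.Comparator;
import java.util.Map;

// Hypothetical sketch: the container consumes the shared SSP from the oldest
// checkpointed offset across its tasks, and skips delivery to any task whose
// own checkpoint is already at or past the message being delivered.
public class SharedPartitionDispatcher<M> {
  public interface Task<T> {
    void process(T message);
  }

  private final Map<String, Task<M>> tasksByName;
  private final Map<String, String> lastProcessedOffsets;  // taskName -> offset
  private final Comparator<String> offsetOrder;            // needs totally ordered offsets

  public SharedPartitionDispatcher(Map<String, Task<M>> tasksByName,
                                   Map<String, String> lastProcessedOffsets,
                                   Comparator<String> offsetOrder) {
    this.tasksByName = tasksByName;
    this.lastProcessedOffsets = lastProcessedOffsets;
    this.offsetOrder = offsetOrder;
  }

  public void deliver(String offset, M message) {
    for (Map.Entry<String, Task<M>> entry : tasksByName.entrySet()) {
      String taskOffset = lastProcessedOffsets.get(entry.getKey());
      // Skip tasks that had already processed past this offset before the restart.
      if (taskOffset != null && offsetOrder.compare(offset, taskOffset) <= 0) {
        continue;
      }
      entry.getValue().process(message);
      lastProcessedOffsets.put(entry.getKey(), offset);
    }
  }
}
{code}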

bq. I like the fact that global state can be shared amongst all tasks in the 
same container. That seems like a worthwhile optimisation. This leads me to 
think that there are two kinds of state stores: (a) task-local stores, whose 
changelog stream is only read during container startup, and which thereafter 
are read-writable by one particular task instance; and (b) global 
(container-local) stores, which continually consume the changelog stream, which 
are readable by all tasks in the container, but which are not writable through 
the key-value interface (only asynchronously writable by sending a message to 
the changelog stream).

Agreed. This seems pretty reasonable.

bq. Regarding "shared state store": I'm not keen on using a partition per 
container, because containers are supposed to be purely a unit of computational 
resource, and not have any semantic importance for a job. 
Partition-per-container would violate that principle, and would make it harder 
to change the number of containers used by a job. (Also, restoring from all 
partitions would require some kind of cross-partition ordering.)

You're correct. My most recently proposed approach seems broken in this regard. 
I think your proposed read-only store with async writes is the best we can do.

bq. Allow assigning the same SSP to multiple tasks by removing the check.

I'd rather leave this feature out for now, as it seems almost totally 
orthogonal to global state at this point (as you say at the end of your 
comment).

bq. Add configuration for global state stores, which are read-only, continually 
consume the changelog, and are shared between all tasks in a container.

Yes, this seems to be what I'm arriving at as well.
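
For example, the store configuration might end up looking something like this; 
the "global" flag is hypothetical, while the factory and changelog keys are the 
existing store config:

{code}
# Ordinary task-local store (existing behaviour):
stores.profile-store.factory=org.apache.samza.storage.kv.KeyValueStorageEngineFactory
stores.profile-store.changelog=kafka.profile-store-changelog

# Global store: read-only, continually consumes its changelog, shared by all
# tasks in the container. The "global" flag is a hypothetical new key.
stores.member-data.factory=org.apache.samza.storage.kv.KeyValueStorageEngineFactory
stores.member-data.changelog=kafka.member-data-changelog
stores.member-data.global=true
{code}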

bq. Looking at it this way, the global state and the multi-subscriber 
partitions seem somewhat orthogonal. We could, for example, do only global 
state stores on this ticket, and multi-subscriber partitions separately on 
another ticket.

I agree. I was actually thinking that I'd open up a separate global state 
ticket, and leave this as the multi-subscriber ticket. I'm mostly interested in 
tackling global state right now, not multi-subscriber streams.

The sticking points to think through seem to be:

* How to implement atomic swap.
* Are asynchronous writes a problem in shared state stores?
* Should we implement callbacks for state change on the shared stores? This 
seems hacky.


> Support assigning the same SSP to multiple tasknames
> ----------------------------------------------------
>
>                 Key: SAMZA-353
>                 URL: https://issues.apache.org/jira/browse/SAMZA-353
>             Project: Samza
>          Issue Type: Bug
>          Components: container
>    Affects Versions: 0.8.0
>            Reporter: Jakob Homan
>         Attachments: DESIGN-SAMZA-353-0.md, DESIGN-SAMZA-353-0.pdf
>
>
> Post SAMZA-123, it is possible to add the same SSP to multiple tasknames, 
> although currently we check for this and error out if this is done.  We 
> should think through the implications of having the same SSP appear in 
> multiple tasknames and support this if it makes sense.  
> This could be used as a broadcast stream that's either added by Samza itself 
> to each taskname, or individual groupers could do this as makes sense.  Right 
> now the container maintains a map of SSP to TaskInstance and delivers the ssp 
> to that task instance.  With this change, we'd need to change the map to SSP 
> to Set[TaskInstance] and deliver the message to each TI in the set.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
