[
https://issues.apache.org/jira/browse/SAMZA-353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14118627#comment-14118627
]
Chris Riccomini commented on SAMZA-353:
---------------------------------------
bq. With global (unpartitioned) state, I think the container should definitely
continue consuming the stream after the initial bootstrapping. Some jobs may
have global state that changes continuously, or have periodic (e.g. hourly)
state updates from an external process. It would be very disruptive if you had
to keep restarting the job in order to pick up those state changes, or if you
had to wait for some TTL to expire before the state was refreshed.
I agree. I would actually take this one step further, and consider how we might
implement atomic swaps of shared state within a container (note: oops, I see
that you mention this down below). We have an existing pattern where a DB index
is built offline in Hadoop, pushed to the live site, and then atomically
swapped in to enable reads. This is the case with Voldemort's read-only stores.
Even though the existing proposals come close with shared state, there's still
no easy way to do an atomic swap. The nearest approach I can think of is to
have two stores, plus some control message that determines which one should be
read from at any given point. This seems ugly, though.
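To make that two-store idea slightly more concrete, here's a rough sketch of
what the swap could look like in a StreamTask. Everything here is made up for
illustration (store names, the control stream, the message handling); it's not
an existing Samza feature, just the shape of the hack:
{code:java}
// Rough sketch of the two-store swap idea. Store names, the control stream, and the
// swap protocol are all hypothetical.
import org.apache.samza.config.Config;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.task.InitableTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;

public class DoubleBufferedJoinTask implements StreamTask, InitableTask {
  private KeyValueStore<String, String> storeA;
  private KeyValueStore<String, String> storeB;
  private KeyValueStore<String, String> active;

  @Override
  @SuppressWarnings("unchecked")
  public void init(Config config, TaskContext context) {
    storeA = (KeyValueStore<String, String>) context.getStore("shared-state-a");
    storeB = (KeyValueStore<String, String>) context.getStore("shared-state-b");
    active = storeA;
  }

  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
                      TaskCoordinator coordinator) {
    String stream = envelope.getSystemStreamPartition().getStream();
    if ("store-swap-control".equals(stream)) {
      // Control message: the offline push into the inactive store is complete, so flip
      // which store serves reads.
      active = (active == storeA) ? storeB : storeA;
      return;
    }
    // Normal path: read against whichever store is currently active.
    String value = active.get((String) envelope.getKey());
    // ... join the incoming message against 'value' ...
  }
}
{code}
The ugly part is exactly what's described above: something external has to know
which store is currently inactive, push the new data set into it, and then emit
the control message.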
Related to this is the problem where you're generating a data set in Hadoop,
and pushing it through Kafka to your Samza job's shared state store. If you
generate a feed that has a value for key A, and then future generations of the
feed don't contain key A, you'll still have the value for key A forever, since
it's sitting in a log-compacted Kafka topic. The only way around this, that I
can see, is to have the Hadoop job diff the keys from its latest run against
the keys from the prior run, and send a message for key A with a null value (a
delete) to the Kafka topic. This seems ugly as well.
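For what it's worth, that workaround boils down to a set difference plus
null-valued sends. A minimal sketch, assuming string keys, a hypothetical topic
name, and the plain Kafka producer API:
{code:java}
// Sketch of the "diff and delete" workaround: emit a null-valued (tombstone) record for
// every key that was in the previous run but not the latest one, so log compaction can
// eventually drop it. Topic name and key sets are hypothetical.
import java.util.HashSet;
import java.util.Set;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class TombstoneEmitter {
  public static void emitDeletes(Set<String> previousKeys, Set<String> latestKeys,
                                 KafkaProducer<String, byte[]> producer, String topic) {
    Set<String> removed = new HashSet<String>(previousKeys);
    removed.removeAll(latestKeys);
    for (String key : removed) {
      // A null value is a delete marker for a log-compacted topic.
      producer.send(new ProducerRecord<String, byte[]>(topic, key, null));
    }
  }
}
{code}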
bq. Would a "global state" store be read-only, and only be updated by messages
on the stream? If so, a job could still write to it by sending a message to the
changelog stream, but the write would only take effect after a round-trip
through the message broker. I think that's fine.
I had been thinking that it'd be read-write in my latest proposal, but I
actually think that proposal is broken. In it, I propose having each container
have a partition for the shared state store, and then having all containers
except the partition's owner read from it. The problem with this approach is
that it totally breaks partitioning by key, which every state store needs. It
breaks this because if container 0 sends a write for key A to its partition,
and container 1 does the same, then key A will exist in two partitions. Which
one should be read as the latest value? There's no ordering between partitions,
so reads could be non-deterministic.
To fix this, you have to basically arrive at the approach that you propose, I
think: a read-only store, where the writes are handled by sending messages to a
stream. The trade-off with this approach is that it makes writes asynchronous,
which is a little weird. I haven't fully considered the problems that this
might cause.
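To spell out what such an asynchronous write would look like, a minimal sketch
(store and changelog names are made up):
{code:java}
// Minimal sketch of an "asynchronous write" under the read-only store proposal: the task
// never calls put() on the store, it just sends the new value to the store's changelog
// stream, and the value becomes visible only after the broker round trip. Names are
// hypothetical.
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;

public class AsyncGlobalWrite {
  private static final SystemStream GLOBAL_CHANGELOG =
      new SystemStream("kafka", "my-global-store-changelog");

  /**
   * The "write" is just a message send; a get() on the global store will keep returning
   * the old value until the container has consumed this message back from the changelog.
   */
  public static void write(MessageCollector collector, String key, String value) {
    collector.send(new OutgoingMessageEnvelope(GLOBAL_CHANGELOG, key, value));
  }
}
{code}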
bq. If the primary interface to the global state is a key-value interface
(rather than a stream that is consumed), we may still want some way for a
StreamTask to be notified when something changes in the state. For example, if
the StreamTask needs to maintain some internal state that is derived from the
global state, it would be good if every state change was also sent to the
process() method.
Yea, this seems like it'd be useful. I think it also might fix some of the risk
of having an asynchronous write to the KV store (see above), since you could be
notified when your write comes through.
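One possible shape for that notification, sketched as a hypothetical callback
interface (nothing like this exists today; simply routing the change through
process(), as quoted above, is the other obvious option):
{code:java}
// Hypothetical callback for global-store change notification; illustrative only.
public interface GlobalStoreListener<K, V> {
  /**
   * Called after the entry for 'key' in the named global store has been updated from its
   * changelog stream (including the task's own asynchronous writes coming back around).
   */
  void onGlobalStoreUpdate(String storeName, K key, V newValue);
}
{code}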
bq. An example of a use case that would benefit from a "global stream" model:
Take the use case of a daily Hadoop job generating some global state, pushing
it out through a stream, and the stream task performing a map-side join against
it. While the global state is in the middle of being updated, the stream task
will see some values from the previous day's state, and some values from the
current day. In applications where such inconsistency is unacceptable, it would
be better if each day had a separate copy of the state store, and the stream
task did an atomic switchover to the new day's state when the push is complete.
In a "global stream" model, a stream task could implement this atomic
switchover itself, but in a "global state" model, the switchover would have to
be part of the framework. And since it's a somewhat application-specific use
case, we probably wouldn't want to make it part of the framework.
Yes, this use case is problematic. Even if we have "global streams", I think
the actual StreamTask implementation is pretty ugly. You'd have to have either
a control message topic, or a timeout that defines the edge of a store push.
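To illustrate why it's ugly, here's roughly what the timeout variant could look
like, using a WindowableTask to decide that a push has finished. The stream
name, timeout, and state-swap details are all invented:
{code:java}
// Illustrative sketch of "a timeout defines the edge of a store push": track the time of
// the last message seen on the (hypothetical) global stream, and treat a long enough gap
// as the end of the daily push, at which point the task swaps to the new state copy.
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;
import org.apache.samza.task.WindowableTask;

public class TimeoutSwapTask implements StreamTask, WindowableTask {
  private static final long PUSH_EDGE_TIMEOUT_MS = 5 * 60 * 1000; // assumed gap length

  private long lastGlobalMessageMs = Long.MIN_VALUE;
  private boolean pushInFlight = false;

  @Override
  public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
                      TaskCoordinator coordinator) {
    if ("daily-global-push".equals(envelope.getSystemStreamPartition().getStream())) {
      lastGlobalMessageMs = System.currentTimeMillis();
      pushInFlight = true;
      // ... stage the incoming record into the "next" copy of the state ...
    } else {
      // ... join the message against the currently active copy of the state ...
    }
  }

  @Override
  public void window(MessageCollector collector, TaskCoordinator coordinator) {
    // Called periodically (task.window.ms). If the global stream has gone quiet for long
    // enough, assume the push has finished and atomically switch to the new state copy.
    if (pushInFlight && System.currentTimeMillis() - lastGlobalMessageMs > PUSH_EDGE_TIMEOUT_MS) {
      pushInFlight = false;
      // ... swap the active state to the newly staged copy ...
    }
  }
}
{code}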
bq. I think that global state should use the same message broker communication
mechanism as we use for all other streams. I think it would be short-sighted to
introduce a new communication mechanism (e.g. RPC to AM, or querying a remote
database) for the purpose of global state. There's nothing preventing jobs from
making RPC/database queries in application code if they want to, but I believe
that framework-managed state should be a stream (see also comment above on
continuing to consume a stream after initial bootstrapping).
I agree.
bq. Multi-subscriber partitions: I think the use case you give in the design
doc (hot standby) is a good and valid one. I see great potential for Samza as a
framework for maintaining materialised views of data in streams, and making it
queryable by external processes (cf. SAMZA-316). However, I have been worried
about the lack of high availability in such an architecture (after container
failure, its partitions become unavailable for a while as they are
re-bootstrapped). Thus it would be valuable to have the option of implementing
hot standby (even with the caveat that you need to ensure that replicas of the
same partition treat the input messages deterministically).
I agree hot standby seems desirable in the long run. I do question this
implementation style, though. It seems more elegant to implement a hot standby
by having the standby container consume the changelog and checkpoint topics for
all StreamTasks in the container it mirrors. In that case, you don't need
multi-subscriber streams at all. I really struggled to come up with any use
case where multi-subscriber streams would be desirable.
bq. Regarding ordered offsets: I would prefer a solution which doesn't require
offsets to be totally ordered. For example, I was thinking about implementing a
MySQL system consumer (SAMZA-200) in a way that didn't have totally ordered
offsets. I thought about using the MySQL GTID as offset, which consists of two
parts: a UUID and a transaction number. The transaction number is sequential,
but every time the master changes (failover), the transaction number sequence
is reset and restarted with a new UUID. This means that in order to establish a
total ordering of GTIDs, you need a lookup table on the side, telling you which
UUID corresponds to which period of mastership; although there is a total
ordering of transactions, it is not apparent from the GTIDs themselves.
That's really good to know. I don't think we've had any non-ordered offset
examples yet, so this is really helpful. All things being equal, I'd prefer the
more general approach over the more restrictive one.
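To make the GTID point concrete: ordering them requires a side table from
source UUID to its period of mastership, along the lines of this purely
illustrative comparator:
{code:java}
// Why GTID-style offsets aren't totally ordered on their own: comparing two of them needs
// a side table mapping each source UUID to its "epoch" of mastership. Illustrative only.
import java.util.Comparator;
import java.util.Map;
import java.util.UUID;

public class GtidOffsetComparator implements Comparator<GtidOffsetComparator.Gtid> {

  public static class Gtid {
    final UUID sourceId;      // changes on every master failover
    final long transactionId; // sequential, but restarts under each new sourceId

    public Gtid(UUID sourceId, long transactionId) {
      this.sourceId = sourceId;
      this.transactionId = transactionId;
    }
  }

  // Side table: which period of mastership each UUID corresponds to. Without it, two
  // GTIDs with different UUIDs cannot be ordered by inspection alone.
  private final Map<UUID, Integer> mastershipEpoch;

  public GtidOffsetComparator(Map<UUID, Integer> mastershipEpoch) {
    this.mastershipEpoch = mastershipEpoch;
  }

  @Override
  public int compare(Gtid a, Gtid b) {
    int byEpoch = mastershipEpoch.get(a.sourceId).compareTo(mastershipEpoch.get(b.sourceId));
    if (byEpoch != 0) {
      return byEpoch;
    }
    return Long.compare(a.transactionId, b.transactionId);
  }
}
{code}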
bq. Regarding the problem of SamzaContainer consuming the same stream at
different offsets: as a suggestion, we could specify that if several tasks
within the same container consume the same SSP, they will always consume at the
same position. That may still require changing MessageChooser, but would
probably be less intrusive than consuming at multiple offsets or requiring
totally ordered offsets. For example, multi-subscriber partitions may be
special-cased in MessageChooser (if one task instance chooses to process a
message from a multi-subscriber partition, then all tasks which are assigned
that SSP within that container must also process the message).
I thought this initially, too. The problem that I see is that it's actually
impossible to enforce this. There are two cases where your StreamTasks can
diverge within a single container:
# Checkpoints are per-StreamTask, so a failure could occur half-way through
checkpointing all StreamTasks. Some would be left with the old SSP offset,
while others have the new one.
# The container count could change, which would involve moving StreamTasks from
one container to another. You could end up with some StreamTasks on one offset
while others are on a different one, since we are only talking about
container-level lock-step offsets.
This line of thinking is what led me to the ordered offset approach. With
ordering, we can have a single offset (the oldest one), and just skip process()
calls for StreamTasks that were ahead.
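Concretely, with comparable offsets the container could resume the shared SSP
from the minimum checkpointed offset and suppress delivery to any task whose
checkpoint is already past a given message, roughly like this simplified
sketch:
{code:java}
// Simplified sketch of the "resume from the oldest offset, skip tasks that are ahead"
// idea, assuming the SSP's offsets are comparable (e.g. Kafka offsets as longs). The real
// container logic would obviously be more involved.
import java.util.Collection;
import java.util.Collections;

public class LockStepDelivery {
  /** The container resumes the shared SSP from the lowest checkpointed offset. */
  public static long resumeOffset(Collection<Long> checkpointedOffsets) {
    return Collections.min(checkpointedOffsets);
  }

  /**
   * A message at 'offset' is delivered to a task only if that task's checkpoint has not
   * already moved past it; otherwise the process() call is skipped.
   */
  public static boolean shouldDeliver(long offset, long taskCheckpointedOffset) {
    return offset >= taskCheckpointedOffset;
  }
}
{code}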
bq. I like the fact that global state can be shared amongst all tasks in the
same container. That seems like a worthwhile optimisation. This leads me to
think that there are two kinds of state stores: (a) task-local stores, whose
changelog stream is only read during container startup, and which thereafter
are read-writable by one particular task instance; and (b) global
(container-local) stores, which continually consume the changelog stream, which
are readable by all tasks in the container, but which are not writable through
the key-value interface (only asynchronously writable by sending a message to
the changelog stream).
Agreed. This seems pretty reasonable.
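For illustration only, the configuration for the two kinds of stores might end
up looking something like the sketch below. The stores.*.factory and
stores.*.changelog keys follow the existing config; the "global" flag is
invented here as a stand-in for whatever setting we'd actually add:
{code:java}
// Hypothetical configuration for the two kinds of stores, expressed as a MapConfig purely
// for illustration (in practice this lives in the job's .properties file).
import java.util.HashMap;
import java.util.Map;

import org.apache.samza.config.Config;
import org.apache.samza.config.MapConfig;

public class StoreConfigSketch {
  public static Config storeConfig() {
    Map<String, String> c = new HashMap<String, String>();
    // (a) Task-local store: changelog read only at container startup, then read-writable
    //     by its owning task instance.
    c.put("stores.my-task-store.factory",
        "org.apache.samza.storage.kv.KeyValueStorageEngineFactory");
    c.put("stores.my-task-store.changelog", "kafka.my-task-store-changelog");
    // (b) Global (container-local) store: keeps consuming its changelog, readable by all
    //     tasks in the container, writable only by sending to the changelog stream.
    c.put("stores.my-global-store.factory",
        "org.apache.samza.storage.kv.KeyValueStorageEngineFactory");
    c.put("stores.my-global-store.changelog", "kafka.my-global-store-changelog");
    c.put("stores.my-global-store.global", "true"); // hypothetical flag
    return new MapConfig(c);
  }
}
{code}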
bq. Regarding "shared state store": I'm not keen on using a partition per
container, because containers are supposed to be purely a unit of computational
resource, and not have any semantic importance for a job.
Partition-per-container would violate that principle, and would make it harder
to change the number of containers used by a job. (Also, restoring from all
partitions would require some kind of cross-partition ordering.)
You're correct. My most recently proposed approach seems broken in this regard.
I think your proposed read-only store with asynchronous writes is the best we
can do.
bq. Allow assigning the same SSP to multiple tasks by removing the check.
I'd rather leave this feature out for now, as it seems almost totally
orthogonal to global state at this point (as you say at the end of your
comment).
bq. Add configuration for global state stores, which are read-only, continually
consume the changelog, and are shared between all tasks in a container.
Yes, this seems to be what I'm arriving at as well.
bq. Looking at it this way, the global state and the multi-subscriber
partitions seem somewhat orthogonal. We could, for example, do only global
state stores on this ticket, and multi-subscriber partitions separately on
another ticket.
I agree. I was actually thinking that I'd open up a separate global state
ticket, and leave this as the multi-subscriber ticket. I'm mostly interested in
tackling global state right now, not multi-subscriber streams.
The sticking points to think through seem to be:
* How to implement atomic swap.
* Whether asynchronous writes are a problem in shared state stores.
* Whether we should implement callbacks for state changes on the shared stores
(this seems hacky).
> Support assigning the same SSP to multiple tasknames
> ----------------------------------------------------
>
> Key: SAMZA-353
> URL: https://issues.apache.org/jira/browse/SAMZA-353
> Project: Samza
> Issue Type: Bug
> Components: container
> Affects Versions: 0.8.0
> Reporter: Jakob Homan
> Attachments: DESIGN-SAMZA-353-0.md, DESIGN-SAMZA-353-0.pdf
>
>
> Post SAMZA-123, it is possible to add the same SSP to multiple tasknames,
> although currently we check for this and error out if this is done. We
> should think through the implications of having the same SSP appear in
> multiple tasknames and support this if it makes sense.
> This could be used as a broadcast stream that's either added by Samza itself
> to each taskname, or individual groupers could do this as makes sense. Right
> now the container maintains a map of SSP to TaskInstance and delivers the ssp
> to that task instance. With this change, we'd need to change the map to SSP
> to Set[TaskInstance] and deliver the message to each TI in the set.