[
https://issues.apache.org/jira/browse/SAMZA-353?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14080315#comment-14080315
]
Chris Riccomini commented on SAMZA-353:
---------------------------------------
I've been thinking about this a bit and poking around the code base.
I see two potential issues:
# If a single message can be sent to multiple TaskInstances, each of them
will have its own offset for the message's SSP. Today these offsets would be
checkpointed independently of one another, so after a restart a single
SystemConsumers instance could be asked to register the same SSP with several
different offsets.
# The envelope returned by MessageChooser.choose maps 1:1 to a single
TaskInstance. There is currently no way for a MessageChooser to say "process
this broadcast message for TaskInstance 7, but not TaskInstance 4" in the
container.
I'm not concerned about (2) because I can't come up with a really good reason
to support it, and if we wanted to, we could change the MessageChooser API
fairly easily.
(1) is a bit of a problem, though. One solution could be to iterate over all
SSP:offset mappings that we're going to register with the SystemConsumers, and
always register with only the smallest offset. If TaskInstance-1 had offset 7
for BroadcastStream1-partition-0 and TaskInstance-2 had offset 4 for
BroadcastStream1-partition-0, then the container would only register the
SystemConsumers with offset 4. This would mean that some TaskInstances would
experience duplication while others would not. That would be fine if we never
intended to guarantee exactly-once messaging in the future.
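A minimal Java sketch of the smallest-offset idea, assuming offsets can be compared as longs (true for Kafka, but an ordering Samza doesn't currently commit to) and modeling SSPs as plain strings. MinOffsetRegistration and offsetsToRegister are hypothetical names, not Samza APIs:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: SSPs are plain strings such as "BroadcastStream1-partition-0"
// and offsets are longs. Real Samza offsets are opaque strings with no promised order.
public class MinOffsetRegistration {

    /**
     * Collapses per-TaskInstance checkpoints into one starting offset per SSP
     * by keeping the smallest offset seen for each SSP.
     *
     * @param checkpoints task name -> (SSP -> checkpointed offset)
     * @return SSP -> offset to register with the container's SystemConsumers
     */
    public static Map<String, Long> offsetsToRegister(
            Map<String, Map<String, Long>> checkpoints) {
        Map<String, Long> result = new HashMap<>();
        for (Map<String, Long> taskOffsets : checkpoints.values()) {
            for (Map.Entry<String, Long> e : taskOffsets.entrySet()) {
                // Long::min keeps the earliest offset, so no task skips messages.
                result.merge(e.getKey(), e.getValue(), Long::min);
            }
        }
        return result;
    }

    public static void main(String[] args) {
        Map<String, Map<String, Long>> checkpoints = new HashMap<>();
        checkpoints.put("TaskInstance-1", Map.of("BroadcastStream1-partition-0", 7L));
        checkpoints.put("TaskInstance-2", Map.of("BroadcastStream1-partition-0", 4L));
        // prints {BroadcastStream1-partition-0=4}
        System.out.println(offsetsToRegister(checkpoints));
    }
}
```

Taking the minimum guarantees that no TaskInstance misses a message, at the cost of replaying messages to any task whose checkpoint was further ahead.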
Transactional checkpointing within a container won't fix this approach either.
Consider the case where TaskInstance-1 in the previous example was running in a
separate container from TaskInstance-2 prior to a job restart. Each container
could commit its checkpoints transactionally so that every TaskInstance inside
it has the same offset, but transactions won't help when the commit has to be
coordinated across containers.
One workaround for the duplication in this scenario would be to have the
container filter out messages that are about to be processed by a TaskInstance
that has already seen them. In the example above, the container knows that
TaskInstance-1 has seen offset 7, so it could simply filter out any messages
from BroadcastStream1-partition-0 for TaskInstance-1 whose offset is <= 7.
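A rough sketch of that container-side filter, again assuming long-valued offsets that increase within an SSP. SeenOffsetFilter, restore, and shouldDeliver are hypothetical names used only for illustration:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of per-TaskInstance duplicate filtering. Assumes offsets are
// longs that increase within an SSP (true for Kafka, not promised by Samza).
public class SeenOffsetFilter {
    // "taskName|ssp" -> highest offset that task has already processed.
    // (String concatenation as a key is a simplification for the sketch.)
    private final Map<String, Long> lastSeen = new HashMap<>();

    private static String key(String taskName, String ssp) {
        return taskName + "|" + ssp;
    }

    /** Records the offset restored from a task's checkpoint. */
    public void restore(String taskName, String ssp, long checkpointedOffset) {
        lastSeen.put(key(taskName, ssp), checkpointedOffset);
    }

    /**
     * Returns true if the message should be delivered to the task, i.e. its offset
     * is strictly greater than anything the task has already seen. Delivered
     * offsets are recorded, so later duplicates are dropped as well.
     */
    public boolean shouldDeliver(String taskName, String ssp, long offset) {
        String k = key(taskName, ssp);
        Long seen = lastSeen.get(k);
        if (seen != null && offset <= seen) {
            return false; // already processed before the restart
        }
        lastSeen.put(k, offset);
        return true;
    }

    public static void main(String[] args) {
        String ssp = "BroadcastStream1-partition-0";
        SeenOffsetFilter filter = new SeenOffsetFilter();
        filter.restore("TaskInstance-1", ssp, 7);
        filter.restore("TaskInstance-2", ssp, 4);
        // Replaying after offset 4: offsets 5..7 reach TaskInstance-2 but are
        // dropped for TaskInstance-1; offset 8 reaches both.
        for (long offset = 5; offset <= 8; offset++) {
            System.out.println(offset
                + " -> TaskInstance-1: " + filter.shouldDeliver("TaskInstance-1", ssp, offset)
                + ", TaskInstance-2: " + filter.shouldDeliver("TaskInstance-2", ssp, offset));
        }
    }
}
```

The `offset <= seen` comparison is exactly where the sketch depends on offsets having a sort order.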
I *think* this should work, but it seems like a lot of special-case logic, and
I wonder if there's a more elegant solution. This approach would also require
us to impose a sort order on offsets, which we haven't committed to so far
(even though Kafka offsets are ordered).
> Support assigning the same SSP to multiple tasknames
> ----------------------------------------------------
>
> Key: SAMZA-353
> URL: https://issues.apache.org/jira/browse/SAMZA-353
> Project: Samza
> Issue Type: Bug
> Components: container
> Affects Versions: 0.8.0
> Reporter: Jakob Homan
>
> Post SAMZA-123, it is possible to add the same SSP to multiple tasknames,
> although currently we check for this and error out if this is done. We
> should think through the implications of having the same SSP appear in
> multiple tasknames and support this if it makes sense.
> This could be used as a broadcast stream, either added by Samza itself to
> each taskname or assigned by individual groupers where it makes sense. Right
> now the container maintains a map of SSP to TaskInstance and delivers messages
> from each SSP to its task instance. With this change, we'd need to change the
> map to SSP to Set[TaskInstance] and deliver each message to every TI in the set.
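The SSP to Set[TaskInstance] change described in the issue might look roughly like this sketch, where task instances are identified by name and "delivery" just records (task, message) pairs. BroadcastRouter and its methods are hypothetical, not the container's actual classes:

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

// Hypothetical sketch: SSPs and task instances are plain strings, and delivery
// returns "task <- message" strings instead of invoking real TaskInstances.
public class BroadcastRouter {
    // SSP -> every task instance that should receive its messages.
    private final Map<String, Set<String>> sspToTasks = new HashMap<>();

    /** Assigns an SSP to a task; the same SSP may now go to many tasks. */
    public void assign(String ssp, String taskName) {
        sspToTasks.computeIfAbsent(ssp, k -> new LinkedHashSet<>()).add(taskName);
    }

    /** Returns the tasks that should receive a message from this SSP. */
    public Set<String> tasksFor(String ssp) {
        return sspToTasks.getOrDefault(ssp, Collections.emptySet());
    }

    /** Fans one message out to every task registered for its SSP. */
    public List<String> deliver(String ssp, String message) {
        List<String> deliveries = new ArrayList<>();
        for (String task : tasksFor(ssp)) {
            deliveries.add(task + " <- " + message);
        }
        return deliveries;
    }

    public static void main(String[] args) {
        BroadcastRouter router = new BroadcastRouter();
        router.assign("BroadcastStream1-partition-0", "TaskInstance-1");
        router.assign("BroadcastStream1-partition-0", "TaskInstance-2");
        router.assign("PageViews-partition-1", "TaskInstance-2");
        router.deliver("BroadcastStream1-partition-0", "msg@5").forEach(System.out::println);
    }
}
```

Using a LinkedHashSet keeps delivery order deterministic (assignment order); the real container would also need the per-task offset handling discussed in the comment above.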