GitHub user vjagadish1989 opened a pull request:
https://github.com/apache/samza/pull/136
SAMZA-1202: Multiple calls to getInputStream() result in non-deterministic
behavior
Here's the sequence:
1. User creates two `MessageStream`s by multiple calls to
`graph.getInputStream()` with the same streamId but different MessageBuilders.
2. This invokes StreamGraphImpl#getInputStream, which obtains a StreamSpec
instance for the streamId and adds it to its map.
3. During the wire-up of the physical DAG in `StreamOperatorTask`,
`streamGraph.getInputStreams().forEach((streamSpec, inputStream) -> ...)` is invoked.
4. Depending on the order in which
`streamGraph.getInputStreams().forEach(...)` iterates over the map entries,
we could end up with a different representation of the DAG (with the
latest StreamSpec clobbering its previous one).
There are two approaches to solving this:
Approach 1:
Add validation to prevent this scenario. We will detect multiple calls
to `graph.getInputStream()` with the same streamId and throw an
IllegalArgumentException.
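A minimal sketch of the Approach 1 check, assuming the graph keeps its inputs keyed by streamId. The class `ValidatingStreamGraph`, its `getInputStream(String, String)` signature and the exception message are illustrative only, not the code in this PR: the second registration for an existing streamId is rejected instead of silently replacing the first.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch of Approach 1; class and method names are illustrative, not Samza's API.
public class ValidatingStreamGraph {

    // Keyed by streamId so a duplicate registration is detectable.
    private final Map<String, String> inputStreams = new LinkedHashMap<>();

    // Registers a builder for streamId, rejecting a second call for the same streamId.
    public void getInputStream(String streamId, String builderName) {
        if (inputStreams.containsKey(streamId)) {
            throw new IllegalArgumentException(
                "getInputStream() invoked multiple times with the same streamId: " + streamId);
        }
        inputStreams.put(streamId, builderName);
    }

    public int size() {
        return inputStreams.size();
    }

    public static void main(String[] args) {
        ValidatingStreamGraph graph = new ValidatingStreamGraph();
        graph.getInputStream("page-views", "builderA");
        try {
            graph.getInputStream("page-views", "builderB");
        } catch (IllegalArgumentException e) {
            System.out.println("Rejected: " + e.getMessage());
        }
        System.out.println(graph.size()); // only the first registration is kept
    }
}
```

Failing fast at graph-construction time surfaces the mistake to the user immediately, rather than leaving it to show up later as an order-dependent DAG.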
Approach 2:
Maintain a `MultiMap` instead of a `HashMap` so that the latest
`StreamSpec` does not clobber the earlier one.
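For comparison, a sketch of the Approach 2 idea. The JDK has no `MultiMap` type, so this assumes a plain `Map<String, List<String>>` filled via `computeIfAbsent` as a stand-in (libraries such as Guava provide a dedicated multimap). `MultiMapStreamGraph` and `buildersFor` are hypothetical names; every registration for a streamId is kept in call order instead of being overwritten.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch of Approach 2 using Map<K, List<V>> as a multimap; not Samza's API.
public class MultiMapStreamGraph {

    // Every registration for a streamId is kept, in call order.
    private final Map<String, List<String>> inputStreams = new LinkedHashMap<>();

    public void getInputStream(String streamId, String builderName) {
        inputStreams.computeIfAbsent(streamId, id -> new ArrayList<>()).add(builderName);
    }

    // Returns a read-only view of all builders registered for streamId (empty if none).
    public List<String> buildersFor(String streamId) {
        return Collections.unmodifiableList(
            inputStreams.getOrDefault(streamId, Collections.emptyList()));
    }

    public static void main(String[] args) {
        MultiMapStreamGraph graph = new MultiMapStreamGraph();
        graph.getInputStream("page-views", "builderA");
        graph.getInputStream("page-views", "builderB");
        System.out.println(graph.buildersFor("page-views")); // [builderA, builderB]
    }
}
```

With this structure nothing is lost, but the wire-up step would then have to handle several builders for one physical stream, which the validation in Approach 1 sidesteps.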
This PR implements Approach 1.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/vjagadish1989/samza samza-1202
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/samza/pull/136.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #136
----
commit eaebd42105378e8880ecb392dcab249e4e1a1312
Author: vjagadish1989 <[email protected]>
Date: 2017-04-24T16:30:04Z
SAMZA-1202: Multiple calls to getInputStream() result in non-deterministic
behavior
----