[
https://issues.apache.org/jira/browse/SAMZA-1202?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15981470#comment-15981470
]
ASF GitHub Bot commented on SAMZA-1202:
---------------------------------------
GitHub user vjagadish1989 opened a pull request:
https://github.com/apache/samza/pull/136
SAMZA-1202: Multiple calls to getInputStream() result in non-deterministic
behavior
Here's the sequence:
1. User creates two `MessageStream`s by multiple calls to
`graph.getInputStream()` with the same streamId but different MessageBuilders.
2. This invokes StreamGraphImpl#getInputStream, which obtains a StreamSpec
instance for the streamId and adds it to its map.
3. During the wire-up of the physical DAG in `StreamOperatorTask`,
`streamGraph.getInputStreams().forEach((streamSpec, inputStream)` is invoked.
4. Depending on the order in which
`streamGraph.getInputStreams().forEach((streamSpec, inputStream)` visits its
entries, we could end up with a different representation of the DAG (with the
later streamSpec clobbering the earlier one).
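
To make the sequence above concrete, here is a minimal, self-contained sketch
of the failure mode. `SpecSketch` and `ClobberSketch` are hypothetical
stand-ins, not Samza classes, and the sketch assumes the spec type does not
override `equals`/`hashCode`; whether the real `StreamSpec` behaves this way
is not stated here.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a stream spec. It does not override
// equals/hashCode (an assumption), so two instances created for the same
// streamId are distinct HashMap keys.
final class SpecSketch {
    final String streamId;

    SpecSketch(String streamId) {
        this.streamId = streamId;
    }
}

final class ClobberSketch {
    // Mimics a wire-up step that re-keys the registered inputs by streamId.
    static Map<String, String> wireUp(Map<SpecSketch, String> inputs) {
        Map<String, String> wired = new HashMap<>();
        // Iteration order depends on identity hash codes, so the entry that
        // is visited last (and therefore wins) can differ between runs.
        inputs.forEach((spec, builderName) -> wired.put(spec.streamId, builderName));
        return wired;
    }

    public static void main(String[] args) {
        Map<SpecSketch, String> inputs = new HashMap<>();
        inputs.put(new SpecSketch("page-views"), "msgBuilder2");
        inputs.put(new SpecSketch("page-views"), "msgBuilder1");
        // Exactly one builder survives; which one is not guaranteed.
        System.out.println(wireUp(inputs));
    }
}
```

Both registrations are present before the wire-up, but only one survives it,
and nothing in the code pins down which.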
There are 2 approaches to solve this:
Approach 1:
Add additional validation to prevent this scenario from happening. We will
validate multiple calls made to `graph.getInputStream` with the same streamId,
and throw an IllegalArgumentException.
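
A rough sketch of what such a validation could look like, using only the JDK.
`InputRegistrySketch` and `registerInput` are hypothetical names chosen for
illustration; this is not the code in the pull request.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.BiFunction;

// Hypothetical registry, not Samza's StreamGraphImpl.
final class InputRegistrySketch {
    private final Map<String, BiFunction<String, String, ?>> inputs = new HashMap<>();

    // Registers a message builder for streamId and rejects a second
    // registration for the same streamId instead of silently replacing it.
    void registerInput(String streamId, BiFunction<String, String, ?> msgBuilder) {
        if (inputs.containsKey(streamId)) {
            throw new IllegalArgumentException(
                "An input stream was already registered for streamId: " + streamId);
        }
        inputs.put(streamId, msgBuilder);
    }

    int size() {
        return inputs.size();
    }
}
```

With this check, the second call for "page-views" fails fast at graph
construction time rather than producing an ambiguous DAG later.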
Approach 2:
Maintain a `MultiMap` instead of a `HashMap` so that the latest
`StreamSpec` does not clobber the earlier one.
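
For comparison, a minimal sketch of the multimap idea. Which `MultiMap`
implementation is meant is not specified, so this sketch assumes a plain JDK
`Map` of lists; `MultiInputSketch` and its methods are hypothetical names, and
builders are represented by their names for brevity.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical multimap-style registry built on JDK collections only.
final class MultiInputSketch {
    private final Map<String, List<String>> inputs = new LinkedHashMap<>();

    // Appends the builder to the list for streamId, so an earlier
    // registration is kept alongside a later one.
    void registerInput(String streamId, String builderName) {
        inputs.computeIfAbsent(streamId, id -> new ArrayList<>()).add(builderName);
    }

    // Returns the builders in registration order, or an empty list.
    List<String> buildersFor(String streamId) {
        return inputs.getOrDefault(streamId, Collections.emptyList());
    }
}
```

This keeps every registration, but the wire-up then has to decide what to do
with several builders for one stream.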
This implements Approach 1.
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/vjagadish1989/samza samza-1202
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/samza/pull/136.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #136
----
commit eaebd42105378e8880ecb392dcab249e4e1a1312
Author: vjagadish1989 <[email protected]>
Date: 2017-04-24T16:30:04Z
SAMZA-1202: Multiple calls to getInputStream() result in non-deterministic
behavior
----
> Multiple calls to `graph.getInputStream()` with the same streamId result in
> non-deterministic behavior
> -------------------------------------------------------------------------------------------------------
>
> Key: SAMZA-1202
> URL: https://issues.apache.org/jira/browse/SAMZA-1202
> Project: Samza
> Issue Type: Bug
> Reporter: Jagadish
> Assignee: Jagadish
> Fix For: 0.13.0
>
>
> Consider the following code snippet, which invokes `graph.getInputStream`
> multiple times with the same streamId (but with different MessageBuilders).
> {code}
> BiFunction<String, String, String> msgBuilder1 = (k, v) -> v;
> BiFunction<String, String, Integer> msgBuilder2 = (k, v) -> new Integer(v);
> graph.getInputStream("page-views", msgBuilder2);
> MessageStream<String> pageViews1 = graph.getInputStream("page-views", msgBuilder1);
> pageViews1.map(..)
> .filter(..)
> .window(..)
> .sink(..)
> {code}
> *Here is the exact sequence:*
> 1. User creates two `MessageStream`s by multiple calls to
> `graph.getInputStream()` with the same streamId but different MessageBuilders.
> 2. This invokes StreamGraphImpl#getInputStream, which obtains a StreamSpec
> instance for the streamId and adds it to its map.
> 3. During the wire-up of the physical DAG in `StreamOperatorTask`,
> `streamGraph.getInputStreams().forEach((streamSpec, inputStream)` is invoked.
> 4. Depending on the order in which
> `streamGraph.getInputStreams().forEach((streamSpec, inputStream)` visits its
> entries, we could end up with a different representation of the DAG (with the
> later streamSpec clobbering the earlier one).
> There are 2 approaches to solve this:
> *Approach 1:*
> Add additional validation to prevent this scenario from happening. We will
> validate multiple calls made to `graph.getInputStream` with the same
> streamId, and throw an IllegalArgumentException.
> *Approach 2:*
> Maintain a `MultiMap` instead of a `HashMap` so that the latest `StreamSpec`
> does not clobber the earlier one.