[
https://issues.apache.org/jira/browse/SAMZA-1202?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jagadish updated SAMZA-1202:
----------------------------
Description:
Consider the following code snippet, which calls graph.getInputStream multiple
times with the same streamId (but with different message builders).
{code}
BiFunction<String, String, String> msgBuilder1 = (k, v) -> v;
BiFunction<String, String, Integer> msgBuilder2 = (k, v) -> Integer.valueOf(v);
// First call registers "page-views" with msgBuilder2; the returned stream is discarded.
graph.getInputStream("page-views", msgBuilder2);
// Second call reuses the same streamId with a different message builder.
MessageStream<String> pageViews1 = graph.getInputStream("page-views", msgBuilder1);
pageViews1.map(..)
    .filter(..)
    .window(..)
    .sink(..)
{code}
TL;DR:
The above snippet results in non-deterministic behavior (messages may not even
be passed to the operator chain), depending on the iteration order of a Java
HashMap.
Here is the exact sequence:
1. The user creates two `MessageStream`s through multiple calls to
`graph.getInputStream()` with the same streamId but different message builders.
2. Each call invokes StreamGraphImpl#getInputStream, which keeps its bookkeeping
keyed by `StreamSpec`. (It obtains a StreamSpec instance from the streamId.)
3. Depending on the order in which
`streamGraph.getInputStreams().forEach((streamSpec, inputStream)` iterates over
the entries, we could end up with a different representation of the DAG (with
the latest streamSpec clobbering its previous one).
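The sequence above can be modeled with a small, self-contained sketch. This is
not the Samza code: `Spec`, `survivingBuilder`, and the string labels are
hypothetical stand-ins, and the sketch assumes that each call produces a
distinct map key for the same streamId (for example, because the key type does
not override equals()/hashCode()). A LinkedHashMap is used only to make the
iteration order explicit, so that both outcomes can be shown.

```java
import java.util.HashMap;
import java.util.LinkedHashMap;
import java.util.Map;

public class DuplicateInputStreamSketch {

    // Hypothetical stand-in for StreamSpec. ASSUMPTION: no equals()/hashCode(),
    // so two instances built from the same streamId are two distinct map keys.
    static final class Spec {
        final String streamId;

        Spec(String streamId) {
            this.streamId = streamId;
        }
    }

    // Registers one entry per builder (in the given order), then folds the
    // entries into a second map keyed by stream id. The last entry visited
    // overwrites the earlier one, which models the "clobbering" in step 3.
    static String survivingBuilder(String... buildersInIterationOrder) {
        Map<Spec, String> inputStreams = new LinkedHashMap<>();
        for (String builder : buildersInIterationOrder) {
            inputStreams.put(new Spec("page-views"), builder);
        }

        Map<String, String> dag = new HashMap<>();
        inputStreams.forEach((spec, builder) -> dag.put(spec.streamId, builder));
        return dag.get("page-views");
    }

    public static void main(String[] args) {
        // Same two registrations, visited in opposite orders.
        System.out.println(survivingBuilder("msgBuilder2", "msgBuilder1")); // msgBuilder1
        System.out.println(survivingBuilder("msgBuilder1", "msgBuilder2")); // msgBuilder2
    }
}
```

With a plain HashMap keyed by objects that use identity hash codes, the visiting
order is not guaranteed, so either builder could end up wired into the DAG. If
the discarded stream's builder wins, the operator chain attached to
`pageViews1` may never receive messages.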
> Multiple calls to `graph.getInputStream()` with the same streamId results in
> non-deterministic behavior
> -------------------------------------------------------------------------------------------------------
>
> Key: SAMZA-1202
> URL: https://issues.apache.org/jira/browse/SAMZA-1202
> Project: Samza
> Issue Type: Bug
> Reporter: Jagadish