Github user revans2 commented on the issue:
https://github.com/apache/storm/pull/1693
I have looked through the proposal, and I have a number of concerns about
the API. I mostly want to be sure that we use consistent terminology about
what the different operations are intended to do. `groupByKey` is the big one
for me, because it seems to be a routing function and not an actual group-by,
but `join` also seemed a little off.
Why do we need so many lines of code to do a repartition/group-by in the
word count example?
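To make the terminology concern concrete, here is a toy plain-Java sketch (the names `groupBy` and `route` are mine, not from the proposal): a real group-by materializes one collection per key, while a routing function only decides which partition each record is sent to.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GroupVsRoute {
    // A true group-by: materializes one collection per key.
    static Map<String, List<String>> groupBy(List<String> words) {
        return words.stream().collect(Collectors.groupingBy(w -> w));
    }

    // A routing function: only picks which of N partitions a record goes to;
    // nothing is collected per key.
    static int route(String word, int numPartitions) {
        return Math.floorMod(word.hashCode(), numPartitions);
    }

    public static void main(String[] args) {
        List<String> words = Arrays.asList("the", "cat", "the");
        System.out.println(groupBy(words).get("the").size()); // prints 2
        System.out.println(route("the", 3) == route("the", 3)); // prints true
    }
}
```

The proposed `groupByKey` appears to do the latter, which is why the name reads oddly to me.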
```
builder.newStream(new RandomSentenceSpout(), new ValueMapper<String>(0), 2)
        /*
         * split the sentences into words
         */
        .flatMap(s -> Arrays.asList(s.split(" ")))
        /*
         * create a stream of (word, 1) pairs
         */
        .mapToPair(w -> new Pair<>(w, 1))
        /*
         * group by word so that the same words end up in the same partition
         */
        .groupByKey()
        /*
         * change the stream's parallelism to 3; applies to downstream operations
         */
        .repartition(3)
        /*
         * a ten-second tumbling window
         */
        .window(TumblingWindows.of(Duration.seconds(10)))
        /*
         * compute the word counts in the last ten-second window
         */
        .aggregateByKey(new Count<>())
        /*
         * keep the words that occurred at least five times in the last ten seconds
         */
        .filter(x -> x.getSecond() >= 5)
        /*
         * print the results (word, count) to stdout
         */
        .print();
```
It feels like we could accomplish the entire thing much more simply (look at
Flink or Spark Streaming, or even BEAM; although BEAM is rather complex,
ultimately its final code does the aggregation more cleanly):
```
builder.newStream(new RandomSentenceSpout(), new ValueMapper<String>(0), 2)
        .flatMap(s -> Arrays.asList(s.split(" ")))
        .window(TumblingWindows.of(Duration.seconds(10)))
        // shorthand for .aggregateBy(1 /* index to use in grouping */, new Count<>(),
        // 3 /* new number of partitions; we probably want many more partitions
        // internally, but only 3 executors to start out with */)
        .countBy(1, 3)
        .filter(x -> x.getSecond() >= 5)
        .print();
```
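To be clear about what I mean by the `countBy` shorthand (a hypothetical operation, not in the current proposal), here is a toy sketch of its intended per-window semantics in plain Java: count the tuples in one window by the value at a given field index.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CountBySketch {
    // Sketch of the intended countBy(index) semantics: count the tuples in one
    // window's worth of data, keyed by the value at the given field index.
    static Map<Object, Long> countBy(List<Object[]> windowTuples, int index) {
        Map<Object, Long> counts = new HashMap<>();
        for (Object[] tuple : windowTuples) {
            counts.merge(tuple[index], 1L, Long::sum);
        }
        return counts;
    }

    public static void main(String[] args) {
        List<Object[]> window = Arrays.asList(
                new Object[]{"the"}, new Object[]{"cat"}, new Object[]{"the"});
        System.out.println(countBy(window, 0).get("the")); // prints 2
    }
}
```

The grouping key, the aggregator, and the partition count would all be arguments to the one shorthand call, rather than three separate stages in the pipeline.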
But I am most concerned about expanding this API in the future. The stated
plan is to start out by defining the API and implementing a back end that
supports at-most-once and at-least-once processing, and then move on to
exactly-once processing. I really would like to have a plan in place for how we
expect to do exactly-once processing before we go too deeply into the API, even
if we do mark the API as unstable. I am really concerned about how state plays
into this (inputs, outputs, windowing, and actual state). We currently have two
different types of spouts: one for at-least-once/at-most-once and another for
exactly-once. The exactly-once use case scares me a bit, because if we want to
support BEAM, they have a very specific model for exactly-once: all operations
are idempotent, and replay includes a complete rollback to a known consistent
checkpoint. The Trident spout does not do this (no ordering guarantees), and
neither do the other spouts. My concern is that the APIs for a lot of different
user-facing things may need to change for exactly-once, and I would like a plan
for how we want to support it, and to have thought through how that might
impact the APIs, before we release this to the public.
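As a toy illustration of why replay semantics leak into user-facing state (all names here are mine, not from any spout implementation): a plain counter double-counts on replay, while an update deduplicated by a per-tuple id does not.

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

public class ReplaySketch {
    // Non-idempotent update: replaying the same tuple double-counts.
    static int naiveReplay() {
        Map<String, Integer> state = new HashMap<>();
        state.merge("the", 1, Integer::sum);
        state.merge("the", 1, Integer::sum); // the replayed tuple is counted again
        return state.get("the");
    }

    // Idempotent update: deduplicating by a tuple id makes replay a no-op.
    static int idempotentReplay() {
        Map<String, Integer> state = new HashMap<>();
        Set<String> seen = new HashSet<>();
        String tupleId = "msg-42"; // hypothetical unique id carried with the tuple
        if (seen.add(tupleId)) state.merge("the", 1, Integer::sum);
        if (seen.add(tupleId)) state.merge("the", 1, Integer::sum); // replay skipped
        return state.get("the");
    }

    public static void main(String[] args) {
        System.out.println(naiveReplay());      // prints 2: replay double-counted
        System.out.println(idempotentReplay()); // prints 1: correct count
    }
}
```

Whether deduplication, rollback to a checkpoint, or something else is the mechanism, the choice shows up in the state and windowing APIs users see, which is why I want it thought through first.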
This also goes for the windows. Having more robust windowing may impact
some of these APIs. Have we thought that through?