[ 
https://issues.apache.org/jira/browse/STORM-1961?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15517391#comment-15517391
 ] 

Arun Mahadevan commented on STORM-1961:
---------------------------------------

> Would be good to do away with StreamBuilder and write it more directly and 
> concisely :
> Stream<String> x = new Stream(...).flatmap().blah()

StreamBuilder tracks the overall graph of processors (the topology of 
processors). Stream abstracts the stream of values flowing through the pipeline 
(and the operations to be performed on those values). It's better to keep them 
separate.

For example, consider a branching operation:

Stream<String> logs = streamBuilder.newStream(...);
logs.filter(isError).forEach(saveToErrorFile);
logs.filter(!isError).forEach(doFurtherProcessing);
submitTopology(..., streamBuilder.build());

If you operate on streams directly, it's difficult to translate them into a 
single Storm topology without the stream builder.

> Can we avoid the build() call there
> StormSubmitter.submitTopologyWithProgressBar("test", new Config(), 
> builder.build());
> and simplify it to
> StormSubmitter.submitTopologyWithProgressBar("test", new Config(), stream );
> and have the build() or whatever else needs to happen, get invoked internally 
> within submitTopology ?

Like in the example above, there are two streams being evaluated as part of 
the topology. So it's easier to have the StreamBuilder abstraction that can 
build a single topology out of a graph of processors.

> Good to have overloaded version taking arrays in flatMap(T[] ) and elsewhere 
> to natively support arrays... so that conversion via Arrays.asList is not 
> needed.

flatMap takes a function that accepts a value of type T and returns an 
Iterable<R> (not an array). Due to the way overload resolution works with 
lambdas, it's not possible to overload flatMap() to take a function that 
returns an array. We could have a separate function like flatMapArray(...) to 
deal with this, but that is not very elegant either and would confuse users.
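To illustrate why Arrays.asList is the pragmatic adapter here, a minimal sketch (flatMap below is a hypothetical stand-in with the Iterable-returning signature being discussed, not the actual Storm API):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.function.Function;

public class FlatMapAdapter {
    // Hypothetical sketch mirroring the signature discussed above (not the
    // actual Storm API): the mapper must return an Iterable<R>.
    static <T, R> List<R> flatMap(List<T> input, Function<T, Iterable<R>> mapper) {
        List<R> out = new ArrayList<>();
        for (T t : input) {
            for (R r : mapper.apply(t)) {
                out.add(r);
            }
        }
        return out;
    }

    // An array-returning lambda is adapted with a single Arrays.asList call,
    // instead of adding an ambiguous flatMap(Function<T, R[]>) overload.
    static List<String> splitWords(List<String> sentences) {
        return flatMap(sentences, s -> Arrays.asList(s.split(" ")));
    }
}
```

The one-time Arrays.asList at the call site keeps a single unambiguous flatMap signature, which is why a flatMapArray variant was not added.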

> The doc needs to have a more concrete definition for the Stream concept.

Yes, we need to add more documentation, a README etc. You've asked good 
questions; I will try to answer them here and later consolidate the answers 
into a README.

> Is it just the data stream produced by the first operator/spout ? or is it 
> the whole pipeline of operators ?

Stream can be thought of as a stream of messages (values) flowing through a 
pipeline. It starts from a spout, gets transformed along the way, and possibly 
ends up in a sink.

E.g.
Stream<String> sentences = ...; // a stream of sentences coming from a spout
Stream<String> words = sentences.flatMap(...); // a stream of words
words.to(hbaseBolt); // the stream of words getting saved to HBase

> is it different from what we call 'topology' in storm ?

Yes, it is a stream of values. The overall chain of operations you apply on 
the stream, like stream.map(...).flatMap(...).print(), finally gets translated 
into a Storm topology.

> When you say Stream<T> .. what is T ? Is the type of value produced by the 
> terminal operator ? or that of the first ?

T is the type of the values flowing through the stream. (From an 
implementation point of view, it is the type of the value received by that 
particular processor.)

> What if there is a branch/split and each terminal operator creates different 
> types ?

With the branch(..) API, the result type is the same as the input type; the 
values are just forwarded to one of the branches.
But one can always do:
Stream<T> s1 = ...;
Stream<R1> s2 = s1.map(T to R1);
Stream<R2> s3 = s1.map(T to R2);
and so on. s2 will be a stream of values of type R1 and s3 will be a stream of 
values of type R2. If you then apply a terminal operation on s2 and s3, you 
will get the respective types in the respective terminal operation.
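The typing can be shown with a plain-collections sketch (illustrative only; map here is a hypothetical stand-in for Stream.map, not the actual Storm API):

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;

public class BranchTypes {
    // Stand-in for Stream.map: one source "stream", and independently typed
    // derived "streams" depending on the mapping function.
    static <T, R> List<R> map(List<T> source, Function<T, R> fn) {
        List<R> out = new ArrayList<>();
        for (T t : source) {
            out.add(fn.apply(t));
        }
        return out;
    }

    public static void main(String[] args) {
        List<String> s1 = List.of("1", "2", "3");       // Stream<T>,  T  = String
        List<Integer> s2 = map(s1, Integer::parseInt);  // Stream<R1>, R1 = Integer
        List<Double> s3 = map(s1, Double::parseDouble); // Stream<R2>, R2 = Double
        System.out.println(s2 + " " + s3);
    }
}
```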

> Can a stream pick up data from two different sources ? for example from kafka 
> and hdfs.

You could have a Kafka stream and an HDFS stream and possibly join them.
E.g.
Stream<String> kafkaStream = streamBuilder.newStream(new KafkaSpout(), ...);
Stream<String> hdfsStream = streamBuilder.newStream(new HdfsSpout(), ...);

kafkaStream.mapToPair(..).join(hdfsStream.mapToPair(...)).print();

StormSubmitter.submitTopology(..., streamBuilder.build());

> The diagram in the doc shows fields and shuffle groupings. Not clear from the 
> examples as to how the various gropings will be supported in the API.
> Would like to see API examples in doc as how to the grouping and parallelism 
> hints will be expressed in code.

Grouping is not exposed as a separate API, to keep things simple for now. But 
it happens behind the scenes when the user does a groupByKey (fields grouping 
on the key), repartition (shuffle grouping), Stream.aggregate (global 
grouping) and so on.
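To illustrate what "fields grouping on the key" means, a minimal conceptual sketch (chooseTask is hypothetical, an assumption for illustration rather than Storm's actual implementation): every tuple with the same key is routed to the same downstream task, which is what makes a per-key aggregation after groupByKey correct.

```java
public class FieldsGrouping {
    // Conceptual sketch of a fields grouping: route tuples by hashing the
    // grouping key, so equal keys always land on the same task.
    static int chooseTask(Object key, int numTasks) {
        return Math.floorMod(key.hashCode(), numTasks);
    }
}
```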

> Would this API provide a mechanism to use the existing set of Storm spouts 
> and terminal bolts (like KafkaSpout, HdfsSpout, HbaseBolt, etc) ? Or do we 
> need to have new implementations ?

Yes, it's possible. But to provide at-least-once guarantees, the spouts and 
bolts should do anchoring/acking. This needs to be documented.

> How will custom/user-defined operators be supported ?

We could expose an API that accepts a Processor (the built-in operations are 
also handled via processors). This needs to be thought through further before 
exposing it; it can be added incrementally.

> Doc says
> windowing defines the batch boundaries
> It is a fatal mistake to associate batch boundaries with window boundaries.

Can you elaborate?

Windowing provides a logical boundary over the continuous stream of tuples. It 
splits (or logically groups) the stream of values flowing through the pipeline 
into batches so that we can apply operations like aggregations and joins on 
each group and emit the results. Transport-level batching, such as how many 
messages are sent together via the messaging layer or how many tuples the 
spout emits in a nextTuple() call, is independent of this.
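A minimal sketch of the logical grouping windowing provides, assuming a tumbling count window (the window helper is illustrative, not the Storm windowing API):

```java
import java.util.ArrayList;
import java.util.List;

public class TumblingWindow {
    // A tumbling count window of the given size groups the stream into
    // consecutive, non-overlapping batches that aggregations and joins can
    // then be applied to. The last batch may be smaller than the window size.
    static <T> List<List<T>> window(List<T> stream, int size) {
        List<List<T>> batches = new ArrayList<>();
        for (int i = 0; i < stream.size(); i += size) {
            batches.add(new ArrayList<>(stream.subList(i, Math.min(i + size, stream.size()))));
        }
        return batches;
    }
}
```

Note that these window boundaries are purely logical; they say nothing about how many tuples travel together on the wire.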





> Come up with streams api for storm core use cases
> -------------------------------------------------
>
>                 Key: STORM-1961
>                 URL: https://issues.apache.org/jira/browse/STORM-1961
>             Project: Apache Storm
>          Issue Type: Sub-task
>            Reporter: Arun Mahadevan
>            Assignee: Arun Mahadevan
>         Attachments: UnifiedStreamapiforStorm.pdf
>
>




--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
