> On May 29, 2015, 1:28 a.m., Navina Ramesh wrote:
> > samza-sql-core/src/test/java/org/apache/samza/task/sql/UserCallbacksSqlTask.java,
> >  line 120
> > <https://reviews.apache.org/r/34500/diff/1/?file=965740#file965740line120>
> >
> >     I thought TopologyBuilder was to abstract away the spec and provide a 
> > simplified API for a user implementing a simple SQL query. 
> >     Imo, this still seems pretty involved for a user concerned with just 
> > defining a simple join query. 
> >     
> >     I assumed we could have a builder pattern as below:
> >     
> >     ```
> >     TopologyBuilder builder = TopologyBuilder
> >                                 .create()
> >                                 .join(window("stream1", 10), window("stream2", 10), List{joinKey1, joinKey2, ...})
> >                                 .partition(partitionKey)
> >                                 .build()
> >     ```
> >     
> >     The idea here is that the build statement order determines the 
> > topology. The builder just validates and chains them together. 
> >     I can see that this can be a problem with running operators in parallel 
> > and possibly, make it hard for the user to understand the correct sequence 
> > of operators. 
> >     I am wondering if you think this kind of a model is possible. It would 
> > greatly simplify the API for most users. 
> >     Just wanted to put this comment out so that we can discuss further.
> 
> Milinda Pathirage wrote:
>     I also agree with Navina here. I think we should make building topologies 
> simple with the builder API. One complexity of the current OperatorSpec-based 
> API is that you need to create intermediate streams (EntityNames) to wire 
> operators together. I think we should try to hide that complexity through the 
> builder API. Even though source and sink hide that complexity to some extent, 
> it's better if we can remove it completely.
> 
> Yi Pan (Data Infrastructure) wrote:
>     Thank you both for the good points here. @Navina, yes, the basic idea for 
> the topology builder is exactly what you mentioned and the model you 
> illustrated is much simpler and very attractive. The issue I saw is that the 
> topology may not be completely linear, or a tree. It is not easy to describe 
> a network of operators like the following, w/o introducing the concept of 
> intermediate streams.
>        window-->aggregate--+---------------------+
>        window-->aggregate----> join --+          |       +-->aggregate --+
>                      project-->window--> join -->|split -+               |
>                                                  +-------->join ---------->join --> partition
>     There are three issues in the above example:
>     1. the join input may be intermediate streams, which essentially could be 
> an output from a sub-topology
>     2. the multi-output operator will make the downstream expression branch 
> off and not easily expressible in linear format
>     3. the output of a single operator may be used by multiple downstream 
> operators, again forking off the linear expression
>     
>     Maybe we can adopt the simple join builder as you illustrated for simple 
> queries, although I think that I would like to add an OperatorBuilder as well 
> here:
>     OperatorRouter simpleJoinQuery = TopologyBuilder.create()
>         .join(OperatorBuilder.window("stream1", 10), OperatorBuilder.window("stream2", 10), joinKeys)
>         .partition(partitionKey).build();
>     
>     For more complex queries, I think the following method may work better:
>     OperatorRouter router = TopologyBuilder.create()
>         .beginStream("stream1").window(10).aggregate("group-by", "treeId", "sum")
>         .beginStream("stream2").window(10).aggregate("group-by", "treeId", "avg").join(joinCondition)
>         .beginStream("stream3").project(fieldList).window(10).join(joinCondition)
>         .partition(partitionKey, number, "outstream1").build();
>                               
>     Here, beginStream() always signifies one linear path of operators and 
> adds it to the topology. The join operator that follows will join the latest 
> two streams and create one joined stream. This model may even solve issue 2 
> by allowing: 
>           ...split().beginStream(1).aggregate().beginStream(2)
>             .beginStream("stream4").filter().window()
>             .join().join().partition().build();
>             
>     For issue 3, i.e. reusing a certain intermediate stream in multiple 
> downstream operators, we can introduce beginReuseStream(source, "name") as 
> follows:
>     OperatorRouter router = TopologyBuilder.create()
>         .beginReuseStream("stream1", "reuseStream1").window(10).aggregate("group-by", "treeId", "sum")
>         .beginStream("stream2").window(10).aggregate("group-by", "treeId", "avg").join(joinCondition)
>         .beginStream("stream3").project(fieldList).window(10).join(joinCondition)
>         .join("reuseStream1", joinCondition)
>         .partition(partitionKey, number, "outstream1").build();
>     in which the downstream operator will refer to the name of the reuse 
> stream to use the intermediate stream multiple times.
>     
>     In addition, from the review of the SAMZA-561 changes, I think that we will 
> need to include an OperatorBuilder interface as well, to help create 
> operators easily, e.g. 
> OperatorBuilder.join().setInputs().setOutput().setJoinCondition().build(). 
> This can be used inside the TopologyBuilder, which allows us to hide all the 
> intermediate input/output stream and entity names, so there will be no 
> OperatorSpec that the programmer needs to handle directly.
> 
> Yi Pan (Data Infrastructure) wrote:
>     For the last line, I meant /opId/-output-1

Summarizing my offline discussion with Yi:
* I think we are conflating our requirements for an operator builder API with 
the topology / workflow definition of a data processing pipeline. We should 
rename TopologyBuilder to QueryBuilder. 
* I agree with Milinda that we should simplify the API to hide intermediate 
stream and spec details. I like the idea behind OperatorBuilder. It paves the 
way for a cleaner API.
* Regarding non-linear workflows, I think we should not focus much on them. The 
idea is not to define arbitrary workflows. It is more about defining sub-queries 
and connecting them together. If we think along those lines, it will be a much 
simpler API. We can extend the builders later to support non-linear workflows.
* One more nit-pick: It will be more intuitive to expose a "groupBy" interface 
rather than an "aggregate" interface with a group-by key.
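
To make the points above a bit more concrete, here is a rough sketch of the 
kind of linear builder I have in mind. It is not code from the patch: the 
QueryBuilder class, from(), groupBy(), and the "op-N-output" naming are all 
made up for illustration, and operators are recorded as plain strings instead 
of real Operator/OperatorSpec objects. The only thing it tries to show is that 
the call order defines the chain and that intermediate stream names are 
generated by the builder, never by the user.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

/** Hypothetical linear query builder; a sketch only, not part of the Samza code base. */
final class QueryBuilder {
    private final List<String> steps = new ArrayList<>();
    private String current;  // name of the stream feeding the next operator
    private int nextId = 1;  // counter used to name intermediate streams

    private QueryBuilder() {}

    static QueryBuilder create() {
        return new QueryBuilder();
    }

    /** Sets the single input stream of this linear sub-query. */
    QueryBuilder from(String stream) {
        if (current != null) {
            throw new IllegalStateException("source already set");
        }
        current = stream;
        return this;
    }

    QueryBuilder window(int size) {
        return chain("window(" + size + ")");
    }

    /** Exposed as groupBy rather than aggregate("group-by", ...). */
    QueryBuilder groupBy(String key, String aggregate) {
        return chain("groupBy(" + key + ", " + aggregate + ")");
    }

    QueryBuilder partition(String key) {
        return chain("partition(" + key + ")");
    }

    /** Wires the operator to the previous output and invents the next stream name. */
    private QueryBuilder chain(String op) {
        if (current == null) {
            throw new IllegalStateException("call from() first");
        }
        String output = "op-" + nextId++ + "-output";
        steps.add(op + ": " + current + " -> " + output);
        current = output;
        return this;
    }

    /** Returns the operators in the order they were chained. */
    List<String> build() {
        return Collections.unmodifiableList(new ArrayList<>(steps));
    }

    public static void main(String[] args) {
        List<String> plan = QueryBuilder.create()
            .from("stream1").window(10).groupBy("treeId", "sum")
            .partition("treeId").build();
        plan.forEach(System.out::println);
    }
}
```

A join of two such sub-queries could then take two built chains as inputs, 
which is how I would expect non-linear cases to be layered on later.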


- Navina


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34500/#review85590
-----------------------------------------------------------


On May 20, 2015, 11:13 p.m., Yi Pan (Data Infrastructure) wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/34500/
> -----------------------------------------------------------
> 
> (Updated May 20, 2015, 11:13 p.m.)
> 
> 
> Review request for samza, Yan Fang, Chris Riccomini, Guozhang Wang, Milinda 
> Pathirage, Navina Ramesh, and Naveen Somasundaram.
> 
> 
> Bugs: SAMZA-552
>     https://issues.apache.org/jira/browse/SAMZA-552
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> SAMZA-552: added operator builder API
> - The current operator builder only supports single-output DAG topologies so far
> 
> 
> Diffs
> -----
> 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSink.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSource.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/OperatorTopology.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/NoopOperatorCallback.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorImpl.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleRouter.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilder.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCollector.java PRE-CREATION 
>   samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOperatorTask.java PRE-CREATION 
>   samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java PRE-CREATION 
>   samza-sql-core/src/test/java/org/apache/samza/task/sql/UserCallbacksSqlTask.java PRE-CREATION 
> 
> Diff: https://reviews.apache.org/r/34500/diff/
> 
> 
> Testing
> -------
> 
> ./gradlew clean build passed
> 
> 
> Thanks,
> 
> Yi Pan (Data Infrastructure)
> 
>
