> On May 29, 2015, 1:28 a.m., Navina Ramesh wrote:
> > samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSource.java,
> >  line 26
> > <https://reviews.apache.org/r/34500/diff/1/?file=965722#file965722line26>
> >
> >     OperatorSource and OperatorSink have the same method signatures. Is 
> > that even allowed in Java? It's kind of confusing, even though the 
> > implementation can be semantically different.
> 
> Milinda Pathirage wrote:
>     This should be fine as long as there aren't any classes implementing both 
> interfaces 
> (http://stackoverflow.com/questions/2801878/implementing-two-interfaces-in-a-class-with-same-method-which-interface-method).
>  Maybe we can change the method naming.

Agreed. I will try to modify the class or method names here.
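
For reference, as far as I know Java does allow a single class to implement 
two interfaces that declare the same method signature, provided the return 
types are compatible; the class supplies one implementation that satisfies 
both. A minimal sketch, using hypothetical interface names as stand-ins (these 
are not the actual OperatorSource/OperatorSink signatures):

```java
// Hypothetical stand-ins; NOT the real OperatorSource/OperatorSink signatures.
interface Source {
    String name();
}

interface Sink {
    String name();
}

// One class may implement both: the single name() satisfies both interfaces.
class Both implements Source, Sink {
    @Override
    public String name() {
        return "intermediate";
    }
}

class SameSignatureDemo {
    public static void main(String[] args) {
        Both b = new Both();
        Source asSource = b;
        Sink asSink = b;
        // Both views dispatch to the same implementation.
        System.out.println(asSource.name() + " / " + asSink.name());
    }
}
```

So the concern is more about readability than legality, which is why renaming 
still seems worthwhile.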


> On May 29, 2015, 1:28 a.m., Navina Ramesh wrote:
> > samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorImpl.java,
> >  line 44
> > <https://reviews.apache.org/r/34500/diff/1/?file=965727#file965727line44>
> >
> >     Looks like you left behind some merge conflict statements in comments :)

Thanks for catching that. Will fix.


> On May 29, 2015, 1:28 a.m., Navina Ramesh wrote:
> > samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilder.java,
> >  line 57
> > <https://reviews.apache.org/r/34500/diff/1/?file=965730#file965730line57>
> >
> >     Can you add info on what an unbound input or unbound output is? I think 
> > it will be useful to add/move the comment from Line 205 and Line 218 here.

Sure. Good point.


> On May 29, 2015, 1:28 a.m., Navina Ramesh wrote:
> > samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilder.java,
> >  line 135
> > <https://reviews.apache.org/r/34500/diff/1/?file=965730#file965730line135>
> >
> >     Can we change this source() rather than stream() ?

My original intention was to model an intermediate stream in the topology. I 
will rename it to match the return class name.


> On May 29, 2015, 1:28 a.m., Navina Ramesh wrote:
> > samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilder.java,
> >  line 145
> > <https://reviews.apache.org/r/34500/diff/1/?file=965730#file965730line145>
> >
> >     Can you explain when sink should be used?

This follows from a further thought: we may have an operator like split() 
that takes one input stream and has two output streams. In that case, 
something like split().attach(sink1, sink2) would be handy. Given the comments 
about streamlining the operators and hiding the intermediate streams 
completely, I will need to think it over more.


> On May 29, 2015, 1:28 a.m., Navina Ramesh wrote:
> > samza-sql-core/src/test/java/org/apache/samza/task/sql/UserCallbacksSqlTask.java,
> >  line 120
> > <https://reviews.apache.org/r/34500/diff/1/?file=965740#file965740line120>
> >
> >     I thought TopologyBuilder was to abstract away the spec and provide a 
> > simplified API for a user implementing a simple SQL query. 
> >     Imo, this still seems pretty involved for a user concerned with just 
> > defining a simple join query. 
> >     
> >     I assumed we could have a builder pattern as below:
> >     
> >     ```
> >     TopologyBuilder builder = TopologyBuilder
> >                                 .create()
> >                                 .join(window("stream1", 10),
> >                                       window("stream2", 10),
> >                                       List{joinKey1, joinKey2, ...})
> >                                 .partition(partitionKey)
> >                                 .build()
> >     ```
> >     
> >     The idea here is that the build statement order determines the 
> > topology. The builder just validates and chains them together. 
> >     I can see that this can be a problem with running operators in parallel 
> > and possibly, make it hard for the user to understand the correct sequence 
> > of operators. 
> >     I am wondering if you think this kind of a model is possible. It would 
> > greatly simplify the API for most users. 
> >     Just wanted to put this comment out so that we can discuss further.
> 
> Milinda Pathirage wrote:
>     I also agree with Navina here. I think we should make building topologies 
> simple with the builder API. One complexity of the current OperatorSpec-based 
> API is that you need to create intermediate streams (EntityNames) to wire 
> operators together. I think we should try to hide that complexity through the 
> builder API. Even though source and sink hide that complexity to some extent, 
> it's better if we can remove it completely.

Thank you both for the good points here. @Navina, yes, the basic idea for the 
topology builder is exactly what you mentioned, and the model you illustrated 
is much simpler and very attractive. The issue I see is that the topology may 
not be completely linear, or even a tree. It is not easy to describe a network 
of operators like the following without introducing the concept of 
intermediate streams:
   window-->aggregate--+---------------------+
   window-->aggregate----> join --+          |       +-->aggregate --+
                 project-->window--> join -->|split -+               |
                                             +-------->join ---------->join --> partition
There are three issues in the above example:
1. the join inputs may be intermediate streams, each of which could 
essentially be the output of a sub-topology
2. a multi-output operator makes the downstream expression branch off, so it 
is not easily expressible in a linear format
3. the output of a single operator may be used by multiple downstream 
operators, again forking off the linear expression
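
To make the three issues concrete, here is a rough sketch of a topology in 
which operators are wired together purely by the names of the streams they 
read and write. All class, operator, and stream names below are hypothetical 
and only for illustration; this is not the OperatorSpec API from the patch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical node: an operator id plus the stream names it reads and writes.
class OpNode {
    final String id;
    final List<String> inputs;
    final List<String> outputs;

    OpNode(String id, List<String> inputs, List<String> outputs) {
        this.id = id;
        this.inputs = inputs;
        this.outputs = outputs;
    }
}

class OperatorGraph {
    private final List<OpNode> nodes = new ArrayList<>();

    OperatorGraph add(String id, List<String> inputs, List<String> outputs) {
        nodes.add(new OpNode(id, inputs, outputs));
        return this;
    }

    // Maps each stream name to the ids of the operators that consume it.
    Map<String, List<String>> consumers() {
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (OpNode n : nodes) {
            for (String in : n.inputs) {
                result.computeIfAbsent(in, k -> new ArrayList<>()).add(n.id);
            }
        }
        return result;
    }
}

class OperatorGraphDemo {
    public static void main(String[] args) {
        OperatorGraph g = new OperatorGraph()
            // issue 2: one operator with two outputs
            .add("split", Arrays.asList("joined"), Arrays.asList("out1", "out2"))
            .add("aggregate", Arrays.asList("out1"), Arrays.asList("agg1"))
            // issue 1: join inputs are intermediate streams
            .add("joinA", Arrays.asList("out2", "agg1"), Arrays.asList("j1"))
            // issue 3: agg1 is consumed a second time
            .add("joinB", Arrays.asList("agg1", "j1"), Arrays.asList("j2"));
        System.out.println(g.consumers());
    }
}
```

Any stream with more than one consumer, or any operator with more than one 
output, is exactly where a purely linear builder expression stops being enough.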

Maybe we can adopt the simple join builder as you illustrated for simple 
queries, although I would like to add an OperatorBuilder here as well:
OperatorRouter simpleJoinQuery = TopologyBuilder.create()
                             .join(OperatorBuilder.window("stream1", 10),
                                   OperatorBuilder.window("stream2", 10), joinKeys)
                             .partition(partitionKey).build();

For more complex queries, I think the following approach may work better:
OperatorRouter router = TopologyBuilder.create()
                          .beginStream("stream1").window(10).aggregate("group-by", "treeId", "sum")
                          .beginStream("stream2").window(10).aggregate("group-by", "treeId", "avg").join(joinCondition)
                          .beginStream("stream3").project(fieldList).window(10).join(joinCondition)
                          .partition(partitionKey, number, "outstream1").build();
                          
Here, beginStream() always signifies one linear path of operators and adds it 
to the topology. The join operator that follows joins the latest two streams 
and creates one joined stream. This model may even solve issue 2 by 
allowing: 
      ...split().beginStream(1).aggregate().beginStream(2)
        .beginStream("stream4").filter().window()
        .join().join().partition().build();
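
One way to read the beginStream()/join() semantics above is as a stack of open 
linear paths: beginStream() pushes a new path, single-input operators rewrite 
the top of the stack, and join() pops the latest two paths and pushes the 
joined one. A minimal sketch under that assumption follows; the class name, 
the apply() method, and the string representation of a stream are all 
hypothetical and only for illustration.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical sketch: each open path is represented only by a descriptive string.
class StackTopologySketch {
    private final Deque<String> open = new ArrayDeque<>();

    // Starts a new linear path and makes it the current one.
    StackTopologySketch beginStream(String name) {
        open.push(name);
        return this;
    }

    // Applies a single-input operator to the current path.
    StackTopologySketch apply(String op) {
        if (open.isEmpty()) {
            throw new IllegalStateException("no open stream for " + op);
        }
        open.push(op + "(" + open.pop() + ")");
        return this;
    }

    // Joins the two most recently opened paths into one.
    StackTopologySketch join() {
        if (open.size() < 2) {
            throw new IllegalStateException("join needs two open streams");
        }
        String right = open.pop();
        String left = open.pop();
        open.push("join(" + left + ", " + right + ")");
        return this;
    }

    // Succeeds only when every path has been merged into a single output.
    String build() {
        if (open.size() != 1) {
            throw new IllegalStateException(open.size() + " unjoined streams");
        }
        return open.pop();
    }

    public static void main(String[] args) {
        String plan = new StackTopologySketch()
            .beginStream("stream1").apply("window").apply("aggregate")
            .beginStream("stream2").apply("window").apply("aggregate").join()
            .beginStream("stream3").apply("project").apply("window").join()
            .apply("partition")
            .build();
        System.out.println(plan);
    }
}
```

The nice property of this reading is that build() can validate the topology 
cheaply: anything other than exactly one remaining path means a stream was 
opened but never joined.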
        
For issue 3, i.e. reusing a certain intermediate stream in multiple downstream 
operators, we can introduce beginReuseStream(source, "name") as follows:
OperatorRouter router = TopologyBuilder.create()
                          .beginReuseStream("stream1", "reuseStream1").window(10).aggregate("group-by", "treeId", "sum")
                          .beginStream("stream2").window(10).aggregate("group-by", "treeId", "avg").join(joinCondition)
                          .beginStream("stream3").project(fieldList).window(10).join(joinCondition)
                          .join("reuseStream1", joinCondition)
                          .partition(partitionKey, number, "outstream1").build();
Here, the downstream operator refers to the reuse stream by name in order to 
use the intermediate stream multiple times.
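
A rough sketch of how the named reuse could work, assuming the builder keeps a 
small registry from reuse name to stream next to the stack of open paths. 
Everything here is hypothetical, including the assumption that the name is 
bound to the source stream at the point where beginReuseStream() is called 
(rather than to the output of the whole linear path).

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: streams are descriptive strings, reuse names live in a map.
class ReuseStreamSketch {
    private final Deque<String> open = new ArrayDeque<>();
    private final Map<String, String> named = new HashMap<>();

    ReuseStreamSketch beginStream(String source) {
        open.push(source);
        return this;
    }

    // Like beginStream(), but also records the source under a name for later reuse.
    ReuseStreamSketch beginReuseStream(String source, String name) {
        named.put(name, source);
        return beginStream(source);
    }

    // Applies a single-input operator to the current path.
    ReuseStreamSketch apply(String op) {
        open.push(op + "(" + open.pop() + ")");
        return this;
    }

    // Joins the two most recently opened paths.
    ReuseStreamSketch join() {
        String right = open.pop();
        String left = open.pop();
        open.push("join(" + left + ", " + right + ")");
        return this;
    }

    // Joins the current path with a previously named stream, without consuming it.
    ReuseStreamSketch join(String reuseName) {
        String reused = named.get(reuseName);
        if (reused == null) {
            throw new IllegalArgumentException("unknown reuse stream: " + reuseName);
        }
        open.push("join(" + open.pop() + ", " + reused + ")");
        return this;
    }

    String build() {
        if (open.size() != 1) {
            throw new IllegalStateException(open.size() + " unjoined streams");
        }
        return open.pop();
    }

    public static void main(String[] args) {
        String plan = new ReuseStreamSketch()
            .beginReuseStream("stream1", "reuseStream1").apply("window")
            .beginStream("stream2").join()
            .join("reuseStream1")
            .build();
        System.out.println(plan);
    }
}
```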

In addition, from reviewing the SAMZA-561 changes, I think that we will need 
to include an OperatorBuilder interface as well, to help create operators 
easily, 
e.g. OperatorBuilder.join().setInputs().setOutput().setJoinCondition().build(). 
This can be used inside the TopologyBuilder, which allows us to hide all the 
intermediate input/output stream/entity names, so that there will be no 
OperatorSpec that the programmer needs to handle directly.
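
As a rough illustration of the OperatorBuilder idea, here is a sketch of what 
the join builder chain could look like. The class names, the string-typed 
inputs, output, and join condition, and the validation rules in build() are 
all assumptions for illustration, not the interface from SAMZA-561 or this 
patch.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical immutable description of a join; not the actual StreamStreamJoinSpec.
final class JoinSpecSketch {
    final List<String> inputs;
    final String output;
    final String condition;

    JoinSpecSketch(List<String> inputs, String output, String condition) {
        this.inputs = Collections.unmodifiableList(new ArrayList<>(inputs));
        this.output = output;
        this.condition = condition;
    }
}

// Hypothetical entry point mirroring OperatorBuilder.join()...build().
final class OperatorBuilderSketch {
    static JoinBuilder join() {
        return new JoinBuilder();
    }

    static final class JoinBuilder {
        private List<String> inputs = Collections.emptyList();
        private String output;
        private String condition;

        JoinBuilder setInputs(String... names) {
            this.inputs = Arrays.asList(names);
            return this;
        }

        JoinBuilder setOutput(String name) {
            this.output = name;
            return this;
        }

        JoinBuilder setJoinCondition(String expr) {
            this.condition = expr;
            return this;
        }

        // Validates the collected settings before producing the spec.
        JoinSpecSketch build() {
            if (inputs.size() != 2) {
                throw new IllegalStateException("a join needs exactly two inputs");
            }
            if (output == null || condition == null) {
                throw new IllegalStateException("output and join condition are required");
            }
            return new JoinSpecSketch(inputs, output, condition);
        }
    }

    public static void main(String[] args) {
        JoinSpecSketch spec = OperatorBuilderSketch.join()
            .setInputs("stream1", "stream2")
            .setOutput("joined")
            .setJoinCondition("stream1.treeId = stream2.treeId")
            .build();
        System.out.println(spec.inputs + " -> " + spec.output);
    }
}
```

Inside the TopologyBuilder, the setInputs()/setOutput() calls would be made by 
the builder itself with generated names, so the user never sees them.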


> On May 29, 2015, 1:28 a.m., Navina Ramesh wrote:
> > samza-sql-core/src/test/java/org/apache/samza/task/sql/UserCallbacksSqlTask.java,
> >  line 123
> > <https://reviews.apache.org/r/34500/diff/1/?file=965740#file965740line123>
> >
> >     Why do we need 2 instances of the TopologyBuilder here?
> >     I think this occurs because stream() and sink() method return 
> > OperatorSource type rather than a TopologyBuilder instance. 
> >     
> >     How differently does the TopologyBuilder handle the OperatorSource and 
> > the OperatorSink ?

The current version of TopologyBuilder copies all operators included in the 
source/sink when they are bound/attached. I will try to remove those with the 
model I described above.


- Yi


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/34500/#review85590
-----------------------------------------------------------


On May 20, 2015, 11:13 p.m., Yi Pan (Data Infrastructure) wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/34500/
> -----------------------------------------------------------
> 
> (Updated May 20, 2015, 11:13 p.m.)
> 
> 
> Review request for samza, Yan Fang, Chris Riccomini, Guozhang Wang, Milinda 
> Pathirage, Navina Ramesh, and Naveen Somasundaram.
> 
> 
> Bugs: SAMZA-552
>     https://issues.apache.org/jira/browse/SAMZA-552
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> SAMZA-552: added operator builder API
> - The current operator builder only supports single-output DAG topologies so far
> 
> 
> Diffs
> -----
> 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java 
> PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java 
> PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java 
> PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSink.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSource.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/OperatorTopology.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/NoopOperatorCallback.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorImpl.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleRouter.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/TopologyBuilder.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCollector.java
>  PRE-CREATION 
>   
> samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOperatorTask.java
>  PRE-CREATION 
>   samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java 
> PRE-CREATION 
>   
> samza-sql-core/src/test/java/org/apache/samza/task/sql/UserCallbacksSqlTask.java
>  PRE-CREATION 
> 
> Diff: https://reviews.apache.org/r/34500/diff/
> 
> 
> Testing
> -------
> 
> ./gradlew clean build passed
> 
> 
> Thanks,
> 
> Yi Pan (Data Infrastructure)
> 
>
