> On Jan. 6, 2015, 12:48 a.m., Chris Riccomini wrote:
> > It would be helpful for me if you described the basic outline of how these
> > operators fit together in a few paragraphs. The API looks very well thought
> > out and intentional, but I'm having a bit of trouble building a proper
> > mental map of how everything fits together.
> >
> > It seems that there's no concept of a schema in this model. Tuples have
> > fields of String -> object, and that's all. Is this a correct
> > understanding? If so, I wonder if there are cases where we would require
> > knowledge of the data types in the Tuples?
> >
> > Nit: @author tags should go away.
> > Nit: licenses missing on everything.

Sure. The operators fall into six types:

1) RelationOperator: takes a set of relations as input
2) RelationRelationOperator extends RelationOperator: takes a set of relations as input and generates a single output relation (i.e. regular relational algebra)
3) RelationTupleOperator extends RelationOperator: takes a relation as input and generates a stream of tuples as output (e.g. istream/dstream operators)
4) TupleOperator: takes a tuple from one input stream as input
5) TupleRelationOperator extends TupleOperator: takes a tuple from one input stream and generates a single output relation (e.g. window operator)
6) TupleTupleOperator extends TupleOperator: takes a tuple from one input stream and generates a single output stream of tuples

The base class Operator simply provides the generic timeout() and init() methods. I am thinking of moving these into a separate interface rather than keeping them in a base class.

The class hierarchy above is meant to cover a few use cases:

1) An operator can generate two different types of output data, tuple or relation, and that determines the input type of the next operator (irrespective of the next operator's output type). Therefore, there are four possible combinations: relation-to-relation, relation-to-tuple, tuple-to-relation, and tuple-to-tuple. Hence, the four derived classes each implement one type of connection.
2) The general base classes based on input type, RelationOperator and TupleOperator, are used in setNextOp(), since in this method we only need to know the input type of the next operator.
3) There is also one special case where TupleOperator is used directly: when the operator just takes the tuple input and publishes it out to the system, there is no next operator and the operator is an input-only operator (e.g. the SystemStreamOperator in the example).

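To make the hierarchy above a bit more concrete, here is a minimal Java sketch of how the six types and setNextOp() could fit together. It is only an illustration under assumptions, not the code in the diff: the class OperatorSketch, the process() methods, the no-argument init()/timeout() signatures, the single-relation input to RelationOperator, and the CountWindow, InsertStream and CollectingSink classes are all hypothetical stand-ins chosen to keep the example self-contained.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class OperatorSketch {
    // Hypothetical stand-ins for the data interfaces; field access by name only, no schema.
    interface Tuple { Object getField(String name); }
    interface Relation { List<Tuple> tuples(); }

    // Generic lifecycle methods (signatures are assumptions).
    interface Operator { void init(); void timeout(); }

    // Base types are distinguished by INPUT type only.
    interface RelationOperator extends Operator { void process(Relation relation); }
    interface TupleOperator extends Operator { void process(Tuple tuple); }

    // Derived types add the OUTPUT type: setNextOp() only needs the next operator's input type.
    interface RelationRelationOperator extends RelationOperator { void setNextOp(RelationOperator next); }
    interface RelationTupleOperator extends RelationOperator { void setNextOp(TupleOperator next); }
    interface TupleRelationOperator extends TupleOperator { void setNextOp(RelationOperator next); }
    interface TupleTupleOperator extends TupleOperator { void setNextOp(TupleOperator next); }

    /** Tuple-to-relation: buffers tuples and emits one relation per `size` tuples. */
    static class CountWindow implements TupleRelationOperator {
        private final int size;
        private final List<Tuple> buffer = new ArrayList<>();
        private RelationOperator next;
        CountWindow(int size) { this.size = size; }
        public void init() { buffer.clear(); }
        public void timeout() { }
        public void setNextOp(RelationOperator next) { this.next = next; }
        public void process(Tuple tuple) {
            buffer.add(tuple);
            if (buffer.size() == size) {
                List<Tuple> snapshot = new ArrayList<>(buffer);
                buffer.clear();
                next.process(() -> snapshot); // hand the full window downstream as a relation
            }
        }
    }

    /** Relation-to-tuple: emits every tuple of the input relation (istream-like). */
    static class InsertStream implements RelationTupleOperator {
        private TupleOperator next;
        public void init() { }
        public void timeout() { }
        public void setNextOp(TupleOperator next) { this.next = next; }
        public void process(Relation relation) {
            for (Tuple t : relation.tuples()) {
                next.process(t);
            }
        }
    }

    /** Terminal TupleOperator: it has no next operator and only records the "id" field. */
    static class CollectingSink implements TupleOperator {
        final List<Object> seen = new ArrayList<>();
        public void init() { }
        public void timeout() { }
        public void process(Tuple tuple) { seen.add(tuple.getField("id")); }
    }

    /** Creates and initializes all operators, then links them from input to output. */
    public static List<Object> runDemo() {
        CountWindow window = new CountWindow(2);
        InsertStream istream = new InsertStream();
        CollectingSink sink = new CollectingSink();
        window.init();
        istream.init();
        sink.init();
        window.setNextOp(istream);
        istream.setNextOp(sink);
        for (int i = 1; i <= 5; i++) {
            Map<String, Object> fields = Collections.singletonMap("id", i);
            window.process(fields::get); // a Tuple backed by a one-entry map
        }
        return sink.seen;
    }

    public static void main(String[] args) {
        System.out.println(runDemo()); // the fifth tuple stays buffered in the window
    }
}
```

In this sketch the window only forwards complete windows, so feeding five tuples with a window size of two delivers the first four to the sink. The sink implements plain TupleOperator, which mirrors the special case in 3) above where there is no next operator.
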
As you already noticed, the corresponding Spec classes are placeholders to hold the SQL clause definitions such as window spec, join spec, etc. We can probably skip all those classes in the primitive version, but I think they might be useful when we try to connect the SQL grammar w/ the operator implementation.

I will address the "Nit" points in the updated review, thanks!


> On Jan. 6, 2015, 12:48 a.m., Chris Riccomini wrote:
> > samza-sql/src/main/java/org/apache/samza/sql/api/data/RelationStore.java, line 16
> > <https://reviews.apache.org/r/29592/diff/1/?file=806956#file806956line16>
> >
> > If I understand correctly, this is basically a relation manager that
> > allows you to get any relation based on its spec. Is that correct?

Yes. You are spot on.


> On Jan. 6, 2015, 12:48 a.m., Chris Riccomini wrote:
> > samza-sql/src/main/java/org/apache/samza/sql/api/data/RelationStore.java, line 26
> > <https://reviews.apache.org/r/29592/diff/1/?file=806956#file806956line26>
> >
> > Can we retrieve relations by name rather than spec? Seems a little
> > cleaner.

Sure. I used RelationSpec because it may also include some external database specification, not simply the name of an in-memory relation. For example, a relation that is kept to record the current window state and a relation that is loaded from a file should have different initialization/restoration methods, provided by the actual implementation of the RelationStore. I agree that we can simplify the interface for now w/ just the relation name. We can enrich the interface to support different types of relations later.


> On Jan. 6, 2015, 12:48 a.m., Chris Riccomini wrote:
> > samza-sql/src/main/java/org/apache/samza/sql/api/data/RelationStore.java, line 36
> > <https://reviews.apache.org/r/29592/diff/1/?file=806956#file806956line36>
> >
> > It seems like window state and relations are somewhat unrelated.
> > This class (RelationStore) feels like a generic "Manager" interface that's
> > responsible for holding on to a bunch of stuff to make it easy to access
> > throughout the rest of the API. More of a Context object.

Good point, and you are totally right. I used this when implementing the window operation and realized that if we want to recover the overlapping windows, we need to persist the window states. These are not the intermediate relation states. I would change the name to something like SqlStateManager.


> On Jan. 6, 2015, 12:48 a.m., Chris Riccomini wrote:
> > samza-sql/src/main/java/org/apache/samza/sql/api/data/StreamSpec.java, line 3
> > <https://reviews.apache.org/r/29592/diff/1/?file=806957#file806957line3>
> >
> > What's the idea behind this class. It feels like a place holder for
> > something important, but I'm not quite sure what.

Yes. It is a placeholder. It should specify the in-memory stream of tuples that flow between operators.


> On Jan. 6, 2015, 12:48 a.m., Chris Riccomini wrote:
> > samza-sql/src/main/java/org/apache/samza/sql/api/data/StreamSpec.java, line 10
> > <https://reviews.apache.org/r/29592/diff/1/?file=806957#file806957line10>
> >
> > Is this the tuple name, or stream name? Does each tuple have its own
> > name?

Thanks for catching this! This is the stream name. I have updated the method to getStreamSpec(). I don't see the need to assign an individual name to each tuple yet.


> On Jan. 6, 2015, 12:48 a.m., Chris Riccomini wrote:
> > samza-sql/src/main/java/org/apache/samza/sql/api/data/Tuple.java, line 11
> > <https://reviews.apache.org/r/29592/diff/1/?file=806958#file806958line11>
> >
> > Do we need a corresponding getKey call?

Good point. I would think that it makes more sense to add getKey() in StreamSpec and RelationSpec.
This is because 1) the key is essentially a property of the stream or relation that contains the tuple, even though it can simply be a data field in the tuple; and 2) tuples in a single stream or relation should be keyed on the same field. Thoughts?


> On Jan. 6, 2015, 12:48 a.m., Chris Riccomini wrote:
> > samza-sql/src/main/java/org/apache/samza/sql/api/data/Tuple.java, line 34
> > <https://reviews.apache.org/r/29592/diff/1/?file=806958#file806958line34>
> >
> > How do nested data structures work? Will getField return another Tuple,
> > or a Map?

I have not thought about that yet. Could you illustrate some use cases for nested data structures?


> On Jan. 6, 2015, 12:48 a.m., Chris Riccomini wrote:
> > samza-sql/src/main/java/org/apache/samza/sql/api/operators/Operator.java, line 21
> > <https://reviews.apache.org/r/29592/diff/1/?file=806959#file806959line21>
> >
> > Is there ever a case where an operator might need multiple stores?

As you pointed out earlier, the <code>store</code> object is more like a manager to access the KV-store in the task context. Hence, it does not prevent the actual implementation from initializing multiple stores through the manager class.


> On Jan. 6, 2015, 12:48 a.m., Chris Riccomini wrote:
> > samza-sql/src/main/java/org/apache/samza/sql/api/operators/RelationRelationOperator.java, line 19
> > <https://reviews.apache.org/r/29592/diff/1/?file=806961#file806961line19>
> >
> > I'm not crazy about setter methods, in general. I prefer keeping
> > classes immutable, if possible. How often are these setNextOp methods
> > getting called? Who calls them?

Yeah... This has been the most debatable point in my trial. I used this setter method with one use case in mind: allowing users to dynamically connect the operators on demand. I have tried putting the next operator in the class constructors and/or init methods before.
That creates two issues: a) the constructor now requires an abstract class to be exposed as part of the API, so we no longer have a clean set of interface-only classes as our APIs; b) if the next operators are passed in the init() method, each different type of operator class now needs a different signature for init(). Now, thinking about it more, I agree with you that the use case of dynamically connecting operators on demand does not seem to be a popular one. Let me try to remove this setter method here.


> On Jan. 6, 2015, 12:48 a.m., Chris Riccomini wrote:
> > samza-sql/src/test/java/org/apache/samza/sql/task/StreamSqlTask.java, line 137
> > <https://reviews.apache.org/r/29592/diff/1/?file=806989#file806989line137>
> >
> > A nice description of what's going on here would be pretty useful. This
> > seems like the meat of the example. I can tell that you create a bunch of
> > specs, use the specs to get operators, init the operators, and then wire
> > them together.

Yes. Thanks for the feedback. I will add more comments to the example application.


> On Jan. 6, 2015, 12:48 a.m., Chris Riccomini wrote:
> > samza-sql/src/test/java/org/apache/samza/sql/task/StreamSqlTask.java, line 169
> > <https://reviews.apache.org/r/29592/diff/1/?file=806989#file806989line169>
> >
> > It's unclear to me where the input/output are. It seems like things are
> > wired in reverse (i.e. input goes to join, which goes to the window
> > operators). I would have expected the input to go to the window operators,
> > and the join to operate on both windowed relations.

When setting the next operator of the current operator, the next operator object must already have been created.
Hence, the code actually does the following:

1) create and initialize all operators
2) link the operators via setNextOp() from the input operator to the intermediate and final output operators

Removing setNextOp() would actually be a bit more controversial here: if the constructor of the current operator requires the nextOp as an input parameter, the operators would have to be created in reverse order, i.e. from the final output operator backward to the input operator(s). We could either:

1) only require the OperatorSpec as the nextOpSpec in the operator constructors, to avoid having to create the operator objects in reverse order
2) use the init() method to set the nextOp.

I will experiment with both options. Any other suggestions?


- Yi


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/29592/#review66751
-----------------------------------------------------------


On Jan. 5, 2015, 10:19 p.m., Yi Pan (Data Infrastructure) wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/29592/
> -----------------------------------------------------------
>
> (Updated Jan. 5, 2015, 10:19 p.m.)
>
>
> Review request for samza, Chris Riccomini, Navina Ramesh, and Naveen Somasundaram.
>
>
> Bugs: SAMZA-482
>     https://issues.apache.org/jira/browse/SAMZA-482
>
>
> Repository: samza
>
>
> Description
> -------
>
> StreamSQL operator API draft
> - This is the first draft of the StreamSQL operator APIs
> - org.apache.samza.sql.api.* contains definitions of all interface classes
> - org.apache.samza.sql.operators.* are skeleton implementation of some example build-in operators
> - src/test/java contains the example application (a stream-join application) using the above APIs
>
>
> Diffs
> -----
>
>   build.gradle 38383bd9e3f0847d6088a4ea4c1ee6f3dcd1e430
>   samza-sql/src/main/java/org/apache/samza/sql/api/data/Relation.java PRE-CREATION
>   samza-sql/src/main/java/org/apache/samza/sql/api/data/RelationSpec.java PRE-CREATION
>   samza-sql/src/main/java/org/apache/samza/sql/api/data/RelationStore.java PRE-CREATION
>   samza-sql/src/main/java/org/apache/samza/sql/api/data/StreamSpec.java PRE-CREATION
>   samza-sql/src/main/java/org/apache/samza/sql/api/data/Tuple.java PRE-CREATION
>   samza-sql/src/main/java/org/apache/samza/sql/api/operators/Operator.java PRE-CREATION
>   samza-sql/src/main/java/org/apache/samza/sql/api/operators/RelationOperator.java PRE-CREATION
>   samza-sql/src/main/java/org/apache/samza/sql/api/operators/RelationRelationOperator.java PRE-CREATION
>   samza-sql/src/main/java/org/apache/samza/sql/api/operators/RelationTupleOperator.java PRE-CREATION
>   samza-sql/src/main/java/org/apache/samza/sql/api/operators/SqlOperatorFactory.java PRE-CREATION
>   samza-sql/src/main/java/org/apache/samza/sql/api/operators/TupleOperator.java PRE-CREATION
>   samza-sql/src/main/java/org/apache/samza/sql/api/operators/TupleRelationOperator.java PRE-CREATION
>   samza-sql/src/main/java/org/apache/samza/sql/api/operators/TupleTupleOperator.java PRE-CREATION
>   samza-sql/src/main/java/org/apache/samza/sql/api/operators/spec/OperatorSpec.java PRE-CREATION
>   samza-sql/src/main/java/org/apache/samza/sql/api/operators/spec/RelationOperatorSpec.java PRE-CREATION
>   samza-sql/src/main/java/org/apache/samza/sql/api/operators/spec/RelationRelationSpec.java PRE-CREATION
>   samza-sql/src/main/java/org/apache/samza/sql/api/operators/spec/RelationTupleSpec.java PRE-CREATION
>   samza-sql/src/main/java/org/apache/samza/sql/api/operators/spec/TupleOperatorSpec.java PRE-CREATION
>   samza-sql/src/main/java/org/apache/samza/sql/api/operators/spec/TupleRelationSpec.java PRE-CREATION
>   samza-sql/src/main/java/org/apache/samza/sql/api/operators/spec/TupleTupleSpec.java PRE-CREATION
>   samza-sql/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorFactoryImpl.java PRE-CREATION
>   samza-sql/src/main/java/org/apache/samza/sql/operators/output/SystemStreamOp.java PRE-CREATION
>   samza-sql/src/main/java/org/apache/samza/sql/operators/output/SystemStreamSpec.java PRE-CREATION
>   samza-sql/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java PRE-CREATION
>   samza-sql/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java PRE-CREATION
>   samza-sql/src/main/java/org/apache/samza/sql/operators/relation/Join.java PRE-CREATION
>   samza-sql/src/main/java/org/apache/samza/sql/operators/relation/JoinSpec.java PRE-CREATION
>   samza-sql/src/main/java/org/apache/samza/sql/operators/stream/InsertStream.java PRE-CREATION
>   samza-sql/src/main/java/org/apache/samza/sql/operators/stream/InsertStreamSpec.java PRE-CREATION
>   samza-sql/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java PRE-CREATION
>   samza-sql/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java PRE-CREATION
>   samza-sql/src/main/java/org/apache/samza/sql/operators/window/WindowState.java PRE-CREATION
>   samza-sql/src/main/java/org/apache/samza/task/SqlTaskContext.java PRE-CREATION
>   samza-sql/src/test/java/org/apache/samza/sql/data/IncomingMessageTuple.java PRE-CREATION
>   samza-sql/src/test/java/org/apache/samza/sql/store/SQLRelationStore.java PRE-CREATION
>   samza-sql/src/test/java/org/apache/samza/sql/task/StreamSqlTask.java PRE-CREATION
>   settings.gradle 3a01fd66359b8c79954ae8f34eeaf4b2e3fdc0b4
>
> Diff: https://reviews.apache.org/r/29592/diff/
>
>
> Testing
> -------
>
>
> Thanks,
>
> Yi Pan (Data Infrastructure)
>
>
