> On Oct. 7, 2016, 9:04 p.m., Jake Maes wrote:
> > samza-operator/src/main/java/org/apache/samza/operators/api/internal/Operators.java,
> >  line 61
> > <https://reviews.apache.org/r/47994/diff/3/?file=1522711#file1522711line61>
> >
> >     3 thoughts on this line:
> >     1. Why should this be static? Wouldn't this preclude you from having 
> > two tasks run the same operator DAG in the same container/process?
> >     
> >     2. And why here instead of the MessageStream or ChainedOperators 
> > classes? I would expect the topology to be an instantiated thing rather 
> > than a global map. At a minimum since this map and ChainedOperators encode 
> > similar information (subscribers to an operator or message stream) they 
> > should be consolidated to one source of truth for structural/topology info.
> >     
> >     3. Does the order of the Operators in the list have any meaning? e.g. 
> > does it implicitly define the order of processing, or is it just for 
> > consistency, or is the List used to allow duplicates?
> 
> Yi Pan (Data Infrastructure) wrote:
>     Hi, Jake, thanks for the comments. Let me try to answer it one-by-one:
>     1. The key to this map is the MessageStream object, which will be 
> separate instances for each input topic partition. Hence, two tasks w/ the 
> same operator DAG will only share the SystemMessageStream and will have their 
> own MessageStream and operator objects. Not sure why sharing the same 
> topology info between two tasks is necessary.
>     2. The reason I put this map in Operators.java is due to packaging and 
> access mode. In the implementation, I tried to achieve the following two 
> goals: a) restrict the direct dependency from any operator.api class to 
> operator.impl s.t. we can potentially package API classes separately. Hence, 
> creating the operator map directly in ChainedOperators in impl class is not 
> chosen; b) don't expose any internal classes (i.e. Operator class is not 
> exposed to user at all) via public API classes and methods. Hence, recording 
> the subscribers in MessageStream class is not chosen since it inevitably 
> requires a public access method in this API class to get the list of 
> operators, which should not be exposed/accessed by the programmer. The 
> existance of the multiple layers of topology is strictly following the 
> three-layers in the API design: programming layer 
> (MessageStream/Windows/...), representation layer (Operators, etc. in 
> operator.api.internal), and implementation layer (OperatorImpl, Chain
 edOperators, etc.). In each layer, the map is the single source of truth. 
Classes in different layers only access the map in its own layer. A single 
consolidated source of truth will break the layering design and does not allow 
packaging the API-only classes separately. Hope this explains the motivation 
and thoughts behind the design choices. I am open to any better suggestion to 
achive the above two goals.
>     3. So far, I don't see a strong reason for or against a List vs Set. 
> Maybe it would be better to keep it as Collection s.t. we have freedom in 
> choosing its implementation? 
>     
>     I will keep this issue open to see whether we can find any better ideas 
> for now.
> 
> Jake Maes wrote:
>     Thanks for the explanation. It makes a little more sense now, but I still 
> don't see the need for global maps. I'll give an example scenario to 
> illustrate why they worry me: 
>     ```
>     As you mentioned, the key to the map is the MessageStream object which is 
> always intended to be unique. Months/years later, someone changes 
> MessageStream by adding an equals() and hashcode() implementation based on 
> some cononical name within the DAG so MessageStreams can be considered 
> equivalent even if they are distinct objects. After this change, the map 
> starts associating the wrong operator with some message streams. 
>     ```
>     
>     It feels like an easy trap to fall into. 
>     
>     However, if the topology was not a global map, we could avoid conflicts 
> when a particular MessageStream is used in multiple definitions or instances 
> (1 for each SSP) of an operator DAG because each one essentially represents 
> its own namespace for the objects it contains. This could be useful if, for 
> example, we had a design where the DAG is first defined (essentially defining 
> a Type) and then instantiated for each SSP. I think this design fits well 
> with the representation/implementation layers you described. 
>     
>     
>     Ultimately I think we want to get to a place where the user's code looks 
> something like the following:
>     ```
>     OperatorExpression expression = 
> OperatorExpressionBuilder.new().headStream()
>       .filter()
>       .map()
>       .udf()
>       .build()
>     expression.run();
>     ```
>     When they call run() it gets automatically wired into an OperatorTask and 
> submitted via some modified JobRunner. 
>     
>     To me, this topology info belongs somewhere in/underneath the 
> OperatorExpression.
> 
> Yi Pan (Data Infrastructure) wrote:
>     Hi, Jake, I agree w/ your long term view on the OperatorExpression idea. 
> We just talked a bit about how the programming API w/ standalone 
> StreamProcessor would look like and it pretty much seems very close to what 
> you have illustrated here, which does not expose any partition/task concept 
> in the programming APIs. This brings in other new requirements that has not 
> been included in the scope of this RB yet. For example, how to describe what 
> is the partition and grouping logic needed to process two topics, in addition 
> to the processing logic. I would prefer to keep this RB's scope within each 
> task context and we have already opened another ticket for a higher level 
> APIs to allow programming API at standalone mode.
>     
>     P.S. regarding to the argument against global maps, I understand your 
> concern now. It would be better if the subscribers of a MessageStream object 
> are kept within, to avoid the problem that you mentioned in the example. The 
> tradeoff is that we may have to expose unnecessary methods to the end user 
> (i.e. programmers). It might be OK as long as we returns a unmodifiable 
> collection in MessageStream.getSubscribers(). However, in the instantiation 
> code in OperatorFactory.getOperator(), it seems to me that we still need to 
> have a global registry for all Operator classes that have been instantiated 
> to avoid creating two separate implementation instances of the same operator 
> in join/merge. Example would be: a and b are both SystemMessageStream 
> objects, a.join(b).window() would be instantiated from two input sources a 
> and b, each has the chain of: a.partialJoinOp -> windowOp, and 
> b.partialJoinOp -> windowOp. Note that the downstream windowOp *MUST* be the 
> same instance of operator, inst
 ead of two instances each has its own state. There has to be some global 
registry to mark whether a certain Operator object has already been 
instantiated w/ a specific implementation object or not. The ownership of 
subscribers within each upstream operator's namespae actually places 
adversarily in this case, not like the example you used in MessageStream. Any 
thoughts?
> 
> Jake Maes wrote:
>     >> I would prefer to keep this RB's scope within each task context
>     
>     Totally fine with me. 
>     
>     >> It would be better if the subscribers of a MessageStream object are 
> kept within, to avoid the problem that you mentioned in the example. The 
> tradeoff is that we may have to expose unnecessary methods to the end user 
> (i.e. programmers)
>     
>     Yeah, I go back and fourth on this. It really depends on how you want to 
> model the streams. In some systems, the streams know who their subscribers 
> are. In others, the subscribers know what streams they subscribe to. In a 
> system where the entire topology is governed as a collective, the subscriber 
> information can belong in the topology itself (OperatorExpression in my 
> example above)
>     
>     >> There has to be some global registry to mark whether a certain 
> Operator object has already been instantiated w/ a specific implementation 
> object or not.
>     
>     I see the need for a registry, but rather than *global* scope, I'd say it 
> should be *expression* scope. 
>     
>     
>     Regardless of the above discussions, the topology mapping currently isn't 
> part of the API and is therefore easier to change. So I don't mean to hold up 
> this patch.

Hi, Jake, thanks for the detailed feedbacks! Appreciate it! As for the topology 
mapping, I totally agree w/ you that we can evolve later w/o changing the APIs. 
I will make the change to the MessageStream to add getSubscribers(). Thanks!


- Yi


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/47994/#review151818
-----------------------------------------------------------


On Oct. 9, 2016, 5:48 a.m., Yi Pan (Data Infrastructure) wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/47994/
> -----------------------------------------------------------
> 
> (Updated Oct. 9, 2016, 5:48 a.m.)
> 
> 
> Review request for samza, Boris Shkolnik, Chris Pettitt, Chinmay Soman, Jake 
> Maes, Navina Ramesh, Jagadish Venkatraman, and Xinyu Liu.
> 
> 
> Bugs: SAMZA-915
>     https://issues.apache.org/jira/browse/SAMZA-915
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> SAMZA-915: implementation of ChainedOperators and operator runtime impl 
> classes
> 
> 
> Diffs
> -----
> 
>   
> samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java
>  PRE-CREATION 
>   
> samza-operator/src/main/java/org/apache/samza/operators/api/internal/Operators.java
>  PRE-CREATION 
>   
> samza-operator/src/main/java/org/apache/samza/operators/impl/ChainedOperators.java
>  PRE-CREATION 
>   
> samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorFactory.java
>  PRE-CREATION 
>   
> samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java
>  PRE-CREATION 
>   
> samza-operator/src/main/java/org/apache/samza/operators/impl/join/PartialJoinOpImpl.java
>  PRE-CREATION 
>   
> samza-operator/src/main/java/org/apache/samza/operators/impl/window/SessionWindowImpl.java
>  PRE-CREATION 
>   
> samza-operator/src/test/java/org/apache/samza/operators/api/TestMessageStream.java
>  PRE-CREATION 
>   
> samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestOperators.java
>  PRE-CREATION 
>   
> samza-operator/src/test/java/org/apache/samza/operators/impl/TestChainedOperators.java
>  PRE-CREATION 
>   
> samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorFactory.java
>  PRE-CREATION 
>   
> samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java
>  PRE-CREATION 
>   
> samza-operator/src/test/java/org/apache/samza/operators/impl/TestOutputMessage.java
>  PRE-CREATION 
>   
> samza-operator/src/test/java/org/apache/samza/operators/impl/TestSimpleOperatorImpl.java
>  PRE-CREATION 
>   
> samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java
>  PRE-CREATION 
>   
> samza-operator/src/test/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeTest.java
>  PRE-CREATION 
>   
> samza-operator/src/test/java/org/apache/samza/operators/impl/window/TestSessionWindowImpl.java
>  PRE-CREATION 
>   
> samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java 
> PRE-CREATION 
>   
> samza-operator/src/test/java/org/apache/samza/task/InputJsonSystemMessage.java
>  PRE-CREATION 
> 
> Diff: https://reviews.apache.org/r/47994/diff/
> 
> 
> Testing
> -------
> 
> ./gradlew clean build.
> 
> 
> Thanks,
> 
> Yi Pan (Data Infrastructure)
> 
>

Reply via email to