> On Oct. 7, 2016, 9:04 p.m., Jake Maes wrote:
> > samza-operator/src/main/java/org/apache/samza/operators/api/internal/Operators.java, line 61
> > <https://reviews.apache.org/r/47994/diff/3/?file=1522711#file1522711line61>
> >
> > 3 thoughts on this line:
> > 1. Why should this be static? Wouldn't this preclude you from having two tasks run the same operator DAG in the same container/process?
> >
> > 2. And why here instead of the MessageStream or ChainedOperators classes? I would expect the topology to be an instantiated thing rather than a global map. At a minimum, since this map and ChainedOperators encode similar information (subscribers to an operator or message stream), they should be consolidated into one source of truth for structural/topology info.
> >
> > 3. Does the order of the Operators in the list have any meaning? E.g., does it implicitly define the order of processing, is it just for consistency, or is the List used to allow duplicates?
>
> Yi Pan (Data Infrastructure) wrote:
> Hi, Jake, thanks for the comments. Let me try to answer them one by one:
> 1. The key to this map is the MessageStream object, which will be a separate instance for each input topic partition. Hence, two tasks w/ the same operator DAG will only share the SystemMessageStream and will have their own MessageStream and operator objects. I'm not sure why sharing the same topology info between two tasks would be necessary.
> 2. The reason I put this map in Operators.java is packaging and access mode. In the implementation, I tried to achieve the following two goals: a) restrict the direct dependency from any operator.api class to operator.impl, s.t. we can potentially package the API classes separately. Hence, creating the operator map directly in ChainedOperators in the impl package was not chosen; b) don't expose any internal classes (i.e. the Operator class is not exposed to the user at all) via public API classes and methods.
> Hence, recording the subscribers in the MessageStream class was not chosen, since it inevitably requires a public access method in this API class to get the list of operators, which should not be exposed to or accessed by the programmer. The existence of multiple layers of topology strictly follows the three layers in the API design: the programming layer (MessageStream/Windows/...), the representation layer (Operators, etc. in operator.api.internal), and the implementation layer (OperatorImpl, ChainedOperators, etc.). In each layer, the map is the single source of truth, and classes in each layer only access the map in their own layer. A single consolidated source of truth would break the layering design and would not allow packaging the API-only classes separately. Hope this explains the motivation and thoughts behind the design choices. I am open to any better suggestion to achieve the above two goals.
> 3. So far, I don't see a strong reason for or against a List vs. a Set. Maybe it would be better to keep it as a Collection s.t. we have freedom in choosing its implementation?
>
> I will keep this issue open for now to see whether we can find any better ideas.
>
> Jake Maes wrote:
> Thanks for the explanation. It makes a little more sense now, but I still don't see the need for global maps. I'll give an example scenario to illustrate why they worry me:
> ```
> As you mentioned, the key to the map is the MessageStream object, which is always intended to be unique. Months/years later, someone changes MessageStream by adding an equals() and hashCode() implementation based on some canonical name within the DAG, so that MessageStreams can be considered equivalent even if they are distinct objects. After this change, the map starts associating the wrong operator with some message streams.
> ```
>
> It feels like an easy trap to fall into.
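Jake's scenario can be reproduced in a few lines. The sketch below is a hypothetical, simplified stand-in: `NamedStream` is not the real MessageStream, and its canonical-name `equals()`/`hashCode()` is exactly the kind of future change the scenario imagines. It shows a `HashMap` merging the subscriber lists of two distinct stream objects once equality is name-based, while `java.util.IdentityHashMap`, which compares keys with `==`, keeps them apart. Switching the map type is only one possible mitigation, not something proposed in this review.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;

public class GlobalMapTrap {
  // Hypothetical stand-in for MessageStream after someone adds name-based equality.
  static final class NamedStream {
    final String canonicalName;

    NamedStream(String canonicalName) {
      this.canonicalName = canonicalName;
    }

    @Override
    public boolean equals(Object o) {
      return o instanceof NamedStream && ((NamedStream) o).canonicalName.equals(canonicalName);
    }

    @Override
    public int hashCode() {
      return Objects.hash(canonicalName);
    }
  }

  // Records opName as a subscriber of stream s in the given topology map.
  static void subscribe(Map<NamedStream, List<String>> topology, NamedStream s, String opName) {
    topology.computeIfAbsent(s, k -> new ArrayList<>()).add(opName);
  }

  public static void main(String[] args) {
    // Two distinct stream objects (e.g. one per task) that share a canonical name.
    NamedStream task0 = new NamedStream("pageViews");
    NamedStream task1 = new NamedStream("pageViews");

    Map<NamedStream, List<String>> byEquals = new HashMap<>();
    subscribe(byEquals, task0, "filter@task0");
    subscribe(byEquals, task1, "filter@task1");
    // equals()/hashCode() collapse both keys into one entry.
    System.out.println(byEquals.get(task0)); // [filter@task0, filter@task1]

    Map<NamedStream, List<String>> byIdentity = new IdentityHashMap<>();
    subscribe(byIdentity, task0, "filter@task0");
    subscribe(byIdentity, task1, "filter@task1");
    // IdentityHashMap compares keys with ==, so each object keeps its own entry.
    System.out.println(byIdentity.get(task0)); // [filter@task0]
  }
}
```

An identity-keyed map removes the dependency on `equals()`, but it does not address the other concern in the thread: the map is still static and therefore shared by every task in the process.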
>
> However, if the topology were not a global map, we could avoid conflicts when a particular MessageStream is used in multiple definitions or instances (1 for each SSP) of an operator DAG, because each one essentially represents its own namespace for the objects it contains. This could be useful if, for example, we had a design where the DAG is first defined (essentially defining a Type) and then instantiated for each SSP. I think this design fits well with the representation/implementation layers you described.
>
>
> Ultimately, I think we want to get to a place where the user's code looks something like the following:
> ```
> OperatorExpression expression =
>     OperatorExpressionBuilder.new().headStream()
>         .filter()
>         .map()
>         .udf()
>         .build();
> expression.run();
> ```
> When they call run(), it gets automatically wired into an OperatorTask and submitted via some modified JobRunner.
>
> To me, this topology info belongs somewhere in/underneath the OperatorExpression.
>
> Yi Pan (Data Infrastructure) wrote:
> Hi, Jake, I agree w/ your long-term view on the OperatorExpression idea. We just talked a bit about what the programming API w/ a standalone StreamProcessor would look like, and it seems very close to what you have illustrated here, which does not expose any partition/task concept in the programming APIs. This brings in other new requirements that have not been included in the scope of this RB yet, e.g. how to describe the partitioning and grouping logic needed to process two topics, in addition to the processing logic. I would prefer to keep this RB's scope within each task context; we have already opened another ticket for higher-level APIs that allow programming in standalone mode.
>
> P.S. Regarding the argument against global maps, I understand your concern now.
> It would be better if the subscribers of a MessageStream object are kept within it, to avoid the problem that you mentioned in the example. The tradeoff is that we may have to expose unnecessary methods to the end user (i.e. programmers). It might be OK as long as we return an unmodifiable collection from MessageStream.getSubscribers(). However, in the instantiation code in OperatorFactory.getOperator(), it seems to me that we still need a global registry of all Operator classes that have been instantiated, to avoid creating two separate implementation instances of the same operator in join/merge. For example, if a and b are both SystemMessageStream objects, a.join(b).window() would be instantiated from two input sources, a and b, each with its own chain: a.partialJoinOp -> windowOp, and b.partialJoinOp -> windowOp. Note that the downstream windowOp *MUST* be the same operator instance, instead of two instances that each have their own state. There has to be some global registry to mark whether a certain Operator object has already been instantiated w/ a specific implementation object or not. Keeping the subscribers in each upstream operator's namespace actually works against us in this case, unlike the MessageStream example you used. Any thoughts?
>
> Jake Maes wrote:
> >> I would prefer to keep this RB's scope within each task context
>
> Totally fine with me.
>
> >> It would be better if the subscribers of a MessageStream object are kept within, to avoid the problem that you mentioned in the example. The tradeoff is that we may have to expose unnecessary methods to the end user (i.e. programmers)
>
> Yeah, I go back and forth on this. It really depends on how you want to model the streams. In some systems, the streams know who their subscribers are. In others, the subscribers know what streams they subscribe to.
> In a system where the entire topology is governed as a collective, the subscriber information can belong in the topology itself (OperatorExpression in my example above).
>
> >> There has to be some global registry to mark whether a certain Operator object has already been instantiated w/ a specific implementation object or not.
>
> I see the need for a registry, but rather than *global* scope, I'd say it should be *expression* scope.
>
>
> Regardless of the above discussion, the topology mapping currently isn't part of the API and is therefore easier to change, so I don't mean to hold up this patch.
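One way to read the "expression scope" suggestion is sketched below. `OpSpec`, `OpImpl`, `subscribe()`, and `getOrCreate()` are hypothetical names chosen for illustration, and `OperatorExpression` here only borrows the name from Jake's example; it is not an existing Samza class. Each expression owns both its subscriber map and its instantiation registry, so the shared windowOp in `a.join(b).window()` resolves to a single instance within one expression, while a second expression (e.g. another task) gets its own instance. `getSubscribers()` returns a read-only view, along the lines suggested for `MessageStream.getSubscribers()`.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.List;
import java.util.Map;

public class ExpressionScopedTopology {
  // Hypothetical operator definition (representation layer).
  static final class OpSpec {
    final String name;

    OpSpec(String name) {
      this.name = name;
    }
  }

  // Hypothetical runtime instance (implementation layer); would own operator state.
  static final class OpImpl {
    final OpSpec spec;

    OpImpl(OpSpec spec) {
      this.spec = spec;
    }
  }

  // Hypothetical per-expression container: nothing here is static, so each
  // expression is its own namespace for topology and instantiated operators.
  static final class OperatorExpression {
    private final Map<Object, List<OpSpec>> subscribers = new IdentityHashMap<>();
    private final Map<OpSpec, OpImpl> instances = new IdentityHashMap<>();

    // Records that downstream consumes the output of upstream (a stream or an operator).
    void subscribe(Object upstream, OpSpec downstream) {
      subscribers.computeIfAbsent(upstream, k -> new ArrayList<>()).add(downstream);
    }

    // Read-only view, similar in spirit to an unmodifiable getSubscribers().
    Collection<OpSpec> getSubscribers(Object upstream) {
      return Collections.unmodifiableList(
          subscribers.getOrDefault(upstream, Collections.emptyList()));
    }

    // Expression-scoped registry: each spec is instantiated at most once per expression.
    OpImpl getOrCreate(OpSpec spec) {
      return instances.computeIfAbsent(spec, OpImpl::new);
    }
  }

  public static void main(String[] args) {
    // Hypothetical input sources a and b, plus the specs for a.join(b).window().
    Object a = new Object();
    Object b = new Object();
    OpSpec joinA = new OpSpec("a.partialJoinOp");
    OpSpec joinB = new OpSpec("b.partialJoinOp");
    OpSpec window = new OpSpec("windowOp");

    OperatorExpression expr = new OperatorExpression();
    expr.subscribe(a, joinA);
    expr.subscribe(b, joinB);
    expr.subscribe(joinA, window);
    expr.subscribe(joinB, window);

    // Walking either chain resolves the shared windowOp to the same instance.
    OpImpl viaA = expr.getOrCreate(expr.getSubscribers(joinA).iterator().next());
    OpImpl viaB = expr.getOrCreate(expr.getSubscribers(joinB).iterator().next());
    System.out.println(viaA == viaB); // true

    // A second expression (e.g. another task) gets its own windowOp instance.
    OperatorExpression other = new OperatorExpression();
    System.out.println(other.getOrCreate(window) == viaA); // false
  }
}
```

The design choice in this sketch is that no state is static: two expressions never share entries, which sidesteps the question of two tasks running the same DAG in one container, while the per-expression registry still guarantees a single stateful windowOp per expression.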
Hi, Jake, thanks for the detailed feedback! Appreciate it! As for the topology mapping, I totally agree w/ you that we can evolve it later w/o changing the APIs. I will make the change to MessageStream to add getSubscribers(). Thanks!

- Yi

-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/47994/#review151818
-----------------------------------------------------------

On Oct. 9, 2016, 5:48 a.m., Yi Pan (Data Infrastructure) wrote:
>
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/47994/
> -----------------------------------------------------------
>
> (Updated Oct. 9, 2016, 5:48 a.m.)
>
>
> Review request for samza, Boris Shkolnik, Chris Pettitt, Chinmay Soman, Jake Maes, Navina Ramesh, Jagadish Venkatraman, and Xinyu Liu.
>
>
> Bugs: SAMZA-915
>     https://issues.apache.org/jira/browse/SAMZA-915
>
>
> Repository: samza
>
>
> Description
> -------
>
> SAMZA-915: implementation of ChainedOperators and operator runtime impl classes
>
>
> Diffs
> -----
>
>   samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/api/internal/Operators.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/impl/ChainedOperators.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorFactory.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorImpl.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/impl/join/PartialJoinOpImpl.java PRE-CREATION
>   samza-operator/src/main/java/org/apache/samza/operators/impl/window/SessionWindowImpl.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/operators/api/TestMessageStream.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/operators/api/internal/TestOperators.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/operators/impl/TestChainedOperators.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorFactory.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/operators/impl/TestOperatorImpl.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/operators/impl/TestOutputMessage.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/operators/impl/TestSimpleOperatorImpl.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/operators/impl/TestSinkOperatorImpl.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/operators/impl/data/serializers/SqlAvroSerdeTest.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/operators/impl/window/TestSessionWindowImpl.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java PRE-CREATION
>   samza-operator/src/test/java/org/apache/samza/task/InputJsonSystemMessage.java PRE-CREATION
>
> Diff: https://reviews.apache.org/r/47994/diff/
>
>
> Testing
> -------
>
> ./gradlew clean build.
>
>
> Thanks,
>
> Yi Pan (Data Infrastructure)
>