> On Sept. 13, 2016, 10:03 p.m., Jagadish Venkatraman wrote:
> > samza-operator/src/test/java/org/apache/samza/task/JoinOperatorTask.java, 
> > line 47
> > <https://reviews.apache.org/r/47835/diff/14/?file=1496992#file1496992line47>
> >
> >     The current implementation seems to rely on state maintained across 
> > multiple invocations of `initOperators` to correctly invoke the join 
> > function.
> >     
> >     Suggestion 1:
> >     I wonder if we could pass in list of all `SystemMessageStreams` in this 
> > method? (Or perhaps - declaratively create a `MessageStream` in code doing 
> > a `new MessageStream(systemStreamPartition);`? 
> >     
> >     That way users can do a one-shot initialization? 
> >     
> >     Suggestion 2:
> >     What if we just pass in a `List<SystemStreamPartition>` objects and 
> > users can choose to instantiate `messageStreams` and register them. 
> >     
> >     Please let me know if I don't fully understand the constraints of the 
> > design!

I will update the test case based on the updated initOperators() method. Thanks!


> On Sept. 13, 2016, 10:03 p.m., Jagadish Venkatraman wrote:
> > samza-operator/src/test/java/org/apache/samza/task/JoinOperatorTask.java, 
> > line 48
> > <https://reviews.apache.org/r/47835/diff/14/?file=1496992#file1496992line48>
> >
> >     Curious as to why should users rely on `Join.join` to do the join? I 
> > wonder if we could directly invoke a
> >     `joinSource.join(joinSource2)`? 
> >     
> >     That way, I can do a more fluent groupBy on a join result?
> >     
> >     joinSource.join(joinSource2)
> >               .join(joinSource3)
> >               .window(SessionWindows.into ())
> >               .sink(SinkFunction)

As we discussed offline, there are more details in why the above join() does 
not play well w/ the generic type parameters to ensure the same key in both 
streams in a join and typed joiner functions in the two 
stream-to-buffered-stream joins are also typed. I agree that this Join.join API 
is not the most intuitive way to me either. Besides, the current 
MessageStream.join() method is package private and not a public API. We can 
also add to the public API by making a better version of MessageStream.join() 
public. For now, let's pond on it a bit more before making the change.


- Yi


-----------------------------------------------------------
This is an automatically generated e-mail. To reply, visit:
https://reviews.apache.org/r/47835/#review148787
-----------------------------------------------------------


On Sept. 12, 2016, 5:53 p.m., Yi Pan (Data Infrastructure) wrote:
> 
> -----------------------------------------------------------
> This is an automatically generated e-mail. To reply, visit:
> https://reviews.apache.org/r/47835/
> -----------------------------------------------------------
> 
> (Updated Sept. 12, 2016, 5:53 p.m.)
> 
> 
> Review request for samza, Boris Shkolnik, Chris Pettitt, Chinmay Soman, Jake 
> Maes, Navina Ramesh, Jagadish Venkatraman, and Xinyu Liu.
> 
> 
> Bugs: SAMZA-914
>     https://issues.apache.org/jira/browse/SAMZA-914
> 
> 
> Repository: samza
> 
> 
> Description
> -------
> 
> SAMZA-914: initial draft of operator programming API. Design doc attached to 
> SAMZA-914: 
> https://issues.apache.org/jira/secure/attachment/12821524/SAMZA-914_%20operator%20Java%20programming%20API%20-%20Google%20Docs.pdf
> 
> 
> Diffs
> -----
> 
>   build.gradle 16facbbf4dff378c561461786ff186bd9e0000ed 
>   gradle/dependency-versions.gradle 52e25aa53a1edc85d478b48898621b26508ad4bb 
>   samza-api/src/test/java/org/apache/samza/config/TestConfig.java 
> 5d066c5867e9df9e94e60bde825dedf10703b399 
>   samza-operator/src/main/java/org/apache/samza/operators/api/Join.java 
> PRE-CREATION 
>   
> samza-operator/src/main/java/org/apache/samza/operators/api/MessageStream.java
>  PRE-CREATION 
>   
> samza-operator/src/main/java/org/apache/samza/operators/api/MessageStreams.java
>  PRE-CREATION 
>   samza-operator/src/main/java/org/apache/samza/operators/api/Triggers.java 
> PRE-CREATION 
>   
> samza-operator/src/main/java/org/apache/samza/operators/api/WindowState.java 
> PRE-CREATION 
>   samza-operator/src/main/java/org/apache/samza/operators/api/Windows.java 
> PRE-CREATION 
>   
> samza-operator/src/main/java/org/apache/samza/operators/api/data/Message.java 
> PRE-CREATION 
>   
> samza-operator/src/main/java/org/apache/samza/operators/api/data/WindowOutput.java
>  PRE-CREATION 
>   
> samza-operator/src/main/java/org/apache/samza/operators/api/internal/Operators.java
>  PRE-CREATION 
>   
> samza-operator/src/main/java/org/apache/samza/operators/api/internal/Trigger.java
>  PRE-CREATION 
>   
> samza-operator/src/main/java/org/apache/samza/operators/api/internal/Window.java
>  PRE-CREATION 
>   
> samza-operator/src/main/java/org/apache/samza/operators/impl/ChainedOperators.java
>  PRE-CREATION 
>   
> samza-operator/src/main/java/org/apache/samza/operators/impl/OperatorBaseImpl.java
>  PRE-CREATION 
>   
> samza-operator/src/main/java/org/apache/samza/operators/impl/ProcessorContext.java
>  PRE-CREATION 
>   
> samza-operator/src/main/java/org/apache/samza/operators/impl/SimpleOperatorImpl.java
>  PRE-CREATION 
>   
> samza-operator/src/main/java/org/apache/samza/operators/impl/StateStoreImpl.java
>  PRE-CREATION 
>   
> samza-operator/src/main/java/org/apache/samza/operators/impl/window/SessionWindowImpl.java
>  PRE-CREATION 
>   
> samza-operator/src/main/java/org/apache/samza/task/StreamOperatorAdaptorTask.java
>  PRE-CREATION 
>   samza-operator/src/main/java/org/apache/samza/task/StreamOperatorTask.java 
> PRE-CREATION 
>   
> samza-operator/src/test/java/org/apache/samza/operators/api/data/TestMessageStream.java
>  PRE-CREATION 
>   
> samza-operator/src/test/java/org/apache/samza/task/AssembleCallGraphTask.java 
> PRE-CREATION 
>   
> samza-operator/src/test/java/org/apache/samza/task/BroadcastOperatorTask.java 
> PRE-CREATION 
>   
> samza-operator/src/test/java/org/apache/samza/task/InputAvroSystemMessage.java
>  PRE-CREATION 
>   samza-operator/src/test/java/org/apache/samza/task/JoinOperatorTask.java 
> PRE-CREATION 
>   
> samza-operator/src/test/java/org/apache/samza/task/TestStreamOperatorTasks.java
>  PRE-CREATION 
>   samza-operator/src/test/java/org/apache/samza/task/WindowOperatorTask.java 
> PRE-CREATION 
>   
> samza-sql-calcite/src/test/java/org/apache/samza/sql/calcite/schema/TestAvroSchemaConverter.java
>  PRE-CREATION 
>   samza-sql-core/README.md PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Data.java 
> PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/EntityName.java 
> PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Relation.java 
> PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Schema.java 
> PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Stream.java 
> PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Table.java 
> PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/api/data/Tuple.java 
> PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/Operator.java 
> PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorCallback.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorRouter.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/OperatorSpec.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SimpleOperator.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/api/operators/SqlOperatorFactory.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/data/IncomingMessageTuple.java
>  PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/avro/AvroData.java 
> PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/sql/data/avro/AvroSchema.java 
> PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlAvroSerde.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlAvroSerdeFactory.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerde.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/data/serializers/SqlStringSerdeFactory.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/data/string/StringData.java 
> PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/data/string/StringSchema.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/NoopOperatorCallback.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorFactoryImpl.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorImpl.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleOperatorSpec.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/factory/SimpleRouter.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoin.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/join/StreamStreamJoinSpec.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionOp.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/partition/PartitionSpec.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/BoundedTimeWindow.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowSpec.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/operators/window/WindowState.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/sql/window/storage/OrderedStoreKey.java
>  PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/system/sql/LongOffset.java 
> PRE-CREATION 
>   samza-sql-core/src/main/java/org/apache/samza/system/sql/Offset.java 
> PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/task/sql/RouterMessageCollector.java
>  PRE-CREATION 
>   
> samza-sql-core/src/main/java/org/apache/samza/task/sql/SimpleMessageCollector.java
>  PRE-CREATION 
>   
> samza-sql-core/src/test/java/org/apache/samza/sql/data/serializers/SqlAvroSerdeTest.java
>  PRE-CREATION 
>   
> samza-sql-core/src/test/java/org/apache/samza/task/sql/RandomWindowOperatorTask.java
>  PRE-CREATION 
>   samza-sql-core/src/test/java/org/apache/samza/task/sql/StreamSqlTask.java 
> PRE-CREATION 
>   
> samza-sql-core/src/test/java/org/apache/samza/task/sql/UserCallbacksSqlTask.java
>  PRE-CREATION 
>   settings.gradle 4c1aa107a11d413777e69bc4e48847b811aff7d2 
> 
> Diff: https://reviews.apache.org/r/47835/diff/
> 
> 
> Testing
> -------
> 
> ./gradlew clean build
> 
> 
> Thanks,
> 
> Yi Pan (Data Infrastructure)
> 
>

Reply via email to