AHeise commented on a change in pull request #11403: [FLINK-16316][operators]
Implement new StreamOperatorBase as a replacement for AbstractStreamOperator
URL: https://github.com/apache/flink/pull/11403#discussion_r392923505
##########
File path:
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamOperatorFactoryUtil.java
##########
@@ -57,7 +57,13 @@
((ProcessingTimeServiceAware) operatorFactory).setProcessingTimeService(processingTimeService);
}
- OP op = operatorFactory.createStreamOperator(containingTask, configuration, output);
+ // TODO: what to do with ProcessingTimeServiceAware?
+ OP op = operatorFactory.createStreamOperator(
+ new StreamOperatorInitializer<>(
+ containingTask,
+ configuration,
+ output,
+ processingTimeService));
Review comment:
In general, as written above 👍.
Always passing `timeService` comes closer to my understanding of a factory
(factory being stateless except for fundamental configurations that would
change the type of the returned operator for all invocations of
`createStreamOperator`). The factory then decides if it wants to use the
service or not.
If `processingTimeService` were costly to create and only needed by a
specific operator factory (e.g. MailboxExecutor being used only in
AsyncWaitOperatorFactory), then I'd wrap the creation in a supplier.
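A minimal sketch of the supplier idea, in plain Java. All types here (`CostlyService`, `Lazy`, `Initializer`, `OperatorFactory`) are hypothetical stand-ins, not Flink classes; the only point is that the service is always offered to the factory but is created only if a factory actually asks for it.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical sketch; none of these types are Flink's real classes. */
public class LazyServiceSketch {

    /** Stand-in for a service that is expensive to create. */
    static final class CostlyService {}

    /** Memoizing supplier: calls the delegate on the first get() and caches the result. */
    static final class Lazy<T> implements Supplier<T> {
        private final Supplier<T> delegate;
        private T value;
        private boolean initialized;

        Lazy(Supplier<T> delegate) {
            this.delegate = delegate;
        }

        @Override
        public synchronized T get() {
            if (!initialized) {
                value = delegate.get();
                initialized = true;
            }
            return value;
        }
    }

    /** Stand-in for the parameter object handed to every factory. */
    static final class Initializer {
        private final Supplier<CostlyService> costlyService;

        Initializer(Supplier<CostlyService> costlyService) {
            this.costlyService = new Lazy<>(costlyService);
        }

        CostlyService getCostlyService() {
            return costlyService.get();
        }
    }

    /** Stand-in for the single remaining factory interface. */
    interface OperatorFactory {
        String createOperator(Initializer initializer);
    }

    /** A factory that ignores the service, so it is never created. */
    static final OperatorFactory PLAIN = initializer -> "plain operator";

    /** A factory that needs the service and triggers its creation. */
    static final OperatorFactory ASYNC = initializer -> {
        CostlyService service = initializer.getCostlyService();
        return "async operator using " + service.getClass().getSimpleName();
    };

    public static void main(String[] args) {
        AtomicInteger creations = new AtomicInteger();
        Supplier<CostlyService> supplier = () -> {
            creations.incrementAndGet();
            return new CostlyService();
        };

        PLAIN.createOperator(new Initializer(supplier));
        System.out.println("creations after plain factory: " + creations.get()); // 0

        Initializer initializer = new Initializer(supplier);
        ASYNC.createOperator(initializer);
        ASYNC.createOperator(initializer);
        System.out.println("creations after async factory: " + creations.get()); // 1
    }
}
```

The factory interface stays uniform in this sketch, and only the factories that call `getCostlyService()` pay the creation cost.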
Ultimately, we would get rid of all the different OperatorFactory interfaces
except for the main one. Then I'd be perfectly fine to keep factories and not
convert them into builders.
Note that for that goal, we would need to get rid of SimpleOperatorFactory: once
an operator has been created, it cannot go back into a factory. If we need that
functionality, then I only see the builder pattern as a clean solution, where
going back and forth between operator and operator builder is doable.
Last remark, if `StreamOperatorInitializer` ends up with 10+ fields that are
all passed on construction, I'd probably switch to a builder style, but that
can also be done later.
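For illustration, a builder-style parameter object could look roughly like the sketch below. The class name `OperatorInitializer`, its `String` fields, and the choice of which fields are required are all assumptions made for the example; the real `StreamOperatorInitializer` in the PR takes Flink types.

```java
import java.util.Objects;

/** Hypothetical sketch of a builder-style parameter object; not the PR's class. */
public class InitializerBuilderSketch {

    /** Immutable parameter object; Strings stand in for the real Flink types. */
    static final class OperatorInitializer {
        private final String containingTask;
        private final String configuration;
        private final String output;
        private final String processingTimeService; // treated as optional in this sketch

        private OperatorInitializer(Builder builder) {
            this.containingTask = Objects.requireNonNull(builder.containingTask, "containingTask");
            this.configuration = Objects.requireNonNull(builder.configuration, "configuration");
            this.output = Objects.requireNonNull(builder.output, "output");
            this.processingTimeService = builder.processingTimeService;
        }

        static Builder builder() {
            return new Builder();
        }

        String getContainingTask() { return containingTask; }
        String getConfiguration() { return configuration; }
        String getOutput() { return output; }
        String getProcessingTimeService() { return processingTimeService; }
    }

    /** Fluent builder: each setter names its argument, so call sites stay readable. */
    static final class Builder {
        private String containingTask;
        private String configuration;
        private String output;
        private String processingTimeService;

        Builder containingTask(String value) { this.containingTask = value; return this; }
        Builder configuration(String value) { this.configuration = value; return this; }
        Builder output(String value) { this.output = value; return this; }
        Builder processingTimeService(String value) { this.processingTimeService = value; return this; }

        OperatorInitializer build() {
            return new OperatorInitializer(this);
        }
    }

    public static void main(String[] args) {
        OperatorInitializer initializer = OperatorInitializer.builder()
            .containingTask("task")
            .configuration("config")
            .output("output")
            .processingTimeService("timer")
            .build();
        System.out.println(initializer.getContainingTask() + ", " + initializer.getProcessingTimeService());
    }
}
```

With only four fields the constructor in the diff is arguably simpler; the builder mainly pays off once positional arguments become hard to tell apart.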