pnowojski commented on a change in pull request #9383: [FLINK-13248] [runtime] 
Adding processing of downstream messages in AsyncWaitOperator's wait loops
URL: https://github.com/apache/flink/pull/9383#discussion_r317573452
 
 

 ##########
 File path: flink-libraries/flink-state-processing-api/src/main/java/org/apache/flink/state/api/output/BoundedStreamTask.java
 ##########
 @@ -72,6 +74,10 @@ protected void init() throws Exception {
                // re-initialize the operator with the correct collector.
                StreamOperatorFactory<OUT> operatorFactory = configuration.getStreamOperatorFactory(getUserCodeClassLoader());
                headOperator = operatorFactory.createStreamOperator(this, configuration, new CollectorWrapper<>(collector));
+               if (operatorFactory instanceof YieldingOperatorFactory) {
 
 Review comment:
   Should we deduplicate this if check? It's present in 4 different places. 
Maybe as a helper static method in `YieldingOperatorFactory`?
   
   edit:
   I would also deduplicate this logic together with the `createStreamOperator(...)` 
call, because right now in four different places (and in all future ones) we must 
make sure that `setMailboxExecutor(...)` has been called before creating the 
`StreamOperator`. This place, for example, calls it after the creation. 
Another place below looks like it is not creating the operator at all.
   
   So I would extract this logic into a static method in some utility class, 
e.g. `StreamOperatorFactoryUtil`?
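   
   A rough sketch of what I have in mind. The interfaces below are simplified 
stand-ins so that the snippet compiles on its own; they are not the real Flink 
signatures (the real `createStreamOperator(...)` takes the containing task, the 
config and the output), and the method name `createOperator` is only a 
suggestion:

```java
// Simplified stand-ins for illustration only; NOT the real Flink interfaces.
interface MailboxExecutor {
    void execute(Runnable command);
}

interface StreamOperator<OUT> {
}

interface StreamOperatorFactory<OUT> {
    // The real method takes (containingTask, config, output); omitted here.
    StreamOperator<OUT> createStreamOperator();
}

interface YieldingOperatorFactory<OUT> extends StreamOperatorFactory<OUT> {
    void setMailboxExecutor(MailboxExecutor mailboxExecutor);
}

// Hypothetical utility class: the single place that knows about the ordering.
final class StreamOperatorFactoryUtil {

    private StreamOperatorFactoryUtil() {
    }

    /**
     * Creates the operator and guarantees that a yielding factory receives its
     * mailbox executor before the operator is created.
     */
    static <OUT> StreamOperator<OUT> createOperator(
            StreamOperatorFactory<OUT> operatorFactory,
            MailboxExecutor mailboxExecutor) {
        if (operatorFactory instanceof YieldingOperatorFactory) {
            ((YieldingOperatorFactory<OUT>) operatorFactory).setMailboxExecutor(mailboxExecutor);
        }
        return operatorFactory.createStreamOperator();
    }
}
```

   With something like this, each of the four call sites would shrink to a 
single `StreamOperatorFactoryUtil.createOperator(...)` call, and the 
set-executor-then-create ordering could not be gotten wrong again.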

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
us...@infra.apache.org


With regards,
Apache Git Services
