becketqin commented on pull request #12002:
URL: https://github.com/apache/flink/pull/12002#issuecomment-625776143


   @StephanEwen Regarding "Injecting the OperatorEventGateway via 
setOperatorEventGateway(...)": while updating the patch, I realized that it is 
not possible to pass the `OperatorEventGateway` to the constructor of 
`SourceOperator`, because the gateway only becomes available after the 
`OperatorEventHandler` is registered, and that handler is essentially the 
`SourceOperator` itself. A few possible solutions:
   
   1. Separate the `OperatorEventHandler` implementation from the 
`SourceOperator` and pass an instance of that implementation to the 
`SourceOperator` constructor. This is a little weird because the events should 
just be handled by the operator itself. Also, an external 
`OperatorEventHandler` may not be able to access some of the operator's 
internal state.
   
   2. Pass `StreamOperatorParameters.containingTask()` to the constructor of 
`SourceOperator`. The `SourceOperator` would have its own internal 
`OperatorEventHandler` class, create an instance of it in the constructor, and 
register that instance with the containing `StreamTask`. This solution is 
slightly better than the previous one. But from an API consistency point of 
view it is less aligned, because it relies on the operator implementation to 
actually register an `OperatorEventHandler` in order for everything to work.
   
   3. Add a `setOperatorEventGateway()` method to 
`CoordinatedOperatorFactory` (a new interface). This is essentially what 
`YieldingOperatorFactory` and `ProcessingTimeServiceAware` are doing. This 
could work, but it would make `DataStream.transform(Operator)` unusable for 
such operators. Instead, users would have to use 
`DataStream.transform(OneInputStreamOperatorFactory)`. Are we going to 
deprecate the former interface? If so, it would make sense to put all the 
decorative interfaces on the factories instead of on the operators. However, in 
that case `CoordinatedOperatorFactory.createStreamOperator()` can only return a 
`StreamOperator`, not an `OperatorEventHandler`, so nothing in the API 
guarantees that the created operator can handle operator events. This again 
makes the API looser.
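
   To make the ordering problem concrete, here is a minimal sketch. All types 
in it (`ContainingTask`, `registerHandler`, the two operator classes) are 
simplified stand-ins that I made up for illustration, not the actual Flink 
classes. It only shows that the gateway exists after handler registration, so 
it can be obtained either by registering an inner handler from the constructor 
(roughly option 2) or by injecting it through a setter after construction 
(one possible reading of the setter-style approach).

```java
import java.util.ArrayList;
import java.util.List;

public class GatewayWiringSketch {

    // Hypothetical stand-ins for illustration only; not Flink's actual interfaces.
    public interface OperatorEvent {}

    public interface OperatorEventHandler {
        void handleOperatorEvent(OperatorEvent event);
    }

    public interface OperatorEventGateway {
        void sendEventToCoordinator(OperatorEvent event);
    }

    /** Stand-in for the containing task: a gateway is handed out only when a handler registers. */
    public static class ContainingTask {
        public final List<OperatorEvent> sentToCoordinator = new ArrayList<>();
        public OperatorEventHandler registeredHandler;

        public OperatorEventGateway registerHandler(OperatorEventHandler handler) {
            this.registeredHandler = handler;
            // The returned gateway simply records what the operator sends.
            return sentToCoordinator::add;
        }
    }

    /** Roughly option 2: the operator registers an inner handler from its constructor. */
    public static class ConstructorRegisteringOperator {
        public final List<OperatorEvent> received = new ArrayList<>();
        public final OperatorEventGateway gateway;

        public ConstructorRegisteringOperator(ContainingTask task) {
            // The inner handler can reach operator state (here: the 'received' list).
            this.gateway = task.registerHandler(received::add);
        }
    }

    /** Setter style: the operator is the handler; the gateway is injected after construction. */
    public static class SetterInjectedOperator implements OperatorEventHandler {
        public final List<OperatorEvent> received = new ArrayList<>();
        public OperatorEventGateway gateway;

        public void setOperatorEventGateway(OperatorEventGateway gateway) {
            this.gateway = gateway;
        }

        @Override
        public void handleOperatorEvent(OperatorEvent event) {
            received.add(event);
        }
    }

    public static void main(String[] args) {
        ContainingTask task = new ContainingTask();
        SetterInjectedOperator op = new SetterInjectedOperator();
        // The operator must exist before it can be registered, so the gateway
        // cannot be a constructor argument; it is set right after registration.
        op.setOperatorEventGateway(task.registerHandler(op));

        op.gateway.sendEventToCoordinator(new OperatorEvent() {});
        task.registeredHandler.handleOperatorEvent(new OperatorEvent() {});
        System.out.println(task.sentToCoordinator.size() + " sent, "
                + op.received.size() + " received");
    }
}
```

   In both variants somebody other than the type system has to make sure the 
registration actually happens, which is the consistency concern raised above.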
   
   Compared with the above solutions, the current `WithOperatorCoordinator` 
API seems to put together all the pieces required for an operator with a 
coordinator. API-wise, I think it is intuitive for users to understand: if an 
operator has a corresponding coordinator, it should provide the coordinator as 
well as handle the events from the coordinator.
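
   As a rough illustration of that idea (not the actual `WithOperatorCoordinator` 
signature; `CoordinatedOperator`, `Coordinator`, and `coordinatorProvider()` 
below are hypothetical names), a single interface can require both halves of 
the contract, so an operator cannot supply a coordinator without also handling 
its events:

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.function.Supplier;

public class CombinedContractSketch {

    // Hypothetical stand-ins for illustration only; not Flink's actual interfaces.
    public interface OperatorEvent {}

    public interface Coordinator {
        String name();
    }

    /** One contract with both obligations: provide the coordinator and handle its events. */
    public interface CoordinatedOperator {
        Supplier<Coordinator> coordinatorProvider();

        void handleOperatorEvent(OperatorEvent event);
    }

    /** A toy operator; the compiler forces it to implement both methods. */
    public static class DemoSourceOperator implements CoordinatedOperator {
        public final Queue<OperatorEvent> pending = new ArrayDeque<>();

        @Override
        public Supplier<Coordinator> coordinatorProvider() {
            Coordinator coordinator = () -> "demo-coordinator";
            return () -> coordinator;
        }

        @Override
        public void handleOperatorEvent(OperatorEvent event) {
            // Events are queued here; a real operator would act on them.
            pending.add(event);
        }
    }

    public static void main(String[] args) {
        DemoSourceOperator operator = new DemoSourceOperator();
        operator.handleOperatorEvent(new OperatorEvent() {});
        System.out.println(operator.coordinatorProvider().get().name()
                + " / pending events: " + operator.pending.size());
    }
}
```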
   
   I am not sure what the best way to do this would be. Do you have any 
suggestions?

