becketqin commented on pull request #12002: URL: https://github.com/apache/flink/pull/12002#issuecomment-625776143
@StephanEwen Regarding "Injecting the OperatorEventGateway via setOperatorEventGateway(...)": while updating the patch, I realized it is not possible to pass the `OperatorEventGateway` to the constructor of `SourceOperator`, because the gateway only becomes available after registering the `OperatorEventHandler`, which is essentially the `SourceOperator` itself. A few possible solutions:

1. Separate the `OperatorEventHandler` implementation from the `SourceOperator` and pass that handler instance to the `SourceOperator` constructor. This is a little weird, because the events should really be handled by the operator itself. Generally speaking, an external `OperatorEventHandler` may also not be able to access some of the operator's internal state.
2. Pass `StreamOperatorParameters.containingTask()` to the constructor of `SourceOperator`. The `SourceOperator` would then have an internal `OperatorEventHandler` class, create an instance of it in the constructor, and register it with the containing `StreamTask`. This solution is slightly better than the previous one. From an API consistency point of view, however, it is less aligned, because it relies on the operator implementation actually registering an `OperatorEventHandler` for everything to work.
3. Add a `setOperatorEventGateway()` method to a new `CoordinatedOperatorFactory` interface. This is essentially what `YieldingOperatorFactory` and `ProcessingTimeServiceAware` are doing. This could work, but it would make `DataStream.transform(Operator)` unusable for such operators; users would have to use `DataStream.transform(OneInputStreamOperatorFactory)` instead. Are we going to deprecate the former interface? If so, it would make sense to put all the decorative interfaces on the factories instead of on the operators. In that case, however, `CoordinatedOperatorFactory.createStreamOperator()` can only return a `StreamOperator`, not an `OperatorEventHandler`. This again makes the API loose.
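To make the ordering problem concrete, here is a minimal sketch of the setter-injection variant. All types below (`Event`, `EventHandler`, `EventGateway`, `ToyDispatcher`, `ToySourceOperator`) are simplified, hypothetical stand-ins written for this comment, not the actual Flink interfaces. The only assumption carried over from the discussion is that a gateway is handed out in exchange for a registered handler.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins for illustration only (NOT the Flink interfaces).
interface Event { String name(); }

interface EventHandler { void handleEvent(Event event); }

interface EventGateway { void sendToCoordinator(Event event); }

// Toy dispatcher: a gateway is only handed out in exchange for a handler.
final class ToyDispatcher {
    final List<String> sentToCoordinator = new ArrayList<>();
    private EventHandler handler;

    EventGateway register(EventHandler handler) {
        this.handler = handler;
        // The returned gateway just records what the operator sends.
        return event -> sentToCoordinator.add(event.name());
    }

    void deliverFromCoordinator(Event event) {
        handler.handleEvent(event);
    }
}

// The operator is its own handler, so the gateway cannot be a constructor
// argument: the gateway does not exist until the operator has been registered.
final class ToySourceOperator implements EventHandler {
    final List<String> received = new ArrayList<>();
    private EventGateway gateway;

    void setEventGateway(EventGateway gateway) {
        this.gateway = gateway;
    }

    void open() {
        if (gateway == null) {
            throw new IllegalStateException("gateway must be set before open()");
        }
        gateway.sendToCoordinator(() -> "reader-registered");
    }

    @Override
    public void handleEvent(Event event) {
        received.add(event.name());
    }
}

final class GatewayInjectionSketch {
    static List<String> run() {
        ToySourceOperator operator = new ToySourceOperator();   // 1. construct without a gateway
        ToyDispatcher dispatcher = new ToyDispatcher();
        EventGateway gateway = dispatcher.register(operator);   // 2. registering yields the gateway
        operator.setEventGateway(gateway);                      // 3. inject it after construction
        operator.open();
        dispatcher.deliverFromCoordinator(() -> "add-splits");

        List<String> result = new ArrayList<>();
        result.add("sent:" + dispatcher.sentToCoordinator.get(0));
        result.add("received:" + operator.received.get(0));
        return result;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Steps 1 to 3 in `run()` show why the constructor cannot take the gateway: something outside the operator has to perform the registration and call the setter afterwards, which is the role a factory would play in option 3.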
Compared with the solutions above, the current `WithOperatorCoordinator` API seems to put together all the pieces required for an operator with a coordinator. I think that, API-wise, it is intuitive for users to understand: if an operator has a corresponding coordinator, it should both provide the coordinator and handle the events from it. I am not sure what the best way to do this would be. Do you have any suggestions?
