Hi team, I'm writing my custom Operator as a high fan-out operation and I use processing time timers to defer processing some inputs When timers are firing, the operator will continue to process the deferred elements. One typical use case for my Operator is like: ImpulseOperator -> my Operator -> downstream where the watermark of ImpulseOperator advances to MAX_TIMESTAMP immediately.
One problem I have is that after my operator.close() is called, it's still possible for me to set processing time timers and wait for these timers to be fired. But it seems like Flink pauses invoking processing timers once one operator.close() is called in the new version. I'm curious why Flink decides to do so and any workaround I can do for my operator? Thanks for your help!