Hi Folks, I'm in the process of implementing https://issues.apache.org/jira/browse/FLINK-3674 and now I'm having a bit of a problem with deciding how watermarks should be treated for operators that have more than one input.
The problem is deciding when to fire event-time timers. For one-input operators it's pretty straightforward: fire once the watermark surpasses a given timer. For two-input operators we allow the operator implementer to observe the watermarks from the two inputs separately and react to that and also to decide what watermark to forward. With this it becomes hard to figure out when to fire timers. My thinking is that we should not allow operators to observe the watermark anymore but route it past the operator and deal with watermarks and timers outside of the operator. A timer for operators with more than one inputs (TwoInputOperator) would fire if the watermark from both inputs advances sufficiently far. Alternatively, we could still let operators observe watermarks but grab the watermark before it enters the operator and still deal with timers in the same way as proposed above. Any feedback on this is very welcome! What would you expect to happen for timers of operators with more than one input? Cheers, Aljoscha P.S. One reason for why I want to deal with watermark outside of operators is that otherwise every operator has to implement the functionality to update the current watermark at the timer service. i.e. something like this: @Internal public class StreamMap<IN, OUT> extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>> implements OneInputStreamOperator<IN, OUT> { private static final long serialVersionUID = 1L; public StreamMap(MapFunction<IN, OUT> mapper) { super(mapper); chainingStrategy = ChainingStrategy.ALWAYS; } @Override public void processElement(StreamRecord<IN> element) throws Exception { output.collect(element.replace(userFunction.map(element.getValue()))); } @Override public void processWatermark(Watermark mark) throws Exception { timerService.updateWatermark(mark); // *<--- that's the thing I don't want* output.emitWatermark(mark); } } This becomes more complicated for two input operators which also do the merging of the watermarks from the two inputs right now.