Hi Folks,
I'm in the process of implementing
https://issues.apache.org/jira/browse/FLINK-3674 and now
I'm having a bit of a problem with deciding how watermarks should be
treated for operators that have more than one input.

The problem is deciding when to fire event-time timers. For one-input
operators it's pretty straightforward: fire once the watermark surpasses a
given timer. For two-input operators we allow the operator implementer to
observe the watermarks from the two inputs separately and react to that and
also to decide what watermark to forward. With this it becomes hard to
figure out when to fire timers.

My thinking is that we should not allow operators to observe the watermark
anymore but route it past the operator and deal with watermarks and timers
outside of the operator. A timer for operators with more than one inputs
(TwoInputOperator) would fire if the watermark from both inputs advances
sufficiently far.

Alternatively, we could still let operators observe watermarks but grab the
watermark before it enters the operator and still deal with timers in the
same way as proposed above.

Any feedback on this is very welcome! What would you expect to happen for
timers of operators with more than one input?

Cheers,
Aljoscha

P.S. One reason for why I want to deal with watermark outside of operators
is that otherwise every operator has to implement the functionality to
update the current watermark at the timer service. i.e. something like this:

@Internal
public class StreamMap<IN, OUT>
        extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
        implements OneInputStreamOperator<IN, OUT> {

    private static final long serialVersionUID = 1L;

    public StreamMap(MapFunction<IN, OUT> mapper) {
        super(mapper);
        chainingStrategy = ChainingStrategy.ALWAYS;
    }

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {

output.collect(element.replace(userFunction.map(element.getValue())));
    }

    @Override
    public void processWatermark(Watermark mark) throws Exception {
        timerService.updateWatermark(mark); // *<--- that's the thing I
don't want*
        output.emitWatermark(mark);
    }
}

This becomes more complicated for two input operators which also do the
merging of the watermarks from the two inputs right now.

Reply via email to