[ https://issues.apache.org/jira/browse/FLINK-3674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15530178#comment-15530178 ]
Aljoscha Krettek commented on FLINK-3674: ----------------------------------------- Turns out that adding a generic interface for {{TimelyFunction}} is not too well defined. I'm assuming that people would want to have the chance to emit elements when they get a timer callback. How would this work if the user function is a {{FilterFunction}} or a {{ReduceFunction}}? For now I'm going for {{TimelyFlatMapFunction}} that looks like this: {code} @Public public interface TimelyFlatMapFunction<I, O> extends Function, Serializable { /** * The core method of the {@code TimelyFlatMapFunction}. Takes an element from the input data set and transforms * it into zero, one, or more elements. * * @param value The input value. * @param timerService A {@link TimerService} that allows setting timers and querying the * current time. * @param out The collector for returning result values. * * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation * to fail and may trigger recovery. */ void flatMap(I value, TimerService timerService, Collector<O> out) throws Exception; /** * Called when a timer set using {@link TimerService} fires. * * @param timestamp The timestamp of the firing timer. * @param timeDomain The {@link TimeDomain} of the firing timer. * @param timerService A {@link TimerService} that allows setting timers and querying the * current time. * @param out The collector for returning result values. * * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation * to fail and may trigger recovery. */ void onTimer(long timestamp, TimeDomain timeDomain, TimerService timerService, Collector<O> out) throws Exception ; } {code} In {{onTimer()}} the user can only emit elements of the output type of the function. > Add an interface for Time aware User Functions > ---------------------------------------------- > > Key: FLINK-3674 > URL: https://issues.apache.org/jira/browse/FLINK-3674 > Project: Flink > Issue Type: New Feature > Components: Streaming > Affects Versions: 1.0.0 > Reporter: Stephan Ewen > Assignee: Aljoscha Krettek > > I suggest to add an interface that UDFs can implement, which will let them be > notified upon watermark updates. > Example usage: > {code} > public interface EventTimeFunction { > void onWatermark(Watermark watermark); > } > public class MyMapper implements MapFunction<String, String>, > EventTimeFunction { > private long currentEventTime = Long.MIN_VALUE; > public String map(String value) { > return value + " @ " + currentEventTime; > } > public void onWatermark(Watermark watermark) { > currentEventTime = watermark.getTimestamp(); > } > } > {code} -- This message was sent by Atlassian JIRA (v6.3.4#6332)