[ 
https://issues.apache.org/jira/browse/FLINK-3674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15530178#comment-15530178
 ] 

Aljoscha Krettek commented on FLINK-3674:
-----------------------------------------

It turns out that a generic {{TimelyFunction}} interface is not well defined.
I assume users will want to emit elements when they get a timer callback, but
neither a {{FilterFunction}} nor a {{ReduceFunction}} receives a collector, so
it is unclear how emitting would work for those function types.

For now I'm going with a {{TimelyFlatMapFunction}} that looks like this:

{code}
@Public
public interface TimelyFlatMapFunction<I, O> extends Function, Serializable {

    /**
     * The core method of the {@code TimelyFlatMapFunction}. Takes an element
     * from the input data set and transforms it into zero, one, or more
     * elements.
     *
     * @param value The input value.
     * @param timerService A {@link TimerService} that allows setting timers
     *                     and querying the current time.
     * @param out The collector for returning result values.
     *
     * @throws Exception This method may throw exceptions. Throwing an
     *                   exception will cause the operation to fail and may
     *                   trigger recovery.
     */
    void flatMap(I value, TimerService timerService, Collector<O> out)
            throws Exception;

    /**
     * Called when a timer set using {@link TimerService} fires.
     *
     * @param timestamp The timestamp of the firing timer.
     * @param timeDomain The {@link TimeDomain} of the firing timer.
     * @param timerService A {@link TimerService} that allows setting timers
     *                     and querying the current time.
     * @param out The collector for returning result values.
     *
     * @throws Exception This method may throw exceptions. Throwing an
     *                   exception will cause the operation to fail and may
     *                   trigger recovery.
     */
    void onTimer(long timestamp, TimeDomain timeDomain,
            TimerService timerService, Collector<O> out) throws Exception;

}
{code}

In {{onTimer()}} the user can only emit elements of the output type of the 
function.
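
To illustrate how a user function might use the two callbacks, here is a
minimal, self-contained sketch. Everything in it other than the two method
signatures is an assumption made for illustration: the nested {{Collector}},
{{TimeDomain}} and {{TimerService}} types are hypothetical stand-ins (the
methods of {{TimerService}} are not specified above, so
{{currentProcessingTime()}} and {{registerProcessingTimeTimer()}} are guesses),
and {{DelayedCounter}} and {{ManualTimerService}} are made-up names. The toy
driver advances the clock by hand; it is not how the runtime would fire timers.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeSet;

class TimelyFlatMapSketch {

    // Hypothetical stand-ins for the types named in the proposal.
    enum TimeDomain { EVENT_TIME, PROCESSING_TIME }

    interface Collector<T> {
        void collect(T record);
    }

    // Assumed shape; the comment above does not define these methods.
    interface TimerService {
        long currentProcessingTime();
        void registerProcessingTimeTimer(long time);
    }

    // Same two methods as the proposed interface, minus Flink base types.
    interface TimelyFlatMapFunction<I, O> {
        void flatMap(I value, TimerService timerService, Collector<O> out)
                throws Exception;
        void onTimer(long timestamp, TimeDomain timeDomain,
                TimerService timerService, Collector<O> out) throws Exception;
    }

    /** Emits nothing per element; emits the running count when a timer fires. */
    static class DelayedCounter implements TimelyFlatMapFunction<String, String> {
        private long count = 0;

        @Override
        public void flatMap(String value, TimerService timerService, Collector<String> out) {
            count++;
            // Ask to be called back one second (in processing time) from now.
            timerService.registerProcessingTimeTimer(
                    timerService.currentProcessingTime() + 1000);
        }

        @Override
        public void onTimer(long timestamp, TimeDomain timeDomain,
                TimerService timerService, Collector<String> out) {
            // Only elements of the output type (String) can be emitted here.
            out.collect("count=" + count + " @ " + timestamp);
        }
    }

    /** Toy timer service whose clock is advanced manually by the driver. */
    static class ManualTimerService implements TimerService {
        long now = 0;
        final TreeSet<Long> timers = new TreeSet<>();

        @Override
        public long currentProcessingTime() { return now; }

        @Override
        public void registerProcessingTimeTimer(long time) { timers.add(time); }
    }

    /** Feeds two elements, then advances the clock and fires the due timers. */
    static List<String> run() {
        List<String> output = new ArrayList<>();
        ManualTimerService timerService = new ManualTimerService();
        DelayedCounter function = new DelayedCounter();

        function.flatMap("a", timerService, output::add); // timer at 1000
        timerService.now = 500;
        function.flatMap("b", timerService, output::add); // timer at 1500

        timerService.now = 2000;
        while (!timerService.timers.isEmpty()
                && timerService.timers.first() <= timerService.now) {
            long timestamp = timerService.timers.pollFirst();
            function.onTimer(timestamp, TimeDomain.PROCESSING_TIME,
                    timerService, output::add);
        }
        return output;
    }

    public static void main(String[] args) {
        run().forEach(System.out::println);
    }
}
```

In this sketch both timers fire after both elements have been counted, so each
callback emits the same count with its own timer timestamp.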

> Add an interface for Time aware User Functions
> ----------------------------------------------
>
>                 Key: FLINK-3674
>                 URL: https://issues.apache.org/jira/browse/FLINK-3674
>             Project: Flink
>          Issue Type: New Feature
>          Components: Streaming
>    Affects Versions: 1.0.0
>            Reporter: Stephan Ewen
>            Assignee: Aljoscha Krettek
>
> I suggest adding an interface that UDFs can implement, which will let them
> be notified upon watermark updates.
> Example usage:
> {code}
> public interface EventTimeFunction {
>     void onWatermark(Watermark watermark);
> }
> public class MyMapper implements MapFunction<String, String>, EventTimeFunction {
>     private long currentEventTime = Long.MIN_VALUE;
>     public String map(String value) {
>         return value + " @ " + currentEventTime;
>     }
>     public void onWatermark(Watermark watermark) {
>         currentEventTime = watermark.getTimestamp();
>     }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)