[ https://issues.apache.org/jira/browse/FLINK-3674?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15585559#comment-15585559 ]

ASF GitHub Bot commented on FLINK-3674:
---------------------------------------

Github user StefanRRichter commented on a diff in the pull request:

    https://github.com/apache/flink/pull/2570#discussion_r83857663
  
    --- Diff: flink-streaming-java/src/main/java/org/apache/flink/streaming/api/functions/TimelyFlatMapFunction.java ---
    @@ -0,0 +1,78 @@
    +/*
    + * Licensed to the Apache Software Foundation (ASF) under one
    + * or more contributor license agreements.  See the NOTICE file
    + * distributed with this work for additional information
    + * regarding copyright ownership.  The ASF licenses this file
    + * to you under the Apache License, Version 2.0 (the
    + * "License"); you may not use this file except in compliance
    + * with the License.  You may obtain a copy of the License at
    + *
    + *     http://www.apache.org/licenses/LICENSE-2.0
    + *
    + * Unless required by applicable law or agreed to in writing, software
    + * distributed under the License is distributed on an "AS IS" BASIS,
    + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
    + * See the License for the specific language governing permissions and
    + * limitations under the License.
    + */
    +
    +package org.apache.flink.streaming.api.functions;
    +
    +import org.apache.flink.annotation.PublicEvolving;
    +import org.apache.flink.api.common.functions.Function;
    +import org.apache.flink.streaming.api.TimeDomain;
    +import org.apache.flink.streaming.api.TimerService;
    +import org.apache.flink.util.Collector;
    +
    +import java.io.Serializable;
    +
    +/**
    + * Base interface for timely flatMap functions. FlatMap functions take elements and transform them,
    + * into zero, one, or more elements. Typical applications can be splitting elements, or unnesting lists
    + * and arrays.
    + *
    + * <p>A {@code TimelyFlatMapFunction} can, in addition to the functionality of a normal
    + * {@link org.apache.flink.api.common.functions.FlatMapFunction}, also set timers and react
    + * to them firing.
    + *
    + * <pre>{@code
    + * DataStream<X> input = ...;
    + *
    + * DataStream<Y> result = input.flatMap(new MyTimelyFlatMapFunction());
    + * }</pre>
    + *
    + * @param <I> Type of the input elements.
    + * @param <O> Type of the returned elements.
    + */
    +@PublicEvolving
    +public interface TimelyFlatMapFunction<I, O> extends Function, Serializable {
    +
    +   /**
    +    * The core method of the {@code TimelyFlatMapFunction}. Takes an element from the input data set and transforms
    +    * it into zero, one, or more elements.
    +    *
    +    * @param value The input value.
    +    * @param timerService A {@link TimerService} that allows setting timers and querying the
    +    *                        current time.
    +    * @param out The collector for returning result values.
    +    *
    +    * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
    +    *                   to fail and may trigger recovery.
    +    */
    +   void flatMap(I value, TimerService timerService, Collector<O> out) throws Exception;
    +
    +   /**
    +    * Called when a timer set using {@link TimerService} fires.
    +    *
    +    * @param timestamp The timestamp of the firing timer.
    +    * @param timeDomain The {@link TimeDomain} of the firing timer.
    +    * @param timerService A {@link TimerService} that allows setting timers and querying the
    +    *                        current time.
    +    * @param out The collector for returning result values.
    +    *
    +    * @throws Exception This method may throw exceptions. Throwing an exception will cause the operation
    +    *                   to fail and may trigger recovery.
    +    */
    +   void onTimer(long timestamp, TimeDomain timeDomain, TimerService timerService, Collector<O> out) throws Exception ;
    --- End diff --
    
    I wonder whether `TimeDomain` and `TimerService` need to be parameters of
    the methods in this interface. I assume both remain stable for the lifetime
    of the UDF, so they could be passed once in some init method, which could
    also be pre-implemented in a `RichTimelyFlatMapFunction`. Maybe there is a
    good reason against this, but I like to keep the number of parameters small
    when possible.
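
    To make the suggestion concrete, here is a rough sketch of what an
    init-based variant might look like. It is only an illustration under the
    assumption stated in the comment (that both objects stay fixed for the
    lifetime of the UDF). The types `TimerService`, `TimeDomain` and
    `Collector` below are simplified stand-ins, not the real Flink classes,
    and `InitTimelyFlatMapFunction`, `RichTimelyFlatMapFunctionSketch`,
    `DelayedEcho` and all method names on them are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-ins for the types named in the diff (NOT the real Flink API).
enum TimeDomain { EVENT_TIME, PROCESSING_TIME }

interface TimerService {
    long currentProcessingTime();
    void registerProcessingTimeTimer(long time);
}

interface Collector<T> {
    void collect(T record);
}

// Hypothetical variant: TimerService and TimeDomain are handed over once.
interface InitTimelyFlatMapFunction<I, O> {
    void init(TimerService timerService, TimeDomain timeDomain);
    void flatMap(I value, Collector<O> out) throws Exception;
    void onTimer(long timestamp, Collector<O> out) throws Exception;
}

// Hypothetical "rich" base class that pre-implements init and keeps both objects.
abstract class RichTimelyFlatMapFunctionSketch<I, O> implements InitTimelyFlatMapFunction<I, O> {
    private TimerService timerService;
    private TimeDomain timeDomain;

    @Override
    public void init(TimerService timerService, TimeDomain timeDomain) {
        this.timerService = timerService;
        this.timeDomain = timeDomain;
    }

    protected TimerService getTimerService() { return timerService; }
    protected TimeDomain getTimeDomain() { return timeDomain; }
}

// Example UDF: buffers elements and emits them when a timer fires.
class DelayedEcho extends RichTimelyFlatMapFunctionSketch<String, String> {
    private final List<String> buffer = new ArrayList<>();

    @Override
    public void flatMap(String value, Collector<String> out) {
        buffer.add(value);
        TimerService ts = getTimerService();
        // Ask to be called back 10 time units from "now".
        ts.registerProcessingTimeTimer(ts.currentProcessingTime() + 10);
    }

    @Override
    public void onTimer(long timestamp, Collector<String> out) {
        for (String s : buffer) {
            out.collect(s + " @ " + timestamp);
        }
        buffer.clear();
    }
}
```

    With this shape the two callback signatures shrink to the element (or
    timestamp) plus the collector. The cost is that the function becomes
    stateful with respect to its runtime objects and must have `init` called
    before first use, which may be one of the "good reasons against" alluded
    to above.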


> Add an interface for Time aware User Functions
> ----------------------------------------------
>
>                 Key: FLINK-3674
>                 URL: https://issues.apache.org/jira/browse/FLINK-3674
>             Project: Flink
>          Issue Type: New Feature
>          Components: Streaming
>    Affects Versions: 1.0.0
>            Reporter: Stephan Ewen
>            Assignee: Aljoscha Krettek
>
> I suggest to add an interface that UDFs can implement, which will let them be 
> notified upon watermark updates.
> Example usage:
> {code}
> public interface EventTimeFunction {
>     void onWatermark(Watermark watermark);
> }
> public class MyMapper implements MapFunction<String, String>, 
> EventTimeFunction {
>     private long currentEventTime = Long.MIN_VALUE;
>     public String map(String value) {
>         return value + " @ " + currentEventTime;
>     }
>     public void onWatermark(Watermark watermark) {
>         currentEventTime = watermark.getTimestamp();
>     }
> }
> {code}



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
