[ 
https://issues.apache.org/jira/browse/FLINK-7552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16148970#comment-16148970
 ] 

ASF GitHub Bot commented on FLINK-7552:
---------------------------------------

Github user pnowojski commented on a diff in the pull request:

    https://github.com/apache/flink/pull/4616#discussion_r136334346
  
    --- Diff: 
flink-streaming-java/src/main/java/org/apache/flink/streaming/api/operators/StreamSink.java
 ---
    @@ -31,14 +31,24 @@
     
        private static final long serialVersionUID = 1L;
     
    +   private transient SimpleSinkContext sinkContext;
    +
        public StreamSink(SinkFunction<IN> sinkFunction) {
                super(sinkFunction);
                chainingStrategy = ChainingStrategy.ALWAYS;
        }
     
        @Override
    +   public void open() throws Exception {
    +           super.open();
    +
    +           this.sinkContext = new SimpleSinkContext<>();
    +   }
    +
    +   @Override
        public void processElement(StreamRecord<IN> element) throws Exception {
    -           userFunction.invoke(element.getValue());
    +           sinkContext.element = element;
    +           userFunction.invoke(sinkContext, element.getValue());
    --- End diff --
    
    I get it, however I still do not like this objects repacking thing done 
here. Instead of passing `StreamRecord` directly, we could introduce 
`UserRecord` interface with `getTimestamp()` method, that would be implemented 
by `StreamRecord`.
    
    But that's minor complain, if you have stronger preferences for keeping it 
as it is, it's fine for me.


> Extend SinkFunction interface with SinkContext
> ----------------------------------------------
>
>                 Key: FLINK-7552
>                 URL: https://issues.apache.org/jira/browse/FLINK-7552
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>             Fix For: 1.4.0
>
>
> Now that we require Java 8 we can extend the {{SinkFunction}} interface 
> without breaking backwards compatibility. I'm proposing this:
> {code}
> /**
>  * Interface for implementing user defined sink functionality.
>  *
>  * @param <IN> Input type parameter.
>  */
> @Public
> public interface SinkFunction<IN> extends Function, Serializable {
>       /**
>        * Function for standard sink behaviour. This function is called for 
> every record.
>        *
>        * @param value The input record.
>        * @throws Exception
>        * @deprecated Use {@link #invoke(SinkContext, Object)}.
>        */
>       @Deprecated
>       default void invoke(IN value) throws Exception {
>       }
>       /**
>        * Writes the given value to the sink. This function is called for 
> every record.
>        *
>        * @param context Additional context about the input record.
>        * @param value The input record.
>        * @throws Exception
>        */
>       default void invoke(SinkContext context, IN value) throws Exception {
>               invoke(value);
>       }
>       /**
>        * Context that {@link SinkFunction SinkFunctions } can use for getting 
> additional data about
>        * an input record.
>        *
>        * @param <T> The type of elements accepted by the sink.
>        */
>       @Public // Interface might be extended in the future with additional 
> methods.
>       interface SinkContext<T> {
>               /**
>                * Returns the timestamp of the current input record.
>                */
>               long timestamp();
>       }
> }
> {code}
> For now, this only allows access to the element timestamp. This would allow 
> us to fix the abomination that is {{FlinkKafkaProducer010}}, which is a 
> hybrid {{SinkFunction}}/{{StreamOperator}} only because it needs access to 
> timestamps.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)

Reply via email to