[
https://issues.apache.org/jira/browse/FLINK-7552?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Aljoscha Krettek closed FLINK-7552.
-----------------------------------
Resolution: Fixed
Implemented in e7996b0d0ff5fe705a3830f3855f977cad4f0c44
> Extend SinkFunction interface with SinkContext
> ----------------------------------------------
>
> Key: FLINK-7552
> URL: https://issues.apache.org/jira/browse/FLINK-7552
> Project: Flink
> Issue Type: Bug
> Components: DataStream API
> Reporter: Aljoscha Krettek
> Assignee: Aljoscha Krettek
> Fix For: 1.4.0
>
>
> Now that we require Java 8 we can extend the {{SinkFunction}} interface
> without breaking backwards compatibility. I'm proposing this:
> {code}
> /**
>  * Interface for implementing user defined sink functionality.
>  *
>  * @param <IN> Input type parameter.
>  */
> @Public
> public interface SinkFunction<IN> extends Function, Serializable {
>
>     /**
>      * Function for standard sink behaviour. This function is called for every record.
>      *
>      * @param value The input record.
>      * @throws Exception
>      * @deprecated Use {@link #invoke(SinkContext, Object)}.
>      */
>     @Deprecated
>     default void invoke(IN value) throws Exception {
>     }
>
>     /**
>      * Writes the given value to the sink. This function is called for every record.
>      *
>      * @param context Additional context about the input record.
>      * @param value The input record.
>      * @throws Exception
>      */
>     default void invoke(SinkContext context, IN value) throws Exception {
>         invoke(value);
>     }
>
>     /**
>      * Context that {@link SinkFunction SinkFunctions} can use for getting additional data about
>      * an input record.
>      *
>      * @param <T> The type of elements accepted by the sink.
>      */
>     @Public // Interface might be extended in the future with additional methods.
>     interface SinkContext<T> {
>
>         /**
>          * Returns the timestamp of the current input record.
>          */
>         long timestamp();
>     }
> }
> {code}
> For now, this only allows access to the element timestamp. This would allow
> us to fix the abomination that is {{FlinkKafkaProducer010}}, which is a
> hybrid {{SinkFunction}}/{{StreamOperator}} only because it needs access to
> timestamps.
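
To illustrate how the default methods keep existing sinks working while letting new sinks read the timestamp, here is a minimal sketch. It is not Flink code: the interface is a simplified local stand-in for the proposal (no Flink dependency, no {{@Public}} annotation, and {{SinkContext}} without its type parameter), and {{LegacySink}}, {{TimestampedSink}} and {{SinkContextDemo}} are hypothetical names used only for this example.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for the proposed interface (assumption: not the real Flink class).
interface SinkFunction<IN> extends Serializable {

    // Old entry point, kept so that existing sinks still compile.
    @Deprecated
    default void invoke(IN value) throws Exception {
    }

    // New entry point; by default it delegates to the old one.
    default void invoke(SinkContext context, IN value) throws Exception {
        invoke(value);
    }

    interface SinkContext {
        long timestamp();
    }
}

// Hypothetical pre-existing sink: overrides only the old method.
class LegacySink implements SinkFunction<String> {
    final List<String> seen = new ArrayList<>();

    @Override
    public void invoke(String value) {
        seen.add(value);
    }
}

// Hypothetical new sink: overrides the context-aware method to read the timestamp.
class TimestampedSink implements SinkFunction<String> {
    final List<String> seen = new ArrayList<>();

    @Override
    public void invoke(SinkContext context, String value) {
        seen.add(context.timestamp() + ":" + value);
    }
}

class SinkContextDemo {
    public static void main(String[] args) throws Exception {
        // Stand-in for what the runtime would pass along with each record.
        SinkFunction.SinkContext ctx = () -> 1_000L;

        LegacySink legacy = new LegacySink();
        legacy.invoke(ctx, "a");          // reaches invoke(String) via the default method
        System.out.println(legacy.seen);  // prints [a]

        TimestampedSink timestamped = new TimestampedSink();
        timestamped.invoke(ctx, "a");
        System.out.println(timestamped.seen); // prints [1000:a]
    }
}
```

In this sketch the caller only ever uses the two-argument {{invoke}}, so a sink that predates the change is reached through the default delegation, and a sink that needs timestamps does not have to be a {{StreamOperator}}.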