[ https://issues.apache.org/jira/browse/FLINK-7552?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16145339#comment-16145339 ]
ASF GitHub Bot commented on FLINK-7552:
---------------------------------------

GitHub user aljoscha opened a pull request:

    https://github.com/apache/flink/pull/4616

    [FLINK-7552] [FLINK-7553] Enhance SinkInterface / Use in FlinkKafkaProducer

    ## What is the purpose of the change

    Enhance `SinkFunction` with a way of retrieving the element timestamp. This allows us to get rid of the hybrid nature of `FlinkKafkaProducer010`. This is keeping the legacy static "convenience" methods à la `FlinkKafkaProducer010.writeToKafkaWithTimestamps` for backwards compatibility.

    ## Brief change log

      - Enhance Sink interface
      - Use new interface in Kafka Producer

    ## Verifying this change

    This change is already covered by existing tests, such as *(please describe tests)*.

    ## Does this pull request potentially affect one of the following parts:

      - Dependencies (does it add or upgrade a dependency): no
      - The public API, i.e., is any changed class annotated with `@Public(Evolving)`: no
      - The serializers: no
      - The runtime per-record code paths (performance sensitive): yes, call stack of KafkaProducer with writing timestamps is changed slightly, also, `StreamSink` operator now has a context object.
      - Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: no

    ## Documentation

      - Does this pull request introduce a new feature? no
      - If yes, how is the feature documented?
        not applicable

You can merge this pull request into a Git repository by running:

    $ git pull https://github.com/aljoscha/flink jira-7553-fix-kafka010-producer

Alternatively you can review and apply these changes as the patch at:

    https://github.com/apache/flink/pull/4616.patch

To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:

    This closes #4616

----
commit 0b5bea36247736a0160ce584b94050d7b676d091
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Date:   2017-08-29T13:50:56Z

    [FLINK-7552] Extend SinkFunction interface with SinkContext

commit d3a7b294542ea40287290ff4970715ead621d398
Author: Aljoscha Krettek <aljoscha.kret...@gmail.com>
Date:   2017-08-29T13:53:16Z

    [FLINK-7553] Use new SinkFunction interface in FlinkKafkaProducer010

----

> Extend SinkFunction interface with SinkContext
> ----------------------------------------------
>
>                 Key: FLINK-7552
>                 URL: https://issues.apache.org/jira/browse/FLINK-7552
>             Project: Flink
>          Issue Type: Bug
>          Components: DataStream API
>            Reporter: Aljoscha Krettek
>            Assignee: Aljoscha Krettek
>             Fix For: 1.4.0
>
>
> Now that we require Java 8 we can extend the {{SinkFunction}} interface
> without breaking backwards compatibility. I'm proposing this:
> {code}
> /**
>  * Interface for implementing user defined sink functionality.
>  *
>  * @param <IN> Input type parameter.
>  */
> @Public
> public interface SinkFunction<IN> extends Function, Serializable {
>     /**
>      * Function for standard sink behaviour. This function is called for every record.
>      *
>      * @param value The input record.
>      * @throws Exception
>      * @deprecated Use {@link #invoke(SinkContext, Object)}.
>      */
>     @Deprecated
>     default void invoke(IN value) throws Exception {
>     }
>     /**
>      * Writes the given value to the sink. This function is called for every record.
>      *
>      * @param context Additional context about the input record.
>      * @param value The input record.
>      * @throws Exception
>      */
>     default void invoke(SinkContext context, IN value) throws Exception {
>         invoke(value);
>     }
>     /**
>      * Context that {@link SinkFunction SinkFunctions } can use for getting additional data about
>      * an input record.
>      *
>      * @param <T> The type of elements accepted by the sink.
>      */
>     @Public // Interface might be extended in the future with additional methods.
>     interface SinkContext<T> {
>         /**
>          * Returns the timestamp of the current input record.
>          */
>         long timestamp();
>     }
> }
> {code}
> For now, this only allows access to the element timestamp. This would allow
> us to fix the abomination that is {{FlinkKafkaProducer010}}, which is a
> hybrid {{SinkFunction}}/{{StreamOperator}} only because it needs access to
> timestamps.



--
This message was sent by Atlassian JIRA
(v6.4.14#64029)
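
The backwards-compatibility argument in the issue description rests on Java 8 default methods: the new two-argument `invoke` delegates to the old one-argument `invoke`, so existing sinks keep working unchanged. The sketch below illustrates that delegation with stand-in types only. `SinkContextSketch`, `LegacySink`, `TimestampSink`, and `run` are hypothetical names, the interfaces are simplified local copies of the proposal (no Flink dependency, no type parameter on the context), and the fixed timestamp 42 is made up for the demonstration. It is not the code from the pull request.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

class SinkContextSketch {

    /** Stand-in for the proposed context type; not Flink's actual interface. */
    interface SinkContext {
        long timestamp();
    }

    /** Stand-in that mirrors the shape of the proposed SinkFunction. */
    interface SinkFunction<IN> extends Serializable {
        // Old entry point (marked @Deprecated in the proposal); a no-op by default.
        default void invoke(IN value) throws Exception {
        }

        // New entry point; falls back to the old method unless overridden.
        default void invoke(SinkContext context, IN value) throws Exception {
            invoke(value);
        }
    }

    /** Hypothetical legacy sink: overrides only the one-argument method. */
    static class LegacySink implements SinkFunction<String> {
        private final List<String> out;

        LegacySink(List<String> out) {
            this.out = out;
        }

        @Override
        public void invoke(String value) {
            out.add("legacy:" + value);
        }
    }

    /** Hypothetical new-style sink: reads the element timestamp from the context. */
    static class TimestampSink implements SinkFunction<String> {
        private final List<String> out;

        TimestampSink(List<String> out) {
            this.out = out;
        }

        @Override
        public void invoke(SinkContext context, String value) {
            out.add(context.timestamp() + ":" + value);
        }
    }

    /** Calls both sinks through the new two-argument method, as a caller would. */
    static List<String> run() throws Exception {
        List<String> out = new ArrayList<>();
        SinkContext ctx = () -> 42L; // fixed, made-up timestamp for the demo
        new LegacySink(out).invoke(ctx, "a");    // reaches invoke(String) via the default
        new TimestampSink(out).invoke(ctx, "b"); // uses the context directly
        return out;
    }

    public static void main(String[] args) throws Exception {
        System.out.println(run()); // prints [legacy:a, 42:b]
    }
}
```

Because the caller only ever uses the two-argument method, a sink written against the old interface needs no changes, while a new sink can opt in to the timestamp by overriding the new method.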