[ 
https://issues.apache.org/jira/browse/FLINK-14034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16927392#comment-16927392
 ] 

Niels van Kaam edited comment on FLINK-14034 at 9/11/19 8:40 AM:
-----------------------------------------------------------------

I want to place a generic wrapper around the events I am writing to Kafka, 
which for now just contains the processing time at the moment the event is 
written to Kafka. I am already using Kafka's timestamp for the 
event time. In the future, I expect additional context may be added to 
this wrapper.

This can also be solved by using a ProcessFunction directly before the 
SinkFunction and reading the processing time from the context (or, if I just 
used System.currentTimeMillis(), a MapFunction would suffice).
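A minimal plain-Java sketch of the wrapper described above, assuming a hypothetical `Envelope<T>` type and a hypothetical `EnvelopeWrapper` helper (neither exists in Flink). The clock is injected as a `LongSupplier`, so the same code could be driven from a ProcessFunction (for example via `ctx.timerService().currentProcessingTime()`) or from a MapFunction using `System::currentTimeMillis`. The Flink wiring is left out so the sketch runs without Flink on the classpath.

```java
import java.util.function.LongSupplier;

// Hypothetical generic envelope around an event. For now it only carries the
// processing time; further context fields could be added later.
final class Envelope<T> {
    private final T payload;
    private final long processingTimeMillis;

    Envelope(T payload, long processingTimeMillis) {
        this.payload = payload;
        this.processingTimeMillis = processingTimeMillis;
    }

    T getPayload() { return payload; }

    long getProcessingTimeMillis() { return processingTimeMillis; }

    @Override
    public String toString() {
        return "Envelope{payload=" + payload + ", processingTimeMillis=" + processingTimeMillis + "}";
    }
}

// Hypothetical wrapping step. The injected clock could be backed by
// ctx.timerService().currentProcessingTime() inside a ProcessFunction,
// or by System::currentTimeMillis inside a MapFunction.
final class EnvelopeWrapper<T> {
    private final LongSupplier clock;

    EnvelopeWrapper(LongSupplier clock) { this.clock = clock; }

    Envelope<T> wrap(T event) {
        return new Envelope<>(event, clock.getAsLong());
    }
}
```

Injecting the clock rather than calling `System.currentTimeMillis()` directly keeps the wrapper independent of where the timestamp comes from, and makes it deterministic in tests.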

I would prefer overriding the sink because we need this logic in 
all our jobs writing to a specific Kafka cluster. Using a dedicated sink for 
this cluster is easier to explain and document for the rest of my team.

However, I can also build a component that adds both the ProcessFunction and 
the SinkFunction, which is also easy to document. So if the transaction state 
is not stable enough to be exposed, making the invoke method final is perfectly 
fine with me.
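The alternative described just above, one component that bundles the transformation and the sink, could look roughly like this. It is a sketch written against plain `java.util.function` interfaces rather than Flink's `DataStream` API, and `ClusterSink` is a hypothetical name; in a real job the same idea would be a helper method that applies the ProcessFunction to the stream and then adds the Kafka sink.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical "cluster sink" component: callers hand it events, and it applies
// the shared transformation before forwarding the result to the underlying sink.
final class ClusterSink<T, R> implements Consumer<T> {
    private final Function<? super T, ? extends R> transform; // e.g. the envelope wrapper
    private final Consumer<? super R> sink;                    // e.g. the Kafka producer

    ClusterSink(Function<? super T, ? extends R> transform, Consumer<? super R> sink) {
        this.transform = transform;
        this.sink = sink;
    }

    @Override
    public void accept(T event) {
        // Transform first, then write, so every job gets the same behaviour.
        sink.accept(transform.apply(event));
    }
}
```

Because the transformation and the sink are passed in together, team members only need to know about one entry point per cluster, which is the documentation benefit the comment is after.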


was (Author: nvankaam):
I want to place a generic wrapper around the events I am writing to Kafka, 
which for now just contains the processing time at the moment the event is 
written to Kafka. I am already using Kafka's timestamp for the 
event time. In the future, I expect additional context may be added to 
this wrapper.

This can also be solved by using a ProcessFunction directly before the 
SinkFunction (or, if I just used System.currentTimeMillis(), a MapFunction 
would suffice).

I would prefer overriding the sink because we need this logic in 
all our jobs writing to a specific Kafka cluster. Using a dedicated sink for 
this cluster is easier to explain and document for the rest of my team.

However, I can also build a component that adds both the ProcessFunction and 
the SinkFunction, which is also easy to document. So if the transaction state 
is not stable enough to be exposed, making the invoke method final is perfectly 
fine with me.

> In FlinkKafkaProducer, KafkaTransactionState should be made public or invoke 
> should be made final
> -------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-14034
>                 URL: https://issues.apache.org/jira/browse/FLINK-14034
>             Project: Flink
>          Issue Type: Wish
>          Components: Connectors / Kafka
>    Affects Versions: 1.9.0
>            Reporter: Niels van Kaam
>            Priority: Trivial
>
> It is not possible to override the invoke method of FlinkKafkaProducer, 
> because its first parameter, KafkaTransactionState, is a private inner class. 
> It is also not possible to override the original invoke of SinkFunction, 
> because TwoPhaseCommitSinkFunction (which FlinkKafkaProducer extends) 
> overrides that invoke method and declares it final.
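The constraint above is an instance of the template-method pattern. Below is a simplified, hypothetical stand-in for the two classes: `TwoPhaseSinkSketch` and `ProducerSketch` are illustrative names, and the real signatures also take a `Context`, declare `throws Exception`, and the real base class manages pre-commit and commit. The public `invoke(IN)` is final and delegates to an abstract hook whose first parameter is the transaction type.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified stand-in for TwoPhaseCommitSinkFunction. The public entry point is
// final, so subclasses can only customize behaviour through the abstract hook.
abstract class TwoPhaseSinkSketch<IN, TXN> {
    private final TXN transaction;

    protected TwoPhaseSinkSketch(TXN initialTransaction) {
        this.transaction = initialTransaction;
    }

    public final void invoke(IN value) {
        invoke(transaction, value);
    }

    protected TXN currentTransaction() {
        return transaction;
    }

    protected abstract void invoke(TXN transaction, IN value);
}

// Simplified stand-in for FlinkKafkaProducer. Its transaction type is not public,
// so code outside this package cannot name it in an overriding signature.
class ProducerSketch extends TwoPhaseSinkSketch<String, ProducerSketch.TxnState> {
    static class TxnState { // package-private, like the type described in the issue
        final List<String> buffered = new ArrayList<>();
    }

    ProducerSketch() {
        super(new TxnState());
    }

    @Override
    protected void invoke(TxnState transaction, String value) {
        transaction.buffered.add(value);
    }
}
```

Applied to the real classes: FlinkKafkaProducer is public, so it can be subclassed from another package, but such a subclass can neither override the final `invoke(IN, Context)` nor name the non-public transaction type in an overriding `invoke`, which is the situation the issue describes.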
> [https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java]
> If there is a particular reason for this, it would be better to make the 
> invoke method in FlinkKafkaProducer final as well, and to document the reason 
> so that it is clear this is by design (I don't see any overrides in the 
> same package).
> Otherwise, I would make KafkaTransactionState publicly visible. I would 
> like to override the invoke method to create a custom KafkaProducer that 
> performs some additional generic validations and transformations (this can 
> also be done in a ProcessFunction, but a custom sink would simplify the code 
> of the jobs).
>  



--
This message was sent by Atlassian Jira
(v8.3.2#803003)
