[ 
https://issues.apache.org/jira/browse/FLINK-14034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17283740#comment-17283740
 ] 

Andrew Roberts edited comment on FLINK-14034 at 2/12/21, 2:57 PM:
------------------------------------------------------------------

[~pnowojski] the catch here is that, ultimately, something implementing a sink 
interface must be passed to {{addSink()}}. It is not possible to wrap the 
{{TwoPhaseCommitSinkFunction}} version of invoke, because I can't reproduce the 
method signature (due to the private type at the root of this conversation). I 
suppose I could wrap a different invoke method, but my confidence in those 
other methods being called is low, since they all seem to funnel to the main 
implementation (which, again, is inaccessible).

Your point about assuming the underlying instance can recover after an 
exception is a good one - I hadn't considered that.

Edit: This is pushing me towards a full-throated commitment to simply ignoring 
all exceptions via the provided method - it means that we'll experience 
"unnecessary" message loss if a transient exception occurs when pushing a 
message, but if the alternative is complete loss of the job if a message 
deterministically cannot be sent, then we'll have to take it as the less-bad 
choice.

Is there any interest in a more structural (i.e. within Flink) solution to this 
transient/non-transient error issue? Kafka provides 
{{org.apache.kafka.common.errors.RetriableException}}, which we were using to 
power this selective retry: exceptions extending that type were allowed to fail 
the job (restarting from the last checkpoint), while other 
({{control.NonFatal}}) exceptions were logged instead.
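
The selective handling described above can be sketched roughly as follows. This is a minimal illustration, not code from Flink or from our job: {{SendFailurePolicy}} and {{onSendFailure}} are hypothetical names, and the local {{RetriableException}} class is only a stand-in for {{org.apache.kafka.common.errors.RetriableException}} so that the sketch compiles without the Kafka client on the classpath. Accepting {{Exception}} only approximates Scala's {{control.NonFatal}}.

```java
import java.util.ArrayList;
import java.util.List;

// Stand-in for org.apache.kafka.common.errors.RetriableException, declared
// locally so this sketch has no Kafka dependency (assumption for illustration).
class RetriableException extends RuntimeException {
    RetriableException(String message) {
        super(message);
    }
}

// Hypothetical helper; neither Flink nor Kafka ships a class with this name.
final class SendFailurePolicy {
    // Messages of the failures that were logged and skipped.
    final List<String> logged = new ArrayList<>();

    // Rethrows retriable failures so the job fails and restarts from the last
    // checkpoint; records every other failure and lets the job continue.
    void onSendFailure(Exception failure) {
        if (failure instanceof RetriableException) {
            throw (RetriableException) failure;
        }
        logged.add(failure.getMessage());
    }
}
```

The trade-off is the same as in the text: a transient failure costs a restart from the last checkpoint, while a record that deterministically cannot be sent is dropped after being logged.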


> In FlinkKafkaProducer, KafkaTransactionState should be made public or invoke 
> should be made final
> -------------------------------------------------------------------------------------------------
>
>                 Key: FLINK-14034
>                 URL: https://issues.apache.org/jira/browse/FLINK-14034
>             Project: Flink
>          Issue Type: Wish
>          Components: Connectors / Kafka
>    Affects Versions: 1.9.0
>            Reporter: Niels van Kaam
>            Priority: Trivial
>
> It is not possible to override the invoke method of the FlinkKafkaProducer, 
> because the first parameter, KafkaTransactionState, is a private inner class. 
> It is not possible to override the original invoke of SinkFunction either, 
> because TwoPhaseCommitSinkFunction (which is extended by FlinkKafkaProducer) 
> overrides the original invoke method as final.
> [https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java]
> If there is a particular reason for this, it would be better to make the 
> invoke method in FlinkKafkaProducer final as well, and document the reason 
> such that it is clear this is by design (I don't see any overrides in the 
> same package).
> Otherwise, I would make the KafkaTransactionState publicly visible. I would 
> like to override the invoke method to create a custom KafkaProducer which 
> performs some additional generic validations and transformations (which can 
> also be done in a process function, but a custom sink would simplify the code 
> of jobs).
>  



--
This message was sent by Atlassian Jira
(v8.3.4#803005)
