[
https://issues.apache.org/jira/browse/FLINK-14034?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17283647#comment-17283647
]
Piotr Nowojski commented on FLINK-14034:
----------------------------------------
I was wondering for a moment whether we could provide a more generic way of
ignoring some errors, one that is independent of any specific sink
implementation. For example, could we let users inject an exception handler
hook into the {{Output<T>}} collectors?
However, both this more generic approach and a specific implementation in
{{FlinkKafkaProducer}} would have trouble distinguishing which errors
are recoverable and which are not. I mean, maybe we can catch "message too
large", but that doesn't necessarily mean we can keep using the
{{KafkaProducer}} instance without re-initialising it.
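To make the hook idea a bit more concrete, here is a rough sketch. It is not Flink code: {{SimpleOutput}}, {{OutputExceptionHandler}} and {{HandlingOutput}} are hypothetical names, and {{SimpleOutput}} is only a stand-in for the real {{Output<T>}} interface. The handler decides per error whether to swallow or rethrow it, which is exactly where the "is this recoverable?" question would have to be answered.

```java
// Hypothetical stand-in for a collector; NOT Flink's real Output<T> interface.
interface SimpleOutput<T> {
    void collect(T record);
}

// Hypothetical user-supplied hook: return true to swallow the error,
// false to let it propagate and fail the job as usual.
interface OutputExceptionHandler<T> {
    boolean handle(T record, RuntimeException error);
}

// Decorator that routes every failure of the wrapped collector through the hook.
final class HandlingOutput<T> implements SimpleOutput<T> {
    private final SimpleOutput<T> delegate;
    private final OutputExceptionHandler<T> handler;

    HandlingOutput(SimpleOutput<T> delegate, OutputExceptionHandler<T> handler) {
        this.delegate = delegate;
        this.handler = handler;
    }

    @Override
    public void collect(T record) {
        try {
            delegate.collect(record);
        } catch (RuntimeException e) {
            // The hook cannot know whether the delegate is still usable;
            // that remains the open problem described above.
            if (!handler.handle(record, e)) {
                throw e;
            }
        }
    }
}
```

A handler would typically swallow only one narrowly defined exception type and return false for everything else.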
Anyway [[email protected]], a hotfix/workaround should be possible in this
case: wrap {{FlinkKafkaProducer}} in your own class and, in that wrapper,
try to ignore the specific exception.
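A minimal sketch of such a wrapper, under loud assumptions: {{RecordSink}} is a hypothetical one-method stand-in for the sink interface and {{IgnoringSink}} is an illustrative name. A real wrapper around {{FlinkKafkaProducer}} would also have to forward the lifecycle and checkpointing calls to the wrapped instance, and swallowing the exception does not guarantee that the underlying producer is still usable.

```java
// Hypothetical stand-in for a sink; the real SinkFunction has more methods.
interface RecordSink<T> {
    void invoke(T value) throws Exception;
}

// Delegating wrapper that swallows one exception type and rethrows the rest.
final class IgnoringSink<T> implements RecordSink<T> {
    private final RecordSink<T> delegate;
    private final Class<? extends Throwable> ignorable;
    private long ignoredCount;

    IgnoringSink(RecordSink<T> delegate, Class<? extends Throwable> ignorable) {
        this.delegate = delegate;
        this.ignorable = ignorable;
    }

    @Override
    public void invoke(T value) throws Exception {
        try {
            delegate.invoke(value);
        } catch (Exception e) {
            // Connectors often wrap the original error, so check the cause chain.
            if (!hasCause(e, ignorable)) {
                throw e;
            }
            ignoredCount++; // the record is dropped; count it so the loss is visible
        }
    }

    long getIgnoredCount() {
        return ignoredCount;
    }

    private static boolean hasCause(Throwable error, Class<? extends Throwable> type) {
        for (Throwable cause = error; cause != null; cause = cause.getCause()) {
            if (type.isInstance(cause)) {
                return true;
            }
        }
        return false;
    }
}
```

Counting the ignored records (or logging them) seems advisable here, since otherwise the wrapper silently loses data.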
> In FlinkKafkaProducer, KafkaTransactionState should be made public or invoke
> should be made final
> -------------------------------------------------------------------------------------------------
>
> Key: FLINK-14034
> URL: https://issues.apache.org/jira/browse/FLINK-14034
> Project: Flink
> Issue Type: Wish
> Components: Connectors / Kafka
> Affects Versions: 1.9.0
> Reporter: Niels van Kaam
> Priority: Trivial
>
> It is not possible to override the invoke method of the FlinkKafkaProducer,
> because the first parameter, KafkaTransactionState, is a private inner class.
> It is not possible to override the original invoke of SinkFunction, because
> TwoPhaseCommitSinkFunction (which FlinkKafkaProducer extends) declares its
> override of the original invoke method as final.
> [https://github.com/apache/flink/blob/master/flink-connectors/flink-connector-kafka/src/main/java/org/apache/flink/streaming/connectors/kafka/FlinkKafkaProducer.java]
> If there is a particular reason for this, it would be better to make the
> invoke method in FlinkKafkaProducer final as well, and document the reason
> such that it is clear this is by design (I don't see any overrides in the
> same package).
> Otherwise, I would make the KafkaTransactionState publicly visible. I would
> like to override the invoke method to create a custom KafkaProducer which
> performs some additional generic validations and transformations (which can
> also be done in a process function, but a custom sink would simplify the code
> of jobs).
>