[
https://issues.apache.org/jira/browse/IGNITE-19459?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17724257#comment-17724257
]
Vedran Ljubovic commented on IGNITE-19459:
------------------------------------------
After some experiments, I have found that simply moving the extractor
initialization ahead of the other initializations fixes the problem. Apparently
StreamerContext.getStreamer() can sometimes take a few seconds; KC starts
streaming messages before it completes, and those messages are dropped because
the key is null. A patch is included.
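For reference, the reordering amounts to roughly the following inside
IgniteSinkTask.start(). This is only a sketch, not the attached patch: the
field names, the singleTupleExtractorCls/cacheName property names and the
ConnectException wrapping are illustrative assumptions (the wrapping follows
the suggestion from the issue description rather than the patch itself).
{code:java}
// Sketch only -- event_dropping.patch is the real change.
// Field and property names below are assumptions, not copied from the source.
@Override public void start(Map<String, String> props) {
    try {
        // Create the extractor FIRST, so records that arrive while the streamer
        // is still initializing are never treated as having a null key.
        String extractorCls = props.get("singleTupleExtractorCls");

        if (extractorCls != null && !extractorCls.isEmpty())
            extractor = (StreamSingleTupleExtractor<SinkRecord, Object, Object>)
                Class.forName(extractorCls).getDeclaredConstructor().newInstance();

        cacheName = props.get("cacheName");

        // Only then touch the streamer, which can take a few seconds to come up.
        StreamerContext.getStreamer();

        stopped = false;
    }
    catch (Exception e) {
        // Surface the failure so Kafka Connect marks the task FAILED instead of
        // silently dropping records.
        throw new ConnectException("Failed to start IgniteSinkTask.", e);
    }
}
{code}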
> Kafka Connect IgniteSinkConnector drops messages in case of error
> -----------------------------------------------------------------
>
> Key: IGNITE-19459
> URL: https://issues.apache.org/jira/browse/IGNITE-19459
> Project: Ignite
> Issue Type: Bug
> Components: extensions
> Affects Versions: 2.15
> Reporter: Vedran Ljubovic
> Priority: Major
> Attachments: event_dropping.patch
>
>
> We are using Kafka Connect (KC) to stream messages from Kafka to Ignite.
> Since the Kafka topic uses null keys, we have created a custom
> SingleTupleExtractor to generate keys from the payload. This works very well
> when everything is OK. However, if there is any kind of issue with starting a
> cache on Ignite (such as the cluster state being inactive or the cache having
> lost partitions), we expect KC to fail to start. Instead, KC starts and
> appears to be running, and the messages are dropped - which means that once
> the problems are resolved, KC will not attempt to resend the messages, even
> after a restart! This is unacceptable for us; we believe the system should be
> reliable and fault-tolerant.
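> For illustration only, our extractor is conceptually along the lines of the
> sketch below; the class name and the way the key is derived from the payload
> are simplified placeholders, not our real code.
> {code:java}
> import java.util.AbstractMap;
> import java.util.Map;
>
> import org.apache.ignite.stream.StreamSingleTupleExtractor;
> import org.apache.kafka.connect.sink.SinkRecord;
>
> /** Illustrative extractor: derives a key from the value because the Kafka key is null. */
> public class PayloadKeyExtractor implements StreamSingleTupleExtractor<SinkRecord, Object, Object> {
>     @Override public Map.Entry<Object, Object> extract(SinkRecord record) {
>         Object value = record.value();
>
>         // Simplified key derivation; a real extractor would parse the payload.
>         Object key = String.valueOf(value).hashCode();
>
>         return new AbstractMap.SimpleEntry<>(key, value);
>     }
> }
> {code}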
> In the logs we notice errors such as:
> {code:java}
> Failed to stream a record with null key! {code}
> which is misleading, since we do have a SingleTupleExtractor for exactly this
> purpose and we can see that it is not being called at all!
> When the KC REST API [1] is used, we find that the state is RUNNING, which
> means we have no way to detect this error other than parsing the logs, which
> is unreliable.
> Upon investigating this issue, we found the following:
> * The Ignite connection and IgniteDataStreamer are declared as private static
> final fields of an inner class, so they are initialized when the start()
> method of IgniteSinkConnector is called. From the KC docs [2], we conclude
> that the initialize() method should be overridden and the connections created
> there, and that appropriate exception types should be thrown so that KC knows
> the connection has failed and terminates the task/connector.
> * When the start() method is called, StreamerContext.getStreamer() on line 72
> fails with an exception. This exception is not handled by KC, so KC does not
> know that the task failed to start. In addition, the code never reaches line
> 91 where the SingleTupleExtractor is created, so there is no extractor. The
> solution would be to catch all exceptions and rethrow them as exception types
> that KC detects as critical errors. Alternatively, the put() method should
> throw an exception if stopped is true.
> * When the put() method is called, if there is no key and no extractor, on
> line 121 we see that the error is logged but no exception is thrown, so KC
> thinks the message was successfully streamed. Here, a ConnectException should
> be thrown (see the sketch after this list). If users want the current
> behavior (stream Kafka messages that have a key and skip those without one),
> they can set errors.tolerance = all in the connector config [3].
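> A rough sketch of what we have in mind for put() follows. It is simplified
> and not a drop-in patch: the field names and surrounding structure are
> assumed, and only the throwing behavior is the point.
> {code:java}
> // Sketch of the proposed behavior; not the actual IgniteSinkTask source.
> @Override public void put(Collection<SinkRecord> records) {
>     if (stopped)
>         throw new ConnectException("Task is stopped, refusing to accept records.");
>
>     for (SinkRecord record : records) {
>         try {
>             if (extractor != null) {
>                 Map.Entry<Object, Object> entry = extractor.extract(record);
>
>                 StreamerContext.getStreamer().addData(entry.getKey(), entry.getValue());
>             }
>             else if (record.key() != null)
>                 StreamerContext.getStreamer().addData(record.key(), record.value());
>             else {
>                 // Previously only logged; throwing lets KC fail the task instead of
>                 // silently dropping the record (or skip it when errors.tolerance = all).
>                 throw new ConnectException("Failed to stream a record with null key!");
>             }
>         }
>         catch (ConnectException e) {
>             throw e;
>         }
>         catch (Exception e) {
>             throw new ConnectException("Failed to stream a record.", e);
>         }
>     }
> }
> {code}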
> [1] [https://docs.confluent.io/platform/current/connect/references/restapi.html]
> [2] [https://kafka.apache.org/34/javadoc/org/apache/kafka/connect/sink/SinkTask.html]
> [3] [https://www.confluent.io/blog/kafka-connect-deep-dive-error-handling-dead-letter-queues/]
>
--
This message was sent by Atlassian Jira
(v8.20.10#820010)