[
https://issues.apache.org/jira/browse/KAFKA-5779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16143414#comment-16143414
]
Seweryn Habdank-Wojewodzki commented on KAFKA-5779:
---------------------------------------------------
What you wrote makes sense (I understand your point of view), but it is not
really what I expect from a streaming app.
What I expect is that only "out of resources" errors (out of memory, disk
full, etc.) may lead to a situation where the app has to exit while processing
a message. Any parsing error, or any other error that could be handled, should
be handled.
In the worst case the app should log the corrupted message, and only that one
message should be dropped. That should be the default behavior: out of the box
the app stays on the safe side, so that a single damaged (perhaps deliberately
damaged) message cannot kill the whole app.
Any extra error handling should be parametrized.
So I would expect the Streams API to be constructed the other way round from
how it is now.
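For the timestamp failure below there is in fact already such a parametrized
log-and-skip option; here is a minimal sketch of wiring it up, assuming Kafka
0.11.0.0 (where the {{default.timestamp.extractor}} config was introduced) and
a placeholder bootstrap server:
{code}
import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp;

final Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "restreamer");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
// Log a warning and drop records with invalid timestamps, instead of
// letting the default FailOnInvalidTimestamp kill the stream thread.
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
        LogAndSkipOnInvalidTimestamp.class.getName());
{code}
(On 0.10.2.1 the corresponding key is
{{StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG}}.) But my point stands:
something like this should be the default, not an opt-in.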
And BTW, I have a similar processing app running in parallel, built on the old
producer/consumer API, and that one has been handling all these broken
messages reliably. So there is definitely an asymmetry in API design between
Streams and the old-school producer/consumer.
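A minimal sketch of what that consumer-side handling looks like;
{{props}}, {{inTopicName}}, {{process()}} and {{log}} are placeholders of
mine, not names from the real app:
{code}
import java.util.Collections;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

try (KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Collections.singletonList(inTopicName));
    while (true) {
        final ConsumerRecords<String, String> records = consumer.poll(1000);
        for (final ConsumerRecord<String, String> record : records) {
            try {
                process(record); // application-level parsing and forwarding
            } catch (final RuntimeException e) {
                // One corrupted record is logged and skipped;
                // the poll loop itself keeps running.
                log.warn("Skipping corrupted record at offset " + record.offset(), e);
            }
        }
    }
}
{code}
With Streams the equivalent try/catch has no place to live, because the
failure below happens inside the library before my code is ever called.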
> Single message may exploit application based on KStream
> -------------------------------------------------------
>
> Key: KAFKA-5779
> URL: https://issues.apache.org/jira/browse/KAFKA-5779
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 0.10.2.1, 0.11.0.0
> Reporter: Seweryn Habdank-Wojewodzki
> Priority: Critical
>
> The context: in Kafka streaming I am *defining* a simple KStream processing:
> {code}
> stringInput // line 54 of the SingleTopicStreamer class
>     .filter( streamFilter::passOrFilterMessages )
>     .map( normalizer )
>     .to( outTopicName );
> {code}
> For some reason I got a wrong message (I am still investigating what the
> problem is), but in any case my service was killed with a FATAL error:
> {code}
> 2017-08-22 17:08:44 FATAL SingleTopicStreamer:54 - Caught unhandled exception: Input record ConsumerRecord(topic = XXX_topic, partition = 8, offset = 15, CreateTime = -1, serialized key size = -1, serialized value size = 255, headers = RecordHeaders(headers = [], isReadOnly = false), key = null, value = {"recordTimestamp":"2017-08-22T17:07:40:619+02:00","logLevel":"INFO","sourceApplication":"WPT","message":"Kafka-Init","businessError":false,"normalizedStatus":"green","logger":"CoreLogger"}) has invalid (negative) timestamp. Possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding a timestamp, or because the input topic was created before upgrading the Kafka cluster to 0.10+. Use a different TimestampExtractor to process this data.;
> [org.apache.kafka.streams.processor.FailOnInvalidTimestamp.onInvalidTimestamp(FailOnInvalidTimestamp.java:63),
> org.apache.kafka.streams.processor.ExtractRecordMetadataTimestamp.extract(ExtractRecordMetadataTimestamp.java:61),
> org.apache.kafka.streams.processor.FailOnInvalidTimestamp.extract(FailOnInvalidTimestamp.java:46),
> org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:85),
> org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117),
> org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:464),
> org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:650),
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:556),
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)]
> in thread restreamer-d4e77d18-6e7b-4708-8436-7fea0d4b1cdf-StreamThread-3
> {code}
> The suggested reason, a pre-0.10 producer, is false: we are using Kafka
> 0.10.2.1 and 0.11.0.0, and the topics were created with these versions of
> Kafka. The sender application is the .NET client from Confluent.
> The whole matter is a bit problematic with this exception: the wording
> suggests it is thrown while the stream is being initialized, but effectively
> it happens during processing, so adding try {} catch {} around the
> stringInput statement does not help. The stream was defined correctly; it was
> a single message, sent later, that killed the whole app.
> In my opinion KStream should be robust enough to catch all such exceptions
> and protect the application from dying because of a single corrupted message,
> especially when the timestamp is not embedded. In that case the message could
> be patched with the current timestamp without any loss of overall
> performance. I would expect Kafka Streams to handle this.
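> A minimal sketch of such a patching extractor (my own code, not part of
> Kafka; it assumes the two-argument TimestampExtractor interface from
> 0.10.2+):
> {code}
> import org.apache.kafka.clients.consumer.ConsumerRecord;
> import org.apache.kafka.streams.processor.TimestampExtractor;
>
> // Keep embedded timestamps when they are valid; patch invalid
> // (negative) ones with the current wall-clock time.
> public class PatchInvalidTimestampExtractor implements TimestampExtractor {
>     @Override
>     public long extract(final ConsumerRecord<Object, Object> record,
>                         final long previousTimestamp) {
>         final long ts = record.timestamp();
>         return ts >= 0 ? ts : System.currentTimeMillis();
>     }
> }
> {code}
> Plugged in via the timestamp extractor config, this would keep the topology
> alive while still preferring real timestamps.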
> I will continue to investigate what is wrong with the message, but it is
> quite hard for me, as it happens inside Kafka Streams in combination with the
> .NET producer.
> And I have already tested that this problem does not occur when I consume
> these very messages with the old-fashioned Kafka consumer :-).