[ https://issues.apache.org/jira/browse/KAFKA-5779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16141251#comment-16141251 ]
Seweryn Habdank-Wojewodzki commented on KAFKA-5779:
---------------------------------------------------
Hi Matthias,
Thanks for the response.
Changing the TimestampExtractor is not the point. I do not care about
messages that arrive in some broken form.
The problem is that the code that parses the timestamp, or the code one level
up that extracts the Kafka headers, does not catch exceptions.
So a single message can take down the complete streaming service.
Even worse, the streaming app is afterwards left in a state like a zombie
process.
On Linux you will still see the process (ps ax reports that it exists), but
it is dead; it does nothing.
So the bug is that a Kafka streaming service can easily be attacked with a
DoS, not by flooding the app with a gazillion messages but more subtly: it is
enough to send one message with a broken Kafka header (e.g. with a broken
timestamp) and the app will stop working without any sign of death.
For example, in the world of services, simple checks of whether the app
should be restarted will fail, because e.g. ps ax reports that the process
exists.
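To illustrate the failure mode, here is a minimal plain-Java sketch. It does
not use Kafka at all; the class name WorkerDeathDemo, the thread name and the
exception text are made up for illustration. A worker thread dies on its first
"record", a try/catch around the code that starts it never sees the exception,
and only an uncaught exception handler on the thread notices the death.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for a stream-processing thread; this is not Kafka Streams code.
class WorkerDeathDemo {

    /** Starts a worker that fails immediately and returns the error seen by the handler. */
    static Throwable runWorkerAndCaptureFailure() {
        AtomicReference<Throwable> failure = new AtomicReference<>();
        CountDownLatch died = new CountDownLatch(1);

        Thread worker = new Thread(() -> {
            // Simulates processing one record with an invalid timestamp.
            throw new IllegalStateException("invalid (negative) timestamp");
        }, "worker-1");

        // Without a handler the thread just ends. The JVM keeps running as long
        // as other non-daemon threads are alive, so the process stays visible.
        worker.setUncaughtExceptionHandler((thread, error) -> {
            failure.set(error);
            died.countDown();
        });

        // start() returns at once; the exception is raised on the worker thread,
        // so a try/catch placed around this call would not catch it.
        worker.start();

        try {
            died.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return failure.get();
    }

    public static void main(String[] args) {
        Throwable failure = runWorkerAndCaptureFailure();
        System.out.println("worker died with: " + failure);
        // A real service could exit here so that a supervisor restarts it.
    }
}
```

In a setup like this, the handler is the only place where the application
learns that the worker is gone, so a restart decision would have to be made
there rather than by checking that the process exists.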
> Single message may exploit application based on KStream
> -------------------------------------------------------
>
> Key: KAFKA-5779
> URL: https://issues.apache.org/jira/browse/KAFKA-5779
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 0.10.2.1, 0.11.0.0
> Reporter: Seweryn Habdank-Wojewodzki
> Priority: Critical
>
> The context: in Kafka streaming I am *defining* simple KStream processing:
> {code}
> stringInput // line 54 of the SingleTopicStreamer class
> .filter( streamFilter::passOrFilterMessages )
> .map( normalizer )
> .to( outTopicName );
> {code}
> For some reason I got a bad message (I am still investigating what the
> problem is),
> but in any case my service was brought down with a FATAL error:
> {code}
> 2017-08-22 17:08:44 FATAL SingleTopicStreamer:54 - Caught unhandled
> exception: Input record ConsumerRecord(topic = XXX_topic, partition = 8,
> offset = 15, CreateTime = -1, serialized key size = -1, serialized value size
> = 255, headers = RecordHeaders(headers = [], isReadOnly = false), key = null,
> value =
> {"recordTimestamp":"2017-08-22T17:07:40:619+02:00","logLevel":"INFO","sourceApplication":"WPT","message":"Kafka-Init","businessError":false,"normalizedStatus":"green","logger":"CoreLogger"})
> has invalid (negative) timestamp. Possibly because a pre-0.10 producer
> client was used to write this record to Kafka without embedding a timestamp,
> or because the input topic was created before upgrading the Kafka cluster to
> 0.10+. Use a different TimestampExtractor to process this data.;
> [org.apache.kafka.streams.processor.FailOnInvalidTimestamp.onInvalidTimestamp(FailOnInvalidTimestamp.java:63),
>
> org.apache.kafka.streams.processor.ExtractRecordMetadataTimestamp.extract(ExtractRecordMetadataTimestamp.java:61),
>
> org.apache.kafka.streams.processor.FailOnInvalidTimestamp.extract(FailOnInvalidTimestamp.java:46),
>
> org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:85),
>
> org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117),
>
> org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:464),
>
> org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:650),
>
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:556),
>
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)]
> in thread restreamer-d4e77d18-6e7b-4708-8436-7fea0d4b1cdf-StreamThread-3
> {code}
> The possible cause named in the message, an old producer, does not apply, as
> we are using Kafka 0.10.2.1 and 0.11.0.0 and the topics were created with
> these versions of Kafka.
> The sender application is the .NET client from Confluent.
> The whole matter is a bit problematic with this exception: it appears to be
> thrown during initialization of the stream, but it actually happened during
> processing. Adding try {} catch {} around the stringInput statement
> therefore does not help, as the stream was correctly defined; a single
> message sent later brought down the whole app.
> In my opinion KStream should be robust enough to catch all such exceptions
> and should protect the application from death due to a single corrupted
> message, especially when the timestamp is not embedded. In such a case one
> can patch the message with the current timestamp without loss of overall
> performance.
> I would expect Kafka Streams to handle this.
> I will continue to investigate what the problem with the message is, but it
> is quite hard for me, as it happens internally in Kafka Streams combined
> with the .NET producer.
> I have also already tested that this problem does not occur when I read
> these particular messages with an old-fashioned Kafka Consumer :-).
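The timestamp patching suggested in the description can be sketched as
follows. This is only an illustration of the fallback rule, not Kafka Streams
code: the class TimestampFallback and the method timestampOrFallback are
hypothetical names, and a real TimestampExtractor would need to apply the same
rule inside its own extract method.

```java
// Hypothetical helper illustrating the fallback rule; not part of Kafka Streams.
class TimestampFallback {

    /**
     * Returns the record timestamp when it is valid (non-negative),
     * otherwise the supplied wall-clock time in epoch milliseconds.
     */
    static long timestampOrFallback(long recordTimestampMs, long wallClockMs) {
        return recordTimestampMs >= 0 ? recordTimestampMs : wallClockMs;
    }

    public static void main(String[] args) {
        long now = System.currentTimeMillis();
        // CreateTime = -1, as in the failing record from the log: falls back to now.
        System.out.println(timestampOrFallback(-1L, now));
        // An arbitrary valid epoch-millisecond value is passed through unchanged.
        System.out.println(timestampOrFallback(1503414460619L, now));
    }
}
```

Passing the wall-clock time in as a parameter, instead of reading the clock
inside the method, keeps the rule easy to test.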