[
https://issues.apache.org/jira/browse/KAFKA-5779?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16147756#comment-16147756
]
Matthias J. Sax commented on KAFKA-5779:
----------------------------------------
You can set the timestamp extractor in the config of your application via
parameter {{default.timestamp.extractor}} (or older name
{{timestamp.extractor}} -- depending on the version you are using). Cf.
http://docs.confluent.io/current/streams/developer-guide.html#streams-developer-guide-timestamp-extractor
Something like:
{noformat}
Properties props = new Properties();
props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
          LogAndSkipOnInvalidTimestamp.class);
// and other configs
StreamsConfig config = new StreamsConfig(props);
{noformat}
Details can be found in the corresponding KIP-93:
https://cwiki.apache.org/confluence/display/KAFKA/KIP-93%3A+Improve+invalid+timestamp+handling+in+Kafka+Streams
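If you want to patch an invalid timestamp with the current wall-clock time instead of skipping the record, you can write a custom {{TimestampExtractor}}. A minimal sketch of the fallback logic it would apply (the {{TimestampFallback}} class name is hypothetical; in a real extractor this logic would live inside {{TimestampExtractor#extract()}}):

```java
// Hypothetical helper sketching the fallback a custom TimestampExtractor
// could apply: keep valid timestamps, replace invalid (negative) ones
// with the current wall-clock time.
public class TimestampFallback {

    public static long patchTimestamp(long recordTimestamp, long wallClockMs) {
        // Kafka marks a missing timestamp as -1 (any negative value is invalid).
        return recordTimestamp >= 0 ? recordTimestamp : wallClockMs;
    }

    public static void main(String[] args) {
        // An invalid timestamp (-1, as in the reported record) falls back to "now".
        System.out.println(patchTimestamp(-1L, 1503414524000L));
        // A valid timestamp is kept unchanged.
        System.out.println(patchTimestamp(1503414460619L, 1503414524000L));
    }
}
```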
In {{0.11}} you can also set an individual timestamp extractor for each
stream/table/globalktable you are reading, by using the corresponding overloaded
method of e.g. {{KStreamBuilder#stream()}} (cf. KIP-123 for details:
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=68714788)
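For completeness, a hedged sketch of the per-stream variant in {{0.11}} (the exact parameter order of the overload should be checked against KIP-123; the topic name and serdes below are placeholders):
{noformat}
// Per-stream extractor via the KStreamBuilder#stream() overload
// added by KIP-123 ("my-topic" and the serdes are placeholders)
KStreamBuilder builder = new KStreamBuilder();
KStream<String, String> stream = builder.stream(
    new LogAndSkipOnInvalidTimestamp(),  // applies only to this stream
    Serdes.String(), Serdes.String(),
    "my-topic");
{noformat}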
> Single message may exploit application based on KStream
> -------------------------------------------------------
>
> Key: KAFKA-5779
> URL: https://issues.apache.org/jira/browse/KAFKA-5779
> Project: Kafka
> Issue Type: Bug
> Affects Versions: 0.10.2.1, 0.11.0.0
> Reporter: Seweryn Habdank-Wojewodzki
> Priority: Critical
>
> The context: in Kafka streaming I am *defining* a simple KStream processing:
> {code}
> stringInput // line 54 of the SingleTopicStreamer class
> .filter( streamFilter::passOrFilterMessages )
> .map( normalizer )
> .to( outTopicName );
> {code}
> For some reason I got a wrong message (I am still investigating what the
> problem is),
> but anyhow my service crashed with a FATAL error:
> {code}
> 2017-08-22 17:08:44 FATAL SingleTopicStreamer:54 - Caught unhandled
> exception: Input record ConsumerRecord(topic = XXX_topic, partition = 8,
> offset = 15, CreateTime = -1, serialized key size = -1, serialized value size
> = 255, headers = RecordHeaders(headers = [], isReadOnly = false), key = null,
> value =
> {"recordTimestamp":"2017-08-22T17:07:40:619+02:00","logLevel":"INFO","sourceApplication":"WPT","message":"Kafka-Init","businessError":false,"normalizedStatus":"green","logger":"CoreLogger"})
> has invalid (negative) timestamp. Possibly because a pre-0.10 producer
> client was used to write this record to Kafka without embedding a timestamp,
> or because the input topic was created before upgrading the Kafka cluster to
> 0.10+. Use a different TimestampExtractor to process this data.;
> [org.apache.kafka.streams.processor.FailOnInvalidTimestamp.onInvalidTimestamp(FailOnInvalidTimestamp.java:63),
> org.apache.kafka.streams.processor.ExtractRecordMetadataTimestamp.extract(ExtractRecordMetadataTimestamp.java:61),
> org.apache.kafka.streams.processor.FailOnInvalidTimestamp.extract(FailOnInvalidTimestamp.java:46),
> org.apache.kafka.streams.processor.internals.RecordQueue.addRawRecords(RecordQueue.java:85),
> org.apache.kafka.streams.processor.internals.PartitionGroup.addRawRecords(PartitionGroup.java:117),
> org.apache.kafka.streams.processor.internals.StreamTask.addRecords(StreamTask.java:464),
> org.apache.kafka.streams.processor.internals.StreamThread.addRecordsToTasks(StreamThread.java:650),
> org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:556),
> org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:527)]
> in thread restreamer-d4e77d18-6e7b-4708-8436-7fea0d4b1cdf-StreamThread-3
> {code}
> The suggested cause (a pre-0.10 producer) is false, as we are
> using Kafka 0.10.2.1 and 0.11.0.0 and the topics had been created within this
> version of Kafka.
> The sender application is .NET client from Confluent.
> This exception is a bit problematic, as it suggests it is thrown
> during initialization of the stream, but effectively it
> happens during processing, so adding try {} catch {} around the stringInput statement
> does not help: the stream was correctly defined, but a single message sent
> later crashed the whole app.
> In my opinion KStream shall be robust enough to catch all such exceptions
> and shall protect the application from death due to a single corrupted message.
> Especially when the timestamp is not embedded: in such a case one can patch the
> message with the current timestamp without loss of overall performance.
> I would expect Kafka Streams to handle this.
> I will continue to investigate what the problem with the message is, but it
> is quite hard for me, as it happens internally in Kafka Streams combined with the
> .NET producer.
> And I have already tested that this problem does not occur when I receive these
> concrete messages in an old-fashioned Kafka Consumer :-).
--
This message was sent by Atlassian JIRA
(v6.4.14#64029)