Repository: kafka Updated Branches: refs/heads/trunk 713a67fdd -> 5792f2fb3
KAFKA-5980: FailOnInvalidTimestamp does not log error Author: Matthias J. Sax <[email protected]> Reviewers: Damian Guy <[email protected]>, Ted Yu <[email protected]>, Denis Bolshakov Closes #3966 from mjsax/kafka-5980-FailOnInvalidTimestamp-does-not-log-error Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/5792f2fb Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/5792f2fb Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/5792f2fb Branch: refs/heads/trunk Commit: 5792f2fb3db69333bfd22b57b00b42336dc16aa9 Parents: 713a67f Author: Matthias J. Sax <[email protected]> Authored: Wed Oct 4 15:10:59 2017 -0700 Committer: Guozhang Wang <[email protected]> Committed: Wed Oct 4 15:10:59 2017 -0700 ---------------------------------------------------------------------- .../streams/processor/FailOnInvalidTimestamp.java | 15 +++++++++++---- 1 file changed, 11 insertions(+), 4 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/5792f2fb/streams/src/main/java/org/apache/kafka/streams/processor/FailOnInvalidTimestamp.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/FailOnInvalidTimestamp.java b/streams/src/main/java/org/apache/kafka/streams/processor/FailOnInvalidTimestamp.java index e8fc78c..87cb0de 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/FailOnInvalidTimestamp.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/FailOnInvalidTimestamp.java @@ -19,6 +19,8 @@ package org.apache.kafka.streams.processor; import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.common.annotation.InterfaceStability; import org.apache.kafka.streams.errors.StreamsException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Retrieves embedded metadata timestamps from Kafka messages. @@ -45,6 +47,7 @@ import org.apache.kafka.streams.errors.StreamsException; */ @InterfaceStability.Evolving public class FailOnInvalidTimestamp extends ExtractRecordMetadataTimestamp { + private static final Logger log = LoggerFactory.getLogger(FailOnInvalidTimestamp.class); /** * Raises an exception on every call. @@ -60,10 +63,14 @@ public class FailOnInvalidTimestamp extends ExtractRecordMetadataTimestamp { final long recordTimestamp, final long previousTimestamp) throws StreamsException { - throw new StreamsException("Input record " + record + " has invalid (negative) timestamp. " + - "Possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding a timestamp, " + - "or because the input topic was created before upgrading the Kafka cluster to 0.10+. " + - "Use a different TimestampExtractor to process this data."); + + final String message = "Input record " + record + " has invalid (negative) timestamp. " + + "Possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding " + + "a timestamp, or because the input topic was created before upgrading the Kafka cluster to 0.10+. " + + "Use a different TimestampExtractor to process this data."; + + log.error(message); + throw new StreamsException(message); } }
