Repository: kafka Updated Branches: refs/heads/trunk 7f8edbc8e -> 9bed8fbcf
KAFKA-4393: Improve invalid/negative TS handling Author: Matthias J. Sax <[email protected]> Reviewers: Michael G. Noll, Eno Thereska, Damian Guy, Guozhang Wang Closes #2117 from mjsax/kafka-4393-improveInvalidTsHandling Project: http://git-wip-us.apache.org/repos/asf/kafka/repo Commit: http://git-wip-us.apache.org/repos/asf/kafka/commit/9bed8fbc Tree: http://git-wip-us.apache.org/repos/asf/kafka/tree/9bed8fbc Diff: http://git-wip-us.apache.org/repos/asf/kafka/diff/9bed8fbc Branch: refs/heads/trunk Commit: 9bed8fbcfc52ced719f2dcafa3f30cbfd5e6bd57 Parents: 7f8edbc Author: Matthias J. Sax <[email protected]> Authored: Fri Dec 9 16:17:36 2016 -0800 Committer: Guozhang Wang <[email protected]> Committed: Fri Dec 9 16:17:36 2016 -0800 ---------------------------------------------------------------------- checkstyle/import-control.xml | 2 +- docs/upgrade.html | 13 ++++ .../pageview/JsonTimestampExtractor.java | 2 +- .../org/apache/kafka/streams/StreamsConfig.java | 4 +- .../ConsumerRecordTimestampExtractor.java | 40 ---------- .../ExtractRecordMetadataTimestamp.java | 77 ++++++++++++++++++++ .../processor/FailOnInvalidTimestamp.java | 68 +++++++++++++++++ .../processor/LogAndSkipOnInvalidTimestamp.java | 69 ++++++++++++++++++ .../streams/processor/TimestampExtractor.java | 12 +-- .../UsePreviousTimeOnInvalidTimestamp.java | 70 ++++++++++++++++++ .../processor/WallclockTimestampExtractor.java | 16 ++-- .../processor/internals/RecordQueue.java | 9 ++- .../streams/processor/internals/SinkNode.java | 5 +- .../streams/processor/internals/StreamTask.java | 15 ++-- .../processor/internals/StreamThread.java | 8 +- .../processor/FailOnInvalidTimestampTest.java | 36 +++++++++ .../LogAndSkipOnInvalidTimestampTest.java | 56 ++++++++++++++ .../processor/TimestampExtractorTest.java | 48 ++++++++++++ .../UsePreviousTimeOnInvalidTimestampTest.java | 45 ++++++++++++ .../WallclockTimestampExtractorTest.java | 62 ++++++++++++++++ .../internals/ProcessorTopologyTest.java | 10 +-- .../processor/internals/RecordQueueTest.java | 22 ++++++ .../smoketest/TestTimestampExtractor.java | 2 +- .../kafka/test/MockTimestampExtractor.java | 2 +- 24 files changed, 617 insertions(+), 76 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/kafka/blob/9bed8fbc/checkstyle/import-control.xml ---------------------------------------------------------------------- diff --git a/checkstyle/import-control.xml b/checkstyle/import-control.xml index 58525ad..8eebdb5 100644 --- a/checkstyle/import-control.xml +++ b/checkstyle/import-control.xml @@ -27,6 +27,7 @@ <allow pkg="javax.management" /> <allow pkg="org.slf4j" /> <allow pkg="org.junit" /> + <allow pkg="org.hamcrest" /> <allow pkg="org.easymock" /> <allow pkg="org.powermock" /> <allow pkg="java.security" /> @@ -151,7 +152,6 @@ <allow pkg="scala" /> <allow pkg="scala.collection" /> <allow pkg="org.I0Itec.zkclient" /> - <allow pkg="org.hamcrest" /> </subpackage> <subpackage name="state"> http://git-wip-us.apache.org/repos/asf/kafka/blob/9bed8fbc/docs/upgrade.html ---------------------------------------------------------------------- diff --git a/docs/upgrade.html b/docs/upgrade.html index c63487d..06b53da 100644 --- a/docs/upgrade.html +++ b/docs/upgrade.html @@ -26,6 +26,19 @@ can upgrade the brokers one at a time: shut down the broker, update the code, an Please refer to the <code>KafkaConsumer</code> Javadoc for a more in-depth explanation of this change.</li> </ul> +<h4><a id="upgrade_10_2" 
href="#upgrade_10_2">Upgrading from 0.8.x, 0.9.x, 0.10.0.X, or 0.10.1.X to 0.10.2.0</a></h4> + +<p><b>For a rolling upgrade:</b></p> + +<ol> + <li>Upgrading a Kafka Streams Applications: + <ul> + <li>You need to recompile your code. Just swapping the jar file will not work and will break your appliation.</li> + <li>If you use a custom timestamp extractor, you will need to update this code, because the <code>TimestampExtractor</code> interface got changed.</li> + </ul> + </li> +</ol> + <h4><a id="upgrade_10_1" href="#upgrade_10_1">Upgrading from 0.8.x, 0.9.x or 0.10.0.X to 0.10.1.0</a></h4> 0.10.1.0 has wire protocol changes. By following the recommended rolling upgrade plan below, you guarantee no downtime during the upgrade. However, please notice the <a href="#upgrade_10_1_breaking">Potential breaking changes in 0.10.1.0</a> before upgrade. http://git-wip-us.apache.org/repos/asf/kafka/blob/9bed8fbc/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java ---------------------------------------------------------------------- diff --git a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java index 63e8377..918cd65 100644 --- a/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java +++ b/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview/JsonTimestampExtractor.java @@ -28,7 +28,7 @@ import org.apache.kafka.streams.processor.TimestampExtractor; public class JsonTimestampExtractor implements TimestampExtractor { @Override - public long extract(ConsumerRecord<Object, Object> record) { + public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) { if (record.value() instanceof PageViewTypedDemo.PageView) { return ((PageViewTypedDemo.PageView) record.value()).timestamp; } http://git-wip-us.apache.org/repos/asf/kafka/blob/9bed8fbc/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java index 5ba4383..53f49ec 100644 --- a/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java +++ b/streams/src/main/java/org/apache/kafka/streams/StreamsConfig.java @@ -28,7 +28,7 @@ import org.apache.kafka.common.config.ConfigException; import org.apache.kafka.common.serialization.Serde; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.streams.errors.StreamsException; -import org.apache.kafka.streams.processor.ConsumerRecordTimestampExtractor; +import org.apache.kafka.streams.processor.FailOnInvalidTimestamp; import org.apache.kafka.streams.processor.DefaultPartitionGrouper; import org.apache.kafka.streams.processor.internals.StreamPartitionAssignor; import org.apache.kafka.streams.processor.internals.StreamThread; @@ -172,7 +172,7 @@ public class StreamsConfig extends AbstractConfig { REPLICATION_FACTOR_DOC) .define(TIMESTAMP_EXTRACTOR_CLASS_CONFIG, Type.CLASS, - ConsumerRecordTimestampExtractor.class.getName(), + FailOnInvalidTimestamp.class.getName(), Importance.MEDIUM, TIMESTAMP_EXTRACTOR_CLASS_DOC) .define(PARTITION_GROUPER_CLASS_CONFIG, 
http://git-wip-us.apache.org/repos/asf/kafka/blob/9bed8fbc/streams/src/main/java/org/apache/kafka/streams/processor/ConsumerRecordTimestampExtractor.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ConsumerRecordTimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/ConsumerRecordTimestampExtractor.java deleted file mode 100644 index 0d3424e..0000000 --- a/streams/src/main/java/org/apache/kafka/streams/processor/ConsumerRecordTimestampExtractor.java +++ /dev/null @@ -1,40 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.kafka.streams.processor; - -import org.apache.kafka.clients.consumer.ConsumerRecord; - -/** - * Retrieves built-in timestamps from Kafka messages (introduced in KIP-32: Add timestamps to Kafka message). - * - * Here, "built-in" refers to the fact that compatible Kafka producer clients automatically and - * transparently embed such timestamps into messages they sent to Kafka, which can then be retrieved - * via this timestamp extractor. - * - * If <i>CreateTime</i> is used to define the built-in timestamps, using this extractor effectively provide - * <i>event-time</i> semantics. If <i>LogAppendTime</i> is used to define the built-in timestamps, using - * this extractor effectively provides <i>ingestion-time</i> semantics. - * - * If you need <i>processing-time</i> semantics, use {@link WallclockTimestampExtractor}. - */ -public class ConsumerRecordTimestampExtractor implements TimestampExtractor { - @Override - public long extract(ConsumerRecord<Object, Object> record) { - return record.timestamp(); - } -} http://git-wip-us.apache.org/repos/asf/kafka/blob/9bed8fbc/streams/src/main/java/org/apache/kafka/streams/processor/ExtractRecordMetadataTimestamp.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/ExtractRecordMetadataTimestamp.java b/streams/src/main/java/org/apache/kafka/streams/processor/ExtractRecordMetadataTimestamp.java new file mode 100644 index 0000000..cbe024e --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/ExtractRecordMetadataTimestamp.java @@ -0,0 +1,77 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.processor; + +import org.apache.kafka.clients.consumer.ConsumerRecord; + +/** + * Retrieves embedded metadata timestamps from Kafka messages. + * If a record has a negative (invalid) timestamp value, an error handler method is called. + * <p> + * Embedded metadata timestamp was introduced in "KIP-32: Add timestamps to Kafka message" for the new + * 0.10+ Kafka message format. + * <p> + * Here, "embedded metadata" refers to the fact that compatible Kafka producer clients automatically and + * transparently embed such timestamps into message metadata they send to Kafka, which can then be retrieved + * via this timestamp extractor. + * <p> + * If the embedded metadata timestamp represents <i>CreateTime</i> (cf. Kafka broker setting + * {@code message.timestamp.type} and Kafka topic setting {@code log.message.timestamp.type}), + * this extractor effectively provides <i>event-time</i> semantics. + * If <i>LogAppendTime</i> is used as broker/topic setting to define the embedded metadata timestamps, + * using this extractor effectively provides <i>ingestion-time</i> semantics. + * <p> + * If you need <i>processing-time</i> semantics, use {@link WallclockTimestampExtractor}. + * + * @see FailOnInvalidTimestamp + * @see LogAndSkipOnInvalidTimestamp + * @see UsePreviousTimeOnInvalidTimestamp + * @see WallclockTimestampExtractor + */ +abstract class ExtractRecordMetadataTimestamp implements TimestampExtractor { + + /** + * Extracts the embedded metadata timestamp from the given {@link ConsumerRecord}. + * + * @param record a data record + * @param previousTimestamp the latest extracted valid timestamp of the current record's partition (could be -1 if unknown) + * @return the embedded metadata timestamp of the given {@link ConsumerRecord} + */ + @Override + public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) { + final long timestamp = record.timestamp(); + + if (timestamp < 0) { + return onInvalidTimestamp(record, timestamp, previousTimestamp); + } + + return timestamp; + } + + /** + * Called if no valid timestamp is embedded in the record meta data.
+ * + * @param record a data record + * @param recordTimestamp the timestamp extracted from the record + * @param previousTimestamp the latest extracted valid timestamp of the current record's partition (could be -1 if unknown) + * @return a new timestamp for the record (if negative, record will not be processed but dropped silently) + */ + public abstract long onInvalidTimestamp(final ConsumerRecord<Object, Object> record, + final long recordTimestamp, + final long previousTimestamp); +} http://git-wip-us.apache.org/repos/asf/kafka/blob/9bed8fbc/streams/src/main/java/org/apache/kafka/streams/processor/FailOnInvalidTimestamp.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/FailOnInvalidTimestamp.java b/streams/src/main/java/org/apache/kafka/streams/processor/FailOnInvalidTimestamp.java new file mode 100644 index 0000000..d7f64a2 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/FailOnInvalidTimestamp.java @@ -0,0 +1,68 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.processor; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.streams.errors.StreamsException; + +/** + * Retrieves embedded metadata timestamps from Kafka messages. + * If a record has a negative (invalid) timestamp value, this extractor raises an exception. + * <p> + * Embedded metadata timestamp was introduced in "KIP-32: Add timestamps to Kafka message" for the new + * 0.10+ Kafka message format. + * <p> + * Here, "embedded metadata" refers to the fact that compatible Kafka producer clients automatically and + * transparently embed such timestamps into message metadata they send to Kafka, which can then be retrieved + * via this timestamp extractor. + * <p> + * If the embedded metadata timestamp represents <i>CreateTime</i> (cf. Kafka broker setting + * {@code message.timestamp.type} and Kafka topic setting {@code log.message.timestamp.type}), + * this extractor effectively provides <i>event-time</i> semantics. + * If <i>LogAppendTime</i> is used as broker/topic setting to define the embedded metadata timestamps, + * using this extractor effectively provides <i>ingestion-time</i> semantics. + * <p> + * If you need <i>processing-time</i> semantics, use {@link WallclockTimestampExtractor}. + * + * @see LogAndSkipOnInvalidTimestamp + * @see UsePreviousTimeOnInvalidTimestamp + * @see WallclockTimestampExtractor + */ +public class FailOnInvalidTimestamp extends ExtractRecordMetadataTimestamp { + + /** + * Raises an exception on every call.
+ * + * @param record a data record + * @param recordTimestamp the timestamp extracted from the record + * @param previousTimestamp the latest extracted valid timestamp of the current record's partition (could be -1 if unknown) + * @return nothing; always raises an exception + * @throws StreamsException on every invocation + */ + @Override + public long onInvalidTimestamp(final ConsumerRecord<Object, Object> record, + final long recordTimestamp, + final long previousTimestamp) + throws StreamsException { + throw new StreamsException("Input record " + record + " has invalid (negative) timestamp. " + + "Possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding a timestamp, " + + "or because the input topic was created before upgrading the Kafka cluster to 0.10+. " + + "Use a different TimestampExtractor to process this data."); + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/9bed8fbc/streams/src/main/java/org/apache/kafka/streams/processor/LogAndSkipOnInvalidTimestamp.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/LogAndSkipOnInvalidTimestamp.java b/streams/src/main/java/org/apache/kafka/streams/processor/LogAndSkipOnInvalidTimestamp.java new file mode 100644 index 0000000..f24fd15 --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/LogAndSkipOnInvalidTimestamp.java @@ -0,0 +1,69 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.processor; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Retrieves embedded metadata timestamps from Kafka messages. + * If a record has a negative (invalid) timestamp value, the timestamp is returned as-is; + * in addition, a WARN message is logged in your application. + * Returning the timestamp as-is results in dropping the record, i.e., the record will not be processed. + * <p> + * Embedded metadata timestamp was introduced in "KIP-32: Add timestamps to Kafka message" for the new + * 0.10+ Kafka message format. + * <p> + * Here, "embedded metadata" refers to the fact that compatible Kafka producer clients automatically and + * transparently embed such timestamps into message metadata they send to Kafka, which can then be retrieved + * via this timestamp extractor. + * <p> + * If the embedded metadata timestamp represents <i>CreateTime</i> (cf. Kafka broker setting + * {@code message.timestamp.type} and Kafka topic setting {@code log.message.timestamp.type}), + * this extractor effectively provides <i>event-time</i> semantics.
+ * If <i>LogAppendTime</i> is used as broker/topic setting to define the embedded metadata timestamps, + * using this extractor effectively provides <i>ingestion-time</i> semantics. + * <p> + * If you need <i>processing-time</i> semantics, use {@link WallclockTimestampExtractor}. + * + * @see FailOnInvalidTimestamp + * @see UsePreviousTimeOnInvalidTimestamp + * @see WallclockTimestampExtractor + */ +public class LogAndSkipOnInvalidTimestamp extends ExtractRecordMetadataTimestamp { + private static final Logger log = LoggerFactory.getLogger(LogAndSkipOnInvalidTimestamp.class); + + /** + * Writes a log WARN message when the extracted timestamp is invalid (negative) but returns the invalid timestamp as-is, + * which ultimately causes the record to be skipped and not to be processed. + * + * @param record a data record + * @param recordTimestamp the timestamp extracted from the record + * @param previousTimestamp the latest extracted valid timestamp of the current record's partition (could be -1 if unknown) + * @return the originally extracted timestamp of the record + */ + @Override + public long onInvalidTimestamp(final ConsumerRecord<Object, Object> record, + final long recordTimestamp, + final long previousTimestamp) { + log.warn("Input record {} will be dropped because it has an invalid (negative) timestamp.", record); + return recordTimestamp; + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/9bed8fbc/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java index c55518b..0de96ba 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/TimestampExtractor.java @@ -27,17 +27,19 @@ import org.apache.kafka.streams.kstream.KTable; public interface TimestampExtractor { /** - * Extracts a timestamp from a record. + * Extracts a timestamp from a record. The timestamp must be positive to be considered a valid timestamp. + * Returning a negative timestamp will cause the record not to be processed but rather silently skipped. * <p> * The extracted timestamp MUST represent the milliseconds since midnight, January 1, 1970 UTC. - * + * <p> * It is important to note that this timestamp may become the message timestamp for any messages sent to changelogs updated by {@link KTable}s * and joins. The message timestamp is used for log retention and log rolling, so using nonsensical values may result in * excessive log rolling and therefore broker performance degradation.
* * - * @param record a data record - * @return the timestamp of the record + * @param record a data record + * @param previousTimestamp the latest extracted valid timestamp of the current record's partition (could be -1 if unknown) + * @return the timestamp of the record */ - long extract(ConsumerRecord<Object, Object> record); + long extract(ConsumerRecord<Object, Object> record, long previousTimestamp); } http://git-wip-us.apache.org/repos/asf/kafka/blob/9bed8fbc/streams/src/main/java/org/apache/kafka/streams/processor/UsePreviousTimeOnInvalidTimestamp.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/UsePreviousTimeOnInvalidTimestamp.java b/streams/src/main/java/org/apache/kafka/streams/processor/UsePreviousTimeOnInvalidTimestamp.java new file mode 100644 index 0000000..7718b5c --- /dev/null +++ b/streams/src/main/java/org/apache/kafka/streams/processor/UsePreviousTimeOnInvalidTimestamp.java @@ -0,0 +1,70 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.kafka.streams.processor; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.streams.errors.StreamsException; + +/** + * Retrieves embedded metadata timestamps from Kafka messages. + * If a record has a negative (invalid) timestamp, a new timestamp will be inferred from the current stream-time. + * <p></p> + * Embedded metadata timestamp was introduced in "KIP-32: Add timestamps to Kafka message" for the new + * 0.10+ Kafka message format. + * <p> + * Here, "embedded metadata" refers to the fact that compatible Kafka producer clients automatically and + * transparently embed such timestamps into message metadata they send to Kafka, which can then be retrieved + * via this timestamp extractor. + * <p> + * If the embedded metadata timestamp represents <i>CreateTime</i> (cf. Kafka broker setting + * {@code message.timestamp.type} and Kafka topic setting {@code log.message.timestamp.type}), + * this extractor effectively provides <i>event-time</i> semantics. + * If <i>LogAppendTime</i> is used as broker/topic setting to define the embedded metadata timestamps, + * using this extractor effectively provides <i>ingestion-time</i> semantics. + * <p> + * If you need <i>processing-time</i> semantics, use {@link WallclockTimestampExtractor}. + * + * @see FailOnInvalidTimestamp + * @see LogAndSkipOnInvalidTimestamp + * @see WallclockTimestampExtractor + */ +public class UsePreviousTimeOnInvalidTimestamp extends ExtractRecordMetadataTimestamp { + + /** + * Returns the current stream-time as new timestamp for the record.
+ * + * @param record a data record + * @param recordTimestamp the timestamp extracted from the record + * @param previousTimestamp the latest extracted valid timestamp of the current record's partition (could be -1 if unknown) + * @return the provided latest extracted valid timestamp as new timestamp for the record + * @throws StreamsException if latest extracted valid timestamp is unknown + */ + @Override + public long onInvalidTimestamp(final ConsumerRecord<Object, Object> record, + final long recordTimestamp, + final long previousTimestamp) + throws StreamsException { + if (previousTimestamp < 0) { + throw new StreamsException("Could not infer new timestamp for input record " + record + + " because latest extracted valid timestamp is unknown."); + } + return previousTimestamp; + } + + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/9bed8fbc/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java b/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java index 305573b..6df9481 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/WallclockTimestampExtractor.java @@ -21,22 +21,26 @@ import org.apache.kafka.clients.consumer.ConsumerRecord; /** * Retrieves current wall clock timestamps as {@link System#currentTimeMillis()}. - * + * <p> * Using this extractor effectively provides <i>processing-time</i> semantics. - * - * If you need <i>event-time</i> semantics, use {@link ConsumerRecordTimestampExtractor} with + * <p> + * If you need <i>event-time</i> semantics, use {@link FailOnInvalidTimestamp} with * built-in <i>CreateTime</i> or <i>LogAppendTime</i> timestamp (see KIP-32: Add timestamps to Kafka message for details). + * + * @see FailOnInvalidTimestamp + * @see LogAndSkipOnInvalidTimestamp + */ public class WallclockTimestampExtractor implements TimestampExtractor { /** * Return the current wall clock time as timestamp.
* - * @param record a data record - * @return the current wall clock time, expressed in milliseconds since midnight, January 1, 1970 UTC + * @param record a data record + * @param previousTimestamp the latest extracted valid timestamp of the current record's partition (could be -1 if unknown) + * @return the current wall clock time, expressed in milliseconds since midnight, January 1, 1970 UTC */ @Override - public long extract(ConsumerRecord<Object, Object> record) { + public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) { return System.currentTimeMillis(); } } http://git-wip-us.apache.org/repos/asf/kafka/blob/9bed8fbc/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java index 44ef146..a40b9ff 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java @@ -102,13 +102,14 @@ public class RecordQueue { rawRecord.checksum(), rawRecord.serializedKeySize(), rawRecord.serializedValueSize(), key, value); - long timestamp = timestampExtractor.extract(record); + long timestamp = timestampExtractor.extract(record, timeTracker.get()); log.trace("Source node {} extracted timestamp {} for record {} when adding to buffered queue", source.name(), timestamp, record); - // validate that timestamp must be non-negative - if (timestamp < 0) - throw new StreamsException("Extracted timestamp value is negative, which is not allowed."); + // drop message if TS is invalid, i.e., negative + if (timestamp < 0) { + continue; + } StampedRecord stampedRecord = new StampedRecord(record, timestamp); http://git-wip-us.apache.org/repos/asf/kafka/blob/9bed8fbc/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java index 2f20cdb..e7f32b4 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java @@ -73,10 +73,7 @@ public class SinkNode<K, V> extends ProcessorNode<K, V> { final long timestamp = context.timestamp(); if (timestamp < 0) { - throw new StreamsException("A record consumed from an input topic has invalid (negative) timestamp, " + - "possibly because a pre-0.10 producer client was used to write this record to Kafka without embedding a timestamp, " + - "or because the input topic was created before upgrading the Kafka cluster to 0.10+.
" + - "Use a different TimestampExtractor to process this data."); + throw new StreamsException("Invalid (negative) timestamp of " + timestamp + " for output record <" + key + ":" + value + ">."); } try { http://git-wip-us.apache.org/repos/asf/kafka/blob/9bed8fbc/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java index a40e1be..ae374ce 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java @@ -123,22 +123,27 @@ public class StreamTask extends AbstractTask implements Punctuator { } /** - * Adds records to queues + * Adds records to queues. If a record has an invalid (i.e., negative) timestamp, the record is skipped + * and not added to the queue for processing * * @param partition the partition * @param records the records + * @returns the number of added records */ @SuppressWarnings("unchecked") - public void addRecords(TopicPartition partition, Iterable<ConsumerRecord<byte[], byte[]>> records) { - int queueSize = partitionGroup.addRawRecords(partition, records); + public int addRecords(TopicPartition partition, Iterable<ConsumerRecord<byte[], byte[]>> records) { + final int oldQueueSize = partitionGroup.numBuffered(); + final int newQueueSize = partitionGroup.addRawRecords(partition, records); - log.trace("{} Added records into the buffered queue of partition {}, new queue size is {}", logPrefix, partition, queueSize); + log.trace("{} Added records into the buffered queue of partition {}, new queue size is {}", logPrefix, partition, newQueueSize); // if after adding these records, its partition queue's buffered size has been // increased beyond the threshold, we can then pause the consumption for this partition - if (queueSize > this.maxBufferedSize) { + if (newQueueSize > this.maxBufferedSize) { consumer.pause(singleton(partition)); } + + return newQueueSize - oldQueueSize; } /** http://git-wip-us.apache.org/repos/asf/kafka/blob/9bed8fbc/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java ---------------------------------------------------------------------- diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java index 5e82829..96e9963 100644 --- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java +++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamThread.java @@ -549,10 +549,12 @@ public class StreamThread extends Thread { throw new StreamsException(logPrefix + " Failed to rebalance", rebalanceException); if (!records.isEmpty()) { + int numAddedRecords = 0; for (TopicPartition partition : records.partitions()) { StreamTask task = activeTasksByPartition.get(partition); - task.addRecords(partition, records.records(partition)); + numAddedRecords += task.addRecords(partition, records.records(partition)); } + sensors.skippedRecordsSensor.record(records.count() - numAddedRecords, timerStartedMs); polledRecords = true; } else { polledRecords = false; @@ -1020,6 +1022,7 @@ public class StreamThread extends Thread { final Sensor punctuateTimeSensor; final Sensor 
taskCreationSensor; final Sensor taskDestructionSensor; + final Sensor skippedRecordsSensor; public StreamsMetricsImpl(Metrics metrics) { this.metrics = metrics; @@ -1052,6 +1055,9 @@ public class StreamThread extends Thread { this.taskDestructionSensor = metrics.sensor(sensorNamePrefix + ".task-destruction"); this.taskDestructionSensor.add(metrics.metricName("task-destruction-rate", metricGrpName, "The average per-second number of destructed tasks", metricTags), new Rate(new Count())); + + this.skippedRecordsSensor = metrics.sensor(sensorNamePrefix + ".skipped-records"); + this.skippedRecordsSensor.add(metrics.metricName("skipped-records-count", metricGrpName, "The average per-second number of skipped records.", metricTags), new Rate(new Count())); } @Override http://git-wip-us.apache.org/repos/asf/kafka/blob/9bed8fbc/streams/src/test/java/org/apache/kafka/streams/processor/FailOnInvalidTimestampTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/FailOnInvalidTimestampTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/FailOnInvalidTimestampTest.java new file mode 100644 index 0000000..738e956 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/processor/FailOnInvalidTimestampTest.java @@ -0,0 +1,36 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.streams.errors.StreamsException; +import org.junit.Test; + +public class FailOnInvalidTimestampTest extends TimestampExtractorTest { + + @Test + public void extractMetadataTimestamp() { + testExtractMetadataTimestamp(new FailOnInvalidTimestamp()); + } + + @Test(expected = StreamsException.class) + public void failOnInvalidTimestamp() { + final TimestampExtractor extractor = new FailOnInvalidTimestamp(); + extractor.extract(new ConsumerRecord<>("anyTopic", 0, 0, null, null), 42); + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/9bed8fbc/streams/src/test/java/org/apache/kafka/streams/processor/LogAndSkipOnInvalidTimestampTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/LogAndSkipOnInvalidTimestampTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/LogAndSkipOnInvalidTimestampTest.java new file mode 100644 index 0000000..92d8709 --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/processor/LogAndSkipOnInvalidTimestampTest.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. 
See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.kafka.streams.processor; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.record.TimestampType; +import org.junit.Test; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +public class LogAndSkipOnInvalidTimestampTest extends TimestampExtractorTest { + + @Test + public void extractMetadataTimestamp() { + testExtractMetadataTimestamp(new LogAndSkipOnInvalidTimestamp()); + } + + @Test + public void logAndSkipOnInvalidTimestamp() { + final long invalidMetadataTimestamp = -42; + + final TimestampExtractor extractor = new LogAndSkipOnInvalidTimestamp(); + final long timestamp = extractor.extract( + new ConsumerRecord<>( + "anyTopic", + 0, + 0, + invalidMetadataTimestamp, + TimestampType.NO_TIMESTAMP_TYPE, + 0, + 0, + 0, + null, + null), + 0 + ); + + assertThat(timestamp, is(invalidMetadataTimestamp)); + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/9bed8fbc/streams/src/test/java/org/apache/kafka/streams/processor/TimestampExtractorTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/TimestampExtractorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/TimestampExtractorTest.java new file mode 100644 index 0000000..93e0b5b --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/processor/TimestampExtractorTest.java @@ -0,0 +1,48 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.processor; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.apache.kafka.common.record.TimestampType; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +class TimestampExtractorTest { + + void testExtractMetadataTimestamp(TimestampExtractor extractor) { + final long metadataTimestamp = 42; + + final long timestamp = extractor.extract( + new ConsumerRecord<>( + "anyTopic", + 0, + 0, + metadataTimestamp, + TimestampType.NO_TIMESTAMP_TYPE, + 0, + 0, + 0, + null, + null), + 0 + ); + + assertThat(timestamp, is(metadataTimestamp)); + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/9bed8fbc/streams/src/test/java/org/apache/kafka/streams/processor/UsePreviousTimeOnInvalidTimestampTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/UsePreviousTimeOnInvalidTimestampTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/UsePreviousTimeOnInvalidTimestampTest.java new file mode 100644 index 0000000..09617fa --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/processor/UsePreviousTimeOnInvalidTimestampTest.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.processor; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.junit.Test; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +public class UsePreviousTimeOnInvalidTimestampTest extends TimestampExtractorTest { + + @Test + public void extractMetadataTimestamp() { + testExtractMetadataTimestamp(new UsePreviousTimeOnInvalidTimestamp()); + } + + @Test + public void usePreviousTimeOnInvalidTimestamp() { + final long previousTime = 42; + + final TimestampExtractor extractor = new UsePreviousTimeOnInvalidTimestamp(); + final long timestamp = extractor.extract( + new ConsumerRecord<>("anyTopic", 0, 0, null, null), + previousTime + ); + + assertThat(timestamp, is(previousTime)); + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/9bed8fbc/streams/src/test/java/org/apache/kafka/streams/processor/WallclockTimestampExtractorTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/WallclockTimestampExtractorTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/WallclockTimestampExtractorTest.java new file mode 100644 index 0000000..b7b49bb --- /dev/null +++ b/streams/src/test/java/org/apache/kafka/streams/processor/WallclockTimestampExtractorTest.java @@ -0,0 +1,62 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.kafka.streams.processor; + +import org.apache.kafka.clients.consumer.ConsumerRecord; +import org.hamcrest.BaseMatcher; +import org.hamcrest.Description; +import org.junit.Test; + +import static org.hamcrest.CoreMatchers.is; +import static org.hamcrest.MatcherAssert.assertThat; + +public class WallclockTimestampExtractorTest { + + @Test + public void extractSystemTimestamp() { + final TimestampExtractor extractor = new WallclockTimestampExtractor(); + + final long before = System.currentTimeMillis(); + final long timestamp = extractor.extract(new ConsumerRecord<>("anyTopic", 0, 0, null, null), 42); + final long after = System.currentTimeMillis(); + + assertThat(timestamp, is(new InBetween(before, after))); + } + + private static class InBetween extends BaseMatcher<Long> { + private final long before; + private final long after; + + public InBetween(long before, long after) { + this.before = before; + this.after = after; + } + + @Override + public boolean matches(Object item) { + final long timestamp = (Long) item; + return before <= timestamp && timestamp <= after; + } + + @Override + public void describeMismatch(Object item, Description mismatchDescription) {} + + @Override + public void describeTo(Description description) {} + } + +} http://git-wip-us.apache.org/repos/asf/kafka/blob/9bed8fbc/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java index a146316..d907506 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java +++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/ProcessorTopologyTest.java @@ -17,10 +17,6 @@ package org.apache.kafka.streams.processor.internals; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; -import static org.junit.Assert.assertNull; - import org.apache.kafka.clients.consumer.ConsumerRecord; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.common.serialization.Deserializer; @@ -49,6 +45,10 @@ import org.junit.Test; import java.io.File; import java.util.Properties; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNull; + public class ProcessorTopologyTest { private static final Serializer<String> STRING_SERIALIZER = new StringSerializer(); @@ -404,7 +404,7 @@ public class ProcessorTopologyTest { public static class CustomTimestampExtractor implements TimestampExtractor { @Override - public long extract(ConsumerRecord<Object, Object> record) { + public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) { return timestamp; } } http://git-wip-us.apache.org/repos/asf/kafka/blob/9bed8fbc/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java index f30e0e6..e0ee3ce 100644 --- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java +++ 
b/streams/src/test/java/org/apache/kafka/streams/processor/internals/RecordQueueTest.java @@ -29,6 +29,8 @@ import org.apache.kafka.common.serialization.IntegerSerializer; import org.apache.kafka.common.serialization.Serdes; import org.apache.kafka.common.serialization.Serializer; import org.apache.kafka.streams.errors.StreamsException; +import org.apache.kafka.streams.processor.FailOnInvalidTimestamp; +import org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp; import org.apache.kafka.streams.processor.TimestampExtractor; import org.apache.kafka.test.MockSourceNode; import org.apache.kafka.test.MockTimestampExtractor; @@ -136,4 +138,24 @@ public class RecordQueueTest { queue.addRawRecords(records, timestampExtractor); } + + @Test(expected = StreamsException.class) + public void shouldThrowOnNegativeTimestamp() { + final byte[] value = Serdes.Long().serializer().serialize("foo", 1L); + final List<ConsumerRecord<byte[], byte[]>> records = Collections.singletonList( + new ConsumerRecord<>("topic", 1, 1, -1L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)); + + queue.addRawRecords(records, new FailOnInvalidTimestamp()); + } + + @Test + public void shouldDropOnNegativeTimestamp() { + final byte[] value = Serdes.Long().serializer().serialize("foo", 1L); + final List<ConsumerRecord<byte[], byte[]>> records = Collections.singletonList( + new ConsumerRecord<>("topic", 1, 1, -1L, TimestampType.CREATE_TIME, 0L, 0, 0, recordKey, recordValue)); + + queue.addRawRecords(records, new LogAndSkipOnInvalidTimestamp()); + + assertEquals(0, queue.size()); + } } http://git-wip-us.apache.org/repos/asf/kafka/blob/9bed8fbc/streams/src/test/java/org/apache/kafka/streams/smoketest/TestTimestampExtractor.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/streams/smoketest/TestTimestampExtractor.java b/streams/src/test/java/org/apache/kafka/streams/smoketest/TestTimestampExtractor.java index 04e264c..0cab7f5 100644 --- a/streams/src/test/java/org/apache/kafka/streams/smoketest/TestTimestampExtractor.java +++ b/streams/src/test/java/org/apache/kafka/streams/smoketest/TestTimestampExtractor.java @@ -25,7 +25,7 @@ public class TestTimestampExtractor implements TimestampExtractor { private final long base = SmokeTestUtil.START_TIME; @Override - public long extract(ConsumerRecord<Object, Object> record) { + public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) { switch (record.topic()) { case "data": return base + (Integer) record.value(); http://git-wip-us.apache.org/repos/asf/kafka/blob/9bed8fbc/streams/src/test/java/org/apache/kafka/test/MockTimestampExtractor.java ---------------------------------------------------------------------- diff --git a/streams/src/test/java/org/apache/kafka/test/MockTimestampExtractor.java b/streams/src/test/java/org/apache/kafka/test/MockTimestampExtractor.java index 274e7b5..2b24578 100644 --- a/streams/src/test/java/org/apache/kafka/test/MockTimestampExtractor.java +++ b/streams/src/test/java/org/apache/kafka/test/MockTimestampExtractor.java @@ -24,7 +24,7 @@ import org.apache.kafka.streams.processor.TimestampExtractor; public class MockTimestampExtractor implements TimestampExtractor { @Override - public long extract(ConsumerRecord<Object, Object> record) { + public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) { return record.offset(); } }
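----------------------------------------------------------------------

For users who maintain their own TimestampExtractor, a custom extractor written against the new two-argument interface might look like the following sketch. The OrderTimestampExtractor class and its Order payload type are hypothetical and used only for illustration; only the extract() signature, the meaning of previousTimestamp, and the skip-on-negative-return behavior come from this patch.

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class OrderTimestampExtractor implements TimestampExtractor {

    // hypothetical payload type, used only for this illustration
    public static class Order {
        public long eventTimeMs;
    }

    @Override
    public long extract(final ConsumerRecord<Object, Object> record, final long previousTimestamp) {
        final Object value = record.value();
        if (value instanceof Order) {
            // event-time semantics: use the timestamp embedded in the payload
            return ((Order) value).eventTimeMs;
        }
        // otherwise fall back to the latest valid timestamp seen for this partition;
        // previousTimestamp may be -1 if unknown, and returning a negative value causes
        // the record to be skipped rather than processed
        return previousTimestamp;
    }
}

As before, a custom extractor is registered via StreamsConfig.TIMESTAMP_EXTRACTOR_CLASS_CONFIG; if none is configured, the new default FailOnInvalidTimestamp is used, which keeps the pre-0.10.2 behavior of failing on invalid (negative) timestamps.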
