This is an automated email from the ASF dual-hosted git repository.
guozhang pushed a commit to branch 2.1
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/2.1 by this push:
new d07fcd7 KAFKA-7223: Add late-record metrics (#5742)
d07fcd7 is described below
commit d07fcd782d258b005cc2dca636952507dacba282
Author: John Roesler <[email protected]>
AuthorDate: Fri Oct 12 11:12:51 2018 -0500
KAFKA-7223: Add late-record metrics (#5742)
Add late record metrics, as specified in KIP-328
Reviewers: Bill Bejeck <[email protected]>, Guozhang Wang <[email protected]>
---
build.gradle | 1 +
gradle/dependencies.gradle | 2 +
.../streams/kstream/internals/metrics/Sensors.java | 36 +++++
.../processor/internals/PartitionGroup.java | 12 +-
.../streams/processor/internals/StreamTask.java | 3 +-
...KStreamSessionWindowAggregateProcessorTest.java | 29 +++-
.../internals/KStreamWindowAggregateTest.java | 170 +++++++++++++--------
.../processor/internals/PartitionGroupTest.java | 103 +++++++------
.../org/apache/kafka/test/StreamsTestUtils.java | 2 +
.../apache/kafka/streams/TopologyTestDriver.java | 10 +-
10 files changed, 248 insertions(+), 120 deletions(-)
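For context, KIP-328 defines a record's lateness as the difference between the task's current stream time and the record's timestamp, where stream time never moves backwards. A minimal sketch of that bookkeeping, with illustrative names rather than the actual Kafka classes:

    // Illustrative sketch only -- not the actual Kafka classes. Stream time
    // never moves backwards; a record behind it is "late" by the difference.
    public class LatenessSketch {
        private long streamTime = -1L; // analogous to RecordQueue.UNKNOWN

        // Returns the lateness that would be recorded for this record.
        public long observe(final long recordTimestamp) {
            if (recordTimestamp > streamTime) {
                streamTime = recordTimestamp; // record advances stream time
                return 0L;                    // on-time records count as 0
            }
            return streamTime - recordTimestamp; // out-of-order record
        }

        public static void main(final String[] args) {
            final LatenessSketch sketch = new LatenessSketch();
            System.out.println(sketch.observe(5L)); // 0 (stream time -> 5)
            System.out.println(sketch.observe(3L)); // 2 (5 - 3)
            System.out.println(sketch.observe(6L)); // 0 (stream time -> 6)
        }
    }

The PartitionGroup change below implements this branch, and the new Sensors helper feeds the observed values into avg and max metrics.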
diff --git a/build.gradle b/build.gradle
index 95f3eb3..e78d2ce 100644
--- a/build.gradle
+++ b/build.gradle
@@ -974,6 +974,7 @@ project(':streams') {
testCompile libs.junit
testCompile libs.easymock
testCompile libs.bcpkix
+ testCompile libs.hamcrest
testRuntimeOnly project(':streams:test-utils')
testRuntime libs.slf4jlog4j
diff --git a/gradle/dependencies.gradle b/gradle/dependencies.gradle
index e22885e..e11ded1 100644
--- a/gradle/dependencies.gradle
+++ b/gradle/dependencies.gradle
@@ -57,6 +57,7 @@ versions += [
jetty: "9.4.12.v20180830",
jersey: "2.27",
jmh: "1.21",
+ hamcrest: "1.3",
log4j: "1.2.17",
scalaLogging: "3.9.0",
jaxb: "2.3.0",
@@ -117,6 +118,7 @@ libs += [
jmhGeneratorAnnProcess: "org.openjdk.jmh:jmh-generator-annprocess:$versions.jmh",
joptSimple: "net.sf.jopt-simple:jopt-simple:$versions.jopt",
junit: "junit:junit:$versions.junit",
+ hamcrest: "org.hamcrest:hamcrest-all:$versions.hamcrest",
kafkaStreams_0100: "org.apache.kafka:kafka-streams:$versions.kafka_0100",
kafkaStreams_0101: "org.apache.kafka:kafka-streams:$versions.kafka_0101",
kafkaStreams_0102: "org.apache.kafka:kafka-streams:$versions.kafka_0102",
diff --git a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java
index 04c7150..a85bbb8 100644
--- a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java
+++ b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/metrics/Sensors.java
@@ -16,10 +16,15 @@
*/
package org.apache.kafka.streams.kstream.internals.metrics;
+import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Avg;
+import org.apache.kafka.common.metrics.stats.Max;
import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl;
+import java.util.Map;
+
public class Sensors {
private Sensors() {}
@@ -39,4 +44,35 @@ public class Sensors {
);
return sensor;
}
+
+ public static Sensor recordLatenessSensor(final InternalProcessorContext context) {
+ final StreamsMetricsImpl metrics = context.metrics();
+
+ final Sensor sensor = metrics.taskLevelSensor(
+ context.taskId().toString(),
+ "record-lateness",
+ Sensor.RecordingLevel.DEBUG
+ );
+
+ final Map<String, String> tags = metrics.tagMap(
+ "task-id", context.taskId().toString()
+ );
+ sensor.add(
+ new MetricName(
+ "record-lateness-avg",
+ "stream-processor-node-metrics",
+ "The average observed lateness of records.",
+ tags),
+ new Avg()
+ );
+ sensor.add(
+ new MetricName(
+ "record-lateness-max",
+ "stream-processor-node-metrics",
+ "The max observed lateness of records.",
+ tags),
+ new Max()
+ );
+ return sensor;
+ }
}
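The helper above attaches two stats to one sensor, so a single record() call updates both the avg and the max metric. A hedged standalone demo of that behavior against the public org.apache.kafka.common.metrics API (the group string and empty descriptions here are ours, not the ones registered above):

    import java.util.Collections;
    import org.apache.kafka.common.MetricName;
    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.common.metrics.Sensor;
    import org.apache.kafka.common.metrics.stats.Avg;
    import org.apache.kafka.common.metrics.stats.Max;

    public class SensorDemo {
        public static void main(final String[] args) {
            try (Metrics metrics = new Metrics()) {
                final Sensor sensor = metrics.sensor("record-lateness");
                sensor.add(new MetricName("record-lateness-avg", "demo", "", Collections.emptyMap()), new Avg());
                sensor.add(new MetricName("record-lateness-max", "demo", "", Collections.emptyMap()), new Max());

                sensor.record(0);   // an on-time record
                sensor.record(100); // a record 100 ms late
                sensor.record(50);  // a record 50 ms late
                // Within the current sample window: avg = 50.0, max = 100.0
            }
        }
    }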
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
index 7020253..1fdd454 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/PartitionGroup.java
@@ -18,6 +18,7 @@ package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Sensor;
import java.util.Collections;
import java.util.Comparator;
@@ -38,6 +39,7 @@ import java.util.Set;
public class PartitionGroup {
private final Map<TopicPartition, RecordQueue> partitionQueues;
+ private final Sensor recordLatenessSensor;
private final PriorityQueue<RecordQueue> nonEmptyQueuesByTime;
private long streamTime;
@@ -61,9 +63,10 @@ public class PartitionGroup {
}
}
- PartitionGroup(final Map<TopicPartition, RecordQueue> partitionQueues) {
+ PartitionGroup(final Map<TopicPartition, RecordQueue> partitionQueues, final Sensor recordLatenessSensor) {
nonEmptyQueuesByTime = new PriorityQueue<>(partitionQueues.size(), Comparator.comparingLong(RecordQueue::timestamp));
this.partitionQueues = partitionQueues;
+ this.recordLatenessSensor = recordLatenessSensor;
totalBuffered = 0;
allBuffered = false;
streamTime = RecordQueue.UNKNOWN;
@@ -95,7 +98,12 @@ public class PartitionGroup {
}
// always update the stream time to the record's timestamp yet to be processed if it is larger
- streamTime = Math.max(streamTime, record.timestamp);
+ if (record.timestamp > streamTime) {
+ streamTime = record.timestamp;
+ recordLatenessSensor.record(0);
+ } else {
+ recordLatenessSensor.record(streamTime - record.timestamp);
+ }
}
}
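Note the asymmetry in the new branch: a record that advances stream time records a lateness of 0 rather than a negative value, which keeps the average meaningful. Assuming the two partition queues used in PartitionGroupTest further down (p1 gets timestamps 1, 3, 5 and later 2, 4; p2 gets 2, 4, 6), the fetch order and recorded lateness trace roughly as:

    next record ts:  1  2  3  4  5  2  4  6
    stream time:     1  2  3  4  5  5  5  6
    lateness:        0  0  0  0  0  3  1  0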
diff --git a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
index 2ad0acc..247a156 100644
--- a/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
+++ b/streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java
@@ -52,6 +52,7 @@ import java.util.concurrent.TimeUnit;
import static java.lang.String.format;
import static java.util.Collections.singleton;
+import static org.apache.kafka.streams.kstream.internals.metrics.Sensors.recordLatenessSensor;
/**
* A StreamTask is associated with a {@link PartitionGroup}, and is assigned to a StreamThread for processing.
@@ -234,7 +235,7 @@ public class StreamTask extends AbstractTask implements ProcessorNodePunctuator
}
recordInfo = new PartitionGroup.RecordInfo();
- partitionGroup = new PartitionGroup(partitionQueues);
+ partitionGroup = new PartitionGroup(partitionQueues, recordLatenessSensor(processorContextImpl));
processorContextImpl.setStreamTimeSupplier(partitionGroup::timestamp);
stateMgr.registerGlobalStateStores(topology.globalStateStores());
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
index 419c861..1074f02f 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamSessionWindowAggregateProcessorTest.java
@@ -16,7 +16,6 @@
*/
package org.apache.kafka.streams.kstream.internals;
-import org.apache.kafka.common.Metric;
import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.metrics.Metrics;
import org.apache.kafka.common.serialization.Serdes;
@@ -55,7 +54,9 @@ import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
import static org.hamcrest.CoreMatchers.hasItem;
+import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.greaterThan;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
@@ -110,7 +111,7 @@ public class KStreamSessionWindowAggregateProcessorTest {
final StoreBuilder<SessionStore<String, Long>> storeBuilder =
Stores.sessionStoreBuilder(Stores.persistentSessionStore(STORE_NAME, ofMillis(GAP_MS * 3)),
Serdes.String(),
Serdes.Long())
- .withLoggingDisabled();
+            .withLoggingDisabled();
if (enableCaching) {
storeBuilder.withCachingEnabled();
@@ -335,9 +336,11 @@ public class KStreamSessionWindowAggregateProcessorTest {
context.setStreamTime(20);
context.setRecordContext(new ProcessorRecordContext(0, -2, -3, "topic", null));
processor.process("A", "1");
+ context.setRecordContext(new ProcessorRecordContext(1, -2, -3, "topic", null));
+ processor.process("A", "1");
LogCaptureAppender.unregister(appender);
- final Metric dropMetric = metrics.metrics().get(new MetricName(
+ final MetricName dropMetric = new MetricName(
"late-record-drop-total",
"stream-processor-node-metrics",
"The total number of occurrence of late-record-drop operations.",
@@ -346,8 +349,24 @@ public class KStreamSessionWindowAggregateProcessorTest {
mkEntry("task-id", "0_0"),
mkEntry("processor-node-id", "TESTING_NODE")
)
- ));
- assertEquals(1.0, dropMetric.metricValue());
+ );
+
+ assertThat(metrics.metrics().get(dropMetric).metricValue(), is(2.0));
+
+ final MetricName dropRate = new MetricName(
+ "late-record-drop-rate",
+ "stream-processor-node-metrics",
+ "The average number of occurrence of late-record-drop operations.",
+ mkMap(
+ mkEntry("client-id", "test"),
+ mkEntry("task-id", "0_0"),
+ mkEntry("processor-node-id", "TESTING_NODE")
+ )
+ );
+
+ assertThat((Double) metrics.metrics().get(dropRate).metricValue(), greaterThan(0.0));
+
assertThat(appender.getMessages(), hasItem("Skipping record for expired window. key=[A] topic=[topic] partition=[-3] offset=[-2] timestamp=[0] window=[0,0) expiration=[10]"));
+ assertThat(appender.getMessages(), hasItem("Skipping record for expired window. key=[A] topic=[topic] partition=[-3] offset=[-2] timestamp=[1] window=[1,1) expiration=[10]"));
}
}
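A note on the rate assertion above: late-record-drop-rate is a windowed rate (roughly total occurrences divided by the elapsed sample-window time in seconds), so its exact value depends on timing; the test can therefore only assert greaterThan(0.0), while the -total metric admits an exact is(2.0).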
diff --git a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
index 8ae6284..236cd8c 100644
--- a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/KStreamWindowAggregateTest.java
@@ -27,6 +27,7 @@ import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.TopologyTestDriver;
import org.apache.kafka.streams.kstream.Consumed;
+import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.KTable;
import org.apache.kafka.streams.kstream.Materialized;
@@ -42,6 +43,7 @@ import org.apache.kafka.test.MockInitializer;
import org.apache.kafka.test.MockProcessor;
import org.apache.kafka.test.MockProcessorSupplier;
import org.apache.kafka.test.StreamsTestUtils;
+import org.hamcrest.Matcher;
import org.junit.Test;
import java.util.List;
@@ -51,9 +53,10 @@ import static java.time.Duration.ofMillis;
import static org.apache.kafka.common.utils.Utils.mkEntry;
import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.apache.kafka.test.StreamsTestUtils.getMetricByName;
-import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.CoreMatchers.hasItem;
import static org.hamcrest.CoreMatchers.hasItems;
+import static org.hamcrest.CoreMatchers.is;
+import static org.hamcrest.CoreMatchers.not;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertEquals;
@@ -70,7 +73,7 @@ public class KStreamWindowAggregateTest {
final KTable<Windowed<String>, String> table2 = builder
.stream(topic1, Consumed.with(Serdes.String(), Serdes.String()))
- .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
+ .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
.windowedBy(TimeWindows.of(ofMillis(10)).advanceBy(ofMillis(5)))
.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonized").withValueSerde(Serdes.String()));
@@ -128,7 +131,7 @@ public class KStreamWindowAggregateTest {
final KTable<Windowed<String>, String> table1 = builder
.stream(topic1, Consumed.with(Serdes.String(), Serdes.String()))
- .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
+ .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
.windowedBy(TimeWindows.of(ofMillis(10)).advanceBy(ofMillis(5)))
.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonized").withValueSerde(Serdes.String()));
@@ -137,7 +140,7 @@ public class KStreamWindowAggregateTest {
final KTable<Windowed<String>, String> table2 = builder
.stream(topic2, Consumed.with(Serdes.String(), Serdes.String()))
- .groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
+ .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
.windowedBy(TimeWindows.of(ofMillis(10)).advanceBy(ofMillis(5)))
.aggregate(MockInitializer.STRING_INIT, MockAggregator.TOSTRING_ADDER, Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic2-Canonized").withValueSerde(Serdes.String()));
@@ -231,8 +234,9 @@ public class KStreamWindowAggregateTest {
final StreamsBuilder builder = new StreamsBuilder();
final String topic = "topic";
- final KStream<String, String> stream1 = builder.stream(topic, Consumed.with(Serdes.String(), Serdes.String()));
- stream1.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
+ builder
+ .stream(topic, Consumed.with(Serdes.String(), Serdes.String()))
+ .groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
.windowedBy(TimeWindows.of(ofMillis(10)).advanceBy(ofMillis(5)))
.aggregate(
MockInitializer.STRING_INIT,
@@ -258,15 +262,15 @@ public class KStreamWindowAggregateTest {
final KStream<String, String> stream1 = builder.stream(topic, Consumed.with(Serdes.String(), Serdes.String()));
stream1.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
- .windowedBy(TimeWindows.of(ofMillis(10)).advanceBy(ofMillis(5)).until(100))
- .aggregate(
- () -> "",
- MockAggregator.toStringInstance("+"),
- Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonicalized").withValueSerde(Serdes.String()).withCachingDisabled().withLoggingDisabled()
- )
- .toStream()
- .map((key, value) -> new KeyValue<>(key.toString(), value))
- .to("output");
+ .windowedBy(TimeWindows.of(ofMillis(10)).advanceBy(ofMillis(5)).until(100))
+ .aggregate(
+ () -> "",
+ MockAggregator.toStringInstance("+"),
+ Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonicalized").withValueSerde(Serdes.String()).withCachingDisabled().withLoggingDisabled()
+ )
+ .toStream()
+ .map((key, value) -> new KeyValue<>(key.toString(), value))
+ .to("output");
LogCaptureAppender.setClassLoggerToDebug(KStreamWindowAggregate.class);
final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
@@ -281,17 +285,13 @@ public class KStreamWindowAggregateTest {
driver.pipeInput(recordFactory.create(topic, "k", "6", 6L));
LogCaptureAppender.unregister(appender);
- final MetricName metricName = new MetricName(
- "late-record-drop-total",
- "stream-processor-node-metrics",
- "The total number of occurrence of late-record-drop operations.",
- mkMap(
- mkEntry("client-id", "topology-test-driver-virtual-thread"),
- mkEntry("task-id", "0_0"),
- mkEntry("processor-node-id", "KSTREAM-AGGREGATE-0000000001")
- )
+ assertLatenessMetrics(
+ driver,
+ is(7.0), // how many events get dropped
+ is(100.0), // k:0 is 100ms late, since its time is 0, but it arrives at stream time 100.
+ is(84.875) // (0 + 100 + 99 + 98 + 97 + 96 + 95 + 94) / 8
);
- assertThat(driver.metrics().get(metricName).metricValue(), equalTo(7.0));
+
assertThat(appender.getMessages(), hasItems(
"Skipping record for expired window. key=[k] topic=[topic]
partition=[0] offset=[1] timestamp=[0] window=[0,10) expiration=[10]",
"Skipping record for expired window. key=[k] topic=[topic]
partition=[0] offset=[2] timestamp=[1] window=[0,10) expiration=[10]",
@@ -316,59 +316,101 @@ public class KStreamWindowAggregateTest {
final String topic = "topic";
final KStream<String, String> stream1 = builder.stream(topic, Consumed.with(Serdes.String(), Serdes.String()));
- stream1.groupByKey(Serialized.with(Serdes.String(), Serdes.String()))
- .windowedBy(TimeWindows.of(ofMillis(10)).advanceBy(ofMillis(5)).grace(ofMillis(90L)))
- .aggregate(
- () -> "",
- MockAggregator.toStringInstance("+"),
- Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonicalized").withValueSerde(Serdes.String()).withCachingDisabled().withLoggingDisabled()
- )
- .toStream()
- .map((key, value) -> new KeyValue<>(key.toString(), value))
- .to("output");
+ stream1.groupByKey(Grouped.with(Serdes.String(), Serdes.String()))
+ .windowedBy(TimeWindows.of(ofMillis(10)).advanceBy(ofMillis(10)).grace(ofMillis(90L)))
+ .aggregate(
+ () -> "",
+ MockAggregator.toStringInstance("+"),
+ Materialized.<String, String, WindowStore<Bytes, byte[]>>as("topic1-Canonicalized").withValueSerde(Serdes.String()).withCachingDisabled().withLoggingDisabled()
+ )
+ .toStream()
+ .map((key, value) -> new KeyValue<>(key.toString(), value))
+ .to("output");
LogCaptureAppender.setClassLoggerToDebug(KStreamWindowAggregate.class);
final LogCaptureAppender appender = LogCaptureAppender.createAndRegister();
try (final TopologyTestDriver driver = new TopologyTestDriver(builder.build(), props, 0L)) {
- driver.pipeInput(recordFactory.create(topic, "k", "100", 100L));
- driver.pipeInput(recordFactory.create(topic, "k", "0", 0L));
- driver.pipeInput(recordFactory.create(topic, "k", "1", 1L));
- driver.pipeInput(recordFactory.create(topic, "k", "2", 2L));
- driver.pipeInput(recordFactory.create(topic, "k", "3", 3L));
- driver.pipeInput(recordFactory.create(topic, "k", "4", 4L));
- driver.pipeInput(recordFactory.create(topic, "k", "5", 5L));
+ driver.pipeInput(recordFactory.create(topic, "k", "100", 200L));
+ driver.pipeInput(recordFactory.create(topic, "k", "0", 100L));
+ driver.pipeInput(recordFactory.create(topic, "k", "1", 101L));
+ driver.pipeInput(recordFactory.create(topic, "k", "2", 102L));
+ driver.pipeInput(recordFactory.create(topic, "k", "3", 103L));
+ driver.pipeInput(recordFactory.create(topic, "k", "4", 104L));
+ driver.pipeInput(recordFactory.create(topic, "k", "5", 105L));
driver.pipeInput(recordFactory.create(topic, "k", "6", 6L));
LogCaptureAppender.unregister(appender);
- final MetricName metricName = new MetricName(
- "late-record-drop-total",
- "stream-processor-node-metrics",
- "The total number of occurrence of late-record-drop operations.",
- mkMap(
- mkEntry("client-id", "topology-test-driver-virtual-thread"),
- mkEntry("task-id", "0_0"),
- mkEntry("processor-node-id", "KSTREAM-AGGREGATE-0000000001")
- )
- );
- assertThat(driver.metrics().get(metricName).metricValue(), equalTo(7.0));
+ assertLatenessMetrics(driver, is(7.0), is(194.0), is(97.375));
+
assertThat(appender.getMessages(), hasItems(
- "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[1] timestamp=[0] window=[0,10) expiration=[10]",
- "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[2] timestamp=[1] window=[0,10) expiration=[10]",
- "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[3] timestamp=[2] window=[0,10) expiration=[10]",
- "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[4] timestamp=[3] window=[0,10) expiration=[10]",
- "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[5] timestamp=[4] window=[0,10) expiration=[10]",
- "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[6] timestamp=[5] window=[0,10) expiration=[10]",
- "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[7] timestamp=[6] window=[0,10) expiration=[10]"
+ "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[1] timestamp=[100] window=[100,110) expiration=[110]",
+ "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[2] timestamp=[101] window=[100,110) expiration=[110]",
+ "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[3] timestamp=[102] window=[100,110) expiration=[110]",
+ "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[4] timestamp=[103] window=[100,110) expiration=[110]",
+ "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[5] timestamp=[104] window=[100,110) expiration=[110]",
+ "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[6] timestamp=[105] window=[100,110) expiration=[110]",
+ "Skipping record for expired window. key=[k] topic=[topic] partition=[0] offset=[7] timestamp=[6] window=[0,10) expiration=[110]"
));
- OutputVerifier.compareKeyValueTimestamp(getOutput(driver), "[k@95/105]", "+100", 100);
- OutputVerifier.compareKeyValueTimestamp(getOutput(driver), "[k@100/110]", "+100", 100);
- OutputVerifier.compareKeyValueTimestamp(getOutput(driver), "[k@5/15]", "+5", 5);
- OutputVerifier.compareKeyValueTimestamp(getOutput(driver), "[k@5/15]", "+5+6", 6);
+ OutputVerifier.compareKeyValueTimestamp(getOutput(driver), "[k@200/210]", "+100", 200);
assertThat(driver.readOutput("output"), nullValue());
}
}
+ private void assertLatenessMetrics(final TopologyTestDriver driver,
+ final Matcher<Object> dropTotal,
+ final Matcher<Object> maxLateness,
+ final Matcher<Object> avgLateness) {
+ final MetricName dropMetric = new MetricName(
+ "late-record-drop-total",
+ "stream-processor-node-metrics",
+ "The total number of occurrence of late-record-drop operations.",
+ mkMap(
+ mkEntry("client-id", "topology-test-driver-virtual-thread"),
+ mkEntry("task-id", "0_0"),
+ mkEntry("processor-node-id", "KSTREAM-AGGREGATE-0000000001")
+ )
+ );
+
+ assertThat(driver.metrics().get(dropMetric).metricValue(), dropTotal);
+
+ final MetricName dropRate = new MetricName(
+ "late-record-drop-rate",
+ "stream-processor-node-metrics",
+ "The average number of occurrence of late-record-drop operations.",
+ mkMap(
+ mkEntry("client-id", "topology-test-driver-virtual-thread"),
+ mkEntry("task-id", "0_0"),
+ mkEntry("processor-node-id", "KSTREAM-AGGREGATE-0000000001")
+ )
+ );
+
+ assertThat(driver.metrics().get(dropRate).metricValue(), not(0.0));
+
+ final MetricName latenessMaxMetric = new MetricName(
+ "record-lateness-max",
+ "stream-processor-node-metrics",
+ "The max observed lateness of records.",
+ mkMap(
+ mkEntry("client-id", "topology-test-driver-virtual-thread"),
+ mkEntry("task-id", "0_0")
+ )
+ );
+ assertThat(driver.metrics().get(latenessMaxMetric).metricValue(), maxLateness);
+
+ final MetricName latenessAvgMetric = new MetricName(
+ "record-lateness-avg",
+ "stream-processor-node-metrics",
+ "The average observed lateness of records.",
+ mkMap(
+ mkEntry("client-id", "topology-test-driver-virtual-thread"),
+ mkEntry("task-id", "0_0")
+ )
+ );
+ assertThat(driver.metrics().get(latenessAvgMetric).metricValue(), avgLateness);
+ }
+
private ProducerRecord<String, String> getOutput(final TopologyTestDriver driver) {
return driver.readOutput("output", new StringDeserializer(), new StringDeserializer());
}
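To check the hard-coded expectations in assertLatenessMetrics: in the first test the records arrive with timestamps 100, 0, 1, 2, 3, 4, 5, 6, so the per-record latenesses are 0, 100, 99, 98, 97, 96, 95, 94, giving max = 100.0 and avg = 679 / 8 = 84.875. In the second test the timestamps are 200, 100, 101, 102, 103, 104, 105, 6, so the latenesses are 0, 100, 99, 98, 97, 96, 95, 194 (the last record trails stream time 200 by 194), giving max = 194.0 and avg = 779 / 8 = 97.375.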
diff --git a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
index 2df4f66..c84bbc2 100644
--- a/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
+++ b/streams/src/test/java/org/apache/kafka/streams/processor/internals/PartitionGroupTest.java
@@ -17,7 +17,11 @@
package org.apache.kafka.streams.processor.internals;
import org.apache.kafka.clients.consumer.ConsumerRecord;
+import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.TopicPartition;
+import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
+import org.apache.kafka.common.metrics.stats.Value;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.IntegerDeserializer;
import org.apache.kafka.common.serialization.IntegerSerializer;
@@ -65,7 +69,19 @@ public class PartitionGroupTest {
private final byte[] recordValue = intSerializer.serialize(null, 10);
private final byte[] recordKey = intSerializer.serialize(null, 1);
- private final PartitionGroup group = new PartitionGroup(mkMap(mkEntry(partition1, queue1), mkEntry(partition2, queue2)));
+ private final Metrics metrics = new Metrics();
+ private final MetricName lastLatenessValue = new MetricName("record-lateness-last-value", "", "", mkMap());
+
+ private final PartitionGroup group = new PartitionGroup(
+ mkMap(mkEntry(partition1, queue1), mkEntry(partition2, queue2)),
+ getValueSensor(metrics, lastLatenessValue)
+ );
+
+ private static Sensor getValueSensor(final Metrics metrics, final MetricName metricName) {
+ final Sensor lastRecordedValue = metrics.sensor(metricName.name());
+ lastRecordedValue.add(metricName, new Value());
+ return lastRecordedValue;
+ }
@Test
public void testTimeTracking() {
@@ -90,10 +106,9 @@ public class PartitionGroupTest {
// 2:[2, 4, 6]
// st: -1 since no records was being processed yet
- assertEquals(6, group.numBuffered());
- assertEquals(3, group.numBuffered(partition1));
- assertEquals(3, group.numBuffered(partition2));
+ verifyBuffered(6, 3, 3);
assertEquals(-1L, group.timestamp());
+ assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue());
StampedRecord record;
final PartitionGroup.RecordInfo info = new PartitionGroup.RecordInfo();
@@ -104,11 +119,9 @@ public class PartitionGroupTest {
// 2:[2, 4, 6]
// st: 2
assertEquals(partition1, info.partition());
- assertEquals(1L, record.timestamp);
- assertEquals(5, group.numBuffered());
- assertEquals(2, group.numBuffered(partition1));
- assertEquals(3, group.numBuffered(partition2));
- assertEquals(1L, group.timestamp());
+ verifyTimes(record, 1L, 1L);
+ verifyBuffered(5, 2, 3);
+ assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue());
// get one record, now the time should be advanced
record = group.nextRecord(info);
@@ -116,11 +129,9 @@ public class PartitionGroupTest {
// 2:[4, 6]
// st: 3
assertEquals(partition2, info.partition());
- assertEquals(2L, record.timestamp);
- assertEquals(4, group.numBuffered());
- assertEquals(2, group.numBuffered(partition1));
- assertEquals(2, group.numBuffered(partition2));
- assertEquals(2L, group.timestamp());
+ verifyTimes(record, 2L, 2L);
+ verifyBuffered(4, 2, 2);
+ assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue());
// add 2 more records with timestamp 2, 4 to partition-1
final List<ConsumerRecord<byte[], byte[]>> list3 = Arrays.asList(
@@ -131,10 +142,9 @@ public class PartitionGroupTest {
// 1:[3, 5, 2, 4]
// 2:[4, 6]
// st: 3 (non-decreasing, so adding 2 doesn't change it)
- assertEquals(6, group.numBuffered());
- assertEquals(4, group.numBuffered(partition1));
- assertEquals(2, group.numBuffered(partition2));
+ verifyBuffered(6, 4, 2);
assertEquals(2L, group.timestamp());
+ assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue());
// get one record, time should not be advanced
record = group.nextRecord(info);
@@ -142,11 +152,9 @@ public class PartitionGroupTest {
// 2:[4, 6]
// st: 4 as partition st is now {5, 4}
assertEquals(partition1, info.partition());
- assertEquals(3L, record.timestamp);
- assertEquals(5, group.numBuffered());
- assertEquals(3, group.numBuffered(partition1));
- assertEquals(2, group.numBuffered(partition2));
- assertEquals(3L, group.timestamp());
+ verifyTimes(record, 3L, 3L);
+ verifyBuffered(5, 3, 2);
+ assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue());
// get one record, time should not be advanced
record = group.nextRecord(info);
@@ -154,11 +162,9 @@ public class PartitionGroupTest {
// 2:[6]
// st: 5 as partition st is now {5, 6}
assertEquals(partition2, info.partition());
- assertEquals(4L, record.timestamp);
- assertEquals(4, group.numBuffered());
- assertEquals(3, group.numBuffered(partition1));
- assertEquals(1, group.numBuffered(partition2));
- assertEquals(4L, group.timestamp());
+ verifyTimes(record, 4L, 4L);
+ verifyBuffered(4, 3, 1);
+ assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue());
// get one more record, now time should be advanced
record = group.nextRecord(info);
@@ -166,11 +172,9 @@ public class PartitionGroupTest {
// 2:[6]
// st: 5
assertEquals(partition1, info.partition());
- assertEquals(5L, record.timestamp);
- assertEquals(3, group.numBuffered());
- assertEquals(2, group.numBuffered(partition1));
- assertEquals(1, group.numBuffered(partition2));
- assertEquals(5L, group.timestamp());
+ verifyTimes(record, 5L, 5L);
+ verifyBuffered(3, 2, 1);
+ assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue());
// get one more record, time should not be advanced
record = group.nextRecord(info);
@@ -178,11 +182,9 @@ public class PartitionGroupTest {
// 2:[6]
// st: 5
assertEquals(partition1, info.partition());
- assertEquals(2L, record.timestamp);
- assertEquals(2, group.numBuffered());
- assertEquals(1, group.numBuffered(partition1));
- assertEquals(1, group.numBuffered(partition2));
- assertEquals(5L, group.timestamp());
+ verifyTimes(record, 2L, 5L);
+ verifyBuffered(2, 1, 1);
+ assertEquals(3.0, metrics.metric(lastLatenessValue).metricValue());
// get one more record, time should not be advanced
record = group.nextRecord(info);
@@ -190,11 +192,9 @@ public class PartitionGroupTest {
// 2:[6]
// st: 4 (doesn't advance because 1 is empty, so it's still reporting the last-known time of 4)
assertEquals(partition1, info.partition());
- assertEquals(4L, record.timestamp);
- assertEquals(1, group.numBuffered());
- assertEquals(0, group.numBuffered(partition1));
- assertEquals(1, group.numBuffered(partition2));
- assertEquals(5L, group.timestamp());
+ verifyTimes(record, 4L, 5L);
+ verifyBuffered(1, 0, 1);
+ assertEquals(1.0, metrics.metric(lastLatenessValue).metricValue());
// get one more record, time should not be advanced
record = group.nextRecord(info);
@@ -202,11 +202,20 @@ public class PartitionGroupTest {
// 2:[]
// st: 4 (1 and 2 are empty, so they are still reporting the last-known times of 4 and 6.)
assertEquals(partition2, info.partition());
- assertEquals(6L, record.timestamp);
- assertEquals(0, group.numBuffered());
- assertEquals(0, group.numBuffered(partition1));
- assertEquals(0, group.numBuffered(partition2));
- assertEquals(6L, group.timestamp());
+ verifyTimes(record, 6L, 6L);
+ verifyBuffered(0, 0, 0);
+ assertEquals(0.0, metrics.metric(lastLatenessValue).metricValue());
+
+ }
+
+ private void verifyTimes(final StampedRecord record, final long recordTime, final long streamTime) {
+ assertEquals(recordTime, record.timestamp);
+ assertEquals(streamTime, group.timestamp());
+ }
+
+ private void verifyBuffered(final int totalBuffered, final int partitionOneBuffered, final int partitionTwoBuffered) {
+ assertEquals(totalBuffered, group.numBuffered());
+ assertEquals(partitionOneBuffered, group.numBuffered(partition1));
+ assertEquals(partitionTwoBuffered, group.numBuffered(partition2));
}
}
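The getValueSensor helper above is a handy test idiom in its own right: a sensor backed by a single Value stat reports the most recently recorded measurement, which is what lets the test assert on the lateness of the last record specifically. A hedged standalone sketch (the metric name mirrors the test; the rest of the wiring is ours):

    import java.util.Collections;
    import org.apache.kafka.common.MetricName;
    import org.apache.kafka.common.metrics.Metrics;
    import org.apache.kafka.common.metrics.Sensor;
    import org.apache.kafka.common.metrics.stats.Value;

    public class LastValueDemo {
        public static void main(final String[] args) {
            try (Metrics metrics = new Metrics()) {
                final MetricName name = new MetricName("record-lateness-last-value", "", "", Collections.emptyMap());
                final Sensor sensor = metrics.sensor(name.name());
                sensor.add(name, new Value()); // Value reports the latest recorded measurement

                sensor.record(3.0);
                sensor.record(1.0);
                System.out.println(metrics.metric(name).metricValue()); // 1.0 (the last value)
            }
        }
    }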
diff --git a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
index 1d64316..1dcebb5 100644
--- a/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
+++ b/streams/src/test/java/org/apache/kafka/test/StreamsTestUtils.java
@@ -32,6 +32,7 @@ import java.util.Map;
import java.util.Properties;
import java.util.UUID;
+import static org.apache.kafka.common.metrics.Sensor.RecordingLevel.DEBUG;
import static org.hamcrest.CoreMatchers.equalTo;
import static org.hamcrest.MatcherAssert.assertThat;
@@ -50,6 +51,7 @@ public final class StreamsTestUtils {
props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, keySerdeClassName);
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, valueSerdeClassName);
props.put(StreamsConfig.STATE_DIR_CONFIG, TestUtils.tempDirectory().getPath());
+ props.put(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG, DEBUG.name);
props.putAll(additional);
return props;
}
diff --git a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
index d10a45c..2abfd63 100644
--- a/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
+++ b/streams/test-utils/src/main/java/org/apache/kafka/streams/TopologyTestDriver.java
@@ -29,7 +29,9 @@ import org.apache.kafka.common.PartitionInfo;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.annotation.InterfaceStability;
import org.apache.kafka.common.header.internals.RecordHeaders;
+import org.apache.kafka.common.metrics.MetricConfig;
import org.apache.kafka.common.metrics.Metrics;
+import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.record.TimestampType;
import org.apache.kafka.common.serialization.ByteArraySerializer;
import org.apache.kafka.common.serialization.Deserializer;
@@ -255,7 +257,13 @@ public class TopologyTestDriver implements Closeable {
final MockConsumer<byte[], byte[]> consumer = new MockConsumer<>(OffsetResetStrategy.EARLIEST);
stateDirectory = new StateDirectory(streamsConfig, mockWallClockTime);
- metrics = new Metrics();
+
+ final MetricConfig metricConfig = new MetricConfig()
+ .samples(streamsConfig.getInt(StreamsConfig.METRICS_NUM_SAMPLES_CONFIG))
+ .recordLevel(Sensor.RecordingLevel.forName(streamsConfig.getString(StreamsConfig.METRICS_RECORDING_LEVEL_CONFIG)))
+ .timeWindow(streamsConfig.getLong(StreamsConfig.METRICS_SAMPLE_WINDOW_MS_CONFIG), TimeUnit.MILLISECONDS);
+
+ metrics = new Metrics(metricConfig, mockWallClockTime);
final StreamsMetricsImpl streamsMetrics = new StreamsMetricsImpl(
metrics,
"topology-test-driver-virtual-thread"