This is an automated email from the ASF dual-hosted git repository.
mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git
The following commit(s) were added to refs/heads/trunk by this push:
new 4f92a3f0afd KAFKA-14747: record discarded FK join subscription
responses (#15395)
4f92a3f0afd is described below
commit 4f92a3f0afda96c04059be81cf7867a0bbc7c276
Author: Ayoub Omari <[email protected]>
AuthorDate: Tue Mar 5 00:56:40 2024 +0100
KAFKA-14747: record discarded FK join subscription responses (#15395)
A foreign-key-join might drop a "subscription response" message, if the
value-hash changed.
This PR adds support to record such event via the existing "dropped
records" sensor.
Reviewers: Matthias J. Sax <[email protected]>
---
.../ResponseJoinProcessorSupplier.java | 13 +++++
.../SubscriptionJoinProcessorSupplier.java | 6 ++-
.../ForeignTableJoinProcessorSupplierTest.java | 2 +-
.../ResponseJoinProcessorSupplierTest.java | 56 +++++++++++++++++++---
4 files changed, 69 insertions(+), 8 deletions(-)
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplier.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplier.java
index 600b28078b9..cbb66f98fa9 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplier.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplier.java
@@ -18,6 +18,7 @@
package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.metrics.Sensor;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.internals.KTableValueGetter;
@@ -27,6 +28,8 @@ import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.ProcessorSupplier;
import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.internals.Murmur3;
import org.slf4j.Logger;
@@ -71,6 +74,8 @@ public class ResponseJoinProcessorSupplier<K, V, VO, VR>
implements ProcessorSup
private Serializer<V> runtimeValueSerializer =
constructionTimeValueSerializer;
private KTableValueGetter<K, V> valueGetter;
+ private Sensor droppedRecordsSensor;
+
@SuppressWarnings("unchecked")
@Override
@@ -82,6 +87,13 @@ public class ResponseJoinProcessorSupplier<K, V, VO, VR>
implements ProcessorSup
if (runtimeValueSerializer == null) {
runtimeValueSerializer = (Serializer<V>)
context.valueSerde().serializer();
}
+
+ final InternalProcessorContext<?, ?> internalProcessorContext
= (InternalProcessorContext<?, ?>) context;
+ droppedRecordsSensor = TaskMetrics.droppedRecordsSensor(
+ Thread.currentThread().getName(),
+ internalProcessorContext.taskId().toString(),
+ internalProcessorContext.metrics()
+ );
}
@Override
@@ -112,6 +124,7 @@ public class ResponseJoinProcessorSupplier<K, V, VO, VR>
implements ProcessorSup
context().forward(record.withValue(result));
} else {
LOG.trace("Dropping FK-join response due to hash mismatch.
Expected {}. Actual {}", messageHash, currentHash);
+ droppedRecordsSensor.record();
}
}
};
diff --git
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinProcessorSupplier.java
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinProcessorSupplier.java
index a8677ce2958..388b669e988 100644
---
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinProcessorSupplier.java
+++
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinProcessorSupplier.java
@@ -107,7 +107,11 @@ public class SubscriptionJoinProcessorSupplier<K, KO, VO>
context().forward(
record.withKey(record.key().getPrimaryKey())
- .withValue(new
SubscriptionResponseWrapper<>(value.getHash(), valueToSend,
value.getPrimaryPartition()))
+ .withValue(new SubscriptionResponseWrapper<>(
+ value.getHash(),
+ valueToSend,
+ value.getPrimaryPartition()
+ ))
.withTimestamp(resultTimestamp)
);
break;
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplierTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplierTest.java
index f4f35e6ff05..c292dd2e349 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplierTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplierTest.java
@@ -379,4 +379,4 @@ public class ForeignTableJoinProcessorSupplierTest {
new SubscriptionJoinProcessorSupplier<>(valueGetterSupplier);
return supplier.get();
}
-}
\ No newline at end of file
+}
diff --git
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplierTest.java
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplierTest.java
index 4c26efe2364..b32c51a3baa 100644
---
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplierTest.java
+++
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplierTest.java
@@ -17,6 +17,7 @@
package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
+import org.apache.kafka.common.MetricName;
import org.apache.kafka.common.serialization.StringSerializer;
import org.apache.kafka.streams.kstream.ValueJoiner;
import org.apache.kafka.streams.kstream.internals.KTableValueGetter;
@@ -25,17 +26,23 @@ import
org.apache.kafka.streams.processor.api.MockProcessorContext;
import org.apache.kafka.streams.processor.api.Processor;
import org.apache.kafka.streams.processor.api.ProcessorContext;
import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
import org.apache.kafka.streams.state.ValueAndTimestamp;
import org.apache.kafka.streams.state.internals.Murmur3;
+import org.apache.kafka.test.MockInternalNewProcessorContext;
import org.junit.Test;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.collection.IsEmptyCollection.empty;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
public class ResponseJoinProcessorSupplierTest {
private static final StringSerializer STRING_SERIALIZER = new
StringSerializer();
@@ -88,7 +95,7 @@ public class ResponseJoinProcessorSupplierTest {
leftJoin
);
final Processor<String, SubscriptionResponseWrapper<String>, String,
String> processor = processorSupplier.get();
- final
org.apache.kafka.streams.processor.api.MockProcessorContext<String, String>
context = new org.apache.kafka.streams.processor.api.MockProcessorContext<>();
+ final MockInternalNewProcessorContext<String, String> context = new
MockInternalNewProcessorContext<>();
processor.init(context);
context.setRecordMetadata("topic", 0, 0);
@@ -97,6 +104,10 @@ public class ResponseJoinProcessorSupplierTest {
processor.process(new Record<>("lhs1", new
SubscriptionResponseWrapper<>(oldHash, "rhsValue", 0), 0));
final List<MockProcessorContext.CapturedForward<? extends String, ?
extends String>> forwarded = context.forwarded();
assertThat(forwarded, empty());
+
+ // test dropped-records sensors
+ assertEquals(1.0, getDroppedRecordsTotalMetric(context));
+ assertNotEquals(0.0, getDroppedRecordsRateMetric(context));
}
@Test
@@ -113,7 +124,7 @@ public class ResponseJoinProcessorSupplierTest {
leftJoin
);
final Processor<String, SubscriptionResponseWrapper<String>, String,
String> processor = processorSupplier.get();
- final MockProcessorContext<String, String> context = new
MockProcessorContext<>();
+ final MockInternalNewProcessorContext<String, String> context = new
MockInternalNewProcessorContext<>();
processor.init(context);
context.setRecordMetadata("topic", 0, 0);
@@ -122,6 +133,10 @@ public class ResponseJoinProcessorSupplierTest {
processor.process(new Record<>("lhs1", new
SubscriptionResponseWrapper<>(hash, "rhsValue", 0), 0));
final List<MockProcessorContext.CapturedForward<? extends String, ?
extends String>> forwarded = context.forwarded();
assertThat(forwarded, empty());
+
+ // test dropped-records sensors
+ assertEquals(1.0, getDroppedRecordsTotalMetric(context));
+ assertNotEquals(0.0, getDroppedRecordsRateMetric(context));
}
@Test
@@ -138,7 +153,7 @@ public class ResponseJoinProcessorSupplierTest {
leftJoin
);
final Processor<String, SubscriptionResponseWrapper<String>, String,
String> processor = processorSupplier.get();
- final MockProcessorContext<String, String> context = new
MockProcessorContext<>();
+ final MockInternalNewProcessorContext<String, String> context = new
MockInternalNewProcessorContext<>();
processor.init(context);
context.setRecordMetadata("topic", 0, 0);
@@ -164,7 +179,7 @@ public class ResponseJoinProcessorSupplierTest {
leftJoin
);
final Processor<String, SubscriptionResponseWrapper<String>, String,
String> processor = processorSupplier.get();
- final MockProcessorContext<String, String> context = new
MockProcessorContext<>();
+ final MockInternalNewProcessorContext<String, String> context = new
MockInternalNewProcessorContext<>();
processor.init(context);
context.setRecordMetadata("topic", 0, 0);
@@ -190,7 +205,7 @@ public class ResponseJoinProcessorSupplierTest {
leftJoin
);
final Processor<String, SubscriptionResponseWrapper<String>, String,
String> processor = processorSupplier.get();
- final MockProcessorContext<String, String> context = new
MockProcessorContext<>();
+ final MockInternalNewProcessorContext<String, String> context = new
MockInternalNewProcessorContext<>();
processor.init(context);
context.setRecordMetadata("topic", 0, 0);
@@ -216,7 +231,7 @@ public class ResponseJoinProcessorSupplierTest {
leftJoin
);
final Processor<String, SubscriptionResponseWrapper<String>, String,
String> processor = processorSupplier.get();
- final MockProcessorContext<String, String> context = new
MockProcessorContext<>();
+ final MockInternalNewProcessorContext<String, String> context = new
MockInternalNewProcessorContext<>();
processor.init(context);
context.setRecordMetadata("topic", 0, 0);
@@ -227,4 +242,33 @@ public class ResponseJoinProcessorSupplierTest {
assertThat(forwarded.size(), is(1));
assertThat(forwarded.get(0).record(), is(new Record<>("lhs1", null,
0)));
}
+
+ private Object getDroppedRecordsTotalMetric(final
InternalProcessorContext<String, String> context) {
+ final MetricName dropTotalMetric = new MetricName(
+ "dropped-records-total",
+ "stream-task-metrics",
+ "The total number of dropped records",
+ mkMap(
+ mkEntry("thread-id", Thread.currentThread().getName()),
+ mkEntry("task-id", "0_0")
+ )
+ );
+
+ return context.metrics().metrics().get(dropTotalMetric).metricValue();
+ }
+
+ private Object getDroppedRecordsRateMetric(final
InternalProcessorContext<String, String> context) {
+ final MetricName dropRateMetric = new MetricName(
+ "dropped-records-rate",
+ "stream-task-metrics",
+ "The average number of dropped records per second",
+ mkMap(
+ mkEntry("thread-id", Thread.currentThread().getName()),
+ mkEntry("task-id", "0_0")
+ )
+ );
+
+ return context.metrics().metrics().get(dropRateMetric).metricValue();
+ }
+
}