This is an automated email from the ASF dual-hosted git repository.

mjsax pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/kafka.git


The following commit(s) were added to refs/heads/trunk by this push:
     new 4f92a3f0afd KAFKA-14747: record discarded FK join subscription responses (#15395)
4f92a3f0afd is described below

commit 4f92a3f0afda96c04059be81cf7867a0bbc7c276
Author: Ayoub Omari <[email protected]>
AuthorDate: Tue Mar 5 00:56:40 2024 +0100

    KAFKA-14747: record discarded FK join subscription responses (#15395)
    
    A foreign-key join might drop a "subscription response" message if the
    value hash has changed.
    This PR adds support for recording such events via the existing "dropped
    records" sensor.
    
    Reviewers: Matthias J. Sax <[email protected]>
---
 .../ResponseJoinProcessorSupplier.java             | 13 +++++
 .../SubscriptionJoinProcessorSupplier.java         |  6 ++-
 .../ForeignTableJoinProcessorSupplierTest.java     |  2 +-
 .../ResponseJoinProcessorSupplierTest.java         | 56 +++++++++++++++++++---
 4 files changed, 69 insertions(+), 8 deletions(-)

diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplier.java
index 600b28078b9..cbb66f98fa9 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplier.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplier.java
@@ -18,6 +18,7 @@
 package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
 
 import org.apache.kafka.common.errors.UnsupportedVersionException;
+import org.apache.kafka.common.metrics.Sensor;
 import org.apache.kafka.common.serialization.Serializer;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.internals.KTableValueGetter;
@@ -27,6 +28,8 @@ import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.ProcessorSupplier;
 import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
+import org.apache.kafka.streams.processor.internals.metrics.TaskMetrics;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.internals.Murmur3;
 import org.slf4j.Logger;
@@ -71,6 +74,8 @@ public class ResponseJoinProcessorSupplier<K, V, VO, VR> 
implements ProcessorSup
             private Serializer<V> runtimeValueSerializer = 
constructionTimeValueSerializer;
 
             private KTableValueGetter<K, V> valueGetter;
+            private Sensor droppedRecordsSensor;
+
 
             @SuppressWarnings("unchecked")
             @Override
@@ -82,6 +87,13 @@ public class ResponseJoinProcessorSupplier<K, V, VO, VR> 
implements ProcessorSup
                 if (runtimeValueSerializer == null) {
                     runtimeValueSerializer = (Serializer<V>) 
context.valueSerde().serializer();
                 }
+
+                final InternalProcessorContext<?, ?> internalProcessorContext 
= (InternalProcessorContext<?, ?>) context;
+                droppedRecordsSensor = TaskMetrics.droppedRecordsSensor(
+                        Thread.currentThread().getName(),
+                        internalProcessorContext.taskId().toString(),
+                        internalProcessorContext.metrics()
+                );
             }
 
             @Override
@@ -112,6 +124,7 @@ public class ResponseJoinProcessorSupplier<K, V, VO, VR> 
implements ProcessorSup
                     context().forward(record.withValue(result));
                 } else {
                     LOG.trace("Dropping FK-join response due to hash mismatch. 
Expected {}. Actual {}", messageHash, currentHash);
+                    droppedRecordsSensor.record();
                 }
             }
         };
diff --git 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinProcessorSupplier.java
 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinProcessorSupplier.java
index a8677ce2958..388b669e988 100644
--- 
a/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinProcessorSupplier.java
+++ 
b/streams/src/main/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/SubscriptionJoinProcessorSupplier.java
@@ -107,7 +107,11 @@ public class SubscriptionJoinProcessorSupplier<K, KO, VO>
 
                         context().forward(
                             record.withKey(record.key().getPrimaryKey())
-                                .withValue(new 
SubscriptionResponseWrapper<>(value.getHash(), valueToSend, 
value.getPrimaryPartition()))
+                                .withValue(new SubscriptionResponseWrapper<>(
+                                        value.getHash(),
+                                        valueToSend,
+                                        value.getPrimaryPartition()
+                                ))
                                 .withTimestamp(resultTimestamp)
                         );
                         break;
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplierTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplierTest.java
index f4f35e6ff05..c292dd2e349 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplierTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ForeignTableJoinProcessorSupplierTest.java
@@ -379,4 +379,4 @@ public class ForeignTableJoinProcessorSupplierTest {
             new SubscriptionJoinProcessorSupplier<>(valueGetterSupplier);
         return supplier.get();
     }
-}
\ No newline at end of file
+}
diff --git 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplierTest.java
 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplierTest.java
index 4c26efe2364..b32c51a3baa 100644
--- 
a/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplierTest.java
+++ 
b/streams/src/test/java/org/apache/kafka/streams/kstream/internals/foreignkeyjoin/ResponseJoinProcessorSupplierTest.java
@@ -17,6 +17,7 @@
 
 package org.apache.kafka.streams.kstream.internals.foreignkeyjoin;
 
+import org.apache.kafka.common.MetricName;
 import org.apache.kafka.common.serialization.StringSerializer;
 import org.apache.kafka.streams.kstream.ValueJoiner;
 import org.apache.kafka.streams.kstream.internals.KTableValueGetter;
@@ -25,17 +26,23 @@ import 
org.apache.kafka.streams.processor.api.MockProcessorContext;
 import org.apache.kafka.streams.processor.api.Processor;
 import org.apache.kafka.streams.processor.api.ProcessorContext;
 import org.apache.kafka.streams.processor.api.Record;
+import org.apache.kafka.streams.processor.internals.InternalProcessorContext;
 import org.apache.kafka.streams.state.ValueAndTimestamp;
 import org.apache.kafka.streams.state.internals.Murmur3;
+import org.apache.kafka.test.MockInternalNewProcessorContext;
 import org.junit.Test;
 
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 
+import static org.apache.kafka.common.utils.Utils.mkEntry;
+import static org.apache.kafka.common.utils.Utils.mkMap;
 import static org.hamcrest.CoreMatchers.is;
 import static org.hamcrest.MatcherAssert.assertThat;
 import static org.hamcrest.collection.IsEmptyCollection.empty;
+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotEquals;
 
 public class ResponseJoinProcessorSupplierTest {
     private static final StringSerializer STRING_SERIALIZER = new 
StringSerializer();
@@ -88,7 +95,7 @@ public class ResponseJoinProcessorSupplierTest {
                 leftJoin
             );
         final Processor<String, SubscriptionResponseWrapper<String>, String, 
String> processor = processorSupplier.get();
-        final 
org.apache.kafka.streams.processor.api.MockProcessorContext<String, String> 
context = new org.apache.kafka.streams.processor.api.MockProcessorContext<>();
+        final MockInternalNewProcessorContext<String, String> context = new 
MockInternalNewProcessorContext<>();
         processor.init(context);
         context.setRecordMetadata("topic", 0, 0);
 
@@ -97,6 +104,10 @@ public class ResponseJoinProcessorSupplierTest {
         processor.process(new Record<>("lhs1", new 
SubscriptionResponseWrapper<>(oldHash, "rhsValue", 0), 0));
         final List<MockProcessorContext.CapturedForward<? extends String, ? 
extends String>> forwarded = context.forwarded();
         assertThat(forwarded, empty());
+
+        // test dropped-records sensors
+        assertEquals(1.0, getDroppedRecordsTotalMetric(context));
+        assertNotEquals(0.0, getDroppedRecordsRateMetric(context));
     }
 
     @Test
@@ -113,7 +124,7 @@ public class ResponseJoinProcessorSupplierTest {
                 leftJoin
             );
         final Processor<String, SubscriptionResponseWrapper<String>, String, 
String> processor = processorSupplier.get();
-        final MockProcessorContext<String, String> context = new 
MockProcessorContext<>();
+        final MockInternalNewProcessorContext<String, String> context = new 
MockInternalNewProcessorContext<>();
         processor.init(context);
         context.setRecordMetadata("topic", 0, 0);
 
@@ -122,6 +133,10 @@ public class ResponseJoinProcessorSupplierTest {
         processor.process(new Record<>("lhs1", new 
SubscriptionResponseWrapper<>(hash, "rhsValue", 0), 0));
         final List<MockProcessorContext.CapturedForward<? extends String, ? 
extends String>> forwarded = context.forwarded();
         assertThat(forwarded, empty());
+
+        // test dropped-records sensors
+        assertEquals(1.0, getDroppedRecordsTotalMetric(context));
+        assertNotEquals(0.0, getDroppedRecordsRateMetric(context));
     }
 
     @Test
@@ -138,7 +153,7 @@ public class ResponseJoinProcessorSupplierTest {
                 leftJoin
             );
         final Processor<String, SubscriptionResponseWrapper<String>, String, 
String> processor = processorSupplier.get();
-        final MockProcessorContext<String, String> context = new 
MockProcessorContext<>();
+        final MockInternalNewProcessorContext<String, String> context = new 
MockInternalNewProcessorContext<>();
         processor.init(context);
         context.setRecordMetadata("topic", 0, 0);
 
@@ -164,7 +179,7 @@ public class ResponseJoinProcessorSupplierTest {
                 leftJoin
             );
         final Processor<String, SubscriptionResponseWrapper<String>, String, 
String> processor = processorSupplier.get();
-        final MockProcessorContext<String, String> context = new 
MockProcessorContext<>();
+        final MockInternalNewProcessorContext<String, String> context = new 
MockInternalNewProcessorContext<>();
         processor.init(context);
         context.setRecordMetadata("topic", 0, 0);
 
@@ -190,7 +205,7 @@ public class ResponseJoinProcessorSupplierTest {
                 leftJoin
             );
         final Processor<String, SubscriptionResponseWrapper<String>, String, 
String> processor = processorSupplier.get();
-        final MockProcessorContext<String, String> context = new 
MockProcessorContext<>();
+        final MockInternalNewProcessorContext<String, String> context = new 
MockInternalNewProcessorContext<>();
         processor.init(context);
         context.setRecordMetadata("topic", 0, 0);
 
@@ -216,7 +231,7 @@ public class ResponseJoinProcessorSupplierTest {
                 leftJoin
             );
         final Processor<String, SubscriptionResponseWrapper<String>, String, 
String> processor = processorSupplier.get();
-        final MockProcessorContext<String, String> context = new 
MockProcessorContext<>();
+        final MockInternalNewProcessorContext<String, String> context = new 
MockInternalNewProcessorContext<>();
         processor.init(context);
         context.setRecordMetadata("topic", 0, 0);
 
@@ -227,4 +242,33 @@ public class ResponseJoinProcessorSupplierTest {
         assertThat(forwarded.size(), is(1));
         assertThat(forwarded.get(0).record(), is(new Record<>("lhs1", null, 
0)));
     }
+
+    private Object getDroppedRecordsTotalMetric(final 
InternalProcessorContext<String, String> context) {
+        final MetricName dropTotalMetric = new MetricName(
+            "dropped-records-total",
+            "stream-task-metrics",
+            "The total number of dropped records",
+            mkMap(
+                mkEntry("thread-id", Thread.currentThread().getName()),
+                mkEntry("task-id", "0_0")
+            )
+        );
+
+        return context.metrics().metrics().get(dropTotalMetric).metricValue();
+    }
+
+    private Object getDroppedRecordsRateMetric(final 
InternalProcessorContext<String, String> context) {
+        final MetricName dropRateMetric = new MetricName(
+            "dropped-records-rate",
+            "stream-task-metrics",
+            "The average number of dropped records per second",
+            mkMap(
+                mkEntry("thread-id", Thread.currentThread().getName()),
+                mkEntry("task-id", "0_0")
+            )
+        );
+
+        return context.metrics().metrics().get(dropRateMetric).metricValue();
+    }
+
 }

Reply via email to