cadonna commented on code in PR #12235:
URL: https://github.com/apache/kafka/pull/12235#discussion_r887767497


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java:
##########
@@ -166,4 +169,28 @@ public static String extractThreadId(final String fullThreadName) {
         final int index = fullThreadName.indexOf("StreamThread-");
         return fullThreadName.substring(index);
     }
+
+    public static long recordSizeInBytes(final long keyBytes,

Review Comment:
   Could you add unit tests for this method?
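
   For illustration, a minimal JUnit sketch of what such tests could look like, assuming the full signature is `recordSizeInBytes(long keyBytes, long valueBytes, String topic, Headers headers)` as the call site in `RecordCollectorImpl` suggests (class and test names are illustrative; the exact expected values depend on the per-record overhead the method accounts for):
   ```
   import org.apache.kafka.common.header.internals.RecordHeaders;
   import org.junit.jupiter.api.Test;

   import static org.apache.kafka.streams.processor.internals.ClientUtils.recordSizeInBytes;
   import static org.junit.jupiter.api.Assertions.assertTrue;

   public class RecordSizeInBytesTest {

       @Test
       public void shouldCoverKeyAndValueBytes() {
           // Whatever fixed overhead is added, the size must at least
           // cover the serialized key and value.
           assertTrue(recordSizeInBytes(10L, 20L, "topic", new RecordHeaders()) >= 30L);
       }

       @Test
       public void shouldGrowWithHeaders() {
           // Assumes headers with content increase the computed size.
           final RecordHeaders headers = new RecordHeaders();
           headers.add("header-key", new byte[] {1, 2, 3});
           assertTrue(
               recordSizeInBytes(10L, 20L, "topic", headers)
                   > recordSizeInBytes(10L, 20L, "topic", new RecordHeaders())
           );
       }
   }
   ```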



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java:
##########
@@ -199,6 +201,7 @@ public <K, V> void send(final String topic,
                 log.trace("Failed record: (key {} value {} timestamp {}) 
topic=[{}] partition=[{}]", key, value, timestamp, topic, partition);
             }
         });
+        return recordSizeInBytes(keyBytes == null ? 0 : keyBytes.length, valBytes == null ? 0 : valBytes.length, topic, headers);

Review Comment:
   Should we move the `null` checks into `recordSizeInBytes()` so that we have 
all `null`-checks in one place? 
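
   For illustration only (not the PR's code), one way to centralize them would be an overload in `ClientUtils` that accepts the possibly-`null` serialized arrays directly:
   ```
   // Hypothetical overload: callers pass the raw (possibly null) byte arrays
   // and the null handling lives next to the size computation.
   public static long recordSizeInBytes(final byte[] keyBytes,
                                        final byte[] valueBytes,
                                        final String topic,
                                        final Headers headers) {
       return recordSizeInBytes(
           keyBytes == null ? 0L : keyBytes.length,
           valueBytes == null ? 0L : valueBytes.length,
           topic,
           headers
       );
   }
   ```
   The call site above would then shrink to `return recordSizeInBytes(keyBytes, valBytes, topic, headers);`.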



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java:
##########
@@ -145,8 +141,11 @@ int addRawRecords(final Iterable<ConsumerRecord<byte[], byte[]>> rawRecords) {
      *
      * @return StampedRecord
      */
-    public StampedRecord poll() {
+    public StampedRecord poll(final long wallClockTime) {
         final StampedRecord recordToReturn = headRecord;
+
+        consumedSensor.record(headRecordSizeInBytes, wallClockTime);

Review Comment:
   Could you please add unit tests for this?
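
   A full test would need a `RecordQueue` wired against a real metrics registry; as a minimal illustration of the assertion technique (plain Kafka `Metrics`, metric and group names are placeholders), one can check that a value recorded on a sensor shows up in a cumulative-sum metric:
   ```
   import org.apache.kafka.common.MetricName;
   import org.apache.kafka.common.metrics.Metrics;
   import org.apache.kafka.common.metrics.Sensor;
   import org.apache.kafka.common.metrics.stats.CumulativeSum;
   import org.junit.jupiter.api.Test;

   import static org.junit.jupiter.api.Assertions.assertEquals;

   public class ConsumedSensorRecordingTest {

       @Test
       public void shouldAccumulateBytesConsumed() {
           final Metrics metrics = new Metrics();
           final Sensor sensor = metrics.sensor("consumed");
           final MetricName total = metrics.metricName("bytes-consumed-total", "test-group");
           sensor.add(total, new CumulativeSum());

           // Mirrors what poll(wallClockTime) does with headRecordSizeInBytes.
           sensor.record(42L, 100L);

           assertEquals(42.0, (double) metrics.metric(total).metricValue(), 0.0);
       }
   }
   ```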



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java:
##########
@@ -795,6 +795,23 @@ public static void addAvgAndSumMetricsToSensor(final Sensor sensor,
         );
     }
 
+    public static void addTotalCountAndSumMetricsToSensor(final Sensor sensor,

Review Comment:
   Could you please add unit tests for this method?
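
   A sketch of the shape such a test could take, using a real `Metrics` registry instead of mocks. Group name and tags below are placeholders, the parameter order is inferred from the verification call shown later in this review, and it is assumed the method appends a `-total` suffix like the existing `add...ToSensor` helpers in this class:
   ```
   import java.util.Collections;

   import org.apache.kafka.common.metrics.Metrics;
   import org.apache.kafka.common.metrics.Sensor;
   import org.junit.jupiter.api.Test;

   import static org.apache.kafka.streams.processor.internals.metrics.StreamsMetricsImpl.addTotalCountAndSumMetricsToSensor;
   import static org.junit.jupiter.api.Assertions.assertTrue;

   public class AddTotalCountAndSumMetricsTest {

       @Test
       public void shouldRegisterCountAndSumMetrics() {
           final Metrics metrics = new Metrics();
           final Sensor sensor = metrics.sensor("consumed");

           addTotalCountAndSumMetricsToSensor(
               sensor,
               "test-group",
               Collections.singletonMap("topic", "input"),
               "records-consumed",
               "bytes-consumed",
               "The total number of records consumed from this topic",
               "The total number of bytes consumed from this topic"
           );

           // Both metrics should now be registered with the -total suffix.
           assertTrue(metrics.metrics().keySet().stream()
               .anyMatch(name -> name.name().equals("records-consumed-total")));
           assertTrue(metrics.metrics().keySet().stream()
               .anyMatch(name -> name.name().equals("bytes-consumed-total")));
       }
   }
   ```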



##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetricsTest.java:
##########
@@ -114,6 +116,63 @@ public void shouldGetProcessAtSourceSensor() {
         verifySensor(() -> ProcessorNodeMetrics.processAtSourceSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, streamsMetrics));
     }
 
+    @Test
+    public void shouldGetRecordsAndBytesConsumedSensor() {
+        final String recordsMetricNamePrefix = "records-consumed";
+        final String bytesMetricNamePrefix = "bytes-consumed";
+        final String descriptionOfRecordsTotal = "The total number of records consumed from this topic";
+        final String descriptionOfBytesTotal = "The total number of bytes consumed from this topic";
+        when(streamsMetrics.nodeLevelSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, recordsMetricNamePrefix, RecordingLevel.INFO))
+            .thenReturn(expectedSensor);
+        when(streamsMetrics.nodeLevelSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, bytesMetricNamePrefix, RecordingLevel.INFO))
+            .thenReturn(expectedSensor);
+        when(streamsMetrics.nodeLevelTagMap(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID)).thenReturn(tagMap);
+
+        final Map<String, String> consumedTagMap = new HashMap<>(tagMap);
+        consumedTagMap.put("topic", TOPIC_NAME);
+        StreamsMetricsImpl.addTotalCountAndSumMetricsToSensor(

Review Comment:
   Apparently, there was a miss in transforming the tests in this class from 
using `PowerMock`/`EasyMock` to using `Mockito`. With `PowerMock`/`EasyMock` 
this call verifies that the method is called on the mock. With `Mockito` this 
call just calls the method on the actual class without any verifications. You 
need to do the following to get this right for your tests:
   
   1. Add a static mock:
   ```
       private final MockedStatic<StreamsMetricsImpl> streamsMetricsMockedStatic = mockStatic(StreamsMetricsImpl.class);
       private final StreamsMetricsImpl streamsMetrics = mock(StreamsMetricsImpl.class); // This is already there
   ```
   
   2. Change your test to:
   
   ```
       public void shouldGetRecordsAndBytesConsumedSensor() {
           final String recordsMetricNamePrefix = "records-consumed";
           final String bytesMetricNamePrefix = "bytes-consumed";
           final String descriptionOfRecordsTotal = "The total number of records consumed from this topic";
           final String descriptionOfBytesTotal = "The total number of bytes consumed from this topic";
           when(streamsMetrics.nodeLevelSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, recordsMetricNamePrefix, RecordingLevel.INFO))
               .thenReturn(expectedSensor);
           when(streamsMetrics.nodeLevelSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, bytesMetricNamePrefix, RecordingLevel.INFO))
               .thenReturn(expectedSensor);
           when(streamsMetrics.nodeLevelTagMap(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID)).thenReturn(tagMap);
   
           verifySensor(
               () -> ProcessorNodeMetrics.consumedSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, TOPIC_NAME, streamsMetrics)
           );
   
           final Map<String, String> consumedTagMap = new HashMap<>(tagMap);
           consumedTagMap.put("topic", TOPIC_NAME);
           streamsMetricsMockedStatic.verify(() -> addTotalCountAndSumMetricsToSensor(
               expectedSensor,
               PROCESSOR_NODE_LEVEL_GROUP,
               consumedTagMap,
               recordsMetricNamePrefix,
               bytesMetricNamePrefix,
               descriptionOfRecordsTotal,
               descriptionOfBytesTotal
           ));
       }
   ```
   We can fix the other tests in a separate PR.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java:
##########
@@ -199,6 +201,7 @@ public <K, V> void send(final String topic,
                 log.trace("Failed record: (key {} value {} timestamp {}) 
topic=[{}] partition=[{}]", key, value, timestamp, topic, partition);
             }
         });
+        return recordSizeInBytes(keyBytes == null ? 0 : keyBytes.length, valBytes == null ? 0 : valBytes.length, topic, headers);

Review Comment:
   I think in my previous comment we misunderstood each other. I was asking what we want to record with this metric: the records that are passed to the producer, or the records that are actually written to the output topic? As far as I understand, there is no guarantee that a record passed to the producer is indeed written to the output topic; there could be a crash between handing the record to the producer and sending it to the output topic.
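
   Just to make the distinction concrete (a sketch only, not a proposal for this PR): if we wanted to count only records acknowledged by the broker, the recording would have to move into the producer callback. Everything below is a hypothetical helper, not existing code:
   ```
   import org.apache.kafka.clients.producer.Producer;
   import org.apache.kafka.clients.producer.ProducerRecord;
   import org.apache.kafka.common.metrics.Sensor;
   import org.apache.kafka.common.utils.Time;

   final class AckBasedByteCounting {

       // Hypothetical helper: bytes are recorded only once the send succeeds,
       // so a crash or send failure never inflates the metric.
       static void sendAndCount(final Producer<byte[], byte[]> producer,
                                final ProducerRecord<byte[], byte[]> record,
                                final long recordSizeInBytes,
                                final Sensor producedSensor,
                                final Time time) {
           producer.send(record, (metadata, exception) -> {
               if (exception == null) {
                   producedSensor.record(recordSizeInBytes, time.milliseconds());
               }
               // failure handling would stay as in the PR
           });
       }
   }
   ```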



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java:
##########
@@ -82,7 +89,19 @@ public void process(final Record<KIn, VIn> record) {
 
         final String topic = topicExtractor.extract(key, value, contextForExtraction);
 
-        collector.send(topic, key, value, record.headers(), timestamp, keySerializer, valSerializer, partitioner);
+        final long bytesProduced =
+            collector.send(topic, key, value, record.headers(), timestamp, keySerializer, valSerializer, partitioner);
+
+        producedSensorByTopic.putIfAbsent(
+            topic,
+            ProcessorNodeMetrics.producedSensor(
+                Thread.currentThread().getName(),
+                context.taskId().toString(),
+                name(),
+                topic,
+                context.metrics()
+        ));
+        producedSensorByTopic.get(topic).record(bytesProduced, context.currentSystemTimeMs());

Review Comment:
   Could you please add unit tests for this code? 
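
   One property worth covering is that `process()` reuses the sensor for a topic instead of replacing it per record. A self-contained sketch of that check, with a plain map standing in for `producedSensorByTopic` (the real test would additionally drive `SinkNode.process()` with a mocked `RecordCollector` whose `send()` returns a known byte count, and verify that exact count lands on the sensor):
   ```
   import java.util.HashMap;
   import java.util.Map;

   import org.apache.kafka.common.metrics.Metrics;
   import org.apache.kafka.common.metrics.Sensor;
   import org.junit.jupiter.api.Test;

   import static org.junit.jupiter.api.Assertions.assertSame;

   public class ProducedSensorCachingTest {

       @Test
       public void shouldReuseSensorForSameTopic() {
           final Metrics metrics = new Metrics();
           final Map<String, Sensor> producedSensorByTopic = new HashMap<>();

           producedSensorByTopic.putIfAbsent("topic-a", metrics.sensor("first"));
           final Sensor first = producedSensorByTopic.get("topic-a");

           // A second putIfAbsent for the same topic must not replace the sensor.
           producedSensorByTopic.putIfAbsent("topic-a", metrics.sensor("second"));

           assertSame(first, producedSensorByTopic.get("topic-a"));
       }
   }
   ```
   Note that with `putIfAbsent` the sensor-creating argument is still evaluated on every call; `computeIfAbsent` would avoid that.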



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
