cadonna commented on code in PR #12235:
URL: https://github.com/apache/kafka/pull/12235#discussion_r887767497
##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java:
##########

```diff
@@ -166,4 +169,28 @@ public static String extractThreadId(final String fullThreadName) {
         final int index = fullThreadName.indexOf("StreamThread-");
         return fullThreadName.substring(index);
     }
+
+    public static long recordSizeInBytes(final long keyBytes,
```

Review Comment:
   Could you add unit tests for this method?

##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java:
##########

```diff
@@ -199,6 +201,7 @@ public <K, V> void send(final String topic,
                 log.trace("Failed record: (key {} value {} timestamp {}) topic=[{}] partition=[{}]",
                     key, value, timestamp, topic, partition);
             }
         });
+        return recordSizeInBytes(keyBytes == null ? 0 : keyBytes.length, valBytes == null ? 0 : valBytes.length, topic, headers);
```

Review Comment:
   Should we move the `null` checks into `recordSizeInBytes()` so that we have all `null` checks in one place?

##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java:
##########

```diff
@@ -145,8 +141,11 @@ int addRawRecords(final Iterable<ConsumerRecord<byte[], byte[]>> rawRecords) {
      *
      * @return StampedRecord
      */
-    public StampedRecord poll() {
+    public StampedRecord poll(final long wallClockTime) {
         final StampedRecord recordToReturn = headRecord;
+
+        consumedSensor.record(headRecordSizeInBytes, wallClockTime);
```

Review Comment:
   Could you please add unit tests for this?

##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/StreamsMetricsImpl.java:
##########

```diff
@@ -795,6 +795,23 @@ public static void addAvgAndSumMetricsToSensor(final Sensor sensor,
         );
     }
+
+    public static void addTotalCountAndSumMetricsToSensor(final Sensor sensor,
```

Review Comment:
   Could you please add unit tests for this method?
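As a starting point for the requested unit tests, here is a self-contained sketch of what `recordSizeInBytes()` could look like with the `null` checks folded in, as suggested above. The fixed per-record overhead constants and the plain `Map`-based header type are assumptions for illustration only; the real method lives in `ClientUtils` and takes Kafka's `Headers` type.

```java
import java.nio.charset.StandardCharsets;
import java.util.Map;

public class RecordSizeSketch {
    // Assumed fixed per-record overhead: 8 bytes timestamp + 8 bytes offset + 4 bytes partition.
    private static final long FIXED_OVERHEAD = 8L + 8L + 4L;

    // Hypothetical stand-in for ClientUtils#recordSizeInBytes. The null checks for
    // key/value bytes and headers live here, so callers may pass null directly.
    public static long recordSizeInBytes(final byte[] keyBytes,
                                         final byte[] valueBytes,
                                         final String topic,
                                         final Map<String, byte[]> headers) {
        long size = FIXED_OVERHEAD;
        size += keyBytes == null ? 0 : keyBytes.length;
        size += valueBytes == null ? 0 : valueBytes.length;
        size += topic.getBytes(StandardCharsets.UTF_8).length;
        if (headers != null) {
            for (final Map.Entry<String, byte[]> header : headers.entrySet()) {
                size += header.getKey().getBytes(StandardCharsets.UTF_8).length;
                size += header.getValue() == null ? 0 : header.getValue().length;
            }
        }
        return size;
    }

    public static void main(String[] args) {
        // Null key and value contribute nothing beyond the fixed overhead and the topic name.
        System.out.println(recordSizeInBytes(null, null, "topic", Map.of()));
        // 20 (fixed) + 5 ("topic") = 25
        System.out.println(recordSizeInBytes(new byte[3], new byte[7], "topic", Map.of("h", new byte[2])));
        // 25 + 3 + 7 + 1 ("h") + 2 = 38
    }
}
```

A unit test would then only need to exercise the four cases: null key, null value, null headers, and a fully populated record.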
##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetricsTest.java:
##########

```diff
@@ -114,6 +116,63 @@ public void shouldGetProcessAtSourceSensor() {
         verifySensor(() -> ProcessorNodeMetrics.processAtSourceSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, streamsMetrics));
     }
+
+    @Test
+    public void shouldGetRecordsAndBytesConsumedSensor() {
+        final String recordsMetricNamePrefix = "records-consumed";
+        final String bytesMetricNamePrefix = "bytes-consumed";
+        final String descriptionOfRecordsTotal = "The total number of records consumed from this topic";
+        final String descriptionOfBytesTotal = "The total number of bytes consumed from this topic";
+        when(streamsMetrics.nodeLevelSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, recordsMetricNamePrefix, RecordingLevel.INFO))
+            .thenReturn(expectedSensor);
+        when(streamsMetrics.nodeLevelSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, bytesMetricNamePrefix, RecordingLevel.INFO))
+            .thenReturn(expectedSensor);
+        when(streamsMetrics.nodeLevelTagMap(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID)).thenReturn(tagMap);
+
+        final Map<String, String> consumedTagMap = new HashMap<>(tagMap);
+        consumedTagMap.put("topic", TOPIC_NAME);
+        StreamsMetricsImpl.addTotalCountAndSumMetricsToSensor(
```

Review Comment:
   Apparently, there was an oversight in transforming the tests in this class from using `PowerMock`/`EasyMock` to using `Mockito`. With `PowerMock`/`EasyMock` this call verifies that the method is called on the mock. With `Mockito` this call just calls the method on the actual class without any verification. You need to do the following to get this right for your tests:

   1. Add a static mock:

   ```java
   private final MockedStatic<StreamsMetricsImpl> streamsMetricsMockedStatic = mockStatic(StreamsMetricsImpl.class);
   private final StreamsMetricsImpl streamsMetrics = mock(StreamsMetricsImpl.class); // This is already there
   ```

   2. Change your test to:

   ```java
   public void shouldGetRecordsAndBytesConsumedSensor() {
       final String recordsMetricNamePrefix = "records-consumed";
       final String bytesMetricNamePrefix = "bytes-consumed";
       final String descriptionOfRecordsTotal = "The total number of records consumed from this topic";
       final String descriptionOfBytesTotal = "The total number of bytes consumed from this topic";
       when(streamsMetrics.nodeLevelSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, recordsMetricNamePrefix, RecordingLevel.INFO))
           .thenReturn(expectedSensor);
       when(streamsMetrics.nodeLevelSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, bytesMetricNamePrefix, RecordingLevel.INFO))
           .thenReturn(expectedSensor);
       when(streamsMetrics.nodeLevelTagMap(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID)).thenReturn(tagMap);

       verifySensor(
           () -> ProcessorNodeMetrics.consumedSensor(THREAD_ID, TASK_ID, PROCESSOR_NODE_ID, TOPIC_NAME, streamsMetrics)
       );

       final Map<String, String> consumedTagMap = new HashMap<>(tagMap);
       consumedTagMap.put("topic", TOPIC_NAME);
       streamsMetricsMockedStatic.verify(() -> addTotalCountAndSumMetricsToSensor(
           expectedSensor,
           PROCESSOR_NODE_LEVEL_GROUP,
           consumedTagMap,
           recordsMetricNamePrefix,
           bytesMetricNamePrefix,
           descriptionOfRecordsTotal,
           descriptionOfBytesTotal
       ));
   }
   ```

   We can fix the other tests in a separate PR.

##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java:
##########

```diff
@@ -199,6 +201,7 @@ public <K, V> void send(final String topic,
                 log.trace("Failed record: (key {} value {} timestamp {}) topic=[{}] partition=[{}]",
                     key, value, timestamp, topic, partition);
             }
         });
+        return recordSizeInBytes(keyBytes == null ? 0 : keyBytes.length, valBytes == null ? 0 : valBytes.length, topic, headers);
```

Review Comment:
   I think in my previous comment we misunderstood each other. I was asking what we want to record with this metric.
   Do we want to record the records that are passed to the producer, or do we want to measure the records actually sent to the output topic? As far as I understand, there is no guarantee that a record passed to a producer is indeed sent to the output topic. There could be a crash between passing the record to the producer and sending the record to the output topic.

##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java:
##########

```diff
@@ -82,7 +89,19 @@ public void process(final Record<KIn, VIn> record) {
         final String topic = topicExtractor.extract(key, value, contextForExtraction);
-        collector.send(topic, key, value, record.headers(), timestamp, keySerializer, valSerializer, partitioner);
+        final long bytesProduced =
+            collector.send(topic, key, value, record.headers(), timestamp, keySerializer, valSerializer, partitioner);
+
+        producedSensorByTopic.putIfAbsent(
+            topic,
+            ProcessorNodeMetrics.producedSensor(
+                Thread.currentThread().getName(),
+                context.taskId().toString(),
+                name(),
+                topic,
+                context.metrics()
+            ));
+        producedSensorByTopic.get(topic).record(bytesProduced, context.currentSystemTimeMs());
```

Review Comment:
   Could you please add unit tests for this code?

--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org
For queries about this service, please contact Infrastructure at: us...@infra.apache.org
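On the "passed to the producer" vs. "actually sent to the output topic" question raised above: one way to count only acknowledged records is to record the metric inside the producer's completion callback rather than at `send()` time. The sketch below illustrates only the idea; `BytesProducedSensor` and the synchronous fake `send` are hypothetical stand-ins, not the actual Kafka Streams API.

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.BiConsumer;

public class ProducedBytesSketch {
    // Hypothetical stand-in for the bytes-produced sensor.
    static class BytesProducedSensor {
        final AtomicLong total = new AtomicLong();
        void record(long bytes) { total.addAndGet(bytes); }
    }

    // Recording inside the completion callback means the metric counts only
    // records the producer actually acknowledged; records that fail (e.g. due
    // to a crash between handing off and sending) are never counted.
    static void send(byte[] payload, boolean simulateSuccess, BytesProducedSensor sensor) {
        BiConsumer<Integer, Exception> onCompletion = (bytesWritten, exception) -> {
            if (exception == null) {
                sensor.record(bytesWritten);
            }
        };
        // A real producer invokes the callback asynchronously on acknowledgement;
        // we invoke it inline here to keep the sketch self-contained.
        if (simulateSuccess) {
            onCompletion.accept(payload.length, null);
        } else {
            onCompletion.accept(0, new RuntimeException("send failed"));
        }
    }

    public static void main(String[] args) {
        BytesProducedSensor sensor = new BytesProducedSensor();
        send(new byte[10], true, sensor);   // acknowledged: counted
        send(new byte[99], false, sensor);  // failed: not counted
        System.out.println(sensor.total.get()); // 10
    }
}
```

The trade-off is latency: the metric is updated when the broker acknowledges the record, not when the topology hands it to the producer.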
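A side note on the `SinkNode` hunk above: `Map.putIfAbsent` evaluates its value argument on every call, so `ProcessorNodeMetrics.producedSensor(...)` would be rebuilt even when the sensor is already cached; `Map.computeIfAbsent` runs the factory only on a cache miss. A minimal, self-contained demonstration (the returned string is a stand-in for the real sensor object):

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicInteger;

public class SensorCacheSketch {
    // Counts how often the (stand-in) sensor factory actually runs.
    static final AtomicInteger factoryCalls = new AtomicInteger();

    // Stand-in for ProcessorNodeMetrics.producedSensor(...).
    static String createSensor(String topic) {
        factoryCalls.incrementAndGet();
        return "sensor-for-" + topic;
    }

    public static void main(String[] args) {
        Map<String, String> producedSensorByTopic = new HashMap<>();

        // The hunk under review does:
        //   producedSensorByTopic.putIfAbsent(topic, ProcessorNodeMetrics.producedSensor(...));
        //   producedSensorByTopic.get(topic).record(...);
        // which evaluates producedSensor(...) on every invocation, even on a cache hit.
        // computeIfAbsent only invokes the factory when the topic is not yet cached:
        for (int i = 0; i < 3; i++) {
            String sensor = producedSensorByTopic.computeIfAbsent("topic-A", SensorCacheSketch::createSensor);
            System.out.println(sensor);
        }
        System.out.println(factoryCalls.get()); // factory ran once, not three times
    }
}
```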