guozhangwang commented on code in PR #12235: URL: https://github.com/apache/kafka/pull/12235#discussion_r887401929
########## streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java: ########## @@ -166,4 +169,28 @@ public static String extractThreadId(final String fullThreadName) { final int index = fullThreadName.indexOf("StreamThread-"); return fullThreadName.substring(index); } + + public static long recordSizeInBytes(final long keyBytes, Review Comment: Could we use it in PartitionGroupTest#getBytesBufferedForRawRecords as well? ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java: ########## @@ -82,7 +103,11 @@ public void process(final Record<KIn, VIn> record) { final String topic = topicExtractor.extract(key, value, contextForExtraction); - collector.send(topic, key, value, record.headers(), timestamp, keySerializer, valSerializer, partitioner); + final long bytesProduced = + collector.send(topic, key, value, record.headers(), timestamp, keySerializer, valSerializer, partitioner); + + bytesProducedSensor.record(bytesProduced, context.currentSystemTimeMs()); Review Comment: A minor suggestion: I think we can use one sensor that contains two metrics, one as "sum" and one as "count", and then call `record` here once on that sensor which would update both metrics. Similar for consumed. Examples can be found in those `addInvocationRateAndCountToSensor` etc functions. ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java: ########## @@ -184,7 +184,7 @@ public StreamTask(final TaskId id, createPartitionQueues(), mainConsumer::currentLag, TaskMetrics.recordLatenessSensor(threadId, taskId, streamsMetrics), - TaskMetrics.totalBytesSensor(threadId, taskId, streamsMetrics), + TaskMetrics.totalInputBufferBytesSensor(threadId, taskId, streamsMetrics), Review Comment: +1 ########## streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetrics.java: ########## @@ -136,6 +154,86 @@ public static Sensor processAtSourceSensor(final String threadId, ); } + public static Sensor bytesConsumedSensor(final String threadId, + final String taskId, + final String processorNodeId, + final String topic, + final StreamsMetricsImpl streamsMetrics) { + final String sensorName = processorNodeId + "-" + BYTES_CONSUMED; + final Sensor sensor = + streamsMetrics.nodeLevelSensor(threadId, taskId, processorNodeId, sensorName, RecordingLevel.INFO); + final Map<String, String> tagMap = new HashMap<>(streamsMetrics.nodeLevelTagMap(threadId, taskId, processorNodeId)); + tagMap.put("topic", topic); + addSumMetricToSensor( + sensor, + PROCESSOR_NODE_LEVEL_GROUP, + tagMap, + BYTES_CONSUMED, + BYTES_CONSUMED_TOTAL_DESCRIPTION + ); + return sensor; + } + + public static Sensor recordsConsumedSensor(final String threadId, + final String taskId, + final String processorNodeId, + final String topic, + final StreamsMetricsImpl streamsMetrics) { + final String sensorName = processorNodeId + "-" + RECORDS_CONSUMED; + final Sensor sensor = + streamsMetrics.nodeLevelSensor(threadId, taskId, processorNodeId, sensorName, RecordingLevel.INFO); + final Map<String, String> tagMap = new HashMap<>(streamsMetrics.nodeLevelTagMap(threadId, taskId, processorNodeId)); + tagMap.put("topic", topic); + addSumMetricToSensor( Review Comment: Since we always record `1` for this sensor, I think we can combine this and the bytes metrics in a single sensor with `sum` and `count` (see my other comment above). Ditto for produced. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org For queries about this service, please contact Infrastructure at: us...@infra.apache.org