guozhangwang commented on code in PR #12235:
URL: https://github.com/apache/kafka/pull/12235#discussion_r887401929


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ClientUtils.java:
##########
@@ -166,4 +169,28 @@ public static String extractThreadId(final String 
fullThreadName) {
         final int index = fullThreadName.indexOf("StreamThread-");
         return fullThreadName.substring(index);
     }
+
+    public static long recordSizeInBytes(final long keyBytes,

Review Comment:
   Could we use it in PartitionGroupTest#getBytesBufferedForRawRecords as well?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java:
##########
@@ -82,7 +103,11 @@ public void process(final Record<KIn, VIn> record) {
 
         final String topic = topicExtractor.extract(key, value, 
contextForExtraction);
 
-        collector.send(topic, key, value, record.headers(), timestamp, 
keySerializer, valSerializer, partitioner);
+        final long bytesProduced =
+            collector.send(topic, key, value, record.headers(), timestamp, 
keySerializer, valSerializer, partitioner);
+
+        bytesProducedSensor.record(bytesProduced, 
context.currentSystemTimeMs());

Review Comment:
   A minor suggestion: I think we can use one sensor that contains two metrics, 
one as "sum" and one as "count", and then call `record` here once on that 
sensor which would update both metrics. Similar for consumed.
   
   Examples can be found in those `addInvocationRateAndCountToSensor` etc 
functions.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/StreamTask.java:
##########
@@ -184,7 +184,7 @@ public StreamTask(final TaskId id,
             createPartitionQueues(),
             mainConsumer::currentLag,
             TaskMetrics.recordLatenessSensor(threadId, taskId, streamsMetrics),
-            TaskMetrics.totalBytesSensor(threadId, taskId, streamsMetrics),
+            TaskMetrics.totalInputBufferBytesSensor(threadId, taskId, 
streamsMetrics),

Review Comment:
   +1



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetrics.java:
##########
@@ -136,6 +154,86 @@ public static Sensor processAtSourceSensor(final String 
threadId,
         );
     }
 
+    public static Sensor bytesConsumedSensor(final String threadId,
+                                             final String taskId,
+                                             final String processorNodeId,
+                                             final String topic,
+                                             final StreamsMetricsImpl 
streamsMetrics) {
+        final String sensorName = processorNodeId + "-" + BYTES_CONSUMED;
+        final Sensor sensor =
+            streamsMetrics.nodeLevelSensor(threadId, taskId, processorNodeId, 
sensorName, RecordingLevel.INFO);
+        final Map<String, String> tagMap = new 
HashMap<>(streamsMetrics.nodeLevelTagMap(threadId, taskId, processorNodeId));
+        tagMap.put("topic", topic);
+        addSumMetricToSensor(
+            sensor,
+            PROCESSOR_NODE_LEVEL_GROUP,
+            tagMap,
+            BYTES_CONSUMED,
+            BYTES_CONSUMED_TOTAL_DESCRIPTION
+        );
+        return sensor;
+    }
+
+    public static Sensor recordsConsumedSensor(final String threadId,
+                                               final String taskId,
+                                               final String processorNodeId,
+                                               final String topic,
+                                               final StreamsMetricsImpl 
streamsMetrics) {
+        final String sensorName = processorNodeId + "-" + RECORDS_CONSUMED;
+        final Sensor sensor =
+            streamsMetrics.nodeLevelSensor(threadId, taskId, processorNodeId, 
sensorName, RecordingLevel.INFO);
+        final Map<String, String> tagMap = new 
HashMap<>(streamsMetrics.nodeLevelTagMap(threadId, taskId, processorNodeId));
+        tagMap.put("topic", topic);
+        addSumMetricToSensor(

Review Comment:
   Since we always record `1` for this sensor, I think we can combine this and 
the bytes metrics in a single sensor with `sum` and `count` (see my other 
comment above). Ditto for produced.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to