cadonna commented on code in PR #12235:
URL: https://github.com/apache/kafka/pull/12235#discussion_r887131106


##########
streams/src/test/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetricsTest.java:
##########
@@ -111,7 +111,7 @@ public void shouldGetProcessAtSourceSensor() {
                 expectedParentSensor
         );
 
-        verifySensor(() -> 
ProcessorNodeMetrics.processAtSourceSensor(THREAD_ID, TASK_ID, 
PROCESSOR_NODE_ID, streamsMetrics));
+        verifySensor(() -> 
ProcessorNodeMetrics.recordsProcessedAtSourceSensor(THREAD_ID, TASK_ID, 
PROCESSOR_NODE_ID, streamsMetrics));

Review Comment:
   `recordsProcessedAtSourceSensor()` does not exist and causes a compilation 
error.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/metrics/ProcessorNodeMetrics.java:
##########
@@ -136,6 +153,82 @@ public static Sensor processAtSourceSensor(final String 
threadId,
         );
     }
 
+    public static Sensor bytesConsumedSensor(final String threadId,

Review Comment:
   Could you please add unit tests?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java:
##########
@@ -40,12 +40,12 @@
     private final Processor<KIn, VIn, KOut, VOut> processor;
     private final FixedKeyProcessor<KIn, VIn, VOut> fixedKeyProcessor;
     private final String name;
-    private final Time time;
+    protected final Time time;
 
     public final Set<String> stateStores;
 
-    private InternalProcessorContext<KOut, VOut> internalProcessorContext;
-    private String threadId;
+    protected InternalProcessorContext<KOut, VOut> internalProcessorContext;

Review Comment:
   I do not think you need to do this. You can simply use the context that is 
passed in into the `init()` method of `SinkNode`. 



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java:
##########
@@ -82,7 +107,12 @@ public void process(final Record<KIn, VIn> record) {
 
         final String topic = topicExtractor.extract(key, value, 
contextForExtraction);
 
-        collector.send(topic, key, value, record.headers(), timestamp, 
keySerializer, valSerializer, partitioner);
+        final int bytesProduced =
+            collector.send(topic, key, value, record.headers(), timestamp, 
keySerializer, valSerializer, partitioner);
+
+        // Just use the cached system time to avoid the clock lookup

Review Comment:
   You can also remove the comment.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java:
##########
@@ -56,6 +61,26 @@ public void addChild(final ProcessorNode<Void, Void, ?, ?> 
child) {
 
     @Override
     public void init(final InternalProcessorContext<Void, Void> context) {
+        // It is important to first create the sensor before calling init on 
the
+        // parent object. Otherwise due to backwards compatibility an empty 
sensor
+        // without parent is created with the same name.
+        // Once the backwards compatibility is not needed anymore it might be 
possible to
+        // change this.

Review Comment:
   Nice, I just realized that you copied that comment from code that I wrote! 🙂 
   I guess the backwards compatibility was the one with the old metrics 
structure that we changed in KIP-444. We removed the old structure in 3.0, so I 
guess that this is an instance of comments that started to lie.
   Moreover, from where you copied the comment (I guess it was `SourceNode`) 
the sensor `processAtSourceSensor` has indeed a parent. I think the comment 
does not make sense in this class and we need to verify if it still makes sense 
in the other class. That does not need to be verifed in this PR.  
   



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java:
##########
@@ -40,12 +40,12 @@
     private final Processor<KIn, VIn, KOut, VOut> processor;
     private final FixedKeyProcessor<KIn, VIn, VOut> fixedKeyProcessor;
     private final String name;
-    private final Time time;
+    protected final Time time;

Review Comment:
   Is this really needed? Could not find where is is used outside 
`ProcessorNode`.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java:
##########
@@ -56,6 +61,26 @@ public void addChild(final ProcessorNode<Void, Void, ?, ?> 
child) {
 
     @Override
     public void init(final InternalProcessorContext<Void, Void> context) {
+        // It is important to first create the sensor before calling init on 
the
+        // parent object. Otherwise due to backwards compatibility an empty 
sensor
+        // without parent is created with the same name.
+        // Once the backwards compatibility is not needed anymore it might be 
possible to
+        // change this.

Review Comment:
   I actually do not understand this comment. Backwards compatibility of what? 
What does "empty sensor without parent" mean? `recordsProducedSensor()` and 
`bytesProducedSensor()` do not create sensors with parents.



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/ProcessorNode.java:
##########
@@ -40,12 +40,12 @@
     private final Processor<KIn, VIn, KOut, VOut> processor;
     private final FixedKeyProcessor<KIn, VIn, VOut> fixedKeyProcessor;
     private final String name;
-    private final Time time;
+    protected final Time time;
 
     public final Set<String> stateStores;
 
-    private InternalProcessorContext<KOut, VOut> internalProcessorContext;
-    private String threadId;
+    protected InternalProcessorContext<KOut, VOut> internalProcessorContext;
+    protected String threadId;

Review Comment:
   Also this does not seem to be strictly needed since in `SinkNode` you could 
use `Thread.currentThread().getName()` as in `init()` of `SourceNode`. I even 
think you need to use `Thread.currentThread().getName()` in `SinkNode` since 
you use `threadId` before the variable is set in `super.init()`. 



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordQueue.java:
##########
@@ -71,6 +74,20 @@ public class RecordQueue {
             processorContext.taskId().toString(),
             processorContext.metrics()
         );
+        bytesConsumedSensor = ProcessorNodeMetrics.bytesConsumedSensor(
+            Thread.currentThread().getName(),

Review Comment:
   Could you store the thread name in a local variable and use that variable in 
all sensor retrieval methods?



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/SinkNode.java:
##########
@@ -82,7 +107,12 @@ public void process(final Record<KIn, VIn> record) {
 
         final String topic = topicExtractor.extract(key, value, 
contextForExtraction);
 
-        collector.send(topic, key, value, record.headers(), timestamp, 
keySerializer, valSerializer, partitioner);
+        final int bytesProduced =
+            collector.send(topic, key, value, record.headers(), timestamp, 
keySerializer, valSerializer, partitioner);
+
+        // Just use the cached system time to avoid the clock lookup

Review Comment:
   I think it is fine as you do it since we do it similarly in other places 
like `processAtSourceSensor.record(1.0d, context.currentSystemTimeMs());` in  
`SourceNode`. 



##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java:
##########
@@ -199,6 +199,7 @@ public <K, V> void send(final String topic,
                 log.trace("Failed record: (key {} value {} timestamp {}) 
topic=[{}] partition=[{}]", key, value, timestamp, topic, partition);
             }
         });
+        return keyBytes.length + valBytes.length;

Review Comment:
   Should the metric record the actually produced bytes or the bytes that were 
passed to the producer for sending? Here we would record the latter. The value 
of the metric might be higher than the actually produced bytes to the output 
topic. 



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]

Reply via email to