clolov commented on code in PR #12836:
URL: https://github.com/apache/kafka/pull/12836#discussion_r1018037337


##########
streams/src/main/java/org/apache/kafka/streams/processor/internals/RecordCollectorImpl.java:
##########
@@ -219,26 +219,20 @@ public <K, V> void send(final String topic,
                 }
 
                 if (!topic.endsWith("-changelog")) {
-                    final Map<String, Sensor> producedSensorByTopic = 
sinkNodeToProducedSensorByTopic.get(processorNodeId);
-                    if (producedSensorByTopic == null) {
-                        log.error("Unable to records bytes produced to topic 
{} by sink node {} as the node is not recognized.\n"
-                                      + "Known sink nodes are {}.", topic, 
processorNodeId, sinkNodeToProducedSensorByTopic.keySet());
-                    } else {
-                        // we may not have created a sensor during 
initialization if the node uses dynamic topic routing,
-                        // as all topics are not known up front, so create the 
sensor for that topic if absent
-                        final Sensor topicProducedSensor = 
producedSensorByTopic.computeIfAbsent(
+                    // we may not have created a sensor during initialization 
if the node uses dynamic topic routing,
+                    // as all topics are not known up front, so create the 
sensor for this topic if absent
+                    final Sensor topicProducedSensor = 
producedSensorByTopic.computeIfAbsent(
+                        topic,
+                        t -> TopicMetrics.producedSensor(
+                            Thread.currentThread().getName(),
+                            taskId.toString(),
+                            processorNodeId,
                             topic,
-                            t -> TopicMetrics.producedSensor(
-                                Thread.currentThread().getName(),
-                                taskId.toString(),
-                                processorNodeId,
-                                topic,
-                                context.metrics()
-                            )
-                        );
-                        final long bytesProduced = 
producerRecordSizeInBytes(serializedRecord);
-                        topicProducedSensor.record(bytesProduced, 
context.currentSystemTimeMs());
-                    }
+                            context.metrics()
+                        )
+                    );
+                    final long bytesProduced = 
producerRecordSizeInBytes(serializedRecord);
+                    topicProducedSensor.record(bytesProduced, 
context.currentSystemTimeMs());

Review Comment:
   To answer my own question: The `NullPointerException` surfaces on line 235 
and not on line 231 because the topic already makes it into the Map as it is 
the sink and sinks are registered here: 
https://github.com/apache/kafka/pull/12836/files#diff-36dd8c03fa5252dbd39042bb49a0d6272728fbd46459c6a45f5189fa59749b32R97



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: jira-unsubscr...@kafka.apache.org

For queries about this service, please contact Infrastructure at:
us...@infra.apache.org

Reply via email to