Github user nickwallen commented on a diff in the pull request:
https://github.com/apache/metron/pull/1082#discussion_r199287400
--- Diff:
metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java
---
@@ -212,10 +233,13 @@ public BulkWriterResponse write(String sensorType,
WriterConfiguration configura
writerResponse.addError(t, tuple);
continue;
}
- Future future = kafkaProducer.send(new ProducerRecord<String, String>(kafkaTopic, jsonMessage));
- // we want to manage the batching
- results.add(new AbstractMap.SimpleEntry<>(tuple, future));
+ Optional<String> topic = getKafkaTopic(message);
+ if(topic.isPresent()) {
--- End diff --
In what cases would a topic not be present? If that's an unexpected
condition, we should probably log something. Or can a user choose not to
route a message by returning an empty topic value?
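
To make the question concrete, here is a rough sketch of the "log something" option. It is not the PR's code. `resolveTopic` is a hypothetical stand-in for `getKafkaTopic(message)`, and the lookup by a configured message field is an assumption made only for illustration. The sketch uses `java.util.logging` to stay self-contained, whereas the real writer would use its existing logger.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.logging.Logger;

class TopicRoutingSketch {
  private static final Logger LOG =
      Logger.getLogger(TopicRoutingSketch.class.getName());

  // Hypothetical stand-in for getKafkaTopic(message). Assumes the topic is
  // read from a field of the message and is absent when that field is unset.
  static Optional<String> resolveTopic(Map<String, Object> message,
                                       String topicField) {
    Object value = message.get(topicField);
    return value == null ? Optional.empty() : Optional.of(value.toString());
  }

  // Returns true if the message should be sent. When no topic is resolved,
  // the skip is logged so that it does not happen silently.
  static boolean shouldSend(Map<String, Object> message, String topicField) {
    Optional<String> topic = resolveTopic(message, topicField);
    if (!topic.isPresent()) {
      LOG.warning("No Kafka topic resolved from field '" + topicField
          + "'; message will not be written.");
      return false;
    }
    return true;
  }

  public static void main(String[] args) {
    Map<String, Object> routed = new HashMap<>();
    routed.put("kafka_topic", "enrichments");
    Map<String, Object> unrouted = new HashMap<>();

    System.out.println(shouldSend(routed, "kafka_topic"));
    System.out.println(shouldSend(unrouted, "kafka_topic"));
  }
}
```

If an empty topic is a supported way for users to drop a message, a lower log level (or a counter) would probably fit better than a warning.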
---