[ https://issues.apache.org/jira/browse/METRON-1642?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=16528312#comment-16528312 ]

ASF GitHub Bot commented on METRON-1642:
----------------------------------------

Github user nickwallen commented on a diff in the pull request:

    https://github.com/apache/metron/pull/1082#discussion_r199287400
  
    --- Diff: metron-platform/metron-writer/src/main/java/org/apache/metron/writer/kafka/KafkaWriter.java ---
    @@ -212,10 +233,13 @@ public BulkWriterResponse write(String sensorType, WriterConfiguration configura
             writerResponse.addError(t, tuple);
             continue;
           }
    -      Future future = kafkaProducer
    -          .send(new ProducerRecord<String, String>(kafkaTopic, jsonMessage));
    -      // we want to manage the batching
    -      results.add(new AbstractMap.SimpleEntry<>(tuple, future));
    +      Optional<String> topic = getKafkaTopic(message);
    +      if(topic.isPresent()) {
    --- End diff --
    
    In what cases would a topic not be present?  If that's an unexpected
    condition, we should probably log something. Or can a user choose not to
    route a message by returning an empty topic value?
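
    To make the question concrete, here is a minimal sketch of one way the
    empty-topic case could be handled: treat it as "do not route", but log it
    so that an unexpected miss is still visible. This is not the code in the
    pull request. The class `TopicRoutingSketch`, the method `routeOrSkip`,
    and the `sender` callback (a stand-in for `kafkaProducer.send`) are all
    hypothetical, and `java.util.logging` is used only to keep the example
    self-contained.

```java
import java.util.Optional;
import java.util.function.BiConsumer;
import java.util.logging.Logger;

// Hypothetical sketch; not the actual KafkaWriter implementation.
class TopicRoutingSketch {
  private static final Logger LOG =
      Logger.getLogger(TopicRoutingSketch.class.getName());

  /**
   * Sends the message when a topic is present; otherwise logs and skips it.
   *
   * @param topic       the resolved topic, possibly empty
   * @param jsonMessage the serialized message
   * @param sender      stand-in for the producer: accepts (topic, message)
   * @return true if the message was handed to the sender, false if skipped
   */
  static boolean routeOrSkip(Optional<String> topic, String jsonMessage,
                             BiConsumer<String, String> sender) {
    if (topic.isPresent()) {
      sender.accept(topic.get(), jsonMessage);
      return true;
    }
    // An empty topic is treated as "do not route"; log it so it is not silent.
    LOG.fine("No Kafka topic resolved; skipping message: " + jsonMessage);
    return false;
  }
}
```

    Whether the log level should be debug or warn depends on the answer to
    the question above: if an empty topic is a supported way to drop a
    message, debug seems enough; if it is unexpected, a warning is probably
    more appropriate.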


> KafkaWriter should be able to choose the topic from a field in addition to 
> topology construction time
> --------------------------------------------------------------------------------------------------
>
>                 Key: METRON-1642
>                 URL: https://issues.apache.org/jira/browse/METRON-1642
>             Project: Metron
>          Issue Type: Improvement
>            Reporter: Casey Stella
>            Priority: Major
>
> Currently, we choose the kafka topic via the kafka.topic field.  It would be 
> useful to allow people to specify the topic via a field.  This would enable 
> multi-stage (or chain) parsing, among other use-cases.
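
The topic selection described in the issue can be sketched roughly as
follows. This assumes one plausible policy: when a topic field is configured,
the topic comes only from that field of the message (and is empty if the
field is missing or blank); otherwise the topic fixed at construction time is
used. The class `TopicResolverSketch`, its constructor arguments, and the
field name used in the usage note are hypothetical and are not taken from the
Metron code base.

```java
import java.util.Map;
import java.util.Optional;

// Hypothetical sketch of per-message topic selection; names are illustrative.
class TopicResolverSketch {
  private final String configuredTopic; // fixed at construction time, may be null
  private final String topicField;      // message field naming the topic, may be null

  TopicResolverSketch(String configuredTopic, String topicField) {
    this.configuredTopic = configuredTopic;
    this.topicField = topicField;
  }

  /** Returns the topic for this message, or empty if none can be determined. */
  Optional<String> resolve(Map<String, Object> message) {
    if (topicField != null) {
      // Field-based routing: the message itself names its destination.
      Object value = message.get(topicField);
      if (value == null) {
        return Optional.empty();
      }
      String topic = value.toString().trim();
      return topic.isEmpty() ? Optional.empty() : Optional.of(topic);
    }
    // No field configured: fall back to the statically configured topic.
    return Optional.ofNullable(configuredTopic);
  }
}
```

With a resolver built as `new TopicResolverSketch("indexing", "kafka_topic")`,
a message carrying `kafka_topic=parser_stage_2` would go to `parser_stage_2`,
which is the kind of hand-off that chained parsing needs; a message without
that field would yield an empty result under the policy assumed here.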



--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
