[ https://issues.apache.org/jira/browse/STORM-1052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14942698#comment-14942698 ]

ASF GitHub Bot commented on STORM-1052:
---------------------------------------

Github user harshach commented on a diff in the pull request:

    https://github.com/apache/storm/pull/743#discussion_r41099133
  
    --- Diff: external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java ---
    @@ -72,27 +73,27 @@ public void prepare(Map stormConf) {
             Map configMap = (Map) stormConf.get(KAFKA_BROKER_PROPERTIES);
             Properties properties = new Properties();
             properties.putAll(configMap);
    -        ProducerConfig config = new ProducerConfig(properties);
    -        producer = new Producer(config);
    +        producer = new KafkaProducer(properties);
         }
     
         public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
    -        String topic = null;
    -        for (TridentTuple tuple : tuples) {
    -            try {
    -                topic = topicSelector.getTopic(tuple);
    -
    -                if(topic != null) {
    -                    producer.send(new KeyedMessage(topic, mapper.getKeyFromTuple(tuple),
    -                            mapper.getMessageFromTuple(tuple)));
    -                } else {
    -                    LOG.warn("skipping key = " + mapper.getKeyFromTuple(tuple) + ", topic selector returned null.");
    -                }
    -            } catch (Exception ex) {
    -                String errorMsg = "Could not send message with key = " + mapper.getKeyFromTuple(tuple)
    -                        + " to topic = " + topic;
    -                LOG.warn(errorMsg, ex);
    -                throw new FailedException(errorMsg, ex);
    +        for (final TridentTuple tuple : tuples) {
    +            final String topic = topicSelector.getTopic(tuple);
    +            if(topic != null) {
    +                producer.send(new ProducerRecord(topic, mapper.getKeyFromTuple(tuple),
    --- End diff --
    
    We are moving from synchronous to asynchronous behavior here. Is there a
    reason for this? If not, can't we just call .get() on the Future returned
    by send()?
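
    To illustrate the question, here is a minimal sketch of blocking on the
    Future that an asynchronous send returns, so that a failed send surfaces
    inside updateState rather than being dropped. It uses only the JDK.
    AsyncSender and SyncSendSketch are hypothetical stand-ins, not part of
    Storm or Kafka; the only assumption carried over from the real API is
    that KafkaProducer.send returns a java.util.concurrent.Future.
    RuntimeException stands in for Storm's FailedException.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

// Hypothetical stand-in for a producer whose send() returns a Future.
// This is NOT the Kafka client API, only something with the same shape.
interface AsyncSender {
    Future<String> send(String topic, String key, String message);
}

class SyncSendSketch {
    // Sends every message and blocks on each returned Future, which
    // restores synchronous semantics on top of an asynchronous send.
    static List<String> sendAllSync(AsyncSender sender, String topic, List<String> messages) {
        List<String> acks = new ArrayList<>();
        for (final String message : messages) {
            try {
                // get() waits for completion and rethrows a send failure
                // wrapped in an ExecutionException.
                acks.add(sender.send(topic, null, message).get());
            } catch (InterruptedException ex) {
                // Preserve the interrupt flag before failing the batch.
                Thread.currentThread().interrupt();
                throw new RuntimeException("Interrupted while sending to topic = " + topic, ex);
            } catch (ExecutionException ex) {
                // In the real state class this would be a FailedException.
                throw new RuntimeException("Could not send message to topic = " + topic, ex.getCause());
            }
        }
        return acks;
    }

    public static void main(String[] args) {
        // A sender that always succeeds immediately.
        AsyncSender ok = (topic, key, msg) -> CompletableFuture.completedFuture(topic + ":" + msg);
        System.out.println(sendAllSync(ok, "events", Arrays.asList("a", "b")));
    }
}
```

    Blocking per record is the simplest way to keep the old semantics, but
    it gives up batching. A possible alternative is to collect the Futures
    for the whole batch and call get() on each after the loop.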


> TridentKafkaState uses new Kafka Producer API
> ---------------------------------------------
>
>                 Key: STORM-1052
>                 URL: https://issues.apache.org/jira/browse/STORM-1052
>             Project: Apache Storm
>          Issue Type: Improvement
>          Components: storm-kafka
>            Reporter: Xin Wang
>            Assignee: Xin Wang
>
> use new kafka-clients api



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)
