[ 
https://issues.apache.org/jira/browse/STORM-1357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15033047#comment-15033047
 ] 

ASF GitHub Bot commented on STORM-1357:
---------------------------------------

Github user zhuoliu commented on a diff in the pull request:

    https://github.com/apache/storm/pull/906#discussion_r46239106
  
    --- Diff: 
external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java ---
    @@ -66,33 +64,30 @@ public void commit(Long txid) {
             LOG.debug("commit is Noop.");
         }
     
    -    public void prepare(Map stormConf) {
    +    public void prepare(Properties options) {
             Validate.notNull(mapper, "mapper can not be null");
             Validate.notNull(topicSelector, "topicSelector can not be null");
    -        Map configMap = (Map) stormConf.get(KAFKA_BROKER_PROPERTIES);
    -        Properties properties = new Properties();
    -        properties.putAll(configMap);
    -        ProducerConfig config = new ProducerConfig(properties);
    -        producer = new Producer(config);
    +        producer = new KafkaProducer(options);
         }
     
         public void updateState(List<TridentTuple> tuples, TridentCollector 
collector) {
    -        String topic = null;
    -        for (TridentTuple tuple : tuples) {
    -            try {
    -                topic = topicSelector.getTopic(tuple);
    -
    -                if(topic != null) {
    -                    producer.send(new KeyedMessage(topic, 
mapper.getKeyFromTuple(tuple),
    -                            mapper.getMessageFromTuple(tuple)));
    -                } else {
    -                    LOG.warn("skipping key = " + 
mapper.getKeyFromTuple(tuple) + ", topic selector returned null.");
    -                }
    -            } catch (Exception ex) {
    -                String errorMsg = "Could not send message with key = " + 
mapper.getKeyFromTuple(tuple)
    -                        + " to topic = " + topic;
    -                LOG.warn(errorMsg, ex);
    -                throw new FailedException(errorMsg, ex);
    +        for (final TridentTuple tuple : tuples) {
    +            final String topic = topicSelector.getTopic(tuple);
    +            if(topic != null) {
    +                producer.send(new ProducerRecord(topic, 
mapper.getKeyFromTuple(tuple),
    +                        mapper.getMessageFromTuple(tuple)),new Callback() {
    +                            @Override
    --- End diff --
    
    Nit. Line 79 to 88 may only has 4 space indention after Line 77. Also, need 
indention for Line 83.


> Support writing to Kafka streams in Storm SQL
> ---------------------------------------------
>
>                 Key: STORM-1357
>                 URL: https://issues.apache.org/jira/browse/STORM-1357
>             Project: Apache Storm
>          Issue Type: New Feature
>          Components: storm-sql
>            Reporter: Haohui Mai
>            Assignee: Haohui Mai
>
> This jira proposes to add supports to write SQL results to Kafka streams.



--
This message was sent by Atlassian JIRA
(v6.3.4#6332)

Reply via email to