[ 
https://issues.apache.org/jira/browse/STORM-405?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14072292#comment-14072292
 ] 

ASF GitHub Bot commented on STORM-405:
--------------------------------------

Github user ptgoetz commented on a diff in the pull request:

    https://github.com/apache/incubator-storm/pull/195#discussion_r15313891
  
    --- Diff: external/storm-kafka/src/test/storm/kafka/TestUtils.java ---
    @@ -34,4 +49,51 @@ public static GlobalPartitionInformation buildPartitionInfo(int numPartitions, i
             return globalPartitionInformation;
         }
     
    +    public static SimpleConsumer getKafkaConsumer(KafkaTestBroker broker) {
    +        BrokerHosts brokerHosts = getBrokerHosts(broker);
    +        KafkaConfig kafkaConfig = new KafkaConfig(brokerHosts, TOPIC);
    +        SimpleConsumer simpleConsumer = new SimpleConsumer("localhost", broker.getPort(), 60000, 1024, "testClient");
    +        return simpleConsumer;
    +    }
    +
    +    public static KafkaConfig getKafkaConfig(KafkaTestBroker broker) {
    +        BrokerHosts brokerHosts = getBrokerHosts(broker);
    +        KafkaConfig kafkaConfig = new KafkaConfig(brokerHosts, TOPIC);
    +        return kafkaConfig;
    +    }
    +
    +    private static BrokerHosts getBrokerHosts(KafkaTestBroker broker) {
    +        GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation();
    +        globalPartitionInformation.addPartition(0, Broker.fromString(broker.getBrokerConnectionString()));
    +        return new StaticHosts(globalPartitionInformation);
    +    }
    +
    +    public static Config getConfig(String brokerConnectionString) {
    +        Config config = new Config();
    +        Properties props = new Properties();
    +        props.put("metadata.broker.list", brokerConnectionString);
    +        props.put("request.required.acks", "1");
    +        props.put("serializer.class", "kafka.serializer.StringEncoder");
    --- End diff --
    
    Would it make sense to make the serializer class configurable? I can
    imagine cases where users would want to use other serialization schemes,
    as they can with the spouts.
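
    One way the suggestion could look, as a rough sketch only: the class
    ProducerPropsSketch and the method buildProducerProps are hypothetical
    names and are not part of the pull request. The property keys and the
    StringEncoder default are taken from the diff above; everything else is
    an assumption. Callers that pass no serializer keep the current behaviour.

```java
import java.util.Properties;

// Hypothetical helper for illustration; not code from the pull request.
class ProducerPropsSketch {

    // Default serializer, matching the value hard-coded in the diff.
    static final String DEFAULT_SERIALIZER = "kafka.serializer.StringEncoder";

    // Keeps the existing behaviour for callers that do not choose a serializer.
    static Properties buildProducerProps(String brokerConnectionString) {
        return buildProducerProps(brokerConnectionString, DEFAULT_SERIALIZER);
    }

    // Lets the caller supply the fully qualified name of the serializer class.
    static Properties buildProducerProps(String brokerConnectionString,
                                         String serializerClass) {
        if (serializerClass == null || serializerClass.isEmpty()) {
            throw new IllegalArgumentException("serializerClass must be non-empty");
        }
        Properties props = new Properties();
        props.put("metadata.broker.list", brokerConnectionString);
        props.put("request.required.acks", "1");
        props.put("serializer.class", serializerClass);
        return props;
    }

    public static void main(String[] args) {
        // Example: override the default with Kafka's byte-array encoder.
        Properties props = buildProducerProps("localhost:9092",
                "kafka.serializer.DefaultEncoder");
        System.out.println(props.getProperty("serializer.class"));
    }
}
```

    An overload is used here so that existing call sites would not need to
    change; reading the class name from the topology configuration would be
    another reasonable option.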


> Add kafka trident state so messages can be sent to kafka topics
> ---------------------------------------------------------------
>
>                 Key: STORM-405
>                 URL: https://issues.apache.org/jira/browse/STORM-405
>             Project: Apache Storm (Incubating)
>          Issue Type: Improvement
>    Affects Versions: 0.9.3-incubating
>            Reporter: Parth Brahmbhatt
>            Priority: Minor
>             Fix For: 0.9.3-incubating
>
>
> Currently Storm has a bolt for writing to Kafka, but there is no Trident
> state implementation. We need a Trident state implementation that allows
> writing tuples directly to Kafka topics as part of a Trident topology.



--
This message was sent by Atlassian JIRA
(v6.2#6252)
