[ https://issues.apache.org/jira/browse/STORM-405?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14072292#comment-14072292 ]
ASF GitHub Bot commented on STORM-405:
--------------------------------------
Github user ptgoetz commented on a diff in the pull request:
https://github.com/apache/incubator-storm/pull/195#discussion_r15313891
--- Diff: external/storm-kafka/src/test/storm/kafka/TestUtils.java ---
@@ -34,4 +49,51 @@ public static GlobalPartitionInformation buildPartitionInfo(int numPartitions, i
return globalPartitionInformation;
}
+ public static SimpleConsumer getKafkaConsumer(KafkaTestBroker broker) {
+ BrokerHosts brokerHosts = getBrokerHosts(broker);
+ KafkaConfig kafkaConfig = new KafkaConfig(brokerHosts, TOPIC);
+ SimpleConsumer simpleConsumer = new SimpleConsumer("localhost", broker.getPort(), 60000, 1024, "testClient");
+ return simpleConsumer;
+ }
+
+ public static KafkaConfig getKafkaConfig(KafkaTestBroker broker) {
+ BrokerHosts brokerHosts = getBrokerHosts(broker);
+ KafkaConfig kafkaConfig = new KafkaConfig(brokerHosts, TOPIC);
+ return kafkaConfig;
+ }
+
+ private static BrokerHosts getBrokerHosts(KafkaTestBroker broker) {
+ GlobalPartitionInformation globalPartitionInformation = new GlobalPartitionInformation();
+ globalPartitionInformation.addPartition(0, Broker.fromString(broker.getBrokerConnectionString()));
+ return new StaticHosts(globalPartitionInformation);
+ }
+
+ public static Config getConfig(String brokerConnectionString) {
+ Config config = new Config();
+ Properties props = new Properties();
+ props.put("metadata.broker.list", brokerConnectionString);
+ props.put("request.required.acks", "1");
+ props.put("serializer.class", "kafka.serializer.StringEncoder");
--- End diff --
Would it make sense to make the serializer class configurable? I can
imagine cases where users would want to use other serialization schemes as they
can with the spouts.
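
A minimal sketch of what that could look like, using only java.util.Properties so it stands alone. The class name SerializerConfigSketch, the method producerProps, and the two-argument overload are hypothetical and not part of the pull request; only the three property keys and the StringEncoder default come from the diff above.

```java
import java.util.Properties;

// Hypothetical helper, not code from the pull request: builds the producer
// properties shown in the diff, but lets the caller pick the serializer class.
class SerializerConfigSketch {

    // Default taken from the diff under review.
    static final String DEFAULT_SERIALIZER = "kafka.serializer.StringEncoder";

    // Overload that keeps the current behaviour for existing callers.
    static Properties producerProps(String brokerConnectionString) {
        return producerProps(brokerConnectionString, DEFAULT_SERIALIZER);
    }

    // Overload that accepts a serializer class name; null falls back to the default.
    static Properties producerProps(String brokerConnectionString, String serializerClass) {
        Properties props = new Properties();
        props.put("metadata.broker.list", brokerConnectionString);
        props.put("request.required.acks", "1");
        props.put("serializer.class",
                serializerClass == null ? DEFAULT_SERIALIZER : serializerClass);
        return props;
    }
}
```

A caller that wants raw bytes could then pass "kafka.serializer.DefaultEncoder" as the second argument, while existing tests keep the string default.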
> Add kafka trident state so messages can be sent to kafka topics
> ---------------------------------------------------------------
>
> Key: STORM-405
> URL: https://issues.apache.org/jira/browse/STORM-405
> Project: Apache Storm (Incubating)
> Issue Type: Improvement
> Affects Versions: 0.9.3-incubating
> Reporter: Parth Brahmbhatt
> Priority: Minor
> Fix For: 0.9.3-incubating
>
>
> Currently Storm has a bolt for writing to Kafka, but we have no implementation
> of Trident state. We need a Trident state implementation that allows writing
> tuples directly to Kafka topics as part of a Trident topology.
--
This message was sent by Atlassian JIRA (v6.2#6252)