[
https://issues.apache.org/jira/browse/STORM-1357?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=15033047#comment-15033047
]
ASF GitHub Bot commented on STORM-1357:
---------------------------------------
Github user zhuoliu commented on a diff in the pull request:
https://github.com/apache/storm/pull/906#discussion_r46239106
--- Diff:
external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java ---
@@ -66,33 +64,30 @@ public void commit(Long txid) {
LOG.debug("commit is Noop.");
}
- public void prepare(Map stormConf) {
+ public void prepare(Properties options) {
Validate.notNull(mapper, "mapper can not be null");
Validate.notNull(topicSelector, "topicSelector can not be null");
- Map configMap = (Map) stormConf.get(KAFKA_BROKER_PROPERTIES);
- Properties properties = new Properties();
- properties.putAll(configMap);
- ProducerConfig config = new ProducerConfig(properties);
- producer = new Producer(config);
+ producer = new KafkaProducer(options);
}
public void updateState(List<TridentTuple> tuples, TridentCollector
collector) {
- String topic = null;
- for (TridentTuple tuple : tuples) {
- try {
- topic = topicSelector.getTopic(tuple);
-
- if(topic != null) {
- producer.send(new KeyedMessage(topic,
mapper.getKeyFromTuple(tuple),
- mapper.getMessageFromTuple(tuple)));
- } else {
- LOG.warn("skipping key = " +
mapper.getKeyFromTuple(tuple) + ", topic selector returned null.");
- }
- } catch (Exception ex) {
- String errorMsg = "Could not send message with key = " +
mapper.getKeyFromTuple(tuple)
- + " to topic = " + topic;
- LOG.warn(errorMsg, ex);
- throw new FailedException(errorMsg, ex);
+ for (final TridentTuple tuple : tuples) {
+ final String topic = topicSelector.getTopic(tuple);
+ if(topic != null) {
+ producer.send(new ProducerRecord(topic,
mapper.getKeyFromTuple(tuple),
+ mapper.getMessageFromTuple(tuple)),new Callback() {
+ @Override
--- End diff --
Nit. Line 79 to 88 may only has 4 space indention after Line 77. Also, need
indention for Line 83.
> Support writing to Kafka streams in Storm SQL
> ---------------------------------------------
>
> Key: STORM-1357
> URL: https://issues.apache.org/jira/browse/STORM-1357
> Project: Apache Storm
> Issue Type: New Feature
> Components: storm-sql
> Reporter: Haohui Mai
> Assignee: Haohui Mai
>
> This jira proposes to add supports to write SQL results to Kafka streams.
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)