[
https://issues.apache.org/jira/browse/STORM-1052?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14979821#comment-14979821
]
ASF GitHub Bot commented on STORM-1052:
---------------------------------------
GitHub user vesense commented on a diff in the pull request:
https://github.com/apache/storm/pull/743#discussion_r43349321
--- Diff: external/storm-kafka/src/jvm/storm/kafka/trident/TridentKafkaState.java ---
@@ -72,27 +73,27 @@ public void prepare(Map stormConf) {
Map configMap = (Map) stormConf.get(KAFKA_BROKER_PROPERTIES);
Properties properties = new Properties();
properties.putAll(configMap);
- ProducerConfig config = new ProducerConfig(properties);
- producer = new Producer(config);
+ producer = new KafkaProducer(properties);
}
public void updateState(List<TridentTuple> tuples, TridentCollector collector) {
- String topic = null;
- for (TridentTuple tuple : tuples) {
- try {
- topic = topicSelector.getTopic(tuple);
-
- if(topic != null) {
- producer.send(new KeyedMessage(topic, mapper.getKeyFromTuple(tuple),
- mapper.getMessageFromTuple(tuple)));
- } else {
- LOG.warn("skipping key = " + mapper.getKeyFromTuple(tuple) + ", topic selector returned null.");
- }
- } catch (Exception ex) {
- String errorMsg = "Could not send message with key = " + mapper.getKeyFromTuple(tuple)
- + " to topic = " + topic;
- LOG.warn(errorMsg, ex);
- throw new FailedException(errorMsg, ex);
+ for (final TridentTuple tuple : tuples) {
+ final String topic = topicSelector.getTopic(tuple);
+ if(topic != null) {
+ producer.send(new ProducerRecord(topic, mapper.getKeyFromTuple(tuple),
--- End diff --
Sure, we should just call get() on the result of send(). I will fix this. Thanks.
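A rough sketch of the idea, not the actual patch: the new producer's send() returns a Future, so calling get() on it blocks until the send completes and lets a failure surface as an exception. To keep the example self-contained, `AsyncSender` below is a hypothetical stand-in for the producer, `sendAll` is a made-up helper name, and a plain RuntimeException stands in for Storm's FailedException.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

public class SyncSendSketch {

    /** Hypothetical stand-in for an asynchronous producer; not a Kafka type. */
    interface AsyncSender {
        Future<Long> send(String topic, String key, String message);
    }

    /**
     * Hands every message to the sender, then blocks on each returned Future
     * so that a failed send is rethrown instead of being silently dropped.
     * Returns the number of sends that completed successfully.
     */
    static int sendAll(AsyncSender sender, String topic, List<String> messages) {
        List<Future<Long>> pending = new ArrayList<>();
        for (String message : messages) {
            pending.add(sender.send(topic, null, message));
        }
        int acknowledged = 0;
        for (Future<Long> result : pending) {
            try {
                result.get(); // blocks until this send has completed or failed
                acknowledged++;
            } catch (InterruptedException ex) {
                Thread.currentThread().interrupt();
                throw new RuntimeException("Interrupted while sending to topic = " + topic, ex);
            } catch (ExecutionException ex) {
                // RuntimeException is an assumption here; the real state would
                // presumably throw FailedException so the batch is replayed.
                throw new RuntimeException("Could not send message to topic = " + topic, ex.getCause());
            }
        }
        return acknowledged;
    }

    public static void main(String[] args) {
        // A sender whose sends always succeed immediately.
        AsyncSender ok = (topic, key, message) -> CompletableFuture.completedFuture(0L);
        System.out.println(sendAll(ok, "events", Arrays.asList("a", "b")));
    }
}
```

Collecting the futures first and waiting afterwards keeps the sends pipelined; calling get() immediately after each send() would also work but serializes them.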
> TridentKafkaState uses new Kafka Producer API
> ---------------------------------------------
>
> Key: STORM-1052
> URL: https://issues.apache.org/jira/browse/STORM-1052
> Project: Apache Storm
> Issue Type: Improvement
> Components: storm-kafka
> Reporter: Xin Wang
> Assignee: Xin Wang
>
> Use the new kafka-clients API.