[
https://issues.apache.org/jira/browse/STORM-826?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=14597991#comment-14597991
]
ASF GitHub Bot commented on STORM-826:
--------------------------------------
Github user tgravescs commented on a diff in the pull request:
https://github.com/apache/storm/pull/572#discussion_r33066884
--- Diff: external/storm-kafka/src/jvm/storm/kafka/bolt/KafkaBolt.java ---
@@ -102,12 +105,40 @@ public void execute(Tuple input) {
key = mapper.getKeyFromTuple(input);
message = mapper.getMessageFromTuple(input);
topic = topicSelector.getTopic(input);
- if(topic != null ) {
- producer.send(new KeyedMessage<K, V>(topic, key, message));
+ if (topic != null ) {
+ Callback callback = null;
+
+ if (!fireAndForget && async) {
+ callback = new Callback() {
+ @Override
+ public void onCompletion(RecordMetadata ignored,
Exception e) {
+ synchronized(collector) {
+ if (e != null) {
+ collector.reportError(e);
+ collector.fail(input);
+ } else {
+ collector.ack(input);
+ }
+ }
+ }
+ };
+ }
+ Future<RecordMetadata> result = producer.send(new
ProducerRecord<K, V>(topic, key, message), callback);
+ if (!async) {
+ try {
+ result.get();
+ collector.ack(input);
+ } catch (ExecutionException err) {
+ collector.reportError(err);
+ collector.fail(input);
+ }
+ } else if (fireAndForget) {
--- End diff --
where did the fireAndForget option come from? Did someone request this?
I think the user can essentially set this by setting the "acks" kafka
producer config. Although looking I'm not sure that is exposed here. It seems
like this Bolt would be more useful if we allowed the user to producer configs.
I guess we can split that out into another jira though.
> As a storm developer I’d like to use the new kafka producer API to reduce
> dependencies and use long term supported kafka apis
> ------------------------------------------------------------------------------------------------------------------------------
>
> Key: STORM-826
> URL: https://issues.apache.org/jira/browse/STORM-826
> Project: Apache Storm
> Issue Type: Story
> Components: storm-kafka
> Reporter: Thomas Becker
> Assignee: Zhuo Liu
>
--
This message was sent by Atlassian JIRA
(v6.3.4#6332)