GitHub user williamd1618 opened a pull request:
https://github.com/apache/camel/pull/602
Feature/kafka deferred offset commit
Currently the Camel Kafka connector either auto-commits or commits in batches,
based on the countdown of a CyclicBarrier that the consumers of the various
partitions await. This pull request introduces the ability to defer the commit
so that it is triggered not by the barrier but by a subsequent Processor
invocation (e.g. a message is read, then transformed, routed to a new topic,
and if the Producer is successful then a subsequent Processor can issue the
commit).
```
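from().process(new MyTransformer()).to().process(new Processor() {
    public void process(Exchange exchange) throws Exception {
        // The consumer is read back from the exchange property set by the endpoint.
        ConsumerConnector c =
            (ConsumerConnector) exchange.getProperty(KafkaConstants.CONSUMER);
        // Commit only once the preceding steps have completed.
        if (c != null) c.commitOffsets();
    }
});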
```
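To make the intended ordering concrete, here is a minimal, dependency-free sketch of the deferred-commit idea. It does not use the Camel or Kafka APIs: `Exchange`, `Step`, `OffsetCommitter`, the `CONSUMER` key value, and `countCommits` are all hypothetical stand-ins invented for illustration. It only shows that a commit step placed last in the chain is never reached when an earlier step (such as the producer send) fails.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DeferredCommitSketch {

    /** Hypothetical stand-in for the consumer handle stored on the exchange. */
    interface OffsetCommitter {
        void commitOffsets();
    }

    /** Hypothetical stand-in for a Camel Exchange: only a property bag. */
    static final class Exchange {
        private final Map<String, Object> properties = new HashMap<>();
        void setProperty(String key, Object value) { properties.put(key, value); }
        Object getProperty(String key) { return properties.get(key); }
    }

    /** Hypothetical stand-in for a Camel Processor. */
    interface Step {
        void process(Exchange exchange) throws Exception;
    }

    // Assumed property key; the real constant lives in KafkaConstants.
    static final String CONSUMER = "kafka.CONSUMER";

    /** Final step: commit only if a committer was attached to the exchange. */
    static final Step COMMIT = exchange -> {
        Object c = exchange.getProperty(CONSUMER);
        if (c instanceof OffsetCommitter) {
            ((OffsetCommitter) c).commitOffsets();
        }
    };

    /** Runs the steps in order and stops at the first failure. */
    static boolean run(Exchange exchange, List<Step> steps) {
        for (Step step : steps) {
            try {
                step.process(exchange);
            } catch (Exception e) {
                return false; // later steps, including the commit, are skipped
            }
        }
        return true;
    }

    /** Builds a transform -> send -> commit chain and counts the commits issued. */
    static int countCommits(boolean producerFails) {
        int[] commits = {0};
        Exchange exchange = new Exchange();
        exchange.setProperty(CONSUMER, (OffsetCommitter) () -> commits[0]++);

        List<Step> steps = new ArrayList<>();
        steps.add(ex -> { /* transform the message */ });
        steps.add(ex -> {
            if (producerFails) throw new IllegalStateException("send failed");
        });
        steps.add(COMMIT);

        run(exchange, steps);
        return commits[0];
    }

    public static void main(String[] args) {
        System.out.println("commits when send succeeds: " + countCommits(false));
        System.out.println("commits when send fails: " + countCommits(true));
    }
}
```

In this sketch the offset is committed once when every step succeeds and not at all when the send step throws, which is the behaviour the deferred commit is meant to allow.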
You can merge this pull request into a Git repository by running:
$ git pull https://github.com/williamd1618/camel feature/kafka-deferred-offset-commit
Alternatively you can review and apply these changes as the patch at:
https://github.com/apache/camel/pull/602.patch
To close this pull request, make a commit to your master/trunk branch
with (at least) the following in the commit message:
This closes #602
----
commit e6403b54ff705bf5912fc1df837b01f9fc230f48
Author: Daniel Williams <[email protected]>
Date: 2015-08-21T20:39:18Z
Fix KafkaProducer URI
Guarantees that if the KafkaEndpoint has a UriParam
of topic, the KeyedMessage will be sent to that topic;
if it does not have a topic and the Exchange message
has a header of kafka.TOPIC, that header is used instead.
commit 581537aa5813b43dc5035f57da1d8a2dc63987e3
Author: Daniel Williams <[email protected]>
Date: 2015-08-26T19:16:08Z
Add ability to defer Kafka offset commits
----
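The topic precedence described in the "Fix KafkaProducer URI" commit can be sketched as follows. This is an illustrative reading of the commit message, not the code from the patch: the class and the `resolveTopic` helper are hypothetical, and only the `kafka.TOPIC` header name comes from the commit message.

```java
import java.util.HashMap;
import java.util.Map;

public class TopicResolutionSketch {

    // Header name as given in the commit message.
    static final String TOPIC_HEADER = "kafka.TOPIC";

    /**
     * Hypothetical helper: the endpoint's topic wins when present;
     * otherwise the message header is used; otherwise null.
     */
    static String resolveTopic(String endpointTopic, Map<String, Object> headers) {
        if (endpointTopic != null && !endpointTopic.isEmpty()) {
            return endpointTopic;
        }
        Object header = headers.get(TOPIC_HEADER);
        return header == null ? null : header.toString();
    }

    public static void main(String[] args) {
        Map<String, Object> headers = new HashMap<>();
        headers.put(TOPIC_HEADER, "fromHeader");

        System.out.println(resolveTopic("fromUri", headers));
        System.out.println(resolveTopic(null, headers));
    }
}
```

What should happen when neither source supplies a topic is not stated in the commit message; returning null here is an assumption of the sketch.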