AnuragReddy2000 opened a new pull request, #60: URL: https://github.com/apache/pulsar-adapters/pull/60
Fixes #53 ### Motivation The current implementation of the Storm adaptor does not utilise the dead letter queue functionality of Pulsar. Often it may be desired that when there is some error while processing a message in the Storm topology, the message be moved into the dead letter queue so that it can be handled separately. But the PulsarSpout currently keeps retrying for a while & finally just drops the message by acking it, which leads to data loss. This PR seeks to allow users of PulsarSpout to opt in to using DLQ queues for message processing failures. ### Modifications 1. Added a method `negativeAcknowledge` to the `PulsarSpoutConsumer` interface & added the implementation for the same in the `SpoutConsumer` class. 2. Added a boolean attribute `negativeAckFailedMessages` in the `PulsarSpoutConfiguration` class along with getter & setters for the same. 3. Added a method called `negativeAck` in the `PulsarSpout` class to negatively ack messages and send them to the DLQ. 4. Modified the `fail` method in the `PulsarSpout` class to negatively acknowledge messages when `negativeAckFailedMessages` is set to `true` 5. Modified the `mapToValueAndEmit` method to throw an exception when mapping to the value fails. 6. Updated the version of Pulsar client to `4.0.2`. This is required for this fix as mentioned in the following PR https://github.com/apache/pulsar/pull/23718 ### Verifying this change - [ ] Make sure that the change passes the CI checks. This change added tests and can be verified as follows: Tested the cases where `negativeAckFailedMessages` is set to `true` and `false` in the `PulsarSpoutTest` test class. ### Does this pull request potentially affect one of the following parts: *If `yes` was chosen, please highlight the changes* - Dependencies (does it add or upgrade a dependency): yes - The public API: yes - The schema: don't know - The default values of configurations: no - The wire protocol: no - The rest endpoints: no - The admin cli options: bo - Anything that affects deployment: don't know ### Documentation - Does this pull request introduce a new feature? - yes - If yes, how is the feature documented? - I couldn't find the documentation for the pulsar storm adaptor. But I have however, added the javadoc comments to explain the changes wherever necessary. The corresponding PR in my fork: https://github.com/AnuragReddy2000/pulsar-adapters/pull/2 -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected]
