[ https://issues.apache.org/jira/browse/STORM-3981?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Richard Zowalla closed STORM-3981.
----------------------------------
Resolution: Won't Fix
Storm Pulsar will be removed in the next release of Storm. See the related
discussion on the mailing list.
> Negative Acknowledge not implemented for Pulsar Storm Adapter
> --------------------------------------------------------------
>
> Key: STORM-3981
> URL: https://issues.apache.org/jira/browse/STORM-3981
> Project: Apache Storm
> Issue Type: Bug
> Reporter: Karunam goyal
> Priority: Major
>
> [https://github.com/apache/storm/blob/a837e6add1fee99115eb426077f6e62fd406eea2/external/storm-pulsar/src/main/java/org/apache/pulsar/storm/PulsarSpout.java]
> v2.11.0 pulsar-storm
> There is no way to negatively acknowledge a message through the consumer, and
> the registration of a DeadLetterPolicy is broken:
> {{ConsumerConfigurationData<byte[]> subscriptionConfig = new ConsumerConfigurationData<>();
> subscriptionConfig.setSubscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
> subscriptionConfig.setSubscriptionType(SubscriptionType.Shared);
> subscriptionConfig.setDeadLetterPolicy(DeadLetterPolicy.builder()
>     .deadLetterTopic(viestiSourceConfig.getDeadLetterTopic()).build());}}
> {{PulsarSpoutV2 pulsarSpout = new PulsarSpoutV2(spoutConfiguration,
>     ((ClientBuilderImpl) createBuilder(viestiSourceConfig)).getClientConfigurationData().clone(),
>     subscriptionConfig);}}
> The configuration above does not stick when the PulsarSpout is created.
>
> {{static class SpoutConsumer implements PulsarSpoutConsumer {
>     private Consumer<byte[]> consumer;
>     public SpoutConsumer(Consumer<byte[]> consumer) {
>         this.consumer = consumer;
>     }
>     public Message<byte[]> receive(int timeout, TimeUnit unit) throws PulsarClientException {
>         return this.consumer.receive(timeout, unit);
>     }
>     public void acknowledgeAsync(Message<?> msg) {
>         this.consumer.acknowledgeAsync(msg);
>     }
>     public void close() throws PulsarClientException {
>         this.consumer.close();
>     }
>     public void unsubscribe() throws PulsarClientException {
>         this.consumer.unsubscribe();
>     }
> }}}
>
> Also, there is no mechanism to negatively acknowledge a message.
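
For illustration only, here is a rough sketch of what a negative-acknowledge path on such a wrapper might look like. It is not the adapter's code: `AckTarget`, `NackingSpoutConsumer`, and `RecordingTarget` are hypothetical names, and `AckTarget` is a stand-in for the two Pulsar `Consumer` calls involved (`acknowledgeAsync` and `negativeAcknowledge`) so that the sketch runs without a broker or the Pulsar client on the classpath.

```java
import java.util.ArrayList;
import java.util.List;

public class NackSketch {
    /** Hypothetical stand-in for the two consumer calls this sketch relies on. */
    interface AckTarget<M> {
        void acknowledgeAsync(M msg);
        void negativeAcknowledge(M msg);
    }

    /** Wrapper in the spirit of SpoutConsumer, with a negative-ack path added. */
    static class NackingSpoutConsumer<M> {
        private final AckTarget<M> consumer;

        NackingSpoutConsumer(AckTarget<M> consumer) {
            this.consumer = consumer;
        }

        /** Would be called when the topology reports the tuple as processed. */
        void ack(M msg) {
            consumer.acknowledgeAsync(msg);
        }

        /** Would be called when the topology reports the tuple as failed. */
        void fail(M msg) {
            consumer.negativeAcknowledge(msg);
        }
    }

    /** In-memory fake that records calls instead of talking to a broker. */
    static class RecordingTarget implements AckTarget<String> {
        final List<String> acked = new ArrayList<>();
        final List<String> nacked = new ArrayList<>();

        @Override
        public void acknowledgeAsync(String msg) {
            acked.add(msg);
        }

        @Override
        public void negativeAcknowledge(String msg) {
            nacked.add(msg);
        }
    }

    public static void main(String[] args) {
        RecordingTarget target = new RecordingTarget();
        NackingSpoutConsumer<String> spoutConsumer = new NackingSpoutConsumer<>(target);
        spoutConsumer.ack("msg-1");
        spoutConsumer.fail("msg-2");
        System.out.println("acked=" + target.acked + " nacked=" + target.nacked);
    }
}
```

In a real adapter the `fail` path would presumably be driven from the spout's failure callback, but how that should interact with the spout's own retry handling is not something this issue settles.
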
> *Expected behavior*
> When a DeadLetterPolicy is set, it should not be dropped during serialization.
> Negative acknowledgement support should be available.
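
The issue does not say why the policy is lost during serialization. One plausible mechanism, stated here purely as an assumption and not verified against the storm-pulsar or Pulsar client source, is that the field holding the policy is skipped by Java serialization (for example because it is `transient`). The sketch below uses hypothetical `Config` and `Policy` classes to show how such a field silently comes back as `null` after a round trip.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

public class TransientDropSketch {
    /** Hypothetical policy type; deliberately not Serializable. */
    static class Policy {
        final String deadLetterTopic;

        Policy(String deadLetterTopic) {
            this.deadLetterTopic = deadLetterTopic;
        }
    }

    /** Hypothetical config; the transient field is skipped on write. */
    static class Config implements Serializable {
        private static final long serialVersionUID = 1L;
        String subscriptionType = "Shared";
        transient Policy deadLetterPolicy;
    }

    /** Serializes and deserializes the config, as cloning or shipping it might. */
    static Config roundTrip(Config in) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(in);
            }
            try (ObjectInputStream src =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (Config) src.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        Config config = new Config();
        config.deadLetterPolicy = new Policy("my-dead-letter-topic");
        Config copy = roundTrip(config);
        // The ordinary field survives; the transient one is null in the copy.
        System.out.println(copy.subscriptionType + " / " + copy.deadLetterPolicy);
    }
}
```

If this is indeed the cause, re-applying the policy after the configuration has been deserialized would be one possible workaround, though that depends on where the spout builds its consumer.
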
> *Desktop (please complete the following information):*
> macOS Ventura 13.4.1
> java version "1.8.0_333"
> Java(TM) SE Runtime Environment (build 1.8.0_333-b02)
> Java HotSpot(TM) 64-Bit Server VM (build 25.333-b02, mixed mode)
> Apache Storm 2.2.1
> Trying to consume from a Pulsar topic in Apache Storm.
--
This message was sent by Atlassian Jira
(v8.20.10#820010)