Karunam goyal created STORM-3981:
------------------------------------
Summary: Negative Acknowledge not implemented for Pulsar Storm Adapter
Key: STORM-3981
URL: https://issues.apache.org/jira/browse/STORM-3981
Project: Apache Storm
Issue Type: Bug
Reporter: Karunam goyal
[https://github.com/apache/storm/blob/a837e6add1fee99115eb426077f6e62fd406eea2/external/storm-pulsar/src/main/java/org/apache/pulsar/storm/PulsarSpout.java]
v2.11.0 pulsar-storm
There is no way to negatively acknowledge a message on the consumer, and
setting a DeadLetterPolicy on the consumer configuration is broken:
{{ConsumerConfigurationData<byte[]> subscriptionConfig = new ConsumerConfigurationData<>();
subscriptionConfig.setSubscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
subscriptionConfig.setSubscriptionType(SubscriptionType.Shared);
subscriptionConfig.setDeadLetterPolicy(DeadLetterPolicy.builder()
    .deadLetterTopic(viestiSourceConfig.getDeadLetterTopic())
    .build());}}
{{PulsarSpoutV2 pulsarSpout = new PulsarSpoutV2(
    spoutConfiguration,
    ((ClientBuilderImpl) createBuilder(viestiSourceConfig)).getClientConfigurationData().clone(),
    subscriptionConfig);}}
The DeadLetterPolicy set in the code above does not persist once the PulsarSpout is created.
{{static class SpoutConsumer implements PulsarSpoutConsumer {
    private Consumer<byte[]> consumer;

    public SpoutConsumer(Consumer<byte[]> consumer) {
        this.consumer = consumer;
    }

    public Message<byte[]> receive(int timeout, TimeUnit unit) throws PulsarClientException {
        return this.consumer.receive(timeout, unit);
    }

    public void acknowledgeAsync(Message<?> msg) {
        this.consumer.acknowledgeAsync(msg);
    }

    public void close() throws PulsarClientException {
        this.consumer.close();
    }

    public void unsubscribe() throws PulsarClientException {
        this.consumer.unsubscribe();
    }
}
}}
Also, there is no mechanism to negatively acknowledge a message.
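To make the request concrete, here is a rough sketch of what a nack-capable wrapper might look like. It is not the adapter's code: AckingConsumer is a stand-in for the two org.apache.pulsar.client.api.Consumer methods involved, so the example compiles without the Pulsar client on the classpath. NackingSpoutConsumer, RecordingConsumer, NackSketch and onTupleOutcome are hypothetical names used only for illustration.

```java
import java.util.ArrayList;
import java.util.List;

/** Stand-in (assumption) for the Pulsar Consumer methods this sketch relies on. */
interface AckingConsumer<M> {
    void acknowledgeAsync(M msg);
    void negativeAcknowledge(M msg);
}

/** Hypothetical wrapper: same delegation style as SpoutConsumer, plus a nack method. */
class NackingSpoutConsumer<M> {
    private final AckingConsumer<M> consumer;

    NackingSpoutConsumer(AckingConsumer<M> consumer) {
        this.consumer = consumer;
    }

    public void acknowledgeAsync(M msg) {
        this.consumer.acknowledgeAsync(msg);
    }

    /** The missing piece: forward a negative acknowledgment to the underlying consumer. */
    public void negativeAcknowledge(M msg) {
        this.consumer.negativeAcknowledge(msg);
    }
}

/** Test double that records calls, so the delegation can be checked without a broker. */
class RecordingConsumer implements AckingConsumer<String> {
    final List<String> acked = new ArrayList<>();
    final List<String> nacked = new ArrayList<>();

    public void acknowledgeAsync(String msg) {
        acked.add(msg);
    }

    public void negativeAcknowledge(String msg) {
        nacked.add(msg);
    }
}

class NackSketch {
    /** Mirrors a spout's ack/fail callbacks: success acks, failure nacks. */
    static void onTupleOutcome(NackingSpoutConsumer<String> consumer, String msg, boolean succeeded) {
        if (succeeded) {
            consumer.acknowledgeAsync(msg);
        } else {
            consumer.negativeAcknowledge(msg);
        }
    }

    public static void main(String[] args) {
        RecordingConsumer recorder = new RecordingConsumer();
        NackingSpoutConsumer<String> spoutConsumer = new NackingSpoutConsumer<>(recorder);
        onTupleOutcome(spoutConsumer, "m1", true);
        onTupleOutcome(spoutConsumer, "m2", false);
        System.out.println("acked=" + recorder.acked + " nacked=" + recorder.nacked);
    }
}
```

In the real adapter the nack would presumably be triggered from the spout's fail callback, so that a failed tuple is redelivered and eventually routed to the dead letter topic.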
*Expected behavior*
When a DeadLetterPolicy is set, it should not be dropped during serialization.
Negative acknowledgments should be supported.
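One plausible way a setting can get lost like this, offered only as an illustration and not as a confirmed root cause: Storm ships spout instances to workers using Java serialization, and any field marked transient comes back as null after the round trip. The sketch below reproduces that effect with stand-in classes. ConfigLike, PolicyLike and SerializationDropSketch are hypothetical, and marking the policy field transient is an assumption made for the demonstration.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;

/** Hypothetical stand-in for a dead letter policy. */
class PolicyLike implements Serializable {
    private static final long serialVersionUID = 1L;
    final String deadLetterTopic;

    PolicyLike(String deadLetterTopic) {
        this.deadLetterTopic = deadLetterTopic;
    }
}

/** Hypothetical stand-in for a consumer configuration object. */
class ConfigLike implements Serializable {
    private static final long serialVersionUID = 1L;
    String subscriptionType = "Shared";
    // Assumption for this demo: the policy field is transient, so Java serialization skips it.
    transient PolicyLike deadLetterPolicy;
}

class SerializationDropSketch {
    /** Serializes and deserializes a value in memory, as happens when a spout is shipped to a worker. */
    @SuppressWarnings("unchecked")
    static <T extends Serializable> T roundTrip(T value) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(value);
            }
            try (ObjectInputStream in =
                     new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("round trip failed", e);
        }
    }

    public static void main(String[] args) {
        ConfigLike config = new ConfigLike();
        config.deadLetterPolicy = new PolicyLike("my-dlq-topic");

        ConfigLike copy = roundTrip(config);
        System.out.println("subscriptionType survives: " + copy.subscriptionType);
        System.out.println("deadLetterPolicy after round trip: " + copy.deadLetterPolicy);
    }
}
```

If this is what happens in the adapter, a fix would need to carry the policy through serialization explicitly, for example by storing its fields in a serializable form and rebuilding the policy when the spout is opened.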
*Desktop:*
macOS Ventura 13.4.1
java version "1.8.0_333"
Java(TM) SE Runtime Environment (build 1.8.0_333-b02)
Java HotSpot(TM) 64-Bit Server VM (build 25.333-b02, mixed mode)
Apache Storm 2.2.1
Trying to consume from Pulsar Topic in Apache Storm