Karunam goyal created STORM-3981:
------------------------------------

             Summary: Negative Acknowledge not implemented for Pulsar Storm 
Adapter 
                 Key: STORM-3981
                 URL: https://issues.apache.org/jira/browse/STORM-3981
             Project: Apache Storm
          Issue Type: Bug
            Reporter: Karunam goyal


[https://github.com/apache/storm/blob/a837e6add1fee99115eb426077f6e62fd406eea2/external/storm-pulsar/src/main/java/org/apache/pulsar/storm/PulsarSpout.java]

Version: pulsar-storm v2.11.0
There is no way to negatively acknowledge a message on the consumer, and the 
method for registering a DeadLetterPolicy is broken.

{{ConsumerConfigurationData<byte[]> subscriptionConfig = new ConsumerConfigurationData<>();
subscriptionConfig.setSubscriptionInitialPosition(SubscriptionInitialPosition.Earliest);
subscriptionConfig.setSubscriptionType(SubscriptionType.Shared);
subscriptionConfig.setDeadLetterPolicy(DeadLetterPolicy.builder()
        .deadLetterTopic(viestiSourceConfig.getDeadLetterTopic())
        .build());}}
{{PulsarSpoutV2 pulsarSpout = new PulsarSpoutV2(
        spoutConfiguration,
        ((ClientBuilderImpl) createBuilder(viestiSourceConfig)).getClientConfigurationData().clone(),
        subscriptionConfig);}}

The DeadLetterPolicy set in the code above does not stick when the PulsarSpout 
is created.
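Storm serializes spout instances with Java serialization when a topology is submitted, so a setting that "does not stick" can be a serialization effect. The sketch below shows one way that can happen: a field excluded from serialization comes back as null after a round trip. DemoConsumerConfig, DemoDeadLetterPolicy and TransientDropDemo are hypothetical stand-ins written for this illustration, not Pulsar classes, and the transient marker is an assumption about the cause, not a confirmed diagnosis.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical stand-in for a dead-letter policy; NOT the Pulsar class.
class DemoDeadLetterPolicy implements Serializable {
    private static final long serialVersionUID = 1L;
    final String deadLetterTopic;

    DemoDeadLetterPolicy(String deadLetterTopic) {
        this.deadLetterTopic = deadLetterTopic;
    }
}

// Hypothetical stand-in for a consumer configuration object.
class DemoConsumerConfig implements Serializable {
    private static final long serialVersionUID = 1L;
    String subscriptionType = "Shared";
    // ASSUMPTION for illustration: the policy field is excluded from serialization.
    transient DemoDeadLetterPolicy deadLetterPolicy;
}

class TransientDropDemo {
    // Serializes the config to bytes and reads it back, as happens to any
    // object held by a spout when the topology is shipped to workers.
    static DemoConsumerConfig roundTrip(DemoConsumerConfig in) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(in);
            }
            try (ObjectInputStream ois = new ObjectInputStream(
                    new ByteArrayInputStream(bytes.toByteArray()))) {
                return (DemoConsumerConfig) ois.readObject();
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        } catch (ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        DemoConsumerConfig config = new DemoConsumerConfig();
        config.deadLetterPolicy = new DemoDeadLetterPolicy("my-dlq");
        DemoConsumerConfig copy = roundTrip(config);
        System.out.println("policy set before round trip: " + (config.deadLetterPolicy != null));
        System.out.println("policy set after round trip:  " + (copy.deadLetterPolicy != null));
    }
}
```

Ordinary fields such as subscriptionType survive the round trip, which would match the symptom of only the dead-letter setting going missing.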

 
{{static class SpoutConsumer implements PulsarSpoutConsumer {
    private Consumer<byte[]> consumer;

    public SpoutConsumer(Consumer<byte[]> consumer) {
        this.consumer = consumer;
    }

    public Message<byte[]> receive(int timeout, TimeUnit unit) throws PulsarClientException {
        return this.consumer.receive(timeout, unit);
    }

    public void acknowledgeAsync(Message<?> msg) {
        this.consumer.acknowledgeAsync(msg);
    }

    public void close() throws PulsarClientException {
        this.consumer.close();
    }

    public void unsubscribe() throws PulsarClientException {
        this.consumer.unsubscribe();
    }
}}}
 
Also, there is no mechanism to negatively acknowledge a message: SpoutConsumer 
exposes acknowledgeAsync but nothing for negative acknowledgement.
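A minimal sketch of what the missing piece could look like, assuming the wrapper simply delegates as the existing methods do. The Pulsar Java client's Consumer interface does provide negativeAcknowledge(Message<?>); everything else here (DemoConsumer, NackingSpoutConsumer, RecordingConsumer, DemoSpoutCallbacks, and the use of plain String message ids) is a hypothetical stand-in so the example compiles without Pulsar or Storm on the classpath. It is not the adapter's actual code.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, trimmed-down stand-in for the client consumer.
interface DemoConsumer {
    void acknowledgeAsync(String messageId);
    void negativeAcknowledge(String messageId);
}

// Same delegation style as SpoutConsumer in the report, plus the missing method.
class NackingSpoutConsumer {
    private final DemoConsumer consumer;

    NackingSpoutConsumer(DemoConsumer consumer) {
        this.consumer = consumer;
    }

    void acknowledgeAsync(String messageId) {
        consumer.acknowledgeAsync(messageId);
    }

    // The addition requested in this issue: forward negative acks to the consumer.
    void negativeAcknowledge(String messageId) {
        consumer.negativeAcknowledge(messageId);
    }
}

// Test double that records which ids were acked or nacked.
class RecordingConsumer implements DemoConsumer {
    final List<String> acked = new ArrayList<>();
    final List<String> nacked = new ArrayList<>();

    public void acknowledgeAsync(String messageId) {
        acked.add(messageId);
    }

    public void negativeAcknowledge(String messageId) {
        nacked.add(messageId);
    }
}

// Hypothetical hooks mirroring Storm's spout callbacks ack(Object) and fail(Object).
class DemoSpoutCallbacks {
    private final NackingSpoutConsumer consumer;

    DemoSpoutCallbacks(NackingSpoutConsumer consumer) {
        this.consumer = consumer;
    }

    // Tuple fully processed: acknowledge the source message.
    void ack(Object msgId) {
        consumer.acknowledgeAsync((String) msgId);
    }

    // Tuple failed: ask the broker to redeliver instead of silently dropping it.
    void fail(Object msgId) {
        consumer.negativeAcknowledge((String) msgId);
    }
}
```

Routing fail to a negative acknowledgement is one possible design; whether it should replace or complement any retry logic already in the spout is left open here.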

*Expected behavior*

When a DeadLetterPolicy is set, it should not be dropped during serialization. 
Negative acknowledgements should be supported.

*Environment*
macOS Ventura 13.4.1
java version "1.8.0_333"
Java(TM) SE Runtime Environment (build 1.8.0_333-b02)
Java HotSpot(TM) 64-Bit Server VM (build 25.333-b02, mixed mode)
Apache Storm 2.2.1

Use case: consuming from a Pulsar topic in Apache Storm.



--
This message was sent by Atlassian Jira
(v8.20.10#820010)
