lhotari opened a new issue, #24353:
URL: https://github.com/apache/pulsar/issues/24353

   
   ### Discussed in https://github.com/apache/pulsar/discussions/23677
   
   <div type='discussions-op-text'>
   
   <sup>Originally posted by **fabrizziocht** December  4, 2024</sup>
   I was exploring the RabbitMQ source connector 
(https://github.com/apache/pulsar/blob/v4.0.1/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java),
 which extends the PushSource class (PushSource in turn extends AbstractPushSource).
   
   I can see that each time the RabbitMQConsumer class receives a message in 
its handleDelivery method, it invokes the consume method of RabbitMQSource:
   
https://github.com/apache/pulsar/blob/v4.0.1/pulsar-io/rabbitmq/src/main/java/org/apache/pulsar/io/rabbitmq/RabbitMQSource.java#L99
   
   The message is then stored only in memory, in a LinkedBlockingQueue, and 
the ack is sent to RabbitMQ:
   
https://github.com/apache/pulsar/blob/master/pulsar-io/core/src/main/java/org/apache/pulsar/io/core/AbstractPushSource.java#L76
   
   What happens if the Pulsar IO service is restarted? As far as I can see, 
the data held in the LinkedBlockingQueue will be lost, because the message has 
already been acked and removed from RabbitMQ but has not yet been written to 
the Pulsar topic. Is this a limitation of the RabbitMQ source connector, or do 
all connectors that implement PushSource have this problem? Is there another 
interface or class that can guarantee the message is written to the Pulsar 
topic before the ack is sent to RabbitMQ?</div>
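
To make the concern concrete, here is a minimal, self-contained sketch of the two ack orderings. It uses only the JDK, not the Pulsar or RabbitMQ client libraries, and it is not the connector's code: `FakeBroker`, `PendingRecord`, `lostMessages` and the in-memory `topic` list are hypothetical stand-ins chosen for illustration. It assumes the broker redelivers every message that was never acked, and that a restart discards the in-memory buffer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.LinkedBlockingQueue;

public class AckTimingSketch {

    /** Hypothetical stand-in for a RabbitMQ queue: a message stays redeliverable until acked. */
    static final class FakeBroker {
        private final List<String> unacked = new ArrayList<>();

        FakeBroker(List<String> messages) {
            unacked.addAll(messages);
        }

        List<String> deliver() {
            return new ArrayList<>(unacked);
        }

        void ack(String message) {
            unacked.remove(message);
        }
    }

    /** A buffered message plus the callback to run once it has been written downstream. */
    static final class PendingRecord {
        final String value;
        final Runnable onWritten;

        PendingRecord(String value, Runnable onWritten) {
            this.value = value;
            this.onWritten = onWritten;
        }
    }

    /** Simulates a crash after the first write and returns how many messages never reach the topic. */
    static int lostMessages(boolean ackOnEnqueue) {
        List<String> input = List.of("m1", "m2", "m3");
        FakeBroker broker = new FakeBroker(input);
        List<String> topic = new ArrayList<>(); // stand-in for the Pulsar topic
        LinkedBlockingQueue<PendingRecord> buffer = new LinkedBlockingQueue<>();

        // Delivery callback: every message is buffered in memory.
        for (String message : broker.deliver()) {
            if (ackOnEnqueue) {
                broker.ack(message); // ack immediately, before any write
                buffer.add(new PendingRecord(message, () -> { }));
            } else {
                buffer.add(new PendingRecord(message, () -> broker.ack(message)));
            }
        }

        // Only the first buffered record is written before the process stops.
        PendingRecord first = buffer.poll();
        topic.add(first.value);
        first.onWritten.run();

        // Restart: the in-memory buffer is gone.
        buffer.clear();

        // After the restart the broker redelivers whatever was never acked.
        for (String message : broker.deliver()) {
            topic.add(message);
            broker.ack(message);
        }

        int lost = 0;
        for (String message : input) {
            if (!topic.contains(message)) {
                lost++;
            }
        }
        return lost;
    }

    public static void main(String[] args) {
        System.out.println("ack on enqueue:  lost = " + lostMessages(true));
        System.out.println("ack after write: lost = " + lostMessages(false));
    }
}
```

In this sketch, acking on enqueue loses the two messages that were still buffered at restart, while deferring the ack until after the write loses none. Deferring the ack gives at-least-once delivery, so a message written just before a crash but not yet acked could be delivered twice.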

