[
https://issues.apache.org/jira/browse/PROTON-1917?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Jeremy updated PROTON-1917:
---------------------------
Description:
I have a running broker, and a configured queue containing messages.
I also have a consumer, where I configured the max number of attempts to 0
(infinite retry).
I kill the broker (ctrl-c) and start it on the same port.
Upon reconnection, I get the following error message randomly:
{code:java}
receive_with_retry on_connection_open
receive_with_retry: on_error: amqp:not-found: Virtual host 'localhost' is not
active
main: end{code}
In the case where the consumer is able to connect, the consumer continues to
consume the messages normally.
However, in the broker web interface, I see upon each re-connection, an extra
connection (same ip and port) to the queue. As if, the old connection is not
killed. I wasn't expecting this behavior. This might be a separate issue.
I was able to reproduce with the following code, on windows 7 (msvc 12 2013)
{code:java}
class receive_with_retry : public proton::messaging_handler {
private:
std::string url;
std::string queueName;
public:
receive_with_retry(const std::string &u, const std::string& q) : url(u),
queueName(q) {}
void on_container_start(proton::container &c) override {
std::cout << "receive_with_retry on_container_start" << std::endl;
c.connect(
url,
proton::connection_options()
.idle_timeout(proton::duration(2000))
.reconnect(proton::reconnect_options()
.max_attempts(0)
.delay(proton::duration(3000))
.delay_multiplier(1)
.max_delay(proton::duration(3001))));
}
void on_connection_open(proton::connection& c) override {
std::cout << "receive_with_retry on_connection_open " << std::endl;
c.open_receiver(queueName, proton::receiver_options().auto_accept(false));
}
void on_session_open(proton::session& session) override {
std::cout << "receive_with_retry on_session_open " << std::endl;
}
void on_receiver_open(proton::receiver& receiver) override {
std::cout << "receive_with_retry on_receiver_open " << std::endl;
receiver.open();
}
void on_message(proton::delivery& delivery, proton::message &message)
override {
std::cout << "receive_with_retry on_message " << message.body() <<
std::endl;
// Can be used for throttling
// std::this_thread::sleep_for(std::chrono::milliseconds(100));
// commented out in order not to exit immediately, but continue on
consuming the messages.
// delivery.receiver().close();
// delivery.receiver().connection().close();
}
void on_transport_error(proton::transport& error) override {
std::cout << "receive_with_retry: on_transport_error: " <<
error.error().what() << std::endl;
error.connection().close();
}
void on_error(const proton::error_condition& error) override {
std::cout << "receive_with_retry: on_error: " << error.what() <<
std::endl;
}
};
void receiveWithRetry(const std::string& url, const std::string& queueName){
try {
std::cout << "main: start" << std::endl;
receive_with_retry receiveWithRetry(url, queueName);
proton::container(receiveWithRetry).run();
std::cout << "main: end" << std::endl;
}
catch (const std::exception& cause) {
std::cout << "main: caught exception: " << cause.what() << std::endl;
}
}
int main() {
try {
receiveWithRetry("amqp://localhost:5673", "test_queue");
return 0;
}
catch (const std::exception& e) {
std::cerr << e.what() << std::endl;
}
return 1;
}
{code}
was:
I have a running broker, and a configured queue containing messages.
I also have a consumer, where I configured the max number of attempts to 0
(infinite retry).
I kill the broker (ctrl-c) and start it on the same port.
Upon reconnection, I get the following error message randomly:
{code:java}
receive_with_retry on_connection_open
receive_with_retry: on_error: amqp:not-found: Virtual host 'localhost' is not
active
main: end{code}
In the case where the consumer is able to connect, the consumer continues to
consume the messages normally.
However, in the broker web interface, I see upon each re-connection, an extra
connection (same ip and port) to the queue. As if, the old connection is not
killed. I wasn't expecting this behavior. This might be a separate issue.
I was able to reproduce with the following code, on windows 7 (msvc 12 2013)
{code:java}
class receive_with_retry : public proton::messaging_handler {
private:
std::string url;
std::string queueName;
public:
receive_with_retry(const std::string &u, const std::string& q) : url(u),
queueName(q) {}
void on_container_start(proton::container &c) override {
std::cout << "receive_with_retry on_container_start" << std::endl;
c.connect(
url,
proton::connection_options()
.idle_timeout(proton::duration(2000))
.reconnect(proton::reconnect_options()
.max_attempts(0)
.delay(proton::duration(3000))
.delay_multiplier(1)
.max_delay(proton::duration(3001))));
}
void on_connection_open(proton::connection& c) override {
std::cout << "receive_with_retry on_connection_open " << std::endl;
c.open_receiver(queueName, proton::receiver_options().auto_accept(false));
}
void on_session_open(proton::session& session) override {
std::cout << "receive_with_retry on_session_open " << std::endl;
}
void on_receiver_open(proton::receiver& receiver) override {
std::cout << "receive_with_retry on_receiver_open " << std::endl;
receiver.open();
}
void on_message(proton::delivery& delivery, proton::message &message)
override {
std::cout << "receive_with_retry on_message " << message.body() <<
std::endl;
// commented out in order not to exit immediately, but continue on
consuming the messages.
// delivery.receiver().close();
// delivery.receiver().connection().close();
}
void on_transport_error(proton::transport& error) override {
std::cout << "receive_with_retry: on_transport_error: " <<
error.error().what() << std::endl;
error.connection().close();
}
void on_error(const proton::error_condition& error) override {
std::cout << "receive_with_retry: on_error: " << error.what() <<
std::endl;
}
};
void receiveWithRetry(const std::string& url, const std::string& queueName){
try {
std::cout << "main: start" << std::endl;
receive_with_retry receiveWithRetry(url, queueName);
proton::container(receiveWithRetry).run();
std::cout << "main: end" << std::endl;
}
catch (const std::exception& cause) {
std::cout << "main: caught exception: " << cause.what() << std::endl;
}
}
int main() {
try {
receiveWithRetry("amqp://localhost:5673", "test_queue");
return 0;
}
catch (const std::exception& e) {
std::cerr << e.what() << std::endl;
}
return 1;
}
{code}
> [proton-c] the c++ proton consumer with retry should continue to retry if
> virtual host not active
> -------------------------------------------------------------------------------------------------
>
> Key: PROTON-1917
> URL: https://issues.apache.org/jira/browse/PROTON-1917
> Project: Qpid Proton
> Issue Type: Bug
> Components: cpp-binding
> Affects Versions: proton-c-0.22.0
> Reporter: Jeremy
> Priority: Major
>
> I have a running broker, and a configured queue containing messages.
> I also have a consumer, where I configured the max number of attempts to 0
> (infinite retry).
> I kill the broker (ctrl-c) and start it on the same port.
> Upon reconnection, I get the following error message randomly:
> {code:java}
> receive_with_retry on_connection_open
> receive_with_retry: on_error: amqp:not-found: Virtual host 'localhost' is not
> active
> main: end{code}
> In the case where the consumer is able to connect, the consumer continues to
> consume the messages normally.
> However, in the broker web interface, I see upon each re-connection, an extra
> connection (same ip and port) to the queue. As if, the old connection is not
> killed. I wasn't expecting this behavior. This might be a separate issue.
> I was able to reproduce with the following code, on windows 7 (msvc 12 2013)
> {code:java}
> class receive_with_retry : public proton::messaging_handler {
> private:
> std::string url;
> std::string queueName;
> public:
> receive_with_retry(const std::string &u, const std::string& q) : url(u),
> queueName(q) {}
> void on_container_start(proton::container &c) override {
> std::cout << "receive_with_retry on_container_start" << std::endl;
> c.connect(
> url,
> proton::connection_options()
> .idle_timeout(proton::duration(2000))
> .reconnect(proton::reconnect_options()
> .max_attempts(0)
> .delay(proton::duration(3000))
> .delay_multiplier(1)
> .max_delay(proton::duration(3001))));
> }
> void on_connection_open(proton::connection& c) override {
> std::cout << "receive_with_retry on_connection_open " << std::endl;
> c.open_receiver(queueName,
> proton::receiver_options().auto_accept(false));
> }
> void on_session_open(proton::session& session) override {
> std::cout << "receive_with_retry on_session_open " << std::endl;
> }
> void on_receiver_open(proton::receiver& receiver) override {
> std::cout << "receive_with_retry on_receiver_open " << std::endl;
> receiver.open();
> }
> void on_message(proton::delivery& delivery, proton::message &message)
> override {
> std::cout << "receive_with_retry on_message " << message.body() <<
> std::endl;
> // Can be used for throttling
> // std::this_thread::sleep_for(std::chrono::milliseconds(100));
> // commented out in order not to exit immediately, but continue on
> consuming the messages.
> // delivery.receiver().close();
> // delivery.receiver().connection().close();
> }
> void on_transport_error(proton::transport& error) override {
> std::cout << "receive_with_retry: on_transport_error: " <<
> error.error().what() << std::endl;
> error.connection().close();
> }
> void on_error(const proton::error_condition& error) override {
> std::cout << "receive_with_retry: on_error: " << error.what() <<
> std::endl;
> }
> };
> void receiveWithRetry(const std::string& url, const std::string& queueName){
> try {
> std::cout << "main: start" << std::endl;
> receive_with_retry receiveWithRetry(url, queueName);
> proton::container(receiveWithRetry).run();
> std::cout << "main: end" << std::endl;
> }
> catch (const std::exception& cause) {
> std::cout << "main: caught exception: " << cause.what() << std::endl;
> }
> }
> int main() {
> try {
> receiveWithRetry("amqp://localhost:5673", "test_queue");
> return 0;
> }
> catch (const std::exception& e) {
> std::cerr << e.what() << std::endl;
> }
> return 1;
> }
> {code}
>
--
This message was sent by Atlassian JIRA
(v7.6.3#76005)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]