[
https://issues.apache.org/activemq/browse/AMQ-852?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
]
Rob Davies resolved AMQ-852.
----------------------------
Resolution: Fixed
Fix Version/s: (was: 5.4.0)
5.3.0
> Incorrect handling of disconnecting client in ClientAckMode
> -----------------------------------------------------------
>
> Key: AMQ-852
> URL: https://issues.apache.org/activemq/browse/AMQ-852
> Project: ActiveMQ
> Issue Type: Bug
> Components: Broker
> Affects Versions: 4.0.1
> Environment: Linux IA32/RHEL AS 4.0 ( Broker version 4.0.1 ) and
> client program uses OpenWire protocol (C++) CMS devel version.
> Reporter: Radek Sedmak
> Fix For: 5.3.0
>
> Original Estimate: 11 weeks, 1 day, 2 hours
> Remaining Estimate: 11 weeks, 1 day, 2 hours
>
> When you run "my test program" and end the process with Ctrl+C / kill / kill
> -9 (I mostly used Ctrl+C), then sometimes, within let's say 100 iterations,
> the "communication between client and broker is damaged". This results in a
> situation where, after a restart (new pid), the client program is unable to
> create a producer. The call to
> consumer = rec_session->createConsumer( rec_queue );
> hangs the client program. Sometimes this results in an exception, as you can
> see in the output of my program. I think that this only occurs when the
> session is in ClientAckMode. From my point of view, there is incorrect
> handling of the situation where, in certain circumstances, the client receives
> a message from the broker and then terminates the connection without
> ack/nack of this receive ...
> output:
> ./amqtest tcp://lxstaflik:61616 PH_Q_IN_10 PH_Q_OUT_10
> Setting connection URL to 'tcp://lxstaflik:61616'
> Setting receive queue name to 'PH_Q_IN_10'
> Setting send queue name to 'PH_Q_OUT_10'
> Init ...
> Creating connection factory ...
> Connection factory created, creating connection ...
> Connection created, creating receive session...
> receive session created, creating sending session ...
> send session created, creating receive queue...'PH_Q_IN_10'
> receive queue created, creating consumer for this queue ...
> setting listener ...
> Error on connection Unmarshal failed; unknown data structure type 49, at
> src/main/cpp/activemq/protocol/openwire/OpenWireMarshaller.cpp line
> 711Exiting read loop due to exception: Unmarshal failed; unknown data
> structure type 49, at
> src/main/cpp/activemq/protocol/openwire/OpenWireMarshaller.cpp line 711
> Here is the source code of my example:
> #include <stdio.h>
> #include <unistd.h>
> #include <exception>
> #include <iostream>
> #include <map>
> #include <string>
> #include "cms/IConnection.hpp"
> #include "cms/IConnectionFactory.hpp"
> #include "activemq/ConnectionFactory.hpp"
> #include "activemq/Connection.hpp"
> #include "activemq/Session.hpp"
> #include "ppr/TraceException.hpp"
> #include "ppr/net/Uri.hpp"
> #include "ppr/util/ifr/p"
> using namespace apache::activemq;
> using namespace apache::cms;
> using namespace apache::ppr;
> using namespace apache::ppr::net;
> using namespace ifr;
> using namespace std;
> class ActiveMQTest : public IExceptionListener, public IMessageListener {
> private:
> p<Uri> uri;
> p<IConnectionFactory> factory;
> p<IConnection> connection;
> p<ISession> rec_session;
> p<ISession> snd_session;
> p<IQueue> rec_queue;
> p<IQueue> snd_queue;
> p<IMessageProducer> producer;
> p<IMessageConsumer> consumer;
> p<ITextMessage> txtmsg;
> char szRecQueue[128];
> char szSndQueue[128];
> public:
> ActiveMQTest();
> virtual ~ActiveMQTest();
> virtual void setUri(const char *);
> virtual void init();
> virtual void done();
> virtual void onException(exception& error);
> virtual void onMessage(p<IMessage> message);
> virtual void ActiveMQTest::setSndQueue(const char *szQueue);
> virtual void ActiveMQTest::setRecQueue(const char *szQueue);
> };
> void ActiveMQTest::onMessage( p<IMessage> message) {
> p<ITextMessage> snd_message;
> this->txtmsg = p_dyncast<ITextMessage>(message);
> p<string> string_request = txtmsg->getText();
> if (string_request != NULL ) {
> printf("Received message : %s",string_request->c_str());
> }
> sleep(10);
> message->acknowledge();
> snd_message = snd_session->createTextMessage() ;
> snd_message->setText("TEST\n") ;
> snd_message->setJMSPersistent(1);
> // Send message
> producer->send(message) ;
> }
> void ActiveMQTest::done() {
> rec_session->close();
> snd_session->close();
> }
> void ActiveMQTest::setUri(const char * uri) {
> this->uri = new Uri(uri) ;
> }
> void ActiveMQTest::setSndQueue(const char *szQueue) {
> strcpy(szSndQueue,szQueue);
> }
> void ActiveMQTest::setRecQueue(const char *szQueue) {
> strcpy(szRecQueue,szQueue);
> }
> ActiveMQTest::ActiveMQTest() {
> this->connection = NULL;
> this->rec_session = NULL;
> this->snd_session = NULL;
> memset(szRecQueue,0x0,sizeof(szRecQueue));
> memset(szSndQueue,0x0,sizeof(szSndQueue));
> }
> ActiveMQTest::~ActiveMQTest() {
> }
> void ActiveMQTest::init() {
> try {
> cout.rdbuf(cerr.rdbuf());
> printf("1 Creating connection factory ... \n");
> factory = new ConnectionFactory( uri );
> printf("Connection factory created, creating connection ...\n");
> connection = factory->createConnection();
> printf("Connection created, creating receive session...\n");
> p_cast<Connection>(connection)->setExceptionListener( smartify( this ) );
> //rec_session = connection->createSession(AutoAckMode);
> rec_session = connection->createSession(ClientAckMode);
> printf("receive session created, creating sending session ...\n");
> snd_session = connection->createSession(AutoAckMode);
> printf("send session created, creating receive
> queue...'%s'\n",szRecQueue);
> rec_queue = rec_session->getQueue( szRecQueue );
> printf("receive queue created, creating consumer for this queue ...\n");
> consumer = rec_session->createConsumer( rec_queue );
> printf("setting listener ...\n");
> snd_queue = snd_session->getQueue( szSndQueue );
> consumer->setMessageListener( smartify( this ) );
> producer = snd_session->createProducer( snd_queue );
> (p_dyncast<apache::activemq::Session>(rec_session))->dispatch(0);
> printf("Init Ok.\n");
> } catch ( TraceException& e ) {
> printf("Error during init ... \n");
> }
> }
> void ActiveMQTest::onException( exception& error ) {
> printf("Error on connection %s", error.what() );
> }
> int main(int argc,char *argv[]) {
> ActiveMQTest* myAq;
> if ( argc < 3 ) {
> printf("Usage: amqtest <connection_url> <receive_queue> <send_queue>\n");
> exit(0);
> }
> myAq = new ActiveMQTest;
> printf("Setting connection URL to '%s'\n",argv[1]);
> myAq->setUri(argv[1]);
> printf("Setting receive queue name to '%s'\n",argv[2]);
> myAq->setRecQueue( argv[2] );
> printf("Setting send queue name to '%s'\n",argv[3]);
> myAq->setSndQueue( argv[3] );
> printf("Init ...\n");
> myAq->init();
> int i=0;
> while( i < 3000 ) {
> printf("Idle ... \n");
> sleep( 1 );
> i++;
> }
> myAq->done();
> return 0;
> }
--
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.