[ 
https://issues.apache.org/activemq/browse/AMQ-852?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Rob Davies resolved AMQ-852.
----------------------------

       Resolution: Fixed
    Fix Version/s:     (was: 5.4.0)
                   5.3.0

> Incorrect handling of disconnecting client in ClientAckMode
> -----------------------------------------------------------
>
>                 Key: AMQ-852
>                 URL: https://issues.apache.org/activemq/browse/AMQ-852
>             Project: ActiveMQ
>          Issue Type: Bug
>          Components: Broker
>    Affects Versions: 4.0.1
>         Environment: Linux IA32/RHEL AS 4.0 ( Broker version 4.0.1 ) and 
> client program uses OpenWire protocol (C++) CMS devel version. 
>            Reporter: Radek Sedmak
>             Fix For: 5.3.0
>
>   Original Estimate: 11 weeks, 1 day, 2 hours
>  Remaining Estimate: 11 weeks, 1 day, 2 hours
>
> When you run "my test program" and end the process with Ctrl+C / kill / kill 
> -9 (I mostly used Ctrl+C), sometimes -- within, let's say, 100 iterations -- 
> the "communication between client and broker is damaged". This results in a 
> situation where, after a restart (new pid), the client program is unable to 
> create a producer. The call to   consumer = rec_session->createConsumer( rec_queue );
> hangs the client program. Sometimes this results in an exception, as you can 
> see in the output of my program. I think that this only occurs when the 
> session is in ClientAckMode. From my point of view, there is incorrect 
> handling of the situation where the client receives a message from the broker 
> and then terminates the connection without an ack/nack of this receive, in 
> certain circumstances ...
> output:
>  ./amqtest tcp://lxstaflik:61616 PH_Q_IN_10 PH_Q_OUT_10
> Setting connection URL to 'tcp://lxstaflik:61616'
> Setting receive queue name to 'PH_Q_IN_10'
> Setting send queue name to 'PH_Q_OUT_10'
> Init ...
> Creating connection factory ...
> Connection factory created, creating connection ...
> Connection created, creating receive session...
> receive session created, creating sending session ...
> send session created, creating receive queue...'PH_Q_IN_10'
> receive queue created, creating consumer for this queue ...
> setting listener ...
> Error on connection Unmarshal failed; unknown data structure type 49, at 
> src/main/cpp/activemq/protocol/openwire/OpenWireMarshaller.cpp line 
> 711Exiting read loop due to exception: Unmarshal failed; unknown data 
> structure type 49, at 
> src/main/cpp/activemq/protocol/openwire/OpenWireMarshaller.cpp line 711
> Here is the source code of my example:
> #include <stdio.h>
> #include <unistd.h>
> #include <exception>
> #include <iostream>
> #include <map>
> #include <string>
> #include "cms/IConnection.hpp"
> #include "cms/IConnectionFactory.hpp"
> #include "activemq/ConnectionFactory.hpp"
> #include "activemq/Connection.hpp"
> #include "activemq/Session.hpp"
> #include "ppr/TraceException.hpp"
> #include "ppr/net/Uri.hpp"
> #include "ppr/util/ifr/p"
> using namespace apache::activemq;
> using namespace apache::cms;
> using namespace apache::ppr;
> using namespace apache::ppr::net;
> using namespace ifr;
> using namespace std;
> class ActiveMQTest : public IExceptionListener, public IMessageListener {
>   private:
>         p<Uri>                  uri;
>         p<IConnectionFactory>   factory;
>         p<IConnection>          connection;
>         p<ISession>             rec_session;
>         p<ISession>             snd_session;
>         p<IQueue>               rec_queue;
>         p<IQueue>               snd_queue;
>         p<IMessageProducer>     producer;
>         p<IMessageConsumer>     consumer;
>         p<ITextMessage>         txtmsg;
>         char szRecQueue[128];
>         char szSndQueue[128];
>   public:
>         ActiveMQTest();
>         virtual ~ActiveMQTest();
>         virtual void setUri(const char *);
>         virtual void init();
>         virtual void done();
>         virtual void onException(exception& error);
>         virtual void onMessage(p<IMessage> message);
>         virtual void ActiveMQTest::setSndQueue(const char *szQueue);
>         virtual void ActiveMQTest::setRecQueue(const char *szQueue);
> };
> void ActiveMQTest::onMessage( p<IMessage> message) {
>    p<ITextMessage>      snd_message;
>    this->txtmsg = p_dyncast<ITextMessage>(message);
>    p<string> string_request = txtmsg->getText();
>    if (string_request != NULL ) {
>      printf("Received message : %s",string_request->c_str());
>    }
>    sleep(10);
>    message->acknowledge();
>    snd_message = snd_session->createTextMessage() ;
>    snd_message->setText("TEST\n") ;
>    snd_message->setJMSPersistent(1);
>    // Send message
>    producer->send(message) ;
> }
> void ActiveMQTest::done() {
>    rec_session->close();
>    snd_session->close();
> }
> void ActiveMQTest::setUri(const char * uri) {
>     this->uri = new Uri(uri) ;
> }
> void ActiveMQTest::setSndQueue(const char *szQueue) {
>   strcpy(szSndQueue,szQueue);
> }
> void ActiveMQTest::setRecQueue(const char *szQueue) {
>   strcpy(szRecQueue,szQueue);
> }
> ActiveMQTest::ActiveMQTest() {
>   this->connection = NULL;
>   this->rec_session = NULL;
>   this->snd_session = NULL;
>   memset(szRecQueue,0x0,sizeof(szRecQueue));
>   memset(szSndQueue,0x0,sizeof(szSndQueue));
> }
> ActiveMQTest::~ActiveMQTest() {
> }
> void ActiveMQTest::init() {
>   try {
>     cout.rdbuf(cerr.rdbuf());
>     printf("1 Creating connection factory ... \n");
>     factory = new ConnectionFactory( uri );
>     printf("Connection factory created, creating connection ...\n");
>     connection = factory->createConnection();
>     printf("Connection created, creating receive session...\n");
>     p_cast<Connection>(connection)->setExceptionListener( smartify( this ) );
>     //rec_session = connection->createSession(AutoAckMode);
>     rec_session = connection->createSession(ClientAckMode);
>     printf("receive session created, creating sending session ...\n");
>     snd_session = connection->createSession(AutoAckMode);
>     printf("send session created, creating receive 
> queue...'%s'\n",szRecQueue);
>     rec_queue = rec_session->getQueue( szRecQueue );
>     printf("receive queue created, creating consumer for this queue ...\n");
>     consumer = rec_session->createConsumer( rec_queue );
>     printf("setting listener ...\n");
>     snd_queue = snd_session->getQueue( szSndQueue );
>     consumer->setMessageListener( smartify( this ) );
>     producer = snd_session->createProducer( snd_queue );
>     (p_dyncast<apache::activemq::Session>(rec_session))->dispatch(0);
>     printf("Init Ok.\n");
>   } catch ( TraceException& e ) {
>     printf("Error during init  ... \n");
>   }
> }
> void ActiveMQTest::onException( exception& error ) {
>   printf("Error on connection %s", error.what() );
> }
> int main(int argc,char *argv[]) {
>   ActiveMQTest* myAq;
>   if ( argc < 3 ) {
>     printf("Usage: amqtest <connection_url> <receive_queue> <send_queue>\n");
>     exit(0);
>   }
>   myAq = new ActiveMQTest;
>   printf("Setting connection URL to '%s'\n",argv[1]);
>   myAq->setUri(argv[1]);
>   printf("Setting receive queue name to '%s'\n",argv[2]);
>   myAq->setRecQueue( argv[2] );
>   printf("Setting send queue name to '%s'\n",argv[3]);
>   myAq->setSndQueue( argv[3] );
>   printf("Init ...\n");
>   myAq->init();
> int i=0;
>   while( i < 3000 ) {
>     printf("Idle ... \n");
>     sleep( 1 );
>     i++;
>   }
>   myAq->done();
>   return 0;
> }

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.

Reply via email to