Incorrect handling of disconnecting client in ClientAckMode
-----------------------------------------------------------

                 Key: AMQ-852
                 URL: https://issues.apache.org/activemq/browse/AMQ-852
             Project: ActiveMQ
          Issue Type: Bug
          Components: Broker
    Affects Versions: 4.0.1
         Environment: Linux IA32/RHEL AS 4.0 ( Broker version 4.0.1 ) and 
client program uses OpenWire protocol (C++) CMS devel version. 
            Reporter: Radek Sedmak


When you run my test program and terminate the process with Ctrl+C, kill, or 
kill -9 (I mostly used Ctrl+C), then sometimes, within say 100 iterations, the 
communication between client and broker becomes damaged. This results in a 
situation where, after a restart (new pid), the client program is unable to 
create a producer. The call to
consumer = rec_session->createConsumer( rec_queue );
hangs the client program. Sometimes this results in an exception, as you can 
see in the output of my program. I think that this only occurs when the session 
is in ClientAckMode. From my point of view, there is incorrect handling of the 
situation when the client receives a message from the broker and then terminates 
the connection without acking/nacking that receive, in certain circumstances ...

output:
 ./amqtest tcp://lxstaflik:61616 PH_Q_IN_10 PH_Q_OUT_10
Setting connection URL to 'tcp://lxstaflik:61616'
Setting receive queue name to 'PH_Q_IN_10'
Setting send queue name to 'PH_Q_OUT_10'
Init ...
Creating connection factory ...
Connection factory created, creating connection ...
Connection created, creating receive session...
receive session created, creating sending session ...
send session created, creating receive queue...'PH_Q_IN_10'
receive queue created, creating consumer for this queue ...
setting listener ...
Error on connection Unmarshal failed; unknown data structure type 49, at 
src/main/cpp/activemq/protocol/openwire/OpenWireMarshaller.cpp line 711Exiting 
read loop due to exception: Unmarshal failed; unknown data structure type 49, 
at src/main/cpp/activemq/protocol/openwire/OpenWireMarshaller.cpp line 711


Here is the source code of my example:
#include <stdio.h>
#include <unistd.h>

#include <exception>
#include <iostream>
#include <map>
#include <string>

#include "cms/IConnection.hpp"
#include "cms/IConnectionFactory.hpp"
#include "activemq/ConnectionFactory.hpp"
#include "activemq/Connection.hpp"
#include "activemq/Session.hpp"
#include "ppr/TraceException.hpp"
#include "ppr/net/Uri.hpp"
#include "ppr/util/ifr/p"

using namespace apache::activemq;
using namespace apache::cms;
using namespace apache::ppr;
using namespace apache::ppr::net;
using namespace ifr;
using namespace std;


/**
 * Small test client: consumes text messages from a receive queue and, for
 * each one, sends a text message to a send queue.
 *
 * The object registers itself both as the connection's exception listener
 * and as the consumer's message listener (see init()).
 */
class ActiveMQTest : public IExceptionListener, public IMessageListener {
  private:
        p<Uri>                  uri;            // broker URI, set by setUri()
        p<IConnectionFactory>   factory;        // created in init()
        p<IConnection>          connection;     // created in init()
        p<ISession>             rec_session;    // receive session, created with ClientAckMode
        p<ISession>             snd_session;    // send session, created with AutoAckMode
        p<IQueue>               rec_queue;      // queue named by szRecQueue
        p<IQueue>               snd_queue;      // queue named by szSndQueue
        p<IMessageProducer>     producer;       // producer on snd_queue
        p<IMessageConsumer>     consumer;       // consumer on rec_queue
        p<ITextMessage>         txtmsg;         // last message seen by onMessage()
        char szRecQueue[128];                   // NUL-terminated receive queue name
        char szSndQueue[128];                   // NUL-terminated send queue name
  public:
        ActiveMQTest();
        virtual ~ActiveMQTest();
        virtual void setUri(const char *);
        virtual void init();
        virtual void done();
        virtual void onException(exception& error);
        virtual void onMessage(p<IMessage> message);
        // In-class declarations must not be qualified with the class name;
        // "ActiveMQTest::" here is rejected by conforming compilers.
        virtual void setSndQueue(const char *szQueue);
        virtual void setRecQueue(const char *szQueue);
};

/**
 * Message-listener callback for the receive consumer.
 *
 * Prints the text of the incoming message (if it is a text message with a
 * body), waits ten seconds, acknowledges the incoming message and then sends
 * a new persistent "TEST\n" text message through the producer.
 *
 * @param message  the message delivered by the consumer
 */
void ActiveMQTest::onMessage( p<IMessage> message) {
   p<ITextMessage>      snd_message;

   this->txtmsg = p_dyncast<ITextMessage>(message);

   // Guard against a cast that yields a null pointer (non-text message)
   // before touching the payload.
   if (txtmsg != NULL) {
     p<string> string_request = txtmsg->getText();

     if (string_request != NULL ) {
       printf("Received message : %s",string_request->c_str());
     }
   }

   // Long pause before the ack: leaves a window in which the process can be
   // killed while the received message is still unacknowledged.
   sleep(10);

   message->acknowledge();

   snd_message = snd_session->createTextMessage() ;
   snd_message->setText("TEST\n") ;
   snd_message->setJMSPersistent(1);
   // Send the newly built reply. The original passed the *received* message
   // here, so the prepared reply was never sent.
   producer->send(snd_message) ;
}

void ActiveMQTest::done() {
   rec_session->close();
   snd_session->close();
}

void ActiveMQTest::setUri(const char * uri) {
    this->uri = new Uri(uri) ;
}

/**
 * Sets the name of the queue that replies are sent to.
 *
 * The name is copied into the fixed-size szSndQueue buffer. Names longer
 * than the buffer are truncated (always NUL-terminated) instead of
 * overflowing it; a NULL pointer is stored as an empty name.
 *
 * @param szQueue  NUL-terminated queue name, or NULL
 */
void ActiveMQTest::setSndQueue(const char *szQueue) {
  snprintf(szSndQueue, sizeof(szSndQueue), "%s", szQueue != NULL ? szQueue : "");
}

/**
 * Sets the name of the queue that messages are received from.
 *
 * The name is copied into the fixed-size szRecQueue buffer. Names longer
 * than the buffer are truncated (always NUL-terminated) instead of
 * overflowing it; a NULL pointer is stored as an empty name.
 *
 * @param szQueue  NUL-terminated queue name, or NULL
 */
void ActiveMQTest::setRecQueue(const char *szQueue) {
  snprintf(szRecQueue, sizeof(szRecQueue), "%s", szQueue != NULL ? szQueue : "");
}

/**
 * Builds an unconnected test client: both queue-name buffers are zeroed and
 * the connection and session pointers are reset to NULL.
 */
ActiveMQTest::ActiveMQTest() {
  // Start with empty (all-zero) queue names.
  memset(szSndQueue, 0, sizeof szSndQueue);
  memset(szRecQueue, 0, sizeof szRecQueue);

  // Nothing is connected until init() runs.
  snd_session = NULL;
  rec_session = NULL;
  connection  = NULL;
}

// No explicit cleanup is performed here; members are released by their own
// destructors.
ActiveMQTest::~ActiveMQTest() {}

void ActiveMQTest::init() {
  try {
    cout.rdbuf(cerr.rdbuf());
    printf("1 Creating connection factory ... \n");
    factory = new ConnectionFactory( uri );

    printf("Connection factory created, creating connection ...\n");
    connection = factory->createConnection();

    printf("Connection created, creating receive session...\n");
    p_cast<Connection>(connection)->setExceptionListener( smartify( this ) );
    //rec_session = connection->createSession(AutoAckMode);
    rec_session = connection->createSession(ClientAckMode);

    printf("receive session created, creating sending session ...\n");
    snd_session = connection->createSession(AutoAckMode);

    printf("send session created, creating receive queue...'%s'\n",szRecQueue);
    rec_queue = rec_session->getQueue( szRecQueue );


    printf("receive queue created, creating consumer for this queue ...\n");
    consumer = rec_session->createConsumer( rec_queue );


    printf("setting listener ...\n");
    snd_queue = snd_session->getQueue( szSndQueue );
    consumer->setMessageListener( smartify( this ) );
    producer = snd_session->createProducer( snd_queue );

    (p_dyncast<apache::activemq::Session>(rec_session))->dispatch(0);

    printf("Init Ok.\n");

  } catch ( TraceException& e ) {
    printf("Error during init  ... \n");
  }
}

/**
 * Exception-listener callback: prints the description of a connection error.
 *
 * @param error  the reported exception; only its what() text is used
 */
void ActiveMQTest::onException( exception& error ) {
  const char* reason = error.what();
  printf("Error on connection %s", reason);
}


/**
 * Entry point.
 *
 * Usage: amqtest <connection_url> <receive_queue> <send_queue>
 *
 * Configures an ActiveMQTest from the three arguments, initialises it, idles
 * for 3000 one-second ticks while messages are handled by the listener, then
 * closes the sessions.
 *
 * @return 0 on a normal run, 1 when too few arguments were given
 */
int main(int argc,char *argv[]) {
  // argv[1], argv[2] and argv[3] are all read below, so the program name
  // plus three arguments are required. The original "argc < 3" check let
  // a missing send-queue argument through as a NULL pointer.
  if ( argc < 4 ) {
    printf("Usage: amqtest <connection_url> <receive_queue> <send_queue>\n");
    return 1;
  }

  // NOTE(review): never deleted. init() hands "this" to smartify(), so
  // ownership should be confirmed before adding an explicit delete.
  ActiveMQTest* myAq = new ActiveMQTest;

  printf("Setting connection URL to '%s'\n",argv[1]);
  myAq->setUri(argv[1]);

  printf("Setting receive queue name to '%s'\n",argv[2]);
  myAq->setRecQueue( argv[2] );

  printf("Setting send queue name to '%s'\n",argv[3]);
  myAq->setSndQueue( argv[3] );

  printf("Init ...\n");
  myAq->init();

  // Keep the process alive while the listener does the work.
  for ( int i = 0; i < 3000; i++ ) {
    printf("Idle ... \n");
    sleep( 1 );
  }

  myAq->done();

  return 0;
}




-- 
This message is automatically generated by JIRA.
-
If you think it was sent incorrectly contact one of the administrators: 
https://issues.apache.org/activemq/secure/Administrators.jspa
-
For more information on JIRA, see: http://www.atlassian.com/software/jira

        

Reply via email to