[ 
https://issues.apache.org/activemq/browse/AMQ-2019?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel
 ]

Hongwu LU updated AMQ-2019:
---------------------------

    Description: 
i have a program which use stomp to receive messages from activemq,
when i start it and wait for several minutes before close it, everything is ok.
but if i wait for one hour, for example, when i close it, then i find from the 
management page, the two consumers are still there.
and if i restart the program with these tow hanging consumers, then i could not 
receive any message.
i do not know whether it is a bug or just my program problem, sorry for disturb 
you here for an analyse, please.
here is my program:
#!/usr/bin/perl -w
use strict;
use Switch;
use Time::HiRes qw(usleep ualarm gettimeofday tv_interval);

use Net::Stomp;

use Term::ANSIColor;
#for multithread
use threads;
use threads::shared;

#for ActiveMQ connection
my $aq_host = "10.255.1.130";
my $aq_port = '61613';
my $sSender;
my $sReceiver;
my $aReceiver;

my $queue1 = "queue1";
my $queue2 = "queue2";

my $running : shared = 1;

my $thread_th1; 
my $thread_th2;

sub cleanup()
{
        print colored ("\n.STOP received! \n", 'red');
        $running=0;
        print ".Stopping  Thread1               ...... ";
        my $r1 = $thread_th1->join();
        print colored ("$r1\n", 'green');
        
        print ".Stopping Thread2                ...... ";
        my $r2 = $thread_th2->join();
        print colored ("$r2\n", 'green');
        
        print ".Disconnecting ActiveMQ connector        ...... ";
        $sReceiver->disconnect();
        $aReceiver->disconnect();
        print colored ("OK\n", 'green');

        print "\n";
        print colored ("ALL Nicely Stopped, see you!\n\n", 'yellow');
}

sub th1()
{
        $sReceiver->subscribe({ destination => "/queue/$queue1",
                                'ack' => 'client',
                                'activemq.prefetchSize' => 1});
        while($running){

                if ($sReceiver->can_read({timeout=>'1'}))
                {
                        my $frame = $sReceiver->receive_frame;
                        my $body = $frame->body;

                        #print "get message from queue 1: $body \n";
                        ptMsg($body);
                        #Ack msg for deleting after the procedure
                        $sReceiver->ack({ frame => $frame});
                }
        }
        
        $sReceiver->unsubscribe({ destination => "/queue/$queue1"});
        return "OK";
}

sub th2() 
{
        $aReceiver->subscribe({ destination => "/queue/$queue2",
                                                        'ack' => 'client',
                                                        'activemq.prefetchSize' 
=> 1});
        while($running)
        {
                #get one message from queue
                if ($aReceiver->can_read({timeout=>'1'}))
                {
                        my $frame = $aReceiver->receive_frame;
                        my $body = $frame->body;
                        #print "get message from queue 2: $body";
                        ptMsg($body);
                        $aReceiver->ack({ frame => $frame});
                }                                                               
                 
        }
        #when received cleanup message
        $aReceiver->unsubscribe({ destination => "/queue/$queue2"});
        return "OK";
}

sub ptMsg
{
        my $msg = shift;
        print "received message $msg \n";
}
#################################################################
#       main function start
#################################################################

print ".Creating ActiveMQ connector     ...... ";
$sReceiver = Net::Stomp->new({ hostname => $aq_host, port => $aq_port});
$sReceiver->connect();
$aReceiver = Net::Stomp->new({ hostname => $aq_host, port => $aq_port});
$aReceiver->connect();
print colored ("OK\n",'green');

$SIG{INT} = "cleanup";

print ".Starting Thread1        ...... ";
$thread_th1 = threads->create(\&th1);
print colored ("OK\n", 'green');

print ".Starting Thread2        ...... ";
$thread_th2 = threads->create(\&th2);
print colored ("OK\n", 'green');

#main process waiting for signal clean
print colored ("->running ......\n", 'green');
print "\n";

while ($running) {
        usleep(5000000);
}

  was:
i have a program which use stomp to receive messages from activemq,
when i start it and wait for several minutes before close it, everything is ok.
but if i wait for one hour, for example, when i close it, then i find from the 
management page, the two consumers are still there.
and if i restart the program with these tow hanging consumers, then i could not 
receive any message.
i do not know whether it is a bug or just my program problem, sorry for disturb 
you here for an analyse, please.
here is my program:
#!/usr/bin/perl -w
use strict;
use Switch;
use Time::HiRes qw(usleep ualarm gettimeofday tv_interval);

use Net::Stomp;

use Term::ANSIColor;
#for multithread
use threads;
use threads::shared;

#for ActiveMQ connection
my $aq_host = "10.255.1.130";
my $aq_port = '61613';
my $sSender;
my $sReceiver;
my $aReceiver;

my $queue1 = "queue1";
my $queue2 = "queue2";

my $running : shared = 1;

my $thread_th1; 
my $thread_th2;

sub cleanup()
{
        print colored ("\n.STOP received! \n", 'red');
        $running=0;
        
        print ".Stopping  Thread1               ...... ";
        my $r1 = $thread_th1->join();
        print colored ("$r1\n", 'green');
        
        print ".Stopping Thread2                ...... ";
        my $r2 = $thread_th2->join();
        print colored ("$r2\n", 'green');
        
        print ".Disconnecting ActiveMQ connector        ...... ";
        $sReceiver->disconnect();
        $aReceiver->disconnect();
        print colored ("OK\n", 'green');

        print "\n";
        print colored ("ALL Nicely Stopped, see you!\n\n", 'yellow');
}


sub th1()
{
        $sReceiver->subscribe({ destination => "/queue/$queue1",
                                'ack' => 'client',
                                'activemq.prefetchSize' => 1});
        while($running){

                if ($sReceiver->can_read({timeout=>'1'}))
                {
                        my $frame = $sReceiver->receive_frame;
                        my $body = $frame->body;

                        #print "get message from queue 1: $body \n";
                        ptMsg($body);
                        #Ack msg for deleting after the procedure
                        $sReceiver->ack({ frame => $frame});
                }
        }
        
        $sReceiver->unsubscribe({ destination => "/queue/$queue1"});
        return "OK";
}

sub th2() 
{
        $aReceiver->subscribe({ destination => "/queue/$queue2",
                                                        'ack' => 'client',
                                                        'activemq.prefetchSize' 
=> 1});
        while($running)
        {
                #get one message from queue
                if ($aReceiver->can_read({timeout=>'1'}))
                {
                        my $frame = $aReceiver->receive_frame;
                        my $body = $frame->body;
                        #print "get message from queue 2: $body";
                        ptMsg($body);
                        $aReceiver->ack({ frame => $frame});
                }                                                               
                 
        }
        #when received cleanup message
        $aReceiver->unsubscribe({ destination => "/queue/$queue2"});
        return "OK";
}

sub ptMsg
{
        my $msg = shift;
        print "received message $msg \n";
}
#################################################################
#       main function start
#################################################################

print ".Creating ActiveMQ connector     ...... ";
$sReceiver = Net::Stomp->new({ hostname => $aq_host, port => $aq_port});
$sReceiver->connect();
$aReceiver = Net::Stomp->new({ hostname => $aq_host, port => $aq_port});
$aReceiver->connect();
print colored ("OK\n",'green');

$SIG{INT} = "cleanup";

print ".Starting Thread1        ...... ";
$thread_th1 = threads->create(\&th1);
print colored ("OK\n", 'green');

print ".Starting Thread2        ...... ";
$thread_th2 = threads->create(\&th2);
print colored ("OK\n", 'green');

#main process waiting for signal clean
print colored ("->running ......\n", 'green');
print "\n";

while ($running) {
        usleep(5000000);
}
 


> Stomp client hang after a long time pending
> -------------------------------------------
>
>                 Key: AMQ-2019
>                 URL: https://issues.apache.org/activemq/browse/AMQ-2019
>             Project: ActiveMQ
>          Issue Type: Bug
>    Affects Versions: 5.2.0
>         Environment: linux with perl 5.8.8 and stomp
>            Reporter: Hongwu LU
>            Priority: Minor
>
> i have a program which use stomp to receive messages from activemq,
> when i start it and wait for several minutes before close it, everything is 
> ok.
> but if i wait for one hour, for example, when i close it, then i find from 
> the management page, the two consumers are still there.
> and if i restart the program with these tow hanging consumers, then i could 
> not receive any message.
> i do not know whether it is a bug or just my program problem, sorry for 
> disturb you here for an analyse, please.
> here is my program:
> #!/usr/bin/perl -w
> use strict;
> use Switch;
> use Time::HiRes qw(usleep ualarm gettimeofday tv_interval);
> use Net::Stomp;
> use Term::ANSIColor;
> #for multithread
> use threads;
> use threads::shared;
> #for ActiveMQ connection
> my $aq_host = "10.255.1.130";
> my $aq_port = '61613';
> my $sSender;
> my $sReceiver;
> my $aReceiver;
> my $queue1 = "queue1";
> my $queue2 = "queue2";
> my $running : shared = 1;
> my $thread_th1; 
> my $thread_th2;
> sub cleanup()
> {
>       print colored ("\n.STOP received! \n", 'red');
>       $running=0;
>       print ".Stopping  Thread1               ...... ";
>       my $r1 = $thread_th1->join();
>       print colored ("$r1\n", 'green');
>       
>       print ".Stopping Thread2                ...... ";
>       my $r2 = $thread_th2->join();
>       print colored ("$r2\n", 'green');
>       
>       print ".Disconnecting ActiveMQ connector        ...... ";
>       $sReceiver->disconnect();
>       $aReceiver->disconnect();
>       print colored ("OK\n", 'green');
>       print "\n";
>       print colored ("ALL Nicely Stopped, see you!\n\n", 'yellow');
> }
> sub th1()
> {
>       $sReceiver->subscribe({ destination => "/queue/$queue1",
>                               'ack' => 'client',
>                               'activemq.prefetchSize' => 1});
>       while($running){
>               if ($sReceiver->can_read({timeout=>'1'}))
>               {
>                       my $frame = $sReceiver->receive_frame;
>                       my $body = $frame->body;
>                       #print "get message from queue 1: $body \n";
>                       ptMsg($body);
>                       #Ack msg for deleting after the procedure
>                       $sReceiver->ack({ frame => $frame});
>               }
>       }
>       
>       $sReceiver->unsubscribe({ destination => "/queue/$queue1"});
>       return "OK";
> }
> sub th2() 
> {
>       $aReceiver->subscribe({ destination => "/queue/$queue2",
>                                                       'ack' => 'client',
>                                                       'activemq.prefetchSize' 
> => 1});
>       while($running)
>       {
>               #get one message from queue
>               if ($aReceiver->can_read({timeout=>'1'}))
>               {
>                       my $frame = $aReceiver->receive_frame;
>                       my $body = $frame->body;
>                       #print "get message from queue 2: $body";
>                       ptMsg($body);
>                       $aReceiver->ack({ frame => $frame});
>               }                                                               
>                  
>       }
>       #when received cleanup message
>       $aReceiver->unsubscribe({ destination => "/queue/$queue2"});
>       return "OK";
> }
> sub ptMsg
> {
>       my $msg = shift;
>       print "received message $msg \n";
> }
> #################################################################
> #     main function start
> #################################################################
> print ".Creating ActiveMQ connector   ...... ";
> $sReceiver = Net::Stomp->new({ hostname => $aq_host, port => $aq_port});
> $sReceiver->connect();
> $aReceiver = Net::Stomp->new({ hostname => $aq_host, port => $aq_port});
> $aReceiver->connect();
> print colored ("OK\n",'green');
> $SIG{INT} = "cleanup";
> print ".Starting Thread1      ...... ";
> $thread_th1 = threads->create(\&th1);
> print colored ("OK\n", 'green');
> print ".Starting Thread2      ...... ";
> $thread_th2 = threads->create(\&th2);
> print colored ("OK\n", 'green');
> #main process waiting for signal clean
> print colored ("->running ......\n", 'green');
> print "\n";
> while ($running) {
>       usleep(5000000);
> }

-- 
This message is automatically generated by JIRA.
-
You can reply to this email to add a comment to the issue online.

Reply via email to