I think I found the cause :)
Look at the stack trace below, which seems to be the source of our
problems: after this exception we start getting "Session is closed". I
have marked the important parts in bold.
Look at the bold stack entry, in ActiveMQConnection.onException.
An unrecoverable error in the transport closes the sessions, consumers,
listeners, etc., but doesn't close the connection.
My question is: how can I avoid getting the "Session is closed" message?
How can I test whether the session is closed? Is this the right way to
handle a transport error, by cleaning up the connection?
I cannot test whether the session is closed (the ActiveMQSession.closed
field is private). As a fallback, I can try to do something with the
session and, if that fails, recreate the session. Is this the right way
to do it?
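
The "try it, and recreate on failure" idea above could be sketched roughly as follows. This is only an illustration using plain JDK types: RecreatingHolder and Action are hypothetical names, and the factory stands in for whatever creates the session (for example a call to connection.createSession). With real JMS code it would probably be safer to catch only javax.jms.IllegalStateException, and to think about whether retrying the action can duplicate work.

```java
import java.util.concurrent.Callable;

/** Hypothetical helper: caches one resource and rebuilds it after a failed use. */
final class RecreatingHolder<T> {

    /** Hypothetical callback type; with JMS, T would be the Session. */
    interface Action<T, R> {
        R apply(T resource) throws Exception;
    }

    private final Callable<T> factory; // assumption: knows how to build a fresh resource
    private T cached;

    RecreatingHolder(Callable<T> factory) {
        this.factory = factory;
    }

    /** Runs the action; on failure, discards the cached resource and retries once. */
    synchronized <R> R call(Action<T, R> action) throws Exception {
        if (cached == null) {
            cached = factory.call();
        }
        try {
            return action.apply(cached);
        } catch (Exception firstFailure) {
            cached = factory.call();     // rebuild after the failed use
            return action.apply(cached); // a second failure propagates to the caller
        }
    }

    public static void main(String[] args) throws Exception {
        int[] created = {0};
        // Stand-in for a session: an array holding its creation number.
        RecreatingHolder<int[]> holder =
                new RecreatingHolder<>(() -> new int[] {++created[0]});
        String result = holder.call(r -> {
            if (r[0] == 1) {
                // Simulates the first session having been disposed behind our back.
                throw new IllegalStateException("The Session is closed");
            }
            return "used resource #" + r[0];
        });
        System.out.println(result);
    }
}
```

The holder retries only once, so a persistent failure still surfaces as an exception instead of looping forever.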
Thanks.
/**
 * An unrecoverable exception has occured on the transport
 * @param error
 */
public void onException(final IOException error) {
    onAsyncException(error);
    asyncConnectionThread.execute(new Runnable(){
        public void run() {
            *transportFailed(error);*
            ServiceSupport.dispose(ActiveMQConnection.this.transport);
            brokerInfoReceived.countDown();
            for (Iterator iter = transportListeners.iterator(); iter.hasNext();) {
                TransportListener listener = (TransportListener) iter.next();
                listener.onException(error);
            }
        }
    });
}
and then transportFailed
protected void transportFailed(IOException error){
    transportFailed.set(true);
    if (firstFailureError == null) {
        firstFailureError = error;
    }
    if (!closed.get() && !closing.get()) {
        try{
            *cleanup();*
        }catch(JMSException e){
            log.warn("Cleanup failed",e);
        }
    }
}
and then cleanup
public void cleanup() throws JMSException {
    if( advisoryConsumer!=null ) {
        advisoryConsumer.dispose();
        advisoryConsumer=null;
    }
    for (Iterator i = this.sessions.iterator(); i.hasNext();) {
        *ActiveMQSession s = (ActiveMQSession) i.next();
        s.dispose();*
    }
    for (Iterator i = this.connectionConsumers.iterator(); i.hasNext();) {
        ActiveMQConnectionConsumer c = (ActiveMQConnectionConsumer) i.next();
        c.dispose();
    }
    for (Iterator i = this.inputStreams.iterator(); i.hasNext();) {
        ActiveMQInputStream c = (ActiveMQInputStream) i.next();
        c.dispose();
    }
    for (Iterator i = this.outputStreams.iterator(); i.hasNext();) {
        ActiveMQOutputStream c = (ActiveMQOutputStream) i.next();
        c.dispose();
    }
    if(isConnectionInfoSentToBroker){
        if(!transportFailed.get() && !closing.get()){
            asyncSendPacket(info.createRemoveCommand());
        }
        isConnectionInfoSentToBroker=false;
    }
    if( userSpecifiedClientID ) {
        info.setClientId(null);
        userSpecifiedClientID=false;
    }
    clientIDSet = false;
    started.set(false);
}
*Stack trace*
    at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:45)
    at org.apache.activemq.ActiveMQConnection.onAsyncException(ActiveMQConnection.java:1462)
    *at org.apache.activemq.ActiveMQConnection.onException(ActiveMQConnection.java:1478)*
    at org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:94)
    at org.apache.activemq.transport.ResponseCorrelator.onException(ResponseCorrelator.java:120)
    at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:94)
    at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:63)
    at org.apache.activemq.transport.vm.VMTransport.oneway(VMTransport.java:76)
    at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:44)
    at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
    at org.apache.activemq.broker.TransportConnection.dispatch(TransportConnection.java:211)
    at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:64)
    at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:97)
    at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:63)
    at org.apache.activemq.transport.vm.VMTransport.oneway(VMTransport.java:76)
    at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:44)
    at org.apache.activemq.transport.ResponseCorrelator.asyncRequest(ResponseCorrelator.java:68)
    at org.apache.activemq.transport.ResponseCorrelator.request(ResponseCorrelator.java:73)
    at org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1131)
    at org.apache.activemq.ActiveMQSession.syncSendPacket(ActiveMQSession.java:1667)
    at org.apache.activemq.ActiveMQMessageConsumer.<init>(ActiveMQMessageConsumer.java:196)
    at org.apache.activemq.ActiveMQSession.createConsumer(ActiveMQSession.java:840)
    at org.apache.activemq.ActiveMQSession.createConsumer(ActiveMQSession.java:800)
    at com.daxtechnologies.sams.manager.JmsSessionWrapper.createConsumer(JmsSessionWrapper.java:106)
    at com.daxtechnologies.sams.manager.AbstractSamsManagerQueue.consume(AbstractSamsManagerQueue.java:133)
    at com.daxtechnologies.sams.manager.SamsManagerQueues$AbstractQueueRunnable.consume(SamsManagerQueues.java:602)
    at com.daxtechnologies.sams.manager.SamsManagerQueues$AbstractQueueRunnable.throttleQueue(SamsManagerQueues.java:571)
    at com.daxtechnologies.sams.manager.SamsManagerQueues$AbstractQueueRunnable.run(SamsManagerQueues.java:622)
    at com.daxtechnologies.sams.manager.SamsManagerThreadPool$WorkerThead.run(SamsManagerThreadPool.java:188)
Caused by: java.io.InterruptedIOException: Interrupted.
    at org.apache.activemq.transport.FutureResponse.set(FutureResponse.java:56)
    at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:89)
Adrian Tarau wrote:
Even more :)
As I told you, I created a Session wrapper over the ActiveMQ session
in order to catch the close call.
You can see the close method below:
public void close() throws JMSException {
    try {
        IllegalAccessError illegalAccessError = new IllegalAccessError("I don't want to close");
        StringWriter sw = new StringWriter();
        illegalAccessError.printStackTrace(new PrintWriter(sw));
        LOG.error("Somebody closed the session, let's see who\n\n" + sw.toString());
        System.out.println("Somebody closed the session, let's see who\n\n" + sw.toString());
    } finally {
        session.close();
    }
}
Well, I looked in the error log and in the container log (JBoss) and
didn't find any of these "Somebody ..." strings. The conclusion is
that nobody called close explicitly on the session.
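
A generic version of this tracing trick, using only JDK types, might look like the sketch below. CloseTracer is a hypothetical name and AutoCloseable stands in for the JMS Session. Note the limitation: such a wrapper only sees calls that go through the wrapper itself. The cleanup() source quoted at the top of this thread calls dispose() directly on the ActiveMQSession objects held by the connection, which would be consistent with the wrapper logging nothing.

```java
import java.io.PrintWriter;
import java.io.StringWriter;

/** Hypothetical wrapper that records who called close() before delegating. */
final class CloseTracer implements AutoCloseable {
    private final AutoCloseable delegate; // stand-in for the wrapped session
    private volatile String lastCloseTrace;

    CloseTracer(AutoCloseable delegate) {
        this.delegate = delegate;
    }

    @Override
    public void close() throws Exception {
        // A plain Throwable is enough to capture the caller's stack; it is never thrown.
        StringWriter sw = new StringWriter();
        PrintWriter pw = new PrintWriter(sw);
        new Throwable("close() called from").printStackTrace(pw);
        pw.flush();
        lastCloseTrace = sw.toString();
        System.err.println(lastCloseTrace);
        delegate.close();
    }

    /** Returns the stack trace captured by the most recent close(), or null. */
    String lastCloseTrace() {
        return lastCloseTrace;
    }

    public static void main(String[] args) throws Exception {
        CloseTracer tracer = new CloseTracer(() -> System.out.println("delegate closed"));
        tracer.close();
    }
}
```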
I deployed that on 3 systems, and over the weekend I got this problem
on all of them. One thing the systems have in common is that they are
all heavily loaded (they are test machines), and on all of them
everything starts to get "Session is closed" after I get this
exception (see below).
I don't know why we got java.io.InterruptedIOException; it could be
because of me :). We interrupt the current thread (which consumes the
message) if it doesn't respond after 60 seconds (some kind of
watchdog), so it may be because I explicitly call
Thread.interrupt().
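
To illustrate how a watchdog interrupt can surface far away from the code that was slow, here is a small JDK-only sketch. It assumes, which the InterruptedIOException at FutureResponse.set in the trace suggests but does not prove, that the transport performs an interruptible blocking call on the interrupted worker thread. PendingInterruptDemo is a hypothetical name; the queue is just a stand-in for such a blocking call.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;

/** Shows that a pending interrupt makes the next interruptible call fail at once. */
final class PendingInterruptDemo {

    /** Returns true if put() fails because the thread's interrupt flag is already set. */
    static boolean putFailsWhenInterrupted() {
        BlockingQueue<String> slot = new ArrayBlockingQueue<>(1);
        // Simulates the watchdog: the interrupt arrives before the blocking call.
        Thread.currentThread().interrupt();
        try {
            slot.put("response"); // the queue has room, yet the call does not complete
            return false;
        } catch (InterruptedException e) {
            return true;
        } finally {
            // Clear the flag so this demo does not leak an interrupt to its caller.
            Thread.interrupted();
        }
    }

    public static void main(String[] args) {
        System.out.println("put failed due to pending interrupt: " + putFailsWhenInterrupted());
    }
}
```

If this is what happens here, a watchdog that only flags the slow worker, instead of interrupting it, would avoid turning a slow consumer into a transport failure.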
Anyway, I think this should not affect the session's state: the closed
field should not be set by any code other than the close() function,
right? It should not be affected by an interrupt exception in a
consumer.
That's all I could find out about this "Session is closed" problem.
For now, I think I will create the session every time to avoid it.
Thanks.
2006-07-15 03:24:54,628 [AcitveMQ Connection Worker: vm://localhost#0] ERROR com.daxtechnologies.sams.manager.SamsManagerJMS - JMS Error
javax.jms.JMSException: Interrupted.
    at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:45)
    at org.apache.activemq.ActiveMQConnection.onAsyncException(ActiveMQConnection.java:1462)
    at org.apache.activemq.ActiveMQConnection.onException(ActiveMQConnection.java:1478)
    at org.apache.activemq.transport.TransportFilter.onException(TransportFilter.java:94)
    at org.apache.activemq.transport.ResponseCorrelator.onException(ResponseCorrelator.java:120)
    at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:94)
    at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:63)
    at org.apache.activemq.transport.vm.VMTransport.oneway(VMTransport.java:76)
    at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:44)
    at org.apache.activemq.transport.ResponseCorrelator.oneway(ResponseCorrelator.java:60)
    at org.apache.activemq.broker.TransportConnection.dispatch(TransportConnection.java:211)
    at org.apache.activemq.broker.TransportConnection$1.onCommand(TransportConnection.java:64)
    at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:97)
    at org.apache.activemq.transport.TransportFilter.onCommand(TransportFilter.java:63)
    at org.apache.activemq.transport.vm.VMTransport.oneway(VMTransport.java:76)
    at org.apache.activemq.transport.MutexTransport.oneway(MutexTransport.java:44)
    at org.apache.activemq.transport.ResponseCorrelator.asyncRequest(ResponseCorrelator.java:68)
    at org.apache.activemq.transport.ResponseCorrelator.request(ResponseCorrelator.java:73)
    at org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1131)
    at org.apache.activemq.ActiveMQSession.syncSendPacket(ActiveMQSession.java:1667)
    at org.apache.activemq.ActiveMQMessageConsumer.<init>(ActiveMQMessageConsumer.java:196)
    at org.apache.activemq.ActiveMQSession.createConsumer(ActiveMQSession.java:840)
    at org.apache.activemq.ActiveMQSession.createConsumer(ActiveMQSession.java:800)
    at com.daxtechnologies.sams.manager.JmsSessionWrapper.createConsumer(JmsSessionWrapper.java:106)
    at com.daxtechnologies.sams.manager.AbstractSamsManagerQueue.consume(AbstractSamsManagerQueue.java:133)
    at com.daxtechnologies.sams.manager.SamsManagerQueues$AbstractQueueRunnable.consume(SamsManagerQueues.java:602)
    at com.daxtechnologies.sams.manager.SamsManagerQueues$AbstractQueueRunnable.throttleQueue(SamsManagerQueues.java:571)
    at com.daxtechnologies.sams.manager.SamsManagerQueues$AbstractQueueRunnable.run(SamsManagerQueues.java:622)
    at com.daxtechnologies.sams.manager.SamsManagerThreadPool$WorkerThead.run(SamsManagerThreadPool.java:188)
Caused by: java.io.InterruptedIOException: Interrupted.
    at org.apache.activemq.transport.FutureResponse.set(FutureResponse.java:56)
    at org.apache.activemq.transport.ResponseCorrelator.onCommand(ResponseCorrelator.java:89)
    ... 23 more
2006-07-15 03:24:54,805 [SAMS-Worker-queue.contact_type-3] ERROR com.daxtechnologies.sams.manager.AbstractSamsManagerQueue - Cannot consume message from queue queue 'Contact Queue <system/[EMAIL PROTECTED]>' javax.jms.JMSException: Interrupted.
    at org.apache.activemq.util.JMSExceptionSupport.create(JMSExceptionSupport.java:57)
    at org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1141)
    at org.apache.activemq.ActiveMQSession.syncSendPacket(ActiveMQSession.java:1667)
    at org.apache.activemq.ActiveMQMessageConsumer.close(ActiveMQMessageConsumer.java:516)
    at com.daxtechnologies.sams.manager.AbstractSamsManagerQueue.consume(AbstractSamsManagerQueue.java:145)
    at com.daxtechnologies.sams.manager.SamsManagerQueues$AbstractQueueRunnable.consume(SamsManagerQueues.java:602)
    at com.daxtechnologies.sams.manager.SamsManagerQueues$AbstractQueueRunnable.throttleQueue(SamsManagerQueues.java:571)
    at com.daxtechnologies.sams.manager.SamsManagerQueues$AbstractQueueRunnable.run(SamsManagerQueues.java:622)
    at com.daxtechnologies.sams.manager.SamsManagerThreadPool$WorkerThead.run(SamsManagerThreadPool.java:188)
Caused by: java.io.InterruptedIOException: Interrupted.
    at org.apache.activemq.transport.FutureResponse.getResult(FutureResponse.java:40)
    at org.apache.activemq.transport.ResponseCorrelator.request(ResponseCorrelator.java:74)
    at org.apache.activemq.ActiveMQConnection.syncSendPacket(ActiveMQConnection.java:1131)
    ... 7 more
Adrian Tarau wrote:
More about the closed session :)
Could the closing problem occur because the Session is not per thread,
even if it is accessed by only one thread at a time (or should be, but
sometimes is not - backport-concurrent problems)?
The code looks like this:
class MyStorageQueue {
    private Session session;
    private Queue queue;
    protected final Semaphore lock = new Semaphore(1, true);
    ....
    public Object consume() {
        ....
    }
    public void publish(Object object) {
        ....
    }
}
and usage in different threads:
MyStorageQueue queue = getNextQueue(...);
if (queue.lock.tryAcquire()) {
    try {
        Object value = consume(queue);
        .....
    } finally {
        queue.lock.release();
    }
}
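
If sharing one Session across threads is a concern, one alternative to the semaphore above is to give every worker thread its own instance. The sketch below shows that idea with JDK types only. PerThread is a hypothetical helper and the Integer values stand in for sessions; real code would also need a way to close each thread's session when the worker stops.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

/** Hypothetical helper giving each thread its own lazily created resource. */
final class PerThread<T> {
    private final ThreadLocal<T> local;

    PerThread(Supplier<T> factory) {
        // The factory runs once per thread, on that thread's first get().
        this.local = ThreadLocal.withInitial(factory);
    }

    T get() {
        return local.get();
    }

    public static void main(String[] args) throws InterruptedException {
        AtomicInteger ids = new AtomicInteger();
        // Stand-in for "create a session": each created resource is just a number.
        PerThread<Integer> sessions = new PerThread<>(ids::incrementAndGet);

        Runnable worker = () -> System.out.println(
                Thread.currentThread().getName() + " uses resource #" + sessions.get());
        Thread a = new Thread(worker, "worker-a");
        Thread b = new Thread(worker, "worker-b");
        a.start();
        b.start();
        a.join();
        b.join();
    }
}
```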
Anyway, I created a wrapper over the ActiveMQ session that dumps the
stack trace to see who calls the close method.
Adrian Tarau wrote:
Remote debugging is not a problem; the problem is that I need to
start the application in debug mode and keep the IDE attached with a
breakpoint for days until it happens.
I think I will recompile ActiveMQ and dump the thread stack
trace :)
Sanjiv Jivan wrote:
You can attach your debugger to the Java process on the remote machine
and set the breakpoint on the session.close() call. Once the breakpoint
is hit, you can examine the call stack in your debugger.
Use the JPDA socket JVM args when starting your Java process. E.g.:
-Xdebug
-Xnoagent
-Djava.compiler=NONE
-Xrunjdwp:transport=dt_socket,server=y,suspend=n,address=5005
On 7/13/06, Adrian Tarau <[EMAIL PROTECTED]> wrote:
That's great, but this usually happens on the remote machine, after
a few days :)
James Strachan wrote:
> Maybe try running your code in a debugger and set breakpoints in the
> ActiveMQSession class to see where & why the session gets closed?
> (The code's pretty easy to follow, it's a relatively simple class).
>
> On 7/13/06, Adrian Tarau <[EMAIL PROTECTED]> wrote:
>> James, do you want the sources, or can you recommend some actions?
>> Should I create the session every time instead of caching it?
>>
>> Adrian Tarau wrote:
>> > OK, so who closes the session? :)
>> >
>> > James Strachan wrote:
>> >> Sorry I forgot about that :)
>> >>
>> >> The only time a VM broker is shut down is explicitly via
>> >> application code or a shutdown handler.
>> >>
>> >> On 7/13/06, Adrian Tarau <[EMAIL PROTECTED]> wrote:
>> >>> I use the vm connector, so no connection problems should be
>> >>> involved, right?
>> >>>
>> >>> James Strachan wrote:
>> >>> > BTW are you using auto-reconnect? It could be that the socket
>> >>> > is terminated due to some network issue?
>> >>> >
>> >>> > http://incubator.apache.org/activemq/how-can-i-support-auto-reconnection.html
>> >>> >
>> >>> > On 7/13/06, Adrian Tarau <[EMAIL PROTECTED]> wrote:
>> >>> >> I searched for "close" in the source code and, apart from
>> >>> >> producers, consumers, input streams and output streams, I
>> >>> >> don't close the connection or session, except on JVM
>> >>> >> shutdown (Thread hook).
>> >>> >>
>> >>> >> I can provide you (privately, not on the mailing list) with
>> >>> >> the source code, because this is very annoying.
>> >>> >> Thanks.
>> >>> >>
>> >>> >> James Strachan wrote:
>> >>> >> > Apart from inactivity timeouts on the transport layer, we
>> >>> >> > generally don't close sessions. Are you sure nothing in
>> >>> >> > your application code is trying to close the session?
>> >>> >> >
>> >>> >> > On 7/13/06, Adrian Tarau <[EMAIL PROTECTED]> wrote:
>> >>> >> >> I have had this issue for some time with ActiveMQ 4.0
>> >>> >> >> and 4.0.1.
>> >>> >> >>
>> >>> >> >> I use the vm transport and create one session that is
>> >>> >> >> used to produce and consume messages. Everything works
>> >>> >> >> fine for days in a row, until it starts to throw the
>> >>> >> >> exception "Session is closed". Are there any watchdogs
>> >>> >> >> that close sessions after a while, based on some
>> >>> >> >> criteria?
>> >>> >> >>
>> >>> >> >> I couldn't find any rule for when or why it happens.
>> >>> >> >> Should I create the session every time? I understood
>> >>> >> >> that creating one is time consuming and that it should
>> >>> >> >> be safe to cache it.
>> >>> >> >>
>> >>> >> >> Thanks.
>> >>> >> >>
>> >>> >> >> *javax.jms.IllegalStateException: The Session is closed
>> >>> >> >> at org.apache.activemq.ActiveMQSession.checkClosed(ActiveMQSession.java:577)
>> >>> >> >> at org.apache.activemq.ActiveMQSession.createConsumer(ActiveMQSession.java:799)*