Author: rajdavies
Date: Tue Feb  7 13:40:23 2006
New Revision: 375721

URL: http://svn.apache.org/viewcvs?rev=375721&view=rev
Log:
receive() returns null on connection transport failure

Modified:
    
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
    
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnection.java
    
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ConnectionFailedException.java

Modified: 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL: 
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=375721&r1=375720&r2=375721&view=diff
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
 (original)
+++ 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
 Tue Feb  7 13:40:23 2006
@@ -126,6 +126,7 @@
     private final AtomicBoolean started = new AtomicBoolean(false);
     private final AtomicBoolean closing = new AtomicBoolean(false);
     private final AtomicBoolean closed = new AtomicBoolean(false);
+    private final AtomicBoolean transportFailed = new AtomicBoolean(false);
     private final CopyOnWriteArrayList sessions = new CopyOnWriteArrayList();
     private final CopyOnWriteArrayList connectionConsumers = new 
CopyOnWriteArrayList();
     private final CopyOnWriteArrayList inputStreams = new 
CopyOnWriteArrayList();
@@ -246,7 +247,7 @@
      * @since 1.1
      */
     public Session createSession(boolean transacted, int acknowledgeMode) 
throws JMSException {
-        checkClosed();
+        checkClosedOrFailed();
         ensureConnectionInfoSent();
         return new ActiveMQSession(this, getNextSessionId(), (transacted ? 
Session.SESSION_TRANSACTED
                 : (acknowledgeMode == Session.SESSION_TRANSACTED ? 
Session.AUTO_ACKNOWLEDGE : acknowledgeMode)), asyncDispatch);
@@ -273,7 +274,7 @@
      *             connection due to some internal error.
      */
     public String getClientID() throws JMSException {
-        checkClosed();
+        checkClosedOrFailed();
         return this.info.getClientId();
     }
 
@@ -319,7 +320,7 @@
      *             configured.
      */
     public void setClientID(String newClientID) throws JMSException {
-        checkClosed();
+        checkClosedOrFailed();
 
         if (this.clientIDSet) {
             throw new IllegalStateException("The clientID has already been 
set");
@@ -344,7 +345,7 @@
      * @see javax.jms.ConnectionMetaData
      */
     public ConnectionMetaData getMetaData() throws JMSException {
-        checkClosed();
+        checkClosedOrFailed();
         return ActiveMQConnectionMetaData.INSTANCE;
     }
 
@@ -362,7 +363,7 @@
      * @see javax.jms.Connection#setExceptionListener(ExceptionListener)
      */
     public ExceptionListener getExceptionListener() throws JMSException {
-        checkClosed();
+        checkClosedOrFailed();
         return this.exceptionListener;
     }
 
@@ -391,7 +392,7 @@
      *             this connection.
      */
     public void setExceptionListener(ExceptionListener listener) throws 
JMSException {
-        checkClosed();
+        checkClosedOrFailed();
         this.exceptionListener = listener;
     }
 
@@ -406,7 +407,7 @@
      * @see javax.jms.Connection#stop()
      */
     public void start() throws JMSException {
-        checkClosed();
+        checkClosedOrFailed();
         ensureConnectionInfoSent();
         if (started.compareAndSet(false, true)) {
             for (Iterator i = sessions.iterator(); i.hasNext();) {
@@ -456,7 +457,7 @@
      * @see javax.jms.Connection#start()
      */
     public void stop() throws JMSException {
-        checkClosed();
+        checkClosedOrFailed();
         if (started.compareAndSet(true, false)) {
             for (Iterator i = sessions.iterator(); i.hasNext();) {
                 ActiveMQSession s = (ActiveMQSession) i.next();
@@ -647,7 +648,7 @@
     public ConnectionConsumer createDurableConnectionConsumer(Topic topic, 
String subscriptionName,
             String messageSelector, ServerSessionPool sessionPool, int 
maxMessages, boolean noLocal)
             throws JMSException {
-        checkClosed();
+        checkClosedOrFailed();
         ensureConnectionInfoSent();
         SessionId sessionId = new SessionId(info.getConnectionId(), -1);
         ConsumerInfo info = new ConsumerInfo(new ConsumerId(sessionId, 
consumerIdGenerator
@@ -945,7 +946,7 @@
 
     public ConnectionConsumer createConnectionConsumer(Destination 
destination, String messageSelector, ServerSessionPool sessionPool, int 
maxMessages, boolean noLocal) throws JMSException {
         
-        checkClosed();
+        checkClosedOrFailed();
         ensureConnectionInfoSent();
         ConsumerId consumerId = createConsumerId();
         ConsumerInfo info = new ConsumerInfo(consumerId);
@@ -1092,6 +1093,19 @@
 
     /**
      * simply throws an exception if the Connection is already closed
+     * or the Transport has failed
+     * 
+     * @throws JMSException
+     */
+    protected synchronized void checkClosedOrFailed() throws JMSException {
+        checkClosed();
+        if (transportFailed.get()){
+            throw new ConnectionFailedException();
+        }
+    }
+    
+    /**
+     * simply throws an exception if the Connection is already closed
      * 
      * @throws JMSException
      */
@@ -1315,9 +1329,11 @@
             } else {
                 log.warn("Async exception with no exception listener: " + 
error, error);
             }
+            transportFailed(error);
         }
     }
 
+    
     public void onException(IOException error) {
         onAsyncException(error);
         ServiceSupport.dispose(this.transport);
@@ -1359,7 +1375,7 @@
      */
     public void deleteTempDestination(ActiveMQTempDestination destination) 
throws JMSException {
         
-        checkClosed();        
+        checkClosedOrFailed();        
         activeTempDestinations.remove(destination);
 
         DestinationInfo info = new DestinationInfo();
@@ -1394,7 +1410,7 @@
 
     public void destroyDestination(ActiveMQDestination destination) throws 
JMSException {
         
-        checkClosed();
+        checkClosedOrFailed();
         ensureConnectionInfoSent();
 
         DestinationInfo info = new DestinationInfo();
@@ -1447,7 +1463,7 @@
     }
     
     private InputStream doCreateInputStream(Destination dest, String 
messageSelector, boolean noLocal, String subName) throws JMSException {
-        checkClosed();
+        checkClosedOrFailed();
         ensureConnectionInfoSent();
         return new ActiveMQInputStream(this, createConsumerId(), 
ActiveMQDestination.transform(dest), messageSelector, noLocal, subName, 
prefetchPolicy.getInputStreamPrefetch());
     }
@@ -1458,7 +1474,7 @@
     }
 
     public OutputStream createOutputStream(Destination dest, Map 
streamProperties, int deliveryMode, int priority, long timeToLive) throws 
JMSException {
-        checkClosed();
+        checkClosedOrFailed();
         ensureConnectionInfoSent();
         return new ActiveMQOutputStream(this, createProducerId(), 
ActiveMQDestination.transform(dest), streamProperties, deliveryMode, priority, 
timeToLive);
     }
@@ -1484,7 +1500,7 @@
      * @since 1.1
      */
     public void unsubscribe(String name) throws JMSException {
-        checkClosed();
+        checkClosedOrFailed();
         RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
         rsi.setConnectionId(getConnectionInfo().getConnectionId());
         rsi.setSubcriptionName(name);
@@ -1500,7 +1516,7 @@
      *  - Does not allow you to send /w a transaction.
      */
     void send(ActiveMQDestination destination, ActiveMQMessage msg, MessageId 
messageId, int deliveryMode, int priority, long timeToLive, boolean async) 
throws JMSException {
-        checkClosed();
+        checkClosedOrFailed();
 
         if( destination.isTemporary() && isDeleted(destination) ) {
             throw new JMSException("Cannot publish to a deleted Destination: 
"+destination);
@@ -1561,6 +1577,16 @@
                 System.exit(0);
             }
         }
+    }
+    
+    protected void transportFailed(Throwable error){
+        transportFailed.set(true);
+        try{
+            cleanup();
+        }catch(JMSException e){
+           log.warn("Cleanup failed",e);
+        }
+        
     }
 
     public void setCopyMessageOnSend(boolean copyMessageOnSend) {

Modified: 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnection.java
URL: 
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnection.java?rev=375721&r1=375720&r2=375721&view=diff
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnection.java
 (original)
+++ 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnection.java
 Tue Feb  7 13:40:23 2006
@@ -73,7 +73,7 @@
     }
 
     public Session createSession(boolean transacted, int acknowledgeMode) 
throws JMSException {
-        checkClosed();
+        checkClosedOrFailed();
         ensureConnectionInfoSent();
         return new ActiveMQXASession(this, getNextSessionId(), 
Session.SESSION_TRANSACTED, asyncDispatch);
     }

Modified: 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ConnectionFailedException.java
URL: 
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ConnectionFailedException.java?rev=375721&r1=375720&r2=375721&view=diff
==============================================================================
--- 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ConnectionFailedException.java
 (original)
+++ 
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ConnectionFailedException.java
 Tue Feb  7 13:40:23 2006
@@ -35,6 +35,10 @@
         initCause(cause);
         setLinkedException(cause);
     }
+    
+    public ConnectionFailedException() {
+        super("The JMS connection has failed due to a Transport problem");
+    }
 
     static private String extractMessage(IOException cause) {
         String m = cause.getMessage();


Reply via email to