Author: rajdavies
Date: Tue Feb 7 13:40:23 2006
New Revision: 375721
URL: http://svn.apache.org/viewcvs?rev=375721&view=rev
Log:
receive() returns null on connection transport failure
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnection.java
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ConnectionFailedException.java
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java?rev=375721&r1=375720&r2=375721&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQConnection.java
Tue Feb 7 13:40:23 2006
@@ -126,6 +126,7 @@
private final AtomicBoolean started = new AtomicBoolean(false);
private final AtomicBoolean closing = new AtomicBoolean(false);
private final AtomicBoolean closed = new AtomicBoolean(false);
+ private final AtomicBoolean transportFailed = new AtomicBoolean(false);
private final CopyOnWriteArrayList sessions = new CopyOnWriteArrayList();
private final CopyOnWriteArrayList connectionConsumers = new
CopyOnWriteArrayList();
private final CopyOnWriteArrayList inputStreams = new
CopyOnWriteArrayList();
@@ -246,7 +247,7 @@
* @since 1.1
*/
public Session createSession(boolean transacted, int acknowledgeMode)
throws JMSException {
- checkClosed();
+ checkClosedOrFailed();
ensureConnectionInfoSent();
return new ActiveMQSession(this, getNextSessionId(), (transacted ?
Session.SESSION_TRANSACTED
: (acknowledgeMode == Session.SESSION_TRANSACTED ?
Session.AUTO_ACKNOWLEDGE : acknowledgeMode)), asyncDispatch);
@@ -273,7 +274,7 @@
* connection due to some internal error.
*/
public String getClientID() throws JMSException {
- checkClosed();
+ checkClosedOrFailed();
return this.info.getClientId();
}
@@ -319,7 +320,7 @@
* configured.
*/
public void setClientID(String newClientID) throws JMSException {
- checkClosed();
+ checkClosedOrFailed();
if (this.clientIDSet) {
throw new IllegalStateException("The clientID has already been
set");
@@ -344,7 +345,7 @@
* @see javax.jms.ConnectionMetaData
*/
public ConnectionMetaData getMetaData() throws JMSException {
- checkClosed();
+ checkClosedOrFailed();
return ActiveMQConnectionMetaData.INSTANCE;
}
@@ -362,7 +363,7 @@
* @see javax.jms.Connection#setExceptionListener(ExceptionListener)
*/
public ExceptionListener getExceptionListener() throws JMSException {
- checkClosed();
+ checkClosedOrFailed();
return this.exceptionListener;
}
@@ -391,7 +392,7 @@
* this connection.
*/
public void setExceptionListener(ExceptionListener listener) throws
JMSException {
- checkClosed();
+ checkClosedOrFailed();
this.exceptionListener = listener;
}
@@ -406,7 +407,7 @@
* @see javax.jms.Connection#stop()
*/
public void start() throws JMSException {
- checkClosed();
+ checkClosedOrFailed();
ensureConnectionInfoSent();
if (started.compareAndSet(false, true)) {
for (Iterator i = sessions.iterator(); i.hasNext();) {
@@ -456,7 +457,7 @@
* @see javax.jms.Connection#start()
*/
public void stop() throws JMSException {
- checkClosed();
+ checkClosedOrFailed();
if (started.compareAndSet(true, false)) {
for (Iterator i = sessions.iterator(); i.hasNext();) {
ActiveMQSession s = (ActiveMQSession) i.next();
@@ -647,7 +648,7 @@
public ConnectionConsumer createDurableConnectionConsumer(Topic topic,
String subscriptionName,
String messageSelector, ServerSessionPool sessionPool, int
maxMessages, boolean noLocal)
throws JMSException {
- checkClosed();
+ checkClosedOrFailed();
ensureConnectionInfoSent();
SessionId sessionId = new SessionId(info.getConnectionId(), -1);
ConsumerInfo info = new ConsumerInfo(new ConsumerId(sessionId,
consumerIdGenerator
@@ -945,7 +946,7 @@
public ConnectionConsumer createConnectionConsumer(Destination
destination, String messageSelector, ServerSessionPool sessionPool, int
maxMessages, boolean noLocal) throws JMSException {
- checkClosed();
+ checkClosedOrFailed();
ensureConnectionInfoSent();
ConsumerId consumerId = createConsumerId();
ConsumerInfo info = new ConsumerInfo(consumerId);
@@ -1092,6 +1093,19 @@
/**
* simply throws an exception if the Connection is already closed
+ * or the Transport has failed
+ *
+ * @throws JMSException
+ */
+ protected synchronized void checkClosedOrFailed() throws JMSException { // guard called at the top of the connection operations changed in this diff
+ checkClosed(); // closed state is checked first, so it is reported in preference to a failed transport
+ if (transportFailed.get()){ // flag is set to true by transportFailed(Throwable)
+ throw new ConnectionFailedException(); // no-arg variant: no underlying cause is attached here
+ }
+ }
+
+ /**
+ * simply throws an exception if the Connection is already closed
*
* @throws JMSException
*/
@@ -1315,9 +1329,11 @@
} else {
log.warn("Async exception with no exception listener: " +
error, error);
}
+ transportFailed(error);
}
}
+
public void onException(IOException error) {
onAsyncException(error);
ServiceSupport.dispose(this.transport);
@@ -1359,7 +1375,7 @@
*/
public void deleteTempDestination(ActiveMQTempDestination destination)
throws JMSException {
- checkClosed();
+ checkClosedOrFailed();
activeTempDestinations.remove(destination);
DestinationInfo info = new DestinationInfo();
@@ -1394,7 +1410,7 @@
public void destroyDestination(ActiveMQDestination destination) throws
JMSException {
- checkClosed();
+ checkClosedOrFailed();
ensureConnectionInfoSent();
DestinationInfo info = new DestinationInfo();
@@ -1447,7 +1463,7 @@
}
private InputStream doCreateInputStream(Destination dest, String
messageSelector, boolean noLocal, String subName) throws JMSException {
- checkClosed();
+ checkClosedOrFailed();
ensureConnectionInfoSent();
return new ActiveMQInputStream(this, createConsumerId(),
ActiveMQDestination.transform(dest), messageSelector, noLocal, subName,
prefetchPolicy.getInputStreamPrefetch());
}
@@ -1458,7 +1474,7 @@
}
public OutputStream createOutputStream(Destination dest, Map
streamProperties, int deliveryMode, int priority, long timeToLive) throws
JMSException {
- checkClosed();
+ checkClosedOrFailed();
ensureConnectionInfoSent();
return new ActiveMQOutputStream(this, createProducerId(),
ActiveMQDestination.transform(dest), streamProperties, deliveryMode, priority,
timeToLive);
}
@@ -1484,7 +1500,7 @@
* @since 1.1
*/
public void unsubscribe(String name) throws JMSException {
- checkClosed();
+ checkClosedOrFailed();
RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
rsi.setConnectionId(getConnectionInfo().getConnectionId());
rsi.setSubcriptionName(name);
@@ -1500,7 +1516,7 @@
* - Does not allow you to send /w a transaction.
*/
void send(ActiveMQDestination destination, ActiveMQMessage msg, MessageId
messageId, int deliveryMode, int priority, long timeToLive, boolean async)
throws JMSException {
- checkClosed();
+ checkClosedOrFailed();
if( destination.isTemporary() && isDeleted(destination) ) {
throw new JMSException("Cannot publish to a deleted Destination:
"+destination);
@@ -1561,6 +1577,16 @@
System.exit(0);
}
}
+ }
+
+ protected void transportFailed(Throwable error){ // NOTE(review): 'error' is not used in this body; confirm whether it should be logged or kept
+ transportFailed.set(true); // from here on checkClosedOrFailed() throws ConnectionFailedException
+ try{
+ cleanup(); // defined outside this diff; presumably releases connection-owned resources (TODO confirm)
+ }catch(JMSException e){
+ log.warn("Cleanup failed",e); // best effort: a cleanup failure is logged, not propagated
+ }
+
}
public void setCopyMessageOnSend(boolean copyMessageOnSend) {
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnection.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnection.java?rev=375721&r1=375720&r2=375721&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnection.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ActiveMQXAConnection.java
Tue Feb 7 13:40:23 2006
@@ -73,7 +73,7 @@
}
public Session createSession(boolean transacted, int acknowledgeMode)
throws JMSException {
- checkClosed();
+ checkClosedOrFailed();
ensureConnectionInfoSent();
return new ActiveMQXASession(this, getNextSessionId(),
Session.SESSION_TRANSACTED, asyncDispatch);
}
Modified:
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ConnectionFailedException.java
URL:
http://svn.apache.org/viewcvs/incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ConnectionFailedException.java?rev=375721&r1=375720&r2=375721&view=diff
==============================================================================
---
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ConnectionFailedException.java
(original)
+++
incubator/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/ConnectionFailedException.java
Tue Feb 7 13:40:23 2006
@@ -35,6 +35,10 @@
initCause(cause);
setLinkedException(cause);
}
+
+ public ConnectionFailedException() { // no-cause variant, thrown when only the transport-failed flag is known
+ super("The JMS connection has failed due to a Transport problem");
+ }
static private String extractMessage(IOException cause) {
String m = cause.getMessage();