Author: rgodfrey
Date: Wed May 15 11:28:43 2013
New Revision: 1482770
URL: http://svn.apache.org/r1482770
Log:
QPID-4830 : [JMS AMQP 1.0] Improve JMS client error handling
Modified:
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TemporaryTopicImpl.java
qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java
qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java
Modified:
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java?rev=1482770&r1=1482769&r2=1482770&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java Wed May 15 11:28:43 2013
@@ -34,6 +34,7 @@ import javax.jms.*;
import javax.jms.IllegalStateException;
import java.util.UUID;
import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
+import org.apache.qpid.amqp_1_0.type.transport.*;
public class MessageProducerImpl implements MessageProducer, QueueSender, TopicPublisher
{
@@ -87,6 +88,29 @@ public class MessageProducerImpl impleme
jmsEx.setLinkedException(e);
throw jmsEx;
}
+ _sender.setRemoteErrorListener(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ final ExceptionListener exceptionListener = _session.getConnection().getExceptionListener();
+
+ if(exceptionListener != null)
+ {
+ final org.apache.qpid.amqp_1_0.type.transport.Error receiverError = _sender.getError();
+ exceptionListener.onException(new JMSException(receiverError.getDescription(),
+ receiverError.getCondition().getValue().toString()));
+
+ }
+ }
+ catch (JMSException e)
+ {
+
+ }
+ }
+ });
}
}
Modified:
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java?rev=1482770&r1=1482769&r2=1482770&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java Wed May 15 11:28:43 2013
@@ -117,7 +117,7 @@ public class SessionImpl implements Sess
if(error != null)
{
exceptionListener.onException(new JMSException(error.getDescription(),
- error.getCondition().toString()));
+ error.getCondition().getValue().toString()));
}
else
{
Modified:
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TemporaryTopicImpl.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TemporaryTopicImpl.java?rev=1482770&r1=1482769&r2=1482770&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TemporaryTopicImpl.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/TemporaryTopicImpl.java Wed May 15 11:28:43 2013
@@ -34,6 +34,7 @@ public class TemporaryTopicImpl extends
private SessionImpl _session;
private final Set<MessageConsumer> _consumers = Collections.synchronizedSet(new HashSet<MessageConsumer>());
+ private boolean _deleted;
protected TemporaryTopicImpl(String address, Sender sender, SessionImpl session)
{
@@ -57,6 +58,7 @@ public class TemporaryTopicImpl extends
{
if(_consumers.isEmpty())
{
+ _deleted = true;
close();
}
else
@@ -105,6 +107,6 @@ public class TemporaryTopicImpl extends
public boolean isDeleted()
{
- return _sender == null;
+ return _deleted;
}
}
Modified:
qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java?rev=1482770&r1=1482769&r2=1482770&view=diff
==============================================================================
--- qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java (original)
+++ qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Receiver.java Wed May 15 11:28:43 2013
@@ -50,6 +50,7 @@ public class Receiver implements Deliver
private Map<Binary, SettledAction> _unsettledMap = new HashMap<Binary, SettledAction>();
private MessageArrivalListener _messageArrivalListener;
private org.apache.qpid.amqp_1_0.type.transport.Error _error;
+ private Runnable _remoteErrorTask;
public Receiver(final Session session,
final String linkName,
@@ -125,6 +126,10 @@ public class Receiver implements Deliver
public void remoteDetached(final LinkEndpoint endpoint, final Detach detach)
{
_error = detach.getError();
+ if(detach.getError()!=null)
+ {
+ remoteError();
+ }
super.remoteDetached(endpoint, detach);
}
});
@@ -171,6 +176,14 @@ public class Receiver implements Deliver
}
}
+ private void remoteError()
+ {
+ if(_remoteErrorTask != null)
+ {
+ _remoteErrorTask.run();
+ }
+ }
+
private void postPrefetchAction()
{
if(_messageArrivalListener != null)
@@ -595,4 +608,8 @@ public class Receiver implements Deliver
void messageArrived(Receiver receiver);
}
+ public void setRemoteErrorListener(Runnable listener)
+ {
+ _remoteErrorTask = listener;
+ }
}
Modified:
qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java?rev=1482770&r1=1482769&r2=1482770&view=diff
==============================================================================
---
qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java
(original)
+++
qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java
Wed May 15 11:28:43 2013
@@ -48,6 +48,7 @@ public class Sender implements DeliveryS
private Map<Binary, OutcomeAction> _outcomeActions = Collections.synchronizedMap(new HashMap<Binary, OutcomeAction>());
private boolean _closed;
private Error _error;
+ private Runnable _remoteErrorTask;
public Sender(final Session session, final String linkName, final String targetAddr, final String sourceAddr)
throws SenderCreationException, ConnectionClosedException
@@ -178,6 +179,10 @@ public class Sender implements DeliveryS
public void remoteDetached(final LinkEndpoint endpoint, final Detach detach)
{
_error = detach.getError();
+ if(_error != null)
+ {
+ remoteError();
+ }
super.remoteDetached(endpoint, detach);
}
});
@@ -398,6 +403,26 @@ public class Sender implements DeliveryS
return _session;
}
+
+ private void remoteError()
+ {
+ if(_remoteErrorTask != null)
+ {
+ _remoteErrorTask.run();
+ }
+ }
+
+
+ public void setRemoteErrorListener(Runnable listener)
+ {
+ _remoteErrorTask = listener;
+ }
+
+ public Error getError()
+ {
+ return _error;
+ }
+
public class SenderCreationException extends Exception
{
public SenderCreationException(Throwable e)
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]