Author: rgodfrey
Date: Sat May 11 13:07:11 2013
New Revision: 1481321
URL: http://svn.apache.org/r1481321
Log:
QPID-4830 : [JMS AMQP 1.0] Improve error handling in the JMS client
Added:
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/MessageRejectedException.java
(with props)
qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/LinkDetachedException.java
(with props)
Modified:
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java
qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Dump.java
qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Filesender.java
qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java
qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java
qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java
qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java
Added:
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/MessageRejectedException.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/MessageRejectedException.java?rev=1481321&view=auto
==============================================================================
---
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/MessageRejectedException.java
(added)
+++
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/MessageRejectedException.java
Sat May 11 13:07:11 2013
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.amqp_1_0.jms;
+
+import javax.jms.JMSException;
+
+/**
+ * Signals that a message handed to the JMS client was not accepted.
+ * <p>
+ * In this change it is raised by the message producer when synchronous
+ * publishing is enabled and the outcome reported for the delivery is
+ * anything other than accepted.
+ */
+public class MessageRejectedException extends JMSException
+{
+    // JMSException is Serializable (via Throwable); pin the stream version
+    // explicitly rather than relying on the compiler-derived default.
+    private static final long serialVersionUID = 1L;
+
+    /**
+     * @param s human-readable reason, passed through to JMSException
+     */
+    public MessageRejectedException(String s)
+    {
+        super(s);
+    }
+}
Propchange:
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/MessageRejectedException.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java?rev=1481321&r1=1481320&r2=1481321&view=diff
==============================================================================
---
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
(original)
+++
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
Sat May 11 13:07:11 2013
@@ -220,12 +220,14 @@ public class MessageConsumerImpl impleme
return receiveImpl(0L);
}
- private MessageImpl receiveImpl(long timeout) throws IllegalStateException
+ private MessageImpl receiveImpl(long timeout) throws JMSException
{
+
org.apache.qpid.amqp_1_0.client.Message msg;
boolean redelivery;
if(_replaymessages.isEmpty())
{
+ checkReceiverError();
msg = receive0(timeout);
redelivery = false;
}
@@ -242,8 +244,21 @@ public class MessageConsumerImpl impleme
return createJMSMessage(msg, redelivery);
}
+ void checkReceiverError() throws JMSException
+ {
+ final Error receiverError = _receiver.getError();
+ if(receiverError != null)
+ {
+ JMSException jmsException =
+ new JMSException(receiverError.getDescription(),
receiverError.getCondition().toString());
+
+ throw jmsException;
+ }
+ }
+
Message receive0(final long timeout)
{
+
Message message = _receiver.receive(timeout);
if(_session.getAckModeEnum() ==
Session.AcknowledgeMode.CLIENT_ACKNOWLEDGE)
{
Modified:
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java?rev=1481321&r1=1481320&r2=1481321&view=diff
==============================================================================
---
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java
(original)
+++
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java
Sat May 11 13:07:11 2013
@@ -19,17 +19,21 @@
package org.apache.qpid.amqp_1_0.jms.impl;
import org.apache.qpid.amqp_1_0.client.ConnectionClosedException;
+import org.apache.qpid.amqp_1_0.client.LinkDetachedException;
import org.apache.qpid.amqp_1_0.client.Sender;
import org.apache.qpid.amqp_1_0.jms.MessageProducer;
+import org.apache.qpid.amqp_1_0.jms.MessageRejectedException;
import org.apache.qpid.amqp_1_0.jms.QueueSender;
import org.apache.qpid.amqp_1_0.jms.TemporaryDestination;
import org.apache.qpid.amqp_1_0.jms.TopicPublisher;
import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.Outcome;
import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
import javax.jms.*;
import javax.jms.IllegalStateException;
import java.util.UUID;
+import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
public class MessageProducerImpl implements MessageProducer, QueueSender,
TopicPublisher
{
@@ -43,6 +47,8 @@ public class MessageProducerImpl impleme
private SessionImpl _session;
private Sender _sender;
private boolean _closed;
+ private boolean _syncPublish = Boolean.getBoolean("qpid.sync_publish");
+ private long _syncPublishTimeout =
Long.getLong("qpid.sync_publish_timeout", 30000l);
protected MessageProducerImpl(final Destination destination,
final SessionImpl session) throws JMSException
@@ -251,7 +257,28 @@ public class MessageProducerImpl impleme
final org.apache.qpid.amqp_1_0.client.Message clientMessage = new
org.apache.qpid.amqp_1_0.client.Message(msg.getSections());
- _sender.send(clientMessage, _session.getTxn());
+ DispositionAction action = null;
+
+ if(_syncPublish)
+ {
+ action = new DispositionAction(_sender);
+ }
+
+ try
+ {
+ _sender.send(clientMessage, _session.getTxn(), action);
+ }
+ catch (LinkDetachedException e)
+ {
+ JMSException jmsException = new
InvalidDestinationException("Sender has been closed");
+ jmsException.setLinkedException(e);
+ throw jmsException;
+ }
+
+ if(_syncPublish && !action.wasAccepted(_syncPublishTimeout +
System.currentTimeMillis()))
+ {
+ throw new MessageRejectedException("Message was rejected");
+ }
if(getDestination() != null)
{
@@ -377,4 +404,61 @@ public class MessageProducerImpl impleme
{
send(topic, message, deliveryMode, priority, ttl);
}
+
+ private static class DispositionAction implements Sender.OutcomeAction
+ {
+ private final Sender _sender;
+ private final Object _lock;
+ private Outcome _outcome;
+
+ public DispositionAction(Sender sender)
+ {
+ _sender = sender;
+ _lock = sender.getEndpoint().getLock();
+ }
+
+ @Override
+ public void onOutcome(Binary deliveryTag, Outcome outcome)
+ {
+ synchronized (_lock)
+ {
+ _outcome = outcome;
+ _lock.notifyAll();
+ }
+ }
+
+ public boolean wasAccepted(long timeout) throws JMSException
+ {
+ synchronized(_lock)
+ {
+ while(_outcome == null && !_sender.getEndpoint().isDetached())
+ {
+ try
+ {
+ _lock.wait(timeout - System.currentTimeMillis());
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ }
+ }
+ if(_outcome == null)
+ {
+
+ if(_sender.getEndpoint().isDetached())
+ {
+ throw new JMSException("Link was detached");
+ }
+ else
+ {
+ throw new JMSException("Timed out waiting for message
acceptance");
+ }
+ }
+ else
+ {
+ return _outcome instanceof Accepted;
+ }
+ }
+ }
+ }
}
Modified:
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java?rev=1481321&r1=1481320&r2=1481321&view=diff
==============================================================================
---
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
(original)
+++
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
Sat May 11 13:07:11 2013
@@ -21,10 +21,12 @@ package org.apache.qpid.amqp_1_0.jms.imp
import java.io.Serializable;
import java.util.ArrayList;
import java.util.Enumeration;
+import java.util.Iterator;
import java.util.List;
import java.util.UUID;
import javax.jms.BytesMessage;
import javax.jms.Destination;
+import javax.jms.ExceptionListener;
import javax.jms.IllegalStateException;
import javax.jms.InvalidDestinationException;
import javax.jms.JMSException;
@@ -52,9 +54,11 @@ import org.apache.qpid.amqp_1_0.jms.Temp
import org.apache.qpid.amqp_1_0.jms.TopicPublisher;
import org.apache.qpid.amqp_1_0.jms.TopicSession;
import org.apache.qpid.amqp_1_0.jms.TopicSubscriber;
+import org.apache.qpid.amqp_1_0.transport.SessionEventListener;
import org.apache.qpid.amqp_1_0.type.messaging.Source;
import org.apache.qpid.amqp_1_0.type.messaging.Target;
-import org.apache.qpid.amqp_1_0.type.transport.AmqpError;
+import org.apache.qpid.amqp_1_0.type.transport.*;
+import org.apache.qpid.amqp_1_0.type.transport.Error;
public class SessionImpl implements Session, QueueSession, TopicSession
{
@@ -90,6 +94,45 @@ public class SessionImpl implements Sess
jmsException.setLinkedException(e);
throw jmsException;
}
+ _session.getEndpoint().setSessionEventListener(new
SessionEventListener.DefaultSessionEventListener()
+ {
+ @Override
+ public void remoteEnd(End end)
+ {
+ if(!_closed)
+ {
+ try
+ {
+ close();
+ }
+ catch (JMSException e)
+ {
+ }
+ try
+ {
+ final Error error = end.getError();
+ final ExceptionListener exceptionListener =
_connection.getExceptionListener();
+ if(exceptionListener != null)
+ {
+ if(error != null)
+ {
+ exceptionListener.onException(new
JMSException(error.getDescription(),
+ error.getCondition().toString()));
+ }
+ else
+ {
+ exceptionListener.onException(new
JMSException("Session remotely closed"));
+ }
+ }
+ }
+ catch (JMSException e)
+ {
+
+ }
+
+ }
+ }
+ });
if(_acknowledgeMode == AcknowledgeMode.SESSION_TRANSACTED)
{
_txn = _session.createSessionLocalTransaction();
@@ -846,7 +889,28 @@ public class SessionImpl implements Sess
}
}
+ Iterator<MessageConsumerImpl> consumers =
_consumers.iterator();
+ while(consumers.hasNext())
+ {
+ MessageConsumerImpl consumer = consumers.next();
+ try
+ {
+ consumer.checkReceiverError();
+ }
+ catch (JMSException e)
+ {
+ consumers.remove();
+ try
+ {
+
_connection.getExceptionListener().onException(e);
+ consumer.close();
+ }
+ catch (JMSException e1)
+ {
+ }
+ }
+ }
}
}
Modified:
qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java?rev=1481321&r1=1481320&r2=1481321&view=diff
==============================================================================
---
qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java
(original)
+++
qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java
Sat May 11 13:07:11 2013
@@ -173,7 +173,14 @@ public class Demo extends Util
Section[] sections = { properties, appProperties, amqpValue};
final Message message1 = new Message(Arrays.asList(sections));
- s.send(message1);
+ try
+ {
+ s.send(message1);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
Map<Object, Sender> sendingLinks = new HashMap<Object, Sender>();
Map<Object, Receiver> receivingLinks = new HashMap<Object,
Receiver>();
@@ -295,7 +302,14 @@ public class Demo extends Util
m2propmap.put(VENDOR, vendor);
ApplicationProperties m2appProps = new
ApplicationProperties(m2propmap);
Message m2 = new Message(Arrays.asList(m2props,
m2appProps, new AmqpValue("AMQP-"+messageId)));
- sender.send(m2);
+ try
+ {
+ sender.send(m2);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
Map m3propmap = new HashMap();
m3propmap.put(OPCODE, LOG);
@@ -307,8 +321,14 @@ public class Demo extends Util
Message m3 = new Message(Arrays.asList(new
ApplicationProperties(m3propmap),
new
AmqpValue("AMQP-"+messageId)));
- s.send(m3);
-
+ try
+ {
+ s.send(m3);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
}
responseReceiver.acknowledge(m);
@@ -336,7 +356,14 @@ public class Demo extends Util
Message m3 = new Message(Arrays.asList(new
ApplicationProperties(m3propmap),
new
AmqpValue("AMQP-"+mp.getMessageId())));
- s.send(m3);
+ try
+ {
+ s.send(m3);
+ }
+ catch (Exception e)
+ {
+ throw new RuntimeException(e);
+ }
entry.getValue().acknowledge(m);
}
Modified:
qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Dump.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Dump.java?rev=1481321&r1=1481320&r2=1481321&view=diff
==============================================================================
---
qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Dump.java
(original)
+++
qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Dump.java
Sat May 11 13:07:11 2013
@@ -121,14 +121,7 @@ public class Dump extends Util
session.close();
conn.close();
- } catch (ConnectionException e)
- {
- e.printStackTrace(); //To change body of catch statement use File
| Settings | File Templates.
- }
- catch (Sender.SenderCreationException e)
- {
- e.printStackTrace(); //To change body of catch statement use File
| Settings | File Templates.
- } catch (Sender.SenderClosingException e)
+ } catch (Exception e)
{
e.printStackTrace(); //To change body of catch statement use File
| Settings | File Templates.
}
Modified:
qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Filesender.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Filesender.java?rev=1481321&r1=1481320&r2=1481321&view=diff
==============================================================================
---
qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Filesender.java
(original)
+++
qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Filesender.java
Sat May 11 13:07:11 2013
@@ -248,27 +248,10 @@ public class Filesender extends Util
session.close();
conn.close();
}
- catch (ConnectionException e)
+ catch (Exception e)
{
e.printStackTrace();
}
- catch (Sender.SenderCreationException e)
- {
- e.printStackTrace();
- } catch (FileNotFoundException e)
- {
- e.printStackTrace();
- } catch (IOException e)
- {
- e.printStackTrace(); //To change body of catch statement use File
| Settings | File Templates.
- } catch (NoSuchAlgorithmException e)
- {
- e.printStackTrace(); //To change body of catch statement use File
| Settings | File Templates.
- } catch (Sender.SenderClosingException e)
- {
- e.printStackTrace(); //To change body of catch statement use File
| Settings | File Templates.
- }
-
}
private Message createMessageFromFile(MessageDigest md5, String fileName,
File file) throws IOException
Modified:
qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java?rev=1481321&r1=1481320&r2=1481321&view=diff
==============================================================================
---
qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java
(original)
+++
qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java
Sat May 11 13:07:11 2013
@@ -216,23 +216,10 @@ public class Request extends Util
conn2.close();
}
}
- catch (ConnectionException e)
+ catch (Exception e)
{
e.printStackTrace(); //TODO.
}
- catch (Sender.SenderClosingException e)
- {
- e.printStackTrace(); //TODO.
- }
- catch (Sender.SenderCreationException e)
- {
- e.printStackTrace(); //TODO.
- }
- catch (AmqpErrorException e)
- {
- e.printStackTrace(); //TODO.
- }
-
}
protected boolean hasSingleLinkPerConnectionMode()
Modified:
qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java?rev=1481321&r1=1481320&r2=1481321&view=diff
==============================================================================
---
qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java
(original)
+++
qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java
Sat May 11 13:07:11 2013
@@ -274,21 +274,13 @@ public class Respond extends Util
_conn.close();
System.out.println("Received: " + receivedCount);
}
- catch (ConnectionException e)
- {
- e.printStackTrace(); //TODO.
- }
- catch (Sender.SenderClosingException e)
- {
- e.printStackTrace(); //TODO.
- }
- catch (Sender.SenderCreationException e)
+ catch (Exception e)
{
e.printStackTrace(); //TODO.
}
}
- private void respond(Message m) throws Sender.SenderCreationException,
ConnectionClosedException
+ private void respond(Message m) throws Sender.SenderCreationException,
ConnectionClosedException, LinkDetachedException
{
List<Section> sections = m.getPayload();
String replyTo = null;
Modified:
qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java?rev=1481321&r1=1481320&r2=1481321&view=diff
==============================================================================
---
qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java
(original)
+++
qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java
Sat May 11 13:07:11 2013
@@ -219,19 +219,10 @@ public class Send extends Util
session.close();
conn.close();
}
- catch (Sender.SenderClosingException e)
+ catch (Exception e)
{
e.printStackTrace(); //TODO.
}
- catch (ConnectionException e)
- {
- e.printStackTrace(); //TODO.
- }
- catch (Sender.SenderCreationException e)
- {
- e.printStackTrace(); //TODO.
- }
-
}
Added:
qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/LinkDetachedException.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/LinkDetachedException.java?rev=1481321&view=auto
==============================================================================
---
qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/LinkDetachedException.java
(added)
+++
qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/LinkDetachedException.java
Sat May 11 13:07:11 2013
@@ -0,0 +1,40 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.amqp_1_0.client;
+
+import org.apache.qpid.amqp_1_0.type.transport.Error;
+
+/**
+ * Thrown when a send is attempted on a link whose endpoint has been detached.
+ * Carries the error, if any, that was captured when the remote peer detached.
+ */
+public class LinkDetachedException extends Exception
+{
+    private final org.apache.qpid.amqp_1_0.type.transport.Error _remoteError;
+
+    /**
+     * @param remoteError the error taken from the peer's detach; may be null
+     *                    when no error was recorded
+     */
+    public LinkDetachedException(Error remoteError)
+    {
+        // Give the exception a message so logs and stack traces show why the
+        // link went away instead of a bare class name.
+        super(remoteError == null ? "Link detached" : "Link detached: " + remoteError);
+        _remoteError = remoteError;
+    }
+
+    /**
+     * @return the error supplied at construction, or null if there was none
+     */
+    public org.apache.qpid.amqp_1_0.type.transport.Error getRemoteError()
+    {
+        return _remoteError;
+    }
+
+}
Propchange:
qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/LinkDetachedException.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified:
qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java
URL:
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java?rev=1481321&r1=1481320&r2=1481321&view=diff
==============================================================================
---
qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java
(original)
+++
qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java
Sat May 11 13:07:11 2013
@@ -22,7 +22,9 @@ package org.apache.qpid.amqp_1_0.client;
import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler;
+import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
+import org.apache.qpid.amqp_1_0.transport.SendingLinkListener;
import org.apache.qpid.amqp_1_0.type.*;
import org.apache.qpid.amqp_1_0.type.Source;
import org.apache.qpid.amqp_1_0.type.Target;
@@ -35,6 +37,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import org.apache.qpid.amqp_1_0.type.transport.Error;
public class Sender implements DeliveryStateHandler
{
@@ -44,6 +47,7 @@ public class Sender implements DeliveryS
private int _windowSize;
private Map<Binary, OutcomeAction> _outcomeActions =
Collections.synchronizedMap(new HashMap<Binary, OutcomeAction>());
private boolean _closed;
+ private Error _error;
public Sender(final Session session, final String linkName, final String
targetAddr, final String sourceAddr)
throws SenderCreationException, ConnectionClosedException
@@ -166,6 +170,17 @@ public class Sender implements DeliveryS
throw new SenderCreationException("Peer did not create remote
endpoint for link, target: " + target.getAddress());
};
}
+
+ _endpoint.setLinkEventListener(new
SendingLinkListener.DefaultLinkEventListener()
+ {
+
+ @Override
+ public void remoteDetached(final LinkEndpoint endpoint, final
Detach detach)
+ {
+ _error = detach.getError();
+ super.remoteDetached(endpoint, detach);
+ }
+ });
}
public Source getSource()
@@ -178,22 +193,22 @@ public class Sender implements DeliveryS
return _endpoint.getTarget();
}
- public void send(Message message)
+ public void send(Message message) throws LinkDetachedException
{
send(message, null, null);
}
- public void send(Message message, final OutcomeAction action)
+ public void send(Message message, final OutcomeAction action) throws
LinkDetachedException
{
send(message, null, action);
}
- public void send(Message message, final Transaction txn)
+ public void send(Message message, final Transaction txn) throws
LinkDetachedException
{
send(message, txn, null);
}
- public void send(Message message, final Transaction txn, OutcomeAction
action)
+ public void send(Message message, final Transaction txn, OutcomeAction
action) throws LinkDetachedException
{
List<Section> sections = message.getPayload();
@@ -245,7 +260,7 @@ public class Sender implements DeliveryS
final Object lock = _endpoint.getLock();
synchronized(lock)
{
- while(!_endpoint.hasCreditToSend())
+ while(!_endpoint.hasCreditToSend() && !_endpoint.isDetached())
{
try
{
@@ -256,6 +271,10 @@ public class Sender implements DeliveryS
e.printStackTrace(); //To change body of catch statement
use File | Settings | File Templates.
}
}
+ if(_endpoint.isDetached())
+ {
+ throw new LinkDetachedException(_error);
+ }
if(action != null)
{
_outcomeActions.put(message.getDeliveryTag(), action);
@@ -352,6 +371,21 @@ public class Sender implements DeliveryS
_endpoint.updateDisposition(deliveryTag, state, true);
}
}
+ else if(state instanceof TransactionalState)
+ {
+ OutcomeAction action;
+
+ if((action = _outcomeActions.remove(deliveryTag)) != null)
+ {
+ action.onOutcome(deliveryTag, ((TransactionalState)
state).getOutcome());
+ }
+
+ }
+ }
+
+ public SendingLinkEndpoint getEndpoint()
+ {
+ return _endpoint;
}
public Map<Binary, DeliveryState> getRemoteUnsettled()
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]