Author: rgodfrey
Date: Sat May 11 13:07:11 2013
New Revision: 1481321

URL: http://svn.apache.org/r1481321
Log:
QPID-4830 : [JMS AMQP 1.0] Improve error handling in the JMS client

Added:
    
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/MessageRejectedException.java
   (with props)
    
qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/LinkDetachedException.java
   (with props)
Modified:
    
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
    
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java
    
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
    
qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java
    
qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Dump.java
    
qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Filesender.java
    
qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java
    
qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java
    
qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java
    
qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java

Added: 
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/MessageRejectedException.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/MessageRejectedException.java?rev=1481321&view=auto
==============================================================================
--- 
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/MessageRejectedException.java
 (added)
+++ 
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/MessageRejectedException.java
 Sat May 11 13:07:11 2013
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.amqp_1_0.jms;
+
+import javax.jms.JMSException;
+
+/**
+ * JMS exception signalling that a published message was not accepted.
+ * The JMS producer throws it when a synchronous publish completes with an
+ * outcome other than "accepted".
+ */
+public class MessageRejectedException extends JMSException
+{
+    /**
+     * @param reason human-readable explanation, handed unchanged to {@link JMSException}
+     */
+    public MessageRejectedException(final String reason)
+    {
+        super(reason);
+    }
+}

Propchange: 
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/MessageRejectedException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: 
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java?rev=1481321&r1=1481320&r2=1481321&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
 (original)
+++ 
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageConsumerImpl.java
 Sat May 11 13:07:11 2013
@@ -220,12 +220,14 @@ public class MessageConsumerImpl impleme
         return receiveImpl(0L);
     }
 
-    private MessageImpl receiveImpl(long timeout) throws IllegalStateException
+    private MessageImpl receiveImpl(long timeout) throws JMSException
     {
+
         org.apache.qpid.amqp_1_0.client.Message msg;
         boolean redelivery;
         if(_replaymessages.isEmpty())
         {
+            checkReceiverError();
             msg = receive0(timeout);
             redelivery = false;
         }
@@ -242,8 +244,21 @@ public class MessageConsumerImpl impleme
         return createJMSMessage(msg, redelivery);
     }
 
+    void checkReceiverError() throws JMSException
+    {
+        final Error receiverError = _receiver.getError();
+        if(receiverError != null)
+        {
+            JMSException jmsException =
+                    new JMSException(receiverError.getDescription(), 
receiverError.getCondition().toString());
+
+            throw jmsException;
+        }
+    }
+
     Message receive0(final long timeout)
     {
+
         Message message = _receiver.receive(timeout);
         if(_session.getAckModeEnum() == 
Session.AcknowledgeMode.CLIENT_ACKNOWLEDGE)
         {

Modified: 
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java?rev=1481321&r1=1481320&r2=1481321&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java
 (original)
+++ 
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/MessageProducerImpl.java
 Sat May 11 13:07:11 2013
@@ -19,17 +19,21 @@
 package org.apache.qpid.amqp_1_0.jms.impl;
 
 import org.apache.qpid.amqp_1_0.client.ConnectionClosedException;
+import org.apache.qpid.amqp_1_0.client.LinkDetachedException;
 import org.apache.qpid.amqp_1_0.client.Sender;
 import org.apache.qpid.amqp_1_0.jms.MessageProducer;
+import org.apache.qpid.amqp_1_0.jms.MessageRejectedException;
 import org.apache.qpid.amqp_1_0.jms.QueueSender;
 import org.apache.qpid.amqp_1_0.jms.TemporaryDestination;
 import org.apache.qpid.amqp_1_0.jms.TopicPublisher;
 import org.apache.qpid.amqp_1_0.type.Binary;
+import org.apache.qpid.amqp_1_0.type.Outcome;
 import org.apache.qpid.amqp_1_0.type.UnsignedInteger;
 
 import javax.jms.*;
 import javax.jms.IllegalStateException;
 import java.util.UUID;
+import org.apache.qpid.amqp_1_0.type.messaging.Accepted;
 
 public class MessageProducerImpl implements MessageProducer, QueueSender, 
TopicPublisher
 {
@@ -43,6 +47,8 @@ public class MessageProducerImpl impleme
     private SessionImpl _session;
     private Sender _sender;
     private boolean _closed;
+    private boolean _syncPublish = Boolean.getBoolean("qpid.sync_publish");
+    private long _syncPublishTimeout = 
Long.getLong("qpid.sync_publish_timeout", 30000l);
 
     protected MessageProducerImpl(final Destination destination,
                                final SessionImpl session) throws JMSException
@@ -251,7 +257,28 @@ public class MessageProducerImpl impleme
 
         final org.apache.qpid.amqp_1_0.client.Message clientMessage = new 
org.apache.qpid.amqp_1_0.client.Message(msg.getSections());
 
-        _sender.send(clientMessage, _session.getTxn());
+        DispositionAction action = null;
+
+        if(_syncPublish)
+        {
+            action = new DispositionAction(_sender);
+        }
+
+        try
+        {
+            _sender.send(clientMessage, _session.getTxn(), action);
+        }
+        catch (LinkDetachedException e)
+        {
+            JMSException jmsException = new 
InvalidDestinationException("Sender has been closed");
+            jmsException.setLinkedException(e);
+            throw jmsException;
+        }
+
+        if(_syncPublish && !action.wasAccepted(_syncPublishTimeout + 
System.currentTimeMillis()))
+        {
+            throw new MessageRejectedException("Message was rejected");
+        }
 
         if(getDestination() != null)
         {
@@ -377,4 +404,61 @@ public class MessageProducerImpl impleme
     {
         send(topic, message, deliveryMode, priority, ttl);
     }
+
+    /**
+     * Outcome callback used for synchronous publishing: it records the outcome
+     * reported for a delivery and lets the publishing thread block until that
+     * outcome arrives. Waiting and notification both use the lock object obtained
+     * from the sender's endpoint.
+     */
+    private static class DispositionAction implements Sender.OutcomeAction
+    {
+        private final Sender _sender;
+        private final Object _lock;
+        private Outcome _outcome;
+
+        public DispositionAction(Sender sender)
+        {
+            _sender = sender;
+            _lock = sender.getEndpoint().getLock();
+        }
+
+        /**
+         * Stores the reported outcome and wakes every thread waiting on the lock.
+         */
+        @Override
+        public void onOutcome(Binary deliveryTag, Outcome outcome)
+        {
+            synchronized (_lock)
+            {
+                _outcome = outcome;
+                _lock.notifyAll();
+            }
+        }
+
+        /**
+         * Blocks until an outcome has been recorded, the link is detached, or the
+         * deadline passes.
+         *
+         * @param timeout absolute deadline on the {@link System#currentTimeMillis()}
+         *                scale (the caller passes "now + timeout"), not a duration
+         * @return {@code true} if the recorded outcome is {@code Accepted},
+         *         {@code false} for any other recorded outcome
+         * @throws JMSException if no outcome was recorded because the link was
+         *                      detached, the deadline passed, or the waiting thread
+         *                      was interrupted
+         */
+        public boolean wasAccepted(long timeout) throws JMSException
+        {
+            synchronized(_lock)
+            {
+                while(_outcome == null && !_sender.getEndpoint().isDetached())
+                {
+                    // Object.wait(0) waits forever and a negative argument throws
+                    // IllegalArgumentException, so stop once the deadline has passed.
+                    final long remaining = timeout - System.currentTimeMillis();
+                    if(remaining <= 0L)
+                    {
+                        break;
+                    }
+                    try
+                    {
+                        _lock.wait(remaining);
+                    }
+                    catch (InterruptedException e)
+                    {
+                        // Restore the flag for the caller and stop waiting. Looping
+                        // with the flag set would make every wait() throw at once
+                        // and spin while holding the lock.
+                        Thread.currentThread().interrupt();
+                        JMSException jmsException =
+                                new JMSException("Interrupted while waiting for message acceptance");
+                        jmsException.setLinkedException(e);
+                        throw jmsException;
+                    }
+                }
+
+                if(_outcome != null)
+                {
+                    return _outcome instanceof Accepted;
+                }
+                if(_sender.getEndpoint().isDetached())
+                {
+                    throw new JMSException("Link was detached");
+                }
+                throw new JMSException("Timed out waiting for message acceptance");
+            }
+        }
+    }
 }

Modified: 
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java?rev=1481321&r1=1481320&r2=1481321&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
 (original)
+++ 
qpid/trunk/qpid/java/amqp-1-0-client-jms/src/main/java/org/apache/qpid/amqp_1_0/jms/impl/SessionImpl.java
 Sat May 11 13:07:11 2013
@@ -21,10 +21,12 @@ package org.apache.qpid.amqp_1_0.jms.imp
 import java.io.Serializable;
 import java.util.ArrayList;
 import java.util.Enumeration;
+import java.util.Iterator;
 import java.util.List;
 import java.util.UUID;
 import javax.jms.BytesMessage;
 import javax.jms.Destination;
+import javax.jms.ExceptionListener;
 import javax.jms.IllegalStateException;
 import javax.jms.InvalidDestinationException;
 import javax.jms.JMSException;
@@ -52,9 +54,11 @@ import org.apache.qpid.amqp_1_0.jms.Temp
 import org.apache.qpid.amqp_1_0.jms.TopicPublisher;
 import org.apache.qpid.amqp_1_0.jms.TopicSession;
 import org.apache.qpid.amqp_1_0.jms.TopicSubscriber;
+import org.apache.qpid.amqp_1_0.transport.SessionEventListener;
 import org.apache.qpid.amqp_1_0.type.messaging.Source;
 import org.apache.qpid.amqp_1_0.type.messaging.Target;
-import org.apache.qpid.amqp_1_0.type.transport.AmqpError;
+import org.apache.qpid.amqp_1_0.type.transport.*;
+import org.apache.qpid.amqp_1_0.type.transport.Error;
 
 public class SessionImpl implements Session, QueueSession, TopicSession
 {
@@ -90,6 +94,45 @@ public class SessionImpl implements Sess
             jmsException.setLinkedException(e);
             throw jmsException;
         }
+        // React to the peer ending the session: close this session locally, then
+        // report the end to the connection's exception listener, if one is set.
+        _session.getEndpoint().setSessionEventListener(new 
SessionEventListener.DefaultSessionEventListener()
+        {
+            @Override
+            public void remoteEnd(End end)
+            {
+                if(!_closed)
+                {
+                    try
+                    {
+                        close();
+                    }
+                    catch (JMSException e)
+                    {
+                        // A JMSException from close() is ignored; the listener
+                        // notification below is still attempted.
+                    }
+                    try
+                    {
+                        final Error error = end.getError();
+                        final ExceptionListener exceptionListener = 
_connection.getExceptionListener();
+                        if(exceptionListener != null)
+                        {
+                            if(error != null)
+                            {
+                                // Reason = error description, error code = condition.
+                                exceptionListener.onException(new 
JMSException(error.getDescription(),
+                                        error.getCondition().toString()));
+                            }
+                            else
+                            {
+                                exceptionListener.onException(new 
JMSException("Session remotely closed"));
+                            }
+                        }
+                    }
+                    catch (JMSException e)
+                    {
+                        // A JMSException raised in this block (presumably by the
+                        // listener lookup) is ignored.
+
+                    }
+
+                }
+            }
+        });
         if(_acknowledgeMode == AcknowledgeMode.SESSION_TRANSACTED)
         {
             _txn = _session.createSessionLocalTransaction();
@@ -846,7 +889,28 @@ public class SessionImpl implements Sess
                         }
 
                     }
+                    Iterator<MessageConsumerImpl> consumers = 
_consumers.iterator();
+                    while(consumers.hasNext())
+                    {
+                        MessageConsumerImpl consumer = consumers.next();
+                        try
+                        {
+                            consumer.checkReceiverError();
+                        }
+                        catch (JMSException e)
+                        {
 
+                            consumers.remove();
+                            try
+                            {
+                                
_connection.getExceptionListener().onException(e);
+                                consumer.close();
+                            }
+                            catch (JMSException e1)
+                            {
+                            }
+                        }
+                    }
 
                 }
             }

Modified: 
qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java?rev=1481321&r1=1481320&r2=1481321&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java
 (original)
+++ 
qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Demo.java
 Sat May 11 13:07:11 2013
@@ -173,7 +173,14 @@ public class Demo extends Util
             Section[] sections = { properties, appProperties, amqpValue};
             final Message message1 = new Message(Arrays.asList(sections));
 
-            s.send(message1);
+            try
+            {
+                s.send(message1);
+            }
+            catch (Exception e)
+            {
+                throw new RuntimeException(e);
+            }
 
             Map<Object, Sender> sendingLinks = new HashMap<Object, Sender>();
             Map<Object, Receiver> receivingLinks = new HashMap<Object, 
Receiver>();
@@ -295,7 +302,14 @@ public class Demo extends Util
                         m2propmap.put(VENDOR, vendor);
                         ApplicationProperties m2appProps = new 
ApplicationProperties(m2propmap);
                         Message m2 = new Message(Arrays.asList(m2props, 
m2appProps, new AmqpValue("AMQP-"+messageId)));
-                        sender.send(m2);
+                        try
+                        {
+                            sender.send(m2);
+                        }
+                        catch (Exception e)
+                        {
+                            throw new RuntimeException(e);
+                        }
 
                         Map m3propmap = new HashMap();
                         m3propmap.put(OPCODE, LOG);
@@ -307,8 +321,14 @@ public class Demo extends Util
 
                         Message m3 = new Message(Arrays.asList(new 
ApplicationProperties(m3propmap),
                                                                new 
AmqpValue("AMQP-"+messageId)));
-                        s.send(m3);
-
+                        try
+                        {
+                            s.send(m3);
+                        }
+                        catch (Exception e)
+                        {
+                            throw new RuntimeException(e);
+                        }
                     }
 
                     responseReceiver.acknowledge(m);
@@ -336,7 +356,14 @@ public class Demo extends Util
 
                             Message m3 = new Message(Arrays.asList(new 
ApplicationProperties(m3propmap),
                                                                    new 
AmqpValue("AMQP-"+mp.getMessageId())));
-                            s.send(m3);
+                            try
+                            {
+                                s.send(m3);
+                            }
+                            catch (Exception e)
+                            {
+                                throw new RuntimeException(e);
+                            }
 
                             entry.getValue().acknowledge(m);
                         }

Modified: 
qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Dump.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Dump.java?rev=1481321&r1=1481320&r2=1481321&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Dump.java
 (original)
+++ 
qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Dump.java
 Sat May 11 13:07:11 2013
@@ -121,14 +121,7 @@ public class Dump extends Util
             session.close();
             conn.close();
 
-        } catch (ConnectionException e)
-        {
-            e.printStackTrace();  //To change body of catch statement use File 
| Settings | File Templates.
-        }
-        catch (Sender.SenderCreationException e)
-        {
-            e.printStackTrace();  //To change body of catch statement use File 
| Settings | File Templates.
-        } catch (Sender.SenderClosingException e)
+        } catch (Exception e)
         {
             e.printStackTrace();  //To change body of catch statement use File 
| Settings | File Templates.
         }

Modified: 
qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Filesender.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Filesender.java?rev=1481321&r1=1481320&r2=1481321&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Filesender.java
 (original)
+++ 
qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Filesender.java
 Sat May 11 13:07:11 2013
@@ -248,27 +248,10 @@ public class Filesender extends Util
             session.close();
             conn.close();
         }
-        catch (ConnectionException e)
+        catch (Exception e)
         {
             e.printStackTrace();
         }
-        catch (Sender.SenderCreationException e)
-        {
-            e.printStackTrace();
-        } catch (FileNotFoundException e)
-        {
-            e.printStackTrace();
-        } catch (IOException e)
-        {
-            e.printStackTrace();  //To change body of catch statement use File 
| Settings | File Templates.
-        } catch (NoSuchAlgorithmException e)
-        {
-            e.printStackTrace();  //To change body of catch statement use File 
| Settings | File Templates.
-        } catch (Sender.SenderClosingException e)
-        {
-            e.printStackTrace();  //To change body of catch statement use File 
| Settings | File Templates.
-        }
-
     }
 
     private Message createMessageFromFile(MessageDigest md5, String fileName, 
File file) throws IOException

Modified: 
qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java?rev=1481321&r1=1481320&r2=1481321&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java
 (original)
+++ 
qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Request.java
 Sat May 11 13:07:11 2013
@@ -216,23 +216,10 @@ public class Request extends Util
                 conn2.close();
             }
         }
-        catch (ConnectionException e)
+        catch (Exception e)
         {
             e.printStackTrace();  //TODO.
         }
-        catch (Sender.SenderClosingException e)
-        {
-            e.printStackTrace();  //TODO.
-        }
-        catch (Sender.SenderCreationException e)
-        {
-            e.printStackTrace();  //TODO.
-        }
-        catch (AmqpErrorException e)
-        {
-            e.printStackTrace();  //TODO.
-        }
-
     }
 
     protected boolean hasSingleLinkPerConnectionMode()

Modified: 
qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java?rev=1481321&r1=1481320&r2=1481321&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java
 (original)
+++ 
qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Respond.java
 Sat May 11 13:07:11 2013
@@ -274,21 +274,13 @@ public class Respond extends Util
             _conn.close();
             System.out.println("Received: " + receivedCount);
         }
-        catch (ConnectionException e)
-        {
-            e.printStackTrace();  //TODO.
-        }
-        catch (Sender.SenderClosingException e)
-        {
-            e.printStackTrace();  //TODO.
-        }
-        catch (Sender.SenderCreationException e)
+        catch (Exception e)
         {
             e.printStackTrace();  //TODO.
         }
     }
 
-    private void respond(Message m) throws Sender.SenderCreationException, 
ConnectionClosedException
+    private void respond(Message m) throws Sender.SenderCreationException, 
ConnectionClosedException, LinkDetachedException
     {
         List<Section> sections = m.getPayload();
         String replyTo = null;

Modified: 
qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java?rev=1481321&r1=1481320&r2=1481321&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java
 (original)
+++ 
qpid/trunk/qpid/java/amqp-1-0-client/example/src/main/java/org/apache/qpid/amqp_1_0/client/Send.java
 Sat May 11 13:07:11 2013
@@ -219,19 +219,10 @@ public class Send extends Util
             session.close();
             conn.close();
         }
-        catch (Sender.SenderClosingException e)
+        catch (Exception e)
         {
             e.printStackTrace();  //TODO.
         }
-        catch (ConnectionException e)
-        {
-            e.printStackTrace();  //TODO.
-        }
-        catch (Sender.SenderCreationException e)
-        {
-            e.printStackTrace();  //TODO.
-        }
-
 
     }
 

Added: 
qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/LinkDetachedException.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/LinkDetachedException.java?rev=1481321&view=auto
==============================================================================
--- 
qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/LinkDetachedException.java
 (added)
+++ 
qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/LinkDetachedException.java
 Sat May 11 13:07:11 2013
@@ -0,0 +1,40 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.amqp_1_0.client;
+
+import org.apache.qpid.amqp_1_0.type.transport.Error;
+
+public class LinkDetachedException extends Exception
+{
+    private final org.apache.qpid.amqp_1_0.type.transport.Error _remoteError;
+
+    public LinkDetachedException(Error remoteError)
+    {
+        super();
+        _remoteError = remoteError;
+    }
+
+    public org.apache.qpid.amqp_1_0.type.transport.Error getRemoteError()
+    {
+        return _remoteError;
+    }
+
+}

Propchange: 
qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/LinkDetachedException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Modified: 
qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java
URL: 
http://svn.apache.org/viewvc/qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java?rev=1481321&r1=1481320&r2=1481321&view=diff
==============================================================================
--- 
qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java
 (original)
+++ 
qpid/trunk/qpid/java/amqp-1-0-client/src/main/java/org/apache/qpid/amqp_1_0/client/Sender.java
 Sat May 11 13:07:11 2013
@@ -22,7 +22,9 @@ package org.apache.qpid.amqp_1_0.client;
 
 import org.apache.qpid.amqp_1_0.messaging.SectionEncoder;
 import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler;
+import org.apache.qpid.amqp_1_0.transport.LinkEndpoint;
 import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint;
+import org.apache.qpid.amqp_1_0.transport.SendingLinkListener;
 import org.apache.qpid.amqp_1_0.type.*;
 import org.apache.qpid.amqp_1_0.type.Source;
 import org.apache.qpid.amqp_1_0.type.Target;
@@ -35,6 +37,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import org.apache.qpid.amqp_1_0.type.transport.Error;
 
 public class Sender implements DeliveryStateHandler
 {
@@ -44,6 +47,7 @@ public class Sender implements DeliveryS
     private int _windowSize;
     private Map<Binary, OutcomeAction> _outcomeActions = 
Collections.synchronizedMap(new HashMap<Binary, OutcomeAction>());
     private boolean _closed;
+    private Error _error;
 
     public Sender(final Session session, final String linkName, final String 
targetAddr, final String sourceAddr)
             throws SenderCreationException, ConnectionClosedException
@@ -166,6 +170,17 @@ public class Sender implements DeliveryS
                 throw new SenderCreationException("Peer did not create remote 
endpoint for link, target: " + target.getAddress());
             };
         }
+
+        // Remember the error carried by the peer's detach so that send() can
+        // attach it to the LinkDetachedException it throws for a detached link.
+        _endpoint.setLinkEventListener(new 
SendingLinkListener.DefaultLinkEventListener()
+        {
+
+            @Override
+            public void remoteDetached(final LinkEndpoint endpoint, final 
Detach detach)
+            {
+                // NOTE(review): _error is a plain field; confirm that this callback
+                // and the sending thread are synchronized when accessing it.
+                _error = detach.getError();
+                super.remoteDetached(endpoint, detach);
+            }
+        });
     }
 
     public Source getSource()
@@ -178,22 +193,22 @@ public class Sender implements DeliveryS
         return _endpoint.getTarget();
     }
 
-    public void send(Message message)
+    public void send(Message message) throws LinkDetachedException
     {
         send(message, null, null);
     }
 
-    public void send(Message message, final OutcomeAction action)
+    public void send(Message message, final OutcomeAction action) throws 
LinkDetachedException
     {
         send(message, null, action);
     }
 
-    public void send(Message message, final Transaction txn)
+    public void send(Message message, final Transaction txn) throws 
LinkDetachedException
     {
         send(message, txn, null);
     }
 
-    public void send(Message message, final Transaction txn, OutcomeAction 
action)
+    public void send(Message message, final Transaction txn, OutcomeAction 
action) throws LinkDetachedException
     {
 
         List<Section> sections = message.getPayload();
@@ -245,7 +260,7 @@ public class Sender implements DeliveryS
         final Object lock = _endpoint.getLock();
         synchronized(lock)
         {
-            while(!_endpoint.hasCreditToSend())
+            while(!_endpoint.hasCreditToSend() && !_endpoint.isDetached())
             {
                 try
                 {
@@ -256,6 +271,10 @@ public class Sender implements DeliveryS
                     e.printStackTrace();  //To change body of catch statement 
use File | Settings | File Templates.
                 }
             }
+            if(_endpoint.isDetached())
+            {
+                throw new LinkDetachedException(_error);
+            }
             if(action != null)
             {
                 _outcomeActions.put(message.getDeliveryTag(), action);
@@ -352,6 +371,21 @@ public class Sender implements DeliveryS
                 _endpoint.updateDisposition(deliveryTag, state, true);
             }
         }
+        else if(state instanceof TransactionalState)
+        {
+            OutcomeAction action;
+
+            if((action = _outcomeActions.remove(deliveryTag)) != null)
+            {
+                action.onOutcome(deliveryTag, ((TransactionalState) 
state).getOutcome());
+            }
+
+        }
+    }
+
+    /**
+     * @return the sending link endpoint held in this sender's {@code _endpoint} field
+     */
+    public SendingLinkEndpoint getEndpoint()
+    {
+        return _endpoint;
+    }
 
     public Map<Binary, DeliveryState> getRemoteUnsettled()



---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to