Repository: qpid-jms
Updated Branches:
  refs/heads/master 45221b2ec -> dfa2b6739


QPIDJMS-195: Ensure any errors (any Throwable, not only Exception) also trip 
the relevant request failure. Ensure the executor shutdown is attempted during 
close.


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/dfa2b673
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/dfa2b673
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/dfa2b673

Branch: refs/heads/master
Commit: dfa2b67394b31712ff58c0fcd3725b1c13094b64
Parents: 45221b2
Author: Robert Gemmell <[email protected]>
Authored: Wed Aug 3 18:28:43 2016 +0100
Committer: Robert Gemmell <[email protected]>
Committed: Wed Aug 3 18:28:43 2016 +0100

----------------------------------------------------------------------
 .../qpid/jms/provider/amqp/AmqpProvider.java    | 73 +++++++++---------
 .../jms/provider/amqp/AmqpProviderTest.java     | 78 ++++++++++++++++++++
 2 files changed, 117 insertions(+), 34 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/dfa2b673/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
index b2ece7f..3bfe099 100644
--- 
a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
+++ 
b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java
@@ -225,15 +225,17 @@ public class AmqpProvider implements Provider, 
TransportListener , AmqpResourceP
             } catch (IOException e) {
                 LOG.warn("Error caught while closing Provider: {}", 
e.getMessage() != null ? e.getMessage() : "<Unknown Error>");
             } finally {
-                if (transport != null) {
-                    try {
-                        transport.close();
-                    } catch (Exception e) {
-                        LOG.debug("Caught exception while closing down 
Transport: {}", e.getMessage());
+                try {
+                    if (transport != null) {
+                        try {
+                            transport.close();
+                        } catch (Exception e) {
+                            LOG.debug("Caught exception while closing down 
Transport: {}", e.getMessage());
+                        }
                     }
+                } finally {
+                    serializer.shutdown();
                 }
-
-                serializer.shutdown();
             }
         }
     }
@@ -338,8 +340,8 @@ public class AmqpProvider implements Provider, 
TransportListener , AmqpResourceP
                     });
 
                     pumpToProtonTransport(request);
-                } catch (Exception error) {
-                    request.onFailure(error);
+                } catch (Throwable t) {
+                    request.onFailure(t);
                 }
             }
         });
@@ -365,8 +367,8 @@ public class AmqpProvider implements Provider, 
TransportListener , AmqpResourceP
                     });
 
                     pumpToProtonTransport(request);
-                } catch (Exception error) {
-                    request.onFailure(error);
+                } catch (Throwable t) {
+                    request.onFailure(t);
                 }
             }
         });
@@ -392,8 +394,8 @@ public class AmqpProvider implements Provider, 
TransportListener , AmqpResourceP
                     });
 
                     pumpToProtonTransport(request);
-                } catch (Exception error) {
-                    request.onFailure(error);
+                } catch (Throwable t) {
+                    request.onFailure(t);
                 }
             }
         });
@@ -448,8 +450,8 @@ public class AmqpProvider implements Provider, 
TransportListener , AmqpResourceP
                     });
 
                     pumpToProtonTransport(request);
-                } catch (Exception error) {
-                    request.onFailure(error);
+                } catch (Throwable t) {
+                    request.onFailure(t);
                 }
             }
         });
@@ -480,8 +482,8 @@ public class AmqpProvider implements Provider, 
TransportListener , AmqpResourceP
                     if (couldSend && envelope.isSendAsync()) {
                         request.onSuccess();
                     }
-                } catch (Exception error) {
-                    request.onFailure(error);
+                } catch (Throwable t) {
+                    request.onFailure(t);
                 }
             }
         });
@@ -500,8 +502,8 @@ public class AmqpProvider implements Provider, 
TransportListener , AmqpResourceP
                     amqpSession.acknowledge(ackType);
                     pumpToProtonTransport(request);
                     request.onSuccess();
-                } catch (Exception error) {
-                    request.onFailure(error);
+                } catch (Throwable t) {
+                    request.onFailure(t);
                 }
             }
         });
@@ -536,8 +538,8 @@ public class AmqpProvider implements Provider, 
TransportListener , AmqpResourceP
                         pumpToProtonTransport(request);
                         request.onSuccess();
                     }
-                } catch (Exception error) {
-                    request.onFailure(error);
+                } catch (Throwable t) {
+                    request.onFailure(t);
                 }
             }
         });
@@ -555,8 +557,8 @@ public class AmqpProvider implements Provider, 
TransportListener , AmqpResourceP
                     AmqpSession session = 
connection.getSession(transactionInfo.getSessionId());
                     session.commit(transactionInfo, request);
                     pumpToProtonTransport(request);
-                } catch (Exception error) {
-                    request.onFailure(error);
+                } catch (Throwable t) {
+                    request.onFailure(t);
                 }
             }
         });
@@ -574,8 +576,8 @@ public class AmqpProvider implements Provider, 
TransportListener , AmqpResourceP
                     AmqpSession session = 
connection.getSession(transactionInfo.getSessionId());
                     session.rollback(transactionInfo, request);
                     pumpToProtonTransport(request);
-                } catch (Exception error) {
-                    request.onFailure(error);
+                } catch (Throwable t) {
+                    request.onFailure(t);
                 }
             }
         });
@@ -594,8 +596,8 @@ public class AmqpProvider implements Provider, 
TransportListener , AmqpResourceP
                     session.recover();
                     pumpToProtonTransport(request);
                     request.onSuccess();
-                } catch (Exception error) {
-                    request.onFailure(error);
+                } catch (Throwable t) {
+                    request.onFailure(t);
                 }
             }
         });
@@ -612,8 +614,8 @@ public class AmqpProvider implements Provider, 
TransportListener , AmqpResourceP
                     checkClosed();
                     connection.unsubscribe(subscription, request);
                     pumpToProtonTransport(request);
-                } catch (Exception error) {
-                    request.onFailure(error);
+                } catch (Throwable t) {
+                    request.onFailure(t);
                 }
             }
         });
@@ -639,8 +641,8 @@ public class AmqpProvider implements Provider, 
TransportListener , AmqpResourceP
 
                     consumer.pull(timeout, request);
                     pumpToProtonTransport(request);
-                } catch (Exception error) {
-                    request.onFailure(error);
+                } catch (Throwable t) {
+                    request.onFailure(t);
                 }
             }
         });
@@ -826,9 +828,12 @@ public class AmqpProvider implements Provider, 
TransportListener , AmqpResourceP
 
             // We have to do this to pump SASL bytes in as SASL is not event 
driven yet.
             processSaslAuthentication();
-        } catch (Exception ex) {
-            LOG.warn("Caught Exception during update processing: {}", 
ex.getMessage(), ex);
-            fireProviderException(ex);
+        } catch (Throwable t) {
+            try {
+                LOG.warn("Caught problem during update processing: {}", 
t.getMessage(), t);
+            } finally {
+                fireProviderException(t);
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/dfa2b673/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderTest.java
----------------------------------------------------------------------
diff --git 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderTest.java
 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderTest.java
index 8f7258c..8b6ebcb 100644
--- 
a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderTest.java
+++ 
b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderTest.java
@@ -17,17 +17,23 @@
 package org.apache.qpid.jms.provider.amqp;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.io.IOException;
 import java.net.URI;
 import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.jms.JMSException;
 
+import org.apache.qpid.jms.meta.JmsAbstractResourceId;
 import org.apache.qpid.jms.meta.JmsConnectionId;
 import org.apache.qpid.jms.meta.JmsConnectionInfo;
+import org.apache.qpid.jms.meta.JmsResource;
+import org.apache.qpid.jms.meta.JmsResourceId;
+import org.apache.qpid.jms.meta.JmsResourceVistor;
 import org.apache.qpid.jms.provider.ProviderFuture;
 import org.apache.qpid.jms.test.QpidJmsTestCase;
 import org.apache.qpid.jms.test.testpeer.TestAmqpPeer;
@@ -185,4 +191,76 @@ public class AmqpProviderTest extends QpidJmsTestCase {
 
         return connectionInfo;
     }
+
+    private enum Op {
+        CREATE, START, STOP, DESTROY
+    }
+
+    @Test(timeout = 20000)
+    public void testErrorDuringCreateResourceFailsRequest() throws 
IOException, JMSException {
+        doErrorDuringOperationFailsRequesTTestImpl(Op.CREATE);
+    }
+
+    @Test(timeout = 20000)
+    public void testErrorDuringStartResourceFailsRequest() throws IOException, 
JMSException {
+        doErrorDuringOperationFailsRequesTTestImpl(Op.START);
+    }
+
+    @Test(timeout = 20000)
+    public void testErrorDuringStopResourceFailsRequest() throws IOException, 
JMSException {
+        doErrorDuringOperationFailsRequesTTestImpl(Op.STOP);
+    }
+
+    @Test(timeout = 20000)
+    public void testErrorDuringDestroyResourceFailsRequest() throws 
IOException, JMSException {
+        doErrorDuringOperationFailsRequesTTestImpl(Op.DESTROY);
+    }
+
+    private void doErrorDuringOperationFailsRequesTTestImpl(Op operation) 
throws IOException, JMSException {
+        provider = new AmqpProvider(peerURI);
+
+        final AtomicBoolean errorThrown = new AtomicBoolean();
+        JmsResource resourceInfo = new JmsResource() {
+            @Override
+            public void visit(JmsResourceVistor visitor) {
+                errorThrown.set(true);
+                throw new Error("Deliberate error for testing");
+            }
+
+            @Override
+            public JmsResourceId getId() {
+                return new JmsAbstractResourceId() {
+                };
+            }
+        };
+
+        assertFalse("Error should not yet be thrown", errorThrown.get());
+        ProviderFuture request = new ProviderFuture();
+
+        switch(operation) {
+        case CREATE:
+            provider.create(resourceInfo, request);
+            break;
+        case START:
+            provider.start(resourceInfo, request);
+            break;
+        case STOP:
+            provider.stop(resourceInfo, request);
+            break;
+        case DESTROY:
+            provider.destroy(resourceInfo, request);
+            break;
+        default:
+            throw new IllegalArgumentException("Unexpected operation given");
+        }
+
+        try {
+            request.sync();
+            fail("Request should have failed");
+        } catch (IOException e) {
+            // Expected
+        }
+
+        assertTrue("Error should have been thrown", errorThrown.get());
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to