Repository: qpid-jms
Updated Branches:
  refs/heads/master 45221b2ec -> dfa2b6739
QPIDJMS-195: Ensure any errors also trip the relevant request failure. Ensure the executor shutdown is attempted during close. Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/dfa2b673 Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/dfa2b673 Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/dfa2b673 Branch: refs/heads/master Commit: dfa2b67394b31712ff58c0fcd3725b1c13094b64 Parents: 45221b2 Author: Robert Gemmell <[email protected]> Authored: Wed Aug 3 18:28:43 2016 +0100 Committer: Robert Gemmell <[email protected]> Committed: Wed Aug 3 18:28:43 2016 +0100 ---------------------------------------------------------------------- .../qpid/jms/provider/amqp/AmqpProvider.java | 73 +++++++++--------- .../jms/provider/amqp/AmqpProviderTest.java | 78 ++++++++++++++++++++ 2 files changed, 117 insertions(+), 34 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/dfa2b673/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java index b2ece7f..3bfe099 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java @@ -225,15 +225,17 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP } catch (IOException e) { LOG.warn("Error caught while closing Provider: {}", e.getMessage() != null ? 
e.getMessage() : "<Unknown Error>"); } finally { - if (transport != null) { - try { - transport.close(); - } catch (Exception e) { - LOG.debug("Caught exception while closing down Transport: {}", e.getMessage()); + try { + if (transport != null) { + try { + transport.close(); + } catch (Exception e) { + LOG.debug("Caught exception while closing down Transport: {}", e.getMessage()); + } } + } finally { + serializer.shutdown(); } - - serializer.shutdown(); } } } @@ -338,8 +340,8 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP }); pumpToProtonTransport(request); - } catch (Exception error) { - request.onFailure(error); + } catch (Throwable t) { + request.onFailure(t); } } }); @@ -365,8 +367,8 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP }); pumpToProtonTransport(request); - } catch (Exception error) { - request.onFailure(error); + } catch (Throwable t) { + request.onFailure(t); } } }); @@ -392,8 +394,8 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP }); pumpToProtonTransport(request); - } catch (Exception error) { - request.onFailure(error); + } catch (Throwable t) { + request.onFailure(t); } } }); @@ -448,8 +450,8 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP }); pumpToProtonTransport(request); - } catch (Exception error) { - request.onFailure(error); + } catch (Throwable t) { + request.onFailure(t); } } }); @@ -480,8 +482,8 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP if (couldSend && envelope.isSendAsync()) { request.onSuccess(); } - } catch (Exception error) { - request.onFailure(error); + } catch (Throwable t) { + request.onFailure(t); } } }); @@ -500,8 +502,8 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP amqpSession.acknowledge(ackType); pumpToProtonTransport(request); request.onSuccess(); - } catch (Exception error) { - 
request.onFailure(error); + } catch (Throwable t) { + request.onFailure(t); } } }); @@ -536,8 +538,8 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP pumpToProtonTransport(request); request.onSuccess(); } - } catch (Exception error) { - request.onFailure(error); + } catch (Throwable t) { + request.onFailure(t); } } }); @@ -555,8 +557,8 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP AmqpSession session = connection.getSession(transactionInfo.getSessionId()); session.commit(transactionInfo, request); pumpToProtonTransport(request); - } catch (Exception error) { - request.onFailure(error); + } catch (Throwable t) { + request.onFailure(t); } } }); @@ -574,8 +576,8 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP AmqpSession session = connection.getSession(transactionInfo.getSessionId()); session.rollback(transactionInfo, request); pumpToProtonTransport(request); - } catch (Exception error) { - request.onFailure(error); + } catch (Throwable t) { + request.onFailure(t); } } }); @@ -594,8 +596,8 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP session.recover(); pumpToProtonTransport(request); request.onSuccess(); - } catch (Exception error) { - request.onFailure(error); + } catch (Throwable t) { + request.onFailure(t); } } }); @@ -612,8 +614,8 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP checkClosed(); connection.unsubscribe(subscription, request); pumpToProtonTransport(request); - } catch (Exception error) { - request.onFailure(error); + } catch (Throwable t) { + request.onFailure(t); } } }); @@ -639,8 +641,8 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP consumer.pull(timeout, request); pumpToProtonTransport(request); - } catch (Exception error) { - request.onFailure(error); + } catch (Throwable t) { + request.onFailure(t); } } }); @@ -826,9 +828,12 @@ 
public class AmqpProvider implements Provider, TransportListener , AmqpResourceP // We have to do this to pump SASL bytes in as SASL is not event driven yet. processSaslAuthentication(); - } catch (Exception ex) { - LOG.warn("Caught Exception during update processing: {}", ex.getMessage(), ex); - fireProviderException(ex); + } catch (Throwable t) { + try { + LOG.warn("Caught problem during update processing: {}", t.getMessage(), t); + } finally { + fireProviderException(t); + } } } http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/dfa2b673/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderTest.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderTest.java index 8f7258c..8b6ebcb 100644 --- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderTest.java +++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/AmqpProviderTest.java @@ -17,17 +17,23 @@ package org.apache.qpid.jms.provider.amqp; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; import java.io.IOException; import java.net.URI; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import javax.jms.JMSException; +import org.apache.qpid.jms.meta.JmsAbstractResourceId; import org.apache.qpid.jms.meta.JmsConnectionId; import org.apache.qpid.jms.meta.JmsConnectionInfo; +import org.apache.qpid.jms.meta.JmsResource; +import org.apache.qpid.jms.meta.JmsResourceId; +import org.apache.qpid.jms.meta.JmsResourceVistor; import org.apache.qpid.jms.provider.ProviderFuture; import org.apache.qpid.jms.test.QpidJmsTestCase; import org.apache.qpid.jms.test.testpeer.TestAmqpPeer; @@ -185,4 +191,76 @@ public 
class AmqpProviderTest extends QpidJmsTestCase { return connectionInfo; } + + private enum Op { + CREATE, START, STOP, DESTROY + } + + @Test(timeout = 20000) + public void testErrorDuringCreateResourceFailsRequest() throws IOException, JMSException { + doErrorDuringOperationFailsRequesTTestImpl(Op.CREATE); + } + + @Test(timeout = 20000) + public void testErrorDuringStartResourceFailsRequest() throws IOException, JMSException { + doErrorDuringOperationFailsRequesTTestImpl(Op.START); + } + + @Test(timeout = 20000) + public void testErrorDuringStopResourceFailsRequest() throws IOException, JMSException { + doErrorDuringOperationFailsRequesTTestImpl(Op.STOP); + } + + @Test(timeout = 20000) + public void testErrorDuringDestroyResourceFailsRequest() throws IOException, JMSException { + doErrorDuringOperationFailsRequesTTestImpl(Op.DESTROY); + } + + private void doErrorDuringOperationFailsRequesTTestImpl(Op operation) throws IOException, JMSException { + provider = new AmqpProvider(peerURI); + + final AtomicBoolean errorThrown = new AtomicBoolean(); + JmsResource resourceInfo = new JmsResource() { + @Override + public void visit(JmsResourceVistor visitor) { + errorThrown.set(true); + throw new Error("Deliberate error for testing"); + } + + @Override + public JmsResourceId getId() { + return new JmsAbstractResourceId() { + }; + } + }; + + assertFalse("Error should not yet be thrown", errorThrown.get()); + ProviderFuture request = new ProviderFuture(); + + switch(operation) { + case CREATE: + provider.create(resourceInfo, request); + break; + case START: + provider.start(resourceInfo, request); + break; + case STOP: + provider.stop(resourceInfo, request); + break; + case DESTROY: + provider.destroy(resourceInfo, request); + break; + default: + throw new IllegalArgumentException("Unexpected operation given"); + } + + try { + request.sync(); + fail("Request should have failed"); + } catch (IOException e) { + // Expected + } + + assertTrue("Error should have been thrown", 
errorThrown.get()); + } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
