Repository: qpid-jms Updated Branches: refs/heads/master fa1334718 -> a21542cca
QPIDJMS-401 Clean up some older code in the AMQP provider Remove some now unused code paths and tests that no longer apply to how we manage consumer and producer resources. Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/a21542cc Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/a21542cc Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/a21542cc Branch: refs/heads/master Commit: a21542cca7ca3098f03e9f7441fea5d23a1cd42e Parents: fa13347 Author: Timothy Bish <tabish...@gmail.com> Authored: Thu Jul 12 12:33:52 2018 -0400 Committer: Timothy Bish <tabish...@gmail.com> Committed: Thu Jul 12 12:33:52 2018 -0400 ---------------------------------------------------------------------- .../qpid/jms/provider/amqp/AmqpProvider.java | 36 ++++---------------- .../qpid/jms/provider/amqp/AmqpSession.java | 11 +++--- 2 files changed, 10 insertions(+), 37 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/a21542cc/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java index 890ebdb..a4dbc39 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpProvider.java @@ -89,7 +89,6 @@ import org.slf4j.LoggerFactory; import io.netty.buffer.ByteBuf; import io.netty.buffer.ByteBufUtil; -import io.netty.util.ReferenceCountUtil; /** * An AMQP v1.0 Provider. @@ -633,17 +632,8 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP public void run() { try { checkClosedOrFailed(); - JmsProducerId producerId = envelope.getProducerId(); - AmqpProducer producer = null; - - if (producerId.getProviderHint() instanceof AmqpFixedProducer) { - producer = (AmqpFixedProducer) producerId.getProviderHint(); - } else { - AmqpSession session = connection.getSession(producerId.getParentId()); - producer = session.getProducer(producerId); - } - + AmqpProducer producer = (AmqpProducer) producerId.getProviderHint(); producer.send(envelope, request); } catch (Throwable t) { request.onFailure(t); @@ -683,14 +673,7 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP checkClosedOrFailed(); JmsConsumerId consumerId = envelope.getConsumerId(); - AmqpConsumer consumer = null; - - if (consumerId.getProviderHint() instanceof AmqpConsumer) { - consumer = (AmqpConsumer) consumerId.getProviderHint(); - } else { - AmqpSession session = connection.getSession(consumerId.getParentId()); - consumer = session.getConsumer(consumerId); - } + AmqpConsumer consumer = (AmqpConsumer) consumerId.getProviderHint(); consumer.acknowledge(envelope, ackType); @@ -794,15 +777,7 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP public void run() { try { checkClosedOrFailed(); - AmqpConsumer consumer = null; - - if (consumerId.getProviderHint() instanceof AmqpConsumer) { - consumer = (AmqpConsumer) consumerId.getProviderHint(); - } else { - AmqpSession session = connection.getSession(consumerId.getParentId()); - consumer = session.getConsumer(consumerId); - } - + AmqpConsumer consumer = (AmqpConsumer) consumerId.getProviderHint(); consumer.pull(timeout, request); pumpToProtonTransport(request); } catch (Throwable t) { @@ -834,7 +809,7 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP public void onData(final ByteBuf input) { // We need to retain until the serializer gets around to processing it. - ReferenceCountUtil.retain(input); + input.retain(); serializer.execute(new Runnable() { @@ -853,7 +828,8 @@ public class AmqpProvider implements Provider, TransportListener , AmqpResourceP protonTransport.process(); } while (input.isReadable()); - ReferenceCountUtil.release(input); + // Free for pooled memory to be put back now. + input.release(); // Process the state changes from the latest data and then answer back // any pending updates to the Broker. http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/a21542cc/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java ---------------------------------------------------------------------- diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java index 61299c7..83f12f7 100644 --- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java +++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSession.java @@ -94,15 +94,13 @@ public class AmqpSession extends AmqpAbstractResource<JmsSessionInfo, Session> i } public AmqpProducer getProducer(JmsProducerInfo producerInfo) { - return getProducer(producerInfo.getId()); - } + JmsProducerId producerId = producerInfo.getId(); - public AmqpProducer getProducer(JmsProducerId producerId) { if (producerId.getProviderHint() instanceof AmqpProducer) { return (AmqpProducer) producerId.getProviderHint(); } - return null; + return producers.get(producerId); } public void createConsumer(JmsConsumerInfo consumerInfo, AsyncResult request) { @@ -111,13 +109,12 @@ public class AmqpSession extends AmqpAbstractResource<JmsSessionInfo, Session> i } public AmqpConsumer getConsumer(JmsConsumerInfo consumerInfo) { - return getConsumer(consumerInfo.getId()); - } + JmsConsumerId consumerId = consumerInfo.getId(); - public AmqpConsumer getConsumer(JmsConsumerId consumerId) { if (consumerId.getProviderHint() instanceof AmqpConsumer) { return (AmqpConsumer) consumerId.getProviderHint(); } + return consumers.get(consumerId); } --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org