https://issues.apache.org/jira/browse/AMQ-5183
Switch to using Proton's Event collector for processing engine state changes. All tests passing locally with this change. Project: http://git-wip-us.apache.org/repos/asf/activemq/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/38a86b47 Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/38a86b47 Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/38a86b47 Branch: refs/heads/trunk Commit: 38a86b470f26e2f6ab10fdfc16486d897926b2ed Parents: 1dd34a1 Author: Timothy Bish <[email protected]> Authored: Mon May 12 15:23:42 2014 -0400 Committer: Timothy Bish <[email protected]> Committed: Mon May 12 15:23:42 2014 -0400 ---------------------------------------------------------------------- .../transport/amqp/AmqpProtocolConverter.java | 121 ++++++++++--------- 1 file changed, 65 insertions(+), 56 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq/blob/38a86b47/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java ---------------------------------------------------------------------- diff --git a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java index a0560b4..02621fc 100644 --- a/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java +++ b/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java @@ -19,7 +19,6 @@ package org.apache.activemq.transport.amqp; import java.io.IOException; import java.io.UnsupportedEncodingException; import java.nio.ByteBuffer; -import java.util.EnumSet; import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; @@ -78,16 +77,19 @@ import org.apache.qpid.proton.amqp.transport.AmqpError; import org.apache.qpid.proton.amqp.transport.DeliveryState; import 
org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.apache.qpid.proton.amqp.transport.SenderSettleMode; +import org.apache.qpid.proton.engine.Collector; import org.apache.qpid.proton.engine.Connection; import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.EndpointState; import org.apache.qpid.proton.engine.EngineFactory; +import org.apache.qpid.proton.engine.Event; import org.apache.qpid.proton.engine.Link; import org.apache.qpid.proton.engine.Receiver; import org.apache.qpid.proton.engine.Sasl; import org.apache.qpid.proton.engine.Sender; import org.apache.qpid.proton.engine.Session; import org.apache.qpid.proton.engine.Transport; +import org.apache.qpid.proton.engine.impl.CollectorImpl; import org.apache.qpid.proton.engine.impl.EngineFactoryImpl; import org.apache.qpid.proton.engine.impl.ProtocolTracer; import org.apache.qpid.proton.engine.impl.TransportImpl; @@ -109,11 +111,6 @@ import org.slf4j.LoggerFactory; class AmqpProtocolConverter implements IAmqpProtocolConverter { static final Logger TRACE_FRAMES = AmqpTransportFilter.TRACE_FRAMES; - public static final EnumSet<EndpointState> UNINITIALIZED_SET = EnumSet.of(EndpointState.UNINITIALIZED); - public static final EnumSet<EndpointState> INITIALIZED_SET = EnumSet.complementOf(UNINITIALIZED_SET); - public static final EnumSet<EndpointState> ACTIVE_STATE = EnumSet.of(EndpointState.ACTIVE); - public static final EnumSet<EndpointState> CLOSED_STATE = EnumSet.of(EndpointState.CLOSED); - public static final EnumSet<EndpointState> ALL_STATES = EnumSet.of(EndpointState.CLOSED, EndpointState.ACTIVE, EndpointState.UNINITIALIZED); private static final Logger LOG = LoggerFactory.getLogger(AmqpProtocolConverter.class); static final public byte[] EMPTY_BYTE_ARRAY = new byte[] {}; private final AmqpTransport amqpTransport; @@ -131,6 +128,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { Transport protonTransport = engineFactory.createTransport(); Connection 
protonConnection = engineFactory.createConnection(); MessageFactory messageFactory = messageFactoryLoader.loadFactory(); + Collector eventCollector = new CollectorImpl(); public AmqpProtocolConverter(AmqpTransport transport) { this.amqpTransport = transport; @@ -145,6 +143,7 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { this.protonTransport.setMaxFrameSize(maxFrameSize); this.protonTransport.bind(this.protonConnection); + this.protonConnection.collect(eventCollector); updateTracer(); } @@ -171,14 +170,12 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { while (!done) { ByteBuffer toWrite = protonTransport.getOutputBuffer(); if (toWrite != null && toWrite.hasRemaining()) { -// // System.out.println("writing: " + buffer.toString().substring(5).replaceAll("(..)", "$1 ")); amqpTransport.sendToAmqp(toWrite); protonTransport.outputConsumed(); } else { done = true; } } - // System.out.println("write done"); } catch (IOException e) { amqpTransport.onException(e); } @@ -208,7 +205,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { AmqpHeader header = (AmqpHeader) command; switch (header.getProtocolId()) { case 0: - // amqpTransport.sendToAmqp(new AmqpHeader()); break; // nothing to do.. case 3: // Client will be using SASL for auth.. sasl = protonTransport.sasl(); @@ -225,7 +221,6 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { } public void onFrame(Buffer frame) throws Exception { - // System.out.println("read: " + frame.toString().substring(5).replaceAll("(..)", "$1 ")); while (frame.length > 0) { try { int count = protonTransport.input(frame.data, frame.offset, frame.length); @@ -263,54 +258,30 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { } } - // Handle the amqp open.. 
- if (protonConnection.getLocalState() == EndpointState.UNINITIALIZED && protonConnection.getRemoteState() != EndpointState.UNINITIALIZED) { - onConnectionOpen(); - } - - // Lets map amqp sessions to openwire sessions.. - Session session = protonConnection.sessionHead(UNINITIALIZED_SET, INITIALIZED_SET); - while (session != null) { - onSessionOpen(session); - session = protonConnection.sessionHead(UNINITIALIZED_SET, INITIALIZED_SET); - } - - Link link = protonConnection.linkHead(UNINITIALIZED_SET, INITIALIZED_SET); - while (link != null) { - onLinkOpen(link); - link = protonConnection.linkHead(UNINITIALIZED_SET, INITIALIZED_SET); - } - - Delivery delivery = protonConnection.getWorkHead(); - while (delivery != null) { - AmqpDeliveryListener listener = (AmqpDeliveryListener) delivery.getLink().getContext(); - if (listener != null) { - listener.onDelivery(delivery); + Event event = null; + while ((event = eventCollector.peek()) != null) { + switch (event.getType()) { + case CONNECTION_REMOTE_STATE: + processConnectionEvent(event.getConnection()); + break; + case SESSION_REMOTE_STATE: + processSessionEvent(event.getSession()); + break; + case LINK_REMOTE_STATE: + processLinkEvent(event.getLink()); + break; + case LINK_FLOW: + Link link = event.getLink(); + ((AmqpDeliveryListener) link.getContext()).drainCheck(); + break; + case DELIVERY: + processDelivery(event.getDelivery()); + break; + default: + break; } - delivery = delivery.getWorkNext(); - } - link = protonConnection.linkHead(ACTIVE_STATE, CLOSED_STATE); - while (link != null) { - ((AmqpDeliveryListener) link.getContext()).onClose(); - link.close(); - link = link.next(ACTIVE_STATE, CLOSED_STATE); - } - - link = protonConnection.linkHead(ACTIVE_STATE, ALL_STATES); - while (link != null) { - ((AmqpDeliveryListener) link.getContext()).drainCheck(); - link = link.next(ACTIVE_STATE, ALL_STATES); - } - - session = protonConnection.sessionHead(ACTIVE_STATE, CLOSED_STATE); - while (session != null) { - // TODO - close 
links? - onSessionClose(session); - session = session.next(ACTIVE_STATE, CLOSED_STATE); - } - if (protonConnection.getLocalState() == EndpointState.ACTIVE && protonConnection.getRemoteState() == EndpointState.CLOSED) { - doClose(); + eventCollector.pop(); } } catch (Throwable e) { @@ -321,6 +292,44 @@ class AmqpProtocolConverter implements IAmqpProtocolConverter { } } + protected void processConnectionEvent(Connection connection) throws Exception { + EndpointState remoteState = connection.getRemoteState(); + if (remoteState == EndpointState.ACTIVE) { + onConnectionOpen(); + } else if (remoteState == EndpointState.CLOSED) { + doClose(); + } + } + + protected void processLinkEvent(Link link) throws Exception { + EndpointState remoteState = link.getRemoteState(); + if (remoteState == EndpointState.ACTIVE) { + onLinkOpen(link); + } else if (remoteState == EndpointState.CLOSED) { + ((AmqpDeliveryListener) link.getContext()).onClose(); + link.close(); + } + } + + protected void processSessionEvent(Session session) throws Exception { + EndpointState remoteState = session.getRemoteState(); + if (remoteState == EndpointState.ACTIVE) { + onSessionOpen(session); + } else if (remoteState == EndpointState.CLOSED) { + // TODO - close links? + onSessionClose(session); + } + } + + protected void processDelivery(Delivery delivery) throws Exception { + if (!delivery.isPartial()) { + AmqpDeliveryListener listener = (AmqpDeliveryListener) delivery.getLink().getContext(); + if (listener != null) { + listener.onDelivery(delivery); + } + } + } + boolean closing = false; boolean closedSocket = false;
