Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java?rev=1739270&r1=1739269&r2=1739270&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ExchangeDestination.java Fri Apr 15 10:10:16 2016 @@ -20,11 +20,11 @@ */ package org.apache.qpid.server.protocol.v1_0; -import org.apache.qpid.amqp_1_0.type.Outcome; -import org.apache.qpid.amqp_1_0.type.messaging.Accepted; -import org.apache.qpid.amqp_1_0.type.messaging.Rejected; -import org.apache.qpid.amqp_1_0.type.messaging.TerminusDurability; -import org.apache.qpid.amqp_1_0.type.messaging.TerminusExpiryPolicy; +import org.apache.qpid.server.protocol.v1_0.type.Outcome; +import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted; +import org.apache.qpid.server.protocol.v1_0.type.messaging.Rejected; +import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability; +import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusExpiryPolicy; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.model.Exchange; import org.apache.qpid.server.txn.ServerTransaction;
Copied: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/FrameOutputHandler.java (from r1739261, qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/FrameOutputHandler.java) URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/FrameOutputHandler.java?p2=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/FrameOutputHandler.java&p1=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/FrameOutputHandler.java&r1=1739261&r2=1739270&rev=1739270&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/FrameOutputHandler.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/FrameOutputHandler.java Fri Apr 15 10:10:16 2016 @@ -18,9 +18,9 @@ * under the License. * */ -package org.apache.qpid.amqp_1_0.transport; +package org.apache.qpid.server.protocol.v1_0; -import org.apache.qpid.amqp_1_0.framing.AMQFrame; +import org.apache.qpid.server.protocol.v1_0.framing.AMQFrame; import java.nio.ByteBuffer; Copied: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java (from r1739261, qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java) URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java?p2=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java&p1=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java&r1=1739261&r2=1739270&rev=1739270&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEndpoint.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEndpoint.java Fri Apr 15 10:10:16 2016 @@ -19,29 +19,29 @@ * */ -package org.apache.qpid.amqp_1_0.transport; +package org.apache.qpid.server.protocol.v1_0; import java.util.Collections; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeoutException; -import org.apache.qpid.amqp_1_0.type.Binary; -import org.apache.qpid.amqp_1_0.type.DeliveryState; -import org.apache.qpid.amqp_1_0.type.Outcome; -import org.apache.qpid.amqp_1_0.type.Source; -import org.apache.qpid.amqp_1_0.type.Symbol; -import org.apache.qpid.amqp_1_0.type.Target; -import org.apache.qpid.amqp_1_0.type.UnsignedInteger; -import org.apache.qpid.amqp_1_0.type.UnsignedLong; -import org.apache.qpid.amqp_1_0.type.transport.Attach; -import org.apache.qpid.amqp_1_0.type.transport.Detach; -import org.apache.qpid.amqp_1_0.type.transport.Error; -import org.apache.qpid.amqp_1_0.type.transport.Flow; -import org.apache.qpid.amqp_1_0.type.transport.ReceiverSettleMode; -import org.apache.qpid.amqp_1_0.type.transport.Role; -import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode; -import org.apache.qpid.amqp_1_0.type.transport.Transfer; +import org.apache.qpid.server.protocol.v1_0.type.Binary; +import org.apache.qpid.server.protocol.v1_0.type.DeliveryState; +import org.apache.qpid.server.protocol.v1_0.type.Outcome; +import org.apache.qpid.server.protocol.v1_0.type.Source; +import org.apache.qpid.server.protocol.v1_0.type.Symbol; +import org.apache.qpid.server.protocol.v1_0.type.Target; +import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger; +import org.apache.qpid.server.protocol.v1_0.type.UnsignedLong; +import org.apache.qpid.server.protocol.v1_0.type.transport.Attach; +import org.apache.qpid.server.protocol.v1_0.type.transport.Detach; +import org.apache.qpid.server.protocol.v1_0.type.transport.Error; +import org.apache.qpid.server.protocol.v1_0.type.transport.Flow; +import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode; +import org.apache.qpid.server.protocol.v1_0.type.transport.Role; +import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode; +import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer; public abstract class LinkEndpoint<T extends LinkEventListener> { @@ -178,26 +178,15 @@ public abstract class LinkEndpoint<T ext public void remoteDetached(final Detach detach) { - synchronized (getLock()) + switch (_state) { - switch(_state) - { - case DETACH_SENT: - _state = State.DETACHED; - break; - case ATTACHED: - _state = State.DETACH_RECVD; - getSession().getConnection().addPostLockAction(new Runnable() - { - @Override - public void run() - { - _linkEventListener.remoteDetached(LinkEndpoint.this, detach); - } - }); - break; - } - getLock().notifyAll(); + case DETACH_SENT: + _state = State.DETACHED; + break; + case ATTACHED: + _state = State.DETACH_RECVD; + _linkEventListener.remoteDetached(LinkEndpoint.this, detach); + break; } } @@ -206,22 +195,13 @@ public abstract class LinkEndpoint<T ext // TODO } - public void settledByPeer(final Binary deliveryTag) - { - // TODO - } - public void receiveFlow(final Flow flow) { } public void addUnsettled(final Delivery unsettled) { - synchronized(getLock()) - { - _unsettledTransfers.put(unsettled.getDeliveryTag(), unsettled); - getLock().notifyAll(); - } + _unsettledTransfers.put(unsettled.getDeliveryTag(), unsettled); } public void receiveDeliveryState(final Delivery unsettled, @@ -229,21 +209,15 @@ public abstract class LinkEndpoint<T ext final Boolean settled) { // TODO - synchronized(getLock()) + if (_deliveryStateHandler != null) { - if(_deliveryStateHandler != null) - { - _deliveryStateHandler.handle(unsettled.getDeliveryTag(), state, settled); - } - - if(settled) - { - settle(unsettled.getDeliveryTag()); - } - - getLock().notifyAll(); + _deliveryStateHandler.handle(unsettled.getDeliveryTag(), state, settled); } + if (settled) + { + settle(unsettled.getDeliveryTag()); + } } public void settle(final Binary deliveryTag) @@ -256,14 +230,6 @@ public abstract class LinkEndpoint<T ext } - public int getUnsettledCount() - { - synchronized(getLock()) - { - return _unsettledTransfers.size(); - } - } - public void setLocalHandle(final UnsignedInteger localHandle) { _localHandle = localHandle; @@ -271,140 +237,100 @@ public abstract class LinkEndpoint<T ext public void receiveAttach(final Attach attach) { - synchronized(getLock()) + switch (_state) { - switch(_state) + case ATTACH_SENT: { - case ATTACH_SENT: - { - _state = State.ATTACHED; - getLock().notifyAll(); + _state = State.ATTACHED; - _initialUnsettledMap = attach.getUnsettled(); + _initialUnsettledMap = attach.getUnsettled(); /* TODO - don't yet handle: - attach.getUnsettled(); attach.getProperties(); attach.getDurable(); attach.getExpiryPolicy(); attach.getTimeout(); */ - break; - } - - case DETACHED: - { - _state = State.ATTACHED; - getLock().notifyAll(); - } - - + break; } - if(attach.getRole() == Role.SENDER) - { - _source = attach.getSource(); - } - else + case DETACHED: { - _target = attach.getTarget(); + _state = State.ATTACHED; } - if(getRole() == Role.SENDER) - { - _maxMessageSize = attach.getMaxMessageSize(); - } } - } - public boolean isAttached() - { - synchronized (getLock()) + if (attach.getRole() == Role.SENDER) { - return _state == State.ATTACHED; + _source = attach.getSource(); } - } - - public boolean isDetached() - { - synchronized (getLock()) + else { - return _state == State.DETACHED || _session.isEnded(); + _target = attach.getTarget(); } - } - - public SessionEndpoint getSession() - { - return _session; - } - public UnsignedInteger getLocalHandle() - { - return _localHandle; + if (getRole() == Role.SENDER) + { + _maxMessageSize = attach.getMaxMessageSize(); + } } - public Object getLock() + public boolean isAttached() { - return _session.getLock(); + return _state == State.ATTACHED; } - - public long getSyncTimeout() + public boolean isDetached() { - return _session.getSyncTimeout(); + return _state == State.DETACHED || _session.isEnded(); } - public void waitUntil(Predicate predicate) throws TimeoutException, InterruptedException + public SessionEndpoint getSession() { - _session.waitUntil(predicate); + return _session; } - public void waitUntil(Predicate predicate, long timeout) throws TimeoutException, InterruptedException + public UnsignedInteger getLocalHandle() { - _session.waitUntil(predicate, timeout); + return _localHandle; } public void attach() { - synchronized(getLock()) - { - Attach attachToSend = new Attach(); - attachToSend.setName(getName()); - attachToSend.setRole(getRole()); - attachToSend.setHandle(getLocalHandle()); - attachToSend.setSource(getSource()); - attachToSend.setTarget(getTarget()); - attachToSend.setSndSettleMode(getSendingSettlementMode()); - attachToSend.setRcvSettleMode(getReceivingSettlementMode()); - attachToSend.setUnsettled(_localUnsettled); - - if(getRole() == Role.SENDER) - { - attachToSend.setInitialDeliveryCount(_deliveryCount); - } - - switch(_state) - { - case DETACHED: - _state = State.ATTACH_SENT; - break; - case ATTACH_RECVD: - _state = State.ATTACHED; - break; - default: - // TODO ERROR - } - - getSession().sendAttach(attachToSend); - - getLock().notifyAll(); - + Attach attachToSend = new Attach(); + attachToSend.setName(getName()); + attachToSend.setRole(getRole()); + attachToSend.setHandle(getLocalHandle()); + attachToSend.setSource(getSource()); + attachToSend.setTarget(getTarget()); + attachToSend.setSndSettleMode(getSendingSettlementMode()); + attachToSend.setRcvSettleMode(getReceivingSettlementMode()); + attachToSend.setUnsettled(_localUnsettled); + + if (getRole() == Role.SENDER) + { + attachToSend.setInitialDeliveryCount(_deliveryCount); + } + + switch (_state) + { + case DETACHED: + _state = State.ATTACH_SENT; + break; + case ATTACH_RECVD: + _state = State.ATTACHED; + break; + default: + // TODO ERROR } + getSession().sendAttach(attachToSend); + } @@ -430,35 +356,29 @@ public abstract class LinkEndpoint<T ext private void detach(Error error, boolean close) { - synchronized(getLock()) + //TODO + switch (_state) { - //TODO - switch(_state) - { - case ATTACHED: - _state = State.DETACH_SENT; - break; - case DETACH_RECVD: - _state = State.DETACHED; - break; - default: - return; - } - - if (!(getSession().getState() == SessionState.END_RECVD || getSession().isEnded())) - { - Detach detach = new Detach(); - detach.setHandle(getLocalHandle()); - if(close) - detach.setClosed(close); - detach.setError(error); - - getSession().sendDetach(detach); - } + case ATTACHED: + _state = State.DETACH_SENT; + break; + case DETACH_RECVD: + _state = State.DETACHED; + break; + default: + return; + } + + if (!(getSession().getState() == SessionState.END_RECVD || getSession().isEnded())) + { + Detach detach = new Detach(); + detach.setHandle(getLocalHandle()); + if (close) + detach.setClosed(close); + detach.setError(error); - getLock().notifyAll(); + getSession().sendDetach(detach); } - } @@ -534,23 +454,12 @@ public abstract class LinkEndpoint<T ext public void setLinkEventListener(final T linkEventListener) { - synchronized(getLock()) - { - _linkEventListener = linkEventListener; - } - } - - public DeliveryStateHandler getDeliveryStateHandler() - { - return _deliveryStateHandler; + _linkEventListener = linkEventListener; } public void setDeliveryStateHandler(final DeliveryStateHandler deliveryStateHandler) { - synchronized(getLock()) - { - _deliveryStateHandler = deliveryStateHandler; - } + _deliveryStateHandler = deliveryStateHandler; } public void setSendingSettlementMode(SenderSettleMode sendingSettlementMode) Copied: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEventListener.java (from r1739261, qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEventListener.java) URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEventListener.java?p2=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEventListener.java&p1=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEventListener.java&r1=1739261&r2=1739270&rev=1739270&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/LinkEventListener.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/LinkEventListener.java Fri Apr 15 10:10:16 2016 @@ -17,9 +17,9 @@ * under the License. */ -package org.apache.qpid.amqp_1_0.transport; +package org.apache.qpid.server.protocol.v1_0; -import org.apache.qpid.amqp_1_0.type.transport.Detach; +import org.apache.qpid.server.protocol.v1_0.type.transport.Detach; public interface LinkEventListener { Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java?rev=1739270&r1=1739269&r2=1739270&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_Internal_to_v1_0.java Fri Apr 15 10:10:16 2016 @@ -29,16 +29,16 @@ import java.util.Date; import java.util.List; import java.util.Map; -import org.apache.qpid.amqp_1_0.messaging.SectionEncoder; -import org.apache.qpid.amqp_1_0.type.Binary; -import org.apache.qpid.amqp_1_0.type.Section; -import org.apache.qpid.amqp_1_0.type.UnsignedByte; -import org.apache.qpid.amqp_1_0.type.UnsignedInteger; -import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue; -import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties; -import org.apache.qpid.amqp_1_0.type.messaging.Data; -import org.apache.qpid.amqp_1_0.type.messaging.Header; -import org.apache.qpid.amqp_1_0.type.messaging.Properties; +import org.apache.qpid.server.protocol.v1_0.messaging.SectionEncoder; +import org.apache.qpid.server.protocol.v1_0.type.Binary; +import org.apache.qpid.server.protocol.v1_0.type.Section; +import org.apache.qpid.server.protocol.v1_0.type.UnsignedByte; +import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger; +import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValue; +import org.apache.qpid.server.protocol.v1_0.type.messaging.ApplicationProperties; +import org.apache.qpid.server.protocol.v1_0.type.messaging.Data; +import org.apache.qpid.server.protocol.v1_0.type.messaging.Header; +import org.apache.qpid.server.protocol.v1_0.type.messaging.Properties; import org.apache.qpid.server.message.internal.InternalMessage; import org.apache.qpid.server.plugin.PluggableService; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java?rev=1739270&r1=1739269&r2=1739270&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_from_1_0.java Fri Apr 15 10:10:16 2016 @@ -38,18 +38,18 @@ import java.util.Map; import java.util.Set; import java.util.UUID; -import org.apache.qpid.amqp_1_0.messaging.SectionDecoderImpl; -import org.apache.qpid.amqp_1_0.type.AmqpErrorException; -import org.apache.qpid.amqp_1_0.type.Binary; -import org.apache.qpid.amqp_1_0.type.Section; -import org.apache.qpid.amqp_1_0.type.Symbol; -import org.apache.qpid.amqp_1_0.type.UnsignedByte; -import org.apache.qpid.amqp_1_0.type.UnsignedInteger; -import org.apache.qpid.amqp_1_0.type.UnsignedLong; -import org.apache.qpid.amqp_1_0.type.UnsignedShort; -import org.apache.qpid.amqp_1_0.type.messaging.AmqpSequence; -import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue; -import org.apache.qpid.amqp_1_0.type.messaging.Data; +import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoderImpl; +import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException; +import org.apache.qpid.server.protocol.v1_0.type.Binary; +import org.apache.qpid.server.protocol.v1_0.type.Section; +import org.apache.qpid.server.protocol.v1_0.type.Symbol; +import org.apache.qpid.server.protocol.v1_0.type.UnsignedByte; +import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger; +import org.apache.qpid.server.protocol.v1_0.type.UnsignedLong; +import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort; +import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpSequence; +import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValue; +import org.apache.qpid.server.protocol.v1_0.type.messaging.Data; import org.apache.qpid.bytebuffer.QpidByteBuffer; import org.apache.qpid.server.store.StoredMessage; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java?rev=1739270&r1=1739269&r2=1739270&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_to_1_0.java Fri Apr 15 10:10:16 2016 @@ -30,14 +30,14 @@ import java.util.List; import java.util.ListIterator; import java.util.Map; -import org.apache.qpid.amqp_1_0.messaging.SectionEncoder; -import org.apache.qpid.amqp_1_0.messaging.SectionEncoderImpl; -import org.apache.qpid.amqp_1_0.type.Binary; -import org.apache.qpid.amqp_1_0.type.Section; -import org.apache.qpid.amqp_1_0.type.Symbol; -import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry; -import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue; -import org.apache.qpid.amqp_1_0.type.messaging.Data; +import org.apache.qpid.server.protocol.v1_0.messaging.SectionEncoder; +import org.apache.qpid.server.protocol.v1_0.messaging.SectionEncoderImpl; +import org.apache.qpid.server.protocol.v1_0.type.Binary; +import org.apache.qpid.server.protocol.v1_0.type.Section; +import org.apache.qpid.server.protocol.v1_0.type.Symbol; +import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry; +import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValue; +import org.apache.qpid.server.protocol.v1_0.type.messaging.Data; import org.apache.qpid.bytebuffer.QpidByteBuffer; import org.apache.qpid.server.message.ServerMessage; import org.apache.qpid.server.model.VirtualHost; Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java?rev=1739270&r1=1739269&r2=1739270&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageConverter_v1_0_to_Internal.java Fri Apr 15 10:10:16 2016 @@ -20,7 +20,7 @@ */ package org.apache.qpid.server.protocol.v1_0; -import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry; +import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry; import org.apache.qpid.server.message.internal.InternalMessage; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.plugin.MessageConverter; Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java?rev=1739270&r1=1739269&r2=1739270&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageMetaData_1_0.java Fri Apr 15 10:10:16 2016 @@ -32,22 +32,22 @@ import java.util.Set; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.qpid.amqp_1_0.codec.ValueHandler; -import org.apache.qpid.amqp_1_0.messaging.SectionDecoder; -import org.apache.qpid.amqp_1_0.messaging.SectionEncoder; -import org.apache.qpid.amqp_1_0.type.AmqpErrorException; -import org.apache.qpid.amqp_1_0.type.Section; -import org.apache.qpid.amqp_1_0.type.Symbol; -import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry; -import org.apache.qpid.amqp_1_0.type.messaging.AmqpSequence; -import org.apache.qpid.amqp_1_0.type.messaging.AmqpValue; -import org.apache.qpid.amqp_1_0.type.messaging.ApplicationProperties; -import org.apache.qpid.amqp_1_0.type.messaging.Data; -import org.apache.qpid.amqp_1_0.type.messaging.DeliveryAnnotations; -import org.apache.qpid.amqp_1_0.type.messaging.Footer; -import org.apache.qpid.amqp_1_0.type.messaging.Header; -import org.apache.qpid.amqp_1_0.type.messaging.MessageAnnotations; -import org.apache.qpid.amqp_1_0.type.messaging.Properties; +import org.apache.qpid.server.protocol.v1_0.codec.ValueHandler; +import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoder; +import org.apache.qpid.server.protocol.v1_0.messaging.SectionEncoder; +import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException; +import org.apache.qpid.server.protocol.v1_0.type.Section; +import org.apache.qpid.server.protocol.v1_0.type.Symbol; +import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry; +import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpSequence; +import org.apache.qpid.server.protocol.v1_0.type.messaging.AmqpValue; +import org.apache.qpid.server.protocol.v1_0.type.messaging.ApplicationProperties; +import org.apache.qpid.server.protocol.v1_0.type.messaging.Data; +import org.apache.qpid.server.protocol.v1_0.type.messaging.DeliveryAnnotations; +import org.apache.qpid.server.protocol.v1_0.type.messaging.Footer; +import org.apache.qpid.server.protocol.v1_0.type.messaging.Header; +import org.apache.qpid.server.protocol.v1_0.type.messaging.MessageAnnotations; +import org.apache.qpid.server.protocol.v1_0.type.messaging.Properties; import org.apache.qpid.bytebuffer.QpidByteBuffer; import org.apache.qpid.server.message.AMQMessageHeader; import org.apache.qpid.server.plugin.MessageMetaDataType; Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageSourceDestination.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageSourceDestination.java?rev=1739270&r1=1739269&r2=1739270&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageSourceDestination.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/MessageSourceDestination.java Fri Apr 15 10:10:16 2016 @@ -20,8 +20,8 @@ */ package org.apache.qpid.server.protocol.v1_0; -import org.apache.qpid.amqp_1_0.type.Outcome; -import org.apache.qpid.amqp_1_0.type.messaging.Accepted; +import org.apache.qpid.server.protocol.v1_0.type.Outcome; +import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted; import org.apache.qpid.server.message.MessageSource; public class MessageSourceDestination implements SendingDestination Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java?rev=1739270&r1=1739269&r2=1739270&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Message_1_0.java Fri Apr 15 10:10:16 2016 @@ -24,9 +24,9 @@ package org.apache.qpid.server.protocol. import java.util.Collection; import java.util.Collections; -import org.apache.qpid.amqp_1_0.messaging.SectionEncoderImpl; -import org.apache.qpid.amqp_1_0.type.Section; -import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry; +import org.apache.qpid.server.protocol.v1_0.messaging.SectionEncoderImpl; +import org.apache.qpid.server.protocol.v1_0.type.Section; +import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry; import org.apache.qpid.bytebuffer.QpidByteBuffer; import org.apache.qpid.server.message.AbstractServerMessageImpl; import org.apache.qpid.server.store.StoredMessage; Copied: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Node.java (from r1739261, qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/Node.java) URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Node.java?p2=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Node.java&p1=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/Node.java&r1=1739261&r2=1739270&rev=1739270&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/Node.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Node.java Fri Apr 15 10:10:16 2016 @@ -18,7 +18,7 @@ * under the License. * */ -package org.apache.qpid.amqp_1_0.transport; +package org.apache.qpid.server.protocol.v1_0; public class Node { Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java?rev=1739270&r1=1739269&r2=1739270&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/NodeReceivingDestination.java Fri Apr 15 10:10:16 2016 @@ -20,11 +20,11 @@ */ package org.apache.qpid.server.protocol.v1_0; -import org.apache.qpid.amqp_1_0.type.Outcome; -import org.apache.qpid.amqp_1_0.type.messaging.Accepted; -import org.apache.qpid.amqp_1_0.type.messaging.Rejected; -import org.apache.qpid.amqp_1_0.type.messaging.TerminusDurability; -import org.apache.qpid.amqp_1_0.type.messaging.TerminusExpiryPolicy; +import org.apache.qpid.server.protocol.v1_0.type.Outcome; +import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted; +import org.apache.qpid.server.protocol.v1_0.type.messaging.Rejected; +import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability; +import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusExpiryPolicy; import org.apache.qpid.server.message.InstanceProperties; import org.apache.qpid.server.message.MessageDestination; import org.apache.qpid.server.txn.ServerTransaction; Copied: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Predicate.java (from r1739261, qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/Predicate.java) URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Predicate.java?p2=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Predicate.java&p1=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/Predicate.java&r1=1739261&r2=1739270&rev=1739270&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/Predicate.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Predicate.java Fri Apr 15 10:10:16 2016 @@ -18,7 +18,7 @@ * under the License. * */ -package org.apache.qpid.amqp_1_0.transport; +package org.apache.qpid.server.protocol.v1_0; public interface Predicate { Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java?rev=1739270&r1=1739269&r2=1739270&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/QueueDestination.java Fri Apr 15 10:10:16 2016 @@ -20,8 +20,8 @@ */ package org.apache.qpid.server.protocol.v1_0; -import org.apache.qpid.amqp_1_0.type.Outcome; -import org.apache.qpid.amqp_1_0.type.messaging.Accepted; +import org.apache.qpid.server.protocol.v1_0.type.Outcome; +import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.model.Queue; import org.apache.qpid.server.store.MessageEnqueueRecord; Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java?rev=1739270&r1=1739269&r2=1739270&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingDestination.java Fri Apr 15 10:10:16 2016 @@ -20,7 +20,7 @@ */ package org.apache.qpid.server.protocol.v1_0; -import org.apache.qpid.amqp_1_0.type.Outcome; +import org.apache.qpid.server.protocol.v1_0.type.Outcome; import org.apache.qpid.server.txn.ServerTransaction; Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkAttachment.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkAttachment.java?rev=1739270&r1=1739269&r2=1739270&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkAttachment.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkAttachment.java Fri Apr 15 10:10:16 2016 @@ -20,12 +20,10 @@ */ package org.apache.qpid.server.protocol.v1_0; -import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler; -import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint; -import org.apache.qpid.amqp_1_0.type.Binary; -import org.apache.qpid.amqp_1_0.type.DeliveryState; -import org.apache.qpid.amqp_1_0.type.Source; -import org.apache.qpid.amqp_1_0.type.Target; +import org.apache.qpid.server.protocol.v1_0.type.Binary; +import org.apache.qpid.server.protocol.v1_0.type.DeliveryState; +import org.apache.qpid.server.protocol.v1_0.type.Source; +import org.apache.qpid.server.protocol.v1_0.type.Target; public class ReceivingLinkAttachment { Copied: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java (from r1739261, qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingLinkEndpoint.java) URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java?p2=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java&p1=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingLinkEndpoint.java&r1=1739261&r2=1739270&rev=1739270&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingLinkEndpoint.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkEndpoint.java Fri Apr 15 10:10:16 2016 @@ -19,7 +19,7 @@ * */ -package org.apache.qpid.amqp_1_0.transport; +package org.apache.qpid.server.protocol.v1_0; import java.util.ArrayList; import java.util.Iterator; @@ -29,15 +29,15 @@ import java.util.Map; import java.util.SortedMap; import java.util.TreeMap; -import org.apache.qpid.amqp_1_0.type.Binary; -import org.apache.qpid.amqp_1_0.type.DeliveryState; -import org.apache.qpid.amqp_1_0.type.Outcome; -import org.apache.qpid.amqp_1_0.type.UnsignedInteger; -import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState; -import org.apache.qpid.amqp_1_0.type.transport.Attach; -import org.apache.qpid.amqp_1_0.type.transport.Flow; -import org.apache.qpid.amqp_1_0.type.transport.Role; -import org.apache.qpid.amqp_1_0.type.transport.Transfer; +import org.apache.qpid.server.protocol.v1_0.type.Binary; +import org.apache.qpid.server.protocol.v1_0.type.DeliveryState; +import org.apache.qpid.server.protocol.v1_0.type.Outcome; +import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger; +import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionalState; +import org.apache.qpid.server.protocol.v1_0.type.transport.Attach; +import org.apache.qpid.server.protocol.v1_0.type.transport.Flow; +import org.apache.qpid.server.protocol.v1_0.type.transport.Role; +import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer; public class ReceivingLinkEndpoint extends LinkEndpoint<ReceivingLinkListener> { @@ -121,58 +121,49 @@ public class ReceivingLinkEndpoint exten @Override public void receiveTransfer(final Transfer transfer, final Delivery delivery) { - synchronized (getLock()) + TransientState transientState; + final Binary deliveryTag = delivery.getDeliveryTag(); + boolean existingState = _unsettledMap.containsKey(deliveryTag); + if (!existingState || transfer.getState() != null) { - TransientState transientState; - final Binary deliveryTag = delivery.getDeliveryTag(); - boolean existingState = _unsettledMap.containsKey(deliveryTag); - if(!existingState || transfer.getState() != null) - { - _unsettledMap.put(deliveryTag, transfer.getState()); - } - if(!existingState) - { - transientState = new TransientState(transfer.getDeliveryId()); - if(delivery.isSettled()) - { - transientState.setSettled(true); - } - _unsettledIds.put(deliveryTag, transientState); - setLinkCredit(getLinkCredit().subtract(UnsignedInteger.ONE)); - setDeliveryCount(getDeliveryCount().add(UnsignedInteger.ONE)); - - } - else + _unsettledMap.put(deliveryTag, transfer.getState()); + } + if (!existingState) + { + transientState = new TransientState(transfer.getDeliveryId()); + if (delivery.isSettled()) { - transientState = _unsettledIds.get(deliveryTag); - transientState.incrementCredit(); - if(delivery.isSettled()) - { - transientState.setSettled(true); - } + transientState.setSettled(true); } + _unsettledIds.put(deliveryTag, transientState); + setLinkCredit(getLinkCredit().subtract(UnsignedInteger.ONE)); + setDeliveryCount(getDeliveryCount().add(UnsignedInteger.ONE)); - if(transientState.isSettled() && delivery.isComplete()) + } + else + { + transientState = _unsettledIds.get(deliveryTag); + transientState.incrementCredit(); + if (delivery.isSettled()) { - _unsettledMap.remove(deliveryTag); + transientState.setSettled(true); } - getLinkEventListener().messageTransfer(transfer); - + } - getLock().notifyAll(); + if (transientState.isSettled() && delivery.isComplete()) + { + _unsettledMap.remove(deliveryTag); } + getLinkEventListener().messageTransfer(transfer); + } @Override public void receiveFlow(final Flow flow) { - synchronized (getLock()) - { - super.receiveFlow(flow); - _remoteDrain = Boolean.TRUE.equals((Boolean)flow.getDrain()); - setAvailable(flow.getAvailable()); - setDeliveryCount(flow.getDeliveryCount()); - getLock().notifyAll(); - } + super.receiveFlow(flow); + _remoteDrain = Boolean.TRUE.equals((Boolean) flow.getDrain()); + setAvailable(flow.getAvailable()); + setDeliveryCount(flow.getDeliveryCount()); } @@ -181,127 +172,97 @@ public class ReceivingLinkEndpoint exten return getDrain() && getDeliveryCount().equals(getDrainLimit()); } - @Override - public void settledByPeer(final Binary deliveryTag) - { - synchronized (getLock()) - { - // TODO XXX : need to do anything about the window here? - if(settled(deliveryTag) && _creditWindow) - { - sendFlowConditional(); - } - } - } - public boolean settled(final Binary deliveryTag) { - synchronized(getLock()) + boolean deleted; + if (deleted = (_unsettledIds.remove(deliveryTag) != null)) { - boolean deleted; - if(deleted = (_unsettledIds.remove(deliveryTag) != null)) - { - _unsettledMap.remove(deliveryTag); - - getLock().notifyAll(); - } + _unsettledMap.remove(deliveryTag); - return deleted; } + + return deleted; } public void updateDisposition(final Binary deliveryTag, DeliveryState state, boolean settled) { - synchronized(getLock()) + if (_unsettledMap.containsKey(deliveryTag)) { - if(_unsettledMap.containsKey(deliveryTag)) + boolean outcomeUpdate = false; + Outcome outcome = null; + if (state instanceof Outcome) { - boolean outcomeUpdate = false; - Outcome outcome=null; - if(state instanceof Outcome) - { - outcome = (Outcome)state; - } - else if(state instanceof TransactionalState) - { - // TODO? Is this correct - outcome = ((TransactionalState)state).getOutcome(); - } - - if(outcome != null) - { - Object oldOutcome = _unsettledMap.put(deliveryTag, outcome); - outcomeUpdate = !outcome.equals(oldOutcome); - } + outcome = (Outcome) state; + } + else if (state instanceof TransactionalState) + { + // TODO? Is this correct + outcome = ((TransactionalState) state).getOutcome(); + } + if (outcome != null) + { + Object oldOutcome = _unsettledMap.put(deliveryTag, outcome); + outcomeUpdate = !outcome.equals(oldOutcome); + } + TransientState transientState = _unsettledIds.get(deliveryTag); + if (outcomeUpdate || settled) + { - TransientState transientState = _unsettledIds.get(deliveryTag); - if(outcomeUpdate || settled) - { + final UnsignedInteger transferId = transientState.getDeliveryId(); - final UnsignedInteger transferId = transientState.getDeliveryId(); + getSession().updateDisposition(getRole(), transferId, transferId, state, settled); + } - getSession().updateDisposition(getRole(), transferId, transferId, state, settled); - } + if (settled) + { - if(settled) + if (settled(deliveryTag)) { - - if(settled(deliveryTag)) + if (!isDetached() && _creditWindow) + { + setLinkCredit(getLinkCredit().add(UnsignedInteger.ONE)); + sendFlowConditional(); + } + else { - if(!isDetached() && _creditWindow) - { - setLinkCredit(getLinkCredit().add(UnsignedInteger.ONE)); - sendFlowConditional(); - } - else - { - getSession().sendFlowConditional(); - } + getSession().sendFlowConditional(); } } - getLock().notifyAll(); } - else + } + else + { + TransientState transientState = _unsettledIds.get(deliveryTag); + if (_creditWindow) { - TransientState transientState = _unsettledIds.get(deliveryTag); - if(_creditWindow) - { - setLinkCredit(getLinkCredit().add(UnsignedInteger.ONE)); - sendFlowConditional(); - } - + setLinkCredit(getLinkCredit().add(UnsignedInteger.ONE)); + sendFlowConditional(); } - } + } } - public void setCreditWindow() { setCreditWindow(true); } + public void setCreditWindow(boolean window) { - _creditWindow = window; sendFlowConditional(); - } public void drain() { - synchronized (getLock()) - { - setDrain(true); - _creditWindow = false; - _drainLimit = getDeliveryCount().add(getLinkCredit()); - sendFlowWithEcho(); - getLock().notifyAll(); - } + setDrain(true); + _creditWindow = false; + _drainLimit = getDeliveryCount().add(getLinkCredit()); + sendFlowWithEcho(); } @Override @@ -320,14 +281,10 @@ public class ReceivingLinkEndpoint exten public void requestTransactionalSend(Object txnId) { - synchronized (getLock()) - { - setDrain(true); - _creditWindow = false; - setTransactionId(txnId); - sendFlow(); - getLock().notifyAll(); - } + setDrain(true); + _creditWindow = false; + setTransactionId(txnId); + sendFlow(); } private void sendFlow(final Object transactionId) @@ -338,101 +295,85 @@ public class ReceivingLinkEndpoint exten public void clearDrain() { - synchronized (getLock()) - { - setDrain(false); - sendFlow(); - getLock().notifyAll(); - } + setDrain(false); + sendFlow(); } public void updateAllDisposition(Binary deliveryTag, DeliveryState deliveryState, boolean settled) { - synchronized(getLock()) + if(!_unsettledIds.isEmpty()) { - if(!_unsettledIds.isEmpty()) - { - Binary firstTag = _unsettledIds.keySet().iterator().next(); - Binary lastTag = deliveryTag; - updateDispositions(firstTag, lastTag, deliveryState, settled); - } + Binary firstTag = _unsettledIds.keySet().iterator().next(); + Binary lastTag = deliveryTag; + updateDispositions(firstTag, lastTag, deliveryState, settled); } } private void updateDispositions(Binary firstTag, Binary lastTag, DeliveryState state, boolean settled) { - SortedMap<UnsignedInteger, UnsignedInteger> ranges = new TreeMap<UnsignedInteger,UnsignedInteger>(); - - synchronized(getLock()) - { + SortedMap<UnsignedInteger, UnsignedInteger> ranges = new TreeMap<UnsignedInteger, UnsignedInteger>(); - Iterator<Binary> iter = _unsettledIds.keySet().iterator(); - List<Binary> tagsToUpdate = new ArrayList<Binary>(); - Binary tag = null; + Iterator<Binary> iter = _unsettledIds.keySet().iterator(); + List<Binary> tagsToUpdate = new ArrayList<Binary>(); + Binary tag = null; - while(iter.hasNext() && !(tag = iter.next()).equals(firstTag)); + while (iter.hasNext() && !(tag = iter.next()).equals(firstTag)) ; - if(firstTag.equals(tag)) - { - tagsToUpdate.add(tag); + if (firstTag.equals(tag)) + { + tagsToUpdate.add(tag); - UnsignedInteger deliveryId = _unsettledIds.get(firstTag).getDeliveryId(); + UnsignedInteger deliveryId = _unsettledIds.get(firstTag).getDeliveryId(); - UnsignedInteger first = deliveryId; - UnsignedInteger last = first; + UnsignedInteger first = deliveryId; + UnsignedInteger last = first; - if(!firstTag.equals(lastTag)) + if (!firstTag.equals(lastTag)) + { + while (iter.hasNext()) { - while(iter.hasNext()) - { - tag = iter.next(); - tagsToUpdate.add(tag); + tag = iter.next(); + tagsToUpdate.add(tag); - deliveryId = _unsettledIds.get(tag).getDeliveryId(); + deliveryId = _unsettledIds.get(tag).getDeliveryId(); - if(deliveryId.equals(last.add(UnsignedInteger.ONE))) - { - last = deliveryId; - } - else - { - ranges.put(first,last); - first = last = deliveryId; - } - - if(tag.equals(lastTag)) - { - break; - } + if (deliveryId.equals(last.add(UnsignedInteger.ONE))) + { + last = deliveryId; + } + else + { + ranges.put(first, last); + first = last = deliveryId; } - } - ranges.put(first,last); - } - - if(settled) - { - for(Binary deliveryTag : tagsToUpdate) - { - if(settled(deliveryTag) && _creditWindow) + if (tag.equals(lastTag)) { - setLinkCredit(getLinkCredit().add(UnsignedInteger.valueOf(1))); + break; } } - sendFlowConditional(); } + ranges.put(first, last); + } + if (settled) + { - - for(Map.Entry<UnsignedInteger,UnsignedInteger> range : ranges.entrySet()) + for (Binary deliveryTag : tagsToUpdate) { - getSession().updateDisposition(getRole(), range.getKey(), range.getValue(), state, settled); + if (settled(deliveryTag) && _creditWindow) + { + setLinkCredit(getLinkCredit().add(UnsignedInteger.valueOf(1))); + } } + sendFlowConditional(); + } - getLock().notifyAll(); + for (Map.Entry<UnsignedInteger, UnsignedInteger> range : ranges.entrySet()) + { + getSession().updateDisposition(getRole(), range.getKey(), range.getValue(), state, settled); } - } @Override Copied: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkListener.java (from r1739261, qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingLinkListener.java) URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkListener.java?p2=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkListener.java&p1=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingLinkListener.java&r1=1739261&r2=1739270&rev=1739270&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/ReceivingLinkListener.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLinkListener.java Fri Apr 15 10:10:16 2016 @@ -16,15 +16,15 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.qpid.amqp_1_0.transport; +package org.apache.qpid.server.protocol.v1_0; -import org.apache.qpid.amqp_1_0.type.transport.*; +import org.apache.qpid.server.protocol.v1_0.type.transport.*; public interface ReceivingLinkListener extends LinkEventListener { void messageTransfer(Transfer xfr); - class DefaultLinkEventListener implements org.apache.qpid.amqp_1_0.transport.ReceivingLinkListener + class DefaultLinkEventListener implements ReceivingLinkListener { public void messageTransfer(final Transfer xfr) { @@ -37,7 +37,7 @@ public interface ReceivingLinkListener e } } - public static final org.apache.qpid.amqp_1_0.transport.ReceivingLinkListener DEFAULT = new DefaultLinkEventListener(); + public static final ReceivingLinkListener DEFAULT = new DefaultLinkEventListener(); } Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java?rev=1739270&r1=1739269&r2=1739270&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ReceivingLink_1_0.java Fri Apr 15 10:10:16 2016 @@ -26,25 +26,20 @@ import java.util.Collections; import java.util.HashMap; import java.util.List; import java.util.Map; -import org.apache.qpid.amqp_1_0.messaging.SectionDecoderImpl; -import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler; -import org.apache.qpid.amqp_1_0.transport.LinkEndpoint; -import org.apache.qpid.amqp_1_0.transport.ReceivingLinkEndpoint; -import org.apache.qpid.amqp_1_0.transport.ReceivingLinkListener; -import org.apache.qpid.amqp_1_0.type.Binary; -import org.apache.qpid.amqp_1_0.type.DeliveryState; -import org.apache.qpid.amqp_1_0.type.Outcome; -import org.apache.qpid.amqp_1_0.type.UnsignedInteger; -import org.apache.qpid.amqp_1_0.type.messaging.Target; -import org.apache.qpid.amqp_1_0.type.messaging.TerminusDurability; -import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState; -import org.apache.qpid.amqp_1_0.type.transport.AmqpError; -import org.apache.qpid.amqp_1_0.type.transport.Detach; -import org.apache.qpid.amqp_1_0.type.transport.Error; -import org.apache.qpid.amqp_1_0.type.transport.ReceiverSettleMode; -import org.apache.qpid.amqp_1_0.type.transport.Transfer; +import org.apache.qpid.server.protocol.v1_0.messaging.SectionDecoderImpl; +import org.apache.qpid.server.protocol.v1_0.type.Binary; +import org.apache.qpid.server.protocol.v1_0.type.DeliveryState; +import org.apache.qpid.server.protocol.v1_0.type.Outcome; +import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger; +import org.apache.qpid.server.protocol.v1_0.type.messaging.Target; +import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability; +import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionalState; +import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError; +import org.apache.qpid.server.protocol.v1_0.type.transport.Detach; +import org.apache.qpid.server.protocol.v1_0.type.transport.Error; +import org.apache.qpid.server.protocol.v1_0.type.transport.ReceiverSettleMode; +import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer; import org.apache.qpid.bytebuffer.QpidByteBuffer; -import org.apache.qpid.framing.AMQShortString; import org.apache.qpid.server.message.MessageReference; import org.apache.qpid.server.model.VirtualHost; import org.apache.qpid.server.security.SecurityManager; @@ -94,7 +89,7 @@ public class ReceivingLink_1_0 implement List<QpidByteBuffer> fragments = null; - org.apache.qpid.amqp_1_0.type.DeliveryState xfrState = xfr.getState(); + org.apache.qpid.server.protocol.v1_0.type.DeliveryState xfrState = xfr.getState(); final Binary deliveryTag = xfr.getDeliveryTag(); if(Boolean.TRUE.equals(xfr.getMore()) && _incompleteMessage == null) Copied: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SASLEndpoint.java (from r1739261, qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/SASLEndpoint.java) URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SASLEndpoint.java?p2=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SASLEndpoint.java&p1=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/SASLEndpoint.java&r1=1739261&r2=1739270&rev=1739270&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/SASLEndpoint.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SASLEndpoint.java Fri Apr 15 10:10:16 2016 @@ -17,13 +17,13 @@ * under the License. */ -package org.apache.qpid.amqp_1_0.transport; +package org.apache.qpid.server.protocol.v1_0; -import org.apache.qpid.amqp_1_0.type.security.SaslChallenge; -import org.apache.qpid.amqp_1_0.type.security.SaslInit; -import org.apache.qpid.amqp_1_0.type.security.SaslMechanisms; -import org.apache.qpid.amqp_1_0.type.security.SaslOutcome; -import org.apache.qpid.amqp_1_0.type.security.SaslResponse; +import org.apache.qpid.server.protocol.v1_0.type.security.SaslChallenge; +import org.apache.qpid.server.protocol.v1_0.type.security.SaslInit; +import org.apache.qpid.server.protocol.v1_0.type.security.SaslMechanisms; +import org.apache.qpid.server.protocol.v1_0.type.security.SaslOutcome; +import org.apache.qpid.server.protocol.v1_0.type.security.SaslResponse; public interface SASLEndpoint { Copied: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SaslServerProvider.java (from r1739261, qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/SaslServerProvider.java) URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SaslServerProvider.java?p2=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SaslServerProvider.java&p1=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/SaslServerProvider.java&r1=1739261&r2=1739270&rev=1739270&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/SaslServerProvider.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SaslServerProvider.java Fri Apr 15 10:10:16 2016 @@ -17,7 +17,7 @@ * under the License. */ -package org.apache.qpid.amqp_1_0.transport; +package org.apache.qpid.server.protocol.v1_0; import java.security.Principal; Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkAttachment.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkAttachment.java?rev=1739270&r1=1739269&r2=1739270&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkAttachment.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkAttachment.java Fri Apr 15 10:10:16 2016 @@ -20,11 +20,9 @@ */ package org.apache.qpid.server.protocol.v1_0; -import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler; -import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint; -import org.apache.qpid.amqp_1_0.type.Binary; -import org.apache.qpid.amqp_1_0.type.DeliveryState; -import org.apache.qpid.amqp_1_0.type.Source; +import org.apache.qpid.server.protocol.v1_0.type.Binary; +import org.apache.qpid.server.protocol.v1_0.type.DeliveryState; +import org.apache.qpid.server.protocol.v1_0.type.Source; public class SendingLinkAttachment { Copied: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java (from r1739261, qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java) URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java?p2=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java&p1=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java&r1=1739261&r2=1739270&rev=1739270&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkEndpoint.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkEndpoint.java Fri Apr 15 10:10:16 2016 @@ -19,20 +19,20 @@ * */ -package org.apache.qpid.amqp_1_0.transport; +package org.apache.qpid.server.protocol.v1_0; import java.util.HashMap; import java.util.Map; -import org.apache.qpid.amqp_1_0.type.Binary; -import org.apache.qpid.amqp_1_0.type.DeliveryState; -import org.apache.qpid.amqp_1_0.type.Outcome; -import org.apache.qpid.amqp_1_0.type.Symbol; -import org.apache.qpid.amqp_1_0.type.UnsignedInteger; -import org.apache.qpid.amqp_1_0.type.transport.Attach; -import org.apache.qpid.amqp_1_0.type.transport.Flow; -import org.apache.qpid.amqp_1_0.type.transport.Role; -import org.apache.qpid.amqp_1_0.type.transport.Transfer; +import org.apache.qpid.server.protocol.v1_0.type.Binary; +import org.apache.qpid.server.protocol.v1_0.type.DeliveryState; +import org.apache.qpid.server.protocol.v1_0.type.Outcome; +import org.apache.qpid.server.protocol.v1_0.type.Symbol; +import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger; +import org.apache.qpid.server.protocol.v1_0.type.transport.Attach; +import org.apache.qpid.server.protocol.v1_0.type.transport.Flow; +import org.apache.qpid.server.protocol.v1_0.type.transport.Role; +import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer; public class SendingLinkEndpoint extends LinkEndpoint<SendingLinkListener> { @@ -85,23 +85,20 @@ public class SendingLinkEndpoint extends { SessionEndpoint s = getSession(); xfr.setMessageFormat(UnsignedInteger.ZERO); - synchronized(getLock()) + if(decrementCredit) { - if(decrementCredit) - { - setLinkCredit(getLinkCredit().subtract(UnsignedInteger.ONE)); - } + setLinkCredit(getLinkCredit().subtract(UnsignedInteger.ONE)); + } - setDeliveryCount(UnsignedInteger.valueOf((getDeliveryCount().intValue() + 1))); + setDeliveryCount(UnsignedInteger.valueOf((getDeliveryCount().intValue() + 1))); - xfr.setHandle(getLocalHandle()); + xfr.setHandle(getLocalHandle()); - s.sendTransfer(xfr, this, !xfr.getDeliveryTag().equals(_lastDeliveryTag)); + s.sendTransfer(xfr, this, !xfr.getDeliveryTag().equals(_lastDeliveryTag)); - if(!Boolean.TRUE.equals(xfr.getSettled())) - { - _unsettledMap.put(xfr.getDeliveryTag(), xfr.getDeliveryId()); - } + if(!Boolean.TRUE.equals(xfr.getSettled())) + { + _unsettledMap.put(xfr.getDeliveryTag(), xfr.getDeliveryId()); } if(Boolean.TRUE.equals(xfr.getMore())) @@ -119,12 +116,9 @@ public class SendingLinkEndpoint extends public void drained() { - synchronized (getLock()) - { - setDeliveryCount(getDeliveryCount().add(getLinkCredit())); - setLinkCredit(UnsignedInteger.ZERO); - sendFlow(); - } + setDeliveryCount(getDeliveryCount().add(getLinkCredit())); + setLinkCredit(UnsignedInteger.ZERO); + sendFlow(); } @Override @@ -157,14 +151,8 @@ public class SendingLinkEndpoint extends setLinkCredit(limit.subtract(getDeliveryCount())); } } - getSession().getConnection().addPostLockAction(new Runnable() - { - @Override - public void run() - { - flowStateChanged(); - } - }); + flowStateChanged(); + } @Override @@ -203,15 +191,11 @@ public class SendingLinkEndpoint extends public void updateDisposition(final Binary deliveryTag, DeliveryState state, boolean settled) { - synchronized(getLock()) + UnsignedInteger deliveryId; + if(settled && (deliveryId = _unsettledMap.remove(deliveryTag))!=null) { - UnsignedInteger deliveryId; - if(settled && (deliveryId = _unsettledMap.remove(deliveryTag))!=null) - { - settle(deliveryTag); - getSession().updateDisposition(getRole(), deliveryId, deliveryId, state, settled); - } - + settle(deliveryTag); + getSession().updateDisposition(getRole(), deliveryId, deliveryId, state, settled); } } Copied: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkListener.java (from r1739261, qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkListener.java) URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkListener.java?p2=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkListener.java&p1=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkListener.java&r1=1739261&r2=1739270&rev=1739270&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/SendingLinkListener.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLinkListener.java Fri Apr 15 10:10:16 2016 @@ -16,16 +16,16 @@ * specific language governing permissions and limitations * under the License. */ -package org.apache.qpid.amqp_1_0.transport; +package org.apache.qpid.server.protocol.v1_0; public interface SendingLinkListener extends LinkEventListener { void flowStateChanged(); - class DefaultLinkEventListener implements org.apache.qpid.amqp_1_0.transport.SendingLinkListener + class DefaultLinkEventListener implements SendingLinkListener { - public void remoteDetached(final LinkEndpoint endpoint, final org.apache.qpid.amqp_1_0.type.transport.Detach detach) + public void remoteDetached(final LinkEndpoint endpoint, final org.apache.qpid.server.protocol.v1_0.type.transport.Detach detach) { endpoint.detach(); } @@ -36,6 +36,6 @@ public interface SendingLinkListener ext } } - public static final org.apache.qpid.amqp_1_0.transport.SendingLinkListener DEFAULT = new DefaultLinkEventListener(); + public static final SendingLinkListener DEFAULT = new DefaultLinkEventListener(); } Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java?rev=1739270&r1=1739269&r2=1739270&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SendingLink_1_0.java Fri Apr 15 10:10:16 2016 @@ -35,31 +35,27 @@ import java.util.concurrent.ConcurrentMa import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.qpid.amqp_1_0.transport.DeliveryStateHandler; -import org.apache.qpid.amqp_1_0.transport.LinkEndpoint; -import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint; -import org.apache.qpid.amqp_1_0.transport.SendingLinkListener; -import org.apache.qpid.amqp_1_0.type.AmqpErrorException; -import org.apache.qpid.amqp_1_0.type.Binary; -import org.apache.qpid.amqp_1_0.type.DeliveryState; -import org.apache.qpid.amqp_1_0.type.Outcome; -import org.apache.qpid.amqp_1_0.type.Symbol; -import org.apache.qpid.amqp_1_0.type.UnsignedInteger; -import org.apache.qpid.amqp_1_0.type.messaging.Accepted; -import org.apache.qpid.amqp_1_0.type.messaging.ExactSubjectFilter; -import org.apache.qpid.amqp_1_0.type.messaging.Filter; -import org.apache.qpid.amqp_1_0.type.messaging.MatchingSubjectFilter; -import org.apache.qpid.amqp_1_0.type.messaging.Modified; -import org.apache.qpid.amqp_1_0.type.messaging.NoLocalFilter; -import org.apache.qpid.amqp_1_0.type.messaging.Released; -import org.apache.qpid.amqp_1_0.type.messaging.Source; -import org.apache.qpid.amqp_1_0.type.messaging.StdDistMode; -import org.apache.qpid.amqp_1_0.type.messaging.Target; -import org.apache.qpid.amqp_1_0.type.messaging.TerminusDurability; -import org.apache.qpid.amqp_1_0.type.transport.AmqpError; -import org.apache.qpid.amqp_1_0.type.transport.Detach; -import org.apache.qpid.amqp_1_0.type.transport.Error; -import org.apache.qpid.amqp_1_0.type.transport.Transfer; +import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException; +import org.apache.qpid.server.protocol.v1_0.type.Binary; +import org.apache.qpid.server.protocol.v1_0.type.DeliveryState; +import org.apache.qpid.server.protocol.v1_0.type.Outcome; +import org.apache.qpid.server.protocol.v1_0.type.Symbol; +import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger; +import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted; +import org.apache.qpid.server.protocol.v1_0.type.messaging.ExactSubjectFilter; +import org.apache.qpid.server.protocol.v1_0.type.messaging.Filter; +import org.apache.qpid.server.protocol.v1_0.type.messaging.MatchingSubjectFilter; +import org.apache.qpid.server.protocol.v1_0.type.messaging.Modified; +import org.apache.qpid.server.protocol.v1_0.type.messaging.NoLocalFilter; +import org.apache.qpid.server.protocol.v1_0.type.messaging.Released; +import org.apache.qpid.server.protocol.v1_0.type.messaging.Source; +import org.apache.qpid.server.protocol.v1_0.type.messaging.StdDistMode; +import org.apache.qpid.server.protocol.v1_0.type.messaging.Target; +import org.apache.qpid.server.protocol.v1_0.type.messaging.TerminusDurability; +import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError; +import org.apache.qpid.server.protocol.v1_0.type.transport.Detach; +import org.apache.qpid.server.protocol.v1_0.type.transport.Error; +import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer; import org.apache.qpid.exchange.ExchangeDefaults; import org.apache.qpid.filter.SelectorParsingException; import org.apache.qpid.filter.selector.ParseException; @@ -146,10 +142,10 @@ public class SendingLink_1_0 implements actualFilters.put(entry.getKey(), entry.getValue()); noLocal = true; } - else if(messageFilter == null && entry.getValue() instanceof org.apache.qpid.amqp_1_0.type.messaging.JMSSelectorFilter) + else if(messageFilter == null && entry.getValue() instanceof org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter) { - org.apache.qpid.amqp_1_0.type.messaging.JMSSelectorFilter selectorFilter = (org.apache.qpid.amqp_1_0.type.messaging.JMSSelectorFilter) entry.getValue(); + org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter selectorFilter = (org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter) entry.getValue(); try { messageFilter = new JMSSelectorFilter(selectorFilter.getValue()); @@ -276,10 +272,10 @@ public class SendingLink_1_0 implements actualFilters.put(entry.getKey(), entry.getValue()); noLocal = true; } - else if(messageFilter == null && entry.getValue() instanceof org.apache.qpid.amqp_1_0.type.messaging.JMSSelectorFilter) + else if(messageFilter == null && entry.getValue() instanceof org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter) { - org.apache.qpid.amqp_1_0.type.messaging.JMSSelectorFilter selectorFilter = (org.apache.qpid.amqp_1_0.type.messaging.JMSSelectorFilter) entry.getValue(); + org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter selectorFilter = (org.apache.qpid.server.protocol.v1_0.type.messaging.JMSSelectorFilter) entry.getValue(); try { messageFilter = new JMSSelectorFilter(selectorFilter.getValue()); @@ -485,17 +481,12 @@ public class SendingLink_1_0 implements while(!_resumeAcceptedTransfers.isEmpty() && getEndpoint().hasCreditToSend()) { Accepted accepted = new Accepted(); - synchronized(getLock()) - { - - Transfer xfr = new Transfer(); - Binary dt = _resumeAcceptedTransfers.remove(0); - xfr.setDeliveryTag(dt); - xfr.setState(accepted); - xfr.setResume(Boolean.TRUE); - getEndpoint().transfer(xfr, true); - } - + Transfer xfr = new Transfer(); + Binary dt = _resumeAcceptedTransfers.remove(0); + xfr.setDeliveryTag(dt); + xfr.setState(accepted); + xfr.setResume(Boolean.TRUE); + getEndpoint().transfer(xfr, true); } if(_resumeAcceptedTransfers.isEmpty()) { @@ -518,20 +509,18 @@ public class SendingLink_1_0 implements { if(getEndpoint() != null) { - synchronized(getEndpoint().getLock()) + if (_draining) { - if(_draining) - { - //TODO - getEndpoint().drained(); - _draining = false; - return true; - } - else - { - return false; - } + //TODO + getEndpoint().drained(); + _draining = false; + return true; + } + else + { + return false; } + } else { @@ -583,11 +572,6 @@ public class SendingLink_1_0 implements return endpoint == null ? null : endpoint.getTransactionId(); } - public synchronized Object getLock() - { - return _linkAttachment == null ? this : getEndpoint().getLock(); - } - public boolean isDetached() { return _linkAttachment == null || getEndpoint().isDetached(); Copied: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SequenceNumber.java (from r1739261, qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/SequenceNumber.java) URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SequenceNumber.java?p2=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SequenceNumber.java&p1=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/SequenceNumber.java&r1=1739261&r2=1739270&rev=1739270&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/SequenceNumber.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/SequenceNumber.java Fri Apr 15 10:10:16 2016 @@ -19,7 +19,7 @@ * */ -package org.apache.qpid.amqp_1_0.transport; +package org.apache.qpid.server.protocol.v1_0; public class SequenceNumber implements Comparable<SequenceNumber>, Cloneable { --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
