Repository: qpid-proton Updated Branches: refs/heads/master b4cadd1db -> 2d5b8d8a3
PROTON-721: expose link capabilities and wire up handling of them Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/2d5b8d8a Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/2d5b8d8a Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/2d5b8d8a Branch: refs/heads/master Commit: 2d5b8d8a333d84785fbcfac2f1c22e9c0a754874 Parents: b4cadd1 Author: Robert Gemmell <[email protected]> Authored: Mon Nov 21 16:45:10 2016 +0000 Committer: Robert Gemmell <[email protected]> Committed: Mon Nov 21 16:45:10 2016 +0000 ---------------------------------------------------------------------- .../org/apache/qpid/proton/engine/Link.java | 55 +++++++++ .../qpid/proton/engine/impl/LinkImpl.java | 50 ++++++++ .../qpid/proton/engine/impl/TransportImpl.java | 13 ++ .../qpid/proton/systemtests/LinkTest.java | 123 +++++++++++++++++++ 4 files changed, 241 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2d5b8d8a/proton-j/src/main/java/org/apache/qpid/proton/engine/Link.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/Link.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/Link.java index 634f3e0..248c687 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/Link.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/Link.java @@ -243,4 +243,59 @@ public interface Link extends Endpoint public void detach(); public boolean detached(); + /** + * Sets the local link offered capabilities, to be conveyed to the peer via the Attach frame + * when attaching the link to the session. + * + * Must be called during link setup, i.e. before calling the {@link #open()} method. + * + * @param offeredCapabilities + * the offered capabilities array to send, or null for none. + */ + public void setOfferedCapabilities(Symbol[] offeredCapabilities); + + /** + * Gets the local link offered capabilities. + * + * @return the offered capabilities array, or null if none was set. + * + * @see #setOfferedCapabilities(Symbol[]) + */ + Symbol[] getOfferedCapabilities(); + + /** + * Gets the remote link offered capabilities, as conveyed from the peer via the Attach frame + * when attaching the link to the session. + * + * @return the offered capabilities array conveyed by the peer, or null if there was none. + */ + Symbol[] getRemoteOfferedCapabilities(); + + /** + * Sets the local link desired capabilities, to be conveyed to the peer via the Attach frame + * when attaching the link to the session. + * + * Must be called during link setup, i.e. before calling the {@link #open()} method. + * + * @param desiredCapabilities + * the desired capabilities array to send, or null for none. + */ + public void setDesiredCapabilities(Symbol[] desiredCapabilities); + + /** + * Gets the local link desired capabilities. + * + * @return the desired capabilities array, or null if none was set. + * + * @see #setDesiredCapabilities(Symbol[]) + */ + Symbol[] getDesiredCapabilities(); + + /** + * Gets the remote link desired capabilities, as conveyed from the peer via the Attach frame + * when attaching the link to the session. + * + * @return the desired capabilities array conveyed by the peer, or null if there was none. + */ + Symbol[] getRemoteDesiredCapabilities(); } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2d5b8d8a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java index 63e9ddd..a67785e 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java @@ -61,6 +61,10 @@ public abstract class LinkImpl extends EndpointImpl implements Link private boolean _detached; private Map<Symbol, Object> _properties; private Map<Symbol, Object> _remoteProperties; + private Symbol[] _offeredCapabilities; + private Symbol[] _remoteOfferedCapabilities; + private Symbol[] _desiredCapabilities; + private Symbol[] _remoteDesiredCapabilities; LinkImpl(SessionImpl session, String name) { @@ -400,6 +404,52 @@ public abstract class LinkImpl extends EndpointImpl implements Link } @Override + public Symbol[] getDesiredCapabilities() + { + return _desiredCapabilities; + } + + @Override + public void setDesiredCapabilities(Symbol[] desiredCapabilities) + { + _desiredCapabilities = desiredCapabilities; + } + + @Override + public Symbol[] getRemoteDesiredCapabilities() + { + return _remoteDesiredCapabilities; + } + + void setRemoteDesiredCapabilities(Symbol[] remoteDesiredCapabilities) + { + _remoteDesiredCapabilities = remoteDesiredCapabilities; + } + + @Override + public Symbol[] getOfferedCapabilities() + { + return _offeredCapabilities; + } + + @Override + public void setOfferedCapabilities(Symbol[] offeredCapabilities) + { + _offeredCapabilities = offeredCapabilities; + } + + @Override + public Symbol[] getRemoteOfferedCapabilities() + { + return _remoteOfferedCapabilities; + } + + void setRemoteOfferedCapabilities(Symbol[] remoteOfferedCapabilities) + { + _remoteOfferedCapabilities = remoteOfferedCapabilities; + } + + @Override public int drained() { int drained = 0; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2d5b8d8a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java index 9a5fcbd..bb2e43b 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java @@ -781,6 +781,16 @@ public class TransportImpl extends EndpointImpl attach.setProperties(link.getProperties()); } + if(link.getOfferedCapabilities() != null) + { + attach.setOfferedCapabilities(link.getOfferedCapabilities()); + } + + if(link.getDesiredCapabilities() != null) + { + attach.setDesiredCapabilities(link.getDesiredCapabilities()); + } + attach.setRole(endpoint instanceof ReceiverImpl ? Role.RECEIVER : Role.SENDER); if(link instanceof SenderImpl) @@ -1182,6 +1192,9 @@ public class TransportImpl extends EndpointImpl link.setRemoteProperties(attach.getProperties()); + link.setRemoteDesiredCapabilities(attach.getDesiredCapabilities()); + link.setRemoteOfferedCapabilities(attach.getOfferedCapabilities()); + transportLink.setName(attach.getName()); transportLink.setRemoteHandle(handle); transportSession.addLinkRemoteHandle(transportLink, handle); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/2d5b8d8a/proton-j/src/test/java/org/apache/qpid/proton/systemtests/LinkTest.java ---------------------------------------------------------------------- diff --git a/proton-j/src/test/java/org/apache/qpid/proton/systemtests/LinkTest.java b/proton-j/src/test/java/org/apache/qpid/proton/systemtests/LinkTest.java index 10b509e..518960c 100644 --- a/proton-j/src/test/java/org/apache/qpid/proton/systemtests/LinkTest.java +++ b/proton-j/src/test/java/org/apache/qpid/proton/systemtests/LinkTest.java @@ -61,6 +61,129 @@ public class LinkTest extends EngineTestBase private final String _sourceAddress = getServer().containerId + "-link1-source"; @Test + public void testCapabilities() throws Exception + { + final Symbol recvOfferedCap = Symbol.valueOf("recvOfferedCapability"); + final Symbol recvDesiredCap = Symbol.valueOf("recvDesiredCapability"); + final Symbol senderOfferedCap = Symbol.valueOf("senderOfferedCapability"); + final Symbol senderDesiredCap = Symbol.valueOf("senderDesiredCapability"); + + Symbol[] clientOfferedCapabilities = new Symbol[] { recvOfferedCap }; + Symbol[] clientDesiredCapabilities = new Symbol[] { recvDesiredCap }; + + Symbol[] serverOfferedCapabilities = new Symbol[] { senderOfferedCap }; + Symbol[] serverDesiredCapabilities = new Symbol[] { senderDesiredCap }; + + LOGGER.fine(bold("======== About to create transports")); + + getClient().transport = Proton.transport(); + ProtocolTracerEnabler.setProtocolTracer(getClient().transport, TestLoggingHelper.CLIENT_PREFIX); + + getServer().transport = Proton.transport(); + ProtocolTracerEnabler.setProtocolTracer(getServer().transport, " " + TestLoggingHelper.SERVER_PREFIX); + + doOutputInputCycle(); + + getClient().connection = Proton.connection(); + getClient().transport.bind(getClient().connection); + + getServer().connection = Proton.connection(); + getServer().transport.bind(getServer().connection); + + LOGGER.fine(bold("======== About to open connections")); + getClient().connection.open(); + getServer().connection.open(); + + doOutputInputCycle(); + + LOGGER.fine(bold("======== About to open sessions")); + getClient().session = getClient().connection.session(); + getClient().session.open(); + + pumpClientToServer(); + + getServer().session = getServer().connection.sessionHead(of(UNINITIALIZED), of(ACTIVE)); + assertEndpointState(getServer().session, UNINITIALIZED, ACTIVE); + + getServer().session.open(); + assertEndpointState(getServer().session, ACTIVE, ACTIVE); + + pumpServerToClient(); + assertEndpointState(getClient().session, ACTIVE, ACTIVE); + + LOGGER.fine(bold("======== About to create reciever")); + + getClient().source = new Source(); + getClient().source.setAddress(_sourceAddress); + + getClient().target = new Target(); + getClient().target.setAddress(null); + + getClient().receiver = getClient().session.receiver("link1"); + getClient().receiver.setTarget(getClient().target); + getClient().receiver.setSource(getClient().source); + + getClient().receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST); + getClient().receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED); + + // Set the client receivers capabilities + getClient().receiver.setOfferedCapabilities(clientOfferedCapabilities); + getClient().receiver.setDesiredCapabilities(clientDesiredCapabilities); + + assertEndpointState(getClient().receiver, UNINITIALIZED, UNINITIALIZED); + + getClient().receiver.open(); + assertEndpointState(getClient().receiver, ACTIVE, UNINITIALIZED); + + pumpClientToServer(); + + LOGGER.fine(bold("======== About to set up implicitly created sender")); + + getServer().sender = (Sender) getServer().connection.linkHead(of(UNINITIALIZED), of(ACTIVE)); + + getServer().sender.setReceiverSettleMode(getServer().sender.getRemoteReceiverSettleMode()); + getServer().sender.setSenderSettleMode(getServer().sender.getRemoteSenderSettleMode()); + + org.apache.qpid.proton.amqp.transport.Source serverRemoteSource = getServer().sender.getRemoteSource(); + getServer().sender.setSource(serverRemoteSource); + + // Set the server senders capabilities + getServer().sender.setOfferedCapabilities(serverOfferedCapabilities); + getServer().sender.setDesiredCapabilities(serverDesiredCapabilities); + + assertEndpointState(getServer().sender, UNINITIALIZED, ACTIVE); + getServer().sender.open(); + + assertEndpointState(getServer().sender, ACTIVE, ACTIVE); + + pumpServerToClient(); + + assertEndpointState(getClient().receiver, ACTIVE, ACTIVE); + + // Verify server side got the clients receiver capabilities as expected + Symbol[] serverRemoteOfferedCapabilities = getServer().sender.getRemoteOfferedCapabilities(); + assertNotNull("Server had no remote offered capabilities", serverRemoteOfferedCapabilities); + assertEquals("Server remote offered capabilities not expected size", 1, serverRemoteOfferedCapabilities.length); + assertTrue("Server remote offered capabilities lack expected value: " + recvOfferedCap, Arrays.asList(serverRemoteOfferedCapabilities).contains(recvOfferedCap)); + + Symbol[] serverRemoteDesiredCapabilities = getServer().sender.getRemoteDesiredCapabilities(); + assertNotNull("Server had no remote desired capabilities", serverRemoteDesiredCapabilities); + assertEquals("Server remote desired capabilities not expected size", 1, serverRemoteDesiredCapabilities.length); + assertTrue("Server remote desired capabilities lack expected value: " + recvDesiredCap, Arrays.asList(serverRemoteDesiredCapabilities).contains(recvDesiredCap)); + + // Verify the client side got the servers sender capabilities as expected + Symbol[] clientRemoteOfferedCapabilities = getClient().receiver.getRemoteOfferedCapabilities(); + assertNotNull("Client had no remote offered capabilities", clientRemoteOfferedCapabilities); + assertEquals("Client remote offered capabilities not expected size", 1, clientRemoteOfferedCapabilities.length); + assertTrue("Client remote offered capabilities lack expected value: " + senderOfferedCap, Arrays.asList(clientRemoteOfferedCapabilities).contains(senderOfferedCap)); + + Symbol[] clientRemoteDesiredCapabilities = getClient().receiver.getRemoteDesiredCapabilities(); + assertNotNull("Client had no remote desired capabilities", clientRemoteDesiredCapabilities); + assertEquals("Client remote desired capabilities not expected size", 1, clientRemoteDesiredCapabilities.length); + assertTrue("Client remote desired capabilities lack expected value: " + senderDesiredCap, Arrays.asList(clientRemoteDesiredCapabilities).contains(senderDesiredCap)); + } + + @Test public void testProperties() throws Exception { Map<Symbol, Object> receiverProps = new HashMap<>(); --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
