Repository: qpid-proton Updated Branches: refs/heads/master 2d5b8d8a3 -> 331e3b9fa
PROTON-722: expose session properties and capabilities and wire up handling of them Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/331e3b9f Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/331e3b9f Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/331e3b9f Branch: refs/heads/master Commit: 331e3b9fafaad7e08f76ea7e2b82f6f690562cfc Parents: 2d5b8d8 Author: Robert Gemmell <[email protected]> Authored: Mon Nov 21 18:04:54 2016 +0000 Committer: Robert Gemmell <[email protected]> Committed: Mon Nov 21 18:04:54 2016 +0000 ---------------------------------------------------------------------- .../org/apache/qpid/proton/engine/Session.java | 87 ++++++++ .../qpid/proton/engine/impl/SessionImpl.java | 76 +++++++ .../qpid/proton/engine/impl/TransportImpl.java | 19 ++ .../qpid/proton/systemtests/SessionTest.java | 198 +++++++++++++++++++ 4 files changed, 380 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/331e3b9f/proton-j/src/main/java/org/apache/qpid/proton/engine/Session.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/Session.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/Session.java index 2179dda..1a28c26 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/Session.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/Session.java @@ -21,6 +21,9 @@ package org.apache.qpid.proton.engine; import java.util.EnumSet; +import java.util.Map; + +import org.apache.qpid.proton.amqp.Symbol; /** @@ -60,4 +63,88 @@ public interface Session extends Endpoint * @param outgoingWindowSize the outgoing window size */ public void setOutgoingWindow(long outgoingWindowSize); + + /** + * Sets the local session properties, to be conveyed to the peer via the Begin frame 
when + * opening the session. + * + * Must be called during session setup, i.e. before calling the {@link #open()} method. + * + * @param properties + * the properties map to send, or null for none. + */ + void setProperties(Map<Symbol, Object> properties); + + /** + * Gets the local session properties. + * + * @return the properties map, or null if none was set. + * + * @see #setProperties(Map) + */ + Map<Symbol, Object> getProperties(); + + /** + * Gets the remote session properties, as conveyed from the peer via the Begin frame + * when opening the session. + * + * @return the properties Map conveyed by the peer, or null if there was none. + */ + Map<Symbol, Object> getRemoteProperties(); + + /** + * Sets the local session offered capabilities, to be conveyed to the peer via the Begin frame + * when opening the session. + * + * Must be called during session setup, i.e. before calling the {@link #open()} method. + * + * @param offeredCapabilities + * the offered capabilities array to send, or null for none. + */ + public void setOfferedCapabilities(Symbol[] offeredCapabilities); + + /** + * Gets the local session offered capabilities. + * + * @return the offered capabilities array, or null if none was set. + * + * @see #setOfferedCapabilities(Symbol[]) + */ + Symbol[] getOfferedCapabilities(); + + /** + * Gets the remote session offered capabilities, as conveyed from the peer via the Begin frame + * when opening the session. + * + * @return the offered capabilities array conveyed by the peer, or null if there was none. + */ + Symbol[] getRemoteOfferedCapabilities(); + + /** + * Sets the local session desired capabilities, to be conveyed to the peer via the Begin frame + * when opening the session. + * + * Must be called during session setup, i.e. before calling the {@link #open()} method. + * + * @param desiredCapabilities + * the desired capabilities array to send, or null for none. 
+ */ + public void setDesiredCapabilities(Symbol[] desiredCapabilities); + + /** + * Gets the local session desired capabilities. + * + * @return the desired capabilities array, or null if none was set. + * + * @see #setDesiredCapabilities(Symbol[]) + */ + Symbol[] getDesiredCapabilities(); + + /** + * Gets the remote session desired capabilities, as conveyed from the peer via the Begin frame + * when opening the session. + * + * @return the desired capabilities array conveyed by the peer, or null if there was none. + */ + Symbol[] getRemoteDesiredCapabilities(); } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/331e3b9f/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java index e5dd9e8..c7b796d 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java @@ -26,6 +26,7 @@ import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.engine.EndpointState; import org.apache.qpid.proton.engine.Event; import org.apache.qpid.proton.engine.ProtonJSession; @@ -45,6 +46,12 @@ public class SessionImpl extends EndpointImpl implements ProtonJSession private int _incomingDeliveries = 0; private int _outgoingDeliveries = 0; private long _outgoingWindow = Integer.MAX_VALUE; + private Map<Symbol, Object> _properties; + private Map<Symbol, Object> _remoteProperties; + private Symbol[] _offeredCapabilities; + private Symbol[] _remoteOfferedCapabilities; + private Symbol[] _desiredCapabilities; + private Symbol[] _remoteDesiredCapabilities; private LinkNode<SessionImpl> _node; @@ -286,4 +293,73 @@ public class SessionImpl 
extends EndpointImpl implements ProtonJSession { return _outgoingWindow; } + + @Override + public Map<Symbol, Object> getProperties() + { + return _properties; + } + + @Override + public void setProperties(Map<Symbol, Object> properties) + { + _properties = properties; + } + + @Override + public Map<Symbol, Object> getRemoteProperties() + { + return _remoteProperties; + } + + void setRemoteProperties(Map<Symbol, Object> remoteProperties) + { + _remoteProperties = remoteProperties; + } + + @Override + public Symbol[] getDesiredCapabilities() + { + return _desiredCapabilities; + } + + @Override + public void setDesiredCapabilities(Symbol[] desiredCapabilities) + { + _desiredCapabilities = desiredCapabilities; + } + + @Override + public Symbol[] getRemoteDesiredCapabilities() + { + return _remoteDesiredCapabilities; + } + + void setRemoteDesiredCapabilities(Symbol[] remoteDesiredCapabilities) + { + _remoteDesiredCapabilities = remoteDesiredCapabilities; + } + + @Override + public Symbol[] getOfferedCapabilities() + { + return _offeredCapabilities; + } + + @Override + public void setOfferedCapabilities(Symbol[] offeredCapabilities) + { + _offeredCapabilities = offeredCapabilities; + } + + @Override + public Symbol[] getRemoteOfferedCapabilities() + { + return _remoteOfferedCapabilities; + } + + void setRemoteOfferedCapabilities(Symbol[] remoteOfferedCapabilities) + { + _remoteOfferedCapabilities = remoteOfferedCapabilities; + } } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/331e3b9f/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java index bb2e43b..42126b0 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java +++ 
b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java @@ -881,6 +881,21 @@ public class TransportImpl extends EndpointImpl begin.setOutgoingWindow(transportSession.getOutgoingWindowSize()); begin.setNextOutgoingId(transportSession.getNextOutgoingId()); + if(session.getProperties() != null) + { + begin.setProperties(session.getProperties()); + } + + if(session.getOfferedCapabilities() != null) + { + begin.setOfferedCapabilities(session.getOfferedCapabilities()); + } + + if(session.getDesiredCapabilities() != null) + { + begin.setDesiredCapabilities(session.getDesiredCapabilities()); + } + writeFrame(channelId, begin, null, null); transportSession.sentBegin(); } @@ -1118,6 +1133,10 @@ public class TransportImpl extends EndpointImpl transportSession.setRemoteChannel(channel); session.setRemoteState(EndpointState.ACTIVE); transportSession.setNextIncomingId(begin.getNextOutgoingId()); + session.setRemoteProperties(begin.getProperties()); + session.setRemoteDesiredCapabilities(begin.getDesiredCapabilities()); + session.setRemoteOfferedCapabilities(begin.getOfferedCapabilities()); + _remoteSessions.put(channel, transportSession); _connectionEndpoint.put(Event.Type.SESSION_REMOTE_OPEN, session); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/331e3b9f/proton-j/src/test/java/org/apache/qpid/proton/systemtests/SessionTest.java ---------------------------------------------------------------------- diff --git a/proton-j/src/test/java/org/apache/qpid/proton/systemtests/SessionTest.java b/proton-j/src/test/java/org/apache/qpid/proton/systemtests/SessionTest.java new file mode 100644 index 0000000..728c6b9 --- /dev/null +++ b/proton-j/src/test/java/org/apache/qpid/proton/systemtests/SessionTest.java @@ -0,0 +1,198 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.proton.systemtests; + +import static java.util.EnumSet.of; +import static org.apache.qpid.proton.engine.EndpointState.ACTIVE; +import static org.apache.qpid.proton.engine.EndpointState.UNINITIALIZED; +import static org.apache.qpid.proton.systemtests.TestLoggingHelper.bold; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; + +import java.util.Arrays; +import java.util.HashMap; +import java.util.Map; +import java.util.logging.Logger; + +import org.apache.qpid.proton.Proton; +import org.apache.qpid.proton.amqp.Symbol; +import org.junit.Test; + +public class SessionTest extends EngineTestBase +{ + private static final Logger LOGGER = Logger.getLogger(SessionTest.class.getName()); + + @Test + public void testCapabilities() throws Exception + { + final Symbol clientOfferedCap = Symbol.valueOf("clientOfferedCapability"); + final Symbol clientDesiredCap = Symbol.valueOf("clientDesiredCapability"); + final Symbol serverOfferedCap = Symbol.valueOf("serverOfferedCapability"); + final Symbol serverDesiredCap = Symbol.valueOf("serverDesiredCapability"); + + Symbol[] clientOfferedCapabilities = new Symbol[] { clientOfferedCap }; + Symbol[] clientDesiredCapabilities = new Symbol[] { clientDesiredCap }; + + Symbol[] serverOfferedCapabilities = new Symbol[] { serverOfferedCap }; + 
Symbol[] serverDesiredCapabilities = new Symbol[] { serverDesiredCap }; + + LOGGER.fine(bold("======== About to create transports")); + + getClient().transport = Proton.transport(); + ProtocolTracerEnabler.setProtocolTracer(getClient().transport, TestLoggingHelper.CLIENT_PREFIX); + + getServer().transport = Proton.transport(); + ProtocolTracerEnabler.setProtocolTracer(getServer().transport, " " + TestLoggingHelper.SERVER_PREFIX); + + doOutputInputCycle(); + + getClient().connection = Proton.connection(); + getClient().transport.bind(getClient().connection); + + getServer().connection = Proton.connection(); + getServer().transport.bind(getServer().connection); + + LOGGER.fine(bold("======== About to open connections")); + getClient().connection.open(); + getServer().connection.open(); + + doOutputInputCycle(); + + LOGGER.fine(bold("======== About to open sessions")); + getClient().session = getClient().connection.session(); + + // Set the client session capabilities + getClient().session.setOfferedCapabilities(clientOfferedCapabilities); + getClient().session.setDesiredCapabilities(clientDesiredCapabilities); + + getClient().session.open(); + + pumpClientToServer(); + + getServer().session = getServer().connection.sessionHead(of(UNINITIALIZED), of(ACTIVE)); + assertEndpointState(getServer().session, UNINITIALIZED, ACTIVE); + + // Set the server session capabilities + getServer().session.setOfferedCapabilities(serverOfferedCapabilities); + getServer().session.setDesiredCapabilities(serverDesiredCapabilities); + + getServer().session.open(); + assertEndpointState(getServer().session, ACTIVE, ACTIVE); + + pumpServerToClient(); + + // Verify server side got the clients session capabilities as expected + Symbol[] serverRemoteOfferedCapabilities = getServer().session.getRemoteOfferedCapabilities(); + assertNotNull("Server had no remote offered capabilities", serverRemoteOfferedCapabilities); + assertEquals("Server remote offered capabilities not expected size", 1, 
serverRemoteOfferedCapabilities.length); + assertTrue("Server remote offered capabilities lack expected value: " + clientOfferedCap, Arrays.asList(serverRemoteOfferedCapabilities).contains(clientOfferedCap)); + + Symbol[] serverRemoteDesiredCapabilities = getServer().session.getRemoteDesiredCapabilities(); + assertNotNull("Server had no remote desired capabilities", serverRemoteDesiredCapabilities); + assertEquals("Server remote desired capabilities not expected size", 1, serverRemoteDesiredCapabilities.length); + assertTrue("Server remote desired capabilities lack expected value: " + clientDesiredCap, Arrays.asList(serverRemoteDesiredCapabilities).contains(clientDesiredCap)); + + // Verify the client side got the servers session capabilities as expected + Symbol[] clientRemoteOfferedCapabilities = getClient().session.getRemoteOfferedCapabilities(); + assertNotNull("Client had no remote offered capabilities", clientRemoteOfferedCapabilities); + assertEquals("Client remote offered capabilities not expected size", 1, clientRemoteOfferedCapabilities.length); + assertTrue("Client remote offered capabilities lack expected value: " + serverOfferedCap, Arrays.asList(clientRemoteOfferedCapabilities).contains(serverOfferedCap)); + + Symbol[] clientRemoteDesiredCapabilities = getClient().session.getRemoteDesiredCapabilities(); + assertNotNull("Client had no remote desired capabilities", clientRemoteDesiredCapabilities); + assertEquals("Client remote desired capabilities not expected size", 1, clientRemoteDesiredCapabilities.length); + assertTrue("Client remote desired capabilities lack expected value: " + serverDesiredCap, Arrays.asList(clientRemoteDesiredCapabilities).contains(serverDesiredCap)); + } + + @Test + public void testProperties() throws Exception + { + final Symbol clientPropName = Symbol.valueOf("ClientPropName"); + final Integer clientPropValue = 1234; + final Symbol serverPropName = Symbol.valueOf("ServerPropName"); + final Integer serverPropValue = 5678; + + 
Map<Symbol, Object> clientProps = new HashMap<>(); + clientProps.put(clientPropName, clientPropValue); + + Map<Symbol, Object> serverProps = new HashMap<>(); + serverProps.put(serverPropName, serverPropValue); + + LOGGER.fine(bold("======== About to create transports")); + + getClient().transport = Proton.transport(); + ProtocolTracerEnabler.setProtocolTracer(getClient().transport, TestLoggingHelper.CLIENT_PREFIX); + + getServer().transport = Proton.transport(); + ProtocolTracerEnabler.setProtocolTracer(getServer().transport, " " + TestLoggingHelper.SERVER_PREFIX); + + doOutputInputCycle(); + + getClient().connection = Proton.connection(); + getClient().transport.bind(getClient().connection); + + getServer().connection = Proton.connection(); + getServer().transport.bind(getServer().connection); + + LOGGER.fine(bold("======== About to open connections")); + getClient().connection.open(); + getServer().connection.open(); + + doOutputInputCycle(); + + LOGGER.fine(bold("======== About to open sessions")); + getClient().session = getClient().connection.session(); + + // Set the client session properties + getClient().session.setProperties(clientProps); + + getClient().session.open(); + + pumpClientToServer(); + + getServer().session = getServer().connection.sessionHead(of(UNINITIALIZED), of(ACTIVE)); + assertEndpointState(getServer().session, UNINITIALIZED, ACTIVE); + + // Set the server session properties + getServer().session.setProperties(serverProps); + + getServer().session.open(); + + assertEndpointState(getServer().session, ACTIVE, ACTIVE); + + pumpServerToClient(); + + assertEndpointState(getClient().session, ACTIVE, ACTIVE); + + // Verify server side got the clients session properties as expected + Map<Symbol, Object> serverRemoteProperties = getServer().session.getRemoteProperties(); + assertNotNull("Server had no remote properties", serverRemoteProperties); + assertEquals("Server remote properties not expected size", 1, serverRemoteProperties.size()); + 
assertTrue("Server remote properties lack expected key: " + clientPropName, serverRemoteProperties.containsKey(clientPropName)); + assertEquals("Server remote properties contain unexpected value for key: " + clientPropName, clientPropValue, serverRemoteProperties.get(clientPropName)); + + // Verify the client side got the servers session properties as expected + Map<Symbol, Object> clientRemoteProperties = getClient().session.getRemoteProperties(); + assertNotNull("Client had no remote properties", clientRemoteProperties); + assertEquals("Client remote properties not expected size", 1, clientRemoteProperties.size()); + assertTrue("Client remote properties lack expected key: " + serverPropName, clientRemoteProperties.containsKey(serverPropName)); + assertEquals("Client remote properties contain unexpected value for key: " + serverPropName, serverPropValue, clientRemoteProperties.get(serverPropName)); + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
