PROTON-1082: add support for setting and inspecting link properties conveyed during link attach
This closes #52 Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/e9e0f31c Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/e9e0f31c Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/e9e0f31c Branch: refs/heads/go1 Commit: e9e0f31c6894736e54d7d5b624bf3245f704d9af Parents: 0a6e0e7 Author: Robert Gemmell <[email protected]> Authored: Mon Dec 21 17:26:49 2015 +0000 Committer: Robert Gemmell <[email protected]> Committed: Mon Dec 21 17:35:23 2015 +0000 ---------------------------------------------------------------------- .../org/apache/qpid/proton/engine/Link.java | 25 +++ .../qpid/proton/engine/impl/LinkImpl.java | 27 +++ .../qpid/proton/engine/impl/TransportImpl.java | 7 + .../qpid/proton/systemtests/LinkTest.java | 165 +++++++++++++++++++ 4 files changed, 224 insertions(+) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e9e0f31c/proton-j/src/main/java/org/apache/qpid/proton/engine/Link.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/Link.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/Link.java index 0ea0e74..1b214bc 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/Link.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/Link.java @@ -21,7 +21,9 @@ package org.apache.qpid.proton.engine; import java.util.EnumSet; +import java.util.Map; +import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; import org.apache.qpid.proton.amqp.transport.SenderSettleMode; import org.apache.qpid.proton.amqp.transport.Source; @@ -180,6 +182,29 @@ public interface Link extends Endpoint @Deprecated void setRemoteSenderSettleMode(SenderSettleMode remoteSenderSettleMode); + /** + * Gets the local link properties. + * + * @see #setProperties(Map) + */ + Map<Symbol, Object> getProperties(); + + /** + * Sets the local link properties, to be conveyed to the peer via the Attach frame when + * attaching the link to the session. + * + * Must be called during link setup, i.e. before calling the {@link #open()} method. + */ + void setProperties(Map<Symbol, Object> properties); + + /** + * Gets the remote link properties, as conveyed from the peer via the Attach frame + * when attaching the link to the session. + * + * @return the properties Map conveyed by the peer, or null if there was none. + */ + Map<Symbol, Object> getRemoteProperties(); + public int drained(); public int getRemoteCredit(); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e9e0f31c/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java index 6b63b9a..8a2acf0 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java @@ -21,7 +21,9 @@ package org.apache.qpid.proton.engine.impl; import java.util.EnumSet; +import java.util.Map; +import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; import org.apache.qpid.proton.amqp.transport.SenderSettleMode; import org.apache.qpid.proton.amqp.transport.Source; @@ -57,6 +59,8 @@ public abstract class LinkImpl extends EndpointImpl implements Link private LinkNode<LinkImpl> _node; private boolean _drain; private boolean _detached; + private Map<Symbol, Object> _properties; + private Map<Symbol, Object> _remoteProperties; LinkImpl(SessionImpl session, String name) { @@ -373,6 +377,29 @@ public abstract class LinkImpl extends EndpointImpl implements Link } @Override + public Map<Symbol, Object> getProperties() + { + return _properties; + } + + @Override + public void setProperties(Map<Symbol, Object> properties) + { + _properties = properties; + } + + @Override + public Map<Symbol, Object> getRemoteProperties() + { + return _remoteProperties; + } + + void setRemoteProperties(Map<Symbol, Object> remoteProperties) + { + _remoteProperties = remoteProperties; + } + + @Override public int drained() { int drained = 0; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e9e0f31c/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java index f318319..a98a6f1 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java @@ -769,6 +769,11 @@ public class TransportImpl extends EndpointImpl attach.setTarget(link.getTarget()); } + if(link.getProperties() != null) + { + attach.setProperties(link.getProperties()); + } + attach.setRole(endpoint instanceof ReceiverImpl ? Role.RECEIVER : Role.SENDER); if(link instanceof SenderImpl) @@ -1168,6 +1173,8 @@ public class TransportImpl extends EndpointImpl link.setRemoteReceiverSettleMode(attach.getRcvSettleMode()); link.setRemoteSenderSettleMode(attach.getSndSettleMode()); + link.setRemoteProperties(attach.getProperties()); + transportLink.setName(attach.getName()); transportLink.setRemoteHandle(handle); transportSession.addLinkRemoteHandle(transportLink, handle); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/e9e0f31c/proton-j/src/test/java/org/apache/qpid/proton/systemtests/LinkTest.java ---------------------------------------------------------------------- diff --git a/proton-j/src/test/java/org/apache/qpid/proton/systemtests/LinkTest.java b/proton-j/src/test/java/org/apache/qpid/proton/systemtests/LinkTest.java new file mode 100644 index 0000000..0811f16 --- /dev/null +++ b/proton-j/src/test/java/org/apache/qpid/proton/systemtests/LinkTest.java @@ -0,0 +1,165 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.proton.systemtests; + +import static java.util.EnumSet.of; +import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertTrue; +import static org.apache.qpid.proton.engine.EndpointState.ACTIVE; +import static org.apache.qpid.proton.engine.EndpointState.UNINITIALIZED; +import static org.apache.qpid.proton.systemtests.TestLoggingHelper.bold; +import static org.junit.Assert.assertNotNull; + +import java.util.HashMap; +import java.util.Map; +import java.util.logging.Logger; + +import org.apache.qpid.proton.Proton; +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.messaging.Source; +import org.apache.qpid.proton.amqp.messaging.Target; +import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; +import org.apache.qpid.proton.amqp.transport.SenderSettleMode; +import org.apache.qpid.proton.engine.Sender; +import org.junit.Test; + +public class LinkTest extends EngineTestBase +{ + private static final Logger LOGGER = Logger.getLogger(LinkTest.class.getName()); + + private static final Symbol RCV_PROP = Symbol.valueOf("ReceiverPropName"); + private static final Integer RCV_PROP_VAL = 1234; + private static final Symbol SND_PROP = Symbol.valueOf("SenderPropName"); + private static final Integer SND_PROP_VAL = 5678; + + private final String _sourceAddress = getServer().containerId + "-link1-source"; + + @Test + public void testProperties() throws Exception + { + Map<Symbol, Object> receiverProps = new HashMap<>(); + receiverProps.put(RCV_PROP, RCV_PROP_VAL); + + Map<Symbol, Object> senderProps = new HashMap<>(); + senderProps.put(SND_PROP, SND_PROP_VAL); + + + LOGGER.fine(bold("======== About to create transports")); + + getClient().transport = Proton.transport(); + ProtocolTracerEnabler.setProtocolTracer(getClient().transport, TestLoggingHelper.CLIENT_PREFIX); + + getServer().transport = Proton.transport(); + ProtocolTracerEnabler.setProtocolTracer(getServer().transport, " " + TestLoggingHelper.SERVER_PREFIX); + + doOutputInputCycle(); + + getClient().connection = Proton.connection(); + getClient().transport.bind(getClient().connection); + + getServer().connection = Proton.connection(); + getServer().transport.bind(getServer().connection); + + + LOGGER.fine(bold("======== About to open connections")); + getClient().connection.open(); + getServer().connection.open(); + + doOutputInputCycle(); + + + LOGGER.fine(bold("======== About to open sessions")); + getClient().session = getClient().connection.session(); + getClient().session.open(); + + pumpClientToServer(); + + getServer().session = getServer().connection.sessionHead(of(UNINITIALIZED), of(ACTIVE)); + assertEndpointState(getServer().session, UNINITIALIZED, ACTIVE); + + getServer().session.open(); + assertEndpointState(getServer().session, ACTIVE, ACTIVE); + + pumpServerToClient(); + assertEndpointState(getClient().session, ACTIVE, ACTIVE); + + + LOGGER.fine(bold("======== About to create reciever")); + + getClient().source = new Source(); + getClient().source.setAddress(_sourceAddress); + + getClient().target = new Target(); + getClient().target.setAddress(null); + + getClient().receiver = getClient().session.receiver("link1"); + getClient().receiver.setTarget(getClient().target); + getClient().receiver.setSource(getClient().source); + + getClient().receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST); + getClient().receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED); + + // Set the recievers properties + getClient().receiver.setProperties(receiverProps); + + assertEndpointState(getClient().receiver, UNINITIALIZED, UNINITIALIZED); + + getClient().receiver.open(); + assertEndpointState(getClient().receiver, ACTIVE, UNINITIALIZED); + + pumpClientToServer(); + + + LOGGER.fine(bold("======== About to set up implicitly created sender")); + + getServer().sender = (Sender) getServer().connection.linkHead(of(UNINITIALIZED), of(ACTIVE)); + + getServer().sender.setReceiverSettleMode(getServer().sender.getRemoteReceiverSettleMode()); + getServer().sender.setSenderSettleMode(getServer().sender.getRemoteSenderSettleMode()); + + org.apache.qpid.proton.amqp.transport.Source serverRemoteSource = getServer().sender.getRemoteSource(); + getServer().sender.setSource(serverRemoteSource); + + // Set the senders properties + getServer().sender.setProperties(senderProps); + + assertEndpointState(getServer().sender, UNINITIALIZED, ACTIVE); + getServer().sender.open(); + + assertEndpointState(getServer().sender, ACTIVE, ACTIVE); + + pumpServerToClient(); + + assertEndpointState(getClient().receiver, ACTIVE, ACTIVE); + + // Verify server side got the clients receiver properties as expected + Map<Symbol, Object> serverRemoteProperties = getServer().sender.getRemoteProperties(); + assertNotNull("Server had no remote properties", serverRemoteProperties); + assertEquals("Server remote properties not expected size", 1, serverRemoteProperties.size()); + assertTrue("Server remote properties lack expected key: " + RCV_PROP, serverRemoteProperties.containsKey(RCV_PROP)); + assertEquals("Server remote properties contain unexpected value for key: " + RCV_PROP, RCV_PROP_VAL, serverRemoteProperties.get(RCV_PROP)); + + // Verify the client side got the servers sender properties as expected + Map<Symbol, Object> clientRemoteProperties = getClient().receiver.getRemoteProperties(); + assertNotNull("Client had no remote properties", clientRemoteProperties); + assertEquals("Client remote properties not expected size", 1, clientRemoteProperties.size()); + assertTrue("Client remote properties lack expected key: " + SND_PROP, clientRemoteProperties.containsKey(SND_PROP)); + assertEquals("Client remote properties contain unexpected value for key: " + SND_PROP, SND_PROP_VAL, clientRemoteProperties.get(SND_PROP)); + } +} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
