PROTON-853: don't return the cached links if they are already in the closed state; instead, create a new object and ensure the old links also get freed. Also fixes behaviour similar to that in PROTON-850.
This closes #21 Project: http://git-wip-us.apache.org/repos/asf/qpid-proton/repo Commit: http://git-wip-us.apache.org/repos/asf/qpid-proton/commit/f2d7d669 Tree: http://git-wip-us.apache.org/repos/asf/qpid-proton/tree/f2d7d669 Diff: http://git-wip-us.apache.org/repos/asf/qpid-proton/diff/f2d7d669 Branch: refs/heads/kgiusti-python3 Commit: f2d7d669155a2ca57606c9381f4f1720739be79b Parents: 252f5f0 Author: Robert Gemmell <[email protected]> Authored: Mon Apr 20 17:41:10 2015 +0100 Committer: Robert Gemmell <[email protected]> Committed: Wed Apr 22 19:26:33 2015 +0100 ---------------------------------------------------------------------- .../qpid/proton/engine/impl/SessionImpl.java | 51 +++++++++++++++++++- .../qpid/proton/engine/impl/TransportImpl.java | 6 --- 2 files changed, 50 insertions(+), 7 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f2d7d669/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java index 0b3524a..45fcb70 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java @@ -21,6 +21,7 @@ package org.apache.qpid.proton.engine.impl; import java.util.*; + import org.apache.qpid.proton.engine.EndpointState; import org.apache.qpid.proton.engine.ProtonJSession; import org.apache.qpid.proton.engine.Session; @@ -32,6 +33,7 @@ public class SessionImpl extends EndpointImpl implements ProtonJSession private Map<String, SenderImpl> _senders = new LinkedHashMap<String, SenderImpl>(); private Map<String, ReceiverImpl> _receivers = new LinkedHashMap<String, ReceiverImpl>(); + private List<LinkImpl> _oldLinksToFree = 
new ArrayList<LinkImpl>(); private TransportSession _transportSession; private int _incomingCapacity = 1024*1024; private int _incomingBytes = 0; @@ -58,6 +60,17 @@ public class SessionImpl extends EndpointImpl implements ProtonJSession sender = new SenderImpl(this, name); _senders.put(name, sender); } + else + { + if(sender.getLocalState() == EndpointState.CLOSED + && sender.getRemoteState() == EndpointState.CLOSED) + { + _oldLinksToFree.add(sender); + + sender = new SenderImpl(this, name); + _senders.put(name, sender); + } + } return sender; } @@ -69,6 +82,17 @@ public class SessionImpl extends EndpointImpl implements ProtonJSession receiver = new ReceiverImpl(this, name); _receivers.put(name, receiver); } + else + { + if(receiver.getLocalState() == EndpointState.CLOSED + && receiver.getRemoteState() == EndpointState.CLOSED) + { + _oldLinksToFree.add(receiver); + + receiver = new ReceiverImpl(this, name); + _receivers.put(name, receiver); + } + } return receiver; } @@ -115,6 +139,11 @@ public class SessionImpl extends EndpointImpl implements ProtonJSession receiver.free(); } _receivers.clear(); + + List<LinkImpl> links = new ArrayList<LinkImpl>(_oldLinksToFree); + for(LinkImpl link : links) { + link.free(); + } } void modifyEndpoints() { @@ -145,12 +174,32 @@ public class SessionImpl extends EndpointImpl implements ProtonJSession void freeSender(SenderImpl sender) { - _senders.remove(sender.getName()); + String name = sender.getName(); + SenderImpl existing = _senders.get(name); + if (sender.equals(existing)) + { + _senders.remove(name); + } + else + { + _oldLinksToFree.remove(sender); + } } void freeReceiver(ReceiverImpl receiver) { _receivers.remove(receiver.getName()); + + String name = receiver.getName(); + ReceiverImpl existing = _receivers.get(name); + if (receiver.equals(existing)) + { + _receivers.remove(name); + } + else + { + _oldLinksToFree.remove(receiver); + } } @Override 
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/f2d7d669/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java index 8a7fb32..c40cdee 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java @@ -22,15 +22,12 @@ import static org.apache.qpid.proton.engine.impl.ByteBufferUtils.pourBufferToArr import java.nio.ByteBuffer; import java.util.HashMap; -import java.util.HashSet; import java.util.Map; -import java.util.Set; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.UnsignedInteger; import org.apache.qpid.proton.amqp.UnsignedShort; -import org.apache.qpid.proton.amqp.security.SaslCode; import org.apache.qpid.proton.amqp.transport.Attach; import org.apache.qpid.proton.amqp.transport.Begin; import org.apache.qpid.proton.amqp.transport.Close; @@ -55,12 +52,9 @@ import org.apache.qpid.proton.engine.Sasl; import org.apache.qpid.proton.engine.Ssl; import org.apache.qpid.proton.engine.SslDomain; import org.apache.qpid.proton.engine.SslPeerDetails; -import org.apache.qpid.proton.engine.Transport; import org.apache.qpid.proton.engine.TransportException; import org.apache.qpid.proton.engine.TransportResult; import org.apache.qpid.proton.engine.TransportResultFactory; -import org.apache.qpid.proton.engine.Sasl.SaslOutcome; -import org.apache.qpid.proton.engine.impl.ssl.ProtonSslEngineProvider; import org.apache.qpid.proton.engine.impl.ssl.SslImpl; import org.apache.qpid.proton.framing.TransportFrame; --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional 
commands, e-mail: [email protected]
