http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/92b8098c/proton-j/src/main/java/org/apache/qpid/proton/engine/Event.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/Event.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/Event.java index 75ba56c..a0a2e95 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/Event.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/Event.java @@ -28,59 +28,47 @@ package org.apache.qpid.proton.engine; public interface Event { - public enum Category { - CONNECTION, - SESSION, - LINK, - DELIVERY, - TRANSPORT; - } public enum Type { - CONNECTION_INIT(Category.CONNECTION, 1), - CONNECTION_OPEN(Category.CONNECTION, 2), - CONNECTION_REMOTE_OPEN(Category.CONNECTION, 3), - CONNECTION_CLOSE(Category.CONNECTION, 4), - CONNECTION_REMOTE_CLOSE(Category.CONNECTION, 5), - CONNECTION_FINAL(Category.CONNECTION, 6), - - SESSION_INIT(Category.SESSION, 1), - SESSION_OPEN(Category.SESSION, 2), - SESSION_REMOTE_OPEN(Category.SESSION, 3), - SESSION_CLOSE(Category.SESSION, 4), - SESSION_REMOTE_CLOSE(Category.SESSION, 5), - SESSION_FINAL(Category.SESSION, 6), - - LINK_INIT(Category.LINK, 1), - LINK_OPEN(Category.LINK, 2), - LINK_REMOTE_OPEN(Category.LINK, 3), - LINK_CLOSE(Category.LINK, 4), - LINK_REMOTE_CLOSE(Category.LINK, 5), - LINK_FLOW(Category.LINK, 6), - LINK_FINAL(Category.LINK, 7), - - DELIVERY(Category.DELIVERY, 1), - TRANSPORT(Category.TRANSPORT, 1); - - private int _opcode; - private Category _category; - - private Type(Category c, int o) - { - this._category = c; - this._opcode = o; - } - - public Category getCategory() - { - return this._category; - } - } + CONNECTION_INIT, + CONNECTION_BOUND, + CONNECTION_UNBOUND, + CONNECTION_OPEN, + CONNECTION_REMOTE_OPEN, + CONNECTION_CLOSE, + CONNECTION_REMOTE_CLOSE, + CONNECTION_FINAL, + + SESSION_INIT, + SESSION_OPEN, + SESSION_REMOTE_OPEN, + SESSION_CLOSE, + SESSION_REMOTE_CLOSE, + SESSION_FINAL, + + LINK_INIT, + LINK_OPEN, + LINK_REMOTE_OPEN, + LINK_CLOSE, + LINK_REMOTE_CLOSE, + LINK_DETACH, + LINK_REMOTE_DETACH, + LINK_FLOW, + LINK_FINAL, - Category getCategory(); + DELIVERY, + + TRANSPORT, + TRANSPORT_ERROR, + TRANSPORT_HEAD_CLOSED, + TRANSPORT_TAIL_CLOSED, + TRANSPORT_CLOSED + } Type getType(); + Object getContext(); + Connection getConnection(); Session getSession();
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/92b8098c/proton-j/src/main/java/org/apache/qpid/proton/engine/Link.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/Link.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/Link.java index 04ec3f4..c965a29 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/Link.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/Link.java @@ -184,4 +184,7 @@ public interface Link extends Endpoint public int getRemoteCredit(); public boolean getDrain(); + + public void detach(); + } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/92b8098c/proton-j/src/main/java/org/apache/qpid/proton/engine/Transport.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/Transport.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/Transport.java index ba806bd..e0dc9c2 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/Transport.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/Transport.java @@ -22,6 +22,7 @@ package org.apache.qpid.proton.engine; import java.nio.ByteBuffer; +import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.apache.qpid.proton.engine.impl.TransportImpl; @@ -221,4 +222,6 @@ public interface Transport extends Endpoint int getRemoteChannelMax(); + ErrorCondition getCondition(); + } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/92b8098c/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java index 1dcebe4..60d6dfe 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java @@ -56,11 +56,6 @@ class EventImpl implements Event context = null; } - public Category getCategory() - { - return type.getCategory(); - } - public Type getType() { return type; @@ -73,16 +68,15 @@ class EventImpl implements Event public Connection getConnection() { - switch (type.getCategory()) { - case CONNECTION: + if (context instanceof Connection) { return (Connection) context; - case TRANSPORT: + } else if (context instanceof Transport) { Transport transport = getTransport(); if (transport == null) { return null; } return ((TransportImpl) transport).getConnectionImpl(); - default: + } else { Session ssn = getSession(); if (ssn == null) { return null; @@ -93,10 +87,9 @@ class EventImpl implements Event public Session getSession() { - switch (type.getCategory()) { - case SESSION: + if (context instanceof Session) { return (Session) context; - default: + } else { Link link = getLink(); if (link == null) { return null; @@ -107,10 +100,9 @@ class EventImpl implements Event public Link getLink() { - switch (type.getCategory()) { - case LINK: + if (context instanceof Link) { return (Link) context; - default: + } else { Delivery dlv = getDelivery(); if (dlv == null) { return null; @@ -121,20 +113,18 @@ class EventImpl implements Event public Delivery getDelivery() { - switch (type.getCategory()) { - case DELIVERY: + if (context instanceof Delivery) { return (Delivery) context; - default: + } else { return null; } } public Transport getTransport() { - switch (type.getCategory()) { - case TRANSPORT: + if (context instanceof Transport) { return (Transport) context; - default: + } else { return null; } } @@ -150,4 +140,5 @@ class EventImpl implements Event { return "EventImpl{" + "type=" + type + ", context=" + context + '}'; } + } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/92b8098c/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java index 37433d5..0da2a60 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/LinkImpl.java @@ -55,6 +55,7 @@ public abstract class LinkImpl extends EndpointImpl implements Link private LinkNode<LinkImpl> _node; private boolean _drain; + private boolean _detached; LinkImpl(SessionImpl session, String name) { @@ -398,4 +399,17 @@ public abstract class LinkImpl extends EndpointImpl implements Link { getConnectionImpl().put(Event.Type.LINK_CLOSE, this); } + + public void detach() + { + _detached = true; + getConnectionImpl().put(Event.Type.LINK_DETACH, this); + modified(); + } + + boolean detached() + { + return _detached; + } + } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/92b8098c/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java index 57010d2..26ad9ca 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/SessionImpl.java @@ -103,11 +103,14 @@ public class SessionImpl extends EndpointImpl implements ProtonJSession _connection.removeSessionEndpoint(_node); _node = null; - for(SenderImpl sender : _senders.values()) { + List<SenderImpl> senders = new ArrayList<SenderImpl>(_senders.values()); + for(SenderImpl sender : senders) { sender.free(); } _senders.clear(); - for(ReceiverImpl receiver : _receivers.values()) { + + List<ReceiverImpl> receivers = new ArrayList<ReceiverImpl>(_receivers.values()); + for(ReceiverImpl receiver : receivers) { receiver.free(); } _receivers.clear(); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/92b8098c/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java index 62cb10d..2c6c884 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java @@ -121,7 +121,10 @@ public class TransportImpl extends EndpointImpl private FrameHandler _frameHandler = this; private boolean _head_closed = false; - private TransportException _tail_error = null; + private ErrorCondition _condition = null; + + private boolean postedHeadClosed = false; + private boolean postedTailClosed = false; /** * @deprecated This constructor's visibility will be reduced to the default scope in a future release. @@ -208,10 +211,18 @@ public class TransportImpl extends EndpointImpl } @Override + public ErrorCondition getCondition() + { + return _condition; + } + + @Override public void bind(Connection conn) { - _connectionEndpoint = (ConnectionImpl) conn; // TODO - check if already bound + + _connectionEndpoint = (ConnectionImpl) conn; + put(Event.Type.CONNECTION_BOUND, conn); _connectionEndpoint.setTransport(this); _connectionEndpoint.incref(); @@ -230,6 +241,7 @@ public class TransportImpl extends EndpointImpl @Override public void unbind() { + put(Event.Type.CONNECTION_UNBOUND, _connectionEndpoint); _connectionEndpoint.modifyEndpoints(); _connectionEndpoint.setTransport(null); @@ -369,7 +381,7 @@ public class TransportImpl extends EndpointImpl SessionImpl session = link.getSession(); TransportSession transportSession = getTransportState(session); - if(link.getLocalState() == EndpointState.CLOSED + if(((link.getLocalState() == EndpointState.CLOSED) || link.detached()) && transportLink.isLocalHandleSet() && !_isCloseSent) { @@ -389,8 +401,7 @@ public class TransportImpl extends EndpointImpl Detach detach = new Detach(); detach.setHandle(localHandle); - // TODO - need an API for detaching rather than closing the link - detach.setClosed(true); + detach.setClosed(!link.detached()); ErrorCondition localError = link.getCondition(); if( localError.getCondition() !=null ) @@ -517,6 +528,11 @@ public class TransportImpl extends EndpointImpl transfer.setDeliveryTag(new Binary(delivery.getTag())); transfer.setHandle(tpLink.getLocalHandle()); + if(delivery.getLocalState() != null) + { + transfer.setState(delivery.getLocalState()); + } + if(delivery.isSettled()) { transfer.setSettled(Boolean.TRUE); @@ -742,7 +758,7 @@ public class TransportImpl extends EndpointImpl private void processOpen() { - if ((_tail_error != null || + if ((_condition != null || (_connectionEndpoint != null && _connectionEndpoint.getLocalState() != EndpointState.UNINITIALIZED)) && !_isOpenSent) { @@ -919,7 +935,7 @@ public class TransportImpl extends EndpointImpl private void processClose() { - if ((_tail_error != null || + if ((_condition != null || (_connectionEndpoint != null && _connectionEndpoint.getLocalState() == EndpointState.CLOSED)) && !_isCloseSent) { @@ -930,8 +946,7 @@ public class TransportImpl extends EndpointImpl ErrorCondition localError; if (_connectionEndpoint == null) { - localError = new ErrorCondition(ConnectionError.FRAMING_ERROR, - _tail_error.toString()); + localError = _condition; } else { localError = _connectionEndpoint.getCondition(); } @@ -1154,7 +1169,11 @@ public class TransportImpl extends EndpointImpl LinkImpl link = transportLink.getLink(); transportLink.receivedDetach(); transportSession.freeRemoteHandle(transportLink.getRemoteHandle()); - _connectionEndpoint.put(Event.Type.LINK_REMOTE_CLOSE, link); + if (detach.getClosed()) { + _connectionEndpoint.put(Event.Type.LINK_REMOTE_CLOSE, link); + } else { + _connectionEndpoint.put(Event.Type.LINK_REMOTE_DETACH, link); + } transportLink.clearRemoteHandle(); link.setRemoteState(EndpointState.CLOSED); if(detach.getError() != null) @@ -1231,17 +1250,40 @@ public class TransportImpl extends EndpointImpl return _closeReceived; } + void put(Event.Type type, Object context) { + if (_connectionEndpoint != null) { + _connectionEndpoint.put(type, context); + } + } + + private void maybePostClosed() + { + if (postedHeadClosed && postedTailClosed) { + put(Event.Type.TRANSPORT_CLOSED, this); + } + } + @Override public void closed(TransportException error) { if (!_closeReceived || error != null) { if (error == null) { - _tail_error = new TransportException("connection aborted"); + _condition = new ErrorCondition(ConnectionError.FRAMING_ERROR, + "connection aborted"); } else { - _tail_error = error; + _condition = new ErrorCondition(ConnectionError.FRAMING_ERROR, + error.toString()); } _head_closed = true; } + if (_condition != null) { + put(Event.Type.TRANSPORT_ERROR, this); + } + if (!postedTailClosed) { + put(Event.Type.TRANSPORT_TAIL_CLOSED, this); + postedTailClosed = true; + maybePostClosed(); + } } @Override @@ -1346,6 +1388,13 @@ public class TransportImpl extends EndpointImpl { init(); _outputProcessor.pop(bytes); + + int p = pending(); + if (p < 0 && !postedHeadClosed) { + put(Event.Type.TRANSPORT_HEAD_CLOSED, this); + postedHeadClosed = true; + maybePostClosed(); + } } @Override http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/92b8098c/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/Address.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/Address.java b/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/Address.java index ff9fbbe..6ae75a7 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/Address.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/Address.java @@ -26,7 +26,7 @@ package org.apache.qpid.proton.messenger.impl; * */ -class Address +public class Address { private String _address; @@ -38,13 +38,7 @@ class Address private String _port; private String _name; - public Address(String address) - { - _address = address; - parse(); - } - - private void parse() + public void clear() { _passive = false; _scheme = null; @@ -53,21 +47,30 @@ class Address _host = null; _port = null; _name = null; + } + + public Address() + { + clear(); + } + public Address(String address) + { + clear(); int start = 0; - int schemeEnd = _address.indexOf("://", start); + int schemeEnd = address.indexOf("://", start); if (schemeEnd >= 0) { - _scheme = _address.substring(start, schemeEnd); + _scheme = address.substring(start, schemeEnd); start = schemeEnd + 3; } String uphp; - int slash = _address.indexOf("/", start); - if (slash > 0) { - uphp = _address.substring(start, slash); - _name = _address.substring(slash + 1); + int slash = address.indexOf("/", start); + if (slash >= 0) { + uphp = address.substring(start, slash); + _name = address.substring(slash + 1); } else { - uphp = _address.substring(start); + uphp = address.substring(start); } String hp; @@ -115,7 +118,18 @@ class Address public String toString() { - return _address; + String str = new String(); + if (_scheme != null) str += _scheme + "://"; + if (_user != null) str += _user; + if (_pass != null) str += ":" + _pass; + if (_user != null || _pass != null) str += "@"; + if (_host != null) { + if (_host.contains(":")) str += "[" + _host + "]"; + else str += _host; + } + if (_port != null) str += ":" + _port; + if (_name != null) str += "/" + _name; + return str; } public boolean isPassive() @@ -168,4 +182,33 @@ class Address return _name; } + public void setScheme(String scheme) + { + _scheme= scheme; + } + + public void setUser(String user) + { + _user= user; + } + + public void setPass(String pass) + { + _pass= pass; + } + + public void setHost(String host) + { + _host= host; + } + + public void setPort(String port) + { + _port= port; + } + + public void setName(String name) + { + _name= name; + } } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/92b8098c/proton-j/src/main/resources/cengine.py ---------------------------------------------------------------------- diff --git a/proton-j/src/main/resources/cengine.py b/proton-j/src/main/resources/cengine.py index c47ab58..102ee77 100644 --- a/proton-j/src/main/resources/cengine.py +++ b/proton-j/src/main/resources/cengine.py @@ -625,6 +625,10 @@ def pn_link_close(link): link.on_close() link.impl.close() +def pn_link_detach(link): + link.on_close() + link.impl.detach() + def pn_link_flow(link, n): link.impl.flow(n) @@ -863,6 +867,7 @@ class pn_transport_wrapper: def __init__(self, impl): self.impl = impl + self.condition = pn_condition() def pn_transport(): return wrap(Proton.transport(), pn_transport_wrapper) @@ -940,15 +945,15 @@ def pn_transport_close_tail(trans): def pn_transport_closed(trans): return trans.impl.isClosed() -from org.apache.qpid.proton.engine import Event +def pn_transport_condition(trans): + trans.condition.decode(trans.impl.getCondition()) + return trans.condition -PN_EVENT_CATEGORY_CONNECTION = Event.Category.CONNECTION -PN_EVENT_CATEGORY_SESSION = Event.Category.SESSION -PN_EVENT_CATEGORY_LINK = Event.Category.LINK -PN_EVENT_CATEGORY_DELIVERY = Event.Category.DELIVERY -PN_EVENT_CATEGORY_TRANSPORT = Event.Category.TRANSPORT +from org.apache.qpid.proton.engine import Event PN_CONNECTION_INIT = Event.Type.CONNECTION_INIT +PN_CONNECTION_BOUND = Event.Type.CONNECTION_BOUND +PN_CONNECTION_UNBOUND = Event.Type.CONNECTION_UNBOUND PN_CONNECTION_OPEN = Event.Type.CONNECTION_OPEN PN_CONNECTION_REMOTE_OPEN = Event.Type.CONNECTION_REMOTE_OPEN PN_CONNECTION_CLOSE = Event.Type.CONNECTION_CLOSE @@ -965,10 +970,16 @@ PN_LINK_OPEN = Event.Type.LINK_OPEN PN_LINK_REMOTE_OPEN = Event.Type.LINK_REMOTE_OPEN PN_LINK_CLOSE = Event.Type.LINK_CLOSE PN_LINK_REMOTE_CLOSE = Event.Type.LINK_REMOTE_CLOSE +PN_LINK_DETACH = Event.Type.LINK_DETACH +PN_LINK_REMOTE_DETACH = Event.Type.LINK_REMOTE_DETACH PN_LINK_FLOW = Event.Type.LINK_FLOW PN_LINK_FINAL = Event.Type.LINK_FINAL PN_DELIVERY = Event.Type.DELIVERY PN_TRANSPORT = Event.Type.TRANSPORT +PN_TRANSPORT_ERROR = Event.Type.TRANSPORT_ERROR +PN_TRANSPORT_HEAD_CLOSED = Event.Type.TRANSPORT_HEAD_CLOSED +PN_TRANSPORT_TAIL_CLOSED = Event.Type.TRANSPORT_TAIL_CLOSED +PN_TRANSPORT_CLOSED = Event.Type.TRANSPORT_CLOSED def pn_collector(): return Proton.collector() @@ -1000,8 +1011,33 @@ def pn_event_delivery(event): def pn_event_transport(event): return wrap(event.getTransport(), pn_transport_wrapper) +from org.apache.qpid.proton.engine.impl import ConnectionImpl, SessionImpl, \ + SenderImpl, ReceiverImpl, DeliveryImpl, TransportImpl + +J2C = { + ConnectionImpl: "pn_connection", + SessionImpl: "pn_session", + SenderImpl: "pn_link", + ReceiverImpl: "pn_link", + DeliveryImpl: "pn_delivery", + TransportImpl: "pn_transport" +} + +wrappers = { + "pn_connection": lambda x: wrap(x, pn_connection_wrapper), + "pn_session": lambda x: wrap(x, pn_session_wrapper), + "pn_link": lambda x: wrap(x, pn_link_wrapper), + "pn_delivery": lambda x: wrap(x, pn_delivery_wrapper), + "pn_transport": lambda x: wrap(x, pn_transport_wrapper), + "pn_void": lambda x: x +} + def pn_event_class(event): - return event.getClass() + ctx = event.getContext() + return J2C.get(ctx.getClass(), "pn_void") + +def pn_event_context(event): + return wrappers[pn_event_class(event)](event.getContext()) def pn_event_type(event): return event.getType() http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/92b8098c/proton-j/src/main/resources/cobject.py ---------------------------------------------------------------------- diff --git a/proton-j/src/main/resources/cobject.py b/proton-j/src/main/resources/cobject.py new file mode 100644 index 0000000..29cc06f --- /dev/null +++ b/proton-j/src/main/resources/cobject.py @@ -0,0 +1,42 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. +# + +def pn_class_name(cls): + return cls + +def pn_void2py(obj): + return obj + +def pn_py2void(obj): + return obj + +def pn_cast_pn_connection(obj): + return obj + +def pn_cast_pn_session(obj): + return obj + +def pn_cast_pn_link(obj): + return obj + +def pn_cast_pn_delivery(obj): + return obj + +def pn_cast_pn_transport(obj): + return obj http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/92b8098c/proton-j/src/main/resources/cproton.py ---------------------------------------------------------------------- diff --git a/proton-j/src/main/resources/cproton.py b/proton-j/src/main/resources/cproton.py index b91eb86..3d9a498 100644 --- a/proton-j/src/main/resources/cproton.py +++ b/proton-j/src/main/resources/cproton.py @@ -23,6 +23,11 @@ exposed to python via swig. This allows tests defined in python to run against both the C and Java protocol implementations. """ +# @todo(kgiusti) dynamically set these via filters in the pom.xml file +PN_VERSION_MAJOR = 0 +PN_VERSION_MINOR = 0 + +from cobject import * from cerror import * from ccodec import * from cengine import * @@ -31,3 +36,4 @@ from cssl import * from cdriver import * from cmessenger import * from cmessage import * +from curl import * http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/92b8098c/proton-j/src/main/resources/curl.py ---------------------------------------------------------------------- diff --git a/proton-j/src/main/resources/curl.py b/proton-j/src/main/resources/curl.py new file mode 100644 index 0000000..d4d3d37 --- /dev/null +++ b/proton-j/src/main/resources/curl.py @@ -0,0 +1,47 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License +# + +from org.apache.qpid.proton.messenger.impl import Address + +def pn_url(): + return Address() + +def pn_url_parse(urlstr): + return Address(urlstr) + +def pn_url_free(url): pass + +def pn_url_clear(url): + url.clear(); + +def pn_url_str(url): return url.toString() + +def pn_url_get_scheme(url): return url.getScheme() +def pn_url_get_username(url): return url.getUser() +def pn_url_get_password(url): return url.getPass() +def pn_url_get_host(url): return url.getHost() or None +def pn_url_get_port(url): return url.getPort() +def pn_url_get_path(url): return url.getName() + +def pn_url_set_scheme(url, value): url.setScheme(value) +def pn_url_set_username(url, value): url.setUser(value) +def pn_url_set_password(url, value): url.setPass(value) +def pn_url_set_host(url, value): url.setHost(value) +def pn_url_set_port(url, value): url.setPort(value) +def pn_url_set_path(url, value): url.setName(value) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/92b8098c/proton-j/src/test/java/org/apache/qpid/proton/amqp/BinaryTest.java ---------------------------------------------------------------------- diff --git a/proton-j/src/test/java/org/apache/qpid/proton/amqp/BinaryTest.java b/proton-j/src/test/java/org/apache/qpid/proton/amqp/BinaryTest.java new file mode 100644 index 0000000..4ab4766 --- /dev/null +++ b/proton-j/src/test/java/org/apache/qpid/proton/amqp/BinaryTest.java @@ -0,0 +1,108 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.proton.amqp; + +import static org.junit.Assert.*; + +import java.util.Arrays; + +import org.junit.Test; + +public class BinaryTest +{ + + @Test + public void testNotEqualsWithDifferentTypeObject() + { + Binary bin = createSteppedValueBinary(10); + + assertFalse("Objects should not be equal with different type", bin.equals("not-a-Binary")); + } + + @Test + public void testEqualsWithItself() + { + Binary bin = createSteppedValueBinary(10); + + assertTrue("Object should be equal to itself", bin.equals(bin)); + } + + @Test + public void testEqualsWithDifferentBinaryOfSameLengthAndContent() + { + int length = 10; + Binary bin1 = createSteppedValueBinary(length); + Binary bin2 = createSteppedValueBinary(length); + + assertTrue("Objects should be equal", bin1.equals(bin2)); + assertTrue("Objects should be equal", bin2.equals(bin1)); + } + + @Test + public void testEqualsWithDifferentLengthBinaryOfDifferentBytes() + { + int length1 = 10; + Binary bin1 = createSteppedValueBinary(length1); + Binary bin2 = createSteppedValueBinary(length1 + 1); + + assertFalse("Objects should not be equal", bin1.equals(bin2)); + assertFalse("Objects should not be equal", bin2.equals(bin1)); + } + + @Test + public void testEqualsWithDifferentLengthBinaryOfSameByte() + { + Binary bin1 = createNewRepeatedValueBinary(10, (byte) 1); + Binary bin2 = createNewRepeatedValueBinary(123, (byte) 1); + + assertFalse("Objects should not be equal", bin1.equals(bin2)); + assertFalse("Objects should not be equal", bin2.equals(bin1)); + } + + @Test + public void testEqualsWithDifferentContentBinary() + { + int length = 10; + Binary bin1 = createNewRepeatedValueBinary(length, (byte) 1); + + Binary bin2 = createNewRepeatedValueBinary(length, (byte) 1); + bin2.getArray()[5] = (byte) 0; + + assertFalse("Objects should not be equal", bin1.equals(bin2)); + assertFalse("Objects should not be equal", bin2.equals(bin1)); + } + + private Binary createSteppedValueBinary(int length) { + byte[] bytes = new byte[length]; + for (int i = 0; i < length; i++) { + bytes[i] = (byte) (length - i); + } + + return new Binary(bytes); + } + + private Binary createNewRepeatedValueBinary(int length, byte repeatedByte){ + byte[] bytes = new byte[length]; + Arrays.fill(bytes, repeatedByte); + + return new Binary(bytes); + } +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/92b8098c/proton-j/src/test/java/org/apache/qpid/proton/codec/StringTypeTest.java ---------------------------------------------------------------------- diff --git a/proton-j/src/test/java/org/apache/qpid/proton/codec/StringTypeTest.java b/proton-j/src/test/java/org/apache/qpid/proton/codec/StringTypeTest.java new file mode 100644 index 0000000..550386a --- /dev/null +++ b/proton-j/src/test/java/org/apache/qpid/proton/codec/StringTypeTest.java @@ -0,0 +1,148 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.proton.codec; + +import static org.junit.Assert.assertEquals; + +import java.lang.Character.UnicodeBlock; +import java.nio.ByteBuffer; +import java.nio.charset.Charset; +import java.util.Arrays; +import java.util.HashSet; +import java.util.Set; + +import org.junit.Test; +import org.apache.qpid.proton.amqp.messaging.AmqpValue; + +/** + * Test the encoding and decoding of {@link StringType} values. + */ +public class StringTypeTest +{ + private static final Charset CHARSET_UTF8 = Charset.forName("UTF-8"); + + /** + * Loop over all the chars in given {@link UnicodeBlock}s and return a + * {@link Set <String>} containing all the possible values as their + * {@link String} values. + * + * @param blocks the {@link UnicodeBlock}s to loop over + * @return a {@link Set <String>} containing all the possible values as + * {@link String} values + */ + private static Set<String> getAllStringsFromUnicodeBlocks(final UnicodeBlock... blocks) + { + final Set<UnicodeBlock> blockSet = new HashSet<UnicodeBlock>(Arrays.asList(blocks)); + final Set<String> strings = new HashSet<String>(); + for (int codePoint = 0; codePoint <= Character.MAX_CODE_POINT; codePoint++) + { + if (blockSet.contains(UnicodeBlock.of(codePoint))) + { + final int charCount = Character.charCount(codePoint); + final StringBuilder sb = new StringBuilder( + charCount); + if (charCount == 1) + { + sb.append(String.valueOf((char) codePoint)); + } + else if (charCount == 2) + { + //TODO: use Character.highSurrogate(codePoint) and Character.lowSurrogate(codePoint) when Java 7 is baseline + char highSurrogate = (char) ((codePoint >>> 10) + ('\uD800' - (0x010000 >>> 10))); + char lowSurrogate = (char) ((codePoint & 0x3ff) + '\uDC00'); + + sb.append(highSurrogate); + sb.append(lowSurrogate); + } + else + { + throw new IllegalArgumentException("Character.charCount of " + + charCount + " not supported."); + } + strings.add(sb.toString()); + } + } + return strings; + } + + + /** + * Test the encoding and decoding of various complicated Unicode characters + * which will end up as "surrogate pairs" when encoded to UTF-8 + */ + @Test + public void calculateUTF8Length() + { + for (final String input : generateTestData()) + { + assertEquals("Incorrect string length calculated for string '"+input+"'",input.getBytes(CHARSET_UTF8).length, StringType.calculateUTF8Length(input)); + } + } + + /** + * Test the encoding and decoding of various Unicode characters + */ + @Test + public void encodeDecodeStrings() + { + final DecoderImpl decoder = new DecoderImpl(); + final EncoderImpl encoder = new EncoderImpl(decoder); + AMQPDefinedTypes.registerAllTypes(decoder, encoder); + final ByteBuffer bb = ByteBuffer.allocate(16); + + for (final String input : generateTestData()) + { + bb.clear(); + final AmqpValue inputValue = new AmqpValue(input); + encoder.setByteBuffer(bb); + encoder.writeObject(inputValue); + bb.clear(); + decoder.setByteBuffer(bb); + final AmqpValue outputValue = (AmqpValue) decoder.readObject(); + assertEquals("Failed to round trip String correctly: ", input, outputValue.getValue()); + } + } + + // build up some test data with a set of suitable Unicode characters + private Set<String> generateTestData() + { + return new HashSet<String>() + { + private static final long serialVersionUID = 7331717267070233454L; + + { + // non-surrogate pair blocks + addAll(getAllStringsFromUnicodeBlocks(UnicodeBlock.BASIC_LATIN, + UnicodeBlock.LATIN_1_SUPPLEMENT, + UnicodeBlock.GREEK, + UnicodeBlock.LETTERLIKE_SYMBOLS)); + // blocks with surrogate pairs + //TODO: restore others when Java 7 is baseline + addAll(getAllStringsFromUnicodeBlocks(/*UnicodeBlock.MISCELLANEOUS_SYMBOLS_AND_PICTOGRAPHS,*/ + UnicodeBlock.MUSICAL_SYMBOLS, + /*UnicodeBlock.EMOTICONS,*/ + /*UnicodeBlock.PLAYING_CARDS,*/ + UnicodeBlock.SUPPLEMENTARY_PRIVATE_USE_AREA_A, + UnicodeBlock.SUPPLEMENTARY_PRIVATE_USE_AREA_B)); + } + }; + } +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/92b8098c/proton-j/src/test/java/org/apache/qpid/proton/messenger/impl/AddressTest.java ---------------------------------------------------------------------- diff --git a/proton-j/src/test/java/org/apache/qpid/proton/messenger/impl/AddressTest.java b/proton-j/src/test/java/org/apache/qpid/proton/messenger/impl/AddressTest.java index c0d2d92..4b233d5 100644 --- a/proton-j/src/test/java/org/apache/qpid/proton/messenger/impl/AddressTest.java +++ b/proton-j/src/test/java/org/apache/qpid/proton/messenger/impl/AddressTest.java @@ -6,24 +6,25 @@ import org.junit.Test; public class AddressTest { - private void testParse(String url, String scheme, String user, String pass, String host, String port, String name) - { - Address address = new Address(url); - assertEquals(scheme, address.getScheme()); - assertEquals(user, address.getUser()); - assertEquals(pass, address.getPass()); - assertEquals(host, address.getHost()); - assertEquals(port, address.getPort()); - } + private void testParse(String url, String scheme, String user, String pass, String host, String port, String name) + { + Address address = new Address(url); + assertEquals(scheme, address.getScheme()); + assertEquals(user, address.getUser()); + assertEquals(pass, address.getPass()); + assertEquals(host, address.getHost()); + assertEquals(port, address.getPort()); + assertEquals(url, address.toString()); + } - @Test - public void addressTests() - { - testParse("host", null, null, null, "host", null, null); - testParse("host:423", null, null, null, "host", "423", null); - testParse("user@host", null, "user", null, "host", null, null); - testParse("user:1243^&^:pw@host:423", null, "user", "1243^&^:pw", "host", "423", null); - testParse("user:1243^&^:pw@host:423/Foo.bar:90087", null, "user", "1243^&^:pw", "host", "423", "Foo.bar:90087"); + @Test + public void addressTests() + { + testParse("host", null, null, null, "host", null, null); + testParse("host:423", null, null, null, "host", "423", null); + testParse("user@host", null, "user", null, "host", null, null); + testParse("user:1243^&^:pw@host:423", null, "user", "1243^&^:pw", "host", "423", null); + testParse("user:1243^&^:pw@host:423/Foo.bar:90087", null, "user", "1243^&^:pw", "host", "423", "Foo.bar:90087"); testParse("user:1243^&^:pw@host:423/Foo.bar:90087@somewhere", null, "user", "1243^&^:pw", "host", "423", "Foo.bar:90087@somewhere"); testParse("[::1]", null, null, null, "::1", null, null); testParse("[::1]:amqp", null, null, null, "::1", "amqp", null); @@ -38,14 +39,13 @@ public class AddressTest { testParse("amqp://user@[1234:52:0:1260:f2de:f1ff:fe59:8f87]:amqp", "amqp", "user", null, "1234:52:0:1260:f2de:f1ff:fe59:8f87", "amqp", null); testParse("amqp://user:1243^&^:pw@[::1]:amqp", "amqp", "user", "1243^&^:pw", "::1", "amqp", null); testParse("amqp://user:1243^&^:pw@[::1]:amqp/Foo.bar:90087", "amqp", "user", "1243^&^:pw", "::1", "amqp", "Foo.bar:90087"); - testParse("amqp://host", "amqp", null, null, "host", null, null); - testParse("amqp://user@host", "amqp", "user", null, "host", null, null); - testParse("amqp://user@host/path:%", "amqp", "user", null, "host", null, "path:%"); - testParse("amqp://user@host:5674/path:%", "amqp", "user", null, "host", "5674", "path:%"); - testParse("amqp://user@host/path:%", "amqp", "user", null, "host", null, "path:%"); - testParse("amqp://bigbird@host/queue@host", "amqp", "bigbird", null, "host", null, "queue@host"); - testParse("amqp://host/queue@host", "amqp", null, null, "host", null, "queue@host"); - testParse("amqp://host:9765/queue@host", "amqp", null, null, "host", "9765", "queue@host"); - } - + testParse("amqp://host", "amqp", null, null, "host", null, null); + testParse("amqp://user@host", "amqp", "user", null, "host", null, null); + testParse("amqp://user@host/path:%", "amqp", "user", null, "host", null, "path:%"); + testParse("amqp://user@host:5674/path:%", "amqp", "user", null, "host", "5674", "path:%"); + testParse("amqp://user@host/path:%", "amqp", "user", null, "host", null, "path:%"); + testParse("amqp://bigbird@host/queue@host", "amqp", "bigbird", null, "host", null, "queue@host"); + testParse("amqp://host/queue@host", "amqp", null, null, "host", null, "queue@host"); + testParse("amqp://host:9765/queue@host", "amqp", null, null, "host", "9765", "queue@host"); + } } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/92b8098c/proton-j/src/test/java/org/apache/qpid/proton/systemtests/EngineTestBase.java ---------------------------------------------------------------------- diff --git a/proton-j/src/test/java/org/apache/qpid/proton/systemtests/EngineTestBase.java b/proton-j/src/test/java/org/apache/qpid/proton/systemtests/EngineTestBase.java new file mode 100644 index 0000000..7f1822c --- /dev/null +++ b/proton-j/src/test/java/org/apache/qpid/proton/systemtests/EngineTestBase.java @@ -0,0 +1,113 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.proton.systemtests; + +import static junit.framework.Assert.assertEquals; +import static junit.framework.Assert.assertTrue; + +import java.nio.ByteBuffer; +import java.util.logging.Logger; + +import org.apache.qpid.proton.amqp.messaging.Target; +import org.apache.qpid.proton.engine.Endpoint; +import org.apache.qpid.proton.engine.EndpointState; + +public abstract class EngineTestBase +{ + private static final Logger LOGGER = Logger.getLogger(EngineTestBase.class.getName()); + + private final TestLoggingHelper _testLoggingHelper = new TestLoggingHelper(LOGGER); + private final ProtonContainer _client = new ProtonContainer("clientContainer"); + private final ProtonContainer _server = new ProtonContainer("serverContainer"); + + protected TestLoggingHelper getTestLoggingHelper() + { + return _testLoggingHelper; + } + + protected ProtonContainer getClient() + { + return _client; + } + + protected ProtonContainer getServer() + { + return _server; + } + + protected void assertClientHasNothingToOutput() + { + assertEquals(0, getClient().transport.getOutputBuffer().remaining()); + getClient().transport.outputConsumed(); + } + + protected void pumpServerToClient() + { + ByteBuffer serverBuffer = getServer().transport.getOutputBuffer(); + + getTestLoggingHelper().prettyPrint(" <<<" + TestLoggingHelper.SERVER_PREFIX + " ", serverBuffer); + assertTrue("Server expected to produce some output", serverBuffer.hasRemaining()); + + ByteBuffer clientBuffer = getClient().transport.getInputBuffer(); + + clientBuffer.put(serverBuffer); + + assertEquals("Client expected to consume all server's output", 0, serverBuffer.remaining()); + + getClient().transport.processInput().checkIsOk(); + getServer().transport.outputConsumed(); + } + + protected void pumpClientToServer() + { + ByteBuffer clientBuffer = getClient().transport.getOutputBuffer(); + + getTestLoggingHelper().prettyPrint(TestLoggingHelper.CLIENT_PREFIX + ">>> ", clientBuffer); + assertTrue("Client expected to produce some output", clientBuffer.hasRemaining()); + + ByteBuffer serverBuffer = getServer().transport.getInputBuffer(); + + serverBuffer.put(clientBuffer); + + assertEquals("Server expected to consume all client's output", 0, clientBuffer.remaining()); + + getClient().transport.outputConsumed(); + getServer().transport.processInput().checkIsOk(); + } + + protected void doOutputInputCycle() throws Exception + { + pumpClientToServer(); + + pumpServerToClient(); + } + + protected void assertEndpointState(Endpoint endpoint, EndpointState localState, EndpointState remoteState) + { + assertEquals(localState, endpoint.getLocalState()); + assertEquals(remoteState, endpoint.getRemoteState()); + } + + protected void assertTerminusEquals(org.apache.qpid.proton.amqp.transport.Target expectedTarget, org.apache.qpid.proton.amqp.transport.Target actualTarget) + { + assertEquals( + ((Target)expectedTarget).getAddress(), + ((Target)actualTarget).getAddress()); + } +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/92b8098c/proton-j/src/test/java/org/apache/qpid/proton/systemtests/FreeTest.java ---------------------------------------------------------------------- diff --git a/proton-j/src/test/java/org/apache/qpid/proton/systemtests/FreeTest.java b/proton-j/src/test/java/org/apache/qpid/proton/systemtests/FreeTest.java new file mode 100644 index 0000000..7687206 --- /dev/null +++ b/proton-j/src/test/java/org/apache/qpid/proton/systemtests/FreeTest.java @@ -0,0 +1,236 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.qpid.proton.systemtests; + +import static java.util.EnumSet.of; +import static org.apache.qpid.proton.engine.EndpointState.ACTIVE; +import static org.apache.qpid.proton.engine.EndpointState.UNINITIALIZED; +import static org.apache.qpid.proton.systemtests.TestLoggingHelper.bold; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertNotSame; + +import java.util.logging.Logger; + +import org.apache.qpid.proton.Proton; +import org.apache.qpid.proton.amqp.messaging.Source; +import org.apache.qpid.proton.amqp.messaging.Target; +import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; +import org.apache.qpid.proton.amqp.transport.SenderSettleMode; +import org.apache.qpid.proton.engine.Receiver; +import org.apache.qpid.proton.engine.Sender; +import org.apache.qpid.proton.engine.Session; +import org.junit.Test; + +public class FreeTest extends EngineTestBase +{ + private static final Logger LOGGER = Logger.getLogger(FreeTest.class.getName()); + + @Test + public void testFreeConnectionWithMultipleSessionsAndSendersAndReceiversDoesNotThrowCME() throws Exception + { + LOGGER.fine(bold("======== About to create transports")); + + getClient().transport = Proton.transport(); + ProtocolTracerEnabler.setProtocolTracer(getClient().transport, TestLoggingHelper.CLIENT_PREFIX); + + getServer().transport = Proton.transport(); + ProtocolTracerEnabler.setProtocolTracer(getServer().transport, " " + TestLoggingHelper.SERVER_PREFIX); + + getClient().connection = Proton.connection(); + getClient().transport.bind(getClient().connection); + + getServer().connection = Proton.connection(); + getServer().transport.bind(getServer().connection); + + + + LOGGER.fine(bold("======== About to open connections")); + getClient().connection.open(); + getServer().connection.open(); + + doOutputInputCycle(); + + + + LOGGER.fine(bold("======== About to open sessions")); + getClient().session = getClient().connection.session(); + getClient().session.open(); + + Session clientSession2 = getClient().connection.session(); + clientSession2.open(); + + pumpClientToServer(); + + getServer().session = getServer().connection.sessionHead(of(UNINITIALIZED), of(ACTIVE)); + assertEndpointState(getServer().session, UNINITIALIZED, ACTIVE); + + getServer().session.open(); + assertEndpointState(getServer().session, ACTIVE, ACTIVE); + + Session serverSession2 = getServer().connection.sessionHead(of(UNINITIALIZED), of(ACTIVE)); + assertNotNull("Engine did not return expected second server session", serverSession2); + assertNotSame("Engine did not return expected second server session", serverSession2, getServer().session); + serverSession2.open(); + + pumpServerToClient(); + assertEndpointState(getClient().session, ACTIVE, ACTIVE); + assertEndpointState(clientSession2, ACTIVE, ACTIVE); + + + + LOGGER.fine(bold("======== About to create client senders")); + + getClient().source = new Source(); + getClient().source.setAddress(null); + + getClient().target = new Target(); + getClient().target.setAddress("myQueue"); + + getClient().sender = getClient().session.sender("sender1"); + getClient().sender.setTarget(getClient().target); + getClient().sender.setSource(getClient().source); + + getClient().sender.setSenderSettleMode(SenderSettleMode.UNSETTLED); + getClient().sender.setReceiverSettleMode(ReceiverSettleMode.FIRST); + + assertEndpointState(getClient().sender, UNINITIALIZED, UNINITIALIZED); + + getClient().sender.open(); + assertEndpointState(getClient().sender, ACTIVE, UNINITIALIZED); + + + Sender clientSender2 = getClient().session.sender("sender2"); + clientSender2.setTarget(getClient().target); + clientSender2.setSource(getClient().source); + + clientSender2.setSenderSettleMode(SenderSettleMode.UNSETTLED); + clientSender2.setReceiverSettleMode(ReceiverSettleMode.FIRST); + + assertEndpointState(clientSender2, UNINITIALIZED, UNINITIALIZED); + + clientSender2.open(); + assertEndpointState(clientSender2, ACTIVE, UNINITIALIZED); + + pumpClientToServer(); + + + LOGGER.fine(bold("======== About to set up server receivers")); + + getServer().receiver = (Receiver) getServer().connection.linkHead(of(UNINITIALIZED), of(ACTIVE)); + // Accept the settlement modes suggested by the client + getServer().receiver.setSenderSettleMode(getServer().receiver.getRemoteSenderSettleMode()); + getServer().receiver.setReceiverSettleMode(getServer().receiver.getRemoteReceiverSettleMode()); + + org.apache.qpid.proton.amqp.transport.Target serverRemoteTarget = getServer().receiver.getRemoteTarget(); + assertTerminusEquals(getClient().target, serverRemoteTarget); + + getServer().receiver.setTarget(serverRemoteTarget); + + assertEndpointState(getServer().receiver, UNINITIALIZED, ACTIVE); + getServer().receiver.open(); + + assertEndpointState(getServer().receiver, ACTIVE, ACTIVE); + + Receiver serverReceiver2 = (Receiver) getServer().connection.linkHead(of(UNINITIALIZED), of(ACTIVE)); + serverReceiver2.open(); + assertEndpointState(serverReceiver2, ACTIVE, ACTIVE); + + pumpServerToClient(); + assertEndpointState(getClient().sender, ACTIVE, ACTIVE); + assertEndpointState(clientSender2, ACTIVE, ACTIVE); + + + + LOGGER.fine(bold("======== About to create client receivers")); + + Source src = new Source(); + src.setAddress("myQueue"); + + Target tgt1 = new Target(); + tgt1.setAddress("receiver1"); + + getClient().receiver = getClient().session.receiver("receiver1"); + getClient().receiver.setSource(src); + getClient().receiver.setTarget(tgt1); + + getClient().receiver.setSenderSettleMode(SenderSettleMode.UNSETTLED); + getClient().receiver.setReceiverSettleMode(ReceiverSettleMode.FIRST); + + assertEndpointState(getClient().receiver, UNINITIALIZED, UNINITIALIZED); + + getClient().receiver.open(); + assertEndpointState(getClient().receiver, ACTIVE, UNINITIALIZED); + + + Target tgt2 = new Target(); + tgt1.setAddress("receiver2"); + + Receiver clientReceiver2 = getClient().session.receiver("receiver2"); + clientReceiver2.setSource(src); + clientReceiver2.setTarget(tgt2); + + clientReceiver2.setSenderSettleMode(SenderSettleMode.UNSETTLED); + clientReceiver2.setReceiverSettleMode(ReceiverSettleMode.FIRST); + + assertEndpointState(clientReceiver2, UNINITIALIZED, UNINITIALIZED); + + clientReceiver2.open(); + assertEndpointState(clientReceiver2, ACTIVE, UNINITIALIZED); + + pumpClientToServer(); + + + + LOGGER.fine(bold("======== About to set up server senders")); + + getServer().sender = (Sender) getServer().connection.linkHead(of(UNINITIALIZED), of(ACTIVE)); + // Accept the settlement modes suggested by the client + getServer().sender.setSenderSettleMode(getServer().sender.getRemoteSenderSettleMode()); + getServer().sender.setReceiverSettleMode(getServer().sender.getRemoteReceiverSettleMode()); + + org.apache.qpid.proton.amqp.transport.Target serverRemoteTarget2 = getServer().sender.getRemoteTarget(); + assertTerminusEquals(tgt1, serverRemoteTarget2); + + getServer().sender.setTarget(serverRemoteTarget2); + + assertEndpointState(getServer().sender, UNINITIALIZED, ACTIVE); + getServer().sender.open(); + assertEndpointState(getServer().sender, ACTIVE, ACTIVE); + + Sender serverSender2 = (Sender) getServer().connection.linkHead(of(UNINITIALIZED), of(ACTIVE)); + + serverRemoteTarget2 = serverSender2.getRemoteTarget(); + assertTerminusEquals(tgt2, serverRemoteTarget2); + serverSender2.setTarget(serverRemoteTarget2); + serverSender2.open(); + assertEndpointState(serverSender2, ACTIVE, ACTIVE); + + pumpServerToClient(); + assertEndpointState(getClient().receiver, ACTIVE, ACTIVE); + assertEndpointState(clientReceiver2, ACTIVE, ACTIVE); + + + + LOGGER.fine(bold("======== About to close and free client's connection")); + + getClient().connection.close(); + getClient().connection.free(); + } + +} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/92b8098c/proton-j/src/test/java/org/apache/qpid/proton/systemtests/ProtonEngineExampleTest.java ---------------------------------------------------------------------- diff --git a/proton-j/src/test/java/org/apache/qpid/proton/systemtests/ProtonEngineExampleTest.java b/proton-j/src/test/java/org/apache/qpid/proton/systemtests/ProtonEngineExampleTest.java index 9c5dbb3..a24bbdd 100644 --- a/proton-j/src/test/java/org/apache/qpid/proton/systemtests/ProtonEngineExampleTest.java +++ b/proton-j/src/test/java/org/apache/qpid/proton/systemtests/ProtonEngineExampleTest.java @@ -29,7 +29,6 @@ import static org.apache.qpid.proton.systemtests.TestLoggingHelper.bold; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; -import java.nio.ByteBuffer; import java.util.Arrays; import java.util.logging.Logger; @@ -42,8 +41,6 @@ import org.apache.qpid.proton.amqp.messaging.Target; import org.apache.qpid.proton.amqp.transport.ReceiverSettleMode; import org.apache.qpid.proton.amqp.transport.SenderSettleMode; import org.apache.qpid.proton.engine.Delivery; -import org.apache.qpid.proton.engine.Endpoint; -import org.apache.qpid.proton.engine.EndpointState; import org.apache.qpid.proton.engine.Receiver; import org.apache.qpid.proton.message.Message; import org.junit.Test; @@ -64,84 +61,79 @@ import org.junit.Test; * * Does not illustrate use of the Messenger API. */ -public class ProtonEngineExampleTest +public class ProtonEngineExampleTest extends EngineTestBase { private static final Logger LOGGER = Logger.getLogger(ProtonEngineExampleTest.class.getName()); private static final int BUFFER_SIZE = 4096; - private TestLoggingHelper _testLoggingHelper = new TestLoggingHelper(LOGGER); - - private final ProtonContainer _client = new ProtonContainer("clientContainer"); - private final ProtonContainer _server = new ProtonContainer("serverContainer"); - - private final String _targetAddress = _server.containerId + "-link1-target"; + private final String _targetAddress = getServer().containerId + "-link1-target"; @Test public void test() throws Exception { LOGGER.fine(bold("======== About to create transports")); - _client.transport = Proton.transport(); - ProtocolTracerEnabler.setProtocolTracer(_client.transport, TestLoggingHelper.CLIENT_PREFIX); + getClient().transport = Proton.transport(); + ProtocolTracerEnabler.setProtocolTracer(getClient().transport, TestLoggingHelper.CLIENT_PREFIX); - _server.transport = Proton.transport(); - ProtocolTracerEnabler.setProtocolTracer(_server.transport, " " + TestLoggingHelper.SERVER_PREFIX); + getServer().transport = Proton.transport(); + ProtocolTracerEnabler.setProtocolTracer(getServer().transport, " " + TestLoggingHelper.SERVER_PREFIX); doOutputInputCycle(); - _client.connection = Proton.connection(); - _client.transport.bind(_client.connection); + getClient().connection = Proton.connection(); + getClient().transport.bind(getClient().connection); - _server.connection = Proton.connection(); - _server.transport.bind(_server.connection); + getServer().connection = Proton.connection(); + getServer().transport.bind(getServer().connection); LOGGER.fine(bold("======== About to open connections")); - _client.connection.open(); - _server.connection.open(); + getClient().connection.open(); + getServer().connection.open(); doOutputInputCycle(); LOGGER.fine(bold("======== About to open sessions")); - _client.session = _client.connection.session(); - _client.session.open(); + getClient().session = getClient().connection.session(); + getClient().session.open(); pumpClientToServer(); - _server.session = _server.connection.sessionHead(of(UNINITIALIZED), of(ACTIVE)); - assertEndpointState(_server.session, UNINITIALIZED, ACTIVE); + getServer().session = getServer().connection.sessionHead(of(UNINITIALIZED), of(ACTIVE)); + assertEndpointState(getServer().session, UNINITIALIZED, ACTIVE); - _server.session.open(); - assertEndpointState(_server.session, ACTIVE, ACTIVE); + getServer().session.open(); + assertEndpointState(getServer().session, ACTIVE, ACTIVE); pumpServerToClient(); - assertEndpointState(_client.session, ACTIVE, ACTIVE); + assertEndpointState(getClient().session, ACTIVE, ACTIVE); LOGGER.fine(bold("======== About to create sender")); - _client.source = new Source(); - _client.source.setAddress(null); + getClient().source = new Source(); + getClient().source.setAddress(null); - _client.target = new Target(); - _client.target.setAddress(_targetAddress); + getClient().target = new Target(); + getClient().target.setAddress(_targetAddress); - _client.sender = _client.session.sender("link1"); - _client.sender.setTarget(_client.target); - _client.sender.setSource(_client.source); + getClient().sender = getClient().session.sender("link1"); + getClient().sender.setTarget(getClient().target); + getClient().sender.setSource(getClient().source); // Exactly once delivery semantics - _client.sender.setSenderSettleMode(SenderSettleMode.UNSETTLED); - _client.sender.setReceiverSettleMode(ReceiverSettleMode.SECOND); + getClient().sender.setSenderSettleMode(SenderSettleMode.UNSETTLED); + getClient().sender.setReceiverSettleMode(ReceiverSettleMode.SECOND); - assertEndpointState(_client.sender, UNINITIALIZED, UNINITIALIZED); + assertEndpointState(getClient().sender, UNINITIALIZED, UNINITIALIZED); - _client.sender.open(); - assertEndpointState(_client.sender, ACTIVE, UNINITIALIZED); + getClient().sender.open(); + assertEndpointState(getClient().sender, ACTIVE, UNINITIALIZED); pumpClientToServer(); @@ -152,46 +144,46 @@ public class ProtonEngineExampleTest // A real application would be interested in more states than simply ACTIVE, as there // exists the possibility that the link could have moved to another state already e.g. CLOSED. // (See pipelining). - _server.receiver = (Receiver) _server.connection.linkHead(of(UNINITIALIZED), of(ACTIVE)); + getServer().receiver = (Receiver) getServer().connection.linkHead(of(UNINITIALIZED), of(ACTIVE)); // Accept the settlement modes suggested by the client - _server.receiver.setSenderSettleMode(_server.receiver.getRemoteSenderSettleMode()); - _server.receiver.setReceiverSettleMode(_server.receiver.getRemoteReceiverSettleMode()); + getServer().receiver.setSenderSettleMode(getServer().receiver.getRemoteSenderSettleMode()); + getServer().receiver.setReceiverSettleMode(getServer().receiver.getRemoteReceiverSettleMode()); - org.apache.qpid.proton.amqp.transport.Target serverRemoteTarget = _server.receiver.getRemoteTarget(); - assertTerminusEquals(_client.target, serverRemoteTarget); + org.apache.qpid.proton.amqp.transport.Target serverRemoteTarget = getServer().receiver.getRemoteTarget(); + assertTerminusEquals(getClient().target, serverRemoteTarget); - _server.receiver.setTarget(applicationDeriveTarget(serverRemoteTarget)); + getServer().receiver.setTarget(applicationDeriveTarget(serverRemoteTarget)); - assertEndpointState(_server.receiver, UNINITIALIZED, ACTIVE); - _server.receiver.open(); + assertEndpointState(getServer().receiver, UNINITIALIZED, ACTIVE); + getServer().receiver.open(); - assertEndpointState(_server.receiver, ACTIVE, ACTIVE); + assertEndpointState(getServer().receiver, ACTIVE, ACTIVE); pumpServerToClient(); - assertEndpointState(_client.sender, ACTIVE, ACTIVE); + assertEndpointState(getClient().sender, ACTIVE, ACTIVE); - _server.receiver.flow(1); + getServer().receiver.flow(1); pumpServerToClient(); LOGGER.fine(bold("======== About to create a message and send it to the server")); - _client.message = Proton.message(); + getClient().message = Proton.message(); Section messageBody = new AmqpValue("Hello"); - _client.message.setBody(messageBody); - _client.messageData = new byte[BUFFER_SIZE]; - int lengthOfEncodedMessage = _client.message.encode(_client.messageData, 0, BUFFER_SIZE); - _testLoggingHelper.prettyPrint(TestLoggingHelper.MESSAGE_PREFIX, Arrays.copyOf(_client.messageData, lengthOfEncodedMessage)); + getClient().message.setBody(messageBody); + getClient().messageData = new byte[BUFFER_SIZE]; + int lengthOfEncodedMessage = getClient().message.encode(getClient().messageData, 0, BUFFER_SIZE); + getTestLoggingHelper().prettyPrint(TestLoggingHelper.MESSAGE_PREFIX, Arrays.copyOf(getClient().messageData, lengthOfEncodedMessage)); byte[] deliveryTag = "delivery1".getBytes(); - _client.delivery = _client.sender.delivery(deliveryTag); - int numberOfBytesAcceptedBySender = _client.sender.send(_client.messageData, 0, lengthOfEncodedMessage); + getClient().delivery = getClient().sender.delivery(deliveryTag); + int numberOfBytesAcceptedBySender = getClient().sender.send(getClient().messageData, 0, lengthOfEncodedMessage); assertEquals("For simplicity, assume the sender can accept all the data", lengthOfEncodedMessage, numberOfBytesAcceptedBySender); - assertNull(_client.delivery.getLocalState()); + assertNull(getClient().delivery.getLocalState()); - boolean senderAdvanced = _client.sender.advance(); + boolean senderAdvanced = getClient().sender.advance(); assertTrue("sender has not advanced", senderAdvanced); pumpClientToServer(); @@ -199,106 +191,106 @@ public class ProtonEngineExampleTest LOGGER.fine(bold("======== About to process the message on the server")); - _server.delivery = _server.connection.getWorkHead(); + getServer().delivery = getServer().connection.getWorkHead(); assertEquals("The received delivery should be on our receiver", - _server.receiver, _server.delivery.getLink()); - assertNull(_server.delivery.getLocalState()); - assertNull(_server.delivery.getRemoteState()); + getServer().receiver, getServer().delivery.getLink()); + assertNull(getServer().delivery.getLocalState()); + assertNull(getServer().delivery.getRemoteState()); - assertFalse(_server.delivery.isPartial()); - assertTrue(_server.delivery.isReadable()); + assertFalse(getServer().delivery.isPartial()); + assertTrue(getServer().delivery.isReadable()); - _server.messageData = new byte[BUFFER_SIZE]; - int numberOfBytesProducedByReceiver = _server.receiver.recv(_server.messageData, 0, BUFFER_SIZE); + getServer().messageData = new byte[BUFFER_SIZE]; + int numberOfBytesProducedByReceiver = getServer().receiver.recv(getServer().messageData, 0, BUFFER_SIZE); assertEquals(numberOfBytesAcceptedBySender, numberOfBytesProducedByReceiver); - _server.message = Proton.message(); - _server.message.decode(_server.messageData, 0, numberOfBytesProducedByReceiver); + getServer().message = Proton.message(); + getServer().message.decode(getServer().messageData, 0, numberOfBytesProducedByReceiver); - boolean messageProcessed = applicationProcessMessage(_server.message); + boolean messageProcessed = applicationProcessMessage(getServer().message); assertTrue(messageProcessed); - _server.delivery.disposition(Accepted.getInstance()); - assertEquals(Accepted.getInstance(), _server.delivery.getLocalState()); + getServer().delivery.disposition(Accepted.getInstance()); + assertEquals(Accepted.getInstance(), getServer().delivery.getLocalState()); pumpServerToClient(); - assertEquals(Accepted.getInstance(), _client.delivery.getRemoteState()); + assertEquals(Accepted.getInstance(), getClient().delivery.getRemoteState()); LOGGER.fine(bold("======== About to accept and settle the message on the client")); - Delivery clientDelivery = _client.connection.getWorkHead(); - assertEquals(_client.delivery, clientDelivery); + Delivery clientDelivery = getClient().connection.getWorkHead(); + assertEquals(getClient().delivery, clientDelivery); assertTrue(clientDelivery.isUpdated()); - assertEquals(_client.sender, clientDelivery.getLink()); + assertEquals(getClient().sender, clientDelivery.getLink()); clientDelivery.disposition(clientDelivery.getRemoteState()); - assertEquals(Accepted.getInstance(), _client.delivery.getLocalState()); + assertEquals(Accepted.getInstance(), getClient().delivery.getLocalState()); clientDelivery.settle(); - assertNull("Now we've settled, the delivery should no longer be in the work list", _client.connection.getWorkHead()); + assertNull("Now we've settled, the delivery should no longer be in the work list", getClient().connection.getWorkHead()); pumpClientToServer(); LOGGER.fine(bold("======== About to settle the message on the server")); - assertEquals(Accepted.getInstance(), _server.delivery.getRemoteState()); - Delivery serverDelivery = _server.connection.getWorkHead(); - assertEquals(_server.delivery, serverDelivery); + assertEquals(Accepted.getInstance(), getServer().delivery.getRemoteState()); + Delivery serverDelivery = getServer().connection.getWorkHead(); + assertEquals(getServer().delivery, serverDelivery); assertTrue(serverDelivery.isUpdated()); assertTrue("Client should have already settled", serverDelivery.remotelySettled()); serverDelivery.settle(); assertTrue(serverDelivery.isSettled()); - assertNull("Now we've settled, the delivery should no longer be in the work list", _server.connection.getWorkHead()); + assertNull("Now we've settled, the delivery should no longer be in the work list", getServer().connection.getWorkHead()); // Increment the receiver's credit so its ready for another message. // When using proton-c, this call is required in order to generate a Flow frame // (proton-j sends one even without it to eagerly restore the session incoming window). - _server.receiver.flow(1); + getServer().receiver.flow(1); pumpServerToClient(); LOGGER.fine(bold("======== About to close client's sender")); - _client.sender.close(); + getClient().sender.close(); pumpClientToServer(); LOGGER.fine(bold("======== Server about to process client's link closure")); - assertSame(_server.receiver, _server.connection.linkHead(of(ACTIVE), of(CLOSED))); - _server.receiver.close(); + assertSame(getServer().receiver, getServer().connection.linkHead(of(ACTIVE), of(CLOSED))); + getServer().receiver.close(); pumpServerToClient(); LOGGER.fine(bold("======== About to close client's session")); - _client.session.close(); + getClient().session.close(); pumpClientToServer(); LOGGER.fine(bold("======== Server about to process client's session closure")); - assertSame(_server.session, _server.connection.sessionHead(of(ACTIVE), of(CLOSED))); - _server.session.close(); + assertSame(getServer().session, getServer().connection.sessionHead(of(ACTIVE), of(CLOSED))); + getServer().session.close(); pumpServerToClient(); LOGGER.fine(bold("======== About to close client's connection")); - _client.connection.close(); + getClient().connection.close(); pumpClientToServer(); LOGGER.fine(bold("======== Server about to process client's connection closure")); - assertEquals(CLOSED, _server.connection.getRemoteState()); - _server.connection.close(); + assertEquals(CLOSED, getServer().connection.getRemoteState()); + getServer().connection.close(); pumpServerToClient(); @@ -331,66 +323,4 @@ public class ProtonEngineExampleTest Object messageBody = ((AmqpValue)message.getBody()).getValue(); return "Hello".equals(messageBody); } - - private void assertTerminusEquals( - org.apache.qpid.proton.amqp.transport.Target expectedTarget, - org.apache.qpid.proton.amqp.transport.Target actualTarget) - { - assertEquals( - ((Target)expectedTarget).getAddress(), - ((Target)actualTarget).getAddress()); - } - - private void assertEndpointState(Endpoint endpoint, EndpointState localState, EndpointState remoteState) - { - assertEquals(localState, endpoint.getLocalState()); - assertEquals(remoteState, endpoint.getRemoteState()); - } - - private void doOutputInputCycle() throws Exception - { - pumpClientToServer(); - - pumpServerToClient(); - } - - private void pumpClientToServer() - { - ByteBuffer clientBuffer = _client.transport.getOutputBuffer(); - - _testLoggingHelper.prettyPrint(TestLoggingHelper.CLIENT_PREFIX + ">>> ", clientBuffer); - assertTrue("Client expected to produce some output", clientBuffer.hasRemaining()); - - ByteBuffer serverBuffer = _server.transport.getInputBuffer(); - - serverBuffer.put(clientBuffer); - - assertEquals("Server expected to consume all client's output", 0, clientBuffer.remaining()); - - _client.transport.outputConsumed(); - _server.transport.processInput().checkIsOk(); - } - - private void pumpServerToClient() - { - ByteBuffer serverBuffer = _server.transport.getOutputBuffer(); - - _testLoggingHelper.prettyPrint(" <<<" + TestLoggingHelper.SERVER_PREFIX + " ", serverBuffer); - assertTrue("Server expected to produce some output", serverBuffer.hasRemaining()); - - ByteBuffer clientBuffer = _client.transport.getInputBuffer(); - - clientBuffer.put(serverBuffer); - - assertEquals("Client expected to consume all server's output", 0, serverBuffer.remaining()); - - _client.transport.processInput().checkIsOk(); - _server.transport.outputConsumed(); - } - - private void assertClientHasNothingToOutput() - { - assertEquals(0, _client.transport.getOutputBuffer().remaining()); - _client.transport.outputConsumed(); - } } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/92b8098c/tests/python/proton_tests/__init__.py ---------------------------------------------------------------------- diff --git a/tests/python/proton_tests/__init__.py b/tests/python/proton_tests/__init__.py index 1853da6..1cecf38 100644 --- a/tests/python/proton_tests/__init__.py +++ b/tests/python/proton_tests/__init__.py @@ -26,4 +26,4 @@ import proton_tests.transport import proton_tests.ssl import proton_tests.interop import proton_tests.soak - +import proton_tests.url http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/92b8098c/tests/python/proton_tests/engine.py ---------------------------------------------------------------------- diff --git a/tests/python/proton_tests/engine.py b/tests/python/proton_tests/engine.py index 44157e7..c821107 100644 --- a/tests/python/proton_tests/engine.py +++ b/tests/python/proton_tests/engine.py @@ -2101,6 +2101,11 @@ class CollectorTest(Test): assert False, "actual events %s did not match any of the expected sequences: %s" % (events, sequences) + def expect_until(self, *types): + events = self.drain() + etypes = tuple([e.type for e in events[-len(types):]]) + assert etypes == types, "actual events %s did not end in expect sequence: %s" % (events, types) + class EventTest(CollectorTest): def teardown(self): @@ -2150,8 +2155,8 @@ class EventTest(CollectorTest): self.pump() c1.free() c1._transport.unbind() - self.expect(Event.SESSION_FINAL, Event.LINK_FINAL, Event.SESSION_FINAL, - Event.CONNECTION_FINAL) + self.expect(Event.CONNECTION_UNBOUND, Event.SESSION_FINAL, Event.LINK_FINAL, + Event.SESSION_FINAL, Event.CONNECTION_FINAL) def testConnectionINIT_FINAL(self): c = Connection() @@ -2215,8 +2220,8 @@ class EventTest(CollectorTest): self.expect(Event.LINK_REMOTE_OPEN, Event.DELIVERY) rcv.session.connection._transport.unbind() rcv.session.connection.free() - self.expect(Event.TRANSPORT, Event.LINK_FINAL, Event.SESSION_FINAL, - Event.CONNECTION_FINAL) + self.expect(Event.CONNECTION_UNBOUND, Event.TRANSPORT, Event.LINK_FINAL, + Event.SESSION_FINAL, Event.CONNECTION_FINAL) def testDeliveryEventsDisp(self): snd, rcv = self.testFlowEvents() @@ -2233,7 +2238,85 @@ class EventTest(CollectorTest): rdlv.update(Delivery.ACCEPTED) self.pump() event = self.expect(Event.DELIVERY) - assert event.delivery == dlv + assert event.context == dlv + + def testConnectionBOUND_UNBOUND(self): + c = Connection() + c.collect(self.collector) + self.expect(Event.CONNECTION_INIT) + t = Transport() + t.bind(c) + self.expect(Event.CONNECTION_BOUND) + t.unbind() + self.expect(Event.CONNECTION_UNBOUND, Event.TRANSPORT) + + def testTransportERROR_CLOSE(self): + c = Connection() + c.collect(self.collector) + self.expect(Event.CONNECTION_INIT) + t = Transport() + t.bind(c) + self.expect(Event.CONNECTION_BOUND) + assert t.condition is None + t.push("asdf") + self.expect(Event.TRANSPORT_ERROR, Event.TRANSPORT_TAIL_CLOSED) + assert t.condition is not None + assert t.condition.name == "amqp:connection:framing-error" + assert "AMQP header mismatch" in t.condition.description + p = t.pending() + assert p > 0 + t.pop(p) + self.expect(Event.TRANSPORT_HEAD_CLOSED, Event.TRANSPORT_CLOSED) + + def testTransportCLOSED(self): + c = Connection() + c.collect(self.collector) + self.expect(Event.CONNECTION_INIT) + t = Transport() + t.bind(c) + c.open() + + self.expect(Event.CONNECTION_BOUND, Event.CONNECTION_OPEN, Event.TRANSPORT) + + c2 = Connection() + t2 = Transport() + t2.bind(c2) + c2.open() + c2.close() + + pump(t, t2) + + self.expect(Event.CONNECTION_REMOTE_OPEN, Event.CONNECTION_REMOTE_CLOSE, + Event.TRANSPORT_TAIL_CLOSED) + + c.close() + + pump(t, t2) + + self.expect(Event.CONNECTION_CLOSE, Event.TRANSPORT, + Event.TRANSPORT_HEAD_CLOSED, Event.TRANSPORT_CLOSED) + + def testLinkDetach(self): + c1 = Connection() + c1.collect(self.collector) + t1 = Transport() + t1.bind(c1) + c1.open() + s1 = c1.session() + s1.open() + l1 = s1.sender("asdf") + l1.open() + l1.detach() + self.expect_until(Event.LINK_DETACH, Event.TRANSPORT) + + c2 = Connection() + c2.collect(self.collector) + t2 = Transport() + t2.bind(c2) + + pump(t1, t2) + + self.expect_until(Event.LINK_REMOTE_DETACH) class PeerTest(CollectorTest): @@ -2255,7 +2338,8 @@ class TeardownLeakTest(PeerTest): def doLeak(self, local, remote): self.connection.open() - self.expect(Event.CONNECTION_INIT, Event.CONNECTION_OPEN, Event.TRANSPORT) + self.expect(Event.CONNECTION_INIT, Event.CONNECTION_BOUND, + Event.CONNECTION_OPEN, Event.TRANSPORT) ssn = self.connection.session() ssn.open() @@ -2294,19 +2378,23 @@ class TeardownLeakTest(PeerTest): self.pump() if remote: - self.expect_oneof((Event.LINK_REMOTE_CLOSE, Event.SESSION_REMOTE_CLOSE, - Event.CONNECTION_REMOTE_CLOSE), - (Event.LINK_REMOTE_CLOSE, Event.LINK_FINAL, - Event.SESSION_REMOTE_CLOSE, - Event.CONNECTION_REMOTE_CLOSE)) + self.expect_oneof((Event.TRANSPORT_HEAD_CLOSED, Event.LINK_REMOTE_CLOSE, + Event.SESSION_REMOTE_CLOSE, Event.CONNECTION_REMOTE_CLOSE, + Event.TRANSPORT_TAIL_CLOSED, Event.TRANSPORT_CLOSED), + (Event.TRANSPORT_HEAD_CLOSED, Event.LINK_REMOTE_CLOSE, + Event.LINK_FINAL, Event.SESSION_REMOTE_CLOSE, + Event.CONNECTION_REMOTE_CLOSE, Event.TRANSPORT_TAIL_CLOSED, + Event.TRANSPORT_CLOSED)) else: - self.expect(Event.SESSION_REMOTE_CLOSE, Event.CONNECTION_REMOTE_CLOSE) + self.expect(Event.TRANSPORT_HEAD_CLOSED, Event.SESSION_REMOTE_CLOSE, + Event.CONNECTION_REMOTE_CLOSE, Event.TRANSPORT_TAIL_CLOSED, + Event.TRANSPORT_CLOSED) self.connection.free() self.transport.unbind() - self.expect_oneof((Event.LINK_FINAL, Event.SESSION_FINAL, Event.CONNECTION_FINAL), - (Event.SESSION_FINAL, Event.CONNECTION_FINAL)) + self.expect_oneof((Event.LINK_FINAL, Event.CONNECTION_UNBOUND, Event.SESSION_FINAL, Event.CONNECTION_FINAL), + (Event.CONNECTION_UNBOUND, Event.LINK_FINAL, Event.SESSION_FINAL, Event.CONNECTION_FINAL)) def testLocalRemoteLeak(self): self.doLeak(True, True) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/92b8098c/tests/python/proton_tests/scratch.py ---------------------------------------------------------------------- diff --git a/tests/python/proton_tests/scratch.py b/tests/python/proton_tests/scratch.py new file mode 100644 index 0000000..7e8ae5b --- /dev/null +++ b/tests/python/proton_tests/scratch.py @@ -0,0 +1,44 @@ + def xxx_test_reopen_on_same_session(self): + ssn1 = self.snd.session + ssn2 = self.rcv.session + + self.snd.open() + self.rcv.open() + self.pump() + + assert self.snd.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE + assert self.rcv.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE + + self.snd.close() + self.rcv.close() + self.pump() + + assert self.snd.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED + assert self.rcv.state == Endpoint.LOCAL_CLOSED | Endpoint.REMOTE_CLOSED + + print self.snd._link + self.snd = ssn1.sender("test-link") + print self.snd._link + self.rcv = ssn2.receiver("test-link") + self.snd.open() + self.rcv.open() + self.pump() + + assert self.snd.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE + assert self.rcv.state == Endpoint.LOCAL_ACTIVE | Endpoint.REMOTE_ACTIVE + +class SessionPipelineTest(PeerTest): + + def xxx_test(self): + self.connection.open() + self.peer.open() + self.pump() + ssn = self.connection.session() + ssn.open() + self.pump() + peer_ssn = self.peer.session_head(0) + ssn.close() + self.pump() + peer_ssn.close() + self.peer.close() + self.pump() --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
