http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java index 1949652..62cb10d 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportImpl.java @@ -17,14 +17,14 @@ package org.apache.qpid.proton.engine.impl; -import static org.apache.qpid.proton.engine.impl.ByteBufferUtils.newReadableBuffer; -import static org.apache.qpid.proton.engine.impl.ByteBufferUtils.pour; import static org.apache.qpid.proton.engine.impl.ByteBufferUtils.pourArrayToBuffer; import static org.apache.qpid.proton.engine.impl.ByteBufferUtils.pourBufferToArray; import java.nio.ByteBuffer; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.UnsignedInteger; @@ -32,6 +32,7 @@ import org.apache.qpid.proton.amqp.UnsignedShort; import org.apache.qpid.proton.amqp.transport.Attach; import org.apache.qpid.proton.amqp.transport.Begin; import org.apache.qpid.proton.amqp.transport.Close; +import org.apache.qpid.proton.amqp.transport.ConnectionError; import org.apache.qpid.proton.amqp.transport.Detach; import org.apache.qpid.proton.amqp.transport.Disposition; import org.apache.qpid.proton.amqp.transport.End; @@ -44,10 +45,8 @@ import org.apache.qpid.proton.amqp.transport.Transfer; import org.apache.qpid.proton.codec.AMQPDefinedTypes; import org.apache.qpid.proton.codec.DecoderImpl; import org.apache.qpid.proton.codec.EncoderImpl; -import org.apache.qpid.proton.codec.WritableBuffer; import org.apache.qpid.proton.engine.Connection; import org.apache.qpid.proton.engine.EndpointState; -import org.apache.qpid.proton.engine.EngineFactory; import org.apache.qpid.proton.engine.Event; import org.apache.qpid.proton.engine.ProtonJTransport; import org.apache.qpid.proton.engine.Sasl; @@ -66,7 +65,20 @@ public class TransportImpl extends EndpointImpl implements ProtonJTransport, FrameBody.FrameBodyHandler<Integer>, FrameHandler, TransportOutputWriter { - private static final byte AMQP_FRAME_TYPE = 0; + static final int BUFFER_RELEASE_THRESHOLD = Integer.getInteger("proton.transport_buffer_release_threshold", 2 * 1024 * 1024); + + private static final boolean getBooleanEnv(String name) + { + String value = System.getenv(name); + return "true".equalsIgnoreCase(value) || + "1".equals(value) || + "yes".equalsIgnoreCase(value); + } + + private static final boolean FRM_ENABLED = getBooleanEnv("PN_TRACE_FRM"); + + // trace levels + private int _levels = (FRM_ENABLED ? this.TRACE_FRM : 0); private FrameParser _frameParser; @@ -100,16 +112,16 @@ public class TransportImpl extends EndpointImpl private Open _open; private SaslImpl _sasl; private SslImpl _ssl; - private ProtocolTracer _protocolTracer = null; - - private ByteBuffer _lastInputBuffer; + private final Ref<ProtocolTracer> _protocolTracer = new Ref(null); private TransportResult _lastTransportResult = TransportResultFactory.ok(); private boolean _init; + private boolean _processingStarted; private FrameHandler _frameHandler = this; private boolean _head_closed = false; + private TransportException _tail_error = null; /** * @deprecated This constructor's visibility will be reduced to the default scope in a future release. @@ -134,7 +146,6 @@ public class TransportImpl extends EndpointImpl FrameWriter.AMQP_FRAME_TYPE, _protocolTracer, this); - } private void init() @@ -149,6 +160,11 @@ public class TransportImpl extends EndpointImpl } @Override + public void trace(int levels) { + _levels = levels; + } + + @Override public int getMaxFrameSize() { return _maxFrameSize; @@ -194,9 +210,10 @@ public class TransportImpl extends EndpointImpl @Override public void bind(Connection conn) { - // TODO - check if already bound - ((ConnectionImpl) conn).setTransport(this); _connectionEndpoint = (ConnectionImpl) conn; + // TODO - check if already bound + _connectionEndpoint.setTransport(this); + _connectionEndpoint.incref(); if(getRemoteState() != EndpointState.UNINITIALIZED) { @@ -211,6 +228,23 @@ public class TransportImpl extends EndpointImpl } @Override + public void unbind() + { + _connectionEndpoint.modifyEndpoints(); + + _connectionEndpoint.setTransport(null); + _connectionEndpoint.decref(); + + for (TransportSession ts: _transportSessionState.values()) { + ts.unbind(); + } + + for (TransportLink tl: _transportLinkState.values()) { + tl.unbind(); + } + } + + @Override public int input(byte[] bytes, int offset, int length) { oldApiCheckStateBeforeInput(length).checkIsOk(); @@ -278,8 +312,13 @@ public class TransportImpl extends EndpointImpl { if(_sasl == null) { + if(_processingStarted) + { + throw new IllegalStateException("Sasl can't be initiated after transport has started processing"); + } + init(); - _sasl = new SaslImpl(_remoteMaxFrameSize); + _sasl = new SaslImpl(this, _remoteMaxFrameSize); TransportWrapper transportWrapper = _sasl.wrap(_inputProcessor, _outputProcessor); _inputProcessor = transportWrapper; _outputProcessor = transportWrapper; @@ -334,38 +373,37 @@ public class TransportImpl extends EndpointImpl && transportLink.isLocalHandleSet() && !_isCloseSent) { - if(!(link instanceof SenderImpl) - || link.getQueued() == 0 - || transportLink.detachReceived() - || transportSession.endReceived() - || _closeReceived) - { - UnsignedInteger localHandle = transportLink.getLocalHandle(); - transportLink.clearLocalHandle(); - transportSession.freeLocalHandle(localHandle); + if((link instanceof SenderImpl) + && link.getQueued() > 0 + && !transportLink.detachReceived() + && !transportSession.endReceived() + && !_closeReceived) { + endpoint = endpoint.transportNext(); + continue; + } + UnsignedInteger localHandle = transportLink.getLocalHandle(); + transportLink.clearLocalHandle(); + transportSession.freeLocalHandle(localHandle); - Detach detach = new Detach(); - detach.setHandle(localHandle); - // TODO - need an API for detaching rather than closing the link - detach.setClosed(true); - ErrorCondition localError = link.getCondition(); - if( localError.getCondition() !=null ) - { - detach.setError(localError); - } + Detach detach = new Detach(); + detach.setHandle(localHandle); + // TODO - need an API for detaching rather than closing the link + detach.setClosed(true); + ErrorCondition localError = link.getCondition(); + if( localError.getCondition() !=null ) + { + detach.setError(localError); + } - writeFrame(transportSession.getLocalChannel(), detach, null, null); - endpoint.clearModified(); - // TODO - temporary hack for PROTON-154, this line should be removed and replaced - // with proper handling for closed links - link.free(); - } + writeFrame(transportSession.getLocalChannel(), detach, null, null); } + endpoint.clearModified(); + } endpoint = endpoint.transportNext(); } @@ -411,8 +449,6 @@ public class TransportImpl extends EndpointImpl sender.setDrained(0); writeFlow(transportSession, transportLink); - - endpoint.clearModified(); } } @@ -470,7 +506,7 @@ public class TransportImpl extends EndpointImpl if(!delivery.isDone() && (delivery.getDataLength() > 0 || delivery != snd.current()) && tpSession.hasOutgoingCredit() && tpLink.hasCredit() && - tpLink.getLocalHandle() != null) + tpLink.getLocalHandle() != null && !_frameWriter.isFull()) { UnsignedInteger deliveryId = tpSession.getOutgoingDeliveryId(); TransportDelivery tpDelivery = new TransportDelivery(deliveryId, delivery, tpLink); @@ -529,6 +565,8 @@ public class TransportImpl extends EndpointImpl delivery.setDataLength(payload.remaining()); session.incrementOutgoingBytes(-delta); } + + getConnectionImpl().put(Event.Type.LINK_FLOW, snd); } if(wasDone && delivery.getLocalState() != null) @@ -596,10 +634,6 @@ public class TransportImpl extends EndpointImpl { transportLink.addCredit(credits); writeFlow(transportSession, transportLink); - if(receiver.getLocalState() == EndpointState.ACTIVE) - { - endpoint.clearModified(); - } } } } @@ -689,10 +723,6 @@ public class TransportImpl extends EndpointImpl writeFrame(transportSession.getLocalChannel(), attach, null, null); transportLink.sentAttach(); - if(link.getLocalState() == EndpointState.ACTIVE && (link instanceof SenderImpl || !link.hasCredit())) - { - endpoint.clearModified(); - } } } } @@ -712,15 +742,22 @@ public class TransportImpl extends EndpointImpl private void processOpen() { - if(_connectionEndpoint != null && _connectionEndpoint.getLocalState() != EndpointState.UNINITIALIZED && !_isOpenSent) - { + if ((_tail_error != null || + (_connectionEndpoint != null && + _connectionEndpoint.getLocalState() != EndpointState.UNINITIALIZED)) && + !_isOpenSent) { Open open = new Open(); - String cid = _connectionEndpoint.getLocalContainerId(); - open.setContainerId(cid == null ? "" : cid); - open.setHostname(_connectionEndpoint.getHostname()); - open.setDesiredCapabilities(_connectionEndpoint.getDesiredCapabilities()); - open.setOfferedCapabilities(_connectionEndpoint.getOfferedCapabilities()); - open.setProperties(_connectionEndpoint.getProperties()); + if (_connectionEndpoint != null) { + String cid = _connectionEndpoint.getLocalContainerId(); + open.setContainerId(cid == null ? "" : cid); + open.setHostname(_connectionEndpoint.getHostname()); + open.setDesiredCapabilities(_connectionEndpoint.getDesiredCapabilities()); + open.setOfferedCapabilities(_connectionEndpoint.getOfferedCapabilities()); + open.setProperties(_connectionEndpoint.getProperties()); + } else { + open.setContainerId(""); + } + if (_maxFrameSize > 0) { open.setMaxFrameSize(UnsignedInteger.valueOf(_maxFrameSize)); } @@ -731,7 +768,6 @@ public class TransportImpl extends EndpointImpl _isOpenSent = true; writeFrame(0, open, null, null); - } } @@ -762,10 +798,6 @@ public class TransportImpl extends EndpointImpl writeFrame(channelId, begin, null, null); transportSession.sentBegin(); - if(session.getLocalState() == EndpointState.ACTIVE) - { - endpoint.clearModified(); - } } } endpoint = endpoint.transportNext(); @@ -829,21 +861,27 @@ public class TransportImpl extends EndpointImpl SessionImpl session; TransportSession transportSession; - if((endpoint instanceof SessionImpl) - && (session = (SessionImpl)endpoint).getLocalState() == EndpointState.CLOSED - && (transportSession = session.getTransportSession()).isLocalChannelSet() - && !hasSendableMessages(session) - && !_isCloseSent) - { - int channel = freeLocalChannel(transportSession); - End end = new End(); - ErrorCondition localError = endpoint.getCondition(); - if( localError.getCondition() !=null ) + if((endpoint instanceof SessionImpl)) { + if ((session = (SessionImpl)endpoint).getLocalState() == EndpointState.CLOSED + && (transportSession = session.getTransportSession()).isLocalChannelSet() + && !_isCloseSent) { - end.setError(localError); + if (hasSendableMessages(session)) { + endpoint = endpoint.transportNext(); + continue; + } + + int channel = freeLocalChannel(transportSession); + End end = new End(); + ErrorCondition localError = endpoint.getCondition(); + if( localError.getCondition() !=null ) + { + end.setError(localError); + } + + writeFrame(channel, end, null, null); } - writeFrame(channel, end, null, null); endpoint.clearModified(); } @@ -854,6 +892,9 @@ public class TransportImpl extends EndpointImpl private boolean hasSendableMessages(SessionImpl session) { + if (_connectionEndpoint == null) { + return false; + } if(!_closeReceived && (session == null || !session.getTransportSession().endReceived())) { @@ -878,14 +919,24 @@ public class TransportImpl extends EndpointImpl private void processClose() { - if(_connectionEndpoint != null && _connectionEndpoint.getLocalState() == EndpointState.CLOSED && !_isCloseSent) - { + if ((_tail_error != null || + (_connectionEndpoint != null && + _connectionEndpoint.getLocalState() == EndpointState.CLOSED)) && + !_isCloseSent) { if(!hasSendableMessages(null)) { Close close = new Close(); - ErrorCondition localError = _connectionEndpoint.getCondition(); - if( localError.getCondition() !=null ) + ErrorCondition localError; + + if (_connectionEndpoint == null) { + localError = new ErrorCondition(ConnectionError.FRAMING_ERROR, + _tail_error.toString()); + } else { + localError = _connectionEndpoint.getCondition(); + } + + if(localError.getCondition() != null) { close.setError(localError); } @@ -893,6 +944,10 @@ public class TransportImpl extends EndpointImpl _isCloseSent = true; writeFrame(0, close, null, null); + + if (_connectionEndpoint != null) { + _connectionEndpoint.clearModified(); + } } } } @@ -912,10 +967,10 @@ public class TransportImpl extends EndpointImpl } @Override - public void free() - { - super.free(); - } + void postFinal() {} + + @Override + void doFree() { } //================================================================================================================== // handle incoming amqp data @@ -967,6 +1022,9 @@ public class TransportImpl extends EndpointImpl { // TODO check null transportSession = _localSessions.get(begin.getRemoteChannel().intValue()); + if (transportSession == null) { + throw new NullPointerException("uncorrelated channel: " + begin.getRemoteChannel()); + } session = transportSession.getSession(); } @@ -975,10 +1033,7 @@ public class TransportImpl extends EndpointImpl transportSession.setNextIncomingId(begin.getNextOutgoingId()); _remoteSessions.put(channel, transportSession); - EventImpl ev = _connectionEndpoint.put(Event.Type.SESSION_REMOTE_STATE); - if (ev != null) { - ev.init(session); - } + _connectionEndpoint.put(Event.Type.SESSION_REMOTE_OPEN, session); } } @@ -1034,10 +1089,7 @@ public class TransportImpl extends EndpointImpl } - EventImpl ev = _connectionEndpoint.put(Event.Type.LINK_REMOTE_STATE); - if (ev != null) { - ev.init(link); - } + _connectionEndpoint.put(Event.Type.LINK_REMOTE_OPEN, link); } } @@ -1102,16 +1154,13 @@ public class TransportImpl extends EndpointImpl LinkImpl link = transportLink.getLink(); transportLink.receivedDetach(); transportSession.freeRemoteHandle(transportLink.getRemoteHandle()); + _connectionEndpoint.put(Event.Type.LINK_REMOTE_CLOSE, link); + transportLink.clearRemoteHandle(); link.setRemoteState(EndpointState.CLOSED); if(detach.getError() != null) { link.getRemoteCondition().copyFrom(detach.getError()); } - - EventImpl ev = _connectionEndpoint.put(Event.Type.LINK_REMOTE_STATE); - if (ev != null) { - ev.init(link); - } } else { @@ -1140,10 +1189,7 @@ public class TransportImpl extends EndpointImpl session.getRemoteCondition().copyFrom(errorCondition); } - EventImpl ev = _connectionEndpoint.put(Event.Type.SESSION_REMOTE_STATE); - if (ev != null) { - ev.init(session); - } + _connectionEndpoint.put(Event.Type.SESSION_REMOTE_CLOSE, session); } } @@ -1160,10 +1206,7 @@ public class TransportImpl extends EndpointImpl _connectionEndpoint.getRemoteCondition().copyFrom(close.getError()); } - EventImpl ev = _connectionEndpoint.put(Event.Type.CONNECTION_REMOTE_STATE); - if (ev != null) { - ev.init(_connectionEndpoint); - } + _connectionEndpoint.put(Event.Type.CONNECTION_REMOTE_CLOSE, _connectionEndpoint); } } @@ -1176,11 +1219,12 @@ public class TransportImpl extends EndpointImpl throw new IllegalStateException("Transport cannot accept frame: " + frame); } - log(this, INCOMING, frame); + log(INCOMING, frame); - if( _protocolTracer != null ) + ProtocolTracer tracer = _protocolTracer.get(); + if( tracer != null ) { - _protocolTracer.receivedFrame(frame); + tracer.receivedFrame(frame); } frame.getBody().invoke(this,frame.getPayload(), frame.getChannel()); @@ -1188,10 +1232,15 @@ public class TransportImpl extends EndpointImpl } @Override - public void closed() + public void closed(TransportException error) { - if (!_closeReceived) { - throw new TransportException("connection aborted"); + if (!_closeReceived || error != null) { + if (error == null) { + _tail_error = new TransportException("connection aborted"); + } else { + _tail_error = error; + } + _head_closed = true; } } @@ -1204,13 +1253,13 @@ public class TransportImpl extends EndpointImpl @Override public ProtocolTracer getProtocolTracer() { - return _protocolTracer; + return _protocolTracer.get(); } @Override public void setProtocolTracer(ProtocolTracer protocolTracer) { - this._protocolTracer = protocolTracer; + this._protocolTracer.set(protocolTracer); } @Override @@ -1260,6 +1309,8 @@ public class TransportImpl extends EndpointImpl @Override public void process() throws TransportException { + _processingStarted = true; + try { init(); _inputProcessor.process(); @@ -1303,6 +1354,12 @@ public class TransportImpl extends EndpointImpl _outputProcessor.close_head(); } + public boolean isClosed() { + int p = pending(); + int c = capacity(); + return p == END_OF_STREAM && c == END_OF_STREAM; + } + @Override public String toString() { @@ -1337,21 +1394,11 @@ public class TransportImpl extends EndpointImpl static String INCOMING = "<-"; static String OUTGOING = "->"; - private static final boolean getBooleanEnv(String name) + void log(String event, TransportFrame frame) { - String value = System.getenv(name); - return "true".equalsIgnoreCase(value) || - "1".equals(value) || - "yes".equalsIgnoreCase(value); - } - - private static final boolean ENABLED = getBooleanEnv("PN_TRACE_FRM"); - - static void log(Object ctx, String event, TransportFrame frame) - { - if (ENABLED) { + if ((_levels & TRACE_FRM) != 0) { StringBuilder msg = new StringBuilder(); - msg.append("[").append(System.identityHashCode(ctx)).append(":") + msg.append("[").append(System.identityHashCode(this)).append(":") .append(frame.getChannel()).append("]"); msg.append(" ").append(event).append(" ").append(frame.getBody()); if (frame.getPayload() != null) { @@ -1366,7 +1413,8 @@ public class TransportImpl extends EndpointImpl } @Override - protected void localStateChanged() - { - } + void localOpen() {} + + @Override + void localClose() {} }
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportLink.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportLink.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportLink.java index d9de3a7..4b94a42 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportLink.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportLink.java @@ -51,6 +51,12 @@ class TransportLink<T extends LinkImpl> : new TransportSender((SenderImpl)link)); } + void unbind() + { + clearLocalHandle(); + clearRemoteHandle(); + } + public UnsignedInteger getLocalHandle() { return _localHandle; @@ -58,6 +64,9 @@ class TransportLink<T extends LinkImpl> public void setLocalHandle(UnsignedInteger localHandle) { + if (_localHandle == null) { + _link.incref(); + } _localHandle = localHandle; } @@ -78,6 +87,9 @@ class TransportLink<T extends LinkImpl> public void clearLocalHandle() { + if (_localHandle != null) { + _link.decref(); + } _localHandle = null; } @@ -88,9 +100,20 @@ class TransportLink<T extends LinkImpl> public void setRemoteHandle(UnsignedInteger remoteHandle) { + if (_remoteHandle == null) { + _link.incref(); + } _remoteHandle = remoteHandle; } + public void clearRemoteHandle() + { + if (_remoteHandle != null) { + _link.decref(); + } + _remoteHandle = null; + } + public UnsignedInteger getDeliveryCount() { return _deliveryCount; @@ -122,10 +145,7 @@ class TransportLink<T extends LinkImpl> _remoteLinkCredit = flow.getLinkCredit(); - EventImpl ev = _link.getConnectionImpl().put(Event.Type.LINK_FLOW); - if (ev != null) { - ev.init(_link); - } + _link.getConnectionImpl().put(Event.Type.LINK_FLOW, _link); } void setLinkCredit(UnsignedInteger linkCredit) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportOutputAdaptor.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportOutputAdaptor.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportOutputAdaptor.java index cc23355..2c43bfe 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportOutputAdaptor.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportOutputAdaptor.java @@ -26,23 +26,20 @@ import org.apache.qpid.proton.engine.Transport; class TransportOutputAdaptor implements TransportOutput { - private TransportOutputWriter _transportOutputWriter; + private static final ByteBuffer _emptyHead = newReadableBuffer(0).asReadOnlyBuffer(); - private final ByteBuffer _outputBuffer; - private final ByteBuffer _head; + private final TransportOutputWriter _transportOutputWriter; + private final int _maxFrameSize; + + private ByteBuffer _outputBuffer = null; + private ByteBuffer _head = null; private boolean _output_done = false; private boolean _head_closed = false; TransportOutputAdaptor(TransportOutputWriter transportOutputWriter, int maxFrameSize) { _transportOutputWriter = transportOutputWriter; - if (maxFrameSize > 0) { - _outputBuffer = newWriteableBuffer(maxFrameSize); - } else { - _outputBuffer = newWriteableBuffer(4*1024); - } - _head = _outputBuffer.asReadOnlyBuffer(); - _head.limit(0); + _maxFrameSize = maxFrameSize > 0 ? maxFrameSize : 4*1024; } @Override @@ -52,13 +49,26 @@ class TransportOutputAdaptor implements TransportOutput return Transport.END_OF_STREAM; } + if(_outputBuffer == null) + { + init_buffers(); + } + _output_done = _transportOutputWriter.writeInto(_outputBuffer); _head.limit(_outputBuffer.position()); - if (_output_done && _outputBuffer.position() == 0) { + if (_outputBuffer.position() == 0 && _outputBuffer.capacity() > TransportImpl.BUFFER_RELEASE_THRESHOLD) + { + release_buffers(); + } + + if (_output_done && (_outputBuffer == null || _outputBuffer.position() == 0)) + { return Transport.END_OF_STREAM; - } else { - return _outputBuffer.position(); + } + else + { + return _outputBuffer == null ? 0 : _outputBuffer.position(); } } @@ -66,24 +76,40 @@ class TransportOutputAdaptor implements TransportOutput public ByteBuffer head() { pending(); - return _head; + return _head != null ? _head : _emptyHead; } @Override public void pop(int bytes) { - _outputBuffer.flip(); - _outputBuffer.position(bytes); - _outputBuffer.compact(); - _head.position(0); - _head.limit(_outputBuffer.position()); + if (_outputBuffer != null) { + _outputBuffer.flip(); + _outputBuffer.position(bytes); + _outputBuffer.compact(); + _head.position(0); + _head.limit(_outputBuffer.position()); + if (_outputBuffer.position() == 0 && _outputBuffer.capacity() > TransportImpl.BUFFER_RELEASE_THRESHOLD) { + release_buffers(); + } + } } @Override public void close_head() { _head_closed = true; - _transportOutputWriter.closed(); + _transportOutputWriter.closed(null); + release_buffers(); + } + + private void init_buffers() { + _outputBuffer = newWriteableBuffer(_maxFrameSize); + _head = _outputBuffer.asReadOnlyBuffer(); + _head.limit(0); } + private void release_buffers() { + _head = null; + _outputBuffer = null; + } } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportOutputWriter.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportOutputWriter.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportOutputWriter.java index 2428da1..76c0df7 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportOutputWriter.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportOutputWriter.java @@ -20,6 +20,8 @@ package org.apache.qpid.proton.engine.impl; import java.nio.ByteBuffer; +import org.apache.qpid.proton.engine.TransportException; + interface TransportOutputWriter { /** @@ -28,6 +30,6 @@ interface TransportOutputWriter */ boolean writeInto(ByteBuffer outputBuffer); - void closed(); + void closed(TransportException error); } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java index 873254a..6d96043 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/TransportSession.java @@ -69,6 +69,12 @@ class TransportSession _session = session; } + void unbind() + { + unsetLocalChannel(); + unsetRemoteChannel(); + } + public SessionImpl getSession() { return _session; @@ -81,6 +87,9 @@ class TransportSession public void setLocalChannel(int localChannel) { + if (!isLocalChannelSet()) { + _session.incref(); + } _localChannel = localChannel; } @@ -91,6 +100,9 @@ class TransportSession public void setRemoteChannel(int remoteChannel) { + if (!isRemoteChannelSet()) { + _session.incref(); + } _remoteChannel = remoteChannel; } @@ -116,11 +128,17 @@ class TransportSession public void unsetLocalChannel() { + if (isLocalChannelSet()) { + _session.decref(); + } _localChannel = -1; } public void unsetRemoteChannel() { + if (isRemoteChannelSet()) { + _session.decref(); + } _remoteChannel = -1; } @@ -262,7 +280,7 @@ class TransportSession _unsettledIncomingDeliveriesById.put(_incomingDeliveryId, delivery); getSession().incrementIncomingDeliveries(1); } - if( transfer.getState()!=null ) + if( transfer.getState()!=null ) { delivery.setRemoteDeliveryState(transfer.getState()); } @@ -308,15 +326,12 @@ class TransportSession delivery.getLink().modified(false); } - EventImpl ev = getSession().getConnection().put(Event.Type.DELIVERY); - if (ev != null) { - ev.init(delivery); - } + getSession().getConnection().put(Event.Type.DELIVERY, delivery); } public void freeLocalChannel() { - _localChannel = -1; + unsetLocalChannel(); } private void setRemoteIncomingWindow(UnsignedInteger incomingWindow) @@ -394,10 +409,7 @@ class TransportSession } delivery.updateWork(); - EventImpl ev = getSession().getConnection().put(Event.Type.DELIVERY); - if (ev != null) { - ev.init(delivery); - } + getSession().getConnection().put(Event.Type.DELIVERY, delivery); } id = id.add(UnsignedInteger.ONE); } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapper.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapper.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapper.java index 38341bf..2599290 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapper.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapper.java @@ -338,22 +338,13 @@ public class SimpleSslTransportWrapper implements SslTransportWrapper _inputBuffer.flip(); - try - { - try { - unwrapInput(); - } catch (SSLException e) { - throw new TransportException(e); - } - } - catch (TransportException e) - { + try { + unwrapInput(); + } catch (SSLException e) { + _logger.log(Level.WARNING, e.getMessage()); _inputBuffer.position(_inputBuffer.limit()); _tail_closed = true; - throw e; - } - finally - { + } finally { _inputBuffer.compact(); } } @@ -374,17 +365,17 @@ public class SimpleSslTransportWrapper implements SslTransportWrapper try { wrapOutput(); } catch (SSLException e) { - throw new TransportException(e); + _logger.log(Level.WARNING, e.getMessage()); + _head_closed = true; } _head.limit(_outputBuffer.position()); - if (_head_closed && _outputBuffer.position() == 0) - { + if (_head_closed && _outputBuffer.position() == 0) { return Transport.END_OF_STREAM; - } else { - return _outputBuffer.position(); } + + return _outputBuffer.position(); } @Override @@ -408,6 +399,10 @@ public class SimpleSslTransportWrapper implements SslTransportWrapper public void close_head() { _underlyingOutput.close_head(); + int p = pending(); + if (p > 0) { + pop(p); + } } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslDomainImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslDomainImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslDomainImpl.java index b9ec972..fbcb0f5 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslDomainImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslDomainImpl.java @@ -19,7 +19,6 @@ package org.apache.qpid.proton.engine.impl.ssl; import org.apache.qpid.proton.ProtonUnsupportedOperationException; -import org.apache.qpid.proton.engine.EngineFactory; import org.apache.qpid.proton.engine.ProtonJSslDomain; import org.apache.qpid.proton.engine.SslDomain; import org.apache.qpid.proton.engine.SslPeerDetails; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslPeerDetailsImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslPeerDetailsImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslPeerDetailsImpl.java index a873e8e..87a9fe3 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslPeerDetailsImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ssl/SslPeerDetailsImpl.java @@ -18,7 +18,6 @@ */ package org.apache.qpid.proton.engine.impl.ssl; -import org.apache.qpid.proton.engine.EngineFactory; import org.apache.qpid.proton.engine.ProtonJSslPeerDetails; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-j/src/main/java/org/apache/qpid/proton/message/Message.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/message/Message.java b/proton-j/src/main/java/org/apache/qpid/proton/message/Message.java index d039001..aede34b 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/message/Message.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/message/Message.java @@ -28,6 +28,8 @@ import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; import org.apache.qpid.proton.amqp.messaging.Properties; import org.apache.qpid.proton.amqp.messaging.Section; +import org.apache.qpid.proton.message.impl.MessageImpl; + /** * Represents a Message within Proton. * @@ -36,6 +38,27 @@ import org.apache.qpid.proton.amqp.messaging.Section; */ public interface Message { + + public static final class Factory + { + public static Message create() { + return new MessageImpl(); + } + + public static Message create(Header header, + DeliveryAnnotations deliveryAnnotations, + MessageAnnotations messageAnnotations, + Properties properties, + ApplicationProperties applicationProperties, + Section body, + Footer footer) { + return new MessageImpl(header, deliveryAnnotations, + messageAnnotations, properties, + applicationProperties, body, footer); + } + } + + short DEFAULT_PRIORITY = 4; boolean isDurable(); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-j/src/main/java/org/apache/qpid/proton/message/MessageFactory.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/message/MessageFactory.java b/proton-j/src/main/java/org/apache/qpid/proton/message/MessageFactory.java deleted file mode 100644 index 1323726..0000000 --- a/proton-j/src/main/java/org/apache/qpid/proton/message/MessageFactory.java +++ /dev/null @@ -1,37 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.proton.message; - -import org.apache.qpid.proton.ProtonFactory; -import org.apache.qpid.proton.amqp.messaging.ApplicationProperties; -import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations; -import org.apache.qpid.proton.amqp.messaging.Footer; -import org.apache.qpid.proton.amqp.messaging.Header; -import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; -import org.apache.qpid.proton.amqp.messaging.Properties; -import org.apache.qpid.proton.amqp.messaging.Section; - -public interface MessageFactory extends ProtonFactory -{ - Message createMessage(); - Message createMessage(Header header, - DeliveryAnnotations deliveryAnnotations, MessageAnnotations messageAnnotations, - Properties properties, ApplicationProperties applicationProperties, - Section body, Footer footer); -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-j/src/main/java/org/apache/qpid/proton/message/impl/MessageFactoryImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/message/impl/MessageFactoryImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/message/impl/MessageFactoryImpl.java deleted file mode 100644 index 293fd8c..0000000 --- a/proton-j/src/main/java/org/apache/qpid/proton/message/impl/MessageFactoryImpl.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.proton.message.impl; - -import org.apache.qpid.proton.ProtonFactoryImpl; -import org.apache.qpid.proton.amqp.messaging.ApplicationProperties; -import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations; -import org.apache.qpid.proton.amqp.messaging.Footer; -import org.apache.qpid.proton.amqp.messaging.Header; -import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; -import org.apache.qpid.proton.amqp.messaging.Properties; -import org.apache.qpid.proton.amqp.messaging.Section; -import org.apache.qpid.proton.message.MessageFactory; -import org.apache.qpid.proton.message.ProtonJMessage; - -public class MessageFactoryImpl extends ProtonFactoryImpl implements MessageFactory -{ - - @SuppressWarnings("deprecation") // TODO remove once the constructor is made non-public (and therefore non-deprecated) - @Override - public ProtonJMessage createMessage() - { - return new MessageImpl(); - } - - @SuppressWarnings("deprecation") // TODO remove once the constructor is made non-public (and therefore non-deprecated) - @Override - public ProtonJMessage createMessage(Header header, - DeliveryAnnotations deliveryAnnotations, MessageAnnotations messageAnnotations, - Properties properties, ApplicationProperties applicationProperties, - Section body, Footer footer) - { - return new MessageImpl(header, - deliveryAnnotations, messageAnnotations, - properties, applicationProperties, - body, footer); - } -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-j/src/main/java/org/apache/qpid/proton/message/impl/MessageImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/message/impl/MessageImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/message/impl/MessageImpl.java index 272756d..c43ba3e 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/message/impl/MessageImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/message/impl/MessageImpl.java @@ -573,8 +573,15 @@ public class MessageImpl implements ProtonJMessage @Override public int decode(byte[] data, int offset, int length) { - DecoderImpl decoder = tlsCodec.get().decoder; final ByteBuffer buffer = ByteBuffer.wrap(data, offset, length); + decode(buffer); + + return length-buffer.remaining(); + } + + public void decode(ByteBuffer buffer) + { + DecoderImpl decoder = tlsCodec.get().decoder; decoder.setByteBuffer(buffer); _header = null; @@ -680,9 +687,6 @@ public class MessageImpl implements ProtonJMessage } decoder.setByteBuffer(null); - - return length-buffer.remaining(); - } @Override http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-j/src/main/java/org/apache/qpid/proton/messenger/Messenger.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/messenger/Messenger.java b/proton-j/src/main/java/org/apache/qpid/proton/messenger/Messenger.java index cf8dd9a..6d3f362 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/messenger/Messenger.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/messenger/Messenger.java @@ -25,6 +25,8 @@ import java.io.IOException; import org.apache.qpid.proton.TimeoutException; import org.apache.qpid.proton.message.Message; +import org.apache.qpid.proton.messenger.impl.MessengerImpl; + /** * * Messenger defines a high level interface for sending and receiving @@ -69,6 +71,18 @@ import org.apache.qpid.proton.message.Message; */ public interface Messenger { + + public static final class Factory + { + public static Messenger create() { + return new MessengerImpl(); + } + + public static Messenger create(String name) { + return new MessengerImpl(name); + } + } + /** * Flag for use with reject(), accept() and settle() methods. */ http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-j/src/main/java/org/apache/qpid/proton/messenger/MessengerFactory.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/messenger/MessengerFactory.java b/proton-j/src/main/java/org/apache/qpid/proton/messenger/MessengerFactory.java deleted file mode 100644 index 9d85aae..0000000 --- a/proton-j/src/main/java/org/apache/qpid/proton/messenger/MessengerFactory.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.proton.messenger; - -import org.apache.qpid.proton.ProtonFactory; - -public interface MessengerFactory extends ProtonFactory -{ - Messenger createMessenger(); - Messenger createMessenger(String name); -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerFactoryImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerFactoryImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerFactoryImpl.java deleted file mode 100644 index 6a2bd12..0000000 --- a/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerFactoryImpl.java +++ /dev/null @@ -1,42 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.proton.messenger.impl; - -import org.apache.qpid.proton.ProtonFactoryImpl; -import org.apache.qpid.proton.messenger.Messenger; -import org.apache.qpid.proton.messenger.MessengerFactory; - -public class MessengerFactoryImpl extends ProtonFactoryImpl implements MessengerFactory -{ - @SuppressWarnings("deprecation") // TODO remove once the constructor is made non-public (and therefore non-deprecated) - @Override - public Messenger createMessenger() - { - return new MessengerImpl(); - } - - @SuppressWarnings("deprecation") // TODO remove once the constructor is made non-public (and therefore non-deprecated) - @Override - public Messenger createMessenger(String name) - { - return new MessengerImpl(name); - } - -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java index 29bb9ca..e6475b9 100644 --- a/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java +++ b/proton-j/src/main/java/org/apache/qpid/proton/messenger/impl/MessengerImpl.java @@ -29,17 +29,14 @@ import java.util.logging.Level; import java.util.logging.Logger; import org.apache.qpid.proton.Proton; -import org.apache.qpid.proton.ProtonFactoryLoader; import org.apache.qpid.proton.InterruptException; import org.apache.qpid.proton.TimeoutException; import org.apache.qpid.proton.driver.Connector; import org.apache.qpid.proton.driver.Driver; -import org.apache.qpid.proton.driver.DriverFactory; import org.apache.qpid.proton.driver.Listener; import org.apache.qpid.proton.engine.Connection; import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.EndpointState; -import org.apache.qpid.proton.engine.EngineFactory; import org.apache.qpid.proton.engine.Link; import org.apache.qpid.proton.engine.Receiver; import org.apache.qpid.proton.engine.Sasl; @@ -49,10 +46,8 @@ import org.apache.qpid.proton.engine.SslDomain; import org.apache.qpid.proton.engine.Ssl; import org.apache.qpid.proton.engine.Transport; import org.apache.qpid.proton.message.Message; -import org.apache.qpid.proton.message.MessageFactory; import org.apache.qpid.proton.messenger.Messenger; import org.apache.qpid.proton.messenger.MessengerException; -import org.apache.qpid.proton.messenger.MessengerFactory; import org.apache.qpid.proton.messenger.Status; import org.apache.qpid.proton.messenger.Tracker; import org.apache.qpid.proton.amqp.messaging.Source; @@ -1449,14 +1444,16 @@ public class MessengerImpl implements Messenger { _receivers++; _blocked.add((Receiver)link); + link.setContext(Boolean.TRUE); } } // a link is being removed, account for it. private void linkRemoved(Link _link) { - if (_link instanceof Receiver) + if (_link instanceof Receiver && (Boolean) _link.getContext()) { + _link.setContext(Boolean.FALSE); Receiver link = (Receiver)_link; assert _receivers > 0; _receivers--; http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-j/src/main/resources/cengine.py ---------------------------------------------------------------------- diff --git a/proton-j/src/main/resources/cengine.py b/proton-j/src/main/resources/cengine.py index 320f539..c47ab58 100644 --- a/proton-j/src/main/resources/cengine.py +++ b/proton-j/src/main/resources/cengine.py @@ -25,7 +25,7 @@ from org.apache.qpid.proton.amqp.transaction import Coordinator from org.apache.qpid.proton.amqp.transport import ErrorCondition, \ SenderSettleMode, ReceiverSettleMode from org.apache.qpid.proton.engine import EndpointState, Sender, \ - Receiver, TransportException + Receiver, Transport, TransportException from java.util import EnumSet from jarray import array, zeros @@ -57,10 +57,10 @@ PN_NONDURABLE = 0 PN_CONFIGURATION = 1 PN_DELIVERIES = 2 -PN_LINK_CLOSE = 0 -PN_SESSION_CLOSE = 1 -PN_CONNECTION_CLOSE = 2 -PN_NEVER = 3 +PN_EXPIRE_WITH_LINK = 0 +PN_EXPIRE_WITH_SESSION = 1 +PN_EXPIRE_WITH_CONNECTION = 2 +PN_EXPIRE_NEVER = 3 PN_DIST_MODE_UNSPECIFIED = 0 PN_DIST_MODE_COPY = 1 @@ -72,10 +72,10 @@ PN_REJECTED = (0x0000000000000025) PN_RELEASED = (0x0000000000000026) PN_MODIFIED = (0x0000000000000027) -PN_TRACE_OFF = (0) -PN_TRACE_RAW = (1) -PN_TRACE_FRM = (2) -PN_TRACE_DRV = (4) +PN_TRACE_OFF = Transport.TRACE_OFF +PN_TRACE_RAW = Transport.TRACE_RAW +PN_TRACE_FRM = Transport.TRACE_FRM +PN_TRACE_DRV = Transport.TRACE_DRV def wrap(obj, wrapper): if obj: @@ -98,7 +98,11 @@ class pn_condition: self.description = None self.info.clear() else: - self.name = impl.getCondition().toString() + cond = impl.getCondition() + if cond is None: + self.name = None + else: + self.name = cond.toString() self.description = impl.getDescription() obj2dat(impl.getInfo(), self.info) @@ -222,6 +226,9 @@ def pn_connection_set_container(conn, name): def pn_connection_remote_container(conn): return conn.impl.getRemoteContainer() +def pn_connection_get_hostname(conn): + return conn.impl.getHostname() + def pn_connection_set_hostname(conn, name): conn.impl.setHostname(name) @@ -244,6 +251,9 @@ def pn_connection_close(conn): conn.on_close() conn.impl.close() +def pn_connection_free(conn): + conn.impl.free() + class pn_session_wrapper(endpoint_wrapper): pass @@ -325,7 +335,7 @@ def pn_receiver(ssn, name): return wrap(ssn.impl.receiver(name), pn_link_wrapper) def pn_session_free(ssn): - ssn.impl = None + ssn.impl.free() TERMINUS_TYPES_J2P = { Source: PN_SOURCE, @@ -354,17 +364,17 @@ DURABILITY_J2P = { } EXPIRY_POLICY_P2J = { - PN_LINK_CLOSE: TerminusExpiryPolicy.LINK_DETACH, - PN_SESSION_CLOSE: TerminusExpiryPolicy.SESSION_END, - PN_CONNECTION_CLOSE: TerminusExpiryPolicy.CONNECTION_CLOSE, - PN_NEVER: TerminusExpiryPolicy.NEVER + PN_EXPIRE_WITH_LINK: TerminusExpiryPolicy.LINK_DETACH, + PN_EXPIRE_WITH_SESSION: TerminusExpiryPolicy.SESSION_END, + PN_EXPIRE_WITH_CONNECTION: TerminusExpiryPolicy.CONNECTION_CLOSE, + PN_EXPIRE_NEVER: TerminusExpiryPolicy.NEVER } EXPIRY_POLICY_J2P = { - TerminusExpiryPolicy.LINK_DETACH: PN_LINK_CLOSE, - TerminusExpiryPolicy.SESSION_END: PN_SESSION_CLOSE, - TerminusExpiryPolicy.CONNECTION_CLOSE: PN_CONNECTION_CLOSE, - TerminusExpiryPolicy.NEVER: PN_NEVER + TerminusExpiryPolicy.LINK_DETACH: PN_EXPIRE_WITH_LINK, + TerminusExpiryPolicy.SESSION_END: PN_EXPIRE_WITH_SESSION, + TerminusExpiryPolicy.CONNECTION_CLOSE: PN_EXPIRE_WITH_CONNECTION, + TerminusExpiryPolicy.NEVER: PN_EXPIRE_NEVER } DISTRIBUTION_MODE_P2J = { @@ -385,7 +395,7 @@ class pn_terminus: self.type = type self.address = None self.durability = PN_NONDURABLE - self.expiry_policy = PN_SESSION_CLOSE + self.expiry_policy = PN_EXPIRE_WITH_SESSION self.distribution_mode = PN_DIST_MODE_UNSPECIFIED self.timeout = 0 self.dynamic = False @@ -587,6 +597,9 @@ def pn_link_remote_rcv_settle_mode(link): def pn_link_is_sender(link): return isinstance(link.impl, Sender) +def pn_link_is_receiver(link): + return isinstance(link.impl, Receiver) + def pn_link_head(conn, mask): local, remote = mask2set(mask) return wrap(conn.impl.linkHead(local, remote), pn_link_wrapper) @@ -652,7 +665,7 @@ def pn_link_current(link): return wrap(link.impl.current(), pn_delivery_wrapper) def pn_link_free(link): - link.impl = None + link.impl.free() def pn_work_head(conn): return wrap(conn.impl.getWorkHead(), pn_delivery_wrapper) @@ -802,6 +815,9 @@ def pn_delivery_get_context(dlv): def pn_delivery_set_context(dlv, ctx): dlv.context = ctx +def pn_delivery_partial(dlv): + return dlv.impl.isPartial() + def pn_delivery_pending(dlv): return dlv.impl.pending() @@ -847,7 +863,6 @@ class pn_transport_wrapper: def __init__(self, impl): self.impl = impl - self.error = pn_error(0, None) def pn_transport(): return wrap(Proton.transport(), pn_transport_wrapper) @@ -877,15 +892,15 @@ def pn_transport_bind(trans, conn): trans.impl.bind(conn.impl) return 0 +def pn_transport_unbind(trans): + trans.impl.unbind() + return 0 + def pn_transport_trace(trans, n): - # XXX - pass + trans.impl.trace(n) def pn_transport_pending(trans): - try: - return trans.impl.pending() - except TransportException, e: - return trans.error.set(PN_ERR, str(e)) + return trans.impl.pending() def pn_transport_peek(trans, size): size = min(trans.impl.pending(), size) @@ -893,6 +908,7 @@ def pn_transport_peek(trans, size): if size: bb = trans.impl.head() bb.get(ba) + bb.position(0) return 0, ba.tostring() def pn_transport_pop(trans, size): @@ -906,47 +922,51 @@ def pn_transport_push(trans, input): if cap < 0: return cap elif len(input) > cap: - return PN_OVERFLOW - else: - bb = trans.impl.tail() - bb.put(array(input, 'b')) - try: - trans.impl.process() - return 0 - except TransportException, e: - trans.error = pn_error(PN_ERR, str(e)) - return PN_ERR + input = input[:cap] + + bb = trans.impl.tail() + bb.put(array(input, 'b')) + trans.impl.process() + return len(input) def pn_transport_close_head(trans): - try: - trans.impl.close_head() - return 0 - except TransportException, e: - trans.error = pn_error(PN_ERR, str(e)) - return PN_ERR + trans.impl.close_head() + return 0 def pn_transport_close_tail(trans): - try: - trans.impl.close_tail() - return 0 - except TransportException, e: - trans.error = pn_error(PN_ERR, str(e)) - return PN_ERR + trans.impl.close_tail() + return 0 -def pn_transport_error(trans): - return trans.error +def pn_transport_closed(trans): + return trans.impl.isClosed() from org.apache.qpid.proton.engine import Event -PN_EVENT_CATEGORY_PROTOCOL = Event.Category.PROTOCOL - -PN_CONNECTION_LOCAL_STATE = Event.Type.CONNECTION_LOCAL_STATE -PN_CONNECTION_REMOTE_STATE = Event.Type.CONNECTION_REMOTE_STATE -PN_SESSION_LOCAL_STATE = Event.Type.SESSION_LOCAL_STATE -PN_SESSION_REMOTE_STATE = Event.Type.SESSION_REMOTE_STATE -PN_LINK_LOCAL_STATE = Event.Type.LINK_LOCAL_STATE -PN_LINK_REMOTE_STATE = Event.Type.LINK_REMOTE_STATE +PN_EVENT_CATEGORY_CONNECTION = Event.Category.CONNECTION +PN_EVENT_CATEGORY_SESSION = Event.Category.SESSION +PN_EVENT_CATEGORY_LINK = Event.Category.LINK +PN_EVENT_CATEGORY_DELIVERY = Event.Category.DELIVERY +PN_EVENT_CATEGORY_TRANSPORT = Event.Category.TRANSPORT + +PN_CONNECTION_INIT = Event.Type.CONNECTION_INIT +PN_CONNECTION_OPEN = Event.Type.CONNECTION_OPEN +PN_CONNECTION_REMOTE_OPEN = Event.Type.CONNECTION_REMOTE_OPEN +PN_CONNECTION_CLOSE = Event.Type.CONNECTION_CLOSE +PN_CONNECTION_REMOTE_CLOSE = Event.Type.CONNECTION_REMOTE_CLOSE +PN_CONNECTION_FINAL = Event.Type.CONNECTION_FINAL +PN_SESSION_INIT = Event.Type.SESSION_INIT +PN_SESSION_OPEN = Event.Type.SESSION_OPEN +PN_SESSION_REMOTE_OPEN = Event.Type.SESSION_REMOTE_OPEN +PN_SESSION_CLOSE = Event.Type.SESSION_CLOSE +PN_SESSION_REMOTE_CLOSE = Event.Type.SESSION_REMOTE_CLOSE +PN_SESSION_FINAL = Event.Type.SESSION_FINAL +PN_LINK_INIT = Event.Type.LINK_INIT +PN_LINK_OPEN = Event.Type.LINK_OPEN +PN_LINK_REMOTE_OPEN = Event.Type.LINK_REMOTE_OPEN +PN_LINK_CLOSE = Event.Type.LINK_CLOSE +PN_LINK_REMOTE_CLOSE = Event.Type.LINK_REMOTE_CLOSE PN_LINK_FLOW = Event.Type.LINK_FLOW +PN_LINK_FINAL = Event.Type.LINK_FINAL PN_DELIVERY = Event.Type.DELIVERY PN_TRANSPORT = Event.Type.TRANSPORT http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-j/src/main/resources/csasl.py ---------------------------------------------------------------------- diff --git a/proton-j/src/main/resources/csasl.py b/proton-j/src/main/resources/csasl.py index 68c573d..a24246d 100644 --- a/proton-j/src/main/resources/csasl.py +++ b/proton-j/src/main/resources/csasl.py @@ -29,6 +29,7 @@ PN_SASL_AUTH=1 PN_SASL_SYS=2 PN_SASL_PERM=3 PN_SASL_TEMP=4 +PN_SASL_SKIPPED=5 PN_SASL_CONF = 0 PN_SASL_IDLE = 1 @@ -53,7 +54,8 @@ SASL_OUTCOMES_P2J = { PN_SASL_AUTH: Sasl.PN_SASL_AUTH, PN_SASL_SYS: Sasl.PN_SASL_SYS, PN_SASL_PERM: Sasl.PN_SASL_PERM, - PN_SASL_TEMP: Sasl.PN_SASL_TEMP + PN_SASL_TEMP: Sasl.PN_SASL_TEMP, + PN_SASL_SKIPPED: Sasl.PN_SASL_SKIPPED } SASL_OUTCOMES_J2P = { @@ -62,7 +64,8 @@ SASL_OUTCOMES_J2P = { Sasl.PN_SASL_AUTH: PN_SASL_AUTH, Sasl.PN_SASL_SYS: PN_SASL_SYS, Sasl.PN_SASL_PERM: PN_SASL_PERM, - Sasl.PN_SASL_TEMP: PN_SASL_TEMP + Sasl.PN_SASL_TEMP: PN_SASL_TEMP, + Sasl.PN_SASL_SKIPPED: PN_SASL_SKIPPED } def pn_sasl_state(sasl): @@ -77,6 +80,9 @@ def pn_sasl_client(sasl): def pn_sasl_server(sasl): sasl.server() +def pn_sasl_allow_skip(sasl, allow): + sasl.allowSkip(allow) + def pn_sasl_done(sasl, outcome): sasl.done(SASL_OUTCOMES_P2J[outcome]) http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-j/src/test/java/org/apache/qpid/proton/ProtonFactoryLoaderTest.java ---------------------------------------------------------------------- diff --git a/proton-j/src/test/java/org/apache/qpid/proton/ProtonFactoryLoaderTest.java b/proton-j/src/test/java/org/apache/qpid/proton/ProtonFactoryLoaderTest.java deleted file mode 100644 index 16e7bbf..0000000 --- a/proton-j/src/test/java/org/apache/qpid/proton/ProtonFactoryLoaderTest.java +++ /dev/null @@ -1,129 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.proton; - -import static org.apache.qpid.proton.ProtonFactory.ImplementationType.ANY; -import static org.apache.qpid.proton.ProtonFactory.ImplementationType.PROTON_C; -import static org.apache.qpid.proton.ProtonFactory.ImplementationType.PROTON_J; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertNotNull; - -import org.apache.qpid.proton.ProtonFactory.ImplementationType; -import org.apache.qpid.proton.factoryloadertesting.DummyProtonFactory; -import org.junit.Test; - -public class ProtonFactoryLoaderTest -{ - private String _previousImplementationType; - - @Test - public void testLoadFactoryForAnyImplementationType() - { - ImplementationType implementationType = ANY; - - ProtonFactoryLoader<DummyProtonFactory> factoryLoader = - new ProtonFactoryLoader<DummyProtonFactory>(DummyProtonFactory.class, implementationType); - - assertNotNull(factoryLoader); - } - - @Test - public void testLoadFactoryForProtonJ() - { - testImplementationType(PROTON_J); - } - - @Test - public void testLoadFactoryForProtonC() - { - testImplementationType(PROTON_C); - } - - private void testImplementationType(ImplementationType implementationType) - { - ProtonFactoryLoader<DummyProtonFactory> factoryLoader = - new ProtonFactoryLoader<DummyProtonFactory>(DummyProtonFactory.class, implementationType); - - assertEquals(implementationType, factoryLoader.loadFactory().getImplementationType()); - } - - @Test - public void testLoadFactoryUsingProtonCImplementationTypeFromSystemProperty() - { - testLoadFactoryUsingImplementationTypeFromSystemProperty(PROTON_C.name()); - } - - @Test - public void testLoadFactoryUsingProtonJImplementationTypeFromSystemProperty() - { - testLoadFactoryUsingImplementationTypeFromSystemProperty(PROTON_J.name()); - } - - @Test - public void testLoadFactoryUsingDefaultImplementationType() - { - testLoadFactoryUsingImplementationTypeFromSystemProperty(null); - } - - private void testLoadFactoryUsingImplementationTypeFromSystemProperty(String implementationTypeName) - { - try - { - setImplementationTypeSystemProperty(implementationTypeName); - ProtonFactoryLoader<DummyProtonFactory> factoryLoader = new ProtonFactoryLoader<DummyProtonFactory>(DummyProtonFactory.class); - DummyProtonFactory factory = factoryLoader.loadFactory(); - - assertNotNull(factory); - - if(implementationTypeName != null) - { - assertEquals( - ImplementationType.valueOf(implementationTypeName), - factory.getImplementationType()); - } - } - finally - { - resetImplementationTypeSystemProperty(); - } - } - - private void setImplementationTypeSystemProperty(String implementationTypeName) - { - _previousImplementationType = System.getProperty(ProtonFactoryLoader.IMPLEMENTATION_TYPE_PROPERTY); - setOrClearSystemProperty(implementationTypeName); - } - - private void resetImplementationTypeSystemProperty() - { - setOrClearSystemProperty(_previousImplementationType); - } - - private void setOrClearSystemProperty(String propertyValue) - { - if(propertyValue == null) - { - System.clearProperty(ProtonFactoryLoader.IMPLEMENTATION_TYPE_PROPERTY); - } - else - { - System.setProperty(ProtonFactoryLoader.IMPLEMENTATION_TYPE_PROPERTY, propertyValue); - } - } -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/FrameParserTest.java ---------------------------------------------------------------------- diff --git a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/FrameParserTest.java b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/FrameParserTest.java index 7ff5062..347184b 100644 --- a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/FrameParserTest.java +++ b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/FrameParserTest.java @@ -41,6 +41,7 @@ import org.apache.qpid.proton.amqp.transport.Open; import org.apache.qpid.proton.codec.AMQPDefinedTypes; import org.apache.qpid.proton.codec.DecoderImpl; import org.apache.qpid.proton.codec.EncoderImpl; +import org.apache.qpid.proton.engine.Transport; import org.apache.qpid.proton.engine.TransportException; import org.apache.qpid.proton.engine.TransportResult; import org.apache.qpid.proton.engine.TransportResult.Status; @@ -63,9 +64,6 @@ public class FrameParserTest private final AmqpFramer _amqpFramer = new AmqpFramer(); - @Rule - public ExpectedException _expectedException = ExpectedException.none(); - @Before public void setUp() { @@ -80,16 +78,8 @@ public class FrameParserTest String headerMismatchMessage = "AMQP header mismatch"; ByteBuffer buffer = _frameParser.tail(); buffer.put("hello".getBytes()); - try { - _frameParser.process(); - fail("expected exception"); - } catch (TransportException e) { - assertThat(e.getMessage(), containsString(headerMismatchMessage)); - } - - _expectedException.expect(TransportException.class); - _expectedException.expectMessage(headerMismatchMessage); - _frameParser.tail(); + _frameParser.process(); + assertEquals(_frameParser.capacity(), Transport.END_OF_STREAM); } @Test http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java ---------------------------------------------------------------------- diff --git a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java index f0603f6..7dce8f8 100644 --- a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java +++ b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportImplTest.java @@ -207,4 +207,19 @@ public class TransportImplTest assertTrue("Expecting second buffer to have bytes", buf.remaining() > 0); assertTrue("Expecting second buffer to not be full", buf.remaining() < Transport.MIN_MAX_FRAME_SIZE); } + + @Test + public void testAttemptToInitiateSaslAfterProcessingBeginsCausesIllegalStateException() + { + _transport.process(); + + try + { + _transport.sasl(); + } + catch(IllegalStateException ise) + { + //expected, sasl must be initiated before processing begins + } + } } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportOutputAdaptorTest.java ---------------------------------------------------------------------- diff --git a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportOutputAdaptorTest.java b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportOutputAdaptorTest.java index 2061eeb..19c2f7b 100644 --- a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportOutputAdaptorTest.java +++ b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/TransportOutputAdaptorTest.java @@ -25,6 +25,8 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import org.apache.qpid.proton.engine.TransportException; + import java.nio.ByteBuffer; import org.junit.Test; @@ -143,7 +145,7 @@ public class TransportOutputAdaptorTest _cannedOutput = cannedOutput; } - public void closed() + public void closed(TransportException error) { // do nothing } http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapperTest.java ---------------------------------------------------------------------- diff --git a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapperTest.java b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapperTest.java index 59c5859..45e2273 100644 --- a/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapperTest.java +++ b/proton-j/src/test/java/org/apache/qpid/proton/engine/impl/ssl/SimpleSslTransportWrapperTest.java @@ -32,6 +32,7 @@ import java.nio.ByteBuffer; import javax.net.ssl.SSLException; +import org.apache.qpid.proton.engine.Transport; import org.apache.qpid.proton.engine.TransportException; import org.junit.Before; import org.junit.Rule; @@ -132,16 +133,8 @@ public class SimpleSslTransportWrapperTest _dummySslEngine.rejectNextEncodedPacket(sslException); _sslWrapper.tail().put("<-A->".getBytes()); - try { - _sslWrapper.process(); - fail("no exception"); - } catch (TransportException e) { - assertSame(sslException, e.getCause()); - assertEquals("", _underlyingInput.getAcceptedInput()); - } - - _expectedException.expect(TransportException.class); - _sslWrapper.tail(); + _sslWrapper.process(); + assertEquals(_sslWrapper.capacity(), Transport.END_OF_STREAM); } @Test http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-j/src/test/java/org/apache/qpid/proton/factoryloadertesting/DummyProtonCFactory.java ---------------------------------------------------------------------- diff --git a/proton-j/src/test/java/org/apache/qpid/proton/factoryloadertesting/DummyProtonCFactory.java b/proton-j/src/test/java/org/apache/qpid/proton/factoryloadertesting/DummyProtonCFactory.java deleted file mode 100644 index bdd76b9..0000000 --- a/proton-j/src/test/java/org/apache/qpid/proton/factoryloadertesting/DummyProtonCFactory.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.proton.factoryloadertesting; - - -public class DummyProtonCFactory implements DummyProtonFactory -{ - @Override - public ImplementationType getImplementationType() - { - return ImplementationType.PROTON_C; - } -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-j/src/test/java/org/apache/qpid/proton/factoryloadertesting/DummyProtonFactory.java ---------------------------------------------------------------------- diff --git a/proton-j/src/test/java/org/apache/qpid/proton/factoryloadertesting/DummyProtonFactory.java b/proton-j/src/test/java/org/apache/qpid/proton/factoryloadertesting/DummyProtonFactory.java deleted file mode 100644 index e80e403..0000000 --- a/proton-j/src/test/java/org/apache/qpid/proton/factoryloadertesting/DummyProtonFactory.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.proton.factoryloadertesting; - -import org.apache.qpid.proton.ProtonFactory; - -public interface DummyProtonFactory extends ProtonFactory -{ -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-j/src/test/java/org/apache/qpid/proton/factoryloadertesting/DummyProtonJFactory.java ---------------------------------------------------------------------- diff --git a/proton-j/src/test/java/org/apache/qpid/proton/factoryloadertesting/DummyProtonJFactory.java b/proton-j/src/test/java/org/apache/qpid/proton/factoryloadertesting/DummyProtonJFactory.java deleted file mode 100644 index aba12af..0000000 --- a/proton-j/src/test/java/org/apache/qpid/proton/factoryloadertesting/DummyProtonJFactory.java +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.proton.factoryloadertesting; - - -public class DummyProtonJFactory implements DummyProtonFactory -{ - @Override - public ImplementationType getImplementationType() - { - return ImplementationType.PROTON_J; - } -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-j/src/test/java/org/apache/qpid/proton/systemtests/ProtonEngineExampleTest.java ---------------------------------------------------------------------- diff --git a/proton-j/src/test/java/org/apache/qpid/proton/systemtests/ProtonEngineExampleTest.java b/proton-j/src/test/java/org/apache/qpid/proton/systemtests/ProtonEngineExampleTest.java index 0edf65c..9c5dbb3 100644 --- a/proton-j/src/test/java/org/apache/qpid/proton/systemtests/ProtonEngineExampleTest.java +++ b/proton-j/src/test/java/org/apache/qpid/proton/systemtests/ProtonEngineExampleTest.java @@ -26,7 +26,6 @@ import static org.apache.qpid.proton.engine.EndpointState.ACTIVE; import static org.apache.qpid.proton.engine.EndpointState.CLOSED; import static org.apache.qpid.proton.engine.EndpointState.UNINITIALIZED; import static org.apache.qpid.proton.systemtests.TestLoggingHelper.bold; -import static org.apache.qpid.proton.systemtests.engine.ProtonFactoryTestFixture.isProtonJ; import static org.junit.Assert.assertNull; import static org.junit.Assert.assertSame; @@ -34,7 +33,7 @@ import java.nio.ByteBuffer; import java.util.Arrays; import java.util.logging.Logger; -import org.apache.qpid.proton.ProtonFactoryLoader; +import org.apache.qpid.proton.Proton; import org.apache.qpid.proton.amqp.messaging.Accepted; import org.apache.qpid.proton.amqp.messaging.AmqpValue; import org.apache.qpid.proton.amqp.messaging.Section; @@ -45,10 +44,8 @@ import org.apache.qpid.proton.amqp.transport.SenderSettleMode; import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.Endpoint; import org.apache.qpid.proton.engine.EndpointState; -import org.apache.qpid.proton.engine.EngineFactory; import org.apache.qpid.proton.engine.Receiver; import org.apache.qpid.proton.message.Message; -import org.apache.qpid.proton.message.MessageFactory; import org.junit.Test; /** @@ -80,26 +77,23 @@ public class ProtonEngineExampleTest private final String _targetAddress = _server.containerId + "-link1-target"; - private final EngineFactory _engineFactory = new ProtonFactoryLoader<EngineFactory>(EngineFactory.class).loadFactory(); - private final MessageFactory _messageFactory = new ProtonFactoryLoader<MessageFactory>(MessageFactory.class).loadFactory(); - @Test public void test() throws Exception { LOGGER.fine(bold("======== About to create transports")); - _client.transport = _engineFactory.createTransport(); + _client.transport = Proton.transport(); ProtocolTracerEnabler.setProtocolTracer(_client.transport, TestLoggingHelper.CLIENT_PREFIX); - _server.transport = _engineFactory.createTransport(); + _server.transport = Proton.transport(); ProtocolTracerEnabler.setProtocolTracer(_server.transport, " " + TestLoggingHelper.SERVER_PREFIX); doOutputInputCycle(); - _client.connection = _engineFactory.createConnection(); + _client.connection = Proton.connection(); _client.transport.bind(_client.connection); - _server.connection = _engineFactory.createConnection(); + _server.connection = Proton.connection(); _server.transport.bind(_server.connection); @@ -182,7 +176,7 @@ public class ProtonEngineExampleTest LOGGER.fine(bold("======== About to create a message and send it to the server")); - _client.message = _messageFactory.createMessage(); + _client.message = Proton.message(); Section messageBody = new AmqpValue("Hello"); _client.message.setBody(messageBody); _client.messageData = new byte[BUFFER_SIZE]; @@ -195,12 +189,7 @@ public class ProtonEngineExampleTest assertEquals("For simplicity, assume the sender can accept all the data", lengthOfEncodedMessage, numberOfBytesAcceptedBySender); - if (isProtonJ(_engineFactory)) - { - // TODO PROTON-261: Proton-c ProtonJNI.pn_delivery_local_state is returning 0, which doesn't map to an - // value within the C enum. - assertNull(_client.delivery.getLocalState()); - } + assertNull(_client.delivery.getLocalState()); boolean senderAdvanced = _client.sender.advance(); assertTrue("sender has not advanced", senderAdvanced); @@ -213,11 +202,8 @@ public class ProtonEngineExampleTest _server.delivery = _server.connection.getWorkHead(); assertEquals("The received delivery should be on our receiver", _server.receiver, _server.delivery.getLink()); - if (isProtonJ(_engineFactory)) - { - assertNull(_server.delivery.getLocalState()); - assertNull(_server.delivery.getRemoteState()); - } + assertNull(_server.delivery.getLocalState()); + assertNull(_server.delivery.getRemoteState()); assertFalse(_server.delivery.isPartial()); assertTrue(_server.delivery.isReadable()); @@ -226,7 +212,7 @@ public class ProtonEngineExampleTest int numberOfBytesProducedByReceiver = _server.receiver.recv(_server.messageData, 0, BUFFER_SIZE); assertEquals(numberOfBytesAcceptedBySender, numberOfBytesProducedByReceiver); - _server.message = _messageFactory.createMessage(); + _server.message = Proton.message(); _server.message.decode(_server.messageData, 0, numberOfBytesProducedByReceiver); boolean messageProcessed = applicationProcessMessage(_server.message); http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/1c2f4894/proton-j/src/test/java/org/apache/qpid/proton/systemtests/ProtonFactoryTest.java ---------------------------------------------------------------------- diff --git a/proton-j/src/test/java/org/apache/qpid/proton/systemtests/ProtonFactoryTest.java b/proton-j/src/test/java/org/apache/qpid/proton/systemtests/ProtonFactoryTest.java deleted file mode 100644 index 9fd770f..0000000 --- a/proton-j/src/test/java/org/apache/qpid/proton/systemtests/ProtonFactoryTest.java +++ /dev/null @@ -1,60 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.qpid.proton.systemtests; - -import static org.junit.Assert.assertNotNull; - -import org.apache.qpid.proton.ProtonFactoryLoader; -import org.apache.qpid.proton.engine.EngineFactory; -import org.apache.qpid.proton.message.MessageFactory; -import org.apache.qpid.proton.messenger.MessengerFactory; -import org.junit.Test; - -public class ProtonFactoryTest -{ - @SuppressWarnings({ "rawtypes", "unchecked" }) - @Test - public void testLoadFactoryWithExplicitClass() - { - ProtonFactoryLoader factoryLoader = new ProtonFactoryLoader(); - MessageFactory messageFactory = (MessageFactory) factoryLoader.loadFactory(MessageFactory.class); - assertNotNull(messageFactory); - } - - @Test - public void testMessageFactory() - { - ProtonFactoryLoader<MessageFactory> factoryLoader = new ProtonFactoryLoader<MessageFactory>(MessageFactory.class); - assertNotNull(factoryLoader.loadFactory()); - } - - @Test - public void testEngineFactory() - { - ProtonFactoryLoader<EngineFactory> factoryLoader = new ProtonFactoryLoader<EngineFactory>(EngineFactory.class); - assertNotNull(factoryLoader.loadFactory()); - } - - @Test - public void testMessengerFactory() - { - ProtonFactoryLoader<MessengerFactory> factoryLoader = new ProtonFactoryLoader<MessengerFactory>(MessengerFactory.class); - assertNotNull(factoryLoader.loadFactory()); - } -} --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
