http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/engine/TransportResultFactory.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/TransportResultFactory.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/TransportResultFactory.java deleted file mode 100644 index c4b79b4..0000000 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/TransportResultFactory.java +++ /dev/null @@ -1,125 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.proton.engine; - -import static org.apache.qpid.proton.engine.TransportResult.Status.ERROR; -import static org.apache.qpid.proton.engine.TransportResult.Status.OK; - -import java.util.IllegalFormatException; -import java.util.logging.Level; -import java.util.logging.Logger; - - -/** - * Creates TransportResults. - * Only intended for use by internal Proton classes. - * This class resides in the api module so it can be used by both proton-j-impl and proton-jni. 
- */ -public class TransportResultFactory -{ - private static final Logger LOGGER = Logger.getLogger(TransportResultFactory.class.getName()); - - private static final TransportResult _okResult = new TransportResultImpl(OK, null, null); - - public static TransportResult ok() - { - return _okResult; - } - - public static TransportResult error(String format, Object... args) - { - String errorDescription; - try - { - errorDescription = String.format(format, args); - } - catch(IllegalFormatException e) - { - LOGGER.log(Level.SEVERE, "Formating error in string " + format, e); - errorDescription = format; - } - return new TransportResultImpl(ERROR, errorDescription, null); - } - - public static TransportResult error(final String errorDescription) - { - return new TransportResultImpl(ERROR, errorDescription, null); - } - - public static TransportResult error(final Exception e) - { - return new TransportResultImpl(ERROR, e == null ? null : e.toString(), e); - } - - private static final class TransportResultImpl implements TransportResult - { - private final String _errorDescription; - private final Status _status; - private final Exception _exception; - - private TransportResultImpl(Status status, String errorDescription, Exception exception) - { - _status = status; - _errorDescription = errorDescription; - _exception = exception; - } - - @Override - public boolean isOk() - { - return _status == OK; - } - - @Override - public Status getStatus() - { - return _status; - } - - @Override - public String getErrorDescription() - { - return _errorDescription; - } - - @Override - public Exception getException() - { - return _exception; - } - - @Override - public void checkIsOk() - { - if (!isOk()) - { - Exception e = getException(); - if (e != null) - { - throw new TransportException(e); - } - else - { - throw new TransportException(getErrorDescription()); - } - } - } - } -}
http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/AmqpErrorException.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/AmqpErrorException.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/AmqpErrorException.java deleted file mode 100644 index b1d52ed..0000000 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/AmqpErrorException.java +++ /dev/null @@ -1,25 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- * - */ -package org.apache.qpid.proton.engine.impl; - -public class AmqpErrorException extends Exception -{ -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/AmqpHeader.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/AmqpHeader.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/AmqpHeader.java deleted file mode 100644 index 41f7e30..0000000 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/AmqpHeader.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ -package org.apache.qpid.proton.engine.impl; - -public interface AmqpHeader -{ - public static final byte[] HEADER = new byte[] - { 'A', 'M', 'Q', 'P', 0, 1, 0, 0 }; - - public static final byte[] SASL_HEADER = new byte[] - { 'A', 'M', 'Q', 'P', 3, 1, 0, 0 }; -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ByteBufferUtils.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ByteBufferUtils.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ByteBufferUtils.java deleted file mode 100644 index 8616bee..0000000 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ByteBufferUtils.java +++ /dev/null @@ -1,110 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
// - *
// - */
package org.apache.qpid.proton.engine.impl;

import java.nio.ByteBuffer;

import org.apache.qpid.proton.engine.Transport;
import org.apache.qpid.proton.engine.TransportException;

/**
 * Static helpers for moving bytes between {@link ByteBuffer}s, byte arrays and a
 * {@link TransportInput}, plus two small buffer factories.
 */
public class ByteBufferUtils
{
    /**
     * Copies as many bytes as both buffers allow from {@code source} to {@code destination},
     * advancing the position of each by the amount copied.
     *
     * @return number of bytes poured
     */
    public static int pour(ByteBuffer source, ByteBuffer destination)
    {
        final int count = Math.min(source.remaining(), destination.remaining());

        // Work on a duplicate so the limit of the caller's buffer is never disturbed.
        final ByteBuffer window = source.duplicate();
        window.limit(window.position() + count);
        destination.put(window);

        source.position(source.position() + count);
        return count;
    }

    /**
     * Assumes {@code destination} is ready to be written.
     *
     * @return number of bytes poured which may be fewer than {@code sizeRequested} if
     * {@code destination} has insufficient remaining
     */
    public static int pourArrayToBuffer(byte[] source, int offset, int sizeRequested, ByteBuffer destination)
    {
        final int written = Math.min(destination.remaining(), sizeRequested);
        destination.put(source, offset, written);
        return written;
    }

    /**
     * Pours the contents of {@code source} into {@code destinationTransportInput}, calling
     * the TransportInput many times if necessary. If the TransportInput returns a {@link org.apache.qpid.proton.engine.TransportResult}
     * other than ok, data may remain in source.
     *
     * @return the number of bytes taken from {@code source}, or {@link Transport#END_OF_STREAM}
     *         when the destination reports end of stream and {@code source} is empty
     * @throws IllegalStateException if the destination reports end of stream while
     *         {@code source} still has bytes remaining
     */
    public static int pourAll(ByteBuffer source, TransportInput destinationTransportInput) throws TransportException
    {
        if (destinationTransportInput.capacity() == Transport.END_OF_STREAM)
        {
            if (!source.hasRemaining())
            {
                return Transport.END_OF_STREAM;
            }
            throw new IllegalStateException("Destination has reached end of stream: " +
                                            destinationTransportInput);
        }

        final int remainingAtStart = source.remaining();

        // Feed the input one tail-buffer at a time until either side is exhausted.
        while (source.hasRemaining() && destinationTransportInput.capacity() > 0)
        {
            pour(source, destinationTransportInput.tail());
            destinationTransportInput.process();
        }

        return remainingAtStart - source.remaining();
    }

    /**
     * Assumes {@code source} is ready to be read.
     *
     * @return number of bytes poured which may be fewer than {@code sizeRequested} if
     * {@code source} has insufficient remaining
     */
    public static int pourBufferToArray(ByteBuffer source, byte[] destination, int offset, int sizeRequested)
    {
        final int read = Math.min(source.remaining(), sizeRequested);
        source.get(destination, offset, read);
        return read;
    }

    /**
     * @return a freshly allocated heap buffer of the given capacity, left in its initial
     *         (position 0, limit = capacity) state
     */
    public static ByteBuffer newWriteableBuffer(int capacity)
    {
        return ByteBuffer.allocate(capacity);
    }

    /**
     * @return a freshly allocated heap buffer of the given capacity that has been flipped,
     *         so it starts out with nothing remaining to read
     */
    public static ByteBuffer newReadableBuffer(int capacity)
    {
        final ByteBuffer empty = ByteBuffer.allocate(capacity);
        empty.flip();
        return empty;
    }

}

// ---- patch envelope for the next file, preserved verbatim (continues on the following line) ----
// http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/CollectorImpl.java
// ----------------------------------------------------------------------
// diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/CollectorImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/CollectorImpl.java
// deleted file mode 100644
// index b4d3925..0000000
// --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/CollectorImpl.java
// +++ /dev/null
// @@ -1,101 +0,0 @@
// -/*
// - *
// - *
Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.proton.engine.impl; - -import org.apache.qpid.proton.engine.Collector; -import org.apache.qpid.proton.engine.Event; -import org.apache.qpid.proton.engine.EventType; - - -/** - * CollectorImpl - * - */ - -public class CollectorImpl implements Collector -{ - - private EventImpl head; - private EventImpl tail; - private EventImpl free; - - public CollectorImpl() - {} - - @Override - public Event peek() - { - return head; - } - - @Override - public void pop() - { - if (head != null) { - EventImpl next = head.next; - head.next = free; - free = head; - head.clear(); - head = next; - } - } - - public EventImpl put(EventType type, Object context) - { - if (type == null) { - throw new IllegalArgumentException("Type cannot be null"); - } - if (!type.isValid()) { - throw new IllegalArgumentException("Cannot put events of type " + type); - } - if (tail != null && tail.getEventType() == type && - tail.getContext() == context) { - return null; - } - - EventImpl event; - if (free == null) { - event = new EventImpl(); - } else { - event = free; - free = free.next; - event.next = null; - } - - event.init(type, context); - - if (head == null) { - head = 
event; - tail = event; - } else { - tail.next = event; - tail = event; - } - - return event; - } - - @Override - public boolean more() { - return head != null && head.next != null; - } - -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ConnectionImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ConnectionImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ConnectionImpl.java deleted file mode 100644 index 2878a39..0000000 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/ConnectionImpl.java +++ /dev/null @@ -1,670 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
// - *
// - */
package org.apache.qpid.proton.engine.impl;

import java.util.ArrayList;
import java.util.EnumSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.transport.Open;
import org.apache.qpid.proton.engine.Collector;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Event;
import org.apache.qpid.proton.engine.Link;
import org.apache.qpid.proton.engine.ProtonJConnection;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.reactor.Reactor;

/**
 * Connection endpoint of the engine implementation.
 * <p>
 * Besides the local/remote connection attributes (container id, hostname, capabilities,
 * properties) this class owns several intrusive linked lists:
 * <ul>
 *   <li>session and link endpoints ({@code LinkNode} chains),</li>
 *   <li>modified endpoints awaiting transport processing ({@code _transportHead/_transportTail}),</li>
 *   <li>the application work list of deliveries ({@code _workHead/_workTail}),</li>
 *   <li>the transport work list of deliveries ({@code _transportWorkHead/_transportWorkTail}).</li>
 * </ul>
 * Events are emitted through the {@link CollectorImpl} supplied via {@link #collect(Collector)};
 * until one is supplied, events are silently discarded.
 */
public class ConnectionImpl extends EndpointImpl implements ProtonJConnection
{
    public static final int MAX_CHANNELS = 65535;

    private static final Symbol[] EMPTY_SYMBOL_ARRAY = new Symbol[0];

    private List<SessionImpl> _sessions = new ArrayList<SessionImpl>();
    private EndpointImpl _transportTail;
    private EndpointImpl _transportHead;
    private int _maxChannels = MAX_CHANNELS;

    private LinkNode<SessionImpl> _sessionHead;
    private LinkNode<SessionImpl> _sessionTail;


    private LinkNode<LinkImpl> _linkHead;
    private LinkNode<LinkImpl> _linkTail;


    private DeliveryImpl _workHead;
    private DeliveryImpl _workTail;

    private TransportImpl _transport;
    private DeliveryImpl _transportWorkHead;
    private DeliveryImpl _transportWorkTail;
    private int _transportWorkSize = 0;
    private String _localContainerId = "";
    private String _localHostname;
    private String _remoteContainer;
    private String _remoteHostname;
    private Symbol[] _offeredCapabilities;
    private Symbol[] _desiredCapabilities;
    private Symbol[] _remoteOfferedCapabilities;
    private Symbol[] _remoteDesiredCapabilities;
    private Map<Symbol, Object> _properties;
    private Map<Symbol, Object> _remoteProperties;

    private Object _context;
    private CollectorImpl _collector;
    private Reactor _reactor;

    /**
     * @deprecated This constructor's visibility will be reduced to the default scope in a future release.
     * Client code outside this module should use {@link org.apache.qpid.proton.engine.Connection.Factory#create()} instead.
     */
    @Deprecated public ConnectionImpl()
    {
    }

    /** Creates a new session on this connection and records it in the session list. */
    @Override
    public SessionImpl session()
    {
        SessionImpl created = new SessionImpl(this);
        _sessions.add(created);
        return created;
    }

    /** Drops {@code session} from the list of sessions tracked by this connection. */
    void freeSession(SessionImpl session)
    {
        _sessions.remove(session);
    }

    /**
     * Appends {@code endpoint} to the session endpoint chain.
     *
     * @return the list node now holding the endpoint (the new tail)
     */
    protected LinkNode<SessionImpl> addSessionEndpoint(SessionImpl endpoint)
    {
        if (_sessionHead == null)
        {
            _sessionTail = LinkNode.newList(endpoint);
            _sessionHead = _sessionTail;
        }
        else
        {
            _sessionTail = _sessionTail.addAtTail(endpoint);
        }
        return _sessionTail;
    }

    /** Unlinks {@code node} from the session endpoint chain, fixing up head and tail. */
    void removeSessionEndpoint(LinkNode<SessionImpl> node)
    {
        // Neighbours are captured before the node is detached.
        LinkNode<SessionImpl> before = node.getPrev();
        LinkNode<SessionImpl> after = node.getNext();

        if (node == _sessionHead)
        {
            _sessionHead = after;
        }
        if (node == _sessionTail)
        {
            _sessionTail = before;
        }
        node.remove();
    }


    /**
     * Appends {@code endpoint} to the link endpoint chain.
     *
     * @return the list node now holding the endpoint (the new tail)
     */
    protected LinkNode<LinkImpl> addLinkEndpoint(LinkImpl endpoint)
    {
        if (_linkHead == null)
        {
            _linkTail = LinkNode.newList(endpoint);
            _linkHead = _linkTail;
        }
        else
        {
            _linkTail = _linkTail.addAtTail(endpoint);
        }
        return _linkTail;
    }


    /** Unlinks {@code node} from the link endpoint chain, fixing up head and tail. */
    void removeLinkEndpoint(LinkNode<LinkImpl> node)
    {
        LinkNode<LinkImpl> before = node.getPrev();
        LinkNode<LinkImpl> after = node.getNext();

        if (node == _linkHead)
        {
            _linkHead = after;
        }
        if (node == _linkTail)
        {
            _linkTail = before;
        }
        node.remove();
    }


    /**
     * @return the first session in the endpoint chain accepted by an
     *         {@code EndpointImplQuery} built from {@code local} and {@code remote},
     *         or {@code null} if there is none
     */
    @Override
    public Session sessionHead(final EnumSet<EndpointState> local, final EnumSet<EndpointState> remote)
    {
        if (_sessionHead == null)
        {
            return null;
        }

        LinkNode.Query<SessionImpl> query = new EndpointImplQuery<SessionImpl>(local, remote);
        LinkNode<SessionImpl> match;
        if (query.matches(_sessionHead))
        {
            match = _sessionHead;
        }
        else
        {
            match = _sessionHead.next(query);
        }
        return match == null ? null : match.getValue();
    }

    /**
     * @return the first link in the endpoint chain accepted by an
     *         {@code EndpointImplQuery} built from {@code local} and {@code remote},
     *         or {@code null} if there is none
     */
    @Override
    public Link linkHead(EnumSet<EndpointState> local, EnumSet<EndpointState> remote)
    {
        if (_linkHead == null)
        {
            return null;
        }

        LinkNode.Query<LinkImpl> query = new EndpointImplQuery<LinkImpl>(local, remote);
        LinkNode<LinkImpl> match;
        if (query.matches(_linkHead))
        {
            match = _linkHead;
        }
        else
        {
            match = _linkHead.next(query);
        }
        return match == null ? null : match.getValue();
    }

    @Override
    protected ConnectionImpl getConnectionImpl()
    {
        return this;
    }

    @Override
    void postFinal() {
        put(Event.Type.CONNECTION_FINAL, this);
    }

    /**
     * Frees every session of this connection and then discards the session list.
     * Iterates over a copy of the list, leaving the original free to change while
     * sessions are being freed.
     */
    @Override
    void doFree() {
        for (Session each : new ArrayList<SessionImpl>(_sessions)) {
            each.free();
        }
        _sessions = null;
    }

    /**
     * Propagates {@code modifyEndpoints()} to every tracked session (if the list still
     * exists) and marks this connection as modified unless it has been freed.
     */
    void modifyEndpoints() {
        if (_sessions != null) {
            for (SessionImpl each : _sessions) {
                each.modifyEndpoints();
            }
        }
        if (!freed) {
            modified();
        }
    }

    /**
     * Records the peer's {@code Open} performative: marks the remote state active, copies
     * hostname, container id, capabilities and properties, then emits
     * {@code CONNECTION_REMOTE_OPEN}.
     */
    void handleOpen(Open open)
    {
        // TODO - store state
        setRemoteState(EndpointState.ACTIVE);
        setRemoteHostname(open.getHostname());
        setRemoteContainer(open.getContainerId());
        setRemoteDesiredCapabilities(open.getDesiredCapabilities());
        setRemoteOfferedCapabilities(open.getOfferedCapabilities());
        setRemoteProperties(open.getProperties());
        put(Event.Type.CONNECTION_REMOTE_OPEN, this);
    }


    EndpointImpl getTransportHead()
    {
        return _transportHead;
    }

    EndpointImpl getTransportTail()
    {
        return _transportTail;
    }

    /** Appends {@code endpoint} to the tail of the modified-endpoint list. */
    void addModified(EndpointImpl endpoint)
    {
        if (_transportTail == null)
        {
            endpoint.setTransportNext(null);
            endpoint.setTransportPrev(null);
            _transportHead = endpoint;
            _transportTail = endpoint;
            return;
        }

        EndpointImpl previousTail = _transportTail;
        previousTail.setTransportNext(endpoint);
        endpoint.setTransportPrev(previousTail);
        _transportTail = endpoint;
        endpoint.setTransportNext(null);
    }

    /**
     * Unlinks {@code endpoint} from the modified-endpoint list.
     * The endpoint is expected to be on the list; its own next/prev pointers are left untouched.
     */
    void removeModified(EndpointImpl endpoint)
    {
        if (endpoint == _transportHead)
        {
            _transportHead = endpoint.transportNext();
        }
        else
        {
            endpoint.transportPrev().setTransportNext(endpoint.transportNext());
        }

        if (endpoint == _transportTail)
        {
            _transportTail = endpoint.transportPrev();
        }
        else
        {
            endpoint.transportNext().setTransportPrev(endpoint.transportPrev());
        }
    }

    @Override
    public int getMaxChannels()
    {
        return _maxChannels;
    }

    public String getLocalContainerId()
    {
        return _localContainerId;
    }

    @Override
    public void setLocalContainerId(String localContainerId)
    {
        _localContainerId = localContainerId;
    }

    @Override
    public DeliveryImpl getWorkHead()
    {
        return _workHead;
    }

    /** Same effect as {@link #setLocalContainerId(String)}: both write the local container id. */
    @Override
    public void setContainer(String container)
    {
        _localContainerId = container;
    }

    @Override
    public String getContainer()
    {
        return _localContainerId;
    }

    @Override
    public void setHostname(String hostname)
    {
        _localHostname = hostname;
    }

    @Override
    public String getRemoteContainer()
    {
        return _remoteContainer;
    }

    @Override
    public String getRemoteHostname()
    {
        return _remoteHostname;
    }

    @Override
    public void setOfferedCapabilities(Symbol[] capabilities)
    {
        _offeredCapabilities = capabilities;
    }

    @Override
    public void setDesiredCapabilities(Symbol[] capabilities)
    {
        _desiredCapabilities = capabilities;
    }

    /** @return the peer's offered capabilities, or a shared empty array if none were recorded */
    @Override
    public Symbol[] getRemoteOfferedCapabilities()
    {
        if (_remoteOfferedCapabilities == null)
        {
            return EMPTY_SYMBOL_ARRAY;
        }
        return _remoteOfferedCapabilities;
    }

    /** @return the peer's desired capabilities, or a shared empty array if none were recorded */
    @Override
    public Symbol[] getRemoteDesiredCapabilities()
    {
        if (_remoteDesiredCapabilities == null)
        {
            return EMPTY_SYMBOL_ARRAY;
        }
        return _remoteDesiredCapabilities;
    }


    Symbol[] getOfferedCapabilities()
    {
        return _offeredCapabilities;
    }

    Symbol[] getDesiredCapabilities()
    {
        return _desiredCapabilities;
    }

    void setRemoteOfferedCapabilities(Symbol[] remoteOfferedCapabilities)
    {
        _remoteOfferedCapabilities = remoteOfferedCapabilities;
    }

    void setRemoteDesiredCapabilities(Symbol[] remoteDesiredCapabilities)
    {
        _remoteDesiredCapabilities = remoteDesiredCapabilities;
    }


    Map<Symbol, Object> getProperties()
    {
        return _properties;
    }

    @Override
    public void setProperties(Map<Symbol, Object> properties)
    {
        _properties = properties;
    }

    @Override
    public Map<Symbol, Object> getRemoteProperties()
    {
        return _remoteProperties;
    }

    void setRemoteProperties(Map<Symbol, Object> remoteProperties)
    {
        _remoteProperties = remoteProperties;
    }

    @Override
    public String getHostname()
    {
        return _localHostname;
    }

    void setRemoteContainer(String remoteContainerId)
    {
        _remoteContainer = remoteContainerId;
    }

    void setRemoteHostname(String remoteHostname)
    {
        _remoteHostname = remoteHostname;
    }

    DeliveryImpl getWorkTail()
    {
        return _workTail;
    }

    /** Unlinks {@code delivery} from the work list; a no-op if it is not flagged as being on it. */
    void removeWork(DeliveryImpl delivery)
    {
        if (!delivery._work)
        {
            return;
        }

        DeliveryImpl after = delivery.getWorkNext();
        DeliveryImpl before = delivery.getWorkPrev();

        if (before != null)
        {
            before.setWorkNext(after);
        }
        if (after != null)
        {
            after.setWorkPrev(before);
        }

        if (delivery == _workHead)
        {
            _workHead = after;
        }
        if (delivery == _workTail)
        {
            _workTail = before;
        }

        delivery._work = false;
    }

    /** Appends {@code delivery} to the work list; a no-op if it is already flagged as being on it. */
    void addWork(DeliveryImpl delivery)
    {
        if (delivery._work)
        {
            return;
        }

        delivery.setWorkNext(null);
        delivery.setWorkPrev(_workTail);

        if (_workTail != null)
        {
            _workTail.setWorkNext(delivery);
        }
        _workTail = delivery;

        if (_workHead == null)
        {
            _workHead = delivery;
        }

        delivery._work = true;
    }

    /** @return an iterator over the work list starting at the current work head */
    public Iterator<DeliveryImpl> getWorkSequence()
    {
        return new WorkSequence(_workHead);
    }

    void setTransport(TransportImpl transport)
    {
        _transport = transport;
    }

    @Override
    public TransportImpl getTransport()
    {
        return _transport;
    }

    /**
     * Iterator that follows {@code DeliveryImpl.getWorkNext()} links.
     * {@link #next()} returns {@code null} once the sequence is exhausted and
     * {@link #remove()} is unsupported.
     */
    private static class WorkSequence implements Iterator<DeliveryImpl>
    {
        private DeliveryImpl _next;

        public WorkSequence(DeliveryImpl workHead)
        {
            _next = workHead;
        }

        @Override
        public boolean hasNext()
        {
            return _next != null;
        }

        @Override
        public void remove()
        {
            throw new UnsupportedOperationException();
        }

        @Override
        public DeliveryImpl next()
        {
            DeliveryImpl current = _next;
            if (current != null)
            {
                _next = current.getWorkNext();
            }
            return current;
        }
    }

    DeliveryImpl getTransportWorkHead()
    {
        return _transportWorkHead;
    }

    int getTransportWorkSize() {
        return _transportWorkSize;
    }

    /**
     * Unlinks {@code delivery} from the transport work list and decrements the list size;
     * a no-op if it is not flagged as being on the list.
     */
    public void removeTransportWork(DeliveryImpl delivery)
    {
        if (!delivery._transportWork)
        {
            return;
        }

        DeliveryImpl after = delivery.getTransportWorkNext();
        DeliveryImpl before = delivery.getTransportWorkPrev();

        if (before != null)
        {
            before.setTransportWorkNext(after);
        }
        if (after != null)
        {
            after.setTransportWorkPrev(before);
        }

        if (delivery == _transportWorkHead)
        {
            _transportWorkHead = after;
        }
        if (delivery == _transportWorkTail)
        {
            _transportWorkTail = before;
        }

        delivery._transportWork = false;
        _transportWorkSize--;
    }

    /**
     * Marks this connection as modified and appends {@code delivery} to the transport work
     * list unless it is already flagged as being on it. Note that {@code modified()} is
     * invoked even when the delivery is already listed.
     */
    void addTransportWork(DeliveryImpl delivery)
    {
        modified();
        if (delivery._transportWork)
        {
            return;
        }

        delivery.setTransportWorkNext(null);
        delivery.setTransportWorkPrev(_transportWorkTail);

        if (_transportWorkTail != null)
        {
            _transportWorkTail.setTransportWorkNext(delivery);
        }
        _transportWorkTail = delivery;

        if (_transportWorkHead == null)
        {
            _transportWorkHead = delivery;
        }

        delivery._transportWork = true;
        _transportWorkSize++;
    }

    /**
     * Puts {@code delivery} on the work list when it is unsettled and readable, writable or
     * updated; otherwise takes it off. A {@code null} delivery is ignored.
     */
    void workUpdate(DeliveryImpl delivery)
    {
        if (delivery == null)
        {
            return;
        }

        boolean needsAttention = !delivery.isSettled() &&
                                 (delivery.isReadable() ||
                                  delivery.isWritable() ||
                                  delivery.isUpdated());
        if (needsAttention)
        {
            addWork(delivery);
        }
        else
        {
            removeWork(delivery);
        }
    }

    @Override
    public Object getContext()
    {
        return _context;
    }

    @Override
    public void setContext(Object context)
    {
        _context = context;
    }

    /**
     * Installs {@code collector} (which must be a {@link CollectorImpl}) and emits
     * {@code CONNECTION_INIT} for this connection followed by {@code SESSION_INIT} and
     * {@code LINK_INIT} for every endpoint currently in the session and link chains.
     */
    @Override
    public void collect(Collector collector)
    {
        _collector = (CollectorImpl) collector;

        put(Event.Type.CONNECTION_INIT, this);

        for (LinkNode<SessionImpl> node = _sessionHead; node != null; node = node.getNext())
        {
            put(Event.Type.SESSION_INIT, node.getValue());
        }

        for (LinkNode<LinkImpl> node = _linkHead; node != null; node = node.getNext())
        {
            put(Event.Type.LINK_INIT, node.getValue());
        }
    }

    /**
     * Forwards an event to the installed collector.
     *
     * @return whatever the collector returns, or {@code null} when no collector is installed
     */
    EventImpl put(Event.Type type, Object context)
    {
        if (_collector == null)
        {
            return null;
        }
        return _collector.put(type, context);
    }

    @Override
    void localOpen()
    {
        put(Event.Type.CONNECTION_LOCAL_OPEN, this);
    }

    @Override
    void localClose()
    {
        put(Event.Type.CONNECTION_LOCAL_CLOSE, this);
    }

    @Override
    public Reactor getReactor() {
        return _reactor;
    }

    public void setReactor(Reactor reactor) {
        _reactor = reactor;
    }
}

// ---- patch envelope for the next file, preserved verbatim (continues on the following line) ----
// http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
// ----------------------------------------------------------------------
// diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
// deleted file mode 100644
// index 0bdb163..0000000
// --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/DeliveryImpl.java
// +++ /dev/null
// @@ -1,522 +0,0 @@
// -/*
// - *
// - * Licensed to the Apache Software Foundation (ASF) under one
// - * or more contributor license agreements.
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.proton.engine.impl; - -import java.util.Arrays; - -import org.apache.qpid.proton.amqp.transport.DeliveryState; -import org.apache.qpid.proton.codec.ReadableBuffer; -import org.apache.qpid.proton.codec.WritableBuffer; -import org.apache.qpid.proton.engine.Delivery; -import org.apache.qpid.proton.engine.Record; -import org.apache.qpid.proton.engine.Transport; - -public class DeliveryImpl implements Delivery -{ - public static final int DEFAULT_MESSAGE_FORMAT = 0; - - private DeliveryImpl _linkPrevious; - private DeliveryImpl _linkNext; - - private DeliveryImpl _workNext; - private DeliveryImpl _workPrev; - boolean _work; - - private DeliveryImpl _transportWorkNext; - private DeliveryImpl _transportWorkPrev; - boolean _transportWork; - - private Record _attachments; - private Object _context; - - private final byte[] _tag; - private final LinkImpl _link; - private DeliveryState _deliveryState; - private boolean _settled; - private boolean _remoteSettled; - private DeliveryState _remoteDeliveryState; - private DeliveryState _defaultDeliveryState = null; - private int _messageFormat = DEFAULT_MESSAGE_FORMAT; - - /** - * A bit-mask representing the outstanding work on this delivery received from the transport layer - * that has not yet been 
processed by the application. - */ - private int _flags = (byte) 0; - - private TransportDelivery _transportDelivery; - private byte[] _data; - private int _dataSize; - private boolean _complete; - private boolean _updated; - private boolean _done; - private int _offset; - - DeliveryImpl(final byte[] tag, final LinkImpl link, DeliveryImpl previous) - { - _tag = tag; - _link = link; - _link.incrementUnsettled(); - _linkPrevious = previous; - if(previous != null) - { - previous._linkNext = this; - } - } - - @Override - public byte[] getTag() - { - return _tag; - } - - @Override - public LinkImpl getLink() - { - return _link; - } - - @Override - public DeliveryState getLocalState() - { - return _deliveryState; - } - - @Override - public DeliveryState getRemoteState() - { - return _remoteDeliveryState; - } - - @Override - public boolean remotelySettled() - { - return _remoteSettled; - } - - @Override - public void setMessageFormat(int messageFormat) - { - _messageFormat = messageFormat; - } - - @Override - public int getMessageFormat() - { - return _messageFormat; - } - - @Override - public void disposition(final DeliveryState state) - { - _deliveryState = state; - if(!_remoteSettled) - { - addToTransportWorkList(); - } - } - - @Override - public void settle() - { - if (_settled) { - return; - } - - _settled = true; - _link.decrementUnsettled(); - if(!_remoteSettled) - { - addToTransportWorkList(); - } - else - { - _transportDelivery.settled(); - } - if(_link.current() == this) - { - _link.advance(); - } - - _link.remove(this); - if(_linkPrevious != null) - { - _linkPrevious._linkNext = _linkNext; - } - if(_linkNext != null) - { - _linkNext._linkPrevious = _linkPrevious; - } - updateWork(); - } - - DeliveryImpl getLinkNext() - { - return _linkNext; - } - - @Override - public DeliveryImpl next() - { - return getLinkNext(); - } - - @Override - public void free() - { - settle(); - } - - DeliveryImpl getLinkPrevious() - { - return _linkPrevious; - } - - @Override - public 
DeliveryImpl getWorkNext() - { - if (_workNext != null) - return _workNext; - // the following hack is brought to you by the C implementation! - if (!_work) // not on the work list - return _link.getConnectionImpl().getWorkHead(); - return null; - } - - DeliveryImpl getWorkPrev() - { - return _workPrev; - } - - - void setWorkNext(DeliveryImpl workNext) - { - _workNext = workNext; - } - - void setWorkPrev(DeliveryImpl workPrev) - { - _workPrev = workPrev; - } - - int recv(final byte[] bytes, int offset, int size) - { - final int consumed; - if (_data != null) - { - //TODO - should only be if no bytes left - consumed = Math.min(size, _dataSize); - - System.arraycopy(_data, _offset, bytes, offset, consumed); - _offset += consumed; - _dataSize -= consumed; - } - else - { - _dataSize = consumed = 0; - } - - return (_complete && consumed == 0) ? Transport.END_OF_STREAM : consumed; //TODO - Implement - } - - int recv(final WritableBuffer buffer) { - final int consumed; - if (_data != null) - { - consumed = Math.min(buffer.remaining(), _dataSize); - - buffer.put(_data, _offset, consumed); - _offset += consumed; - _dataSize -= consumed; - } - else - { - _dataSize = consumed = 0; - } - - return (_complete && consumed == 0) ? 
Transport.END_OF_STREAM : consumed; - } - - void updateWork() - { - getLink().getConnectionImpl().workUpdate(this); - } - - DeliveryImpl clearTransportWork() - { - DeliveryImpl next = _transportWorkNext; - getLink().getConnectionImpl().removeTransportWork(this); - return next; - } - - void addToTransportWorkList() - { - getLink().getConnectionImpl().addTransportWork(this); - } - - - DeliveryImpl getTransportWorkNext() - { - return _transportWorkNext; - } - - - DeliveryImpl getTransportWorkPrev() - { - return _transportWorkPrev; - } - - void setTransportWorkNext(DeliveryImpl transportWorkNext) - { - _transportWorkNext = transportWorkNext; - } - - void setTransportWorkPrev(DeliveryImpl transportWorkPrev) - { - _transportWorkPrev = transportWorkPrev; - } - - TransportDelivery getTransportDelivery() - { - return _transportDelivery; - } - - void setTransportDelivery(TransportDelivery transportDelivery) - { - _transportDelivery = transportDelivery; - } - - @Override - public boolean isSettled() - { - return _settled; - } - - int send(byte[] bytes, int offset, int length) - { - if(_data == null) - { - _data = new byte[length]; - } - else if(_data.length - _dataSize < length) - { - byte[] oldData = _data; - _data = new byte[oldData.length + _dataSize]; - System.arraycopy(oldData, _offset, _data, 0, _dataSize); - _offset = 0; - } - System.arraycopy(bytes, offset, _data, _dataSize + _offset, length); - _dataSize += length; - addToTransportWorkList(); - return length; //TODO - Implement. 
- } - - int send(final ReadableBuffer buffer) - { - int length = buffer.remaining(); - - if(_data == null) - { - _data = new byte[length]; - } - else if(_data.length - _dataSize < length) - { - byte[] oldData = _data; - _data = new byte[oldData.length + _dataSize]; - System.arraycopy(oldData, _offset, _data, 0, _dataSize); - _offset = 0; - } - buffer.get(_data, _offset, length); - _dataSize+=length; - addToTransportWorkList(); - return length; - } - - byte[] getData() - { - return _data; - } - - int getDataOffset() - { - return _offset; - } - - int getDataLength() - { - return _dataSize; //TODO - Implement. - } - - void setData(byte[] data) - { - _data = data; - } - - void setDataLength(int length) - { - _dataSize = length; - } - - public void setDataOffset(int arrayOffset) - { - _offset = arrayOffset; - } - - @Override - public boolean isWritable() - { - return getLink() instanceof SenderImpl - && getLink().current() == this - && ((SenderImpl) getLink()).hasCredit(); - } - - @Override - public boolean isReadable() - { - return getLink() instanceof ReceiverImpl - && getLink().current() == this; - } - - void setComplete() - { - _complete = true; - } - - @Override - public boolean isPartial() - { - return !_complete; - } - - void setRemoteDeliveryState(DeliveryState remoteDeliveryState) - { - _remoteDeliveryState = remoteDeliveryState; - _updated = true; - } - - @Override - public boolean isUpdated() - { - return _updated; - } - - @Override - public void clear() - { - _updated = false; - getLink().getConnectionImpl().workUpdate(this); - } - - - void setDone() - { - _done = true; - } - - boolean isDone() - { - return _done; - } - - void setRemoteSettled(boolean remoteSettled) - { - _remoteSettled = remoteSettled; - _updated = true; - } - - @Override - public boolean isBuffered() - { - if (_remoteSettled) return false; - if (getLink() instanceof SenderImpl) { - if (isDone()) { - return false; - } else { - return _complete || _dataSize > 0; - } - } else { - return 
false; - } - } - - @Override - public Object getContext() - { - return _context; - } - - @Override - public void setContext(Object context) - { - _context = context; - } - - @Override - public Record attachments() - { - if(_attachments == null) - { - _attachments = new RecordImpl(); - } - - return _attachments; - } - - @Override - public String toString() - { - StringBuilder builder = new StringBuilder(); - builder.append("DeliveryImpl [_tag=").append(Arrays.toString(_tag)) - .append(", _link=").append(_link) - .append(", _deliveryState=").append(_deliveryState) - .append(", _settled=").append(_settled) - .append(", _remoteSettled=").append(_remoteSettled) - .append(", _remoteDeliveryState=").append(_remoteDeliveryState) - .append(", _flags=").append(_flags) - .append(", _defaultDeliveryState=").append(_defaultDeliveryState) - .append(", _transportDelivery=").append(_transportDelivery) - .append(", _dataSize=").append(_dataSize) - .append(", _complete=").append(_complete) - .append(", _updated=").append(_updated) - .append(", _done=").append(_done) - .append(", _offset=").append(_offset).append("]"); - return builder.toString(); - } - - @Override - public int pending() - { - return _dataSize; - } - - @Override - public void setDefaultDeliveryState(DeliveryState state) - { - _defaultDeliveryState = state; - } - - @Override - public DeliveryState getDefaultDeliveryState() - { - return _defaultDeliveryState; - } - -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EndpointImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EndpointImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EndpointImpl.java deleted file mode 100644 index bbcc9d9..0000000 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EndpointImpl.java +++ /dev/null @@ -1,222 +0,0 @@ -/* - * - * Licensed 
to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ - -package org.apache.qpid.proton.engine.impl; - -import org.apache.qpid.proton.amqp.transport.ErrorCondition; -import org.apache.qpid.proton.engine.EndpointState; -import org.apache.qpid.proton.engine.Event; -import org.apache.qpid.proton.engine.ProtonJEndpoint; -import org.apache.qpid.proton.engine.Record; - -public abstract class EndpointImpl implements ProtonJEndpoint -{ - private EndpointState _localState = EndpointState.UNINITIALIZED; - private EndpointState _remoteState = EndpointState.UNINITIALIZED; - private ErrorCondition _localError = new ErrorCondition(); - private ErrorCondition _remoteError = new ErrorCondition(); - private boolean _modified; - private EndpointImpl _transportNext; - private EndpointImpl _transportPrev; - private Object _context; - private Record _attachments = new RecordImpl(); - - private int refcount = 1; - boolean freed = false; - - void incref() { - refcount++; - } - - void decref() { - refcount--; - if (refcount == 0) { - postFinal(); - } else if (refcount < 0) { - throw new IllegalStateException(); - } - } - - abstract void postFinal(); - - abstract void localOpen(); - - abstract void localClose(); - - @Override - public void open() - { 
- if (getLocalState() != EndpointState.ACTIVE) - { - _localState = EndpointState.ACTIVE; - localOpen(); - modified(); - } - } - - @Override - public void close() - { - if (getLocalState() != EndpointState.CLOSED) - { - _localState = EndpointState.CLOSED; - localClose(); - modified(); - } - } - - @Override - public EndpointState getLocalState() - { - return _localState; - } - - @Override - public EndpointState getRemoteState() - { - return _remoteState; - } - - @Override - public ErrorCondition getCondition() - { - return _localError; - } - - @Override - public void setCondition(ErrorCondition condition) - { - if(condition != null) - { - _localError.copyFrom(condition); - } - else - { - _localError.clear(); - } - } - - @Override - public ErrorCondition getRemoteCondition() - { - return _remoteError; - } - - void setLocalState(EndpointState localState) - { - _localState = localState; - } - - void setRemoteState(EndpointState remoteState) - { - // TODO - check state change legal - _remoteState = remoteState; - } - - void modified() - { - modified(true); - } - - void modified(boolean emit) - { - if(!_modified) - { - _modified = true; - getConnectionImpl().addModified(this); - } - - if (emit) { - ConnectionImpl conn = getConnectionImpl(); - TransportImpl trans = conn.getTransport(); - if (trans != null) { - conn.put(Event.Type.TRANSPORT, trans); - } - } - } - - protected abstract ConnectionImpl getConnectionImpl(); - - void clearModified() - { - if(_modified) - { - _modified = false; - getConnectionImpl().removeModified(this); - } - } - - boolean isModified() - { - return _modified; - } - - EndpointImpl transportNext() - { - return _transportNext; - } - - EndpointImpl transportPrev() - { - return _transportPrev; - } - - abstract void doFree(); - - @Override - final public void free() - { - if (freed) return; - freed = true; - - doFree(); - decref(); - } - - void setTransportNext(EndpointImpl transportNext) - { - _transportNext = transportNext; - } - - void 
setTransportPrev(EndpointImpl transportPrevious) - { - _transportPrev = transportPrevious; - } - - @Override - public Object getContext() - { - return _context; - } - - @Override - public void setContext(Object context) - { - _context = context; - } - - @Override - public Record attachments() - { - return _attachments; - } - -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EndpointImplQuery.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EndpointImplQuery.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EndpointImplQuery.java deleted file mode 100644 index 33519b1..0000000 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EndpointImplQuery.java +++ /dev/null @@ -1,43 +0,0 @@ -package org.apache.qpid.proton.engine.impl; -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- * -*/ - - -import java.util.EnumSet; -import org.apache.qpid.proton.engine.EndpointState; - -class EndpointImplQuery<T extends EndpointImpl> implements LinkNode.Query<T> -{ - private final EnumSet<EndpointState> _local; - private final EnumSet<EndpointState> _remote; - - EndpointImplQuery(EnumSet<EndpointState> local, EnumSet<EndpointState> remote) - { - _local = local; - _remote = remote; - } - - public boolean matches(LinkNode<T> node) - { - return (_local == null || _local.contains(node.getValue().getLocalState())) - && (_remote == null || _remote.contains(node.getValue().getRemoteState())); - } -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java deleted file mode 100644 index 3bcecb5..0000000 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/EventImpl.java +++ /dev/null @@ -1,318 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- * - */ -package org.apache.qpid.proton.engine.impl; - -import java.util.Iterator; - -import org.apache.qpid.proton.engine.Connection; -import org.apache.qpid.proton.engine.Delivery; -import org.apache.qpid.proton.engine.Event; -import org.apache.qpid.proton.engine.EventType; -import org.apache.qpid.proton.engine.Handler; -import org.apache.qpid.proton.engine.HandlerException; -import org.apache.qpid.proton.engine.Link; -import org.apache.qpid.proton.engine.Receiver; -import org.apache.qpid.proton.engine.Record; -import org.apache.qpid.proton.engine.Sender; -import org.apache.qpid.proton.engine.Session; -import org.apache.qpid.proton.engine.Transport; -import org.apache.qpid.proton.reactor.Reactor; -import org.apache.qpid.proton.reactor.Selectable; -import org.apache.qpid.proton.reactor.Task; -import org.apache.qpid.proton.reactor.impl.ReactorImpl; - -/** - * EventImpl - * - */ - -class EventImpl implements Event -{ - - EventType type; - Object context; - EventImpl next; - RecordImpl attachments = new RecordImpl(); - - EventImpl() - { - this.type = null; - } - - void init(EventType type, Object context) - { - this.type = type; - this.context = context; - this.attachments.clear(); - } - - void clear() - { - type = null; - context = null; - attachments.clear(); - } - - @Override - public EventType getEventType() - { - return type; - } - - @Override - public Type getType() { - if (type instanceof Type) { - return (Type)type; - } - return Type.NON_CORE_EVENT; - } - - @Override - public Object getContext() - { - return context; - } - - @Override - public Handler getRootHandler() { - return ReactorImpl.ROOT.get(this); - } - - private Handler delegated = null; - - @Override - public void dispatch(Handler handler) throws HandlerException - { - Handler old_delegated = delegated; - try { - delegated = handler; - try { - handler.handle(this); - } catch(HandlerException handlerException) { - throw handlerException; - } catch(RuntimeException runtimeException) { - throw new 
HandlerException(handler, runtimeException); - } - delegate(); - } finally { - delegated = old_delegated; - } - } - - @Override - public void delegate() throws HandlerException - { - if (delegated == null) { - return; // short circuit - } - Iterator<Handler> children = delegated.children(); - delegated = null; - while(children.hasNext()) { - dispatch(children.next()); - } - } - - @Override - public void redispatch(EventType as_type, Handler handler) throws HandlerException - { - if (!as_type.isValid()) { - throw new IllegalArgumentException("Can only redispatch valid event types"); - } - EventType old = type; - try { - type = as_type; - dispatch(handler); - } - finally { - type = old; - } - } - - @Override - public Connection getConnection() - { - if (context instanceof Connection) { - return (Connection) context; - } else if (context instanceof Transport) { - Transport transport = getTransport(); - if (transport == null) { - return null; - } - return ((TransportImpl) transport).getConnectionImpl(); - } else { - Session ssn = getSession(); - if (ssn == null) { - return null; - } - return ssn.getConnection(); - } - } - - @Override - public Session getSession() - { - if (context instanceof Session) { - return (Session) context; - } else { - Link link = getLink(); - if (link == null) { - return null; - } - return link.getSession(); - } - } - - @Override - public Link getLink() - { - if (context instanceof Link) { - return (Link) context; - } else { - Delivery dlv = getDelivery(); - if (dlv == null) { - return null; - } - return dlv.getLink(); - } - } - - @Override - public Sender getSender() - { - if (context instanceof Sender) { - return (Sender) context; - } else { - Link link = getLink(); - if (link instanceof Sender) { - return (Sender) link; - } - return null; - } - } - - @Override - public Receiver getReceiver() - { - if (context instanceof Receiver) { - return (Receiver) context; - } else { - Link link = getLink(); - if (link instanceof Receiver) { - return 
(Receiver) link; - } - return null; - } - } - - @Override - public Delivery getDelivery() - { - if (context instanceof Delivery) { - return (Delivery) context; - } else { - return null; - } - } - - @Override - public Transport getTransport() - { - if (context instanceof Transport) { - return (Transport) context; - } else if (context instanceof Connection) { - return ((Connection)context).getTransport(); - } else { - Session session = getSession(); - if (session == null) { - return null; - } - - Connection connection = session.getConnection(); - if (connection == null) { - return null; - } - - return connection.getTransport(); - } - } - - @Override - public Selectable getSelectable() { - if (context instanceof Selectable) { - return (Selectable) context; - } else { - return null; - } - } - - @Override - public Reactor getReactor() { - if (context instanceof Reactor) { - return (Reactor) context; - } else if (context instanceof Task) { - return ((Task)context).getReactor(); - } else if (context instanceof Transport) { - return ((TransportImpl)context).getReactor(); - } else if (context instanceof Delivery) { - return ((Delivery)context).getLink().getSession().getConnection().getReactor(); - } else if (context instanceof Link) { - return ((Link)context).getSession().getConnection().getReactor(); - } else if (context instanceof Session) { - return ((Session)context).getConnection().getReactor(); - } else if (context instanceof Connection) { - return ((Connection)context).getReactor(); - } else if (context instanceof Selectable) { - return ((Selectable)context).getReactor(); - } - return null; - } - - @Override - public Task getTask() { - if (context instanceof Task) { - return (Task) context; - } else { - return null; - } - } - - @Override - public Record attachments() { - return attachments; - } - - @Override - public Event copy() - { - EventImpl newEvent = new EventImpl(); - newEvent.init(type, context); - newEvent.attachments.copy(attachments); - return newEvent; - 
} - - @Override - public String toString() - { - return "EventImpl{" + "type=" + type + ", context=" + context + '}'; - } - - -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameHandler.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameHandler.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameHandler.java deleted file mode 100644 index dfbb201..0000000 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameHandler.java +++ /dev/null @@ -1,43 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - * - */ -package org.apache.qpid.proton.engine.impl; - -import org.apache.qpid.proton.engine.TransportException; -import org.apache.qpid.proton.framing.TransportFrame; - -public interface FrameHandler -{ - /** - * @throws IllegalStateException if I am not currently accepting input - * @see #isHandlingFrames() - * @return false on end of stream - */ - boolean handleFrame(TransportFrame frame); - - void closed(TransportException error); - - /** - * Returns whether I am currently able to handle frames. 
- * MUST be checked before calling {@link #handleFrame(TransportFrame)}. - */ - boolean isHandlingFrames(); - -} http://git-wip-us.apache.org/repos/asf/qpid-proton/blob/ccdcf329/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameParser.java ---------------------------------------------------------------------- diff --git a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameParser.java b/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameParser.java deleted file mode 100644 index 6aede84..0000000 --- a/proton-j/src/main/java/org/apache/qpid/proton/engine/impl/FrameParser.java +++ /dev/null @@ -1,586 +0,0 @@ -/* - * - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- * - */ - -package org.apache.qpid.proton.engine.impl; - -import static org.apache.qpid.proton.engine.impl.AmqpHeader.HEADER; -import static org.apache.qpid.proton.engine.impl.ByteBufferUtils.newWriteableBuffer; - -import java.nio.ByteBuffer; -import java.util.logging.Level; -import java.util.logging.Logger; - -import org.apache.qpid.proton.amqp.Binary; -import org.apache.qpid.proton.amqp.transport.EmptyFrame; -import org.apache.qpid.proton.amqp.transport.FrameBody; -import org.apache.qpid.proton.codec.ByteBufferDecoder; -import org.apache.qpid.proton.codec.DecodeException; -import org.apache.qpid.proton.engine.Transport; -import org.apache.qpid.proton.engine.TransportException; -import org.apache.qpid.proton.framing.TransportFrame; - -class FrameParser implements TransportInput -{ - private static final Logger TRACE_LOGGER = Logger.getLogger("proton.trace"); - - private static final ByteBuffer _emptyInputBuffer = newWriteableBuffer(0); - - private enum State - { - HEADER0, - HEADER1, - HEADER2, - HEADER3, - HEADER4, - HEADER5, - HEADER6, - HEADER7, - SIZE_0, - SIZE_1, - SIZE_2, - SIZE_3, - PRE_PARSE, - BUFFERING, - PARSING, - ERROR - } - - private final FrameHandler _frameHandler; - private final ByteBufferDecoder _decoder; - private final int _inputBufferSize; - private final int _localMaxFrameSize; - - private ByteBuffer _inputBuffer = null; - private boolean _tail_closed = false; - - private State _state = State.HEADER0; - - private long _framesInput = 0; - - /** the stated size of the current frame */ - private int _size; - - /** holds the current frame that is being parsed */ - private ByteBuffer _frameBuffer; - - private TransportFrame _heldFrame; - private TransportException _parsingError; - - - /** - * We store the last result when processing input so that - * we know not to process any more input if it was an error. 
- */ - FrameParser(FrameHandler frameHandler, ByteBufferDecoder decoder, int localMaxFrameSize) - { - _frameHandler = frameHandler; - _decoder = decoder; - _localMaxFrameSize = localMaxFrameSize; - _inputBufferSize = _localMaxFrameSize > 0 ? _localMaxFrameSize : 4*1024; - } - - private void input(ByteBuffer in) throws TransportException - { - flushHeldFrame(); - if (_heldFrame != null) - { - return; - } - - TransportException frameParsingError = null; - int size = _size; - State state = _state; - ByteBuffer oldIn = null; - - boolean transportAccepting = true; - - while(in.hasRemaining() && state != State.ERROR && transportAccepting) - { - switch(state) - { - case HEADER0: - if(in.hasRemaining()) - { - byte c = in.get(); - if(c != HEADER[0]) - { - frameParsingError = new TransportException("AMQP header mismatch value %x, expecting %x. In state: %s", c, HEADER[0], state); - state = State.ERROR; - break; - } - state = State.HEADER1; - } - else - { - break; - } - case HEADER1: - if(in.hasRemaining()) - { - byte c = in.get(); - if(c != HEADER[1]) - { - frameParsingError = new TransportException("AMQP header mismatch value %x, expecting %x. In state: %s", c, HEADER[1], state); - state = State.ERROR; - break; - } - state = State.HEADER2; - } - else - { - break; - } - case HEADER2: - if(in.hasRemaining()) - { - byte c = in.get(); - if(c != HEADER[2]) - { - frameParsingError = new TransportException("AMQP header mismatch value %x, expecting %x. In state: %s", c, HEADER[2], state); - state = State.ERROR; - break; - } - state = State.HEADER3; - } - else - { - break; - } - case HEADER3: - if(in.hasRemaining()) - { - byte c = in.get(); - if(c != HEADER[3]) - { - frameParsingError = new TransportException("AMQP header mismatch value %x, expecting %x. 
In state: %s", c, HEADER[3], state); - state = State.ERROR; - break; - } - state = State.HEADER4; - } - else - { - break; - } - case HEADER4: - if(in.hasRemaining()) - { - byte c = in.get(); - if(c != HEADER[4]) - { - frameParsingError = new TransportException("AMQP header mismatch value %x, expecting %x. In state: %s", c, HEADER[4], state); - state = State.ERROR; - break; - } - state = State.HEADER5; - } - else - { - break; - } - case HEADER5: - if(in.hasRemaining()) - { - byte c = in.get(); - if(c != HEADER[5]) - { - frameParsingError = new TransportException("AMQP header mismatch value %x, expecting %x. In state: %s", c, HEADER[5], state); - state = State.ERROR; - break; - } - state = State.HEADER6; - } - else - { - break; - } - case HEADER6: - if(in.hasRemaining()) - { - byte c = in.get(); - if(c != HEADER[6]) - { - frameParsingError = new TransportException("AMQP header mismatch value %x, expecting %x. In state: %s", c, HEADER[6], state); - state = State.ERROR; - break; - } - state = State.HEADER7; - } - else - { - break; - } - case HEADER7: - if(in.hasRemaining()) - { - byte c = in.get(); - if(c != HEADER[7]) - { - frameParsingError = new TransportException("AMQP header mismatch value %x, expecting %x. 
In state: %s", c, HEADER[7], state); - state = State.ERROR; - break; - } - state = State.SIZE_0; - } - else - { - break; - } - case SIZE_0: - if(!in.hasRemaining()) - { - break; - } - if(in.remaining() >= 4) - { - size = in.getInt(); - state = State.PRE_PARSE; - break; - } - else - { - size = (in.get() << 24) & 0xFF000000; - if(!in.hasRemaining()) - { - state = State.SIZE_1; - break; - } - } - case SIZE_1: - size |= (in.get() << 16) & 0xFF0000; - if(!in.hasRemaining()) - { - state = State.SIZE_2; - break; - } - case SIZE_2: - size |= (in.get() << 8) & 0xFF00; - if(!in.hasRemaining()) - { - state = State.SIZE_3; - break; - } - case SIZE_3: - size |= in.get() & 0xFF; - state = State.PRE_PARSE; - - case PRE_PARSE: - if(size < 8) - { - frameParsingError = new TransportException("specified frame size %d smaller than minimum frame header " - + "size %d", - size, 8); - state = State.ERROR; - break; - } - - if (_localMaxFrameSize > 0 && size > _localMaxFrameSize) - { - frameParsingError = new TransportException("specified frame size %d greater than maximum valid frame size %d", - size, _localMaxFrameSize); - state = State.ERROR; - break; - } - - if(in.remaining() < size-4) - { - _frameBuffer = ByteBuffer.allocate(size-4); - _frameBuffer.put(in); - state = State.BUFFERING; - break; - } - case BUFFERING: - if(_frameBuffer != null) - { - if(in.remaining() < _frameBuffer.remaining()) - { - _frameBuffer.put(in); - break; - } - else - { - ByteBuffer dup = in.duplicate(); - dup.limit(dup.position()+_frameBuffer.remaining()); - in.position(in.position()+_frameBuffer.remaining()); - _frameBuffer.put(dup); - oldIn = in; - _frameBuffer.flip(); - in = _frameBuffer; - state = State.PARSING; - } - } - - case PARSING: - - int dataOffset = (in.get() << 2) & 0x3FF; - - if(dataOffset < 8) - { - frameParsingError = new TransportException("specified frame data offset %d smaller than minimum frame header size %d", dataOffset, 8); - state = State.ERROR; - break; - } - else if(dataOffset > size) 
- { - frameParsingError = new TransportException("specified frame data offset %d larger than the frame size %d", dataOffset, _size); - state = State.ERROR; - break; - } - - // type - - int type = in.get() & 0xFF; - int channel = in.getShort() & 0xFFFF; - - if(type != 0) - { - frameParsingError = new TransportException("unknown frame type: %d", type); - state = State.ERROR; - break; - } - - // note that this skips over the extended header if it's present - if(dataOffset!=8) - { - in.position(in.position()+dataOffset-8); - } - - // oldIn null iff not working on duplicated buffer - final int frameBodySize = size - dataOffset; - if(oldIn == null) - { - oldIn = in; - in = in.duplicate(); - final int endPos = in.position() + frameBodySize; - in.limit(endPos); - oldIn.position(endPos); - - } - - try - { - _framesInput += 1; - - Binary payload = null; - Object val = null; - - if (frameBodySize > 0) - { - _decoder.setByteBuffer(in); - val = _decoder.readObject(); - _decoder.setByteBuffer(null); - - if(in.hasRemaining()) - { - byte[] payloadBytes = new byte[in.remaining()]; - in.get(payloadBytes); - payload = new Binary(payloadBytes); - } - else - { - payload = null; - } - } - else - { - val = new EmptyFrame(); - } - - if(val instanceof FrameBody) - { - FrameBody frameBody = (FrameBody) val; - if(TRACE_LOGGER.isLoggable(Level.FINE)) - { - TRACE_LOGGER.log(Level.FINE, "IN: CH["+channel+"] : " + frameBody + (payload == null ? "" : "[" + payload + "]")); - } - TransportFrame frame = new TransportFrame(channel, frameBody, payload); - - if(_frameHandler.isHandlingFrames()) - { - _tail_closed = _frameHandler.handleFrame(frame); - } - else - { - transportAccepting = false; - _heldFrame = frame; - } - } - else - { - throw new TransportException("Frameparser encountered a " - + (val == null? 
"null" : val.getClass()) - + " which is not a " + FrameBody.class); - } - - reset(); - in = oldIn; - oldIn = null; - _frameBuffer = null; - state = State.SIZE_0; - } - catch (DecodeException ex) - { - state = State.ERROR; - frameParsingError = new TransportException(ex); - } - break; - case ERROR: - // do nothing - } - - } - - if (_tail_closed) - { - if (in.hasRemaining()) { - state = State.ERROR; - frameParsingError = new TransportException("framing error"); - } else if (state != State.SIZE_0) { - state = State.ERROR; - frameParsingError = new TransportException("connection aborted"); - } else { - _frameHandler.closed(null); - } - } - - _state = state; - _size = size; - - if(_state == State.ERROR) - { - _tail_closed = true; - if(frameParsingError != null) - { - _parsingError = frameParsingError; - _frameHandler.closed(frameParsingError); - } - else - { - throw new TransportException("Unable to parse, probably because of a previous error"); - } - } - } - - @Override - public int capacity() - { - if (_tail_closed) { - return Transport.END_OF_STREAM; - } else { - if (_inputBuffer != null) { - return _inputBuffer.remaining(); - } else { - return _inputBufferSize; - } - } - } - - @Override - public int position() { - if (_tail_closed) { - return Transport.END_OF_STREAM; - } - return (_inputBuffer == null) ? 
0 : _inputBuffer.position();
-    }
-    /**
-     * Returns the input buffer, first allocating it with
-     * {@code newWriteableBuffer(_inputBufferSize)} if it is currently null.
-     *
-     * @return the (possibly newly allocated) {@code _inputBuffer}
-     * @throws TransportException if {@code _tail_closed} is set
-     */
-    @Override
-    public ByteBuffer tail()
-    {
-        if (_tail_closed) {
-            throw new TransportException("tail closed");
-        }
-
-        if (_inputBuffer == null) {
-            _inputBuffer = newWriteableBuffer(_inputBufferSize);
-        }
-
-        return _inputBuffer;
-    }
-    /**
-     * Hands pending input to {@code input(ByteBuffer)}.
-     * <p>
-     * When an input buffer exists it is flipped and passed to {@code input}.
-     * Afterwards, whether or not {@code input} threw, the buffer is compacted
-     * if bytes remain in it, released (set to null) if it is empty and its
-     * capacity exceeds {@code TransportImpl.BUFFER_RELEASE_THRESHOLD}, and
-     * otherwise cleared for reuse. When no input buffer exists, {@code input}
-     * is called with {@code _emptyInputBuffer} instead.
-     *
-     * @throws TransportException as declared; presumably propagated from
-     *         {@code input} (its definition is not visible here - confirm)
-     */
-    @Override
-    public void process() throws TransportException
-    {
-        if (_inputBuffer != null)
-        {
-            _inputBuffer.flip(); // make the bytes written so far readable by input(...)
-
-            try
-            {
-                input(_inputBuffer);
-            }
-            finally
-            {
-                if (_inputBuffer.hasRemaining()) {
-                    _inputBuffer.compact(); // keep the unread bytes for the next call
-                } else if (_inputBuffer.capacity() > TransportImpl.BUFFER_RELEASE_THRESHOLD) {
-                    _inputBuffer = null; // drop the large buffer; tail() allocates a new one on demand
-                } else {
-                    _inputBuffer.clear(); // fully consumed: reuse the same buffer
-                }
-            }
-        }
-        else
-        {
-            input(_emptyInputBuffer);
-        }
-    }
-    /**
-     * Sets {@code _tail_closed} and then calls {@link #process()}.
-     */
-    @Override
-    public void close_tail()
-    {
-        _tail_closed = true;
-        process();
-    }
-
-    /**
-     * Attempt to flush any cached data to the frame transport. This function
-     * is useful if the {@link FrameHandler} state has changed.
-     */
-    public void flush()
-    {
-        flushHeldFrame();
-
-        if (_heldFrame == null) // a frame stays held while the handler is not handling frames; only process more input once none is held
-        {
-            process();
-        }
-    }
-    /**
-     * Delivers the held frame, if there is one and the frame handler reports
-     * that it is handling frames. The handler's return value is stored in
-     * {@code _tail_closed} and the held frame is then cleared.
-     */
-    private void flushHeldFrame()
-    {
-        if(_heldFrame != null && _frameHandler.isHandlingFrames())
-        {
-            _tail_closed = _frameHandler.handleFrame(_heldFrame);
-            _heldFrame = null;
-        }
-    }
-    /**
-     * Sets {@code _size} back to 0 and {@code _state} back to {@code State.SIZE_0}.
-     */
-    private void reset()
-    {
-        _size = 0;
-        _state = State.SIZE_0;
-    }
-    /**
-     * @return the current value of the {@code _framesInput} counter
-     */
-    long getFramesInput()
-    {
-        return _framesInput;
-    }
-}
--------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscr...@qpid.apache.org For additional commands, e-mail: commits-h...@qpid.apache.org