Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java?rev=1739270&r1=1739269&r2=1739270&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/AMQPConnection_1_0.java Fri Apr 15 10:10:16 2016 @@ -25,11 +25,18 @@ import java.nio.ByteBuffer; import java.security.AccessController; import java.security.Principal; import java.security.PrivilegedAction; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collection; import java.util.Collections; +import java.util.HashMap; import java.util.Iterator; import java.util.LinkedHashMap; import java.util.List; import java.util.Map; +import java.util.NoSuchElementException; +import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -39,20 +46,21 @@ import javax.security.sasl.SaslServer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.qpid.amqp_1_0.codec.FrameWriter; -import org.apache.qpid.amqp_1_0.codec.ProtocolHandler; -import org.apache.qpid.amqp_1_0.framing.AMQFrame; -import org.apache.qpid.amqp_1_0.framing.FrameHandler; -import org.apache.qpid.amqp_1_0.framing.OversizeFrameException; -import org.apache.qpid.amqp_1_0.framing.SASLFrameHandler; -import org.apache.qpid.amqp_1_0.framing.TransportFrame; -import org.apache.qpid.amqp_1_0.transport.ConnectionEndpoint; -import org.apache.qpid.amqp_1_0.transport.Container; -import org.apache.qpid.amqp_1_0.transport.FrameOutputHandler; -import org.apache.qpid.amqp_1_0.transport.SaslServerProvider; -import org.apache.qpid.amqp_1_0.type.Binary; -import org.apache.qpid.amqp_1_0.type.FrameBody; -import org.apache.qpid.amqp_1_0.type.Symbol; +import org.apache.qpid.server.model.AuthenticationProvider; +import org.apache.qpid.server.model.VirtualHost; +import org.apache.qpid.server.protocol.v1_0.codec.DescribedTypeConstructorRegistry; +import org.apache.qpid.server.protocol.v1_0.codec.FrameWriter; +import org.apache.qpid.server.protocol.v1_0.codec.ProtocolHandler; +import org.apache.qpid.server.protocol.v1_0.codec.ValueWriter; +import org.apache.qpid.server.protocol.v1_0.framing.AMQFrame; +import org.apache.qpid.server.protocol.v1_0.framing.FrameHandler; +import org.apache.qpid.server.protocol.v1_0.framing.OversizeFrameException; +import org.apache.qpid.server.protocol.v1_0.framing.SASLFrame; +import org.apache.qpid.server.protocol.v1_0.framing.TransportFrame; +import org.apache.qpid.server.protocol.v1_0.type.Binary; +import org.apache.qpid.server.protocol.v1_0.type.FrameBody; +import org.apache.qpid.server.protocol.v1_0.type.SaslFrameBody; +import org.apache.qpid.server.protocol.v1_0.type.Symbol; import org.apache.qpid.bytebuffer.QpidByteBuffer; import org.apache.qpid.common.ServerPropertyNames; import org.apache.qpid.configuration.CommonProperties; @@ -64,6 +72,27 @@ import org.apache.qpid.server.model.Tran import org.apache.qpid.server.model.port.AmqpPort; import org.apache.qpid.server.protocol.AMQSessionModel; import org.apache.qpid.server.protocol.ConnectionClosingTicker; +import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger; +import org.apache.qpid.server.protocol.v1_0.type.UnsignedShort; +import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry; +import org.apache.qpid.server.protocol.v1_0.type.security.SaslChallenge; +import org.apache.qpid.server.protocol.v1_0.type.security.SaslCode; +import org.apache.qpid.server.protocol.v1_0.type.security.SaslInit; +import org.apache.qpid.server.protocol.v1_0.type.security.SaslMechanisms; +import org.apache.qpid.server.protocol.v1_0.type.security.SaslOutcome; +import org.apache.qpid.server.protocol.v1_0.type.security.SaslResponse; +import org.apache.qpid.server.protocol.v1_0.type.transport.AmqpError; +import org.apache.qpid.server.protocol.v1_0.type.transport.Attach; +import org.apache.qpid.server.protocol.v1_0.type.transport.Begin; +import org.apache.qpid.server.protocol.v1_0.type.transport.Close; +import org.apache.qpid.server.protocol.v1_0.type.transport.ConnectionError; +import org.apache.qpid.server.protocol.v1_0.type.transport.Detach; +import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition; +import org.apache.qpid.server.protocol.v1_0.type.transport.End; +import org.apache.qpid.server.protocol.v1_0.type.transport.Error; +import org.apache.qpid.server.protocol.v1_0.type.transport.Flow; +import org.apache.qpid.server.protocol.v1_0.type.transport.Open; +import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer; import org.apache.qpid.server.security.SubjectCreator; import org.apache.qpid.server.security.auth.AuthenticatedPrincipal; import org.apache.qpid.server.security.auth.UsernamePrincipal; @@ -74,191 +103,1048 @@ import org.apache.qpid.server.transport. import org.apache.qpid.server.transport.ServerNetworkConnection; import org.apache.qpid.server.util.Action; import org.apache.qpid.server.util.ConnectionScopedRuntimeException; +import org.apache.qpid.server.virtualhost.VirtualHostUnavailableException; import org.apache.qpid.transport.ByteBufferSender; import org.apache.qpid.transport.network.AggregateTicker; public class AMQPConnection_1_0 extends AbstractAMQPConnection<AMQPConnection_1_0> - implements FrameOutputHandler + implements FrameOutputHandler, DescribedTypeConstructorRegistry.Source, + ValueWriter.Registry.Source, + ErrorHandler, + SASLEndpoint, + ConnectionHandler { - public static Logger LOGGER = LoggerFactory.getLogger(AMQPConnection_1_0.class); + private static Logger LOGGER = LoggerFactory.getLogger(AMQPConnection_1_0.class); + private static final Logger FRAME_LOGGER = LoggerFactory.getLogger("FRM"); + private static final Logger RAW_LOGGER = LoggerFactory.getLogger("RAW"); + + + + private static final long CLOSE_RESPONSE_TIMEOUT = 10000l; + + private final AtomicBoolean _stateChanged = new AtomicBoolean(); + private final AtomicReference<Action<ProtocolEngine>> _workListener = new AtomicReference<>(); + + + private static final byte[] SASL_HEADER = new byte[] + { + (byte) 'A', + (byte) 'M', + (byte) 'Q', + (byte) 'P', + (byte) 3, + (byte) 1, + (byte) 0, + (byte) 0 + }; + + private static final byte[] AMQP_HEADER = new byte[] + { + (byte) 'A', + (byte) 'M', + (byte) 'Q', + (byte) 'P', + (byte) 0, + (byte) 1, + (byte) 0, + (byte) 0 + }; + + private FrameWriter _frameWriter; + private ProtocolHandler _frameHandler; + private volatile boolean _transportBlockedForWriting; + + private enum FrameReceivingState + { + AMQP_OR_SASL_HEADER, + SASL_INIT_ONLY, + SASL_RESPONSE_ONLY, + AMQP_HEADER, + OPEN_ONLY, + ANY_FRAME, + CLOSED + } + + private volatile FrameReceivingState _frameReceivingState = FrameReceivingState.AMQP_OR_SASL_HEADER; + + private static final short CONNECTION_CONTROL_CHANNEL = (short) 0; + private static final QpidByteBuffer EMPTY_BYTE_BUFFER = QpidByteBuffer.wrap(new byte[0]); + + private static final int DEFAULT_CHANNEL_MAX = Math.min(Integer.getInteger("amqp.channel_max", 255), 0xFFFF); + private static final int DEFAULT_MAX_FRAME = Integer.getInteger("amqp.max_frame_size", 1 << 15); + + private AmqpPort<?> _port; + private SubjectCreator _subjectCreator; + private Transport _transport; + private long _connectionId; + + private Container _container; + private Principal _user; + + + private int _channelMax = DEFAULT_CHANNEL_MAX; + private int _maxFrameSize = 4096; + private String _remoteContainerId; + + private SocketAddress _remoteAddress; + + // positioned by the *outgoing* channel + private SessionEndpoint[] _sendingSessions; + + // positioned by the *incoming* channel + private SessionEndpoint[] _receivingSessions; + private boolean _closedForInput; + private boolean _closedForOutput; + + private long _idleTimeout; + + private ConnectionState _connectionState = ConnectionState.UNOPENED; + + private AMQPDescribedTypeRegistry _describedTypeRegistry = AMQPDescribedTypeRegistry.newInstance() + .registerTransportLayer() + .registerMessagingLayer() + .registerTransactionLayer() + .registerSecurityLayer(); + + + private Map _properties; + private SaslServerProvider _saslServerProvider; + private boolean _saslComplete; + + private SaslServer _saslServer; + private String _localHostname; + private long _desiredIdleTimeout; + private UnsignedInteger _handleMax = UnsignedInteger.MAX_VALUE; + private Error _remoteError; + + private static final long MINIMUM_SUPPORTED_IDLE_TIMEOUT = 1000L; + + private Map _remoteProperties; + + private final AtomicBoolean _orderlyClose = new AtomicBoolean(false); + + private final Collection<Session_1_0> + _sessions = Collections.synchronizedCollection(new ArrayList<Session_1_0>()); + + private final Object _reference = new Object(); + + private final Queue<Action<? super ConnectionHandler>> _asyncTaskList = + new ConcurrentLinkedQueue<>(); + + private boolean _closedOnOpen; + + + + AMQPConnection_1_0(final Broker<?> broker, + final ServerNetworkConnection network, + AmqpPort<?> port, Transport transport, long id, + final AggregateTicker aggregateTicker, + final boolean useSASL) + { + super(broker, network, port, transport, Protocol.AMQP_1_0, id, aggregateTicker); + _container = new Container(broker.getId().toString()); + + _subjectCreator = port.getAuthenticationProvider().getSubjectCreator(transport.isSecure()); + + _saslServerProvider = useSASL ? asSaslServerProvider(_subjectCreator, network) : null; + _port = port; + _transport = transport; + _connectionId = id; + + Map<Symbol,Object> serverProperties = new LinkedHashMap<>(); + serverProperties.put(Symbol.valueOf(ServerPropertyNames.PRODUCT), CommonProperties.getProductName()); + serverProperties.put(Symbol.valueOf(ServerPropertyNames.VERSION), CommonProperties.getReleaseVersion()); + serverProperties.put(Symbol.valueOf(ServerPropertyNames.QPID_BUILD), CommonProperties.getBuildVersion()); + serverProperties.put(Symbol.valueOf(ServerPropertyNames.QPID_INSTANCE_NAME), broker.getName()); + + setProperties(serverProperties); + + setRemoteAddress(network.getRemoteAddress()); + + setDesiredIdleTimeout(1000L * broker.getConnection_heartBeatDelay()); + + _frameWriter = new FrameWriter(getDescribedTypeRegistry()); + + + } + + + private void setUserPrincipal(final Principal user) + { + setSubject(_subjectCreator.createSubjectWithGroups(user)); + } + + private long getDesiredIdleTimeout() + { + return _desiredIdleTimeout; + } + + public void receiveAttach(final short channel, final Attach attach) + { + assertState(FrameReceivingState.ANY_FRAME); + SessionEndpoint endPoint = getSession(channel); + if (endPoint != null) + { + endPoint.receiveAttach(attach); + } + } + + public void receive(final short channel, final Object frame) + { + List<Runnable> postLockActions; + FRAME_LOGGER.debug("RECV[{}|{}] : {}", _remoteAddress, channel, frame); + if (frame instanceof FrameBody) + { + ((FrameBody) frame).invoke(channel, this); + } + else if (frame instanceof SaslFrameBody) + { + ((SaslFrameBody) frame).invoke(channel, this); + } + } + + private void closeSaslWithFailure() + { + _saslComplete = true; + _frameReceivingState = FrameReceivingState.CLOSED; + setClosedForInput(true); + close(); + } + + public void receiveSaslChallenge(final SaslChallenge saslChallenge) + { + // TODO - log unexpected frame + closeSaslWithFailure(); + } + + public void receiveClose(final short channel, final Close close) + { + assertState(FrameReceivingState.ANY_FRAME); + _frameReceivingState = FrameReceivingState.CLOSED; + setClosedForInput(true); + closeReceived(); + switch (_connectionState) + { + case UNOPENED: + case AWAITING_OPEN: + Error error = new Error(); + error.setCondition(ConnectionError.CONNECTION_FORCED); + error.setDescription("Connection close sent before connection was opened"); + closeConnection(error); + break; + case OPEN: + _connectionState = ConnectionState.CLOSE_RECEIVED; + // TODO - we should log the error we received from the client if present + sendClose(new Close()); + _connectionState = ConnectionState.CLOSED; + _orderlyClose.set(true); + break; + case CLOSE_SENT: + _connectionState = ConnectionState.CLOSED; + _orderlyClose.set(true); + + default: + } + _remoteError = close.getError(); + + } + + private void closeReceived() + { + Collection<Session_1_0> sessions = new ArrayList<>(_sessions); + + for(Session_1_0 session : sessions) + { + session.remoteEnd(new End()); + } + } + + private void setClosedForInput(final boolean closed) + { + _closedForInput = closed; + } + + public void receiveSaslMechanisms(final SaslMechanisms saslMechanisms) + { + // TODO - log unexpected frame + closeSaslWithFailure(); + } + + public void receiveSaslResponse(final SaslResponse saslResponse) + { + final Binary responseBinary = saslResponse.getResponse(); + byte[] response = responseBinary == null ? new byte[0] : responseBinary.getArray(); + + assertState(FrameReceivingState.SASL_RESPONSE_ONLY); + + try + { + + // Process response from the client + byte[] challenge = _saslServer.evaluateResponse(response != null ? response : new byte[0]); + + if (_saslServer.isComplete()) + { + SaslOutcome outcome = new SaslOutcome(); + + outcome.setCode(SaslCode.OK); + send(new SASLFrame(outcome), null); + _saslComplete = true; + _user = _saslServerProvider.getAuthenticatedPrincipal(_saslServer); + _frameReceivingState = FrameReceivingState.AMQP_HEADER; + } + else + { + SaslChallenge challengeBody = new SaslChallenge(); + challengeBody.setChallenge(new Binary(challenge)); + send(new SASLFrame(challengeBody), null); + + } + } + catch (SaslException e) + { + SaslOutcome outcome = new SaslOutcome(); + + outcome.setCode(SaslCode.AUTH); + send(new SASLFrame(outcome), null); + _saslComplete = true; + closeSaslWithFailure(); + + } + } + + public AMQPDescribedTypeRegistry getDescribedTypeRegistry() + { + return _describedTypeRegistry; + } + + private void closeSessionAsync(final Session_1_0 session, final AMQConstant cause, final String message) + { + addAsyncTask(new Action<ConnectionHandler>() + { + @Override + public void performAction(final ConnectionHandler object) + { + session.close(cause, message); + } + }); + } + + + private boolean closedForOutput() + { + return _closedForOutput; + } + + public boolean isClosed() + { + return _connectionState == ConnectionState.CLOSED + || _connectionState == ConnectionState.CLOSE_RECEIVED; + } + + public boolean closedForInput() + { + return _closedForInput; + } + + void sessionEnded(final Session_1_0 session) + { + if(!_closedOnOpen) + { + _sessions.remove(session); + sessionRemoved(session); + } + } + + public int send(final short channel, final FrameBody body, final QpidByteBuffer payload) + { + return sendFrame(channel, body, payload); + } + + private void inputClosed() + { + List<Runnable> postLockActions; + + if (!_closedForInput) + { + _closedForInput = true; + FRAME_LOGGER.debug("RECV[{}] : {}", _remoteAddress, "Underlying connection closed"); + switch (_connectionState) + { + case UNOPENED: + case AWAITING_OPEN: + case CLOSE_SENT: + _connectionState = ConnectionState.CLOSED; + closeSender(); + break; + case OPEN: + _connectionState = ConnectionState.CLOSE_RECEIVED; + case CLOSED: + // already sent our close - too late to do anything more + break; + default: + } + closeReceived(); + } + + + } + + private void closeSender() + { + setClosedForOutput(true); + close(); + } + + String getRemoteContainerId() + { + return _remoteContainerId; + } - public static final long CLOSE_RESPONSE_TIMEOUT = 10000l; - private final Broker<?> _broker; + private void setDesiredIdleTimeout(final long desiredIdleTimeout) + { + _desiredIdleTimeout = desiredIdleTimeout; + } + + public boolean isOpen() + { + return _connectionState == ConnectionState.OPEN; + } + + void sendEnd(final short channel, final End end, final boolean remove) + { + sendFrame(channel, end); + if (remove) + { + _sendingSessions[channel] = null; + } + + } + + public void receiveSaslOutcome(final SaslOutcome saslOutcome) + { + // TODO - log unexpected frame + closeSaslWithFailure(); + } + + public void receiveEnd(final short channel, final End end) + { + + assertState(FrameReceivingState.ANY_FRAME); + SessionEndpoint endpoint = _receivingSessions[channel]; + if (endpoint != null) + { + _receivingSessions[channel] = null; + + endpoint.receiveEnd(end); + } + else + { + // TODO error + } + } + + public void receiveDisposition(final short channel, + final Disposition disposition) + { + assertState(FrameReceivingState.ANY_FRAME); + SessionEndpoint endPoint = getSession(channel); + if (endPoint != null) + { + endPoint.receiveDisposition(disposition); + } + + } + + public void receiveBegin(final short channel, final Begin begin) + { + + assertState(FrameReceivingState.ANY_FRAME); + short myChannelId; + if (begin.getRemoteChannel() != null) + { + myChannelId = begin.getRemoteChannel().shortValue(); + SessionEndpoint sessionEndpoint; + try + { + sessionEndpoint = _sendingSessions[myChannelId]; + } + catch (IndexOutOfBoundsException e) + { + final Error error = new Error(); + error.setCondition(ConnectionError.FRAMING_ERROR); + error.setDescription("BEGIN received on channel " + channel + " with given remote-channel " + + begin.getRemoteChannel() + " which is outside the valid range of 0 to " + + _channelMax + "."); + closeConnection(error); + return; + } + if (sessionEndpoint != null) + { + if (_receivingSessions[channel] == null) + { + _receivingSessions[channel] = sessionEndpoint; + sessionEndpoint.setReceivingChannel(channel); + sessionEndpoint.setNextIncomingId(begin.getNextOutgoingId()); + sessionEndpoint.setOutgoingSessionCredit(begin.getIncomingWindow()); + + if (sessionEndpoint.getState() == SessionState.END_SENT) + { + _sendingSessions[myChannelId] = null; + } + } + else + { + final Error error = new Error(); + error.setCondition(ConnectionError.FRAMING_ERROR); + error.setDescription("BEGIN received on channel " + channel + " which is already in use."); + closeConnection(error); + } + } + else + { + final Error error = new Error(); + error.setCondition(ConnectionError.FRAMING_ERROR); + error.setDescription("BEGIN received on channel " + channel + " with given remote-channel " + + begin.getRemoteChannel() + " which is not known as a begun session."); + closeConnection(error); + } + + + } + else // Peer requesting session creation + { + + myChannelId = getFirstFreeChannel(); + if (myChannelId == -1) + { + // close any half open channel + myChannelId = getFirstFreeChannel(); + + } + + if (_receivingSessions[channel] == null) + { + SessionEndpoint sessionEndpoint = new SessionEndpoint(this, begin); + + _receivingSessions[channel] = sessionEndpoint; + _sendingSessions[myChannelId] = sessionEndpoint; + + Begin beginToSend = new Begin(); + + sessionEndpoint.setReceivingChannel(channel); + sessionEndpoint.setSendingChannel(myChannelId); + beginToSend.setRemoteChannel(UnsignedShort.valueOf(channel)); + beginToSend.setNextOutgoingId(sessionEndpoint.getNextOutgoingId()); + beginToSend.setOutgoingWindow(sessionEndpoint.getOutgoingWindowSize()); + beginToSend.setIncomingWindow(sessionEndpoint.getIncomingWindowSize()); + sendFrame(myChannelId, beginToSend); + + remoteSessionCreation(sessionEndpoint); + } + else + { + final Error error = new Error(); + error.setCondition(ConnectionError.FRAMING_ERROR); + error.setDescription("BEGIN received on channel " + channel + " which is already in use."); + closeConnection(error); + } + + } + + } + + private void remoteSessionCreation(final SessionEndpoint sessionEndpoint) + { + if(!_closedOnOpen) + { + final Session_1_0 session = new Session_1_0(this, sessionEndpoint); + _sessions.add(session); + sessionAdded(session); + sessionEndpoint.setSessionEventListener(new SessionEventListener() + { + @Override + public void remoteLinkCreation(final LinkEndpoint endpoint11) + { + AccessController.doPrivileged(new PrivilegedAction<Object>() + { + @Override + public Object run() + { + session.remoteLinkCreation(endpoint11); + return null; + } + }, session.getAccessControllerContext()); + } + + @Override + public void remoteEnd(final End end) + { + AccessController.doPrivileged(new PrivilegedAction<Object>() + { + @Override + public Object run() + { + session.remoteEnd(end); + return null; + } + }, session.getAccessControllerContext()); + } + }); + } + } + + private short getFirstFreeChannel() + { + for (int i = 0; i <= _channelMax; i++) + { + if (_sendingSessions[i] == null) + { + return (short) i; + } + } + return -1; + } + + public void handleError(final Error error) + { + if (!closedForOutput()) + { + Close close = new Close(); + close.setError(error); + sendFrame((short) 0, close); + + setClosedForOutput(true); + } + + } + + public void receiveTransfer(final short channel, final Transfer transfer) + { + assertState(FrameReceivingState.ANY_FRAME); + SessionEndpoint endPoint = getSession(channel); + if (endPoint != null) + { + endPoint.receiveTransfer(transfer); + } + + } + + public SessionEndpoint createSession(final String name) + { + // todo assert connection state + short channel = getFirstFreeChannel(); + if (channel != -1) + { + SessionEndpoint endpoint = new SessionEndpoint(this); + _sendingSessions[channel] = endpoint; + endpoint.setSendingChannel(channel); + Begin begin = new Begin(); + begin.setNextOutgoingId(endpoint.getNextOutgoingId()); + begin.setOutgoingWindow(endpoint.getOutgoingWindowSize()); + begin.setIncomingWindow(endpoint.getIncomingWindowSize()); + + begin.setHandleMax(_handleMax); + sendFrame(channel, begin); + return endpoint; + + } + else + { + // TODO - report error + return null; + } + + } + + public void receiveFlow(final short channel, final Flow flow) + { + assertState(FrameReceivingState.ANY_FRAME); + SessionEndpoint endPoint = getSession(channel); + if (endPoint != null) + { + endPoint.receiveFlow(flow); + } + + } + + public void receiveOpen(final short channel, final Open open) + { + assertState(FrameReceivingState.OPEN_ONLY); + _frameReceivingState = FrameReceivingState.ANY_FRAME; + _channelMax = open.getChannelMax() == null ? _channelMax + : open.getChannelMax().intValue() < _channelMax + ? open.getChannelMax().intValue() + : _channelMax; + if (_receivingSessions == null) + { + _receivingSessions = new SessionEndpoint[_channelMax + 1]; + _sendingSessions = new SessionEndpoint[_channelMax + 1]; + } + _maxFrameSize = open.getMaxFrameSize() == null ? DEFAULT_MAX_FRAME : open.getMaxFrameSize().intValue(); + _remoteContainerId = open.getContainerId(); + _localHostname = open.getHostname(); + if (open.getIdleTimeOut() != null) + { + _idleTimeout = open.getIdleTimeOut().longValue(); + } + _remoteProperties = open.getProperties(); + if (_remoteProperties != null) + { + if (_remoteProperties.containsKey(Symbol.valueOf("product"))) + { + setClientProduct(_remoteProperties.get(Symbol.valueOf("product")).toString()); + } + if (_remoteProperties.containsKey(Symbol.valueOf("version"))) + { + setClientVersion(_remoteProperties.get(Symbol.valueOf("version")).toString()); + } + setClientId(_remoteContainerId); + } + if (_idleTimeout != 0L && _idleTimeout < MINIMUM_SUPPORTED_IDLE_TIMEOUT) + { + closeConnection(new Error(ConnectionError.CONNECTION_FORCED, + "Requested idle timeout of " + + _idleTimeout + + " is too low. The minimum supported timeout is" + + MINIMUM_SUPPORTED_IDLE_TIMEOUT)); + close(); + _closedOnOpen = true; + } + else + { + long desiredIdleTimeout = getDesiredIdleTimeout(); + initialiseHeartbeating(_idleTimeout / 2L, desiredIdleTimeout); + final VirtualHost vhost = ((AmqpPort) _port).getVirtualHost(_localHostname); + if (vhost == null) + { + closeWithError(AmqpError.NOT_FOUND, "Unknown hostname in connection open: '" + _localHostname + "'"); + } + else + { + if (vhost.getState() != org.apache.qpid.server.model.State.ACTIVE) + { + final Error err = new Error(); + err.setCondition(AmqpError.NOT_FOUND); + closeConnection(err); + + _closedOnOpen = true; + + populateConnectionRedirect(vhost, err); + + closeConnection(err); + + close(); + + _closedOnOpen = true; + + } + else + { + final Principal user = _user; + if (user != null) + { + setUserPrincipal(user); + } + if (AuthenticatedPrincipal.getOptionalAuthenticatedPrincipalFromSubject(getSubject()) == null) + { + closeWithError(AmqpError.NOT_ALLOWED, "Connection has not been authenticated"); + } + else + { + try + { + setVirtualHost(vhost); + } + catch (VirtualHostUnavailableException e) + { + closeWithError(AmqpError.NOT_ALLOWED, e.getMessage()); + } + } + } + } + } + switch (_connectionState) + { + case UNOPENED: + sendOpen(_channelMax, _maxFrameSize); + case AWAITING_OPEN: + _connectionState = ConnectionState.OPEN; + default: + // TODO bad stuff (connection already open) + + } + + } + + private void populateConnectionRedirect(final VirtualHost vhost, final Error err) + { + final String redirectHost = vhost.getRedirectHost(((AmqpPort) _port)); + + if(redirectHost == null) + { + err.setDescription("Virtual host '" + _localHostname + "' is not active"); + } + else + { + String networkHost; + int port; + if(redirectHost.matches("\\[[0-9a-f:]+\\](:[0-9]+)?")) + { + // IPv6 case + networkHost = redirectHost.substring(1, redirectHost.indexOf("]")); + if(redirectHost.contains("]:")) + { + port = Integer.parseInt(redirectHost.substring(redirectHost.indexOf("]")+2)); + } + else + { + port = -1; + } + } + else + { + if(redirectHost.contains(":")) + { + networkHost = redirectHost.substring(0, redirectHost.lastIndexOf(":")); + try + { + String portString = redirectHost.substring(redirectHost.lastIndexOf(":")+1); + port = Integer.parseInt(portString); + } + catch (NumberFormatException e) + { + port = -1; + } + } + else + { + networkHost = redirectHost; + port = -1; + } + } + final Map<Symbol, Object> infoMap = new HashMap<>(); + infoMap.put(Symbol.valueOf("network-host"), networkHost); + if(port > 0) + { + infoMap.put(Symbol.valueOf("port"), UnsignedInteger.valueOf(port)); + } + err.setInfo(infoMap); + } + } - private ConnectionEndpoint _endpoint; - private final AtomicBoolean _stateChanged = new AtomicBoolean(); - private final AtomicReference<Action<ProtocolEngine>> _workListener = new AtomicReference<>(); + public void receiveDetach(final short channel, final Detach detach) + { + assertState(FrameReceivingState.ANY_FRAME); + SessionEndpoint endPoint = getSession(channel); + if (endPoint != null) + { + endPoint.receiveDetach(detach); + } + } + private void transportStateChanged() + { + for (Session_1_0 session : _sessions) + { + session.transportStateChanged(); + } + } - private static final ByteBuffer SASL_LAYER_HEADER = - ByteBuffer.wrap(new byte[] - { - (byte)'A', - (byte)'M', - (byte)'Q', - (byte)'P', - (byte) 3, - (byte) 1, - (byte) 0, - (byte) 0 - }); - - private static final ByteBuffer AMQP_LAYER_HEADER = - ByteBuffer.wrap(new byte[] - { - (byte)'A', - (byte)'M', - (byte)'Q', - (byte)'P', - (byte) 0, - (byte) 1, - (byte) 0, - (byte) 0 - }); + public void close(final Error error) + { + closeConnection(error); + } + private void setRemoteAddress(final SocketAddress remoteAddress) + { + _remoteAddress = remoteAddress; + } - private FrameWriter _frameWriter; - private ProtocolHandler _frameHandler; - private Object _sendLock = new Object(); - private byte _major; - private byte _minor; - private byte _revision; - private final Connection_1_0 _connection; - private volatile boolean _transportBlockedForWriting; + public Principal getUser() + { + return _user; + } + public void setProperties(final Map<Symbol, Object> properties) + { + _properties = properties; + } - static enum State { - A, - M, - Q, - P, - PROTOCOL, - MAJOR, - MINOR, - REVISION, - FRAME - } - - private State _state = State.A; - - public AMQPConnection_1_0(final Broker<?> broker, final ServerNetworkConnection network, - AmqpPort<?> port, Transport transport, long id, - final AggregateTicker aggregateTicker, - final boolean useSASL) + private void setClosedForOutput(final boolean closed) { - super(broker, network, port, transport, Protocol.AMQP_1_0, id, aggregateTicker); - _broker = broker; - _connection = createConnection(broker, network, port, transport, id, useSASL); - _endpoint = _connection.getConnectionEndpoint(); - _endpoint.setConnectionEventListener(_connection); - _endpoint.setDesiredIdleTimeout(1000L * broker.getConnection_heartBeatDelay()); - _endpoint.setFrameOutputHandler(this); - final List<String> mechanisms = port.getAuthenticationProvider().getSubjectCreator(transport.isSecure()).getMechanisms(); - ByteBuffer headerResponse = useSASL ? initiateSasl() : initiateNonSasl(mechanisms); + _closedForOutput = closed; + } - _frameWriter = new FrameWriter(_endpoint.getDescribedTypeRegistry()); + public void receiveSaslInit(final SaslInit saslInit) + { + assertState(FrameReceivingState.SASL_INIT_ONLY); + String mechanism = saslInit.getMechanism() == null ? null : saslInit.getMechanism().toString(); + final Binary initialResponse = saslInit.getInitialResponse(); + byte[] response = initialResponse == null ? new byte[0] : initialResponse.getArray(); - getSender().send(QpidByteBuffer.wrap(headerResponse.duplicate())); - getSender().flush(); - if(useSASL) + try { - _endpoint.initiateSASL(mechanisms.toArray(new String[mechanisms.size()])); - } + _saslServer = _saslServerProvider.getSaslServer(mechanism, "localhost"); + // Process response from the client + byte[] challenge = _saslServer.evaluateResponse(response != null ? response : new byte[0]); - } + if (_saslServer.isComplete()) + { + SaslOutcome outcome = new SaslOutcome(); - private Connection_1_0 createConnection(final Broker<?> broker, - final ServerNetworkConnection network, - final AmqpPort<?> port, - final Transport transport, - final long id, - final boolean useSASL) - { - Container container = new Container(broker.getId().toString()); + outcome.setCode(SaslCode.OK); + send(new SASLFrame(outcome), null); + _saslComplete = true; + _user = _saslServerProvider.getAuthenticatedPrincipal(_saslServer); - SubjectCreator subjectCreator = port.getAuthenticationProvider().getSubjectCreator(transport.isSecure()); - final ConnectionEndpoint endpoint = - new ConnectionEndpoint(container, useSASL ? asSaslServerProvider(subjectCreator, network) : null); - endpoint.setLogger(new ConnectionEndpoint.FrameReceiptLogger() - { - @Override - public boolean isEnabled() - { - return FRAME_LOGGER.isDebugEnabled(); - } + _frameReceivingState = FrameReceivingState.AMQP_HEADER; - @Override - public void received(final SocketAddress remoteAddress, final short channel, final Object frame) + } + else { - FRAME_LOGGER.debug("RECV[" + remoteAddress + "|" + channel + "] : " + frame); + SaslChallenge challengeBody = new SaslChallenge(); + challengeBody.setChallenge(new Binary(challenge)); + send(new SASLFrame(challengeBody), null); + + _frameReceivingState = FrameReceivingState.SASL_RESPONSE_ONLY; } - }); - Map<Symbol,Object> serverProperties = new LinkedHashMap<>(); - serverProperties.put(Symbol.valueOf(ServerPropertyNames.PRODUCT), CommonProperties.getProductName()); - serverProperties.put(Symbol.valueOf(ServerPropertyNames.VERSION), CommonProperties.getReleaseVersion()); - serverProperties.put(Symbol.valueOf(ServerPropertyNames.QPID_BUILD), CommonProperties.getBuildVersion()); - serverProperties.put(Symbol.valueOf(ServerPropertyNames.QPID_INSTANCE_NAME), broker.getName()); + } + catch (SaslException e) + { + SaslOutcome outcome = new SaslOutcome(); - endpoint.setProperties(serverProperties); + outcome.setCode(SaslCode.AUTH); + send(new SASLFrame(outcome), null); + _saslComplete = true; - endpoint.setRemoteAddress(network.getRemoteAddress()); - return new Connection_1_0(endpoint, id, port, transport, subjectCreator, this); + closeSaslWithFailure(); + + } } + public int getMaxFrameSize() + { + return _maxFrameSize; + } - public ByteBufferSender getSender() + Object getReference() { - return getNetwork().getSender(); + return _reference; } - public ByteBuffer initiateNonSasl(final List<String> mechanisms) + private void endpointClosed() { - final ByteBuffer headerResponse; - if(mechanisms.contains(ExternalAuthenticationManagerImpl.MECHANISM_NAME) - && getNetwork().getPeerPrincipal() != null) + try { - _connection.setUserPrincipal(new AuthenticatedPrincipal(getNetwork().getPeerPrincipal())); + performDeleteTasks(); + closeReceived(); } - else if(mechanisms.contains(AnonymousAuthenticationManager.MECHANISM_NAME)) + finally { - _connection.setUserPrincipal(new AuthenticatedPrincipal(AnonymousAuthenticationManager.ANONYMOUS_PRINCIPAL)); + VirtualHost<?> virtualHost = getVirtualHost(); + if (virtualHost != null) + { + virtualHost.deregisterConnection(this); + } } - else + } + + private void closeConnection() + { + switch (_connectionState) { - getNetwork().close(); + case AWAITING_OPEN: + case OPEN: + Close closeToSend = new Close(); + sendClose(closeToSend); + _connectionState = ConnectionState.CLOSE_SENT; + break; + case CLOSE_SENT: + default: } - - _frameHandler = new FrameHandler(_endpoint); - headerResponse = AMQP_LAYER_HEADER; - return headerResponse; } - public ByteBuffer initiateSasl() + private void closeConnection(final Error error) { - final ByteBuffer headerResponse; - _endpoint.setSaslFrameOutput(this); + Close close = new Close(); + close.setError(error); + switch (_connectionState) + { + case UNOPENED: + sendOpen(0, 0); + sendClose(close); + _connectionState = ConnectionState.CLOSED; + break; + case AWAITING_OPEN: + case OPEN: + sendClose(close); + _connectionState = ConnectionState.CLOSE_SENT; + case CLOSE_SENT: + case CLOSED: + // already sent our close - too late to do anything more + break; + default: + // TODO Unknown state + } + } - _endpoint.setOnSaslComplete(new Runnable() + int sendFrame(final short channel, final FrameBody body, final QpidByteBuffer payload) + { + if (!_closedForOutput) { - public void run() + ValueWriter<FrameBody> writer = _describedTypeRegistry.getValueWriter(body); + int size = writer.writeToBuffer(EMPTY_BYTE_BUFFER); + QpidByteBuffer payloadDup = payload == null ? null : payload.duplicate(); + int payloadSent = _maxFrameSize - (size + 9); + try { - if (_endpoint.isAuthenticated()) + if (payloadSent < (payload == null ? 0 : payload.remaining())) { - getSender().send(QpidByteBuffer.wrap(AMQP_LAYER_HEADER.duplicate())); - getSender().flush(); + + if (body instanceof Transfer) + { + ((Transfer) body).setMore(Boolean.TRUE); + } + + writer = _describedTypeRegistry.getValueWriter(body); + size = writer.writeToBuffer(EMPTY_BYTE_BUFFER); + payloadSent = _maxFrameSize - (size + 9); + + payloadDup.limit(payloadDup.position() + payloadSent); } else { - getNetwork().close(); + payloadSent = payload == null ? 0 : payload.remaining(); } + send(AMQFrame.createAMQFrame(channel, body, payloadDup)); } - }); - _frameHandler = new SASLFrameHandler(_endpoint); - headerResponse = SASL_LAYER_HEADER; - return headerResponse; + finally + { + if (payloadDup != null) + { + payloadDup.dispose(); + } + } + return payloadSent; + } + else + { + return -1; + } + } + + void sendFrame(final short channel, final FrameBody body) + { + sendFrame(channel, body, null); + } + + public ByteBufferSender getSender() + { + return getNetwork().getSender(); } @Override @@ -311,15 +1197,14 @@ public class AMQPConnection_1_0 extends return getNetwork().getRemoteAddress().toString(); } - private final Logger RAW_LOGGER = LoggerFactory.getLogger("RAW"); - public synchronized void received(final QpidByteBuffer msg) + public void received(final QpidByteBuffer msg) { try { updateLastReadTime(); - if(RAW_LOGGER.isDebugEnabled()) + if (RAW_LOGGER.isDebugEnabled()) { QpidByteBuffer dup = msg.duplicate(); byte[] data = new byte[dup.remaining()]; @@ -328,137 +1213,122 @@ public class AMQPConnection_1_0 extends Binary bin = new Binary(data); RAW_LOGGER.debug("RECV[" + getNetwork().getRemoteAddress() + "] : " + bin.toString()); } - ProtocolHandler frameHandler; + int remaining; do { - frameHandler = _frameHandler; remaining = msg.remaining(); - switch (_state) + switch (_frameReceivingState) { - case A: - if (msg.hasRemaining()) - { - msg.get(); - } - else - { - break; - } - case M: - if (msg.hasRemaining()) - { - msg.get(); - } - else - { - _state = State.M; - break; - } - - case Q: - if (msg.hasRemaining()) - { - msg.get(); - } - else - { - _state = State.Q; - break; - } - case P: - if (msg.hasRemaining()) - { - msg.get(); - } - else + case AMQP_OR_SASL_HEADER: + case AMQP_HEADER: + if (remaining < 8) { - _state = State.P; - break; + return; } - case PROTOCOL: - if (msg.hasRemaining()) - { - msg.get(); - } - else - { - _state = State.PROTOCOL; - break; - } - case MAJOR: - if (msg.hasRemaining()) - { - _major = msg.get(); - } - else - { - _state = State.MAJOR; - break; - } - case MINOR: - if (msg.hasRemaining()) - { - _minor = msg.get(); - } - else - { - _state = State.MINOR; - break; - } - case REVISION: - if (msg.hasRemaining()) - { - _revision = msg.get(); + processProtocolHeader(msg); + break; + case OPEN_ONLY: + case ANY_FRAME: + case SASL_INIT_ONLY: + case SASL_RESPONSE_ONLY: + _frameHandler.parse(msg); + break; + case CLOSED: + // ignore; + break; + } - _state = State.FRAME; - } - else - { - _state = State.REVISION; - break; - } - case FRAME: - if (msg.hasRemaining()) - { - AccessController.doPrivileged(new PrivilegedAction<Void>() - { - @Override - public Void run() - { - _frameHandler = _frameHandler.parse(msg); - return null; - } - }, getAccessControllerContext()); - } - } } - while(_frameHandler != frameHandler || msg.remaining() != remaining); + while (msg.remaining() != remaining); } - catch(ConnectionScopedRuntimeException e) + catch (ConnectionScopedRuntimeException e) { throw e; } - catch(RuntimeException e) + catch (RuntimeException e) { LOGGER.error("Unexpected exception while processing incoming data", e); throw new ConnectionScopedRuntimeException("Unexpected exception while processing incoming data", e); } - finally + + } + + private void processProtocolHeader(final QpidByteBuffer msg) + { + if(msg.remaining() >= 8) { - msg.position(msg.limit()); + byte[] header = new byte[8]; + msg.get(header); + + final AuthenticationProvider authenticationProvider = getPort().getAuthenticationProvider(); + final SubjectCreator subjectCreator = authenticationProvider.getSubjectCreator(getTransport().isSecure()); + + if(Arrays.equals(header, SASL_HEADER)) + { + if(_saslComplete) + { + throw new ConnectionScopedRuntimeException("SASL Layer header received after SASL already established"); + } + + getSender().send(QpidByteBuffer.wrap(SASL_HEADER)); + + SaslMechanisms mechanisms = new SaslMechanisms(); + ArrayList<Symbol> mechanismsList = new ArrayList<Symbol>(); + for (String name : subjectCreator.getMechanisms()) + { + mechanismsList.add(Symbol.valueOf(name)); + } + mechanisms.setSaslServerMechanisms(mechanismsList.toArray(new Symbol[mechanismsList.size()])); + send(new SASLFrame(mechanisms), null); + + _frameReceivingState = FrameReceivingState.SASL_INIT_ONLY; + _frameHandler = new FrameHandler(this, true); + } + else if(Arrays.equals(header, AMQP_HEADER)) + { + if(!_saslComplete) + { + final List<String> mechanisms = subjectCreator.getMechanisms(); + + if(mechanisms.contains(ExternalAuthenticationManagerImpl.MECHANISM_NAME) && getNetwork().getPeerPrincipal() != null) + { + setUserPrincipal(new AuthenticatedPrincipal(getNetwork().getPeerPrincipal())); + } + else if(mechanisms.contains(AnonymousAuthenticationManager.MECHANISM_NAME)) + { + setUserPrincipal(new AuthenticatedPrincipal(AnonymousAuthenticationManager.ANONYMOUS_PRINCIPAL)); + } + else + { + // TODO - log auth failure / close + getNetwork().close(); + } + + } + getSender().send(QpidByteBuffer.wrap(AMQP_HEADER)); + _frameReceivingState = FrameReceivingState.OPEN_ONLY; + _frameHandler = new FrameHandler(this, false); + + } + else + { + throw new ConnectionScopedRuntimeException("Unknown protocol header"); + } + } - } + + } public void closed() { try { - _endpoint.inputClosed(); + inputClosed(); } catch(RuntimeException e) { @@ -468,7 +1338,7 @@ public class AMQPConnection_1_0 extends { try { - _connection.closed(); + endpointClosed(); } finally { @@ -487,57 +1357,49 @@ public class AMQPConnection_1_0 extends send(amqFrame, null); } - private static final Logger FRAME_LOGGER = LoggerFactory.getLogger("FRM"); public void send(final AMQFrame amqFrame, ByteBuffer buf) { - synchronized (_sendLock) - { - updateLastWriteTime(); - if (FRAME_LOGGER.isDebugEnabled()) - { - FRAME_LOGGER.debug("SEND[" - + getNetwork().getRemoteAddress() - + "|" - + amqFrame.getChannel() - + "] : " - + amqFrame.getFrameBody()); - } + updateLastWriteTime(); + FRAME_LOGGER.debug("SEND[{}|{}] : {}", + getNetwork().getRemoteAddress(), + amqFrame.getChannel(), + amqFrame.getFrameBody()); - _frameWriter.setValue(amqFrame); + _frameWriter.setValue(amqFrame); - QpidByteBuffer buffer = QpidByteBuffer.allocateDirect(_frameWriter.getSize()); + QpidByteBuffer buffer = QpidByteBuffer.allocateDirect(_frameWriter.getSize()); - try + try + { + int size = _frameWriter.writeToBuffer(buffer); + if (size > getMaxFrameSize()) { - int size = _frameWriter.writeToBuffer(buffer); - if (size > _endpoint.getMaxFrameSize()) - { - throw new OversizeFrameException(amqFrame, size); - } - - buffer.flip(); + throw new OversizeFrameException(amqFrame, size); + } - if (RAW_LOGGER.isDebugEnabled()) - { - QpidByteBuffer dup = buffer.duplicate(); - byte[] data = new byte[dup.remaining()]; - dup.get(data); - dup.dispose(); - Binary bin = new Binary(data); - RAW_LOGGER.debug("SEND[" + getNetwork().getRemoteAddress() + "] : " + bin.toString()); - } + buffer.flip(); - getSender().send(buffer); - getSender().flush(); - } - finally + if (RAW_LOGGER.isDebugEnabled()) { - buffer.dispose(); + QpidByteBuffer dup = buffer.duplicate(); + byte[] data = new byte[dup.remaining()]; + dup.get(data); + dup.dispose(); + Binary bin = new Binary(data); + RAW_LOGGER.debug("SEND[" + getNetwork().getRemoteAddress() + "] : " + bin.toString()); } + + getSender().send(buffer); + + } + finally + { + buffer.dispose(); } + } public void send(short channel, FrameBody body) @@ -565,7 +1427,7 @@ public class AMQPConnection_1_0 extends if(_transportBlockedForWriting != blocked) { _transportBlockedForWriting = blocked; - _connection.transportStateChanged(); + transportStateChanged(); } } @@ -575,7 +1437,7 @@ public class AMQPConnection_1_0 extends { if (isIOThread()) { - return _connection.processPendingIterator(); + return new ProcessPendingIterator(); } else { @@ -620,44 +1482,202 @@ public class AMQPConnection_1_0 extends public void sendConnectionCloseAsync(final AMQConstant cause, final String message) { - _connection.sendConnectionCloseAsync(cause, message); + Action<ConnectionHandler> action = new Action<ConnectionHandler>() + { + @Override + public void performAction(final ConnectionHandler object) + { + closeConnection(); + + } + }; + addAsyncTask(action); } public void closeSessionAsync(final AMQSessionModel<?> session, final AMQConstant cause, final String message) { - _connection.closeSessionAsync((Session_1_0) session, cause, message); + closeSessionAsync((Session_1_0) session, cause, message); } public void block() { - _connection.block(); + // TODO } public String getRemoteContainerName() { - return _connection.getRemoteContainerName(); + return _remoteContainerId; } public List<Session_1_0> getSessionModels() { - return _connection.getSessionModels(); + return new ArrayList<>(_sessions); } public void unblock() { - _connection.unblock(); + // TODO } public long getSessionCountLimit() { - return _connection.getSessionCountLimit(); + return 0; // TODO } @Override - protected boolean isOrderlyClose() + public boolean isOrderlyClose() { - return _connection.getConnectionEndpoint().isOrderlyClose(); + return _orderlyClose.get(); + } + + private void addAsyncTask(final Action<ConnectionHandler> action) + { + _asyncTaskList.add(action); + notifyWork(); + } + + + private void sendOpen(final int channelMax, final int maxFrameSize) + { + Open open = new Open(); + + if (_receivingSessions == null) + { + _receivingSessions = new SessionEndpoint[channelMax + 1]; + _sendingSessions = new SessionEndpoint[channelMax + 1]; + } + if (channelMax < _channelMax) + { + _channelMax = channelMax; + } + open.setChannelMax(UnsignedShort.valueOf((short) channelMax)); + open.setContainerId(_container.getId()); + open.setMaxFrameSize(UnsignedInteger.valueOf(maxFrameSize)); + // TODO - should we try to set the hostname based on the connection information? + // open.setHostname(); + open.setIdleTimeOut(UnsignedInteger.valueOf(_desiredIdleTimeout)); + if (_properties != null) + { + open.setProperties(_properties); + } + + sendFrame(CONNECTION_CONTROL_CHANNEL, open); + } + + private void closeWithError(final AmqpError amqpError, final String errorDescription) + { + final Error err = new Error(); + err.setCondition(amqpError); + err.setDescription(errorDescription); + closeConnection(err); + close(); + _closedOnOpen = true; + } + + private SessionEndpoint getSession(final short channel) + { + SessionEndpoint session = _receivingSessions[channel]; + if (session == null) + { + Error error = new Error(); + error.setCondition(ConnectionError.FRAMING_ERROR); + error.setDescription("Frame received on channel " + channel + " which is not known as a begun session."); + handleError(error); + } + + return session; + } + + private void sendClose(Close closeToSend) + { + sendFrame(CONNECTION_CONTROL_CHANNEL, closeToSend); + closeSender(); + } + + @Override + public String toString() + { + VirtualHost<?> virtualHost = getVirtualHost(); + return "Connection_1_0[" + + _connectionId + + " " + + getAddress() + + (virtualHost == null ? "" : (" vh : " + virtualHost.getName())) + + ']'; + } + + + private void assertState(final FrameReceivingState state) + { + if(_frameReceivingState != state) + { + throw new ConnectionScopedRuntimeException("Unexpected state, client has sent frame in an illegal order. Required state: " + state + ", actual state: " + _frameReceivingState); + } + } + + + private class ProcessPendingIterator implements Iterator<Runnable> + { + private final List<? extends AMQSessionModel<?>> _sessionsWithPending; + private Iterator<? extends AMQSessionModel<?>> _sessionIterator; + private ProcessPendingIterator() + { + _sessionsWithPending = new ArrayList<>(getSessionModels()); + _sessionIterator = _sessionsWithPending.iterator(); + } + + @Override + public boolean hasNext() + { + return !(_sessionsWithPending.isEmpty() && _asyncTaskList.isEmpty()); + } + + @Override + public Runnable next() + { + if(!_sessionsWithPending.isEmpty()) + { + if(!_sessionIterator.hasNext()) + { + _sessionIterator = _sessionsWithPending.iterator(); + } + final AMQSessionModel<?> session = _sessionIterator.next(); + return new Runnable() + { + @Override + public void run() + { + if(!session.processPending()) + { + _sessionIterator.remove(); + } + } + }; + } + else if(!_asyncTaskList.isEmpty()) + { + final Action<? super ConnectionHandler> asyncAction = _asyncTaskList.poll(); + return new Runnable() + { + @Override + public void run() + { + asyncAction.performAction(AMQPConnection_1_0.this); + } + }; + } + else + { + throw new NoSuchElementException(); + } + } + + @Override + public void remove() + { + throw new UnsupportedOperationException(); + } } @Override
Added: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConnectionHandler.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConnectionHandler.java?rev=1739270&view=auto ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConnectionHandler.java (added) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConnectionHandler.java Fri Apr 15 10:10:16 2016 @@ -0,0 +1,61 @@ +/* + * + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + * + */ +package org.apache.qpid.server.protocol.v1_0; + +import org.apache.qpid.server.protocol.v1_0.type.transport.Attach; +import org.apache.qpid.server.protocol.v1_0.type.transport.Begin; +import org.apache.qpid.server.protocol.v1_0.type.transport.Close; +import org.apache.qpid.server.protocol.v1_0.type.transport.Detach; +import org.apache.qpid.server.protocol.v1_0.type.transport.Disposition; +import org.apache.qpid.server.protocol.v1_0.type.transport.End; +import org.apache.qpid.server.protocol.v1_0.type.transport.Error; +import org.apache.qpid.server.protocol.v1_0.type.transport.Flow; +import org.apache.qpid.server.protocol.v1_0.type.transport.Open; +import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer; + +public interface ConnectionHandler extends SASLEndpoint +{ + void receiveOpen(short channel, Open close); + + void receiveClose(short channel, Close close); + + void receiveBegin(short channel, Begin begin); + + void receiveEnd(short channel, End end); + + void receiveAttach(short channel, Attach attach); + + void receiveDetach(short channel, Detach detach); + + void receiveTransfer(short channel, Transfer transfer); + + void receiveDisposition(short channel, Disposition disposition); + + void receiveFlow(short channel, Flow flow); + + int getMaxFrameSize(); + + void handleError(Error parsingError); + + boolean closedForInput(); + + void receive(short channel, Object val); +} Propchange: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConnectionHandler.java ------------------------------------------------------------------------------ svn:eol-style = native Copied: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConnectionState.java (from r1739261, qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionState.java) URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConnectionState.java?p2=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConnectionState.java&p1=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionState.java&r1=1739261&r2=1739270&rev=1739270&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/ConnectionState.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConnectionState.java Fri Apr 15 10:10:16 2016 @@ -19,7 +19,7 @@ * */ -package org.apache.qpid.amqp_1_0.transport; +package org.apache.qpid.server.protocol.v1_0; public enum ConnectionState { Modified: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java?rev=1739270&r1=1739269&r2=1739270&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ConsumerTarget_1_0.java Fri Apr 15 10:10:16 2016 @@ -26,23 +26,22 @@ import java.util.Collection; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import org.apache.qpid.amqp_1_0.codec.ValueHandler; -import org.apache.qpid.amqp_1_0.messaging.SectionEncoder; -import org.apache.qpid.amqp_1_0.messaging.SectionEncoderImpl; -import org.apache.qpid.amqp_1_0.transport.SendingLinkEndpoint; -import org.apache.qpid.amqp_1_0.type.AmqpErrorException; -import org.apache.qpid.amqp_1_0.type.Binary; -import org.apache.qpid.amqp_1_0.type.DeliveryState; -import org.apache.qpid.amqp_1_0.type.Outcome; -import org.apache.qpid.amqp_1_0.type.UnsignedInteger; -import org.apache.qpid.amqp_1_0.type.codec.AMQPDescribedTypeRegistry; -import org.apache.qpid.amqp_1_0.type.messaging.Accepted; -import org.apache.qpid.amqp_1_0.type.messaging.Header; -import org.apache.qpid.amqp_1_0.type.messaging.Modified; -import org.apache.qpid.amqp_1_0.type.messaging.Released; -import org.apache.qpid.amqp_1_0.type.transaction.TransactionalState; -import org.apache.qpid.amqp_1_0.type.transport.SenderSettleMode; -import org.apache.qpid.amqp_1_0.type.transport.Transfer; +import org.apache.qpid.server.protocol.v1_0.codec.ValueHandler; +import org.apache.qpid.server.protocol.v1_0.messaging.SectionEncoder; +import org.apache.qpid.server.protocol.v1_0.messaging.SectionEncoderImpl; +import org.apache.qpid.server.protocol.v1_0.type.AmqpErrorException; +import org.apache.qpid.server.protocol.v1_0.type.Binary; +import org.apache.qpid.server.protocol.v1_0.type.DeliveryState; +import org.apache.qpid.server.protocol.v1_0.type.Outcome; +import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger; +import org.apache.qpid.server.protocol.v1_0.type.codec.AMQPDescribedTypeRegistry; +import org.apache.qpid.server.protocol.v1_0.type.messaging.Accepted; +import org.apache.qpid.server.protocol.v1_0.type.messaging.Header; +import org.apache.qpid.server.protocol.v1_0.type.messaging.Modified; +import org.apache.qpid.server.protocol.v1_0.type.messaging.Released; +import org.apache.qpid.server.protocol.v1_0.type.transaction.TransactionalState; +import org.apache.qpid.server.protocol.v1_0.type.transport.SenderSettleMode; +import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer; import org.apache.qpid.bytebuffer.QpidByteBuffer; import org.apache.qpid.server.transport.ProtocolEngine; import org.apache.qpid.server.consumer.AbstractConsumerTarget; @@ -130,21 +129,21 @@ class ConsumerTarget_1_0 extends Abstrac QpidByteBuffer payload = null; //TODO Collection<QpidByteBuffer> fragments = message.getFragments(); - if(fragments.size() == 1) + if (fragments.size() == 1) { payload = fragments.iterator().next(); } else { int size = 0; - for(QpidByteBuffer fragment : fragments) + for (QpidByteBuffer fragment : fragments) { size += fragment.remaining(); } payload = QpidByteBuffer.allocateDirect(size); - for(QpidByteBuffer fragment : fragments) + for (QpidByteBuffer fragment : fragments) { payload.put(fragment); fragment.dispose(); @@ -153,7 +152,7 @@ class ConsumerTarget_1_0 extends Abstrac payload.flip(); } - if(entry.getDeliveryCount() != 0) + if (entry.getDeliveryCount() != 0) { ValueHandler valueHandler = new ValueHandler(_typeRegistry); @@ -161,7 +160,7 @@ class ConsumerTarget_1_0 extends Abstrac try { Object value = valueHandler.parse(payload); - if(value instanceof Header) + if (value instanceof Header) { oldHeader = (Header) value; } @@ -177,7 +176,7 @@ class ConsumerTarget_1_0 extends Abstrac } Header header = new Header(); - if(oldHeader != null) + if (oldHeader != null) { header.setDurable(oldHeader.getDurable()); header.setPriority(oldHeader.getPriority()); @@ -190,7 +189,7 @@ class ConsumerTarget_1_0 extends Abstrac QpidByteBuffer oldPayload = payload; payload = QpidByteBuffer.allocateDirect(oldPayload.remaining() + encodedHeader.getLength()); - payload.put(encodedHeader.getArray(),encodedHeader.getArrayOffset(),encodedHeader.getLength()); + payload.put(encodedHeader.getArray(), encodedHeader.getArrayOffset(), encodedHeader.getLength()); payload.put(oldPayload); oldPayload.dispose(); payload.flip(); @@ -203,58 +202,57 @@ class ConsumerTarget_1_0 extends Abstrac transfer.setDeliveryTag(tag); - synchronized(_link.getLock()) + if (_link.isAttached()) { - if(_link.isAttached()) + if (SenderSettleMode.SETTLED.equals(getEndpoint().getSendingSettlementMode())) { - if(SenderSettleMode.SETTLED.equals(getEndpoint().getSendingSettlementMode())) - { - transfer.setSettled(true); - } - else - { - UnsettledAction action = _acquires - ? new DispositionAction(tag, entry) - : new DoNothingAction(tag, entry); + transfer.setSettled(true); + } + else + { + UnsettledAction action = _acquires + ? new DispositionAction(tag, entry) + : new DoNothingAction(tag, entry); - _link.addUnsettled(tag, action, entry); - } + _link.addUnsettled(tag, action, entry); + } - if(_transactionId != null) - { - TransactionalState state = new TransactionalState(); - state.setTxnId(_transactionId); - transfer.setState(state); - } - // TODO - need to deal with failure here - if(_acquires && _transactionId != null) + if (_transactionId != null) + { + TransactionalState state = new TransactionalState(); + state.setTxnId(_transactionId); + transfer.setState(state); + } + // TODO - need to deal with failure here + if (_acquires && _transactionId != null) + { + ServerTransaction txn = _link.getTransaction(_transactionId); + if (txn != null) { - ServerTransaction txn = _link.getTransaction(_transactionId); - if(txn != null) + txn.addPostTransactionAction(new ServerTransaction.Action() { - txn.addPostTransactionAction(new ServerTransaction.Action(){ - public void postCommit() - { - } - - public void onRollback() - { - entry.release(getConsumer()); - _link.getEndpoint().updateDisposition(tag, (DeliveryState)null, true); - } - }); - } + public void postCommit() + { + } + public void onRollback() + { + entry.release(getConsumer()); + _link.getEndpoint().updateDisposition(tag, (DeliveryState) null, true); + } + }); } - getSession().getAMQPConnection().registerMessageDelivered(message.getSize()); - getEndpoint().transfer(transfer, false); - } - else - { - entry.release(getConsumer()); + } + getSession().getAMQPConnection().registerMessageDelivered(message.getSize()); + getEndpoint().transfer(transfer, false); } + else + { + entry.release(getConsumer()); + } + } finally { @@ -281,63 +279,48 @@ class ConsumerTarget_1_0 extends Abstrac public boolean allocateCredit(final ServerMessage msg) { - synchronized (_link.getLock()) + ProtocolEngine protocolEngine = getSession().getConnection(); + final boolean hasCredit = + _link.isAttached() && getEndpoint().hasCreditToSend() && !protocolEngine.isTransportBlockedForWriting(); + if (!hasCredit && getState() == State.ACTIVE) { + suspend(); + } - ProtocolEngine protocolEngine = getSession().getConnection().getAmqpConnection(); - final boolean hasCredit = _link.isAttached() && getEndpoint().hasCreditToSend() && !protocolEngine.isTransportBlockedForWriting(); - if (!hasCredit && getState() == State.ACTIVE) - { - suspend(); - } - - if (hasCredit) - { - SendingLinkEndpoint linkEndpoint = _link.getEndpoint(); - linkEndpoint.setLinkCredit(linkEndpoint.getLinkCredit().subtract(UnsignedInteger.ONE)); - } - - return hasCredit; + if (hasCredit) + { + SendingLinkEndpoint linkEndpoint = _link.getEndpoint(); + linkEndpoint.setLinkCredit(linkEndpoint.getLinkCredit().subtract(UnsignedInteger.ONE)); } + + return hasCredit; } public void suspend() { - synchronized(_link.getLock()) - { - updateState(State.ACTIVE, State.SUSPENDED); - } + updateState(State.ACTIVE, State.SUSPENDED); } public void restoreCredit(final ServerMessage message) { - synchronized (_link.getLock()) - { - final SendingLinkEndpoint endpoint = _link.getEndpoint(); - endpoint.setLinkCredit(endpoint.getLinkCredit().add(UnsignedInteger.ONE)); - } + final SendingLinkEndpoint endpoint = _link.getEndpoint(); + endpoint.setLinkCredit(endpoint.getLinkCredit().add(UnsignedInteger.ONE)); } public void queueEmpty() { - synchronized(_link.getLock()) - { - _queueEmpty = true; - } + _queueEmpty = true; } public void flowStateChanged() { - synchronized(_link.getLock()) + ProtocolEngine protocolEngine = getSession().getConnection(); + if (isFlowSuspended() && getEndpoint() != null && !protocolEngine.isTransportBlockedForWriting()) { - ProtocolEngine protocolEngine = getSession().getConnection().getAmqpConnection(); - if(isFlowSuspended() && getEndpoint() != null && !protocolEngine.isTransportBlockedForWriting()) - { - updateState(State.SUSPENDED, State.ACTIVE); - _transactionId = _link.getTransactionId(); - } + updateState(State.SUSPENDED, State.ACTIVE); + _transactionId = _link.getTransactionId(); } } @@ -552,16 +535,13 @@ class ConsumerTarget_1_0 extends Abstrac @Override protected void processStateChanged() { - synchronized (_link.getLock()) + if(_queueEmpty) { - if(_queueEmpty) - { - _queueEmpty = false; + _queueEmpty = false; - if(_link.drained()) - { - updateState(State.ACTIVE, State.SUSPENDED); - } + if(_link.drained()) + { + updateState(State.ACTIVE, State.SUSPENDED); } } } @@ -569,10 +549,7 @@ class ConsumerTarget_1_0 extends Abstrac @Override protected boolean hasStateChanged() { - synchronized (_link.getLock()) - { - return _queueEmpty; - } + return _queueEmpty; } @Override Copied: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Container.java (from r1739261, qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/Container.java) URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Container.java?p2=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Container.java&p1=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/Container.java&r1=1739261&r2=1739270&rev=1739270&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/Container.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Container.java Fri Apr 15 10:10:16 2016 @@ -19,7 +19,7 @@ * */ -package org.apache.qpid.amqp_1_0.transport; +package org.apache.qpid.server.protocol.v1_0; import java.lang.management.ManagementFactory; import java.net.InetAddress; Copied: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Delivery.java (from r1739261, qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/Delivery.java) URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Delivery.java?p2=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Delivery.java&p1=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/Delivery.java&r1=1739261&r2=1739270&rev=1739270&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/Delivery.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/Delivery.java Fri Apr 15 10:10:16 2016 @@ -17,14 +17,11 @@ * under the License. */ -package org.apache.qpid.amqp_1_0.transport; +package org.apache.qpid.server.protocol.v1_0; -import org.apache.qpid.amqp_1_0.type.Binary; -import org.apache.qpid.amqp_1_0.type.UnsignedInteger; -import org.apache.qpid.amqp_1_0.type.transport.Transfer; - -import java.util.ArrayList; -import java.util.List; +import org.apache.qpid.server.protocol.v1_0.type.Binary; +import org.apache.qpid.server.protocol.v1_0.type.UnsignedInteger; +import org.apache.qpid.server.protocol.v1_0.type.transport.Transfer; public class Delivery { Copied: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/DeliveryStateHandler.java (from r1739261, qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/DeliveryStateHandler.java) URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/DeliveryStateHandler.java?p2=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/DeliveryStateHandler.java&p1=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/DeliveryStateHandler.java&r1=1739261&r2=1739270&rev=1739270&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/DeliveryStateHandler.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/DeliveryStateHandler.java Fri Apr 15 10:10:16 2016 @@ -17,10 +17,10 @@ * under the License. */ -package org.apache.qpid.amqp_1_0.transport; +package org.apache.qpid.server.protocol.v1_0; -import org.apache.qpid.amqp_1_0.type.Binary; -import org.apache.qpid.amqp_1_0.type.DeliveryState; +import org.apache.qpid.server.protocol.v1_0.type.Binary; +import org.apache.qpid.server.protocol.v1_0.type.DeliveryState; public interface DeliveryStateHandler { Copied: qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ErrorHandler.java (from r1739261, qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/ErrorHandler.java) URL: http://svn.apache.org/viewvc/qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ErrorHandler.java?p2=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ErrorHandler.java&p1=qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/ErrorHandler.java&r1=1739261&r2=1739270&rev=1739270&view=diff ============================================================================== --- qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/amqp_1_0/transport/ErrorHandler.java (original) +++ qpid/java/trunk/broker-plugins/amqp-1-0-protocol/src/main/java/org/apache/qpid/server/protocol/v1_0/ErrorHandler.java Fri Apr 15 10:10:16 2016 @@ -18,10 +18,10 @@ * under the License. * */ -package org.apache.qpid.amqp_1_0.transport; +package org.apache.qpid.server.protocol.v1_0; public interface ErrorHandler { - void handleError(org.apache.qpid.amqp_1_0.type.transport.Error error); + void handleError(org.apache.qpid.server.protocol.v1_0.type.transport.Error error); } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
