http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/ProtonConsumer.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/ProtonConsumer.java b/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/ProtonConsumer.java deleted file mode 100644 index 4bf52a7..0000000 --- a/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/ProtonConsumer.java +++ /dev/null @@ -1,314 +0,0 @@ -/* - * Copyright 2005-2014 Red Hat, Inc. - * Red Hat licenses this file to you under the Apache License, version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. - */ - -package org.hornetq.core.protocol.proton; - -import java.util.Map; - -import org.apache.qpid.proton.amqp.DescribedType; -import org.apache.qpid.proton.amqp.Symbol; -import org.apache.qpid.proton.amqp.messaging.Accepted; -import org.apache.qpid.proton.amqp.messaging.Modified; -import org.apache.qpid.proton.amqp.messaging.Rejected; -import org.apache.qpid.proton.amqp.messaging.Released; -import org.apache.qpid.proton.amqp.transport.DeliveryState; -import org.apache.qpid.proton.amqp.transport.SenderSettleMode; -import org.apache.qpid.proton.engine.Delivery; -import org.apache.qpid.proton.engine.Sender; -import org.apache.qpid.proton.jms.EncodedMessage; -import org.hornetq.api.core.SimpleString; -import org.hornetq.core.client.impl.ClientConsumerImpl; -import org.hornetq.core.protocol.proton.exceptions.HornetQAMQPException; -import org.hornetq.core.server.HornetQServer; -import org.hornetq.core.server.QueueQueryResult; -import org.hornetq.core.server.ServerMessage; - -/** - * A this is a wrapper around a HornetQ ServerConsumer for handling outgoing messages and incoming acks via a Proton Sender - * - * @author <a href="mailto:[email protected]">Andy Taylor</a> - */ -public class ProtonConsumer implements ProtonDeliveryHandler -{ - private static final Symbol SELECTOR = Symbol.getSymbol("jms-selector"); - private static final Symbol COPY = Symbol.valueOf("copy"); - private final ProtonSession protonSession; - private final HornetQServer server; - private final Sender sender; - private final ProtonRemotingConnection connection; - private final ProtonProtocolManager protonProtocolManager; - private long consumerID; - private boolean closed = false; - private long forcedDeliveryCount = 0; - private boolean forcingDelivery = false; - private boolean receivedForcedDelivery = true; - - public ProtonConsumer(ProtonRemotingConnection connection, Sender sender, ProtonSession protonSession, HornetQServer server, - ProtonProtocolManager protonProtocolManager) - { - this.connection = connection; - this.sender = sender; - this.protonSession = protonSession; - this.server = server; - this.protonProtocolManager = protonProtocolManager; - } - - /* - * start the session - * */ - public void start() throws HornetQAMQPException - { - protonSession.getServerSession().start(); - - //todo add flow control - try - { - protonSession.getServerSession().receiveConsumerCredits(consumerID, -1); - } - catch (Exception e) - { - throw HornetQAMQPProtocolMessageBundle.BUNDLE.errorStartingConsumer(e.getMessage()); - } - } - - /* - * create the actual underlying HornetQ Server Consumer - * */ - public void init() throws HornetQAMQPException - { - org.apache.qpid.proton.amqp.messaging.Source source = (org.apache.qpid.proton.amqp.messaging.Source) sender.getRemoteSource(); - - SimpleString queue; - - consumerID = server.getStorageManager().generateUniqueID(); - - SimpleString selector = null; - Map filter = source.getFilter(); - if (filter != null) - { - DescribedType value = (DescribedType) filter.get(SELECTOR); - if (value != null) - { - selector = new SimpleString(value.getDescribed().toString()); - } - } - - if (source.getDynamic()) - { - //if dynamic we have to create the node (queue) and set the address on the target, the node is temporary and - // will be deleted on closing of the session - queue = new SimpleString(java.util.UUID.randomUUID().toString()); - try - { - protonSession.getServerSession().createQueue(queue, queue, null, true, false); - } - catch (Exception e) - { - throw HornetQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage()); - } - source.setAddress(queue.toString()); - } - else - { - //if not dynamic then we use the targets address as the address to forward the messages to, however there has to - //be a queue bound to it so we nee to check this. - String address = source.getAddress(); - if (address == null) - { - throw HornetQAMQPProtocolMessageBundle.BUNDLE.sourceAddressNotSet(); - } - - queue = new SimpleString(source.getAddress()); - QueueQueryResult queryResult; - try - { - queryResult = protonSession.getServerSession().executeQueueQuery(new SimpleString(address)); - } - catch (Exception e) - { - throw HornetQAMQPProtocolMessageBundle.BUNDLE.errorFindingTemporaryQueue(e.getMessage()); - } - if (!queryResult.isExists()) - { - throw HornetQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist(); - } - } - boolean browseOnly = source.getDistributionMode() != null && source.getDistributionMode().equals(COPY); - try - { - protonSession.getServerSession().createConsumer(consumerID, queue, selector, browseOnly); - } - catch (Exception e) - { - throw HornetQAMQPProtocolMessageBundle.BUNDLE.errorCreatingHornetQConsumer(e.getMessage()); - } - } - - /* - * close the session - * */ - public synchronized void close() throws HornetQAMQPException - { - closed = true; - protonSession.removeConsumer(consumerID); - } - - public long getConsumerID() - { - return consumerID; - } - - /* - * handle an out going message from HornetQ, send via the Proton Sender - * */ - public synchronized int handleDelivery(ServerMessage message, int deliveryCount) - { - if (closed) - { - return 0; - } - if (message.containsProperty(ClientConsumerImpl.FORCED_DELIVERY_MESSAGE)) - { - if (forcingDelivery) - { - sender.drained(); - } - else - { - receivedForcedDelivery = true; - forcingDelivery = false; - } - return 0; - } - //if we get here then a forced delivery has pushed some messages thru and we continue - if (forcingDelivery) - { - forcingDelivery = false; - } - //presettle means we can ack the message on the proton side before we send it, i.e. for browsers - boolean preSettle = sender.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED; - //we only need a tag if we are going to ack later - byte[] tag = preSettle ? new byte[0] : protonSession.getTag(); - //encode the message - EncodedMessage encodedMessage = ProtonUtils.OUTBOUND.transform(message, deliveryCount); - //now handle the delivery - protonProtocolManager.handleDelivery(sender, tag, encodedMessage, message, connection, preSettle); - - return encodedMessage.getLength(); - } - - @Override - /* - * handle an incoming Ack from Proton, basically pass to HornetQ to handle - * */ - public void onMessage(Delivery delivery) throws HornetQAMQPException - { - ServerMessage message = (ServerMessage) delivery.getContext(); - - boolean preSettle = sender.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED; - - - DeliveryState remoteState = delivery.getRemoteState(); - - if (remoteState != null) - { - if (remoteState instanceof Accepted) - { - //we have to individual ack as we can't guarantee we will get the delivery updates (including acks) in order - // from proton, a perf hit but a must - try - { - protonSession.getServerSession().individualAcknowledge(consumerID, message.getMessageID()); - } - catch (Exception e) - { - throw HornetQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.getMessageID(), e.getMessage()); - } - } - else if (remoteState instanceof Released) - { - try - { - protonSession.getServerSession().individualCancel(consumerID, message.getMessageID(), false); - } - catch (Exception e) - { - throw HornetQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.getMessageID(), e.getMessage()); - } - } - else if (remoteState instanceof Rejected || remoteState instanceof Modified) - { - try - { - protonSession.getServerSession().individualCancel(consumerID, message.getMessageID(), true); - } - catch (Exception e) - { - throw HornetQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.getMessageID(), e.getMessage()); - } - } - - synchronized (connection.getDeliveryLock()) - { - delivery.settle(); - } - //todo add tag caching - if (!preSettle) - { - protonSession.replaceTag(delivery.getTag()); - } - sender.offer(1); - } - else - { - //todo not sure if we need to do anything here - } - } - - /* - * check the state of the consumer, i.e. are there any more messages. only really needed for browsers? - * */ - public synchronized void checkState() - { - if (!forcingDelivery && receivedForcedDelivery) - { - try - { - forcingDelivery = true; - receivedForcedDelivery = false; - protonSession.getServerSession().forceConsumerDelivery(consumerID, forcedDeliveryCount++); - } - catch (Exception e) - { - e.printStackTrace(); - } - } - } - - public Sender getSender() - { - return sender; - } - - private String formatTag(byte[] tag) - { - StringBuffer sb = new StringBuffer(); - for (byte b : tag) - { - sb.append(b).append(":"); - } - return sb.toString(); - } - - int x = 5; -}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/ProtonDeliveryHandler.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/ProtonDeliveryHandler.java b/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/ProtonDeliveryHandler.java deleted file mode 100644 index 9b7a605..0000000 --- a/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/ProtonDeliveryHandler.java +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Copyright 2005-2014 Red Hat, Inc. - * Red Hat licenses this file to you under the Apache License, version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. - */ - -package org.hornetq.core.protocol.proton; - -import org.apache.qpid.proton.engine.Delivery; -import org.hornetq.core.protocol.proton.exceptions.HornetQAMQPException; - -/** - * @author <a href="mailto:[email protected]">Andy Taylor</a> - * <p/> - * An interface to handle deliveries, either messages, acks or transaction calls - */ -public interface ProtonDeliveryHandler -{ - void onMessage(Delivery delivery) throws HornetQAMQPException; - - void checkState(); - - void close() throws HornetQAMQPException; -} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/ProtonProducer.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/ProtonProducer.java b/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/ProtonProducer.java deleted file mode 100644 index 71f3a83..0000000 --- a/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/ProtonProducer.java +++ /dev/null @@ -1,142 +0,0 @@ -/* - * Copyright 2005-2014 Red Hat, Inc. - * Red Hat licenses this file to you under the Apache License, version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. - */ - -package org.hornetq.core.protocol.proton; - -import org.apache.qpid.proton.amqp.Symbol; -import org.apache.qpid.proton.amqp.messaging.Rejected; -import org.apache.qpid.proton.amqp.messaging.Target; -import org.apache.qpid.proton.amqp.transport.ErrorCondition; -import org.apache.qpid.proton.engine.Delivery; -import org.apache.qpid.proton.engine.Receiver; -import org.hornetq.api.core.HornetQBuffer; -import org.hornetq.api.core.SimpleString; -import org.hornetq.core.protocol.proton.exceptions.HornetQAMQPException; -import org.hornetq.core.server.QueueQueryResult; - -/** - * @author <a href="mailto:[email protected]">Andy Taylor</a> - * <p/> - * handles incoming messages via a Proton Receiver and forwards them to HornetQ - */ -public class ProtonProducer implements ProtonDeliveryHandler -{ - private final ProtonRemotingConnection connection; - - private final ProtonSession protonSession; - - private final ProtonProtocolManager protonProtocolManager; - - private final Receiver receiver; - - private final String address; - - private HornetQBuffer buffer; - - public ProtonProducer(ProtonRemotingConnection connection, ProtonSession protonSession, ProtonProtocolManager protonProtocolManager, Receiver receiver) - { - this.connection = connection; - this.protonSession = protonSession; - this.protonProtocolManager = protonProtocolManager; - this.receiver = receiver; - this.address = ((Target) receiver.getRemoteTarget()).getAddress(); - buffer = connection.createBuffer(1024); - } - - /* - * called when Proton receives a message to be delivered via a Delivery. - * - * This may be called more than once per deliver so we have to cache the buffer until we have received it all. - * - * */ - public void onMessage(Delivery delivery) throws HornetQAMQPException - { - Receiver receiver; - try - { - receiver = ((Receiver) delivery.getLink()); - - if (!delivery.isReadable()) - { - return; - } - - protonProtocolManager.handleMessage(receiver, buffer, delivery, connection, protonSession, address); - - } - catch (Exception e) - { - e.printStackTrace(); - Rejected rejected = new Rejected(); - ErrorCondition condition = new ErrorCondition(); - condition.setCondition(Symbol.valueOf("failed")); - condition.setDescription(e.getMessage()); - rejected.setError(condition); - delivery.disposition(rejected); - } - } - - @Override - public void checkState() - { - //no op - } - - @Override - public void close() throws HornetQAMQPException - { - protonSession.removeProducer(receiver); - } - - public void init() throws HornetQAMQPException - { - org.apache.qpid.proton.amqp.messaging.Target target = (org.apache.qpid.proton.amqp.messaging.Target) receiver.getRemoteTarget(); - if (target.getDynamic()) - { - //if dynamic we have to create the node (queue) and set the address on the target, the node is temporary and - // will be deleted on closing of the session - SimpleString queue = new SimpleString(java.util.UUID.randomUUID().toString()); - try - { - protonSession.getServerSession().createQueue(queue, queue, null, true, false); - } - catch (Exception e) - { - throw HornetQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage()); - } - target.setAddress(queue.toString()); - } - else - { - //if not dynamic then we use the targets address as the address to forward the messages to, however there has to - //be a queue bound to it so we nee to check this. - String address = target.getAddress(); - if (address == null) - { - throw HornetQAMQPProtocolMessageBundle.BUNDLE.targetAddressNotSet(); - } - try - { - QueueQueryResult queryResult = protonSession.getServerSession().executeQueueQuery(new SimpleString(address)); - if (!queryResult.isExists()) - { - throw HornetQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist(); - } - } - catch (Exception e) - { - throw HornetQAMQPProtocolMessageBundle.BUNDLE.errorFindingTemporaryQueue(e.getMessage()); - } - } - } -} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/ProtonProtocolManager.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/ProtonProtocolManager.java b/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/ProtonProtocolManager.java index ae06aa1..8086925 100644 --- a/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/ProtonProtocolManager.java +++ b/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/ProtonProtocolManager.java @@ -13,46 +13,25 @@ package org.hornetq.core.protocol.proton; -import java.nio.ByteBuffer; -import java.nio.charset.StandardCharsets; -import java.util.EnumSet; +import java.util.concurrent.Executor; import io.netty.channel.ChannelPipeline; -import org.apache.qpid.proton.amqp.Binary; -import org.apache.qpid.proton.amqp.messaging.AmqpValue; -import org.apache.qpid.proton.amqp.transaction.Coordinator; -import org.apache.qpid.proton.amqp.transaction.Declare; -import org.apache.qpid.proton.amqp.transaction.Declared; -import org.apache.qpid.proton.amqp.transaction.Discharge; -import org.apache.qpid.proton.amqp.transport.AmqpError; -import org.apache.qpid.proton.amqp.transport.ErrorCondition; -import org.apache.qpid.proton.engine.Delivery; -import org.apache.qpid.proton.engine.EndpointState; -import org.apache.qpid.proton.engine.Link; -import org.apache.qpid.proton.engine.Receiver; -import org.apache.qpid.proton.engine.Sender; -import org.apache.qpid.proton.engine.impl.LinkImpl; -import org.apache.qpid.proton.engine.impl.TransportImpl; -import org.apache.qpid.proton.jms.EncodedMessage; -import org.apache.qpid.proton.message.impl.MessageImpl; import org.hornetq.api.core.HornetQBuffer; -import org.hornetq.api.core.SimpleString; -import org.hornetq.core.journal.IOAsyncTask; -import org.hornetq.core.protocol.proton.exceptions.HornetQAMQPException; -import org.hornetq.core.protocol.proton.exceptions.HornetQAMQPIllegalStateException; +import org.hornetq.api.core.client.HornetQClient; +import org.hornetq.core.protocol.proton.converter.ProtonMessageConverter; +import org.hornetq.core.protocol.proton.plug.HornetQProtonConnectionCallback; import org.hornetq.core.remoting.impl.netty.NettyServerConnection; import org.hornetq.core.server.HornetQServer; -import org.hornetq.core.server.ServerMessage; -import org.hornetq.core.server.impl.ServerMessageImpl; import org.hornetq.core.server.management.Notification; import org.hornetq.core.server.management.NotificationListener; -import org.hornetq.core.transaction.Transaction; import org.hornetq.spi.core.protocol.ConnectionEntry; +import org.hornetq.spi.core.protocol.MessageConverter; import org.hornetq.spi.core.protocol.ProtocolManager; import org.hornetq.spi.core.protocol.RemotingConnection; import org.hornetq.spi.core.remoting.Acceptor; import org.hornetq.spi.core.remoting.Connection; -import org.hornetq.utils.UUIDGenerator; +import org.proton.plug.AMQPServerConnectionContext; +import org.proton.plug.context.server.ProtonServerConnectionContextFactory; /** * A proton protocol manager, basically reads the Proton Input and maps proton resources to HornetQ resources @@ -61,272 +40,83 @@ import org.hornetq.utils.UUIDGenerator; */ public class ProtonProtocolManager implements ProtocolManager, NotificationListener { - public static final EnumSet<EndpointState> UNINITIALIZED = EnumSet.of(EndpointState.UNINITIALIZED); - - public static final EnumSet<EndpointState> INITIALIZED = EnumSet.complementOf(UNINITIALIZED); - - public static final EnumSet<EndpointState> ACTIVE = EnumSet.of(EndpointState.ACTIVE); - - public static final EnumSet<EndpointState> CLOSED = EnumSet.of(EndpointState.CLOSED); - - public static final EnumSet<EndpointState> ANY_ENDPOINT_STATE = EnumSet.of(EndpointState.CLOSED, EndpointState.ACTIVE, EndpointState.UNINITIALIZED); - private final HornetQServer server; + private MessageConverter protonConverter; + public ProtonProtocolManager(HornetQServer server) { this.server = server; + this.protonConverter = new ProtonMessageConverter(server.getStorageManager()); } - @Override - public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection connection) + public HornetQServer getServer() { - ProtonRemotingConnection conn = new ProtonRemotingConnection(acceptorUsed, connection, this); - //todo do we have a ttl? - return new ConnectionEntry(conn, null, System.currentTimeMillis(), 1 * 60 * 1000); + return server; } - @Override - public void removeHandler(String name) - { - } @Override - public void handleBuffer(RemotingConnection connection, HornetQBuffer buffer) + public MessageConverter getConverter() { - ProtonRemotingConnection protonRemotingConnection = (ProtonRemotingConnection) connection; - protonRemotingConnection.setDataReceived(); - byte[] frame = new byte[buffer.readableBytes()]; - buffer.readBytes(frame); - - protonRemotingConnection.handleFrame(frame); + return protonConverter; } @Override - public void addChannelHandlers(ChannelPipeline pipeliner) + public void onNotification(Notification notification) { - //we don't need any we do our own decoding - } - @Override - public boolean isProtocol(byte[] array) - { - String startFrame = new String(array, StandardCharsets.US_ASCII); - return startFrame.startsWith("AMQP"); } @Override - public void handshake(NettyServerConnection connection, HornetQBuffer buffer) + public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection remotingConnection) { - //todo move handshake to here - } + HornetQProtonConnectionCallback connectionCallback = new HornetQProtonConnectionCallback(this, remotingConnection); - @Override - public void onNotification(Notification notification) - { - //noop - } + AMQPServerConnectionContext amqpConnection = ProtonServerConnectionContextFactory.getFactory().createConnection(connectionCallback); - public ServerMessageImpl createServerMessage() - { - return new ServerMessageImpl(server.getStorageManager().generateUniqueID(), 512); - } + Executor executor = server.getExecutorFactory().getExecutor(); - public void handleMessage(final Receiver receiver, HornetQBuffer buffer, final Delivery delivery, - final ProtonRemotingConnection connection, ProtonSession protonSession, - String address) throws Exception - { - synchronized (connection.getDeliveryLock()) - { - int count; - byte[] data = new byte[1024]; - //todo an optimisation here would be to only use the buffer if we need more that one recv - while ((count = receiver.recv(data, 0, data.length)) > 0) - { - buffer.writeBytes(data, 0, count); - } + HornetQProtonRemotingConnection delegate = new HornetQProtonRemotingConnection(this, amqpConnection, remotingConnection, executor); + + connectionCallback.setProtonConnectionDelegate(delegate); - // we keep reading until we get end of messages, i.e. -1 - if (count == 0) - { - return; - } - receiver.advance(); - byte[] bytes = new byte[buffer.readableBytes()]; - buffer.readBytes(bytes); - buffer.clear(); - EncodedMessage encodedMessage = new EncodedMessage(delivery.getMessageFormat(), bytes, 0, bytes.length); - ServerMessage message = ProtonUtils.INBOUND.transform(connection, encodedMessage); - //use the address on the receiver if not null, if null let's hope it was set correctly on the message - if (address != null) - { - message.setAddress(new SimpleString(address)); - } - //todo decide on whether to deliver direct - protonSession.getServerSession().send(message, true); - server.getStorageManager().afterCompleteOperations(new IOAsyncTask() - { - @Override - public void done() - { - synchronized (connection.getDeliveryLock()) - { - receiver.flow(1); - delivery.settle(); - } - } + ConnectionEntry entry = new ConnectionEntry(delegate, executor, + System.currentTimeMillis(), HornetQClient.DEFAULT_CONNECTION_TTL); - @Override - public void onError(int errorCode, String errorMessage) - { - receiver.setCondition(new ErrorCondition(AmqpError.ILLEGAL_STATE, errorCode + ":" + errorMessage)); - } - }); - } + return entry; } - public void handleDelivery(final Sender sender, byte[] tag, EncodedMessage encodedMessage, ServerMessage message, ProtonRemotingConnection connection, final boolean preSettle) + @Override + public void removeHandler(String name) { - synchronized (connection.getDeliveryLock()) - { - final Delivery delivery; - delivery = sender.delivery(tag, 0, tag.length); - delivery.setContext(message); - sender.send(encodedMessage.getArray(), 0, encodedMessage.getLength()); - server.getStorageManager().afterCompleteOperations(new IOAsyncTask() - { - @Override - public void done() - { - if (preSettle) - { - delivery.settle(); - ((LinkImpl) sender).addCredit(1); - } - else - { - sender.advance(); - } - } - @Override - public void onError(int errorCode, String errorMessage) - { - sender.setCondition(new ErrorCondition(AmqpError.ILLEGAL_STATE, errorCode + ":" + errorMessage)); - } - }); - } - connection.write(); } - void handleNewLink(Link link, ProtonSession protonSession) throws HornetQAMQPException + @Override + public void handleBuffer(RemotingConnection connection, HornetQBuffer buffer) { - link.setSource(link.getRemoteSource()); - link.setTarget(link.getRemoteTarget()); - if (link instanceof Receiver) - { - Receiver receiver = (Receiver) link; - if (link.getRemoteTarget() instanceof Coordinator) - { - protonSession.initialise(true); - Coordinator coordinator = (Coordinator) link.getRemoteTarget(); - protonSession.addTransactionHandler(coordinator, receiver); - } - else - { - protonSession.initialise(false); - protonSession.addProducer(receiver); - //todo do this using the server session flow control - receiver.flow(100); - } - } - else - { - protonSession.initialise(false); - Sender sender = (Sender) link; - protonSession.addConsumer(sender); - sender.offer(1); - } - } + HornetQProtonRemotingConnection protonConnection = (HornetQProtonRemotingConnection)connection; - public ProtonSession createSession(ProtonRemotingConnection protonConnection, TransportImpl protonTransport) throws HornetQAMQPException - { - String name = UUIDGenerator.getInstance().generateStringUUID(); - return new ProtonSession(name, protonConnection, this, server.getStorageManager() - .newContext(server.getExecutorFactory().getExecutor()), server, protonTransport); + protonConnection.bufferReceived(protonConnection.getID(), buffer); } - void handleActiveLink(Link link) throws HornetQAMQPException + @Override + public void addChannelHandlers(ChannelPipeline pipeline) { - link.setSource(link.getRemoteSource()); - link.setTarget(link.getRemoteTarget()); - ProtonDeliveryHandler handler = (ProtonDeliveryHandler) link.getContext(); - handler.checkState(); + } - public void handleTransaction(Receiver receiver, HornetQBuffer buffer, Delivery delivery, ProtonSession protonSession) throws HornetQAMQPIllegalStateException + @Override + public boolean isProtocol(byte[] array) { - int count; - byte[] data = new byte[1024]; - //todo an optimisation here would be to only use the buffer if we need more that one recv - while ((count = receiver.recv(data, 0, data.length)) > 0) - { - buffer.writeBytes(data, 0, count); - } - - // we keep reading until we get end of messages, i.e. -1 - if (count == 0) - { - return; - } - receiver.advance(); - byte[] bytes = new byte[buffer.readableBytes()]; - buffer.readBytes(bytes); - buffer.clear(); - MessageImpl msg = new MessageImpl(); - msg.decode(bytes, 0, bytes.length); - Object action = ((AmqpValue) msg.getBody()).getValue(); - if (action instanceof Declare) - { - Transaction tx = protonSession.getServerSession().getCurrentTransaction(); - Declared declared = new Declared(); - declared.setTxnId(new Binary(longToBytes(tx.getID()))); - delivery.disposition(declared); - delivery.settle(); - } - else if (action instanceof Discharge) - { - Discharge discharge = (Discharge) action; - if (discharge.getFail()) - { - try - { - protonSession.getServerSession().rollback(false); - } - catch (Exception e) - { - throw HornetQAMQPProtocolMessageBundle.BUNDLE.errorRollingbackCoordinator(e.getMessage()); - } - } - else - { - try - { - protonSession.getServerSession().commit(); - } - catch (Exception e) - { - throw HornetQAMQPProtocolMessageBundle.BUNDLE.errorCommittingCoordinator(e.getMessage()); - } - } - delivery.settle(); - } + return array.length >= 4 && array[0] == (byte) 'A' && array[1] == (byte) 'M' && array[2] == (byte) 'Q' && array[3] == (byte) 'P'; } - public byte[] longToBytes(long x) + @Override + public void handshake(NettyServerConnection connection, HornetQBuffer buffer) { - ByteBuffer buffer = ByteBuffer.allocate(8); - buffer.putLong(x); - return buffer.array(); } + + } http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/ProtonRemotingConnection.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/ProtonRemotingConnection.java b/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/ProtonRemotingConnection.java deleted file mode 100644 index fab1886..0000000 --- a/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/ProtonRemotingConnection.java +++ /dev/null @@ -1,670 +0,0 @@ -/* - * Copyright 2005-2014 Red Hat, Inc. - * Red Hat licenses this file to you under the Apache License, version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. - */ - -package org.hornetq.core.protocol.proton; - -import java.util.ArrayList; -import java.util.Collections; -import java.util.HashMap; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CopyOnWriteArrayList; - -import org.apache.qpid.proton.amqp.transport.AmqpError; -import org.apache.qpid.proton.amqp.transport.ErrorCondition; -import org.apache.qpid.proton.engine.EndpointState; -import org.apache.qpid.proton.engine.Sasl; -import org.apache.qpid.proton.engine.Session; -import org.apache.qpid.proton.engine.impl.ConnectionImpl; -import org.apache.qpid.proton.engine.impl.DeliveryImpl; -import org.apache.qpid.proton.engine.impl.LinkImpl; -import org.apache.qpid.proton.engine.impl.TransportImpl; -import org.hornetq.api.core.HornetQBuffer; -import org.hornetq.api.core.HornetQException; -import org.hornetq.core.protocol.proton.exceptions.HornetQAMQPException; -import org.hornetq.core.remoting.CloseListener; -import org.hornetq.core.remoting.FailureListener; -import org.hornetq.core.server.HornetQServerLogger; -import org.hornetq.core.server.impl.ServerMessageImpl; -import org.hornetq.spi.core.protocol.RemotingConnection; -import org.hornetq.spi.core.remoting.Acceptor; -import org.hornetq.spi.core.remoting.Connection; - -/** - * @author <a href="mailto:[email protected]">Andy Taylor</a> - */ -public class ProtonRemotingConnection implements RemotingConnection -{ - private TransportImpl protonTransport; - - private ConnectionImpl protonConnection; - - private final Map<Object, ProtonSession> sessions = new HashMap<Object, ProtonSession>(); - - /* - * Proton is not thread safe therefore we need to make sure we aren't updating the deliveries on the connection from - * the input of proton transport and asynchronously back from HornetQ at the same time. - * (this probably needs to be fixed on Proton) - * */ - private final Object deliveryLock = new Object(); - - private boolean destroyed = false; - - private String clientId; - - private final Acceptor acceptorUsed; - - private final long creationTime; - - private final Connection connection; - - private final ProtonProtocolManager protonProtocolManager; - - private final List<FailureListener> failureListeners = new CopyOnWriteArrayList<FailureListener>(); - - private final List<CloseListener> closeListeners = new CopyOnWriteArrayList<CloseListener>(); - - private boolean initialised = false; - - private static final byte[] VERSION_HEADER = new byte[]{ - 'A', 'M', 'Q', 'P', 0, 1, 0, 0 - }; - private Sasl sasl; - - private String username; - - private String passcode; - - private boolean dataReceived; - - public ProtonRemotingConnection(Acceptor acceptorUsed, Connection connection, ProtonProtocolManager protonProtocolManager) - { - this.protonProtocolManager = protonProtocolManager; - - this.connection = connection; - - this.creationTime = System.currentTimeMillis(); - - this.acceptorUsed = acceptorUsed; - - this.protonTransport = new TransportImpl(); - - this.protonConnection = new ConnectionImpl(); - - protonTransport.bind(protonConnection); - } - - @Override - public Object getID() - { - return connection.getID(); - } - - @Override - public long getCreationTime() - { - return creationTime; - } - - @Override - public String getRemoteAddress() - { - return connection.getRemoteAddress(); - } - - @Override - public void addFailureListener(final FailureListener listener) - { - if (listener == null) - { - throw new IllegalStateException("FailureListener cannot be null"); - } - - failureListeners.add(listener); - } - - @Override - public boolean removeFailureListener(final FailureListener listener) - { - if (listener == null) - { - throw new IllegalStateException("FailureListener cannot be null"); - } - - return failureListeners.remove(listener); - } - - @Override - public void addCloseListener(final CloseListener listener) - { - if (listener == null) - { - throw new IllegalStateException("CloseListener cannot be null"); - } - - closeListeners.add(listener); - } - - @Override - public boolean removeCloseListener(final CloseListener listener) - { - if (listener == null) - { - throw new IllegalStateException("CloseListener cannot be null"); - } - - return closeListeners.remove(listener); - } - - @Override - public List<CloseListener> removeCloseListeners() - { - List<CloseListener> ret = new ArrayList<CloseListener>(closeListeners); - - closeListeners.clear(); - - return ret; - } - - @Override - public List<FailureListener> removeFailureListeners() - { - List<FailureListener> ret = new ArrayList<FailureListener>(failureListeners); - - failureListeners.clear(); - - return ret; - } - - @Override - public void setCloseListeners(List<CloseListener> listeners) - { - closeListeners.clear(); - - closeListeners.addAll(listeners); - } - - @Override - public void setFailureListeners(final List<FailureListener> listeners) - { - failureListeners.clear(); - - failureListeners.addAll(listeners); - } - - public List<FailureListener> getFailureListeners() - { - // we do not return the listeners otherwise the remoting service - // would NOT destroy the connection. - return Collections.emptyList(); - } - - @Override - public HornetQBuffer createBuffer(int size) - { - return connection.createBuffer(size); - } - - @Override - public void fail(HornetQException me) - { - HornetQServerLogger.LOGGER.connectionFailureDetected(me.getMessage(), me.getType()); - // Then call the listeners - callFailureListeners(me); - - callClosingListeners(); - - destroyed = true; - - connection.close(); - } - - @Override - public void fail(HornetQException me, String scaleDownTargetNodeID) - { - fail(me); - } - - @Override - public void destroy() - { - destroyed = true; - - connection.close(); - - synchronized (deliveryLock) - { - callClosingListeners(); - } - } - - @Override - public Connection getTransportConnection() - { - return connection; - } - - @Override - public boolean isClient() - { - return false; - } - - @Override - public boolean isDestroyed() - { - return destroyed; - } - - @Override - public void disconnect(final boolean criticalError) - { - disconnect(null, criticalError); - } - - @Override - public void disconnect(final String scaleDownNodeID, final boolean criticalError) - { - destroy(); - } - - @Override - public boolean checkDataReceived() - { - boolean res = dataReceived; - - dataReceived = false; - - return res; - } - - @Override - public void flush() - { - //no op - } - - @Override - public void bufferReceived(Object connectionID, HornetQBuffer buffer) - { - if (initialised) - { - protonProtocolManager.handleBuffer(this, buffer); - } - else - { - byte[] prot = new byte[4]; - buffer.readBytes(prot); - String headerProt = new String(prot); - checkProtocol(headerProt); - int protocolId = buffer.readByte(); - int major = buffer.readByte(); - int minor = buffer.readByte(); - int revision = buffer.readByte(); - if (!(checkVersion(major, minor, revision) && checkProtocol(headerProt))) - { - protonTransport.close(); - protonConnection.close(); - write(); - destroy(); - return; - } - if (protocolId == 3) - { - sasl = protonTransport.sasl(); - sasl.setMechanisms(new String[]{"ANONYMOUS", "PLAIN"}); - sasl.server(); - } - - ///its only 8 bytes, there's always going to always be enough in the buffer, isn't there? - protonTransport.input(VERSION_HEADER, 0, VERSION_HEADER.length); - - write(); - - initialised = true; - - if (buffer.readableBytes() > 0) - { - protonProtocolManager.handleBuffer(this, buffer.copy(buffer.readerIndex(), buffer.readableBytes())); - } - - if (sasl != null) - { - if (sasl.getRemoteMechanisms().length > 0) - { - if ("PLAIN".equals(sasl.getRemoteMechanisms()[0])) - { - byte[] data = new byte[sasl.pending()]; - sasl.recv(data, 0, data.length); - sasl.done(Sasl.SaslOutcome.PN_SASL_OK); - sasl = null; - } - else if ("ANONYMOUS".equals(sasl.getRemoteMechanisms()[0])) - { - sasl.done(Sasl.SaslOutcome.PN_SASL_OK); - sasl = null; - } - } - - write(); - } - } - } - - private boolean checkProtocol(String headerProt) - { - boolean ok = "AMQP".equals(headerProt); - if (!ok) - { - protonConnection.setCondition(new ErrorCondition(AmqpError.ILLEGAL_STATE, "Unknown Protocol " + headerProt)); - } - return ok; - } - - private boolean checkVersion(int major, int minor, int revision) - { - if (major < 1) - { - protonConnection.setCondition(new ErrorCondition(AmqpError.ILLEGAL_STATE, - "Version not supported " + major + "." + minor + "." + revision)); - return false; - } - return true; - } - - void write() - { - synchronized (deliveryLock) - { - int size = 1024 * 64; - byte[] data = new byte[size]; - boolean done = false; - while (!done) - { - int count = protonTransport.output(data, 0, size); - if (count > 0) - { - final HornetQBuffer buffer; - buffer = connection.createBuffer(count); - buffer.writeBytes(data, 0, count); - connection.write(buffer); - } - else - { - done = true; - } - } - } - } - - public String getLogin() - { - return username; - } - - public String getPasscode() - { - return passcode; - } - - public ServerMessageImpl createServerMessage() - { - return protonProtocolManager.createServerMessage(); - } - - protected synchronized void setDataReceived() - { - dataReceived = true; - } - - public void handleFrame(byte[] frame) - { - int read = 0; - while (read < frame.length) - { - synchronized (deliveryLock) - { - try - { - int count = protonTransport.input(frame, read, frame.length - read); - read += count; - } - catch (Exception e) - { - protonTransport.setCondition(new ErrorCondition(AmqpError.DECODE_ERROR, HornetQAMQPProtocolMessageBundle.BUNDLE.decodeError())); - write(); - protonConnection.close(); - return; - } - } - - if (sasl != null) - { - if (sasl.getRemoteMechanisms().length > 0) - { - if ("PLAIN".equals(sasl.getRemoteMechanisms()[0])) - { - byte[] data = new byte[sasl.pending()]; - sasl.recv(data, 0, data.length); - setUserPass(data); - sasl.done(Sasl.SaslOutcome.PN_SASL_OK); - sasl = null; - } - else if ("ANONYMOUS".equals(sasl.getRemoteMechanisms()[0])) - { - sasl.done(Sasl.SaslOutcome.PN_SASL_OK); - sasl = null; - } - } - } - - //handle opening of connection - if (protonConnection.getLocalState() == EndpointState.UNINITIALIZED && protonConnection.getRemoteState() != EndpointState.UNINITIALIZED) - { - clientId = protonConnection.getRemoteContainer(); - protonConnection.open(); - write(); - } - - //handle any new sessions - Session session = protonConnection.sessionHead(ProtonProtocolManager.UNINITIALIZED, ProtonProtocolManager.INITIALIZED); - while (session != null) - { - try - { - ProtonSession protonSession = getSession(session); - session.setContext(protonSession); - session.open(); - - } - catch (HornetQAMQPException e) - { - protonConnection.setCondition(new ErrorCondition(e.getAmqpError(), e.getMessage())); - session.close(); - } - write(); - session = protonConnection.sessionHead(ProtonProtocolManager.UNINITIALIZED, ProtonProtocolManager.INITIALIZED); - } - - //handle new link (producer or consumer - LinkImpl link = (LinkImpl) protonConnection.linkHead(ProtonProtocolManager.UNINITIALIZED, ProtonProtocolManager.INITIALIZED); - while (link != null) - { - try - { - protonProtocolManager.handleNewLink(link, getSession(link.getSession())); - } - catch (HornetQAMQPException e) - { - link.setCondition(new ErrorCondition(e.getAmqpError(), e.getMessage())); - link.close(); - } - link = (LinkImpl) protonConnection.linkHead(ProtonProtocolManager.UNINITIALIZED, ProtonProtocolManager.INITIALIZED); - } - - //handle any deliveries - DeliveryImpl delivery; - - Iterator<DeliveryImpl> iterator = protonConnection.getWorkSequence(); - - while (iterator.hasNext()) - { - delivery = iterator.next(); - ProtonDeliveryHandler handler = (ProtonDeliveryHandler) delivery.getLink().getContext(); - try - { - handler.onMessage(delivery); - } - catch (HornetQAMQPException e) - { - delivery.getLink().setCondition(new ErrorCondition(e.getAmqpError(), e.getMessage())); - } - } - - link = (LinkImpl) protonConnection.linkHead(ProtonProtocolManager.ACTIVE, ProtonProtocolManager.ANY_ENDPOINT_STATE); - while (link != null) - { - try - { - protonProtocolManager.handleActiveLink(link); - } - catch (HornetQAMQPException e) - { - link.setCondition(new ErrorCondition(e.getAmqpError(), e.getMessage())); - } - link = (LinkImpl) link.next(ProtonProtocolManager.ACTIVE, ProtonProtocolManager.ANY_ENDPOINT_STATE); - } - - link = (LinkImpl) protonConnection.linkHead(ProtonProtocolManager.ACTIVE, ProtonProtocolManager.CLOSED); - while (link != null) - { - try - { - ((ProtonDeliveryHandler) link.getContext()).close(); - } - catch (HornetQAMQPException e) - { - link.setCondition(new ErrorCondition(e.getAmqpError(), e.getMessage())); - } - link.close(); - - link = (LinkImpl) link.next(ProtonProtocolManager.ACTIVE, ProtonProtocolManager.CLOSED); - } - - session = protonConnection.sessionHead(ProtonProtocolManager.ACTIVE, ProtonProtocolManager.CLOSED); - while (session != null) - { - ProtonSession protonSession = (ProtonSession) session.getContext(); - protonSession.close(); - sessions.remove(session); - session.close(); - session = session.next(ProtonProtocolManager.ACTIVE, ProtonProtocolManager.CLOSED); - } - - if (protonConnection.getLocalState() == EndpointState.ACTIVE && protonConnection.getRemoteState() == EndpointState.CLOSED) - { - for (ProtonSession protonSession : sessions.values()) - { - protonSession.close(); - } - sessions.clear(); - protonConnection.close(); - write(); - destroy(); - } - - write(); - } - } - - private void setUserPass(byte[] data) - { - String bytes = new String(data); - String[] credentials = bytes.split(Character.toString((char) 0)); - int offSet = 0; - if (credentials.length > 0) - { - if (credentials[0].length() == 0) - { - offSet = 1; - } - - if (credentials.length >= offSet) - { - username = credentials[offSet]; - } - if (credentials.length >= (offSet + 1)) - { - passcode = credentials[offSet + 1]; - } - } - } - - private ProtonSession getSession(Session realSession) throws HornetQAMQPException - { - ProtonSession protonSession = sessions.get(realSession); - if (protonSession == null) - { - protonSession = protonProtocolManager.createSession(this, protonTransport); - sessions.put(realSession, protonSession); - } - return protonSession; - - } - - private void callFailureListeners(final HornetQException me) - { - final List<FailureListener> listenersClone = new ArrayList<FailureListener>(failureListeners); - - for (final FailureListener listener : listenersClone) - { - try - { - listener.connectionFailed(me, false); - } - catch (final Throwable t) - { - // Failure of one listener to execute shouldn't prevent others - // from - // executing - HornetQServerLogger.LOGGER.errorCallingFailureListener(t); - } - } - } - - private void callClosingListeners() - { - final List<CloseListener> listenersClone = new ArrayList<CloseListener>(closeListeners); - - for (final CloseListener listener : listenersClone) - { - try - { - listener.connectionClosed(); - } - catch (final Throwable t) - { - // Failure of one listener to execute shouldn't prevent others - // from - // executing - HornetQServerLogger.LOGGER.errorCallingFailureListener(t); - } - } - } - - public Object getDeliveryLock() - { - return deliveryLock; - } -} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/ProtonSession.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/ProtonSession.java b/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/ProtonSession.java deleted file mode 100644 index 6c46f2f..0000000 --- a/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/ProtonSession.java +++ /dev/null @@ -1,327 +0,0 @@ -/* - * Copyright 2005-2014 Red Hat, Inc. - * Red Hat licenses this file to you under the Apache License, version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. - */ - -package org.hornetq.core.protocol.proton; - -import java.util.ArrayList; -import java.util.HashMap; -import java.util.List; -import java.util.Map; - -import org.apache.qpid.proton.amqp.transaction.Coordinator; -import org.apache.qpid.proton.amqp.transport.ErrorCondition; -import org.apache.qpid.proton.engine.Receiver; -import org.apache.qpid.proton.engine.Sender; -import org.apache.qpid.proton.engine.impl.TransportImpl; -import org.hornetq.api.core.SimpleString; -import org.hornetq.api.core.client.HornetQClient; -import org.hornetq.core.persistence.OperationContext; -import org.hornetq.core.protocol.proton.exceptions.HornetQAMQPException; -import org.hornetq.core.protocol.proton.exceptions.HornetQAMQPInternalErrorException; -import org.hornetq.core.server.HornetQServer; -import org.hornetq.core.server.HornetQServerLogger; -import org.hornetq.core.server.ServerMessage; -import org.hornetq.core.server.ServerSession; -import org.hornetq.spi.core.protocol.SessionCallback; -import org.hornetq.spi.core.remoting.ReadyListener; - -/** - * @author <a href="mailto:[email protected]">Andy Taylor</a> - * 4/10/13 - */ -public class ProtonSession implements SessionCallback -{ - private final String name; - - private final ProtonRemotingConnection connection; - - private final HornetQServer server; - - private final TransportImpl protonTransport; - - private final ProtonProtocolManager protonProtocolManager; - - private ServerSession serverSession; - - private OperationContext context; - - //todo make this configurable - private int tagCacheSize = 1000; - - private long currentTag = 0; - - private final List<byte[]> tagCache = new ArrayList<byte[]>(); - - private Map<Object, ProtonProducer> producers = new HashMap<Object, ProtonProducer>(); - - private Map<Long, ProtonConsumer> consumers = new HashMap<Long, ProtonConsumer>(); - - private boolean closed = false; - - public ProtonSession(String name, ProtonRemotingConnection connection, ProtonProtocolManager protonProtocolManager, OperationContext operationContext, HornetQServer server, TransportImpl protonTransport) - { - this.name = name; - this.connection = connection; - context = operationContext; - this.server = server; - this.protonTransport = protonTransport; - this.protonProtocolManager = protonProtocolManager; - } - - public ServerSession getServerSession() - { - return serverSession; - } - - /* - * we need to initialise the actual server session when we receive the first linkas this tells us whether or not the - * session is transactional - * */ - public void initialise(boolean transacted) throws HornetQAMQPInternalErrorException - { - if (serverSession == null) - { - try - { - serverSession = server.createSession(name, - connection.getLogin(), - connection.getPasscode(), - HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, - connection, - !transacted, - !transacted, - false, - false, - null, - this); - } - catch (Exception e) - { - throw HornetQAMQPProtocolMessageBundle.BUNDLE.errorCreatingHornetQSession(e.getMessage()); - } - } - } - - @Override - public void sendProducerCreditsMessage(int credits, SimpleString address) - { - } - - @Override - public void sendProducerCreditsFailMessage(int credits, SimpleString address) - { - } - - @Override - public int sendMessage(ServerMessage message, long consumerID, int deliveryCount) - { - ProtonConsumer protonConsumer = consumers.get(consumerID); - if (protonConsumer != null) - { - return protonConsumer.handleDelivery(message, deliveryCount); - } - return 0; - } - - @Override - public int sendLargeMessage(ServerMessage message, long consumerID, long bodySize, int deliveryCount) - { - return 0; - } - - @Override - public int sendLargeMessageContinuation(long consumerID, byte[] body, boolean continues, boolean requiresResponse) - { - return 0; - } - - @Override - public void closed() - { - //To change body of implemented methods use File | Settings | File Templates. - } - - @Override - public void addReadyListener(ReadyListener listener) - { - //To change body of implemented methods use File | Settings | File Templates. - } - - @Override - public void removeReadyListener(ReadyListener listener) - { - //To change body of implemented methods use File | Settings | File Templates. - } - - @Override - public void disconnect(long consumerId, String queueName) - { - ProtonConsumer protonConsumer = consumers.remove(consumerId); - if (protonConsumer != null) - { - try - { - protonConsumer.close(); - } - catch (HornetQAMQPException e) - { - protonConsumer.getSender().setTarget(null); - protonConsumer.getSender().setCondition(new ErrorCondition(e.getAmqpError(), e.getMessage())); - } - connection.write(); - } - } - - public OperationContext getContext() - { - return context; - } - - public void addProducer(Receiver receiver) throws HornetQAMQPException - { - try - { - ProtonProducer producer = new ProtonProducer(connection, this, protonProtocolManager, receiver); - producer.init(); - producers.put(receiver, producer); - receiver.setContext(producer); - receiver.open(); - } - catch (HornetQAMQPException e) - { - producers.remove(receiver); - receiver.setTarget(null); - receiver.setCondition(new ErrorCondition(e.getAmqpError(), e.getMessage())); - receiver.close(); - } - } - - - public void addTransactionHandler(Coordinator coordinator, Receiver receiver) - { - TransactionHandler transactionHandler = new TransactionHandler(connection, coordinator, protonProtocolManager, this); - receiver.setContext(transactionHandler); - receiver.open(); - receiver.flow(100); - } - - public void addConsumer(Sender sender) throws HornetQAMQPException - { - ProtonConsumer protonConsumer = new ProtonConsumer(connection, sender, this, server, protonProtocolManager); - - try - { - protonConsumer.init(); - consumers.put(protonConsumer.getConsumerID(), protonConsumer); - sender.setContext(protonConsumer); - sender.open(); - protonConsumer.start(); - } - catch (HornetQAMQPException e) - { - consumers.remove(protonConsumer.getConsumerID()); - sender.setSource(null); - sender.setCondition(new ErrorCondition(e.getAmqpError(), e.getMessage())); - sender.close(); - } - } - - public byte[] getTag() - { - synchronized (tagCache) - { - byte[] bytes; - if (tagCache.size() > 0) - { - bytes = tagCache.remove(0); - } - else - { - bytes = Long.toHexString(currentTag++).getBytes(); - } - return bytes; - } - } - - public void replaceTag(byte[] tag) - { - synchronized (tagCache) - { - if (tagCache.size() < tagCacheSize) - { - tagCache.add(tag); - } - } - } - - public void close() - { - if (closed) - { - return; - } - - for (ProtonProducer protonProducer : producers.values()) - { - try - { - protonProducer.close(); - } - catch (Exception e) - { - HornetQServerLogger.LOGGER.errorClosingSession(e); - } - } - producers.clear(); - for (ProtonConsumer protonConsumer : consumers.values()) - { - try - { - protonConsumer.close(); - } - catch (Exception e) - { - HornetQServerLogger.LOGGER.errorClosingConsumer(e); - } - } - consumers.clear(); - try - { - getServerSession().rollback(true); - getServerSession().close(false); - } - catch (Exception e) - { - HornetQServerLogger.LOGGER.errorClosingSession(e); - } - closed = true; - } - - public void removeConsumer(long consumerID) throws HornetQAMQPException - { - consumers.remove(consumerID); - try - { - getServerSession().closeConsumer(consumerID); - } - catch (Exception e) - { - throw HornetQAMQPProtocolMessageBundle.BUNDLE.errorClosingConsumer(consumerID, e.getMessage()); - } - } - - public void removeProducer(Receiver receiver) - { - producers.remove(receiver); - } -}
