http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientConnectionContext.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientConnectionContext.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientConnectionContext.java new file mode 100644 index 0000000..95a7f61 --- /dev/null +++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientConnectionContext.java @@ -0,0 +1,97 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.proton.plug.context.client; + +import org.apache.qpid.proton.engine.Link; +import org.apache.qpid.proton.engine.Session; +import org.proton.plug.AMQPClientConnectionContext; +import org.proton.plug.AMQPClientSessionContext; +import org.proton.plug.ClientSASL; +import org.proton.plug.AMQPConnectionCallback; +import org.proton.plug.AMQPSessionCallback; +import org.proton.plug.context.AbstractConnectionContext; +import org.proton.plug.context.AbstractProtonSessionContext; +import org.proton.plug.exceptions.HornetQAMQPException; +import org.proton.plug.context.ProtonInitializable; +import org.proton.plug.util.FutureRunnable; + +/** + * @author Clebert Suconic + */ + +public class ProtonClientConnectionContext extends AbstractConnectionContext implements AMQPClientConnectionContext +{ + public ProtonClientConnectionContext(AMQPConnectionCallback connectionCallback) + { + super(connectionCallback); + } + + // Maybe a client interface? + public void clientOpen(ClientSASL sasl) throws Exception + { + FutureRunnable future = new FutureRunnable(1); + synchronized (handler.getLock()) + { + this.afterInit(future); + if (sasl != null) + { + handler.createClientSasl(sasl); + } + handler.getConnection().open(); + } + + flush(); + + waitWithTimeout(future); + } + + public AMQPClientSessionContext createClientSession() throws HornetQAMQPException + { + + FutureRunnable futureRunnable = new FutureRunnable(1); + ProtonClientSessionContext sessionImpl; + synchronized (handler.getLock()) + { + Session session = handler.getConnection().session(); + sessionImpl = (ProtonClientSessionContext) getSessionExtension(session); + sessionImpl.afterInit(futureRunnable); + session.open(); + } + + flush(); + waitWithTimeout(futureRunnable); + + return sessionImpl; + } + + @Override + protected AbstractProtonSessionContext newSessionExtension(Session realSession) throws HornetQAMQPException + { + AMQPSessionCallback sessionSPI = connectionCallback.createSessionCallback(this); + AbstractProtonSessionContext protonSession = new ProtonClientSessionContext(sessionSPI, this, realSession); + + return protonSession; + + } + + @Override + protected void remoteLinkOpened(Link link) throws Exception + { + Object context = link.getContext(); + if (context != null && context instanceof ProtonInitializable) + { + ((ProtonInitializable) context).initialise(); + } + } +}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientConnectionContextFactory.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientConnectionContextFactory.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientConnectionContextFactory.java new file mode 100644 index 0000000..11f8709 --- /dev/null +++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientConnectionContextFactory.java @@ -0,0 +1,39 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.proton.plug.context.client; + +import org.proton.plug.AMQPConnectionContext; +import org.proton.plug.AMQPConnectionContextFactory; +import org.proton.plug.AMQPConnectionCallback; + +/** + * @author Clebert Suconic + */ + +public class ProtonClientConnectionContextFactory extends AMQPConnectionContextFactory +{ + private static final AMQPConnectionContextFactory theInstance = new ProtonClientConnectionContextFactory(); + + public static AMQPConnectionContextFactory getFactory() + { + return theInstance; + } + + public AMQPConnectionContext createConnection(AMQPConnectionCallback connectionCallback) + { + return new ProtonClientConnectionContext(connectionCallback); + } + + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientContext.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientContext.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientContext.java new file mode 100644 index 0000000..f0dc46a --- /dev/null +++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientContext.java @@ -0,0 +1,82 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.proton.plug.context.client; + +import java.util.concurrent.TimeUnit; + +import org.apache.qpid.proton.amqp.messaging.Accepted; +import org.apache.qpid.proton.amqp.transport.SenderSettleMode; +import org.apache.qpid.proton.engine.Delivery; +import org.apache.qpid.proton.engine.Sender; +import org.apache.qpid.proton.message.ProtonJMessage; +import org.proton.plug.AMQPClientSenderContext; +import org.proton.plug.AMQPSessionCallback; +import org.proton.plug.context.AbstractConnectionContext; +import org.proton.plug.context.AbstractProtonContextSender; +import org.proton.plug.context.AbstractProtonSessionContext; +import org.proton.plug.exceptions.HornetQAMQPException; +import org.proton.plug.util.FutureRunnable; + +/** + * @author Clebert Suconic + */ + +public class ProtonClientContext extends AbstractProtonContextSender implements AMQPClientSenderContext +{ + + FutureRunnable catchUpRunnable = new FutureRunnable(); + + public ProtonClientContext(AbstractConnectionContext connection, Sender sender, AbstractProtonSessionContext protonSession, AMQPSessionCallback server) + { + super(connection, sender, protonSession, server); + } + + + @Override + public void onMessage(Delivery delivery) throws HornetQAMQPException + { + if (delivery.getRemoteState() instanceof Accepted) + { + if (delivery.getContext() instanceof FutureRunnable) + { + ((FutureRunnable) delivery.getContext()).countDown(); + } + } + } + + public void send(ProtonJMessage message) + { + if (sender.getSenderSettleMode() != SenderSettleMode.SETTLED) + { + catchUpRunnable.countUp(); + } + performSend(message, catchUpRunnable); + } + + + public boolean sync(long timeout, TimeUnit unit) + { + try + { + return catchUpRunnable.await(timeout, unit); + } + catch (InterruptedException e) + { + Thread.currentThread().interrupt(); + return false; + } + + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientReceiverContext.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientReceiverContext.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientReceiverContext.java new file mode 100644 index 0000000..ebf7f7d --- /dev/null +++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientReceiverContext.java @@ -0,0 +1,89 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.proton.plug.context.client; + +import java.util.concurrent.LinkedBlockingDeque; +import java.util.concurrent.TimeUnit; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import org.apache.qpid.proton.amqp.messaging.Accepted; +import org.apache.qpid.proton.engine.Delivery; +import org.apache.qpid.proton.engine.Receiver; +import org.apache.qpid.proton.message.ProtonJMessage; +import org.apache.qpid.proton.message.impl.MessageImpl; +import org.proton.plug.AMQPClientReceiverContext; +import org.proton.plug.AMQPSessionCallback; +import org.proton.plug.context.AbstractConnectionContext; +import org.proton.plug.context.AbstractProtonReceiverContext; +import org.proton.plug.context.AbstractProtonSessionContext; +import org.proton.plug.exceptions.HornetQAMQPException; + +import static org.proton.plug.util.DeliveryUtil.readDelivery; +import static org.proton.plug.util.DeliveryUtil.decodeMessageImpl; + +/** + * @author Clebert Suconic + */ +public class ProtonClientReceiverContext extends AbstractProtonReceiverContext implements AMQPClientReceiverContext +{ + public ProtonClientReceiverContext(AMQPSessionCallback sessionSPI, AbstractConnectionContext connection, AbstractProtonSessionContext protonSession, Receiver receiver) + { + super(sessionSPI, connection, protonSession, receiver); + } + + public void onFlow(int credits) + { + } + + LinkedBlockingDeque<MessageImpl> queues = new LinkedBlockingDeque<>(); + + /* + * called when Proton receives a message to be delivered via a Delivery. + * + * This may be called more than once per deliver so we have to cache the buffer until we have received it all. + * + * */ + public void onMessage(Delivery delivery) throws HornetQAMQPException + { + ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1024); + try + { + synchronized (connection.getLock()) + { + readDelivery(receiver, buffer); + MessageImpl clientMessage = decodeMessageImpl(buffer); + + // This second method could be better +// clientMessage.decode(buffer.nioBuffer()); + + receiver.advance(); + delivery.disposition(Accepted.getInstance()); + queues.add(clientMessage); + + } + } + finally + { + buffer.release(); + } + } + + + @Override + public ProtonJMessage receiveMessage(int time, TimeUnit unit) throws Exception + { + return queues.poll(time, unit); + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientSessionContext.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientSessionContext.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientSessionContext.java new file mode 100644 index 0000000..e888ea9 --- /dev/null +++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/client/ProtonClientSessionContext.java @@ -0,0 +1,91 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.proton.plug.context.client; + +import org.apache.qpid.proton.amqp.messaging.Source; +import org.apache.qpid.proton.amqp.messaging.Target; +import org.apache.qpid.proton.amqp.transport.SenderSettleMode; +import org.apache.qpid.proton.engine.Receiver; +import org.apache.qpid.proton.engine.Sender; +import org.apache.qpid.proton.engine.Session; +import org.proton.plug.AMQPClientReceiverContext; +import org.proton.plug.AMQPClientSenderContext; +import org.proton.plug.AMQPClientSessionContext; +import org.proton.plug.AMQPSessionCallback; +import org.proton.plug.context.AbstractConnectionContext; +import org.proton.plug.context.AbstractProtonSessionContext; +import org.proton.plug.exceptions.HornetQAMQPException; +import org.proton.plug.util.FutureRunnable; + +/** + * @author Clebert Suconic + */ + +public class ProtonClientSessionContext extends AbstractProtonSessionContext implements AMQPClientSessionContext +{ + public ProtonClientSessionContext(AMQPSessionCallback sessionSPI, AbstractConnectionContext connection, Session session) + { + super(sessionSPI, connection, session); + } + + public AMQPClientSenderContext createSender(String address, boolean preSettled) throws HornetQAMQPException + { + FutureRunnable futureRunnable = new FutureRunnable(1); + + ProtonClientContext amqpSender; + synchronized (connection.getLock()) + { + Sender sender = session.sender(address); + sender.setSenderSettleMode(SenderSettleMode.SETTLED); + Target target = new Target(); + target.setAddress(address); + sender.setTarget(target); + amqpSender = new ProtonClientContext(connection, sender, this, sessionSPI); + amqpSender.afterInit(futureRunnable); + sender.setContext(amqpSender); + sender.open(); + } + + connection.flush(); + + waitWithTimeout(futureRunnable); + return amqpSender; + } + + public AMQPClientReceiverContext createReceiver(String address) throws HornetQAMQPException + { + FutureRunnable futureRunnable = new FutureRunnable(1); + + ProtonClientReceiverContext amqpReceiver; + + synchronized (connection.getLock()) + { + Receiver receiver = session.receiver(address); + Source source = new Source(); + source.setAddress(address); + receiver.setSource(source); + amqpReceiver = new ProtonClientReceiverContext(sessionSPI, connection, this, receiver); + receiver.setContext(amqpReceiver); + amqpReceiver.afterInit(futureRunnable); + receiver.open(); + } + + connection.flush(); + + waitWithTimeout(futureRunnable); + + return amqpReceiver; + + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContext.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContext.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContext.java new file mode 100644 index 0000000..1d83e54 --- /dev/null +++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContext.java @@ -0,0 +1,76 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.proton.plug.context.server; + +import org.apache.qpid.proton.amqp.transaction.Coordinator; +import org.apache.qpid.proton.engine.Link; +import org.apache.qpid.proton.engine.Receiver; +import org.apache.qpid.proton.engine.Sender; +import org.apache.qpid.proton.engine.Session; +import org.proton.plug.AMQPConnectionCallback; +import org.proton.plug.AMQPServerConnectionContext; +import org.proton.plug.AMQPSessionCallback; +import org.proton.plug.context.AbstractConnectionContext; +import org.proton.plug.context.AbstractProtonSessionContext; +import org.proton.plug.exceptions.HornetQAMQPException; + +/** + * @author Clebert Suconic + */ + +public class ProtonServerConnectionContext extends AbstractConnectionContext implements AMQPServerConnectionContext +{ + public ProtonServerConnectionContext(AMQPConnectionCallback connectionSP) + { + super(connectionSP); + } + + protected AbstractProtonSessionContext newSessionExtension(Session realSession) throws HornetQAMQPException + { + AMQPSessionCallback sessionSPI = connectionCallback.createSessionCallback(this); + AbstractProtonSessionContext protonSession = new ProtonServerSessionContext(sessionSPI, this, realSession); + + return protonSession; + } + + protected void remoteLinkOpened(Link link) throws Exception + { + + ProtonServerSessionContext protonSession = (ProtonServerSessionContext) getSessionExtension(link.getSession()); + + link.setSource(link.getRemoteSource()); + link.setTarget(link.getRemoteTarget()); + if (link instanceof Receiver) + { + Receiver receiver = (Receiver) link; + if (link.getRemoteTarget() instanceof Coordinator) + { + Coordinator coordinator = (Coordinator) link.getRemoteTarget(); + protonSession.addTransactionHandler(coordinator, receiver); + } + else + { + protonSession.addReceiver(receiver); + receiver.flow(100); + } + } + else + { + Sender sender = (Sender) link; + protonSession.addSender(sender); + sender.offer(1); + } + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContextFactory.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContextFactory.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContextFactory.java new file mode 100644 index 0000000..22e3675 --- /dev/null +++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerConnectionContextFactory.java @@ -0,0 +1,38 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.proton.plug.context.server; + +import org.proton.plug.AMQPConnectionContextFactory; +import org.proton.plug.AMQPConnectionCallback; +import org.proton.plug.AMQPServerConnectionContext; + +/** + * @author Clebert Suconic + */ + +public class ProtonServerConnectionContextFactory extends AMQPConnectionContextFactory +{ + private static final ProtonServerConnectionContextFactory theInstance = new ProtonServerConnectionContextFactory(); + + public static ProtonServerConnectionContextFactory getFactory() + { + return theInstance; + } + + public AMQPServerConnectionContext createConnection(AMQPConnectionCallback connectionCallback) + { + ProtonServerConnectionContext connection = new ProtonServerConnectionContext(connectionCallback); + return connection; + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java new file mode 100644 index 0000000..21ca2bc --- /dev/null +++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerReceiverContext.java @@ -0,0 +1,159 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.proton.plug.context.server; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.messaging.Accepted; +import org.apache.qpid.proton.amqp.messaging.Rejected; +import org.apache.qpid.proton.amqp.transport.ErrorCondition; +import org.apache.qpid.proton.engine.Delivery; +import org.apache.qpid.proton.engine.Receiver; +import org.proton.plug.AMQPSessionCallback; +import org.proton.plug.context.AbstractConnectionContext; +import org.proton.plug.context.AbstractProtonReceiverContext; +import org.proton.plug.context.AbstractProtonSessionContext; +import org.proton.plug.exceptions.HornetQAMQPException; +import org.proton.plug.exceptions.HornetQAMQPInternalErrorException; +import org.proton.plug.logger.HornetQAMQPProtocolMessageBundle; + +import static org.proton.plug.util.DeliveryUtil.readDelivery; + +/** + * @author Clebert Suconic + */ + +public class ProtonServerReceiverContext extends AbstractProtonReceiverContext +{ + + private final int numberOfCredits = 100; + + public ProtonServerReceiverContext(AMQPSessionCallback sessionSPI, AbstractConnectionContext connection, AbstractProtonSessionContext protonSession, Receiver receiver) + { + super(sessionSPI, connection, protonSession, receiver); + } + + public void onFlow(int credits) + { + } + + + @Override + public void initialise() throws Exception + { + super.initialise(); + org.apache.qpid.proton.amqp.messaging.Target target = (org.apache.qpid.proton.amqp.messaging.Target) receiver.getRemoteTarget(); + + if (target != null) + { + if (target.getDynamic()) + { + //if dynamic we have to create the node (queue) and set the address on the target, the node is temporary and + // will be deleted on closing of the session + String queue = sessionSPI.tempQueueName(); + + + try + { + sessionSPI.createTemporaryQueue(queue); + } + catch (Exception e) + { + throw new HornetQAMQPInternalErrorException(e.getMessage(), e); + } + target.setAddress(queue.toString()); + } + else + { + //if not dynamic then we use the targets address as the address to forward the messages to, however there has to + //be a queue bound to it so we nee to check this. + String address = target.getAddress(); + if (address == null) + { + throw HornetQAMQPProtocolMessageBundle.BUNDLE.targetAddressNotSet(); + } + try + { + if (!sessionSPI.queueQuery(address)) + { + throw HornetQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist(); + } + } + catch (Exception e) + { + throw HornetQAMQPProtocolMessageBundle.BUNDLE.errorFindingTemporaryQueue(e.getMessage()); + } + } + } + + flow(numberOfCredits); + } + + /* + * called when Proton receives a message to be delivered via a Delivery. + * + * This may be called more than once per deliver so we have to cache the buffer until we have received it all. + * + * */ + public void onMessage(Delivery delivery) throws HornetQAMQPException + { + Receiver receiver; + try + { + receiver = ((Receiver) delivery.getLink()); + + if (!delivery.isReadable()) + { + System.err.println("!!!!! Readable!!!!!!!"); + return; + } + + ByteBuf buffer = PooledByteBufAllocator.DEFAULT.heapBuffer(10 * 1024); + try + { + synchronized (connection.getLock()) + { + readDelivery(receiver, buffer); + + receiver.advance(); + + sessionSPI.serverSend(receiver, delivery, address, delivery.getMessageFormat(), buffer); + delivery.disposition(Accepted.getInstance()); + delivery.settle(); + + if (receiver.getRemoteCredit() < numberOfCredits / 2) + { + flow(numberOfCredits); + } + } + } + finally + { + buffer.release(); + } + } + catch (Exception e) + { + e.printStackTrace(); + Rejected rejected = new Rejected(); + ErrorCondition condition = new ErrorCondition(); + condition.setCondition(Symbol.valueOf("failed")); + condition.setDescription(e.getMessage()); + rejected.setError(condition); + delivery.disposition(rejected); + } + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java new file mode 100644 index 0000000..9b66de0 --- /dev/null +++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSenderContext.java @@ -0,0 +1,283 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.proton.plug.context.server; + +import java.util.Map; + +import org.apache.qpid.proton.amqp.DescribedType; +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.messaging.Accepted; +import org.apache.qpid.proton.amqp.messaging.Modified; +import org.apache.qpid.proton.amqp.messaging.Rejected; +import org.apache.qpid.proton.amqp.messaging.Released; +import org.apache.qpid.proton.amqp.transport.DeliveryState; +import org.apache.qpid.proton.amqp.transport.SenderSettleMode; +import org.apache.qpid.proton.engine.Delivery; +import org.apache.qpid.proton.engine.Sender; +import org.apache.qpid.proton.message.ProtonJMessage; +import org.proton.plug.AMQPSessionCallback; +import org.proton.plug.context.AbstractConnectionContext; +import org.proton.plug.context.AbstractProtonContextSender; +import org.proton.plug.context.AbstractProtonSessionContext; +import org.proton.plug.exceptions.HornetQAMQPException; +import org.proton.plug.exceptions.HornetQAMQPInternalErrorException; +import org.proton.plug.logger.HornetQAMQPProtocolMessageBundle; +import org.proton.plug.context.ProtonPlugSender; +import org.apache.qpid.proton.amqp.messaging.Source; + +/** + * @author Clebert Suconic + */ + +public class ProtonServerSenderContext extends AbstractProtonContextSender implements ProtonPlugSender +{ + + private static final Symbol SELECTOR = Symbol.getSymbol("jms-selector"); + private static final Symbol COPY = Symbol.valueOf("copy"); + + private Object brokerConsumer; + + public ProtonServerSenderContext(AbstractConnectionContext connection, Sender sender, AbstractProtonSessionContext protonSession, AMQPSessionCallback server) + { + super(connection, sender, protonSession, server); + } + + public Object getBrokerConsumer() + { + return brokerConsumer; + } + + public void onFlow(int currentCredits) + { + super.onFlow(currentCredits); + sessionSPI.onFlowConsumer(brokerConsumer, currentCredits); + } + + /* +* start the session +* */ + public void start() throws HornetQAMQPException + { + super.start(); + // protonSession.getServerSession().start(); + + //todo add flow control + try + { + // to do whatever you need to make the broker start sending messages to the consumer + sessionSPI.startSender(brokerConsumer); + //protonSession.getServerSession().receiveConsumerCredits(consumerID, -1); + } + catch (Exception e) + { + throw HornetQAMQPProtocolMessageBundle.BUNDLE.errorStartingConsumer(e.getMessage()); + } + } + + /** + * create the actual underlying HornetQ Server Consumer + */ + @Override + public void initialise() throws Exception + { + super.initialise(); + + Source source = (Source) sender.getRemoteSource(); + + String queue; + + String selector = null; + Map filter = source == null ? null : source.getFilter(); + if (filter != null) + { + DescribedType value = (DescribedType) filter.get(SELECTOR); + if (value != null) + { + selector = value.getDescribed().toString(); + } + } + + if (source != null) + { + if (source.getDynamic()) + { + //if dynamic we have to create the node (queue) and set the address on the target, the node is temporary and + // will be deleted on closing of the session + queue = java.util.UUID.randomUUID().toString(); + try + { + sessionSPI.createTemporaryQueue(queue); + //protonSession.getServerSession().createQueue(queue, queue, null, true, false); + } + catch (Exception e) + { + throw HornetQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage()); + } + source.setAddress(queue); + } + else + { + //if not dynamic then we use the targets address as the address to forward the messages to, however there has to + //be a queue bound to it so we nee to check this. + queue = source.getAddress(); + if (queue == null) + { + throw HornetQAMQPProtocolMessageBundle.BUNDLE.sourceAddressNotSet(); + } + + try + { + if (!sessionSPI.queueQuery(queue)) + { + throw HornetQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist(); + } + } + catch (Exception e) + { + throw new HornetQAMQPInternalErrorException(e.getMessage(), e); + } + } + + boolean browseOnly = source.getDistributionMode() != null && source.getDistributionMode().equals(COPY); + try + { + brokerConsumer = sessionSPI.createSender(this, queue, selector, browseOnly); + } + catch (Exception e) + { + throw HornetQAMQPProtocolMessageBundle.BUNDLE.errorCreatingHornetQConsumer(e.getMessage()); + } + } + } + + /* + * close the session + * */ + public void close() throws HornetQAMQPException + { + super.close(); + try + { + sessionSPI.closeSender(brokerConsumer); + } + catch (Exception e) + { + e.printStackTrace(); + throw new HornetQAMQPInternalErrorException(e.getMessage()); + } + } + + + public void onMessage(Delivery delivery) throws HornetQAMQPException + { + Object message = delivery.getContext(); + + boolean preSettle = sender.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED; + + + DeliveryState remoteState = delivery.getRemoteState(); + + if (remoteState != null) + { + if (remoteState instanceof Accepted) + { + //we have to individual ack as we can't guarantee we will get the delivery updates (including acks) in order + // from dealer, a perf hit but a must + try + { + sessionSPI.ack(brokerConsumer, message); + } + catch (Exception e) + { + throw HornetQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage()); + } + } + else if (remoteState instanceof Released) + { + try + { + sessionSPI.cancel(brokerConsumer, message, false); + } + catch (Exception e) + { + throw HornetQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage()); + } + } + else if (remoteState instanceof Rejected || remoteState instanceof Modified) + { + try + { + sessionSPI.cancel(brokerConsumer, message, true); + } + catch (Exception e) + { + throw HornetQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage()); + } + } + //todo add tag caching + if (!preSettle) + { + protonSession.replaceTag(delivery.getTag()); + } + + synchronized (connection.getLock()) + { + delivery.settle(); + sender.offer(1); + } + + } + else + { + //todo not sure if we need to do anything here + } + } + + @Override + public synchronized void checkState() + { + super.checkState(); + sessionSPI.resumeDelivery(brokerConsumer); + } + + + /** + * handle an out going message from HornetQ, send via the Proton Sender + */ + public int deliverMessage(Object message, int deliveryCount) throws Exception + { + if (closed) + { + System.err.println("Message can't be delivered as it's closed"); + return 0; + } + + //encode the message + ProtonJMessage serverMessage; + try + { + // This can be done a lot better here + serverMessage = sessionSPI.encodeMessage(message, deliveryCount); + } + catch (Throwable e) + { + e.printStackTrace(); + throw new HornetQAMQPInternalErrorException(e.getMessage(), e); + } + + return performSend(serverMessage, message); + } + + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSessionContext.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSessionContext.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSessionContext.java new file mode 100644 index 0000000..1860c90 --- /dev/null +++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/context/server/ProtonServerSessionContext.java @@ -0,0 +1,124 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.proton.plug.context.server; + +import java.util.HashMap; +import java.util.Map; + +import org.apache.qpid.proton.amqp.transaction.Coordinator; +import org.apache.qpid.proton.amqp.transport.ErrorCondition; +import org.apache.qpid.proton.engine.Receiver; +import org.apache.qpid.proton.engine.Sender; +import org.apache.qpid.proton.engine.Session; +import org.proton.plug.AMQPSessionCallback; +import org.proton.plug.context.AbstractConnectionContext; +import org.proton.plug.context.AbstractProtonContextSender; +import org.proton.plug.context.AbstractProtonReceiverContext; +import org.proton.plug.context.AbstractProtonSessionContext; +import org.proton.plug.context.ProtonTransactionHandler; +import org.proton.plug.exceptions.HornetQAMQPException; + +/** + * @author Clebert Suconic + */ + +public class ProtonServerSessionContext extends AbstractProtonSessionContext +{ + + public ProtonServerSessionContext(AMQPSessionCallback sessionSPI, AbstractConnectionContext connection, Session session) + { + super(sessionSPI, connection, session); + } + + protected Map<Object, AbstractProtonContextSender> serverSenders = new HashMap<Object, AbstractProtonContextSender>(); + + + /** + * The consumer object from the broker or the key used to store the sender + * + * @param message + * @param consumer + * @param deliveryCount + * @return the number of bytes sent + */ + public int serverDelivery(Object message, Object consumer, int deliveryCount) throws Exception + { + ProtonServerSenderContext protonSender = (ProtonServerSenderContext) serverSenders.get(consumer); + if (protonSender != null) + { + return protonSender.deliverMessage(message, deliveryCount); + } + return 0; + } + + public void addTransactionHandler(Coordinator coordinator, Receiver receiver) + { + ProtonTransactionHandler transactionHandler = new ProtonTransactionHandler(sessionSPI); + receiver.setContext(transactionHandler); + receiver.open(); + receiver.flow(100); + } + + public void addSender(Sender sender) throws Exception + { + ProtonServerSenderContext protonSender = new ProtonServerSenderContext(connection, sender, this, sessionSPI); + + try + { + protonSender.initialise(); + senders.put(sender, protonSender); + serverSenders.put(protonSender.getBrokerConsumer(), protonSender); + sender.setContext(protonSender); + sender.open(); + protonSender.start(); + } + catch (HornetQAMQPException e) + { + senders.remove(sender); + sender.setSource(null); + sender.setCondition(new ErrorCondition(e.getAmqpError(), e.getMessage())); + sender.close(); + } + } + + public void removeSender(Sender sender) throws HornetQAMQPException + { + ProtonServerSenderContext senderRemoved = (ProtonServerSenderContext) senders.remove(sender); + if (senderRemoved != null) + { + serverSenders.remove(senderRemoved.getBrokerConsumer()); + } + } + + + public void addReceiver(Receiver receiver) throws Exception + { + try + { + AbstractProtonReceiverContext protonReceiver = new ProtonServerReceiverContext(sessionSPI, connection, this, receiver); + protonReceiver.initialise(); + receivers.put(receiver, protonReceiver); + receiver.setContext(protonReceiver); + receiver.open(); + } + catch (HornetQAMQPException e) + { + receivers.remove(receiver); + receiver.setTarget(null); + receiver.setCondition(new ErrorCondition(e.getAmqpError(), e.getMessage())); + receiver.close(); + } + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/exceptions/HornetQAMQPException.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/exceptions/HornetQAMQPException.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/exceptions/HornetQAMQPException.java new file mode 100644 index 0000000..0a3c68a --- /dev/null +++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/exceptions/HornetQAMQPException.java @@ -0,0 +1,45 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.proton.plug.exceptions; + +import org.apache.qpid.proton.amqp.Symbol; + +/** + * @author <a href="mailto:[email protected]">Andy Taylor</a> + * 6/6/13 + */ +public class HornetQAMQPException extends Exception +{ + + private static final String ERROR_PREFIX = "amqp:"; + + public Symbol getAmqpError() + { + return amqpError; + } + + private final Symbol amqpError; + + public HornetQAMQPException(Symbol amqpError, String message, Throwable e) + { + super(message, e); + this.amqpError = amqpError; + } + + public HornetQAMQPException(Symbol amqpError, String message) + { + super(message); + this.amqpError = amqpError; + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/exceptions/HornetQAMQPIllegalStateException.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/exceptions/HornetQAMQPIllegalStateException.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/exceptions/HornetQAMQPIllegalStateException.java new file mode 100644 index 0000000..f3e3b38 --- /dev/null +++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/exceptions/HornetQAMQPIllegalStateException.java @@ -0,0 +1,28 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.proton.plug.exceptions; + +import org.apache.qpid.proton.amqp.transport.AmqpError; + +/** + * @author <a href="mailto:[email protected]">Andy Taylor</a> + * 6/6/13 + */ +public class HornetQAMQPIllegalStateException extends HornetQAMQPException +{ + public HornetQAMQPIllegalStateException(String message) + { + super(AmqpError.ILLEGAL_STATE, message); + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/exceptions/HornetQAMQPInternalErrorException.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/exceptions/HornetQAMQPInternalErrorException.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/exceptions/HornetQAMQPInternalErrorException.java new file mode 100644 index 0000000..7db34a7 --- /dev/null +++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/exceptions/HornetQAMQPInternalErrorException.java @@ -0,0 +1,33 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.proton.plug.exceptions; + +import org.apache.qpid.proton.amqp.transport.AmqpError; + +/** + * @author <a href="mailto:[email protected]">Andy Taylor</a> + * 6/6/13 + */ +public class HornetQAMQPInternalErrorException extends HornetQAMQPException +{ + public HornetQAMQPInternalErrorException(String message, Throwable e) + { + super(AmqpError.INTERNAL_ERROR, message, e); + } + + public HornetQAMQPInternalErrorException(String message) + { + super(AmqpError.INTERNAL_ERROR, message); + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/exceptions/HornetQAMQPInvalidFieldException.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/exceptions/HornetQAMQPInvalidFieldException.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/exceptions/HornetQAMQPInvalidFieldException.java new file mode 100644 index 0000000..2f73de8 --- /dev/null +++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/exceptions/HornetQAMQPInvalidFieldException.java @@ -0,0 +1,28 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.proton.plug.exceptions; + +import org.apache.qpid.proton.amqp.transport.AmqpError; + +/** + * @author <a href="mailto:[email protected]">Andy Taylor</a> + * 6/6/13 + */ +public class HornetQAMQPInvalidFieldException extends HornetQAMQPException +{ + public HornetQAMQPInvalidFieldException(String message) + { + super(AmqpError.INVALID_FIELD, message); + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/exceptions/HornetQAMQPNotImplementedException.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/exceptions/HornetQAMQPNotImplementedException.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/exceptions/HornetQAMQPNotImplementedException.java new file mode 100644 index 0000000..35a2143 --- /dev/null +++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/exceptions/HornetQAMQPNotImplementedException.java @@ -0,0 +1,28 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.proton.plug.exceptions; + +import org.apache.qpid.proton.amqp.transport.AmqpError; + +/** + * @author <a href="mailto:[email protected]">Andy Taylor</a> + * 6/19/13 + */ +public class HornetQAMQPNotImplementedException extends HornetQAMQPException +{ + public HornetQAMQPNotImplementedException(String message) + { + super(AmqpError.NOT_IMPLEMENTED, message); + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/exceptions/HornetQAMQPTimeoutException.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/exceptions/HornetQAMQPTimeoutException.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/exceptions/HornetQAMQPTimeoutException.java new file mode 100644 index 0000000..50d72d8 --- /dev/null +++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/exceptions/HornetQAMQPTimeoutException.java @@ -0,0 +1,29 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.proton.plug.exceptions; + +import org.apache.qpid.proton.amqp.transport.AmqpError; + +/** + * @author Clebert Suconic + */ + +public class HornetQAMQPTimeoutException extends HornetQAMQPException +{ + public HornetQAMQPTimeoutException(String message) + { + super(AmqpError.ILLEGAL_STATE, message); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/handler/EventHandler.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/handler/EventHandler.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/handler/EventHandler.java new file mode 100644 index 0000000..482821a --- /dev/null +++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/handler/EventHandler.java @@ -0,0 +1,80 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.proton.plug.handler; + +import org.apache.qpid.proton.engine.Connection; +import org.apache.qpid.proton.engine.Delivery; +import org.apache.qpid.proton.engine.Link; +import org.apache.qpid.proton.engine.Session; +import org.apache.qpid.proton.engine.Transport; + +/** + * EventHandler + * <p/> + * + * @author rhs + */ + +public interface EventHandler +{ + + void onSASLInit(ProtonHandler handler, Connection connection); + + void onInit(Connection connection) throws Exception; + + void onLocalOpen(Connection connection) throws Exception; + + void onRemoteOpen(Connection connection) throws Exception; + + void onLocalClose(Connection connection) throws Exception; + + void onRemoteClose(Connection connection) throws Exception; + + void onFinal(Connection connection) throws Exception; + + void onInit(Session session) throws Exception; + + void onLocalOpen(Session session) throws Exception; + + void onRemoteOpen(Session session) throws Exception; + + void onLocalClose(Session session) throws Exception; + + void onRemoteClose(Session session) throws Exception; + + void onFinal(Session session) throws Exception; + + void onInit(Link link) throws Exception; + + void onLocalOpen(Link link) throws Exception; + + void onRemoteOpen(Link link) throws Exception; + + void onLocalClose(Link link) throws Exception; + + void onRemoteClose(Link link) throws Exception; + + void onFlow(Link link) throws Exception; + + void onFinal(Link link) throws Exception; + + void onRemoteDetach(Link link) throws Exception; + + void onDetach(Link link) throws Exception; + + void onDelivery(Delivery delivery) throws Exception; + + void onTransport(Transport transport) throws Exception; + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/handler/Events.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/handler/Events.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/handler/Events.java new file mode 100644 index 0000000..4d58aa1 --- /dev/null +++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/handler/Events.java @@ -0,0 +1,109 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.proton.plug.handler; + +import org.apache.qpid.proton.engine.Event; +import org.apache.qpid.proton.engine.Transport; + +/** + * TODO : this needs a better home + * + * @author rhs + */ + +public final class Events +{ + + public static void dispatchTransport(Transport transport, EventHandler handler) throws Exception + { + handler.onTransport(transport); + } + + public static void dispatch(Event event, EventHandler handler) throws Exception + { + switch (event.getType()) + { + case CONNECTION_INIT: + handler.onInit(event.getConnection()); + break; + case CONNECTION_LOCAL_OPEN: + handler.onLocalOpen(event.getConnection()); + break; + case CONNECTION_REMOTE_OPEN: + handler.onRemoteOpen(event.getConnection()); + break; + case CONNECTION_LOCAL_CLOSE: + handler.onLocalClose(event.getConnection()); + break; + case CONNECTION_REMOTE_CLOSE: + handler.onRemoteClose(event.getConnection()); + break; + case CONNECTION_FINAL: + handler.onFinal(event.getConnection()); + break; + case SESSION_INIT: + handler.onInit(event.getSession()); + break; + case SESSION_LOCAL_OPEN: + handler.onLocalOpen(event.getSession()); + break; + case SESSION_REMOTE_OPEN: + handler.onRemoteOpen(event.getSession()); + break; + case SESSION_LOCAL_CLOSE: + handler.onLocalClose(event.getSession()); + break; + case SESSION_REMOTE_CLOSE: + handler.onRemoteClose(event.getSession()); + break; + case SESSION_FINAL: + handler.onFinal(event.getSession()); + break; + case LINK_INIT: + handler.onInit(event.getLink()); + break; + case LINK_LOCAL_OPEN: + handler.onLocalOpen(event.getLink()); + break; + case LINK_REMOTE_OPEN: + handler.onRemoteOpen(event.getLink()); + break; + case LINK_LOCAL_CLOSE: + handler.onLocalClose(event.getLink()); + break; + case LINK_REMOTE_CLOSE: + handler.onRemoteClose(event.getLink()); + break; + case LINK_FLOW: + handler.onFlow(event.getLink()); + break; + case LINK_FINAL: + handler.onFinal(event.getLink()); + break; + case LINK_LOCAL_DETACH: + handler.onDetach(event.getLink()); + break; + case LINK_REMOTE_DETACH: + handler.onRemoteDetach(event.getLink()); + break; + case TRANSPORT: + handler.onTransport(event.getTransport()); + break; + case DELIVERY: + handler.onDelivery(event.getDelivery()); + break; + } + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/handler/ProtonHandler.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/handler/ProtonHandler.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/handler/ProtonHandler.java new file mode 100644 index 0000000..99f4658 --- /dev/null +++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/handler/ProtonHandler.java @@ -0,0 +1,134 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.proton.plug.handler; + +import io.netty.buffer.ByteBuf; +import org.apache.qpid.proton.engine.Connection; +import org.apache.qpid.proton.engine.Transport; +import org.proton.plug.ClientSASL; +import org.proton.plug.ServerSASL; +import org.proton.plug.SASLResult; +import org.proton.plug.handler.impl.ProtonHandlerImpl; + +/** + * This is a definition of the public interface for {@link org.proton.plug.handler.impl.ProtonHandlerImpl} + * + * @author Clebert Suconic + */ + +public interface ProtonHandler +{ + + public static final class Factory + { + public static ProtonHandler create() + { + return new ProtonHandlerImpl(); + } + } + + + /** + * It returns true if the transport connection has any capacity available + * + * @return + */ + int capacity(); + + Transport getTransport(); + + Connection getConnection(); + + /** + * Add an event handler to the chain + * + * @param handler + * @return + */ + ProtonHandler addEventHandler(EventHandler handler); + + void createClientSasl(ClientSASL clientSASL); + + /** + * To be used on server connections. To define SASL integration. + * + * @param handlers + */ + void createServerSASL(ServerSASL[] handlers); + + /** + * To return the SASL Mechanism that was successful with the connection. + * This should contain any state such as user and password + * + * @return + */ + SASLResult getSASLResult(); + + /** + * The input on the Handler. + * Notice that buffer will be positioned up to where we needed + * + * @param buffer + */ + void inputBuffer(ByteBuf buffer); + + /** + * To be used at your discretion to verify if the client was active since you last checked + * it can be used to implement server TTL cleanup and verifications + * + * @return + */ + boolean checkDataReceived(); + + /** + * Return the creation time of the handler + * + * @return + */ + long getCreationTime(); + + /** + * To be called after you used the outputBuffer + * + * @param bytes number of bytes you used already on the output + */ + void outputDone(int bytes); + + /** + * it will return pending bytes you have on the Transport + * after you are done with it you must call {@link #outputDone(int)} + * + * @return + */ + ByteBuf outputBuffer(); + + /** + * It will process the transport and cause events to be called + */ + void flush(); + + /** + * It will close the connection and flush events + */ + void close(); + + + /** + * Get the object used to lock transport, connection and events operations + * + * @return + */ + Object getLock(); + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/handler/impl/DefaultEventHandler.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/handler/impl/DefaultEventHandler.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/handler/impl/DefaultEventHandler.java new file mode 100644 index 0000000..2e1be00 --- /dev/null +++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/handler/impl/DefaultEventHandler.java @@ -0,0 +1,167 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.proton.plug.handler.impl; + +import org.apache.qpid.proton.engine.Connection; +import org.apache.qpid.proton.engine.Delivery; +import org.apache.qpid.proton.engine.Link; +import org.apache.qpid.proton.engine.Session; +import org.apache.qpid.proton.engine.Transport; +import org.proton.plug.handler.EventHandler; + +/** + * This is useful for cases where you only want to implement a few methods + * + * @author Clebert Suconic + */ +public abstract class DefaultEventHandler implements EventHandler +{ + @Override + public void onInit(Connection connection) throws Exception + { + + } + + @Override + public void onLocalOpen(Connection connection) throws Exception + { + + } + + @Override + public void onRemoteOpen(Connection connection) throws Exception + { + + } + + @Override + public void onLocalClose(Connection connection) throws Exception + { + + } + + @Override + public void onRemoteClose(Connection connection) throws Exception + { + } + + @Override + public void onFinal(Connection connection) throws Exception + { + + } + + @Override + public void onInit(Session session) throws Exception + { + + } + + @Override + public void onLocalOpen(Session session) throws Exception + { + + } + + @Override + public void onRemoteOpen(Session session) throws Exception + { + + } + + @Override + public void onLocalClose(Session session) throws Exception + { + + } + + @Override + public void onRemoteClose(Session session) throws Exception + { + + } + + @Override + public void onFinal(Session session) throws Exception + { + + } + + @Override + public void onInit(Link link) throws Exception + { + + } + + @Override + public void onLocalOpen(Link link) throws Exception + { + + } + + @Override + public void onRemoteOpen(Link link) throws Exception + { + + } + + @Override + public void onLocalClose(Link link) throws Exception + { + + } + + @Override + public void onRemoteClose(Link link) throws Exception + { + + } + + @Override + public void onFlow(Link link) throws Exception + { + + } + + @Override + public void onFinal(Link link) throws Exception + { + + } + + + @Override + public void onRemoteDetach(Link link) throws Exception + { + + } + + @Override + public void onDetach(Link link) throws Exception + { + + } + + @Override + public void onDelivery(Delivery delivery) throws Exception + { + + } + + @Override + public void onTransport(Transport transport) throws Exception + { + + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/handler/impl/ProtonHandlerImpl.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/handler/impl/ProtonHandlerImpl.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/handler/impl/ProtonHandlerImpl.java new file mode 100644 index 0000000..3eb7093 --- /dev/null +++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/handler/impl/ProtonHandlerImpl.java @@ -0,0 +1,423 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.proton.plug.handler.impl; + +import java.nio.ByteBuffer; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Map; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import org.apache.qpid.proton.Proton; +import org.apache.qpid.proton.amqp.transport.ErrorCondition; +import org.apache.qpid.proton.engine.Collector; +import org.apache.qpid.proton.engine.Connection; +import org.apache.qpid.proton.engine.Event; +import org.apache.qpid.proton.engine.Sasl; +import org.apache.qpid.proton.engine.Transport; +import org.proton.plug.ClientSASL; +import org.proton.plug.ServerSASL; +import org.proton.plug.handler.EventHandler; +import org.proton.plug.handler.Events; +import org.proton.plug.handler.ProtonHandler; +import org.proton.plug.context.ProtonInitializable; +import org.proton.plug.SASLResult; +import org.proton.plug.util.ByteUtil; +import org.proton.plug.util.DebugInfo; + +/** + * Clebert Suconic + */ +public class ProtonHandlerImpl extends ProtonInitializable implements ProtonHandler +{ + private final Transport transport = Proton.transport(); + + private final Connection connection = Proton.connection(); + + private final Collector collector = Proton.collector(); + + private ArrayList<EventHandler> handlers = new ArrayList<>(); + + private Sasl serverSasl; + + private Sasl clientSasl; + + private final Object lock = new Object(); + + private final long creationTime; + + private Map<String, ServerSASL> saslHandlers; + + private SASLResult saslResult; + + /** + * If dispatching a dispatch call is ignored to avoid infinite stack loop + */ + private boolean dispatching = false; + + protected volatile boolean dataReceived; + + protected boolean receivedFirstPacket = false; + + private int offset = 0; + + public ProtonHandlerImpl() + { + this.creationTime = System.currentTimeMillis(); + transport.bind(connection); + connection.collect(collector); + } + + @Override + public int capacity() + { + synchronized (lock) + { + return transport.capacity(); + } + } + + public Object getLock() + { + return lock; + } + + @Override + public Transport getTransport() + { + return transport; + } + + @Override + public Connection getConnection() + { + return connection; + } + + @Override + public ProtonHandler addEventHandler(EventHandler handler) + { + handlers.add(handler); + return this; + } + + @Override + public void createServerSASL(ServerSASL[] handlers) + { + this.serverSasl = transport.sasl(); + saslHandlers = new HashMap<>(); + String[] names = new String[handlers.length]; + int count = 0; + for (ServerSASL handler : handlers) + { + saslHandlers.put(handler.getName(), handler); + names[count++] = handler.getName(); + } + this.serverSasl.server(); + serverSasl.setMechanisms(names); + + } + + @Override + public SASLResult getSASLResult() + { + return saslResult; + } + + @Override + public void inputBuffer(ByteBuf buffer) + { + dataReceived = true; + synchronized (lock) + { + while (buffer.readableBytes() > 0) + { + int capacity = transport.capacity(); + + if (!receivedFirstPacket) + { + try + { + if (buffer.getByte(4) == 0x03) + { + dispatchSASL(); + } + } + catch (Throwable ignored) + { + ignored.printStackTrace(); + } + + receivedFirstPacket = true; + } + + + if (capacity > 0) + { + ByteBuffer tail = transport.tail(); + int min = Math.min(capacity, buffer.readableBytes()); + tail.limit(min); + buffer.readBytes(tail); + + + flush(); + } + else + { + if (capacity == 0) + { + System.out.println("abandoning: " + buffer.readableBytes()); + } + else + { + System.out.println("transport closed, discarding: " + buffer.readableBytes() + " capacity = " + transport.capacity()); + } + break; + } + } + } + } + + + @Override + public boolean checkDataReceived() + { + boolean res = dataReceived; + + dataReceived = false; + + return res; + } + + @Override + public long getCreationTime() + { + return creationTime; + } + + @Override + public void outputDone(int bytes) + { + synchronized (lock) + { + transport.pop(bytes); + offset -= bytes; + + if (offset < 0) + { + throw new IllegalStateException("You called outputDone for more bytes than you actually received. numberOfBytes=" + bytes + + ", outcome result=" + offset); + } + } + + flush(); + } + + @Override + public ByteBuf outputBuffer() + { + + synchronized (lock) + { + int pending = transport.pending(); + + if (pending < 0) + { + return null;//throw new IllegalStateException("xxx need to close the connection"); + } + + int size = pending - offset; + + if (size < 0) + { + throw new IllegalStateException("negative size: " + pending); + } + + if (size == 0) + { + return null; + } + + // For returning PooledBytes + ByteBuf buffer = PooledByteBufAllocator.DEFAULT.buffer(size); + ByteBuffer head = transport.head(); + head.position(offset); + buffer.writeBytes(head); + offset += size; // incrementing offset for future calls + return buffer; + } + } + + public void createClientSasl(ClientSASL clientSASL) + { + if (clientSASL != null) + { + clientSasl = transport.sasl(); + clientSasl.setMechanisms(clientSASL.getName()); + byte[] initialSasl = clientSASL.getBytes(); + clientSasl.send(initialSasl, 0, initialSasl.length); + } + } + + + @Override + public void flush() + { + synchronized (lock) + { + transport.process(); + + checkServerSASL(); + + if (dispatching) + { + return; + } + + dispatching = true; + + } + + try + { + dispatch(); + } + finally + { + dispatching = false; + } + } + + @Override + public void close() + { + synchronized (lock) + { + connection.close(); + } + flush(); + } + + protected void checkServerSASL() + { + if (serverSasl != null && serverSasl.getRemoteMechanisms().length > 0) + { + // TODO: should we look at the first only? + ServerSASL mechanism = saslHandlers.get(serverSasl.getRemoteMechanisms()[0]); + if (mechanism != null) + { + + byte[] dataSASL = new byte[serverSasl.pending()]; + serverSasl.recv(dataSASL, 0, dataSASL.length); + + if (DebugInfo.debug) + { + System.out.println("Working on sasl::" + ByteUtil.bytesToHex(dataSASL, 2)); + } + + saslResult = mechanism.processSASL(dataSASL); + + if (saslResult != null && saslResult.isSuccess()) + { + serverSasl.done(Sasl.SaslOutcome.PN_SASL_OK); + serverSasl = null; + saslHandlers.clear(); + saslHandlers = null; + } + else + { + serverSasl.done(Sasl.SaslOutcome.PN_SASL_AUTH); + } + serverSasl = null; + } + else + { + // no auth available, system error + serverSasl.done(Sasl.SaslOutcome.PN_SASL_SYS); + } + } + } + + private Event popEvent() + { + synchronized (lock) + { + Event ev = collector.peek(); + if (ev != null) + { + // pop will invalidate the event + // for that reason we make a new one + // Events are reused inside the collector, so we need to make a new one here + ev = ev.copy(); + collector.pop(); + } + return ev; + } + } + + private void dispatchSASL() + { + for (EventHandler h: handlers) + { + h.onSASLInit(this, getConnection()); + } + } + + + private void dispatch() + { + Event ev; + // We don't hold a lock on the entire event processing + // because we could have a distributed deadlock + // while processing events (for instance onTransport) + // while a client is also trying to write here + while ((ev = popEvent()) != null) + { + for (EventHandler h : handlers) + { + if (DebugInfo.debug) + { + System.out.println("Handling " + ev + " towards " + h); + } + try + { + Events.dispatch(ev, h); + } + catch (Exception e) + { + // TODO: logs + e.printStackTrace(); + connection.setCondition(new ErrorCondition()); + } + } + } + + for (EventHandler h : handlers) + { + try + { + h.onTransport(transport); + } + catch (Exception e) + { + // TODO: logs + e.printStackTrace(); + connection.setCondition(new ErrorCondition()); + } + } + + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/logger/HornetQAMQPProtocolMessageBundle.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/logger/HornetQAMQPProtocolMessageBundle.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/logger/HornetQAMQPProtocolMessageBundle.java new file mode 100644 index 0000000..c4c22c8 --- /dev/null +++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/logger/HornetQAMQPProtocolMessageBundle.java @@ -0,0 +1,83 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.proton.plug.logger; + +import org.proton.plug.exceptions.HornetQAMQPIllegalStateException; +import org.proton.plug.exceptions.HornetQAMQPInternalErrorException; +import org.proton.plug.exceptions.HornetQAMQPInvalidFieldException; +import org.jboss.logging.annotations.Message; +import org.jboss.logging.annotations.MessageBundle; +import org.jboss.logging.Messages; + +/** + * Logger Code 11 + * <p/> + * Each message id must be 6 digits long starting with 10, the 3rd digit should be 9. So the range + * is from 219000 to 119999. + * <p/> + * Once released, methods should not be deleted as they may be referenced by knowledge base + * articles. Unused methods should be marked as deprecated. + * + * @author <a href="mailto:[email protected]">Andy Taylor</a> + */ +@MessageBundle(projectCode = "HQ") +public interface HornetQAMQPProtocolMessageBundle +{ + HornetQAMQPProtocolMessageBundle BUNDLE = Messages.getBundle(HornetQAMQPProtocolMessageBundle.class); + + + @Message(id = 219000, value = "target address not set", format = Message.Format.MESSAGE_FORMAT) + HornetQAMQPInvalidFieldException targetAddressNotSet(); + + @Message(id = 219001, value = "error creating temporary queue, {0}", format = Message.Format.MESSAGE_FORMAT) + HornetQAMQPInternalErrorException errorCreatingTemporaryQueue(String message); + + @Message(id = 219002, value = "target address does not exist", format = Message.Format.MESSAGE_FORMAT) + HornetQAMQPIllegalStateException addressDoesntExist(); + + @Message(id = 219003, value = "error finding temporary queue, {0}", format = Message.Format.MESSAGE_FORMAT) + HornetQAMQPInternalErrorException errorFindingTemporaryQueue(String message); + + @Message(id = 219004, value = "error creating HornetQ Session, {0}", format = Message.Format.MESSAGE_FORMAT) + HornetQAMQPInternalErrorException errorCreatingHornetQSession(String message); + + @Message(id = 219005, value = "error creating HornetQ Consumer, {0}", format = Message.Format.MESSAGE_FORMAT) + HornetQAMQPInternalErrorException errorCreatingHornetQConsumer(String message); + + @Message(id = 219006, value = "error starting HornetQ Consumer, {0}", format = Message.Format.MESSAGE_FORMAT) + HornetQAMQPIllegalStateException errorStartingConsumer(String message); + + @Message(id = 219007, value = "error acknowledging message {0}, {1}", format = Message.Format.MESSAGE_FORMAT) + HornetQAMQPIllegalStateException errorAcknowledgingMessage(String messageID, String message); + + @Message(id = 219008, value = "error cancelling message {0}, {1}", format = Message.Format.MESSAGE_FORMAT) + HornetQAMQPIllegalStateException errorCancellingMessage(String messageID, String message); + + @Message(id = 219009, value = "error closing consumer {0}, {1}", format = Message.Format.MESSAGE_FORMAT) + HornetQAMQPIllegalStateException errorClosingConsumer(long consumerID, String message); + + @Message(id = 219010, value = "source address does not exist", format = Message.Format.MESSAGE_FORMAT) + HornetQAMQPInvalidFieldException sourceAddressDoesntExist(); + + @Message(id = 219011, value = "source address not set", format = Message.Format.MESSAGE_FORMAT) + HornetQAMQPInvalidFieldException sourceAddressNotSet(); + + @Message(id = 219012, value = "error rolling back coordinator: {0}", format = Message.Format.MESSAGE_FORMAT) + HornetQAMQPIllegalStateException errorRollingbackCoordinator(String message); + + @Message(id = 219013, value = "error committing coordinator: {0}", format = Message.Format.MESSAGE_FORMAT) + HornetQAMQPIllegalStateException errorCommittingCoordinator(String message); + + @Message(id = 219015, value = "error decoding AMQP frame", format = Message.Format.MESSAGE_FORMAT) + String decodeError(); +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/sasl/AnonymousServerSASL.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/sasl/AnonymousServerSASL.java b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/sasl/AnonymousServerSASL.java new file mode 100644 index 0000000..be05f37 --- /dev/null +++ b/hornetq-protocols/hornetq-proton-plug/src/main/java/org/proton/plug/sasl/AnonymousServerSASL.java @@ -0,0 +1,41 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.proton.plug.sasl; + +import org.proton.plug.SASLResult; +import org.proton.plug.ServerSASL; + +/** + * @author Clebert Suconic + */ + +public class AnonymousServerSASL implements ServerSASL +{ + public AnonymousServerSASL() + { + } + + @Override + public String getName() + { + return "ANONYMOUS"; + } + + @Override + public SASLResult processSASL(byte[] bytes) + { + return new PlainSASLResult(true, null, null); + } +} +
