http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/exceptions/HornetQAMQPInternalErrorException.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/exceptions/HornetQAMQPInternalErrorException.java b/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/exceptions/HornetQAMQPInternalErrorException.java deleted file mode 100644 index 61ae1d4..0000000 --- a/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/exceptions/HornetQAMQPInternalErrorException.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright 2005-2014 Red Hat, Inc. - * Red Hat licenses this file to you under the Apache License, version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. - */ - -package org.hornetq.core.protocol.proton.exceptions; - -import org.apache.qpid.proton.amqp.transport.AmqpError; - -/** - * @author <a href="mailto:[email protected]">Andy Taylor</a> - * 6/6/13 - */ -public class HornetQAMQPInternalErrorException extends HornetQAMQPException -{ - public HornetQAMQPInternalErrorException(String message) - { - super(AmqpError.INTERNAL_ERROR, message); - } -}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/exceptions/HornetQAMQPInvalidFieldException.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/exceptions/HornetQAMQPInvalidFieldException.java b/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/exceptions/HornetQAMQPInvalidFieldException.java deleted file mode 100644 index 006687a..0000000 --- a/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/exceptions/HornetQAMQPInvalidFieldException.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright 2005-2014 Red Hat, Inc. - * Red Hat licenses this file to you under the Apache License, version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. 
- */ - -package org.hornetq.core.protocol.proton.exceptions; - -import org.apache.qpid.proton.amqp.transport.AmqpError; - -/** - * @author <a href="mailto:[email protected]">Andy Taylor</a> - * 6/6/13 - */ -public class HornetQAMQPInvalidFieldException extends HornetQAMQPException -{ - public HornetQAMQPInvalidFieldException(String message) - { - super(AmqpError.INVALID_FIELD, message); - } -} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/exceptions/HornetQAMQPNotImplementedException.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/exceptions/HornetQAMQPNotImplementedException.java b/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/exceptions/HornetQAMQPNotImplementedException.java deleted file mode 100644 index 782afb1..0000000 --- a/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/exceptions/HornetQAMQPNotImplementedException.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright 2005-2014 Red Hat, Inc. - * Red Hat licenses this file to you under the Apache License, version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. 
- */ - -package org.hornetq.core.protocol.proton.exceptions; - -import org.apache.qpid.proton.amqp.transport.AmqpError; - -/** - * @author <a href="mailto:[email protected]">Andy Taylor</a> - * 6/19/13 - */ -public class HornetQAMQPNotImplementedException extends HornetQAMQPException -{ - public HornetQAMQPNotImplementedException(String message) - { - super(AmqpError.NOT_IMPLEMENTED, message); - } -} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/plug/HornetQProtonConnectionCallback.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/plug/HornetQProtonConnectionCallback.java b/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/plug/HornetQProtonConnectionCallback.java new file mode 100644 index 0000000..43e1d2b --- /dev/null +++ b/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/plug/HornetQProtonConnectionCallback.java @@ -0,0 +1,125 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ + +package org.hornetq.core.protocol.proton.plug; + +import java.util.concurrent.TimeUnit; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import org.hornetq.core.buffers.impl.ChannelBufferWrapper; +import org.hornetq.core.protocol.proton.HornetQProtonRemotingConnection; +import org.hornetq.core.protocol.proton.ProtonProtocolManager; +import org.hornetq.core.protocol.proton.sasl.HornetQPlainSASL; +import org.hornetq.spi.core.remoting.Connection; +import org.hornetq.utils.ReusableLatch; +import org.proton.plug.AMQPConnectionCallback; +import org.proton.plug.AMQPConnectionContext; +import org.proton.plug.AMQPSessionCallback; +import org.proton.plug.ServerSASL; +import org.proton.plug.sasl.AnonymousServerSASL; + +/** + * @author Clebert Suconic + */ + +public class HornetQProtonConnectionCallback implements AMQPConnectionCallback +{ + private final ProtonProtocolManager manager; + + private final Connection connection; + + protected HornetQProtonRemotingConnection protonConnectionDelegate; + + protected AMQPConnectionContext amqpConnection; + + private final ReusableLatch latch = new ReusableLatch(0); + + public HornetQProtonConnectionCallback(ProtonProtocolManager manager, Connection connection) + { + this.manager = manager; + this.connection = connection; + } + + @Override + public ServerSASL[] getSASLMechnisms() + { + return new ServerSASL[]{new AnonymousServerSASL(), new HornetQPlainSASL(manager.getServer().getSecurityStore(), manager.getServer().getSecurityManager())}; + } + + @Override + public void close() + { + + } + + @Override + public void setConnection(AMQPConnectionContext connection) + { + this.amqpConnection = connection; + } + + @Override + public AMQPConnectionContext getConnection() + { + return amqpConnection; + } + + public HornetQProtonRemotingConnection getProtonConnectionDelegate() + { + return protonConnectionDelegate; + } + + public void 
setProtonConnectionDelegate(HornetQProtonRemotingConnection protonConnectionDelegate) + { + this.protonConnectionDelegate = protonConnectionDelegate; + } + + public void onTransport(ByteBuf byteBuf, AMQPConnectionContext amqpConnection) + { + final int size = byteBuf.writerIndex(); + + latch.countUp(); + connection.write(new ChannelBufferWrapper(byteBuf, true), false, false, new ChannelFutureListener() + { + @Override + public void operationComplete(ChannelFuture future) throws Exception + { + latch.countDown(); + } + }); + + if (amqpConnection.isSyncOnFlush()) + { + try + { + latch.await(5, TimeUnit.SECONDS); + } + catch (Exception e) + { + e.printStackTrace(); + } + } + + amqpConnection.outputDone(size); + } + + + @Override + public AMQPSessionCallback createSessionCallback(AMQPConnectionContext connection) + { + return new ProtonSessionIntegrationCallback(this, manager, connection); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java b/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java new file mode 100644 index 0000000..cbb2c77 --- /dev/null +++ b/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java @@ -0,0 +1,341 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.hornetq.core.protocol.proton.plug; + + +import io.netty.buffer.ByteBuf; +import org.apache.qpid.proton.amqp.Binary; +import org.apache.qpid.proton.amqp.transport.AmqpError; +import org.apache.qpid.proton.amqp.transport.ErrorCondition; +import org.apache.qpid.proton.engine.Delivery; +import org.apache.qpid.proton.engine.Link; +import org.apache.qpid.proton.engine.Receiver; +import org.apache.qpid.proton.jms.EncodedMessage; +import org.apache.qpid.proton.message.ProtonJMessage; +import org.hornetq.api.core.SimpleString; +import org.hornetq.api.core.client.HornetQClient; +import org.hornetq.core.journal.IOAsyncTask; +import org.hornetq.core.protocol.proton.ProtonProtocolManager; +import org.hornetq.core.server.QueueQueryResult; +import org.hornetq.core.server.ServerConsumer; +import org.hornetq.core.server.ServerMessage; +import org.hornetq.core.server.ServerSession; +import org.hornetq.spi.core.protocol.SessionCallback; +import org.hornetq.spi.core.remoting.ReadyListener; +import org.hornetq.utils.ByteUtil; +import org.hornetq.utils.IDGenerator; +import org.hornetq.utils.SimpleIDGenerator; +import org.hornetq.utils.UUIDGenerator; +import org.proton.plug.AMQPConnectionContext; +import org.proton.plug.AMQPSessionCallback; +import org.proton.plug.AMQPSessionContext; +import org.proton.plug.SASLResult; +import org.proton.plug.context.ProtonPlugSender; +import org.proton.plug.sasl.PlainSASLResult; + +/** + * @author Clebert Suconic + */ + +public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, SessionCallback +{ + protected 
final IDGenerator consumerIDGenerator = new SimpleIDGenerator(0); + + private final HornetQProtonConnectionCallback protonSPI; + + private final ProtonProtocolManager manager; + + private final AMQPConnectionContext connection; + + private ServerSession serverSession; + + private AMQPSessionContext protonSession; + + public ProtonSessionIntegrationCallback(HornetQProtonConnectionCallback protonSPI, ProtonProtocolManager manager, AMQPConnectionContext connection) + { + this.protonSPI = protonSPI; + this.manager = manager; + this.connection = connection; + } + + @Override + public void onFlowConsumer(Object consumer, int credits) + { + // We have our own flow control on AMQP, so we set hornetq's flow control to 0 + ((ServerConsumer) consumer).receiveCredits(-1); + } + + @Override + public void init(AMQPSessionContext protonSession, SASLResult saslResult) throws Exception + { + + this.protonSession = protonSession; + + String name = UUIDGenerator.getInstance().generateStringUUID(); + + String user = null; + String passcode = null; + if (saslResult != null) + { + user = saslResult.getUser(); + if (saslResult instanceof PlainSASLResult) + { + passcode = ((PlainSASLResult)saslResult).getPassword(); + } + } + + serverSession = manager.getServer().createSession(name, + user, + passcode, + HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, + protonSPI.getProtonConnectionDelegate(), // RemotingConnection remotingConnection, + false, // boolean autoCommitSends + false, // boolean autoCommitAcks, + false, // boolean preAcknowledge, + true, //boolean xa, + (String) null, + this, + null); + } + + @Override + public void start() + { + + } + + @Override + public Object createSender(ProtonPlugSender protonSender, String queue, String filer, boolean browserOnly) throws Exception + { + long consumerID = consumerIDGenerator.generateID(); + + ServerConsumer consumer = serverSession.createConsumer(consumerID, SimpleString.toSimpleString(queue), SimpleString.toSimpleString(filer), 
browserOnly); + + // AMQP handles its own flow control for when it's started + consumer.setStarted(true); + + consumer.setProtocolContext(protonSender); + + return consumer; + } + + @Override + public void startSender(Object brokerConsumer) throws Exception + { + ServerConsumer serverConsumer = (ServerConsumer) brokerConsumer; + // flow control is done at proton + serverConsumer.receiveCredits(-1); + } + + @Override + public void createTemporaryQueue(String queueName) throws Exception + { + serverSession.createQueue(SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(queueName), null, true, false); + } + + @Override + public boolean queueQuery(String queueName) throws Exception + { + QueueQueryResult queueQuery = serverSession.executeQueueQuery(SimpleString.toSimpleString(queueName)); + return queueQuery.isExists(); + } + + @Override + public void closeSender(Object brokerConsumer) throws Exception + { + ((ServerConsumer) brokerConsumer).close(false); + } + + @Override + public ProtonJMessage encodeMessage(Object message, int deliveryCount) throws Exception + { + return (ProtonJMessage) manager.getConverter().outbound((ServerMessage) message, deliveryCount); + } + + @Override + public Binary getCurrentTXID() + { + return new Binary(ByteUtil.longToBytes(serverSession.getCurrentTransaction().getID())); + } + + @Override + public String tempQueueName() + { + return UUIDGenerator.getInstance().generateStringUUID(); + } + + @Override + public void commitCurrentTX() throws Exception + { + serverSession.commit(); + } + + @Override + public void rollbackCurrentTX() throws Exception + { + serverSession.rollback(false); + } + + @Override + public void close() throws Exception + { + serverSession.close(false); + } + + @Override + public void ack(Object brokerConsumer, Object message) throws Exception + { + ((ServerConsumer)brokerConsumer).individualAcknowledge(null, ((ServerMessage)message).getMessageID()); + } + + @Override + public void cancel(Object 
brokerConsumer, Object message, boolean updateCounts) throws Exception + { + ((ServerConsumer)brokerConsumer).individualCancel(((ServerMessage)message).getMessageID(), updateCounts); + } + + @Override + public void resumeDelivery(Object consumer) + { + ((ServerConsumer) consumer).receiveCredits(-1); + } + + @Override + public void serverSend(final Receiver receiver, final Delivery delivery, String address, int messageFormat, ByteBuf messageEncoded) throws Exception + { + EncodedMessage encodedMessage = new EncodedMessage(messageFormat, messageEncoded.array(), messageEncoded.arrayOffset(), messageEncoded.writerIndex()); + + ServerMessage message = manager.getConverter().inbound(encodedMessage); + //use the address on the receiver if not null, if null let's hope it was set correctly on the message + if (address != null) + { + message.setAddress(new SimpleString(address)); + } + + serverSession.send(message, false); + + manager.getServer().getStorageManager().afterCompleteOperations(new IOAsyncTask() + { + @Override + public void done() + { + synchronized (connection.getLock()) + { + delivery.settle(); + connection.flush(); + } + } + + @Override + public void onError(int errorCode, String errorMessage) + { + synchronized (connection.getLock()) + { + receiver.setCondition(new ErrorCondition(AmqpError.ILLEGAL_STATE, errorCode + ":" + errorMessage)); + connection.flush(); + } + } + }); + } + + + @Override + public void sendProducerCreditsMessage(int credits, SimpleString address) + { + } + + @Override + public void sendProducerCreditsFailMessage(int credits, SimpleString address) + { + } + + @Override + public int sendMessage(ServerMessage message, ServerConsumer consumer, int deliveryCount) + { + + ProtonPlugSender plugSender = (ProtonPlugSender) consumer.getProtocolContext(); + + try + { + return plugSender.deliverMessage(message, deliveryCount); + } + catch (Exception e) + { + synchronized (connection.getLock()) + { + plugSender.getSender().setCondition(new 
ErrorCondition(AmqpError.INTERNAL_ERROR, e.getMessage())); + connection.flush(); + } + throw new IllegalStateException("Can't deliver message " + e, e); + } + + } + + @Override + public int sendLargeMessage(ServerMessage message, ServerConsumer consumer, long bodySize, int deliveryCount) + { + return 0; + } + + @Override + public int sendLargeMessageContinuation(ServerConsumer consumer, byte[] body, boolean continues, boolean requiresResponse) + { + return 0; + } + + @Override + public void closed() + { + } + + @Override + public void addReadyListener(ReadyListener listener) + { + + } + + @Override + public void removeReadyListener(ReadyListener listener) + { + + } + + @Override + public void disconnect(ServerConsumer consumer, String queueName) + { + synchronized (connection.getLock()) + { + ((Link) consumer.getProtocolContext()).close(); + connection.flush(); + } + } + + + @Override + public boolean hasCredits(ServerConsumer consumer) + { + ProtonPlugSender plugSender = (ProtonPlugSender) consumer.getProtocolContext(); + + if (plugSender != null && plugSender.getSender().getCredit() > 0) + { + return true; + } + else + { + return false; + } + } + + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/plug/package-info.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/plug/package-info.java b/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/plug/package-info.java new file mode 100644 index 0000000..5612b2a --- /dev/null +++ b/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/plug/package-info.java @@ -0,0 +1,18 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. 
+ * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +/** + * This package contains classes for integration with the ProtonPlug + * @author Clebert Suconic + */ +package org.hornetq.core.protocol.proton.plug; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/sasl/HornetQPlainSASL.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/sasl/HornetQPlainSASL.java b/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/sasl/HornetQPlainSASL.java new file mode 100644 index 0000000..43e7f29 --- /dev/null +++ b/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/sasl/HornetQPlainSASL.java @@ -0,0 +1,50 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. 
See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.hornetq.core.protocol.proton.sasl; + +import org.hornetq.core.security.SecurityStore; +import org.hornetq.spi.core.security.HornetQSecurityManager; +import org.proton.plug.sasl.ServerSASLPlain; + +/** + * @author Clebert Suconic + */ + +public class HornetQPlainSASL extends ServerSASLPlain +{ + + private final HornetQSecurityManager securityManager; + + private final SecurityStore securityStore; + + + public HornetQPlainSASL(SecurityStore securityStore, HornetQSecurityManager securityManager) + { + this.securityManager = securityManager; + this.securityStore = securityStore; + } + + @Override + protected boolean authenticate(String user, String password) + { + if (securityStore.isSecurityEnabled()) + { + return securityManager.validateUser(user, password); + } + else + { + return true; + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-amqp-protocol/src/test/java/org/hornetq/core/protocol/proton/TestConversions.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-amqp-protocol/src/test/java/org/hornetq/core/protocol/proton/TestConversions.java b/hornetq-protocols/hornetq-amqp-protocol/src/test/java/org/hornetq/core/protocol/proton/TestConversions.java new file mode 100644 index 0000000..029a763 --- /dev/null +++ b/hornetq-protocols/hornetq-amqp-protocol/src/test/java/org/hornetq/core/protocol/proton/TestConversions.java @@ -0,0 +1,841 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.hornetq.core.protocol.proton; + +import java.nio.ByteBuffer; +import java.util.HashMap; +import java.util.LinkedList; +import java.util.List; +import java.util.Map; + +import io.netty.buffer.ByteBuf; +import io.netty.buffer.PooledByteBufAllocator; +import org.apache.qpid.proton.amqp.Binary; +import org.apache.qpid.proton.amqp.messaging.AmqpSequence; +import org.apache.qpid.proton.amqp.messaging.AmqpValue; +import org.apache.qpid.proton.amqp.messaging.ApplicationProperties; +import org.apache.qpid.proton.amqp.messaging.Data; +import org.apache.qpid.proton.jms.EncodedMessage; +import org.apache.qpid.proton.message.Message; +import org.apache.qpid.proton.message.ProtonJMessage; +import org.apache.qpid.proton.message.impl.MessageImpl; +import org.hornetq.api.core.HornetQBuffer; +import org.hornetq.api.core.SimpleString; +import org.hornetq.core.journal.EncodingSupport; +import org.hornetq.core.protocol.proton.converter.ProtonMessageConverter; +import org.hornetq.core.protocol.proton.converter.jms.ServerJMSBytesMessage; +import org.hornetq.core.protocol.proton.converter.jms.ServerJMSMapMessage; +import org.hornetq.core.protocol.proton.converter.jms.ServerJMSMessage; +import org.hornetq.core.protocol.proton.converter.jms.ServerJMSStreamMessage; +import org.hornetq.core.protocol.proton.converter.jms.ServerJMSTextMessage; +import org.hornetq.core.server.ServerMessage; +import org.hornetq.utils.SimpleIDGenerator; +import org.junit.Assert; +import org.junit.Test; +import org.proton.plug.util.NettyWritable; + +/** + * @author Clebert Suconic + */ + 
+public class TestConversions extends Assert +{ + + @Test + public void testSimpleConversionBytes() throws Exception + { + Map<String, Object> mapprop = createPropertiesMap(); + ApplicationProperties properties = new ApplicationProperties(mapprop); + MessageImpl message = (MessageImpl) Message.Factory.create(); + message.setApplicationProperties(properties); + + byte[] bodyBytes = new byte[4]; + + for (int i = 0; i < bodyBytes.length; i++) + { + bodyBytes[i] = (byte) 0xff; + } + + message.setBody(new Data(new Binary(bodyBytes))); + + + EncodedMessage encodedMessage = encodeMessage(message); + + + ProtonMessageConverter converter = new ProtonMessageConverter(new SimpleIDGenerator(0)); + ServerJMSBytesMessage serverMessage = (ServerJMSBytesMessage)converter.inboundJMSType(encodedMessage); + + verifyProperties(serverMessage); + + + assertEquals(bodyBytes.length, serverMessage.getBodyLength()); + + byte[] newBodyBytes = new byte[4]; + + serverMessage.readBytes(newBodyBytes); + + + Assert.assertArrayEquals(bodyBytes, newBodyBytes); + + + Object obj = converter.outbound((ServerMessage)serverMessage.getInnerMessage(), 0); + + System.out.println("output = " + obj); + + } + + private void verifyProperties(javax.jms.Message message) throws Exception + { + assertEquals(true, message.getBooleanProperty("true")); + assertEquals(false, message.getBooleanProperty("false")); + assertEquals("bar", message.getStringProperty("foo")); + } + + private Map<String, Object> createPropertiesMap() + { + Map<String, Object> mapprop = new HashMap<>(); + + mapprop.put("true", Boolean.TRUE); + mapprop.put("false", Boolean.FALSE); + mapprop.put("foo", "bar"); + return mapprop; + } + + @Test + public void testSimpleConversionMap() throws Exception + { + Map<String, Object> mapprop = createPropertiesMap(); + ApplicationProperties properties = new ApplicationProperties(mapprop); + MessageImpl message = (MessageImpl) Message.Factory.create(); + message.setApplicationProperties(properties); + + 
Map<String, Object> mapValues = new HashMap<>(); + mapValues.put("somestr", "value"); + mapValues.put("someint", Integer.valueOf(1)); + + message.setBody(new AmqpValue(mapValues)); + + EncodedMessage encodedMessage = encodeMessage(message); + + ProtonMessageConverter converter = new ProtonMessageConverter(new SimpleIDGenerator(0)); + ServerJMSMapMessage serverMessage = (ServerJMSMapMessage)converter.inboundJMSType(encodedMessage); + + verifyProperties(serverMessage); + + Assert.assertEquals(1, serverMessage.getInt("someint")); + Assert.assertEquals("value", serverMessage.getString("somestr")); + + Object obj = converter.outbound((ServerMessage)serverMessage.getInnerMessage(), 0); + + reEncodeMsg(obj); + + + MessageImpl outMessage = (MessageImpl) obj; + AmqpValue value = (AmqpValue)outMessage.getBody(); + Map mapoutput = (Map)value.getValue(); + + assertEquals(Integer.valueOf(1), (Integer) mapoutput.get("someint")); + + + System.out.println("output = " + obj); + + } + + + @Test + public void testSimpleConversionStream() throws Exception + { + Map<String, Object> mapprop = createPropertiesMap(); + ApplicationProperties properties = new ApplicationProperties(mapprop); + MessageImpl message = (MessageImpl) Message.Factory.create(); + message.setApplicationProperties(properties); + + List<Object> objects = new LinkedList<>(); + objects.add(new Integer(10)); + objects.add("10"); + + message.setBody(new AmqpSequence(objects)); + + EncodedMessage encodedMessage = encodeMessage(message); + + ProtonMessageConverter converter = new ProtonMessageConverter(new SimpleIDGenerator(0)); + ServerJMSStreamMessage serverMessage = (ServerJMSStreamMessage)converter.inboundJMSType(encodedMessage); + + simulatePersistence(serverMessage); + + verifyProperties(serverMessage); + + serverMessage.reset(); + + assertEquals(10, serverMessage.readInt()); + assertEquals("10", serverMessage.readString()); + + Object obj = converter.outbound((ServerMessage)serverMessage.getInnerMessage(), 0); + + 
reEncodeMsg(obj); + + MessageImpl outMessage = (MessageImpl)obj; + List list = ((AmqpSequence)outMessage.getBody()).getValue(); + Assert.assertEquals(Integer.valueOf(10), list.get(0)); + Assert.assertEquals("10", list.get(1)); + + } + + @Test + public void testSimpleConversionText() throws Exception + { + Map<String, Object> mapprop = createPropertiesMap(); + ApplicationProperties properties = new ApplicationProperties(mapprop); + MessageImpl message = (MessageImpl) Message.Factory.create(); + message.setApplicationProperties(properties); + + String text = "someText"; + message.setBody(new AmqpValue(text)); + + EncodedMessage encodedMessage = encodeMessage(message); + + ProtonMessageConverter converter = new ProtonMessageConverter(new SimpleIDGenerator(0)); + ServerJMSTextMessage serverMessage = (ServerJMSTextMessage)converter.inboundJMSType(encodedMessage); + + simulatePersistence(serverMessage); + + + verifyProperties(serverMessage); + + Assert.assertEquals(text, serverMessage.getText()); + + + Object obj = converter.outbound((ServerMessage)serverMessage.getInnerMessage(), 0); + + + reEncodeMsg(obj); + + MessageImpl outMessage = (MessageImpl) obj; + AmqpValue value = (AmqpValue)outMessage.getBody(); + String textValue = (String)value.getValue(); + + Assert.assertEquals(text, textValue); + + System.out.println("output = " + obj); + + } + + private void simulatePersistence(ServerJMSMessage serverMessage) + { + serverMessage.getInnerMessage().setAddress(new SimpleString("jms.queue.SomeAddress")); + // This is just to simulate what would happen during the persistence of the message + // We need to still be able to recover the message when we read it back + ((EncodingSupport)serverMessage.getInnerMessage()).encode(new EmptyBuffer()); + } + + private ProtonJMessage reEncodeMsg(Object obj) + { + ProtonJMessage objOut = (ProtonJMessage)obj; + + ByteBuf nettyBuffer = PooledByteBufAllocator.DEFAULT.heapBuffer(1024); + + objOut.encode(new NettyWritable(nettyBuffer)); + 
return objOut; + } + + + private EncodedMessage encodeMessage(MessageImpl message) + { + ByteBuf buf = PooledByteBufAllocator.DEFAULT.heapBuffer(1024 * 1024); + message.encode(new NettyWritable(buf)); + byte[] bytesConvert = new byte[buf.writerIndex()]; + buf.readBytes(bytesConvert); + return new EncodedMessage(0, bytesConvert, 0, bytesConvert.length); + } + + + class EmptyBuffer implements HornetQBuffer + { + @Override + public ByteBuf byteBuf() + { + return null; + } + + @Override + public int capacity() + { + return 0; + } + + @Override + public int readerIndex() + { + return 0; + } + + @Override + public void readerIndex(int readerIndex) + { + + } + + @Override + public int writerIndex() + { + return 0; + } + + @Override + public void writerIndex(int writerIndex) + { + + } + + @Override + public void setIndex(int readerIndex, int writerIndex) + { + + } + + @Override + public int readableBytes() + { + return 0; + } + + @Override + public int writableBytes() + { + return 0; + } + + @Override + public boolean readable() + { + return false; + } + + @Override + public boolean writable() + { + return false; + } + + @Override + public void clear() + { + + } + + @Override + public void markReaderIndex() + { + + } + + @Override + public void resetReaderIndex() + { + + } + + @Override + public void markWriterIndex() + { + + } + + @Override + public void resetWriterIndex() + { + + } + + @Override + public void discardReadBytes() + { + + } + + @Override + public byte getByte(int index) + { + return 0; + } + + @Override + public short getUnsignedByte(int index) + { + return 0; + } + + @Override + public short getShort(int index) + { + return 0; + } + + @Override + public int getUnsignedShort(int index) + { + return 0; + } + + @Override + public int getInt(int index) + { + return 0; + } + + @Override + public long getUnsignedInt(int index) + { + return 0; + } + + @Override + public long getLong(int index) + { + return 0; + } + + @Override + public void getBytes(int index, 
HornetQBuffer dst) + { + + } + + @Override + public void getBytes(int index, HornetQBuffer dst, int length) + { + + } + + @Override + public void getBytes(int index, HornetQBuffer dst, int dstIndex, int length) + { + + } + + @Override + public void getBytes(int index, byte[] dst) + { + + } + + @Override + public void getBytes(int index, byte[] dst, int dstIndex, int length) + { + + } + + @Override + public void getBytes(int index, ByteBuffer dst) + { + + } + + @Override + public char getChar(int index) + { + return 0; + } + + @Override + public float getFloat(int index) + { + return 0; + } + + @Override + public double getDouble(int index) + { + return 0; + } + + @Override + public void setByte(int index, byte value) + { + + } + + @Override + public void setShort(int index, short value) + { + + } + + @Override + public void setInt(int index, int value) + { + + } + + @Override + public void setLong(int index, long value) + { + + } + + @Override + public void setBytes(int index, HornetQBuffer src) + { + + } + + @Override + public void setBytes(int index, HornetQBuffer src, int length) + { + + } + + @Override + public void setBytes(int index, HornetQBuffer src, int srcIndex, int length) + { + + } + + @Override + public void setBytes(int index, byte[] src) + { + + } + + @Override + public void setBytes(int index, byte[] src, int srcIndex, int length) + { + + } + + @Override + public void setBytes(int index, ByteBuffer src) + { + + } + + @Override + public void setChar(int index, char value) + { + + } + + @Override + public void setFloat(int index, float value) + { + + } + + @Override + public void setDouble(int index, double value) + { + + } + + @Override + public byte readByte() + { + return 0; + } + + @Override + public short readUnsignedByte() + { + return 0; + } + + @Override + public short readShort() + { + return 0; + } + + @Override + public int readUnsignedShort() + { + return 0; + } + + @Override + public int readInt() + { + return 0; + } + + @Override + 
public long readUnsignedInt() + { + return 0; + } + + @Override + public long readLong() + { + return 0; + } + + @Override + public char readChar() + { + return 0; + } + + @Override + public float readFloat() + { + return 0; + } + + @Override + public double readDouble() + { + return 0; + } + + @Override + public boolean readBoolean() + { + return false; + } + + @Override + public SimpleString readNullableSimpleString() + { + return null; + } + + @Override + public String readNullableString() + { + return null; + } + + @Override + public SimpleString readSimpleString() + { + return null; + } + + @Override + public String readString() + { + return null; + } + + @Override + public String readUTF() + { + return null; + } + + @Override + public HornetQBuffer readBytes(int length) + { + return null; + } + + @Override + public HornetQBuffer readSlice(int length) + { + return null; + } + + @Override + public void readBytes(HornetQBuffer dst) + { + + } + + @Override + public void readBytes(HornetQBuffer dst, int length) + { + + } + + @Override + public void readBytes(HornetQBuffer dst, int dstIndex, int length) + { + + } + + @Override + public void readBytes(byte[] dst) + { + + } + + @Override + public void readBytes(byte[] dst, int dstIndex, int length) + { + + } + + @Override + public void readBytes(ByteBuffer dst) + { + + } + + @Override + public void skipBytes(int length) + { + + } + + @Override + public void writeByte(byte value) + { + + } + + @Override + public void writeShort(short value) + { + + } + + @Override + public void writeInt(int value) + { + + } + + @Override + public void writeLong(long value) + { + + } + + @Override + public void writeChar(char chr) + { + + } + + @Override + public void writeFloat(float value) + { + + } + + @Override + public void writeDouble(double value) + { + + } + + @Override + public void writeBoolean(boolean val) + { + + } + + @Override + public void writeNullableSimpleString(SimpleString val) + { + + } + + @Override + public void 
writeNullableString(String val) + { + + } + + @Override + public void writeSimpleString(SimpleString val) + { + + } + + @Override + public void writeString(String val) + { + + } + + @Override + public void writeUTF(String utf) + { + + } + + @Override + public void writeBytes(HornetQBuffer src, int length) + { + + } + + @Override + public void writeBytes(HornetQBuffer src, int srcIndex, int length) + { + + } + + @Override + public void writeBytes(byte[] src) + { + + } + + @Override + public void writeBytes(byte[] src, int srcIndex, int length) + { + + } + + @Override + public void writeBytes(ByteBuffer src) + { + + } + + @Override + public HornetQBuffer copy() + { + return null; + } + + @Override + public HornetQBuffer copy(int index, int length) + { + return null; + } + + @Override + public HornetQBuffer slice() + { + return null; + } + + @Override + public HornetQBuffer slice(int index, int length) + { + return null; + } + + @Override + public HornetQBuffer duplicate() + { + return null; + } + + @Override + public ByteBuffer toByteBuffer() + { + return null; + } + + @Override + public ByteBuffer toByteBuffer(int index, int length) + { + return null; + } + } +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/pom.xml ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-openwire-protocol/pom.xml b/hornetq-protocols/hornetq-openwire-protocol/pom.xml new file mode 100644 index 0000000..447ceb0 --- /dev/null +++ b/hornetq-protocols/hornetq-openwire-protocol/pom.xml @@ -0,0 +1,49 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>hornetq-protocols</artifactId> + <groupId>org.hornetq</groupId> + <version>2.5.0-SNAPSHOT</version> + </parent> + 
<modelVersion>4.0.0</modelVersion> + + <artifactId>hornetq-openwire-protocol</artifactId> + + <properties> + <hornetq.basedir>${project.parent.parent.basedir}</hornetq.basedir> + </properties> + + <dependencies> + <dependency> + <groupId>org.jboss.logging</groupId> + <artifactId>jboss-logging-processor</artifactId> + </dependency> + + <!-- + JBoss Logging + --> + <dependency> + <groupId>org.jboss.logging</groupId> + <artifactId>jboss-logging</artifactId> + </dependency> + <dependency> + <groupId>org.apache.activemq</groupId> + <artifactId>activemq-client</artifactId> + </dependency> + <dependency> + <groupId>org.slf4j</groupId> + <artifactId>slf4j-api</artifactId> + </dependency> + <dependency> + <groupId>org.hornetq</groupId> + <artifactId>hornetq-server</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.hornetq</groupId> + <artifactId>hornetq-jms-client</artifactId> + <version>${project.version}</version> + </dependency> + </dependencies> + +</project> http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/AMQConnectorImpl.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/AMQConnectorImpl.java b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/AMQConnectorImpl.java new file mode 100644 index 0000000..3732a54 --- /dev/null +++ b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/AMQConnectorImpl.java @@ -0,0 +1,121 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.core.protocol.openwire; + +import org.apache.activemq.command.BrokerInfo; +import org.apache.activemq.command.ConnectionControl; +import org.hornetq.core.protocol.openwire.amq.AMQConnector; +import org.hornetq.core.protocol.openwire.amq.AMQConnectorStatistics; +import org.hornetq.spi.core.remoting.Acceptor; + +public class AMQConnectorImpl implements AMQConnector +{ + private Acceptor acceptor; + + public AMQConnectorImpl(Acceptor acceptorUsed) + { + this.acceptor = acceptorUsed; + } + + @Override + public BrokerInfo getBrokerInfo() + { + // TODO Auto-generated method stub + return null; + } + + @Override + public AMQConnectorStatistics getStatistics() + { + // TODO Auto-generated method stub + return null; + } + + @Override + public boolean isUpdateClusterClients() + { + // TODO Auto-generated method stub + return false; + } + + @Override + public boolean isRebalanceClusterClients() + { + // TODO Auto-generated method stub + return false; + } + + @Override + public void updateClientClusterInfo() + { + // TODO Auto-generated method stub + + } + + @Override + public boolean isUpdateClusterClientsOnRemove() + { + // TODO Auto-generated method stub + return false; + } + + @Override + public int connectionCount() + { + // TODO Auto-generated method stub + return 0; + } + + @Override + public boolean isAllowLinkStealing() + { + // TODO Auto-generated method stub + return false; + } + + @Override + public ConnectionControl getConnectionControl() + { + return new ConnectionControl(); + } + + @Override + public void 
onStarted(OpenWireConnection connection) + { + // TODO Auto-generated method stub + + } + + @Override + public void onStopped(OpenWireConnection connection) + { + // TODO Auto-generated method stub + + } + + public int getMaximumConsumersAllowedPerConnection() + { + return 1000000;//this belongs to configuration, now hardcoded + } + + public int getMaximumProducersAllowedPerConnection() + { + return 1000000;//this belongs to configuration, now hardcoded + } + + public boolean isAuditNetworkProducers() + { + return false; + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/AMQTransactionImpl.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/AMQTransactionImpl.java b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/AMQTransactionImpl.java new file mode 100644 index 0000000..d0d5a30 --- /dev/null +++ b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/AMQTransactionImpl.java @@ -0,0 +1,74 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.hornetq.core.protocol.openwire; + +import org.hornetq.core.persistence.StorageManager; +import org.hornetq.core.server.Queue; +import org.hornetq.core.server.impl.RefsOperation; +import org.hornetq.core.transaction.Transaction; +import org.hornetq.core.transaction.impl.TransactionImpl; + +import javax.transaction.xa.Xid; + +public class AMQTransactionImpl extends TransactionImpl +{ + private boolean rollbackForClose = false; + + public AMQTransactionImpl(StorageManager storageManager, int timeoutSeconds) + { + super(storageManager, timeoutSeconds); + } + + public AMQTransactionImpl(StorageManager storageManager) + { + super(storageManager); + } + + public AMQTransactionImpl(Xid xid, StorageManager storageManager, int timeoutSeconds) + { + super(xid, storageManager, timeoutSeconds); + } + + public AMQTransactionImpl(long id, Xid xid, StorageManager storageManager) + { + super(id, xid, storageManager); + } + + @Override + public RefsOperation createRefsOperation(Queue queue) + { + return new AMQrefsOperation(queue, storageManager); + } + + public class AMQrefsOperation extends RefsOperation + { + public AMQrefsOperation(Queue queue, StorageManager storageManager) + { + super(queue, storageManager); + } + + @Override + public void afterRollback(Transaction tx) + { + if (rollbackForClose) + { + super.afterRollback(tx); + } + } + } + + public void setRollbackForClose() + { + this.rollbackForClose = true; + } +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/BrokerState.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/BrokerState.java b/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/BrokerState.java new file mode 100644 index 0000000..fb67b1d 
/**
 * Placeholder for the state associated with an ActiveMQ broker.
 * The class currently declares no members.
 *
 * @author howard
 */
public class BrokerState
{
}
You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.core.protocol.openwire; + +import java.io.DataInput; +import java.io.IOException; +import java.nio.ByteBuffer; + +import org.hornetq.api.core.HornetQBuffer; +import org.hornetq.utils.UTF8Util; +import org.hornetq.utils.UTF8Util.StringUtilBuffer; + +public class DataInputWrapper implements DataInput +{ + private static final int DEFAULT_CAPACITY = 1024 * 1024; + private static final NotEnoughBytesException exception = new NotEnoughBytesException(); + private ByteBuffer internalBuffer; + + public DataInputWrapper() + { + this(DEFAULT_CAPACITY); + } + + public DataInputWrapper(int capacity) + { + this.internalBuffer = ByteBuffer.allocateDirect(capacity); + this.internalBuffer.mark(); + this.internalBuffer.limit(0); + } + + public void receiveData(byte[] data) + { + int newSize = data.length; + int freeSpace = internalBuffer.capacity() - internalBuffer.limit(); + if (freeSpace < newSize) + { + internalBuffer.reset(); + internalBuffer.compact(); + if (internalBuffer.remaining() < newSize) + { + //need to enlarge + } + //make sure mark is at zero and position is at effective limit + int pos = internalBuffer.position(); + internalBuffer.position(0); + internalBuffer.mark(); + internalBuffer.position(pos); + } + else + { + internalBuffer.position(internalBuffer.limit()); + internalBuffer.limit(internalBuffer.capacity()); + } + internalBuffer.put(data); + internalBuffer.limit(internalBuffer.position()); + internalBuffer.reset(); + } + + public void receiveData(HornetQBuffer buffer) + { + int newSize = buffer.readableBytes(); + byte[] newData = 
new byte[newSize]; + buffer.readBytes(newData); + this.receiveData(newData); + } + + //invoke after each successful unmarshall + public void mark() + { + this.internalBuffer.mark(); + } + + @Override + public void readFully(byte[] b) throws IOException + { + readFully(b, 0, b.length); + } + + private void checkSize(int n) throws NotEnoughBytesException + { + if (internalBuffer.remaining() < n) + { + throw exception; + } + } + + @Override + public void readFully(byte[] b, int off, int len) throws IOException + { + checkSize(len); + internalBuffer.get(b, off, len); + } + + @Override + public int skipBytes(int n) throws IOException + { + checkSize(n); + int pos = internalBuffer.position(); + internalBuffer.position(pos + n); + return n; + } + + @Override + public boolean readBoolean() throws IOException + { + checkSize(1); + byte b = internalBuffer.get(); + return b != 0; + } + + @Override + public byte readByte() throws IOException + { + checkSize(1); + return this.internalBuffer.get(); + } + + @Override + public int readUnsignedByte() throws IOException + { + checkSize(1); + return 0xFF & this.internalBuffer.get(); + } + + @Override + public short readShort() throws IOException + { + checkSize(2); + return this.internalBuffer.getShort(); + } + + @Override + public int readUnsignedShort() throws IOException + { + checkSize(2); + return 0xFFFF & this.internalBuffer.getShort(); + } + + @Override + public char readChar() throws IOException + { + checkSize(2); + return this.internalBuffer.getChar(); + } + + @Override + public int readInt() throws IOException + { + checkSize(4); + return this.internalBuffer.getInt(); + } + + @Override + public long readLong() throws IOException + { + checkSize(8); + return this.internalBuffer.getLong(); + } + + @Override + public float readFloat() throws IOException + { + checkSize(4); + return this.internalBuffer.getFloat(); + } + + @Override + public double readDouble() throws IOException + { + checkSize(8); + return 
this.internalBuffer.getDouble(); + } + + @Override + public String readLine() throws IOException + { + StringBuilder sb = new StringBuilder(""); + char c = this.readChar(); + while (c != '\n') + { + sb.append(c); + c = this.readChar(); + } + return sb.toString(); + } + + @Override + public String readUTF() throws IOException + { + StringUtilBuffer buffer = UTF8Util.getThreadLocalBuffer(); + + final int size = this.readUnsignedShort(); + + if (size > buffer.byteBuffer.length) + { + buffer.resizeByteBuffer(size); + } + + if (size > buffer.charBuffer.length) + { + buffer.resizeCharBuffer(size); + } + + int count = 0; + int byte1, byte2, byte3; + int charCount = 0; + + this.readFully(buffer.byteBuffer, 0, size); + + while (count < size) + { + byte1 = buffer.byteBuffer[count++]; + + if (byte1 > 0 && byte1 <= 0x7F) + { + buffer.charBuffer[charCount++] = (char)byte1; + } + else + { + int c = byte1 & 0xff; + switch (c >> 4) + { + case 0xc: + case 0xd: + byte2 = buffer.byteBuffer[count++]; + buffer.charBuffer[charCount++] = (char)((c & 0x1F) << 6 | byte2 & 0x3F); + break; + case 0xe: + byte2 = buffer.byteBuffer[count++]; + byte3 = buffer.byteBuffer[count++]; + buffer.charBuffer[charCount++] = (char)((c & 0x0F) << 12 | (byte2 & 0x3F) << 6 | (byte3 & 0x3F) << 0); + break; + default: + throw new InternalError("unhandled utf8 byte " + c); + } + } + } + + return new String(buffer.charBuffer, 0, charCount); + } + + public boolean readable() + { + return this.internalBuffer.hasRemaining(); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/NotEnoughBytesException.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-openwire-protocol/src/main/java/org/hornetq/core/protocol/openwire/NotEnoughBytesException.java 
/**
 * {@link IOException} subtype with no message and no cause.
 */
public class NotEnoughBytesException extends IOException
{
   private static final long serialVersionUID = 3752739907942923658L;

   /** Creates the exception without a detail message. */
   public NotEnoughBytesException()
   {
      super();
   }
}
