http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-protocols/activemq6-amqp-protocol/pom.xml ---------------------------------------------------------------------- diff --git a/activemq6-protocols/activemq6-amqp-protocol/pom.xml b/activemq6-protocols/activemq6-amqp-protocol/pom.xml new file mode 100644 index 0000000..b066415 --- /dev/null +++ b/activemq6-protocols/activemq6-amqp-protocol/pom.xml @@ -0,0 +1,76 @@ +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> + <parent> + <artifactId>activemq6-protocols</artifactId> + <groupId>org.apache.activemq6</groupId> + <version>6.0.0-SNAPSHOT</version> + </parent> + <modelVersion>4.0.0</modelVersion> + + <artifactId>activemq6-amqp-protocol</artifactId> + + <properties> + <hornetq.basedir>${project.basedir}/../..</hornetq.basedir> + </properties> + + <dependencies> + <!-- JMS Client because of some Convertions that are done --> + <dependency> + <groupId>org.apache.activemq6</groupId> + <artifactId>activemq6-jms-client</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.activemq6</groupId> + <artifactId>activemq6-core-client</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.jboss.logging</groupId> + <artifactId>jboss-logging-processor</artifactId> + </dependency> + + <!-- + JBoss Logging + --> + <dependency> + <groupId>org.jboss.logging</groupId> + <artifactId>jboss-logging</artifactId> + </dependency> + <dependency> + <groupId>org.apache.activemq6</groupId> + <artifactId>activemq6-server</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + <groupId>org.apache.activemq6</groupId> + <artifactId>activemq6-proton-plug</artifactId> + <version>${project.version}</version> + </dependency> + <dependency> + 
<groupId>org.apache.qpid</groupId> + <artifactId>proton-j</artifactId> + </dependency> + <dependency> + <groupId>org.apache.qpid</groupId> + <artifactId>proton-jms</artifactId> + </dependency> + + + + <dependency> + <groupId>org.jboss.spec.javax.jms</groupId> + <artifactId>jboss-jms-api_2.0_spec</artifactId> + <scope>provided</scope> + </dependency> + <dependency> + <groupId>junit</groupId> + <artifactId>junit</artifactId> + <scope>test</scope> + </dependency> + + + </dependencies> + + +</project>
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/HornetQProtonRemotingConnection.java ---------------------------------------------------------------------- diff --git a/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/HornetQProtonRemotingConnection.java b/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/HornetQProtonRemotingConnection.java new file mode 100644 index 0000000..e526d52 --- /dev/null +++ b/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/HornetQProtonRemotingConnection.java @@ -0,0 +1,146 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.apache.activemq6.core.protocol.proton; + +import java.util.concurrent.Executor; + +import org.apache.activemq6.api.core.HornetQBuffer; +import org.apache.activemq6.api.core.HornetQException; +import org.apache.activemq6.core.client.HornetQClientLogger; +import org.apache.activemq6.spi.core.protocol.AbstractRemotingConnection; +import org.apache.activemq6.spi.core.remoting.Connection; +import org.proton.plug.AMQPConnectionContext; + +/** + * + * This is a Server's Connection representation used by HornetQ. 
+ * @author Clebert Suconic + */

public class HornetQProtonRemotingConnection extends AbstractRemotingConnection
{
   /** AMQP connection context: receives inbound bytes and is delegated to by {@link #flush()} and {@link #checkDataReceived()}. */
   private final AMQPConnectionContext amqpConnection;

   /**
    * Flipped exactly once, by whichever of {@link #fail(HornetQException, String)} or
    * {@link #destroy()} wins the race in {@link #markDestroyed()}.
    * Declared volatile so the lock-free read in {@link #isDestroyed()} observes the update.
    */
   private volatile boolean destroyed = false;

   /** Protocol manager that created this connection. */
   private final ProtonProtocolManager manager;


   /**
    * @param manager             the protocol manager that owns this connection
    * @param amqpConnection      the AMQP connection context that inbound buffers are fed to
    * @param transportConnection the underlying transport connection, passed to the superclass
    * @param executor            executor passed to the superclass
    */
   public HornetQProtonRemotingConnection(ProtonProtocolManager manager, AMQPConnectionContext amqpConnection, Connection transportConnection, Executor executor)
   {
      super(transportConnection, executor);
      this.manager = manager;
      this.amqpConnection = amqpConnection;
   }

   /**
    * @return the protocol manager supplied at construction time
    */
   public ProtonProtocolManager getManager()
   {
      return manager;
   }

   /**
    * Fails the connection: logs the failure, notifies failure listeners, notifies closing
    * listeners and closes the transport.
    * <p>
    * This can be called concurrently by more than one thread, so the "already destroyed"
    * check and the flag update are performed atomically in {@link #markDestroyed()}. Only the
    * first caller (across both this method and {@link #destroy()}) proceeds; later callers
    * return immediately. The listeners themselves are invoked outside the lock.
    *
    * @param me                    the failure that was detected
    * @param scaleDownTargetNodeID forwarded unchanged to the failure listeners
    */
   public void fail(final HornetQException me, String scaleDownTargetNodeID)
   {
      if (!markDestroyed())
      {
         return;
      }

      HornetQClientLogger.LOGGER.connectionFailureDetected(me.getMessage(), me.getType());

      // Then call the listeners
      callFailureListeners(me, scaleDownTargetNodeID);

      callClosingListeners();

      internalClose();
   }


   /**
    * Destroys the connection without reporting a failure: notifies closing listeners and
    * closes the transport. Does nothing if the connection was already failed or destroyed.
    */
   @Override
   public void destroy()
   {
      if (!markDestroyed())
      {
         return;
      }

      callClosingListeners();

      internalClose();
   }

   /**
    * @return always {@code false}
    */
   @Override
   public boolean isClient()
   {
      return false;
   }

   /**
    * @return {@code true} once {@link #fail(HornetQException, String)} or {@link #destroy()} has run
    */
   @Override
   public boolean isDestroyed()
   {
      return destroyed;
   }

   /**
    * Closes the underlying transport connection.
    *
    * @param criticalError not used by this implementation
    */
   @Override
   public void disconnect(boolean criticalError)
   {
      getTransportConnection().close();
   }

   /**
    * Disconnect the connection, closing all channels
    *
    * @param scaleDownNodeID not used by this implementation
    * @param criticalError   not used by this implementation
    */
   @Override
   public void disconnect(String scaleDownNodeID, boolean criticalError)
   {
      getTransportConnection().close();
   }

   /**
    * @return whatever {@link AMQPConnectionContext#checkDataReceived()} reports
    */
   @Override
   public boolean checkDataReceived()
   {
      return amqpConnection.checkDataReceived();
   }

   /**
    * Delegates to {@link AMQPConnectionContext#flush()}.
    */
   @Override
   public void flush()
   {
      amqpConnection.flush();
   }

   /**
    * Hands the received bytes to the AMQP connection context first, then lets the
    * superclass do its own processing of the same buffer.
    *
    * @param connectionID identifier forwarded to the superclass
    * @param buffer       the received data
    */
   @Override
   public void bufferReceived(Object connectionID, HornetQBuffer buffer)
   {
      amqpConnection.inputBuffer(buffer.byteBuf());
      super.bufferReceived(connectionID, buffer);
   }

   /**
    * Atomically transitions this connection to the destroyed state.
    * The lock is held only for the flag flip; no listener or transport code runs under it.
    *
    * @return {@code true} if this call performed the transition, {@code false} if the
    *         connection had already been destroyed
    */
   private synchronized boolean markDestroyed()
   {
      if (destroyed)
      {
         return false;
      }

      destroyed = true;
      return true;
   }

   private void internalClose()
   {
      // We close the underlying transport connection
      getTransportConnection().close();
   }
}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/ProtonProtocolManager.java ---------------------------------------------------------------------- diff --git a/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/ProtonProtocolManager.java b/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/ProtonProtocolManager.java new file mode 100644 index 0000000..8398866 --- /dev/null +++ b/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/ProtonProtocolManager.java @@ -0,0 +1,122 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */

package org.apache.activemq6.core.protocol.proton;

import java.util.concurrent.Executor;

import io.netty.channel.ChannelPipeline;
import org.apache.activemq6.api.core.HornetQBuffer;
import org.apache.activemq6.api.core.client.HornetQClient;
import org.apache.activemq6.core.protocol.proton.converter.ProtonMessageConverter;
import org.apache.activemq6.core.protocol.proton.plug.HornetQProtonConnectionCallback;
import org.apache.activemq6.core.remoting.impl.netty.NettyServerConnection;
import org.apache.activemq6.core.server.HornetQServer;
import org.apache.activemq6.core.server.management.Notification;
import org.apache.activemq6.core.server.management.NotificationListener;
import org.apache.activemq6.spi.core.protocol.ConnectionEntry;
import org.apache.activemq6.spi.core.protocol.MessageConverter;
import org.apache.activemq6.spi.core.protocol.ProtocolManager;
import org.apache.activemq6.spi.core.protocol.RemotingConnection;
import org.apache.activemq6.spi.core.remoting.Acceptor;
import org.apache.activemq6.spi.core.remoting.Connection;
import org.proton.plug.AMQPServerConnectionContext;
import org.proton.plug.context.server.ProtonServerConnectionContextFactory;

/**
 * A proton protocol manager, basically reads the Proton Input and maps proton resources to HornetQ resources
 *
 * @author <a href="mailto:[email protected]">Andy Taylor</a>
 */
public class ProtonProtocolManager implements ProtocolManager, NotificationListener
{
   /** Server this manager was created for. */
   private final HornetQServer server;

   /** Converter built in the constructor from the server's storage manager. */
   private MessageConverter protonConverter;

   /**
    * @param server the server whose executor factory and storage manager this manager uses
    */
   public ProtonProtocolManager(HornetQServer server)
   {
      this.server = server;
      this.protonConverter = new ProtonMessageConverter(server.getStorageManager());
   }

   /**
    * @return the server supplied at construction time
    */
   public HornetQServer getServer()
   {
      return server;
   }


   /**
    * @return the message converter created in the constructor
    */
   @Override
   public MessageConverter getConverter()
   {
      return protonConverter;
   }

   /**
    * No-op in this implementation.
    */
   @Override
   public void onNotification(Notification notification)
   {

   }

   /**
    * Wires up everything needed for one new AMQP connection: a connection callback, the
    * proton server connection context created from it, and the remoting connection that
    * ties both to the transport. The callback is told about the remoting connection before
    * the entry is returned.
    *
    * @param acceptorUsed       not used by this implementation
    * @param remotingConnection the transport-level connection that was accepted
    * @return an entry stamped with the current time and {@link HornetQClient#DEFAULT_CONNECTION_TTL}
    */
   @Override
   public ConnectionEntry createConnectionEntry(Acceptor acceptorUsed, Connection remotingConnection)
   {
      final HornetQProtonConnectionCallback callback = new HornetQProtonConnectionCallback(this, remotingConnection);

      final AMQPServerConnectionContext context = ProtonServerConnectionContextFactory.getFactory().createConnection(callback);

      final Executor connectionExecutor = server.getExecutorFactory().getExecutor();

      final HornetQProtonRemotingConnection protonConnection =
         new HornetQProtonRemotingConnection(this, context, remotingConnection, connectionExecutor);

      callback.setProtonConnectionDelegate(protonConnection);

      return new ConnectionEntry(protonConnection,
                                 connectionExecutor,
                                 System.currentTimeMillis(),
                                 HornetQClient.DEFAULT_CONNECTION_TTL);
   }

   /**
    * No-op in this implementation.
    */
   @Override
   public void removeHandler(String name)
   {

   }

   /**
    * Forwards the buffer to the connection, using the connection's own ID as the connection identifier.
    *
    * @param connection must be a {@link HornetQProtonRemotingConnection}; anything else fails the cast
    * @param buffer     the received data
    */
   @Override
   public void handleBuffer(RemotingConnection connection, HornetQBuffer buffer)
   {
      final HornetQProtonRemotingConnection amqp = (HornetQProtonRemotingConnection) connection;
      final Object connectionId = amqp.getID();

      amqp.bufferReceived(connectionId, buffer);
   }

   /**
    * No-op in this implementation.
    */
   @Override
   public void addChannelHandlers(ChannelPipeline pipeline)
   {

   }

   /**
    * Checks whether the given bytes start with the four ASCII characters {@code AMQP}.
    *
    * @param array the first bytes received on a connection
    * @return {@code true} if at least four bytes are present and they spell {@code AMQP}
    */
   @Override
   public boolean isProtocol(byte[] array)
   {
      if (array.length < 4)
      {
         return false;
      }

      return array[0] == (byte) 'A'
         && array[1] == (byte) 'M'
         && array[2] == (byte) 'Q'
         && array[3] == (byte) 'P';
   }

   /**
    * No-op in this implementation.
    */
   @Override
   public void handshake(NettyServerConnection connection, HornetQBuffer buffer)
   {
   }


}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/ProtonProtocolManagerFactory.java ---------------------------------------------------------------------- diff --git a/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/ProtonProtocolManagerFactory.java 
b/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/ProtonProtocolManagerFactory.java new file mode 100644 index 0000000..68cc9a5 --- /dev/null +++ b/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/ProtonProtocolManagerFactory.java @@ -0,0 +1,43 @@
/*
 * Copyright 2005-2014 Red Hat, Inc.
 * Red Hat licenses this file to you under the Apache License, version
 * 2.0 (the "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 * http://www.apache.org/licenses/LICENSE-2.0
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied. See the License for the specific language governing
 * permissions and limitations under the License.
 */

package org.apache.activemq6.core.protocol.proton;

import org.apache.activemq6.api.core.Interceptor;
import org.apache.activemq6.core.server.HornetQServer;
import org.apache.activemq6.spi.core.protocol.ProtocolManager;
import org.apache.activemq6.spi.core.protocol.ProtocolManagerFactory;

import java.util.List;

/**
 * Factory that creates {@link ProtonProtocolManager} instances and advertises the
 * {@code AMQP} protocol name.
 *
 * @author <a href="mailto:[email protected]">Andy Taylor</a>
 */
public class ProtonProtocolManagerFactory implements ProtocolManagerFactory
{
   private static final String AMQP_PROTOCOL_NAME = "AMQP";

   // Never handed out directly: arrays are mutable, so getProtocols() returns a copy.
   private static final String[] SUPPORTED_PROTOCOLS = {AMQP_PROTOCOL_NAME};

   /**
    * Creates a new protocol manager for the given server.
    *
    * @param server               the server the manager is created for
    * @param incomingInterceptors not used by this implementation
    * @param outgoingInterceptors not used by this implementation
    * @return a new {@link ProtonProtocolManager}
    */
   @Override
   public ProtocolManager createProtocolManager(HornetQServer server, List<Interceptor> incomingInterceptors, List<Interceptor> outgoingInterceptors)
   {
      return new ProtonProtocolManager(server);
   }

   /**
    * @return a fresh array holding the supported protocol names; callers may modify it
    *         without affecting this factory or other callers
    */
   @Override
   public String[] getProtocols()
   {
      return SUPPORTED_PROTOCOLS.clone();
   }
}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/converter/HornetQJMSVendor.java ---------------------------------------------------------------------- diff --git a/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/converter/HornetQJMSVendor.java b/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/converter/HornetQJMSVendor.java new file mode 100644 index 0000000..725ecfc --- /dev/null +++ b/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/converter/HornetQJMSVendor.java @@ -0,0 +1,155 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */

package org.apache.activemq6.core.protocol.proton.converter;

import javax.jms.BytesMessage;
import javax.jms.Destination;
import javax.jms.MapMessage;
import javax.jms.Message;
import javax.jms.ObjectMessage;
import javax.jms.StreamMessage;
import javax.jms.TextMessage;

import org.apache.qpid.proton.jms.JMSVendor;
import org.apache.activemq6.core.buffers.impl.ResetLimitWrappedHornetQBuffer;
import org.apache.activemq6.core.protocol.proton.converter.jms.ServerJMSBytesMessage;
import org.apache.activemq6.core.protocol.proton.converter.jms.ServerJMSMapMessage;
import org.apache.activemq6.core.protocol.proton.converter.jms.ServerJMSMessage;
import org.apache.activemq6.core.protocol.proton.converter.jms.ServerJMSStreamMessage;
import org.apache.activemq6.core.protocol.proton.converter.jms.ServerJMSTextMessage;
import org.apache.activemq6.core.server.ServerMessage;
import org.apache.activemq6.core.server.impl.ServerMessageImpl;
import org.apache.activemq6.utils.IDGenerator;

/**
 * {@link JMSVendor} that backs every JMS message it creates with a freshly allocated
 * {@link ServerMessageImpl}, wrapped in the matching {@code ServerJMS*Message} class.
 *
 * @author Clebert Suconic
 */

public class HornetQJMSVendor extends JMSVendor
{

   /** Source of the IDs given to newly created server messages. */
   private final IDGenerator serverGenerator;

   /**
    * @param idGenerator generator used to assign an ID to each message created by this vendor
    */
   HornetQJMSVendor(IDGenerator idGenerator)
   {
      this.serverGenerator = idGenerator;
   }

   /**
    * @return a new bytes message with a delivery count of 0
    */
   @Override
   public BytesMessage createBytesMessage()
   {
      final ServerMessageImpl inner = newMessage(org.apache.activemq6.api.core.Message.BYTES_TYPE);
      return new ServerJMSBytesMessage(inner, 0);
   }

   /**
    * @return a new stream message with a delivery count of 0
    */
   @Override
   public StreamMessage createStreamMessage()
   {
      final ServerMessageImpl inner = newMessage(org.apache.activemq6.api.core.Message.STREAM_TYPE);
      return new ServerJMSStreamMessage(inner, 0);
   }

   /**
    * @return a new plain message with a delivery count of 0
    */
   @Override
   public Message createMessage()
   {
      final ServerMessageImpl inner = newMessage(org.apache.activemq6.api.core.Message.DEFAULT_TYPE);
      return new ServerJMSMessage(inner, 0);
   }

   /**
    * @return a new text message with a delivery count of 0
    */
   @Override
   public TextMessage createTextMessage()
   {
      final ServerMessageImpl inner = newMessage(org.apache.activemq6.api.core.Message.TEXT_TYPE);
      return new ServerJMSTextMessage(inner, 0);
   }

   /**
    * Object messages are not produced by this vendor.
    * NOTE(review): returns {@code null} rather than throwing; confirm that callers of the
    * vendor cope with a null object message.
    *
    * @return always {@code null}
    */
   @Override
   public ObjectMessage createObjectMessage()
   {
      return null;
   }

   /**
    * @return a new map message with a delivery count of 0
    */
   @Override
   public MapMessage createMapMessage()
   {
      final ServerMessageImpl inner = newMessage(org.apache.activemq6.api.core.Message.MAP_TYPE);
      return new ServerJMSMapMessage(inner, 0);
   }

   /**
    * No-op in this implementation; the user ID is not recorded on the message.
    */
   @Override
   public void setJMSXUserID(Message message, String s)
   {
   }

   /**
    * Delegates unchanged to the superclass.
    */
   @Override
   public Destination createDestination(String name)
   {
      return super.createDestination(name);
   }

   /**
    * Delegates unchanged to the superclass.
    */
   @Override
   public <T extends Destination> T createDestination(String name, Class<T> kind)
   {
      return super.createDestination(name, kind);
   }

   /**
    * No-op in this implementation; the group ID is not recorded on the message.
    */
   @Override
   public void setJMSXGroupID(Message message, String s)
   {

   }

   /**
    * No-op in this implementation; the group sequence is not recorded on the message.
    */
   @Override
   public void setJMSXGroupSequence(Message message, int i)
   {

   }

   /**
    * No-op in this implementation; the delivery count is not recorded on the message.
    */
   @Override
   public void setJMSXDeliveryCount(Message message, long l)
   {

   }


   /**
    * Wraps an existing server message in the JMS facade that matches its type.
    *
    * @param messageType   one of the {@code *_TYPE} constants of the core {@code Message}
    *                      interface; any value other than stream, bytes, map or text yields a
    *                      plain {@link ServerJMSMessage}
    * @param wrapped       the server message to expose through the JMS API
    * @param deliveryCount delivery count handed to the wrapper
    * @return the JMS wrapper around {@code wrapped}
    */
   public ServerJMSMessage wrapMessage(int messageType, ServerMessage wrapped, int deliveryCount)
   {
      if (messageType == org.apache.activemq6.api.core.Message.STREAM_TYPE)
      {
         return new ServerJMSStreamMessage(wrapped, deliveryCount);
      }
      if (messageType == org.apache.activemq6.api.core.Message.BYTES_TYPE)
      {
         return new ServerJMSBytesMessage(wrapped, deliveryCount);
      }
      if (messageType == org.apache.activemq6.api.core.Message.MAP_TYPE)
      {
         return new ServerJMSMapMessage(wrapped, deliveryCount);
      }
      if (messageType == org.apache.activemq6.api.core.Message.TEXT_TYPE)
      {
         return new ServerJMSTextMessage(wrapped, deliveryCount);
      }

      return new ServerJMSMessage(wrapped, deliveryCount);
   }


   /**
    * NOTE(review): destinations are not translated to addresses here; confirm that callers
    * accept a null address.
    *
    * @return always {@code null}
    */
   @Override
   public String toAddress(Destination destination)
   {
      return null;
   }


   /**
    * Allocates a server message with a newly generated ID and the given type.
    * The second constructor argument (512) is presumably the initial buffer size; verify
    * against {@code ServerMessageImpl}. The body buffer's message back-reference is cleared
    * before the message is returned.
    *
    * @param messageType the core message type to stamp on the new message
    * @return the new, empty server message
    */
   private ServerMessageImpl newMessage(byte messageType)
   {
      final long id = serverGenerator.generateID();
      final ServerMessageImpl created = new ServerMessageImpl(id, 512);
      created.setType(messageType);

      final ResetLimitWrappedHornetQBuffer body = (ResetLimitWrappedHornetQBuffer) created.getBodyBuffer();
      body.setMessage(null);

      return created;
   }

}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/converter/ProtonMessageConverter.java 
---------------------------------------------------------------------- diff --git a/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/converter/ProtonMessageConverter.java b/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/converter/ProtonMessageConverter.java new file mode 100644 index 0000000..8081f6c --- /dev/null +++ b/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/converter/ProtonMessageConverter.java @@ -0,0 +1,78 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */

package org.apache.activemq6.core.protocol.proton.converter;

import org.apache.qpid.proton.jms.EncodedMessage;
import org.apache.qpid.proton.jms.InboundTransformer;
import org.apache.qpid.proton.jms.JMSMappingInboundTransformer;
import org.apache.qpid.proton.jms.JMSMappingOutboundTransformer;
import org.apache.activemq6.core.protocol.proton.converter.jms.ServerJMSMessage;
import org.apache.activemq6.core.server.ServerMessage;
import org.apache.activemq6.spi.core.protocol.MessageConverter;
import org.apache.activemq6.utils.IDGenerator;

/**
 * Converts between encoded AMQP messages and server messages by way of the JMS mapping
 * transformers, using {@link HornetQJMSVendor} to create and wrap the JMS-side objects.
 *
 * @author Clebert Suconic
 */

public class ProtonMessageConverter implements MessageConverter
{

   /** Vendor shared by both transformers; also used directly to wrap outbound messages. */
   HornetQJMSVendor hornetQJMSVendor;

   private final InboundTransformer inboundTransformer;

   private final JMSMappingOutboundTransformer outboundTransformer;

   /**
    * @param idGenerator generator handed to the vendor for assigning IDs to new messages
    */
   public ProtonMessageConverter(IDGenerator idGenerator)
   {
      this.hornetQJMSVendor = new HornetQJMSVendor(idGenerator);
      this.inboundTransformer = new JMSMappingInboundTransformer(this.hornetQJMSVendor);
      this.outboundTransformer = new JMSMappingOutboundTransformer(this.hornetQJMSVendor);
   }

   /**
    * Transforms an encoded AMQP message and returns the server message underneath the
    * resulting JMS wrapper.
    *
    * @param messageSource must be an {@link EncodedMessage}; anything else fails the cast
    * @return the inner message of the transformed JMS message
    * @throws Exception if the transformation or encoding fails
    */
   @Override
   public ServerMessage inbound(Object messageSource) throws Exception
   {
      return (ServerMessage) inboundJMSType((EncodedMessage) messageSource).getInnerMessage();
   }

   /**
    * Just create the JMS Part of the inbound (for testing)
    *
    * @param messageSource the encoded message to transform
    * @return the transformed JMS message, after {@code encode()} has been called on it
    * @throws Exception if the transformation or encoding fails
    */
   public ServerJMSMessage inboundJMSType(EncodedMessage messageSource) throws Exception
   {
      final ServerJMSMessage jms = (ServerJMSMessage) inboundTransformer.transform(messageSource);

      jms.encode();

      return jms;
   }


   /**
    * Wraps the server message in the JMS facade matching its type, decodes it, and runs it
    * through the outbound transformer.
    *
    * @param messageOutbound the server message being delivered
    * @param deliveryCount   delivery count to expose on the JMS wrapper
    * @return the result of the outbound transformer's {@code convert} call
    * @throws Exception if decoding or conversion fails
    */
   @Override
   public Object outbound(ServerMessage messageOutbound, int deliveryCount) throws Exception
   {
      final int type = messageOutbound.getType();
      final ServerJMSMessage jms = hornetQJMSVendor.wrapMessage(type, messageOutbound, deliveryCount);

      jms.decode();

      return outboundTransformer.convert(jms);
   }
}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/converter/jms/ServerJMSBytesMessage.java ---------------------------------------------------------------------- diff --git a/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/converter/jms/ServerJMSBytesMessage.java b/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/converter/jms/ServerJMSBytesMessage.java new file mode 100644 index 0000000..532f721 --- /dev/null +++ b/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/converter/jms/ServerJMSBytesMessage.java @@ -0,0 +1,239 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */

package org.apache.activemq6.core.protocol.proton.converter.jms;

import javax.jms.BytesMessage;
import javax.jms.JMSException;

import org.apache.activemq6.core.message.impl.MessageImpl;
import org.apache.activemq6.core.message.impl.MessageInternal;

import static org.apache.activemq6.reader.BytesMessageUtil.bytesMessageReset;
import static org.apache.activemq6.reader.BytesMessageUtil.bytesReadBoolean;
import static org.apache.activemq6.reader.BytesMessageUtil.bytesReadByte;
import static org.apache.activemq6.reader.BytesMessageUtil.bytesReadBytes;
import static org.apache.activemq6.reader.BytesMessageUtil.bytesReadChar;
import static org.apache.activemq6.reader.BytesMessageUtil.bytesReadDouble;
import static org.apache.activemq6.reader.BytesMessageUtil.bytesReadFloat;
import static org.apache.activemq6.reader.BytesMessageUtil.bytesReadInt;
import static org.apache.activemq6.reader.BytesMessageUtil.bytesReadLong;
import static org.apache.activemq6.reader.BytesMessageUtil.bytesReadShort;
import static org.apache.activemq6.reader.BytesMessageUtil.bytesReadUTF;
import static org.apache.activemq6.reader.BytesMessageUtil.bytesReadUnsignedByte;
import static org.apache.activemq6.reader.BytesMessageUtil.bytesReadUnsignedShort;
import static org.apache.activemq6.reader.BytesMessageUtil.bytesWriteBoolean;
import static org.apache.activemq6.reader.BytesMessageUtil.bytesWriteByte;
import static org.apache.activemq6.reader.BytesMessageUtil.bytesWriteBytes;
import static org.apache.activemq6.reader.BytesMessageUtil.bytesWriteChar;
import static org.apache.activemq6.reader.BytesMessageUtil.bytesWriteDouble;
import static org.apache.activemq6.reader.BytesMessageUtil.bytesWriteFloat;
import static org.apache.activemq6.reader.BytesMessageUtil.bytesWriteInt;
import static org.apache.activemq6.reader.BytesMessageUtil.bytesWriteLong;
import static org.apache.activemq6.reader.BytesMessageUtil.bytesWriteObject;
import static org.apache.activemq6.reader.BytesMessageUtil.bytesWriteShort;
import static org.apache.activemq6.reader.BytesMessageUtil.bytesWriteUTF;


/**
 * Server-side {@link BytesMessage} facade over a core message. Every read and write is
 * forwarded to the corresponding {@code BytesMessageUtil} helper, operating on the wrapped
 * message held by the superclass.
 *
 * @author Clebert Suconic
 */

public class ServerJMSBytesMessage extends ServerJMSMessage implements BytesMessage
{
   /**
    * @param message       the core message to expose as a JMS bytes message
    * @param deliveryCount delivery count passed to the superclass
    */
   public ServerJMSBytesMessage(MessageInternal message, int deliveryCount)
   {
      super(message, deliveryCount);
   }

   /**
    * @return the wrapped message's end-of-body position minus {@link MessageImpl#BODY_OFFSET}
    */
   @Override
   public long getBodyLength() throws JMSException
   {
      final int endOfBody = message.getEndOfBodyPosition();
      return endOfBody - MessageImpl.BODY_OFFSET;
   }

   // Reads ---------------------------------------------------------

   /** Forwards to {@code BytesMessageUtil.bytesReadBoolean}. */
   @Override
   public boolean readBoolean() throws JMSException
   {
      return bytesReadBoolean(message);
   }

   /** Forwards to {@code BytesMessageUtil.bytesReadByte}. */
   @Override
   public byte readByte() throws JMSException
   {
      return bytesReadByte(message);
   }

   /** Forwards to {@code BytesMessageUtil.bytesReadUnsignedByte}. */
   @Override
   public int readUnsignedByte() throws JMSException
   {
      return bytesReadUnsignedByte(message);
   }

   /** Forwards to {@code BytesMessageUtil.bytesReadShort}. */
   @Override
   public short readShort() throws JMSException
   {
      return bytesReadShort(message);
   }

   /** Forwards to {@code BytesMessageUtil.bytesReadUnsignedShort}. */
   @Override
   public int readUnsignedShort() throws JMSException
   {
      return bytesReadUnsignedShort(message);
   }

   /** Forwards to {@code BytesMessageUtil.bytesReadChar}. */
   @Override
   public char readChar() throws JMSException
   {
      return bytesReadChar(message);
   }

   /** Forwards to {@code BytesMessageUtil.bytesReadInt}. */
   @Override
   public int readInt() throws JMSException
   {
      return bytesReadInt(message);
   }

   /** Forwards to {@code BytesMessageUtil.bytesReadLong}. */
   @Override
   public long readLong() throws JMSException
   {
      return bytesReadLong(message);
   }

   /** Forwards to {@code BytesMessageUtil.bytesReadFloat}. */
   @Override
   public float readFloat() throws JMSException
   {
      return bytesReadFloat(message);
   }

   /** Forwards to {@code BytesMessageUtil.bytesReadDouble}. */
   @Override
   public double readDouble() throws JMSException
   {
      return bytesReadDouble(message);
   }

   /** Forwards to {@code BytesMessageUtil.bytesReadUTF}. */
   @Override
   public String readUTF() throws JMSException
   {
      return bytesReadUTF(message);
   }

   /** Forwards to {@code BytesMessageUtil.bytesReadBytes(message, value)}. */
   @Override
   public int readBytes(byte[] value) throws JMSException
   {
      return bytesReadBytes(message, value);
   }

   /** Forwards to {@code BytesMessageUtil.bytesReadBytes(message, value, length)}. */
   @Override
   public int readBytes(byte[] value, int length) throws JMSException
   {
      return bytesReadBytes(message, value, length);
   }

   // Writes --------------------------------------------------------

   /** Forwards to {@code BytesMessageUtil.bytesWriteBoolean}. */
   @Override
   public void writeBoolean(boolean value) throws JMSException
   {
      bytesWriteBoolean(message, value);

   }

   /** Forwards to {@code BytesMessageUtil.bytesWriteByte}. */
   @Override
   public void writeByte(byte value) throws JMSException
   {
      bytesWriteByte(message, value);
   }

   /** Forwards to {@code BytesMessageUtil.bytesWriteShort}. */
   @Override
   public void writeShort(short value) throws JMSException
   {
      bytesWriteShort(message, value);
   }

   /** Forwards to {@code BytesMessageUtil.bytesWriteChar}. */
   @Override
   public void writeChar(char value) throws JMSException
   {
      bytesWriteChar(message, value);
   }

   /** Forwards to {@code BytesMessageUtil.bytesWriteInt}. */
   @Override
   public void writeInt(int value) throws JMSException
   {
      bytesWriteInt(message, value);
   }

   /** Forwards to {@code BytesMessageUtil.bytesWriteLong}. */
   @Override
   public void writeLong(long value) throws JMSException
   {
      bytesWriteLong(message, value);
   }

   /** Forwards to {@code BytesMessageUtil.bytesWriteFloat}. */
   @Override
   public void writeFloat(float value) throws JMSException
   {
      bytesWriteFloat(message, value);
   }

   /** Forwards to {@code BytesMessageUtil.bytesWriteDouble}. */
   @Override
   public void writeDouble(double value) throws JMSException
   {
      bytesWriteDouble(message, value);
   }

   /** Forwards to {@code BytesMessageUtil.bytesWriteUTF}. */
   @Override
   public void writeUTF(String value) throws JMSException
   {
      bytesWriteUTF(message, value);
   }

   /** Forwards to {@code BytesMessageUtil.bytesWriteBytes(message, value)}. */
   @Override
   public void writeBytes(byte[] value) throws JMSException
   {
      bytesWriteBytes(message, value);
   }

   /** Forwards to {@code BytesMessageUtil.bytesWriteBytes(message, value, offset, length)}. */
   @Override
   public void writeBytes(byte[] value, int offset, int length) throws JMSException
   {
      bytesWriteBytes(message, value, offset, length);
   }

   /**
    * Writes an object through {@code BytesMessageUtil.bytesWriteObject}.
    *
    * @param value the value to write
    * @throws JMSException if the helper reports that the value could not be written
    */
   @Override
   public void writeObject(Object value) throws JMSException
   {
      final boolean written = bytesWriteObject(message, value);
      if (written)
      {
         return;
      }

      throw new JMSException("Can't make conversion of " + value + " to any known type");
   }

   // Lifecycle -----------------------------------------------------

   /**
    * Runs the superclass encoding, then queries the body length.
    */
   @Override
   public void encode() throws Exception
   {
      super.encode();
      // this is to make sure we encode the body-length before it's persisted
      getBodyLength();
   }


   /**
    * Runs the superclass decoding; nothing bytes-specific is added.
    */
   @Override
   public void decode() throws Exception
   {
      super.decode();

   }

   /** Forwards to {@code BytesMessageUtil.bytesMessageReset}. */
   @Override
   public void reset() throws JMSException
   {
      bytesMessageReset(message);
   }

}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/converter/jms/ServerJMSMapMessage.java 
/*
 * Copyright 2005-2014 Red Hat, Inc.
 * Red Hat licenses this file to you under the Apache License, version
 * 2.0 (the "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *    http://www.apache.org/licenses/LICENSE-2.0
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied.  See the License for the specific language governing
 * permissions and limitations under the License.
 */
package org.apache.activemq6.core.protocol.proton.converter.jms;

import javax.jms.JMSException;
import javax.jms.MapMessage;
import javax.jms.MessageFormatException;
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashSet;
import java.util.Set;

import org.apache.activemq6.api.core.HornetQPropertyConversionException;
import org.apache.activemq6.api.core.Message;
import org.apache.activemq6.api.core.SimpleString;
import org.apache.activemq6.core.message.impl.MessageInternal;
import org.apache.activemq6.utils.TypedProperties;

import static org.apache.activemq6.reader.MapMessageUtil.readBodyMap;
import static org.apache.activemq6.reader.MapMessageUtil.writeBodyMap;

/**
 * HornetQ implementation of a JMS MapMessage, used on the server side by the AMQP converter.
 * <p>
 * Entries are held in a local {@link TypedProperties} instance. {@link #encode()} passes that
 * map to {@code MapMessageUtil.writeBodyMap} and {@link #decode()} refills it through
 * {@code MapMessageUtil.readBodyMap}, in both cases against the wrapped core message.
 *
 * @author Norbert Lataille
 * @author Adrian Brock
 * @author Tim Fox
 * @author Ovidiu Feodorov
 * @author Andy Taylor
 */
public final class ServerJMSMapMessage extends ServerJMSMessage implements MapMessage
{
   // Constants -----------------------------------------------------

   public static final byte TYPE = Message.MAP_TYPE;

   // Attributes ----------------------------------------------------

   /** The map entries; keys are stored as {@link SimpleString}. */
   private final TypedProperties map = new TypedProperties();

   // Constructors --------------------------------------------------

   /**
    * @param message       the core message wrapped by this JMS view
    * @param deliveryCount the delivery count reported for {@code JMSXDeliveryCount}
    */
   public ServerJMSMapMessage(MessageInternal message, int deliveryCount)
   {
      super(message, deliveryCount);
   }

   // MapMessage implementation -------------------------------------

   @Override
   public void setBoolean(final String name, final boolean value) throws JMSException
   {
      map.putBooleanProperty(new SimpleString(name), value);
   }

   @Override
   public void setByte(final String name, final byte value) throws JMSException
   {
      map.putByteProperty(new SimpleString(name), value);
   }

   @Override
   public void setShort(final String name, final short value) throws JMSException
   {
      map.putShortProperty(new SimpleString(name), value);
   }

   @Override
   public void setChar(final String name, final char value) throws JMSException
   {
      map.putCharProperty(new SimpleString(name), value);
   }

   @Override
   public void setInt(final String name, final int value) throws JMSException
   {
      map.putIntProperty(new SimpleString(name), value);
   }

   @Override
   public void setLong(final String name, final long value) throws JMSException
   {
      map.putLongProperty(new SimpleString(name), value);
   }

   @Override
   public void setFloat(final String name, final float value) throws JMSException
   {
      map.putFloatProperty(new SimpleString(name), value);
   }

   @Override
   public void setDouble(final String name, final double value) throws JMSException
   {
      map.putDoubleProperty(new SimpleString(name), value);
   }

   @Override
   public void setString(final String name, final String value) throws JMSException
   {
      map.putSimpleStringProperty(new SimpleString(name), value == null ? null : new SimpleString(value));
   }

   @Override
   public void setBytes(final String name, final byte[] value) throws JMSException
   {
      map.putBytesProperty(new SimpleString(name), value);
   }

   /**
    * Stores a copy of {@code length} bytes of {@code value}, starting at {@code offset}.
    *
    * @throws JMSException if the requested range does not lie inside {@code value}
    */
   @Override
   public void setBytes(final String name, final byte[] value, final int offset, final int length) throws JMSException
   {
      // Compare against "value.length - offset" rather than "offset + length": the sum can
      // overflow, and negative arguments must be rejected here instead of surfacing as an
      // unchecked exception from the array allocation or from System.arraycopy.
      if (offset < 0 || length < 0 || length > value.length - offset)
      {
         throw new JMSException("Invalid offset/length");
      }
      byte[] newBytes = new byte[length];
      System.arraycopy(value, offset, newBytes, 0, length);
      map.putBytesProperty(new SimpleString(name), newBytes);
   }

   @Override
   public void setObject(final String name, final Object value) throws JMSException
   {
      try
      {
         TypedProperties.setObjectProperty(new SimpleString(name), value, map);
      }
      catch (HornetQPropertyConversionException e)
      {
         throw formatFailure(e);
      }
   }

   @Override
   public boolean getBoolean(final String name) throws JMSException
   {
      try
      {
         return map.getBooleanProperty(new SimpleString(name));
      }
      catch (HornetQPropertyConversionException e)
      {
         throw formatFailure(e);
      }
   }

   @Override
   public byte getByte(final String name) throws JMSException
   {
      try
      {
         return map.getByteProperty(new SimpleString(name));
      }
      catch (HornetQPropertyConversionException e)
      {
         throw formatFailure(e);
      }
   }

   @Override
   public short getShort(final String name) throws JMSException
   {
      try
      {
         return map.getShortProperty(new SimpleString(name));
      }
      catch (HornetQPropertyConversionException e)
      {
         throw formatFailure(e);
      }
   }

   @Override
   public char getChar(final String name) throws JMSException
   {
      try
      {
         return map.getCharProperty(new SimpleString(name));
      }
      catch (HornetQPropertyConversionException e)
      {
         throw formatFailure(e);
      }
   }

   @Override
   public int getInt(final String name) throws JMSException
   {
      try
      {
         return map.getIntProperty(new SimpleString(name));
      }
      catch (HornetQPropertyConversionException e)
      {
         throw formatFailure(e);
      }
   }

   @Override
   public long getLong(final String name) throws JMSException
   {
      try
      {
         return map.getLongProperty(new SimpleString(name));
      }
      catch (HornetQPropertyConversionException e)
      {
         throw formatFailure(e);
      }
   }

   @Override
   public float getFloat(final String name) throws JMSException
   {
      try
      {
         return map.getFloatProperty(new SimpleString(name));
      }
      catch (HornetQPropertyConversionException e)
      {
         throw formatFailure(e);
      }
   }

   @Override
   public double getDouble(final String name) throws JMSException
   {
      try
      {
         return map.getDoubleProperty(new SimpleString(name));
      }
      catch (HornetQPropertyConversionException e)
      {
         throw formatFailure(e);
      }
   }

   @Override
   public String getString(final String name) throws JMSException
   {
      try
      {
         SimpleString str = map.getSimpleStringProperty(new SimpleString(name));
         return str == null ? null : str.toString();
      }
      catch (HornetQPropertyConversionException e)
      {
         throw formatFailure(e);
      }
   }

   @Override
   public byte[] getBytes(final String name) throws JMSException
   {
      try
      {
         return map.getBytesProperty(new SimpleString(name));
      }
      catch (HornetQPropertyConversionException e)
      {
         throw formatFailure(e);
      }
   }

   /**
    * Returns the stored value for {@code name}; a stored {@link SimpleString} is handed back
    * as a {@link String}.
    */
   @Override
   public Object getObject(final String name) throws JMSException
   {
      Object val = map.getProperty(new SimpleString(name));

      if (val instanceof SimpleString)
      {
         val = ((SimpleString) val).toString();
      }

      return val;
   }

   /**
    * Returns the entry names as plain strings. The enumeration is taken over a copy, so it is
    * not affected by later changes to the map.
    */
   @Override
   public Enumeration getMapNames() throws JMSException
   {
      Set<SimpleString> simplePropNames = map.getPropertyNames();
      Set<String> propNames = new HashSet<String>(simplePropNames.size());

      for (SimpleString str : simplePropNames)
      {
         propNames.add(str.toString());
      }

      return Collections.enumeration(propNames);
   }

   @Override
   public boolean itemExists(final String name) throws JMSException
   {
      return map.containsProperty(new SimpleString(name));
   }

   /** Clears the wrapped message body (via the superclass) and then the local map. */
   @Override
   public void clearBody() throws JMSException
   {
      super.clearBody();

      map.clear();
   }

   /** Runs the superclass encode step, then writes the local map through {@code writeBodyMap}. */
   @Override
   public void encode() throws Exception
   {
      super.encode();
      writeBodyMap(message, map);
   }

   /** Runs the superclass decode step, then fills the local map through {@code readBodyMap}. */
   @Override
   public void decode() throws Exception
   {
      super.decode();
      readBodyMap(message, map);
   }

   // Private -------------------------------------------------------

   /**
    * Converts a property conversion failure into the JMS exception type, keeping the original
    * exception as the cause so the stack trace is not lost.
    */
   private static MessageFormatException formatFailure(final HornetQPropertyConversionException cause)
   {
      MessageFormatException failure = new MessageFormatException(cause.getMessage());
      failure.initCause(cause);
      return failure;
   }
}
/*
 * Copyright 2005-2014 Red Hat, Inc.
 * Red Hat licenses this file to you under the Apache License, version
 * 2.0 (the "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *    http://www.apache.org/licenses/LICENSE-2.0
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 * implied.  See the License for the specific language governing
 * permissions and limitations under the License.
 */

package org.apache.activemq6.core.protocol.proton.converter.jms;

import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageFormatException;
import java.util.Collections;
import java.util.Enumeration;

import org.apache.activemq6.api.core.HornetQException;
import org.apache.activemq6.api.core.HornetQPropertyConversionException;
import org.apache.activemq6.api.core.SimpleString;
import org.apache.activemq6.core.message.impl.MessageInternal;
import org.apache.activemq6.jms.client.HornetQDestination;
import org.apache.activemq6.jms.client.HornetQQueue;
import org.apache.activemq6.reader.MessageUtil;

/**
 * Partial JMS {@link Message} view over a core {@link MessageInternal}, used by the AMQP
 * converter. Several JMS operations are deliberately no-ops here (message id, redelivered
 * flag, delivery time, acknowledge and the JMS 2.0 body accessors).
 *
 * @author Clebert Suconic
 */
public class ServerJMSMessage implements Message
{
   /** The wrapped core message that all accessors delegate to. */
   protected final MessageInternal message;

   /** Value reported for the {@code JMSXDeliveryCount} property. */
   protected int deliveryCount;

   public ServerJMSMessage(MessageInternal message, int deliveryCount)
   {
      this.message = message;
      this.deliveryCount = deliveryCount;
   }

   /** @return the wrapped core message */
   public MessageInternal getInnerMessage()
   {
      return message;
   }

   /** Not tracked by this view; always {@code null}. */
   @Override
   public final String getJMSMessageID() throws JMSException
   {
      return null;
   }

   @Override
   public final void setJMSMessageID(String id) throws JMSException
   {
      // no op
   }

   @Override
   public final long getJMSTimestamp() throws JMSException
   {
      return message.getTimestamp();
   }

   @Override
   public final void setJMSTimestamp(long timestamp) throws JMSException
   {
      message.setTimestamp(timestamp);
   }

   @Override
   public final byte[] getJMSCorrelationIDAsBytes() throws JMSException
   {
      return MessageUtil.getJMSCorrelationIDAsBytes(message);
   }

   @Override
   public final void setJMSCorrelationIDAsBytes(byte[] correlationID) throws JMSException
   {
      try
      {
         MessageUtil.setJMSCorrelationIDAsBytes(message, correlationID);
      }
      catch (HornetQException e)
      {
         // keep the original failure attached instead of only copying its message
         JMSException failure = new JMSException(e.getMessage());
         failure.setLinkedException(e);
         failure.initCause(e);
         throw failure;
      }
   }

   @Override
   public final void setJMSCorrelationID(String correlationID) throws JMSException
   {
      MessageUtil.setJMSCorrelationID(message, correlationID);
   }

   @Override
   public final String getJMSCorrelationID() throws JMSException
   {
      return MessageUtil.getJMSCorrelationID(message);
   }

   @Override
   public final Destination getJMSReplyTo() throws JMSException
   {
      SimpleString reply = MessageUtil.getJMSReplyTo(message);
      return reply == null ? null : HornetQDestination.fromAddress(reply.toString());
   }

   /**
    * @param replyTo the reply destination; it is cast to {@link HornetQDestination}, so any
    *                other implementation fails with a {@link ClassCastException}
    */
   @Override
   public final void setJMSReplyTo(Destination replyTo) throws JMSException
   {
      MessageUtil.setJMSReplyTo(message, replyTo == null ? null : ((HornetQDestination) replyTo).getSimpleAddress());
   }

   /**
    * Builds a destination from the core address. An address that does not start with
    * {@code "jms."} is exposed as a {@link HornetQQueue} whose name and address are both the
    * raw address; any other address goes through {@link HornetQDestination#fromAddress}.
    */
   @Override
   public final Destination getJMSDestination() throws JMSException
   {
      SimpleString sdest = message.getAddress();

      if (sdest == null)
      {
         return null;
      }

      String address = sdest.toString();
      if (!address.startsWith("jms."))
      {
         return new HornetQQueue(address, address);
      }
      return HornetQDestination.fromAddress(address);
   }

   @Override
   public final void setJMSDestination(Destination destination) throws JMSException
   {
      if (destination == null)
      {
         message.setAddress(null);
      }
      else
      {
         message.setAddress(((HornetQDestination) destination).getSimpleAddress());
      }
   }

   @Override
   public final int getJMSDeliveryMode() throws JMSException
   {
      return message.isDurable() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT;
   }

   @Override
   public final void setJMSDeliveryMode(int deliveryMode) throws JMSException
   {
      if (deliveryMode == DeliveryMode.PERSISTENT)
      {
         message.setDurable(true);
      }
      else if (deliveryMode == DeliveryMode.NON_PERSISTENT)
      {
         message.setDurable(false);
      }
      else
      {
         throw new JMSException("Invalid mode " + deliveryMode);
      }
   }

   /** Not tracked by this view; always {@code false}. */
   @Override
   public final boolean getJMSRedelivered() throws JMSException
   {
      return false;
   }

   @Override
   public final void setJMSRedelivered(boolean redelivered) throws JMSException
   {
      // no op
   }

   @Override
   public final String getJMSType() throws JMSException
   {
      return MessageUtil.getJMSType(message);
   }

   @Override
   public final void setJMSType(String type) throws JMSException
   {
      MessageUtil.setJMSType(message, type);
   }

   @Override
   public final long getJMSExpiration() throws JMSException
   {
      return message.getExpiration();
   }

   @Override
   public final void setJMSExpiration(long expiration) throws JMSException
   {
      message.setExpiration(expiration);
   }

   @Override
   public final long getJMSDeliveryTime() throws JMSException
   {
      // no op
      return 0;
   }

   @Override
   public final void setJMSDeliveryTime(long deliveryTime) throws JMSException
   {
      // no op
   }

   @Override
   public final int getJMSPriority() throws JMSException
   {
      return message.getPriority();
   }

   @Override
   public final void setJMSPriority(int priority) throws JMSException
   {
      // narrowed to byte without a range check
      message.setPriority((byte) priority);
   }

   @Override
   public final void clearProperties() throws JMSException
   {
      MessageUtil.clearProperties(message);
   }

   @Override
   public final boolean propertyExists(String name) throws JMSException
   {
      return MessageUtil.propertyExists(message, name);
   }

   /** @throws MessageFormatException if the stored value cannot be converted to a boolean */
   @Override
   public final boolean getBooleanProperty(String name) throws JMSException
   {
      try
      {
         return message.getBooleanProperty(name);
      }
      catch (HornetQPropertyConversionException e)
      {
         throw formatFailure(e);
      }
   }

   /** @throws MessageFormatException if the stored value cannot be converted to a byte */
   @Override
   public final byte getByteProperty(String name) throws JMSException
   {
      try
      {
         return message.getByteProperty(name);
      }
      catch (HornetQPropertyConversionException e)
      {
         throw formatFailure(e);
      }
   }

   /** @throws MessageFormatException if the stored value cannot be converted to a short */
   @Override
   public final short getShortProperty(String name) throws JMSException
   {
      try
      {
         return message.getShortProperty(name);
      }
      catch (HornetQPropertyConversionException e)
      {
         throw formatFailure(e);
      }
   }

   /**
    * {@code JMSXDeliveryCount} is answered from {@link #deliveryCount}; every other name is
    * read from the wrapped message.
    *
    * @throws MessageFormatException if the stored value cannot be converted to an int
    */
   @Override
   public final int getIntProperty(String name) throws JMSException
   {
      if (MessageUtil.JMSXDELIVERYCOUNT.equals(name))
      {
         return deliveryCount;
      }

      try
      {
         return message.getIntProperty(name);
      }
      catch (HornetQPropertyConversionException e)
      {
         throw formatFailure(e);
      }
   }

   /**
    * {@code JMSXDeliveryCount} is answered from {@link #deliveryCount}; every other name is
    * read from the wrapped message.
    *
    * @throws MessageFormatException if the stored value cannot be converted to a long
    */
   @Override
   public final long getLongProperty(String name) throws JMSException
   {
      if (MessageUtil.JMSXDELIVERYCOUNT.equals(name))
      {
         return deliveryCount;
      }

      try
      {
         return message.getLongProperty(name);
      }
      catch (HornetQPropertyConversionException e)
      {
         throw formatFailure(e);
      }
   }

   /** @throws MessageFormatException if the stored value cannot be converted to a float */
   @Override
   public final float getFloatProperty(String name) throws JMSException
   {
      try
      {
         return message.getFloatProperty(name);
      }
      catch (HornetQPropertyConversionException e)
      {
         throw formatFailure(e);
      }
   }

   /** @throws MessageFormatException if the stored value cannot be converted to a double */
   @Override
   public final double getDoubleProperty(String name) throws JMSException
   {
      try
      {
         return message.getDoubleProperty(name);
      }
      catch (HornetQPropertyConversionException e)
      {
         throw formatFailure(e);
      }
   }

   /**
    * {@code JMSXDeliveryCount} is answered from {@link #deliveryCount} as a decimal string;
    * every other name is read from the wrapped message.
    *
    * @throws MessageFormatException if the stored value cannot be converted to a string
    */
   @Override
   public final String getStringProperty(String name) throws JMSException
   {
      if (MessageUtil.JMSXDELIVERYCOUNT.equals(name))
      {
         return String.valueOf(deliveryCount);
      }

      try
      {
         return message.getStringProperty(name);
      }
      catch (HornetQPropertyConversionException e)
      {
         throw formatFailure(e);
      }
   }

   /** Returns the raw property value; a {@link SimpleString} is handed back as a {@link String}. */
   @Override
   public final Object getObjectProperty(String name) throws JMSException
   {
      Object val = message.getObjectProperty(name);
      if (val instanceof SimpleString)
      {
         val = ((SimpleString) val).toString();
      }
      return val;
   }

   @Override
   public final Enumeration getPropertyNames() throws JMSException
   {
      return Collections.enumeration(MessageUtil.getPropertyNames(message));
   }

   @Override
   public final void setBooleanProperty(String name, boolean value) throws JMSException
   {
      message.putBooleanProperty(name, value);
   }

   @Override
   public final void setByteProperty(String name, byte value) throws JMSException
   {
      message.putByteProperty(name, value);
   }

   @Override
   public final void setShortProperty(String name, short value) throws JMSException
   {
      message.putShortProperty(name, value);
   }

   @Override
   public final void setIntProperty(String name, int value) throws JMSException
   {
      message.putIntProperty(name, value);
   }

   @Override
   public final void setLongProperty(String name, long value) throws JMSException
   {
      message.putLongProperty(name, value);
   }

   @Override
   public final void setFloatProperty(String name, float value) throws JMSException
   {
      message.putFloatProperty(name, value);
   }

   @Override
   public final void setDoubleProperty(String name, double value) throws JMSException
   {
      message.putDoubleProperty(name, value);
   }

   @Override
   public final void setStringProperty(String name, String value) throws JMSException
   {
      message.putStringProperty(name, value);
   }

   /** @throws MessageFormatException if {@code value} is of a type the message cannot store */
   @Override
   public final void setObjectProperty(String name, Object value) throws JMSException
   {
      try
      {
         message.putObjectProperty(name, value);
      }
      catch (HornetQPropertyConversionException e)
      {
         throw formatFailure(e);
      }
   }

   @Override
   public final void acknowledge() throws JMSException
   {
      // no op
   }

   @Override
   public void clearBody() throws JMSException
   {
      message.getBodyBuffer().clear();
   }

   @Override
   public final <T> T getBody(Class<T> c) throws JMSException
   {
      // no op.. jms2 not used on the conversion
      return null;
   }

   /**
    * Encode the body into the internal message. The base implementation only resets the
    * reader index of the body buffer; subclasses add their own body handling.
    */
   public void encode() throws Exception
   {
      message.getBodyBuffer().resetReaderIndex();
   }

   /**
    * Decode the body from the internal message. The base implementation only resets the
    * reader index of the body buffer; subclasses add their own body handling.
    */
   public void decode() throws Exception
   {
      message.getBodyBuffer().resetReaderIndex();
   }

   @Override
   public final boolean isBodyAssignableTo(Class c) throws JMSException
   {
      // no op.. jms2 not used on the conversion
      return false;
   }

   /**
    * Converts a property conversion failure into the JMS exception type, keeping the original
    * exception as the cause so the stack trace is not lost.
    */
   private static MessageFormatException formatFailure(HornetQPropertyConversionException cause)
   {
      MessageFormatException failure = new MessageFormatException(cause.getMessage());
      failure.initCause(cause);
      return failure;
   }
}
+ */ +package org.apache.activemq6.core.protocol.proton.converter.jms; + +import javax.jms.JMSException; +import javax.jms.MessageEOFException; +import javax.jms.MessageFormatException; +import javax.jms.StreamMessage; + +import org.apache.activemq6.api.core.HornetQBuffer; +import org.apache.activemq6.api.core.Message; +import org.apache.activemq6.api.core.Pair; +import org.apache.activemq6.core.message.impl.MessageInternal; +import org.apache.activemq6.utils.DataConstants; + +import static org.apache.activemq6.reader.MessageUtil.getBodyBuffer; +import static org.apache.activemq6.reader.StreamMessageUtil.streamReadBoolean; +import static org.apache.activemq6.reader.StreamMessageUtil.streamReadByte; +import static org.apache.activemq6.reader.StreamMessageUtil.streamReadBytes; +import static org.apache.activemq6.reader.StreamMessageUtil.streamReadChar; +import static org.apache.activemq6.reader.StreamMessageUtil.streamReadDouble; +import static org.apache.activemq6.reader.StreamMessageUtil.streamReadFloat; +import static org.apache.activemq6.reader.StreamMessageUtil.streamReadInteger; +import static org.apache.activemq6.reader.StreamMessageUtil.streamReadLong; +import static org.apache.activemq6.reader.StreamMessageUtil.streamReadObject; +import static org.apache.activemq6.reader.StreamMessageUtil.streamReadShort; +import static org.apache.activemq6.reader.StreamMessageUtil.streamReadString; + +public final class ServerJMSStreamMessage extends ServerJMSMessage implements StreamMessage +{ + public static final byte TYPE = Message.STREAM_TYPE; + + private int bodyLength = 0; + + + public ServerJMSStreamMessage(MessageInternal message, int deliveryCount) + { + super(message, deliveryCount); + + } + + // StreamMessage implementation ---------------------------------- + + public boolean readBoolean() throws JMSException + { + try + { + return streamReadBoolean(message); + } + catch (IllegalStateException e) + { + throw new MessageFormatException(e.getMessage()); + } + 
catch (IndexOutOfBoundsException e) + { + throw new MessageEOFException(""); + } + } + + public byte readByte() throws JMSException + { + try + { + return streamReadByte(message); + } + catch (IllegalStateException e) + { + throw new MessageFormatException(e.getMessage()); + } + catch (IndexOutOfBoundsException e) + { + throw new MessageEOFException(""); + } + } + + public short readShort() throws JMSException + { + + try + { + return streamReadShort(message); + } + catch (IllegalStateException e) + { + throw new MessageFormatException(e.getMessage()); + } + catch (IndexOutOfBoundsException e) + { + throw new MessageEOFException(""); + } + } + + public char readChar() throws JMSException + { + + try + { + return streamReadChar(message); + } + catch (IllegalStateException e) + { + throw new MessageFormatException(e.getMessage()); + } + catch (IndexOutOfBoundsException e) + { + throw new MessageEOFException(""); + } + } + + public int readInt() throws JMSException + { + + try + { + return streamReadInteger(message); + } + catch (IllegalStateException e) + { + throw new MessageFormatException(e.getMessage()); + } + catch (IndexOutOfBoundsException e) + { + throw new MessageEOFException(""); + } + } + + public long readLong() throws JMSException + { + + try + { + return streamReadLong(message); + } + catch (IllegalStateException e) + { + throw new MessageFormatException(e.getMessage()); + } + catch (IndexOutOfBoundsException e) + { + throw new MessageEOFException(""); + } + } + + public float readFloat() throws JMSException + { + + try + { + return streamReadFloat(message); + } + catch (IllegalStateException e) + { + throw new MessageFormatException(e.getMessage()); + } + catch (IndexOutOfBoundsException e) + { + throw new MessageEOFException(""); + } + } + + public double readDouble() throws JMSException + { + + try + { + return streamReadDouble(message); + } + catch (IllegalStateException e) + { + throw new MessageFormatException(e.getMessage()); + } + catch 
(IndexOutOfBoundsException e) + { + throw new MessageEOFException(""); + } + } + + public String readString() throws JMSException + { + + try + { + return streamReadString(message); + } + catch (IllegalStateException e) + { + throw new MessageFormatException(e.getMessage()); + } + catch (IndexOutOfBoundsException e) + { + throw new MessageEOFException(""); + } + } + + /** + * len here is used to control how many more bytes to read + */ + private int len = 0; + + public int readBytes(final byte[] value) throws JMSException + { + + try + { + Pair<Integer, Integer> pairRead = streamReadBytes(message, len, value); + + len = pairRead.getA(); + return pairRead.getB(); + } + catch (IllegalStateException e) + { + throw new MessageFormatException(e.getMessage()); + } + catch (IndexOutOfBoundsException e) + { + throw new MessageEOFException(""); + } + } + + public Object readObject() throws JMSException + { + + if (getBodyBuffer(message).readerIndex() >= message.getEndOfBodyPosition()) + { + throw new MessageEOFException(""); + } + try + { + return streamReadObject(message); + } + catch (IllegalStateException e) + { + throw new MessageFormatException(e.getMessage()); + } + catch (IndexOutOfBoundsException e) + { + throw new MessageEOFException(""); + } + } + + public void writeBoolean(final boolean value) throws JMSException + { + + getBuffer().writeByte(DataConstants.BOOLEAN); + getBuffer().writeBoolean(value); + } + + public void writeByte(final byte value) throws JMSException + { + + getBuffer().writeByte(DataConstants.BYTE); + getBuffer().writeByte(value); + } + + public void writeShort(final short value) throws JMSException + { + + getBuffer().writeByte(DataConstants.SHORT); + getBuffer().writeShort(value); + } + + public void writeChar(final char value) throws JMSException + { + + getBuffer().writeByte(DataConstants.CHAR); + getBuffer().writeShort((short) value); + } + + public void writeInt(final int value) throws JMSException + { + + 
getBuffer().writeByte(DataConstants.INT); + getBuffer().writeInt(value); + } + + public void writeLong(final long value) throws JMSException + { + + getBuffer().writeByte(DataConstants.LONG); + getBuffer().writeLong(value); + } + + public void writeFloat(final float value) throws JMSException + { + + getBuffer().writeByte(DataConstants.FLOAT); + getBuffer().writeInt(Float.floatToIntBits(value)); + } + + public void writeDouble(final double value) throws JMSException + { + + getBuffer().writeByte(DataConstants.DOUBLE); + getBuffer().writeLong(Double.doubleToLongBits(value)); + } + + public void writeString(final String value) throws JMSException + { + + getBuffer().writeByte(DataConstants.STRING); + getBuffer().writeNullableString(value); + } + + public void writeBytes(final byte[] value) throws JMSException + { + + getBuffer().writeByte(DataConstants.BYTES); + getBuffer().writeInt(value.length); + getBuffer().writeBytes(value); + } + + public void writeBytes(final byte[] value, final int offset, final int length) throws JMSException + { + + getBuffer().writeByte(DataConstants.BYTES); + getBuffer().writeInt(length); + getBuffer().writeBytes(value, offset, length); + } + + public void writeObject(final Object value) throws JMSException + { + if (value instanceof String) + { + writeString((String) value); + } + else if (value instanceof Boolean) + { + writeBoolean((Boolean) value); + } + else if (value instanceof Byte) + { + writeByte((Byte) value); + } + else if (value instanceof Short) + { + writeShort((Short) value); + } + else if (value instanceof Integer) + { + writeInt((Integer) value); + } + else if (value instanceof Long) + { + writeLong((Long) value); + } + else if (value instanceof Float) + { + writeFloat((Float) value); + } + else if (value instanceof Double) + { + writeDouble((Double) value); + } + else if (value instanceof byte[]) + { + writeBytes((byte[]) value); + } + else if (value instanceof Character) + { + writeChar((Character) value); + } + else if 
(value == null) + { + writeString(null); + } + else + { + throw new MessageFormatException("Invalid object type: " + value.getClass()); + } + } + + public void reset() throws JMSException + { + getBuffer().resetReaderIndex(); + } + + // HornetQRAMessage overrides ---------------------------------------- + + @Override + public void clearBody() throws JMSException + { + super.clearBody(); + + getBuffer().clear(); + } + + private HornetQBuffer getBuffer() + { + return message.getBodyBuffer(); + } + + + public void decode() throws Exception + { + super.decode(); + } + + /** + * Encode the body into the internal message + */ + public void encode() throws Exception + { + super.encode(); + bodyLength = message.getEndOfBodyPosition(); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/converter/jms/ServerJMSTextMessage.java ---------------------------------------------------------------------- diff --git a/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/converter/jms/ServerJMSTextMessage.java b/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/converter/jms/ServerJMSTextMessage.java new file mode 100644 index 0000000..6d48427 --- /dev/null +++ b/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/converter/jms/ServerJMSTextMessage.java @@ -0,0 +1,112 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.apache.activemq6.core.protocol.proton.converter.jms; + +import javax.jms.JMSException; +import javax.jms.TextMessage; + +import org.apache.activemq6.api.core.Message; +import org.apache.activemq6.api.core.SimpleString; +import org.apache.activemq6.core.message.impl.MessageInternal; + +import static org.apache.activemq6.reader.TextMessageUtil.readBodyText; +import static org.apache.activemq6.reader.TextMessageUtil.writeBodyText; + + +/** + * HornetQ implementation of a JMS TextMessage. + * <br> + * This class was ported from SpyTextMessage in JBossMQ. + * + * @author Norbert Lataille ([email protected]) + * @author <a href="mailto:[email protected]">Jason Dillon</a> + * @author <a href="mailto:[email protected]">Adrian Brock</a> + * @author <a href="mailto:[email protected]">Tim Fox</a> + * @author <a href="mailto:[email protected]">Ovidiu Feodorov</a> + * @author <a href="mailto:[email protected]">Andy Taylor</a> + * @version $Revision: 3412 $ + */ +public class ServerJMSTextMessage extends ServerJMSMessage implements TextMessage +{ + // Constants ----------------------------------------------------- + + public static final byte TYPE = Message.TEXT_TYPE; + + // Attributes ---------------------------------------------------- + + // We cache it locally - it's more performant to cache as a SimpleString, the AbstractChannelBuffer write + // methods are more efficient for a SimpleString + private SimpleString text; + + // Static -------------------------------------------------------- + + // Constructors 
-------------------------------------------------- + + /* + * This constructor is used to construct messages prior to sending + */ + public ServerJMSTextMessage(MessageInternal message, int deliveryCount) + { + super(message, deliveryCount); + + } + // TextMessage implementation ------------------------------------ + + public void setText(final String text) throws JMSException + { + if (text != null) + { + this.text = new SimpleString(text); + } + else + { + this.text = null; + } + + writeBodyText(message, this.text); + } + + public String getText() + { + if (text != null) + { + return text.toString(); + } + else + { + return null; + } + } + + @Override + public void clearBody() throws JMSException + { + super.clearBody(); + + text = null; + } + + + public void encode() throws Exception + { + super.encode(); + writeBodyText(message, text); + } + + public void decode() throws Exception + { + super.decode(); + text = readBodyText(message); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/converter/jms/package-info.java ---------------------------------------------------------------------- diff --git a/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/converter/jms/package-info.java b/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/converter/jms/package-info.java new file mode 100644 index 0000000..d3ea9b2 --- /dev/null +++ b/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/converter/jms/package-info.java @@ -0,0 +1,21 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + + +/** + * This package contains incomplete JMS implementations just to be used with converting amqp to hornetq and + * vice versa + * @author Clebert Suconic + */ + +package org.apache.activemq6.core.protocol.proton.converter.jms; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/plug/HornetQProtonConnectionCallback.java ---------------------------------------------------------------------- diff --git a/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/plug/HornetQProtonConnectionCallback.java b/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/plug/HornetQProtonConnectionCallback.java new file mode 100644 index 0000000..55fb67e --- /dev/null +++ b/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/plug/HornetQProtonConnectionCallback.java @@ -0,0 +1,125 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. 
See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.apache.activemq6.core.protocol.proton.plug; + +import java.util.concurrent.TimeUnit; + +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelFuture; +import io.netty.channel.ChannelFutureListener; +import org.apache.activemq6.core.buffers.impl.ChannelBufferWrapper; +import org.apache.activemq6.core.protocol.proton.HornetQProtonRemotingConnection; +import org.apache.activemq6.core.protocol.proton.ProtonProtocolManager; +import org.apache.activemq6.core.protocol.proton.sasl.HornetQPlainSASL; +import org.apache.activemq6.spi.core.remoting.Connection; +import org.apache.activemq6.utils.ReusableLatch; +import org.proton.plug.AMQPConnectionCallback; +import org.proton.plug.AMQPConnectionContext; +import org.proton.plug.AMQPSessionCallback; +import org.proton.plug.ServerSASL; +import org.proton.plug.sasl.AnonymousServerSASL; + +/** + * @author Clebert Suconic + */ + +public class HornetQProtonConnectionCallback implements AMQPConnectionCallback +{ + private final ProtonProtocolManager manager; + + private final Connection connection; + + protected HornetQProtonRemotingConnection protonConnectionDelegate; + + protected AMQPConnectionContext amqpConnection; + + private final ReusableLatch latch = new ReusableLatch(0); + + public HornetQProtonConnectionCallback(ProtonProtocolManager manager, Connection connection) + { + this.manager = manager; + this.connection = connection; + } + + @Override + public ServerSASL[] getSASLMechnisms() + { + return new ServerSASL[]{new AnonymousServerSASL(), new HornetQPlainSASL(manager.getServer().getSecurityStore(), manager.getServer().getSecurityManager())}; + } + + @Override + public void close() + { + + } + + @Override + public void setConnection(AMQPConnectionContext connection) + { + this.amqpConnection = connection; + } + + @Override + public AMQPConnectionContext getConnection() + { + return 
amqpConnection; + } + + public HornetQProtonRemotingConnection getProtonConnectionDelegate() + { + return protonConnectionDelegate; + } + + public void setProtonConnectionDelegate(HornetQProtonRemotingConnection protonConnectionDelegate) + { + this.protonConnectionDelegate = protonConnectionDelegate; + } + + public void onTransport(ByteBuf byteBuf, AMQPConnectionContext amqpConnection) + { + final int size = byteBuf.writerIndex(); + + latch.countUp(); + connection.write(new ChannelBufferWrapper(byteBuf, true), false, false, new ChannelFutureListener() + { + @Override + public void operationComplete(ChannelFuture future) throws Exception + { + latch.countDown(); + } + }); + + if (amqpConnection.isSyncOnFlush()) + { + try + { + latch.await(5, TimeUnit.SECONDS); + } + catch (Exception e) + { + e.printStackTrace(); + } + } + + amqpConnection.outputDone(size); + } + + + @Override + public AMQPSessionCallback createSessionCallback(AMQPConnectionContext connection) + { + return new ProtonSessionIntegrationCallback(this, manager, connection); + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java ---------------------------------------------------------------------- diff --git a/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java b/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java new file mode 100644 index 0000000..85e3670 --- /dev/null +++ b/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java @@ -0,0 +1,341 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. 
+ * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +package org.apache.activemq6.core.protocol.proton.plug; + + +import io.netty.buffer.ByteBuf; +import org.apache.qpid.proton.amqp.Binary; +import org.apache.qpid.proton.amqp.transport.AmqpError; +import org.apache.qpid.proton.amqp.transport.ErrorCondition; +import org.apache.qpid.proton.engine.Delivery; +import org.apache.qpid.proton.engine.Link; +import org.apache.qpid.proton.engine.Receiver; +import org.apache.qpid.proton.jms.EncodedMessage; +import org.apache.qpid.proton.message.ProtonJMessage; +import org.apache.activemq6.api.core.SimpleString; +import org.apache.activemq6.api.core.client.HornetQClient; +import org.apache.activemq6.core.journal.IOAsyncTask; +import org.apache.activemq6.core.protocol.proton.ProtonProtocolManager; +import org.apache.activemq6.core.server.QueueQueryResult; +import org.apache.activemq6.core.server.ServerConsumer; +import org.apache.activemq6.core.server.ServerMessage; +import org.apache.activemq6.core.server.ServerSession; +import org.apache.activemq6.spi.core.protocol.SessionCallback; +import org.apache.activemq6.spi.core.remoting.ReadyListener; +import org.apache.activemq6.utils.ByteUtil; +import org.apache.activemq6.utils.IDGenerator; +import org.apache.activemq6.utils.SimpleIDGenerator; +import org.apache.activemq6.utils.UUIDGenerator; +import org.proton.plug.AMQPConnectionContext; +import org.proton.plug.AMQPSessionCallback; +import 
org.proton.plug.AMQPSessionContext; +import org.proton.plug.SASLResult; +import org.proton.plug.context.ProtonPlugSender; +import org.proton.plug.sasl.PlainSASLResult; + +/** + * @author Clebert Suconic + */ + +public class ProtonSessionIntegrationCallback implements AMQPSessionCallback, SessionCallback +{ + protected final IDGenerator consumerIDGenerator = new SimpleIDGenerator(0); + + private final HornetQProtonConnectionCallback protonSPI; + + private final ProtonProtocolManager manager; + + private final AMQPConnectionContext connection; + + private ServerSession serverSession; + + private AMQPSessionContext protonSession; + + public ProtonSessionIntegrationCallback(HornetQProtonConnectionCallback protonSPI, ProtonProtocolManager manager, AMQPConnectionContext connection) + { + this.protonSPI = protonSPI; + this.manager = manager; + this.connection = connection; + } + + @Override + public void onFlowConsumer(Object consumer, int credits) + { + // We have our own flow control on AMQP, so we set hornetq's flow control to 0 + ((ServerConsumer) consumer).receiveCredits(-1); + } + + @Override + public void init(AMQPSessionContext protonSession, SASLResult saslResult) throws Exception + { + + this.protonSession = protonSession; + + String name = UUIDGenerator.getInstance().generateStringUUID(); + + String user = null; + String passcode = null; + if (saslResult != null) + { + user = saslResult.getUser(); + if (saslResult instanceof PlainSASLResult) + { + passcode = ((PlainSASLResult)saslResult).getPassword(); + } + } + + serverSession = manager.getServer().createSession(name, + user, + passcode, + HornetQClient.DEFAULT_MIN_LARGE_MESSAGE_SIZE, + protonSPI.getProtonConnectionDelegate(), // RemotingConnection remotingConnection, + false, // boolean autoCommitSends + false, // boolean autoCommitAcks, + false, // boolean preAcknowledge, + true, //boolean xa, + (String) null, + this, + null); + } + + @Override + public void start() + { + + } + + @Override + public Object 
createSender(ProtonPlugSender protonSender, String queue, String filer, boolean browserOnly) throws Exception + { + long consumerID = consumerIDGenerator.generateID(); + + ServerConsumer consumer = serverSession.createConsumer(consumerID, SimpleString.toSimpleString(queue), SimpleString.toSimpleString(filer), browserOnly); + + // AMQP handles its own flow control for when it's started + consumer.setStarted(true); + + consumer.setProtocolContext(protonSender); + + return consumer; + } + + @Override + public void startSender(Object brokerConsumer) throws Exception + { + ServerConsumer serverConsumer = (ServerConsumer) brokerConsumer; + // flow control is done at proton + serverConsumer.receiveCredits(-1); + } + + @Override + public void createTemporaryQueue(String queueName) throws Exception + { + serverSession.createQueue(SimpleString.toSimpleString(queueName), SimpleString.toSimpleString(queueName), null, true, false); + } + + @Override + public boolean queueQuery(String queueName) throws Exception + { + QueueQueryResult queueQuery = serverSession.executeQueueQuery(SimpleString.toSimpleString(queueName)); + return queueQuery.isExists(); + } + + @Override + public void closeSender(Object brokerConsumer) throws Exception + { + ((ServerConsumer) brokerConsumer).close(false); + } + + @Override + public ProtonJMessage encodeMessage(Object message, int deliveryCount) throws Exception + { + return (ProtonJMessage) manager.getConverter().outbound((ServerMessage) message, deliveryCount); + } + + @Override + public Binary getCurrentTXID() + { + return new Binary(ByteUtil.longToBytes(serverSession.getCurrentTransaction().getID())); + } + + @Override + public String tempQueueName() + { + return UUIDGenerator.getInstance().generateStringUUID(); + } + + @Override + public void commitCurrentTX() throws Exception + { + serverSession.commit(); + } + + @Override + public void rollbackCurrentTX() throws Exception + { + serverSession.rollback(false); + } + + @Override + public void 
close() throws Exception + { + serverSession.close(false); + } + + @Override + public void ack(Object brokerConsumer, Object message) throws Exception + { + ((ServerConsumer)brokerConsumer).individualAcknowledge(null, ((ServerMessage)message).getMessageID()); + } + + @Override + public void cancel(Object brokerConsumer, Object message, boolean updateCounts) throws Exception + { + ((ServerConsumer)brokerConsumer).individualCancel(((ServerMessage)message).getMessageID(), updateCounts); + } + + @Override + public void resumeDelivery(Object consumer) + { + ((ServerConsumer) consumer).receiveCredits(-1); + } + + @Override + public void serverSend(final Receiver receiver, final Delivery delivery, String address, int messageFormat, ByteBuf messageEncoded) throws Exception + { + EncodedMessage encodedMessage = new EncodedMessage(messageFormat, messageEncoded.array(), messageEncoded.arrayOffset(), messageEncoded.writerIndex()); + + ServerMessage message = manager.getConverter().inbound(encodedMessage); + //use the address on the receiver if not null, if null let's hope it was set correctly on the message + if (address != null) + { + message.setAddress(new SimpleString(address)); + } + + serverSession.send(message, false); + + manager.getServer().getStorageManager().afterCompleteOperations(new IOAsyncTask() + { + @Override + public void done() + { + synchronized (connection.getLock()) + { + delivery.settle(); + connection.flush(); + } + } + + @Override + public void onError(int errorCode, String errorMessage) + { + synchronized (connection.getLock()) + { + receiver.setCondition(new ErrorCondition(AmqpError.ILLEGAL_STATE, errorCode + ":" + errorMessage)); + connection.flush(); + } + } + }); + } + + + @Override + public void sendProducerCreditsMessage(int credits, SimpleString address) + { + } + + @Override + public void sendProducerCreditsFailMessage(int credits, SimpleString address) + { + } + + @Override + public int sendMessage(ServerMessage message, ServerConsumer 
consumer, int deliveryCount) + { + + ProtonPlugSender plugSender = (ProtonPlugSender) consumer.getProtocolContext(); + + try + { + return plugSender.deliverMessage(message, deliveryCount); + } + catch (Exception e) + { + synchronized (connection.getLock()) + { + plugSender.getSender().setCondition(new ErrorCondition(AmqpError.INTERNAL_ERROR, e.getMessage())); + connection.flush(); + } + throw new IllegalStateException("Can't deliver message " + e, e); + } + + } + + @Override + public int sendLargeMessage(ServerMessage message, ServerConsumer consumer, long bodySize, int deliveryCount) + { + return 0; + } + + @Override + public int sendLargeMessageContinuation(ServerConsumer consumer, byte[] body, boolean continues, boolean requiresResponse) + { + return 0; + } + + @Override + public void closed() + { + } + + @Override + public void addReadyListener(ReadyListener listener) + { + + } + + @Override + public void removeReadyListener(ReadyListener listener) + { + + } + + @Override + public void disconnect(ServerConsumer consumer, String queueName) + { + synchronized (connection.getLock()) + { + ((Link) consumer.getProtocolContext()).close(); + connection.flush(); + } + } + + + @Override + public boolean hasCredits(ServerConsumer consumer) + { + ProtonPlugSender plugSender = (ProtonPlugSender) consumer.getProtocolContext(); + + if (plugSender != null && plugSender.getSender().getCredit() > 0) + { + return true; + } + else + { + return false; + } + } + + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/23e8edd9/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/plug/package-info.java ---------------------------------------------------------------------- diff --git a/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/plug/package-info.java b/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/plug/package-info.java new 
file mode 100644 index 0000000..9bdf305 --- /dev/null +++ b/activemq6-protocols/activemq6-amqp-protocol/src/main/java/org/apache/activemq6/core/protocol/proton/plug/package-info.java @@ -0,0 +1,18 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ + +/** + * This package contains classes for integration with the ProtonPlug + * @author Clebert Suconic + */ +package org.apache.activemq6.core.protocol.proton.plug; \ No newline at end of file
