http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/ProtonUtils.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/ProtonUtils.java b/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/ProtonUtils.java deleted file mode 100644 index c6b60f3..0000000 --- a/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/ProtonUtils.java +++ /dev/null @@ -1,633 +0,0 @@ -/* - * Copyright 2005-2014 Red Hat, Inc. - * Red Hat licenses this file to you under the Apache License, version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. 
- */ - -package org.hornetq.core.protocol.proton; - -import java.nio.ByteBuffer; -import java.util.Calendar; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - -import org.apache.qpid.proton.amqp.Binary; -import org.apache.qpid.proton.amqp.Symbol; -import org.apache.qpid.proton.amqp.UnsignedByte; -import org.apache.qpid.proton.amqp.UnsignedInteger; -import org.apache.qpid.proton.amqp.messaging.AmqpValue; -import org.apache.qpid.proton.amqp.messaging.ApplicationProperties; -import org.apache.qpid.proton.amqp.messaging.Data; -import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations; -import org.apache.qpid.proton.amqp.messaging.Footer; -import org.apache.qpid.proton.amqp.messaging.Header; -import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; -import org.apache.qpid.proton.amqp.messaging.Properties; -import org.apache.qpid.proton.amqp.messaging.Section; -import org.apache.qpid.proton.codec.CompositeWritableBuffer; -import org.apache.qpid.proton.codec.DroppingWritableBuffer; -import org.apache.qpid.proton.codec.WritableBuffer; -import org.apache.qpid.proton.jms.EncodedMessage; -import org.apache.qpid.proton.message.Message; -import org.apache.qpid.proton.message.MessageFormat; -import org.apache.qpid.proton.message.impl.MessageImpl; -import org.hornetq.api.core.SimpleString; -import org.hornetq.core.server.ServerMessage; -import org.hornetq.core.server.impl.ServerMessageImpl; -import org.hornetq.utils.TypedProperties; - -import static org.hornetq.api.core.Message.TEXT_TYPE; - -/** - * @author <a href="mailto:[email protected]">Andy Taylor</a> - * 4/11/13 - */ -public class ProtonUtils -{ - private static final String PREFIX = "HORNETQ_PROTON_"; - private static final String MESSAGE_ANNOTATIONS = PREFIX + "MESSAGE_ANNOTATIONS_"; - private static final String DELIVERY_ANNOTATIONS = PREFIX + "DELIVERY_ANNOTATIONS_"; - private static final String FOOTER_VALUES = PREFIX + "FOOTER_VALUES"; - private 
static final String MESSAGE_FORMAT = PREFIX + "MESSAGE_FORMAT"; - private static final String PROTON_MESSAGE_FORMAT = PREFIX + "FORMAT"; - private static final String PROTON_MESSAGE_SIZE = "PROTON_MESSAGE_SIZE"; - private static final String MESSAGE_TYPE = PREFIX + "MESSAGE_TYPE"; - private static final String FIRST_ACQUIRER = PREFIX + "FIRST_ACQUIRER"; - private static final String USER_ID = PREFIX + "USER_ID"; - private static final String SUBJECT = PREFIX + "SUBJECT"; - private static final String REPLY_TO = PREFIX + "REPLY_TO"; - private static final String CORRELATION_ID = PREFIX + "CORRELATION_ID"; - private static final String CONTENT_TYPE = PREFIX + "CONTENT_TYPE"; - private static final String CONTENT_ENCODING = PREFIX + "CONTENT_TYPE"; - private static final String ABSOLUTE_EXPIRY_TIME = PREFIX + "ABSOLUTE_EXPIRY_TIME"; - private static final String CREATION_TIME = PREFIX + "CREATION_TIME"; - private static final String GROUP_ID = PREFIX + "GROUP_ID"; - private static final String GROUP_SEQUENCE = PREFIX + "GROUP_SEQUENCE"; - private static final String REPLY_TO_GROUP_ID = PREFIX + "REPLY_TO_GROUP_ID"; - - private static final SimpleString USER_ID_SS = new SimpleString(USER_ID); - private static final SimpleString SUBJECT_SS = new SimpleString(SUBJECT); - private static final SimpleString REPLY_TO_SS = new SimpleString(REPLY_TO); - private static final SimpleString CORRELATION_ID_SS = new SimpleString(CORRELATION_ID); - private static final SimpleString CONTENT_TYPE_SS = new SimpleString(CONTENT_TYPE); - private static final SimpleString CONTENT_ENCODING_SS = new SimpleString(CONTENT_ENCODING); - private static final SimpleString ABSOLUTE_EXPIRY_TIME_SS = new SimpleString(ABSOLUTE_EXPIRY_TIME); - private static final SimpleString CREATION_TIME_SS = new SimpleString(CREATION_TIME); - private static final SimpleString GROUP_ID_SS = new SimpleString(GROUP_ID); - private static final SimpleString GROUP_SEQUENCE_SS = new SimpleString(GROUP_SEQUENCE); - private 
static final SimpleString REPLY_TO_GROUP_ID_SS = new SimpleString(REPLY_TO_GROUP_ID); - private static final SimpleString PROTON_MESSAGE_SIZE_SS = new SimpleString(PROTON_MESSAGE_SIZE); - - private static Set<String> SPECIAL_PROPS = new HashSet<String>(); - - static - { - SPECIAL_PROPS.add(MESSAGE_FORMAT); - SPECIAL_PROPS.add(MESSAGE_TYPE); - SPECIAL_PROPS.add(FIRST_ACQUIRER); - SPECIAL_PROPS.add(PROTON_MESSAGE_FORMAT); - SPECIAL_PROPS.add(PROTON_MESSAGE_SIZE); - SPECIAL_PROPS.add(MESSAGE_TYPE); - SPECIAL_PROPS.add(USER_ID); - SPECIAL_PROPS.add(SUBJECT); - SPECIAL_PROPS.add(REPLY_TO); - SPECIAL_PROPS.add(CORRELATION_ID); - SPECIAL_PROPS.add(CONTENT_TYPE); - SPECIAL_PROPS.add(CONTENT_ENCODING); - SPECIAL_PROPS.add(ABSOLUTE_EXPIRY_TIME); - SPECIAL_PROPS.add(CREATION_TIME); - SPECIAL_PROPS.add(GROUP_ID); - SPECIAL_PROPS.add(GROUP_SEQUENCE); - SPECIAL_PROPS.add(REPLY_TO_GROUP_ID); - } - - public static class INBOUND - { - public static ServerMessageImpl transform(ProtonRemotingConnection connection, EncodedMessage encodedMessage) throws Exception - { - org.apache.qpid.proton.message.Message protonMessage = encodedMessage.decode(); - - Header header = protonMessage.getHeader(); - if (header == null) - { - header = new Header(); - } - - ServerMessageImpl message = connection.createServerMessage(); - TypedProperties properties = message.getProperties(); - - properties.putLongProperty(new SimpleString(MESSAGE_FORMAT), encodedMessage.getMessageFormat()); - properties.putLongProperty(new SimpleString(PROTON_MESSAGE_FORMAT), getMessageFormat(protonMessage.getMessageFormat())); - properties.putIntProperty(new SimpleString(PROTON_MESSAGE_SIZE), encodedMessage.getLength()); - - populateSpecialProps(header, protonMessage, message, properties); - populateHeaderProperties(header, properties, message); - populateDeliveryAnnotations(protonMessage.getDeliveryAnnotations(), properties); - populateMessageAnnotations(protonMessage.getMessageAnnotations(), properties); - 
populateApplicationProperties(protonMessage.getApplicationProperties(), properties); - populateProperties(protonMessage.getProperties(), properties, message); - populateFooterProperties(protonMessage.getFooter(), properties); - message.setTimestamp(System.currentTimeMillis()); - - Section section = protonMessage.getBody(); - if (section instanceof AmqpValue) - { - AmqpValue amqpValue = (AmqpValue) section; - Object value = amqpValue.getValue(); - if (value instanceof String) - { - message.getBodyBuffer().writeNullableString((String) value); - } - else if (value instanceof Binary) - { - Binary binary = (Binary) value; - message.getBodyBuffer().writeBytes(binary.getArray()); - } - } - else if (section instanceof Data) - { - message.getBodyBuffer().writeBytes(((Data) section).getValue().getArray()); - } - - return message; - } - - private static void populateSpecialProps(Header header, Message protonMessage, ServerMessageImpl message, TypedProperties properties) - { - if (header.getFirstAcquirer() != null) - { - properties.putBooleanProperty(new SimpleString(FIRST_ACQUIRER), header.getFirstAcquirer()); - } - properties.putIntProperty(new SimpleString(MESSAGE_TYPE), getMessageType(protonMessage)); - } - - private static void populateHeaderProperties(Header header, TypedProperties properties, ServerMessageImpl message) - { - if (header.getDurable() != null) - { - message.setDurable(header.getDurable()); - } - - if (header.getPriority() != null) - { - message.setPriority((byte) header.getPriority().intValue()); - } - - if (header.getTtl() != null) - { - message.setExpiration(header.getTtl().longValue()); - } - } - - private static void populateDeliveryAnnotations(DeliveryAnnotations deliveryAnnotations, TypedProperties properties) - { - if (deliveryAnnotations != null) - { - Map values = deliveryAnnotations.getValue(); - Set keySet = values.keySet(); - for (Object key : keySet) - { - Symbol symbol = (Symbol) key; - Object value = values.get(key); - 
properties.putSimpleStringProperty(new SimpleString(DELIVERY_ANNOTATIONS + symbol.toString()), new SimpleString(value.toString())); - } - } - } - - private static void populateFooterProperties(Footer footer, TypedProperties properties) - { - if (footer != null) - { - Map values = footer.getValue(); - Set keySet = values.keySet(); - for (Object key : keySet) - { - Symbol symbol = (Symbol) key; - Object value = values.get(key); - properties.putSimpleStringProperty(new SimpleString(FOOTER_VALUES + symbol.toString()), new SimpleString(value.toString())); - } - } - } - - private static void populateProperties(Properties amqpProperties, TypedProperties properties, ServerMessageImpl message) - { - if (amqpProperties == null) - { - return; - } - if (amqpProperties.getTo() != null) - { - message.setAddress(new SimpleString(amqpProperties.getTo())); - } - if (amqpProperties.getUserId() != null) - { - properties.putBytesProperty(USER_ID_SS, amqpProperties.getUserId().getArray()); - } - if (amqpProperties.getSubject() != null) - { - properties.putSimpleStringProperty(SUBJECT_SS, new SimpleString(amqpProperties.getSubject())); - } - if (amqpProperties.getReplyTo() != null) - { - properties.putSimpleStringProperty(REPLY_TO_SS, new SimpleString(amqpProperties.getReplyTo())); - } - if (amqpProperties.getCorrelationId() != null) - { - properties.putSimpleStringProperty(CORRELATION_ID_SS, new SimpleString(amqpProperties.getCorrelationId().toString())); - } - if (amqpProperties.getContentType() != null) - { - properties.putSimpleStringProperty(CONTENT_TYPE_SS, new SimpleString(amqpProperties.getContentType().toString())); - } - if (amqpProperties.getContentEncoding() != null) - { - properties.putSimpleStringProperty(CONTENT_ENCODING_SS, new SimpleString(amqpProperties.getContentEncoding().toString())); - } - if (amqpProperties.getAbsoluteExpiryTime() != null) - { - properties.putLongProperty(ABSOLUTE_EXPIRY_TIME_SS, amqpProperties.getAbsoluteExpiryTime().getTime()); - } - if 
(amqpProperties.getCreationTime() != null) - { - properties.putLongProperty(CREATION_TIME_SS, amqpProperties.getCreationTime().getTime()); - } - if (amqpProperties.getGroupId() != null) - { - properties.putSimpleStringProperty(GROUP_ID_SS, new SimpleString(amqpProperties.getGroupId())); - } - if (amqpProperties.getGroupSequence() != null) - { - properties.putIntProperty(GROUP_SEQUENCE_SS, amqpProperties.getGroupSequence().intValue()); - } - if (amqpProperties.getReplyToGroupId() != null) - { - message.getProperties().putSimpleStringProperty(REPLY_TO_GROUP_ID_SS, new SimpleString(amqpProperties.getReplyToGroupId())); - } - } - - private static void populateApplicationProperties(ApplicationProperties applicationProperties, TypedProperties properties) - { - if (applicationProperties != null) - { - Map props = applicationProperties.getValue(); - for (Object key : props.keySet()) - { - Object val = props.get(key); - setProperty(key, val, properties); - } - } - } - - private static void setProperty(Object key, Object val, TypedProperties properties) - { - if (val instanceof String) - { - properties.putSimpleStringProperty(new SimpleString((String) key), new SimpleString((String) val)); - } - else if (val instanceof Boolean) - { - properties.putBooleanProperty(new SimpleString((String) key), (Boolean) val); - } - else if (val instanceof Double) - { - properties.putDoubleProperty(new SimpleString((String) key), (Double) val); - } - else if (val instanceof Float) - { - properties.putFloatProperty(new SimpleString((String) key), (Float) val); - } - else if (val instanceof Integer) - { - properties.putIntProperty(new SimpleString((String) key), (Integer) val); - } - else if (val instanceof Byte) - { - properties.putByteProperty(new SimpleString((String) key), (Byte) val); - } - } - - public static void populateMessageAnnotations(MessageAnnotations messageAnnotations, TypedProperties properties) - { - if (messageAnnotations != null) - { - Map values = 
messageAnnotations.getValue(); - Set keySet = values.keySet(); - for (Object key : keySet) - { - Symbol symbol = (Symbol) key; - Object value = values.get(key); - properties.putSimpleStringProperty(new SimpleString(MESSAGE_ANNOTATIONS + symbol.toString()), new SimpleString(value.toString())); - } - } - } - - } - - public static class OUTBOUND - { - public static EncodedMessage transform(ServerMessage message, int deliveryCount) - { - long messageFormat = message.getLongProperty(MESSAGE_FORMAT); - Integer size = message.getIntProperty(PROTON_MESSAGE_SIZE_SS); - - Header header = populateHeader(message, deliveryCount); - DeliveryAnnotations deliveryAnnotations = populateDeliveryAnnotations(message); - MessageAnnotations messageAnnotations = populateMessageAnnotations(message); - Properties props = populateProperties(message); - ApplicationProperties applicationProperties = populateApplicationProperties(message); - Section section = populateBody(message); - Footer footer = populateFooter(message); - Set<SimpleString> propertyNames = message.getPropertyNames(); - for (SimpleString propertyName : propertyNames) - { - TypedProperties typedProperties = message.getTypedProperties(); - String realName = propertyName.toString(); - if (realName.startsWith(MESSAGE_ANNOTATIONS)) - { - - SimpleString value = (SimpleString) typedProperties.getProperty(propertyName); - Symbol symbol = Symbol.getSymbol(realName.replace(MESSAGE_ANNOTATIONS, "")); - messageAnnotations.getValue().put(symbol, value.toString()); - } - } - MessageImpl protonMessage = new MessageImpl(header, deliveryAnnotations, messageAnnotations, props, applicationProperties, section, footer); - protonMessage.setMessageFormat(getMessageFormat(message.getLongProperty(new SimpleString(PROTON_MESSAGE_FORMAT)))); - ByteBuffer buffer = ByteBuffer.wrap(new byte[size]); - final DroppingWritableBuffer overflow = new DroppingWritableBuffer(); - int c = protonMessage.encode(new CompositeWritableBuffer(new 
WritableBuffer.ByteBufferWrapper(buffer), overflow)); - if (overflow.position() > 0) - { - buffer = ByteBuffer.wrap(new byte[1024 * 4 + overflow.position()]); - c = protonMessage.encode(new WritableBuffer.ByteBufferWrapper(buffer)); - } - - return new EncodedMessage(messageFormat, buffer.array(), 0, c); - } - - private static Header populateHeader(ServerMessage message, int deliveryCount) - { - Header header = new Header(); - header.setDurable(message.isDurable()); - header.setPriority(new UnsignedByte(message.getPriority())); - header.setDeliveryCount(new UnsignedInteger(deliveryCount)); - header.setTtl(new UnsignedInteger((int) message.getExpiration())); - return header; - } - - private static DeliveryAnnotations populateDeliveryAnnotations(ServerMessage message) - { - HashMap actualValues = new HashMap(); - DeliveryAnnotations deliveryAnnotations = new DeliveryAnnotations(actualValues); - for (SimpleString name : message.getPropertyNames()) - { - String sName = name.toString(); - if (sName.startsWith(DELIVERY_ANNOTATIONS)) - { - Object val = message.getTypedProperties().getProperty(name); - if (val instanceof SimpleString) - { - actualValues.put(sName.subSequence(sName.indexOf(DELIVERY_ANNOTATIONS), sName.length()), val.toString()); - } - else - { - actualValues.put(sName.subSequence(sName.indexOf(DELIVERY_ANNOTATIONS), sName.length()), val); - } - } - } - //this is a proton jms thing, if not null it creates wrong type of message - return actualValues.size() > 0 ? 
deliveryAnnotations : null; - } - - private static MessageAnnotations populateMessageAnnotations(ServerMessage message) - { - HashMap actualValues = new HashMap(); - MessageAnnotations messageAnnotations = new MessageAnnotations(actualValues); - for (SimpleString name : message.getPropertyNames()) - { - String sName = name.toString(); - if (sName.startsWith(MESSAGE_ANNOTATIONS)) - { - Object val = message.getTypedProperties().getProperty(name); - if (val instanceof SimpleString) - { - actualValues.put(sName.subSequence(sName.indexOf(MESSAGE_ANNOTATIONS), sName.length()), val.toString()); - } - else - { - actualValues.put(sName.subSequence(sName.indexOf(MESSAGE_ANNOTATIONS), sName.length()), val); - } - } - } - return messageAnnotations; - } - - private static Properties populateProperties(ServerMessage message) - { - Calendar calendar = Calendar.getInstance(); - Properties properties = new Properties(); - TypedProperties typedProperties = message.getTypedProperties(); - properties.setMessageId(message.getMessageID()); - if (message.getAddress() != null) - { - properties.setTo(message.getAddress().toString()); - } - if (typedProperties.containsProperty(USER_ID_SS)) - { - properties.setUserId(new Binary(typedProperties.getBytesProperty(USER_ID_SS))); - } - if (typedProperties.containsProperty(SUBJECT_SS)) - { - properties.setSubject(typedProperties.getSimpleStringProperty(SUBJECT_SS).toString()); - } - if (typedProperties.containsProperty(REPLY_TO_SS)) - { - properties.setReplyTo(typedProperties.getSimpleStringProperty(REPLY_TO_SS).toString()); - } - if (typedProperties.containsProperty(CORRELATION_ID_SS)) - { - properties.setCorrelationId(typedProperties.getSimpleStringProperty(CORRELATION_ID_SS).toString()); - } - if (typedProperties.containsProperty(CONTENT_TYPE_SS)) - { - properties.setContentType(Symbol.getSymbol(typedProperties.getSimpleStringProperty(CONTENT_TYPE_SS).toString())); - } - if (typedProperties.containsProperty(CONTENT_ENCODING_SS)) - { - 
properties.setContentEncoding(Symbol.getSymbol(typedProperties.getSimpleStringProperty(CONTENT_ENCODING_SS).toString())); - } - if (typedProperties.containsProperty(ABSOLUTE_EXPIRY_TIME_SS)) - { - calendar.setTimeInMillis(typedProperties.getLongProperty(ABSOLUTE_EXPIRY_TIME_SS)); - properties.setAbsoluteExpiryTime(calendar.getTime()); - } - if (typedProperties.containsProperty(CREATION_TIME_SS)) - { - calendar.setTimeInMillis(typedProperties.getLongProperty(CREATION_TIME_SS)); - properties.setCreationTime(calendar.getTime()); - } - if (typedProperties.containsProperty(GROUP_ID_SS)) - { - properties.setGroupId(typedProperties.getSimpleStringProperty(GROUP_ID_SS).toString()); - } - if (typedProperties.containsProperty(GROUP_SEQUENCE_SS)) - { - properties.setGroupSequence(new UnsignedInteger(typedProperties.getIntProperty(GROUP_SEQUENCE_SS))); - } - if (typedProperties.containsProperty(REPLY_TO_GROUP_ID_SS)) - { - properties.setReplyToGroupId(typedProperties.getSimpleStringProperty(REPLY_TO_GROUP_ID_SS).toString()); - } - return properties; - } - - private static ApplicationProperties populateApplicationProperties(ServerMessage message) - { - HashMap<String, Object> values = new HashMap<String, Object>(); - for (SimpleString name : message.getPropertyNames()) - { - setProperty(name, message.getTypedProperties().getProperty(name), values); - } - return new ApplicationProperties(values); - } - - private static void setProperty(SimpleString name, Object property, HashMap<String, Object> values) - { - String s = name.toString(); - if (SPECIAL_PROPS.contains(s) || - s.startsWith(MESSAGE_ANNOTATIONS) || - s.startsWith(DELIVERY_ANNOTATIONS) || - s.startsWith(FOOTER_VALUES)) - { - return; - } - if (property instanceof SimpleString) - { - values.put(s, property.toString()); - } - else - { - values.put(s, property); - } - } - - private static Footer populateFooter(ServerMessage message) - { - HashMap actualValues = new HashMap(); - Footer footer = new Footer(actualValues); - 
for (SimpleString name : message.getPropertyNames()) - { - String sName = name.toString(); - if (sName.startsWith(FOOTER_VALUES)) - { - Object val = message.getTypedProperties().getProperty(name); - if (val instanceof SimpleString) - { - actualValues.put(sName.subSequence(sName.indexOf(FOOTER_VALUES), sName.length()), val.toString()); - } - else - { - actualValues.put(sName.subSequence(sName.indexOf(FOOTER_VALUES), sName.length()), val); - } - } - } - return footer; - } - - private static Section populateBody(ServerMessage message) - { - // TODO: Depend on array() is most likely not a very good idea - Integer type = message.getIntProperty(MESSAGE_TYPE); - switch (type) - { - case 0: - case 1: - return new Data(new Binary(message.getBodyBuffer().copy().byteBuf().array())); - case 2: - return new AmqpValue(new Binary(message.getBodyBuffer().copy().byteBuf().array())); - case 3: - return new AmqpValue(message.getBodyBuffer().copy().readNullableString()); - default: - return new Data(new Binary(message.getBodyBuffer().copy().byteBuf().array())); - } - } - } - - private static long getMessageFormat(MessageFormat messageFormat) - { - switch (messageFormat) - { - case AMQP: - return 0; - case DATA: - return 1; - case JSON: - return 2; - case TEXT: - return 3; - default: - return 0; - - } - } - - private static MessageFormat getMessageFormat(long messageFormat) - { - switch ((int) messageFormat) - { - case 0: - return MessageFormat.AMQP; - case 1: - return MessageFormat.DATA; - case 2: - return MessageFormat.JSON; - case 3: - return MessageFormat.TEXT; - default: - return MessageFormat.AMQP; - - } - } - - private static int getMessageType(Message protonMessage) - { - Section section = protonMessage.getBody(); - if (section instanceof AmqpValue) - { - AmqpValue amqpValue = (AmqpValue) section; - Object value = amqpValue.getValue(); - if (value instanceof String) - { - return TEXT_TYPE; - } - else if (value instanceof byte[]) - { - return 
org.hornetq.api.core.Message.BYTES_TYPE; - } - else if (value instanceof Map) - { - return org.hornetq.api.core.Message.MAP_TYPE; - } - else if (value instanceof Object) - { - return org.hornetq.api.core.Message.OBJECT_TYPE; - } - else - { - return org.hornetq.api.core.Message.DEFAULT_TYPE; - } - } - else - { - return org.hornetq.api.core.Message.DEFAULT_TYPE; - } - } -}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/TransactionHandler.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/TransactionHandler.java b/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/TransactionHandler.java deleted file mode 100644 index 32d5a7a..0000000 --- a/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/TransactionHandler.java +++ /dev/null @@ -1,83 +0,0 @@ -/* - * Copyright 2005-2014 Red Hat, Inc. - * Red Hat licenses this file to you under the Apache License, version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. 
- */ -package org.hornetq.core.protocol.proton; - -import org.apache.qpid.proton.amqp.Symbol; -import org.apache.qpid.proton.amqp.messaging.Rejected; -import org.apache.qpid.proton.amqp.transaction.Coordinator; -import org.apache.qpid.proton.amqp.transport.ErrorCondition; -import org.apache.qpid.proton.engine.Delivery; -import org.apache.qpid.proton.engine.Receiver; -import org.hornetq.api.core.HornetQBuffer; -import org.hornetq.core.protocol.proton.exceptions.HornetQAMQPException; - -/** - * handles an amqp Coordinator to deal with transaction boundaries etc - */ -public class TransactionHandler implements ProtonDeliveryHandler -{ - private final ProtonRemotingConnection connection; - private final Coordinator coordinator; - private final ProtonProtocolManager protonProtocolManager; - private final ProtonSession protonSession; - private final HornetQBuffer buffer; - - public TransactionHandler(ProtonRemotingConnection connection, Coordinator coordinator, ProtonProtocolManager protonProtocolManager, ProtonSession protonSession) - { - this.connection = connection; - this.coordinator = coordinator; - this.protonProtocolManager = protonProtocolManager; - this.protonSession = protonSession; - buffer = connection.createBuffer(1024); - } - - @Override - public void onMessage(Delivery delivery) throws HornetQAMQPException - { - Receiver receiver = null; - try - { - receiver = ((Receiver) delivery.getLink()); - - if (!delivery.isReadable()) - { - return; - } - - protonProtocolManager.handleTransaction(receiver, buffer, delivery, protonSession); - - } - catch (Exception e) - { - e.printStackTrace(); - Rejected rejected = new Rejected(); - ErrorCondition condition = new ErrorCondition(); - condition.setCondition(Symbol.valueOf("failed")); - condition.setDescription(e.getMessage()); - rejected.setError(condition); - delivery.disposition(rejected); - } - } - - @Override - public void checkState() - { - //noop - } - - @Override - public void close() throws HornetQAMQPException 
- { - //noop - } -} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/converter/HornetQJMSVendor.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/converter/HornetQJMSVendor.java b/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/converter/HornetQJMSVendor.java new file mode 100644 index 0000000..3126b28 --- /dev/null +++ b/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/converter/HornetQJMSVendor.java @@ -0,0 +1,155 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ + +package org.hornetq.core.protocol.proton.converter; + +import javax.jms.BytesMessage; +import javax.jms.Destination; +import javax.jms.MapMessage; +import javax.jms.Message; +import javax.jms.ObjectMessage; +import javax.jms.StreamMessage; +import javax.jms.TextMessage; + +import org.apache.qpid.proton.jms.JMSVendor; +import org.hornetq.core.buffers.impl.ResetLimitWrappedHornetQBuffer; +import org.hornetq.core.protocol.proton.converter.jms.ServerJMSBytesMessage; +import org.hornetq.core.protocol.proton.converter.jms.ServerJMSMapMessage; +import org.hornetq.core.protocol.proton.converter.jms.ServerJMSMessage; +import org.hornetq.core.protocol.proton.converter.jms.ServerJMSStreamMessage; +import org.hornetq.core.protocol.proton.converter.jms.ServerJMSTextMessage; +import org.hornetq.core.server.ServerMessage; +import org.hornetq.core.server.impl.ServerMessageImpl; +import org.hornetq.utils.IDGenerator; + +/** + * @author Clebert Suconic + */ + +public class HornetQJMSVendor extends JMSVendor +{ + + private final IDGenerator serverGenerator; + + HornetQJMSVendor(IDGenerator idGenerator) + { + this.serverGenerator = idGenerator; + } + + @Override + public BytesMessage createBytesMessage() + { + return new ServerJMSBytesMessage(newMessage(org.hornetq.api.core.Message.BYTES_TYPE), 0); + } + + @Override + public StreamMessage createStreamMessage() + { + return new ServerJMSStreamMessage(newMessage(org.hornetq.api.core.Message.STREAM_TYPE), 0); + } + + @Override + public Message createMessage() + { + return new ServerJMSMessage(newMessage(org.hornetq.api.core.Message.DEFAULT_TYPE), 0 ); + } + + @Override + public TextMessage createTextMessage() + { + return new ServerJMSTextMessage(newMessage(org.hornetq.api.core.Message.TEXT_TYPE), 0); + } + + @Override + public ObjectMessage createObjectMessage() + { + return null; + } + + @Override + public MapMessage createMapMessage() + { + return new ServerJMSMapMessage(newMessage(org.hornetq.api.core.Message.MAP_TYPE), 0); + 
} + + @Override + public void setJMSXUserID(Message message, String s) + { + } + + @Override + public Destination createDestination(String name) + { + return super.createDestination(name); + } + + @Override + public <T extends Destination> T createDestination(String name, Class<T> kind) + { + return super.createDestination(name, kind); + } + + @Override + public void setJMSXGroupID(Message message, String s) + { + + } + + @Override + public void setJMSXGroupSequence(Message message, int i) + { + + } + + @Override + public void setJMSXDeliveryCount(Message message, long l) + { + + } + + + public ServerJMSMessage wrapMessage(int messageType, ServerMessage wrapped, int deliveryCount) + { + switch (messageType) + { + case org.hornetq.api.core.Message.STREAM_TYPE: + return new ServerJMSStreamMessage(wrapped, deliveryCount); + case org.hornetq.api.core.Message.BYTES_TYPE: + return new ServerJMSBytesMessage(wrapped, deliveryCount); + case org.hornetq.api.core.Message.MAP_TYPE: + return new ServerJMSMapMessage(wrapped, deliveryCount); + case org.hornetq.api.core.Message.TEXT_TYPE: + return new ServerJMSTextMessage(wrapped, deliveryCount); + default: + return new ServerJMSMessage(wrapped, deliveryCount); + } + + } + + + @Override + public String toAddress(Destination destination) + { + return null; + } + + + private ServerMessageImpl newMessage(byte messageType) + { + ServerMessageImpl message = new ServerMessageImpl(serverGenerator.generateID(), 512); + message.setType(messageType); + ((ResetLimitWrappedHornetQBuffer)message.getBodyBuffer()).setMessage(null); + return message; + } + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/converter/ProtonMessageConverter.java ---------------------------------------------------------------------- diff --git 
a/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/converter/ProtonMessageConverter.java b/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/converter/ProtonMessageConverter.java new file mode 100644 index 0000000..495c76a --- /dev/null +++ b/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/converter/ProtonMessageConverter.java @@ -0,0 +1,78 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
*/

package org.hornetq.core.protocol.proton.converter;

import org.apache.qpid.proton.jms.EncodedMessage;
import org.apache.qpid.proton.jms.InboundTransformer;
import org.apache.qpid.proton.jms.JMSMappingInboundTransformer;
import org.apache.qpid.proton.jms.JMSMappingOutboundTransformer;
import org.hornetq.core.protocol.proton.converter.jms.ServerJMSMessage;
import org.hornetq.core.server.ServerMessage;
import org.hornetq.spi.core.protocol.MessageConverter;
import org.hornetq.utils.IDGenerator;

/**
 * {@link MessageConverter} that translates between Proton {@link EncodedMessage}s and
 * HornetQ {@link ServerMessage}s by running them through Proton's JMS mapping
 * transformers, both of which are built on a shared {@link HornetQJMSVendor}.
 *
 * @author Clebert Suconic
 */
public class ProtonMessageConverter implements MessageConverter
{
   /** Vendor handed to both transformers; also used directly to wrap outbound messages. */
   HornetQJMSVendor hornetQJMSVendor;

   /** Turns an encoded AMQP message into a JMS-style wrapper. */
   private final InboundTransformer inboundTransformer;

   /** Turns a JMS-style wrapper back into its encoded AMQP form. */
   private final JMSMappingOutboundTransformer outboundTransformer;

   /**
    * @param idGenerator passed straight to the {@link HornetQJMSVendor} created here
    */
   public ProtonMessageConverter(IDGenerator idGenerator)
   {
      hornetQJMSVendor = new HornetQJMSVendor(idGenerator);
      inboundTransformer = new JMSMappingInboundTransformer(hornetQJMSVendor);
      outboundTransformer = new JMSMappingOutboundTransformer(hornetQJMSVendor);
   }

   /**
    * Converts an incoming message into the server message wrapped by its JMS view.
    *
    * @param messageSource the incoming message; it is cast to {@link EncodedMessage}
    * @return the inner message of the converted JMS wrapper, cast to {@link ServerMessage}
    * @throws Exception whatever the transformation or the wrapper's {@code encode()} raises
    */
   @Override
   public ServerMessage inbound(Object messageSource) throws Exception
   {
      return (ServerMessage) inboundJMSType((EncodedMessage) messageSource).getInnerMessage();
   }

   /**
    * Builds only the JMS part of the inbound conversion (exposed for testing).
    *
    * @param messageSource the encoded message to transform
    * @return the transformed wrapper, after {@code encode()} has been called on it
    * @throws Exception whatever the transformation or {@code encode()} raises
    */
   public ServerJMSMessage inboundJMSType(EncodedMessage messageSource) throws Exception
   {
      ServerJMSMessage converted = (ServerJMSMessage) inboundTransformer.transform(messageSource);
      converted.encode();
      return converted;
   }

   /**
    * Converts a server message into whatever the outbound transformer produces for it.
    *
    * @param messageOutbound the server message to send out
    * @param deliveryCount   delivery count handed to the JMS wrapper
    * @return the result of the outbound transformer's {@code convert} call
    * @throws Exception whatever the wrapper's {@code decode()} or the transformer raises
    */
   @Override
   public Object outbound(ServerMessage messageOutbound, int deliveryCount) throws Exception
   {
      // The wrapper class is chosen by the vendor from the message type.
      ServerJMSMessage wrapper =
         hornetQJMSVendor.wrapMessage(messageOutbound.getType(), messageOutbound, deliveryCount);
      wrapper.decode();
      return outboundTransformer.convert(wrapper);
   }
}
 http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/converter/jms/ServerJMSBytesMessage.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/converter/jms/ServerJMSBytesMessage.java b/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/converter/jms/ServerJMSBytesMessage.java new file mode 100644 index 0000000..840fb35 --- /dev/null +++ b/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/converter/jms/ServerJMSBytesMessage.java @@ -0,0 +1,239 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
*/

package org.hornetq.core.protocol.proton.converter.jms;

import javax.jms.BytesMessage;
import javax.jms.JMSException;

import org.hornetq.core.message.impl.MessageImpl;
import org.hornetq.core.message.impl.MessageInternal;
import org.hornetq.reader.BytesMessageUtil;

/**
 * JMS {@link BytesMessage} view of a wrapped {@link MessageInternal}.
 * <p>
 * Every read and write is forwarded to the matching {@link BytesMessageUtil} helper,
 * with the wrapped message as its first argument.
 *
 * @author Clebert Suconic
 */
public class ServerJMSBytesMessage extends ServerJMSMessage implements BytesMessage
{
   /**
    * @param message       the message to wrap
    * @param deliveryCount delivery count handed to the superclass
    */
   public ServerJMSBytesMessage(MessageInternal message, int deliveryCount)
   {
      super(message, deliveryCount);
   }

   /**
    * @return the message's end-of-body position minus {@link MessageImpl#BODY_OFFSET}
    */
   @Override
   public long getBodyLength() throws JMSException
   {
      return message.getEndOfBodyPosition() - MessageImpl.BODY_OFFSET;
   }

   // ----- reads -----------------------------------------------------------

   @Override
   public boolean readBoolean() throws JMSException
   {
      return BytesMessageUtil.bytesReadBoolean(message);
   }

   @Override
   public byte readByte() throws JMSException
   {
      return BytesMessageUtil.bytesReadByte(message);
   }

   @Override
   public int readUnsignedByte() throws JMSException
   {
      return BytesMessageUtil.bytesReadUnsignedByte(message);
   }

   @Override
   public short readShort() throws JMSException
   {
      return BytesMessageUtil.bytesReadShort(message);
   }

   @Override
   public int readUnsignedShort() throws JMSException
   {
      return BytesMessageUtil.bytesReadUnsignedShort(message);
   }

   @Override
   public char readChar() throws JMSException
   {
      return BytesMessageUtil.bytesReadChar(message);
   }

   @Override
   public int readInt() throws JMSException
   {
      return BytesMessageUtil.bytesReadInt(message);
   }

   @Override
   public long readLong() throws JMSException
   {
      return BytesMessageUtil.bytesReadLong(message);
   }

   @Override
   public float readFloat() throws JMSException
   {
      return BytesMessageUtil.bytesReadFloat(message);
   }

   @Override
   public double readDouble() throws JMSException
   {
      return BytesMessageUtil.bytesReadDouble(message);
   }

   @Override
   public String readUTF() throws JMSException
   {
      return BytesMessageUtil.bytesReadUTF(message);
   }

   @Override
   public int readBytes(byte[] value) throws JMSException
   {
      return BytesMessageUtil.bytesReadBytes(message, value);
   }

   @Override
   public int readBytes(byte[] value, int length) throws JMSException
   {
      return BytesMessageUtil.bytesReadBytes(message, value, length);
   }

   // ----- writes ----------------------------------------------------------

   @Override
   public void writeBoolean(boolean value) throws JMSException
   {
      BytesMessageUtil.bytesWriteBoolean(message, value);
   }

   @Override
   public void writeByte(byte value) throws JMSException
   {
      BytesMessageUtil.bytesWriteByte(message, value);
   }

   @Override
   public void writeShort(short value) throws JMSException
   {
      BytesMessageUtil.bytesWriteShort(message, value);
   }

   @Override
   public void writeChar(char value) throws JMSException
   {
      BytesMessageUtil.bytesWriteChar(message, value);
   }

   @Override
   public void writeInt(int value) throws JMSException
   {
      BytesMessageUtil.bytesWriteInt(message, value);
   }

   @Override
   public void writeLong(long value) throws JMSException
   {
      BytesMessageUtil.bytesWriteLong(message, value);
   }

   @Override
   public void writeFloat(float value) throws JMSException
   {
      BytesMessageUtil.bytesWriteFloat(message, value);
   }

   @Override
   public void writeDouble(double value) throws JMSException
   {
      BytesMessageUtil.bytesWriteDouble(message, value);
   }

   @Override
   public void writeUTF(String value) throws JMSException
   {
      BytesMessageUtil.bytesWriteUTF(message, value);
   }

   @Override
   public void writeBytes(byte[] value) throws JMSException
   {
      BytesMessageUtil.bytesWriteBytes(message, value);
   }

   @Override
   public void writeBytes(byte[] value, int offset, int length) throws JMSException
   {
      BytesMessageUtil.bytesWriteBytes(message, value, offset, length);
   }

   /**
    * Writes {@code value} through {@link BytesMessageUtil#bytesWriteObject}.
    *
    * @throws JMSException when the helper reports that it could not write the value
    */
   @Override
   public void writeObject(Object value) throws JMSException
   {
      boolean written = BytesMessageUtil.bytesWriteObject(message, value);
      if (written)
      {
         return;
      }
      throw new JMSException("Can't make conversion of " + value + " to any known type");
   }

   // ----- conversion hooks -------------------------------------------------

   /**
    * Runs the superclass encoding and then asks for the body length.
    */
   @Override
   public void encode() throws Exception
   {
      super.encode();
      // Kept from the original author: intended to make sure the body length is
      // encoded before the message is persisted.
      getBodyLength();
   }

   /**
    * Nothing bytes-specific to do; defers entirely to the superclass.
    */
   @Override
   public void decode() throws Exception
   {
      super.decode();
   }

   @Override
   public void reset() throws JMSException
   {
      BytesMessageUtil.bytesMessageReset(message);
   }
}
 http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/converter/jms/ServerJMSMapMessage.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/converter/jms/ServerJMSMapMessage.java 
b/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/converter/jms/ServerJMSMapMessage.java new file mode 100644 index 0000000..4373098 --- /dev/null +++ b/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/converter/jms/ServerJMSMapMessage.java @@ -0,0 +1,326 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. + */ +package org.hornetq.core.protocol.proton.converter.jms; + +import javax.jms.JMSException; +import javax.jms.MapMessage; +import javax.jms.MessageFormatException; +import java.util.Collections; +import java.util.Enumeration; +import java.util.HashSet; +import java.util.Set; + +import org.hornetq.api.core.HornetQPropertyConversionException; +import org.hornetq.api.core.Message; +import org.hornetq.api.core.SimpleString; +import org.hornetq.core.message.impl.MessageInternal; +import org.hornetq.utils.TypedProperties; + +import static org.hornetq.reader.MapMessageUtil.readBodyMap; +import static org.hornetq.reader.MapMessageUtil.writeBodyMap; + +/** + * HornetQ implementation of a JMS MapMessage. 
+ * + * @author Norbert Lataille ([email protected]) + * @author <a href="mailto:[email protected]">Adrian Brock</a> + * @author <a href="mailto:[email protected]">Tim Fox</a> + * @author <a href="mailto:[email protected]">Ovidiu Feodorov</a> + * @author <a href="mailto:[email protected]">Andy Taylor</a> + * @version $Revision: 3412 $ + */ +public final class ServerJMSMapMessage extends ServerJMSMessage implements MapMessage +{ + // Constants ----------------------------------------------------- + + public static final byte TYPE = Message.MAP_TYPE; + + // Attributes ---------------------------------------------------- + + private final TypedProperties map = new TypedProperties(); + + // Static -------------------------------------------------------- + + // Constructors -------------------------------------------------- + + /* + * This constructor is used to construct messages prior to sending + */ + public ServerJMSMapMessage(MessageInternal message, int deliveryCount) + { + super(message, deliveryCount); + + } + + // MapMessage implementation ------------------------------------- + + public void setBoolean(final String name, final boolean value) throws JMSException + { + map.putBooleanProperty(new SimpleString(name), value); + } + + public void setByte(final String name, final byte value) throws JMSException + { + map.putByteProperty(new SimpleString(name), value); + } + + public void setShort(final String name, final short value) throws JMSException + { + map.putShortProperty(new SimpleString(name), value); + } + + public void setChar(final String name, final char value) throws JMSException + { + map.putCharProperty(new SimpleString(name), value); + } + + public void setInt(final String name, final int value) throws JMSException + { + map.putIntProperty(new SimpleString(name), value); + } + + public void setLong(final String name, final long value) throws JMSException + { + map.putLongProperty(new SimpleString(name), value); + } + + public void setFloat(final 
String name, final float value) throws JMSException + { + map.putFloatProperty(new SimpleString(name), value); + } + + public void setDouble(final String name, final double value) throws JMSException + { + map.putDoubleProperty(new SimpleString(name), value); + } + + public void setString(final String name, final String value) throws JMSException + { + map.putSimpleStringProperty(new SimpleString(name), value == null ? null : new SimpleString(value)); + } + + public void setBytes(final String name, final byte[] value) throws JMSException + { + map.putBytesProperty(new SimpleString(name), value); + } + + public void setBytes(final String name, final byte[] value, final int offset, final int length) throws JMSException + { + if (offset + length > value.length) + { + throw new JMSException("Invalid offset/length"); + } + byte[] newBytes = new byte[length]; + System.arraycopy(value, offset, newBytes, 0, length); + map.putBytesProperty(new SimpleString(name), newBytes); + } + + public void setObject(final String name, final Object value) throws JMSException + { + try + { + TypedProperties.setObjectProperty(new SimpleString(name), value, map); + } + catch (HornetQPropertyConversionException e) + { + throw new MessageFormatException(e.getMessage()); + } + } + + public boolean getBoolean(final String name) throws JMSException + { + try + { + return map.getBooleanProperty(new SimpleString(name)); + } + catch (HornetQPropertyConversionException e) + { + throw new MessageFormatException(e.getMessage()); + } + } + + public byte getByte(final String name) throws JMSException + { + try + { + return map.getByteProperty(new SimpleString(name)); + } + catch (HornetQPropertyConversionException e) + { + throw new MessageFormatException(e.getMessage()); + } + } + + public short getShort(final String name) throws JMSException + { + try + { + return map.getShortProperty(new SimpleString(name)); + } + catch (HornetQPropertyConversionException e) + { + throw new 
MessageFormatException(e.getMessage()); + } + } + + public char getChar(final String name) throws JMSException + { + try + { + return map.getCharProperty(new SimpleString(name)); + } + catch (HornetQPropertyConversionException e) + { + throw new MessageFormatException(e.getMessage()); + } + } + + public int getInt(final String name) throws JMSException + { + try + { + return map.getIntProperty(new SimpleString(name)); + } + catch (HornetQPropertyConversionException e) + { + throw new MessageFormatException(e.getMessage()); + } + } + + public long getLong(final String name) throws JMSException + { + try + { + return map.getLongProperty(new SimpleString(name)); + } + catch (HornetQPropertyConversionException e) + { + throw new MessageFormatException(e.getMessage()); + } + } + + public float getFloat(final String name) throws JMSException + { + try + { + return map.getFloatProperty(new SimpleString(name)); + } + catch (HornetQPropertyConversionException e) + { + throw new MessageFormatException(e.getMessage()); + } + } + + public double getDouble(final String name) throws JMSException + { + try + { + return map.getDoubleProperty(new SimpleString(name)); + } + catch (HornetQPropertyConversionException e) + { + throw new MessageFormatException(e.getMessage()); + } + } + + public String getString(final String name) throws JMSException + { + try + { + SimpleString str = map.getSimpleStringProperty(new SimpleString(name)); + if (str == null) + { + return null; + } + else + { + return str.toString(); + } + } + catch (HornetQPropertyConversionException e) + { + throw new MessageFormatException(e.getMessage()); + } + } + + public byte[] getBytes(final String name) throws JMSException + { + try + { + return map.getBytesProperty(new SimpleString(name)); + } + catch (HornetQPropertyConversionException e) + { + throw new MessageFormatException(e.getMessage()); + } + } + + public Object getObject(final String name) throws JMSException + { + Object val = map.getProperty(new 
SimpleString(name)); + + if (val instanceof SimpleString) + { + val = ((SimpleString) val).toString(); + } + + return val; + } + + public Enumeration getMapNames() throws JMSException + { + Set<SimpleString> simplePropNames = map.getPropertyNames(); + Set<String> propNames = new HashSet<String>(simplePropNames.size()); + + for (SimpleString str : simplePropNames) + { + propNames.add(str.toString()); + } + + return Collections.enumeration(propNames); + } + + public boolean itemExists(final String name) throws JMSException + { + return map.containsProperty(new SimpleString(name)); + } + + + @Override + public void clearBody() throws JMSException + { + super.clearBody(); + + map.clear(); + } + + + public void encode() throws Exception + { + super.encode(); + writeBodyMap(message, map); + } + + public void decode() throws Exception + { + super.decode(); + readBodyMap(message, map); + } + + // Package protected --------------------------------------------- + + // Protected ----------------------------------------------------- + + // Private ------------------------------------------------------- + +} http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/converter/jms/ServerJMSMessage.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/converter/jms/ServerJMSMessage.java b/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/converter/jms/ServerJMSMessage.java new file mode 100644 index 0000000..2c5e6e2 --- /dev/null +++ b/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/converter/jms/ServerJMSMessage.java @@ -0,0 +1,435 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. 
* Red Hat licenses this file to you under the Apache License, version
* 2.0 (the "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
* implied. See the License for the specific language governing
* permissions and limitations under the License.
*/

package org.hornetq.core.protocol.proton.converter.jms;

import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
import javax.jms.Message;
import java.util.Collections;
import java.util.Enumeration;

import org.hornetq.api.core.HornetQException;
import org.hornetq.api.core.SimpleString;
import org.hornetq.core.message.impl.MessageInternal;
import org.hornetq.jms.client.HornetQDestination;
import org.hornetq.jms.client.HornetQQueue;
import org.hornetq.reader.MessageUtil;

/**
 * JMS {@link Message} view of a wrapped {@link MessageInternal}.
 * <p>
 * Headers and properties are read from and written to the wrapped message, either
 * directly or through {@link MessageUtil}. The delivery count is not stored in the
 * message: it is supplied at construction time and reported for the
 * {@code JMSXDeliveryCount} property.
 *
 * @author Clebert Suconic
 */
public class ServerJMSMessage implements Message
{
   /** The wrapped message that backs every header, property and body access. */
   protected final MessageInternal message;

   /** Value reported for the {@code JMSXDeliveryCount} property. */
   protected int deliveryCount;

   /**
    * @return the wrapped message
    */
   public MessageInternal getInnerMessage()
   {
      return message;
   }

   /**
    * @param message       the message to wrap
    * @param deliveryCount value reported for {@code JMSXDeliveryCount}
    */
   public ServerJMSMessage(MessageInternal message, int deliveryCount)
   {
      this.message = message;
      this.deliveryCount = deliveryCount;
   }

   // Headers -------------------------------------------------------

   /**
    * @return always {@code null}; this wrapper does not track a JMS message id
    */
   @Override
   public final String getJMSMessageID() throws JMSException
   {
      return null;
   }

   @Override
   public final void setJMSMessageID(String id) throws JMSException
   {
      // no op
   }

   @Override
   public final long getJMSTimestamp() throws JMSException
   {
      return message.getTimestamp();
   }

   @Override
   public final void setJMSTimestamp(long timestamp) throws JMSException
   {
      message.setTimestamp(timestamp);
   }

   @Override
   public final byte[] getJMSCorrelationIDAsBytes() throws JMSException
   {
      return MessageUtil.getJMSCorrelationIDAsBytes(message);
   }

   /**
    * @throws JMSException wrapping the {@link HornetQException} raised by {@link MessageUtil};
    *                      the original exception is kept as linked exception and cause
    */
   @Override
   public final void setJMSCorrelationIDAsBytes(byte[] correlationID) throws JMSException
   {
      try
      {
         MessageUtil.setJMSCorrelationIDAsBytes(message, correlationID);
      }
      catch (HornetQException e)
      {
         // Keep the original failure attached instead of only copying its text.
         JMSException failure = new JMSException(e.getMessage());
         failure.setLinkedException(e);
         failure.initCause(e);
         throw failure;
      }
   }

   @Override
   public final void setJMSCorrelationID(String correlationID) throws JMSException
   {
      MessageUtil.setJMSCorrelationID(message, correlationID);
   }

   @Override
   public final String getJMSCorrelationID() throws JMSException
   {
      return MessageUtil.getJMSCorrelationID(message);
   }

   /**
    * @return the reply-to destination built from the stored address, or {@code null} if none is set
    */
   @Override
   public final Destination getJMSReplyTo() throws JMSException
   {
      SimpleString reply = MessageUtil.getJMSReplyTo(message);
      if (reply == null)
      {
         return null;
      }
      return HornetQDestination.fromAddress(reply.toString());
   }

   /**
    * @param replyTo {@code null} or a {@link HornetQDestination}; the value is cast, so any
    *                other implementation fails with a {@link ClassCastException}
    */
   @Override
   public final void setJMSReplyTo(Destination replyTo) throws JMSException
   {
      MessageUtil.setJMSReplyTo(message, replyTo == null ? null : ((HornetQDestination) replyTo).getSimpleAddress());
   }

   /**
    * @return {@code null} when the message has no address; a {@link HornetQQueue} named after
    *         the raw address when it does not start with {@code "jms."}; otherwise the result
    *         of {@link HornetQDestination#fromAddress}
    */
   @Override
   public final Destination getJMSDestination() throws JMSException
   {
      SimpleString sdest = message.getAddress();
      if (sdest == null)
      {
         return null;
      }

      String address = sdest.toString();
      if (!address.startsWith("jms."))
      {
         return new HornetQQueue(address, address);
      }
      return HornetQDestination.fromAddress(address);
   }

   /**
    * @param destination {@code null} to clear the address, or a {@link HornetQDestination};
    *                    the value is cast, so any other implementation fails with a
    *                    {@link ClassCastException}
    */
   @Override
   public final void setJMSDestination(Destination destination) throws JMSException
   {
      if (destination == null)
      {
         message.setAddress(null);
      }
      else
      {
         message.setAddress(((HornetQDestination) destination).getSimpleAddress());
      }
   }

   @Override
   public final int getJMSDeliveryMode() throws JMSException
   {
      return message.isDurable() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT;
   }

   /**
    * @throws JMSException if {@code deliveryMode} is neither {@link DeliveryMode#PERSISTENT}
    *                      nor {@link DeliveryMode#NON_PERSISTENT}
    */
   @Override
   public final void setJMSDeliveryMode(int deliveryMode) throws JMSException
   {
      if (deliveryMode == DeliveryMode.PERSISTENT)
      {
         message.setDurable(true);
      }
      else if (deliveryMode == DeliveryMode.NON_PERSISTENT)
      {
         message.setDurable(false);
      }
      else
      {
         throw new JMSException("Invalid mode " + deliveryMode);
      }
   }

   /**
    * @return always {@code false}
    */
   @Override
   public final boolean getJMSRedelivered() throws JMSException
   {
      return false;
   }

   @Override
   public final void setJMSRedelivered(boolean redelivered) throws JMSException
   {
      // no op
   }

   @Override
   public final String getJMSType() throws JMSException
   {
      return MessageUtil.getJMSType(message);
   }

   @Override
   public final void setJMSType(String type) throws JMSException
   {
      MessageUtil.setJMSType(message, type);
   }

   @Override
   public final long getJMSExpiration() throws JMSException
   {
      return message.getExpiration();
   }

   @Override
   public final void setJMSExpiration(long expiration) throws JMSException
   {
      message.setExpiration(expiration);
   }

   /**
    * @return always {@code 0}
    */
   @Override
   public final long getJMSDeliveryTime() throws JMSException
   {
      // no op
      return 0;
   }

   @Override
   public final void setJMSDeliveryTime(long deliveryTime) throws JMSException
   {
      // no op
   }

   @Override
   public final int getJMSPriority() throws JMSException
   {
      return message.getPriority();
   }

   @Override
   public final void setJMSPriority(int priority) throws JMSException
   {
      message.setPriority((byte) priority);
   }

   // Properties ----------------------------------------------------

   @Override
   public final void clearProperties() throws JMSException
   {
      MessageUtil.clearProperties(message);
   }

   @Override
   public final boolean propertyExists(String name) throws JMSException
   {
      return MessageUtil.propertyExists(message, name);
   }

   @Override
   public final boolean getBooleanProperty(String name) throws JMSException
   {
      return message.getBooleanProperty(name);
   }

   @Override
   public final byte getByteProperty(String name) throws JMSException
   {
      return message.getByteProperty(name);
   }

   @Override
   public final short getShortProperty(String name) throws JMSException
   {
      return message.getShortProperty(name);
   }

   /**
    * @return the delivery count for {@code JMSXDeliveryCount}, otherwise the stored property
    */
   @Override
   public final int getIntProperty(String name) throws JMSException
   {
      if (MessageUtil.JMSXDELIVERYCOUNT.equals(name))
      {
         return deliveryCount;
      }

      return message.getIntProperty(name);
   }

   /**
    * @return the delivery count for {@code JMSXDeliveryCount}, otherwise the stored property
    */
   @Override
   public final long getLongProperty(String name) throws JMSException
   {
      if (MessageUtil.JMSXDELIVERYCOUNT.equals(name))
      {
         return deliveryCount;
      }

      return message.getLongProperty(name);
   }

   @Override
   public final float getFloatProperty(String name) throws JMSException
   {
      return message.getFloatProperty(name);
   }

   @Override
   public final double getDoubleProperty(String name) throws JMSException
   {
      return message.getDoubleProperty(name);
   }

   /**
    * @return the delivery count as text for {@code JMSXDeliveryCount}, otherwise the stored property
    */
   @Override
   public final String getStringProperty(String name) throws JMSException
   {
      if (MessageUtil.JMSXDELIVERYCOUNT.equals(name))
      {
         return String.valueOf(deliveryCount);
      }

      return message.getStringProperty(name);
   }

   /**
    * @return the delivery count (as an {@code Integer}) for {@code JMSXDeliveryCount};
    *         otherwise the stored property, with a {@link SimpleString} converted to {@code String}
    */
   @Override
   public final Object getObjectProperty(String name) throws JMSException
   {
      // Answer JMSXDeliveryCount from the field, exactly as the int/long/String
      // getters do, so all typed views of the property agree.
      if (MessageUtil.JMSXDELIVERYCOUNT.equals(name))
      {
         return Integer.valueOf(deliveryCount);
      }

      Object val = message.getObjectProperty(name);
      if (val instanceof SimpleString)
      {
         val = ((SimpleString) val).toString();
      }
      return val;
   }

   @Override
   public final Enumeration getPropertyNames() throws JMSException
   {
      return Collections.enumeration(MessageUtil.getPropertyNames(message));
   }

   @Override
   public final void setBooleanProperty(String name, boolean value) throws JMSException
   {
      message.putBooleanProperty(name, value);
   }

   @Override
   public final void setByteProperty(String name, byte value) throws JMSException
   {
      message.putByteProperty(name, value);
   }

   @Override
   public final void setShortProperty(String name, short value) throws JMSException
   {
      message.putShortProperty(name, value);
   }

   @Override
   public final void setIntProperty(String name, int value) throws JMSException
   {
      message.putIntProperty(name, value);
   }

   @Override
   public final void setLongProperty(String name, long value) throws JMSException
   {
      message.putLongProperty(name, value);
   }

   @Override
   public final void setFloatProperty(String name, float value) throws JMSException
   {
      message.putFloatProperty(name, value);
   }

   @Override
   public final void setDoubleProperty(String name, double value) throws JMSException
   {
      message.putDoubleProperty(name, value);
   }

   @Override
   public final void setStringProperty(String name, String value) throws JMSException
   {
      message.putStringProperty(name, value);
   }

   @Override
   public final void setObjectProperty(String name, Object value) throws JMSException
   {
      message.putObjectProperty(name, value);
   }

   // Body and life cycle -------------------------------------------

   @Override
   public final void acknowledge() throws JMSException
   {
      // no op
   }

   @Override
   public void clearBody() throws JMSException
   {
      message.getBodyBuffer().clear();
   }

   /**
    * @return always {@code null}
    */
   @Override
   public final <T> T getBody(Class<T> c) throws JMSException
   {
      // no op.. jms2 not used on the conversion
      return null;
   }

   /**
    * Encode the body into the internal message. This base implementation only resets
    * the reader index of the body buffer; subclasses add their own body handling.
    */
   public void encode() throws Exception
   {
      message.getBodyBuffer().resetReaderIndex();
   }

   /**
    * Counterpart of {@link #encode()}. This base implementation only resets the reader
    * index of the body buffer; subclasses add their own body handling.
    */
   public void decode() throws Exception
   {
      message.getBodyBuffer().resetReaderIndex();
   }

   /**
    * @return always {@code false}
    */
   @Override
   public final boolean isBodyAssignableTo(Class c) throws JMSException
   {
      // no op.. jms2 not used on the conversion
      return false;
   }
}
 http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/converter/jms/ServerJMSStreamMessage.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/converter/jms/ServerJMSStreamMessage.java b/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/converter/jms/ServerJMSStreamMessage.java new file mode 100644 index 0000000..14a5954 --- /dev/null +++ b/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/converter/jms/ServerJMSStreamMessage.java @@ -0,0 +1,417 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */
package org.hornetq.core.protocol.proton.converter.jms;

import javax.jms.JMSException;
import javax.jms.MessageEOFException;
import javax.jms.MessageFormatException;
import javax.jms.StreamMessage;

import org.hornetq.api.core.HornetQBuffer;
import org.hornetq.api.core.Message;
import org.hornetq.api.core.Pair;
import org.hornetq.core.message.impl.MessageInternal;
import org.hornetq.utils.DataConstants;

import static org.hornetq.reader.MessageUtil.getBodyBuffer;
import static org.hornetq.reader.StreamMessageUtil.streamReadBoolean;
import static org.hornetq.reader.StreamMessageUtil.streamReadByte;
import static org.hornetq.reader.StreamMessageUtil.streamReadBytes;
import static org.hornetq.reader.StreamMessageUtil.streamReadChar;
import static org.hornetq.reader.StreamMessageUtil.streamReadDouble;
import static org.hornetq.reader.StreamMessageUtil.streamReadFloat;
import static org.hornetq.reader.StreamMessageUtil.streamReadInteger;
import static org.hornetq.reader.StreamMessageUtil.streamReadLong;
import static org.hornetq.reader.StreamMessageUtil.streamReadObject;
import static org.hornetq.reader.StreamMessageUtil.streamReadShort;
import static org.hornetq.reader.StreamMessageUtil.streamReadString;

/**
 * {@link StreamMessage} view over the body buffer of a wrapped {@link MessageInternal}.
 * <p>
 * Reads are delegated to the static {@code StreamMessageUtil} helpers. Writes put a
 * {@link DataConstants} type marker into the body buffer followed by the value itself.
 */
public final class ServerJMSStreamMessage extends ServerJMSMessage implements StreamMessage
{
   public static final byte TYPE = Message.STREAM_TYPE;

   // NOTE(review): assigned in encode() but never read inside this class.
   private int bodyLength = 0;

   /**
    * len here is used to control how many more bytes to read.
    * It carries the state of a partially consumed byte array between {@link #readBytes(byte[])} calls.
    */
   private int len = 0;

   /**
    * Wraps the given message; both arguments are handed to the superclass constructor unchanged.
    *
    * @param message       the message whose body is exposed as a stream
    * @param deliveryCount the delivery count passed to the superclass
    */
   public ServerJMSStreamMessage(MessageInternal message, int deliveryCount)
   {
      super(message, deliveryCount);
   }

   // StreamMessage implementation ----------------------------------

   /**
    * Reads the next value as a boolean.
    *
    * @throws MessageFormatException if the reader reports an {@link IllegalStateException}
    * @throws MessageEOFException    if the reader runs past the available data
    */
   public boolean readBoolean() throws JMSException
   {
      try
      {
         return streamReadBoolean(message);
      }
      catch (IllegalStateException e)
      {
         throw formatFailure(e);
      }
      catch (IndexOutOfBoundsException e)
      {
         throw endOfStream(e);
      }
   }

   /**
    * Reads the next value as a byte.
    *
    * @throws MessageFormatException if the reader reports an {@link IllegalStateException}
    * @throws MessageEOFException    if the reader runs past the available data
    */
   public byte readByte() throws JMSException
   {
      try
      {
         return streamReadByte(message);
      }
      catch (IllegalStateException e)
      {
         throw formatFailure(e);
      }
      catch (IndexOutOfBoundsException e)
      {
         throw endOfStream(e);
      }
   }

   /**
    * Reads the next value as a short.
    *
    * @throws MessageFormatException if the reader reports an {@link IllegalStateException}
    * @throws MessageEOFException    if the reader runs past the available data
    */
   public short readShort() throws JMSException
   {
      try
      {
         return streamReadShort(message);
      }
      catch (IllegalStateException e)
      {
         throw formatFailure(e);
      }
      catch (IndexOutOfBoundsException e)
      {
         throw endOfStream(e);
      }
   }

   /**
    * Reads the next value as a char.
    *
    * @throws MessageFormatException if the reader reports an {@link IllegalStateException}
    * @throws MessageEOFException    if the reader runs past the available data
    */
   public char readChar() throws JMSException
   {
      try
      {
         return streamReadChar(message);
      }
      catch (IllegalStateException e)
      {
         throw formatFailure(e);
      }
      catch (IndexOutOfBoundsException e)
      {
         throw endOfStream(e);
      }
   }

   /**
    * Reads the next value as an int.
    *
    * @throws MessageFormatException if the reader reports an {@link IllegalStateException}
    * @throws MessageEOFException    if the reader runs past the available data
    */
   public int readInt() throws JMSException
   {
      try
      {
         return streamReadInteger(message);
      }
      catch (IllegalStateException e)
      {
         throw formatFailure(e);
      }
      catch (IndexOutOfBoundsException e)
      {
         throw endOfStream(e);
      }
   }

   /**
    * Reads the next value as a long.
    *
    * @throws MessageFormatException if the reader reports an {@link IllegalStateException}
    * @throws MessageEOFException    if the reader runs past the available data
    */
   public long readLong() throws JMSException
   {
      try
      {
         return streamReadLong(message);
      }
      catch (IllegalStateException e)
      {
         throw formatFailure(e);
      }
      catch (IndexOutOfBoundsException e)
      {
         throw endOfStream(e);
      }
   }

   /**
    * Reads the next value as a float.
    *
    * @throws MessageFormatException if the reader reports an {@link IllegalStateException}
    * @throws MessageEOFException    if the reader runs past the available data
    */
   public float readFloat() throws JMSException
   {
      try
      {
         return streamReadFloat(message);
      }
      catch (IllegalStateException e)
      {
         throw formatFailure(e);
      }
      catch (IndexOutOfBoundsException e)
      {
         throw endOfStream(e);
      }
   }

   /**
    * Reads the next value as a double.
    *
    * @throws MessageFormatException if the reader reports an {@link IllegalStateException}
    * @throws MessageEOFException    if the reader runs past the available data
    */
   public double readDouble() throws JMSException
   {
      try
      {
         return streamReadDouble(message);
      }
      catch (IllegalStateException e)
      {
         throw formatFailure(e);
      }
      catch (IndexOutOfBoundsException e)
      {
         throw endOfStream(e);
      }
   }

   /**
    * Reads the next value as a String.
    *
    * @throws MessageFormatException if the reader reports an {@link IllegalStateException}
    * @throws MessageEOFException    if the reader runs past the available data
    */
   public String readString() throws JMSException
   {
      try
      {
         return streamReadString(message);
      }
      catch (IllegalStateException e)
      {
         throw formatFailure(e);
      }
      catch (IndexOutOfBoundsException e)
      {
         throw endOfStream(e);
      }
   }

   /**
    * Reads bytes into {@code value}, continuing a previously started byte array if one is pending.
    * <p>
    * The reader is given the current {@link #len} and returns a pair: its first element becomes the
    * new {@link #len}, its second element is returned to the caller.
    *
    * @param value destination array
    * @return the second element of the pair produced by the reader
    * @throws MessageFormatException if the reader reports an {@link IllegalStateException}
    * @throws MessageEOFException    if the reader runs past the available data
    */
   public int readBytes(final byte[] value) throws JMSException
   {
      try
      {
         final Pair<Integer, Integer> progress = streamReadBytes(message, len, value);

         len = progress.getA();
         return progress.getB();
      }
      catch (IllegalStateException e)
      {
         throw formatFailure(e);
      }
      catch (IndexOutOfBoundsException e)
      {
         throw endOfStream(e);
      }
   }

   /**
    * Reads the next value as an Object.
    * <p>
    * Fails up front when the reader index has already reached the end-of-body position.
    *
    * @throws MessageFormatException if the reader reports an {@link IllegalStateException}
    * @throws MessageEOFException    if there is no more data to read
    */
   public Object readObject() throws JMSException
   {
      if (getBodyBuffer(message).readerIndex() >= message.getEndOfBodyPosition())
      {
         throw new MessageEOFException("");
      }
      try
      {
         return streamReadObject(message);
      }
      catch (IllegalStateException e)
      {
         throw formatFailure(e);
      }
      catch (IndexOutOfBoundsException e)
      {
         throw endOfStream(e);
      }
   }

   /** Writes the BOOLEAN type marker followed by the value. */
   public void writeBoolean(final boolean value) throws JMSException
   {
      final HornetQBuffer buffer = getBuffer();
      buffer.writeByte(DataConstants.BOOLEAN);
      buffer.writeBoolean(value);
   }

   /** Writes the BYTE type marker followed by the value. */
   public void writeByte(final byte value) throws JMSException
   {
      final HornetQBuffer buffer = getBuffer();
      buffer.writeByte(DataConstants.BYTE);
      buffer.writeByte(value);
   }

   /** Writes the SHORT type marker followed by the value. */
   public void writeShort(final short value) throws JMSException
   {
      final HornetQBuffer buffer = getBuffer();
      buffer.writeByte(DataConstants.SHORT);
      buffer.writeShort(value);
   }

   /** Writes the CHAR type marker followed by the character stored as a short. */
   public void writeChar(final char value) throws JMSException
   {
      final HornetQBuffer buffer = getBuffer();
      buffer.writeByte(DataConstants.CHAR);
      buffer.writeShort((short) value);
   }

   /** Writes the INT type marker followed by the value. */
   public void writeInt(final int value) throws JMSException
   {
      final HornetQBuffer buffer = getBuffer();
      buffer.writeByte(DataConstants.INT);
      buffer.writeInt(value);
   }

   /** Writes the LONG type marker followed by the value. */
   public void writeLong(final long value) throws JMSException
   {
      final HornetQBuffer buffer = getBuffer();
      buffer.writeByte(DataConstants.LONG);
      buffer.writeLong(value);
   }

   /** Writes the FLOAT type marker followed by the value's int bit pattern. */
   public void writeFloat(final float value) throws JMSException
   {
      final HornetQBuffer buffer = getBuffer();
      buffer.writeByte(DataConstants.FLOAT);
      buffer.writeInt(Float.floatToIntBits(value));
   }

   /** Writes the DOUBLE type marker followed by the value's long bit pattern. */
   public void writeDouble(final double value) throws JMSException
   {
      final HornetQBuffer buffer = getBuffer();
      buffer.writeByte(DataConstants.DOUBLE);
      buffer.writeLong(Double.doubleToLongBits(value));
   }

   /** Writes the STRING type marker followed by the (possibly null) string. */
   public void writeString(final String value) throws JMSException
   {
      final HornetQBuffer buffer = getBuffer();
      buffer.writeByte(DataConstants.STRING);
      buffer.writeNullableString(value);
   }

   /** Writes the BYTES type marker, the array length and then the whole array. */
   public void writeBytes(final byte[] value) throws JMSException
   {
      final HornetQBuffer buffer = getBuffer();
      buffer.writeByte(DataConstants.BYTES);
      buffer.writeInt(value.length);
      buffer.writeBytes(value);
   }

   /** Writes the BYTES type marker, {@code length} and then {@code length} bytes starting at {@code offset}. */
   public void writeBytes(final byte[] value, final int offset, final int length) throws JMSException
   {
      final HornetQBuffer buffer = getBuffer();
      buffer.writeByte(DataConstants.BYTES);
      buffer.writeInt(length);
      buffer.writeBytes(value, offset, length);
   }

   /**
    * Writes {@code value} using the typed write method that matches its runtime class.
    * A {@code null} value is written as a null String.
    *
    * @throws MessageFormatException if the runtime class of {@code value} is not supported
    */
   public void writeObject(final Object value) throws JMSException
   {
      if (value == null)
      {
         writeString(null);
      }
      else if (value instanceof String)
      {
         writeString((String) value);
      }
      else if (value instanceof Boolean)
      {
         writeBoolean((Boolean) value);
      }
      else if (value instanceof Byte)
      {
         writeByte((Byte) value);
      }
      else if (value instanceof Short)
      {
         writeShort((Short) value);
      }
      else if (value instanceof Integer)
      {
         writeInt((Integer) value);
      }
      else if (value instanceof Long)
      {
         writeLong((Long) value);
      }
      else if (value instanceof Float)
      {
         writeFloat((Float) value);
      }
      else if (value instanceof Double)
      {
         writeDouble((Double) value);
      }
      else if (value instanceof byte[])
      {
         writeBytes((byte[]) value);
      }
      else if (value instanceof Character)
      {
         writeChar((Character) value);
      }
      else
      {
         throw new MessageFormatException("Invalid object type: " + value.getClass());
      }
   }

   /**
    * Moves the read position back to the reader index recorded by the body buffer.
    */
   public void reset() throws JMSException
   {
      // Any half-read byte array belongs to the old read position; forget it so the
      // next readBytes call starts a fresh array instead of continuing a stale one.
      len = 0;
      getBuffer().resetReaderIndex();
   }

   // ServerJMSMessage overrides ----------------------------------------

   /**
    * Clears the body through the superclass and then clears the body buffer.
    */
   @Override
   public void clearBody() throws JMSException
   {
      super.clearBody();

      // The body is gone, so there is no pending byte array left to continue.
      len = 0;
      getBuffer().clear();
   }

   /** Returns the body buffer of the wrapped message. */
   private HornetQBuffer getBuffer()
   {
      return message.getBodyBuffer();
   }

   /** Builds a {@link MessageFormatException} that keeps the reader's message and records it as the cause. */
   private static MessageFormatException formatFailure(final IllegalStateException cause)
   {
      final MessageFormatException failure = new MessageFormatException(cause.getMessage());
      failure.initCause(cause);
      return failure;
   }

   /** Builds a {@link MessageEOFException} (empty reason, as before) that records the bounds failure as the cause. */
   private static MessageEOFException endOfStream(final IndexOutOfBoundsException cause)
   {
      final MessageEOFException failure = new MessageEOFException("");
      failure.initCause(cause);
      return failure;
   }

   /** Delegates to the superclass. */
   @Override
   public void decode() throws Exception
   {
      super.decode();
   }

   /**
    * Encode the body into the internal message
    */
   @Override
   public void encode() throws Exception
   {
      super.encode();
      bodyLength = message.getEndOfBodyPosition();
   }

}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/converter/jms/ServerJMSTextMessage.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/converter/jms/ServerJMSTextMessage.java b/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/converter/jms/ServerJMSTextMessage.java new file mode 100644 index 0000000..5455e6e --- /dev/null +++ b/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/converter/jms/ServerJMSTextMessage.java @@ -0,0 +1,112 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ +package org.hornetq.core.protocol.proton.converter.jms; + +import javax.jms.JMSException; +import javax.jms.TextMessage; + +import org.hornetq.api.core.Message; +import org.hornetq.api.core.SimpleString; +import org.hornetq.core.message.impl.MessageInternal; + +import static org.hornetq.reader.TextMessageUtil.readBodyText; +import static org.hornetq.reader.TextMessageUtil.writeBodyText; + + +/** + * HornetQ implementation of a JMS TextMessage. + * <br> + * This class was ported from SpyTextMessage in JBossMQ. + * + * @author Norbert Lataille ([email protected]) + * @author <a href="mailto:[email protected]">Jason Dillon</a> + * @author <a href="mailto:[email protected]">Adrian Brock</a> + * @author <a href="mailto:[email protected]">Tim Fox</a> + * @author <a href="mailto:[email protected]">Ovidiu Feodorov</a> + * @author <a href="mailto:[email protected]">Andy Taylor</a> + * @version $Revision: 3412 $ + */ +public class ServerJMSTextMessage extends ServerJMSMessage implements TextMessage +{ + // Constants ----------------------------------------------------- + + public static final byte TYPE = Message.TEXT_TYPE; + + // Attributes ---------------------------------------------------- + + // We cache it locally - it's more performant to cache as a SimpleString, the AbstractChannelBuffer write + // methods are more efficient for a SimpleString + private SimpleString text; + + // Static -------------------------------------------------------- + + // Constructors -------------------------------------------------- + + /* + * This constructor is used to construct messages prior to sending + */ + public ServerJMSTextMessage(MessageInternal message, int deliveryCount) + { + super(message, deliveryCount); + + } + // TextMessage implementation ------------------------------------ + + public void setText(final String text) throws JMSException + { + if (text != null) + { + this.text = new SimpleString(text); + } + else + { + this.text = null; + } + + 
writeBodyText(message, this.text); + } + + public String getText() + { + if (text != null) + { + return text.toString(); + } + else + { + return null; + } + } + + @Override + public void clearBody() throws JMSException + { + super.clearBody(); + + text = null; + } + + + public void encode() throws Exception + { + super.encode(); + writeBodyText(message, text); + } + + public void decode() throws Exception + { + super.decode(); + text = readBodyText(message); + } + +} \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/converter/jms/package-info.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/converter/jms/package-info.java b/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/converter/jms/package-info.java new file mode 100644 index 0000000..acc5a0d --- /dev/null +++ b/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/converter/jms/package-info.java @@ -0,0 +1,21 @@ +/* + * Copyright 2005-2014 Red Hat, Inc. + * Red Hat licenses this file to you under the Apache License, version + * 2.0 (the "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or + * implied. See the License for the specific language governing + * permissions and limitations under the License. 
+ */ + + +/** + * This package contains incomplete JMS implementations just to be used with converting amqp to hornetq and + * vice versa + * @author Clebert Suconic + */ + +package org.hornetq.core.protocol.proton.converter.jms; \ No newline at end of file http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/exceptions/HornetQAMQPException.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/exceptions/HornetQAMQPException.java b/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/exceptions/HornetQAMQPException.java deleted file mode 100644 index 03f797d..0000000 --- a/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/exceptions/HornetQAMQPException.java +++ /dev/null @@ -1,40 +0,0 @@ -/* - * Copyright 2005-2014 Red Hat, Inc. - * Red Hat licenses this file to you under the Apache License, version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. 
- */

package org.hornetq.core.protocol.proton.exceptions;

import org.apache.qpid.proton.amqp.Symbol;
import org.hornetq.api.core.HornetQException;

/**
 * A {@link HornetQException} that additionally carries an AMQP error {@link Symbol}.
 *
 * @author Andy Taylor
 *         6/6/13
 */
public class HornetQAMQPException extends HornetQException
{
   /** The AMQP error symbol supplied at construction time. */
   private final Symbol amqpError;

   /**
    * @param amqpError the AMQP error symbol to carry; stored as given
    * @param message   the exception message, passed to the superclass
    */
   public HornetQAMQPException(Symbol amqpError, String message)
   {
      super(message);
      this.amqpError = amqpError;
   }

   /**
    * @return the AMQP error symbol given to the constructor
    */
   public Symbol getAmqpError()
   {
      return amqpError;
   }
}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/177e6820/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/exceptions/HornetQAMQPIllegalStateException.java ---------------------------------------------------------------------- diff --git a/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/exceptions/HornetQAMQPIllegalStateException.java b/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/exceptions/HornetQAMQPIllegalStateException.java deleted file mode 100644 index 0792f88..0000000 --- a/hornetq-protocols/hornetq-amqp-protocol/src/main/java/org/hornetq/core/protocol/proton/exceptions/HornetQAMQPIllegalStateException.java +++ /dev/null @@ -1,28 +0,0 @@ -/* - * Copyright 2005-2014 Red Hat, Inc. - * Red Hat licenses this file to you under the Apache License, version - * 2.0 (the "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * http://www.apache.org/licenses/LICENSE-2.0 - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or - * implied. See the License for the specific language governing - * permissions and limitations under the License. 
- */

package org.hornetq.core.protocol.proton.exceptions;

import org.apache.qpid.proton.amqp.transport.AmqpError;

/**
 * A {@link HornetQAMQPException} whose AMQP error is always {@link AmqpError#ILLEGAL_STATE}.
 *
 * @author Andy Taylor
 *         6/6/13
 */
public class HornetQAMQPIllegalStateException extends HornetQAMQPException
{
   /**
    * @param message the exception message, passed to the superclass together with
    *                {@link AmqpError#ILLEGAL_STATE}
    */
   public HornetQAMQPIllegalStateException(final String message)
   {
      super(AmqpError.ILLEGAL_STATE, message);
   }
}
