http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/InboundTransformer.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/InboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/InboundTransformer.java index 55195eb..ff0c035 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/InboundTransformer.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/InboundTransformer.java @@ -16,6 +16,13 @@ */ package org.apache.activemq.artemis.protocol.amqp.converter.message; +import javax.jms.DeliveryMode; +import javax.jms.JMSException; +import javax.jms.Message; +import java.nio.charset.StandardCharsets; +import java.util.Map; +import java.util.Set; + import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.Decimal128; import org.apache.qpid.proton.amqp.Decimal32; @@ -31,14 +38,6 @@ import org.apache.qpid.proton.amqp.messaging.Header; import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; import org.apache.qpid.proton.amqp.messaging.Properties; -import javax.jms.DeliveryMode; -import javax.jms.JMSException; -import javax.jms.Message; - -import java.nio.charset.StandardCharsets; -import java.util.Map; -import java.util.Set; - import static org.apache.activemq.artemis.api.core.Message.HDR_SCHEDULED_DELIVERY_TIME; public abstract class InboundTransformer { @@ -116,15 +115,13 @@ public abstract class InboundTransformer { if (header.getDurable() != null) { jms.setJMSDeliveryMode(header.getDurable().booleanValue() ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT); - } - else { + } else { jms.setJMSDeliveryMode(defaultDeliveryMode); } if (header.getPriority() != null) { jms.setJMSPriority(header.getPriority().intValue()); - } - else { + } else { jms.setJMSPriority(defaultPriority); } @@ -143,12 +140,10 @@ public abstract class InboundTransformer { if ("x-opt-jms-type".equals(key) && entry.getValue() != null) { // Legacy annotation, JMSType value will be replaced by Subject further down if also present. jms.setJMSType(entry.getValue().toString()); - } - else if ("x-opt-delivery-time".equals(key) && entry.getValue() != null) { + } else if ("x-opt-delivery-time".equals(key) && entry.getValue() != null) { long deliveryTime = ((Number) entry.getValue()).longValue(); jms.setLongProperty(HDR_SCHEDULED_DELIVERY_TIME.toString(), deliveryTime); - } - else if ("x-opt-delivery-delay".equals(key) && entry.getValue() != null) { + } else if ("x-opt-delivery-delay".equals(key) && entry.getValue() != null) { long delay = ((Number) entry.getValue()).longValue(); if (delay > 0) { jms.setLongProperty(HDR_SCHEDULED_DELIVERY_TIME.toString(), System.currentTimeMillis() + delay); @@ -182,14 +177,11 @@ public abstract class InboundTransformer { String key = entry.getKey().toString(); if ("JMSXGroupID".equals(key)) { vendor.setJMSXGroupID(jms, entry.getValue().toString()); - } - else if ("JMSXGroupSequence".equals(key)) { + } else if ("JMSXGroupSequence".equals(key)) { vendor.setJMSXGroupSequence(jms, ((Number) entry.getValue()).intValue()); - } - else if ("JMSXUserID".equals(key)) { + } else if ("JMSXUserID".equals(key)) { vendor.setJMSXUserID(jms, entry.getValue().toString()); - } - else { + } else { setProperty(jms, key, entry.getValue()); } } @@ -249,8 +241,7 @@ public abstract class InboundTransformer { if (ttl == 0) { jms.setJMSExpiration(0); - } - else { + } else { jms.setJMSExpiration(System.currentTimeMillis() + ttl); } } @@ -268,50 +259,38 @@ public abstract class InboundTransformer { if (value instanceof UnsignedLong) { long v = ((UnsignedLong) value).longValue(); msg.setLongProperty(key, v); - } - else if (value instanceof UnsignedInteger) { + } else if (value instanceof UnsignedInteger) { long v = ((UnsignedInteger) value).longValue(); if (Integer.MIN_VALUE <= v && v <= Integer.MAX_VALUE) { msg.setIntProperty(key, (int) v); - } - else { + } else { msg.setLongProperty(key, v); } - } - else if (value instanceof UnsignedShort) { + } else if (value instanceof UnsignedShort) { int v = ((UnsignedShort) value).intValue(); if (Short.MIN_VALUE <= v && v <= Short.MAX_VALUE) { msg.setShortProperty(key, (short) v); - } - else { + } else { msg.setIntProperty(key, v); } - } - else if (value instanceof UnsignedByte) { + } else if (value instanceof UnsignedByte) { short v = ((UnsignedByte) value).shortValue(); if (Byte.MIN_VALUE <= v && v <= Byte.MAX_VALUE) { msg.setByteProperty(key, (byte) v); - } - else { + } else { msg.setShortProperty(key, v); } - } - else if (value instanceof Symbol) { + } else if (value instanceof Symbol) { msg.setStringProperty(key, value.toString()); - } - else if (value instanceof Decimal128) { + } else if (value instanceof Decimal128) { msg.setDoubleProperty(key, ((Decimal128) value).doubleValue()); - } - else if (value instanceof Decimal64) { + } else if (value instanceof Decimal64) { msg.setDoubleProperty(key, ((Decimal64) value).doubleValue()); - } - else if (value instanceof Decimal32) { + } else if (value instanceof Decimal32) { msg.setFloatProperty(key, ((Decimal32) value).floatValue()); - } - else if (value instanceof Binary) { + } else if (value instanceof Binary) { msg.setStringProperty(key, value.toString()); - } - else { + } else { msg.setObjectProperty(key, value); } }
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformer.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformer.java index 2bcbfe2..9dd29ab 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformer.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingInboundTransformer.java @@ -16,12 +16,6 @@ */ package org.apache.activemq.artemis.protocol.amqp.converter.message; -import org.apache.qpid.proton.amqp.Binary; -import org.apache.qpid.proton.amqp.messaging.AmqpSequence; -import org.apache.qpid.proton.amqp.messaging.AmqpValue; -import org.apache.qpid.proton.amqp.messaging.Data; -import org.apache.qpid.proton.amqp.messaging.Section; - import javax.jms.BytesMessage; import javax.jms.MapMessage; import javax.jms.Message; @@ -33,6 +27,12 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.qpid.proton.amqp.Binary; +import org.apache.qpid.proton.amqp.messaging.AmqpSequence; +import org.apache.qpid.proton.amqp.messaging.AmqpValue; +import org.apache.qpid.proton.amqp.messaging.Data; +import org.apache.qpid.proton.amqp.messaging.Section; + public class JMSMappingInboundTransformer extends InboundTransformer { public JMSMappingInboundTransformer(JMSVendor vendor) { @@ -58,14 +58,12 @@ public class JMSMappingInboundTransformer extends InboundTransformer { final Section body = amqp.getBody(); if (body == null) { rc = vendor.createMessage(); - } - else if (body instanceof Data) { + } else if (body instanceof Data) { Binary d = ((Data) body).getValue(); BytesMessage m = vendor.createBytesMessage(); m.writeBytes(d.getArray(), d.getArrayOffset(), d.getLength()); rc = m; - } - else if (body instanceof AmqpSequence) { + } else if (body instanceof AmqpSequence) { AmqpSequence sequence = (AmqpSequence) body; StreamMessage m = vendor.createStreamMessage(); for (Object item : sequence.getValue()) { @@ -73,8 +71,7 @@ public class JMSMappingInboundTransformer extends InboundTransformer { } rc = m; m.setStringProperty(AMQPMessageTypes.AMQP_TYPE_KEY, AMQPMessageTypes.AMQP_SEQUENCE); - } - else if (body instanceof AmqpValue) { + } else if (body instanceof AmqpValue) { Object value = ((AmqpValue) body).getValue(); if (value == null) { rc = vendor.createObjectMessage(); @@ -83,36 +80,31 @@ public class JMSMappingInboundTransformer extends InboundTransformer { TextMessage m = vendor.createTextMessage(); m.setText((String) value); rc = m; - } - else if (value instanceof Binary) { + } else if (value instanceof Binary) { Binary d = (Binary) value; BytesMessage m = vendor.createBytesMessage(); m.writeBytes(d.getArray(), d.getArrayOffset(), d.getLength()); rc = m; - } - else if (value instanceof List) { + } else if (value instanceof List) { StreamMessage m = vendor.createStreamMessage(); for (Object item : (List<Object>) value) { m.writeObject(item); } rc = m; m.setStringProperty(AMQPMessageTypes.AMQP_TYPE_KEY, AMQPMessageTypes.AMQP_LIST); - } - else if (value instanceof Map) { + } else if (value instanceof Map) { MapMessage m = vendor.createMapMessage(); final Set<Map.Entry<String, Object>> set = ((Map<String, Object>) value).entrySet(); for (Map.Entry<String, Object> entry : set) { m.setObject(entry.getKey(), entry.getValue()); } rc = m; - } - else { + } else { ObjectMessage m = vendor.createObjectMessage(); m.setObject((Serializable) value); rc = m; } - } - else { + } else { throw new RuntimeException("Unexpected body type: " + body.getClass()); } rc.setJMSDeliveryMode(defaultDeliveryMode); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java index 40cbf79..9f28a6b 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/converter/message/JMSMappingOutboundTransformer.java @@ -16,26 +16,6 @@ */ package org.apache.activemq.artemis.protocol.amqp.converter.message; -import org.apache.activemq.artemis.core.message.impl.MessageInternal; -import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage; -import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPIllegalStateException; -import org.apache.qpid.proton.amqp.Binary; -import org.apache.qpid.proton.amqp.Symbol; -import org.apache.qpid.proton.amqp.UnsignedByte; -import org.apache.qpid.proton.amqp.UnsignedInteger; -import org.apache.qpid.proton.amqp.messaging.AmqpSequence; -import org.apache.qpid.proton.amqp.messaging.AmqpValue; -import org.apache.qpid.proton.amqp.messaging.ApplicationProperties; -import org.apache.qpid.proton.amqp.messaging.Data; -import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations; -import org.apache.qpid.proton.amqp.messaging.Footer; -import org.apache.qpid.proton.amqp.messaging.Header; -import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; -import org.apache.qpid.proton.amqp.messaging.Properties; -import org.apache.qpid.proton.amqp.messaging.Section; -import org.apache.qpid.proton.message.ProtonJMessage; -import org.jboss.logging.Logger; - import javax.jms.BytesMessage; import javax.jms.DeliveryMode; import javax.jms.Destination; @@ -57,7 +37,28 @@ import java.util.Date; import java.util.Enumeration; import java.util.HashMap; +import org.apache.activemq.artemis.core.message.impl.MessageInternal; +import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage; +import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPIllegalStateException; +import org.apache.qpid.proton.amqp.Binary; +import org.apache.qpid.proton.amqp.Symbol; +import org.apache.qpid.proton.amqp.UnsignedByte; +import org.apache.qpid.proton.amqp.UnsignedInteger; +import org.apache.qpid.proton.amqp.messaging.AmqpSequence; +import org.apache.qpid.proton.amqp.messaging.AmqpValue; +import org.apache.qpid.proton.amqp.messaging.ApplicationProperties; +import org.apache.qpid.proton.amqp.messaging.Data; +import org.apache.qpid.proton.amqp.messaging.DeliveryAnnotations; +import org.apache.qpid.proton.amqp.messaging.Footer; +import org.apache.qpid.proton.amqp.messaging.Header; +import org.apache.qpid.proton.amqp.messaging.MessageAnnotations; +import org.apache.qpid.proton.amqp.messaging.Properties; +import org.apache.qpid.proton.amqp.messaging.Section; +import org.apache.qpid.proton.message.ProtonJMessage; +import org.jboss.logging.Logger; + public class JMSMappingOutboundTransformer extends OutboundTransformer { + private static final Logger logger = Logger.getLogger(JMSMappingOutboundTransformer.class); public static final Symbol JMS_DEST_TYPE_MSG_ANNOTATION = Symbol.valueOf("x-opt-jms-dest"); public static final Symbol JMS_REPLY_TO_TYPE_MSG_ANNOTATION = Symbol.valueOf("x-opt-jms-reply-to"); @@ -116,15 +117,13 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer { while (true) { list.add(m.readObject()); } - } - catch (MessageEOFException e) { + } catch (MessageEOFException e) { } String amqpType = msg.getStringProperty(AMQPMessageTypes.AMQP_TYPE_KEY); if (amqpType.equals(AMQPMessageTypes.AMQP_LIST)) { body = new AmqpValue(list); - } - else { + } else { body = new AmqpSequence(list); } } @@ -142,11 +141,9 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer { if (s != null) { body = new AmqpValue(s.toString()); } - } - catch (Throwable ignored) { + } catch (Throwable ignored) { logger.debug("Exception ignored during conversion, should be ok!", ignored.getMessage(), ignored); - } - finally { + } finally { internalMessage.getBodyBuffer().readerIndex(readerIndex); } } @@ -163,8 +160,7 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer { try { props.setMessageId(AMQPMessageIdHelper.INSTANCE.toIdObject(msgId)); - } - catch (ActiveMQAMQPIllegalStateException e) { + } catch (ActiveMQAMQPIllegalStateException e) { props.setMessageId(msgId); } } @@ -187,8 +183,7 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer { try { props.setCorrelationId(AMQPMessageIdHelper.INSTANCE.toIdObject(correlationId)); - } - catch (ActiveMQAMQPIllegalStateException e) { + } catch (ActiveMQAMQPIllegalStateException e) { props.setCorrelationId(correlationId); } } @@ -210,72 +205,59 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer { String key = keys.nextElement(); if (key.equals(messageFormatKey) || key.equals(nativeKey) || key.equals(ServerJMSMessage.NATIVE_MESSAGE_ID)) { // skip.. - } - else if (key.equals(firstAcquirerKey)) { + } else if (key.equals(firstAcquirerKey)) { header.setFirstAcquirer(msg.getBooleanProperty(key)); - } - else if (key.startsWith("JMSXDeliveryCount")) { + } else if (key.startsWith("JMSXDeliveryCount")) { // The AMQP delivery-count field only includes prior failed delivery attempts, // whereas JMSXDeliveryCount includes the first/current delivery attempt. int amqpDeliveryCount = msg.getIntProperty(key) - 1; if (amqpDeliveryCount > 0) { header.setDeliveryCount(new UnsignedInteger(amqpDeliveryCount)); } - } - else if (key.startsWith("JMSXUserID")) { + } else if (key.startsWith("JMSXUserID")) { String value = msg.getStringProperty(key); props.setUserId(new Binary(value.getBytes(StandardCharsets.UTF_8))); - } - else if (key.startsWith("JMSXGroupID") || key.startsWith("_AMQ_GROUP_ID")) { + } else if (key.startsWith("JMSXGroupID") || key.startsWith("_AMQ_GROUP_ID")) { String value = msg.getStringProperty(key); props.setGroupId(value); if (apMap == null) { apMap = new HashMap(); } apMap.put(key, value); - } - else if (key.startsWith("JMSXGroupSeq")) { + } else if (key.startsWith("JMSXGroupSeq")) { UnsignedInteger value = new UnsignedInteger(msg.getIntProperty(key)); props.setGroupSequence(value); if (apMap == null) { apMap = new HashMap(); } apMap.put(key, value); - } - else if (key.startsWith(prefixDeliveryAnnotationsKey)) { + } else if (key.startsWith(prefixDeliveryAnnotationsKey)) { if (daMap == null) { daMap = new HashMap<>(); } String name = key.substring(prefixDeliveryAnnotationsKey.length()); daMap.put(Symbol.valueOf(name), msg.getObjectProperty(key)); - } - else if (key.startsWith(prefixMessageAnnotationsKey)) { + } else if (key.startsWith(prefixMessageAnnotationsKey)) { if (maMap == null) { maMap = new HashMap<>(); } String name = key.substring(prefixMessageAnnotationsKey.length()); maMap.put(Symbol.valueOf(name), msg.getObjectProperty(key)); - } - else if (key.equals(contentTypeKey)) { + } else if (key.equals(contentTypeKey)) { props.setContentType(Symbol.getSymbol(msg.getStringProperty(key))); - } - else if (key.equals(contentEncodingKey)) { + } else if (key.equals(contentEncodingKey)) { props.setContentEncoding(Symbol.getSymbol(msg.getStringProperty(key))); - } - else if (key.equals(replyToGroupIDKey)) { + } else if (key.equals(replyToGroupIDKey)) { props.setReplyToGroupId(msg.getStringProperty(key)); - } - else if (key.startsWith(prefixFooterKey)) { + } else if (key.startsWith(prefixFooterKey)) { if (footerMap == null) { footerMap = new HashMap(); } String name = key.substring(prefixFooterKey.length()); footerMap.put(name, msg.getObjectProperty(key)); - } - else if (key.equals(AMQPMessageTypes.AMQP_TYPE_KEY)) { + } else if (key.equals(AMQPMessageTypes.AMQP_TYPE_KEY)) { // skip - } - else { + } else { if (apMap == null) { apMap = new HashMap(); } @@ -283,8 +265,7 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer { if (objectProperty instanceof byte[]) { Binary binary = new Binary((byte[]) objectProperty); apMap.put(key, binary); - } - else { + } else { apMap.put(key, objectProperty); } } @@ -314,16 +295,13 @@ public class JMSMappingOutboundTransformer extends OutboundTransformer { if (destination instanceof Queue) { if (destination instanceof TemporaryQueue) { return TEMP_QUEUE_TYPE; - } - else { + } else { return QUEUE_TYPE; } - } - else if (destination instanceof Topic) { + } else if (destination instanceof Topic) { if (destination instanceof TemporaryTopic) { return TEMP_TOPIC_TYPE; - } - else { + } else { return TOPIC_TYPE; } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/logger/ActiveMQAMQPProtocolMessageBundle.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/logger/ActiveMQAMQPProtocolMessageBundle.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/logger/ActiveMQAMQPProtocolMessageBundle.java index 2bfe5fc..898bab0 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/logger/ActiveMQAMQPProtocolMessageBundle.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/logger/ActiveMQAMQPProtocolMessageBundle.java @@ -16,13 +16,13 @@ */ package org.apache.activemq.artemis.protocol.amqp.logger; +import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPIllegalStateException; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInvalidFieldException; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPNotFoundException; -import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPIllegalStateException; +import org.jboss.logging.Messages; import org.jboss.logging.annotations.Message; import org.jboss.logging.annotations.MessageBundle; -import org.jboss.logging.Messages; /** * Logger Code 11 http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java index 8b14e67..3d79026 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPConnectionContext.java @@ -16,13 +16,22 @@ */ package org.apache.activemq.artemis.protocol.amqp.proton; +import java.util.HashMap; +import java.util.Map; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.Executor; +import java.util.concurrent.ScheduledExecutorService; +import java.util.concurrent.TimeUnit; + import io.netty.buffer.ByteBuf; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPConnectionCallback; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException; import org.apache.activemq.artemis.protocol.amqp.proton.handler.EventHandler; -import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult; +import org.apache.activemq.artemis.protocol.amqp.proton.handler.ExtCapability; import org.apache.activemq.artemis.protocol.amqp.proton.handler.ProtonHandler; +import org.apache.activemq.artemis.protocol.amqp.sasl.SASLResult; import org.apache.activemq.artemis.utils.ByteUtil; import org.apache.activemq.artemis.utils.VersionLoader; import org.apache.qpid.proton.amqp.Symbol; @@ -35,17 +44,8 @@ import org.apache.qpid.proton.engine.Sender; import org.apache.qpid.proton.engine.Session; import org.apache.qpid.proton.engine.Transport; import org.jboss.logging.Logger; -import org.apache.activemq.artemis.protocol.amqp.proton.handler.ExtCapability; - -import java.util.HashMap; -import java.util.Map; -import java.util.UUID; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.Executor; -import java.util.concurrent.ScheduledExecutorService; -import java.util.concurrent.TimeUnit; -public class AMQPConnectionContext extends ProtonInitializable { +public class AMQPConnectionContext extends ProtonInitializable { private static final Logger log = Logger.getLogger(AMQPConnectionContext.class); @@ -63,8 +63,6 @@ public class AMQPConnectionContext extends ProtonInitializable { protected LocalListener listener = new LocalListener(); - - public AMQPConnectionContext(AMQPConnectionCallback connectionSP, String containerId, int idleTimeout, @@ -138,7 +136,6 @@ public class AMQPConnectionContext extends ProtonInitializable { handler.close(); } - protected AMQPSessionContext getSessionExtension(Session realSession) throws ActiveMQAMQPException { AMQPSessionContext sessionExtension = sessions.get(realSession); if (sessionExtension == null) { @@ -150,8 +147,6 @@ public class AMQPConnectionContext extends ProtonInitializable { return sessionExtension; } - - protected boolean validateConnection(Connection connection) { return connectionCallback.validateConnection(connection, handler.getSASLResult()); } @@ -194,12 +189,10 @@ public class AMQPConnectionContext extends ProtonInitializable { if (link.getRemoteTarget() instanceof Coordinator) { Coordinator coordinator = (Coordinator) link.getRemoteTarget(); protonSession.addTransactionHandler(coordinator, receiver); - } - else { + } else { protonSession.addReceiver(receiver); } - } - else { + } else { Sender sender = (Sender) link; protonSession.addSender(sender); sender.offer(1); @@ -210,8 +203,6 @@ public class AMQPConnectionContext extends ProtonInitializable { return ExtCapability.getCapabilities(); } - - // This listener will perform a bunch of things here class LocalListener implements EventHandler { @@ -269,8 +260,7 @@ public class AMQPConnectionContext extends ProtonInitializable { public void onAuthInit(ProtonHandler handler, Connection connection, boolean sasl) { if (sasl) { handler.createServerSASL(connectionCallback.getSASLMechnisms()); - } - else { + } else { if (!connectionCallback.isSupportsAnonymous()) { connectionCallback.sendSASLSupported(); connectionCallback.close(); @@ -289,14 +279,12 @@ public class AMQPConnectionContext extends ProtonInitializable { synchronized (getLock()) { try { initInternal(); - } - catch (Exception e) { + } catch (Exception e) { log.error("Error init connection", e); } if (!validateConnection(connection)) { connection.close(); - } - else { + } else { connection.setContext(AMQPConnectionContext.this); connection.setContainer(containerId); connection.setProperties(connectionProperties); @@ -365,7 +353,7 @@ public class AMQPConnectionContext extends ProtonInitializable { session.close(); } - AMQPSessionContext sessionContext = (AMQPSessionContext)session.getContext(); + AMQPSessionContext sessionContext = (AMQPSessionContext) session.getContext(); if (sessionContext != null) { sessionContext.close(); sessions.remove(session); @@ -411,8 +399,7 @@ public class AMQPConnectionContext extends ProtonInitializable { ProtonDeliveryHandler handler = (ProtonDeliveryHandler) delivery.getLink().getContext(); if (handler != null) { handler.onMessage(delivery); - } - else { + } else { // TODO: logs System.err.println("Handler is null, can't delivery " + delivery); @@ -420,5 +407,4 @@ public class AMQPConnectionContext extends ProtonInitializable { } } - } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java index 9003d3b..6a6c1fa 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AMQPSessionContext.java @@ -22,8 +22,8 @@ import java.util.Map; import java.util.Set; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback; -import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException; +import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.transaction.Coordinator; import org.apache.qpid.proton.amqp.transport.ErrorCondition; @@ -49,9 +49,7 @@ public class AMQPSessionContext extends ProtonInitializable { protected boolean closed = false; - public AMQPSessionContext(AMQPSessionCallback sessionSPI, - AMQPConnectionContext connection, - Session session) { + public AMQPSessionContext(AMQPSessionCallback sessionSPI, AMQPConnectionContext connection, Session session) { this.connection = connection; this.sessionSPI = sessionSPI; this.session = session; @@ -67,8 +65,7 @@ public class AMQPSessionContext extends ProtonInitializable { if (sessionSPI != null) { try { sessionSPI.init(this, connection.getSASLResult()); - } - catch (Exception e) { + } catch (Exception e) { throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e); } } @@ -84,15 +81,13 @@ public class AMQPSessionContext extends ProtonInitializable { if (protonConsumer != null) { try { protonConsumer.close(false); - } - catch (ActiveMQAMQPException e) { + } catch (ActiveMQAMQPException e) { protonConsumer.getSender().setTarget(null); protonConsumer.getSender().setCondition(new ErrorCondition(e.getAmqpError(), e.getMessage())); } } } - /** * The consumer object from the broker or the key used to store the sender * @@ -129,8 +124,7 @@ public class AMQPSessionContext extends ProtonInitializable { for (ProtonServerReceiverContext protonProducer : receiversCopy) { try { protonProducer.close(false); - } - catch (Exception e) { + } catch (Exception e) { log.warn(e.getMessage(), e); } } @@ -142,8 +136,7 @@ public class AMQPSessionContext extends ProtonInitializable { for (ProtonServerSenderContext protonConsumer : protonSendersClone) { try { protonConsumer.close(false); - } - catch (Exception e) { + } catch (Exception e) { log.warn(e.getMessage(), e); } } @@ -152,8 +145,7 @@ public class AMQPSessionContext extends ProtonInitializable { if (sessionSPI != null) { sessionSPI.close(); } - } - catch (Exception e) { + } catch (Exception e) { log.warn(e.getMessage(), e); } closed = true; @@ -166,9 +158,7 @@ public class AMQPSessionContext extends ProtonInitializable { public void addTransactionHandler(Coordinator coordinator, Receiver receiver) { ProtonTransactionHandler transactionHandler = new ProtonTransactionHandler(sessionSPI); - coordinator.setCapabilities(Symbol.getSymbol("amqp:local-transactions"), - Symbol.getSymbol("amqp:multi-txns-per-ssn"), - Symbol.getSymbol("amqp:multi-ssns-per-txn")); + coordinator.setCapabilities(Symbol.getSymbol("amqp:local-transactions"), Symbol.getSymbol("amqp:multi-txns-per-ssn"), Symbol.getSymbol("amqp:multi-ssns-per-txn")); receiver.setContext(transactionHandler); receiver.open(); @@ -185,8 +175,7 @@ public class AMQPSessionContext extends ProtonInitializable { sender.setContext(protonSender); sender.open(); protonSender.start(); - } - catch (ActiveMQAMQPException e) { + } catch (ActiveMQAMQPException e) { senders.remove(sender); sender.setSource(null); sender.setCondition(new ErrorCondition(e.getAmqpError(), e.getMessage())); @@ -196,7 +185,7 @@ public class AMQPSessionContext extends ProtonInitializable { public void removeSender(Sender sender) throws ActiveMQAMQPException { senders.remove(sender); - ProtonServerSenderContext senderRemoved = senders.remove(sender); + ProtonServerSenderContext senderRemoved = senders.remove(sender); if (senderRemoved != null) { serverSenders.remove(senderRemoved.getBrokerConsumer()); } @@ -209,8 +198,7 @@ public class AMQPSessionContext extends ProtonInitializable { receivers.put(receiver, protonReceiver); receiver.setContext(protonReceiver); receiver.open(); - } - catch (ActiveMQAMQPException e) { + } catch (ActiveMQAMQPException e) { receivers.remove(receiver); receiver.setTarget(null); receiver.setCondition(new ErrorCondition(e.getAmqpError(), e.getMessage())); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java index 848f0d2..13d7170 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/AmqpSupport.java @@ -16,13 +16,13 @@ */ package org.apache.activemq.artemis.protocol.amqp.proton; +import java.util.AbstractMap; +import java.util.Map; + import org.apache.qpid.proton.amqp.DescribedType; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.UnsignedLong; -import java.util.AbstractMap; -import java.util.Map; - /** * Set of useful methods and definitions used in the AMQP protocol handling */ @@ -61,14 +61,12 @@ public class AmqpSupport { public static final Symbol LIFETIME_POLICY = Symbol.valueOf("lifetime-policy"); public static final Symbol SOLE_CONNECTION_CAPABILITY = Symbol.valueOf("sole-connection-for-container"); + /** * Search for a given Symbol in a given array of Symbol object. * - * @param symbols - * the set of Symbols to search. - * @param key - * the value to try and find in the Symbol array. - * + * @param symbols the set of Symbols to search. + * @param key the value to try and find in the Symbol array. * @return true if the key is found in the given Symbol array. */ public static boolean contains(Symbol[] symbols, Symbol key) { @@ -89,11 +87,8 @@ public class AmqpSupport { * Search for a particular filter using a set of known indentification values * in the Map of filters. * - * @param filters - * The filters map that should be searched. - * @param filterIds - * The aliases for the target filter to be located. - * + * @param filters The filters map that should be searched. + * @param filterIds The aliases for the target filter to be located. * @return the filter if found in the mapping or null if not found. */ public static Map.Entry<Symbol, DescribedType> findFilter(Map<Symbol, Object> filters, Object[] filterIds) { http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java index 4b97831..41caea9 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerReceiverContext.java @@ -18,13 +18,13 @@ package org.apache.activemq.artemis.protocol.amqp.proton; import io.netty.buffer.ByteBuf; import io.netty.buffer.PooledByteBufAllocator; +import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback; -import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPNotFoundException; +import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle; import org.apache.activemq.artemis.protocol.amqp.util.DeliveryUtil; -import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.qpid.proton.amqp.Symbol; import org.apache.qpid.proton.amqp.messaging.Rejected; import org.apache.qpid.proton.amqp.transaction.TransactionalState; @@ -47,7 +47,6 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements protected final AMQPSessionCallback sessionSPI; - /* The maximum number of credits we will allocate to clients. This number is also used by the broker when refresh client credits. @@ -85,13 +84,11 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements try { sessionSPI.createTemporaryQueue(address); - } - catch (Exception e) { + } catch (Exception e) { throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e); } target.setAddress(address); - } - else { + } else { //if not dynamic then we use the targets address as the address to forward the messages to, however there has to //be a queue bound to it so we nee to check this. address = target.getAddress(); @@ -103,11 +100,9 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements if (!sessionSPI.bindingQuery(address)) { throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.addressDoesntExist(); } - } - catch (ActiveMQAMQPNotFoundException e) { + } catch (ActiveMQAMQPNotFoundException e) { throw e; - } - catch (Exception e) { + } catch (Exception e) { throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e); } } @@ -152,12 +147,10 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements flow(maxCreditAllocation, minCreditRefresh); } - } - finally { + } finally { buffer.release(); } - } - catch (Exception e) { + } catch (Exception e) { log.warn(e.getMessage(), e); Rejected rejected = new Rejected(); ErrorCondition condition = new ErrorCondition(); @@ -183,8 +176,7 @@ public class ProtonServerReceiverContext extends ProtonInitializable implements // Use the SessionSPI to allocate producer credits, or default, always allocate credit. if (sessionSPI != null) { sessionSPI.offerProducerCredit(address, credits, threshold, receiver); - } - else { + } else { synchronized (connection.getLock()) { receiver.flow(credits); connection.flush(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java index 0a071fd..7ef4944 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonServerSenderContext.java @@ -22,15 +22,17 @@ import java.util.Objects; import io.netty.buffer.ByteBuf; import io.netty.buffer.PooledByteBufAllocator; import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback; -import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException; -import org.apache.activemq.artemis.protocol.amqp.util.CreditsSemaphore; -import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPNotFoundException; import org.apache.activemq.artemis.core.server.QueueQueryResult; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.jms.client.ActiveMQConnection; +import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback; import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException; +import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPIllegalStateException; +import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPInternalErrorException; +import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPNotFoundException; import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle; +import org.apache.activemq.artemis.protocol.amqp.util.CreditsSemaphore; +import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable; import org.apache.activemq.artemis.selector.filter.FilterException; import org.apache.activemq.artemis.selector.impl.SelectorParser; import org.apache.qpid.proton.amqp.DescribedType; @@ -52,8 +54,6 @@ import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.Sender; import org.apache.qpid.proton.message.ProtonJMessage; import org.jboss.logging.Logger; -import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPIllegalStateException; -import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable; public class ProtonServerSenderContext extends ProtonInitializable implements ProtonDeliveryHandler { @@ -72,7 +72,6 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr protected final AMQPSessionCallback sessionSPI; protected CreditsSemaphore creditsSemaphore = new CreditsSemaphore(0); - public ProtonServerSenderContext(AMQPConnectionContext connection, Sender sender, AMQPSessionContext protonSession, @@ -113,8 +112,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr sessionSPI.startSender(brokerConsumer); } //protonSession.getServerSession().receiveConsumerCredits(consumerID, -1); - } - catch (Exception e) { + } catch (Exception e) { throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorStartingConsumer(e.getMessage()); } } @@ -142,8 +140,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr // Validate the Selector. try { SelectorParser.parse(selector); - } - catch (FilterException e) { + } catch (FilterException e) { close(new ErrorCondition(AmqpError.INVALID_FIELD, e.getMessage())); return; } @@ -162,8 +159,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr String noLocalFilter = ActiveMQConnection.CONNECTION_ID_PROPERTY_NAME.toString() + "<>'" + remoteContainerId + "'"; if (selector != null) { selector += " AND " + noLocalFilter; - } - else { + } else { selector = noLocalFilter; } } @@ -188,12 +184,10 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr source.setDistributionMode(COPY); source.setCapabilities(TOPIC); sender.setSource(source); - } - else { + } else { throw new ActiveMQAMQPNotFoundException("Unknown subscription link: " + sender.getName()); } - } - else { + } else { if (source.getDynamic()) { //if dynamic we have to create the node (queue) and set the address on the target, the node is temporary and // will be deleted on closing of the session @@ -201,19 +195,16 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr try { sessionSPI.createTemporaryQueue(queue); //protonSession.getServerSession().createQueue(queue, queue, null, true, false); - } - catch (Exception e) { + } catch (Exception e) { throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage()); } source.setAddress(queue); - } - else { + } else { //if not dynamic then we use the targets address as the address to forward the messages to, however there has to //be a queue bound to it so we nee to check this. if (isPubSub) { // if we are a subscription and durable create a durable queue using the container id and link name - if (TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()) || - TerminusDurability.CONFIGURATION.equals(source.getDurable())) { + if (TerminusDurability.UNSETTLED_STATE.equals(source.getDurable()) || TerminusDurability.CONFIGURATION.equals(source.getDurable())) { String clientId = connection.getRemoteContainer(); String pubId = sender.getName(); queue = clientId + ":" + pubId; @@ -223,35 +214,29 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr // If a client reattaches to a durable subscription with a different no-local filter value, selector // or address then we must recreate the queue (JMS semantics). - if (!Objects.equals(result.getFilterString(), SimpleString.toSimpleString(selector)) || - (sender.getSource() != null && !sender.getSource().getAddress().equals(result.getAddress().toString()))) { + if (!Objects.equals(result.getFilterString(), SimpleString.toSimpleString(selector)) || (sender.getSource() != null && !sender.getSource().getAddress().equals(result.getAddress().toString()))) { if (result.getConsumerCount() == 0) { sessionSPI.deleteQueue(queue); sessionSPI.createDurableQueue(source.getAddress(), queue, selector); - } - else { + } else { throw new ActiveMQAMQPIllegalStateException("Unable to recreate subscription, consumers already exist"); } } - } - else { + } else { sessionSPI.createDurableQueue(source.getAddress(), queue, selector); } source.setAddress(queue); - } - //otherwise we are a volatile subscription - else { + } else { + //otherwise we are a volatile subscription queue = java.util.UUID.randomUUID().toString(); try { sessionSPI.createTemporaryQueue(source.getAddress(), queue, selector); - } - catch (Exception e) { + } catch (Exception e) { throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingTemporaryQueue(e.getMessage()); } source.setAddress(queue); } - } - else { + } else { queue = source.getAddress(); } if (queue == null) { @@ -262,11 +247,9 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr if (!sessionSPI.queueQuery(queue, !isPubSub).isExists()) { throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.sourceAddressDoesntExist(); } - } - catch (ActiveMQAMQPNotFoundException e) { + } catch (ActiveMQAMQPNotFoundException e) { throw e; - } - catch (Exception e) { + } catch (Exception e) { throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e); } } @@ -274,8 +257,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr boolean browseOnly = !isPubSub && source.getDistributionMode() != null && source.getDistributionMode().equals(COPY); try { brokerConsumer = sessionSPI.createSender(this, queue, isPubSub ? null : selector, browseOnly); - } - catch (Exception e) { + } catch (Exception e) { throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCreatingConsumer(e.getMessage()); } } @@ -300,8 +282,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr try { sessionSPI.closeSender(brokerConsumer); - } - catch (Exception e) { + } catch (Exception e) { log.warn(e.getMessage(), e); throw new ActiveMQAMQPInternalErrorException(e.getMessage()); } @@ -323,8 +304,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr QueueQueryResult result = sessionSPI.queueQuery(queueName, false); if (result.isExists() && source.getDynamic()) { sessionSPI.deleteQueue(queueName); - } - else { + } else { String clientId = connection.getRemoteContainer(); String pubId = sender.getName(); String queue = clientId + ":" + pubId; @@ -338,8 +318,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr } } } - } - catch (Exception e) { + } catch (Exception e) { log.warn(e.getMessage(), e); throw new ActiveMQAMQPInternalErrorException(e.getMessage()); } @@ -373,36 +352,29 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr // from dealer, a perf hit but a must try { sessionSPI.ack(tx, brokerConsumer, message); - } - catch (Exception e) { + } catch (Exception e) { throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage()); } } } - } - else if (remoteState instanceof Accepted) { + } else if (remoteState instanceof Accepted) { //we have to individual ack as we can't guarantee we will get the delivery updates (including acks) in order // from dealer, a perf hit but a must try { sessionSPI.ack(null, brokerConsumer, message); - } - catch (Exception e) { + } catch (Exception e) { throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorAcknowledgingMessage(message.toString(), e.getMessage()); } - } - else if (remoteState instanceof Released) { + } else if (remoteState instanceof Released) { try { sessionSPI.cancel(brokerConsumer, message, false); - } - catch (Exception e) { + } catch (Exception e) { throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage()); } - } - else if (remoteState instanceof Rejected || remoteState instanceof Modified) { + } else if (remoteState instanceof Rejected || remoteState instanceof Modified) { try { sessionSPI.cancel(brokerConsumer, message, true); - } - catch (Exception e) { + } catch (Exception e) { throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCancellingMessage(message.toString(), e.getMessage()); } } @@ -416,8 +388,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr sender.offer(1); } - } - else { + } else { //todo not sure if we need to do anything here } } @@ -440,8 +411,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr try { // This can be done a lot better here serverMessage = sessionSPI.encodeMessage(message, deliveryCount); - } - catch (Throwable e) { + } catch (Throwable e) { log.warn(e.getMessage(), e); throw new ActiveMQAMQPInternalErrorException(e.getMessage(), e); } @@ -461,12 +431,12 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr } return false; } + protected int performSend(ProtonJMessage serverMessage, Object context) { if (!creditsSemaphore.tryAcquire()) { try { creditsSemaphore.acquire(); - } - catch (InterruptedException e) { + } catch (InterruptedException e) { Thread.currentThread().interrupt(); // nothing to be done here.. we just keep going throw new IllegalStateException(e.getMessage(), e); @@ -495,8 +465,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr if (preSettle) { delivery.settle(); - } - else { + } else { sender.advance(); } } @@ -504,8 +473,7 @@ public class ProtonServerSenderContext extends ProtonInitializable implements Pr connection.flush(); return size; - } - finally { + } finally { nettyBuffer.release(); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonTransactionHandler.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonTransactionHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonTransactionHandler.java index 6d4e73a..51f42a3 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonTransactionHandler.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/ProtonTransactionHandler.java @@ -19,6 +19,7 @@ package org.apache.activemq.artemis.protocol.amqp.proton; import io.netty.buffer.ByteBuf; import io.netty.buffer.PooledByteBufAllocator; import org.apache.activemq.artemis.protocol.amqp.broker.AMQPSessionCallback; +import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException; import org.apache.activemq.artemis.protocol.amqp.logger.ActiveMQAMQPProtocolMessageBundle; import org.apache.activemq.artemis.protocol.amqp.util.DeliveryUtil; import org.apache.qpid.proton.amqp.Binary; @@ -34,7 +35,6 @@ import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.Receiver; import org.apache.qpid.proton.message.impl.MessageImpl; import org.jboss.logging.Logger; -import org.apache.activemq.artemis.protocol.amqp.exceptions.ActiveMQAMQPException; /** * handles an amqp Coordinator to deal with transaction boundaries etc @@ -75,8 +75,7 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler { declared.setTxnId(txID); delivery.disposition(declared); delivery.settle(); - } - else if (action instanceof Discharge) { + } else if (action instanceof Discharge) { Discharge discharge = (Discharge) action; Binary txID = discharge.getTxnId(); @@ -84,33 +83,26 @@ public class ProtonTransactionHandler implements ProtonDeliveryHandler { try { sessionSPI.rollbackTX(txID, true); delivery.disposition(new Accepted()); - } - catch (Exception e) { + } catch (Exception e) { throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorRollingbackCoordinator(e.getMessage()); } - } - else { + } else { try { sessionSPI.commitTX(txID); delivery.disposition(new Accepted()); - } - catch (ActiveMQAMQPException amqpE) { + } catch (ActiveMQAMQPException amqpE) { throw amqpE; - } - catch (Exception e) { + } catch (Exception e) { throw ActiveMQAMQPProtocolMessageBundle.BUNDLE.errorCommittingCoordinator(e.getMessage()); } } } - } - catch (ActiveMQAMQPException amqpE) { + } catch (ActiveMQAMQPException amqpE) { delivery.disposition(createRejected(amqpE.getAmqpError(), amqpE.getMessage())); - } - catch (Exception e) { + } catch (Exception e) { log.warn(e.getMessage(), e); delivery.disposition(createRejected(Symbol.getSymbol("failed"), e.getMessage())); - } - finally { + } finally { delivery.settle(); buffer.release(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ExtCapability.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ExtCapability.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ExtCapability.java index b2a6230..6325ff6 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ExtCapability.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ExtCapability.java @@ -22,9 +22,7 @@ import org.apache.qpid.proton.engine.Connection; public class ExtCapability { - public static final Symbol[] capabilities = new Symbol[] { - AmqpSupport.SOLE_CONNECTION_CAPABILITY, AmqpSupport.DELAYED_DELIVERY - }; + public static final Symbol[] capabilities = new Symbol[]{AmqpSupport.SOLE_CONNECTION_CAPABILITY, AmqpSupport.DELAYED_DELIVERY}; public static Symbol[] getCapabilities() { return capabilities; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java index 2efaa1b..0d667b0 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/proton/handler/ProtonHandler.java @@ -99,8 +99,7 @@ public class ProtonHandler extends ProtonInitializable { } return rescheduleAt; } - } - catch (Exception e) { + } catch (Exception e) { transport.close(); connection.setCondition(new ErrorCondition()); } @@ -166,8 +165,7 @@ public class ProtonHandler extends ProtonInitializable { * */ capacity = transport.capacity(); } - } - catch (Throwable e) { + } catch (Throwable e) { log.debug(e.getMessage(), e); } @@ -181,12 +179,10 @@ public class ProtonHandler extends ProtonInitializable { buffer.readBytes(tail); flush(); - } - else { + } else { if (capacity == 0) { log.debugf("abandoning: readableBytes=%d", buffer.readableBytes()); - } - else { + } else { log.debugf("transport closed, discarding: readableBytes=%d, capacity=%d", buffer.readableBytes(), transport.capacity()); } break; @@ -289,13 +285,11 @@ public class ProtonHandler extends ProtonInitializable { serverSasl = null; saslHandlers.clear(); saslHandlers = null; - } - else { + } else { serverSasl.done(Sasl.SaslOutcome.PN_SASL_AUTH); } serverSasl = null; - } - else { + } else { // no auth available, system error serverSasl.done(Sasl.SaslOutcome.PN_SASL_SYS); } @@ -329,14 +323,13 @@ public class ProtonHandler extends ProtonInitializable { // while processing events (for instance onTransport) // while a client is also trying to write here while ((ev = popEvent()) != null) { - for ( EventHandler h : handlers) { + for (EventHandler h : handlers) { if (log.isTraceEnabled()) { log.trace("Handling " + ev + " towards " + h); } try { Events.dispatch(ev, h); - } - catch (Exception e) { + } catch (Exception e) { log.warn(e.getMessage(), e); connection.setCondition(new ErrorCondition()); } @@ -346,8 +339,7 @@ public class ProtonHandler extends ProtonInitializable { for (EventHandler h : handlers) { try { h.onTransport(transport); - } - catch (Exception e) { + } catch (Exception e) { log.warn(e.getMessage(), e); connection.setCondition(new ErrorCondition()); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/PlainSASL.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/PlainSASL.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/PlainSASL.java index cb82eba..20cd8ad 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/PlainSASL.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/sasl/PlainSASL.java @@ -32,12 +32,10 @@ public class PlainSASL extends ServerSASLPlain { try { securityStore.authenticate(user, password, null); return true; - } - catch (Exception e) { + } catch (Exception e) { return false; } - } - else { + } else { return true; } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/CreditsSemaphore.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/CreditsSemaphore.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/CreditsSemaphore.java index 3eda199..77b61bc 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/CreditsSemaphore.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/CreditsSemaphore.java @@ -41,8 +41,7 @@ public class CreditsSemaphore { if (actualSize == getState()) { return -1; } - } - else if (compareAndSetState(actualSize, newValue)) { + } else if (compareAndSetState(actualSize, newValue)) { return newValue; } } @@ -107,4 +106,4 @@ public class CreditsSemaphore { return sync.hasQueuedThreads(); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/ProtonServerMessage.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/ProtonServerMessage.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/ProtonServerMessage.java index c15741e..5f46f22 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/ProtonServerMessage.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/protocol/amqp/util/ProtonServerMessage.java @@ -109,8 +109,7 @@ public class ProtonServerMessage implements ProtonJMessage { rawBody = new byte[buffer.limit() - buffer.position()]; buffer.get(rawBody); } - } - finally { + } finally { decoder.setByteBuffer(null); } @@ -151,14 +150,12 @@ public class ProtonServerMessage implements ProtonJMessage { if (parsedFooter != null) { encoder.writeObject(parsedFooter); } - } - else if (rawBody != null) { + } else if (rawBody != null) { writableBuffer.put(rawBody, 0, rawBody.length); } return writableBuffer.position() - firstPosition; - } - finally { + } finally { encoder.setByteBuffer((WritableBuffer) null); } } @@ -173,12 +170,10 @@ public class ProtonServerMessage implements ProtonJMessage { try { if (buffer.get() != 0) { return EOF; - } - else { + } else { return ((Number) decoder.readObject()).intValue(); } - } - finally { + } finally { buffer.position(pos); } } @@ -186,8 +181,7 @@ public class ProtonServerMessage implements ProtonJMessage { private Section readSection(ByteBuffer buffer, DecoderImpl decoder) { if (buffer.hasRemaining()) { return (Section) decoder.readObject(); - } - else { + } else { return null; } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java index 27a533a..148482e 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/converter/TestConversions.java @@ -28,13 +28,19 @@ import java.util.Map; import io.netty.buffer.ByteBuf; import io.netty.buffer.PooledByteBufAllocator; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; -import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessage; -import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSObjectMessage; -import org.apache.activemq.artemis.protocol.amqp.converter.message.EncodedMessage; +import org.apache.activemq.artemis.api.core.SimpleString; +import org.apache.activemq.artemis.core.journal.EncodingSupport; +import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSBytesMessage; +import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMapMessage; +import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage; +import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSObjectMessage; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSStreamMessage; import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSTextMessage; +import org.apache.activemq.artemis.protocol.amqp.converter.message.EncodedMessage; +import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable; +import org.apache.activemq.artemis.utils.SimpleIDGenerator; import org.apache.blacklist.ABadClass; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.messaging.AmqpSequence; @@ -44,14 +50,8 @@ import org.apache.qpid.proton.amqp.messaging.Data; import org.apache.qpid.proton.message.Message; import org.apache.qpid.proton.message.ProtonJMessage; import org.apache.qpid.proton.message.impl.MessageImpl; -import org.apache.activemq.artemis.api.core.SimpleString; -import org.apache.activemq.artemis.core.journal.EncodingSupport; -import org.apache.activemq.artemis.protocol.amqp.converter.jms.ServerJMSMessage; -import org.apache.activemq.artemis.core.server.ServerMessage; -import org.apache.activemq.artemis.utils.SimpleIDGenerator; import org.junit.Assert; import org.junit.Test; -import org.apache.activemq.artemis.protocol.amqp.util.NettyWritable; public class TestConversions extends Assert { @@ -81,7 +81,7 @@ public class TestConversions extends Assert { Object obj = converter.outbound((ServerMessage) serverMessage.getInnerMessage(), 0); - AmqpValue value = (AmqpValue) ((Message)obj).getBody(); + AmqpValue value = (AmqpValue) ((Message) obj).getBody(); assertEquals(value.getValue(), true); } @@ -89,7 +89,6 @@ public class TestConversions extends Assert { @Test public void testObjectMessageNotOnWhiteList() throws Exception { - ProtonMessageConverter converter = new ProtonMessageConverter(new SimpleIDGenerator(0)); ServerMessageImpl message = new ServerMessageImpl(1, 1024); message.setType((byte) 2); @@ -104,13 +103,11 @@ public class TestConversions extends Assert { try { converter.outbound((ServerMessage) serverMessage.getInnerMessage(), 0); fail("should throw ClassNotFoundException"); - } - catch (ClassNotFoundException e) { + } catch (ClassNotFoundException e) { //ignore } } - @Test public void testSimpleConversionBytes() throws Exception { Map<String, Object> mapprop = createPropertiesMap(); http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/sasl/ClientSASLPlain.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/sasl/ClientSASLPlain.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/sasl/ClientSASLPlain.java index 0e9d0d4..ca17139 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/sasl/ClientSASLPlain.java +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/sasl/ClientSASLPlain.java @@ -19,7 +19,7 @@ package org.apache.activemq.artemis.protocol.amqp.sasl; /** * This will generate what a client would generate for bytes on Plain sasl. Used on test */ -public class ClientSASLPlain { +public class ClientSASLPlain { private String username; private String password; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/util/CreditsSemaphoreTest.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/util/CreditsSemaphoreTest.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/util/CreditsSemaphoreTest.java index c608b85..f795fa5 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/util/CreditsSemaphoreTest.java +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/protocol/amqp/util/CreditsSemaphoreTest.java @@ -44,8 +44,7 @@ public class CreditsSemaphoreTest { } acquired.incrementAndGet(); } - } - catch (Throwable e) { + } catch (Throwable e) { e.printStackTrace(); errors.incrementAndGet(); } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/blacklist/ABadClass.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/blacklist/ABadClass.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/blacklist/ABadClass.java index 7ea54a5..fea3a5f 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/blacklist/ABadClass.java +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/blacklist/ABadClass.java @@ -19,4 +19,5 @@ package org.apache.blacklist; import java.io.Serializable; public class ABadClass implements Serializable { + } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-protocols/artemis-hornetq-protocol/pom.xml ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-hornetq-protocol/pom.xml b/artemis-protocols/artemis-hornetq-protocol/pom.xml index a485129..91e81d0 100644 --- a/artemis-protocols/artemis-hornetq-protocol/pom.xml +++ b/artemis-protocols/artemis-hornetq-protocol/pom.xml @@ -14,7 +14,8 @@ See the License for the specific language governing permissions and limitations under the License. --> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>artemis-protocols</artifactId> <groupId>org.apache.activemq</groupId> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HornetQProtocolManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HornetQProtocolManager.java b/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HornetQProtocolManager.java index bd4274a..2f6ed2f 100644 --- a/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HornetQProtocolManager.java +++ b/artemis-protocols/artemis-hornetq-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HornetQProtocolManager.java @@ -58,10 +58,9 @@ class HornetQProtocolManager extends CoreProtocolManager { return true; } - @Override public boolean isProtocol(byte[] array) { String frameStart = new String(array, StandardCharsets.US_ASCII); return frameStart.startsWith("HORNETQ"); } -} \ No newline at end of file +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-protocols/artemis-hqclient-protocol/pom.xml ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-hqclient-protocol/pom.xml b/artemis-protocols/artemis-hqclient-protocol/pom.xml index d4a177c..8615b07 100644 --- a/artemis-protocols/artemis-hqclient-protocol/pom.xml +++ b/artemis-protocols/artemis-hqclient-protocol/pom.xml @@ -14,7 +14,8 @@ See the License for the specific language governing permissions and limitations under the License. --> -<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> +<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> <parent> <artifactId>artemis-protocols</artifactId> <groupId>org.apache.activemq</groupId> @@ -52,19 +53,19 @@ </dependency> </dependencies> - <build> - <plugins> - <plugin> - <groupId>org.apache.felix</groupId> - <artifactId>maven-bundle-plugin</artifactId> - <version>3.0.0</version> - <extensions>true</extensions> - <configuration> - <instructions> -<!-- <Bundle-Activator>org.apache.activemq.artemis.core.protocol.hornetq.Activator</Bundle-Activator> --> - </instructions> - </configuration> - </plugin> - </plugins> - </build> + <build> + <plugins> + <plugin> + <groupId>org.apache.felix</groupId> + <artifactId>maven-bundle-plugin</artifactId> + <version>3.0.0</version> + <extensions>true</extensions> + <configuration> + <instructions> + <!-- <Bundle-Activator>org.apache.activemq.artemis.core.protocol.hornetq.Activator</Bundle-Activator> --> + </instructions> + </configuration> + </plugin> + </plugins> + </build> </project> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HQPropertiesConversionInterceptor.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HQPropertiesConversionInterceptor.java b/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HQPropertiesConversionInterceptor.java index 012727f..1f14ba8 100644 --- a/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HQPropertiesConversionInterceptor.java +++ b/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/HQPropertiesConversionInterceptor.java @@ -26,7 +26,6 @@ import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection; public class HQPropertiesConversionInterceptor implements Interceptor { - private final boolean replaceHQ; public HQPropertiesConversionInterceptor(final boolean replaceHQ) { @@ -45,8 +44,7 @@ public class HQPropertiesConversionInterceptor implements Interceptor { private void handleReceiveMessage(MessagePacketI messagePacket) { if (replaceHQ) { HQPropertiesConverter.replaceHQProperties(messagePacket.getMessage()); - } - else { + } else { HQPropertiesConverter.replaceAMQProperties(messagePacket.getMessage()); } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientProtocolManager.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientProtocolManager.java b/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientProtocolManager.java index c6fec87..6a998ef 100644 --- a/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientProtocolManager.java +++ b/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientProtocolManager.java @@ -6,7 +6,7 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -30,11 +30,11 @@ import org.apache.activemq.artemis.spi.core.remoting.SessionContext; public class HornetQClientProtocolManager extends ActiveMQClientProtocolManager { private static final int VERSION_PLAYED = 123; + @Override protected void sendHandshake(Connection transportConnection) { } - @Override protected SessionContext newSessionContext(String name, int confirmationWindowSize, @@ -64,6 +64,4 @@ public class HornetQClientProtocolManager extends ActiveMQClientProtocolManager getChannel0().send(new SubscribeClusterTopologyUpdatesMessageV2(isServer, VERSION_PLAYED)); } - - } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/ec48f9ed/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientProtocolManagerFactory.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientProtocolManagerFactory.java b/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientProtocolManagerFactory.java index 351b096..99cfcb9 100644 --- a/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientProtocolManagerFactory.java +++ b/artemis-protocols/artemis-hqclient-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/hornetq/client/HornetQClientProtocolManagerFactory.java @@ -6,7 +6,7 @@ * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * - * http://www.apache.org/licenses/LICENSE-2.0 + * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, @@ -26,7 +26,6 @@ import org.osgi.service.component.annotations.Component; @Component(service = ClientProtocolManagerFactory.class) public class HornetQClientProtocolManagerFactory implements ClientProtocolManagerFactory { - ServerLocator locator; @Override
