ARTEMIS-503 - replace proton-jms with proton-jms from ActiveMQ https://issues.apache.org/jira/browse/ARTEMIS-503
Project: http://git-wip-us.apache.org/repos/asf/activemq-artemis/repo Commit: http://git-wip-us.apache.org/repos/asf/activemq-artemis/commit/9a17681f Tree: http://git-wip-us.apache.org/repos/asf/activemq-artemis/tree/9a17681f Diff: http://git-wip-us.apache.org/repos/asf/activemq-artemis/diff/9a17681f Branch: refs/heads/master Commit: 9a17681f83280fba31beaddb3359277512ebf673 Parents: 76d937f Author: Andy Taylor <[email protected]> Authored: Mon Apr 25 09:38:03 2016 +0100 Committer: Martyn Taylor <[email protected]> Committed: Mon Apr 25 14:13:30 2016 +0100 ---------------------------------------------------------------------- artemis-distribution/pom.xml | 8 +-- artemis-distribution/src/main/assembly/dep.xml | 2 +- artemis-protocols/artemis-amqp-protocol/pom.xml | 8 +-- .../AMQPNativeOutboundTransformer.java | 56 ++++++++++++++++++++ .../proton/converter/ActiveMQJMSVendor.java | 9 +--- .../converter/ProtonMessageConverter.java | 51 +++++++++++++++--- .../plug/ProtonSessionIntegrationCallback.java | 2 +- .../core/protocol/proton/TestConversions.java | 2 +- pom.xml | 14 +++-- tests/integration-tests/pom.xml | 4 -- 10 files changed, 123 insertions(+), 33 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9a17681f/artemis-distribution/pom.xml ---------------------------------------------------------------------- diff --git a/artemis-distribution/pom.xml b/artemis-distribution/pom.xml index 85fd398..6f462b4 100644 --- a/artemis-distribution/pom.xml +++ b/artemis-distribution/pom.xml @@ -147,10 +147,6 @@ <groupId>org.jboss.logmanager</groupId> <artifactId>jboss-logmanager</artifactId> </dependency> - <dependency> - <groupId>org.apache.qpid</groupId> - <artifactId>proton-jms</artifactId> - </dependency> <dependency> <groupId>io.airlift</groupId> <artifactId>airline</artifactId> @@ -190,6 +186,10 @@ <groupId>io.netty</groupId> <artifactId>netty-codec-mqtt</artifactId> </dependency> + <dependency> + <groupId>org.apache.activemq</groupId> + <artifactId>activemq-amqp</artifactId> + </dependency> </dependencies> <build> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9a17681f/artemis-distribution/src/main/assembly/dep.xml ---------------------------------------------------------------------- diff --git a/artemis-distribution/src/main/assembly/dep.xml b/artemis-distribution/src/main/assembly/dep.xml index 4adfaf8..d723bae 100644 --- a/artemis-distribution/src/main/assembly/dep.xml +++ b/artemis-distribution/src/main/assembly/dep.xml @@ -80,7 +80,7 @@ <include>org.jboss.logging:jboss-logging</include> <include>io.netty:netty-all</include> <include>org.apache.qpid:proton-j</include> - <include>org.apache.qpid:proton-jms</include> + <include>org.apache.activemq:activemq-amqp</include> <include>org.apache.activemq:activemq-client</include> <include>org.slf4j:slf4j-api</include> <include>io.airlift:airline</include> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9a17681f/artemis-protocols/artemis-amqp-protocol/pom.xml ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/pom.xml b/artemis-protocols/artemis-amqp-protocol/pom.xml index 98149c1..78e9c3b 100644 --- a/artemis-protocols/artemis-amqp-protocol/pom.xml +++ b/artemis-protocols/artemis-amqp-protocol/pom.xml @@ -42,6 +42,10 @@ <version>${project.version}</version> </dependency> <dependency> + <groupId>org.apache.activemq</groupId> + <artifactId>activemq-amqp</artifactId> + </dependency> + <dependency> <groupId>org.jboss.logging</groupId> <artifactId>jboss-logging-processor</artifactId> <scope>provided</scope> @@ -84,10 +88,6 @@ <artifactId>proton-j</artifactId> </dependency> <dependency> - <groupId>org.apache.qpid</groupId> - <artifactId>proton-jms</artifactId> - </dependency> - <dependency> <groupId>org.apache.geronimo.specs</groupId> <artifactId>geronimo-jms_2.0_spec</artifactId> <scope>provided</scope> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9a17681f/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/AMQPNativeOutboundTransformer.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/AMQPNativeOutboundTransformer.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/AMQPNativeOutboundTransformer.java new file mode 100644 index 0000000..c187ad0 --- /dev/null +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/AMQPNativeOutboundTransformer.java @@ -0,0 +1,56 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.activemq.artemis.core.protocol.proton.converter; + +import org.apache.activemq.transport.amqp.message.OutboundTransformer; +import org.apache.qpid.proton.amqp.UnsignedInteger; +import org.apache.qpid.proton.amqp.messaging.Header; +import org.apache.qpid.proton.message.ProtonJMessage; + +import javax.jms.BytesMessage; +import javax.jms.JMSException; + +public class AMQPNativeOutboundTransformer { + static ProtonJMessage transform(OutboundTransformer options, BytesMessage msg) throws JMSException { + byte[] data = new byte[(int) msg.getBodyLength()]; + msg.readBytes(data); + msg.reset(); + int count = msg.getIntProperty("JMSXDeliveryCount"); + + // decode... + ProtonJMessage amqp = (ProtonJMessage) org.apache.qpid.proton.message.Message.Factory.create(); + int offset = 0; + int len = data.length; + while (len > 0) { + final int decoded = amqp.decode(data, offset, len); + assert decoded > 0 : "Make progress decoding the message"; + offset += decoded; + len -= decoded; + } + + // Update the DeliveryCount header... + // The AMQP delivery-count field only includes prior failed delivery attempts, + // whereas JMSXDeliveryCount includes the first/current delivery attempt. Subtract 1. + if (amqp.getHeader() == null) { + amqp.setHeader(new Header()); + } + + amqp.getHeader().setDeliveryCount(new UnsignedInteger(count - 1)); + + return amqp; + } +} http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9a17681f/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/ActiveMQJMSVendor.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/ActiveMQJMSVendor.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/ActiveMQJMSVendor.java index 639b390..ba6b9be 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/ActiveMQJMSVendor.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/ActiveMQJMSVendor.java @@ -26,7 +26,6 @@ import javax.jms.TextMessage; import org.apache.activemq.artemis.core.protocol.proton.converter.jms.ServerDestination; import org.apache.activemq.artemis.jms.client.ActiveMQDestination; -import org.apache.qpid.proton.jms.JMSVendor; import org.apache.activemq.artemis.core.buffers.impl.ResetLimitWrappedActiveMQBuffer; import org.apache.activemq.artemis.core.protocol.proton.converter.jms.ServerJMSBytesMessage; import org.apache.activemq.artemis.core.protocol.proton.converter.jms.ServerJMSMapMessage; @@ -36,8 +35,9 @@ import org.apache.activemq.artemis.core.protocol.proton.converter.jms.ServerJMST import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl; import org.apache.activemq.artemis.utils.IDGenerator; +import org.apache.activemq.transport.amqp.message.JMSVendor; -public class ActiveMQJMSVendor extends JMSVendor { +public class ActiveMQJMSVendor implements JMSVendor { private final IDGenerator serverGenerator; @@ -86,11 +86,6 @@ public class ActiveMQJMSVendor extends JMSVendor { } @Override - public <T extends Destination> T createDestination(String name, Class<T> kind) { - return super.createDestination(name, kind); - } - - @Override public void setJMSXGroupID(Message message, String s) { } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9a17681f/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/ProtonMessageConverter.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/ProtonMessageConverter.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/ProtonMessageConverter.java index 4de2357..da99e68 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/ProtonMessageConverter.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/converter/ProtonMessageConverter.java @@ -16,23 +16,30 @@ */ package org.apache.activemq.artemis.core.protocol.proton.converter; -import org.apache.qpid.proton.jms.EncodedMessage; -import org.apache.qpid.proton.jms.InboundTransformer; -import org.apache.qpid.proton.jms.JMSMappingInboundTransformer; -import org.apache.qpid.proton.jms.JMSMappingOutboundTransformer; +import org.apache.activemq.artemis.core.client.ActiveMQClientLogger; +import org.apache.activemq.transport.amqp.message.EncodedMessage; +import org.apache.activemq.transport.amqp.message.InboundTransformer; +import org.apache.activemq.transport.amqp.message.JMSMappingInboundTransformer; +import org.apache.activemq.transport.amqp.message.JMSMappingOutboundTransformer; import org.apache.activemq.artemis.core.protocol.proton.converter.jms.ServerJMSMessage; import org.apache.activemq.artemis.core.server.ServerMessage; import org.apache.activemq.artemis.spi.core.protocol.MessageConverter; import org.apache.activemq.artemis.utils.IDGenerator; +import javax.jms.BytesMessage; +import java.io.IOException; + public class ProtonMessageConverter implements MessageConverter { ActiveMQJMSVendor activeMQJMSVendor; + private final String prefixVendor; + public ProtonMessageConverter(IDGenerator idGenerator) { activeMQJMSVendor = new ActiveMQJMSVendor(idGenerator); inboundTransformer = new JMSMappingInboundTransformer(activeMQJMSVendor); outboundTransformer = new JMSMappingOutboundTransformer(activeMQJMSVendor); + prefixVendor = outboundTransformer.getPrefixVendor(); } private final InboundTransformer inboundTransformer; @@ -50,11 +57,30 @@ public class ProtonMessageConverter implements MessageConverter { * * @param messageSource * @return - * @throws Exception + * @throws Exception https://issues.jboss.org/browse/ENTMQ-1560 */ public ServerJMSMessage inboundJMSType(EncodedMessage messageSource) throws Exception { EncodedMessage encodedMessageSource = messageSource; - ServerJMSMessage transformedMessage = (ServerJMSMessage) inboundTransformer.transform(encodedMessageSource); + ServerJMSMessage transformedMessage = null; + + InboundTransformer transformer = inboundTransformer; + + while (transformer != null) { + try { + transformedMessage = (ServerJMSMessage) transformer.transform(encodedMessageSource); + break; + } + catch (Exception e) { + ActiveMQClientLogger.LOGGER.debug("Transform of message using [{}] transformer, failed" + inboundTransformer.getTransformerName()); + ActiveMQClientLogger.LOGGER.trace("Transformation error:", e); + + transformer = transformer.getFallbackTransformer(); + } + } + + if (transformedMessage == null) { + throw new IOException("Failed to transform incoming delivery, skipping."); + } transformedMessage.encode(); @@ -64,8 +90,19 @@ public class ProtonMessageConverter implements MessageConverter { @Override public Object outbound(ServerMessage messageOutbound, int deliveryCount) throws Exception { ServerJMSMessage jmsMessage = activeMQJMSVendor.wrapMessage(messageOutbound.getType(), messageOutbound, deliveryCount); + jmsMessage.decode(); - return outboundTransformer.convert(jmsMessage); + if (jmsMessage.getBooleanProperty(prefixVendor + "NATIVE")) { + if (jmsMessage instanceof BytesMessage) { + return AMQPNativeOutboundTransformer.transform(outboundTransformer, (BytesMessage) jmsMessage); + } + else { + return null; + } + } + else { + return outboundTransformer.convert(jmsMessage); + } } } http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9a17681f/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java index 2dccc30..ccd2b7e 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java +++ b/artemis-protocols/artemis-amqp-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/proton/plug/ProtonSessionIntegrationCallback.java @@ -24,13 +24,13 @@ import org.apache.activemq.artemis.core.server.MessageReference; import org.apache.activemq.artemis.core.transaction.Transaction; import org.apache.activemq.artemis.spi.core.remoting.Connection; import org.apache.activemq.artemis.spi.core.remoting.ReadyListener; +import org.apache.activemq.transport.amqp.message.EncodedMessage; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.transport.AmqpError; import org.apache.qpid.proton.amqp.transport.ErrorCondition; import org.apache.qpid.proton.engine.Delivery; import org.apache.qpid.proton.engine.Link; import org.apache.qpid.proton.engine.Receiver; -import org.apache.qpid.proton.jms.EncodedMessage; import org.apache.qpid.proton.message.ProtonJMessage; import org.apache.activemq.artemis.api.core.SimpleString; import org.apache.activemq.artemis.api.core.client.ActiveMQClient; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9a17681f/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/proton/TestConversions.java ---------------------------------------------------------------------- diff --git a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/proton/TestConversions.java b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/proton/TestConversions.java index de514bb..fc9fe2c 100644 --- a/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/proton/TestConversions.java +++ b/artemis-protocols/artemis-amqp-protocol/src/test/java/org/apache/activemq/artemis/core/protocol/proton/TestConversions.java @@ -26,12 +26,12 @@ import java.util.Map; import io.netty.buffer.ByteBuf; import io.netty.buffer.PooledByteBufAllocator; import org.apache.activemq.artemis.api.core.ActiveMQBuffer; +import org.apache.activemq.transport.amqp.message.EncodedMessage; import org.apache.qpid.proton.amqp.Binary; import org.apache.qpid.proton.amqp.messaging.AmqpSequence; import org.apache.qpid.proton.amqp.messaging.AmqpValue; import org.apache.qpid.proton.amqp.messaging.ApplicationProperties; import org.apache.qpid.proton.amqp.messaging.Data; -import org.apache.qpid.proton.jms.EncodedMessage; import org.apache.qpid.proton.message.Message; import org.apache.qpid.proton.message.ProtonJMessage; import org.apache.qpid.proton.message.impl.MessageImpl; http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9a17681f/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index 292aa14..47b58e4 100644 --- a/pom.xml +++ b/pom.xml @@ -408,15 +408,21 @@ <!-- License: Apache 2.0 --> </dependency> <dependency> - <groupId>org.apache.qpid</groupId> - <artifactId>proton-jms</artifactId> - <version>${proton.version}</version> + <groupId>org.apache.activemq</groupId> + <artifactId>activemq-client</artifactId> + <version>${activemq5-version}</version> <!-- License: Apache 2.0 --> </dependency> <dependency> <groupId>org.apache.activemq</groupId> - <artifactId>activemq-client</artifactId> + <artifactId>activemq-amqp</artifactId> <version>${activemq5-version}</version> + <exclusions> + <exclusion> + <groupId>org.apache.geronimo.specs</groupId> + <artifactId>geronimo-jms_1.1_spec</artifactId> + </exclusion> + </exclusions> <!-- License: Apache 2.0 --> </dependency> <dependency> http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/9a17681f/tests/integration-tests/pom.xml ---------------------------------------------------------------------- diff --git a/tests/integration-tests/pom.xml b/tests/integration-tests/pom.xml index 6c38b7c..f0e1d14 100644 --- a/tests/integration-tests/pom.xml +++ b/tests/integration-tests/pom.xml @@ -194,10 +194,6 @@ <groupId>org.apache.qpid</groupId> <artifactId>proton-j</artifactId> </dependency> - <dependency> - <groupId>org.apache.qpid</groupId> - <artifactId>proton-jms</artifactId> - </dependency> <dependency> <groupId>org.apache.qpid</groupId> <artifactId>qpid-client</artifactId>
