This is an automated email from the ASF dual-hosted git repository. davsclaus pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/camel.git
commit d2899e770734c7734244ac696973c3784a77c7eb Author: Claus Ibsen <[email protected]> AuthorDate: Sat Oct 21 21:02:09 2017 +0200 CAMEL-11931: adding artemis example and optimise camel-jms for Artemis in streaming mode for large messages. --- .../org/apache/camel/component/jms/JmsBinding.java | 66 +++++++++++++++++++--- .../camel/component/jms/JmsMessageHelper.java | 33 +++++++++++ examples/camel-example-artemis-stream/README.md | 51 ++++++++++++++--- .../resources/META-INF/spring/camel-context.xml | 13 +++-- .../src/main/resources/log4j2.properties | 5 ++ 5 files changed, 146 insertions(+), 22 deletions(-) diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java index 2684ea8..9ebe704 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java @@ -50,17 +50,21 @@ import org.apache.camel.NoTypeConversionAvailableException; import org.apache.camel.RuntimeCamelException; import org.apache.camel.StreamCache; import org.apache.camel.WrappedFile; +import org.apache.camel.converter.stream.CachedOutputStream; import org.apache.camel.impl.DefaultExchangeHolder; import org.apache.camel.spi.HeaderFilterStrategy; import org.apache.camel.util.CamelContextHelper; import org.apache.camel.util.EndpointHelper; import org.apache.camel.util.ExchangeHelper; import org.apache.camel.util.FileUtil; +import org.apache.camel.util.IOHelper; import org.apache.camel.util.ObjectHelper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import static org.apache.camel.component.jms.JmsConstants.JMS_X_GROUP_ID; +import static org.apache.camel.component.jms.JmsMessageHelper.getSafeLongProperty; +import static org.apache.camel.component.jms.JmsMessageHelper.isVendor; import static org.apache.camel.component.jms.JmsMessageHelper.normalizeDestinationName; import static org.apache.camel.component.jms.JmsMessageType.Bytes; import static org.apache.camel.component.jms.JmsMessageType.Map; @@ -154,11 +158,11 @@ public class JmsBinding { return createMapFromMapMessage((MapMessage)message); } else if (message instanceof BytesMessage) { LOG.trace("Extracting body as a BytesMessage from JMS message: {}", message); - return createByteArrayFromBytesMessage((BytesMessage)message); + return createByteArrayFromBytesMessage(exchange, (BytesMessage)message); } else if (message instanceof StreamMessage) { LOG.trace("Extracting body as a StreamMessage from JMS message: {}", message); StreamMessage streamMessage = (StreamMessage)message; - return createInputStreamFromStreamMessage(streamMessage); + return createInputStreamFromStreamMessage(exchange, streamMessage); } else { return null; } @@ -219,7 +223,11 @@ public class JmsBinding { return map; } - + + /** + * @deprecated not in use + */ + @Deprecated public Object getObjectProperty(Message jmsMessage, String name) throws JMSException { // try a direct lookup first Object answer = jmsMessage.getObjectProperty(name); @@ -231,7 +239,26 @@ public class JmsBinding { return answer; } - protected byte[] createByteArrayFromBytesMessage(BytesMessage message) throws JMSException { + protected Object createByteArrayFromBytesMessage(Exchange exchange, BytesMessage message) throws JMSException { + // ActiveMQ has special optimised mode for bytes message, so we should use streaming if possible + Long size = getSafeLongProperty(message, "_AMQ_LARGE_SIZE"); + if (size != null && size > 0) { + LOG.trace("Optimised for Artemis: Reading from BytesMessage in streaming mode directly into CachedOutputStream payload"); + CachedOutputStream cos = new CachedOutputStream(exchange, true); + // this will save the stream and wait until the entire message is written before continuing. + message.setObjectProperty("JMS_AMQ_SaveStream", cos); + try { + // and then lets get the input stream of this so we can read it + return cos.getInputStream(); + } catch (IOException e) { + JMSException cause = new MessageFormatException(e.getMessage()); + cause.initCause(e); + throw cause; + } finally { + IOHelper.close(cos); + } + } + if (message.getBodyLength() > Integer.MAX_VALUE) { LOG.warn("Length of BytesMessage is too long: {}", message.getBodyLength()); return null; @@ -241,7 +268,7 @@ public class JmsBinding { return result; } - protected InputStream createInputStreamFromStreamMessage(StreamMessage message) { + protected Object createInputStreamFromStreamMessage(Exchange exchange, StreamMessage message) throws JMSException { return new StreamMessageInputStream(message); } @@ -574,6 +601,15 @@ public class JmsBinding { || exchange.getContext().getTypeConverter().tryConvertTo(InputStream.class, body) != null) { type = streamingEnabled ? Stream : Bytes; } + + if (type == Stream) { + boolean artemis = isVendor(session, "Artemis"); + if (artemis) { + // if running ActiveMQ Artemis then it has optimised streaming mode using byte messages so enforce as bytes + type = Bytes; + } + } + return type; } @@ -596,8 +632,22 @@ public class JmsBinding { case Bytes: { BytesMessage message = session.createBytesMessage(); if (body != null) { - byte[] payload = context.getTypeConverter().convertTo(byte[].class, exchange, body); - message.writeBytes(payload); + try { + if (isVendor(session, "Artemis")) { + LOG.trace("Optimised for Artemis: Streaming payload in BytesMessage"); + InputStream is = context.getTypeConverter().mandatoryConvertTo(InputStream.class, exchange, body); + message.setObjectProperty("JMS_AMQ_InputStream", is); + LOG.trace("Optimised for Artemis: Finished streaming payload in BytesMessage"); + } else { + byte[] payload = context.getTypeConverter().mandatoryConvertTo(byte[].class, exchange, body); + message.writeBytes(payload); + } + } catch (NoTypeConversionAvailableException e) { + // cannot convert to inputstream then thrown an exception to avoid sending a null message + JMSException cause = new MessageFormatException(e.getMessage()); + cause.initCause(e); + throw cause; + } } return message; } @@ -629,8 +679,8 @@ public class JmsBinding { if (body != null) { long size = 0; try { - LOG.trace("Writing payload in StreamMessage"); InputStream is = context.getTypeConverter().mandatoryConvertTo(InputStream.class, exchange, body); + LOG.trace("Writing payload in StreamMessage"); // assume streaming is bigger payload so use same buffer size as the file component byte[] buffer = new byte[FileUtil.BUFFER_SIZE]; int len = 0; diff --git a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageHelper.java b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageHelper.java index fe9ae21..f352e9e 100644 --- a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageHelper.java +++ b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageHelper.java @@ -24,6 +24,7 @@ import javax.jms.DeliveryMode; import javax.jms.Destination; import javax.jms.JMSException; import javax.jms.Message; +import javax.jms.Session; import org.apache.camel.Exchange; import org.apache.camel.util.ExchangeHelper; @@ -121,6 +122,38 @@ public final class JmsMessageHelper { } /** + * Gets a JMS property in a safe way + * + * @param jmsMessage the JMS message + * @param name name of the property to get + * @return the property value, or <tt>null</tt> if does not exists or failure to get the value + */ + public static Long getSafeLongProperty(Message jmsMessage, String name) { + try { + return jmsMessage.getLongProperty(name); + } catch (Exception e) { + // ignore + } + return null; + } + + /** + * Is the JMS session from a given vendor + * + * @param session the JMS session + * @param vendor the vendor, such as <tt>ActiveMQ</tt>, or <tt>Artemis</tt> + * @return <tt>true</tt> if from the vendor, <tt>false</tt> if not or not possible to determine + */ + public static boolean isVendor(Session session, String vendor) { + if ("Artemis".equals(vendor)) { + return session.getClass().getName().startsWith("org.apache.activemq.artemis"); + } else if ("ActiveMQ".equals(vendor)) { + return !isVendor(session, "Artemis") && session.getClass().getName().startsWith("org.apache.activemq"); + } + return false; + } + + /** * Sets the property on the given JMS message. * * @param jmsMessage the JMS message diff --git a/examples/camel-example-artemis-stream/README.md b/examples/camel-example-artemis-stream/README.md index 608e931..1be77ea 100644 --- a/examples/camel-example-artemis-stream/README.md +++ b/examples/camel-example-artemis-stream/README.md @@ -2,8 +2,14 @@ ### Introduction -This example shows how to use Camel Aggregator EIP which offers (since Camel 2.3) -database persistence. +This example shows how to send large messages between Apache Camel and ActiveMQ Artemis. +When we say large messages we refer to messages with sizes of GB. + +You should be able to run Camel and Artemis in JVMs with lower memory such as 256/512mb etc, and +still be able to send messages in GB of sizes between them. + +This works by spool big messages to disk. Artemis spool large messages to its `data/large-messages` +directory, and Camel uses stream caching to spool to a temporary directory during routing. ### Build @@ -11,24 +17,53 @@ The example is run using Maven. First compile the example by entering: - mvn compile + mvn compile ### Install ActiveMQ Artemis -TODO: How to install Artemis +You download and unzip Apache ActiveMQ Artemis from: http://activemq.apache.org/artemis/download.html + +After unzipping the download, you can then create a new broker with the name `mybroker`: + + $ cd apache-artemis-2.3.0 + $ bin/artemis create mybroker + +### Run ActiveMQ Artemis + +You start ActiveMQ in a shell by running: + + $ cd mybroker + $ bin/artemis run -### Run +Which startup Artemis in the foreground and keeps it running until you hit <kbd>ctrl</kbd>+<kbd>c</kbd> +to shutdown Artemis. -TODO: How to start Artemis +### Run Camel -To run the example type: +Before running this example, then ensure the JVM has limited memory by executing - mvn camel:run + export MAVEN_OPTS="-Xmx256m" + +And then start the Camel application: + + mvn camel:run + +You can then copy files to `target/inbox` folder which is send to Artemis, and then +back again to Camel and written to the `target/outbox` folder. + +This should work for small and big files such as files with sizes of GB. +The JVM should not run out of memory. To stop the example hit <kbd>ctrl</kbd>+<kbd>c</kbd>. If you restart it and resume entering numbers you should see that it remembered previously entered values, as it uses a persistent store. +### ActiveMQ Artemis web console + +You can browse the Artemis web console: <http://localhost:8161/console> +to see activity such as number of consumers and producers. +You can also delete all messages from queues which is a handy operation. + ### Forum, Help, etc diff --git a/examples/camel-example-artemis-stream/src/main/resources/META-INF/spring/camel-context.xml b/examples/camel-example-artemis-stream/src/main/resources/META-INF/spring/camel-context.xml index abc80eb..aa906da 100644 --- a/examples/camel-example-artemis-stream/src/main/resources/META-INF/spring/camel-context.xml +++ b/examples/camel-example-artemis-stream/src/main/resources/META-INF/spring/camel-context.xml @@ -17,23 +17,20 @@ limitations under the License. --> -<!-- START SNIPPET: e1 --> <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation=" http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd"> - + <!-- setup ActiveMQ Artemis connection factory --> <bean id="artemisConnectionFactory" class="org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory"> <constructor-arg index="0" value="tcp://localhost:61616"/> </bean> <bean id="jms" class="org.apache.camel.component.jms.JmsComponent"> - <!-- specify connection factory --> + <!-- specify connection factory on JMS component --> <property name="connectionFactory" ref="artemisConnectionFactory"/> - <!-- turn on streaming message support --> - <property name="streamMessageTypeEnabled" value="true"/> </bean> <camelContext id="myCamel" xmlns="http://camel.apache.org/schema/spring"> @@ -42,12 +39,16 @@ <from uri="file:target/inbox"/> <log message="Sending file ${file:name} to Artemis"/> <to uri="jms:queue:data"/> + <log message="Finish sending file to Artemis"/> </route> - <route> + <!-- turn on stream caching so we can stream big messages from Artemis + to spool disk to avoid reading into memory --> + <route streamCache="true"> <from uri="jms:queue:data"/> <log message="Received data from Artemis"/> <to uri="file:target/outbox"/> + <log message="Finish saving data from Artemis as file"/> </route> </camelContext> diff --git a/examples/camel-example-artemis-stream/src/main/resources/log4j2.properties b/examples/camel-example-artemis-stream/src/main/resources/log4j2.properties index d9f0508..56ee6f3 100644 --- a/examples/camel-example-artemis-stream/src/main/resources/log4j2.properties +++ b/examples/camel-example-artemis-stream/src/main/resources/log4j2.properties @@ -21,3 +21,8 @@ appender.out.layout.type = PatternLayout appender.out.layout.pattern = [%30.30t] %-30.30c{1} %-5p %m%n rootLogger.level = INFO rootLogger.appenderRef.out.ref = out + +# turn on logging on camel-jms to see more activity +# logger.jms.name = org.apache.camel.component.jms +# logger.jms.level = TRACE + -- To stop receiving notification emails like this one, please contact "[email protected]" <[email protected]>.
