This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit d2899e770734c7734244ac696973c3784a77c7eb
Author: Claus Ibsen <[email protected]>
AuthorDate: Sat Oct 21 21:02:09 2017 +0200

    CAMEL-11931: adding artemis example and optimise camel-jms for Artemis in 
streaming mode for large messages.
---
 .../org/apache/camel/component/jms/JmsBinding.java | 66 +++++++++++++++++++---
 .../camel/component/jms/JmsMessageHelper.java      | 33 +++++++++++
 examples/camel-example-artemis-stream/README.md    | 51 ++++++++++++++---
 .../resources/META-INF/spring/camel-context.xml    | 13 +++--
 .../src/main/resources/log4j2.properties           |  5 ++
 5 files changed, 146 insertions(+), 22 deletions(-)

diff --git 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
index 2684ea8..9ebe704 100644
--- 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
+++ 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
@@ -50,17 +50,21 @@ import org.apache.camel.NoTypeConversionAvailableException;
 import org.apache.camel.RuntimeCamelException;
 import org.apache.camel.StreamCache;
 import org.apache.camel.WrappedFile;
+import org.apache.camel.converter.stream.CachedOutputStream;
 import org.apache.camel.impl.DefaultExchangeHolder;
 import org.apache.camel.spi.HeaderFilterStrategy;
 import org.apache.camel.util.CamelContextHelper;
 import org.apache.camel.util.EndpointHelper;
 import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.FileUtil;
+import org.apache.camel.util.IOHelper;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 import static org.apache.camel.component.jms.JmsConstants.JMS_X_GROUP_ID;
+import static 
org.apache.camel.component.jms.JmsMessageHelper.getSafeLongProperty;
+import static org.apache.camel.component.jms.JmsMessageHelper.isVendor;
 import static 
org.apache.camel.component.jms.JmsMessageHelper.normalizeDestinationName;
 import static org.apache.camel.component.jms.JmsMessageType.Bytes;
 import static org.apache.camel.component.jms.JmsMessageType.Map;
@@ -154,11 +158,11 @@ public class JmsBinding {
                 return createMapFromMapMessage((MapMessage)message);
             } else if (message instanceof BytesMessage) {
                 LOG.trace("Extracting body as a BytesMessage from JMS message: 
{}", message);
-                return createByteArrayFromBytesMessage((BytesMessage)message);
+                return createByteArrayFromBytesMessage(exchange, 
(BytesMessage)message);
             } else if (message instanceof StreamMessage) {
                 LOG.trace("Extracting body as a StreamMessage from JMS 
message: {}", message);
                 StreamMessage streamMessage = (StreamMessage)message;
-                return createInputStreamFromStreamMessage(streamMessage);
+                return createInputStreamFromStreamMessage(exchange, 
streamMessage);
             } else {
                 return null;
             }
@@ -219,7 +223,11 @@ public class JmsBinding {
 
         return map;
     }
-    
+
+    /**
+     * @deprecated not in use
+     */
+    @Deprecated
     public Object getObjectProperty(Message jmsMessage, String name) throws 
JMSException {
         // try a direct lookup first
         Object answer = jmsMessage.getObjectProperty(name);
@@ -231,7 +239,26 @@ public class JmsBinding {
         return answer;
     }
 
-    protected byte[] createByteArrayFromBytesMessage(BytesMessage message) 
throws JMSException {
+    protected Object createByteArrayFromBytesMessage(Exchange exchange, 
BytesMessage message) throws JMSException {
+        // ActiveMQ Artemis has a special optimised mode for bytes messages, so we should 
use streaming if possible
+        Long size = getSafeLongProperty(message, "_AMQ_LARGE_SIZE");
+        if (size != null && size > 0) {
+            LOG.trace("Optimised for Artemis: Reading from BytesMessage in 
streaming mode directly into CachedOutputStream payload");
+            CachedOutputStream cos = new CachedOutputStream(exchange, true);
+            // this will save the stream and wait until the entire message is 
written before continuing.
+            message.setObjectProperty("JMS_AMQ_SaveStream", cos);
+            try {
+                // and then lets get the input stream of this so we can read it
+                return cos.getInputStream();
+            } catch (IOException e) {
+                JMSException cause = new 
MessageFormatException(e.getMessage());
+                cause.initCause(e);
+                throw cause;
+            } finally {
+                IOHelper.close(cos);
+            }
+        }
+
         if (message.getBodyLength() > Integer.MAX_VALUE) {
             LOG.warn("Length of BytesMessage is too long: {}", 
message.getBodyLength());
             return null;
@@ -241,7 +268,7 @@ public class JmsBinding {
         return result;
     }
 
-    protected InputStream createInputStreamFromStreamMessage(StreamMessage 
message) {
+    protected Object createInputStreamFromStreamMessage(Exchange exchange, 
StreamMessage message) throws JMSException {
         return new StreamMessageInputStream(message);
     }
 
@@ -574,6 +601,15 @@ public class JmsBinding {
                 || 
exchange.getContext().getTypeConverter().tryConvertTo(InputStream.class, body) 
!= null) {
             type = streamingEnabled ? Stream : Bytes;
         }
+
+        if (type == Stream) {
+            boolean artemis = isVendor(session, "Artemis");
+            if (artemis) {
+                // if running ActiveMQ Artemis then it has optimised streaming 
mode using byte messages so enforce as bytes
+                type = Bytes;
+            }
+        }
+
         return type;
     }
     
@@ -596,8 +632,22 @@ public class JmsBinding {
         case Bytes: {
             BytesMessage message = session.createBytesMessage();
             if (body != null) {
-                byte[] payload = 
context.getTypeConverter().convertTo(byte[].class, exchange, body);
-                message.writeBytes(payload);
+                try {
+                    if (isVendor(session, "Artemis")) {
+                        LOG.trace("Optimised for Artemis: Streaming payload in 
BytesMessage");
+                        InputStream is = 
context.getTypeConverter().mandatoryConvertTo(InputStream.class, exchange, 
body);
+                        message.setObjectProperty("JMS_AMQ_InputStream", is);
+                        LOG.trace("Optimised for Artemis: Finished streaming 
payload in BytesMessage");
+                    } else {
+                        byte[] payload = 
context.getTypeConverter().mandatoryConvertTo(byte[].class, exchange, body);
+                        message.writeBytes(payload);
+                    }
+                } catch (NoTypeConversionAvailableException e) {
+                    // cannot convert the body (to InputStream or byte[]), so throw an exception 
to avoid sending a null message
+                    JMSException cause = new 
MessageFormatException(e.getMessage());
+                    cause.initCause(e);
+                    throw cause;
+                }
             }
             return message;
         }
@@ -629,8 +679,8 @@ public class JmsBinding {
             if (body != null) {
                 long size = 0;
                 try {
-                    LOG.trace("Writing payload in StreamMessage");
                     InputStream is = 
context.getTypeConverter().mandatoryConvertTo(InputStream.class, exchange, 
body);
+                    LOG.trace("Writing payload in StreamMessage");
                     // assume streaming is bigger payload so use same buffer 
size as the file component
                     byte[] buffer = new byte[FileUtil.BUFFER_SIZE];
                     int len = 0;
diff --git 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageHelper.java
 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageHelper.java
index fe9ae21..f352e9e 100644
--- 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageHelper.java
+++ 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsMessageHelper.java
@@ -24,6 +24,7 @@ import javax.jms.DeliveryMode;
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
+import javax.jms.Session;
 
 import org.apache.camel.Exchange;
 import org.apache.camel.util.ExchangeHelper;
@@ -121,6 +122,38 @@ public final class JmsMessageHelper {
     }
 
     /**
+     * Gets a JMS property in a safe way
+     *
+     * @param jmsMessage the JMS message
+     * @param name       name of the property to get
+     * @return the property value, or <tt>null</tt> if it does not exist or 
getting the value failed
+     */
+    public static Long getSafeLongProperty(Message jmsMessage, String name) {
+        try {
+            return jmsMessage.getLongProperty(name);
+        } catch (Exception e) {
+            // ignore
+        }
+        return null;
+    }
+
+    /**
+     * Is the JMS session from a given vendor
+     *
+     * @param session the JMS session
+     * @param vendor the vendor, such as <tt>ActiveMQ</tt>, or <tt>Artemis</tt>
+     * @return <tt>true</tt> if from the vendor, <tt>false</tt> if not or not 
possible to determine
+     */
+    public static boolean isVendor(Session session, String vendor) {
+        if ("Artemis".equals(vendor)) {
+            return 
session.getClass().getName().startsWith("org.apache.activemq.artemis");
+        } else if ("ActiveMQ".equals(vendor)) {
+            return !isVendor(session, "Artemis") && 
session.getClass().getName().startsWith("org.apache.activemq");
+        }
+        return false;
+    }
+
+    /**
      * Sets the property on the given JMS message.
      *
      * @param jmsMessage  the JMS message
diff --git a/examples/camel-example-artemis-stream/README.md 
b/examples/camel-example-artemis-stream/README.md
index 608e931..1be77ea 100644
--- a/examples/camel-example-artemis-stream/README.md
+++ b/examples/camel-example-artemis-stream/README.md
@@ -2,8 +2,14 @@
 
 ### Introduction
 
-This example shows how to use Camel Aggregator EIP which offers (since Camel 
2.3)
-database persistence.
+This example shows how to send large messages between Apache Camel and 
ActiveMQ Artemis.
+When we say large messages we refer to messages with sizes of GB.
+
+You should be able to run Camel and Artemis in JVMs with lower memory such as 
256/512mb etc, and
+still be able to send messages in GB of sizes between them.
+
+This works by spooling big messages to disk. Artemis spools large messages to its 
`data/large-messages`
+directory, and Camel uses stream caching to spool to a temporary directory 
during routing.
 
 ### Build
 
@@ -11,24 +17,53 @@ The example is run using Maven.
 
 First compile the example by entering:
 
-       mvn compile
+    mvn compile
 
 ### Install ActiveMQ Artemis
 
-TODO: How to install Artemis
+You download and unzip Apache ActiveMQ Artemis from: 
http://activemq.apache.org/artemis/download.html
+
+After unzipping the download, you can then create a new broker with the name 
`mybroker`:
+
+    $ cd apache-artemis-2.3.0 
+    $ bin/artemis create mybroker
+
+### Run ActiveMQ Artemis
+
+You start ActiveMQ Artemis in a shell by running:
+
+    $ cd mybroker
+    $ bin/artemis run
 
-### Run
+This starts Artemis in the foreground and keeps it running until you hit 
<kbd>ctrl</kbd>+<kbd>c</kbd>
+to shut down Artemis.
 
-TODO: How to start Artemis
+### Run Camel
 
-To run the example type:
+Before running this example, ensure the JVM has limited memory by 
executing
 
-       mvn camel:run
+    export MAVEN_OPTS="-Xmx256m"
+
+And then start the Camel application:
+
+    mvn camel:run
+
+You can then copy files to the `target/inbox` folder, which are sent to Artemis, and 
then
+back again to Camel and written to the `target/outbox` folder.
+
+This should work for small and big files such as files with sizes of GB.
+The JVM should not run out of memory.
 
 To stop the example hit <kbd>ctrl</kbd>+<kbd>c</kbd>.  If you restart it and 
resume
 entering numbers you should see that it remembered previously entered values, 
as it
 uses a persistent store.
 
+### ActiveMQ Artemis web console
+
+You can browse the Artemis web console: <http://localhost:8161/console> 
+to see activity such as number of consumers and producers.
+You can also delete all messages from queues which is a handy operation.
+
 
 ### Forum, Help, etc
 
diff --git 
a/examples/camel-example-artemis-stream/src/main/resources/META-INF/spring/camel-context.xml
 
b/examples/camel-example-artemis-stream/src/main/resources/META-INF/spring/camel-context.xml
index abc80eb..aa906da 100644
--- 
a/examples/camel-example-artemis-stream/src/main/resources/META-INF/spring/camel-context.xml
+++ 
b/examples/camel-example-artemis-stream/src/main/resources/META-INF/spring/camel-context.xml
@@ -17,23 +17,20 @@
     limitations under the License.
 
 -->
-<!-- START SNIPPET: e1 -->
 <beans xmlns="http://www.springframework.org/schema/beans";
        xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance";
        xsi:schemaLocation="
         http://www.springframework.org/schema/beans 
http://www.springframework.org/schema/beans/spring-beans.xsd
         http://camel.apache.org/schema/spring 
http://camel.apache.org/schema/spring/camel-spring.xsd";>
 
-
+  <!-- setup ActiveMQ Artemis connection factory -->
   <bean id="artemisConnectionFactory" 
class="org.apache.activemq.artemis.jms.client.ActiveMQJMSConnectionFactory">
     <constructor-arg index="0" value="tcp://localhost:61616"/>
   </bean>
 
   <bean id="jms" class="org.apache.camel.component.jms.JmsComponent">
-    <!-- specify connection factory -->
+    <!-- specify connection factory on JMS component -->
     <property name="connectionFactory" ref="artemisConnectionFactory"/>
-    <!-- turn on streaming message support -->
-    <property name="streamMessageTypeEnabled" value="true"/>
   </bean>
 
   <camelContext id="myCamel" xmlns="http://camel.apache.org/schema/spring";>
@@ -42,12 +39,16 @@
       <from uri="file:target/inbox"/>
       <log message="Sending file ${file:name} to Artemis"/>
       <to uri="jms:queue:data"/>
+      <log message="Finish sending file to Artemis"/>
     </route>
 
-    <route>
+    <!-- turn on stream caching so we can stream big messages from Artemis
+         to spool disk to avoid reading into memory -->
+    <route streamCache="true">
       <from uri="jms:queue:data"/>
       <log message="Received data from Artemis"/>
       <to uri="file:target/outbox"/>
+      <log message="Finish saving data from Artemis as file"/>
     </route>
 
   </camelContext>
diff --git 
a/examples/camel-example-artemis-stream/src/main/resources/log4j2.properties 
b/examples/camel-example-artemis-stream/src/main/resources/log4j2.properties
index d9f0508..56ee6f3 100644
--- a/examples/camel-example-artemis-stream/src/main/resources/log4j2.properties
+++ b/examples/camel-example-artemis-stream/src/main/resources/log4j2.properties
@@ -21,3 +21,8 @@ appender.out.layout.type = PatternLayout
 appender.out.layout.pattern = [%30.30t] %-30.30c{1} %-5p %m%n
 rootLogger.level = INFO
 rootLogger.appenderRef.out.ref = out
+
+# turn on logging on camel-jms to see more activity
+# logger.jms.name = org.apache.camel.component.jms
+# logger.jms.level = TRACE
+

-- 
To stop receiving notification emails like this one, please contact
"[email protected]" <[email protected]>.

Reply via email to