This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 314322590adcf181348dcdb8ab8842fc289fbdf7
Author: Claus Ibsen <[email protected]>
AuthorDate: Fri Oct 20 16:48:29 2017 +0200

    CAMEL-11931: camel-jms - Add better support for Stream JMS message type
---
 .../camel-jms/src/main/docs/jms-component.adoc     |  6 ++--
 .../org/apache/camel/component/jms/JmsBinding.java | 16 ++++++----
 .../apache/camel/component/jms/JmsComponent.java   | 20 ++++++++++++
 .../camel/component/jms/JmsConfiguration.java      | 21 ++++++++++++-
 .../component/jms/JmsStreamMessageTypeTest.java    |  9 +++---
 .../jms/springboot/JmsComponentConfiguration.java  | 36 ++++++++++++++++++++++
 6 files changed, 94 insertions(+), 14 deletions(-)

diff --git a/components/camel-jms/src/main/docs/jms-component.adoc 
b/components/camel-jms/src/main/docs/jms-component.adoc
index cf43483..351fdd0 100644
--- a/components/camel-jms/src/main/docs/jms-component.adoc
+++ b/components/camel-jms/src/main/docs/jms-component.adoc
@@ -199,7 +199,7 @@ about these properties by consulting the relevant Spring 
documentation.
 
 
 // component options: START
-The JMS component supports 78 options which are listed below.
+The JMS component supports 79 options which are listed below.
 
 
 
@@ -282,6 +282,7 @@ The JMS component supports 78 options which are listed 
below.
 | *subscriptionDurable* (consumer) | Set whether to make the subscription 
durable. The durable subscription name to be used can be specified through the 
subscriptionName property. Default is false. Set this to true to register a 
durable subscription typically in combination with a subscriptionName value 
(unless your message listener class name is good enough as subscription name). 
Only makes sense when listening to a topic (pub-sub domain) therefore this 
method switches the pubSubDomain  [...]
 | *subscriptionShared* (consumer) | Set whether to make the subscription 
shared. The shared subscription name to be used can be specified through the 
subscriptionName property. Default is false. Set this to true to register a 
shared subscription typically in combination with a subscriptionName value 
(unless your message listener class name is good enough as subscription name). 
Note that shared subscriptions may also be durable so this flag can (and often 
will) be combined with subscripti [...]
 | *subscriptionName* (consumer) | Set the name of a subscription to create. To 
be applied in case of a topic (pub-sub domain) with a shared or durable 
subscription. The subscription name needs to be unique within this client's JMS 
client id. Default is the class name of the specified message listener. Note: 
Only 1 concurrent consumer (which is the default of this message listener 
container) is allowed for each subscription except for a shared subscription 
(which requires JMS 2.0). |  | String
+| *streamMessageType Enabled* (producer) | Sets whether StreamMessage type is 
enabled or not. Message payloads of streaming kind such as files InputStream 
etc will either be sent as BytesMessage or StreamMessage. This option controls 
which kind will be used. By default BytesMessage is used which enforces the 
entire message payload to be read into memory. By enabling this option the 
message payload is read into memory in chunks and each chunk is then written to 
the StreamMessage until no  [...]
 | *headerFilterStrategy* (filter) | To use a custom 
org.apache.camel.spi.HeaderFilterStrategy to filter header to and from Camel 
message. |  | HeaderFilterStrategy
 | *resolveProperty Placeholders* (advanced) | Whether the component should 
resolve property placeholders on itself when starting. Only properties which 
are of String type can use property placeholders. | true | boolean
 |===
@@ -322,7 +323,7 @@ with the following path and query parameters:
 | *destinationName* | *Required* Name of the queue or topic to use as 
destination |  | String
 |===
 
-==== Query Parameters (89 parameters):
+==== Query Parameters (90 parameters):
 
 [width="100%",cols="2,5,^1,2",options="header"]
 |===
@@ -378,6 +379,7 @@ with the following path and query parameters:
 | *includeSentJMSMessageID* (producer) | Only applicable when sending to JMS 
destination using InOnly (eg fire and forget). Enabling this option will enrich 
the Camel Exchange with the actual JMSMessageID that was used by the JMS client 
when the message was sent to the JMS destination. | false | boolean
 | *replyToCacheLevelName* (producer) | Sets the cache level by name for the 
reply consumer when doing request/reply over JMS. This option only applies when 
using fixed reply queues (not temporary). Camel will by default use: 
CACHE_CONSUMER for exclusive or shared w/ replyToSelectorName. And 
CACHE_SESSION for shared without replyToSelectorName. Some JMS brokers such as 
IBM WebSphere may require to set the replyToCacheLevelName=CACHE_NONE to work. 
Note: If using temporary queues then CACHE [...]
 | *replyToDestinationSelector Name* (producer) | Sets the JMS Selector using 
the fixed name to be used so you can filter out your own replies from the 
others when using a shared queue (that is if you are not using a temporary 
reply queue). |  | String
+| *streamMessageTypeEnabled* (producer) | Sets whether StreamMessage type is 
enabled or not. Message payloads of streaming kind such as files InputStream 
etc will either be sent as BytesMessage or StreamMessage. This option controls 
which kind will be used. By default BytesMessage is used which enforces the 
entire message payload to be read into memory. By enabling this option the 
message payload is read into memory in chunks and each chunk is then written to 
the StreamMessage until no m [...]
 | *allowSerializedHeaders* (advanced) | Controls whether or not to include 
serialized headers. Applies only when link isTransferExchange() is true. This 
requires that the objects are serializable. Camel will exclude any 
non-serializable objects and log it at WARN level. | false | boolean
 | *asyncStartListener* (advanced) | Whether to startup the JmsConsumer message 
listener asynchronously when starting a route. For example if a JmsConsumer 
cannot get a connection to a remote JMS broker then it may block while retrying 
and/or failover. This will cause Camel to block while starting routes. By 
setting this option to true you will let routes startup while the JmsConsumer 
connects to the JMS broker using a dedicated thread in asynchronous mode. If 
this option is used then bew [...]
 | *asyncStopListener* (advanced) | Whether to stop the JmsConsumer message 
listener asynchronously when stopping a route. | false | boolean
diff --git 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
index 3a8439f..2684ea8 100644
--- 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
+++ 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsBinding.java
@@ -42,8 +42,6 @@ import javax.jms.Session;
 import javax.jms.StreamMessage;
 import javax.jms.TextMessage;
 
-import org.apache.camel.util.FileUtil;
-import org.apache.camel.util.IOHelper;
 import org.w3c.dom.Node;
 
 import org.apache.camel.CamelContext;
@@ -57,6 +55,7 @@ import org.apache.camel.spi.HeaderFilterStrategy;
 import org.apache.camel.util.CamelContextHelper;
 import org.apache.camel.util.EndpointHelper;
 import org.apache.camel.util.ExchangeHelper;
+import org.apache.camel.util.FileUtil;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -66,6 +65,7 @@ import static 
org.apache.camel.component.jms.JmsMessageHelper.normalizeDestinati
 import static org.apache.camel.component.jms.JmsMessageType.Bytes;
 import static org.apache.camel.component.jms.JmsMessageType.Map;
 import static org.apache.camel.component.jms.JmsMessageType.Object;
+import static org.apache.camel.component.jms.JmsMessageType.Stream;
 import static org.apache.camel.component.jms.JmsMessageType.Text;
 
 /**
@@ -293,7 +293,7 @@ public class JmsBinding {
                             answer = answer instanceof MapMessage ? answer : 
null;
                         } else if (type == JmsMessageType.Object) {
                             answer = answer instanceof ObjectMessage ? answer 
: null;
-                        } else if (type == JmsMessageType.Stream) {
+                        } else if (type == Stream) {
                             answer = answer instanceof StreamMessage ? answer 
: null;
                         }
                     }
@@ -555,20 +555,24 @@ public class JmsBinding {
      * @return type or null if no mapping was possible
      */
     protected JmsMessageType getJMSMessageTypeForBody(Exchange exchange, 
Object body, Map<String, Object> headers, Session session, CamelContext 
context) {
+        boolean streamingEnabled = 
endpoint.getConfiguration().isStreamMessageTypeEnabled();
+
         JmsMessageType type = null;
         // let body determine the type
         if (body instanceof Node || body instanceof String) {
             type = Text;
-        } else if (body instanceof byte[] || body instanceof WrappedFile || 
body instanceof File || body instanceof Reader
-                || body instanceof InputStream || body instanceof ByteBuffer 
|| body instanceof StreamCache) {
+        } else if (body instanceof byte[] || body instanceof ByteBuffer) {
             type = Bytes;
+        } else if (body instanceof WrappedFile || body instanceof File || body 
instanceof Reader
+                || body instanceof InputStream || body instanceof StreamCache) 
{
+            type = streamingEnabled ? Stream : Bytes;
         } else if (body instanceof Map) {
             type = Map;
         } else if (body instanceof Serializable) {
             type = Object;            
         } else if 
(exchange.getContext().getTypeConverter().tryConvertTo(File.class, body) != null
                 || 
exchange.getContext().getTypeConverter().tryConvertTo(InputStream.class, body) 
!= null) {
-            type = Bytes;
+            type = streamingEnabled ? Stream : Bytes;
         }
         return type;
     }
diff --git 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
index 3e03966..134f8fa 100644
--- 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
+++ 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsComponent.java
@@ -1213,6 +1213,26 @@ public class JmsComponent extends 
HeaderFilterStrategyComponent implements Appli
         getConfiguration().setSubscriptionName(subscriptionName);
     }
 
+
+    public boolean isStreamMessageTypeEnabled() {
+        return getConfiguration().isStreamMessageTypeEnabled();
+    }
+
+    /**
+     * Sets whether StreamMessage type is enabled or not.
+     * Message payloads of streaming kind such as files, InputStream, etc will 
either be sent as BytesMessage or StreamMessage.
+     * This option controls which kind will be used. By default BytesMessage 
is used which enforces the entire message payload to be read into memory.
+     * By enabling this option the message payload is read into memory in 
chunks and each chunk is then written to the StreamMessage until no more data.
+     */
+    @Metadata(label = "producer,advanced", description = "Sets whether 
StreamMessage type is enabled or not."
+        + " Message payloads of streaming kind such as files, InputStream, etc 
will either by sent as BytesMessage or StreamMessage."
+        + " This option controls which kind will be used. By default 
BytesMessage is used which enforces the entire message payload to be read into 
memory."
+        + " By enabling this option the message payload is read into memory in 
chunks and each chunk is then written to the StreamMessage until no more data.")
+    public void setStreamMessageTypeEnabled(boolean streamMessageTypeEnabled) {
+        
getConfiguration().setStreamMessageTypeEnabled(streamMessageTypeEnabled);
+    }
+
+
     // Implementation methods
     // 
-------------------------------------------------------------------------
 
diff --git 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
index 27ac6b8..eb054ce 100644
--- 
a/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
+++ 
b/components/camel-jms/src/main/java/org/apache/camel/component/jms/JmsConfiguration.java
@@ -485,6 +485,12 @@ public class JmsConfiguration implements Cloneable {
         + " Requires a JMS 2.0 compatible message broker.")
     private boolean subscriptionShared;
 
+    @UriParam(label = "producer,advanced", description = "Sets whether 
StreamMessage type is enabled or not."
+        + " Message payloads of streaming kind such as files, InputStream, etc 
will either by sent as BytesMessage or StreamMessage."
+        + " This option controls which kind will be used. By default 
BytesMessage is used which enforces the entire message payload to be read into 
memory."
+        + " By enabling this option the message payload is read into memory in 
chunks and each chunk is then written to the StreamMessage until no more data.")
+    private boolean streamMessageTypeEnabled;
+
     public JmsConfiguration() {
     }
 
@@ -2193,5 +2199,18 @@ public class JmsConfiguration implements Cloneable {
     public void setSubscriptionName(String subscriptionName) {
         this.subscriptionName = subscriptionName;
     }
-    
+
+    public boolean isStreamMessageTypeEnabled() {
+        return streamMessageTypeEnabled;
+    }
+
+    /**
+     * Sets whether StreamMessage type is enabled or not.
+     * Message payloads of streaming kind such as files, InputStream, etc will 
either be sent as BytesMessage or StreamMessage.
+     * This option controls which kind will be used. By default BytesMessage 
is used which enforces the entire message payload to be read into memory.
+     * By enabling this option the message payload is read into memory in 
chunks and each chunk is then written to the StreamMessage until no more data.
+     */
+    public void setStreamMessageTypeEnabled(boolean streamMessageTypeEnabled) {
+        this.streamMessageTypeEnabled = streamMessageTypeEnabled;
+    }
 }
diff --git 
a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsStreamMessageTypeTest.java
 
b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsStreamMessageTypeTest.java
index d9ee858..72d3d64 100644
--- 
a/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsStreamMessageTypeTest.java
+++ 
b/components/camel-jms/src/test/java/org/apache/camel/component/jms/JmsStreamMessageTypeTest.java
@@ -28,9 +28,6 @@ import org.junit.Test;
 
 import static 
org.apache.camel.component.jms.JmsComponent.jmsComponentAutoAcknowledge;
 
-/**
- * @version 
- */
 public class JmsStreamMessageTypeTest extends CamelTestSupport {
 
     @Override
@@ -43,7 +40,9 @@ public class JmsStreamMessageTypeTest extends 
CamelTestSupport {
         CamelContext camelContext = super.createCamelContext();
 
         ConnectionFactory connectionFactory = 
CamelJmsTestHelper.createConnectionFactory();
-        camelContext.addComponent("jms", 
jmsComponentAutoAcknowledge(connectionFactory));
+        JmsComponent jms = jmsComponentAutoAcknowledge(connectionFactory);
+        jms.setStreamMessageTypeEnabled(true); // turn on streaming
+        camelContext.addComponent("jms", jms);
         return camelContext;
     }
 
@@ -68,7 +67,7 @@ public class JmsStreamMessageTypeTest extends 
CamelTestSupport {
         return new RouteBuilder() {
             @Override
             public void configure() throws Exception {
-                
from("file:target/stream/in").to("jms:queue:foo?jmsMessageType=Stream");
+                from("file:target/stream/in").to("jms:queue:foo");
 
                 
from("jms:queue:foo").to("file:target/stream/out").to("mock:result");
             }
diff --git 
a/platforms/spring-boot/components-starter/camel-jms-starter/src/main/java/org/apache/camel/component/jms/springboot/JmsComponentConfiguration.java
 
b/platforms/spring-boot/components-starter/camel-jms-starter/src/main/java/org/apache/camel/component/jms/springboot/JmsComponentConfiguration.java
index 66f737b..4e76189 100644
--- 
a/platforms/spring-boot/components-starter/camel-jms-starter/src/main/java/org/apache/camel/component/jms/springboot/JmsComponentConfiguration.java
+++ 
b/platforms/spring-boot/components-starter/camel-jms-starter/src/main/java/org/apache/camel/component/jms/springboot/JmsComponentConfiguration.java
@@ -599,6 +599,16 @@ public class JmsComponentConfiguration
      */
     private String subscriptionName;
     /**
+     * Sets whether StreamMessage type is enabled or not. Message payloads of
+     * streaming kind such as files InputStream etc will either be sent as
+     * BytesMessage or StreamMessage. This option controls which kind will be
+     * used. By default BytesMessage is used which enforces the entire message
+     * payload to be read into memory. By enabling this option the message
+     * payload is read into memory in chunks and each chunk is then written to
+     * the StreamMessage until no more data.
+     */
+    private Boolean streamMessageTypeEnabled = false;
+    /**
      * To use a custom org.apache.camel.spi.HeaderFilterStrategy to filter
      * header to and from Camel message.
      */
@@ -1233,6 +1243,14 @@ public class JmsComponentConfiguration
         this.subscriptionName = subscriptionName;
     }
 
+    public Boolean getStreamMessageTypeEnabled() {
+        return streamMessageTypeEnabled;
+    }
+
+    public void setStreamMessageTypeEnabled(Boolean streamMessageTypeEnabled) {
+        this.streamMessageTypeEnabled = streamMessageTypeEnabled;
+    }
+
     public HeaderFilterStrategy getHeaderFilterStrategy() {
         return headerFilterStrategy;
     }
@@ -1918,6 +1936,16 @@ public class JmsComponentConfiguration
          * for a shared subscription (which requires JMS 2.0).
          */
         private String subscriptionName;
+        /**
+         * Sets whether StreamMessage type is enabled or not. Message payloads
+         * of streaming kind such as files, InputStream, etc will either be 
sent
+         * as BytesMessage or StreamMessage. This option controls which kind
+         * will be used. By default BytesMessage is used which enforces the
+         * entire message payload to be read into memory. By enabling this
+         * option the message payload is read into memory in chunks and each
+         * chunk is then written to the StreamMessage until no more data.
+         */
+        private Boolean streamMessageTypeEnabled = false;
 
         public ConsumerType getConsumerType() {
             return consumerType;
@@ -2664,5 +2692,13 @@ public class JmsComponentConfiguration
         public void setSubscriptionName(String subscriptionName) {
             this.subscriptionName = subscriptionName;
         }
+
+        public Boolean getStreamMessageTypeEnabled() {
+            return streamMessageTypeEnabled;
+        }
+
+        public void setStreamMessageTypeEnabled(Boolean 
streamMessageTypeEnabled) {
+            this.streamMessageTypeEnabled = streamMessageTypeEnabled;
+        }
     }
 }
\ No newline at end of file

-- 
To stop receiving notification emails like this one, please contact
"[email protected]" <[email protected]>.

Reply via email to