This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 432ba87  NIFI-5921 - Timeout property for ConsumeJMS
432ba87 is described below

commit 432ba8787f99848d939596aad73f10315b80bf12
Author: Pierre Villard <[email protected]>
AuthorDate: Fri Jan 4 10:51:19 2019 +0100

    NIFI-5921 - Timeout property for ConsumeJMS
    
    Signed-off-by: Pierre Villard <[email protected]>
    
    This closes #3245.
---
 .../apache/nifi/jms/processors/AbstractJMSProcessor.java   |  3 ---
 .../java/org/apache/nifi/jms/processors/ConsumeJMS.java    |  9 ++++++---
 .../java/org/apache/nifi/jms/processors/JMSConsumer.java   |  4 ++--
 .../apache/nifi/jms/processors/JMSPublisherConsumerIT.java | 14 +++++++-------
 4 files changed, 15 insertions(+), 15 deletions(-)

diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
index 95a7103..0094eaf 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
@@ -221,9 +221,6 @@ abstract class AbstractJMSProcessor<T extends JMSWorker> 
extends AbstractProcess
         jmsTemplate.setConnectionFactory(cachingFactory);
         
jmsTemplate.setPubSubDomain(TOPIC.equals(context.getProperty(DESTINATION_TYPE).getValue()));
 
-        // set of properties that may be good candidates for exposure via 
configuration
-        jmsTemplate.setReceiveTimeout(1000);
-
         return finishBuildingJmsWorker(cachingFactory, jmsTemplate, context);
     }
 }
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
index 1f8358e..997c6dd 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
@@ -134,7 +134,7 @@ public class ConsumeJMS extends 
AbstractJMSProcessor<JMSConsumer> {
             .description("How long to wait to consume a message from the 
remote broker before giving up.")
             .required(true)
             .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
-            .defaultValue("0 sec")
+            .defaultValue("1 sec")
             
.expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .build();
 
@@ -187,9 +187,8 @@ public class ConsumeJMS extends 
AbstractJMSProcessor<JMSConsumer> {
         final boolean shared = sharedBoolean == null ? false : sharedBoolean;
         final String subscriptionName = 
context.getProperty(SUBSCRIPTION_NAME).evaluateAttributeExpressions().getValue();
         final String charset = 
context.getProperty(CHARSET).evaluateAttributeExpressions().getValue();
-        final long timeout = 
context.getProperty(TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
 
-        consumer.consume(destinationName, durable, shared, subscriptionName, 
charset, timeout, new ConsumerCallback() {
+        consumer.consume(destinationName, durable, shared, subscriptionName, 
charset, new ConsumerCallback() {
             @Override
             public void accept(final JMSResponse response) {
                 if (response == null) {
@@ -220,6 +219,10 @@ public class ConsumeJMS extends 
AbstractJMSProcessor<JMSConsumer> {
     protected JMSConsumer finishBuildingJmsWorker(CachingConnectionFactory 
connectionFactory, JmsTemplate jmsTemplate, ProcessContext processContext) {
         int ackMode = 
processContext.getProperty(ACKNOWLEDGEMENT_MODE).asInteger();
         jmsTemplate.setSessionAcknowledgeMode(ackMode);
+
+        long timeout = 
processContext.getProperty(TIMEOUT).evaluateAttributeExpressions().asTimePeriod(TimeUnit.MILLISECONDS);
+        jmsTemplate.setReceiveTimeout(timeout);
+
         return new JMSConsumer(connectionFactory, jmsTemplate, 
this.getLogger());
     }
 
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java
index e6e5913..a2c73b4 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java
@@ -80,7 +80,7 @@ final class JMSConsumer extends JMSWorker {
     }
 
 
-    public void consume(final String destinationName, final boolean durable, 
final boolean shared, final String subscriberName, final String charset, final 
long timeout,
+    public void consume(final String destinationName, final boolean durable, 
final boolean shared, final String subscriberName, final String charset,
                         final ConsumerCallback consumerCallback) {
         this.jmsTemplate.execute(new SessionCallback<Void>() {
             @Override
@@ -88,7 +88,7 @@ final class JMSConsumer extends JMSWorker {
 
                 final MessageConsumer msgConsumer = 
createMessageConsumer(session, destinationName, durable, shared, 
subscriberName);
                 try {
-                    final Message message = msgConsumer.receive(timeout);
+                    final Message message = 
msgConsumer.receive(JMSConsumer.this.jmsTemplate.getReceiveTimeout());
                     JMSResponse response = null;
 
                     if (message != null) {
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java
index eed9276..7812e71 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerIT.java
@@ -113,7 +113,7 @@ public class JMSPublisherConsumerIT {
             });
 
             JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) 
jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
-            consumer.consume(destinationName, false, false, null, "UTF-8", 
1000, new ConsumerCallback() {
+            consumer.consume(destinationName, false, false, null, "UTF-8", new 
ConsumerCallback() {
                 @Override
                 public void accept(JMSResponse response) {
                     // noop
@@ -143,7 +143,7 @@ public class JMSPublisherConsumerIT {
 
             JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) 
jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
             final AtomicBoolean callbackInvoked = new AtomicBoolean();
-            consumer.consume(destinationName, false, false, null, "UTF-8", 
1000, new ConsumerCallback() {
+            consumer.consume(destinationName, false, false, null, "UTF-8", new 
ConsumerCallback() {
                 @Override
                 public void accept(JMSResponse response) {
                     callbackInvoked.set(true);
@@ -190,7 +190,7 @@ public class JMSPublisherConsumerIT {
                         JMSConsumer consumer = new 
JMSConsumer((CachingConnectionFactory) consumeTemplate.getConnectionFactory(), 
consumeTemplate, mock(ComponentLog.class));
 
                         for (int j = 0; j < 1000 && msgCount.get() < 4000; 
j++) {
-                            consumer.consume(destinationName, false, false, 
null, "UTF-8", 1000, callback);
+                            consumer.consume(destinationName, false, false, 
null, "UTF-8", callback);
                         }
                     } finally {
                         ((CachingConnectionFactory) 
consumeTemplate.getConnectionFactory()).destroy();
@@ -229,7 +229,7 @@ public class JMSPublisherConsumerIT {
             JMSConsumer consumer = new JMSConsumer((CachingConnectionFactory) 
jmsTemplate.getConnectionFactory(), jmsTemplate, mock(ComponentLog.class));
             final AtomicBoolean callbackInvoked = new AtomicBoolean();
             try {
-                consumer.consume(destinationName, false, false, null, "UTF-8", 
1000, new ConsumerCallback() {
+                consumer.consume(destinationName, false, false, null, "UTF-8", 
new ConsumerCallback() {
                     @Override
                     public void accept(JMSResponse response) {
                         callbackInvoked.set(true);
@@ -246,7 +246,7 @@ public class JMSPublisherConsumerIT {
 
             // should receive the same message, but will process it 
successfully
             while (!callbackInvoked.get()) {
-                consumer.consume(destinationName, false, false, null, "UTF-8", 
1000, new ConsumerCallback() {
+                consumer.consume(destinationName, false, false, null, "UTF-8", 
new ConsumerCallback() {
                     @Override
                     public void accept(JMSResponse response) {
                         if (response == null) {
@@ -265,7 +265,7 @@ public class JMSPublisherConsumerIT {
             // receiving next message and fail again
             try {
                 while (!callbackInvoked.get()) {
-                    consumer.consume(destinationName, false, false, null, 
"UTF-8", 1000, new ConsumerCallback() {
+                    consumer.consume(destinationName, false, false, null, 
"UTF-8", new ConsumerCallback() {
                         @Override
                         public void accept(JMSResponse response) {
                             if (response == null) {
@@ -287,7 +287,7 @@ public class JMSPublisherConsumerIT {
             // should receive the same message, but will process it 
successfully
             try {
                 while (!callbackInvoked.get()) {
-                    consumer.consume(destinationName, false, false, null, 
"UTF-8", 1000, new ConsumerCallback() {
+                    consumer.consume(destinationName, false, false, null, 
"UTF-8", new ConsumerCallback() {
                         @Override
                         public void accept(JMSResponse response) {
                             if (response == null) {

Reply via email to