Repository: nifi
Updated Branches:
  refs/heads/0.x 5c0c1d3bb -> 40618364e


NIFI-2774 added configurable QoS options to ConsumeJMS

Signed-off-by: Mike Moser <[email protected]>

This closes #1036.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/40618364
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/40618364
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/40618364

Branch: refs/heads/0.x
Commit: 40618364e70a966f9c1e425674b53b22b1fb0fb0
Parents: 5c0c1d3
Author: Oleg Zhurakousky <[email protected]>
Authored: Tue Sep 20 10:49:22 2016 -0400
Committer: Mike Moser <[email protected]>
Committed: Tue Oct 11 11:14:52 2016 -0400

----------------------------------------------------------------------
 .../jms/processors/AbstractJMSProcessor.java    |  12 +--
 .../apache/nifi/jms/processors/ConsumeJMS.java  |  96 ++++++++++++++----
 .../apache/nifi/jms/processors/JMSConsumer.java |  84 +++++++++++-----
 .../apache/nifi/jms/processors/PublishJMS.java  |   2 +-
 .../apache/nifi/jms/processors/CommonTest.java  |  10 +-
 .../processors/JMSPublisherConsumerTest.java    | 100 +++++++++++++++++--
 6 files changed, 239 insertions(+), 65 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/40618364/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
index 54e2d89..d5b704b 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
@@ -102,12 +102,12 @@ abstract class AbstractJMSProcessor<T extends JMSWorker> 
extends AbstractProcess
      * all other lifecycle methods are invoked multiple times.
      */
     static {
-        propertyDescriptors.add(USER);
-        propertyDescriptors.add(PASSWORD);
+        propertyDescriptors.add(CF_SERVICE);
         propertyDescriptors.add(DESTINATION);
         propertyDescriptors.add(DESTINATION_TYPE);
+        propertyDescriptors.add(USER);
+        propertyDescriptors.add(PASSWORD);
         propertyDescriptors.add(SESSION_CACHE_SIZE);
-        propertyDescriptors.add(CF_SERVICE);
     }
 
     protected volatile T targetResource;
@@ -174,7 +174,7 @@ abstract class AbstractJMSProcessor<T extends JMSWorker> 
extends AbstractProcess
      * @see JMSPublisher
      * @see JMSConsumer
      */
-    protected abstract T finishBuildingTargetResource(JmsTemplate jmsTemplate);
+    protected abstract T finishBuildingTargetResource(JmsTemplate jmsTemplate, 
ProcessContext processContext);
 
     /**
      * This method essentially performs initialization of this Processor by
@@ -202,9 +202,9 @@ abstract class AbstractJMSProcessor<T extends JMSWorker> 
extends AbstractProcess
             
jmsTemplate.setPubSubDomain(TOPIC.equals(context.getProperty(DESTINATION_TYPE).getValue()));
 
             // set of properties that may be good candidates for exposure via 
configuration
-            jmsTemplate.setReceiveTimeout(10000);
+            jmsTemplate.setReceiveTimeout(1000);
 
-            this.targetResource = 
this.finishBuildingTargetResource(jmsTemplate);
+            this.targetResource = 
this.finishBuildingTargetResource(jmsTemplate, context);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/40618364/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
index 131d113..c031ec1 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
@@ -18,20 +18,27 @@ package org.apache.nifi.jms.processors;
 
 import java.io.IOException;
 import java.io.OutputStream;
+import java.util.ArrayList;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
+import java.util.List;
 import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Set;
 
+import javax.jms.Session;
+
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.components.AllowableValue;
+import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.jms.cf.JMSConnectionFactoryProvider;
+import org.apache.nifi.jms.processors.JMSConsumer.ConsumerCallback;
 import org.apache.nifi.jms.processors.JMSConsumer.JMSResponse;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
@@ -54,6 +61,31 @@ import org.springframework.jms.core.JmsTemplate;
 @SeeAlso(value = { PublishJMS.class, JMSConnectionFactoryProvider.class })
 public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
 
+    static final AllowableValue AUTO_ACK = new 
AllowableValue(String.valueOf(Session.AUTO_ACKNOWLEDGE),
+            "AUTO_ACKNOWLEDGE (" + String.valueOf(Session.AUTO_ACKNOWLEDGE) + 
")",
+            "Automatically acknowledges a client's receipt of a message, 
regardless if NiFi session has been commited. "
+                    + "Can result in data loss in the event where NiFi 
abruptly stopped before session was commited.");
+
+    static final AllowableValue CLIENT_ACK = new 
AllowableValue(String.valueOf(Session.CLIENT_ACKNOWLEDGE),
+            "CLIENT_ACKNOWLEDGE (" + 
String.valueOf(Session.CLIENT_ACKNOWLEDGE) + ")",
+            "(DEFAULT) Manually acknowledges a client's receipt of a message 
after NiFi Session was commited, thus ensuring no data loss");
+
+    static final AllowableValue DUPS_OK = new 
AllowableValue(String.valueOf(Session.DUPS_OK_ACKNOWLEDGE),
+            "DUPS_OK_ACKNOWLEDGE (" + 
String.valueOf(Session.DUPS_OK_ACKNOWLEDGE) + ")",
+            "This acknowledgment mode instructs the session to lazily 
acknowledge the delivery of messages. May result in both data "
+                    + "duplication and data loss while achieving the best 
throughput.");
+
+    public static final String JMS_SOURCE_DESTINATION_NAME = 
"jms.source.destination";
+
+    static final PropertyDescriptor ACKNOWLEDGEMENT_MODE = new 
PropertyDescriptor.Builder()
+            .name("Acknowledgement Mode")
+            .description("The JMS Acknowledgement Mode. Using Auto Acknowledge 
can cause messages to be lost on restart of NiFi but may provide "
+                            + "better performance than Client Acknowledge.")
+            .required(true)
+            .allowableValues(AUTO_ACK, CLIENT_ACK, DUPS_OK)
+            .defaultValue(CLIENT_ACK.getValue())
+            .build();
+
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
             .description("All FlowFiles that are received from the JMS 
Destination are routed to this relationship")
@@ -61,7 +93,14 @@ public class ConsumeJMS extends 
AbstractJMSProcessor<JMSConsumer> {
 
     private final static Set<Relationship> relationships;
 
+    private final static List<PropertyDescriptor> thisPropertyDescriptors;
+
     static {
+        List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
+        _propertyDescriptors.addAll(propertyDescriptors);
+        _propertyDescriptors.add(ACKNOWLEDGEMENT_MODE);
+        thisPropertyDescriptors = 
Collections.unmodifiableList(_propertyDescriptors);
+
         Set<Relationship> _relationships = new HashSet<>();
         _relationships.add(REL_SUCCESS);
         relationships = Collections.unmodifiableSet(_relationships);
@@ -75,33 +114,41 @@ public class ConsumeJMS extends 
AbstractJMSProcessor<JMSConsumer> {
      * 'success' {@link Relationship}.
      */
     @Override
-    protected void rendezvousWithJms(ProcessContext context, ProcessSession 
processSession) throws ProcessException {
+    protected void rendezvousWithJms(final ProcessContext context, final 
ProcessSession processSession) throws ProcessException {
         final String destinationName = 
context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue();
-        final JMSResponse response = 
this.targetResource.consume(destinationName);
-        if (response != null){
-            FlowFile flowFile = processSession.create();
-            flowFile = processSession.write(flowFile, new 
OutputStreamCallback() {
-                @Override
-                public void process(final OutputStream out) throws IOException 
{
-                    out.write(response.getMessageBody());
+        this.targetResource.consume(destinationName, new ConsumerCallback(){
+            @Override
+            public void accept(final JMSResponse response) {
+                if (response != null){
+                    FlowFile flowFile = processSession.create();
+                    flowFile = processSession.write(flowFile, new 
OutputStreamCallback() {
+                        @Override
+                        public void process(final OutputStream out) throws 
IOException {
+                            out.write(response.getMessageBody());
+                        }
+                    });
+                    Map<String, Object> jmsHeaders = 
response.getMessageHeaders();
+                    Map<String, Object> jmsProperties = Collections.<String, 
Object>unmodifiableMap(response.getMessageProperties());
+                    flowFile = 
ConsumeJMS.this.updateFlowFileAttributesWithJMSAttributes(jmsHeaders, flowFile, 
processSession);
+                    flowFile = 
ConsumeJMS.this.updateFlowFileAttributesWithJMSAttributes(jmsProperties, 
flowFile, processSession);
+                    flowFile = processSession.putAttribute(flowFile, 
JMS_SOURCE_DESTINATION_NAME, destinationName);
+                    processSession.getProvenanceReporter().receive(flowFile, 
destinationName);
+                    processSession.transfer(flowFile, REL_SUCCESS);
+                    processSession.commit();
+                } else {
+                    context.yield();
                 }
-            });
-            Map<String, Object> jmsHeaders = response.getMessageHeaders();
-            Map<String, Object> jmsProperties = Collections.<String, 
Object>unmodifiableMap(response.getMessageProperties());
-            flowFile = 
this.updateFlowFileAttributesWithJMSAttributes(jmsHeaders, flowFile, 
processSession);
-            flowFile = 
this.updateFlowFileAttributesWithJMSAttributes(jmsProperties, flowFile, 
processSession);
-            processSession.getProvenanceReporter().receive(flowFile, 
context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue());
-            processSession.transfer(flowFile, REL_SUCCESS);
-        } else {
-            context.yield();
-        }
+            }
+        });
     }
 
     /**
      * Will create an instance of {@link JMSConsumer}
      */
     @Override
-    protected JMSConsumer finishBuildingTargetResource(JmsTemplate 
jmsTemplate) {
+    protected JMSConsumer finishBuildingTargetResource(JmsTemplate 
jmsTemplate, ProcessContext processContext) {
+        int ackMode = 
processContext.getProperty(ACKNOWLEDGEMENT_MODE).asInteger();
+        jmsTemplate.setSessionAcknowledgeMode(ackMode);
         return new JMSConsumer(jmsTemplate, this.getLogger());
     }
 
@@ -114,13 +161,20 @@ public class ConsumeJMS extends 
AbstractJMSProcessor<JMSConsumer> {
     }
 
     /**
+     *
+     */
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return thisPropertyDescriptors;
+    }
+
+    /**
      * Copies JMS attributes (i.e., headers and properties) as FF attributes.
      * Given that FF attributes mandate that values are of type String, the
      * copied values of JMS attributes will be stringified via
      * String.valueOf(attribute).
      */
-    private FlowFile updateFlowFileAttributesWithJMSAttributes(Map<String, 
Object> jmsAttributes, FlowFile flowFile,
-            ProcessSession processSession) {
+    private FlowFile updateFlowFileAttributesWithJMSAttributes(Map<String, 
Object> jmsAttributes, FlowFile flowFile, ProcessSession processSession) {
         Map<String, String> attributes = new HashMap<String, String>();
         for (Entry<String, Object> entry : jmsAttributes.entrySet()) {
             attributes.put(entry.getKey(), String.valueOf(entry.getValue()));

http://git-wip-us.apache.org/repos/asf/nifi/blob/40618364/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java
index 5c3b599..282889f 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java
@@ -25,23 +25,23 @@ import javax.jms.BytesMessage;
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.Message;
+import javax.jms.MessageConsumer;
 import javax.jms.Queue;
+import javax.jms.Session;
 import javax.jms.TextMessage;
 import javax.jms.Topic;
 
 import org.apache.nifi.logging.ProcessorLog;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.core.SessionCallback;
 import org.springframework.jms.support.JmsHeaders;
+import org.springframework.jms.support.JmsUtils;
 
 /**
  * Generic consumer of messages from JMS compliant messaging system.
  */
 final class JMSConsumer extends JMSWorker {
 
-    private final static Logger logger = 
LoggerFactory.getLogger(JMSConsumer.class);
-
     /**
      * Creates an instance of this consumer
      *
@@ -52,8 +52,8 @@ final class JMSConsumer extends JMSWorker {
      */
     JMSConsumer(JmsTemplate jmsTemplate, ProcessorLog processLog) {
         super(jmsTemplate, processLog);
-        if (logger.isInfoEnabled()) {
-            logger.info("Created Message Consumer for '" + 
jmsTemplate.toString() + "'.");
+        if (this.processLog.isInfoEnabled()) {
+            this.processLog.info("Created Message Consumer for '" + 
jmsTemplate.toString() + "'.");
         }
     }
 
@@ -61,27 +61,51 @@ final class JMSConsumer extends JMSWorker {
     /**
      *
      */
-    public JMSResponse consume(String destinationName) {
-        Message message = this.jmsTemplate.receive(destinationName);
-        if (message != null) {
-            byte[] messageBody = null;
-            try {
-                if (message instanceof TextMessage) {
-                    messageBody = 
MessageBodyToBytesConverter.toBytes((TextMessage) message);
-                } else if (message instanceof BytesMessage) {
-                    messageBody = 
MessageBodyToBytesConverter.toBytes((BytesMessage) message);
-                } else {
-                    throw new UnsupportedOperationException("Message type 
other then TextMessage and BytesMessage are "
-                            + "not supported at the moment");
+    public void consume(final String destinationName, final ConsumerCallback 
consumerCallback) {
+        this.jmsTemplate.execute(new SessionCallback<Void>() {
+            @Override
+            public Void doInJms(Session session) throws JMSException {
+                /*
+                 * We need to call recover to ensure that in the event of an
+                 * abrupt end or exception the current session will stop message
+                 * delivery and restart with the oldest unacknowledged message
+                 */
+                session.recover();
+                Destination destination = 
JMSConsumer.this.jmsTemplate.getDestinationResolver().resolveDestinationName(
+                        session, destinationName, 
JMSConsumer.this.jmsTemplate.isPubSubDomain());
+                MessageConsumer msgConsumer = 
session.createConsumer(destination, null,
+                        JMSConsumer.this.jmsTemplate.isPubSubDomain());
+                Message message = 
msgConsumer.receive(JMSConsumer.this.jmsTemplate.getReceiveTimeout());
+                JMSResponse response = null;
+                try {
+                    if (message != null) {
+                        byte[] messageBody = null;
+                        if (message instanceof TextMessage) {
+                            messageBody = 
MessageBodyToBytesConverter.toBytes((TextMessage) message);
+                        } else if (message instanceof BytesMessage) {
+                            messageBody = 
MessageBodyToBytesConverter.toBytes((BytesMessage) message);
+                        } else {
+                            throw new IllegalStateException("Message type 
other then TextMessage and BytesMessage are "
+                                    + "not supported at the moment");
+                        }
+                        Map<String, Object> messageHeaders = 
extractMessageHeaders(message);
+                        Map<String, String> messageProperties = 
extractMessageProperties(message);
+                        response = new JMSResponse(messageBody, 
messageHeaders, messageProperties);
+                    }
+                    // invoke the processor callback (regardless if it's null,
+                    // so the processor can yield) as part of this inJMS call
+                    // and ACK message *only* after its successful invocation
+                    // and if CLIENT_ACKNOWLEDGE is set.
+                    consumerCallback.accept(response);
+                    if (message != null && session.getAcknowledgeMode() == 
Session.CLIENT_ACKNOWLEDGE) {
+                        message.acknowledge();
+                    }
+                } finally {
+                    JmsUtils.closeMessageConsumer(msgConsumer);
                 }
-                Map<String, Object> messageHeaders = 
this.extractMessageHeaders(message);
-                Map<String, String> messageProperties = 
this.extractMessageProperties(message);
-                return new JMSResponse(messageBody, messageHeaders, 
messageProperties);
-            } catch (Exception e) {
-                throw new IllegalStateException(e);
+                return null;
             }
-        }
-        return null;
+        }, true);
     }
 
     /**
@@ -97,7 +121,6 @@ final class JMSConsumer extends JMSWorker {
                 properties.put(propertyName, 
String.valueOf(message.getObjectProperty(propertyName)));
             }
         } catch (JMSException e) {
-            logger.warn("Failed to extract message properties", e);
             this.processLog.warn("Failed to extract message properties", e);
         }
         return properties;
@@ -145,7 +168,6 @@ final class JMSConsumer extends JMSWorker {
                 destinationName = (destination instanceof Queue) ? ((Queue) 
destination).getQueueName()
                         : ((Topic) destination).getTopicName();
             } catch (JMSException e) {
-                logger.warn("Failed to retrieve Destination name for '" + 
headerName + "' header", e);
                 this.processLog.warn("Failed to retrieve Destination name for 
'" + headerName + "' header", e);
             }
         }
@@ -180,4 +202,12 @@ final class JMSConsumer extends JMSWorker {
             return messageProperties;
         }
     }
+
+    /**
+     * Callback to be invoked while executing inJMS call (the call within the
+     * live JMS session)
+     */
+    static interface ConsumerCallback {
+        void accept(JMSResponse response);
+    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/40618364/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
index fb8b533..1b0caf7 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
@@ -122,7 +122,7 @@ public class PublishJMS extends 
AbstractJMSProcessor<JMSPublisher> {
      * Will create an instance of {@link JMSPublisher}
      */
     @Override
-    protected JMSPublisher finishBuildingTargetResource(JmsTemplate 
jmsTemplate) {
+    protected JMSPublisher finishBuildingTargetResource(JmsTemplate 
jmsTemplate, ProcessContext processContext) {
         return new JMSPublisher(jmsTemplate, this.getLogger());
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/40618364/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/CommonTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/CommonTest.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/CommonTest.java
index 8a69a14..12ffe03 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/CommonTest.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/CommonTest.java
@@ -21,6 +21,9 @@ import static org.junit.Assert.assertTrue;
 import java.util.Iterator;
 import java.util.ServiceLoader;
 
+import javax.jms.ConnectionFactory;
+import javax.jms.Session;
+
 import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.nifi.processor.Processor;
 import org.junit.Test;
@@ -49,12 +52,13 @@ public class CommonTest {
     }
 
     static JmsTemplate buildJmsTemplateForDestination(boolean pubSub) {
-        ActiveMQConnectionFactory connectionFactory = new 
ActiveMQConnectionFactory(
+        ConnectionFactory connectionFactory = new ActiveMQConnectionFactory(
                 "vm://localhost?broker.persistent=false");
-        CachingConnectionFactory cf = new 
CachingConnectionFactory(connectionFactory);
+        connectionFactory = new CachingConnectionFactory(connectionFactory);
 
-        JmsTemplate jmsTemplate = new JmsTemplate(cf);
+        JmsTemplate jmsTemplate = new JmsTemplate(connectionFactory);
         jmsTemplate.setPubSubDomain(pubSub);
+        jmsTemplate.setSessionAcknowledgeMode(Session.CLIENT_ACKNOWLEDGE);
         return jmsTemplate;
     }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/40618364/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerTest.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerTest.java
index 58de99f..3b85e3f 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerTest.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerTest.java
@@ -20,8 +20,10 @@ import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 
+import java.nio.charset.StandardCharsets;
 import java.util.HashMap;
 import java.util.Map;
+import java.util.concurrent.atomic.AtomicBoolean;
 
 import javax.jms.BytesMessage;
 import javax.jms.JMSException;
@@ -30,6 +32,7 @@ import javax.jms.Session;
 import javax.jms.TextMessage;
 import javax.jms.Topic;
 
+import org.apache.nifi.jms.processors.JMSConsumer.ConsumerCallback;
 import org.apache.nifi.jms.processors.JMSConsumer.JMSResponse;
 import org.apache.nifi.logging.ProcessorLog;
 import org.junit.Test;
@@ -41,7 +44,7 @@ import org.springframework.jms.support.JmsHeaders;
 public class JMSPublisherConsumerTest {
 
     @Test
-    public void validateByesConvertedToBytesMessageOnSend() throws Exception {
+    public void validateBytesConvertedToBytesMessageOnSend() throws Exception {
         final String destinationName = "testQueue";
         JmsTemplate jmsTemplate = 
CommonTest.buildJmsTemplateForDestination(false);
         JMSPublisher publisher = new JMSPublisher(jmsTemplate, 
mock(ProcessorLog.class));
@@ -96,7 +99,12 @@ public class JMSPublisherConsumerTest {
 
         JMSConsumer consumer = new JMSConsumer(jmsTemplate, 
mock(ProcessorLog.class));
         try {
-            consumer.consume(destinationName);
+            consumer.consume(destinationName, new ConsumerCallback() {
+                @Override
+                public void accept(JMSResponse response) {
+                    // noop
+                }
+            });
         } finally {
             ((CachingConnectionFactory) 
jmsTemplate.getConnectionFactory()).destroy();
         }
@@ -119,12 +127,90 @@ public class JMSPublisherConsumerTest {
         });
 
         JMSConsumer consumer = new JMSConsumer(jmsTemplate, 
mock(ProcessorLog.class));
-        JMSResponse response = consumer.consume(destinationName);
-        assertEquals("hello from the other side", new 
String(response.getMessageBody()));
-        assertEquals("fooQueue", 
response.getMessageHeaders().get(JmsHeaders.REPLY_TO));
-        assertEquals("foo", response.getMessageProperties().get("foo"));
-        assertEquals("false", response.getMessageProperties().get("bar"));
+        final AtomicBoolean callbackInvoked = new AtomicBoolean();
+        consumer.consume(destinationName, new ConsumerCallback() {
+            @Override
+            public void accept(JMSResponse response) {
+                callbackInvoked.set(true);
+                assertEquals("hello from the other side", new 
String(response.getMessageBody()));
+                assertEquals("fooQueue", 
response.getMessageHeaders().get(JmsHeaders.REPLY_TO));
+                assertEquals("foo", 
response.getMessageProperties().get("foo"));
+                assertEquals("false", 
response.getMessageProperties().get("bar"));
+            }
+        });
+        assertTrue(callbackInvoked.get());
+
+        ((CachingConnectionFactory) 
jmsTemplate.getConnectionFactory()).destroy();
+    }
+
+    @Test
+    public void validateMessageRedeliveryWhenNotAcked() throws Exception {
+        String destinationName = "testQueue";
+        JmsTemplate jmsTemplate = 
CommonTest.buildJmsTemplateForDestination(false);
+        JMSPublisher publisher = new JMSPublisher(jmsTemplate, 
mock(ProcessorLog.class));
+        publisher.publish(destinationName, 
"1".getBytes(StandardCharsets.UTF_8));
+        publisher.publish(destinationName, 
"2".getBytes(StandardCharsets.UTF_8));
+
+        JMSConsumer consumer = new JMSConsumer(jmsTemplate, 
mock(ProcessorLog.class));
+        final AtomicBoolean callbackInvoked = new AtomicBoolean();
+        try {
+            consumer.consume(destinationName, new ConsumerCallback() {
+                @Override
+                public void accept(JMSResponse response) {
+                    callbackInvoked.set(true);
+                    assertEquals("1", new String(response.getMessageBody()));
+                    throw new RuntimeException("intentional to avoid explicit 
ack");
+                }
+            });
+        } catch (Exception e) {
+            // ignore
+        }
+        assertTrue(callbackInvoked.get());
+        callbackInvoked.set(false);
+
+        // should receive the same message, but will process it successfully
+        try {
+            consumer.consume(destinationName, new ConsumerCallback() {
+                @Override
+                public void accept(JMSResponse response) {
+                    callbackInvoked.set(true);
+                    assertEquals("1", new String(response.getMessageBody()));
+                }
+            });
+        } catch (Exception e) {
+            // ignore
+        }
+        assertTrue(callbackInvoked.get());
+        callbackInvoked.set(false);
 
+        // receiving next message and fail again
+        try {
+            consumer.consume(destinationName, new ConsumerCallback() {
+                @Override
+                public void accept(JMSResponse response) {
+                    callbackInvoked.set(true);
+                    assertEquals("2", new String(response.getMessageBody()));
+                    throw new RuntimeException("intentional to avoid explicit 
ack");
+                }
+            });
+        } catch (Exception e) {
+            // ignore
+        }
+        assertTrue(callbackInvoked.get());
+        callbackInvoked.set(false);
+
+        // should receive the same message, but will process it successfully
+        try {
+            consumer.consume(destinationName, new ConsumerCallback() {
+                @Override
+                public void accept(JMSResponse response) {
+                    callbackInvoked.set(true);
+                    assertEquals("2", new String(response.getMessageBody()));
+                }
+            });
+        } catch (Exception e) {
+            // ignore
+        }
         ((CachingConnectionFactory) 
jmsTemplate.getConnectionFactory()).destroy();
     }
 }

Reply via email to