Repository: nifi
Updated Branches:
  refs/heads/master 9bfa7469c -> bdfd71069


NIFI-3983 - Support ability to make JMS 2.0 durable subscriptions on Topic

Add logic in Consumer adding support for all topic consumer combinations, 
non-durable, durable, shared, durable-shared.
Add new optional config option to supply subscription name.
Add new optional config option to supply clientId.

This closes #1863.

Signed-off-by: Koji Kawamura <[email protected]>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/bdfd7106
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/bdfd7106
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/bdfd7106

Branch: refs/heads/master
Commit: bdfd710692c809b63951a9855c5f46962db937ed
Parents: 9bfa746
Author: Michael Andre Pearce <[email protected]>
Authored: Wed Jun 28 13:08:58 2017 +0100
Committer: Koji Kawamura <[email protected]>
Committed: Thu Jun 29 10:23:15 2017 +0900

----------------------------------------------------------------------
 .../jms/processors/AbstractJMSProcessor.java    | 16 +++++++-
 .../apache/nifi/jms/processors/ConsumeJMS.java  | 39 +++++++++++++++++++-
 .../apache/nifi/jms/processors/JMSConsumer.java | 30 +++++++++++++--
 .../processors/JMSPublisherConsumerTest.java    | 12 +++---
 4 files changed, 85 insertions(+), 12 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/bdfd7106/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
index d5b704b..39118a7 100644
--- 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
@@ -79,6 +79,15 @@ abstract class AbstractJMSProcessor<T extends JMSWorker> 
extends AbstractProcess
             .defaultValue(QUEUE)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .build();
+    static final PropertyDescriptor CLIENT_ID = new 
PropertyDescriptor.Builder()
+            .name("Connection Client ID")
+            .description("The client id to be set on the connection, if set. 
For durable non shared consumer this is mandatory, " +
+                         "for all others it is optional, typically with shared 
consumers it is undesirable to be set. " +
+                         "Please see JMS spec for further details")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
     static final PropertyDescriptor SESSION_CACHE_SIZE = new 
PropertyDescriptor.Builder()
             .name("Session Cache size")
             .description("The maximum limit for the number of cached 
Sessions.")
@@ -87,6 +96,7 @@ abstract class AbstractJMSProcessor<T extends JMSWorker> 
extends AbstractProcess
             .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
             .build();
 
+
     // ConnectionFactoryProvider ControllerService
     static final PropertyDescriptor CF_SERVICE = new 
PropertyDescriptor.Builder()
             .name("Connection Factory Service")
@@ -107,6 +117,7 @@ abstract class AbstractJMSProcessor<T extends JMSWorker> 
extends AbstractProcess
         propertyDescriptors.add(DESTINATION_TYPE);
         propertyDescriptors.add(USER);
         propertyDescriptors.add(PASSWORD);
+        propertyDescriptors.add(CLIENT_ID);
         propertyDescriptors.add(SESSION_CACHE_SIZE);
     }
 
@@ -196,7 +207,10 @@ abstract class AbstractJMSProcessor<T extends JMSWorker> 
extends AbstractProcess
 
             this.cachingConnectionFactory = new 
CachingConnectionFactory(cfCredentialsAdapter);
             
this.cachingConnectionFactory.setSessionCacheSize(Integer.parseInt(context.getProperty(SESSION_CACHE_SIZE).getValue()));
-
+            String clientId = 
context.getProperty(CLIENT_ID).evaluateAttributeExpressions().getValue();
+            if (clientId != null) {
+                this.cachingConnectionFactory.setClientId(clientId);
+            }
             JmsTemplate jmsTemplate = new JmsTemplate();
             jmsTemplate.setConnectionFactory(this.cachingConnectionFactory);
             
jmsTemplate.setPubSubDomain(TOPIC.equals(context.getProperty(DESTINATION_TYPE).getValue()));

http://git-wip-us.apache.org/repos/asf/nifi/blob/bdfd7106/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
index d85d26f..8774397 100644
--- 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
@@ -45,6 +45,7 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.OutputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
 import org.springframework.jms.core.JmsTemplate;
 
 /**
@@ -86,6 +87,34 @@ public class ConsumeJMS extends 
AbstractJMSProcessor<JMSConsumer> {
             .defaultValue(CLIENT_ACK.getValue())
             .build();
 
+    static final PropertyDescriptor DURABLE_SUBSCRIBER = new 
PropertyDescriptor.Builder()
+            .name("Durable subscription")
+            .description("If destination is Topic if present then make it the 
consumer durable. " +
+                         "@see 
https://docs.oracle.com/javaee/7/api/javax/jms/Session.html#createDurableConsumer-javax.jms.Topic-java.lang.String-";)
+            .required(false)
+            .expressionLanguageSupported(true)
+            .defaultValue("false")
+            .allowableValues("true", "false")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    static final PropertyDescriptor SHARED_SUBSCRIBER = new 
PropertyDescriptor.Builder()
+            .name("Shared subscription")
+            .description("If destination is Topic if present then make it the 
consumer shared. " +
+                         "@see 
https://docs.oracle.com/javaee/7/api/javax/jms/Session.html#createSharedConsumer-javax.jms.Topic-java.lang.String-";)
+            .required(false)
+            .expressionLanguageSupported(true)
+            .defaultValue("false")
+            .allowableValues("true", "false")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+    static final PropertyDescriptor SUBSCRIPTION_NAME = new 
PropertyDescriptor.Builder()
+            .name("Subscription Name")
+            .description("The name of the subscription to use if destination 
is Topic and is shared or durable.")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(true)
+            .build();
+
     public static final Relationship REL_SUCCESS = new Relationship.Builder()
             .name("success")
             .description("All FlowFiles that are received from the JMS 
Destination are routed to this relationship")
@@ -99,6 +128,9 @@ public class ConsumeJMS extends 
AbstractJMSProcessor<JMSConsumer> {
         List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
         _propertyDescriptors.addAll(propertyDescriptors);
         _propertyDescriptors.add(ACKNOWLEDGEMENT_MODE);
+        _propertyDescriptors.add(DURABLE_SUBSCRIBER);
+        _propertyDescriptors.add(SHARED_SUBSCRIBER);
+        _propertyDescriptors.add(SUBSCRIPTION_NAME);
         thisPropertyDescriptors = 
Collections.unmodifiableList(_propertyDescriptors);
 
         Set<Relationship> _relationships = new HashSet<>();
@@ -116,7 +148,12 @@ public class ConsumeJMS extends 
AbstractJMSProcessor<JMSConsumer> {
     @Override
     protected void rendezvousWithJms(final ProcessContext context, final 
ProcessSession processSession) throws ProcessException {
         final String destinationName = 
context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue();
-        this.targetResource.consume(destinationName, new ConsumerCallback(){
+        final Boolean durableBoolean = 
context.getProperty(DURABLE_SUBSCRIBER).evaluateAttributeExpressions().asBoolean();
+        final boolean durable = durableBoolean == null ? false : 
durableBoolean;
+        final Boolean sharedBoolean = 
context.getProperty(SHARED_SUBSCRIBER).evaluateAttributeExpressions().asBoolean();
+        final boolean shared = sharedBoolean == null ? false : sharedBoolean;
+        final String subscriptionName = 
context.getProperty(SUBSCRIPTION_NAME).evaluateAttributeExpressions().getValue();
+        this.targetResource.consume(destinationName, durable, shared, 
subscriptionName, new ConsumerCallback(){
             @Override
             public void accept(final JMSResponse response) {
                 if (response != null){

http://git-wip-us.apache.org/repos/asf/nifi/blob/bdfd7106/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java
index e955236..a4fc47a 100644
--- 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSConsumer.java
@@ -32,6 +32,7 @@ import javax.jms.TextMessage;
 import javax.jms.Topic;
 
 import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.exception.ProcessException;
 import org.springframework.jms.core.JmsTemplate;
 import org.springframework.jms.core.SessionCallback;
 import org.springframework.jms.support.JmsHeaders;
@@ -61,7 +62,7 @@ final class JMSConsumer extends JMSWorker {
     /**
      *
      */
-    public void consume(final String destinationName, final ConsumerCallback 
consumerCallback) {
+    public void consume(final String destinationName, final boolean durable, 
final boolean shared, final String subscriberName, final ConsumerCallback 
consumerCallback) {
         this.jmsTemplate.execute(new SessionCallback<Void>() {
             @Override
             public Void doInJms(Session session) throws JMSException {
@@ -71,10 +72,31 @@ final class JMSConsumer extends JMSWorker {
                  * delivery and restarts with the oldest unacknowledged message
                  */
                 session.recover();
+                boolean isPubSub = 
JMSConsumer.this.jmsTemplate.isPubSubDomain();
                 Destination destination = 
JMSConsumer.this.jmsTemplate.getDestinationResolver().resolveDestinationName(
-                        session, destinationName, 
JMSConsumer.this.jmsTemplate.isPubSubDomain());
-                MessageConsumer msgConsumer = 
session.createConsumer(destination, null,
-                        JMSConsumer.this.jmsTemplate.isPubSubDomain());
+                        session, destinationName, isPubSub);
+                MessageConsumer msgConsumer;
+                if (isPubSub) {
+                    if (shared) {
+                        try {
+                            if (durable) {
+                                msgConsumer = 
session.createSharedDurableConsumer((Topic)destination, subscriberName);
+                            } else {
+                                msgConsumer = 
session.createSharedConsumer((Topic)destination, subscriberName);
+                            }
+                        } catch (AbstractMethodError e) {
+                            throw new ProcessException("Failed to create a 
shared consumer. Make sure the target broker is JMS 2.0 compliant.", e);
+                        }
+                    } else {
+                        if (durable) {
+                            msgConsumer = 
session.createDurableConsumer((Topic)destination, subscriberName, null, 
JMSConsumer.this.jmsTemplate.isPubSubDomain());
+                        } else {
+                            msgConsumer = 
session.createConsumer((Topic)destination, null, 
JMSConsumer.this.jmsTemplate.isPubSubDomain());
+                        }
+                    }
+                } else {
+                    msgConsumer = session.createConsumer(destination, null, 
JMSConsumer.this.jmsTemplate.isPubSubDomain());
+                }
                 Message message = 
msgConsumer.receive(JMSConsumer.this.jmsTemplate.getReceiveTimeout());
                 JMSResponse response = null;
                 try {

http://git-wip-us.apache.org/repos/asf/nifi/blob/bdfd7106/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerTest.java
----------------------------------------------------------------------
diff --git 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerTest.java
 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerTest.java
index 3168443..0f8dafb 100644
--- 
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerTest.java
+++ 
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/JMSPublisherConsumerTest.java
@@ -100,7 +100,7 @@ public class JMSPublisherConsumerTest {
 
         JMSConsumer consumer = new JMSConsumer(jmsTemplate, 
mock(ComponentLog.class));
         try {
-            consumer.consume(destinationName, new ConsumerCallback() {
+            consumer.consume(destinationName, false, false, null, new 
ConsumerCallback() {
                 @Override
                 public void accept(JMSResponse response) {
                     // noop
@@ -129,7 +129,7 @@ public class JMSPublisherConsumerTest {
 
         JMSConsumer consumer = new JMSConsumer(jmsTemplate, 
mock(ComponentLog.class));
         final AtomicBoolean callbackInvoked = new AtomicBoolean();
-        consumer.consume(destinationName, new ConsumerCallback() {
+        consumer.consume(destinationName, false, false, null, new 
ConsumerCallback() {
             @Override
             public void accept(JMSResponse response) {
                 callbackInvoked.set(true);
@@ -155,7 +155,7 @@ public class JMSPublisherConsumerTest {
         JMSConsumer consumer = new JMSConsumer(jmsTemplate, 
mock(ComponentLog.class));
         final AtomicBoolean callbackInvoked = new AtomicBoolean();
         try {
-            consumer.consume(destinationName, new ConsumerCallback() {
+            consumer.consume(destinationName, false, false, null, new 
ConsumerCallback() {
                 @Override
                 public void accept(JMSResponse response) {
                     callbackInvoked.set(true);
@@ -171,7 +171,7 @@ public class JMSPublisherConsumerTest {
 
         // should receive the same message, but will process it successfully
         try {
-            consumer.consume(destinationName, new ConsumerCallback() {
+            consumer.consume(destinationName, false, false, null, new 
ConsumerCallback() {
                 @Override
                 public void accept(JMSResponse response) {
                     callbackInvoked.set(true);
@@ -186,7 +186,7 @@ public class JMSPublisherConsumerTest {
 
         // receiving next message and fail again
         try {
-            consumer.consume(destinationName, new ConsumerCallback() {
+            consumer.consume(destinationName, false, false, null, new 
ConsumerCallback() {
                 @Override
                 public void accept(JMSResponse response) {
                     callbackInvoked.set(true);
@@ -202,7 +202,7 @@ public class JMSPublisherConsumerTest {
 
         // should receive the same message, but will process it successfully
         try {
-            consumer.consume(destinationName, new ConsumerCallback() {
+            consumer.consume(destinationName, false, false, null, new 
ConsumerCallback() {
                 @Override
                 public void accept(JMSResponse response) {
                     callbackInvoked.set(true);

Reply via email to