This is an automated email from the ASF dual-hosted git repository.

mosermw pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 9fdf8f79d9 NIFI-14124 Allow ConsumeJMS to close consumer connections when set to run only on primary node
9fdf8f79d9 is described below

commit 9fdf8f79d94fb58b44b6f21b1227b5eead86a8d2
Author: Nissim Shiman <[email protected]>
AuthorDate: Tue Jan 7 21:14:32 2025 +0000

    NIFI-14124 Allow ConsumeJMS to close consumer connections when set to run only on primary node
    
    Signed-off-by: Mike Moser <[email protected]>
    
    Closes #9766
---
 .../nifi/jms/processors/AbstractJMSProcessor.java  | 34 ++++++++++----
 .../apache/nifi/jms/processors/ConsumeJMSIT.java   | 52 ++++++++++++++++++++++
 2 files changed, 78 insertions(+), 8 deletions(-)

diff --git a/nifi-extension-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java b/nifi-extension-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
index b5f958598e..d8284b294a 100644
--- a/nifi-extension-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
+++ b/nifi-extension-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
@@ -19,6 +19,8 @@ package org.apache.nifi.jms.processors;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
+import org.apache.nifi.annotation.notification.PrimaryNodeState;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
 import org.apache.nifi.components.ValidationContext;
@@ -38,6 +40,7 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.Processor;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.scheduling.ExecutionNode;
 import org.apache.nifi.serialization.RecordReaderFactory;
 import org.apache.nifi.serialization.RecordSetWriterFactory;
 import org.springframework.jms.connection.CachingConnectionFactory;
@@ -54,6 +57,7 @@ import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
 import java.util.concurrent.atomic.AtomicInteger;
 import java.util.stream.Collectors;
 
@@ -171,6 +175,8 @@ public abstract class AbstractJMSProcessor<T extends 
JMSWorker> extends Abstract
 
     private volatile IJMSConnectionFactoryProvider connectionFactoryProvider;
     private volatile BlockingQueue<T> workerPool;
+    private volatile boolean runOnPrimary;
+    private final AtomicBoolean shutdownWorkers = new AtomicBoolean(false);
     private final AtomicInteger clientIdCounter = new AtomicInteger(1);
 
     protected static String getClientId(ProcessContext context) {
@@ -198,6 +204,16 @@ public abstract class AbstractJMSProcessor<T extends 
JMSWorker> extends Abstract
         return new 
ConnectionFactoryConfigValidator(validationContext).validateConnectionFactoryConfig();
     }
 
+    @OnPrimaryNodeStateChange
+    public void onPrimaryNodeChange(final PrimaryNodeState newState) {
+        if (isScheduled() && runOnPrimary && 
newState.equals(PrimaryNodeState.PRIMARY_NODE_REVOKED)) {
+            shutdownWorkers.set(true);
+            close();
+        } else {
+            shutdownWorkers.set(false);
+        }
+    }
+
     @Override
     public void onTrigger(ProcessContext context, ProcessSession session) 
throws ProcessException {
         T worker = workerPool.poll();
@@ -239,13 +255,17 @@ public abstract class AbstractJMSProcessor<T extends 
JMSWorker> extends Abstract
                 
worker.jmsTemplate.setDeliveryMode(Message.DEFAULT_DELIVERY_MODE);
                 worker.jmsTemplate.setTimeToLive(Message.DEFAULT_TIME_TO_LIVE);
                 worker.jmsTemplate.setPriority(Message.DEFAULT_PRIORITY);
-                workerPool.offer(worker);
+                if (!shutdownWorkers.get()) {
+                    workerPool.offer(worker);
+                } else {
+                    worker.shutdown();
+                }
             }
         }
     }
 
     @OnScheduled
-    public void setupConnectionFactoryProvider(final ProcessContext context) {
+    public void setup(final ProcessContext context) {
         if (context.getProperty(CF_SERVICE).isSet()) {
             connectionFactoryProvider = 
context.getProperty(CF_SERVICE).asControllerService(JMSConnectionFactoryProviderDefinition.class);
         } else if 
(context.getProperty(JndiJmsConnectionFactoryProperties.JNDI_CONNECTION_FACTORY_NAME).isSet())
 {
@@ -255,6 +275,10 @@ public abstract class AbstractJMSProcessor<T extends 
JMSWorker> extends Abstract
         } else {
             throw new ProcessException("No Connection Factory configured.");
         }
+
+        workerPool = new 
LinkedBlockingQueue<>(context.getMaxConcurrentTasks());
+        runOnPrimary = 
context.getExecutionNode().equals(ExecutionNode.PRIMARY);
+        shutdownWorkers.set(false);
     }
 
     @OnUnscheduled
@@ -262,12 +286,6 @@ public abstract class AbstractJMSProcessor<T extends 
JMSWorker> extends Abstract
         connectionFactoryProvider = null;
     }
 
-    @OnScheduled
-    public void setupWorkerPool(final ProcessContext context) {
-        workerPool = new 
LinkedBlockingQueue<>(context.getMaxConcurrentTasks());
-    }
-
-
     @OnStopped
     public void close() {
         T worker;
diff --git a/nifi-extension-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSIT.java b/nifi-extension-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSIT.java
index ce8464b546..6d73e3b977 100644
--- a/nifi-extension-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSIT.java
+++ b/nifi-extension-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSIT.java
@@ -27,6 +27,7 @@ import org.apache.activemq.command.ActiveMQMessage;
 import org.apache.activemq.transport.tcp.TcpTransport;
 import org.apache.activemq.transport.tcp.TcpTransportFactory;
 import org.apache.activemq.wireformat.WireFormat;
+import org.apache.nifi.annotation.notification.PrimaryNodeState;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.jms.cf.JMSConnectionFactoryProperties;
 import org.apache.nifi.jms.cf.JMSConnectionFactoryProvider;
@@ -38,6 +39,7 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.scheduling.ExecutionNode;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.MockProcessContext;
 import org.apache.nifi.util.TestRunner;
@@ -638,6 +640,56 @@ public class ConsumeJMSIT {
         }
     }
 
+    @Test
+    public void validateConnectionClosedOnPrimaryOnlyNodeChange() throws 
Exception {
+        BrokerService broker = new BrokerService();
+        try {
+            broker.setPersistent(false);
+            broker.setBrokerName("broker");
+            broker.start();
+
+            ActiveMQConnectionFactory cf = new 
ActiveMQConnectionFactory("vm:localhost");
+            final String destinationName = "validateConnectionClosing";
+            TestRunner consumer = createNonSharedDurableConsumer(cf, 
destinationName);
+            consumer.setIsConfiguredForClustering(true);
+            consumer.setConnected(true);
+            consumer.setClustered(true);
+            consumer.setPrimaryNode(true);
+
+            final ProcessContext processContext = 
spy(consumer.getProcessContext());
+            
when(processContext.getExecutionNode()).thenReturn(ExecutionNode.PRIMARY);
+
+            final ConsumeJMS processor = (ConsumeJMS) consumer.getProcessor();
+            processor.onSchedule(processContext);
+            processor.setup(processContext);
+            processor.updateScheduledTrue();
+
+            // running ConsumeJMS.  There is nothing to consume yet, but it 
sets up the durable consumer
+            consumer.run(1, false, false);
+            List<MockFlowFile> flowFiles = 
consumer.getFlowFilesForRelationship(ConsumeJMS.REL_SUCCESS);
+            assertEquals(0, flowFiles.size());
+
+            String message = "Hello World";
+            publishAMessage(cf, destinationName, message);
+
+            consumer.run(1, false, false);
+            flowFiles = 
consumer.getFlowFilesForRelationship(ConsumeJMS.REL_SUCCESS);
+            assertEquals(1, flowFiles.size());
+            assertEquals(1, broker.getCurrentConnections());
+
+            final MockFlowFile successFF = flowFiles.getFirst();
+            successFF.assertContentEquals(message.getBytes());
+
+            
processor.onPrimaryNodeChange(PrimaryNodeState.PRIMARY_NODE_REVOKED);
+
+            assertEquals(0, broker.getCurrentConnections());
+        } finally {
+            if (broker != null) {
+                broker.stop();
+            }
+        }
+    }
+
     private static ArrayNode createTestJsonInput() {
         final ObjectMapper mapper = new ObjectMapper();
 

Reply via email to