This is an automated email from the ASF dual-hosted git repository.
mosermw pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 9fdf8f79d9 NIFI-14124 Allow ConsumeJMS to close consumer connections when set to run only on primary node
9fdf8f79d9 is described below
commit 9fdf8f79d94fb58b44b6f21b1227b5eead86a8d2
Author: Nissim Shiman <[email protected]>
AuthorDate: Tue Jan 7 21:14:32 2025 +0000
NIFI-14124 Allow ConsumeJMS to close consumer connections when set to run only on primary node
Signed-off-by: Mike Moser <[email protected]>
Closes #9766
---
.../nifi/jms/processors/AbstractJMSProcessor.java | 34 ++++++++++----
.../apache/nifi/jms/processors/ConsumeJMSIT.java | 52 ++++++++++++++++++++++
2 files changed, 78 insertions(+), 8 deletions(-)
diff --git a/nifi-extension-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java b/nifi-extension-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
index b5f958598e..d8284b294a 100644
--- a/nifi-extension-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
+++ b/nifi-extension-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
@@ -19,6 +19,8 @@ package org.apache.nifi.jms.processors;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
+import org.apache.nifi.annotation.notification.OnPrimaryNodeStateChange;
+import org.apache.nifi.annotation.notification.PrimaryNodeState;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.ValidationContext;
@@ -38,6 +40,7 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.Processor;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.scheduling.ExecutionNode;
import org.apache.nifi.serialization.RecordReaderFactory;
import org.apache.nifi.serialization.RecordSetWriterFactory;
import org.springframework.jms.connection.CachingConnectionFactory;
@@ -54,6 +57,7 @@ import java.util.Collections;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
@@ -171,6 +175,8 @@ public abstract class AbstractJMSProcessor<T extends JMSWorker> extends Abstract
private volatile IJMSConnectionFactoryProvider connectionFactoryProvider;
private volatile BlockingQueue<T> workerPool;
+ private volatile boolean runOnPrimary;
+ private final AtomicBoolean shutdownWorkers = new AtomicBoolean(false);
private final AtomicInteger clientIdCounter = new AtomicInteger(1);
protected static String getClientId(ProcessContext context) {
@@ -198,6 +204,16 @@ public abstract class AbstractJMSProcessor<T extends JMSWorker> extends Abstract
return new
ConnectionFactoryConfigValidator(validationContext).validateConnectionFactoryConfig();
}
+ @OnPrimaryNodeStateChange
+ public void onPrimaryNodeChange(final PrimaryNodeState newState) {
+ if (isScheduled() && runOnPrimary &&
newState.equals(PrimaryNodeState.PRIMARY_NODE_REVOKED)) {
+ shutdownWorkers.set(true);
+ close();
+ } else {
+ shutdownWorkers.set(false);
+ }
+ }
+
@Override
public void onTrigger(ProcessContext context, ProcessSession session)
throws ProcessException {
T worker = workerPool.poll();
@@ -239,13 +255,17 @@ public abstract class AbstractJMSProcessor<T extends JMSWorker> extends Abstract
worker.jmsTemplate.setDeliveryMode(Message.DEFAULT_DELIVERY_MODE);
worker.jmsTemplate.setTimeToLive(Message.DEFAULT_TIME_TO_LIVE);
worker.jmsTemplate.setPriority(Message.DEFAULT_PRIORITY);
- workerPool.offer(worker);
+ if (!shutdownWorkers.get()) {
+ workerPool.offer(worker);
+ } else {
+ worker.shutdown();
+ }
}
}
}
@OnScheduled
- public void setupConnectionFactoryProvider(final ProcessContext context) {
+ public void setup(final ProcessContext context) {
if (context.getProperty(CF_SERVICE).isSet()) {
connectionFactoryProvider =
context.getProperty(CF_SERVICE).asControllerService(JMSConnectionFactoryProviderDefinition.class);
} else if
(context.getProperty(JndiJmsConnectionFactoryProperties.JNDI_CONNECTION_FACTORY_NAME).isSet())
{
@@ -255,6 +275,10 @@ public abstract class AbstractJMSProcessor<T extends JMSWorker> extends Abstract
} else {
throw new ProcessException("No Connection Factory configured.");
}
+
+ workerPool = new
LinkedBlockingQueue<>(context.getMaxConcurrentTasks());
+ runOnPrimary =
context.getExecutionNode().equals(ExecutionNode.PRIMARY);
+ shutdownWorkers.set(false);
}
@OnUnscheduled
@@ -262,12 +286,6 @@ public abstract class AbstractJMSProcessor<T extends JMSWorker> extends Abstract
connectionFactoryProvider = null;
}
- @OnScheduled
- public void setupWorkerPool(final ProcessContext context) {
- workerPool = new
LinkedBlockingQueue<>(context.getMaxConcurrentTasks());
- }
-
-
@OnStopped
public void close() {
T worker;
diff --git a/nifi-extension-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSIT.java b/nifi-extension-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSIT.java
index ce8464b546..6d73e3b977 100644
--- a/nifi-extension-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSIT.java
+++ b/nifi-extension-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/processors/ConsumeJMSIT.java
@@ -27,6 +27,7 @@ import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.transport.tcp.TcpTransport;
import org.apache.activemq.transport.tcp.TcpTransportFactory;
import org.apache.activemq.wireformat.WireFormat;
+import org.apache.nifi.annotation.notification.PrimaryNodeState;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.jms.cf.JMSConnectionFactoryProperties;
import org.apache.nifi.jms.cf.JMSConnectionFactoryProvider;
@@ -38,6 +39,7 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.scheduling.ExecutionNode;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.MockProcessContext;
import org.apache.nifi.util.TestRunner;
@@ -638,6 +640,56 @@ public class ConsumeJMSIT {
}
}
+ @Test
+ public void validateConnectionClosedOnPrimaryOnlyNodeChange() throws
Exception {
+ BrokerService broker = new BrokerService();
+ try {
+ broker.setPersistent(false);
+ broker.setBrokerName("broker");
+ broker.start();
+
+ ActiveMQConnectionFactory cf = new
ActiveMQConnectionFactory("vm:localhost");
+ final String destinationName = "validateConnectionClosing";
+ TestRunner consumer = createNonSharedDurableConsumer(cf,
destinationName);
+ consumer.setIsConfiguredForClustering(true);
+ consumer.setConnected(true);
+ consumer.setClustered(true);
+ consumer.setPrimaryNode(true);
+
+ final ProcessContext processContext =
spy(consumer.getProcessContext());
+
when(processContext.getExecutionNode()).thenReturn(ExecutionNode.PRIMARY);
+
+ final ConsumeJMS processor = (ConsumeJMS) consumer.getProcessor();
+ processor.onSchedule(processContext);
+ processor.setup(processContext);
+ processor.updateScheduledTrue();
+
+ // running ConsumeJMS. There is nothing to consume yet, but it sets up the durable consumer
+ consumer.run(1, false, false);
+ List<MockFlowFile> flowFiles =
consumer.getFlowFilesForRelationship(ConsumeJMS.REL_SUCCESS);
+ assertEquals(0, flowFiles.size());
+
+ String message = "Hello World";
+ publishAMessage(cf, destinationName, message);
+
+ consumer.run(1, false, false);
+ flowFiles =
consumer.getFlowFilesForRelationship(ConsumeJMS.REL_SUCCESS);
+ assertEquals(1, flowFiles.size());
+ assertEquals(1, broker.getCurrentConnections());
+
+ final MockFlowFile successFF = flowFiles.getFirst();
+ successFF.assertContentEquals(message.getBytes());
+
+
processor.onPrimaryNodeChange(PrimaryNodeState.PRIMARY_NODE_REVOKED);
+
+ assertEquals(0, broker.getCurrentConnections());
+ } finally {
+ if (broker != null) {
+ broker.stop();
+ }
+ }
+ }
+
private static ArrayNode createTestJsonInput() {
final ObjectMapper mapper = new ObjectMapper();