This is an automated email from the ASF dual-hosted git repository.
mattyb149 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 3492313 NIFI-5869 Support Reconnection for JMS
3492313 is described below
commit 3492313d0b3436cdd0f7390d46d403fed9d65b77
Author: Ed <[email protected]>
AuthorDate: Thu Jan 10 11:55:29 2019 -0500
NIFI-5869 Support Reconnection for JMS
resets worker if it doesn't work anymore for any reason. this will add
"reconnect" capabilities. Will solve issues for following use cases:
- authentication changed after successful connection
- JNDI mapping changed and requires recaching.
- JMS server isn't available anymore or restarted.
improved controller reset on exception
Signed-off-by: Matthew Burgess <[email protected]>
This closes #3261
---
.../cf/JMSConnectionFactoryProviderDefinition.java | 8 +++++
.../nifi/jms/cf/JMSConnectionFactoryProvider.java | 9 ++++-
.../jms/cf/JndiJmsConnectionFactoryProvider.java | 8 +++++
.../nifi/jms/processors/AbstractJMSProcessor.java | 22 +++++++++++-
.../org/apache/nifi/jms/processors/ConsumeJMS.java | 41 ++++++++++++----------
.../org/apache/nifi/jms/processors/JMSWorker.java | 9 +++++
.../org/apache/nifi/jms/processors/PublishJMS.java | 15 ++++++--
7 files changed, 90 insertions(+), 22 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProviderDefinition.java
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProviderDefinition.java
index adb94fd..6bab920 100644
---
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProviderDefinition.java
+++
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProviderDefinition.java
@@ -35,4 +35,12 @@ public interface JMSConnectionFactoryProviderDefinition
extends ControllerServic
*/
ConnectionFactory getConnectionFactory();
+ /**
+ * Resets {@link ConnectionFactory}.
+ * Provider should reset {@link ConnectionFactory} only if a copy provided
by a client matches
+ * current {@link ConnectionFactory}.
+ * @param cachedFactory - {@link ConnectionFactory} cached by client.
+ */
+ void resetConnectionFactory(ConnectionFactory cachedFactory);
+
}
diff --git
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProvider.java
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProvider.java
index ecb4e7a..781ce65 100644
---
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProvider.java
+++
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProvider.java
@@ -139,6 +139,14 @@ public class JMSConnectionFactoryProvider extends
AbstractControllerService impl
.build();
}
+ @Override
+ public void resetConnectionFactory(ConnectionFactory cachedFactory) {
+ if (cachedFactory == connectionFactory) {
+ getLogger().debug("Resetting connection factory");
+ connectionFactory = null;
+ }
+ }
+
/**
* @return new instance of {@link ConnectionFactory}
*/
@@ -316,5 +324,4 @@ public class JMSConnectionFactoryProvider extends
AbstractControllerService impl
return StandardValidators.NON_EMPTY_VALIDATOR.validate(subject,
input, context);
}
}
-
}
diff --git
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JndiJmsConnectionFactoryProvider.java
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JndiJmsConnectionFactoryProvider.java
index a293d84..44d8d99 100644
---
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JndiJmsConnectionFactoryProvider.java
+++
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JndiJmsConnectionFactoryProvider.java
@@ -139,6 +139,14 @@ public class JndiJmsConnectionFactoryProvider extends
AbstractControllerService
}
@Override
+ public synchronized void resetConnectionFactory(ConnectionFactory
cachedFactory) {
+ if (cachedFactory == connectionFactory) {
+ getLogger().debug("Resetting connection factory");
+ connectionFactory = null;
+ }
+ }
+
+ @Override
public synchronized ConnectionFactory getConnectionFactory() {
if (connectionFactory == null) {
connectionFactory = lookupConnectionFactory();
diff --git
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
index 0094eaf..f47cf78 100644
---
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
+++
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
@@ -158,7 +158,27 @@ abstract class AbstractJMSProcessor<T extends JMSWorker>
extends AbstractProcess
try {
rendezvousWithJms(context, session, worker);
} finally {
- workerPool.offer(worker);
+ //in case of exception during worker's connection (consumer or
publisher),
+ //an appropriate service is responsible to invalidate the worker.
+ //if worker is not valid anymore, don't put it back into a pool,
try to rebuild it first, or discard.
+ //this will be helpful in a situation, when JNDI has changed, or
JMS server is not available
+ //and reconnection is required.
+ if (worker == null || !worker.isValid()){
+ getLogger().debug("Worker is invalid. Will try re-create... ");
+ final JMSConnectionFactoryProviderDefinition cfProvider =
context.getProperty(CF_SERVICE).asControllerService(JMSConnectionFactoryProviderDefinition.class);
+ try {
+ // Safe to cast. Method
#buildTargetResource(ProcessContext context) sets only CachingConnectionFactory
+ CachingConnectionFactory currentCF =
(CachingConnectionFactory)worker.jmsTemplate.getConnectionFactory();
+
cfProvider.resetConnectionFactory(currentCF.getTargetConnectionFactory());
+ worker = buildTargetResource(context);
+ }catch(Exception e) {
+ getLogger().error("Failed to rebuild: " + cfProvider);
+ worker = null;
+ }
+ }
+ if (worker != null) {
+ workerPool.offer(worker);
+ }
}
}
diff --git
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
index 997c6dd..4b149e2 100644
---
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
+++
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
@@ -188,28 +188,33 @@ public class ConsumeJMS extends
AbstractJMSProcessor<JMSConsumer> {
final String subscriptionName =
context.getProperty(SUBSCRIPTION_NAME).evaluateAttributeExpressions().getValue();
final String charset =
context.getProperty(CHARSET).evaluateAttributeExpressions().getValue();
- consumer.consume(destinationName, durable, shared, subscriptionName,
charset, new ConsumerCallback() {
- @Override
- public void accept(final JMSResponse response) {
- if (response == null) {
- return;
- }
+ try {
+ consumer.consume(destinationName, durable, shared,
subscriptionName, charset, new ConsumerCallback() {
+ @Override
+ public void accept(final JMSResponse response) {
+ if (response == null) {
+ return;
+ }
- FlowFile flowFile = processSession.create();
- flowFile = processSession.write(flowFile, out ->
out.write(response.getMessageBody()));
+ FlowFile flowFile = processSession.create();
+ flowFile = processSession.write(flowFile, out ->
out.write(response.getMessageBody()));
- final Map<String, String> jmsHeaders =
response.getMessageHeaders();
- final Map<String, String> jmsProperties =
response.getMessageProperties();
+ final Map<String, String> jmsHeaders =
response.getMessageHeaders();
+ final Map<String, String> jmsProperties =
response.getMessageProperties();
- flowFile =
ConsumeJMS.this.updateFlowFileAttributesWithJMSAttributes(jmsHeaders, flowFile,
processSession);
- flowFile =
ConsumeJMS.this.updateFlowFileAttributesWithJMSAttributes(jmsProperties,
flowFile, processSession);
- flowFile = processSession.putAttribute(flowFile,
JMS_SOURCE_DESTINATION_NAME, destinationName);
+ flowFile =
ConsumeJMS.this.updateFlowFileAttributesWithJMSAttributes(jmsHeaders, flowFile,
processSession);
+ flowFile =
ConsumeJMS.this.updateFlowFileAttributesWithJMSAttributes(jmsProperties,
flowFile, processSession);
+ flowFile = processSession.putAttribute(flowFile,
JMS_SOURCE_DESTINATION_NAME, destinationName);
- processSession.getProvenanceReporter().receive(flowFile,
destinationName);
- processSession.transfer(flowFile, REL_SUCCESS);
- processSession.commit();
- }
- });
+ processSession.getProvenanceReporter().receive(flowFile,
destinationName);
+ processSession.transfer(flowFile, REL_SUCCESS);
+ processSession.commit();
+ }
+ });
+ } catch(Exception e) {
+ consumer.setValid(false);
+ throw e; // for backward compatibility with exception handling in
flows
+ }
}
/**
diff --git
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSWorker.java
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSWorker.java
index e6fa1bb..ee4d76d 100644
---
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSWorker.java
+++
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/JMSWorker.java
@@ -36,6 +36,7 @@ abstract class JMSWorker {
protected final JmsTemplate jmsTemplate;
protected final ComponentLog processLog;
private final CachingConnectionFactory connectionFactory;
+ private boolean isValid = true;
/**
@@ -61,4 +62,12 @@ abstract class JMSWorker {
return this.getClass().getSimpleName() + "[destination:" +
this.jmsTemplate.getDefaultDestinationName()
+ "; pub-sub:" + this.jmsTemplate.isPubSubDomain() + ";]";
}
+
+ public boolean isValid() {
+ return isValid;
+ }
+
+ public void setValid(boolean isValid) {
+ this.isValid = isValid;
+ }
}
diff --git
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
index a32a895..12451cf 100644
---
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
+++
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
@@ -123,11 +123,21 @@ public class PublishJMS extends
AbstractJMSProcessor<JMSPublisher> {
String charset =
context.getProperty(CHARSET).evaluateAttributeExpressions(flowFile).getValue();
switch (context.getProperty(MESSAGE_BODY).getValue()) {
case TEXT_MESSAGE:
- publisher.publish(destinationName,
this.extractTextMessageBody(flowFile, processSession, charset),
flowFile.getAttributes());
+ try {
+ publisher.publish(destinationName,
this.extractTextMessageBody(flowFile, processSession, charset),
flowFile.getAttributes());
+ } catch(Exception e) {
+ publisher.setValid(false);
+ throw e;
+ }
break;
case BYTES_MESSAGE:
default:
- publisher.publish(destinationName,
this.extractMessageBody(flowFile, processSession), flowFile.getAttributes());
+ try {
+ publisher.publish(destinationName,
this.extractMessageBody(flowFile, processSession), flowFile.getAttributes());
+ } catch(Exception e) {
+ publisher.setValid(false);
+ throw e;
+ }
break;
}
processSession.transfer(flowFile, REL_SUCCESS);
@@ -136,6 +146,7 @@ public class PublishJMS extends
AbstractJMSProcessor<JMSPublisher> {
processSession.transfer(flowFile, REL_FAILURE);
this.getLogger().error("Failed while sending message to JMS
via " + publisher, e);
context.yield();
+ publisher.setValid(false);
}
}
}