This is an automated email from the ASF dual-hosted git repository.
turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new badbc1aca6 NIFI-12022 Extract verification logic from
JMSConnectionFactoryProvider
badbc1aca6 is described below
commit badbc1aca66684187093ff82fde6bfb28db549c7
Author: Nandor Soma Abonyi <[email protected]>
AuthorDate: Mon Aug 28 10:39:30 2023 +0200
NIFI-12022 Extract verification logic from JMSConnectionFactoryProvider
This closes #7667.
Signed-off-by: Peter Turcsanyi <[email protected]>
---
.../cf/AbstractJMSConnectionFactoryProvider.java} | 66 ++---------
.../jms/cf/CachedJMSConnectionFactoryHandler.java | 53 +++++++++
.../cf/JMSConnectionFactoryHandlerDefinition.java} | 20 +---
.../nifi/jms/cf/JMSConnectionFactoryHandler.java | 80 ++++++-------
.../nifi/jms/cf/JMSConnectionFactoryProvider.java | 126 +--------------------
.../jms/cf/JndiJmsConnectionFactoryHandler.java | 24 +---
.../jms/cf/JMSConnectionFactoryHandlerForTest.java | 6 +-
.../cf/JMSConnectionFactoryProviderForTest.java | 1 -
8 files changed, 111 insertions(+), 265 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProvider.java
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/main/java/org/apache/nifi/jms/cf/AbstractJMSConnectionFactoryProvider.java
similarity index 64%
copy from
nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProvider.java
copy to
nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/main/java/org/apache/nifi/jms/cf/AbstractJMSConnectionFactoryProvider.java
index 8a8c4b12fe..a2be311727 100644
---
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProvider.java
+++
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/main/java/org/apache/nifi/jms/cf/AbstractJMSConnectionFactoryProvider.java
@@ -16,23 +16,14 @@
*/
package org.apache.nifi.jms.cf;
-import org.apache.nifi.annotation.behavior.DynamicProperty;
-import org.apache.nifi.annotation.behavior.Restricted;
-import org.apache.nifi.annotation.behavior.Restriction;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.SeeAlso;
-import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
-import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.RequiredPermission;
-import org.apache.nifi.controller.AbstractControllerService;
-import org.apache.nifi.controller.ConfigurationContext;
-import org.apache.nifi.expression.ExpressionLanguageScope;
-import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.ConfigVerificationResult.Outcome;
+import org.apache.nifi.controller.AbstractControllerService;
+import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.VerifiableControllerService;
+import org.apache.nifi.logging.ComponentLog;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
@@ -45,52 +36,19 @@ import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;
/**
- * Provides a factory service that creates and initializes
- * {@link ConnectionFactory} specific to the third party JMS system.
- * <p>
- * It accomplishes it by adjusting current classpath by adding to it the
- * additional resources (i.e., JMS client libraries) provided by the user via
- * {@link JMSConnectionFactoryProperties#JMS_CLIENT_LIBRARIES}, allowing it
then to create an instance of the
- * target {@link ConnectionFactory} based on the provided
- * {@link JMSConnectionFactoryProperties#JMS_CONNECTION_FACTORY_IMPL} which
can be than access via
- * {@link #getConnectionFactory()} method.
+ * Base JMS controller service implementation that provides verification logic.
*/
-@Tags({"jms", "messaging", "integration", "queue", "topic", "publish",
"subscribe"})
-@CapabilityDescription("Provides a generic service to create vendor specific
javax.jms.ConnectionFactory implementations. "
- + "The Connection Factory can be served once this service is
configured successfully.")
-@DynamicProperty(name = "The name of a Connection Factory configuration
property.", value = "The value of a given Connection Factory configuration
property.",
- description = "The properties that are set following Java Beans
convention where a property name is derived from the 'set*' method of the
vendor "
- + "specific ConnectionFactory's implementation. For example,
'com.ibm.mq.jms.MQConnectionFactory.setChannel(String)' would imply 'channel' "
- + "property and
'com.ibm.mq.jms.MQConnectionFactory.setTransportType(int)' would imply
'transportType' property.",
- expressionLanguageScope =
ExpressionLanguageScope.VARIABLE_REGISTRY)
-@SeeAlso(classNames = {"org.apache.nifi.jms.processors.ConsumeJMS",
"org.apache.nifi.jms.processors.PublishJMS"})
-@Restricted(
- restrictions = {
- @Restriction(
- requiredPermission =
RequiredPermission.REFERENCE_REMOTE_RESOURCES,
- explanation = "Client Library Location can reference
resources over HTTP"
- )
- }
-)
-public class JMSConnectionFactoryProvider extends AbstractControllerService
implements JMSConnectionFactoryProviderDefinition, VerifiableControllerService {
+public abstract class AbstractJMSConnectionFactoryProvider extends
AbstractControllerService implements JMSConnectionFactoryProviderDefinition,
VerifiableControllerService {
private static final String ESTABLISH_CONNECTION = "Establish Connection";
private static final String VERIFY_JMS_INTERACTION = "Verify JMS
Interaction";
- protected volatile JMSConnectionFactoryHandler delegate;
-
- @Override
- protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
- return JMSConnectionFactoryProperties.getPropertyDescriptors();
- }
+ protected volatile JMSConnectionFactoryHandlerDefinition delegate;
- @Override
- protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final
String propertyDescriptorName) {
- return
JMSConnectionFactoryProperties.getDynamicPropertyDescriptor(propertyDescriptorName);
- }
+ protected abstract JMSConnectionFactoryHandlerDefinition
createConnectionFactoryHandler(ConfigurationContext context, ComponentLog
logger);
@OnEnabled
public void onEnabled(ConfigurationContext context) {
- delegate = new JMSConnectionFactoryHandler(context, getLogger());
+ delegate = createConnectionFactoryHandler(context, getLogger());
}
@OnDisabled
@@ -111,7 +69,7 @@ public class JMSConnectionFactoryProvider extends
AbstractControllerService impl
@Override
public List<ConfigVerificationResult> verify(final ConfigurationContext
context, final ComponentLog verificationLogger, final Map<String, String>
variables) {
final List<ConfigVerificationResult> results = new ArrayList<>();
- final JMSConnectionFactoryHandler handler = new
JMSConnectionFactoryHandler(context, verificationLogger);
+ final IJMSConnectionFactoryProvider handler =
createConnectionFactoryHandler(context, verificationLogger);
final AtomicReference<Exception> failureReason = new
AtomicReference<>();
final ExceptionListener listener = failureReason::set;
@@ -152,7 +110,7 @@ public class JMSConnectionFactoryProvider extends
AbstractControllerService impl
results.add(new ConfigVerificationResult.Builder()
.verificationStepName(ESTABLISH_CONNECTION)
.outcome(Outcome.SKIPPED)
- .explanation("Could not establish a Connection because doing
so requires that a username and password be provided")
+ .explanation("Could not establish a Connection because doing
so requires a valid username and password")
.build());
} catch (final Exception e) {
logger.warn("Failed to establish a connection to the JMS Server in
order to verify configuration", e);
@@ -160,7 +118,7 @@ public class JMSConnectionFactoryProvider extends
AbstractControllerService impl
results.add(new ConfigVerificationResult.Builder()
.verificationStepName(ESTABLISH_CONNECTION)
.outcome(Outcome.FAILED)
- .explanation("Was not able to establish a connection to the
JMS Server: " + e.toString())
+ .explanation("Was not able to establish a connection to the
JMS Server: " + e)
.build());
}
@@ -191,7 +149,7 @@ public class JMSConnectionFactoryProvider extends
AbstractControllerService impl
results.add(new ConfigVerificationResult.Builder()
.verificationStepName(VERIFY_JMS_INTERACTION)
.outcome(Outcome.FAILED)
- .explanation("Was not able to create a JMS Session: " +
failure.toString())
+ .explanation("Was not able to create a JMS Session: " +
failure)
.build());
}
}
diff --git
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/main/java/org/apache/nifi/jms/cf/CachedJMSConnectionFactoryHandler.java
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/main/java/org/apache/nifi/jms/cf/CachedJMSConnectionFactoryHandler.java
new file mode 100644
index 0000000000..86e4953afd
--- /dev/null
+++
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/main/java/org/apache/nifi/jms/cf/CachedJMSConnectionFactoryHandler.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.jms.cf;
+
+import org.apache.nifi.logging.ComponentLog;
+
+import javax.jms.ConnectionFactory;
+
+public abstract class CachedJMSConnectionFactoryHandler implements
JMSConnectionFactoryHandlerDefinition {
+
+ private final ComponentLog logger;
+
+ private volatile ConnectionFactory connectionFactory;
+
+ protected CachedJMSConnectionFactoryHandler(ComponentLog logger) {
+ this.logger = logger;
+ }
+
+ public abstract ConnectionFactory createConnectionFactory();
+
+ @Override
+ public synchronized ConnectionFactory getConnectionFactory() {
+ if (connectionFactory == null) {
+ connectionFactory = createConnectionFactory();
+ } else {
+ logger.debug("Connection Factory has already been initialized.
Will return cached instance.");
+ }
+
+ return connectionFactory;
+ }
+
+ @Override
+ public synchronized void resetConnectionFactory(ConnectionFactory
cachedFactory) {
+ if (cachedFactory == connectionFactory) {
+ logger.debug("Resetting connection factory");
+ connectionFactory = null;
+ }
+ }
+}
diff --git
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProviderForTest.java
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryHandlerDefinition.java
similarity index 55%
copy from
nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProviderForTest.java
copy to
nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryHandlerDefinition.java
index 9e56e9bd8d..ebfe8c6980 100644
---
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProviderForTest.java
+++
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-cf-service/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryHandlerDefinition.java
@@ -16,24 +16,8 @@
*/
package org.apache.nifi.jms.cf;
-import org.apache.nifi.annotation.lifecycle.OnEnabled;
-import org.apache.nifi.controller.ConfigurationContext;
-
-import java.util.Map;
-
/**
- * Sub-class of {@link JMSConnectionFactoryProvider} only for testing purpose
+ * Base interface of handler implementations of IJMSConnectionFactoryProvider.
*/
-public class JMSConnectionFactoryProviderForTest extends
JMSConnectionFactoryProvider {
-
- @OnEnabled
- @Override
- public void onEnabled(ConfigurationContext context) {
- delegate = new JMSConnectionFactoryHandlerForTest(context,
getLogger());
- delegate.setConnectionFactoryProperties();
- }
-
- public Map<String, Object> getConfiguredProperties() {
- return ((JMSConnectionFactoryHandlerForTest)
delegate).getConfiguredProperties();
- }
+public interface JMSConnectionFactoryHandlerDefinition extends
IJMSConnectionFactoryProvider {
}
diff --git
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryHandler.java
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryHandler.java
index 44de835738..7db1c69e20 100644
---
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryHandler.java
+++
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryHandler.java
@@ -38,46 +38,28 @@ import org.apache.nifi.ssl.SSLContextService;
* implementation class and configuring the Connection Factory object directly.
* The handler can be used from controller services and processors as well.
*/
-public class JMSConnectionFactoryHandler implements
IJMSConnectionFactoryProvider {
+public class JMSConnectionFactoryHandler extends
CachedJMSConnectionFactoryHandler {
private final PropertyContext context;
private final Set<PropertyDescriptor> propertyDescriptors;
private final ComponentLog logger;
public JMSConnectionFactoryHandler(ConfigurationContext context,
ComponentLog logger) {
+ super(logger);
this.context = context;
this.propertyDescriptors = context.getProperties().keySet();
this.logger = logger;
}
public JMSConnectionFactoryHandler(ProcessContext context, ComponentLog
logger) {
+ super(logger);
this.context = context;
this.propertyDescriptors = context.getProperties().keySet();
this.logger = logger;
}
- private volatile ConnectionFactory connectionFactory;
-
- @Override
- public synchronized ConnectionFactory getConnectionFactory() {
- if (connectionFactory == null) {
- initConnectionFactory();
- } else {
- logger.debug("Connection Factory has already been initialized.
Will return cached instance.");
- }
-
- return connectionFactory;
- }
-
@Override
- public synchronized void resetConnectionFactory(ConnectionFactory
cachedFactory) {
- if (cachedFactory == connectionFactory) {
- logger.debug("Resetting connection factory");
- connectionFactory = null;
- }
- }
-
- private void initConnectionFactory() {
+ public ConnectionFactory createConnectionFactory() {
try {
if (logger.isInfoEnabled()) {
logger.info("Configuring " + getClass().getSimpleName() + "
for '"
@@ -85,10 +67,11 @@ public class JMSConnectionFactoryHandler implements
IJMSConnectionFactoryProvide
+
context.getProperty(JMS_BROKER_URI).evaluateAttributeExpressions().getValue() +
"'");
}
- createConnectionFactoryInstance();
- setConnectionFactoryProperties();
+ final ConnectionFactory connectionFactory =
createConnectionFactoryInstance();
+ setConnectionFactoryProperties(connectionFactory);
+
+ return connectionFactory;
} catch (Exception e) {
- connectionFactory = null;
logger.error("Failed to configure " + getClass().getSimpleName(),
e);
throw new IllegalStateException(e);
}
@@ -98,9 +81,9 @@ public class JMSConnectionFactoryHandler implements
IJMSConnectionFactoryProvide
* Creates an instance of the {@link ConnectionFactory} from the provided
* 'CONNECTION_FACTORY_IMPL'.
*/
- private void createConnectionFactoryInstance() {
- String connectionFactoryImplName =
context.getProperty(JMS_CONNECTION_FACTORY_IMPL).evaluateAttributeExpressions().getValue();
- connectionFactory =
Utils.newDefaultInstance(connectionFactoryImplName);
+ private ConnectionFactory createConnectionFactoryInstance() {
+ final String connectionFactoryImplName =
context.getProperty(JMS_CONNECTION_FACTORY_IMPL).evaluateAttributeExpressions().getValue();
+ return Utils.newDefaultInstance(connectionFactoryImplName);
}
/**
@@ -135,18 +118,18 @@ public class JMSConnectionFactoryHandler implements
IJMSConnectionFactoryProvide
* @see <a
href="https://www.ibm.com/support/knowledgecenter/en/SSFKSJ_7.1.0/com.ibm.mq.javadoc.doc/WMQJMSClasses/com/ibm/mq/jms/MQConnectionFactory.html#setHostName_java.lang.String_">setHostName(String
hostname)</a>
* @see <a
href="https://www.ibm.com/support/knowledgecenter/en/SSFKSJ_7.1.0/com.ibm.mq.javadoc.doc/WMQJMSClasses/com/ibm/mq/jms/MQConnectionFactory.html#setPort_int_">setPort(int
port)</a>
* @see <a
href="https://www.ibm.com/support/knowledgecenter/en/SSFKSJ_7.1.0/com.ibm.mq.javadoc.doc/WMQJMSClasses/com/ibm/mq/jms/MQConnectionFactory.html#setConnectionNameList_java.lang.String_">setConnectionNameList(String
hosts)</a>
- * @see #setProperty(String propertyName, Object propertyValue)
+ * @see #setProperty(ConnectionFactory connectionFactory, String
propertyName, Object propertyValue)
*/
- void setConnectionFactoryProperties() {
+ void setConnectionFactoryProperties(ConnectionFactory connectionFactory) {
String connectionFactoryValue =
context.getProperty(JMS_CONNECTION_FACTORY_IMPL).evaluateAttributeExpressions().getValue();
if (context.getProperty(JMS_BROKER_URI).isSet()) {
String brokerValue =
context.getProperty(JMS_BROKER_URI).evaluateAttributeExpressions().getValue();
if (connectionFactoryValue.startsWith("org.apache.activemq")) {
- setProperty("brokerURL", brokerValue);
+ setProperty(connectionFactory, "brokerURL", brokerValue);
} else if (connectionFactoryValue.startsWith("com.tibco.tibjms")) {
- setProperty("serverUrl", brokerValue);
+ setProperty(connectionFactory, "serverUrl", brokerValue);
} else if
(connectionFactoryValue.startsWith("org.apache.qpid.jms")) {
- setProperty("remoteURI", brokerValue);
+ setProperty(connectionFactory, "remoteURI", brokerValue);
} else {
String[] brokerList = brokerValue.split(",");
if (connectionFactoryValue.startsWith("com.ibm.mq.jms")) {
@@ -159,14 +142,14 @@ public class JMSConnectionFactoryHandler implements
IJMSConnectionFactoryProvide
ibmConList.add(broker);
}
}
- setProperty("connectionNameList", String.join(",",
ibmConList));
+ setProperty(connectionFactory, "connectionNameList",
String.join(",", ibmConList));
} else {
// Try to parse broker URI as colon separated host/port
pair. Use first pair if multiple given.
String[] hostPort = brokerList[0].split(":");
if (hostPort.length == 2) {
// If broker URI indeed was colon separated host/port
pair
- setProperty("hostName", hostPort[0]);
- setProperty("port", hostPort[1]);
+ setProperty(connectionFactory, "hostName",
hostPort[0]);
+ setProperty(connectionFactory, "port", hostPort[1]);
}
}
}
@@ -177,21 +160,21 @@ public class JMSConnectionFactoryHandler implements
IJMSConnectionFactoryProvide
SSLContext sslContext = sslContextService.createContext();
if (connectionFactoryValue.startsWith("org.apache.activemq")) {
if (sslContextService.isTrustStoreConfigured()) {
- setProperty("trustStore",
sslContextService.getTrustStoreFile());
- setProperty("trustStorePassword",
sslContextService.getTrustStorePassword());
- setProperty("trustStoreType",
sslContextService.getTrustStoreType());
+ setProperty(connectionFactory, "trustStore",
sslContextService.getTrustStoreFile());
+ setProperty(connectionFactory, "trustStorePassword",
sslContextService.getTrustStorePassword());
+ setProperty(connectionFactory, "trustStoreType",
sslContextService.getTrustStoreType());
}
if (sslContextService.isKeyStoreConfigured()) {
- setProperty("keyStore",
sslContextService.getKeyStoreFile());
- setProperty("keyStorePassword",
sslContextService.getKeyStorePassword());
- setProperty("keyStoreKeyPassword",
sslContextService.getKeyPassword());
- setProperty("keyStoreType",
sslContextService.getKeyStoreType());
+ setProperty(connectionFactory, "keyStore",
sslContextService.getKeyStoreFile());
+ setProperty(connectionFactory, "keyStorePassword",
sslContextService.getKeyStorePassword());
+ setProperty(connectionFactory, "keyStoreKeyPassword",
sslContextService.getKeyPassword());
+ setProperty(connectionFactory, "keyStoreType",
sslContextService.getKeyStoreType());
}
} else if
(connectionFactoryValue.startsWith("org.apache.qpid.jms")) {
- setProperty("sslContext", sslContext);
+ setProperty(connectionFactory, "sslContext", sslContext);
} else {
// IBM MQ (and others)
- setProperty("sSLSocketFactory", sslContext.getSocketFactory());
+ setProperty(connectionFactory, "sSLSocketFactory",
sslContext.getSocketFactory());
}
}
@@ -200,7 +183,7 @@ public class JMSConnectionFactoryHandler implements
IJMSConnectionFactoryProvide
.forEach(descriptor -> {
String propertyName = descriptor.getName();
String propertyValue =
context.getProperty(descriptor).evaluateAttributeExpressions().getValue();
- setProperty(propertyName, propertyValue);
+ setProperty(connectionFactory, propertyName,
propertyValue);
});
}
@@ -222,7 +205,7 @@ public class JMSConnectionFactoryHandler implements
IJMSConnectionFactoryProvide
* follow bean convention and all their properties using Java primitives as
* arguments.
*/
- void setProperty(String propertyName, Object propertyValue) {
+ void setProperty(ConnectionFactory connectionFactory, String propertyName,
Object propertyValue) {
String methodName = toMethodName(propertyName);
Method[] methods = Utils.findMethods(methodName,
connectionFactory.getClass());
if (methods != null && methods.length > 0) {
@@ -248,7 +231,7 @@ public class JMSConnectionFactoryHandler implements
IJMSConnectionFactoryProvide
throw new IllegalStateException("Failed to set property " +
propertyName, e);
}
} else if (propertyName.equals("hostName")) {
- setProperty("host", propertyValue); // try 'host' as another
common convention.
+ setProperty(connectionFactory, "host", propertyValue); // try
'host' as another common convention.
}
}
@@ -262,4 +245,5 @@ public class JMSConnectionFactoryHandler implements
IJMSConnectionFactoryProvide
c[0] = Character.toUpperCase(c[0]);
return "set" + new String(c);
}
+
}
diff --git
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProvider.java
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProvider.java
index 8a8c4b12fe..331624537d 100644
---
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProvider.java
+++
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProvider.java
@@ -22,27 +22,14 @@ import org.apache.nifi.annotation.behavior.Restriction;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnDisabled;
-import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.RequiredPermission;
-import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.components.ConfigVerificationResult;
-import org.apache.nifi.components.ConfigVerificationResult.Outcome;
-import org.apache.nifi.controller.VerifiableControllerService;
-import javax.jms.Connection;
import javax.jms.ConnectionFactory;
-import javax.jms.ExceptionListener;
-import javax.jms.JMSSecurityException;
-import javax.jms.Session;
-import java.util.ArrayList;
import java.util.List;
-import java.util.Map;
-import java.util.concurrent.atomic.AtomicReference;
/**
* Provides a factory service that creates and initializes
@@ -72,11 +59,7 @@ import java.util.concurrent.atomic.AtomicReference;
)
}
)
-public class JMSConnectionFactoryProvider extends AbstractControllerService
implements JMSConnectionFactoryProviderDefinition, VerifiableControllerService {
- private static final String ESTABLISH_CONNECTION = "Establish Connection";
- private static final String VERIFY_JMS_INTERACTION = "Verify JMS
Interaction";
-
- protected volatile JMSConnectionFactoryHandler delegate;
+public class JMSConnectionFactoryProvider extends
AbstractJMSConnectionFactoryProvider {
@Override
protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -88,111 +71,8 @@ public class JMSConnectionFactoryProvider extends
AbstractControllerService impl
return
JMSConnectionFactoryProperties.getDynamicPropertyDescriptor(propertyDescriptorName);
}
- @OnEnabled
- public void onEnabled(ConfigurationContext context) {
- delegate = new JMSConnectionFactoryHandler(context, getLogger());
- }
-
- @OnDisabled
- public void onDisabled() {
- delegate = null;
- }
-
- @Override
- public ConnectionFactory getConnectionFactory() {
- return delegate.getConnectionFactory();
- }
-
@Override
- public void resetConnectionFactory(ConnectionFactory cachedFactory) {
- delegate.resetConnectionFactory(cachedFactory);
- }
-
- @Override
- public List<ConfigVerificationResult> verify(final ConfigurationContext
context, final ComponentLog verificationLogger, final Map<String, String>
variables) {
- final List<ConfigVerificationResult> results = new ArrayList<>();
- final JMSConnectionFactoryHandler handler = new
JMSConnectionFactoryHandler(context, verificationLogger);
-
- final AtomicReference<Exception> failureReason = new
AtomicReference<>();
- final ExceptionListener listener = failureReason::set;
-
- final Connection connection =
createConnection(handler.getConnectionFactory(), results, listener,
verificationLogger);
- if (connection != null) {
- try {
- createSession(connection, results, failureReason.get(),
verificationLogger);
- } finally {
- try {
- connection.close();
- } catch (final Exception ignored) {
- }
- }
- }
-
- return results;
- }
-
- private Connection createConnection(final ConnectionFactory
connectionFactory, final List<ConfigVerificationResult> results, final
ExceptionListener exceptionListener, final ComponentLog logger) {
- try {
- final Connection connection = connectionFactory.createConnection();
- connection.setExceptionListener(exceptionListener);
-
- results.add(new ConfigVerificationResult.Builder()
- .verificationStepName(ESTABLISH_CONNECTION)
- .outcome(Outcome.SUCCESSFUL)
- .explanation("Successfully established a JMS Connection")
- .build());
-
- return connection;
- } catch (final JMSSecurityException se) {
- // If we encounter a JMS Security Exception, the documentation
states that it is because of an invalid username or password.
- // There is no username or password configured for the Controller
Service itself, however. Those are configured in processors, etc.
- // As a result, if this is encountered, we will skip verification.
- logger.debug("Failed to establish a connection to the JMS Server
in order to verify configuration because encountered JMS Security Exception",
se);
-
- results.add(new ConfigVerificationResult.Builder()
- .verificationStepName(ESTABLISH_CONNECTION)
- .outcome(Outcome.SKIPPED)
- .explanation("Could not establish a Connection because doing
so requires that a username and password be provided")
- .build());
- } catch (final Exception e) {
- logger.warn("Failed to establish a connection to the JMS Server in
order to verify configuration", e);
-
- results.add(new ConfigVerificationResult.Builder()
- .verificationStepName(ESTABLISH_CONNECTION)
- .outcome(Outcome.FAILED)
- .explanation("Was not able to establish a connection to the
JMS Server: " + e.toString())
- .build());
- }
-
- return null;
- }
-
- private void createSession(final Connection connection, final
List<ConfigVerificationResult> results, final Exception capturedException,
final ComponentLog logger) {
- try {
- final Session session = connection.createSession(false,
Session.CLIENT_ACKNOWLEDGE);
- session.close();
-
- results.add(new ConfigVerificationResult.Builder()
- .verificationStepName(VERIFY_JMS_INTERACTION)
- .outcome(Outcome.SUCCESSFUL)
- .explanation("Established a JMS Session with server and
successfully terminated it")
- .build());
- } catch (final Exception e) {
- final Exception failure;
- if (capturedException == null) {
- failure = e;
- } else {
- failure = capturedException;
- failure.addSuppressed(e);
- }
-
- logger.warn("Failed to create a JMS Session in order to verify
configuration", failure);
-
- results.add(new ConfigVerificationResult.Builder()
- .verificationStepName(VERIFY_JMS_INTERACTION)
- .outcome(Outcome.FAILED)
- .explanation("Was not able to create a JMS Session: " +
failure.toString())
- .build());
- }
+ protected JMSConnectionFactoryHandlerDefinition
createConnectionFactoryHandler(ConfigurationContext context, ComponentLog
logger) {
+ return new JMSConnectionFactoryHandler(context, logger);
}
}
diff --git
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JndiJmsConnectionFactoryHandler.java
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JndiJmsConnectionFactoryHandler.java
index 53c216383e..468a1455ad 100644
---
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JndiJmsConnectionFactoryHandler.java
+++
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JndiJmsConnectionFactoryHandler.java
@@ -43,43 +43,29 @@ import static
org.apache.nifi.jms.cf.JndiJmsConnectionFactoryProperties.JNDI_PRO
* Handler class to retrieve a JMS Connection Factory object via JNDI.
* The handler can be used from controller services and processors as well.
*/
-public class JndiJmsConnectionFactoryHandler implements
IJMSConnectionFactoryProvider {
+public class JndiJmsConnectionFactoryHandler extends
CachedJMSConnectionFactoryHandler {
private final PropertyContext context;
private final Set<PropertyDescriptor> propertyDescriptors;
private final ComponentLog logger;
- private volatile ConnectionFactory connectionFactory;
-
public JndiJmsConnectionFactoryHandler(ConfigurationContext context,
ComponentLog logger) {
+ super(logger);
this.context = context;
this.propertyDescriptors = context.getProperties().keySet();
this.logger = logger;
}
public JndiJmsConnectionFactoryHandler(ProcessContext context,
ComponentLog logger) {
+ super(logger);
this.context = context;
this.propertyDescriptors = context.getProperties().keySet();
this.logger = logger;
}
@Override
- public synchronized ConnectionFactory getConnectionFactory() {
- if (connectionFactory == null) {
- connectionFactory = lookupConnectionFactory();
- } else {
- logger.debug("Connection Factory has already been obtained from
JNDI. Will return cached instance.");
- }
-
- return connectionFactory;
- }
-
- @Override
- public synchronized void resetConnectionFactory(ConnectionFactory
cachedFactory) {
- if (cachedFactory == connectionFactory) {
- logger.debug("Resetting connection factory");
- connectionFactory = null;
- }
+ public ConnectionFactory createConnectionFactory() {
+ return lookupConnectionFactory();
}
private ConnectionFactory lookupConnectionFactory() {
diff --git
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/cf/JMSConnectionFactoryHandlerForTest.java
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/cf/JMSConnectionFactoryHandlerForTest.java
index f8e087aa4b..4d349bee03 100644
---
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/cf/JMSConnectionFactoryHandlerForTest.java
+++
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/cf/JMSConnectionFactoryHandlerForTest.java
@@ -19,6 +19,7 @@ package org.apache.nifi.jms.cf;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.logging.ComponentLog;
+import javax.jms.ConnectionFactory;
import java.util.HashMap;
import java.util.Map;
@@ -27,14 +28,15 @@ import java.util.Map;
*/
public class JMSConnectionFactoryHandlerForTest extends
JMSConnectionFactoryHandler {
- private Map<String, Object> configuredProperties = new HashMap<>();
+ private final Map<String, Object> configuredProperties = new HashMap<>();
public JMSConnectionFactoryHandlerForTest(ConfigurationContext context,
ComponentLog logger) {
super(context, logger);
+ setConnectionFactoryProperties(null);
}
@Override
- void setProperty(String propertyName, Object propertyValue) {
+ void setProperty(ConnectionFactory connectionFactory, String propertyName,
Object propertyValue) {
configuredProperties.put(propertyName, propertyValue);
}
diff --git
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProviderForTest.java
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProviderForTest.java
index 9e56e9bd8d..9223b3bde5 100644
---
a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProviderForTest.java
+++
b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/test/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProviderForTest.java
@@ -30,7 +30,6 @@ public class JMSConnectionFactoryProviderForTest extends
JMSConnectionFactoryPro
@Override
public void onEnabled(ConfigurationContext context) {
delegate = new JMSConnectionFactoryHandlerForTest(context,
getLogger());
- delegate.setConnectionFactoryProperties();
}
public Map<String, Object> getConfiguredProperties() {