This is an automated email from the ASF dual-hosted git repository.
mattyb149 pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new ddfaf16f68 NIFI-10781 Made the MongoDB controller service implement
VerifiableControllerService.
ddfaf16f68 is described below
commit ddfaf16f68543882e1180d6f2d7c433d72d798e2
Author: Mike Thomsen <[email protected]>
AuthorDate: Tue Nov 8 15:57:39 2022 -0500
NIFI-10781 Made the MongoDB controller service implement
VerifiableControllerService.
Signed-off-by: Matthew Burgess <[email protected]>
This closes #6635
---
.../apache/nifi/mongodb/MongoDBClientService.java | 21 ++++-----
.../nifi/mongodb/MongoDBControllerService.java | 52 +++++++++++++++++-----
.../nifi/mongodb/MongoDBControllerServiceIT.java | 49 ++++++++++++++++++--
3 files changed, 98 insertions(+), 24 deletions(-)
diff --git
a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-client-service-api/src/main/java/org/apache/nifi/mongodb/MongoDBClientService.java
b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-client-service-api/src/main/java/org/apache/nifi/mongodb/MongoDBClientService.java
index c840525928..9f8893b46d 100644
---
a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-client-service-api/src/main/java/org/apache/nifi/mongodb/MongoDBClientService.java
+++
b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-client-service-api/src/main/java/org/apache/nifi/mongodb/MongoDBClientService.java
@@ -23,13 +23,14 @@ import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.ControllerService;
+import org.apache.nifi.controller.VerifiableControllerService;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.security.util.ClientAuth;
import org.apache.nifi.ssl.SSLContextService;
import org.bson.Document;
-public interface MongoDBClientService extends ControllerService {
+public interface MongoDBClientService extends ControllerService,
VerifiableControllerService {
String WRITE_CONCERN_ACKNOWLEDGED = "ACKNOWLEDGED";
String WRITE_CONCERN_UNACKNOWLEDGED = "UNACKNOWLEDGED";
String WRITE_CONCERN_FSYNCED = "FSYNCED";
@@ -40,33 +41,33 @@ public interface MongoDBClientService extends
ControllerService {
String WRITE_CONCERN_W2 = "W2";
String WRITE_CONCERN_W3 = "W3";
- static final AllowableValue WRITE_CONCERN_ACKNOWLEDGED_VALUE = new
AllowableValue(
+ AllowableValue WRITE_CONCERN_ACKNOWLEDGED_VALUE = new AllowableValue(
WRITE_CONCERN_ACKNOWLEDGED, WRITE_CONCERN_ACKNOWLEDGED,
"Write operations that use this write concern will wait for
acknowledgement, " +
"using the default write concern configured on the server");
- static final AllowableValue WRITE_CONCERN_UNACKNOWLEDGED_VALUE = new
AllowableValue(
+ AllowableValue WRITE_CONCERN_UNACKNOWLEDGED_VALUE = new AllowableValue(
WRITE_CONCERN_UNACKNOWLEDGED, WRITE_CONCERN_UNACKNOWLEDGED,
"Write operations that use this write concern will return as soon
as the message is written to the socket. " +
"Exceptions are raised for network issues, but not server errors");
- static final AllowableValue WRITE_CONCERN_FSYNCED_VALUE = new
AllowableValue(
+ AllowableValue WRITE_CONCERN_FSYNCED_VALUE = new AllowableValue(
WRITE_CONCERN_FSYNCED, WRITE_CONCERN_FSYNCED,
"Deprecated. Use of \"" + WRITE_CONCERN_JOURNALED + "\" is
preferred");
- static final AllowableValue WRITE_CONCERN_JOURNALED_VALUE = new
AllowableValue(
+ AllowableValue WRITE_CONCERN_JOURNALED_VALUE = new AllowableValue(
WRITE_CONCERN_JOURNALED, WRITE_CONCERN_JOURNALED,
"Write operations wait for the server to group commit to the
journal file on disk");
- static final AllowableValue WRITE_CONCERN_REPLICA_ACKNOWLEDGED_VALUE =
new AllowableValue(
+ AllowableValue WRITE_CONCERN_REPLICA_ACKNOWLEDGED_VALUE = new
AllowableValue(
WRITE_CONCERN_REPLICA_ACKNOWLEDGED,
WRITE_CONCERN_REPLICA_ACKNOWLEDGED,
"Deprecated. Use of \"" + WRITE_CONCERN_W2 + "\" is preferred");
- static final AllowableValue WRITE_CONCERN_MAJORITY_VALUE = new
AllowableValue(
+ AllowableValue WRITE_CONCERN_MAJORITY_VALUE = new AllowableValue(
WRITE_CONCERN_MAJORITY, WRITE_CONCERN_MAJORITY,
"Exceptions are raised for network issues, and server errors;
waits on a majority of servers for the write operation");
- static final AllowableValue WRITE_CONCERN_W1_VALUE = new AllowableValue(
+ AllowableValue WRITE_CONCERN_W1_VALUE = new AllowableValue(
WRITE_CONCERN_W1, WRITE_CONCERN_W1,
"Write operations that use this write concern will wait for
acknowledgement from a single member");
- static final AllowableValue WRITE_CONCERN_W2_VALUE = new AllowableValue(
+ AllowableValue WRITE_CONCERN_W2_VALUE = new AllowableValue(
WRITE_CONCERN_W2, WRITE_CONCERN_W2,
"Write operations that use this write concern will wait for
acknowledgement from two members");
- static final AllowableValue WRITE_CONCERN_W3_VALUE = new AllowableValue(
+ AllowableValue WRITE_CONCERN_W3_VALUE = new AllowableValue(
WRITE_CONCERN_W3, WRITE_CONCERN_W3,
"Write operations that use this write concern will wait for
acknowledgement from three members");
diff --git
a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBControllerService.java
b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBControllerService.java
index 22cf308434..3951ed227a 100644
---
a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBControllerService.java
+++
b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/main/java/org/apache/nifi/mongodb/MongoDBControllerService.java
@@ -23,20 +23,25 @@ import com.mongodb.MongoClientURI;
import com.mongodb.WriteConcern;
import com.mongodb.client.MongoDatabase;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import java.io.UnsupportedEncodingException;
import java.net.URLEncoder;
import java.nio.charset.StandardCharsets;
+import java.util.Map;
import javax.net.ssl.SSLContext;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnDisabled;
import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.components.ConfigVerificationResult;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.ssl.SSLContextService;
+import org.bson.Document;
@Tags({"mongo", "mongodb", "service"})
@CapabilityDescription(
@@ -49,7 +54,7 @@ public class MongoDBControllerService extends
AbstractControllerService implemen
@OnEnabled
public void onEnabled(final ConfigurationContext context) {
this.uri = getURI(context);
- this.createClient(context);
+ this.mongoClient = createClient(context, this.mongoClient);
}
static List<PropertyDescriptor> descriptors = new ArrayList<>();
@@ -65,9 +70,9 @@ public class MongoDBControllerService extends
AbstractControllerService implemen
protected MongoClient mongoClient;
// TODO: Remove duplicate code by refactoring shared method to accept
PropertyContext
- protected final void createClient(ConfigurationContext context) {
- if (mongoClient != null) {
- closeClient();
+ protected MongoClient createClient(ConfigurationContext context,
MongoClient existing) {
+ if (existing != null) {
+ closeClient(existing);
}
getLogger().info("Creating MongoClient");
@@ -84,9 +89,9 @@ public class MongoDBControllerService extends
AbstractControllerService implemen
try {
if(sslContext == null) {
- mongoClient = new MongoClient(new
MongoClientURI(getURI(context)));
+ return new MongoClient(new MongoClientURI(getURI(context)));
} else {
- mongoClient = new MongoClient(new
MongoClientURI(getURI(context), getClientOptions(sslContext)));
+ return new MongoClient(new MongoClientURI(getURI(context),
getClientOptions(sslContext)));
}
} catch (Exception e) {
getLogger().error("Failed to schedule {} due to {}", new Object[]
{ this.getClass().getName(), e }, e);
@@ -102,10 +107,13 @@ public class MongoDBControllerService extends
AbstractControllerService implemen
}
@OnStopped
- public final void closeClient() {
- if (mongoClient != null) {
- mongoClient.close();
- mongoClient = null;
+ public final void onStopped() {
+ closeClient(mongoClient);
+ }
+
+ private void closeClient(MongoClient client) {
+ if (client != null) {
+ client.close();
}
}
@@ -185,4 +193,28 @@ public class MongoDBControllerService extends
AbstractControllerService implemen
public String getURI() {
return uri;
}
+
+ @Override
+ public List<ConfigVerificationResult> verify(ConfigurationContext context,
+ ComponentLog
verificationLogger,
+ Map<String, String>
variables) {
+ ConfigVerificationResult.Builder connectionSuccessful = new
ConfigVerificationResult.Builder()
+ .verificationStepName("Connection test");
+
+ MongoClient client = null;
+ try {
+ client = createClient(context, null);
+ MongoDatabase db = client.getDatabase("test");
+ db.runCommand(new Document("buildInfo", 1));
+
connectionSuccessful.outcome(ConfigVerificationResult.Outcome.SUCCESSFUL);
+ } catch (Exception ex) {
+ connectionSuccessful
+ .explanation(ex.getMessage())
+ .outcome(ConfigVerificationResult.Outcome.FAILED);
+ } finally {
+ closeClient(client);
+ }
+
+ return Arrays.asList(connectionSuccessful.build());
+ }
}
diff --git
a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/MongoDBControllerServiceIT.java
b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/MongoDBControllerServiceIT.java
index 32d4e86b4a..d75ad01da6 100644
---
a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/MongoDBControllerServiceIT.java
+++
b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-services/src/test/java/org/apache/nifi/mongodb/MongoDBControllerServiceIT.java
@@ -17,17 +17,27 @@
package org.apache.nifi.mongodb;
+import org.apache.nifi.components.ConfigVerificationResult;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.VerifiableControllerService;
+import org.apache.nifi.util.MockConfigurationContext;
+import org.apache.nifi.util.MockControllerServiceLookup;
+import org.apache.nifi.util.MockVariableRegistry;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
-import java.util.Calendar;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
public class MongoDBControllerServiceIT {
- private static final String DB_NAME = String.format("nifi_test-%d",
Calendar.getInstance().getTimeInMillis());
- private static final String COL_NAME = String.format("nifi_test-%d",
Calendar.getInstance().getTimeInMillis());
+
+ private static final String IDENTIFIER = "Client Service";
private TestRunner runner;
private MongoDBControllerService service;
@@ -36,7 +46,7 @@ public class MongoDBControllerServiceIT {
public void before() throws Exception {
runner =
TestRunners.newTestRunner(TestControllerServiceProcessor.class);
service = new MongoDBControllerService();
- runner.addControllerService("Client Service", service);
+ runner.addControllerService(IDENTIFIER, service);
runner.setProperty(service, MongoDBControllerService.URI,
"mongodb://localhost:27017");
runner.enableControllerService(service);
}
@@ -50,4 +60,35 @@ public class MongoDBControllerServiceIT {
public void testInit() throws Exception {
runner.assertValid(service);
}
+
+ private Map<PropertyDescriptor, String> getClientServiceProperties() {
+ return ((MockControllerServiceLookup)
runner.getProcessContext().getControllerServiceLookup())
+ .getControllerServices().get(IDENTIFIER).getProperties();
+ }
+
+ @Test
+ public void testVerifyWithCorrectConnectionString() {
+ final List<ConfigVerificationResult> results =
((VerifiableControllerService) service).verify(
+ new MockConfigurationContext(service,
getClientServiceProperties(),
runner.getProcessContext().getControllerServiceLookup(), new
MockVariableRegistry()),
+ runner.getLogger(),
+ Collections.emptyMap()
+ );
+
+ assertEquals(1, results.size());
+ assertEquals(ConfigVerificationResult.Outcome.SUCCESSFUL,
results.get(0).getOutcome());
+ }
+
+ @Test
+ public void testVerifyWithIncorrectConnectionString() {
+ runner.disableControllerService(service);
+ runner.setProperty(service, MongoDBControllerService.URI,
"mongodb://localhost:2701");
+ final List<ConfigVerificationResult> results =
((VerifiableControllerService) service).verify(
+ new MockConfigurationContext(service,
getClientServiceProperties(),
runner.getProcessContext().getControllerServiceLookup(), new
MockVariableRegistry()),
+ runner.getLogger(),
+ Collections.emptyMap()
+ );
+
+ assertEquals(1, results.size());
+ assertEquals(ConfigVerificationResult.Outcome.FAILED,
results.get(0).getOutcome());
+ }
}