Repository: nifi Updated Branches: refs/heads/master 0fed158d1 -> 0f05c77a7
NIFI-1197 checkstyle (+2 squashed commits) Squashed commits: [b4e9b5f] NIFI-1197 fixed name/displayName on properties [d39f82b] NIFI-1197 Added SSL support for MongoDB processors This closes #360. Signed-off-by: Andy LoPresto <[email protected]> Project: http://git-wip-us.apache.org/repos/asf/nifi/repo Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/0f05c77a Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/0f05c77a Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/0f05c77a Branch: refs/heads/master Commit: 0f05c77a7376bc44ca1e2242c03cc11f201e6a1f Parents: 0fed158 Author: Pierre Villard <[email protected]> Authored: Sun Apr 17 17:24:59 2016 +0200 Committer: Andy LoPresto <[email protected]> Committed: Tue Apr 19 20:25:19 2016 -0700 ---------------------------------------------------------------------- .../nifi-mongodb-nar/pom.xml | 7 +- .../nifi-mongodb-processors/pom.xml | 4 + .../mongodb/AbstractMongoProcessor.java | 73 ++++++++++++- .../nifi/processors/mongodb/GetMongo.java | 41 ++++---- .../nifi/processors/mongodb/PutMongo.java | 41 ++++---- .../mongodb/AbstractMongoProcessorTest.java | 104 +++++++++++++++++++ nifi-nar-bundles/nifi-mongodb-bundle/pom.xml | 12 ++- pom.xml | 2 +- 8 files changed, 235 insertions(+), 49 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/nifi/blob/0f05c77a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-nar/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-nar/pom.xml b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-nar/pom.xml index a17b05f..e714da3 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-nar/pom.xml +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-nar/pom.xml @@ -23,7 +23,6 @@ </parent> <artifactId>nifi-mongodb-nar</artifactId> - <version>1.0.0-SNAPSHOT</version> <packaging>nar</packaging> <properties> <maven.javadoc.skip>true</maven.javadoc.skip> @@ -33,8 +32,12 @@ <dependencies> <dependency> <groupId>org.apache.nifi</groupId> + <artifactId>nifi-standard-services-api-nar</artifactId> + <type>nar</type> + </dependency> + <dependency> + <groupId>org.apache.nifi</groupId> <artifactId>nifi-mongodb-processors</artifactId> - <version>1.0.0-SNAPSHOT</version> </dependency> </dependencies> http://git-wip-us.apache.org/repos/asf/nifi/blob/0f05c77a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml index ff67f79..0a26cd6 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/pom.xml @@ -39,6 +39,10 @@ <artifactId>nifi-processor-utils</artifactId> </dependency> <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-ssl-context-service-api</artifactId> + </dependency> + <dependency> <groupId>commons-io</groupId> <artifactId>commons-io</artifactId> </dependency> http://git-wip-us.apache.org/repos/asf/nifi/blob/0f05c77a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java index fae007f..7e3a196 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessor.java @@ -19,16 +19,26 @@ package org.apache.nifi.processors.mongodb; import java.io.IOException; +import java.util.ArrayList; +import java.util.List; +import javax.net.ssl.SSLContext; + +import org.apache.commons.lang3.StringUtils; import org.apache.nifi.annotation.lifecycle.OnScheduled; import org.apache.nifi.annotation.lifecycle.OnStopped; +import org.apache.nifi.authentication.exception.ProviderCreationException; import org.apache.nifi.components.PropertyDescriptor; import org.apache.nifi.processor.AbstractProcessor; import org.apache.nifi.processor.ProcessContext; import org.apache.nifi.processor.util.StandardValidators; +import org.apache.nifi.security.util.SslContextFactory; +import org.apache.nifi.ssl.SSLContextService; import org.bson.Document; import com.mongodb.MongoClient; +import com.mongodb.MongoClientOptions; +import com.mongodb.MongoClientOptions.Builder; import com.mongodb.MongoClientURI; import com.mongodb.client.MongoCollection; import com.mongodb.client.MongoDatabase; @@ -52,6 +62,34 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor { .required(true) .addValidator(StandardValidators.NON_EMPTY_VALIDATOR) .build(); + public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder() + .name("ssl-context-service") + .displayName("SSL Context Service") + .description("The SSL Context Service used to provide client certificate information for TLS/SSL " + + "connections.") + .required(false) + .identifiesControllerService(SSLContextService.class) + .build(); + public static final PropertyDescriptor CLIENT_AUTH = new PropertyDescriptor.Builder() + .name("ssl-client-auth") + .displayName("Client Auth") + .description("Client authentication policy when connecting to secure (TLS/SSL) cluster. " + + "Possible values are REQUIRED, WANT, NONE. This property is only used when an SSL Context " + + "has been defined and enabled.") + .required(false) + .allowableValues(SSLContextService.ClientAuth.values()) + .defaultValue("REQUIRED") + .build(); + + static List<PropertyDescriptor> descriptors = new ArrayList<>(); + + static { + descriptors.add(URI); + descriptors.add(DATABASE_NAME); + descriptors.add(COLLECTION_NAME); + descriptors.add(SSL_CONTEXT_SERVICE); + descriptors.add(CLIENT_AUTH); + } protected MongoClient mongoClient; @@ -63,15 +101,48 @@ public abstract class AbstractMongoProcessor extends AbstractProcessor { getLogger().info("Creating MongoClient"); + // Set up the client for secure (SSL/TLS communications) if configured to do so + final SSLContextService sslService = context.getProperty(SSL_CONTEXT_SERVICE).asControllerService(SSLContextService.class); + final String rawClientAuth = context.getProperty(CLIENT_AUTH).getValue(); + final SSLContext sslContext; + + if (sslService != null) { + final SSLContextService.ClientAuth clientAuth; + if (StringUtils.isBlank(rawClientAuth)) { + clientAuth = SSLContextService.ClientAuth.REQUIRED; + } else { + try { + clientAuth = SSLContextService.ClientAuth.valueOf(rawClientAuth); + } catch (final IllegalArgumentException iae) { + throw new ProviderCreationException(String.format("Unrecognized client auth '%s'. Possible values are [%s]", + rawClientAuth, StringUtils.join(SslContextFactory.ClientAuth.values(), ", "))); + } + } + sslContext = sslService.createSSLContext(clientAuth); + } else { + sslContext = null; + } + try { final String uri = context.getProperty(URI).getValue(); - mongoClient = new MongoClient(new MongoClientURI(uri)); + if(sslContext == null) { + mongoClient = new MongoClient(new MongoClientURI(uri)); + } else { + mongoClient = new MongoClient(new MongoClientURI(uri, getClientOptions(sslContext))); + } } catch (Exception e) { getLogger().error("Failed to schedule PutMongo due to {}", new Object[] { e }, e); throw e; } } + protected Builder getClientOptions(final SSLContext sslContext) { + MongoClientOptions.Builder builder = MongoClientOptions.builder(); + builder.sslEnabled(true); + builder.socketFactory(sslContext.getSocketFactory()); + return builder; + } + @OnStopped public final void closeClient() { if (mongoClient != null) { http://git-wip-us.apache.org/repos/asf/nifi/blob/0f05c77a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java index c2b49d9..ebe7a24 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/GetMongo.java @@ -100,35 +100,34 @@ public class GetMongo extends AbstractMongoProcessor { .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR) .build(); - private final List<PropertyDescriptor> descriptors; - - private final Set<Relationship> relationships; - - public GetMongo() { - final List<PropertyDescriptor> descriptors = new ArrayList<>(); - descriptors.add(URI); - descriptors.add(DATABASE_NAME); - descriptors.add(COLLECTION_NAME); - descriptors.add(QUERY); - descriptors.add(PROJECTION); - descriptors.add(SORT); - descriptors.add(LIMIT); - descriptors.add(BATCH_SIZE); - this.descriptors = Collections.unmodifiableList(descriptors); - - final Set<Relationship> relationships = new HashSet<>(); - relationships.add(REL_SUCCESS); - this.relationships = Collections.unmodifiableSet(relationships); + private final static Set<Relationship> relationships; + private final static List<PropertyDescriptor> propertyDescriptors; + + static { + List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>(); + _propertyDescriptors.addAll(descriptors); + _propertyDescriptors.add(QUERY); + _propertyDescriptors.add(PROJECTION); + _propertyDescriptors.add(SORT); + _propertyDescriptors.add(LIMIT); + _propertyDescriptors.add(BATCH_SIZE); + _propertyDescriptors.add(SSL_CONTEXT_SERVICE); + _propertyDescriptors.add(CLIENT_AUTH); + propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors); + + final Set<Relationship> _relationships = new HashSet<>(); + _relationships.add(REL_SUCCESS); + relationships = Collections.unmodifiableSet(_relationships); } @Override public Set<Relationship> getRelationships() { - return this.relationships; + return relationships; } @Override public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { - return descriptors; + return propertyDescriptors; } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/0f05c77a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java index ae4009c..5f6d875 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/main/java/org/apache/nifi/processors/mongodb/PutMongo.java @@ -106,36 +106,33 @@ public class PutMongo extends AbstractMongoProcessor { .defaultValue("UTF-8") .build(); - private final List<PropertyDescriptor> descriptors; - - private final Set<Relationship> relationships; - - public PutMongo() { - final List<PropertyDescriptor> descriptors = new ArrayList<>(); - descriptors.add(URI); - descriptors.add(DATABASE_NAME); - descriptors.add(COLLECTION_NAME); - descriptors.add(MODE); - descriptors.add(UPSERT); - descriptors.add(UPDATE_QUERY_KEY); - descriptors.add(WRITE_CONCERN); - descriptors.add(CHARACTER_SET); - this.descriptors = Collections.unmodifiableList(descriptors); - - final Set<Relationship> relationships = new HashSet<>(); - relationships.add(REL_SUCCESS); - relationships.add(REL_FAILURE); - this.relationships = Collections.unmodifiableSet(relationships); + private final static Set<Relationship> relationships; + private final static List<PropertyDescriptor> propertyDescriptors; + + static { + List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>(); + _propertyDescriptors.addAll(descriptors); + _propertyDescriptors.add(MODE); + _propertyDescriptors.add(UPSERT); + _propertyDescriptors.add(UPDATE_QUERY_KEY); + _propertyDescriptors.add(WRITE_CONCERN); + _propertyDescriptors.add(CHARACTER_SET); + propertyDescriptors = Collections.unmodifiableList(_propertyDescriptors); + + final Set<Relationship> _relationships = new HashSet<>(); + _relationships.add(REL_SUCCESS); + _relationships.add(REL_FAILURE); + relationships = Collections.unmodifiableSet(_relationships); } @Override public Set<Relationship> getRelationships() { - return this.relationships; + return relationships; } @Override public final List<PropertyDescriptor> getSupportedPropertyDescriptors() { - return descriptors; + return propertyDescriptors; } @Override http://git-wip-us.apache.org/repos/asf/nifi/blob/0f05c77a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessorTest.java ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessorTest.java b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessorTest.java new file mode 100644 index 0000000..1750cc2 --- /dev/null +++ b/nifi-nar-bundles/nifi-mongodb-bundle/nifi-mongodb-processors/src/test/java/org/apache/nifi/processors/mongodb/AbstractMongoProcessorTest.java @@ -0,0 +1,104 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.nifi.processors.mongodb; + +import static org.junit.Assert.assertNotNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; +import static org.mockito.Mockito.any; + +import javax.net.ssl.SSLContext; + +import org.apache.nifi.authentication.exception.ProviderCreationException; +import org.apache.nifi.processor.ProcessContext; +import org.apache.nifi.processor.ProcessSession; +import org.apache.nifi.processor.exception.ProcessException; +import org.apache.nifi.ssl.SSLContextService; +import org.apache.nifi.ssl.SSLContextService.ClientAuth; +import org.apache.nifi.util.TestRunner; +import org.apache.nifi.util.TestRunners; +import org.junit.Before; +import org.junit.Test; + +import com.mongodb.MongoClientOptions; +import com.mongodb.MongoClientOptions.Builder; + +public class AbstractMongoProcessorTest { + + MockAbstractMongoProcessor processor; + private TestRunner testRunner; + + @Before + public void setUp() throws Exception { + processor = new MockAbstractMongoProcessor(); + testRunner = TestRunners.newTestRunner(processor); + } + + @Test + public void testcreateClientWithSSL() throws Exception { + SSLContextService sslService = mock(SSLContextService.class); + SSLContext sslContext = mock(SSLContext.class); + when(sslService.getIdentifier()).thenReturn("ssl-context"); + when(sslService.createSSLContext(any(ClientAuth.class))).thenReturn(sslContext); + testRunner.addControllerService("ssl-context", sslService); + testRunner.enableControllerService(sslService); + testRunner.setProperty(AbstractMongoProcessor.URI, "mongodb://localhost:27017"); + testRunner.setProperty(AbstractMongoProcessor.SSL_CONTEXT_SERVICE, "ssl-context"); + testRunner.assertValid(sslService); + processor.createClient(testRunner.getProcessContext()); + assertNotNull(processor.mongoClient); + processor.mongoClient = null; + testRunner.setProperty(AbstractMongoProcessor.CLIENT_AUTH, "WANT"); + processor.createClient(testRunner.getProcessContext()); + assertNotNull(processor.mongoClient); + } + + @Test(expected = ProviderCreationException.class) + public void testcreateClientWithSSLBadClientAuth() throws Exception { + SSLContextService sslService = mock(SSLContextService.class); + SSLContext sslContext = mock(SSLContext.class); + when(sslService.getIdentifier()).thenReturn("ssl-context"); + when(sslService.createSSLContext(any(ClientAuth.class))).thenReturn(sslContext); + testRunner.addControllerService("ssl-context", sslService); + testRunner.enableControllerService(sslService); + testRunner.setProperty(AbstractMongoProcessor.URI, "mongodb://localhost:27017"); + testRunner.setProperty(AbstractMongoProcessor.SSL_CONTEXT_SERVICE, "ssl-context"); + testRunner.assertValid(sslService); + processor.createClient(testRunner.getProcessContext()); + assertNotNull(processor.mongoClient); + processor.mongoClient = null; + testRunner.setProperty(AbstractMongoProcessor.CLIENT_AUTH, "BAD"); + processor.createClient(testRunner.getProcessContext()); + } + + + /** + * Provides a stubbed processor instance for testing + */ + public static class MockAbstractMongoProcessor extends AbstractMongoProcessor { + @Override + public void onTrigger(ProcessContext context, ProcessSession session) throws ProcessException { + // nothing to do + } + + @Override + protected Builder getClientOptions(SSLContext sslContext) { + return MongoClientOptions.builder(); + } + } + +} http://git-wip-us.apache.org/repos/asf/nifi/blob/0f05c77a/nifi-nar-bundles/nifi-mongodb-bundle/pom.xml ---------------------------------------------------------------------- diff --git a/nifi-nar-bundles/nifi-mongodb-bundle/pom.xml b/nifi-nar-bundles/nifi-mongodb-bundle/pom.xml index db9e87d..25c6f69 100644 --- a/nifi-nar-bundles/nifi-mongodb-bundle/pom.xml +++ b/nifi-nar-bundles/nifi-mongodb-bundle/pom.xml @@ -22,9 +22,7 @@ <version>1.0.0-SNAPSHOT</version> </parent> - <groupId>org.apache.nifi</groupId> <artifactId>nifi-mongodb-bundle</artifactId> - <version>1.0.0-SNAPSHOT</version> <packaging>pom</packaging> <modules> @@ -32,4 +30,14 @@ <module>nifi-mongodb-nar</module> </modules> + <dependencyManagement> + <dependencies> + <dependency> + <groupId>org.apache.nifi</groupId> + <artifactId>nifi-mongodb-processors</artifactId> + <version>1.0.0-SNAPSHOT</version> + </dependency> + </dependencies> + </dependencyManagement> + </project> http://git-wip-us.apache.org/repos/asf/nifi/blob/0f05c77a/pom.xml ---------------------------------------------------------------------- diff --git a/pom.xml b/pom.xml index e702785..29b936b 100644 --- a/pom.xml +++ b/pom.xml @@ -219,7 +219,7 @@ language governing permissions and limitations under the License. --> <dependency> <groupId>org.mongodb</groupId> <artifactId>mongo-java-driver</artifactId> - <version>3.0.2</version> + <version>3.2.2</version> </dependency> <dependency> <groupId>com.wordnik</groupId>
